]> git.saurik.com Git - apple/xnu.git/blobdiff - osfmk/ipc/ipc_mqueue.c
xnu-6153.121.1.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
index 4bcb66adc9fddf38f177f921f3bb3b14a107570d..360879748d76c9db446377ebad1bd6cdc0c907e2 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
  *
  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
- * 
+ *
  * This file contains Original Code and/or Modifications of Original Code
  * as defined in and that are subject to the Apple Public Source License
  * Version 2.0 (the 'License'). You may not use this file except in
  * unlawful or unlicensed copies of an Apple operating system, or to
  * circumvent, violate, or enable the circumvention or violation of, any
  * terms of an Apple operating system software license agreement.
- * 
+ *
  * Please obtain a copy of the License at
  * http://www.opensource.apple.com/apsl/ and read it before using this file.
- * 
+ *
  * The Original Code and all software distributed under the License are
  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
  * Please see the License for the specific language governing rights and
  * limitations under the License.
- * 
+ *
  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
  */
 /*
  * @OSF_FREE_COPYRIGHT@
  */
-/* 
+/*
  * Mach Operating System
  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
  * All Rights Reserved.
- * 
+ *
  * Permission to use, copy, modify and distribute this software and its
  * documentation is hereby granted, provided that both the copyright
  * notice and this permission notice appear in all copies of the
  * software, derivative works or modified versions, and any portions
  * thereof, and that both notices appear in supporting documentation.
- * 
+ *
  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
- * 
+ *
  * Carnegie Mellon requests users of this software to return to
- * 
+ *
  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
  *  School of Computer Science
  *  Carnegie Mellon University
  *  Pittsburgh PA 15213-3890
- * 
+ *
  * any improvements or extensions that they make and grant Carnegie Mellon
  * the rights to redistribute these changes.
  */
@@ -68,7 +68,7 @@
  * is included in support of clause 2.2 (b) of the Apple Public License,
  * Version 2.0.
  */
-    
+
 
 #include <mach/port.h>
 #include <mach/message.h>
 #include <kern/counters.h>
 #include <kern/sched_prim.h>
 #include <kern/ipc_kobject.h>
-#include <kern/ipc_mig.h>      /* XXX - for mach_msg_receive_continue */
+#include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
 #include <kern/misc_protos.h>
 #include <kern/task.h>
 #include <kern/thread.h>
-#include <kern/wait_queue.h>
+#include <kern/waitq.h>
 
+#include <ipc/port.h>
 #include <ipc/ipc_mqueue.h>
 #include <ipc/ipc_kmsg.h>
 #include <ipc/ipc_port.h>
 #include <ipc/ipc_pset.h>
 #include <ipc/ipc_space.h>
 
+#if MACH_FLIPC
+#include <ipc/flipc.h>
+#endif
+
 #ifdef __LP64__
 #include <vm/vm_map.h>
 #endif
 
-#if CONFIG_MACF_MACH
-#include <security/mac_mach_internal.h>
-#endif
+#include <sys/event.h>
+
+extern char     *proc_name_address(void *p);
 
-int ipc_mqueue_full;           /* address is event for queue space */
-int ipc_mqueue_rcv;            /* address is event for message arrival */
+int ipc_mqueue_full;            /* address is event for queue space */
+int ipc_mqueue_rcv;             /* address is event for message arrival */
 
 /* forward declarations */
-void ipc_mqueue_receive_results(wait_result_t result);
+static void ipc_mqueue_receive_results(wait_result_t result);
+static void ipc_mqueue_peek_on_thread(
+       ipc_mqueue_t        port_mq,
+       mach_msg_option_t   option,
+       thread_t            thread);
 
 /*
  *     Routine:        ipc_mqueue_init
@@ -111,21 +120,80 @@ void ipc_mqueue_receive_results(wait_result_t result);
  */
 void
 ipc_mqueue_init(
-       ipc_mqueue_t    mqueue,
-       boolean_t       is_set)
+       ipc_mqueue_t            mqueue,
+       ipc_mqueue_kind_t       kind)
 {
-       if (is_set) {
-               wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
-       } else {
-               wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+       switch (kind) {
+       case IPC_MQUEUE_KIND_SET:
+               waitq_set_init(&mqueue->imq_set_queue,
+                   SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
+                   NULL, NULL);
+               break;
+       case IPC_MQUEUE_KIND_NONE: /* cheat: we really should have "no" mqueue */
+       case IPC_MQUEUE_KIND_PORT:
+               waitq_init(&mqueue->imq_wait_queue,
+                   SYNC_POLICY_FIFO | SYNC_POLICY_TURNSTILE_PROXY);
                ipc_kmsg_queue_init(&mqueue->imq_messages);
                mqueue->imq_seqno = 0;
                mqueue->imq_msgcount = 0;
                mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
+               mqueue->imq_context = 0;
                mqueue->imq_fullwaiters = FALSE;
+#if MACH_FLIPC
+               mqueue->imq_fport = FPORT_NULL;
+#endif
+               break;
        }
+       klist_init(&mqueue->imq_klist);
 }
 
+void
+ipc_mqueue_deinit(
+       ipc_mqueue_t            mqueue)
+{
+       boolean_t is_set = imq_is_set(mqueue);
+
+       if (is_set) {
+               waitq_set_deinit(&mqueue->imq_set_queue);
+       } else {
+               waitq_deinit(&mqueue->imq_wait_queue);
+       }
+}
+
+/*
+ *     Routine:        imq_reserve_and_lock
+ *     Purpose:
+ *             Atomically lock an ipc_mqueue_t object and reserve
+ *             an appropriate number of prepost linkage objects for
+ *             use in wakeup operations.
+ *     Conditions:
+ *             mq is unlocked
+ */
+void
+imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
+{
+       *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
+           WAITQ_KEEP_LOCKED);
+}
+
+
+/*
+ *     Routine:        imq_release_and_unlock
+ *     Purpose:
+ *             Unlock an ipc_mqueue_t object, re-enable interrupts,
+ *             and release any unused prepost object reservations.
+ *     Conditions:
+ *             mq is locked
+ */
+void
+imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost)
+{
+       assert(imq_held(mq));
+       waitq_unlock(&mq->imq_wait_queue);
+       waitq_prepost_release_reserve(reserved_prepost);
+}
+
+
 /*
  *     Routine:        ipc_mqueue_member
  *     Purpose:
@@ -140,14 +208,13 @@ ipc_mqueue_init(
 
 boolean_t
 ipc_mqueue_member(
-       ipc_mqueue_t            port_mqueue,
-       ipc_mqueue_t            set_mqueue)
+       ipc_mqueue_t            port_mqueue,
+       ipc_mqueue_t            set_mqueue)
 {
-       wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
-
-       return (wait_queue_member(port_waitq, set_waitq));
+       struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
 
+       return waitq_member(port_waitq, set_waitq);
 }
 
 /*
@@ -159,14 +226,13 @@ ipc_mqueue_member(
 
 kern_return_t
 ipc_mqueue_remove(
-       ipc_mqueue_t      mqueue,
-       ipc_mqueue_t      set_mqueue,
-       wait_queue_link_t *wqlp)
+       ipc_mqueue_t      mqueue,
+       ipc_mqueue_t      set_mqueue)
 {
-       wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+       struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
 
-       return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
+       return waitq_unlink(mq_waitq, set_waitq);
 }
 
 /*
@@ -175,34 +241,41 @@ ipc_mqueue_remove(
  *             Remove the mqueue from all the sets it is a member of
  *     Conditions:
  *             Nothing locked.
+ *     Returns:
+ *             mqueue unlocked and set links deallocated
  */
 void
-ipc_mqueue_remove_from_all(
-       ipc_mqueue_t    mqueue,
-       queue_t         links)
+ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
 {
-       wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
+       struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+       kern_return_t kr;
 
-       wait_queue_unlink_all_nofree(mq_waitq, links);
-       return;
+       imq_lock(mqueue);
+
+       assert(waitq_valid(mq_waitq));
+       kr = waitq_unlink_all_unlock(mq_waitq);
+       /* mqueue unlocked and set links deallocated */
 }
 
 /*
  *     Routine:        ipc_mqueue_remove_all
  *     Purpose:
  *             Remove all the member queues from the specified set.
+ *             Also removes the queue from any containing sets.
  *     Conditions:
  *             Nothing locked.
+ *     Returns:
+ *             mqueue unlocked and all set links deallocated
  */
 void
-ipc_mqueue_remove_all(
-       ipc_mqueue_t    mqueue,
-       queue_t         links)
+ipc_mqueue_remove_all(ipc_mqueue_t      mqueue)
 {
-       wait_queue_set_t        mq_setq = &mqueue->imq_set_queue;
+       struct waitq_set *mq_setq = &mqueue->imq_set_queue;
 
-       wait_queue_set_unlink_all_nofree(mq_setq, links);
-       return;
+       imq_lock(mqueue);
+       assert(waitqs_is_set(mq_setq));
+       waitq_set_unlink_all_unlock(mq_setq);
+       /* mqueue unlocked and set links deallocated */
 }
 
 
@@ -219,47 +292,59 @@ ipc_mqueue_remove_all(
  */
 kern_return_t
 ipc_mqueue_add(
-       ipc_mqueue_t     port_mqueue,
-       ipc_mqueue_t     set_mqueue,
-       wait_queue_link_t wql)
+       ipc_mqueue_t    port_mqueue,
+       ipc_mqueue_t    set_mqueue,
+       uint64_t        *reserved_link,
+       uint64_t        *reserved_prepost)
 {
-       wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+       struct waitq     *port_waitq = &port_mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
        ipc_kmsg_queue_t kmsgq;
        ipc_kmsg_t       kmsg, next;
-       kern_return_t    kr;
-       spl_t            s;
+       kern_return_t    kr;
 
-       kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
-       if (kr != KERN_SUCCESS)
+       assert(reserved_link && *reserved_link != 0);
+       assert(waitqs_is_linked(set_waitq));
+
+       imq_lock(port_mqueue);
+
+       /*
+        * The link operation is now under the same lock-hold as
+        * message iteration and thread wakeup, but doesn't have to be...
+        */
+       kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
+       if (kr != KERN_SUCCESS) {
+               imq_unlock(port_mqueue);
                return kr;
+       }
 
        /*
         * Now that the set has been added to the port, there may be
         * messages queued on the port and threads waiting on the set
         * waitq.  Lets get them together.
         */
-       s = splsched();
-       imq_lock(port_mqueue);
        kmsgq = &port_mqueue->imq_messages;
        for (kmsg = ipc_kmsg_queue_first(kmsgq);
-            kmsg != IKM_NULL;
-            kmsg = next) {
+           kmsg != IKM_NULL;
+           kmsg = next) {
                next = ipc_kmsg_queue_next(kmsgq, kmsg);
 
                for (;;) {
                        thread_t th;
                        mach_msg_size_t msize;
+                       spl_t th_spl;
 
-                       th = wait_queue_wakeup64_identity_locked(
-                                               port_waitq,
-                                               IPC_MQUEUE_RECEIVE,
-                                               THREAD_AWAKENED,
-                                               FALSE);
+                       th = waitq_wakeup64_identify_locked(
+                               port_waitq,
+                               IPC_MQUEUE_RECEIVE,
+                               THREAD_AWAKENED, &th_spl,
+                               reserved_prepost, WAITQ_ALL_PRIORITIES,
+                               WAITQ_KEEP_LOCKED);
                        /* waitq/mqueue still locked, thread locked */
 
-                       if (th == THREAD_NULL)
+                       if (th == THREAD_NULL) {
                                goto leave;
+                       }
 
                        /*
                         * If the receiver waited with a facility not directly
@@ -268,8 +353,20 @@ ipc_mqueue_add(
                         * go look for another thread that can.
                         */
                        if (th->ith_state != MACH_RCV_IN_PROGRESS) {
-                                 thread_unlock(th);
-                                 continue;
+                               if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
+                                       /*
+                                        * wake up the peeking thread, but
+                                        * continue to loop over the threads
+                                        * waiting on the port's mqueue to see
+                                        * if there are any actual receivers
+                                        */
+                                       ipc_mqueue_peek_on_thread(port_mqueue,
+                                           th->ith_option,
+                                           th);
+                               }
+                               thread_unlock(th);
+                               splx(th_spl);
+                               continue;
                        }
 
                        /*
@@ -281,8 +378,8 @@ ipc_mqueue_add(
                         * just move onto the next.
                         */
                        msize = ipc_kmsg_copyout_size(kmsg, th->map);
-                       if (th->ith_msize <
-                                       (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
+                       if (th->ith_rsize <
+                           (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
                                th->ith_state = MACH_RCV_TOO_LARGE;
                                th->ith_msize = msize;
                                if (th->ith_option & MACH_RCV_LARGE) {
@@ -293,6 +390,7 @@ ipc_mqueue_add(
                                        th->ith_kmsg = IKM_NULL;
                                        th->ith_seqno = 0;
                                        thread_unlock(th);
+                                       splx(th_spl);
                                        continue; /* find another thread */
                                }
                        } else {
@@ -304,21 +402,48 @@ ipc_mqueue_add(
                         * so give it to him.
                         */
                        ipc_kmsg_rmqueue(kmsgq, kmsg);
-                       ipc_mqueue_release_msgcount(port_mqueue);
+#if MACH_FLIPC
+                       mach_node_t  node = kmsg->ikm_node;
+#endif
+                       ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
 
                        th->ith_kmsg = kmsg;
                        th->ith_seqno = port_mqueue->imq_seqno++;
                        thread_unlock(th);
+                       splx(th_spl);
+#if MACH_FLIPC
+                       if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
+                               flipc_msg_ack(node, port_mqueue, TRUE);
+                       }
+#endif
                        break;  /* go to next message */
                }
-                       
        }
- leave:
+leave:
        imq_unlock(port_mqueue);
-       splx(s);
        return KERN_SUCCESS;
 }
 
+
+/*
+ *     Routine:        ipc_mqueue_has_klist
+ *     Purpose:
+ *             Returns whether the given mqueue's imq_klist field can be used as a klist.
+ */
+static inline bool
+ipc_mqueue_has_klist(ipc_mqueue_t mqueue)
+{
+       ipc_object_t object = imq_to_object(mqueue);
+       if (io_otype(object) != IOT_PORT) {
+               return true;
+       }
+       ipc_port_t port = ip_from_mq(mqueue);
+       if (port->ip_specialreply) {
+               return false;
+       }
+       return port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
+}
+
 /*
  *     Routine:        ipc_mqueue_changed
  *     Purpose:
@@ -326,26 +451,64 @@ ipc_mqueue_add(
  *     Conditions:
  *             The message queue is locked.
  */
-
 void
 ipc_mqueue_changed(
-       ipc_mqueue_t            mqueue)
+       ipc_space_t     space,
+       ipc_mqueue_t    mqueue)
 {
-       wait_queue_wakeup64_all_locked(
-                               &mqueue->imq_wait_queue,
-                               IPC_MQUEUE_RECEIVE,
-                               THREAD_RESTART,
-                               FALSE);         /* unlock waitq? */
+       if (ipc_mqueue_has_klist(mqueue) && SLIST_FIRST(&mqueue->imq_klist)) {
+               /*
+                * Indicate that this message queue is vanishing
+                *
+                * When this is called, the associated receive right may be in flight
+                * between two tasks: the one it used to live in, and the one that armed
+                * a port destroyed notification for it.
+                *
+                * The new process may want to register the port it gets back with an
+                * EVFILT_MACHPORT filter again, and may already have sync IPC pending
+                * on this port, in which case we want the imq_klist field to be
+                * reusable for nefarious purposes.
+                *
+                * Fortunately, we really don't need this linkage anymore after this
+                * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
+                *
+                * Note: we don't have the space lock here, however, this covers the
+                *       case of when a task is terminating the space, triggering
+                *       several knote_vanish() calls.
+                *
+                *       We don't need the lock to observe that the space is inactive as
+                *       we just deactivated it on the same thread.
+                *
+                *       We still need to call knote_vanish() so that the knote is
+                *       marked with EV_VANISHED or EV_EOF so that the detach step
+                *       in filt_machportdetach is skipped correctly.
+                */
+               assert(space);
+               knote_vanish(&mqueue->imq_klist, is_active(space));
+       }
+
+       if (io_otype(imq_to_object(mqueue)) == IOT_PORT) {
+               ipc_port_adjust_sync_link_state_locked(ip_from_mq(mqueue), PORT_SYNC_LINK_ANY, NULL);
+       } else {
+               klist_init(&mqueue->imq_klist);
+       }
+
+       waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+           IPC_MQUEUE_RECEIVE,
+           THREAD_RESTART,
+           NULL,
+           WAITQ_ALL_PRIORITIES,
+           WAITQ_KEEP_LOCKED);
 }
 
 
-               
+
 
 /*
  *     Routine:        ipc_mqueue_send
  *     Purpose:
  *             Send a message to a message queue.  The message holds a reference
- *             for the destination port for this message queue in the 
+ *             for the destination port for this message queue in the
  *             msgh_remote_port field.
  *
  *             If unsuccessful, the caller still has possession of
@@ -360,11 +523,10 @@ ipc_mqueue_changed(
  */
 mach_msg_return_t
 ipc_mqueue_send(
-       ipc_mqueue_t            mqueue,
-       ipc_kmsg_t              kmsg,
-       mach_msg_option_t       option,
-       mach_msg_timeout_t      send_timeout,
-       spl_t                   s)
+       ipc_mqueue_t            mqueue,
+       ipc_kmsg_t              kmsg,
+       mach_msg_option_t       option,
+       mach_msg_timeout_t  send_timeout)
 {
        int wresult;
 
@@ -375,66 +537,88 @@ ipc_mqueue_send(
         *      3) Message is sent to a send-once right.
         */
        if (!imq_full(mqueue) ||
-           (!imq_full_kernel(mqueue) && 
-            ((option & MACH_SEND_ALWAYS) ||
-             (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
-              MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
+           (!imq_full_kernel(mqueue) &&
+           ((option & MACH_SEND_ALWAYS) ||
+           (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
+           MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
                mqueue->imq_msgcount++;
                assert(mqueue->imq_msgcount > 0);
                imq_unlock(mqueue);
-               splx(s);
        } else {
                thread_t cur_thread = current_thread();
+               ipc_port_t port = ip_from_mq(mqueue);
+               struct turnstile *send_turnstile = TURNSTILE_NULL;
                uint64_t deadline;
 
-               /* 
+               /*
                 * We have to wait for space to be granted to us.
                 */
                if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
                        imq_unlock(mqueue);
-                       splx(s);
                        return MACH_SEND_TIMED_OUT;
                }
                if (imq_full_kernel(mqueue)) {
                        imq_unlock(mqueue);
-                       splx(s);
                        return MACH_SEND_NO_BUFFER;
                }
                mqueue->imq_fullwaiters = TRUE;
-               thread_lock(cur_thread);
-               if (option & MACH_SEND_TIMEOUT)
-                       clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
-               else
+
+               if (option & MACH_SEND_TIMEOUT) {
+                       clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
+               } else {
                        deadline = 0;
-               wresult = wait_queue_assert_wait64_locked(
-                                               &mqueue->imq_wait_queue,
-                                               IPC_MQUEUE_FULL,
-                                               THREAD_ABORTSAFE,
-                                               TIMEOUT_URGENCY_USER_NORMAL,
-                                               deadline, 0,
-                                               cur_thread);
-               thread_unlock(cur_thread);
+               }
+
+               thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
+
+               send_turnstile = turnstile_prepare((uintptr_t)port,
+                   port_send_turnstile_address(port),
+                   TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+               ipc_port_send_update_inheritor(port, send_turnstile,
+                   TURNSTILE_DELAYED_UPDATE);
+
+               wresult = waitq_assert_wait64_leeway(
+                       &send_turnstile->ts_waitq,
+                       IPC_MQUEUE_FULL,
+                       THREAD_ABORTSAFE,
+                       TIMEOUT_URGENCY_USER_NORMAL,
+                       deadline,
+                       TIMEOUT_NO_LEEWAY);
+
                imq_unlock(mqueue);
-               splx(s);
-               
+               turnstile_update_inheritor_complete(send_turnstile,
+                   TURNSTILE_INTERLOCK_NOT_HELD);
+
                if (wresult == THREAD_WAITING) {
                        wresult = thread_block(THREAD_CONTINUE_NULL);
                        counter(c_ipc_mqueue_send_block++);
                }
-               
+
+               /* Call turnstile complete with interlock held */
+               imq_lock(mqueue);
+               turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
+               imq_unlock(mqueue);
+
+               /* Call cleanup after dropping the interlock */
+               turnstile_cleanup();
+
                switch (wresult) {
+               case THREAD_AWAKENED:
+                       /*
+                        * we can proceed - inherited msgcount from waker
+                        * or the message queue has been destroyed and the msgcount
+                        * has been reset to zero (will detect in ipc_mqueue_post()).
+                        */
+                       break;
+
                case THREAD_TIMED_OUT:
                        assert(option & MACH_SEND_TIMEOUT);
                        return MACH_SEND_TIMED_OUT;
-                       
-               case THREAD_AWAKENED:
-                       /* we can proceed - inherited msgcount from waker */
-                       assert(mqueue->imq_msgcount > 0);
-                       break;
-                       
+
                case THREAD_INTERRUPTED:
                        return MACH_SEND_INTERRUPTED;
-                       
+
                case THREAD_RESTART:
                        /* mqueue is being destroyed */
                        return MACH_SEND_INVALID_DEST;
@@ -443,10 +627,65 @@ ipc_mqueue_send(
                }
        }
 
-       ipc_mqueue_post(mqueue, kmsg);
+       ipc_mqueue_post(mqueue, kmsg, option);
        return MACH_MSG_SUCCESS;
 }
 
+/*
+ *     Routine:        ipc_mqueue_override_send
+ *     Purpose:
+ *             Set an override qos on the first message in the queue
+ *             (if the queue is full). This is a send-possible override
+ *             that will go away as soon as we drain a message from the
+ *             queue.
+ *
+ *     Conditions:
+ *             The message queue is not locked.
+ *             The caller holds a reference on the message queue.
+ */
+extern void
+ipc_mqueue_override_send(
+       ipc_mqueue_t        mqueue,
+       mach_msg_priority_t override)
+{
+       boolean_t __unused full_queue_empty = FALSE;
+
+       imq_lock(mqueue);
+       assert(imq_valid(mqueue));
+       assert(!imq_is_set(mqueue));
+
+       if (imq_full(mqueue)) {
+               ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
+
+               if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
+                       ipc_object_t object = imq_to_object(mqueue);
+                       assert(io_otype(object) == IOT_PORT);
+                       ipc_port_t port = ip_object_to_port(object);
+                       if (ip_active(port) &&
+                           port->ip_receiver_name != MACH_PORT_NULL &&
+                           is_active(port->ip_receiver) &&
+                           ipc_mqueue_has_klist(mqueue)) {
+                               KNOTE(&mqueue->imq_klist, 0);
+                       }
+               }
+               if (!first) {
+                       full_queue_empty = TRUE;
+               }
+       }
+       imq_unlock(mqueue);
+
+#if DEVELOPMENT || DEBUG
+       if (full_queue_empty) {
+               ipc_port_t port = ip_from_mq(mqueue);
+               int dst_pid = 0;
+               if (ip_active(port) && !port->ip_tempowner &&
+                   port->ip_receiver_name && port->ip_receiver &&
+                   port->ip_receiver != ipc_space_kernel) {
+                       dst_pid = task_pid(port->ip_receiver->is_task);
+               }
+       }
+#endif
+}
 
 /*
  *     Routine:        ipc_mqueue_release_msgcount
@@ -457,28 +696,49 @@ ipc_mqueue_send(
  *     Conditions:
  *             The message queue is locked.
  *             The message corresponding to this reference is off the queue.
+ *             There is no need to pass reserved preposts because this will
+ *             never prepost to anyone
  */
 void
-ipc_mqueue_release_msgcount(
-       ipc_mqueue_t mqueue)    
+ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
 {
-       assert(imq_held(mqueue));
-       assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
-
-       mqueue->imq_msgcount--;
-
-       if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
-               if (wait_queue_wakeup64_one_locked(
-                                               &mqueue->imq_wait_queue,
-                                               IPC_MQUEUE_FULL,
-                                               THREAD_AWAKENED,
-                                               FALSE) != KERN_SUCCESS) {
-                       mqueue->imq_fullwaiters = FALSE;
+       struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
+       (void)set_mq;
+       assert(imq_held(port_mq));
+       assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
+
+       port_mq->imq_msgcount--;
+
+       if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
+           send_turnstile != TURNSTILE_NULL) {
+               /*
+                * boost the priority of the awoken thread
+                * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+                * the message queue slot we've just reserved.
+                *
+                * NOTE: this will never prepost
+                *
+                * The wakeup happens on a turnstile waitq
+                * which will wake up the highest-priority waiter.
+                * A potential downside of this would be starving low
+                * priority senders if there is a constant churn of
+                * high priority threads trying to send to this port.
+                */
+               if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
+                   IPC_MQUEUE_FULL,
+                   THREAD_AWAKENED,
+                   WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
+                       port_mq->imq_fullwaiters = FALSE;
                } else {
                        /* gave away our slot - add reference back */
-                       mqueue->imq_msgcount++; 
+                       port_mq->imq_msgcount++;
                }
        }
+
+       if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+               /* no more msgs: invalidate the port's prepost object */
+               waitq_clear_prepost_locked(&port_mq->imq_wait_queue);
+       }
 }
 
 /*
@@ -489,14 +749,19 @@ ipc_mqueue_release_msgcount(
  *             the message queue.
  *
  *     Conditions:
+ *             mqueue is unlocked
  *             If we need to queue, our space in the message queue is reserved.
  */
 void
 ipc_mqueue_post(
-       register ipc_mqueue_t   mqueue,
-       register ipc_kmsg_t             kmsg)
+       ipc_mqueue_t               mqueue,
+       ipc_kmsg_t                 kmsg,
+       mach_msg_option_t __unused option)
 {
-       spl_t s;
+       uint64_t reserved_prepost = 0;
+       boolean_t destroy_msg = FALSE;
+
+       ipc_kmsg_trace_send(kmsg, option);
 
        /*
         *      While the msg queue     is locked, we have control of the
@@ -504,49 +769,108 @@ ipc_mqueue_post(
         *
         *      Check for a receiver for the message.
         */
-       s = splsched();
-       imq_lock(mqueue);
+       imq_reserve_and_lock(mqueue, &reserved_prepost);
+
+       /* we may have raced with port destruction! */
+       if (!imq_valid(mqueue)) {
+               destroy_msg = TRUE;
+               goto out_unlock;
+       }
+
        for (;;) {
-               wait_queue_t waitq = &mqueue->imq_wait_queue;
+               struct waitq *waitq = &mqueue->imq_wait_queue;
+               spl_t th_spl;
                thread_t receiver;
                mach_msg_size_t msize;
 
-               receiver = wait_queue_wakeup64_identity_locked(
-                                                       waitq,
-                                                       IPC_MQUEUE_RECEIVE,
-                                                       THREAD_AWAKENED,
-                                                       FALSE);
+               receiver = waitq_wakeup64_identify_locked(waitq,
+                   IPC_MQUEUE_RECEIVE,
+                   THREAD_AWAKENED,
+                   &th_spl,
+                   &reserved_prepost,
+                   WAITQ_ALL_PRIORITIES,
+                   WAITQ_KEEP_LOCKED);
                /* waitq still locked, thread locked */
 
                if (receiver == THREAD_NULL) {
-                       /* 
-                        * no receivers; queue kmsg
+                       /*
+                        * no receivers; queue kmsg if space still reserved
+                        * Reservations are cancelled when the port goes inactive.
+                        * note that this will enqueue the message for any
+                        * "peeking" receivers.
+                        *
+                        * Also, post the knote to wake up any threads waiting
+                        * on that style of interface if this insertion is of
+                        * note (first insertion, or adjusted override qos all
+                        * the way to the head of the queue).
+                        *
+                        * This is just for ports. portset knotes are stay-active,
+                        * and their threads get awakened through the !MACH_RCV_IN_PROGRESS
+                        * logic below.
                         */
-                       assert(mqueue->imq_msgcount > 0);
-                       ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
-                       break;
+                       if (mqueue->imq_msgcount > 0) {
+                               if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
+                                       /* if the space is dead there is no point calling KNOTE */
+                                       ipc_object_t object = imq_to_object(mqueue);
+                                       assert(io_otype(object) == IOT_PORT);
+                                       ipc_port_t port = ip_object_to_port(object);
+                                       if (ip_active(port) &&
+                                           port->ip_receiver_name != MACH_PORT_NULL &&
+                                           is_active(port->ip_receiver) &&
+                                           ipc_mqueue_has_klist(mqueue)) {
+                                               KNOTE(&mqueue->imq_klist, 0);
+                                       }
+                               }
+                               break;
+                       }
+
+                       /*
+                        * Otherwise, the message queue must belong to an inactive
+                        * port, so just destroy the message and pretend it was posted.
+                        */
+                       destroy_msg = TRUE;
+                       goto out_unlock;
                }
-       
+
                /*
-                * If the receiver waited with a facility not directly
-                * related to Mach messaging, then it isn't prepared to get
-                * handed the message directly.  Just set it running, and
-                * go look for another thread that can.
+                * If a thread is attempting a "peek" into the message queue
+                * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
+                * thread running.  A successful peek is essentially the same as
+                * message delivery since the peeking thread takes responsibility
+                * for delivering the message and (eventually) removing it from
+                * the mqueue.  Only one thread can successfully use the peek
+                * facility on any given port, so we exit the waitq loop after
+                * encountering such a thread.
+                */
+               if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
+                       ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
+                       ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver);
+                       thread_unlock(receiver);
+                       splx(th_spl);
+                       break; /* Message was posted, so break out of loop */
+               }
+
+               /*
+                * If the receiver waited with a facility not directly related
+                * to Mach messaging, then it isn't prepared to get handed the
+                * message directly. Just set it running, and go look for
+                * another thread that can.
                 */
                if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
-                                 thread_unlock(receiver);
-                                 continue;
+                       thread_unlock(receiver);
+                       splx(th_spl);
+                       continue;
                }
 
-       
+
                /*
                 * We found a waiting thread.
                 * If the message is too large or the scatter list is too small
                 * the thread we wake up will get that as its status.
                 */
-               msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
-               if (receiver->ith_msize <
-                               (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
+               msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
+               if (receiver->ith_rsize <
+                   (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
                        receiver->ith_msize = msize;
                        receiver->ith_state = MACH_RCV_TOO_LARGE;
                } else {
@@ -560,13 +884,22 @@ ipc_mqueue_post(
                 */
                if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
                    !(receiver->ith_option & MACH_RCV_LARGE)) {
-
                        receiver->ith_kmsg = kmsg;
                        receiver->ith_seqno = mqueue->imq_seqno++;
+#if MACH_FLIPC
+                       mach_node_t node = kmsg->ikm_node;
+#endif
                        thread_unlock(receiver);
+                       splx(th_spl);
 
                        /* we didn't need our reserved spot in the queue */
-                       ipc_mqueue_release_msgcount(mqueue);
+                       ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
+
+#if MACH_FLIPC
+                       if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
+                               flipc_msg_ack(node, mqueue, TRUE);
+                       }
+#endif
                        break;
                }
 
@@ -579,21 +912,27 @@ ipc_mqueue_post(
                receiver->ith_kmsg = IKM_NULL;
                receiver->ith_seqno = 0;
                thread_unlock(receiver);
+               splx(th_spl);
+       }
+
+out_unlock:
+       /* clear the waitq boost we may have been given */
+       waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
+       imq_release_and_unlock(mqueue, reserved_prepost);
+       if (destroy_msg) {
+               ipc_kmsg_destroy(kmsg);
        }
 
-       imq_unlock(mqueue);
-       splx(s);
-       
        current_task()->messages_sent++;
        return;
 }
 
 
-/* static */ void
+static void
 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
 {
-       thread_t                self = current_thread();
-       mach_msg_option_t       option = self->ith_option;
+       thread_t                self = current_thread();
+       mach_msg_option_t       option = self->ith_option;
 
        /*
         * why did we wake up?
@@ -632,6 +971,7 @@ ipc_mqueue_receive_results(wait_result_t saved_wait_result)
                        }
 
                case MACH_MSG_SUCCESS:
+               case MACH_PEEK_READY:
                        return;
 
                default:
@@ -657,22 +997,17 @@ ipc_mqueue_receive_continue(
  *     Purpose:
  *             Receive a message from a message queue.
  *
- *             If continuation is non-zero, then we might discard
- *             our kernel stack when we block.  We will continue
- *             after unblocking by executing continuation.
- *
- *             If resume is true, then we are resuming a receive
- *             operation after a blocked receive discarded our stack.
  *     Conditions:
  *             Our caller must hold a reference for the port or port set
  *             to which this queue belongs, to keep the queue
  *             from being deallocated.
  *
  *             The kmsg is returned with clean header fields
- *             and with the circular bit turned off.
+ *             and with the circular bit turned off through the ith_kmsg
+ *             field of the thread's receive continuation state.
  *     Returns:
- *             MACH_MSG_SUCCESS        Message returned in kmsgp.
- *             MACH_RCV_TOO_LARGE      Message size returned in kmsgp.
+ *             MACH_MSG_SUCCESS        Message returned in ith_kmsg.
+ *             MACH_RCV_TOO_LARGE      Message size returned in ith_msize.
  *             MACH_RCV_TIMED_OUT      No message obtained.
  *             MACH_RCV_INTERRUPTED    No message obtained.
  *             MACH_RCV_PORT_DIED      Port/set died; no message.
@@ -689,164 +1024,156 @@ ipc_mqueue_receive(
        int                     interruptible)
 {
        wait_result_t           wresult;
-        thread_t                self = current_thread();
-        
-        wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
-                                               rcv_timeout, interruptible,
-                                               self);
-        if (wresult == THREAD_NOT_WAITING)
-                return;
+       thread_t                self = current_thread();
+
+       imq_lock(mqueue);
+       wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
+           rcv_timeout, interruptible,
+           self);
+       /* mqueue unlocked */
+       if (wresult == THREAD_NOT_WAITING) {
+               return;
+       }
 
        if (wresult == THREAD_WAITING) {
-               counter((interruptible == THREAD_ABORTSAFE) ? 
-                       c_ipc_mqueue_receive_block_user++ :
-                       c_ipc_mqueue_receive_block_kernel++);
+               counter((interruptible == THREAD_ABORTSAFE) ?
+                   c_ipc_mqueue_receive_block_user++ :
+                   c_ipc_mqueue_receive_block_kernel++);
 
-               if (self->ith_continuation)
+               if (self->ith_continuation) {
                        thread_block(ipc_mqueue_receive_continue);
-                       /* NOTREACHED */
+               }
+               /* NOTREACHED if a continuation was supplied */
 
                wresult = thread_block(THREAD_CONTINUE_NULL);
        }
        ipc_mqueue_receive_results(wresult);
 }
 
+static int
+mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
+    struct waitq_set *wqset)
+{
+       ipc_mqueue_t     port_mq, *pmq_ptr;
+
+       (void)wqset;
+       port_mq = (ipc_mqueue_t)waitq;
+
+       /*
+        * If there are no messages on this queue, skip it and remove
+        * it from the prepost list
+        */
+       if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+               return WQ_ITERATE_INVALIDATE_CONTINUE;
+       }
+
+       /*
+        * There are messages waiting on this port.
+        * Instruct the prepost iteration logic to break, but keep the
+        * waitq locked.
+        */
+       pmq_ptr = (ipc_mqueue_t *)ctx;
+       if (pmq_ptr) {
+               *pmq_ptr = port_mq;
+       }
+       return WQ_ITERATE_BREAK_KEEP_LOCKED;
+}
+
+/*
+ *     Routine:        ipc_mqueue_receive_on_thread
+ *     Purpose:
+ *             Receive a message from a message queue using a specified thread.
+ *             If no message available, assert_wait on the appropriate waitq.
+ *
+ *     Conditions:
+ *             Assumes thread is self.
+ *             Called with mqueue locked.
+ *             Returns with mqueue unlocked.
+ *             May have assert-waited. Caller must block in those cases.
+ */
 wait_result_t
 ipc_mqueue_receive_on_thread(
-        ipc_mqueue_t            mqueue,
+       ipc_mqueue_t            mqueue,
        mach_msg_option_t       option,
        mach_msg_size_t         max_size,
        mach_msg_timeout_t      rcv_timeout,
        int                     interruptible,
        thread_t                thread)
 {
-       ipc_kmsg_queue_t        kmsgs;
        wait_result_t           wresult;
-       uint64_t                deadline;
-       spl_t                   s;
-#if CONFIG_MACF_MACH
-       ipc_labelh_t lh;
-       task_t task;
-       int rc;
-#endif
+       uint64_t                deadline;
+       struct turnstile        *rcv_turnstile = TURNSTILE_NULL;
 
-       s = splsched();
-       imq_lock(mqueue);
-       
-       if (imq_is_set(mqueue)) {
-               queue_t q;
+       /* called with mqueue locked */
 
-               q = &mqueue->imq_preposts;
+       /* no need to reserve anything: we never prepost to anyone */
 
+       if (!imq_valid(mqueue)) {
+               /* someone raced us to destroy this mqueue/port! */
+               imq_unlock(mqueue);
                /*
-                * If we are waiting on a portset mqueue, we need to see if
-                * any of the member ports have work for us.  Ports that
-                * have (or recently had) messages will be linked in the
-                * prepost queue for the portset. By holding the portset's
-                * mqueue lock during the search, we tie up any attempts by
-                * mqueue_deliver or portset membership changes that may
-                * cross our path.
+                * ipc_mqueue_receive_results updates the thread's ith_state
+                * TODO: differentiate between rights being moved and
+                * rights/ports being destroyed (21885327)
                 */
-       search_set:
-               while(!queue_empty(q)) {
-                       wait_queue_link_t wql;
-                       ipc_mqueue_t port_mq;
-
-                       queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
-                       assert(!wql_is_preposted(wql));
+               return THREAD_RESTART;
+       }
 
-                       /*
-                        * This is a lock order violation, so we have to do it
-                        * "softly," putting the link back on the prepost list
-                        * if it fails (at the tail is fine since the order of
-                        * handling messages from different sources in a set is
-                        * not guaranteed and we'd like to skip to the next source
-                        * if one is available).
-                        */
-                       port_mq = (ipc_mqueue_t)wql->wql_queue;
-                       if (!imq_lock_try(port_mq)) {
-                               queue_enter(q, wql, wait_queue_link_t, wql_preposts);
-                               imq_unlock(mqueue);
-                               splx(s);
-                               mutex_pause(0);
-                               s = splsched();
-                               imq_lock(mqueue);
-                               goto search_set; /* start again at beginning - SMP */
-                       }
+       if (imq_is_set(mqueue)) {
+               ipc_mqueue_t port_mq = IMQ_NULL;
 
-                       /*
-                        * If there are no messages on this queue, just skip it
-                        * (we already removed the link from the set's prepost queue).
-                        */
-                       kmsgs = &port_mq->imq_messages;
-                       if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
-                               imq_unlock(port_mq);
-                               continue;
-                       }
+               (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
+                   &port_mq,
+                   mqueue_process_prepost_receive);
 
+               if (port_mq != IMQ_NULL) {
                        /*
-                        * There are messages, so reinsert the link back
-                        * at the tail of the preposted queue (for fairness)
-                        * while we still have the portset mqueue locked.
+                        * We get here if there is at least one message
+                        * waiting on port_mq. We have instructed the prepost
+                        * iteration logic to leave both the port_mq and the
+                        * set mqueue locked.
+                        *
+                        * TODO: previously, we would place this port at the
+                        *       back of the prepost list...
                         */
-                       queue_enter(q, wql, wait_queue_link_t, wql_preposts);
                        imq_unlock(mqueue);
 
                        /*
                         * Continue on to handling the message with just
                         * the port mqueue locked.
                         */
-                       ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
-                       imq_unlock(port_mq);
-#if CONFIG_MACF_MACH
-                       if (thread->task != TASK_NULL &&
-                           thread->ith_kmsg != NULL &&
-                           thread->ith_kmsg->ikm_sender != NULL) {
-                               lh = thread->ith_kmsg->ikm_sender->label;
-                               tasklabel_lock(thread->task);
-                               ip_lock(lh->lh_port);
-                               rc = mac_port_check_receive(&thread->task->maclabel,
-                                                            &lh->lh_label);
-                               ip_unlock(lh->lh_port);
-                               tasklabel_unlock(thread->task);
-                               if (rc)
-                                       thread->ith_state = MACH_RCV_INVALID_DATA;
+                       if (option & MACH_PEEK_MSG) {
+                               ipc_mqueue_peek_on_thread(port_mq, option, thread);
+                       } else {
+                               ipc_mqueue_select_on_thread(port_mq, mqueue, option,
+                                   max_size, thread);
                        }
-#endif
-                       splx(s);
+
+                       imq_unlock(port_mq);
                        return THREAD_NOT_WAITING;
-                       
                }
-
-       } else {
+       } else if (imq_is_queue(mqueue) || imq_is_turnstile_proxy(mqueue)) {
+               ipc_kmsg_queue_t kmsgs;
 
                /*
                 * Receive on a single port. Just try to get the messages.
                 */
-               kmsgs = &mqueue->imq_messages;
+               kmsgs = &mqueue->imq_messages;
                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
-                       ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
-                       imq_unlock(mqueue);
-#if CONFIG_MACF_MACH
-                       if (thread->task != TASK_NULL &&
-                           thread->ith_kmsg != NULL &&
-                           thread->ith_kmsg->ikm_sender != NULL) {
-                               lh = thread->ith_kmsg->ikm_sender->label;
-                               tasklabel_lock(thread->task);
-                               ip_lock(lh->lh_port);
-                               rc = mac_port_check_receive(&thread->task->maclabel,
-                                                            &lh->lh_label);
-                               ip_unlock(lh->lh_port);
-                               tasklabel_unlock(thread->task);
-                               if (rc)
-                                       thread->ith_state = MACH_RCV_INVALID_DATA;
+                       if (option & MACH_PEEK_MSG) {
+                               ipc_mqueue_peek_on_thread(mqueue, option, thread);
+                       } else {
+                               ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
+                                   max_size, thread);
                        }
-#endif
-                       splx(s);
+                       imq_unlock(mqueue);
                        return THREAD_NOT_WAITING;
                }
+       } else {
+               panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
+                   mqueue->imq_wait_queue.waitq_type);
        }
-       
+
        /*
         * Looks like we'll have to block.  The mqueue we will
         * block on (whether the set's or the local port's) is
@@ -855,39 +1182,113 @@ ipc_mqueue_receive_on_thread(
        if (option & MACH_RCV_TIMEOUT) {
                if (rcv_timeout == 0) {
                        imq_unlock(mqueue);
-                       splx(s);
                        thread->ith_state = MACH_RCV_TIMED_OUT;
                        return THREAD_NOT_WAITING;
                }
        }
 
-       thread_lock(thread);
-       thread->ith_state = MACH_RCV_IN_PROGRESS;
        thread->ith_option = option;
-       thread->ith_msize = max_size;
+       thread->ith_rsize = max_size;
+       thread->ith_msize = 0;
+
+       if (option & MACH_PEEK_MSG) {
+               thread->ith_state = MACH_PEEK_IN_PROGRESS;
+       } else {
+               thread->ith_state = MACH_RCV_IN_PROGRESS;
+       }
 
-       if (option & MACH_RCV_TIMEOUT)
-               clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
-       else
+       if (option & MACH_RCV_TIMEOUT) {
+               clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
+       } else {
                deadline = 0;
+       }
 
-       wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
-                                                 IPC_MQUEUE_RECEIVE,
-                                                 interruptible, 
-                                                 TIMEOUT_URGENCY_USER_NORMAL,
-                                                 deadline, 0,
-                                                 thread);
+       /*
+        * Threads waiting on a reply port (not portset)
+        * will wait on its receive turnstile.
+        *
+        * Donate waiting thread's turnstile and
+        * setup inheritor for special reply port.
+        * Based on the state of the special reply
+        * port, the inheritor would be the send
+        * turnstile of the connection port on which
+        * the send of sync ipc would happen or
+        * workloop's turnstile who would reply to
+        * the sync ipc message.
+        *
+        * Pass in mqueue wait in waitq_assert_wait to
+        * support port set wakeup. The mqueue waitq of port
+        * will be converted to a turnstile waitq
+        * in waitq_assert_wait instead of global waitqs.
+        */
+       if (imq_is_turnstile_proxy(mqueue)) {
+               ipc_port_t port = ip_from_mq(mqueue);
+               rcv_turnstile = turnstile_prepare((uintptr_t)port,
+                   port_rcv_turnstile_address(port),
+                   TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+               ipc_port_recv_update_inheritor(port, rcv_turnstile,
+                   TURNSTILE_DELAYED_UPDATE);
+       }
+
+       thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
+       wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
+           IPC_MQUEUE_RECEIVE,
+           interruptible,
+           TIMEOUT_URGENCY_USER_NORMAL,
+           deadline,
+           TIMEOUT_NO_LEEWAY,
+           thread);
        /* preposts should be detected above, not here */
-       if (wresult == THREAD_AWAKENED)
+       if (wresult == THREAD_AWAKENED) {
                panic("ipc_mqueue_receive_on_thread: sleep walking");
+       }
 
-       thread_unlock(thread);
        imq_unlock(mqueue);
-       splx(s);
+
+       /* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
+       if (rcv_turnstile != TURNSTILE_NULL) {
+               turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
+       }
+       /* It's the caller's responsibility to call turnstile_complete to get the turnstile back */
+
        return wresult;
 }
 
 
+/*
+ *     Routine:        ipc_mqueue_peek_on_thread
+ *     Purpose:
+ *             A receiver discovered that there was a message on the queue
+ *             before he had to block. Tell a thread about the message queue,
+ *             but don't pick off any messages.
+ *     Conditions:
+ *             port_mq locked
+ *             at least one message on port_mq's message queue
+ *
+ *     Returns: (on thread->ith_state)
+ *             MACH_PEEK_READY         ith_peekq contains a message queue
+ */
+void
+ipc_mqueue_peek_on_thread(
+       ipc_mqueue_t        port_mq,
+       mach_msg_option_t   option,
+       thread_t            thread)
+{
+       (void)option;
+       assert(option & MACH_PEEK_MSG);
+       assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
+
+       /*
+        * Take a reference on the mqueue's associated port:
+        * the peeking thread will be responsible to release this reference
+        * using ip_release_mq()
+        */
+       ip_reference_mq(port_mq);
+       thread->ith_peekq = port_mq;
+       thread->ith_state = MACH_PEEK_READY;
+}
+
 /*
  *     Routine:        ipc_mqueue_select_on_thread
  *     Purpose:
@@ -898,26 +1299,29 @@ ipc_mqueue_receive_on_thread(
  *             mqueue locked.
  *              thread not locked.
  *             There is a message.
+ *             No need to reserve prepost objects - it will never prepost
+ *
  *     Returns:
  *             MACH_MSG_SUCCESS        Actually selected a message for ourselves.
  *             MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
  */
 void
 ipc_mqueue_select_on_thread(
-       ipc_mqueue_t            mqueue,
-       mach_msg_option_t       option,
-       mach_msg_size_t         max_size,
+       ipc_mqueue_t            port_mq,
+       ipc_mqueue_t            set_mq,
+       mach_msg_option_t       option,
+       mach_msg_size_t         max_size,
        thread_t                thread)
 {
        ipc_kmsg_t kmsg;
        mach_msg_return_t mr = MACH_MSG_SUCCESS;
-       mach_msg_size_t rcv_size;
+       mach_msg_size_t msize;
 
        /*
         * Do some sanity checking of our ability to receive
         * before pulling the message off the queue.
         */
-       kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
+       kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
        assert(kmsg != IKM_NULL);
 
        /*
@@ -926,22 +1330,27 @@ ipc_mqueue_select_on_thread(
         * the queue, instead return the appropriate error
         * (and size needed).
         */
-       rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
-       if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
+       msize = ipc_kmsg_copyout_size(kmsg, thread->map);
+       if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
                mr = MACH_RCV_TOO_LARGE;
                if (option & MACH_RCV_LARGE) {
-                       thread->ith_receiver_name = mqueue->imq_receiver_name;
+                       thread->ith_receiver_name = port_mq->imq_receiver_name;
                        thread->ith_kmsg = IKM_NULL;
-                       thread->ith_msize = rcv_size;
+                       thread->ith_msize = msize;
                        thread->ith_seqno = 0;
                        thread->ith_state = mr;
                        return;
                }
        }
 
-       ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
-       ipc_mqueue_release_msgcount(mqueue);
-       thread->ith_seqno = mqueue->imq_seqno++;
+       ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
+#if MACH_FLIPC
+       if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
+               flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
+       }
+#endif
+       ipc_mqueue_release_msgcount(port_mq, set_mq);
+       thread->ith_seqno = port_mq->imq_seqno++;
        thread->ith_kmsg = kmsg;
        thread->ith_state = mr;
 
@@ -950,7 +1359,7 @@ ipc_mqueue_select_on_thread(
 }
 
 /*
- *     Routine:        ipc_mqueue_peek
+ *     Routine:        ipc_mqueue_peek_locked
  *     Purpose:
  *             Peek at a (non-set) message queue to see if it has a message
  *             matching the sequence number provided (if zero, then the
@@ -958,37 +1367,39 @@ ipc_mqueue_select_on_thread(
  *             message.
  *
  *     Conditions:
- *             Locks may be held by callers, so this routine cannot block.
+ *             The ipc_mqueue_t is locked by callers.
+ *             Other locks may be held by callers, so this routine cannot block.
  *             Caller holds reference on the message queue.
  */
 unsigned
-ipc_mqueue_peek(ipc_mqueue_t           mq,
-               mach_port_seqno_t       *seqnop,
-               mach_msg_size_t         *msg_sizep,
-               mach_msg_id_t           *msg_idp,
-               mach_msg_max_trailer_t  *msg_trailerp)
+ipc_mqueue_peek_locked(ipc_mqueue_t mq,
+    mach_port_seqno_t * seqnop,
+    mach_msg_size_t * msg_sizep,
+    mach_msg_id_t * msg_idp,
+    mach_msg_max_trailer_t * msg_trailerp,
+    ipc_kmsg_t *kmsgp)
 {
        ipc_kmsg_queue_t kmsgq;
-       ipc_kmsg_t kmsg; 
+       ipc_kmsg_t kmsg;
        mach_port_seqno_t seqno, msgoff;
-       int res = 0;
-       spl_t s;
+       unsigned res = 0;
 
        assert(!imq_is_set(mq));
 
-       s = splsched();
-       imq_lock(mq);
-
-       seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
+       seqno = 0;
+       if (seqnop != NULL) {
+               seqno = *seqnop;
+       }
 
        if (seqno == 0) {
                seqno = mq->imq_seqno;
                msgoff = 0;
-       } else if (seqno >= mq->imq_seqno && 
-                  seqno < mq->imq_seqno + mq->imq_msgcount) {
+       } else if (seqno >= mq->imq_seqno &&
+           seqno < mq->imq_seqno + mq->imq_msgcount) {
                msgoff = seqno - mq->imq_seqno;
-       } else
+       } else {
                goto out;
+       }
 
        /* look for the message that would match that seqno */
        kmsgq = &mq->imq_messages;
@@ -996,29 +1407,126 @@ ipc_mqueue_peek(ipc_mqueue_t            mq,
        while (msgoff-- && kmsg != IKM_NULL) {
                kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
        }
-       if (kmsg == IKM_NULL)
+       if (kmsg == IKM_NULL) {
                goto out;
+       }
 
        /* found one - return the requested info */
-       if (seqnop != NULL)
+       if (seqnop != NULL) {
                *seqnop = seqno;
-       if (msg_sizep != NULL)
+       }
+       if (msg_sizep != NULL) {
                *msg_sizep = kmsg->ikm_header->msgh_size;
-       if (msg_idp != NULL)
+       }
+       if (msg_idp != NULL) {
                *msg_idp = kmsg->ikm_header->msgh_id;
-       if (msg_trailerp != NULL)
-               memcpy(msg_trailerp, 
-                      (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
-                                                 round_msg(kmsg->ikm_header->msgh_size)),
-                      sizeof(mach_msg_max_trailer_t));
+       }
+       if (msg_trailerp != NULL) {
+               memcpy(msg_trailerp,
+                   (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
+                   round_msg(kmsg->ikm_header->msgh_size)),
+                   sizeof(mach_msg_max_trailer_t));
+       }
+       if (kmsgp != NULL) {
+               *kmsgp = kmsg;
+       }
+
        res = 1;
 
- out:
+out:
+       return res;
+}
+
+
+/*
+ *     Routine:        ipc_mqueue_peek
+ *     Purpose:
+ *             Peek at a (non-set) message queue to see if it has a message
+ *             matching the sequence number provided (if zero, then the
+ *             first message in the queue) and return vital info about the
+ *             message.
+ *
+ *     Conditions:
+ *             The ipc_mqueue_t is unlocked.
+ *             Locks may be held by callers, so this routine cannot block.
+ *             Caller holds reference on the message queue.
+ */
+unsigned
+ipc_mqueue_peek(ipc_mqueue_t mq,
+    mach_port_seqno_t * seqnop,
+    mach_msg_size_t * msg_sizep,
+    mach_msg_id_t * msg_idp,
+    mach_msg_max_trailer_t * msg_trailerp,
+    ipc_kmsg_t *kmsgp)
+{
+       unsigned res;
+
+       imq_lock(mq);
+
+       res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
+           msg_trailerp, kmsgp);
+
        imq_unlock(mq);
-       splx(s);
        return res;
 }
 
+/*
+ *     Routine:        ipc_mqueue_release_peek_ref
+ *     Purpose:
+ *             Release the reference on an mqueue's associated port which was
+ *             granted to a thread in ipc_mqueue_peek_on_thread (on the
+ *             MACH_PEEK_MSG thread wakeup path).
+ *
+ *     Conditions:
+ *             The ipc_mqueue_t should be locked on entry.
+ *             The ipc_mqueue_t will be _unlocked_ on return
+ *                     (and potentially invalid!)
+ *
+ */
+void
+ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
+{
+       assert(!imq_is_set(mq));
+       assert(imq_held(mq));
+
+       /*
+        * clear any preposts this mq may have generated
+        * (which would cause subsequent immediate wakeups)
+        */
+       waitq_clear_prepost_locked(&mq->imq_wait_queue);
+
+       imq_unlock(mq);
+
+       /*
+        * release the port reference: we need to do this outside the lock
+        * because we might be holding the last port reference!
+        */
+       ip_release_mq(mq);
+}
+
+/*
+ * peek at the contained port message queues, break prepost iteration as soon
+ * as we spot a message on one of the message queues referenced by the set's
+ * prepost list.  No need to lock each message queue, as only the head of each
+ * queue is checked. If a message wasn't there before we entered here, no need
+ * to find it (if we do, great).
+ */
+static int
+mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+    struct waitq_set *wqset)
+{
+       ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
+       ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
+
+       (void)ctx;
+       (void)wqset;
+
+       if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
+               return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
+       }
+       return WQ_ITERATE_CONTINUE;
+}
+
 /*
  *     Routine:        ipc_mqueue_set_peek
  *     Purpose:
@@ -1032,117 +1540,134 @@ ipc_mqueue_peek(ipc_mqueue_t          mq,
 unsigned
 ipc_mqueue_set_peek(ipc_mqueue_t mq)
 {
-       wait_queue_link_t       wql;
-       queue_t                 q;
-       spl_t s;
-       int res;
+       int ret;
 
-       assert(imq_is_set(mq));
-
-       s = splsched();
        imq_lock(mq);
 
-       /* 
-        * peek at the contained port message queues, return as soon as
-        * we spot a message on one of the message queues linked on the
-        * prepost list.  No need to lock each message queue, as only the
-        * head of each queue is checked. If a message wasn't there before
-        * we entered here, no need to find it (if we do, great).
+       /*
+        * We may have raced with port destruction where the mqueue is marked
+        * as invalid. In that case, even though we don't have messages, we
+        * have an end-of-life event to deliver.
         */
-       res = 0;
-       q = &mq->imq_preposts;
-       queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
-               ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
-               ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
-                       
-               if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
-                       res = 1;
-                       break;
-               }
+       if (!imq_is_valid(mq)) {
+               return 1;
        }
+
+       ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
+           mqueue_peek_iterator);
+
        imq_unlock(mq);
-       splx(s);
-       return res;
+
+       return ret == WQ_ITERATE_BREAK;
 }
 
 /*
  *     Routine:        ipc_mqueue_set_gather_member_names
  *     Purpose:
- *             Iterate a message queue set to identify the member port
- *             names. Actual returned names is limited to maxnames entries,
- *             but we keep counting the actual number of members to let
- *             the caller decide to retry if necessary.
+ *             Discover all ports which are members of a given port set.
+ *             Because the waitq linkage mechanism was redesigned to save
+ *             significant amounts of memory, it no longer keeps back-pointers
+ *             from a port set to a port. Therefore, we must iterate over all
+ *             ports within a given IPC space and individually query them to
+ *             see if they are members of the given set. Port names of ports
+ *             found to be members of the given set will be gathered into the
+ *             provided 'names' array.  Actual returned names are limited to
+ *             maxnames entries, but we keep counting the actual number of
+ *             members to let the caller decide to retry if necessary.
  *
  *     Conditions:
  *             Locks may be held by callers, so this routine cannot block.
- *             Caller holds reference on the message queue.
+ *             Caller holds reference on the message queue (via port set).
  */
 void
 ipc_mqueue_set_gather_member_names(
-       ipc_mqueue_t mq, 
-       ipc_entry_num_t maxnames, 
+       ipc_space_t space,
+       ipc_mqueue_t set_mq,
+       ipc_entry_num_t maxnames,
        mach_port_name_t *names,
        ipc_entry_num_t *actualp)
 {
-       wait_queue_link_t       wql;
-       queue_t                 q;
-       spl_t s;
+       ipc_entry_t table;
+       ipc_entry_num_t tsize;
+       struct waitq_set *wqset;
        ipc_entry_num_t actual = 0;
 
-       assert(imq_is_set(mq));
+       assert(set_mq != IMQ_NULL);
+       wqset = &set_mq->imq_set_queue;
 
-       s = splsched();
-       imq_lock(mq);
+       assert(space != IS_NULL);
+       is_read_lock(space);
+       if (!is_active(space)) {
+               is_read_unlock(space);
+               goto out;
+       }
 
-       /* 
-        * Iterate over the member ports through the mqueue set links
-        * capturing as many names as we can.
-        */
-       q = &mq->imq_setlinks;
-       queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
-               ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
+       if (!waitq_set_is_valid(wqset)) {
+               is_read_unlock(space);
+               goto out;
+       }
 
-               if (actual < maxnames)
-                       names[actual] = port_mq->imq_receiver_name;
-               actual++;
+       table = space->is_table;
+       tsize = space->is_table_size;
+       for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
+               ipc_entry_t entry = &table[idx];
+
+               /* only receive rights can be members of port sets */
+               if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
+                       ipc_port_t port = ip_object_to_port(entry->ie_object);
+                       ipc_mqueue_t mq = &port->ip_messages;
+
+                       assert(IP_VALID(port));
+                       if (ip_active(port) &&
+                           waitq_member(&mq->imq_wait_queue, wqset)) {
+                               if (actual < maxnames) {
+                                       names[actual] = mq->imq_receiver_name;
+                               }
+                               actual++;
+                       }
+               }
        }
-       imq_unlock(mq);
-       splx(s);
 
+       is_read_unlock(space);
+
+out:
        *actualp = actual;
 }
 
 
 /*
- *     Routine:        ipc_mqueue_destroy
+ *     Routine:        ipc_mqueue_destroy_locked
  *     Purpose:
  *             Destroy a (non-set) message queue.
  *             Set any blocked senders running.
- *             Destroy the kmsgs in the queue.
+ *             Destroy the kmsgs in the queue.
  *     Conditions:
- *             Nothing locked.
+ *             mqueue locked
  *             Receivers were removed when the receive right was "changed"
  */
-void
-ipc_mqueue_destroy(
-       ipc_mqueue_t    mqueue)
+boolean_t
+ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
 {
        ipc_kmsg_queue_t kmqueue;
        ipc_kmsg_t kmsg;
        boolean_t reap = FALSE;
-       spl_t s;
+       struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
+
+       assert(!imq_is_set(mqueue));
 
-       s = splsched();
-       imq_lock(mqueue);
        /*
         *      rouse all blocked senders
+        *      (don't boost anyone - we're tearing this queue down)
+        *      (never preposts)
         */
        mqueue->imq_fullwaiters = FALSE;
-       wait_queue_wakeup64_all_locked(
-                               &mqueue->imq_wait_queue,
-                               IPC_MQUEUE_FULL,
-                               THREAD_RESTART,
-                               FALSE);
+
+       if (send_turnstile != TURNSTILE_NULL) {
+               waitq_wakeup64_all(&send_turnstile->ts_waitq,
+                   IPC_MQUEUE_FULL,
+                   THREAD_RESTART,
+                   WAITQ_ALL_PRIORITIES);
+       }
 
        /*
         * Move messages from the specified queue to the per-thread
@@ -1150,21 +1675,40 @@ ipc_mqueue_destroy(
         */
        kmqueue = &mqueue->imq_messages;
        while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
+#if MACH_FLIPC
+               if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
+                       flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
+               }
+#endif
                boolean_t first;
                first = ipc_kmsg_delayed_destroy(kmsg);
-               if (first)
+               if (first) {
                        reap = first;
+               }
        }
 
-       imq_unlock(mqueue);
-       splx(s);
+       /*
+        * Wipe out message count, both for messages about to be
+        * reaped and for reserved space for (previously) woken senders.
+        * This is the indication to them that their reserved space is gone
+        * (the mqueue was destroyed).
+        */
+       mqueue->imq_msgcount = 0;
+
+       /* invalidate the waitq for subsequent mqueue operations */
+       waitq_invalidate_locked(&mqueue->imq_wait_queue);
+
+       /* clear out any preposting we may have done */
+       waitq_clear_prepost_locked(&mqueue->imq_wait_queue);
 
        /*
-        * Destroy the messages we enqueued if we aren't nested
-        * inside some other attempt to drain the same queue.
+        * assert that we are destroying / invalidating a queue that's
+        * not a member of any other queue.
         */
-       if (reap)
-               ipc_kmsg_reap_delayed();
+       assert(mqueue->imq_preposts == 0);
+       assert(mqueue->imq_in_pset == 0);
+
+       return reap;
 }
 
 /*
@@ -1178,37 +1722,41 @@ ipc_mqueue_destroy(
 
 void
 ipc_mqueue_set_qlimit(
-        ipc_mqueue_t                   mqueue,
-        mach_port_msgcount_t   qlimit)
+       ipc_mqueue_t                   mqueue,
+       mach_port_msgcount_t   qlimit)
 {
-        spl_t s;
-
-        assert(qlimit <= MACH_PORT_QLIMIT_MAX);
-
-        /* wake up senders allowed by the new qlimit */
-        s = splsched();
-        imq_lock(mqueue);
-        if (qlimit > mqueue->imq_qlimit) {
-                mach_port_msgcount_t i, wakeup;
-
-                /* caution: wakeup, qlimit are unsigned */
-                wakeup = qlimit - mqueue->imq_qlimit;
-
-                for (i = 0; i < wakeup; i++) {
-                        if (wait_queue_wakeup64_one_locked(
-                                                       &mqueue->imq_wait_queue,
-                                                       IPC_MQUEUE_FULL,
-                                                       THREAD_AWAKENED,
-                                                       FALSE) == KERN_NOT_WAITING) {
-                                        mqueue->imq_fullwaiters = FALSE;
-                                        break;
-                        }
-                        mqueue->imq_msgcount++;  /* give it to the awakened thread */
-                }
-        }
+       assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
+       /* wake up senders allowed by the new qlimit */
+       imq_lock(mqueue);
+       if (qlimit > mqueue->imq_qlimit) {
+               mach_port_msgcount_t i, wakeup;
+               struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
+
+               /* caution: wakeup, qlimit are unsigned */
+               wakeup = qlimit - mqueue->imq_qlimit;
+
+               for (i = 0; i < wakeup; i++) {
+                       /*
+                        * boost the priority of the awoken thread
+                        * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+                        * the message queue slot we've just reserved.
+                        *
+                        * NOTE: this will never prepost
+                        */
+                       if (send_turnstile == TURNSTILE_NULL ||
+                           waitq_wakeup64_one(&send_turnstile->ts_waitq,
+                           IPC_MQUEUE_FULL,
+                           THREAD_AWAKENED,
+                           WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
+                               mqueue->imq_fullwaiters = FALSE;
+                               break;
+                       }
+                       mqueue->imq_msgcount++;  /* give it to the awakened thread */
+               }
+       }
        mqueue->imq_qlimit = qlimit;
        imq_unlock(mqueue);
-       splx(s);
 }
 
 /*
@@ -1220,16 +1768,12 @@ ipc_mqueue_set_qlimit(
  */
 void
 ipc_mqueue_set_seqno(
-       ipc_mqueue_t            mqueue,
-       mach_port_seqno_t       seqno)
+       ipc_mqueue_t            mqueue,
+       mach_port_seqno_t       seqno)
 {
-       spl_t s;
-
-       s = splsched();
        imq_lock(mqueue);
        mqueue->imq_seqno = seqno;
        imq_unlock(mqueue);
-       splx(s);
 }
 
 
@@ -1252,12 +1796,13 @@ ipc_mqueue_set_seqno(
 
 mach_msg_return_t
 ipc_mqueue_copyin(
-       ipc_space_t             space,
-       mach_port_name_t        name,
-       ipc_mqueue_t            *mqueuep,
-       ipc_object_t            *objectp)
+       ipc_space_t             space,
+       mach_port_name_t        name,
+       ipc_mqueue_t            *mqueuep,
+       ipc_object_t            *objectp)
 {
        ipc_entry_t entry;
+       ipc_entry_bits_t bits;
        ipc_object_t object;
        ipc_mqueue_t mqueue;
 
@@ -1273,35 +1818,36 @@ ipc_mqueue_copyin(
                return MACH_RCV_INVALID_NAME;
        }
 
+       bits = entry->ie_bits;
        object = entry->ie_object;
 
-       if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
-               ipc_port_t port;
+       if (bits & MACH_PORT_TYPE_RECEIVE) {
+               ipc_port_t port = ip_object_to_port(object);
 
-               port = (ipc_port_t) object;
                assert(port != IP_NULL);
 
                ip_lock(port);
-               assert(ip_active(port));
+               require_ip_active(port);
                assert(port->ip_receiver_name == name);
                assert(port->ip_receiver == space);
                is_read_unlock(space);
                mqueue = &port->ip_messages;
+       } else if (bits & MACH_PORT_TYPE_PORT_SET) {
+               ipc_pset_t pset = ips_object_to_pset(object);
 
-       } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
-               ipc_pset_t pset;
-
-               pset = (ipc_pset_t) object;
                assert(pset != IPS_NULL);
 
                ips_lock(pset);
                assert(ips_active(pset));
-               assert(pset->ips_local_name == name);
                is_read_unlock(space);
 
                mqueue = &pset->ips_messages;
        } else {
                is_read_unlock(space);
+               /* guard exception if we never held the receive right in this entry */
+               if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
+                       mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
+               }
                return MACH_RCV_INVALID_NAME;
        }
 
@@ -1318,3 +1864,18 @@ ipc_mqueue_copyin(
        return MACH_MSG_SUCCESS;
 }
 
+void
+imq_lock(ipc_mqueue_t mq)
+{
+       ipc_object_t object = imq_to_object(mq);
+       ipc_object_validate(object);
+       waitq_lock(&(mq)->imq_wait_queue);
+}
+
+unsigned int
+imq_lock_try(ipc_mqueue_t mq)
+{
+       ipc_object_t object = imq_to_object(mq);
+       ipc_object_validate(object);
+       return waitq_lock_try(&(mq)->imq_wait_queue);
+}