git.saurik.com Git - apple/xnu.git/blobdiff - osfmk/ipc/ipc_mqueue.c
xnu-3248.60.10.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
index 1a98bab1f1929e30f64f060cc20c26132debd9bf..1b5d82c194477545b0ee3ac6adb8a35c31261569 100644 (file)
@@ -82,7 +82,7 @@
 #include <kern/misc_protos.h>
 #include <kern/task.h>
 #include <kern/thread.h>
 #include <kern/misc_protos.h>
 #include <kern/task.h>
 #include <kern/thread.h>
-#include <kern/wait_queue.h>
+#include <kern/waitq.h>
 
 #include <ipc/ipc_mqueue.h>
 #include <ipc/ipc_kmsg.h>
 
 #include <ipc/ipc_mqueue.h>
 #include <ipc/ipc_kmsg.h>
@@ -108,12 +108,15 @@ void ipc_mqueue_receive_results(wait_result_t result);
 void
 ipc_mqueue_init(
        ipc_mqueue_t    mqueue,
 void
 ipc_mqueue_init(
        ipc_mqueue_t    mqueue,
-       boolean_t       is_set)
+       boolean_t       is_set,
+       uint64_t        *reserved_link)
 {
        if (is_set) {
 {
        if (is_set) {
-               wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
+               waitq_set_init(&mqueue->imq_set_queue,
+                              SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST|SYNC_POLICY_DISABLE_IRQ,
+                              reserved_link);
        } else {
        } else {
-               wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+               waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ);
                ipc_kmsg_queue_init(&mqueue->imq_messages);
                mqueue->imq_seqno = 0;
                mqueue->imq_msgcount = 0;
                ipc_kmsg_queue_init(&mqueue->imq_messages);
                mqueue->imq_seqno = 0;
                mqueue->imq_msgcount = 0;
@@ -122,6 +125,53 @@ ipc_mqueue_init(
        }
 }
 
        }
 }
 
+void ipc_mqueue_deinit(
+       ipc_mqueue_t            mqueue)
+{
+       boolean_t is_set = imq_is_set(mqueue);
+
+       if (is_set)
+               waitq_set_deinit(&mqueue->imq_set_queue);
+       else
+               waitq_deinit(&mqueue->imq_wait_queue);
+}
+
+/*
+ *     Routine:        imq_reserve_and_lock
+ *     Purpose:
+ *             Atomically lock an ipc_mqueue_t object and reserve
+ *             an appropriate number of prepost linkage objects for
+ *             use in wakeup operations.
+ *     Conditions:
+ *             mq is unlocked
+ */
+void
+imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl)
+{
+       *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
+                                                 WAITQ_KEEP_LOCKED, spl);
+
+}
+
+
+/*
+ *     Routine:        imq_release_and_unlock
+ *     Purpose:
+ *             Unlock an ipc_mqueue_t object, re-enable interrupts,
+ *             and release any unused prepost object reservations.
+ *     Conditions:
+ *             mq is locked
+ */
+void
+imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl)
+{
+       assert(imq_held(mq));
+       waitq_unlock(&mq->imq_wait_queue);
+       splx(spl);
+       waitq_prepost_release_reserve(reserved_prepost);
+}
+
+
 /*
  *     Routine:        ipc_mqueue_member
  *     Purpose:
 /*
  *     Routine:        ipc_mqueue_member
  *     Purpose:
@@ -139,10 +189,10 @@ ipc_mqueue_member(
        ipc_mqueue_t            port_mqueue,
        ipc_mqueue_t            set_mqueue)
 {
        ipc_mqueue_t            port_mqueue,
        ipc_mqueue_t            set_mqueue)
 {
-       wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+       struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
 
 
-       return (wait_queue_member(port_waitq, set_waitq));
+       return waitq_member(port_waitq, set_waitq);
 
 }
 
 
 }
 
@@ -156,13 +206,12 @@ ipc_mqueue_member(
 kern_return_t
 ipc_mqueue_remove(
        ipc_mqueue_t      mqueue,
 kern_return_t
 ipc_mqueue_remove(
        ipc_mqueue_t      mqueue,
-       ipc_mqueue_t      set_mqueue,
-       wait_queue_link_t *wqlp)
+       ipc_mqueue_t      set_mqueue)
 {
 {
-       wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+       struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
 
 
-       return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
+       return waitq_unlink(mq_waitq, set_waitq);
 }
 
 /*
 }
 
 /*
@@ -173,13 +222,11 @@ ipc_mqueue_remove(
  *             Nothing locked.
  */
 void
  *             Nothing locked.
  */
 void
-ipc_mqueue_remove_from_all(
-       ipc_mqueue_t    mqueue,
-       queue_t         links)
+ipc_mqueue_remove_from_all(ipc_mqueue_t        mqueue)
 {
 {
-       wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
+       struct waitq *mq_waitq = &mqueue->imq_wait_queue;
 
 
-       wait_queue_unlink_all_nofree(mq_waitq, links);
+       waitq_unlink_all(mq_waitq);
        return;
 }
 
        return;
 }
 
@@ -187,17 +234,15 @@ ipc_mqueue_remove_from_all(
  *     Routine:        ipc_mqueue_remove_all
  *     Purpose:
  *             Remove all the member queues from the specified set.
  *     Routine:        ipc_mqueue_remove_all
  *     Purpose:
  *             Remove all the member queues from the specified set.
+ *             Also removes the queue from any containing sets.
  *     Conditions:
  *             Nothing locked.
  */
 void
  *     Conditions:
  *             Nothing locked.
  */
 void
-ipc_mqueue_remove_all(
-       ipc_mqueue_t    mqueue,
-       queue_t         links)
+ipc_mqueue_remove_all(ipc_mqueue_t     mqueue)
 {
 {
-       wait_queue_set_t        mq_setq = &mqueue->imq_set_queue;
-
-       wait_queue_set_unlink_all_nofree(mq_setq, links);
+       struct waitq_set *mq_setq = &mqueue->imq_set_queue;
+       waitq_set_unlink_all(mq_setq);
        return;
 }
 
        return;
 }
 
@@ -215,28 +260,39 @@ ipc_mqueue_remove_all(
  */
 kern_return_t
 ipc_mqueue_add(
  */
 kern_return_t
 ipc_mqueue_add(
-       ipc_mqueue_t     port_mqueue,
-       ipc_mqueue_t     set_mqueue,
-       wait_queue_link_t wql)
+       ipc_mqueue_t    port_mqueue,
+       ipc_mqueue_t    set_mqueue,
+       uint64_t        *reserved_link,
+       uint64_t        *reserved_prepost)
 {
 {
-       wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+       struct waitq     *port_waitq = &port_mqueue->imq_wait_queue;
+       struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
        ipc_kmsg_queue_t kmsgq;
        ipc_kmsg_t       kmsg, next;
        kern_return_t    kr;
        spl_t            s;
 
        ipc_kmsg_queue_t kmsgq;
        ipc_kmsg_t       kmsg, next;
        kern_return_t    kr;
        spl_t            s;
 
-       kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
-       if (kr != KERN_SUCCESS)
+       assert(reserved_link && *reserved_link != 0);
+
+       s = splsched();
+       imq_lock(port_mqueue);
+
+       /*
+        * The link operation is now under the same lock-hold as
+        * message iteration and thread wakeup, but doesn't have to be...
+        */
+       kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
+       if (kr != KERN_SUCCESS) {
+               imq_unlock(port_mqueue);
+               splx(s);
                return kr;
                return kr;
+       }
 
        /*
         * Now that the set has been added to the port, there may be
         * messages queued on the port and threads waiting on the set
         * waitq.  Lets get them together.
         */
 
        /*
         * Now that the set has been added to the port, there may be
         * messages queued on the port and threads waiting on the set
         * waitq.  Lets get them together.
         */
-       s = splsched();
-       imq_lock(port_mqueue);
        kmsgq = &port_mqueue->imq_messages;
        for (kmsg = ipc_kmsg_queue_first(kmsgq);
             kmsg != IKM_NULL;
        kmsgq = &port_mqueue->imq_messages;
        for (kmsg = ipc_kmsg_queue_first(kmsgq);
             kmsg != IKM_NULL;
@@ -246,12 +302,13 @@ ipc_mqueue_add(
                for (;;) {
                        thread_t th;
                        mach_msg_size_t msize;
                for (;;) {
                        thread_t th;
                        mach_msg_size_t msize;
+                       spl_t th_spl;
 
 
-                       th = wait_queue_wakeup64_identity_locked(
+                       th = waitq_wakeup64_identity_locked(
                                                port_waitq,
                                                IPC_MQUEUE_RECEIVE,
                                                port_waitq,
                                                IPC_MQUEUE_RECEIVE,
-                                               THREAD_AWAKENED,
-                                               FALSE);
+                                               THREAD_AWAKENED, &th_spl,
+                                               reserved_prepost, WAITQ_KEEP_LOCKED);
                        /* waitq/mqueue still locked, thread locked */
 
                        if (th == THREAD_NULL)
                        /* waitq/mqueue still locked, thread locked */
 
                        if (th == THREAD_NULL)
@@ -265,6 +322,7 @@ ipc_mqueue_add(
                         */
                        if (th->ith_state != MACH_RCV_IN_PROGRESS) {
                                  thread_unlock(th);
                         */
                        if (th->ith_state != MACH_RCV_IN_PROGRESS) {
                                  thread_unlock(th);
+                                 splx(th_spl);
                                  continue;
                        }
 
                                  continue;
                        }
 
@@ -289,6 +347,7 @@ ipc_mqueue_add(
                                        th->ith_kmsg = IKM_NULL;
                                        th->ith_seqno = 0;
                                        thread_unlock(th);
                                        th->ith_kmsg = IKM_NULL;
                                        th->ith_seqno = 0;
                                        thread_unlock(th);
+                                       splx(th_spl);
                                        continue; /* find another thread */
                                }
                        } else {
                                        continue; /* find another thread */
                                }
                        } else {
@@ -300,14 +359,14 @@ ipc_mqueue_add(
                         * so give it to him.
                         */
                        ipc_kmsg_rmqueue(kmsgq, kmsg);
                         * so give it to him.
                         */
                        ipc_kmsg_rmqueue(kmsgq, kmsg);
-                       ipc_mqueue_release_msgcount(port_mqueue);
+                       ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
 
                        th->ith_kmsg = kmsg;
                        th->ith_seqno = port_mqueue->imq_seqno++;
                        thread_unlock(th);
 
                        th->ith_kmsg = kmsg;
                        th->ith_seqno = port_mqueue->imq_seqno++;
                        thread_unlock(th);
+                       splx(th_spl);
                        break;  /* go to next message */
                }
                        break;  /* go to next message */
                }
-                       
        }
  leave:
        imq_unlock(port_mqueue);
        }
  leave:
        imq_unlock(port_mqueue);
@@ -327,11 +386,12 @@ void
 ipc_mqueue_changed(
        ipc_mqueue_t            mqueue)
 {
 ipc_mqueue_changed(
        ipc_mqueue_t            mqueue)
 {
-       wait_queue_wakeup64_all_locked(
-                               &mqueue->imq_wait_queue,
-                               IPC_MQUEUE_RECEIVE,
-                               THREAD_RESTART,
-                               FALSE);         /* unlock waitq? */
+       waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+                                 IPC_MQUEUE_RECEIVE,
+                                 THREAD_RESTART,
+                                 NULL,
+                                 WAITQ_ALL_PRIORITIES,
+                                 WAITQ_KEEP_LOCKED);
 }
 
 
 }
 
 
@@ -402,12 +462,12 @@ ipc_mqueue_send(
                        clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
                else
                        deadline = 0;
                        clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
                else
                        deadline = 0;
-               wresult = wait_queue_assert_wait64_locked(
+               wresult = waitq_assert_wait64_locked(
                                                &mqueue->imq_wait_queue,
                                                IPC_MQUEUE_FULL,
                                                THREAD_ABORTSAFE,
                                                TIMEOUT_URGENCY_USER_NORMAL,
                                                &mqueue->imq_wait_queue,
                                                IPC_MQUEUE_FULL,
                                                THREAD_ABORTSAFE,
                                                TIMEOUT_URGENCY_USER_NORMAL,
-                                               deadline, 0,
+                                               deadline, TIMEOUT_NO_LEEWAY,
                                                cur_thread);
                thread_unlock(cur_thread);
                imq_unlock(mqueue);
                                                cur_thread);
                thread_unlock(cur_thread);
                imq_unlock(mqueue);
@@ -419,15 +479,19 @@ ipc_mqueue_send(
                }
                
                switch (wresult) {
                }
                
                switch (wresult) {
+
+               case THREAD_AWAKENED:
+                       /* 
+                        * We can proceed: either we inherited a msgcount slot from
+                        * the waker, or the message queue has been destroyed and the
+                        * msgcount reset to zero (ipc_mqueue_post() detects that case).
+                        */
+                       break;
+                       
                case THREAD_TIMED_OUT:
                        assert(option & MACH_SEND_TIMEOUT);
                        return MACH_SEND_TIMED_OUT;
                        
                case THREAD_TIMED_OUT:
                        assert(option & MACH_SEND_TIMEOUT);
                        return MACH_SEND_TIMED_OUT;
                        
-               case THREAD_AWAKENED:
-                       /* we can proceed - inherited msgcount from waker */
-                       assert(mqueue->imq_msgcount > 0);
-                       break;
-                       
                case THREAD_INTERRUPTED:
                        return MACH_SEND_INTERRUPTED;
                        
                case THREAD_INTERRUPTED:
                        return MACH_SEND_INTERRUPTED;
                        
@@ -453,28 +517,43 @@ ipc_mqueue_send(
  *     Conditions:
  *             The message queue is locked.
  *             The message corresponding to this reference is off the queue.
  *     Conditions:
  *             The message queue is locked.
  *             The message corresponding to this reference is off the queue.
+ *             There is no need to pass reserved preposts because this will
+ *             never prepost to anyone
  */
 void
  */
 void
-ipc_mqueue_release_msgcount(
-       ipc_mqueue_t mqueue)    
+ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
 {
 {
-       assert(imq_held(mqueue));
-       assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
+       (void)set_mq;
+       assert(imq_held(port_mq));
+       assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
 
 
-       mqueue->imq_msgcount--;
+       port_mq->imq_msgcount--;
 
 
-       if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
-               if (wait_queue_wakeup64_one_locked(
-                                               &mqueue->imq_wait_queue,
-                                               IPC_MQUEUE_FULL,
-                                               THREAD_AWAKENED,
-                                               FALSE) != KERN_SUCCESS) {
-                       mqueue->imq_fullwaiters = FALSE;
+       if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
+               /*
+                * boost the priority of the awoken thread
+                * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+                * the message queue slot we've just reserved.
+                *
+                * NOTE: this will never prepost
+                */
+               if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
+                                             IPC_MQUEUE_FULL,
+                                             THREAD_AWAKENED,
+                                             NULL,
+                                             WAITQ_PROMOTE_PRIORITY,
+                                             WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
+                       port_mq->imq_fullwaiters = FALSE;
                } else {
                        /* gave away our slot - add reference back */
                } else {
                        /* gave away our slot - add reference back */
-                       mqueue->imq_msgcount++; 
+                       port_mq->imq_msgcount++;
                }
        }
                }
        }
+
+       if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+               /* no more msgs: invalidate the port's prepost object */
+               waitq_clear_prepost_locked(&port_mq->imq_wait_queue, NULL);
+       }
 }
 
 /*
 }
 
 /*
@@ -485,6 +564,7 @@ ipc_mqueue_release_msgcount(
  *             the message queue.
  *
  *     Conditions:
  *             the message queue.
  *
  *     Conditions:
+ *             mqueue is unlocked
  *             If we need to queue, our space in the message queue is reserved.
  */
 void
  *             If we need to queue, our space in the message queue is reserved.
  */
 void
@@ -493,6 +573,7 @@ ipc_mqueue_post(
        register ipc_kmsg_t             kmsg)
 {
        spl_t s;
        register ipc_kmsg_t             kmsg)
 {
        spl_t s;
+       uint64_t reserved_prepost = 0;
 
        /*
         *      While the msg queue     is locked, we have control of the
 
        /*
         *      While the msg queue     is locked, we have control of the
@@ -500,27 +581,41 @@ ipc_mqueue_post(
         *
         *      Check for a receiver for the message.
         */
         *
         *      Check for a receiver for the message.
         */
-       s = splsched();
-       imq_lock(mqueue);
+       imq_reserve_and_lock(mqueue, &reserved_prepost, &s);
        for (;;) {
        for (;;) {
-               wait_queue_t waitq = &mqueue->imq_wait_queue;
+               struct waitq *waitq = &mqueue->imq_wait_queue;
+               spl_t th_spl;
                thread_t receiver;
                mach_msg_size_t msize;
 
                thread_t receiver;
                mach_msg_size_t msize;
 
-               receiver = wait_queue_wakeup64_identity_locked(
-                                                       waitq,
-                                                       IPC_MQUEUE_RECEIVE,
-                                                       THREAD_AWAKENED,
-                                                       FALSE);
+               receiver = waitq_wakeup64_identity_locked(waitq,
+                                                         IPC_MQUEUE_RECEIVE,
+                                                         THREAD_AWAKENED,
+                                                         &th_spl,
+                                                         &reserved_prepost,
+                                                         WAITQ_KEEP_LOCKED);
                /* waitq still locked, thread locked */
 
                if (receiver == THREAD_NULL) {
                /* waitq still locked, thread locked */
 
                if (receiver == THREAD_NULL) {
+                       
                        /* 
                        /* 
-                        * no receivers; queue kmsg
+                        * no receivers; queue kmsg if space still reserved.
                         */
                         */
-                       assert(mqueue->imq_msgcount > 0);
-                       ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
-                       break;
+                       if (mqueue->imq_msgcount > 0) {
+                               ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
+                               break;
+                       }
+
+                       /*
+                        * Otherwise, the message queue must belong to an inactive
+                        * port, so just destroy the message and pretend it was posted.
+                        */
+                       /* clear the waitq boost we may have been given */
+                       waitq_clear_promotion_locked(waitq, current_thread());
+                       imq_release_and_unlock(mqueue, reserved_prepost, s);
+                       ipc_kmsg_destroy(kmsg);
+                       current_task()->messages_sent++;
+                       return;
                }
        
                /*
                }
        
                /*
@@ -531,6 +626,7 @@ ipc_mqueue_post(
                 */
                if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
                                  thread_unlock(receiver);
                 */
                if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
                                  thread_unlock(receiver);
+                                 splx(th_spl);
                                  continue;
                }
 
                                  continue;
                }
 
@@ -560,9 +656,10 @@ ipc_mqueue_post(
                        receiver->ith_kmsg = kmsg;
                        receiver->ith_seqno = mqueue->imq_seqno++;
                        thread_unlock(receiver);
                        receiver->ith_kmsg = kmsg;
                        receiver->ith_seqno = mqueue->imq_seqno++;
                        thread_unlock(receiver);
+                       splx(th_spl);
 
                        /* we didn't need our reserved spot in the queue */
 
                        /* we didn't need our reserved spot in the queue */
-                       ipc_mqueue_release_msgcount(mqueue);
+                       ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
                        break;
                }
 
                        break;
                }
 
@@ -575,10 +672,12 @@ ipc_mqueue_post(
                receiver->ith_kmsg = IKM_NULL;
                receiver->ith_seqno = 0;
                thread_unlock(receiver);
                receiver->ith_kmsg = IKM_NULL;
                receiver->ith_seqno = 0;
                thread_unlock(receiver);
+               splx(th_spl);
        }
 
        }
 
-       imq_unlock(mqueue);
-       splx(s);
+       /* clear the waitq boost we may have been given */
+       waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
+       imq_release_and_unlock(mqueue, reserved_prepost, s);
        
        current_task()->messages_sent++;
        return;
        
        current_task()->messages_sent++;
        return;
@@ -707,6 +806,32 @@ ipc_mqueue_receive(
        ipc_mqueue_receive_results(wresult);
 }
 
        ipc_mqueue_receive_results(wresult);
 }
 
+static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
+                                         struct waitq_set *wqset)
+{
+       ipc_mqueue_t     port_mq, *pmq_ptr;
+
+       (void)wqset;
+       port_mq = (ipc_mqueue_t)waitq;
+
+       /*
+        * If there are no messages on this queue, skip it and remove
+        * it from the prepost list
+        */
+       if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
+               return WQ_ITERATE_INVALIDATE_CONTINUE;
+
+       /*
+        * There are messages waiting on this port: report the port to the
+        * caller through ctx (if provided) and instruct the prepost iteration
+        * logic to break, but keep the waitq locked.
+        */
+       pmq_ptr = (ipc_mqueue_t *)ctx;
+       if (pmq_ptr)
+               *pmq_ptr = port_mq;
+       return WQ_ITERATE_BREAK_KEEP_LOCKED;
+}
+
 wait_result_t
 ipc_mqueue_receive_on_thread(
         ipc_mqueue_t            mqueue,
 wait_result_t
 ipc_mqueue_receive_on_thread(
         ipc_mqueue_t            mqueue,
@@ -716,92 +841,65 @@ ipc_mqueue_receive_on_thread(
        int                     interruptible,
        thread_t                thread)
 {
        int                     interruptible,
        thread_t                thread)
 {
-       ipc_kmsg_queue_t        kmsgs;
        wait_result_t           wresult;
        uint64_t                deadline;
        spl_t                   s;
 
        s = splsched();
        imq_lock(mqueue);
        wait_result_t           wresult;
        uint64_t                deadline;
        spl_t                   s;
 
        s = splsched();
        imq_lock(mqueue);
+       /* no need to reserve anything: we never prepost to anyone */
        
        if (imq_is_set(mqueue)) {
        
        if (imq_is_set(mqueue)) {
-               queue_t q;
-
-               q = &mqueue->imq_preposts;
+               ipc_mqueue_t port_mq = IMQ_NULL;
+               spl_t set_spl;
 
 
-               /*
-                * If we are waiting on a portset mqueue, we need to see if
-                * any of the member ports have work for us.  Ports that
-                * have (or recently had) messages will be linked in the
-                * prepost queue for the portset. By holding the portset's
-                * mqueue lock during the search, we tie up any attempts by
-                * mqueue_deliver or portset membership changes that may
-                * cross our path.
-                */
-       search_set:
-               while(!queue_empty(q)) {
-                       wait_queue_link_t wql;
-                       ipc_mqueue_t port_mq;
-
-                       queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
-                       assert(!wql_is_preposted(wql));
-
-                       /*
-                        * This is a lock order violation, so we have to do it
-                        * "softly," putting the link back on the prepost list
-                        * if it fails (at the tail is fine since the order of
-                        * handling messages from different sources in a set is
-                        * not guaranteed and we'd like to skip to the next source
-                        * if one is available).
-                        */
-                       port_mq = (ipc_mqueue_t)wql->wql_queue;
-                       if (!imq_lock_try(port_mq)) {
-                               queue_enter(q, wql, wait_queue_link_t, wql_preposts);
-                               imq_unlock(mqueue);
-                               splx(s);
-                               mutex_pause(0);
-                               s = splsched();
-                               imq_lock(mqueue);
-                               goto search_set; /* start again at beginning - SMP */
-                       }
+               (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
+                                                &port_mq,
+                                                mqueue_process_prepost_receive,
+                                                &set_spl);
 
 
+               if (port_mq != IMQ_NULL) {
                        /*
                        /*
-                        * If there are no messages on this queue, just skip it
-                        * (we already removed the link from the set's prepost queue).
+                        * We get here if there is at least one message
+                        * waiting on port_mq. We have instructed the prepost
+                        * iteration logic to leave both the port_mq and the
+                        * set mqueue locked.
+                        *
+                        * TODO: previously, we would move this port to the back of
+                        *       the prepost list (for fairness); this code no longer does.
                         */
                         */
-                       kmsgs = &port_mq->imq_messages;
-                       if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
-                               imq_unlock(port_mq);
-                               continue;
-                       }
+                       imq_unlock(mqueue);
 
 
-                       /*
-                        * There are messages, so reinsert the link back
-                        * at the tail of the preposted queue (for fairness)
-                        * while we still have the portset mqueue locked.
+                       /* TODO: if/when port mqueues become non irq safe,
+                        *       we won't need this spl, and we should be
+                        *       able to call splx(s) (if that's even
+                        *       necessary).
+                        * For now, interrupts are still disabled by the
+                        * splsched() taken at the top of this function.
                         */
                         */
-                       queue_enter(q, wql, wait_queue_link_t, wql_preposts);
-                       imq_unlock(mqueue);
+                       splx(set_spl);
 
                        /*
                         * Continue on to handling the message with just
                         * the port mqueue locked.
                         */
 
                        /*
                         * Continue on to handling the message with just
                         * the port mqueue locked.
                         */
-                       ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
+                       ipc_mqueue_select_on_thread(port_mq, mqueue, option,
+                                                   max_size, thread);
+
                        imq_unlock(port_mq);
                        splx(s);
                        return THREAD_NOT_WAITING;
                        imq_unlock(port_mq);
                        splx(s);
                        return THREAD_NOT_WAITING;
-                       
                }
                }
-
        } else {
        } else {
+               ipc_kmsg_queue_t kmsgs;
 
                /*
                 * Receive on a single port. Just try to get the messages.
                 */
                kmsgs = &mqueue->imq_messages;
                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
 
                /*
                 * Receive on a single port. Just try to get the messages.
                 */
                kmsgs = &mqueue->imq_messages;
                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
-                       ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
+                       ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
+                                                   max_size, thread);
                        imq_unlock(mqueue);
                        splx(s);
                        return THREAD_NOT_WAITING;
                        imq_unlock(mqueue);
                        splx(s);
                        return THREAD_NOT_WAITING;
@@ -822,6 +920,7 @@ ipc_mqueue_receive_on_thread(
                }
        }
 
                }
        }
 
+       /* NOTE: need splsched() here if mqueue no longer needs irq disabled */
        thread_lock(thread);
        thread->ith_state = MACH_RCV_IN_PROGRESS;
        thread->ith_option = option;
        thread_lock(thread);
        thread->ith_state = MACH_RCV_IN_PROGRESS;
        thread->ith_option = option;
@@ -832,12 +931,13 @@ ipc_mqueue_receive_on_thread(
        else
                deadline = 0;
 
        else
                deadline = 0;
 
-       wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
-                                                 IPC_MQUEUE_RECEIVE,
-                                                 interruptible, 
-                                                 TIMEOUT_URGENCY_USER_NORMAL,
-                                                 deadline, 0,
-                                                 thread);
+       wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
+                                            IPC_MQUEUE_RECEIVE,
+                                            interruptible,
+                                            TIMEOUT_URGENCY_USER_NORMAL,
+                                            deadline,
+                                            TIMEOUT_NO_LEEWAY,
+                                            thread);
        /* preposts should be detected above, not here */
        if (wresult == THREAD_AWAKENED)
                panic("ipc_mqueue_receive_on_thread: sleep walking");
        /* preposts should be detected above, not here */
        if (wresult == THREAD_AWAKENED)
                panic("ipc_mqueue_receive_on_thread: sleep walking");
@@ -859,13 +959,16 @@ ipc_mqueue_receive_on_thread(
  *             mqueue locked.
  *              thread not locked.
  *             There is a message.
  *             mqueue locked.
  *              thread not locked.
  *             There is a message.
+ *             No need to reserve prepost objects - it will never prepost
+ *
  *     Returns:
  *             MACH_MSG_SUCCESS        Actually selected a message for ourselves.
  *             MACH_RCV_TOO_LARGE  May or may not have pulled it, but it is large
  */
 void
 ipc_mqueue_select_on_thread(
  *     Returns:
  *             MACH_MSG_SUCCESS        Actually selected a message for ourselves.
  *             MACH_RCV_TOO_LARGE  May or may not have pulled it, but it is large
  */
 void
 ipc_mqueue_select_on_thread(
-       ipc_mqueue_t            mqueue,
+       ipc_mqueue_t            port_mq,
+       ipc_mqueue_t            set_mq,
        mach_msg_option_t       option,
        mach_msg_size_t         max_size,
        thread_t                thread)
        mach_msg_option_t       option,
        mach_msg_size_t         max_size,
        thread_t                thread)
@@ -878,7 +981,7 @@ ipc_mqueue_select_on_thread(
         * Do some sanity checking of our ability to receive
         * before pulling the message off the queue.
         */
         * Do some sanity checking of our ability to receive
         * before pulling the message off the queue.
         */
-       kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
+       kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
        assert(kmsg != IKM_NULL);
 
        /*
        assert(kmsg != IKM_NULL);
 
        /*
@@ -891,7 +994,7 @@ ipc_mqueue_select_on_thread(
        if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
                mr = MACH_RCV_TOO_LARGE;
                if (option & MACH_RCV_LARGE) {
        if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
                mr = MACH_RCV_TOO_LARGE;
                if (option & MACH_RCV_LARGE) {
-                       thread->ith_receiver_name = mqueue->imq_receiver_name;
+                       thread->ith_receiver_name = port_mq->imq_receiver_name;
                        thread->ith_kmsg = IKM_NULL;
                        thread->ith_msize = rcv_size;
                        thread->ith_seqno = 0;
                        thread->ith_kmsg = IKM_NULL;
                        thread->ith_msize = rcv_size;
                        thread->ith_seqno = 0;
@@ -900,9 +1003,9 @@ ipc_mqueue_select_on_thread(
                }
        }
 
                }
        }
 
-       ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
-       ipc_mqueue_release_msgcount(mqueue);
-       thread->ith_seqno = mqueue->imq_seqno++;
+       ipc_kmsg_rmqueue_first_macro(&port_mq->imq_messages, kmsg);
+       ipc_mqueue_release_msgcount(port_mq, set_mq);
+       thread->ith_seqno = port_mq->imq_seqno++;
        thread->ith_kmsg = kmsg;
        thread->ith_state = mr;
 
        thread->ith_kmsg = kmsg;
        thread->ith_state = mr;
 
@@ -923,14 +1026,14 @@ ipc_mqueue_select_on_thread(
  *             Caller holds reference on the message queue.
  */
 unsigned
  *             Caller holds reference on the message queue.
  */
 unsigned
-ipc_mqueue_peek(ipc_mqueue_t           mq,
-               mach_port_seqno_t       *seqnop,
-               mach_msg_size_t         *msg_sizep,
-               mach_msg_id_t           *msg_idp,
-               mach_msg_max_trailer_t  *msg_trailerp)
+ipc_mqueue_peek(ipc_mqueue_t mq,
+                mach_port_seqno_t * seqnop,
+                mach_msg_size_t * msg_sizep,
+                mach_msg_id_t * msg_idp,
+                mach_msg_max_trailer_t * msg_trailerp)
 {
        ipc_kmsg_queue_t kmsgq;
 {
        ipc_kmsg_queue_t kmsgq;
-       ipc_kmsg_t kmsg; 
+       ipc_kmsg_t kmsg;
        mach_port_seqno_t seqno, msgoff;
        int res = 0;
        spl_t s;
        mach_port_seqno_t seqno, msgoff;
        int res = 0;
        spl_t s;
@@ -940,7 +1043,9 @@ ipc_mqueue_peek(ipc_mqueue_t               mq,
        s = splsched();
        imq_lock(mq);
 
        s = splsched();
        imq_lock(mq);
 
-       seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
+       seqno = 0;
+       if (seqnop != NULL)
+               seqno = *seqnop;
 
        if (seqno == 0) {
                seqno = mq->imq_seqno;
 
        if (seqno == 0) {
                seqno = mq->imq_seqno;
@@ -980,6 +1085,29 @@ ipc_mqueue_peek(ipc_mqueue_t              mq,
        return res;
 }
 
        return res;
 }
 
+
+/*
+ * peek at the contained port message queues, break prepost iteration as soon
+ * as we spot a message on one of the message queues referenced by the set's
+ * prepost list.  No need to lock each message queue, as only the head of each
+ * queue is checked. If a message wasn't there before we entered here, no need
+ * to find it (if we do, great).
+ */
+static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+                               struct waitq_set *wqset)
+{
+       ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
+       ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
+
+       (void)ctx;
+       (void)wqset;
+               
+       if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
+               return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
+
+       return WQ_ITERATE_CONTINUE;
+}
+
 /*
  *     Routine:        ipc_mqueue_set_peek
  *     Purpose:
 /*
  *     Routine:        ipc_mqueue_set_peek
  *     Purpose:
@@ -993,83 +1121,91 @@ ipc_mqueue_peek(ipc_mqueue_t             mq,
 unsigned
 ipc_mqueue_set_peek(ipc_mqueue_t mq)
 {
 unsigned
 ipc_mqueue_set_peek(ipc_mqueue_t mq)
 {
-       wait_queue_link_t       wql;
-       queue_t                 q;
        spl_t s;
        spl_t s;
-       int res;
+       int ret;
 
        assert(imq_is_set(mq));
 
        s = splsched();
        imq_lock(mq);
 
 
        assert(imq_is_set(mq));
 
        s = splsched();
        imq_lock(mq);
 
-       /* 
-        * peek at the contained port message queues, return as soon as
-        * we spot a message on one of the message queues linked on the
-        * prepost list.  No need to lock each message queue, as only the
-        * head of each queue is checked. If a message wasn't there before
-        * we entered here, no need to find it (if we do, great).
-        */
-       res = 0;
-       q = &mq->imq_preposts;
-       queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
-               ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
-               ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
-                       
-               if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
-                       res = 1;
-                       break;
-               }
-       }
+       ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
+                                        mqueue_peek_iterator, NULL);
+
        imq_unlock(mq);
        splx(s);
        imq_unlock(mq);
        splx(s);
-       return res;
+       return (ret == WQ_ITERATE_BREAK);
 }
 
 /*
  *     Routine:        ipc_mqueue_set_gather_member_names
  *     Purpose:
 }
 
 /*
  *     Routine:        ipc_mqueue_set_gather_member_names
  *     Purpose:
- *             Iterate a message queue set to identify the member port
- *             names. Actual returned names is limited to maxnames entries,
- *             but we keep counting the actual number of members to let
- *             the caller decide to retry if necessary.
+ *             Discover all ports which are members of a given port set.
+ *             Because the waitq linkage mechanism was redesigned to save
+ *             significant amounts of memory, it no longer keeps back-pointers
+ *             from a port set to a port. Therefore, we must iterate over all
+ *             ports within a given IPC space and individually query them to
+ *             see if they are members of the given set. Port names of ports
+ *             found to be members of the given set will be gathered into the
+ *             provided 'names' array.  Actual returned names are limited to
+ *             maxnames entries, but we keep counting the actual number of
+ *             members to let the caller decide to retry if necessary.
  *
  *     Conditions:
  *             Locks may be held by callers, so this routine cannot block.
  *
  *     Conditions:
  *             Locks may be held by callers, so this routine cannot block.
- *             Caller holds reference on the message queue.
+ *             Caller holds reference on the message queue (via port set).
  */
 void
 ipc_mqueue_set_gather_member_names(
  */
 void
 ipc_mqueue_set_gather_member_names(
-       ipc_mqueue_t mq, 
-       ipc_entry_num_t maxnames, 
+       ipc_space_t space,
+       ipc_mqueue_t set_mq,
+       ipc_entry_num_t maxnames,
        mach_port_name_t *names,
        ipc_entry_num_t *actualp)
 {
        mach_port_name_t *names,
        ipc_entry_num_t *actualp)
 {
-       wait_queue_link_t       wql;
-       queue_t                 q;
-       spl_t s;
+       ipc_entry_t table;
+       ipc_entry_num_t tsize;
+       struct waitq_set *wqset;
        ipc_entry_num_t actual = 0;
 
        ipc_entry_num_t actual = 0;
 
-       assert(imq_is_set(mq));
+       assert(set_mq != IMQ_NULL);
+       wqset = &set_mq->imq_set_queue;
 
 
-       s = splsched();
-       imq_lock(mq);
+       assert(space != IS_NULL);
+       is_read_lock(space);
+       if (!is_active(space)) {
+               is_read_unlock(space);
+               goto out;
+       }
 
 
-       /* 
-        * Iterate over the member ports through the mqueue set links
-        * capturing as many names as we can.
-        */
-       q = &mq->imq_setlinks;
-       queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
-               ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
+       if (!waitq_set_is_valid(wqset)) {
+               is_read_unlock(space);
+               goto out;
+       }
 
 
-               if (actual < maxnames)
-                       names[actual] = port_mq->imq_receiver_name;
-               actual++;
+       table = space->is_table;
+       tsize = space->is_table_size;
+       for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
+               ipc_entry_t entry = &table[idx];
+
+               /* only receive rights can be members of port sets */
+               if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
+                       __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
+                       ipc_mqueue_t mq = &port->ip_messages;
+
+                       assert(IP_VALID(port));
+                       if (ip_active(port) &&
+                           waitq_member(&mq->imq_wait_queue, wqset)) {
+                               if (actual < maxnames)
+                                       names[actual] = mq->imq_receiver_name;
+                               actual++;
+                       }
+               }
        }
        }
-       imq_unlock(mq);
-       splx(s);
 
 
+       is_read_unlock(space);
+
+out:
        *actualp = actual;
 }
 
        *actualp = actual;
 }
 
@@ -1093,17 +1229,23 @@ ipc_mqueue_destroy(
        boolean_t reap = FALSE;
        spl_t s;
 
        boolean_t reap = FALSE;
        spl_t s;
 
+       assert(!imq_is_set(mqueue));
+
        s = splsched();
        imq_lock(mqueue);
        s = splsched();
        imq_lock(mqueue);
+
        /*
         *      rouse all blocked senders
        /*
         *      rouse all blocked senders
+        *      (don't boost anyone - we're tearing this queue down)
+        *      (never preposts)
         */
        mqueue->imq_fullwaiters = FALSE;
         */
        mqueue->imq_fullwaiters = FALSE;
-       wait_queue_wakeup64_all_locked(
-                               &mqueue->imq_wait_queue,
-                               IPC_MQUEUE_FULL,
-                               THREAD_RESTART,
-                               FALSE);
+       waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+                                 IPC_MQUEUE_FULL,
+                                 THREAD_RESTART,
+                                 NULL,
+                                 WAITQ_ALL_PRIORITIES,
+                                 WAITQ_KEEP_LOCKED);
 
        /*
         * Move messages from the specified queue to the per-thread
 
        /*
         * Move messages from the specified queue to the per-thread
@@ -1117,9 +1259,28 @@ ipc_mqueue_destroy(
                        reap = first;
        }
 
                        reap = first;
        }
 
+       /*
+        * Wipe out message count, both for messages about to be
+        * reaped and for reserved space for (previously) woken senders.
+        * This is the indication to them that their reserved space is gone
+        * (the mqueue was destroyed).
+        */
+       mqueue->imq_msgcount = 0;
+
+       /* clear out any preposting we may have done */
+       waitq_clear_prepost_locked(&mqueue->imq_wait_queue, &s);
+
        imq_unlock(mqueue);
        splx(s);
 
        imq_unlock(mqueue);
        splx(s);
 
+       /*
+        * assert that we're destroying a queue that's not a
+        * member of any other queue
+        */
+       assert(mqueue->imq_wait_queue.waitq_prepost_id == 0);
+       assert(mqueue->imq_wait_queue.waitq_set_id == 0);
+
+
        /*
         * Destroy the messages we enqueued if we aren't nested
         * inside some other attempt to drain the same queue.
        /*
         * Destroy the messages we enqueued if we aren't nested
         * inside some other attempt to drain the same queue.
@@ -1156,17 +1317,25 @@ ipc_mqueue_set_qlimit(
                 wakeup = qlimit - mqueue->imq_qlimit;
 
                 for (i = 0; i < wakeup; i++) {
                 wakeup = qlimit - mqueue->imq_qlimit;
 
                 for (i = 0; i < wakeup; i++) {
-                        if (wait_queue_wakeup64_one_locked(
-                                                       &mqueue->imq_wait_queue,
-                                                       IPC_MQUEUE_FULL,
-                                                       THREAD_AWAKENED,
-                                                       FALSE) == KERN_NOT_WAITING) {
-                                        mqueue->imq_fullwaiters = FALSE;
-                                        break;
-                        }
-                        mqueue->imq_msgcount++;  /* give it to the awakened thread */
+                       /*
+                        * boost the priority of the awoken thread
+                        * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+                        * the message queue slot we've just reserved.
+                        *
+                        * NOTE: this will never prepost
+                        */
+                       if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
+                                                     IPC_MQUEUE_FULL,
+                                                     THREAD_AWAKENED,
+                                                     NULL,
+                                                     WAITQ_PROMOTE_PRIORITY,
+                                                     WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
+                               mqueue->imq_fullwaiters = FALSE;
+                               break;
+                       }
+                       mqueue->imq_msgcount++;  /* give it to the awakened thread */
                 }
                 }
-        }
+       }
        mqueue->imq_qlimit = qlimit;
        imq_unlock(mqueue);
        splx(s);
        mqueue->imq_qlimit = qlimit;
        imq_unlock(mqueue);
        splx(s);
@@ -1239,7 +1408,7 @@ ipc_mqueue_copyin(
        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
                ipc_port_t port;
 
        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
                ipc_port_t port;
 
-               port = (ipc_port_t) object;
+               __IGNORE_WCASTALIGN(port = (ipc_port_t) object);
                assert(port != IP_NULL);
 
                ip_lock(port);
                assert(port != IP_NULL);
 
                ip_lock(port);
@@ -1252,7 +1421,7 @@ ipc_mqueue_copyin(
        } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
                ipc_pset_t pset;
 
        } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
                ipc_pset_t pset;
 
-               pset = (ipc_pset_t) object;
+               __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
                assert(pset != IPS_NULL);
 
                ips_lock(pset);
                assert(pset != IPS_NULL);
 
                ips_lock(pset);
@@ -1278,4 +1447,3 @@ ipc_mqueue_copyin(
        *mqueuep = mqueue;
        return MACH_MSG_SUCCESS;
 }
        *mqueuep = mqueue;
        return MACH_MSG_SUCCESS;
 }
-