diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c
index 38af0db4cc330f9c33ccc9b328e148f4c23282dd..685950c902f9f5f95642eb228e06bc0e8597c691 100644 (file)
@@ -120,15 +120,14 @@ static void ipc_mqueue_peek_on_thread(
 void
 ipc_mqueue_init(
        ipc_mqueue_t    mqueue,
-       boolean_t       is_set,
-       uint64_t        *reserved_link)
+       boolean_t       is_set)
 {
        if (is_set) {
                waitq_set_init(&mqueue->imq_set_queue,
                               SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST,
-                              reserved_link, NULL);
+                              NULL, NULL);
        } else {
-               waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+               waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
                ipc_kmsg_queue_init(&mqueue->imq_messages);
                mqueue->imq_seqno = 0;
                mqueue->imq_msgcount = 0;
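
The reserved_link parameter is gone from initialization in xnu-4903.221.2: waitq-set link reservation now happens at link time instead (note the new waitqs_is_linked() assert in ipc_mqueue_add below). A hypothetical caller, sketched against the new two-argument signature:

    /* Hypothetical caller (sketch): link reservation is no longer
     * threaded through ipc_mqueue_init(). */
    ipc_mqueue_init(&port->ip_messages, FALSE /* is_set */);
    ipc_mqueue_init(&pset->ips_messages, TRUE  /* is_set */);
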
@@ -298,6 +297,7 @@ ipc_mqueue_add(
        kern_return_t    kr;
 
        assert(reserved_link && *reserved_link != 0);
+       assert(waitqs_is_linked(set_waitq));
 
        imq_lock(port_mqueue);
 
@@ -371,7 +371,7 @@ ipc_mqueue_add(
                         */
                        msize = ipc_kmsg_copyout_size(kmsg, th->map);
                        if (th->ith_rsize <
-                                       (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
+                                       (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
                                th->ith_state = MACH_RCV_TOO_LARGE;
                                th->ith_msize = msize;
                                if (th->ith_option & MACH_RCV_LARGE) {
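
The thread_is_64bit() to thread_is_64bit_addr() rename is not cosmetic: this release splits the predicate into address-width and data-width variants, which disagree on arm64_32 (64-bit registers, 32-bit addressing), and trailer sizing follows the receiver's address width. An illustrative recap, with thread_is_64bit_data as the assumed sibling predicate:

    /* Sketch: the trailer appended on receive is sized by the receiver's
     * ADDRESS width, hence the _addr variant in the checks above. */
    if (th->ith_rsize <
        (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
            th->ith_state = MACH_RCV_TOO_LARGE;  /* buffer too small for msg + trailer */
    }
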
@@ -427,8 +427,25 @@ void
 ipc_mqueue_changed(
        ipc_mqueue_t            mqueue)
 {
-       /* Indicate that this message queue is vanishing */
-       knote_vanish(&mqueue->imq_klist);
+       if (IMQ_KLIST_VALID(mqueue)) {
+               /*
+                * Indicate that this message queue is vanishing
+                *
+                * When this is called, the associated receive right may be in flight
+                * between two tasks: the one it used to live in, and the one that armed
+                * a port destroyed notification for it.
+                *
+                * The new process may want to register the port it gets back with an
+                * EVFILT_MACHPORT filter again, and may already have sync IPC
+                * pending on this port, in which case we want the imq_klist field to be
+                * reusable for nefarious purposes (see IMQ_SET_INHERITOR).
+                *
+                * Fortunately, we really don't need this linkage anymore after this
+                * point, as EV_VANISHED / EV_EOF will be the last events ever delivered.
+                */
+               knote_vanish(&mqueue->imq_klist);
+               klist_init(&mqueue->imq_klist);
+       }
 
        waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
                                  IPC_MQUEUE_RECEIVE,
@@ -439,13 +456,13 @@ ipc_mqueue_changed(
 }
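
Why both knote_vanish() and a fresh klist_init()? Because imq_klist shares storage with a turnstile-inheritor word (the IMQ_SET_INHERITOR reuse the comment mentions), the list head must be left clean before the field is repurposed. A self-contained illustration of that style of tagged-union reuse; the struct layout, macro names, and low-bit tag are assumptions for demonstration, not the real ipc_mqueue.h definitions:

    #include <assert.h>
    #include <stdint.h>

    /* Demo of klist/inheritor storage sharing (assumed layout). */
    struct mq_demo {
            union {
                    void     *klist_head;  /* stand-in for struct klist */
                    uintptr_t inheritor;   /* tagged turnstile pointer   */
            };
    };

    #define DEMO_KLIST_VALID(mq)       (((mq)->inheritor & 1u) == 0)
    #define DEMO_SET_INHERITOR(mq, ts) ((mq)->inheritor = (uintptr_t)(ts) | 1u)

    int main(void) {
            struct mq_demo mq = { .klist_head = 0 };
            assert(DEMO_KLIST_VALID(&mq));    /* safe to post/vanish knotes */

            static int fake_turnstile;
            DEMO_SET_INHERITOR(&mq, &fake_turnstile);
            assert(!DEMO_KLIST_VALID(&mq));   /* KNOTE here would walk garbage */
            return 0;
    }
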
 
 
-               
+
 
 /*
  *     Routine:        ipc_mqueue_send
  *     Purpose:
  *             Send a message to a message queue.  The message holds a reference
- *             for the destination port for this message queue in the 
+ *             for the destination port for this message queue in the
  *             msgh_remote_port field.
  *
  *             If unsuccessful, the caller still has possession of
@@ -474,7 +491,7 @@ ipc_mqueue_send(
         *      3) Message is sent to a send-once right.
         */
        if (!imq_full(mqueue) ||
-           (!imq_full_kernel(mqueue) && 
+           (!imq_full_kernel(mqueue) &&
             ((option & MACH_SEND_ALWAYS) ||
              (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
               MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
@@ -483,9 +500,12 @@ ipc_mqueue_send(
                imq_unlock(mqueue);
        } else {
                thread_t cur_thread = current_thread();
+               ipc_port_t port = ip_from_mq(mqueue);
+               struct turnstile *send_turnstile = TURNSTILE_NULL;
+               turnstile_inheritor_t inheritor = TURNSTILE_INHERITOR_NULL;
                uint64_t deadline;
 
-               /* 
+               /*
                 * We have to wait for space to be granted to us.
                 */
                if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
@@ -504,38 +524,65 @@ ipc_mqueue_send(
                        deadline = 0;
 
                thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
-               wresult = waitq_assert_wait64_locked(
-                                               &mqueue->imq_wait_queue,
-                                               IPC_MQUEUE_FULL,
-                                               THREAD_ABORTSAFE,
-                                               TIMEOUT_URGENCY_USER_NORMAL,
-                                               deadline, TIMEOUT_NO_LEEWAY,
-                                               cur_thread);
+
+               send_turnstile = turnstile_prepare((uintptr_t)port,
+                       port_send_turnstile_address(port),
+                       TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+               /* Check if the port is in transit; if so, use the destination port's send turnstile */
+               if (ip_active(port) &&
+                   port->ip_receiver_name == MACH_PORT_NULL &&
+                   port->ip_destination != NULL) {
+                       inheritor = port_send_turnstile(port->ip_destination);
+               } else {
+                       inheritor = ipc_port_get_inheritor(port);
+               }
+
+               turnstile_update_inheritor(send_turnstile, inheritor,
+                               TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
+
+               wresult = waitq_assert_wait64_leeway(
+                                       &send_turnstile->ts_waitq,
+                                       IPC_MQUEUE_FULL,
+                                       THREAD_ABORTSAFE,
+                                       TIMEOUT_URGENCY_USER_NORMAL,
+                                       deadline,
+                                       TIMEOUT_NO_LEEWAY);
 
                imq_unlock(mqueue);
-               
+               turnstile_update_inheritor_complete(send_turnstile,
+                               TURNSTILE_INTERLOCK_NOT_HELD);
+
                if (wresult == THREAD_WAITING) {
                        wresult = thread_block(THREAD_CONTINUE_NULL);
                        counter(c_ipc_mqueue_send_block++);
                }
-               
+
+               /* Call turnstile complete with interlock held */
+               imq_lock(mqueue);
+               turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
+               imq_unlock(mqueue);
+
+               /* Call cleanup after dropping the interlock */
+               turnstile_cleanup();
+
                switch (wresult) {
 
                case THREAD_AWAKENED:
-                       /* 
+                       /*
                         * we can proceed - inherited msgcount from waker
                         * or the message queue has been destroyed and the msgcount
                         * has been reset to zero (will detect in ipc_mqueue_post()).
                         */
                        break;
-                       
+
                case THREAD_TIMED_OUT:
                        assert(option & MACH_SEND_TIMEOUT);
                        return MACH_SEND_TIMED_OUT;
-                       
+
                case THREAD_INTERRUPTED:
                        return MACH_SEND_INTERRUPTED;
-                       
+
                case THREAD_RESTART:
                        /* mqueue is being destroyed */
                        return MACH_SEND_INVALID_DEST;
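
Taken together, the hunk above is an instance of the general turnstile wait protocol; distilled into one sketch, with the interlock transitions being the part that must be preserved:

    /* 1. Prepare and stage the inheritor while holding the imq interlock. */
    send_turnstile = turnstile_prepare((uintptr_t)port,
            port_send_turnstile_address(port), TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
    turnstile_update_inheritor(send_turnstile, inheritor,
            TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
    wresult = waitq_assert_wait64_leeway(&send_turnstile->ts_waitq, IPC_MQUEUE_FULL,
            THREAD_ABORTSAFE, TIMEOUT_URGENCY_USER_NORMAL, deadline, TIMEOUT_NO_LEEWAY);
    imq_unlock(mqueue);

    /* 2. Flush the delayed priority update once the interlock is dropped. */
    turnstile_update_inheritor_complete(send_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
    if (wresult == THREAD_WAITING)
            wresult = thread_block(THREAD_CONTINUE_NULL);

    /* 3. Return the turnstile under the interlock; clean up after dropping it. */
    imq_lock(mqueue);
    turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
    imq_unlock(mqueue);
    turnstile_cleanup();

A THREAD_RESTART wakeup on this waitq (posted by ipc_mqueue_destroy_locked further down) is what surfaces to the sender as MACH_SEND_INVALID_DEST.
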
@@ -569,12 +616,14 @@ extern void ipc_mqueue_override_send(
        imq_lock(mqueue);
        assert(imq_valid(mqueue));
        assert(!imq_is_set(mqueue));
-       
+
        if (imq_full(mqueue)) {
                ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
 
-               if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override))
-                       KNOTE(&mqueue->imq_klist, 0);
+               if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
+                       if (IMQ_KLIST_VALID(mqueue))
+                               KNOTE(&mqueue->imq_klist, 0);
+               }
                if (!first)
                        full_queue_empty = TRUE;
        }
@@ -608,26 +657,32 @@ extern void ipc_mqueue_override_send(
 void
 ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
 {
+       struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
        (void)set_mq;
        assert(imq_held(port_mq));
        assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
 
        port_mq->imq_msgcount--;
 
-       if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
+       if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
+               send_turnstile != TURNSTILE_NULL) {
                /*
                 * boost the priority of the awoken thread
                 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
                 * the message queue slot we've just reserved.
                 *
                 * NOTE: this will never prepost
+                *
+                * The wakeup happens on a turnstile waitq
+                * which will wakeup the highest priority waiter.
+                * A potential downside of this would be starving low
+                * priority senders if there is a constant churn of
+                * high priority threads trying to send to this port.
                 */
-               if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
+               if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
                                              IPC_MQUEUE_FULL,
                                              THREAD_AWAKENED,
-                                             NULL,
-                                             WAITQ_PROMOTE_PRIORITY,
-                                             WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
+                                             WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
                        port_mq->imq_fullwaiters = FALSE;
                } else {
                        /* gave away our slot - add reference back */
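
The wakeup also moved from the port's own waitq (initialized SYNC_POLICY_FIFO in ipc_mqueue_init above) to the send turnstile's waitq, which changes who gets the freed slot. A sketch of the behavioral difference, no new API involved:

    /* Old: waitq_wakeup64_one_locked(&port_mq->imq_wait_queue, ...)
     *      -> FIFO policy: the longest-blocked sender gets the slot.
     * New: waitq_wakeup64_one(&send_turnstile->ts_waitq, ...)
     *      -> the highest-priority blocked sender gets the slot, with
     *         WAITQ_PROMOTE_PRIORITY boosting it to consume the reserved
     *         slot promptly.  The trade-off named in the comment: constant
     *         churn of high-priority senders can starve low-priority ones,
     *         which FIFO ordering used to prevent. */
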
@@ -694,24 +749,26 @@ ipc_mqueue_post(
 
                if (receiver == THREAD_NULL) {
 
-                       /* 
+                       /*
                         * no receivers; queue kmsg if space still reserved
                         * Reservations are cancelled when the port goes inactive.
                         * note that this will enqueue the message for any
-                        * "peeking" receivers. 
+                        * "peeking" receivers.
                         *
                         * Also, post the knote to wake up any threads waiting
                         * on that style of interface if this insertion is of
                         * note (first insertion, or adjusted override qos all
                         * the way to the head of the queue).
-                        * 
+                        *
                         * This is just for ports. portset knotes are stay-active,
                         * and their threads get awakened through the !MACH_RCV_IN_PROGRESS
                         * logic below.
                         */
                        if (mqueue->imq_msgcount > 0) {
-                               if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg))
-                                       KNOTE(&mqueue->imq_klist, 0);
+                               if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
+                                       if (IMQ_KLIST_VALID(mqueue))
+                                               KNOTE(&mqueue->imq_klist, 0);
+                               }
                                break;
                        }
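
For context on what that KNOTE reaches: from userspace, the enqueue-side knote is what makes an EVFILT_MACHPORT registration fire. A minimal macOS sketch (error handling elided; plain readability-style registration is assumed, not the MACH_RCV_MSG fflags mode):

    #include <sys/event.h>
    #include <mach/mach.h>
    #include <stdio.h>

    int main(void) {
            mach_port_t port = MACH_PORT_NULL;
            mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);

            int kq = kqueue();
            struct kevent kev;
            EV_SET(&kev, port, EVFILT_MACHPORT, EV_ADD, 0, 0, NULL);
            kevent(kq, &kev, 1, NULL, 0, NULL);

            /* Send ourselves an empty message so ipc_mqueue_post() runs. */
            mach_msg_header_t msg = {
                    .msgh_bits        = MACH_MSGH_BITS(MACH_MSG_TYPE_MAKE_SEND_ONCE, 0),
                    .msgh_size        = sizeof(msg),
                    .msgh_remote_port = port,
            };
            mach_msg(&msg, MACH_SEND_MSG, sizeof(msg), 0, MACH_PORT_NULL,
                     MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);

            int n = kevent(kq, NULL, 0, &kev, 1, NULL);  /* fires via the KNOTE */
            printf("events=%d ident=%lu\n", n, (unsigned long)kev.ident);
            return 0;
    }
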
 
@@ -722,7 +779,7 @@ ipc_mqueue_post(
                        destroy_msg = TRUE;
                        goto out_unlock;
                }
-       
+
                /*
                 * If a thread is attempting a "peek" into the message queue
                 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
@@ -753,7 +810,7 @@ ipc_mqueue_post(
                        continue;
                }
 
-       
+
                /*
                 * We found a waiting thread.
                 * If the message is too large or the scatter list is too small
@@ -761,7 +818,7 @@ ipc_mqueue_post(
                 */
                msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
                if (receiver->ith_rsize <
-                               (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
+                               (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
                        receiver->ith_msize = msize;
                        receiver->ith_state = MACH_RCV_TOO_LARGE;
                } else {
@@ -924,7 +981,7 @@ ipc_mqueue_receive(
                return;
 
        if (wresult == THREAD_WAITING) {
-               counter((interruptible == THREAD_ABORTSAFE) ? 
+               counter((interruptible == THREAD_ABORTSAFE) ?
                        c_ipc_mqueue_receive_block_user++ :
                        c_ipc_mqueue_receive_block_kernel++);
 
@@ -986,6 +1043,8 @@ ipc_mqueue_receive_on_thread(
 {
        wait_result_t           wresult;
        uint64_t                deadline;
+       struct turnstile        *rcv_turnstile = TURNSTILE_NULL;
+       turnstile_inheritor_t   inheritor = NULL;
 
        /* called with mqueue locked */
 
@@ -1001,7 +1060,7 @@ ipc_mqueue_receive_on_thread(
                 */
                return THREAD_RESTART;
        }
-       
+
        if (imq_is_set(mqueue)) {
                ipc_mqueue_t port_mq = IMQ_NULL;
 
@@ -1040,7 +1099,7 @@ ipc_mqueue_receive_on_thread(
                /*
                 * Receive on a single port. Just try to get the messages.
                 */
-               kmsgs = &mqueue->imq_messages;
+               kmsgs = &mqueue->imq_messages;
                if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
                        if (option & MACH_PEEK_MSG)
                                ipc_mqueue_peek_on_thread(mqueue, option, thread);
@@ -1054,7 +1113,7 @@ ipc_mqueue_receive_on_thread(
                panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
                      mqueue->imq_wait_queue.waitq_type);
        }
-       
+
        /*
         * Looks like we'll have to block.  The mqueue we will
         * block on (whether the set's or the local port's) is
@@ -1082,6 +1141,37 @@ ipc_mqueue_receive_on_thread(
        else
                deadline = 0;
 
+       /*
+        * Threads waiting on a port (not a portset)
+        * wait on the port's receive turnstile.
+        * Donate the waiting thread's turnstile and
+        * set up the inheritor for a special reply port.
+        * Depending on the state of the special reply
+        * port, the inheritor is either the send
+        * turnstile of the connection port on which
+        * the sync IPC send will happen, or the
+        * workloop turnstile that will reply to
+        * the sync IPC message.
+        *
+        * Pass the mqueue waitq to waitq_assert_wait to
+        * support portset wakeup: the port's mqueue waitq
+        * is converted to the turnstile waitq inside
+        * waitq_assert_wait, instead of using the global waitqs.
+        */
+       if (imq_is_queue(mqueue)) {
+               ipc_port_t port = ip_from_mq(mqueue);
+               rcv_turnstile = turnstile_prepare((uintptr_t)port,
+                       port_rcv_turnstile_address(port),
+                       TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+               if (port->ip_specialreply) {
+                       inheritor = ipc_port_get_special_reply_port_inheritor(port);
+               }
+
+               turnstile_update_inheritor(rcv_turnstile, inheritor,
+                       (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
+       }
+
        thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
        wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
                                             IPC_MQUEUE_RECEIVE,
@@ -1096,6 +1186,12 @@ ipc_mqueue_receive_on_thread(
 
        imq_unlock(mqueue);
 
+       /* Check if it's a port mqueue and, if so, call turnstile_update_inheritor_complete */
+       if (rcv_turnstile != TURNSTILE_NULL) {
+               turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
+       }
+       /* It's the caller's responsibility to call turnstile_complete to get the turnstile back */
+
        return wresult;
 }
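
Note the asymmetry with the send path: turnstile_prepare() and the inheritor update happen here, but completion is deliberately deferred to the caller (per the comment above), because the thread has not actually blocked yet when this function returns. A hypothetical caller-side teardown, sketched by mirroring the send path:

    /* Hypothetical (sketch): once the receiving thread resumes, return
     * the turnstile prepared in ipc_mqueue_receive_on_thread(). */
    if (imq_is_queue(mqueue)) {
            ipc_port_t port = ip_from_mq(mqueue);
            imq_lock(mqueue);
            turnstile_complete((uintptr_t)port, port_rcv_turnstile_address(port), NULL);
            imq_unlock(mqueue);
            turnstile_cleanup();
    }
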
 
@@ -1175,7 +1271,7 @@ ipc_mqueue_select_on_thread(
         * (and size needed).
         */
        msize = ipc_kmsg_copyout_size(kmsg, thread->map);
-       if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
+       if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
                mr = MACH_RCV_TOO_LARGE;
                if (option & MACH_RCV_LARGE) {
                        thread->ith_receiver_name = port_mq->imq_receiver_name;
@@ -1236,7 +1332,7 @@ ipc_mqueue_peek_locked(ipc_mqueue_t mq,
        if (seqno == 0) {
                seqno = mq->imq_seqno;
                msgoff = 0;
-       } else if (seqno >= mq->imq_seqno && 
+       } else if (seqno >= mq->imq_seqno &&
                   seqno < mq->imq_seqno + mq->imq_msgcount) {
                msgoff = seqno - mq->imq_seqno;
        } else
@@ -1259,7 +1355,7 @@ ipc_mqueue_peek_locked(ipc_mqueue_t mq,
        if (msg_idp != NULL)
                *msg_idp = kmsg->ikm_header->msgh_id;
        if (msg_trailerp != NULL)
-               memcpy(msg_trailerp, 
+               memcpy(msg_trailerp,
                       (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
                                                  round_msg(kmsg->ikm_header->msgh_size)),
                       sizeof(mach_msg_max_trailer_t));
@@ -1353,7 +1449,7 @@ static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
 
        (void)ctx;
        (void)wqset;
-               
+
        if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
                return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
 
@@ -1482,6 +1578,7 @@ ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
        ipc_kmsg_queue_t kmqueue;
        ipc_kmsg_t kmsg;
        boolean_t reap = FALSE;
+       struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
 
        assert(!imq_is_set(mqueue));
 
@@ -1491,12 +1588,13 @@ ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
         *      (never preposts)
         */
        mqueue->imq_fullwaiters = FALSE;
-       waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
-                                 IPC_MQUEUE_FULL,
-                                 THREAD_RESTART,
-                                 NULL,
-                                 WAITQ_ALL_PRIORITIES,
-                                 WAITQ_KEEP_LOCKED);
+
+       if (send_turnstile != TURNSTILE_NULL) {
+               waitq_wakeup64_all(&send_turnstile->ts_waitq,
+                                  IPC_MQUEUE_FULL,
+                                  THREAD_RESTART,
+                                  WAITQ_ALL_PRIORITIES);
+       }
 
        /*
         * Move messages from the specified queue to the per-thread
@@ -1559,6 +1657,7 @@ ipc_mqueue_set_qlimit(
         imq_lock(mqueue);
         if (qlimit > mqueue->imq_qlimit) {
                 mach_port_msgcount_t i, wakeup;
+                struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
 
                 /* caution: wakeup, qlimit are unsigned */
                 wakeup = qlimit - mqueue->imq_qlimit;
@@ -1571,12 +1670,11 @@ ipc_mqueue_set_qlimit(
                         *
                         * NOTE: this will never prepost
                         */
-                       if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
-                                                     IPC_MQUEUE_FULL,
-                                                     THREAD_AWAKENED,
-                                                     NULL,
-                                                     WAITQ_PROMOTE_PRIORITY,
-                                                     WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
+                       if (send_turnstile == TURNSTILE_NULL ||
+                           waitq_wakeup64_one(&send_turnstile->ts_waitq,
+                                              IPC_MQUEUE_FULL,
+                                              THREAD_AWAKENED,
+                                              WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
                                mqueue->imq_fullwaiters = FALSE;
                                break;
                        }
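
Finally, this wakeup path is reachable from userspace, since raising a port's queue limit is an ordinary attribute call; the names below are the stock Mach API. A small sketch:

    #include <mach/mach.h>

    /* Raising the qlimit drives ipc_mqueue_set_qlimit() in the kernel,
     * which now wakes blocked senders via the send turnstile (sketch). */
    static kern_return_t
    raise_qlimit(mach_port_t port)
    {
            mach_port_limits_t limits = { .mpl_qlimit = MACH_PORT_QLIMIT_LARGE };
            return mach_port_set_attributes(mach_task_self(), port,
                MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
                MACH_PORT_LIMITS_INFO_COUNT);
    }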