git.saurik.com Git - apple/xnu.git/blobdiff - osfmk/ipc/ipc_mqueue.c
xnu-792.24.17.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
index d00ebe34f6f82c45055423a4c47d3070e485dc9a..9150acbc069bf6f78e77dc6de319eaba726d39c2 100644 (file)
@@ -1,24 +1,21 @@
 /*
- * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
+ * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
  *
  * @APPLE_LICENSE_HEADER_START@
  * 
- * Copyright (c) 1999-2003 Apple Computer, Inc.  All Rights Reserved.
+ * The contents of this file constitute Original Code as defined in and
+ * are subject to the Apple Public Source License Version 1.1 (the
+ * "License").  You may not use this file except in compliance with the
+ * License.  Please obtain a copy of the License at
+ * http://www.apple.com/publicsource and read it before using this file.
  * 
- * This file contains Original Code and/or Modifications of Original Code
- * as defined in and that are subject to the Apple Public Source License
- * Version 2.0 (the 'License'). You may not use this file except in
- * compliance with the License. Please obtain a copy of the License at
- * http://www.opensource.apple.com/apsl/ and read it before using this
- * file.
- * 
- * The Original Code and all software distributed under the License are
- * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
+ * This Original Code and all software distributed under the License are
+ * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
- * Please see the License for the specific language governing rights and
- * limitations under the License.
+ * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT.  Please see the
+ * License for the specific language governing rights and limitations
+ * under the License.
  * 
  * @APPLE_LICENSE_HEADER_END@
  */
@@ -68,6 +65,7 @@
 #include <kern/counters.h>
 #include <kern/sched_prim.h>
 #include <kern/ipc_kobject.h>
+#include <kern/ipc_mig.h>      /* XXX - for mach_msg_receive_continue */
 #include <kern/misc_protos.h>
 #include <kern/task.h>
 #include <kern/thread.h>
@@ -86,6 +84,9 @@ int ipc_mqueue_rcv;           /* address is event for message arrival */
 
 #define TR_ENABLE 0
 
+/* forward declarations */
+void ipc_mqueue_receive_results(wait_result_t result);
+
 /*
  *     Routine:        ipc_mqueue_init
  *     Purpose:
@@ -122,11 +123,11 @@ ipc_mqueue_init(
 
 boolean_t
 ipc_mqueue_member(
-       ipc_mqueue_t    port_mqueue,
-       ipc_mqueue_t    set_mqueue)
+       ipc_mqueue_t            port_mqueue,
+       ipc_mqueue_t            set_mqueue)
 {
        wait_queue_t    port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_t    set_waitq = &set_mqueue->imq_wait_queue;
+       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
 
        return (wait_queue_member(port_waitq, set_waitq));
 
@@ -247,10 +248,10 @@ ipc_mqueue_add(
                         * just move onto the next.
                         */
                        if (th->ith_msize <
-                           kmsg->ikm_header.msgh_size +
+                           kmsg->ikm_header->msgh_size +
                            REQUESTED_TRAILER_SIZE(th->ith_option)) {
                                th->ith_state = MACH_RCV_TOO_LARGE;
-                               th->ith_msize = kmsg->ikm_header.msgh_size;
+                               th->ith_msize = kmsg->ikm_header->msgh_size;
                                if (th->ith_option & MACH_RCV_LARGE) {
                                        /*
                                         * let him go without message
@@ -268,8 +269,9 @@ ipc_mqueue_add(
                         * This thread is going to take this message,
                         * so give it to him.
                         */
-                       ipc_mqueue_release_msgcount(port_mqueue);
                        ipc_kmsg_rmqueue(kmsgq, kmsg);
+                       ipc_mqueue_release_msgcount(port_mqueue);
+
                        th->ith_kmsg = kmsg;
                        th->ith_seqno = port_mqueue->imq_seqno++;
                        thread_unlock(th);
@@ -327,7 +329,7 @@ ipc_mqueue_send(
        ipc_mqueue_t            mqueue,
        ipc_kmsg_t                      kmsg,
        mach_msg_option_t       option,
-       mach_msg_timeout_t      timeout)
+       mach_msg_timeout_t      send_timeout)
 {
        int wresult;
        spl_t s;
@@ -343,39 +345,41 @@ ipc_mqueue_send(
 
        if (!imq_full(mqueue) ||
                (option & MACH_SEND_ALWAYS) ||
-               (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
+               (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
                 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
                mqueue->imq_msgcount++;
+               assert(mqueue->imq_msgcount > 0);
                imq_unlock(mqueue);
                splx(s);
        } else {
+               thread_t cur_thread = current_thread();
+               uint64_t deadline;
 
                /* 
                 * We have to wait for space to be granted to us.
                 */
-               if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
+               if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
                        imq_unlock(mqueue);
                        splx(s);
                        return MACH_SEND_TIMED_OUT;
                }
                mqueue->imq_fullwaiters = TRUE;
+               thread_lock(cur_thread);
+               if (option & MACH_SEND_TIMEOUT)
+                       clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
+               else
+                       deadline = 0;
                wresult = wait_queue_assert_wait64_locked(
                                                &mqueue->imq_wait_queue,
                                                IPC_MQUEUE_FULL,
-                                               THREAD_ABORTSAFE,
-                                               TRUE); /* unlock? */
-               /* wait/mqueue is unlocked */
+                                               THREAD_ABORTSAFE, deadline,
+                                               cur_thread);
+               thread_unlock(cur_thread);
+               imq_unlock(mqueue);
                splx(s);
                
                if (wresult == THREAD_WAITING) {
-                       if (option & MACH_SEND_TIMEOUT) {
-                               thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-                               wresult = thread_block(THREAD_CONTINUE_NULL);
-                               if (wresult != THREAD_TIMED_OUT)
-                                       thread_cancel_timer();
-                       } else {
-                               wresult = thread_block(THREAD_CONTINUE_NULL);
-                       }
+                       wresult = thread_block(THREAD_CONTINUE_NULL);
                        counter(c_ipc_mqueue_send_block++);
                }
                
@@ -386,6 +390,7 @@ ipc_mqueue_send(
                        
                case THREAD_AWAKENED:
                        /* we can proceed - inherited msgcount from waker */
+                       assert(mqueue->imq_msgcount > 0);
                        break;
                        
                case THREAD_INTERRUPTED:
@@ -408,16 +413,18 @@ ipc_mqueue_send(
  *             found a waiter.
  *
  *     Conditions:
- *             The message queue is locked
+ *             The message queue is locked.
+ *             The message corresponding to this reference is off the queue.
  */
 void
 ipc_mqueue_release_msgcount(
        ipc_mqueue_t mqueue)    
 {
        assert(imq_held(mqueue));
-       assert(mqueue->imq_msgcount > 0);
+       assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
 
        mqueue->imq_msgcount--;
+
        if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
                if (wait_queue_wakeup64_one_locked(
                                                &mqueue->imq_wait_queue,
@@ -426,7 +433,8 @@ ipc_mqueue_release_msgcount(
                                                FALSE) != KERN_SUCCESS) {
                        mqueue->imq_fullwaiters = FALSE;
                } else {
-                       mqueue->imq_msgcount++;  /* gave it away */
+                       /* gave away our slot - add reference back */
+                       mqueue->imq_msgcount++; 
                }
        }
 }
@@ -483,9 +491,9 @@ ipc_mqueue_post(
                 * the thread we wake up will get that as its status.
                 */
                if (receiver->ith_msize <
-                   (kmsg->ikm_header.msgh_size) +
+                   (kmsg->ikm_header->msgh_size) +
                    REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
-                       receiver->ith_msize = kmsg->ikm_header.msgh_size;
+                       receiver->ith_msize = kmsg->ikm_header->msgh_size;
                        receiver->ith_state = MACH_RCV_TOO_LARGE;
                } else {
                        receiver->ith_state = MACH_MSG_SUCCESS;
@@ -526,12 +534,11 @@ ipc_mqueue_post(
 }
 
 
-kern_return_t
-ipc_mqueue_receive_results(void)
+/* static */ void
+ipc_mqueue_receive_results(wait_result_t saved_wait_result)
 {
        thread_t                self = current_thread();
        mach_msg_option_t       option = self->ith_option;
-       kern_return_t           saved_wait_result = self->wait_result;
        kern_return_t           mr;
 
        /*
@@ -543,15 +550,11 @@ ipc_mqueue_receive_results(void)
                return;
 
        case THREAD_INTERRUPTED:
-               if (option & MACH_RCV_TIMEOUT)
-                       thread_cancel_timer();
                self->ith_state = MACH_RCV_INTERRUPTED;
                return;
 
        case THREAD_RESTART:
                /* something bad happened to the port/set */
-               if (option & MACH_RCV_TIMEOUT)
-                       thread_cancel_timer();
                self->ith_state = MACH_RCV_PORT_CHANGED;
                return;
 
@@ -560,9 +563,6 @@ ipc_mqueue_receive_results(void)
                 * We do not need to go select a message, somebody
                 * handed us one (or a too-large indication).
                 */
-               if (option & MACH_RCV_TIMEOUT)
-                       thread_cancel_timer();
-               
                mr = MACH_MSG_SUCCESS;
 
                switch (self->ith_state) {
@@ -592,9 +592,11 @@ ipc_mqueue_receive_results(void)
 }
 
 void
-ipc_mqueue_receive_continue(void)
+ipc_mqueue_receive_continue(
+       __unused void *param,
+       wait_result_t wresult)
 {
-       ipc_mqueue_receive_results();
+       ipc_mqueue_receive_results(wresult);
        mach_msg_receive_continue();  /* hard-coded for now */
 }
 
@@ -628,20 +630,17 @@ ipc_mqueue_receive_continue(void)
 
 void
 ipc_mqueue_receive(
-       ipc_mqueue_t            mqueue,
-       mach_msg_option_t       option,
-       mach_msg_size_t         max_size,
-       mach_msg_timeout_t      timeout,
-       int                     interruptible)
+       ipc_mqueue_t         mqueue,
+       mach_msg_option_t    option,
+       mach_msg_size_t      max_size,
+       mach_msg_timeout_t   rcv_timeout,
+       int                  interruptible)
 {
-       ipc_port_t              port;
-       mach_msg_return_t       mr, mr2;
-       ipc_kmsg_queue_t        kmsgs;
-       wait_result_t           wresult;
-       thread_t                self;
-       ipc_kmsg_t              *kmsgp;
-       mach_port_seqno_t       *seqnop;
-       spl_t s;
+       ipc_kmsg_queue_t        kmsgs;
+       wait_result_t           wresult;
+       thread_t                self;
+       uint64_t                                deadline;
+       spl_t                   s;
 
        s = splsched();
        imq_lock(mqueue);
@@ -725,7 +724,7 @@ ipc_mqueue_receive(
         */
        self = current_thread();
        if (option & MACH_RCV_TIMEOUT) {
-               if (timeout == 0) {
+               if (rcv_timeout == 0) {
                        imq_unlock(mqueue);
                        splx(s);
                        self->ith_state = MACH_RCV_TIMED_OUT;
@@ -733,33 +732,36 @@ ipc_mqueue_receive(
                }
        }
 
+       thread_lock(self);
        self->ith_state = MACH_RCV_IN_PROGRESS;
        self->ith_option = option;
        self->ith_msize = max_size;
-               
+
+       if (option & MACH_RCV_TIMEOUT)
+               clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
+       else
+               deadline = 0;
+
        wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
-                                               IPC_MQUEUE_RECEIVE,
-                                               interruptible,
-                                               TRUE); /* unlock? */
-       /* mqueue/waitq is unlocked */
+                                                                                               IPC_MQUEUE_RECEIVE,
+                                                                                               interruptible, deadline,
+                                                                                               self);
+       thread_unlock(self);
+       imq_unlock(mqueue);
        splx(s);
 
        if (wresult == THREAD_WAITING) {
-               if (option & MACH_RCV_TIMEOUT)
-                       thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-
-               if (interruptible == THREAD_ABORTSAFE)
-                       counter(c_ipc_mqueue_receive_block_user++);
-               else
-                       counter(c_ipc_mqueue_receive_block_kernel++);
+               counter((interruptible == THREAD_ABORTSAFE) ? 
+                       c_ipc_mqueue_receive_block_user++ :
+                       c_ipc_mqueue_receive_block_kernel++);
 
                if (self->ith_continuation)
                        thread_block(ipc_mqueue_receive_continue);
                        /* NOTREACHED */
 
-               thread_block(THREAD_CONTINUE_NULL);
+               wresult = thread_block(THREAD_CONTINUE_NULL);
        }
-       ipc_mqueue_receive_results();
+       ipc_mqueue_receive_results(wresult);
 }
 
 
@@ -784,8 +786,8 @@ ipc_mqueue_select(
 {
        thread_t self = current_thread();
        ipc_kmsg_t kmsg;
-       mach_port_seqno_t seqno;
        mach_msg_return_t mr;
+       mach_msg_size_t rcv_size;
 
        mr = MACH_MSG_SUCCESS;
 
@@ -795,26 +797,24 @@ ipc_mqueue_select(
         * before pulling the message off the queue.
         */
        kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
-
        assert(kmsg != IKM_NULL);
 
-       if (kmsg->ikm_header.msgh_size + 
-               REQUESTED_TRAILER_SIZE(option) > max_size) {
-               mr = MACH_RCV_TOO_LARGE;
-       } 
-
        /*
         * If we really can't receive it, but we had the
         * MACH_RCV_LARGE option set, then don't take it off
         * the queue, instead return the appropriate error
         * (and size needed).
         */
-       if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) {
-               self->ith_kmsg = IKM_NULL;
-               self->ith_msize = kmsg->ikm_header.msgh_size;
-               self->ith_seqno = 0;
-               self->ith_state = mr;
-               return;
+       rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
+       if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
+               mr = MACH_RCV_TOO_LARGE;
+               if (option & MACH_RCV_LARGE) {
+                       self->ith_kmsg = IKM_NULL;
+                       self->ith_msize = rcv_size;
+                       self->ith_seqno = 0;
+                       self->ith_state = mr;
+                       return;
+               }
        }
 
        ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
@@ -888,6 +888,8 @@ ipc_mqueue_set_qlimit(
 {
         spl_t s;
 
+        assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
         /* wake up senders allowed by the new qlimit */
         s = splsched();
         imq_lock(mqueue);
@@ -906,6 +908,7 @@ ipc_mqueue_set_qlimit(
                                         mqueue->imq_fullwaiters = FALSE;
                                         break;
                         }
+                        mqueue->imq_msgcount++;  /* give it to the awakened thread */
                 }
         }
        mqueue->imq_qlimit = qlimit;
@@ -979,7 +982,6 @@ ipc_mqueue_copyin(
 
        if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
                ipc_port_t port;
-               ipc_pset_t pset;
 
                port = (ipc_port_t) object;
                assert(port != IP_NULL);