]> git.saurik.com Git - apple/xnu.git/blobdiff - osfmk/ipc/ipc_mqueue.c
xnu-344.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
index 0c7502e6afa0efbb3ce6a46884c99daa9d4fe0b3..81defb81f9531e4d65efe63fedd202cef47b21a3 100644 (file)
@@ -94,7 +94,7 @@ ipc_mqueue_init(
        boolean_t       is_set)
 {
        if (is_set) {
-               wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
+               wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
        } else {
                wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
                ipc_kmsg_queue_init(&mqueue->imq_messages);
@@ -133,39 +133,51 @@ ipc_mqueue_member(
  *     Routine:        ipc_mqueue_remove
  *     Purpose:
  *             Remove the association between the queue and the specified
- *             subordinate message queue.
+ *             set message queue.
  */
 
 kern_return_t
 ipc_mqueue_remove(
        ipc_mqueue_t     mqueue,
-       ipc_mqueue_t     sub_mqueue)
+       ipc_mqueue_t     set_mqueue)
 {
        wait_queue_t     mq_waitq = &mqueue->imq_wait_queue;
-       wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
+       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
 
-       if (wait_queue_member(mq_waitq, sub_waitq)) {
-           wait_queue_unlink(mq_waitq, sub_waitq);
-           return KERN_SUCCESS;
-       }
-       return KERN_NOT_IN_SET;    
+       return wait_queue_unlink(mq_waitq, set_waitq);
 }
 
 /*
- *     Routine:        ipc_mqueue_remove_one
+ *     Routine:        ipc_mqueue_remove_from_all
  *     Purpose:
- *             Find and remove one subqueue from the queue.
+ *             Remove the mqueue from all the sets it is a member of
  *     Conditions:
- *             Will return the set mqueue that was removed
+ *             Nothing locked.
  */
 void
-ipc_mqueue_remove_one(
-       ipc_mqueue_t    mqueue,
-       ipc_mqueue_t    *sub_queuep)
+ipc_mqueue_remove_from_all(
+       ipc_mqueue_t    mqueue)
 {
        wait_queue_t    mq_waitq = &mqueue->imq_wait_queue;
 
-       wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
+       wait_queue_unlink_all(mq_waitq);
+       return;
+}
+
+/*
+ *     Routine:        ipc_mqueue_remove_all
+ *     Purpose:
+ *             Remove all the member queues from the specified set.
+ *     Conditions:
+ *             Nothing locked.
+ */
+void
+ipc_mqueue_remove_all(
+       ipc_mqueue_t    mqueue)
+{
+       wait_queue_set_t        mq_setq = &mqueue->imq_set_queue;
+
+       wait_queue_set_unlink_all(mq_setq);
        return;
 }
 
@@ -187,7 +199,7 @@ ipc_mqueue_add(
        ipc_mqueue_t     set_mqueue)
 {
        wait_queue_t     port_waitq = &port_mqueue->imq_wait_queue;
-       wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
+       wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
        ipc_kmsg_queue_t kmsgq;
        ipc_kmsg_t       kmsg, next;
        kern_return_t    kr;
@@ -213,10 +225,11 @@ ipc_mqueue_add(
                for (;;) {
                        thread_t th;
 
-                       th = wait_queue_wakeup_identity_locked(port_waitq,
-                                                                                                  IPC_MQUEUE_RECEIVE,
-                                                                                                  THREAD_AWAKENED,
-                                                                                                  FALSE);
+                       th = wait_queue_wakeup64_identity_locked(
+                                               port_waitq,
+                                               IPC_MQUEUE_RECEIVE,
+                                               THREAD_AWAKENED,
+                                               FALSE);
                        /* waitq/mqueue still locked, thread locked */
 
                        if (th == THREAD_NULL)
@@ -279,10 +292,11 @@ void
 ipc_mqueue_changed(
        ipc_mqueue_t            mqueue)
 {
-       wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
-                                                                IPC_MQUEUE_RECEIVE,
-                                                                THREAD_RESTART,
-                                                                FALSE);                /* unlock waitq? */
+       wait_queue_wakeup64_all_locked(
+                               &mqueue->imq_wait_queue,
+                               IPC_MQUEUE_RECEIVE,
+                               THREAD_RESTART,
+                               FALSE);         /* unlock waitq? */
 }
 
 
@@ -312,7 +326,7 @@ ipc_mqueue_send(
        mach_msg_option_t       option,
        mach_msg_timeout_t      timeout)
 {
-       int save_wait_result;
+       int wresult;
        spl_t s;
 
        /*
@@ -342,33 +356,36 @@ ipc_mqueue_send(
                        return MACH_SEND_TIMED_OUT;
                }
                mqueue->imq_fullwaiters = TRUE;
-               wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
-                                                                         IPC_MQUEUE_FULL,
-                                                                         THREAD_ABORTSAFE,
-                                                                         TRUE); /* unlock? */
+               wresult = wait_queue_assert_wait64_locked(
+                                               &mqueue->imq_wait_queue,
+                                               IPC_MQUEUE_FULL,
+                                               THREAD_ABORTSAFE,
+                                               TRUE); /* unlock? */
                /* wait/mqueue is unlocked */
                splx(s);
                
-               if (option & MACH_SEND_TIMEOUT)
-                       thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-
-               counter(c_ipc_mqueue_send_block++);
-               save_wait_result = thread_block((void (*)(void)) 0);
+               if (wresult == THREAD_WAITING) {
+                       if (option & MACH_SEND_TIMEOUT) {
+                               thread_set_timer(timeout, 1000*NSEC_PER_USEC);
+                               wresult = thread_block(THREAD_CONTINUE_NULL);
+                               if (wresult != THREAD_TIMED_OUT)
+                                       thread_cancel_timer();
+                       } else {
+                               wresult = thread_block(THREAD_CONTINUE_NULL);
+                       }
+                       counter(c_ipc_mqueue_send_block++);
+               }
                
-               switch (save_wait_result) {
+               switch (wresult) {
                case THREAD_TIMED_OUT:
                        assert(option & MACH_SEND_TIMEOUT);
                        return MACH_SEND_TIMED_OUT;
                        
                case THREAD_AWAKENED:
                        /* we can proceed - inherited msgcount from waker */
-                       if (option & MACH_SEND_TIMEOUT)
-                               thread_cancel_timer();
                        break;
                        
                case THREAD_INTERRUPTED:
-                       if (option & MACH_SEND_TIMEOUT)
-                               thread_cancel_timer();
                        return MACH_SEND_INTERRUPTED;
                        
                case THREAD_RESTART:
@@ -399,10 +416,11 @@ ipc_mqueue_release_msgcount(
 
        mqueue->imq_msgcount--;
        if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
-               if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
-                                                                                IPC_MQUEUE_FULL,
-                                                                                THREAD_AWAKENED,
-                                                                                FALSE) != KERN_SUCCESS) {
+               if (wait_queue_wakeup64_one_locked(
+                                               &mqueue->imq_wait_queue,
+                                               IPC_MQUEUE_FULL,
+                                               THREAD_AWAKENED,
+                                               FALSE) != KERN_SUCCESS) {
                        mqueue->imq_fullwaiters = FALSE;
                } else {
                        mqueue->imq_msgcount++;  /* gave it away */
@@ -440,10 +458,11 @@ ipc_mqueue_post(
                wait_queue_t waitq = &mqueue->imq_wait_queue;
                thread_t receiver;
 
-               receiver = wait_queue_wakeup_identity_locked(waitq,
-                                                                                                        IPC_MQUEUE_RECEIVE,
-                                                                                                        THREAD_AWAKENED,
-                                                                                                        FALSE);
+               receiver = wait_queue_wakeup64_identity_locked(
+                                                       waitq,
+                                                       IPC_MQUEUE_RECEIVE,
+                                                       THREAD_AWAKENED,
+                                                       FALSE);
                /* waitq still locked, thread locked */
 
                if (receiver == THREAD_NULL) {
@@ -612,11 +631,11 @@ ipc_mqueue_receive(
        mach_msg_timeout_t      timeout,
        int                     interruptible)
 {
-       ipc_port_t port;
-       mach_msg_return_t mr, mr2;
-       ipc_kmsg_queue_t kmsgs;
-       kern_return_t   save_wait_result;
-       thread_t self;
+       ipc_port_t              port;
+       mach_msg_return_t       mr, mr2;
+       ipc_kmsg_queue_t        kmsgs;
+       wait_result_t           wresult;
+       thread_t                self;
        ipc_kmsg_t              *kmsgp;
        mach_port_seqno_t       *seqnop;
        spl_t s;
@@ -644,7 +663,7 @@ ipc_mqueue_receive(
                 * change will succeed and find us.
                 */
        search_set:
-               queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
+               queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
                        port_mq = (ipc_mqueue_t)wql->wql_queue;
                        kmsgs = &port_mq->imq_messages;
                        
@@ -671,8 +690,8 @@ ipc_mqueue_receive(
                                imq_unlock(port_mq);
                                continue;
                        }
-                       queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
-                       queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
+                       queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
+                       queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
                        imq_unlock(mqueue);
 
                        ipc_mqueue_select(port_mq, option, max_size);
@@ -715,34 +734,29 @@ ipc_mqueue_receive(
        self->ith_option = option;
        self->ith_msize = max_size;
                
-       wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
-                                                                 IPC_MQUEUE_RECEIVE,
-                                                                 interruptible,
-                                                                 TRUE); /* unlock? */
+       wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
+                                               IPC_MQUEUE_RECEIVE,
+                                               interruptible,
+                                               TRUE); /* unlock? */
        /* mqueue/waitq is unlocked */
        splx(s);
 
-       if (option & MACH_RCV_TIMEOUT) {
-               thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-       }
+       if (wresult == THREAD_WAITING) {
+               if (option & MACH_RCV_TIMEOUT)
+                       thread_set_timer(timeout, 1000*NSEC_PER_USEC);
 
-       if (interruptible == THREAD_ABORTSAFE) {
-               counter(c_ipc_mqueue_receive_block_user++);
-       } else {
-               counter(c_ipc_mqueue_receive_block_kernel++);
-       }
+               if (interruptible == THREAD_ABORTSAFE)
+                       counter(c_ipc_mqueue_receive_block_user++);
+               else
+                       counter(c_ipc_mqueue_receive_block_kernel++);
 
-#if defined (__i386__)
-       thread_block((void (*)(void))0);
-#else
-       if (self->ith_continuation) {
-               thread_block(ipc_mqueue_receive_continue);
-       } else {
-               thread_block((void (*)(void))0);
-       }
-#endif
+               if (self->ith_continuation)
+                       thread_block(ipc_mqueue_receive_continue);
+                       /* NOTREACHED */
 
-       ipc_mqueue_receive_results();  /* if we fell thru */
+               thread_block(THREAD_CONTINUE_NULL);
+       }
+       ipc_mqueue_receive_results();
 }
 
 
@@ -834,10 +848,11 @@ ipc_mqueue_destroy(
         *      rouse all blocked senders
         */
        mqueue->imq_fullwaiters = FALSE;
-       wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
-                                                                IPC_MQUEUE_FULL,
-                                                                THREAD_AWAKENED,
-                                                                FALSE);
+       wait_queue_wakeup64_all_locked(
+                               &mqueue->imq_wait_queue,
+                               IPC_MQUEUE_FULL,
+                               THREAD_AWAKENED,
+                               FALSE);
 
        kmqueue = &mqueue->imq_messages;
 
@@ -880,10 +895,11 @@ ipc_mqueue_set_qlimit(
                 wakeup = qlimit - mqueue->imq_qlimit;
 
                 for (i = 0; i < wakeup; i++) {
-                        if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
-                                                                                         IPC_MQUEUE_FULL,
-                                                                                         THREAD_AWAKENED,
-                                                                                         FALSE) == KERN_NOT_WAITING) {
+                        if (wait_queue_wakeup64_one_locked(
+                                                       &mqueue->imq_wait_queue,
+                                                       IPC_MQUEUE_FULL,
+                                                       THREAD_AWAKENED,
+                                                       FALSE) == KERN_NOT_WAITING) {
                                         mqueue->imq_fullwaiters = FALSE;
                                         break;
                         }