X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/1c79356b52d46aa6b508fb032f5ae709b1f2897b..060df5ea7c632b1ac8cc8aac1fb59758165c2084:/osfmk/ipc/ipc_mqueue.c diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c index 0c7502e6a..9d17b81b9 100644 --- a/osfmk/ipc/ipc_mqueue.c +++ b/osfmk/ipc/ipc_mqueue.c @@ -1,23 +1,29 @@ /* - * Copyright (c) 2000 Apple Computer, Inc. All rights reserved. + * Copyright (c) 2000-2007 Apple Inc. All rights reserved. * - * @APPLE_LICENSE_HEADER_START@ + * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * - * The contents of this file constitute Original Code as defined in and - * are subject to the Apple Public Source License Version 1.1 (the - * "License"). You may not use this file except in compliance with the - * License. Please obtain a copy of the License at - * http://www.apple.com/publicsource and read it before using this file. + * This file contains Original Code and/or Modifications of Original Code + * as defined in and that are subject to the Apple Public Source License + * Version 2.0 (the 'License'). You may not use this file except in + * compliance with the License. The rights granted to you under the License + * may not be used to create, or enable the creation or redistribution of, + * unlawful or unlicensed copies of an Apple operating system, or to + * circumvent, violate, or enable the circumvention or violation of, any + * terms of an Apple operating system software license agreement. * - * This Original Code and all software distributed under the License are - * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER + * Please obtain a copy of the License at + * http://www.opensource.apple.com/apsl/ and read it before using this file. + * + * The Original Code and all software distributed under the License are + * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the - * License for the specific language governing rights and limitations - * under the License. + * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. + * Please see the License for the specific language governing rights and + * limitations under the License. * - * @APPLE_LICENSE_HEADER_END@ + * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ */ /* * @OSF_FREE_COPYRIGHT@ @@ -56,6 +62,13 @@ * * Functions to manipulate IPC message queues. */ +/* + * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce + * support for mandatory and extensible security protections. This notice + * is included in support of clause 2.2 (b) of the Apple Public License, + * Version 2.0. + */ + #include #include @@ -65,6 +78,7 @@ #include #include #include +#include /* XXX - for mach_msg_receive_continue */ #include #include #include @@ -76,12 +90,19 @@ #include #include -#include +#ifdef __LP64__ +#include +#endif + +#if CONFIG_MACF_MACH +#include +#endif int ipc_mqueue_full; /* address is event for queue space */ int ipc_mqueue_rcv; /* address is event for message arrival */ -#define TR_ENABLE 0 +/* forward declarations */ +void ipc_mqueue_receive_results(wait_result_t result); /* * Routine: ipc_mqueue_init @@ -94,7 +115,7 @@ ipc_mqueue_init( boolean_t is_set) { if (is_set) { - wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO); + wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST); } else { wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); ipc_kmsg_queue_init(&mqueue->imq_messages); @@ -119,11 +140,11 @@ ipc_mqueue_init( boolean_t ipc_mqueue_member( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue) { wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_t set_waitq = &set_mqueue->imq_wait_queue; + wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; return (wait_queue_member(port_waitq, set_waitq)); @@ -133,39 +154,51 @@ ipc_mqueue_member( * Routine: ipc_mqueue_remove * Purpose: * Remove the association between the queue and the specified - * subordinate message queue. + * set message queue. */ kern_return_t ipc_mqueue_remove( ipc_mqueue_t mqueue, - ipc_mqueue_t sub_mqueue) + ipc_mqueue_t set_mqueue) { wait_queue_t mq_waitq = &mqueue->imq_wait_queue; - wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue; + wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; - if (wait_queue_member(mq_waitq, sub_waitq)) { - wait_queue_unlink(mq_waitq, sub_waitq); - return KERN_SUCCESS; - } - return KERN_NOT_IN_SET; + return wait_queue_unlink(mq_waitq, set_waitq); } /* - * Routine: ipc_mqueue_remove_one + * Routine: ipc_mqueue_remove_from_all * Purpose: - * Find and remove one subqueue from the queue. + * Remove the mqueue from all the sets it is a member of * Conditions: - * Will return the set mqueue that was removed + * Nothing locked. */ void -ipc_mqueue_remove_one( - ipc_mqueue_t mqueue, - ipc_mqueue_t *sub_queuep) +ipc_mqueue_remove_from_all( + ipc_mqueue_t mqueue) { wait_queue_t mq_waitq = &mqueue->imq_wait_queue; - wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep); + wait_queue_unlink_all(mq_waitq); + return; +} + +/* + * Routine: ipc_mqueue_remove_all + * Purpose: + * Remove all the member queues from the specified set. + * Conditions: + * Nothing locked. + */ +void +ipc_mqueue_remove_all( + ipc_mqueue_t mqueue) +{ + wait_queue_set_t mq_setq = &mqueue->imq_set_queue; + + wait_queue_set_unlink_all(mq_setq); return; } @@ -187,7 +220,7 @@ ipc_mqueue_add( ipc_mqueue_t set_mqueue) { wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue; + wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; ipc_kmsg_queue_t kmsgq; ipc_kmsg_t kmsg, next; kern_return_t kr; @@ -212,16 +245,29 @@ ipc_mqueue_add( for (;;) { thread_t th; + mach_msg_size_t msize; - th = wait_queue_wakeup_identity_locked(port_waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + th = wait_queue_wakeup64_identity_locked( + port_waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, + FALSE); /* waitq/mqueue still locked, thread locked */ if (th == THREAD_NULL) goto leave; + /* + * If the receiver waited with a facility not directly + * related to Mach messaging, then it isn't prepared to get + * handed the message directly. Just set it running, and + * go look for another thread that can. + */ + if (th->ith_state != MACH_RCV_IN_PROGRESS) { + thread_unlock(th); + continue; + } + /* * Found a receiver. see if they can handle the message * correctly (the message is not too large for them, or @@ -230,15 +276,16 @@ ipc_mqueue_add( * the list and let them go back and figure it out and * just move onto the next. */ + msize = ipc_kmsg_copyout_size(kmsg, th->map); if (th->ith_msize < - kmsg->ikm_header.msgh_size + - REQUESTED_TRAILER_SIZE(th->ith_option)) { + (msize + REQUESTED_TRAILER_SIZE(th->ith_option))) { th->ith_state = MACH_RCV_TOO_LARGE; - th->ith_msize = kmsg->ikm_header.msgh_size; + th->ith_msize = msize; if (th->ith_option & MACH_RCV_LARGE) { /* * let him go without message */ + th->ith_receiver_name = port_mqueue->imq_receiver_name; th->ith_kmsg = IKM_NULL; th->ith_seqno = 0; thread_unlock(th); @@ -252,8 +299,9 @@ ipc_mqueue_add( * This thread is going to take this message, * so give it to him. */ - ipc_mqueue_release_msgcount(port_mqueue); ipc_kmsg_rmqueue(kmsgq, kmsg); + ipc_mqueue_release_msgcount(port_mqueue); + th->ith_kmsg = kmsg; th->ith_seqno = port_mqueue->imq_seqno++; thread_unlock(th); @@ -279,10 +327,11 @@ void ipc_mqueue_changed( ipc_mqueue_t mqueue) { - wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - THREAD_RESTART, - FALSE); /* unlock waitq? */ + wait_queue_wakeup64_all_locked( + &mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + THREAD_RESTART, + FALSE); /* unlock waitq? */ } @@ -308,12 +357,12 @@ ipc_mqueue_changed( mach_msg_return_t ipc_mqueue_send( ipc_mqueue_t mqueue, - ipc_kmsg_t kmsg, + ipc_kmsg_t kmsg, mach_msg_option_t option, - mach_msg_timeout_t timeout) + mach_msg_timeout_t send_timeout, + spl_t s) { - int save_wait_result; - spl_t s; + int wresult; /* * Don't block if: @@ -321,57 +370,68 @@ ipc_mqueue_send( * 2) Caller used the MACH_SEND_ALWAYS internal option. * 3) Message is sent to a send-once right. */ - s = splsched(); - imq_lock(mqueue); - if (!imq_full(mqueue) || - (option & MACH_SEND_ALWAYS) || - (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) == - MACH_MSG_TYPE_PORT_SEND_ONCE)) { + (!imq_full_kernel(mqueue) && + ((option & MACH_SEND_ALWAYS) || + (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) == + MACH_MSG_TYPE_PORT_SEND_ONCE)))) { mqueue->imq_msgcount++; + assert(mqueue->imq_msgcount > 0); imq_unlock(mqueue); splx(s); } else { + thread_t cur_thread = current_thread(); + uint64_t deadline; /* * We have to wait for space to be granted to us. */ - if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) { + if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) { imq_unlock(mqueue); splx(s); return MACH_SEND_TIMED_OUT; } + if (imq_full_kernel(mqueue)) { + imq_unlock(mqueue); + splx(s); + return MACH_SEND_NO_BUFFER; + } mqueue->imq_fullwaiters = TRUE; - wait_queue_assert_wait_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_ABORTSAFE, - TRUE); /* unlock? */ - /* wait/mqueue is unlocked */ + thread_lock(cur_thread); + if (option & MACH_SEND_TIMEOUT) + clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline); + else + deadline = 0; + wresult = wait_queue_assert_wait64_locked( + &mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_ABORTSAFE, deadline, + cur_thread); + thread_unlock(cur_thread); + imq_unlock(mqueue); splx(s); - if (option & MACH_SEND_TIMEOUT) - thread_set_timer(timeout, 1000*NSEC_PER_USEC); - - counter(c_ipc_mqueue_send_block++); - save_wait_result = thread_block((void (*)(void)) 0); + if (wresult == THREAD_WAITING) { + wresult = thread_block(THREAD_CONTINUE_NULL); + counter(c_ipc_mqueue_send_block++); + } - switch (save_wait_result) { + switch (wresult) { case THREAD_TIMED_OUT: assert(option & MACH_SEND_TIMEOUT); return MACH_SEND_TIMED_OUT; case THREAD_AWAKENED: /* we can proceed - inherited msgcount from waker */ - if (option & MACH_SEND_TIMEOUT) - thread_cancel_timer(); + assert(mqueue->imq_msgcount > 0); break; case THREAD_INTERRUPTED: - if (option & MACH_SEND_TIMEOUT) - thread_cancel_timer(); return MACH_SEND_INTERRUPTED; case THREAD_RESTART: + /* mqueue is being destroyed */ + return MACH_SEND_INVALID_DEST; default: panic("ipc_mqueue_send"); } @@ -388,24 +448,28 @@ ipc_mqueue_send( * found a waiter. * * Conditions: - * The message queue is locked + * The message queue is locked. + * The message corresponding to this reference is off the queue. */ void ipc_mqueue_release_msgcount( ipc_mqueue_t mqueue) { assert(imq_held(mqueue)); - assert(mqueue->imq_msgcount > 0); + assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages)); mqueue->imq_msgcount--; + if (!imq_full(mqueue) && mqueue->imq_fullwaiters) { - if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) != KERN_SUCCESS) { + if (wait_queue_wakeup64_one_locked( + &mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + FALSE) != KERN_SUCCESS) { mqueue->imq_fullwaiters = FALSE; } else { - mqueue->imq_msgcount++; /* gave it away */ + /* gave away our slot - add reference back */ + mqueue->imq_msgcount++; } } } @@ -425,7 +489,6 @@ ipc_mqueue_post( register ipc_mqueue_t mqueue, register ipc_kmsg_t kmsg) { - spl_t s; /* @@ -439,11 +502,13 @@ ipc_mqueue_post( for (;;) { wait_queue_t waitq = &mqueue->imq_wait_queue; thread_t receiver; + mach_msg_size_t msize; - receiver = wait_queue_wakeup_identity_locked(waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + receiver = wait_queue_wakeup64_identity_locked( + waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, + FALSE); /* waitq still locked, thread locked */ if (receiver == THREAD_NULL) { @@ -454,16 +519,28 @@ ipc_mqueue_post( ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); break; } - + + /* + * If the receiver waited with a facility not directly + * related to Mach messaging, then it isn't prepared to get + * handed the message directly. Just set it running, and + * go look for another thread that can. + */ + if (receiver->ith_state != MACH_RCV_IN_PROGRESS) { + thread_unlock(receiver); + continue; + } + + /* * We found a waiting thread. * If the message is too large or the scatter list is too small * the thread we wake up will get that as its status. */ + msize = ipc_kmsg_copyout_size(kmsg, receiver->map); if (receiver->ith_msize < - (kmsg->ikm_header.msgh_size) + - REQUESTED_TRAILER_SIZE(receiver->ith_option)) { - receiver->ith_msize = kmsg->ikm_header.msgh_size; + (msize + REQUESTED_TRAILER_SIZE(receiver->ith_option))) { + receiver->ith_msize = msize; receiver->ith_state = MACH_RCV_TOO_LARGE; } else { receiver->ith_state = MACH_MSG_SUCCESS; @@ -504,13 +581,11 @@ ipc_mqueue_post( } -kern_return_t -ipc_mqueue_receive_results(void) +/* static */ void +ipc_mqueue_receive_results(wait_result_t saved_wait_result) { thread_t self = current_thread(); mach_msg_option_t option = self->ith_option; - kern_return_t saved_wait_result = self->wait_result; - kern_return_t mr; /* * why did we wake up? @@ -521,15 +596,11 @@ ipc_mqueue_receive_results(void) return; case THREAD_INTERRUPTED: - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); self->ith_state = MACH_RCV_INTERRUPTED; return; case THREAD_RESTART: /* something bad happened to the port/set */ - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); self->ith_state = MACH_RCV_PORT_CHANGED; return; @@ -538,11 +609,6 @@ ipc_mqueue_receive_results(void) * We do not need to go select a message, somebody * handed us one (or a too-large indication). */ - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); - - mr = MACH_MSG_SUCCESS; - switch (self->ith_state) { case MACH_RCV_SCATTER_SMALL: case MACH_RCV_TOO_LARGE: @@ -570,9 +636,11 @@ ipc_mqueue_receive_results(void) } void -ipc_mqueue_receive_continue(void) +ipc_mqueue_receive_continue( + __unused void *param, + wait_result_t wresult) { - ipc_mqueue_receive_results(); + ipc_mqueue_receive_results(wresult); mach_msg_receive_continue(); /* hard-coded for now */ } @@ -606,79 +674,139 @@ ipc_mqueue_receive_continue(void) void ipc_mqueue_receive( - ipc_mqueue_t mqueue, - mach_msg_option_t option, - mach_msg_size_t max_size, - mach_msg_timeout_t timeout, - int interruptible) + ipc_mqueue_t mqueue, + mach_msg_option_t option, + mach_msg_size_t max_size, + mach_msg_timeout_t rcv_timeout, + int interruptible) { - ipc_port_t port; - mach_msg_return_t mr, mr2; - ipc_kmsg_queue_t kmsgs; - kern_return_t save_wait_result; - thread_t self; - ipc_kmsg_t *kmsgp; - mach_port_seqno_t *seqnop; - spl_t s; + wait_result_t wresult; + thread_t self = current_thread(); + + wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, + rcv_timeout, interruptible, + self); + if (wresult == THREAD_NOT_WAITING) + return; + + if (wresult == THREAD_WAITING) { + counter((interruptible == THREAD_ABORTSAFE) ? + c_ipc_mqueue_receive_block_user++ : + c_ipc_mqueue_receive_block_kernel++); + + if (self->ith_continuation) + thread_block(ipc_mqueue_receive_continue); + /* NOTREACHED */ + + wresult = thread_block(THREAD_CONTINUE_NULL); + } + ipc_mqueue_receive_results(wresult); +} + +wait_result_t +ipc_mqueue_receive_on_thread( + ipc_mqueue_t mqueue, + mach_msg_option_t option, + mach_msg_size_t max_size, + mach_msg_timeout_t rcv_timeout, + int interruptible, + thread_t thread) +{ + ipc_kmsg_queue_t kmsgs; + wait_result_t wresult; + uint64_t deadline; + spl_t s; +#if CONFIG_MACF_MACH + ipc_labelh_t lh; + task_t task; + int rc; +#endif s = splsched(); imq_lock(mqueue); if (imq_is_set(mqueue)) { - wait_queue_link_t wql; - ipc_mqueue_t port_mq; queue_t q; - q = &mqueue->imq_setlinks; + q = &mqueue->imq_preposts; /* * If we are waiting on a portset mqueue, we need to see if - * any of the member ports have work for us. If so, try to - * deliver one of those messages. By holding the portset's + * any of the member ports have work for us. Ports that + * have (or recently had) messages will be linked in the + * prepost queue for the portset. By holding the portset's * mqueue lock during the search, we tie up any attempts by * mqueue_deliver or portset membership changes that may - * cross our path. But this is a lock order violation, so we - * have to do it "softly." If we don't find a message waiting - * for us, we will assert our intention to wait while still - * holding that lock. When we release the lock, the deliver/ - * change will succeed and find us. + * cross our path. */ search_set: - queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) { + while(!queue_empty(q)) { + wait_queue_link_t wql; + ipc_mqueue_t port_mq; + + queue_remove_first(q, wql, wait_queue_link_t, wql_preposts); + assert(!wql_is_preposted(wql)); + + /* + * This is a lock order violation, so we have to do it + * "softly," putting the link back on the prepost list + * if it fails (at the tail is fine since the order of + * handling messages from different sources in a set is + * not guaranteed and we'd like to skip to the next source + * if one is available). + */ port_mq = (ipc_mqueue_t)wql->wql_queue; - kmsgs = &port_mq->imq_messages; - if (!imq_lock_try(port_mq)) { + queue_enter(q, wql, wait_queue_link_t, wql_preposts); imq_unlock(mqueue); splx(s); - delay(1); + mutex_pause(0); s = splsched(); imq_lock(mqueue); goto search_set; /* start again at beginning - SMP */ } /* - * If there is still a message to be had, we will - * try to select it (may not succeed because of size - * and options). In any case, we deliver those - * results back to the user. - * - * We also move the port's linkage to the tail of the - * list for this set (fairness). Future versions will - * sort by timestamp or priority. + * If there are no messages on this queue, just skip it + * (we already removed the link from the set's prepost queue). */ + kmsgs = &port_mq->imq_messages; if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) { imq_unlock(port_mq); continue; } - queue_remove(q, wql, wait_queue_link_t, wql_sublinks); - queue_enter(q, wql, wait_queue_link_t, wql_sublinks); + + /* + * There are messages, so reinsert the link back + * at the tail of the preposted queue (for fairness) + * while we still have the portset mqueue locked. + */ + queue_enter(q, wql, wait_queue_link_t, wql_preposts); imq_unlock(mqueue); - ipc_mqueue_select(port_mq, option, max_size); + /* + * Continue on to handling the message with just + * the port mqueue locked. + */ + ipc_mqueue_select_on_thread(port_mq, option, max_size, thread); imq_unlock(port_mq); +#if CONFIG_MACF_MACH + if (thread->task != TASK_NULL && + thread->ith_kmsg != NULL && + thread->ith_kmsg->ikm_sender != NULL) { + lh = thread->ith_kmsg->ikm_sender->label; + tasklabel_lock(thread->task); + ip_lock(lh->lh_port); + rc = mac_port_check_receive(&thread->task->maclabel, + &lh->lh_label); + ip_unlock(lh->lh_port); + tasklabel_unlock(thread->task); + if (rc) + thread->ith_state = MACH_RCV_INVALID_DATA; + } +#endif splx(s); - return; + return THREAD_NOT_WAITING; } @@ -689,127 +817,178 @@ ipc_mqueue_receive( */ kmsgs = &mqueue->imq_messages; if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { - ipc_mqueue_select(mqueue, option, max_size); + ipc_mqueue_select_on_thread(mqueue, option, max_size, thread); imq_unlock(mqueue); +#if CONFIG_MACF_MACH + if (thread->task != TASK_NULL && + thread->ith_kmsg != NULL && + thread->ith_kmsg->ikm_sender != NULL) { + lh = thread->ith_kmsg->ikm_sender->label; + tasklabel_lock(thread->task); + ip_lock(lh->lh_port); + rc = mac_port_check_receive(&thread->task->maclabel, + &lh->lh_label); + ip_unlock(lh->lh_port); + tasklabel_unlock(thread->task); + if (rc) + thread->ith_state = MACH_RCV_INVALID_DATA; + } +#endif splx(s); - return; + return THREAD_NOT_WAITING; } } - + /* * Looks like we'll have to block. The mqueue we will * block on (whether the set's or the local port's) is * still locked. */ - self = current_thread(); if (option & MACH_RCV_TIMEOUT) { - if (timeout == 0) { + if (rcv_timeout == 0) { imq_unlock(mqueue); splx(s); - self->ith_state = MACH_RCV_TIMED_OUT; - return; + thread->ith_state = MACH_RCV_TIMED_OUT; + return THREAD_NOT_WAITING; } } - self->ith_state = MACH_RCV_IN_PROGRESS; - self->ith_option = option; - self->ith_msize = max_size; - - wait_queue_assert_wait_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - interruptible, - TRUE); /* unlock? */ - /* mqueue/waitq is unlocked */ + thread_lock(thread); + thread->ith_state = MACH_RCV_IN_PROGRESS; + thread->ith_option = option; + thread->ith_msize = max_size; + + if (option & MACH_RCV_TIMEOUT) + clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline); + else + deadline = 0; + + wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + interruptible, deadline, + thread); + /* preposts should be detected above, not here */ + if (wresult == THREAD_AWAKENED) + panic("ipc_mqueue_receive_on_thread: sleep walking"); + + thread_unlock(thread); + imq_unlock(mqueue); splx(s); - - if (option & MACH_RCV_TIMEOUT) { - thread_set_timer(timeout, 1000*NSEC_PER_USEC); - } - - if (interruptible == THREAD_ABORTSAFE) { - counter(c_ipc_mqueue_receive_block_user++); - } else { - counter(c_ipc_mqueue_receive_block_kernel++); - } - -#if defined (__i386__) - thread_block((void (*)(void))0); -#else - if (self->ith_continuation) { - thread_block(ipc_mqueue_receive_continue); - } else { - thread_block((void (*)(void))0); - } -#endif - - ipc_mqueue_receive_results(); /* if we fell thru */ + return wresult; } /* - * Routine: ipc_mqueue_select + * Routine: ipc_mqueue_select_on_thread * Purpose: * A receiver discovered that there was a message on the queue * before he had to block. Pick the message off the queue and - * "post" it to himself. + * "post" it to thread. * Conditions: * mqueue locked. + * thread not locked. * There is a message. * Returns: * MACH_MSG_SUCCESS Actually selected a message for ourselves. * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large */ void -ipc_mqueue_select( +ipc_mqueue_select_on_thread( ipc_mqueue_t mqueue, mach_msg_option_t option, - mach_msg_size_t max_size) + mach_msg_size_t max_size, + thread_t thread) { - thread_t self = current_thread(); ipc_kmsg_t kmsg; - mach_port_seqno_t seqno; - mach_msg_return_t mr; - - mr = MACH_MSG_SUCCESS; - + mach_msg_return_t mr = MACH_MSG_SUCCESS; + mach_msg_size_t rcv_size; /* * Do some sanity checking of our ability to receive * before pulling the message off the queue. */ kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages); - assert(kmsg != IKM_NULL); - if (kmsg->ikm_header.msgh_size + - REQUESTED_TRAILER_SIZE(option) > max_size) { - mr = MACH_RCV_TOO_LARGE; - } - /* * If we really can't receive it, but we had the * MACH_RCV_LARGE option set, then don't take it off * the queue, instead return the appropriate error * (and size needed). */ - if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) { - self->ith_kmsg = IKM_NULL; - self->ith_msize = kmsg->ikm_header.msgh_size; - self->ith_seqno = 0; - self->ith_state = mr; - return; + rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map); + if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) { + mr = MACH_RCV_TOO_LARGE; + if (option & MACH_RCV_LARGE) { + thread->ith_receiver_name = mqueue->imq_receiver_name; + thread->ith_kmsg = IKM_NULL; + thread->ith_msize = rcv_size; + thread->ith_seqno = 0; + thread->ith_state = mr; + return; + } } ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg); ipc_mqueue_release_msgcount(mqueue); - self->ith_seqno = mqueue->imq_seqno++; - self->ith_kmsg = kmsg; - self->ith_state = mr; + thread->ith_seqno = mqueue->imq_seqno++; + thread->ith_kmsg = kmsg; + thread->ith_state = mr; current_task()->messages_received++; return; } +/* + * Routine: ipc_mqueue_peek + * Purpose: + * Peek at a message queue to see if it has any messages + * (in it or contained message queues for a set). + * + * Conditions: + * Locks may be held by callers, so this routine cannot block. + * Caller holds reference on the message queue. + */ +int +ipc_mqueue_peek(ipc_mqueue_t mq) +{ + wait_queue_link_t wql; + queue_t q; + spl_t s; + + if (!imq_is_set(mq)) + return (ipc_kmsg_queue_first(&mq->imq_messages) != IKM_NULL); + + /* + * Don't block trying to get the lock. + */ + s = splsched(); + if (!imq_lock_try(mq)) { + splx(s); + return -1; + } + + /* + * peek at the contained port message queues, return as soon as + * we spot a message on one of the message queues linked on the + * prepost list. + */ + q = &mq->imq_preposts; + queue_iterate(q, wql, wait_queue_link_t, wql_preposts) { + ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue; + ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; + + if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { + imq_unlock(mq); + splx(s); + return 1; + } + } + imq_unlock(mq); + splx(s); + return 0; +} + /* * Routine: ipc_mqueue_destroy * Purpose: @@ -834,10 +1013,11 @@ ipc_mqueue_destroy( * rouse all blocked senders */ mqueue->imq_fullwaiters = FALSE; - wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE); + wait_queue_wakeup64_all_locked( + &mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_RESTART, + FALSE); kmqueue = &mqueue->imq_messages; @@ -870,6 +1050,8 @@ ipc_mqueue_set_qlimit( { spl_t s; + assert(qlimit <= MACH_PORT_QLIMIT_MAX); + /* wake up senders allowed by the new qlimit */ s = splsched(); imq_lock(mqueue); @@ -880,13 +1062,15 @@ ipc_mqueue_set_qlimit( wakeup = qlimit - mqueue->imq_qlimit; for (i = 0; i < wakeup; i++) { - if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) == KERN_NOT_WAITING) { + if (wait_queue_wakeup64_one_locked( + &mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + FALSE) == KERN_NOT_WAITING) { mqueue->imq_fullwaiters = FALSE; break; } + mqueue->imq_msgcount++; /* give it to the awakened thread */ } } mqueue->imq_qlimit = qlimit; @@ -960,7 +1144,6 @@ ipc_mqueue_copyin( if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { ipc_port_t port; - ipc_pset_t pset; port = (ipc_port_t) object; assert(port != IP_NULL);