X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/de355530ae67247cbd0da700edb3a2a1dae884c2..527f99514973766e9c0382a4d8550dfb00f54939:/osfmk/ipc/ipc_mqueue.c diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c index 81defb81f..38af0db4c 100644 --- a/osfmk/ipc/ipc_mqueue.c +++ b/osfmk/ipc/ipc_mqueue.c @@ -1,23 +1,29 @@ /* - * Copyright (c) 2000 Apple Computer, Inc. All rights reserved. + * Copyright (c) 2000-2007 Apple Inc. All rights reserved. * - * @APPLE_LICENSE_HEADER_START@ + * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * - * The contents of this file constitute Original Code as defined in and - * are subject to the Apple Public Source License Version 1.1 (the - * "License"). You may not use this file except in compliance with the - * License. Please obtain a copy of the License at - * http://www.apple.com/publicsource and read it before using this file. + * This file contains Original Code and/or Modifications of Original Code + * as defined in and that are subject to the Apple Public Source License + * Version 2.0 (the 'License'). You may not use this file except in + * compliance with the License. The rights granted to you under the License + * may not be used to create, or enable the creation or redistribution of, + * unlawful or unlicensed copies of an Apple operating system, or to + * circumvent, violate, or enable the circumvention or violation of, any + * terms of an Apple operating system software license agreement. * - * This Original Code and all software distributed under the License are - * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER + * Please obtain a copy of the License at + * http://www.opensource.apple.com/apsl/ and read it before using this file. 
+ * + * The Original Code and all software distributed under the License are + * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the - * License for the specific language governing rights and limitations - * under the License. + * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. + * Please see the License for the specific language governing rights and + * limitations under the License. * - * @APPLE_LICENSE_HEADER_END@ + * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ */ /* * @OSF_FREE_COPYRIGHT@ @@ -56,6 +62,13 @@ * * Functions to manipulate IPC message queues. */ +/* + * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce + * support for mandatory and extensible security protections. This notice + * is included in support of clause 2.2 (b) of the Apple Public License, + * Version 2.0. 
+ */ + #include #include @@ -65,10 +78,11 @@ #include #include #include +#include /* XXX - for mach_msg_receive_continue */ #include #include #include -#include +#include #include #include @@ -76,12 +90,27 @@ #include #include -#include +#if MACH_FLIPC +#include +#endif + +#ifdef __LP64__ +#include +#endif + +#include + +extern char *proc_name_address(void *p); int ipc_mqueue_full; /* address is event for queue space */ int ipc_mqueue_rcv; /* address is event for message arrival */ -#define TR_ENABLE 0 +/* forward declarations */ +void ipc_mqueue_receive_results(wait_result_t result); +static void ipc_mqueue_peek_on_thread( + ipc_mqueue_t port_mq, + mach_msg_option_t option, + thread_t thread); /* * Routine: ipc_mqueue_init @@ -91,20 +120,73 @@ int ipc_mqueue_rcv; /* address is event for message arrival */ void ipc_mqueue_init( ipc_mqueue_t mqueue, - boolean_t is_set) + boolean_t is_set, + uint64_t *reserved_link) { if (is_set) { - wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO); + waitq_set_init(&mqueue->imq_set_queue, + SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST, + reserved_link, NULL); } else { - wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); + waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); ipc_kmsg_queue_init(&mqueue->imq_messages); mqueue->imq_seqno = 0; mqueue->imq_msgcount = 0; mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT; mqueue->imq_fullwaiters = FALSE; +#if MACH_FLIPC + mqueue->imq_fport = FPORT_NULL; +#endif } + klist_init(&mqueue->imq_klist); +} + +void ipc_mqueue_deinit( + ipc_mqueue_t mqueue) +{ + boolean_t is_set = imq_is_set(mqueue); + + if (is_set) + waitq_set_deinit(&mqueue->imq_set_queue); + else + waitq_deinit(&mqueue->imq_wait_queue); } +/* + * Routine: imq_reserve_and_lock + * Purpose: + * Atomically lock an ipc_mqueue_t object and reserve + * an appropriate number of prepost linkage objects for + * use in wakeup operations. 
+ * Conditions: + * mq is unlocked + */ +void +imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost) +{ + *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0, + WAITQ_KEEP_LOCKED); + +} + + +/* + * Routine: imq_release_and_unlock + * Purpose: + * Unlock an ipc_mqueue_t object, re-enable interrupts, + * and release any unused prepost object reservations. + * Conditions: + * mq is locked + */ +void +imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost) +{ + assert(imq_held(mq)); + waitq_unlock(&mq->imq_wait_queue); + waitq_prepost_release_reserve(reserved_prepost); +} + + /* * Routine: ipc_mqueue_member * Purpose: @@ -119,13 +201,13 @@ ipc_mqueue_init( boolean_t ipc_mqueue_member( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue) { - wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_t set_waitq = &set_mqueue->imq_wait_queue; + struct waitq *port_waitq = &port_mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; - return (wait_queue_member(port_waitq, set_waitq)); + return waitq_member(port_waitq, set_waitq); } @@ -138,13 +220,13 @@ ipc_mqueue_member( kern_return_t ipc_mqueue_remove( - ipc_mqueue_t mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t mqueue, + ipc_mqueue_t set_mqueue) { - wait_queue_t mq_waitq = &mqueue->imq_wait_queue; - wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; + struct waitq *mq_waitq = &mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; - return wait_queue_unlink(mq_waitq, set_waitq); + return waitq_unlink(mq_waitq, set_waitq); } /* @@ -153,32 +235,41 @@ ipc_mqueue_remove( * Remove the mqueue from all the sets it is a member of * Conditions: * Nothing locked. 
+ * Returns: + * mqueue unlocked and set links deallocated */ void -ipc_mqueue_remove_from_all( - ipc_mqueue_t mqueue) +ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue) { - wait_queue_t mq_waitq = &mqueue->imq_wait_queue; + struct waitq *mq_waitq = &mqueue->imq_wait_queue; + kern_return_t kr; - wait_queue_unlink_all(mq_waitq); - return; + imq_lock(mqueue); + + assert(waitq_valid(mq_waitq)); + kr = waitq_unlink_all_unlock(mq_waitq); + /* mqueue unlocked and set links deallocated */ } /* * Routine: ipc_mqueue_remove_all * Purpose: * Remove all the member queues from the specified set. + * Also removes the queue from any containing sets. * Conditions: * Nothing locked. + * Returns: + * mqueue unlocked all set links deallocated */ void -ipc_mqueue_remove_all( - ipc_mqueue_t mqueue) +ipc_mqueue_remove_all(ipc_mqueue_t mqueue) { - wait_queue_set_t mq_setq = &mqueue->imq_set_queue; + struct waitq_set *mq_setq = &mqueue->imq_set_queue; - wait_queue_set_unlink_all(mq_setq); - return; + imq_lock(mqueue); + assert(waitqs_is_set(mq_setq)); + waitq_set_unlink_all_unlock(mq_setq); + /* mqueue unlocked set links deallocated */ } @@ -195,27 +286,36 @@ ipc_mqueue_remove_all( */ kern_return_t ipc_mqueue_add( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue, + uint64_t *reserved_link, + uint64_t *reserved_prepost) { - wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; + struct waitq *port_waitq = &port_mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; ipc_kmsg_queue_t kmsgq; ipc_kmsg_t kmsg, next; kern_return_t kr; - spl_t s; - kr = wait_queue_link(port_waitq, set_waitq); - if (kr != KERN_SUCCESS) + assert(reserved_link && *reserved_link != 0); + + imq_lock(port_mqueue); + + /* + * The link operation is now under the same lock-hold as + * message iteration and thread wakeup, but doesn't have to be... 
+ */ + kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link); + if (kr != KERN_SUCCESS) { + imq_unlock(port_mqueue); return kr; + } /* * Now that the set has been added to the port, there may be * messages queued on the port and threads waiting on the set * waitq. Lets get them together. */ - s = splsched(); - imq_lock(port_mqueue); kmsgq = &port_mqueue->imq_messages; for (kmsg = ipc_kmsg_queue_first(kmsgq); kmsg != IKM_NULL; @@ -224,17 +324,43 @@ ipc_mqueue_add( for (;;) { thread_t th; + mach_msg_size_t msize; + spl_t th_spl; - th = wait_queue_wakeup64_identity_locked( + th = waitq_wakeup64_identify_locked( port_waitq, IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + THREAD_AWAKENED, &th_spl, + reserved_prepost, WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); /* waitq/mqueue still locked, thread locked */ if (th == THREAD_NULL) goto leave; + /* + * If the receiver waited with a facility not directly + * related to Mach messaging, then it isn't prepared to get + * handed the message directly. Just set it running, and + * go look for another thread that can. + */ + if (th->ith_state != MACH_RCV_IN_PROGRESS) { + if (th->ith_state == MACH_PEEK_IN_PROGRESS) { + /* + * wakeup the peeking thread, but + * continue to loop over the threads + * waiting on the port's mqueue to see + * if there are any actual receivers + */ + ipc_mqueue_peek_on_thread(port_mqueue, + th->ith_option, + th); + } + thread_unlock(th); + splx(th_spl); + continue; + } + /* * Found a receiver. see if they can handle the message * correctly (the message is not too large for them, or @@ -243,18 +369,20 @@ ipc_mqueue_add( * the list and let them go back and figure it out and * just move onto the next. 
*/ - if (th->ith_msize < - kmsg->ikm_header.msgh_size + - REQUESTED_TRAILER_SIZE(th->ith_option)) { + msize = ipc_kmsg_copyout_size(kmsg, th->map); + if (th->ith_rsize < + (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) { th->ith_state = MACH_RCV_TOO_LARGE; - th->ith_msize = kmsg->ikm_header.msgh_size; + th->ith_msize = msize; if (th->ith_option & MACH_RCV_LARGE) { /* * let him go without message */ + th->ith_receiver_name = port_mqueue->imq_receiver_name; th->ith_kmsg = IKM_NULL; th->ith_seqno = 0; thread_unlock(th); + splx(th_spl); continue; /* find another thread */ } } else { @@ -265,18 +393,25 @@ ipc_mqueue_add( * This thread is going to take this message, * so give it to him. */ - ipc_mqueue_release_msgcount(port_mqueue); ipc_kmsg_rmqueue(kmsgq, kmsg); +#if MACH_FLIPC + mach_node_t node = kmsg->ikm_node; +#endif + ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL); + th->ith_kmsg = kmsg; th->ith_seqno = port_mqueue->imq_seqno++; thread_unlock(th); + splx(th_spl); +#if MACH_FLIPC + if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) + flipc_msg_ack(node, port_mqueue, TRUE); +#endif break; /* go to next message */ } - } leave: imq_unlock(port_mqueue); - splx(s); return KERN_SUCCESS; } @@ -292,11 +427,15 @@ void ipc_mqueue_changed( ipc_mqueue_t mqueue) { - wait_queue_wakeup64_all_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - THREAD_RESTART, - FALSE); /* unlock waitq? */ + /* Indicate that this message queue is vanishing */ + knote_vanish(&mqueue->imq_klist); + + waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + THREAD_RESTART, + NULL, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); } @@ -313,7 +452,7 @@ ipc_mqueue_changed( * the message and must do something with it. If successful, * the message is queued, given to a receiver, or destroyed. * Conditions: - * Nothing locked. + * mqueue is locked. * Returns: * MACH_MSG_SUCCESS The message was accepted. 
* MACH_SEND_TIMED_OUT Caller still has message. @@ -322,12 +461,11 @@ ipc_mqueue_changed( mach_msg_return_t ipc_mqueue_send( ipc_mqueue_t mqueue, - ipc_kmsg_t kmsg, + ipc_kmsg_t kmsg, mach_msg_option_t option, - mach_msg_timeout_t timeout) + mach_msg_timeout_t send_timeout) { int wresult; - spl_t s; /* * Don't block if: @@ -335,69 +473,126 @@ ipc_mqueue_send( * 2) Caller used the MACH_SEND_ALWAYS internal option. * 3) Message is sent to a send-once right. */ - s = splsched(); - imq_lock(mqueue); - if (!imq_full(mqueue) || - (option & MACH_SEND_ALWAYS) || - (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) == - MACH_MSG_TYPE_PORT_SEND_ONCE)) { + (!imq_full_kernel(mqueue) && + ((option & MACH_SEND_ALWAYS) || + (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) == + MACH_MSG_TYPE_PORT_SEND_ONCE)))) { mqueue->imq_msgcount++; + assert(mqueue->imq_msgcount > 0); imq_unlock(mqueue); - splx(s); } else { + thread_t cur_thread = current_thread(); + uint64_t deadline; /* * We have to wait for space to be granted to us. */ - if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) { + if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) { imq_unlock(mqueue); - splx(s); return MACH_SEND_TIMED_OUT; } + if (imq_full_kernel(mqueue)) { + imq_unlock(mqueue); + return MACH_SEND_NO_BUFFER; + } mqueue->imq_fullwaiters = TRUE; - wresult = wait_queue_assert_wait64_locked( + + if (option & MACH_SEND_TIMEOUT) + clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline); + else + deadline = 0; + + thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend); + wresult = waitq_assert_wait64_locked( &mqueue->imq_wait_queue, IPC_MQUEUE_FULL, THREAD_ABORTSAFE, - TRUE); /* unlock? 
*/ - /* wait/mqueue is unlocked */ - splx(s); + TIMEOUT_URGENCY_USER_NORMAL, + deadline, TIMEOUT_NO_LEEWAY, + cur_thread); + + imq_unlock(mqueue); if (wresult == THREAD_WAITING) { - if (option & MACH_SEND_TIMEOUT) { - thread_set_timer(timeout, 1000*NSEC_PER_USEC); - wresult = thread_block(THREAD_CONTINUE_NULL); - if (wresult != THREAD_TIMED_OUT) - thread_cancel_timer(); - } else { - wresult = thread_block(THREAD_CONTINUE_NULL); - } + wresult = thread_block(THREAD_CONTINUE_NULL); counter(c_ipc_mqueue_send_block++); } switch (wresult) { + + case THREAD_AWAKENED: + /* + * we can proceed - inherited msgcount from waker + * or the message queue has been destroyed and the msgcount + * has been reset to zero (will detect in ipc_mqueue_post()). + */ + break; + case THREAD_TIMED_OUT: assert(option & MACH_SEND_TIMEOUT); return MACH_SEND_TIMED_OUT; - case THREAD_AWAKENED: - /* we can proceed - inherited msgcount from waker */ - break; - case THREAD_INTERRUPTED: return MACH_SEND_INTERRUPTED; case THREAD_RESTART: + /* mqueue is being destroyed */ + return MACH_SEND_INVALID_DEST; default: panic("ipc_mqueue_send"); } } - ipc_mqueue_post(mqueue, kmsg); + ipc_mqueue_post(mqueue, kmsg, option); return MACH_MSG_SUCCESS; } +/* + * Routine: ipc_mqueue_override_send + * Purpose: + * Set an override qos on the first message in the queue + * (if the queue is full). This is a send-possible override + * that will go away as soon as we drain a message from the + * queue. + * + * Conditions: + * The message queue is not locked. + * The caller holds a reference on the message queue. 
+ */ +extern void ipc_mqueue_override_send( + ipc_mqueue_t mqueue, + mach_msg_priority_t override) +{ + boolean_t __unused full_queue_empty = FALSE; + + imq_lock(mqueue); + assert(imq_valid(mqueue)); + assert(!imq_is_set(mqueue)); + + if (imq_full(mqueue)) { + ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages); + + if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) + KNOTE(&mqueue->imq_klist, 0); + if (!first) + full_queue_empty = TRUE; + } + imq_unlock(mqueue); + +#if DEVELOPMENT || DEBUG + if (full_queue_empty) { + ipc_port_t port = ip_from_mq(mqueue); + int dst_pid = 0; + if (ip_active(port) && !port->ip_tempowner && + port->ip_receiver_name && port->ip_receiver && + port->ip_receiver != ipc_space_kernel) { + dst_pid = task_pid(port->ip_receiver->is_task); + } + } +#endif +} + /* * Routine: ipc_mqueue_release_msgcount * Purpose: @@ -405,27 +600,45 @@ ipc_mqueue_send( * found a waiter. * * Conditions: - * The message queue is locked + * The message queue is locked. + * The message corresponding to this reference is off the queue. + * There is no need to pass reserved preposts because this will + * never prepost to anyone */ void -ipc_mqueue_release_msgcount( - ipc_mqueue_t mqueue) +ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq) { - assert(imq_held(mqueue)); - assert(mqueue->imq_msgcount > 0); + (void)set_mq; + assert(imq_held(port_mq)); + assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages)); - mqueue->imq_msgcount--; - if (!imq_full(mqueue) && mqueue->imq_fullwaiters) { - if (wait_queue_wakeup64_one_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) != KERN_SUCCESS) { - mqueue->imq_fullwaiters = FALSE; + port_mq->imq_msgcount--; + + if (!imq_full(port_mq) && port_mq->imq_fullwaiters) { + /* + * boost the priority of the awoken thread + * (WAITQ_PROMOTE_PRIORITY) to ensure it uses + * the message queue slot we've just reserved. 
+ * + * NOTE: this will never prepost + */ + if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + NULL, + WAITQ_PROMOTE_PRIORITY, + WAITQ_KEEP_LOCKED) != KERN_SUCCESS) { + port_mq->imq_fullwaiters = FALSE; } else { - mqueue->imq_msgcount++; /* gave it away */ + /* gave away our slot - add reference back */ + port_mq->imq_msgcount++; } } + + if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) { + /* no more msgs: invalidate the port's prepost object */ + waitq_clear_prepost_locked(&port_mq->imq_wait_queue); + } } /* @@ -436,15 +649,19 @@ ipc_mqueue_release_msgcount( * the message queue. * * Conditions: + * mqueue is unlocked * If we need to queue, our space in the message queue is reserved. */ void ipc_mqueue_post( - register ipc_mqueue_t mqueue, - register ipc_kmsg_t kmsg) + ipc_mqueue_t mqueue, + ipc_kmsg_t kmsg, + mach_msg_option_t __unused option) { + uint64_t reserved_prepost = 0; + boolean_t destroy_msg = FALSE; - spl_t s; + ipc_kmsg_trace_send(kmsg, option); /* * While the msg queue is locked, we have control of the @@ -452,37 +669,100 @@ ipc_mqueue_post( * * Check for a receiver for the message. */ - s = splsched(); - imq_lock(mqueue); + imq_reserve_and_lock(mqueue, &reserved_prepost); + + /* we may have raced with port destruction! 
*/ + if (!imq_valid(mqueue)) { + destroy_msg = TRUE; + goto out_unlock; + } + for (;;) { - wait_queue_t waitq = &mqueue->imq_wait_queue; + struct waitq *waitq = &mqueue->imq_wait_queue; + spl_t th_spl; thread_t receiver; - - receiver = wait_queue_wakeup64_identity_locked( - waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + mach_msg_size_t msize; + + receiver = waitq_wakeup64_identify_locked(waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, + &th_spl, + &reserved_prepost, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); /* waitq still locked, thread locked */ if (receiver == THREAD_NULL) { + /* - * no receivers; queue kmsg + * no receivers; queue kmsg if space still reserved + * Reservations are cancelled when the port goes inactive. + * note that this will enqueue the message for any + * "peeking" receivers. + * + * Also, post the knote to wake up any threads waiting + * on that style of interface if this insertion is of + * note (first insertion, or adjusted override qos all + * the way to the head of the queue). + * + * This is just for ports. portset knotes are stay-active, + * and their threads get awakened through the !MACH_RCV_IN_PROGRESS + * logic below). */ - assert(mqueue->imq_msgcount > 0); - ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); - break; + if (mqueue->imq_msgcount > 0) { + if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) + KNOTE(&mqueue->imq_klist, 0); + break; + } + + /* + * Otherwise, the message queue must belong to an inactive + * port, so just destroy the message and pretend it was posted. + */ + destroy_msg = TRUE; + goto out_unlock; } - + + /* + * If a thread is attempting a "peek" into the message queue + * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the + * thread running. A successful peek is essentially the same as + * message delivery since the peeking thread takes responsibility + * for delivering the message and (eventually) removing it from + * the mqueue. 
Only one thread can successfully use the peek + * facility on any given port, so we exit the waitq loop after + * encountering such a thread. + */ + if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) { + ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg); + ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver); + thread_unlock(receiver); + splx(th_spl); + break; /* Message was posted, so break out of loop */ + } + + /* + * If the receiver waited with a facility not directly related + * to Mach messaging, then it isn't prepared to get handed the + * message directly. Just set it running, and go look for + * another thread that can. + */ + if (receiver->ith_state != MACH_RCV_IN_PROGRESS) { + thread_unlock(receiver); + splx(th_spl); + continue; + } + + /* * We found a waiting thread. * If the message is too large or the scatter list is too small * the thread we wake up will get that as its status. */ - if (receiver->ith_msize < - (kmsg->ikm_header.msgh_size) + - REQUESTED_TRAILER_SIZE(receiver->ith_option)) { - receiver->ith_msize = kmsg->ikm_header.msgh_size; + msize = ipc_kmsg_copyout_size(kmsg, receiver->map); + if (receiver->ith_rsize < + (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) { + receiver->ith_msize = msize; receiver->ith_state = MACH_RCV_TOO_LARGE; } else { receiver->ith_state = MACH_MSG_SUCCESS; @@ -495,13 +775,21 @@ ipc_mqueue_post( */ if ((receiver->ith_state == MACH_MSG_SUCCESS) || !(receiver->ith_option & MACH_RCV_LARGE)) { - receiver->ith_kmsg = kmsg; receiver->ith_seqno = mqueue->imq_seqno++; +#if MACH_FLIPC + mach_node_t node = kmsg->ikm_node; +#endif thread_unlock(receiver); + splx(th_spl); /* we didn't need our reserved spot in the queue */ - ipc_mqueue_release_msgcount(mqueue); + ipc_mqueue_release_msgcount(mqueue, IMQ_NULL); + +#if MACH_FLIPC + if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) + flipc_msg_ack(node, mqueue, TRUE); +#endif break; } @@ -510,26 
+798,30 @@ ipc_mqueue_post( * and handle its error without getting the message. We * need to go back and pick another one. */ + receiver->ith_receiver_name = mqueue->imq_receiver_name; receiver->ith_kmsg = IKM_NULL; receiver->ith_seqno = 0; thread_unlock(receiver); + splx(th_spl); } - imq_unlock(mqueue); - splx(s); - +out_unlock: + /* clear the waitq boost we may have been given */ + waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread()); + imq_release_and_unlock(mqueue, reserved_prepost); + if (destroy_msg) + ipc_kmsg_destroy(kmsg); + current_task()->messages_sent++; return; } -kern_return_t -ipc_mqueue_receive_results(void) +/* static */ void +ipc_mqueue_receive_results(wait_result_t saved_wait_result) { thread_t self = current_thread(); mach_msg_option_t option = self->ith_option; - kern_return_t saved_wait_result = self->wait_result; - kern_return_t mr; /* * why did we wake up? @@ -540,15 +832,11 @@ ipc_mqueue_receive_results(void) return; case THREAD_INTERRUPTED: - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); self->ith_state = MACH_RCV_INTERRUPTED; return; case THREAD_RESTART: /* something bad happened to the port/set */ - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); self->ith_state = MACH_RCV_PORT_CHANGED; return; @@ -557,11 +845,6 @@ ipc_mqueue_receive_results(void) * We do not need to go select a message, somebody * handed us one (or a too-large indication). 
*/ - if (option & MACH_RCV_TIMEOUT) - thread_cancel_timer(); - - mr = MACH_MSG_SUCCESS; - switch (self->ith_state) { case MACH_RCV_SCATTER_SMALL: case MACH_RCV_TOO_LARGE: @@ -577,6 +860,7 @@ ipc_mqueue_receive_results(void) } case MACH_MSG_SUCCESS: + case MACH_PEEK_READY: return; default: @@ -589,9 +873,11 @@ ipc_mqueue_receive_results(void) } void -ipc_mqueue_receive_continue(void) +ipc_mqueue_receive_continue( + __unused void *param, + wait_result_t wresult) { - ipc_mqueue_receive_results(); + ipc_mqueue_receive_results(wresult); mach_msg_receive_continue(); /* hard-coded for now */ } @@ -600,22 +886,17 @@ ipc_mqueue_receive_continue(void) * Purpose: * Receive a message from a message queue. * - * If continuation is non-zero, then we might discard - * our kernel stack when we block. We will continue - * after unblocking by executing continuation. - * - * If resume is true, then we are resuming a receive - * operation after a blocked receive discarded our stack. * Conditions: * Our caller must hold a reference for the port or port set * to which this queue belongs, to keep the queue * from being deallocated. * * The kmsg is returned with clean header fields - * and with the circular bit turned off. + * and with the circular bit turned off through the ith_kmsg + * field of the thread's receive continuation state. * Returns: - * MACH_MSG_SUCCESS Message returned in kmsgp. - * MACH_RCV_TOO_LARGE Message size returned in kmsgp. + * MACH_MSG_SUCCESS Message returned in ith_kmsg. + * MACH_RCV_TOO_LARGE Message size returned in ith_msize. * MACH_RCV_TIMED_OUT No message obtained. * MACH_RCV_INTERRUPTED No message obtained. * MACH_RCV_PORT_DIED Port/set died; no message. 
@@ -625,248 +906,636 @@ ipc_mqueue_receive_continue(void) void ipc_mqueue_receive( - ipc_mqueue_t mqueue, - mach_msg_option_t option, - mach_msg_size_t max_size, - mach_msg_timeout_t timeout, - int interruptible) + ipc_mqueue_t mqueue, + mach_msg_option_t option, + mach_msg_size_t max_size, + mach_msg_timeout_t rcv_timeout, + int interruptible) { - ipc_port_t port; - mach_msg_return_t mr, mr2; - ipc_kmsg_queue_t kmsgs; - wait_result_t wresult; - thread_t self; - ipc_kmsg_t *kmsgp; - mach_port_seqno_t *seqnop; - spl_t s; - - s = splsched(); + wait_result_t wresult; + thread_t self = current_thread(); + imq_lock(mqueue); - - if (imq_is_set(mqueue)) { - wait_queue_link_t wql; - ipc_mqueue_t port_mq; - queue_t q; + wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, + rcv_timeout, interruptible, + self); + /* mqueue unlocked */ + if (wresult == THREAD_NOT_WAITING) + return; + + if (wresult == THREAD_WAITING) { + counter((interruptible == THREAD_ABORTSAFE) ? + c_ipc_mqueue_receive_block_user++ : + c_ipc_mqueue_receive_block_kernel++); + + if (self->ith_continuation) + thread_block(ipc_mqueue_receive_continue); + /* NOTREACHED */ + + wresult = thread_block(THREAD_CONTINUE_NULL); + } + ipc_mqueue_receive_results(wresult); +} + +static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, + struct waitq_set *wqset) +{ + ipc_mqueue_t port_mq, *pmq_ptr; + + (void)wqset; + port_mq = (ipc_mqueue_t)waitq; + + /* + * If there are no messages on this queue, skip it and remove + * it from the prepost list + */ + if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) + return WQ_ITERATE_INVALIDATE_CONTINUE; + + /* + * There are messages waiting on this port. + * Instruct the prepost iteration logic to break, but keep the + * waitq locked. 
+ */ + pmq_ptr = (ipc_mqueue_t *)ctx; + if (pmq_ptr) + *pmq_ptr = port_mq; + return WQ_ITERATE_BREAK_KEEP_LOCKED; +} + +/* + * Routine: ipc_mqueue_receive_on_thread + * Purpose: + * Receive a message from a message queue using a specified thread. + * If no message available, assert_wait on the appropriate waitq. + * + * Conditions: + * Assumes thread is self. + * Called with mqueue locked. + * Returns with mqueue unlocked. + * May have assert-waited. Caller must block in those cases. + */ +wait_result_t +ipc_mqueue_receive_on_thread( + ipc_mqueue_t mqueue, + mach_msg_option_t option, + mach_msg_size_t max_size, + mach_msg_timeout_t rcv_timeout, + int interruptible, + thread_t thread) +{ + wait_result_t wresult; + uint64_t deadline; - q = &mqueue->imq_setlinks; + /* called with mqueue locked */ + /* no need to reserve anything: we never prepost to anyone */ + + if (!imq_valid(mqueue)) { + /* someone raced us to destroy this mqueue/port! */ + imq_unlock(mqueue); /* - * If we are waiting on a portset mqueue, we need to see if - * any of the member ports have work for us. If so, try to - * deliver one of those messages. By holding the portset's - * mqueue lock during the search, we tie up any attempts by - * mqueue_deliver or portset membership changes that may - * cross our path. But this is a lock order violation, so we - * have to do it "softly." If we don't find a message waiting - * for us, we will assert our intention to wait while still - * holding that lock. When we release the lock, the deliver/ - * change will succeed and find us. 
+ * ipc_mqueue_receive_results updates the thread's ith_state + * TODO: differentiate between rights being moved and + * rights/ports being destroyed (21885327) */ - search_set: - queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) { - port_mq = (ipc_mqueue_t)wql->wql_queue; - kmsgs = &port_mq->imq_messages; - - if (!imq_lock_try(port_mq)) { - imq_unlock(mqueue); - splx(s); - delay(1); - s = splsched(); - imq_lock(mqueue); - goto search_set; /* start again at beginning - SMP */ - } + return THREAD_RESTART; + } + + if (imq_is_set(mqueue)) { + ipc_mqueue_t port_mq = IMQ_NULL; + + (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue, + &port_mq, + mqueue_process_prepost_receive); + if (port_mq != IMQ_NULL) { /* - * If there is still a message to be had, we will - * try to select it (may not succeed because of size - * and options). In any case, we deliver those - * results back to the user. + * We get here if there is at least one message + * waiting on port_mq. We have instructed the prepost + * iteration logic to leave both the port_mq and the + * set mqueue locked. * - * We also move the port's linkage to the tail of the - * list for this set (fairness). Future versions will - * sort by timestamp or priority. + * TODO: previously, we would place this port at the + * back of the prepost list... */ - if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) { - imq_unlock(port_mq); - continue; - } - queue_remove(q, wql, wait_queue_link_t, wql_setlinks); - queue_enter(q, wql, wait_queue_link_t, wql_setlinks); imq_unlock(mqueue); - ipc_mqueue_select(port_mq, option, max_size); + /* + * Continue on to handling the message with just + * the port mqueue locked. 
+ */ + if (option & MACH_PEEK_MSG) + ipc_mqueue_peek_on_thread(port_mq, option, thread); + else + ipc_mqueue_select_on_thread(port_mq, mqueue, option, + max_size, thread); + imq_unlock(port_mq); - splx(s); - return; - + return THREAD_NOT_WAITING; } - - } else { + } else if (imq_is_queue(mqueue)) { + ipc_kmsg_queue_t kmsgs; /* * Receive on a single port. Just try to get the messages. */ kmsgs = &mqueue->imq_messages; if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { - ipc_mqueue_select(mqueue, option, max_size); + if (option & MACH_PEEK_MSG) + ipc_mqueue_peek_on_thread(mqueue, option, thread); + else + ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option, + max_size, thread); imq_unlock(mqueue); - splx(s); - return; + return THREAD_NOT_WAITING; } + } else { + panic("Unknown mqueue type 0x%x: likely memory corruption!\n", + mqueue->imq_wait_queue.waitq_type); } - + /* * Looks like we'll have to block. The mqueue we will * block on (whether the set's or the local port's) is * still locked. */ - self = current_thread(); if (option & MACH_RCV_TIMEOUT) { - if (timeout == 0) { + if (rcv_timeout == 0) { imq_unlock(mqueue); - splx(s); - self->ith_state = MACH_RCV_TIMED_OUT; - return; + thread->ith_state = MACH_RCV_TIMED_OUT; + return THREAD_NOT_WAITING; } } - self->ith_state = MACH_RCV_IN_PROGRESS; - self->ith_option = option; - self->ith_msize = max_size; - - wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - interruptible, - TRUE); /* unlock? 
*/ - /* mqueue/waitq is unlocked */ - splx(s); + thread->ith_option = option; + thread->ith_rsize = max_size; + thread->ith_msize = 0; + + if (option & MACH_PEEK_MSG) + thread->ith_state = MACH_PEEK_IN_PROGRESS; + else + thread->ith_state = MACH_RCV_IN_PROGRESS; + + if (option & MACH_RCV_TIMEOUT) + clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline); + else + deadline = 0; + + thread_set_pending_block_hint(thread, kThreadWaitPortReceive); + wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + interruptible, + TIMEOUT_URGENCY_USER_NORMAL, + deadline, + TIMEOUT_NO_LEEWAY, + thread); + /* preposts should be detected above, not here */ + if (wresult == THREAD_AWAKENED) + panic("ipc_mqueue_receive_on_thread: sleep walking"); - if (wresult == THREAD_WAITING) { - if (option & MACH_RCV_TIMEOUT) - thread_set_timer(timeout, 1000*NSEC_PER_USEC); + imq_unlock(mqueue); - if (interruptible == THREAD_ABORTSAFE) - counter(c_ipc_mqueue_receive_block_user++); - else - counter(c_ipc_mqueue_receive_block_kernel++); + return wresult; +} - if (self->ith_continuation) - thread_block(ipc_mqueue_receive_continue); - /* NOTREACHED */ - thread_block(THREAD_CONTINUE_NULL); - } - ipc_mqueue_receive_results(); -} +/* + * Routine: ipc_mqueue_peek_on_thread + * Purpose: + * A receiver discovered that there was a message on the queue + * before he had to block. Tell a thread about the message queue, + * but don't pick off any messages. 
+ * Conditions: + * port_mq locked + * at least one message on port_mq's message queue + * + * Returns: (on thread->ith_state) + * MACH_PEEK_READY ith_peekq contains a message queue + */ +void +ipc_mqueue_peek_on_thread( + ipc_mqueue_t port_mq, + mach_msg_option_t option, + thread_t thread) +{ + (void)option; + assert(option & MACH_PEEK_MSG); + assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL); + /* + * Take a reference on the mqueue's associated port: + * the peeking thread will be responsible to release this reference + * using ip_release_mq() + * (port_mq is recorded in thread->ith_peekq just below) + */ + ip_reference_mq(port_mq); + thread->ith_peekq = port_mq; + thread->ith_state = MACH_PEEK_READY; +} /* - * Routine: ipc_mqueue_select + * Routine: ipc_mqueue_select_on_thread * Purpose: * A receiver discovered that there was a message on the queue * before he had to block. Pick the message off the queue and - * "post" it to himself. + * "post" it to thread. * Conditions: * mqueue locked. + * thread not locked. * There is a message. + * No need to reserve prepost objects - it will never prepost + * + * Results are posted on thread: ith_state, ith_kmsg and ith_seqno + * always; ith_msize and ith_receiver_name only when an oversized + * message is left queued because MACH_RCV_LARGE was requested. + * set_mq is only forwarded to ipc_mqueue_release_msgcount(). + * NOTE(review): messages_received is bumped on current_task(), + * not on the task of thread - confirm the two always coincide. + * * Returns: * MACH_MSG_SUCCESS Actually selected a message for ourselves. * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large */ void -ipc_mqueue_select( - ipc_mqueue_t mqueue, +ipc_mqueue_select_on_thread( + ipc_mqueue_t port_mq, + ipc_mqueue_t set_mq, mach_msg_option_t option, - mach_msg_size_t max_size) + mach_msg_size_t max_size, + thread_t thread) { - thread_t self = current_thread(); ipc_kmsg_t kmsg; - mach_port_seqno_t seqno; - mach_msg_return_t mr; - - mr = MACH_MSG_SUCCESS; - + mach_msg_return_t mr = MACH_MSG_SUCCESS; + mach_msg_size_t msize; /* * Do some sanity checking of our ability to receive * before pulling the message off the queue. 
*/ - kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages); - + kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages); assert(kmsg != IKM_NULL); - if (kmsg->ikm_header.msgh_size + - REQUESTED_TRAILER_SIZE(option) > max_size) { - mr = MACH_RCV_TOO_LARGE; - } - /* * If we really can't receive it, but we had the * MACH_RCV_LARGE option set, then don't take it off * the queue, instead return the appropriate error * (and size needed). + * Without MACH_RCV_LARGE an oversized message is still dequeued + * and posted, with MACH_RCV_TOO_LARGE as the ith_state of thread. */ - if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) { - self->ith_kmsg = IKM_NULL; - self->ith_msize = kmsg->ikm_header.msgh_size; - self->ith_seqno = 0; - self->ith_state = mr; - return; + msize = ipc_kmsg_copyout_size(kmsg, thread->map); + if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) { + mr = MACH_RCV_TOO_LARGE; + if (option & MACH_RCV_LARGE) { + thread->ith_receiver_name = port_mq->imq_receiver_name; + thread->ith_kmsg = IKM_NULL; + thread->ith_msize = msize; + thread->ith_seqno = 0; + thread->ith_state = mr; + return; + } } - ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg); - ipc_mqueue_release_msgcount(mqueue); - self->ith_seqno = mqueue->imq_seqno++; - self->ith_kmsg = kmsg; - self->ith_state = mr; + ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg); +#if MACH_FLIPC + if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) + flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE); +#endif + ipc_mqueue_release_msgcount(port_mq, set_mq); + thread->ith_seqno = port_mq->imq_seqno++; + thread->ith_kmsg = kmsg; + thread->ith_state = mr; current_task()->messages_received++; return; } /* - * Routine: ipc_mqueue_destroy + * Routine: ipc_mqueue_peek_locked + * Purpose: + * Peek at a (non-set) message queue to see if it has a message + * matching the sequence number provided (if zero, then the + * first message in the queue) and return vital info about the + * message. + * + * Conditions: + * The ipc_mqueue_t is locked by callers. 
+ *		Other locks may be held by callers, so this routine cannot block.
+ *		Caller holds reference on the message queue.
+ *
+ *	Returns:
+ *		1 if a matching message was found: every non-NULL out pointer
+ *		is filled in from that message and *seqnop receives its
+ *		sequence number.
+ *		0 if no message matches: no out pointer is written.
+ */
+unsigned
+ipc_mqueue_peek_locked(ipc_mqueue_t mq,
+		       mach_port_seqno_t * seqnop,
+		       mach_msg_size_t * msg_sizep,
+		       mach_msg_id_t * msg_idp,
+		       mach_msg_max_trailer_t * msg_trailerp,
+		       ipc_kmsg_t *kmsgp)
+{
+	ipc_kmsg_queue_t queue;
+	ipc_kmsg_t cur;
+	mach_port_seqno_t wanted, skip;
+
+	assert(!imq_is_set(mq));
+
+	/* a NULL seqnop is treated like a request for sequence number 0 */
+	wanted = (seqnop != NULL) ? *seqnop : 0;
+
+	if (wanted == 0) {
+		/* no specific request: report the message at the head */
+		wanted = mq->imq_seqno;
+		skip = 0;
+	} else if (wanted < mq->imq_seqno ||
+		   wanted >= mq->imq_seqno + mq->imq_msgcount) {
+		/* outside [imq_seqno, imq_seqno + imq_msgcount) */
+		return 0;
+	} else {
+		skip = wanted - mq->imq_seqno;
+	}
+
+	/* step 'skip' entries past the head to reach the wanted message */
+	queue = &mq->imq_messages;
+	cur = ipc_kmsg_queue_first(queue);
+	for (; skip != 0 && cur != IKM_NULL; skip--)
+		cur = ipc_kmsg_queue_next(queue, cur);
+
+	if (cur == IKM_NULL)
+		return 0;
+
+	/* found one - hand back whatever the caller asked for */
+	if (seqnop != NULL)
+		*seqnop = wanted;
+	if (msg_sizep != NULL)
+		*msg_sizep = cur->ikm_header->msgh_size;
+	if (msg_idp != NULL)
+		*msg_idp = cur->ikm_header->msgh_id;
+	if (msg_trailerp != NULL) {
+		/* the trailer is read from just past the rounded message size */
+		vm_offset_t trailer_addr = (vm_offset_t)cur->ikm_header +
+					   round_msg(cur->ikm_header->msgh_size);
+
+		memcpy(msg_trailerp,
+		       (mach_msg_max_trailer_t *)trailer_addr,
+		       sizeof(mach_msg_max_trailer_t));
+	}
+	if (kmsgp != NULL)
+		*kmsgp = cur;
+
+	return 1;
+}
+
+
+/*
+ *	Routine:	ipc_mqueue_peek
+ *	Purpose:
+ *		Peek at a (non-set) message queue to see if it has a message
+ *		matching the sequence number provided (if zero, then the
+ *		first message in the queue) and return vital info about the
+ *		message.
+ *
+ *	Conditions:
+ *		The ipc_mqueue_t is unlocked.
+ *		Locks may be held by callers, so this routine cannot block.
+ *		Caller holds reference on the message queue.
+ */ +unsigned +ipc_mqueue_peek(ipc_mqueue_t mq, + mach_port_seqno_t * seqnop, + mach_msg_size_t * msg_sizep, + mach_msg_id_t * msg_idp, + mach_msg_max_trailer_t * msg_trailerp, + ipc_kmsg_t *kmsgp) +{ + unsigned res; + /* hold the mqueue lock only around the locked peek */ + imq_lock(mq); + + res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp, + msg_trailerp, kmsgp); + /* NOTE(review): *kmsgp, if requested, is handed back after the lock is dropped - confirm callers do not rely on it staying queued */ + imq_unlock(mq); + return res; +} + +/* + * Routine: ipc_mqueue_release_peek_ref + * Purpose: + * Release the reference on an mqueue's associated port which was + * granted to a thread in ipc_mqueue_peek_on_thread (on the + * MACH_PEEK_MSG thread wakeup path). + * + * Conditions: + * The ipc_mqueue_t should be locked on entry. + * The ipc_mqueue_t will be _unlocked_ on return + * (and potentially invalid!) + * + */ +void ipc_mqueue_release_peek_ref(ipc_mqueue_t mq) +{ + assert(!imq_is_set(mq)); + assert(imq_held(mq)); + + /* + * clear any preposts this mq may have generated + * (which would cause subsequent immediate wakeups) + * (done before imq_unlock() below, i.e. still under the mqueue lock) + */ + waitq_clear_prepost_locked(&mq->imq_wait_queue); + + imq_unlock(mq); + + /* + * release the port reference: we need to do this outside the lock + * because we might be holding the last port reference! + **/ + ip_release_mq(mq); +} + +/* + * peek at the contained port message queues, break prepost iteration as soon + * as we spot a message on one of the message queues referenced by the set's + * prepost list. No need to lock each message queue, as only the head of each + * queue is checked. If a message wasn't there before we entered here, no need + * to find it (if we do, great). 
+ *
+ * Returns WQ_ITERATE_BREAK when the inspected queue has a message at its
+ * head and WQ_ITERATE_CONTINUE otherwise; ctx and wqset are unused.
+ */
+static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+				struct waitq_set *wqset)
+{
+	ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
+	ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
+
+	(void)ctx;
+	(void)wqset;
+
+	if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
+		return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
+
+	return WQ_ITERATE_CONTINUE;
+}
+
+/*
+ *	Routine:	ipc_mqueue_set_peek
+ *	Purpose:
+ *		Peek at a message queue set to see if it has any ports
+ *		with messages.
+ *
+ *	Conditions:
+ *		Locks may be held by callers, so this routine cannot block.
+ *		Caller holds reference on the message queue.
+ *		The set mqueue is locked here and unlocked again on
+ *		every return path.
+ *
+ *	Returns:
+ *		1 if the set mqueue is no longer valid, or if the prepost
+ *		iteration stopped on a port with a queued message;
+ *		0 otherwise.
+ */
+unsigned
+ipc_mqueue_set_peek(ipc_mqueue_t mq)
+{
+	int ret;
+
+	imq_lock(mq);
+
+	/*
+	 * We may have raced with port destruction where the mqueue is marked
+	 * as invalid. In that case, even though we don't have messages, we
+	 * have an end-of-life event to deliver.
+	 */
+	if (!imq_is_valid(mq)) {
+		/*
+		 * Drop the lock taken above before reporting the event:
+		 * returning with it still held would leak the mqueue lock,
+		 * since the normal path below unlocks before returning.
+		 */
+		imq_unlock(mq);
+		return 1;
+	}
+
+	ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
+					 mqueue_peek_iterator);
+
+	imq_unlock(mq);
+
+	return (ret == WQ_ITERATE_BREAK);
+}
+
+/*
+ *	Routine:	ipc_mqueue_set_gather_member_names
+ *	Purpose:
+ *		Discover all ports which are members of a given port set.
+ *		Because the waitq linkage mechanism was redesigned to save
+ *		significant amounts of memory, it no longer keeps back-pointers
+ *		from a port set to a port. Therefore, we must iterate over all
+ *		ports within a given IPC space and individually query them to
+ *		see if they are members of the given set. Port names of ports
+ *		found to be members of the given set will be gathered into the
+ *		provided 'names' array. Actual returned names are limited to
+ *		maxnames entries, but we keep counting the actual number of
+ *		members to let the caller decide to retry if necessary.
+ *
+ *	Conditions:
+ *		Locks may be held by callers, so this routine cannot block.
+ *		Caller holds reference on the message queue (via port set).
+ *
+ *	Returns (through *actualp):
+ *		The number of member ports found; 0 when the space is not
+ *		active or the set's waitq is not valid.
+ */
+void
+ipc_mqueue_set_gather_member_names(
+	ipc_space_t		space,
+	ipc_mqueue_t		set_mq,
+	ipc_entry_num_t		maxnames,
+	mach_port_name_t	*names,
+	ipc_entry_num_t		*actualp)
+{
+	struct waitq_set *wqset;
+	ipc_entry_num_t found = 0;
+
+	assert(set_mq != IMQ_NULL);
+	wqset = &set_mq->imq_set_queue;
+
+	assert(space != IS_NULL);
+	is_read_lock(space);
+
+	/* nothing to scan unless the space is active and the set is valid */
+	if (is_active(space) && waitq_set_is_valid(wqset)) {
+		ipc_entry_t entries = space->is_table;
+		ipc_entry_num_t nentries = space->is_table_size;
+		ipc_entry_num_t slot;
+
+		for (slot = 0; slot < nentries; slot++) {
+			ipc_entry_t entry = entries + slot;
+			ipc_mqueue_t mq;
+
+			/* only receive rights can be members of port sets */
+			if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) == MACH_PORT_TYPE_NONE)
+				continue;
+
+			__IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
+			mq = &port->ip_messages;
+
+			assert(IP_VALID(port));
+			if (!ip_active(port) ||
+			    !waitq_member(&mq->imq_wait_queue, wqset))
+				continue;
+
+			/* store at most maxnames names, but count every member */
+			if (found < maxnames)
+				names[found] = mq->imq_receiver_name;
+			found++;
+		}
+	}
+
+	is_read_unlock(space);
+
+	*actualp = found;
+}
+
+
+/*
+ *	Routine:	ipc_mqueue_destroy_locked
  *	Purpose:
- *		Destroy a message queue. Set any blocked senders running.
+ *		Destroy a (non-set) message queue.
+ *		Set any blocked senders running.
  *		Destroy the kmsgs in the queue.
  *	Conditions:
- *		Nothing locked.
+ * mqueue locked * Receivers were removed when the receive right was "changed" */ -void -ipc_mqueue_destroy( - ipc_mqueue_t mqueue) +boolean_t +ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue) { ipc_kmsg_queue_t kmqueue; ipc_kmsg_t kmsg; - spl_t s; + boolean_t reap = FALSE; + assert(!imq_is_set(mqueue)); - s = splsched(); - imq_lock(mqueue); /* * rouse all blocked senders + * (don't boost anyone - we're tearing this queue down) + * (never preposts) */ mqueue->imq_fullwaiters = FALSE; - wait_queue_wakeup64_all_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE); + waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_RESTART, + NULL, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); + /* + * Move messages from the specified queue to the per-thread + * clean/drain queue while we have the mqueue lock. + * reap (this routine's return value) becomes TRUE once any + * ipc_kmsg_delayed_destroy() call below returns TRUE. + */ kmqueue = &mqueue->imq_messages; - while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) { - imq_unlock(mqueue); - splx(s); +#if MACH_FLIPC + if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) + flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE); +#endif + boolean_t first; + first = ipc_kmsg_delayed_destroy(kmsg); + if (first) + reap = first; + } - ipc_kmsg_destroy_dest(kmsg); + /* + * Wipe out message count, both for messages about to be + * reaped and for reserved space for (previously) woken senders. + * This is the indication to them that their reserved space is gone + * (the mqueue was destroyed). + */ + mqueue->imq_msgcount = 0; - s = splsched(); - imq_lock(mqueue); - } - imq_unlock(mqueue); - splx(s); + /* invalidate the waitq for subsequent mqueue operations */ + waitq_invalidate_locked(&mqueue->imq_wait_queue); + + /* clear out any preposting we may have done */ + waitq_clear_prepost_locked(&mqueue->imq_wait_queue); + + /* + * assert that we are destroying / invalidating a queue that's + * not a member of any other queue. 
+ */ + assert(mqueue->imq_preposts == 0); + assert(mqueue->imq_in_pset == 0); + + return reap; } /* @@ -883,10 +1552,10 @@ ipc_mqueue_set_qlimit( ipc_mqueue_t mqueue, mach_port_msgcount_t qlimit) { - spl_t s; + + assert(qlimit <= MACH_PORT_QLIMIT_MAX); /* wake up senders allowed by the new qlimit */ - s = splsched(); imq_lock(mqueue); if (qlimit > mqueue->imq_qlimit) { mach_port_msgcount_t i, wakeup; @@ -895,19 +1564,27 @@ ipc_mqueue_set_qlimit( wakeup = qlimit - mqueue->imq_qlimit; for (i = 0; i < wakeup; i++) { - if (wait_queue_wakeup64_one_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) == KERN_NOT_WAITING) { - mqueue->imq_fullwaiters = FALSE; - break; - } + /* + * boost the priority of the awoken thread + * (WAITQ_PROMOTE_PRIORITY) to ensure it uses + * the message queue slot we've just reserved. + * + * NOTE: this will never prepost + */ + if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + NULL, + WAITQ_PROMOTE_PRIORITY, + WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) { + mqueue->imq_fullwaiters = FALSE; + break; + } + mqueue->imq_msgcount++; /* give it to the awakened thread */ } - } + } mqueue->imq_qlimit = qlimit; imq_unlock(mqueue); - splx(s); } /* @@ -922,13 +1599,9 @@ ipc_mqueue_set_seqno( ipc_mqueue_t mqueue, mach_port_seqno_t seqno) { - spl_t s; - - s = splsched(); imq_lock(mqueue); mqueue->imq_seqno = seqno; imq_unlock(mqueue); - splx(s); } @@ -961,7 +1634,7 @@ ipc_mqueue_copyin( ipc_mqueue_t mqueue; is_read_lock(space); - if (!space->is_active) { + if (!is_active(space)) { is_read_unlock(space); return MACH_RCV_INVALID_NAME; } @@ -976,9 +1649,8 @@ ipc_mqueue_copyin( if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { ipc_port_t port; - ipc_pset_t pset; - port = (ipc_port_t) object; + __IGNORE_WCASTALIGN(port = (ipc_port_t) object); assert(port != IP_NULL); ip_lock(port); @@ -991,12 +1663,11 @@ ipc_mqueue_copyin( } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) { ipc_pset_t pset; - 
pset = (ipc_pset_t) object; + __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object); assert(pset != IPS_NULL); ips_lock(pset); assert(ips_active(pset)); - assert(pset->ips_local_name == name); is_read_unlock(space); mqueue = &pset->ips_messages; @@ -1017,4 +1688,3 @@ ipc_mqueue_copyin( *mqueuep = mqueue; return MACH_MSG_SUCCESS; } -