X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/3e170ce000f1506b7b5d2c5c7faec85ceabb573d..4ba76501152d51ccb5647018f3192c6096367d48:/osfmk/ipc/ipc_mqueue.c diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c index 1b5d82c19..360879748 100644 --- a/osfmk/ipc/ipc_mqueue.c +++ b/osfmk/ipc/ipc_mqueue.c @@ -2,7 +2,7 @@ * Copyright (c) 2000-2007 Apple Inc. All rights reserved. * * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ - * + * * This file contains Original Code and/or Modifications of Original Code * as defined in and that are subject to the Apple Public Source License * Version 2.0 (the 'License'). You may not use this file except in @@ -11,10 +11,10 @@ * unlawful or unlicensed copies of an Apple operating system, or to * circumvent, violate, or enable the circumvention or violation of, any * terms of an Apple operating system software license agreement. - * + * * Please obtain a copy of the License at * http://www.opensource.apple.com/apsl/ and read it before using this file. - * + * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, @@ -22,34 +22,34 @@ * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. * Please see the License for the specific language governing rights and * limitations under the License. - * + * * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ */ /* * @OSF_FREE_COPYRIGHT@ */ -/* +/* * Mach Operating System * Copyright (c) 1991,1990,1989 Carnegie Mellon University * All Rights Reserved. - * + * * Permission to use, copy, modify and distribute this software and its * documentation is hereby granted, provided that both the copyright * notice and this permission notice appear in all copies of the * software, derivative works or modified versions, and any portions * thereof, and that both notices appear in supporting documentation. - * + * * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. - * + * * Carnegie Mellon requests users of this software to return to - * + * * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU * School of Computer Science * Carnegie Mellon University * Pittsburgh PA 15213-3890 - * + * * any improvements or extensions that they make and grant Carnegie Mellon * the rights to redistribute these changes. */ @@ -68,7 +68,7 @@ * is included in support of clause 2.2 (b) of the Apple Public License, * Version 2.0. */ - + #include #include @@ -78,27 +78,40 @@ #include #include #include -#include /* XXX - for mach_msg_receive_continue */ +#include /* XXX - for mach_msg_receive_continue */ #include #include #include #include +#include #include #include #include #include #include +#if MACH_FLIPC +#include +#endif + #ifdef __LP64__ #include #endif -int ipc_mqueue_full; /* address is event for queue space */ -int ipc_mqueue_rcv; /* address is event for message arrival */ +#include + +extern char *proc_name_address(void *p); + +int ipc_mqueue_full; /* address is event for queue space */ +int ipc_mqueue_rcv; /* address is event for message arrival */ /* forward declarations */ -void ipc_mqueue_receive_results(wait_result_t result); +static void ipc_mqueue_receive_results(wait_result_t result); +static void ipc_mqueue_peek_on_thread( + ipc_mqueue_t port_mq, + mach_msg_option_t option, + thread_t thread); /* * Routine: ipc_mqueue_init @@ -107,33 +120,44 @@ void ipc_mqueue_receive_results(wait_result_t result); */ void ipc_mqueue_init( - ipc_mqueue_t mqueue, - boolean_t is_set, - uint64_t *reserved_link) + ipc_mqueue_t mqueue, + ipc_mqueue_kind_t kind) { - if (is_set) { + switch (kind) { + case IPC_MQUEUE_KIND_SET: waitq_set_init(&mqueue->imq_set_queue, - SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST|SYNC_POLICY_DISABLE_IRQ, - reserved_link); - } else { - waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ); + SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST, + NULL, NULL); + break; + case IPC_MQUEUE_KIND_NONE: /* cheat: we really should have "no" mqueue */ + case IPC_MQUEUE_KIND_PORT: + waitq_init(&mqueue->imq_wait_queue, + SYNC_POLICY_FIFO | SYNC_POLICY_TURNSTILE_PROXY); ipc_kmsg_queue_init(&mqueue->imq_messages); mqueue->imq_seqno = 0; mqueue->imq_msgcount = 0; mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT; + mqueue->imq_context = 0; mqueue->imq_fullwaiters = FALSE; +#if MACH_FLIPC + mqueue->imq_fport = FPORT_NULL; +#endif + break; } + klist_init(&mqueue->imq_klist); } -void ipc_mqueue_deinit( - ipc_mqueue_t mqueue) +void +ipc_mqueue_deinit( + ipc_mqueue_t mqueue) { boolean_t is_set = imq_is_set(mqueue); - if (is_set) + if (is_set) { waitq_set_deinit(&mqueue->imq_set_queue); - else + } else { waitq_deinit(&mqueue->imq_wait_queue); + } } /* @@ -146,11 +170,10 @@ void ipc_mqueue_deinit( * mq is unlocked */ void -imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl) +imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost) { *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0, - WAITQ_KEEP_LOCKED, spl); - + WAITQ_KEEP_LOCKED); } @@ -163,11 +186,10 @@ imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl) * mq is locked */ void -imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl) +imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost) { assert(imq_held(mq)); waitq_unlock(&mq->imq_wait_queue); - splx(spl); waitq_prepost_release_reserve(reserved_prepost); } @@ -186,14 +208,13 @@ imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl) boolean_t ipc_mqueue_member( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue) { struct waitq *port_waitq = &port_mqueue->imq_wait_queue; struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; return waitq_member(port_waitq, set_waitq); - } /* @@ -205,8 +226,8 @@ ipc_mqueue_member( kern_return_t ipc_mqueue_remove( - ipc_mqueue_t mqueue, - ipc_mqueue_t set_mqueue) + ipc_mqueue_t mqueue, + ipc_mqueue_t set_mqueue) { struct waitq *mq_waitq = &mqueue->imq_wait_queue; struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; @@ -220,14 +241,20 @@ ipc_mqueue_remove( * Remove the mqueue from all the sets it is a member of * Conditions: * Nothing locked. + * Returns: + * mqueue unlocked and set links deallocated */ void -ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue) +ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue) { struct waitq *mq_waitq = &mqueue->imq_wait_queue; + kern_return_t kr; - waitq_unlink_all(mq_waitq); - return; + imq_lock(mqueue); + + assert(waitq_valid(mq_waitq)); + kr = waitq_unlink_all_unlock(mq_waitq); + /* mqueue unlocked and set links deallocated */ } /* @@ -237,13 +264,18 @@ ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue) * Also removes the queue from any containing sets. * Conditions: * Nothing locked. + * Returns: + * mqueue unlocked all set links deallocated */ void -ipc_mqueue_remove_all(ipc_mqueue_t mqueue) +ipc_mqueue_remove_all(ipc_mqueue_t mqueue) { struct waitq_set *mq_setq = &mqueue->imq_set_queue; - waitq_set_unlink_all(mq_setq); - return; + + imq_lock(mqueue); + assert(waitqs_is_set(mq_setq)); + waitq_set_unlink_all_unlock(mq_setq); + /* mqueue unlocked set links deallocated */ } @@ -260,21 +292,20 @@ ipc_mqueue_remove_all(ipc_mqueue_t mqueue) */ kern_return_t ipc_mqueue_add( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue, - uint64_t *reserved_link, - uint64_t *reserved_prepost) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue, + uint64_t *reserved_link, + uint64_t *reserved_prepost) { struct waitq *port_waitq = &port_mqueue->imq_wait_queue; struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; ipc_kmsg_queue_t kmsgq; ipc_kmsg_t kmsg, next; - kern_return_t kr; - spl_t s; + kern_return_t kr; assert(reserved_link && *reserved_link != 0); + assert(waitqs_is_linked(set_waitq)); - s = splsched(); imq_lock(port_mqueue); /* @@ -284,7 +315,6 @@ ipc_mqueue_add( kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link); if (kr != KERN_SUCCESS) { imq_unlock(port_mqueue); - splx(s); return kr; } @@ -295,8 +325,8 @@ ipc_mqueue_add( */ kmsgq = &port_mqueue->imq_messages; for (kmsg = ipc_kmsg_queue_first(kmsgq); - kmsg != IKM_NULL; - kmsg = next) { + kmsg != IKM_NULL; + kmsg = next) { next = ipc_kmsg_queue_next(kmsgq, kmsg); for (;;) { @@ -304,15 +334,17 @@ ipc_mqueue_add( mach_msg_size_t msize; spl_t th_spl; - th = waitq_wakeup64_identity_locked( - port_waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, &th_spl, - reserved_prepost, WAITQ_KEEP_LOCKED); + th = waitq_wakeup64_identify_locked( + port_waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, &th_spl, + reserved_prepost, WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); /* waitq/mqueue still locked, thread locked */ - if (th == THREAD_NULL) + if (th == THREAD_NULL) { goto leave; + } /* * If the receiver waited with a facility not directly @@ -321,9 +353,20 @@ ipc_mqueue_add( * go look for another thread that can. */ if (th->ith_state != MACH_RCV_IN_PROGRESS) { - thread_unlock(th); - splx(th_spl); - continue; + if (th->ith_state == MACH_PEEK_IN_PROGRESS) { + /* + * wakeup the peeking thread, but + * continue to loop over the threads + * waiting on the port's mqueue to see + * if there are any actual receivers + */ + ipc_mqueue_peek_on_thread(port_mqueue, + th->ith_option, + th); + } + thread_unlock(th); + splx(th_spl); + continue; } /* @@ -335,8 +378,8 @@ ipc_mqueue_add( * just move onto the next. */ msize = ipc_kmsg_copyout_size(kmsg, th->map); - if (th->ith_msize < - (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) { + if (th->ith_rsize < + (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) { th->ith_state = MACH_RCV_TOO_LARGE; th->ith_msize = msize; if (th->ith_option & MACH_RCV_LARGE) { @@ -359,21 +402,48 @@ ipc_mqueue_add( * so give it to him. */ ipc_kmsg_rmqueue(kmsgq, kmsg); +#if MACH_FLIPC + mach_node_t node = kmsg->ikm_node; +#endif ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL); th->ith_kmsg = kmsg; th->ith_seqno = port_mqueue->imq_seqno++; thread_unlock(th); splx(th_spl); +#if MACH_FLIPC + if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) { + flipc_msg_ack(node, port_mqueue, TRUE); + } +#endif break; /* go to next message */ } } - leave: +leave: imq_unlock(port_mqueue); - splx(s); return KERN_SUCCESS; } + +/* + * Routine: ipc_mqueue_has_klist + * Purpose: + * Returns whether the given mqueue imq_klist field can be used as a klist. + */ +static inline bool +ipc_mqueue_has_klist(ipc_mqueue_t mqueue) +{ + ipc_object_t object = imq_to_object(mqueue); + if (io_otype(object) != IOT_PORT) { + return true; + } + ipc_port_t port = ip_from_mq(mqueue); + if (port->ip_specialreply) { + return false; + } + return port->ip_sync_link_state == PORT_SYNC_LINK_ANY; +} + /* * Routine: ipc_mqueue_changed * Purpose: @@ -381,27 +451,64 @@ ipc_mqueue_add( * Conditions: * The message queue is locked. */ - void ipc_mqueue_changed( - ipc_mqueue_t mqueue) + ipc_space_t space, + ipc_mqueue_t mqueue) { + if (ipc_mqueue_has_klist(mqueue) && SLIST_FIRST(&mqueue->imq_klist)) { + /* + * Indicate that this message queue is vanishing + * + * When this is called, the associated receive right may be in flight + * between two tasks: the one it used to live in, and the one that armed + * a port destroyed notification for it. + * + * The new process may want to register the port it gets back with an + * EVFILT_MACHPORT filter again, and may have pending sync IPC on this + * port pending already, in which case we want the imq_klist field to be + * reusable for nefarious purposes. + * + * Fortunately, we really don't need this linkage anymore after this + * point as EV_VANISHED / EV_EOF will be the last thing delivered ever. + * + * Note: we don't have the space lock here, however, this covers the + * case of when a task is terminating the space, triggering + * several knote_vanish() calls. + * + * We don't need the lock to observe that the space is inactive as + * we just deactivated it on the same thread. + * + * We still need to call knote_vanish() so that the knote is + * marked with EV_VANISHED or EV_EOF so that the detach step + * in filt_machportdetach is skipped correctly. + */ + assert(space); + knote_vanish(&mqueue->imq_klist, is_active(space)); + } + + if (io_otype(imq_to_object(mqueue)) == IOT_PORT) { + ipc_port_adjust_sync_link_state_locked(ip_from_mq(mqueue), PORT_SYNC_LINK_ANY, NULL); + } else { + klist_init(&mqueue->imq_klist); + } + waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - THREAD_RESTART, - NULL, - WAITQ_ALL_PRIORITIES, - WAITQ_KEEP_LOCKED); + IPC_MQUEUE_RECEIVE, + THREAD_RESTART, + NULL, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); } - + /* * Routine: ipc_mqueue_send * Purpose: * Send a message to a message queue. The message holds a reference - * for the destination port for this message queue in the + * for the destination port for this message queue in the * msgh_remote_port field. * * If unsuccessful, the caller still has possession of @@ -416,11 +523,10 @@ ipc_mqueue_changed( */ mach_msg_return_t ipc_mqueue_send( - ipc_mqueue_t mqueue, - ipc_kmsg_t kmsg, - mach_msg_option_t option, - mach_msg_timeout_t send_timeout, - spl_t s) + ipc_mqueue_t mqueue, + ipc_kmsg_t kmsg, + mach_msg_option_t option, + mach_msg_timeout_t send_timeout) { int wresult; @@ -431,70 +537,88 @@ ipc_mqueue_send( * 3) Message is sent to a send-once right. */ if (!imq_full(mqueue) || - (!imq_full_kernel(mqueue) && - ((option & MACH_SEND_ALWAYS) || - (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) == - MACH_MSG_TYPE_PORT_SEND_ONCE)))) { + (!imq_full_kernel(mqueue) && + ((option & MACH_SEND_ALWAYS) || + (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) == + MACH_MSG_TYPE_PORT_SEND_ONCE)))) { mqueue->imq_msgcount++; assert(mqueue->imq_msgcount > 0); imq_unlock(mqueue); - splx(s); } else { thread_t cur_thread = current_thread(); + ipc_port_t port = ip_from_mq(mqueue); + struct turnstile *send_turnstile = TURNSTILE_NULL; uint64_t deadline; - /* + /* * We have to wait for space to be granted to us. */ if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) { imq_unlock(mqueue); - splx(s); return MACH_SEND_TIMED_OUT; } if (imq_full_kernel(mqueue)) { imq_unlock(mqueue); - splx(s); return MACH_SEND_NO_BUFFER; } mqueue->imq_fullwaiters = TRUE; - thread_lock(cur_thread); - if (option & MACH_SEND_TIMEOUT) - clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline); - else + + if (option & MACH_SEND_TIMEOUT) { + clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline); + } else { deadline = 0; - wresult = waitq_assert_wait64_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_ABORTSAFE, - TIMEOUT_URGENCY_USER_NORMAL, - deadline, TIMEOUT_NO_LEEWAY, - cur_thread); - thread_unlock(cur_thread); + } + + thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend); + + send_turnstile = turnstile_prepare((uintptr_t)port, + port_send_turnstile_address(port), + TURNSTILE_NULL, TURNSTILE_SYNC_IPC); + + ipc_port_send_update_inheritor(port, send_turnstile, + TURNSTILE_DELAYED_UPDATE); + + wresult = waitq_assert_wait64_leeway( + &send_turnstile->ts_waitq, + IPC_MQUEUE_FULL, + THREAD_ABORTSAFE, + TIMEOUT_URGENCY_USER_NORMAL, + deadline, + TIMEOUT_NO_LEEWAY); + imq_unlock(mqueue); - splx(s); - + turnstile_update_inheritor_complete(send_turnstile, + TURNSTILE_INTERLOCK_NOT_HELD); + if (wresult == THREAD_WAITING) { wresult = thread_block(THREAD_CONTINUE_NULL); counter(c_ipc_mqueue_send_block++); } - - switch (wresult) { + /* Call turnstile complete with interlock held */ + imq_lock(mqueue); + turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC); + imq_unlock(mqueue); + + /* Call cleanup after dropping the interlock */ + turnstile_cleanup(); + + switch (wresult) { case THREAD_AWAKENED: - /* + /* * we can proceed - inherited msgcount from waker * or the message queue has been destroyed and the msgcount * has been reset to zero (will detect in ipc_mqueue_post()). */ break; - + case THREAD_TIMED_OUT: assert(option & MACH_SEND_TIMEOUT); return MACH_SEND_TIMED_OUT; - + case THREAD_INTERRUPTED: return MACH_SEND_INTERRUPTED; - + case THREAD_RESTART: /* mqueue is being destroyed */ return MACH_SEND_INVALID_DEST; @@ -503,10 +627,65 @@ ipc_mqueue_send( } } - ipc_mqueue_post(mqueue, kmsg); + ipc_mqueue_post(mqueue, kmsg, option); return MACH_MSG_SUCCESS; } +/* + * Routine: ipc_mqueue_override_send + * Purpose: + * Set an override qos on the first message in the queue + * (if the queue is full). This is a send-possible override + * that will go away as soon as we drain a message from the + * queue. + * + * Conditions: + * The message queue is not locked. + * The caller holds a reference on the message queue. + */ +extern void +ipc_mqueue_override_send( + ipc_mqueue_t mqueue, + mach_msg_priority_t override) +{ + boolean_t __unused full_queue_empty = FALSE; + + imq_lock(mqueue); + assert(imq_valid(mqueue)); + assert(!imq_is_set(mqueue)); + + if (imq_full(mqueue)) { + ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages); + + if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) { + ipc_object_t object = imq_to_object(mqueue); + assert(io_otype(object) == IOT_PORT); + ipc_port_t port = ip_object_to_port(object); + if (ip_active(port) && + port->ip_receiver_name != MACH_PORT_NULL && + is_active(port->ip_receiver) && + ipc_mqueue_has_klist(mqueue)) { + KNOTE(&mqueue->imq_klist, 0); + } + } + if (!first) { + full_queue_empty = TRUE; + } + } + imq_unlock(mqueue); + +#if DEVELOPMENT || DEBUG + if (full_queue_empty) { + ipc_port_t port = ip_from_mq(mqueue); + int dst_pid = 0; + if (ip_active(port) && !port->ip_tempowner && + port->ip_receiver_name && port->ip_receiver && + port->ip_receiver != ipc_space_kernel) { + dst_pid = task_pid(port->ip_receiver->is_task); + } + } +#endif +} /* * Routine: ipc_mqueue_release_msgcount @@ -523,26 +702,32 @@ ipc_mqueue_send( void ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq) { + struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq)); (void)set_mq; assert(imq_held(port_mq)); assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages)); port_mq->imq_msgcount--; - if (!imq_full(port_mq) && port_mq->imq_fullwaiters) { + if (!imq_full(port_mq) && port_mq->imq_fullwaiters && + send_turnstile != TURNSTILE_NULL) { /* * boost the priority of the awoken thread * (WAITQ_PROMOTE_PRIORITY) to ensure it uses * the message queue slot we've just reserved. * * NOTE: this will never prepost + * + * The wakeup happens on a turnstile waitq + * which will wakeup the highest priority waiter. + * A potential downside of this would be starving low + * priority senders if there is a constant churn of + * high priority threads trying to send to this port. */ - if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - NULL, - WAITQ_PROMOTE_PRIORITY, - WAITQ_KEEP_LOCKED) != KERN_SUCCESS) { + if (waitq_wakeup64_one(&send_turnstile->ts_waitq, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) { port_mq->imq_fullwaiters = FALSE; } else { /* gave away our slot - add reference back */ @@ -552,7 +737,7 @@ ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq) if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) { /* no more msgs: invalidate the port's prepost object */ - waitq_clear_prepost_locked(&port_mq->imq_wait_queue, NULL); + waitq_clear_prepost_locked(&port_mq->imq_wait_queue); } } @@ -569,11 +754,14 @@ ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq) */ void ipc_mqueue_post( - register ipc_mqueue_t mqueue, - register ipc_kmsg_t kmsg) + ipc_mqueue_t mqueue, + ipc_kmsg_t kmsg, + mach_msg_option_t __unused option) { - spl_t s; uint64_t reserved_prepost = 0; + boolean_t destroy_msg = FALSE; + + ipc_kmsg_trace_send(kmsg, option); /* * While the msg queue is locked, we have control of the @@ -581,28 +769,58 @@ ipc_mqueue_post( * * Check for a receiver for the message. */ - imq_reserve_and_lock(mqueue, &reserved_prepost, &s); + imq_reserve_and_lock(mqueue, &reserved_prepost); + + /* we may have raced with port destruction! */ + if (!imq_valid(mqueue)) { + destroy_msg = TRUE; + goto out_unlock; + } + for (;;) { struct waitq *waitq = &mqueue->imq_wait_queue; spl_t th_spl; thread_t receiver; mach_msg_size_t msize; - receiver = waitq_wakeup64_identity_locked(waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - &th_spl, - &reserved_prepost, - WAITQ_KEEP_LOCKED); + receiver = waitq_wakeup64_identify_locked(waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, + &th_spl, + &reserved_prepost, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); /* waitq still locked, thread locked */ if (receiver == THREAD_NULL) { - - /* - * no receivers; queue kmsg if space still reserved. + /* + * no receivers; queue kmsg if space still reserved + * Reservations are cancelled when the port goes inactive. + * note that this will enqueue the message for any + * "peeking" receivers. + * + * Also, post the knote to wake up any threads waiting + * on that style of interface if this insertion is of + * note (first insertion, or adjusted override qos all + * the way to the head of the queue). + * + * This is just for ports. portset knotes are stay-active, + * and their threads get awakened through the !MACH_RCV_IN_PROGRESS + * logic below). */ if (mqueue->imq_msgcount > 0) { - ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); + if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) { + /* if the space is dead there is no point calling KNOTE */ + ipc_object_t object = imq_to_object(mqueue); + assert(io_otype(object) == IOT_PORT); + ipc_port_t port = ip_object_to_port(object); + if (ip_active(port) && + port->ip_receiver_name != MACH_PORT_NULL && + is_active(port->ip_receiver) && + ipc_mqueue_has_klist(mqueue)) { + KNOTE(&mqueue->imq_klist, 0); + } + } break; } @@ -610,35 +828,49 @@ ipc_mqueue_post( * Otherwise, the message queue must belong to an inactive * port, so just destroy the message and pretend it was posted. */ - /* clear the waitq boost we may have been given */ - waitq_clear_promotion_locked(waitq, current_thread()); - imq_release_and_unlock(mqueue, reserved_prepost, s); - ipc_kmsg_destroy(kmsg); - current_task()->messages_sent++; - return; + destroy_msg = TRUE; + goto out_unlock; } - + /* - * If the receiver waited with a facility not directly - * related to Mach messaging, then it isn't prepared to get - * handed the message directly. Just set it running, and - * go look for another thread that can. + * If a thread is attempting a "peek" into the message queue + * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the + * thread running. A successful peek is essentially the same as + * message delivery since the peeking thread takes responsibility + * for delivering the message and (eventually) removing it from + * the mqueue. Only one thread can successfully use the peek + * facility on any given port, so we exit the waitq loop after + * encountering such a thread. + */ + if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) { + ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg); + ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver); + thread_unlock(receiver); + splx(th_spl); + break; /* Message was posted, so break out of loop */ + } + + /* + * If the receiver waited with a facility not directly related + * to Mach messaging, then it isn't prepared to get handed the + * message directly. Just set it running, and go look for + * another thread that can. */ if (receiver->ith_state != MACH_RCV_IN_PROGRESS) { - thread_unlock(receiver); - splx(th_spl); - continue; + thread_unlock(receiver); + splx(th_spl); + continue; } - + /* * We found a waiting thread. * If the message is too large or the scatter list is too small * the thread we wake up will get that as its status. */ - msize = ipc_kmsg_copyout_size(kmsg, receiver->map); - if (receiver->ith_msize < - (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) { + msize = ipc_kmsg_copyout_size(kmsg, receiver->map); + if (receiver->ith_rsize < + (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) { receiver->ith_msize = msize; receiver->ith_state = MACH_RCV_TOO_LARGE; } else { @@ -652,14 +884,22 @@ ipc_mqueue_post( */ if ((receiver->ith_state == MACH_MSG_SUCCESS) || !(receiver->ith_option & MACH_RCV_LARGE)) { - receiver->ith_kmsg = kmsg; receiver->ith_seqno = mqueue->imq_seqno++; +#if MACH_FLIPC + mach_node_t node = kmsg->ikm_node; +#endif thread_unlock(receiver); splx(th_spl); /* we didn't need our reserved spot in the queue */ ipc_mqueue_release_msgcount(mqueue, IMQ_NULL); + +#if MACH_FLIPC + if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) { + flipc_msg_ack(node, mqueue, TRUE); + } +#endif break; } @@ -675,20 +915,24 @@ ipc_mqueue_post( splx(th_spl); } +out_unlock: /* clear the waitq boost we may have been given */ waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread()); - imq_release_and_unlock(mqueue, reserved_prepost, s); - + imq_release_and_unlock(mqueue, reserved_prepost); + if (destroy_msg) { + ipc_kmsg_destroy(kmsg); + } + current_task()->messages_sent++; return; } -/* static */ void +static void ipc_mqueue_receive_results(wait_result_t saved_wait_result) { - thread_t self = current_thread(); - mach_msg_option_t option = self->ith_option; + thread_t self = current_thread(); + mach_msg_option_t option = self->ith_option; /* * why did we wake up? @@ -727,6 +971,7 @@ ipc_mqueue_receive_results(wait_result_t saved_wait_result) } case MACH_MSG_SUCCESS: + case MACH_PEEK_READY: return; default: @@ -752,22 +997,17 @@ ipc_mqueue_receive_continue( * Purpose: * Receive a message from a message queue. * - * If continuation is non-zero, then we might discard - * our kernel stack when we block. We will continue - * after unblocking by executing continuation. - * - * If resume is true, then we are resuming a receive - * operation after a blocked receive discarded our stack. * Conditions: * Our caller must hold a reference for the port or port set * to which this queue belongs, to keep the queue * from being deallocated. * * The kmsg is returned with clean header fields - * and with the circular bit turned off. + * and with the circular bit turned off through the ith_kmsg + * field of the thread's receive continuation state. * Returns: - * MACH_MSG_SUCCESS Message returned in kmsgp. - * MACH_RCV_TOO_LARGE Message size returned in kmsgp. + * MACH_MSG_SUCCESS Message returned in ith_kmsg. + * MACH_RCV_TOO_LARGE Message size returned in ith_msize. * MACH_RCV_TIMED_OUT No message obtained. * MACH_RCV_INTERRUPTED No message obtained. * MACH_RCV_PORT_DIED Port/set died; no message. @@ -784,30 +1024,35 @@ ipc_mqueue_receive( int interruptible) { wait_result_t wresult; - thread_t self = current_thread(); - - wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, - rcv_timeout, interruptible, - self); - if (wresult == THREAD_NOT_WAITING) - return; + thread_t self = current_thread(); + + imq_lock(mqueue); + wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, + rcv_timeout, interruptible, + self); + /* mqueue unlocked */ + if (wresult == THREAD_NOT_WAITING) { + return; + } if (wresult == THREAD_WAITING) { - counter((interruptible == THREAD_ABORTSAFE) ? - c_ipc_mqueue_receive_block_user++ : - c_ipc_mqueue_receive_block_kernel++); + counter((interruptible == THREAD_ABORTSAFE) ? + c_ipc_mqueue_receive_block_user++ : + c_ipc_mqueue_receive_block_kernel++); - if (self->ith_continuation) + if (self->ith_continuation) { thread_block(ipc_mqueue_receive_continue); - /* NOTREACHED */ + } + /* NOTREACHED */ wresult = thread_block(THREAD_CONTINUE_NULL); } ipc_mqueue_receive_results(wresult); } -static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, - struct waitq_set *wqset) +static int +mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, + struct waitq_set *wqset) { ipc_mqueue_t port_mq, *pmq_ptr; @@ -818,8 +1063,9 @@ static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, * If there are no messages on this queue, skip it and remove * it from the prepost list */ - if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) + if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) { return WQ_ITERATE_INVALIDATE_CONTINUE; + } /* * There are messages waiting on this port. @@ -827,14 +1073,27 @@ static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, * waitq locked. */ pmq_ptr = (ipc_mqueue_t *)ctx; - if (pmq_ptr) + if (pmq_ptr) { *pmq_ptr = port_mq; + } return WQ_ITERATE_BREAK_KEEP_LOCKED; } +/* + * Routine: ipc_mqueue_receive_on_thread + * Purpose: + * Receive a message from a message queue using a specified thread. + * If no message available, assert_wait on the appropriate waitq. + * + * Conditions: + * Assumes thread is self. + * Called with mqueue locked. + * Returns with mqueue unlocked. + * May have assert-waited. Caller must block in those cases. + */ wait_result_t ipc_mqueue_receive_on_thread( - ipc_mqueue_t mqueue, + ipc_mqueue_t mqueue, mach_msg_option_t option, mach_msg_size_t max_size, mach_msg_timeout_t rcv_timeout, @@ -842,21 +1101,30 @@ ipc_mqueue_receive_on_thread( thread_t thread) { wait_result_t wresult; - uint64_t deadline; - spl_t s; + uint64_t deadline; + struct turnstile *rcv_turnstile = TURNSTILE_NULL; + + /* called with mqueue locked */ - s = splsched(); - imq_lock(mqueue); /* no need to reserve anything: we never prepost to anyone */ - + + if (!imq_valid(mqueue)) { + /* someone raced us to destroy this mqueue/port! */ + imq_unlock(mqueue); + /* + * ipc_mqueue_receive_results updates the thread's ith_state + * TODO: differentiate between rights being moved and + * rights/ports being destroyed (21885327) + */ + return THREAD_RESTART; + } + if (imq_is_set(mqueue)) { ipc_mqueue_t port_mq = IMQ_NULL; - spl_t set_spl; (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue, - &port_mq, - mqueue_process_prepost_receive, - &set_spl); + &port_mq, + mqueue_process_prepost_receive); if (port_mq != IMQ_NULL) { /* @@ -870,42 +1138,42 @@ ipc_mqueue_receive_on_thread( */ imq_unlock(mqueue); - /* TODO: if/when port mqueues become non irq safe, - * we won't need this spl, and we should be - * able to call splx(s) (if that's even - * necessary). - * For now, we've still disabled interrupts via - * imq_reserve_and_lock(); - */ - splx(set_spl); - /* * Continue on to handling the message with just * the port mqueue locked. */ - ipc_mqueue_select_on_thread(port_mq, mqueue, option, - max_size, thread); + if (option & MACH_PEEK_MSG) { + ipc_mqueue_peek_on_thread(port_mq, option, thread); + } else { + ipc_mqueue_select_on_thread(port_mq, mqueue, option, + max_size, thread); + } imq_unlock(port_mq); - splx(s); return THREAD_NOT_WAITING; } - } else { + } else if (imq_is_queue(mqueue) || imq_is_turnstile_proxy(mqueue)) { ipc_kmsg_queue_t kmsgs; /* * Receive on a single port. Just try to get the messages. */ - kmsgs = &mqueue->imq_messages; + kmsgs = &mqueue->imq_messages; if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { - ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option, - max_size, thread); + if (option & MACH_PEEK_MSG) { + ipc_mqueue_peek_on_thread(mqueue, option, thread); + } else { + ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option, + max_size, thread); + } imq_unlock(mqueue); - splx(s); return THREAD_NOT_WAITING; } + } else { + panic("Unknown mqueue type 0x%x: likely memory corruption!\n", + mqueue->imq_wait_queue.waitq_type); } - + /* * Looks like we'll have to block. The mqueue we will * block on (whether the set's or the local port's) is @@ -914,41 +1182,113 @@ ipc_mqueue_receive_on_thread( if (option & MACH_RCV_TIMEOUT) { if (rcv_timeout == 0) { imq_unlock(mqueue); - splx(s); thread->ith_state = MACH_RCV_TIMED_OUT; return THREAD_NOT_WAITING; } } - /* NOTE: need splsched() here if mqueue no longer needs irq disabled */ - thread_lock(thread); - thread->ith_state = MACH_RCV_IN_PROGRESS; thread->ith_option = option; - thread->ith_msize = max_size; + thread->ith_rsize = max_size; + thread->ith_msize = 0; - if (option & MACH_RCV_TIMEOUT) - clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline); - else + if (option & MACH_PEEK_MSG) { + thread->ith_state = MACH_PEEK_IN_PROGRESS; + } else { + thread->ith_state = MACH_RCV_IN_PROGRESS; + } + + if (option & MACH_RCV_TIMEOUT) { + clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline); + } else { deadline = 0; + } + + /* + * Threads waiting on a reply port (not portset) + * will wait on its receive turnstile. + * + * Donate waiting thread's turnstile and + * setup inheritor for special reply port. + * Based on the state of the special reply + * port, the inheritor would be the send + * turnstile of the connection port on which + * the send of sync ipc would happen or + * workloop's turnstile who would reply to + * the sync ipc message. + * + * Pass in mqueue wait in waitq_assert_wait to + * support port set wakeup. The mqueue waitq of port + * will be converted to to turnstile waitq + * in waitq_assert_wait instead of global waitqs. + */ + if (imq_is_turnstile_proxy(mqueue)) { + ipc_port_t port = ip_from_mq(mqueue); + rcv_turnstile = turnstile_prepare((uintptr_t)port, + port_rcv_turnstile_address(port), + TURNSTILE_NULL, TURNSTILE_SYNC_IPC); + + ipc_port_recv_update_inheritor(port, rcv_turnstile, + TURNSTILE_DELAYED_UPDATE); + } + thread_set_pending_block_hint(thread, kThreadWaitPortReceive); wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - interruptible, - TIMEOUT_URGENCY_USER_NORMAL, - deadline, - TIMEOUT_NO_LEEWAY, - thread); + IPC_MQUEUE_RECEIVE, + interruptible, + TIMEOUT_URGENCY_USER_NORMAL, + deadline, + TIMEOUT_NO_LEEWAY, + thread); /* preposts should be detected above, not here */ - if (wresult == THREAD_AWAKENED) + if (wresult == THREAD_AWAKENED) { panic("ipc_mqueue_receive_on_thread: sleep walking"); + } - thread_unlock(thread); imq_unlock(mqueue); - splx(s); + + /* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */ + if (rcv_turnstile != TURNSTILE_NULL) { + turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD); + } + /* Its callers responsibility to call turnstile_complete to get the turnstile back */ + return wresult; } +/* + * Routine: ipc_mqueue_peek_on_thread + * Purpose: + * A receiver discovered that there was a message on the queue + * before he had to block. Tell a thread about the message queue, + * but don't pick off any messages. + * Conditions: + * port_mq locked + * at least one message on port_mq's message queue + * + * Returns: (on thread->ith_state) + * MACH_PEEK_READY ith_peekq contains a message queue + */ +void +ipc_mqueue_peek_on_thread( + ipc_mqueue_t port_mq, + mach_msg_option_t option, + thread_t thread) +{ + (void)option; + assert(option & MACH_PEEK_MSG); + assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL); + + /* + * Take a reference on the mqueue's associated port: + * the peeking thread will be responsible to release this reference + * using ip_release_mq() + */ + ip_reference_mq(port_mq); + thread->ith_peekq = port_mq; + thread->ith_state = MACH_PEEK_READY; +} + /* * Routine: ipc_mqueue_select_on_thread * Purpose: @@ -967,15 +1307,15 @@ ipc_mqueue_receive_on_thread( */ void ipc_mqueue_select_on_thread( - ipc_mqueue_t port_mq, - ipc_mqueue_t set_mq, - mach_msg_option_t option, - mach_msg_size_t max_size, + ipc_mqueue_t port_mq, + ipc_mqueue_t set_mq, + mach_msg_option_t option, + mach_msg_size_t max_size, thread_t thread) { ipc_kmsg_t kmsg; mach_msg_return_t mr = MACH_MSG_SUCCESS; - mach_msg_size_t rcv_size; + mach_msg_size_t msize; /* * Do some sanity checking of our ability to receive @@ -990,20 +1330,25 @@ ipc_mqueue_select_on_thread( * the queue, instead return the appropriate error * (and size needed). */ - rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map); - if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) { + msize = ipc_kmsg_copyout_size(kmsg, thread->map); + if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) { mr = MACH_RCV_TOO_LARGE; if (option & MACH_RCV_LARGE) { thread->ith_receiver_name = port_mq->imq_receiver_name; thread->ith_kmsg = IKM_NULL; - thread->ith_msize = rcv_size; + thread->ith_msize = msize; thread->ith_seqno = 0; thread->ith_state = mr; return; } } - ipc_kmsg_rmqueue_first_macro(&port_mq->imq_messages, kmsg); + ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg); +#if MACH_FLIPC + if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) { + flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE); + } +#endif ipc_mqueue_release_msgcount(port_mq, set_mq); thread->ith_seqno = port_mq->imq_seqno++; thread->ith_kmsg = kmsg; @@ -1014,7 +1359,7 @@ ipc_mqueue_select_on_thread( } /* - * Routine: ipc_mqueue_peek + * Routine: ipc_mqueue_peek_locked * Purpose: * Peek at a (non-set) message queue to see if it has a message * matching the sequence number provided (if zero, then the @@ -1022,39 +1367,39 @@ ipc_mqueue_select_on_thread( * message. * * Conditions: - * Locks may be held by callers, so this routine cannot block. + * The ipc_mqueue_t is locked by callers. + * Other locks may be held by callers, so this routine cannot block. * Caller holds reference on the message queue. */ unsigned -ipc_mqueue_peek(ipc_mqueue_t mq, - mach_port_seqno_t * seqnop, - mach_msg_size_t * msg_sizep, - mach_msg_id_t * msg_idp, - mach_msg_max_trailer_t * msg_trailerp) +ipc_mqueue_peek_locked(ipc_mqueue_t mq, + mach_port_seqno_t * seqnop, + mach_msg_size_t * msg_sizep, + mach_msg_id_t * msg_idp, + mach_msg_max_trailer_t * msg_trailerp, + ipc_kmsg_t *kmsgp) { ipc_kmsg_queue_t kmsgq; ipc_kmsg_t kmsg; mach_port_seqno_t seqno, msgoff; - int res = 0; - spl_t s; + unsigned res = 0; assert(!imq_is_set(mq)); - s = splsched(); - imq_lock(mq); - seqno = 0; - if (seqnop != NULL) + if (seqnop != NULL) { seqno = *seqnop; + } if (seqno == 0) { seqno = mq->imq_seqno; msgoff = 0; - } else if (seqno >= mq->imq_seqno && - seqno < mq->imq_seqno + mq->imq_msgcount) { + } else if (seqno >= mq->imq_seqno && + seqno < mq->imq_seqno + mq->imq_msgcount) { msgoff = seqno - mq->imq_seqno; - } else + } else { goto out; + } /* look for the message that would match that seqno */ kmsgq = &mq->imq_messages; @@ -1062,29 +1407,102 @@ ipc_mqueue_peek(ipc_mqueue_t mq, while (msgoff-- && kmsg != IKM_NULL) { kmsg = ipc_kmsg_queue_next(kmsgq, kmsg); } - if (kmsg == IKM_NULL) + if (kmsg == IKM_NULL) { goto out; + } /* found one - return the requested info */ - if (seqnop != NULL) + if (seqnop != NULL) { *seqnop = seqno; - if (msg_sizep != NULL) + } + if (msg_sizep != NULL) { *msg_sizep = kmsg->ikm_header->msgh_size; - if (msg_idp != NULL) + } + if (msg_idp != NULL) { *msg_idp = kmsg->ikm_header->msgh_id; - if (msg_trailerp != NULL) - memcpy(msg_trailerp, - (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header + - round_msg(kmsg->ikm_header->msgh_size)), - sizeof(mach_msg_max_trailer_t)); + } + if (msg_trailerp != NULL) { + memcpy(msg_trailerp, + (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header + + round_msg(kmsg->ikm_header->msgh_size)), + sizeof(mach_msg_max_trailer_t)); + } + if (kmsgp != NULL) { + *kmsgp = kmsg; + } + res = 1; - out: +out: + return res; +} + + +/* + * Routine: ipc_mqueue_peek + * Purpose: + * Peek at a (non-set) message queue to see if it has a message + * matching the sequence number provided (if zero, then the + * first message in the queue) and return vital info about the + * message. + * + * Conditions: + * The ipc_mqueue_t is unlocked. + * Locks may be held by callers, so this routine cannot block. + * Caller holds reference on the message queue. + */ +unsigned +ipc_mqueue_peek(ipc_mqueue_t mq, + mach_port_seqno_t * seqnop, + mach_msg_size_t * msg_sizep, + mach_msg_id_t * msg_idp, + mach_msg_max_trailer_t * msg_trailerp, + ipc_kmsg_t *kmsgp) +{ + unsigned res; + + imq_lock(mq); + + res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp, + msg_trailerp, kmsgp); + imq_unlock(mq); - splx(s); return res; } +/* + * Routine: ipc_mqueue_release_peek_ref + * Purpose: + * Release the reference on an mqueue's associated port which was + * granted to a thread in ipc_mqueue_peek_on_thread (on the + * MACH_PEEK_MSG thread wakeup path). + * + * Conditions: + * The ipc_mqueue_t should be locked on entry. + * The ipc_mqueue_t will be _unlocked_ on return + * (and potentially invalid!) + * + */ +void +ipc_mqueue_release_peek_ref(ipc_mqueue_t mq) +{ + assert(!imq_is_set(mq)); + assert(imq_held(mq)); + + /* + * clear any preposts this mq may have generated + * (which would cause subsequent immediate wakeups) + */ + waitq_clear_prepost_locked(&mq->imq_wait_queue); + + imq_unlock(mq); + + /* + * release the port reference: we need to do this outside the lock + * because we might be holding the last port reference! + **/ + ip_release_mq(mq); +} /* * peek at the contained port message queues, break prepost iteration as soon @@ -1093,18 +1511,19 @@ ipc_mqueue_peek(ipc_mqueue_t mq, * queue is checked. If a message wasn't there before we entered here, no need * to find it (if we do, great). */ -static int mqueue_peek_iterator(void *ctx, struct waitq *waitq, - struct waitq_set *wqset) +static int +mqueue_peek_iterator(void *ctx, struct waitq *waitq, + struct waitq_set *wqset) { ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq; ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; (void)ctx; (void)wqset; - - if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) - return WQ_ITERATE_BREAK; /* break out of the prepost iteration */ + if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { + return WQ_ITERATE_BREAK; /* break out of the prepost iteration */ + } return WQ_ITERATE_CONTINUE; } @@ -1121,20 +1540,25 @@ static int mqueue_peek_iterator(void *ctx, struct waitq *waitq, unsigned ipc_mqueue_set_peek(ipc_mqueue_t mq) { - spl_t s; int ret; - assert(imq_is_set(mq)); - - s = splsched(); imq_lock(mq); + /* + * We may have raced with port destruction where the mqueue is marked + * as invalid. In that case, even though we don't have messages, we + * have an end-of-life event to deliver. + */ + if (!imq_is_valid(mq)) { + return 1; + } + ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL, - mqueue_peek_iterator, NULL); + mqueue_peek_iterator); imq_unlock(mq); - splx(s); - return (ret == WQ_ITERATE_BREAK); + + return ret == WQ_ITERATE_BREAK; } /* @@ -1190,14 +1614,15 @@ ipc_mqueue_set_gather_member_names( /* only receive rights can be members of port sets */ if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) { - __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object); + ipc_port_t port = ip_object_to_port(entry->ie_object); ipc_mqueue_t mq = &port->ip_messages; assert(IP_VALID(port)); if (ip_active(port) && waitq_member(&mq->imq_wait_queue, wqset)) { - if (actual < maxnames) + if (actual < maxnames) { names[actual] = mq->imq_receiver_name; + } actual++; } } @@ -1211,41 +1636,38 @@ out: /* - * Routine: ipc_mqueue_destroy + * Routine: ipc_mqueue_destroy_locked * Purpose: * Destroy a (non-set) message queue. * Set any blocked senders running. - * Destroy the kmsgs in the queue. + * Destroy the kmsgs in the queue. * Conditions: - * Nothing locked. + * mqueue locked * Receivers were removed when the receive right was "changed" */ -void -ipc_mqueue_destroy( - ipc_mqueue_t mqueue) +boolean_t +ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue) { ipc_kmsg_queue_t kmqueue; ipc_kmsg_t kmsg; boolean_t reap = FALSE; - spl_t s; + struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue)); assert(!imq_is_set(mqueue)); - s = splsched(); - imq_lock(mqueue); - /* * rouse all blocked senders * (don't boost anyone - we're tearing this queue down) * (never preposts) */ mqueue->imq_fullwaiters = FALSE; - waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_RESTART, - NULL, - WAITQ_ALL_PRIORITIES, - WAITQ_KEEP_LOCKED); + + if (send_turnstile != TURNSTILE_NULL) { + waitq_wakeup64_all(&send_turnstile->ts_waitq, + IPC_MQUEUE_FULL, + THREAD_RESTART, + WAITQ_ALL_PRIORITIES); + } /* * Move messages from the specified queue to the per-thread @@ -1253,10 +1675,16 @@ ipc_mqueue_destroy( */ kmqueue = &mqueue->imq_messages; while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) { +#if MACH_FLIPC + if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) { + flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE); + } +#endif boolean_t first; first = ipc_kmsg_delayed_destroy(kmsg); - if (first) + if (first) { reap = first; + } } /* @@ -1267,26 +1695,20 @@ ipc_mqueue_destroy( */ mqueue->imq_msgcount = 0; - /* clear out any preposting we may have done */ - waitq_clear_prepost_locked(&mqueue->imq_wait_queue, &s); + /* invalidate the waitq for subsequent mqueue operations */ + waitq_invalidate_locked(&mqueue->imq_wait_queue); - imq_unlock(mqueue); - splx(s); + /* clear out any preposting we may have done */ + waitq_clear_prepost_locked(&mqueue->imq_wait_queue); /* - * assert that we're destroying a queue that's not a - * member of any other queue + * assert that we are destroying / invalidating a queue that's + * not a member of any other queue. */ - assert(mqueue->imq_wait_queue.waitq_prepost_id == 0); - assert(mqueue->imq_wait_queue.waitq_set_id == 0); - + assert(mqueue->imq_preposts == 0); + assert(mqueue->imq_in_pset == 0); - /* - * Destroy the messages we enqueued if we aren't nested - * inside some other attempt to drain the same queue. - */ - if (reap) - ipc_kmsg_reap_delayed(); + return reap; } /* @@ -1300,23 +1722,21 @@ ipc_mqueue_destroy( void ipc_mqueue_set_qlimit( - ipc_mqueue_t mqueue, - mach_port_msgcount_t qlimit) + ipc_mqueue_t mqueue, + mach_port_msgcount_t qlimit) { - spl_t s; - - assert(qlimit <= MACH_PORT_QLIMIT_MAX); + assert(qlimit <= MACH_PORT_QLIMIT_MAX); - /* wake up senders allowed by the new qlimit */ - s = splsched(); - imq_lock(mqueue); - if (qlimit > mqueue->imq_qlimit) { - mach_port_msgcount_t i, wakeup; + /* wake up senders allowed by the new qlimit */ + imq_lock(mqueue); + if (qlimit > mqueue->imq_qlimit) { + mach_port_msgcount_t i, wakeup; + struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue)); - /* caution: wakeup, qlimit are unsigned */ - wakeup = qlimit - mqueue->imq_qlimit; + /* caution: wakeup, qlimit are unsigned */ + wakeup = qlimit - mqueue->imq_qlimit; - for (i = 0; i < wakeup; i++) { + for (i = 0; i < wakeup; i++) { /* * boost the priority of the awoken thread * (WAITQ_PROMOTE_PRIORITY) to ensure it uses @@ -1324,21 +1744,19 @@ ipc_mqueue_set_qlimit( * * NOTE: this will never prepost */ - if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - NULL, - WAITQ_PROMOTE_PRIORITY, - WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) { + if (send_turnstile == TURNSTILE_NULL || + waitq_wakeup64_one(&send_turnstile->ts_waitq, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) { mqueue->imq_fullwaiters = FALSE; break; } mqueue->imq_msgcount++; /* give it to the awakened thread */ - } + } } mqueue->imq_qlimit = qlimit; imq_unlock(mqueue); - splx(s); } /* @@ -1350,16 +1768,12 @@ ipc_mqueue_set_qlimit( */ void ipc_mqueue_set_seqno( - ipc_mqueue_t mqueue, - mach_port_seqno_t seqno) + ipc_mqueue_t mqueue, + mach_port_seqno_t seqno) { - spl_t s; - - s = splsched(); imq_lock(mqueue); mqueue->imq_seqno = seqno; imq_unlock(mqueue); - splx(s); } @@ -1382,12 +1796,13 @@ ipc_mqueue_set_seqno( mach_msg_return_t ipc_mqueue_copyin( - ipc_space_t space, - mach_port_name_t name, - ipc_mqueue_t *mqueuep, - ipc_object_t *objectp) + ipc_space_t space, + mach_port_name_t name, + ipc_mqueue_t *mqueuep, + ipc_object_t *objectp) { ipc_entry_t entry; + ipc_entry_bits_t bits; ipc_object_t object; ipc_mqueue_t mqueue; @@ -1403,35 +1818,36 @@ ipc_mqueue_copyin( return MACH_RCV_INVALID_NAME; } + bits = entry->ie_bits; object = entry->ie_object; - if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { - ipc_port_t port; + if (bits & MACH_PORT_TYPE_RECEIVE) { + ipc_port_t port = ip_object_to_port(object); - __IGNORE_WCASTALIGN(port = (ipc_port_t) object); assert(port != IP_NULL); ip_lock(port); - assert(ip_active(port)); + require_ip_active(port); assert(port->ip_receiver_name == name); assert(port->ip_receiver == space); is_read_unlock(space); mqueue = &port->ip_messages; + } else if (bits & MACH_PORT_TYPE_PORT_SET) { + ipc_pset_t pset = ips_object_to_pset(object); - } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) { - ipc_pset_t pset; - - __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object); assert(pset != IPS_NULL); ips_lock(pset); assert(ips_active(pset)); - assert(pset->ips_local_name == name); is_read_unlock(space); mqueue = &pset->ips_messages; } else { is_read_unlock(space); + /* guard exception if we never held the receive right in this entry */ + if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) { + mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME); + } return MACH_RCV_INVALID_NAME; } @@ -1447,3 +1863,19 @@ ipc_mqueue_copyin( *mqueuep = mqueue; return MACH_MSG_SUCCESS; } + +void +imq_lock(ipc_mqueue_t mq) +{ + ipc_object_t object = imq_to_object(mq); + ipc_object_validate(object); + waitq_lock(&(mq)->imq_wait_queue); +} + +unsigned int +imq_lock_try(ipc_mqueue_t mq) +{ + ipc_object_t object = imq_to_object(mq); + ipc_object_validate(object); + return waitq_lock_try(&(mq)->imq_wait_queue); +}