* Copyright (c) 2000-2007 Apple Inc. All rights reserved.
*
* @APPLE_OSREFERENCE_LICENSE_HEADER_START@
- *
+ *
* This file contains Original Code and/or Modifications of Original Code
* as defined in and that are subject to the Apple Public Source License
* Version 2.0 (the 'License'). You may not use this file except in
* unlawful or unlicensed copies of an Apple operating system, or to
* circumvent, violate, or enable the circumvention or violation of, any
* terms of an Apple operating system software license agreement.
- *
+ *
* Please obtain a copy of the License at
* http://www.opensource.apple.com/apsl/ and read it before using this file.
- *
+ *
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
* Please see the License for the specific language governing rights and
* limitations under the License.
- *
+ *
* @APPLE_OSREFERENCE_LICENSE_HEADER_END@
*/
/*
* @OSF_FREE_COPYRIGHT@
*/
-/*
+/*
* Mach Operating System
* Copyright (c) 1991,1990,1989 Carnegie Mellon University
* All Rights Reserved.
- *
+ *
* Permission to use, copy, modify and distribute this software and its
* documentation is hereby granted, provided that both the copyright
* notice and this permission notice appear in all copies of the
* software, derivative works or modified versions, and any portions
* thereof, and that both notices appear in supporting documentation.
- *
+ *
* CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
* CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
* ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
- *
+ *
* Carnegie Mellon requests users of this software to return to
- *
+ *
* Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
* School of Computer Science
* Carnegie Mellon University
* Pittsburgh PA 15213-3890
- *
+ *
* any improvements or extensions that they make and grant Carnegie Mellon
* the rights to redistribute these changes.
*/
* is included in support of clause 2.2 (b) of the Apple Public License,
* Version 2.0.
*/
-
+
#include <mach/port.h>
#include <mach/message.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
-#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
+#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
-#include <kern/wait_queue.h>
+#include <kern/waitq.h>
+#include <ipc/port.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>
+#if MACH_FLIPC
+#include <ipc/flipc.h>
+#endif
+
#ifdef __LP64__
#include <vm/vm_map.h>
#endif
-int ipc_mqueue_full; /* address is event for queue space */
-int ipc_mqueue_rcv; /* address is event for message arrival */
+#include <sys/event.h>
+
+extern char *proc_name_address(void *p);
+
+int ipc_mqueue_full; /* address is event for queue space */
+int ipc_mqueue_rcv; /* address is event for message arrival */
/* forward declarations */
-void ipc_mqueue_receive_results(wait_result_t result);
+static void ipc_mqueue_receive_results(wait_result_t result);
+static void ipc_mqueue_peek_on_thread(
+ ipc_mqueue_t port_mq,
+ mach_msg_option_t option,
+ thread_t thread);
/*
* Routine: ipc_mqueue_init
*/
void
ipc_mqueue_init(
- ipc_mqueue_t mqueue,
- boolean_t is_set)
+ ipc_mqueue_t mqueue,
+ ipc_mqueue_kind_t kind)
{
- if (is_set) {
- wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
- } else {
- wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+ switch (kind) {
+ case IPC_MQUEUE_KIND_SET:
+ waitq_set_init(&mqueue->imq_set_queue,
+ SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
+ NULL, NULL);
+ break;
+ case IPC_MQUEUE_KIND_NONE: /* cheat: we really should have "no" mqueue */
+ case IPC_MQUEUE_KIND_PORT:
+ waitq_init(&mqueue->imq_wait_queue,
+ SYNC_POLICY_FIFO | SYNC_POLICY_TURNSTILE_PROXY);
ipc_kmsg_queue_init(&mqueue->imq_messages);
mqueue->imq_seqno = 0;
mqueue->imq_msgcount = 0;
mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
+ mqueue->imq_context = 0;
mqueue->imq_fullwaiters = FALSE;
+#if MACH_FLIPC
+ mqueue->imq_fport = FPORT_NULL;
+#endif
+ break;
+ }
+ klist_init(&mqueue->imq_klist);
+}
+
+void
+ipc_mqueue_deinit(
+ ipc_mqueue_t mqueue)
+{
+ boolean_t is_set = imq_is_set(mqueue);
+
+ if (is_set) {
+ waitq_set_deinit(&mqueue->imq_set_queue);
+ } else {
+ waitq_deinit(&mqueue->imq_wait_queue);
}
}
+/*
+ * Routine: imq_reserve_and_lock
+ * Purpose:
+ * Atomically lock an ipc_mqueue_t object and reserve
+ * an appropriate number of prepost linkage objects for
+ * use in wakeup operations.
+ * Conditions:
+ * mq is unlocked
+ */
+void
+imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
+{
+ *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
+ WAITQ_KEEP_LOCKED);
+}
+
+
+/*
+ * Routine: imq_release_and_unlock
+ * Purpose:
+ * Unlock an ipc_mqueue_t object, re-enable interrupts,
+ * and release any unused prepost object reservations.
+ * Conditions:
+ * mq is locked
+ */
+void
+imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost)
+{
+ assert(imq_held(mq));
+ waitq_unlock(&mq->imq_wait_queue);
+ waitq_prepost_release_reserve(reserved_prepost);
+}
+
+
/*
* Routine: ipc_mqueue_member
* Purpose:
boolean_t
ipc_mqueue_member(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue)
{
- wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
-
- return (wait_queue_member(port_waitq, set_waitq));
+ struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
+ return waitq_member(port_waitq, set_waitq);
}
/*
kern_return_t
ipc_mqueue_remove(
- ipc_mqueue_t mqueue,
- ipc_mqueue_t set_mqueue,
- wait_queue_link_t *wqlp)
+ ipc_mqueue_t mqueue,
+ ipc_mqueue_t set_mqueue)
{
- wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+ struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
- return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
+ return waitq_unlink(mq_waitq, set_waitq);
}
/*
* Remove the mqueue from all the sets it is a member of
* Conditions:
* Nothing locked.
+ * Returns:
+ * mqueue unlocked and set links deallocated
*/
void
-ipc_mqueue_remove_from_all(
- ipc_mqueue_t mqueue,
- queue_t links)
+ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
- wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
+ struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+ kern_return_t kr;
- wait_queue_unlink_all_nofree(mq_waitq, links);
- return;
+ imq_lock(mqueue);
+
+ assert(waitq_valid(mq_waitq));
+ kr = waitq_unlink_all_unlock(mq_waitq);
+ /* mqueue unlocked and set links deallocated */
}
/*
* Routine: ipc_mqueue_remove_all
* Purpose:
* Remove all the member queues from the specified set.
+ * Also removes the queue from any containing sets.
* Conditions:
* Nothing locked.
+ * Returns:
+ * mqueue unlocked all set links deallocated
*/
void
-ipc_mqueue_remove_all(
- ipc_mqueue_t mqueue,
- queue_t links)
+ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
- wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
+ struct waitq_set *mq_setq = &mqueue->imq_set_queue;
- wait_queue_set_unlink_all_nofree(mq_setq, links);
- return;
+ imq_lock(mqueue);
+ assert(waitqs_is_set(mq_setq));
+ waitq_set_unlink_all_unlock(mq_setq);
+ /* mqueue unlocked set links deallocated */
}
*/
kern_return_t
ipc_mqueue_add(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue,
- wait_queue_link_t wql)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue,
+ uint64_t *reserved_link,
+ uint64_t *reserved_prepost)
{
- wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+ struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
- kern_return_t kr;
- spl_t s;
+ kern_return_t kr;
+
+ assert(reserved_link && *reserved_link != 0);
+ assert(waitqs_is_linked(set_waitq));
+
+ imq_lock(port_mqueue);
- kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
- if (kr != KERN_SUCCESS)
+ /*
+ * The link operation is now under the same lock-hold as
+ * message iteration and thread wakeup, but doesn't have to be...
+ */
+ kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
+ if (kr != KERN_SUCCESS) {
+ imq_unlock(port_mqueue);
return kr;
+ }
/*
* Now that the set has been added to the port, there may be
* messages queued on the port and threads waiting on the set
* waitq. Lets get them together.
*/
- s = splsched();
- imq_lock(port_mqueue);
kmsgq = &port_mqueue->imq_messages;
for (kmsg = ipc_kmsg_queue_first(kmsgq);
- kmsg != IKM_NULL;
- kmsg = next) {
+ kmsg != IKM_NULL;
+ kmsg = next) {
next = ipc_kmsg_queue_next(kmsgq, kmsg);
for (;;) {
thread_t th;
mach_msg_size_t msize;
+ spl_t th_spl;
- th = wait_queue_wakeup64_identity_locked(
- port_waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ th = waitq_wakeup64_identify_locked(
+ port_waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED, &th_spl,
+ reserved_prepost, WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
/* waitq/mqueue still locked, thread locked */
- if (th == THREAD_NULL)
+ if (th == THREAD_NULL) {
goto leave;
+ }
/*
* If the receiver waited with a facility not directly
* go look for another thread that can.
*/
if (th->ith_state != MACH_RCV_IN_PROGRESS) {
- thread_unlock(th);
- continue;
+ if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
+ /*
+ * wakeup the peeking thread, but
+ * continue to loop over the threads
+ * waiting on the port's mqueue to see
+ * if there are any actual receivers
+ */
+ ipc_mqueue_peek_on_thread(port_mqueue,
+ th->ith_option,
+ th);
+ }
+ thread_unlock(th);
+ splx(th_spl);
+ continue;
}
/*
* just move onto the next.
*/
msize = ipc_kmsg_copyout_size(kmsg, th->map);
- if (th->ith_msize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
+ if (th->ith_rsize <
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
th->ith_state = MACH_RCV_TOO_LARGE;
th->ith_msize = msize;
if (th->ith_option & MACH_RCV_LARGE) {
th->ith_kmsg = IKM_NULL;
th->ith_seqno = 0;
thread_unlock(th);
+ splx(th_spl);
continue; /* find another thread */
}
} else {
* so give it to him.
*/
ipc_kmsg_rmqueue(kmsgq, kmsg);
- ipc_mqueue_release_msgcount(port_mqueue);
+#if MACH_FLIPC
+ mach_node_t node = kmsg->ikm_node;
+#endif
+ ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
+ splx(th_spl);
+#if MACH_FLIPC
+ if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
+ flipc_msg_ack(node, port_mqueue, TRUE);
+ }
+#endif
break; /* go to next message */
}
-
}
- leave:
+leave:
imq_unlock(port_mqueue);
- splx(s);
return KERN_SUCCESS;
}
+
+/*
+ * Routine: ipc_mqueue_has_klist
+ * Purpose:
+ * Returns whether the given mqueue imq_klist field can be used as a klist.
+ */
+static inline bool
+ipc_mqueue_has_klist(ipc_mqueue_t mqueue)
+{
+ ipc_object_t object = imq_to_object(mqueue);
+ if (io_otype(object) != IOT_PORT) {
+ return true;
+ }
+ ipc_port_t port = ip_from_mq(mqueue);
+ if (port->ip_specialreply) {
+ return false;
+ }
+ return port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
+}
+
/*
* Routine: ipc_mqueue_changed
* Purpose:
* Conditions:
* The message queue is locked.
*/
-
void
ipc_mqueue_changed(
- ipc_mqueue_t mqueue)
+ ipc_space_t space,
+ ipc_mqueue_t mqueue)
{
- wait_queue_wakeup64_all_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- THREAD_RESTART,
- FALSE); /* unlock waitq? */
+ if (ipc_mqueue_has_klist(mqueue) && SLIST_FIRST(&mqueue->imq_klist)) {
+ /*
+ * Indicate that this message queue is vanishing
+ *
+ * When this is called, the associated receive right may be in flight
+ * between two tasks: the one it used to live in, and the one that armed
+ * a port destroyed notification for it.
+ *
+ * The new process may want to register the port it gets back with an
+ * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
+ * port pending already, in which case we want the imq_klist field to be
+ * reusable for nefarious purposes.
+ *
+ * Fortunately, we really don't need this linkage anymore after this
+ * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
+ *
+ * Note: we don't have the space lock here, however, this covers the
+ * case of when a task is terminating the space, triggering
+ * several knote_vanish() calls.
+ *
+ * We don't need the lock to observe that the space is inactive as
+ * we just deactivated it on the same thread.
+ *
+ * We still need to call knote_vanish() so that the knote is
+ * marked with EV_VANISHED or EV_EOF so that the detach step
+ * in filt_machportdetach is skipped correctly.
+ */
+ assert(space);
+ knote_vanish(&mqueue->imq_klist, is_active(space));
+ }
+
+ if (io_otype(imq_to_object(mqueue)) == IOT_PORT) {
+ ipc_port_adjust_sync_link_state_locked(ip_from_mq(mqueue), PORT_SYNC_LINK_ANY, NULL);
+ } else {
+ klist_init(&mqueue->imq_klist);
+ }
+
+ waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_RESTART,
+ NULL,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
}
-
+
/*
* Routine: ipc_mqueue_send
* Purpose:
* Send a message to a message queue. The message holds a reference
- * for the destination port for this message queue in the
+ * for the destination port for this message queue in the
* msgh_remote_port field.
*
* If unsuccessful, the caller still has possession of
*/
mach_msg_return_t
ipc_mqueue_send(
- ipc_mqueue_t mqueue,
- ipc_kmsg_t kmsg,
- mach_msg_option_t option,
- mach_msg_timeout_t send_timeout,
- spl_t s)
+ ipc_mqueue_t mqueue,
+ ipc_kmsg_t kmsg,
+ mach_msg_option_t option,
+ mach_msg_timeout_t send_timeout)
{
int wresult;
* 3) Message is sent to a send-once right.
*/
if (!imq_full(mqueue) ||
- (!imq_full_kernel(mqueue) &&
- ((option & MACH_SEND_ALWAYS) ||
- (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
- MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
+ (!imq_full_kernel(mqueue) &&
+ ((option & MACH_SEND_ALWAYS) ||
+ (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
+ MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
mqueue->imq_msgcount++;
assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
- splx(s);
} else {
thread_t cur_thread = current_thread();
+ ipc_port_t port = ip_from_mq(mqueue);
+ struct turnstile *send_turnstile = TURNSTILE_NULL;
uint64_t deadline;
- /*
+ /*
* We have to wait for space to be granted to us.
*/
if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
imq_unlock(mqueue);
- splx(s);
return MACH_SEND_TIMED_OUT;
}
if (imq_full_kernel(mqueue)) {
imq_unlock(mqueue);
- splx(s);
return MACH_SEND_NO_BUFFER;
}
mqueue->imq_fullwaiters = TRUE;
- thread_lock(cur_thread);
- if (option & MACH_SEND_TIMEOUT)
- clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
- else
+
+ if (option & MACH_SEND_TIMEOUT) {
+ clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
+ } else {
deadline = 0;
- wresult = wait_queue_assert_wait64_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline, 0,
- cur_thread);
- thread_unlock(cur_thread);
+ }
+
+ thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
+
+ send_turnstile = turnstile_prepare((uintptr_t)port,
+ port_send_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+ ipc_port_send_update_inheritor(port, send_turnstile,
+ TURNSTILE_DELAYED_UPDATE);
+
+ wresult = waitq_assert_wait64_leeway(
+ &send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_ABORTSAFE,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY);
+
imq_unlock(mqueue);
- splx(s);
-
+ turnstile_update_inheritor_complete(send_turnstile,
+ TURNSTILE_INTERLOCK_NOT_HELD);
+
if (wresult == THREAD_WAITING) {
wresult = thread_block(THREAD_CONTINUE_NULL);
counter(c_ipc_mqueue_send_block++);
}
-
+
+ /* Call turnstile complete with interlock held */
+ imq_lock(mqueue);
+ turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
+ imq_unlock(mqueue);
+
+ /* Call cleanup after dropping the interlock */
+ turnstile_cleanup();
+
switch (wresult) {
+ case THREAD_AWAKENED:
+ /*
+ * we can proceed - inherited msgcount from waker
+ * or the message queue has been destroyed and the msgcount
+ * has been reset to zero (will detect in ipc_mqueue_post()).
+ */
+ break;
+
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
-
- case THREAD_AWAKENED:
- /* we can proceed - inherited msgcount from waker */
- assert(mqueue->imq_msgcount > 0);
- break;
-
+
case THREAD_INTERRUPTED:
return MACH_SEND_INTERRUPTED;
-
+
case THREAD_RESTART:
/* mqueue is being destroyed */
return MACH_SEND_INVALID_DEST;
}
}
- ipc_mqueue_post(mqueue, kmsg);
+ ipc_mqueue_post(mqueue, kmsg, option);
return MACH_MSG_SUCCESS;
}
+/*
+ * Routine: ipc_mqueue_override_send
+ * Purpose:
+ * Set an override qos on the first message in the queue
+ * (if the queue is full). This is a send-possible override
+ * that will go away as soon as we drain a message from the
+ * queue.
+ *
+ * Conditions:
+ * The message queue is not locked.
+ * The caller holds a reference on the message queue.
+ */
+extern void
+ipc_mqueue_override_send(
+ ipc_mqueue_t mqueue,
+ mach_msg_priority_t override)
+{
+ boolean_t __unused full_queue_empty = FALSE;
+
+ imq_lock(mqueue);
+ assert(imq_valid(mqueue));
+ assert(!imq_is_set(mqueue));
+
+ if (imq_full(mqueue)) {
+ ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
+
+ if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
+ ipc_object_t object = imq_to_object(mqueue);
+ assert(io_otype(object) == IOT_PORT);
+ ipc_port_t port = ip_object_to_port(object);
+ if (ip_active(port) &&
+ port->ip_receiver_name != MACH_PORT_NULL &&
+ is_active(port->ip_receiver) &&
+ ipc_mqueue_has_klist(mqueue)) {
+ KNOTE(&mqueue->imq_klist, 0);
+ }
+ }
+ if (!first) {
+ full_queue_empty = TRUE;
+ }
+ }
+ imq_unlock(mqueue);
+
+#if DEVELOPMENT || DEBUG
+ if (full_queue_empty) {
+ ipc_port_t port = ip_from_mq(mqueue);
+ int dst_pid = 0;
+ if (ip_active(port) && !port->ip_tempowner &&
+ port->ip_receiver_name && port->ip_receiver &&
+ port->ip_receiver != ipc_space_kernel) {
+ dst_pid = task_pid(port->ip_receiver->is_task);
+ }
+ }
+#endif
+}
/*
* Routine: ipc_mqueue_release_msgcount
* Conditions:
* The message queue is locked.
* The message corresponding to this reference is off the queue.
+ * There is no need to pass reserved preposts because this will
+ * never prepost to anyone
*/
void
-ipc_mqueue_release_msgcount(
- ipc_mqueue_t mqueue)
+ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
- assert(imq_held(mqueue));
- assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
-
- mqueue->imq_msgcount--;
-
- if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
- if (wait_queue_wakeup64_one_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) != KERN_SUCCESS) {
- mqueue->imq_fullwaiters = FALSE;
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
+ (void)set_mq;
+ assert(imq_held(port_mq));
+ assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
+
+ port_mq->imq_msgcount--;
+
+ if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
+ send_turnstile != TURNSTILE_NULL) {
+ /*
+ * boost the priority of the awoken thread
+ * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+ * the message queue slot we've just reserved.
+ *
+ * NOTE: this will never prepost
+ *
+ * The wakeup happens on a turnstile waitq
+ * which will wakeup the highest priority waiter.
+ * A potential downside of this would be starving low
+ * priority senders if there is a constant churn of
+ * high priority threads trying to send to this port.
+ */
+ if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
+ port_mq->imq_fullwaiters = FALSE;
} else {
/* gave away our slot - add reference back */
- mqueue->imq_msgcount++;
+ port_mq->imq_msgcount++;
}
}
+
+ if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+ /* no more msgs: invalidate the port's prepost object */
+ waitq_clear_prepost_locked(&port_mq->imq_wait_queue);
+ }
}
/*
* the message queue.
*
* Conditions:
+ * mqueue is unlocked
* If we need to queue, our space in the message queue is reserved.
*/
void
ipc_mqueue_post(
- register ipc_mqueue_t mqueue,
- register ipc_kmsg_t kmsg)
+ ipc_mqueue_t mqueue,
+ ipc_kmsg_t kmsg,
+ mach_msg_option_t __unused option)
{
- spl_t s;
+ uint64_t reserved_prepost = 0;
+ boolean_t destroy_msg = FALSE;
+
+ ipc_kmsg_trace_send(kmsg, option);
/*
* While the msg queue is locked, we have control of the
*
* Check for a receiver for the message.
*/
- s = splsched();
- imq_lock(mqueue);
+ imq_reserve_and_lock(mqueue, &reserved_prepost);
+
+ /* we may have raced with port destruction! */
+ if (!imq_valid(mqueue)) {
+ destroy_msg = TRUE;
+ goto out_unlock;
+ }
+
for (;;) {
- wait_queue_t waitq = &mqueue->imq_wait_queue;
+ struct waitq *waitq = &mqueue->imq_wait_queue;
+ spl_t th_spl;
thread_t receiver;
mach_msg_size_t msize;
- receiver = wait_queue_wakeup64_identity_locked(
- waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ receiver = waitq_wakeup64_identify_locked(waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ &th_spl,
+ &reserved_prepost,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
/* waitq still locked, thread locked */
if (receiver == THREAD_NULL) {
- /*
- * no receivers; queue kmsg
+ /*
+ * no receivers; queue kmsg if space still reserved
+ * Reservations are cancelled when the port goes inactive.
+ * note that this will enqueue the message for any
+ * "peeking" receivers.
+ *
+ * Also, post the knote to wake up any threads waiting
+ * on that style of interface if this insertion is of
+ * note (first insertion, or adjusted override qos all
+ * the way to the head of the queue).
+ *
+ * This is just for ports. portset knotes are stay-active,
+ * and their threads get awakened through the !MACH_RCV_IN_PROGRESS
+ * logic below).
*/
- assert(mqueue->imq_msgcount > 0);
- ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
- break;
+ if (mqueue->imq_msgcount > 0) {
+ if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
+ /* if the space is dead there is no point calling KNOTE */
+ ipc_object_t object = imq_to_object(mqueue);
+ assert(io_otype(object) == IOT_PORT);
+ ipc_port_t port = ip_object_to_port(object);
+ if (ip_active(port) &&
+ port->ip_receiver_name != MACH_PORT_NULL &&
+ is_active(port->ip_receiver) &&
+ ipc_mqueue_has_klist(mqueue)) {
+ KNOTE(&mqueue->imq_klist, 0);
+ }
+ }
+ break;
+ }
+
+ /*
+ * Otherwise, the message queue must belong to an inactive
+ * port, so just destroy the message and pretend it was posted.
+ */
+ destroy_msg = TRUE;
+ goto out_unlock;
}
-
+
/*
- * If the receiver waited with a facility not directly
- * related to Mach messaging, then it isn't prepared to get
- * handed the message directly. Just set it running, and
- * go look for another thread that can.
+ * If a thread is attempting a "peek" into the message queue
+ * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
+ * thread running. A successful peek is essentially the same as
+ * message delivery since the peeking thread takes responsibility
+ * for delivering the message and (eventually) removing it from
+ * the mqueue. Only one thread can successfully use the peek
+ * facility on any given port, so we exit the waitq loop after
+ * encountering such a thread.
+ */
+ if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
+ ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
+ ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver);
+ thread_unlock(receiver);
+ splx(th_spl);
+ break; /* Message was posted, so break out of loop */
+ }
+
+ /*
+ * If the receiver waited with a facility not directly related
+ * to Mach messaging, then it isn't prepared to get handed the
+ * message directly. Just set it running, and go look for
+ * another thread that can.
*/
if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
- thread_unlock(receiver);
- continue;
+ thread_unlock(receiver);
+ splx(th_spl);
+ continue;
}
-
+
/*
* We found a waiting thread.
* If the message is too large or the scatter list is too small
* the thread we wake up will get that as its status.
*/
- msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
- if (receiver->ith_msize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
+ msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
+ if (receiver->ith_rsize <
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
receiver->ith_msize = msize;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
*/
if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
!(receiver->ith_option & MACH_RCV_LARGE)) {
-
receiver->ith_kmsg = kmsg;
receiver->ith_seqno = mqueue->imq_seqno++;
+#if MACH_FLIPC
+ mach_node_t node = kmsg->ikm_node;
+#endif
thread_unlock(receiver);
+ splx(th_spl);
/* we didn't need our reserved spot in the queue */
- ipc_mqueue_release_msgcount(mqueue);
+ ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
+
+#if MACH_FLIPC
+ if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
+ flipc_msg_ack(node, mqueue, TRUE);
+ }
+#endif
break;
}
receiver->ith_kmsg = IKM_NULL;
receiver->ith_seqno = 0;
thread_unlock(receiver);
+ splx(th_spl);
+ }
+
+out_unlock:
+ /* clear the waitq boost we may have been given */
+ waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
+ imq_release_and_unlock(mqueue, reserved_prepost);
+ if (destroy_msg) {
+ ipc_kmsg_destroy(kmsg);
}
- imq_unlock(mqueue);
- splx(s);
-
current_task()->messages_sent++;
return;
}
-/* static */ void
+static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
- thread_t self = current_thread();
- mach_msg_option_t option = self->ith_option;
+ thread_t self = current_thread();
+ mach_msg_option_t option = self->ith_option;
/*
* why did we wake up?
}
case MACH_MSG_SUCCESS:
+ case MACH_PEEK_READY:
return;
default:
* Purpose:
* Receive a message from a message queue.
*
- * If continuation is non-zero, then we might discard
- * our kernel stack when we block. We will continue
- * after unblocking by executing continuation.
- *
- * If resume is true, then we are resuming a receive
- * operation after a blocked receive discarded our stack.
* Conditions:
* Our caller must hold a reference for the port or port set
* to which this queue belongs, to keep the queue
* from being deallocated.
*
* The kmsg is returned with clean header fields
- * and with the circular bit turned off.
+ * and with the circular bit turned off through the ith_kmsg
+ * field of the thread's receive continuation state.
* Returns:
- * MACH_MSG_SUCCESS Message returned in kmsgp.
- * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
+ * MACH_MSG_SUCCESS Message returned in ith_kmsg.
+ * MACH_RCV_TOO_LARGE Message size returned in ith_msize.
* MACH_RCV_TIMED_OUT No message obtained.
* MACH_RCV_INTERRUPTED No message obtained.
* MACH_RCV_PORT_DIED Port/set died; no message.
int interruptible)
{
wait_result_t wresult;
- thread_t self = current_thread();
-
- wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
- rcv_timeout, interruptible,
- self);
- if (wresult == THREAD_NOT_WAITING)
- return;
+ thread_t self = current_thread();
+
+ imq_lock(mqueue);
+ wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
+ rcv_timeout, interruptible,
+ self);
+ /* mqueue unlocked */
+ if (wresult == THREAD_NOT_WAITING) {
+ return;
+ }
if (wresult == THREAD_WAITING) {
- counter((interruptible == THREAD_ABORTSAFE) ?
- c_ipc_mqueue_receive_block_user++ :
- c_ipc_mqueue_receive_block_kernel++);
+ counter((interruptible == THREAD_ABORTSAFE) ?
+ c_ipc_mqueue_receive_block_user++ :
+ c_ipc_mqueue_receive_block_kernel++);
- if (self->ith_continuation)
+ if (self->ith_continuation) {
thread_block(ipc_mqueue_receive_continue);
- /* NOTREACHED */
+ }
+ /* NOTREACHED */
wresult = thread_block(THREAD_CONTINUE_NULL);
}
ipc_mqueue_receive_results(wresult);
}
+static int
+mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
+{
+ ipc_mqueue_t port_mq, *pmq_ptr;
+
+ (void)wqset;
+ port_mq = (ipc_mqueue_t)waitq;
+
+ /*
+ * If there are no messages on this queue, skip it and remove
+ * it from the prepost list
+ */
+ if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+ return WQ_ITERATE_INVALIDATE_CONTINUE;
+ }
+
+ /*
+ * There are messages waiting on this port.
+ * Instruct the prepost iteration logic to break, but keep the
+ * waitq locked.
+ */
+ pmq_ptr = (ipc_mqueue_t *)ctx;
+ if (pmq_ptr) {
+ *pmq_ptr = port_mq;
+ }
+ return WQ_ITERATE_BREAK_KEEP_LOCKED;
+}
+
+/*
+ * Routine: ipc_mqueue_receive_on_thread
+ * Purpose:
+ * Receive a message from a message queue using a specified thread.
+ * If no message available, assert_wait on the appropriate waitq.
+ *
+ * Conditions:
+ * Assumes thread is self.
+ * Called with mqueue locked.
+ * Returns with mqueue unlocked.
+ * May have assert-waited. Caller must block in those cases.
+ */
wait_result_t
ipc_mqueue_receive_on_thread(
- ipc_mqueue_t mqueue,
+ ipc_mqueue_t mqueue,
mach_msg_option_t option,
mach_msg_size_t max_size,
mach_msg_timeout_t rcv_timeout,
int interruptible,
thread_t thread)
{
- ipc_kmsg_queue_t kmsgs;
wait_result_t wresult;
- uint64_t deadline;
- spl_t s;
+ uint64_t deadline;
+ struct turnstile *rcv_turnstile = TURNSTILE_NULL;
- s = splsched();
- imq_lock(mqueue);
-
- if (imq_is_set(mqueue)) {
- queue_t q;
+ /* called with mqueue locked */
- q = &mqueue->imq_preposts;
+ /* no need to reserve anything: we never prepost to anyone */
+ if (!imq_valid(mqueue)) {
+ /* someone raced us to destroy this mqueue/port! */
+ imq_unlock(mqueue);
/*
- * If we are waiting on a portset mqueue, we need to see if
- * any of the member ports have work for us. Ports that
- * have (or recently had) messages will be linked in the
- * prepost queue for the portset. By holding the portset's
- * mqueue lock during the search, we tie up any attempts by
- * mqueue_deliver or portset membership changes that may
- * cross our path.
+ * ipc_mqueue_receive_results updates the thread's ith_state
+ * TODO: differentiate between rights being moved and
+ * rights/ports being destroyed (21885327)
*/
- search_set:
- while(!queue_empty(q)) {
- wait_queue_link_t wql;
- ipc_mqueue_t port_mq;
-
- queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
- assert(!wql_is_preposted(wql));
+ return THREAD_RESTART;
+ }
- /*
- * This is a lock order violation, so we have to do it
- * "softly," putting the link back on the prepost list
- * if it fails (at the tail is fine since the order of
- * handling messages from different sources in a set is
- * not guaranteed and we'd like to skip to the next source
- * if one is available).
- */
- port_mq = (ipc_mqueue_t)wql->wql_queue;
- if (!imq_lock_try(port_mq)) {
- queue_enter(q, wql, wait_queue_link_t, wql_preposts);
- imq_unlock(mqueue);
- splx(s);
- mutex_pause(0);
- s = splsched();
- imq_lock(mqueue);
- goto search_set; /* start again at beginning - SMP */
- }
+ if (imq_is_set(mqueue)) {
+ ipc_mqueue_t port_mq = IMQ_NULL;
- /*
- * If there are no messages on this queue, just skip it
- * (we already removed the link from the set's prepost queue).
- */
- kmsgs = &port_mq->imq_messages;
- if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
- imq_unlock(port_mq);
- continue;
- }
+ (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
+ &port_mq,
+ mqueue_process_prepost_receive);
+ if (port_mq != IMQ_NULL) {
/*
- * There are messages, so reinsert the link back
- * at the tail of the preposted queue (for fairness)
- * while we still have the portset mqueue locked.
+ * We get here if there is at least one message
+ * waiting on port_mq. We have instructed the prepost
+ * iteration logic to leave both the port_mq and the
+ * set mqueue locked.
+ *
+ * TODO: previously, we would place this port at the
+ * back of the prepost list...
*/
- queue_enter(q, wql, wait_queue_link_t, wql_preposts);
imq_unlock(mqueue);
/*
* Continue on to handling the message with just
* the port mqueue locked.
*/
- ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
+ if (option & MACH_PEEK_MSG) {
+ ipc_mqueue_peek_on_thread(port_mq, option, thread);
+ } else {
+ ipc_mqueue_select_on_thread(port_mq, mqueue, option,
+ max_size, thread);
+ }
+
imq_unlock(port_mq);
- splx(s);
return THREAD_NOT_WAITING;
-
}
-
- } else {
+ } else if (imq_is_queue(mqueue) || imq_is_turnstile_proxy(mqueue)) {
+ ipc_kmsg_queue_t kmsgs;
/*
* Receive on a single port. Just try to get the messages.
*/
- kmsgs = &mqueue->imq_messages;
+ kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
+ if (option & MACH_PEEK_MSG) {
+ ipc_mqueue_peek_on_thread(mqueue, option, thread);
+ } else {
+ ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
+ max_size, thread);
+ }
imq_unlock(mqueue);
- splx(s);
return THREAD_NOT_WAITING;
}
+ } else {
+ panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
+ mqueue->imq_wait_queue.waitq_type);
}
-
+
/*
* Looks like we'll have to block. The mqueue we will
* block on (whether the set's or the local port's) is
if (option & MACH_RCV_TIMEOUT) {
if (rcv_timeout == 0) {
imq_unlock(mqueue);
- splx(s);
thread->ith_state = MACH_RCV_TIMED_OUT;
return THREAD_NOT_WAITING;
}
}
- thread_lock(thread);
- thread->ith_state = MACH_RCV_IN_PROGRESS;
thread->ith_option = option;
- thread->ith_msize = max_size;
+ thread->ith_rsize = max_size;
+ thread->ith_msize = 0;
+
+ if (option & MACH_PEEK_MSG) {
+ thread->ith_state = MACH_PEEK_IN_PROGRESS;
+ } else {
+ thread->ith_state = MACH_RCV_IN_PROGRESS;
+ }
- if (option & MACH_RCV_TIMEOUT)
- clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
- else
+ if (option & MACH_RCV_TIMEOUT) {
+ clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
+ } else {
deadline = 0;
+ }
- wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline, 0,
- thread);
+ /*
+ * Threads waiting on a reply port (not portset)
+ * will wait on its receive turnstile.
+ *
+ * Donate waiting thread's turnstile and
+ * setup inheritor for special reply port.
+ * Based on the state of the special reply
+ * port, the inheritor would be the send
+ * turnstile of the connection port on which
+ * the send of sync ipc would happen or
+ * workloop's turnstile who would reply to
+ * the sync ipc message.
+ *
+ * Pass in mqueue wait in waitq_assert_wait to
+ * support port set wakeup. The mqueue waitq of port
+ * will be converted to to turnstile waitq
+ * in waitq_assert_wait instead of global waitqs.
+ */
+ if (imq_is_turnstile_proxy(mqueue)) {
+ ipc_port_t port = ip_from_mq(mqueue);
+ rcv_turnstile = turnstile_prepare((uintptr_t)port,
+ port_rcv_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+ ipc_port_recv_update_inheritor(port, rcv_turnstile,
+ TURNSTILE_DELAYED_UPDATE);
+ }
+
+ thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
+ wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ interruptible,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY,
+ thread);
/* preposts should be detected above, not here */
- if (wresult == THREAD_AWAKENED)
+ if (wresult == THREAD_AWAKENED) {
panic("ipc_mqueue_receive_on_thread: sleep walking");
+ }
- thread_unlock(thread);
imq_unlock(mqueue);
- splx(s);
+
+ /* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
+ if (rcv_turnstile != TURNSTILE_NULL) {
+ turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
+ }
+ /* Its callers responsibility to call turnstile_complete to get the turnstile back */
+
return wresult;
}
+/*
+ * Routine: ipc_mqueue_peek_on_thread
+ * Purpose:
+ * A receiver discovered that there was a message on the queue
+ * before he had to block. Tell a thread about the message queue,
+ * but don't pick off any messages.
+ * Conditions:
+ * port_mq locked
+ * at least one message on port_mq's message queue
+ *
+ * Returns: (on thread->ith_state)
+ * MACH_PEEK_READY ith_peekq contains a message queue
+ */
+void
+ipc_mqueue_peek_on_thread(
+ ipc_mqueue_t port_mq,
+ mach_msg_option_t option,
+ thread_t thread)
+{
+ (void)option;
+ assert(option & MACH_PEEK_MSG);
+ assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
+
+ /*
+ * Take a reference on the mqueue's associated port:
+ * the peeking thread will be responsible to release this reference
+ * using ip_release_mq()
+ */
+ ip_reference_mq(port_mq);
+ thread->ith_peekq = port_mq;
+ thread->ith_state = MACH_PEEK_READY;
+}
+
/*
* Routine: ipc_mqueue_select_on_thread
* Purpose:
* mqueue locked.
* thread not locked.
* There is a message.
+ * No need to reserve prepost objects - it will never prepost
+ *
* Returns:
* MACH_MSG_SUCCESS Actually selected a message for ourselves.
* MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
*/
void
ipc_mqueue_select_on_thread(
- ipc_mqueue_t mqueue,
- mach_msg_option_t option,
- mach_msg_size_t max_size,
+ ipc_mqueue_t port_mq,
+ ipc_mqueue_t set_mq,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
thread_t thread)
{
ipc_kmsg_t kmsg;
mach_msg_return_t mr = MACH_MSG_SUCCESS;
- mach_msg_size_t rcv_size;
+ mach_msg_size_t msize;
/*
* Do some sanity checking of our ability to receive
* before pulling the message off the queue.
*/
- kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
+ kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
assert(kmsg != IKM_NULL);
/*
* the queue, instead return the appropriate error
* (and size needed).
*/
- rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
- if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
+ msize = ipc_kmsg_copyout_size(kmsg, thread->map);
+ if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
mr = MACH_RCV_TOO_LARGE;
if (option & MACH_RCV_LARGE) {
- thread->ith_receiver_name = mqueue->imq_receiver_name;
+ thread->ith_receiver_name = port_mq->imq_receiver_name;
thread->ith_kmsg = IKM_NULL;
- thread->ith_msize = rcv_size;
+ thread->ith_msize = msize;
thread->ith_seqno = 0;
thread->ith_state = mr;
return;
}
}
- ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
- ipc_mqueue_release_msgcount(mqueue);
- thread->ith_seqno = mqueue->imq_seqno++;
+ ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
+#if MACH_FLIPC
+ if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
+ flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
+ }
+#endif
+ ipc_mqueue_release_msgcount(port_mq, set_mq);
+ thread->ith_seqno = port_mq->imq_seqno++;
thread->ith_kmsg = kmsg;
thread->ith_state = mr;
}
/*
- * Routine: ipc_mqueue_peek
+ * Routine: ipc_mqueue_peek_locked
* Purpose:
* Peek at a (non-set) message queue to see if it has a message
* matching the sequence number provided (if zero, then the
* message.
*
* Conditions:
- * Locks may be held by callers, so this routine cannot block.
+ * The ipc_mqueue_t is locked by callers.
+ * Other locks may be held by callers, so this routine cannot block.
* Caller holds reference on the message queue.
*/
unsigned
-ipc_mqueue_peek(ipc_mqueue_t mq,
- mach_port_seqno_t *seqnop,
- mach_msg_size_t *msg_sizep,
- mach_msg_id_t *msg_idp,
- mach_msg_max_trailer_t *msg_trailerp)
+ipc_mqueue_peek_locked(ipc_mqueue_t mq,
+ mach_port_seqno_t * seqnop,
+ mach_msg_size_t * msg_sizep,
+ mach_msg_id_t * msg_idp,
+ mach_msg_max_trailer_t * msg_trailerp,
+ ipc_kmsg_t *kmsgp)
{
ipc_kmsg_queue_t kmsgq;
- ipc_kmsg_t kmsg;
+ ipc_kmsg_t kmsg;
mach_port_seqno_t seqno, msgoff;
- int res = 0;
- spl_t s;
+ unsigned res = 0;
assert(!imq_is_set(mq));
- s = splsched();
- imq_lock(mq);
-
- seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
+ seqno = 0;
+ if (seqnop != NULL) {
+ seqno = *seqnop;
+ }
if (seqno == 0) {
seqno = mq->imq_seqno;
msgoff = 0;
- } else if (seqno >= mq->imq_seqno &&
- seqno < mq->imq_seqno + mq->imq_msgcount) {
+ } else if (seqno >= mq->imq_seqno &&
+ seqno < mq->imq_seqno + mq->imq_msgcount) {
msgoff = seqno - mq->imq_seqno;
- } else
+ } else {
goto out;
+ }
/* look for the message that would match that seqno */
kmsgq = &mq->imq_messages;
while (msgoff-- && kmsg != IKM_NULL) {
kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
}
- if (kmsg == IKM_NULL)
+ if (kmsg == IKM_NULL) {
goto out;
+ }
/* found one - return the requested info */
- if (seqnop != NULL)
+ if (seqnop != NULL) {
*seqnop = seqno;
- if (msg_sizep != NULL)
+ }
+ if (msg_sizep != NULL) {
*msg_sizep = kmsg->ikm_header->msgh_size;
- if (msg_idp != NULL)
+ }
+ if (msg_idp != NULL) {
*msg_idp = kmsg->ikm_header->msgh_id;
- if (msg_trailerp != NULL)
- memcpy(msg_trailerp,
- (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
- round_msg(kmsg->ikm_header->msgh_size)),
- sizeof(mach_msg_max_trailer_t));
+ }
+ if (msg_trailerp != NULL) {
+ memcpy(msg_trailerp,
+ (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
+ round_msg(kmsg->ikm_header->msgh_size)),
+ sizeof(mach_msg_max_trailer_t));
+ }
+ if (kmsgp != NULL) {
+ *kmsgp = kmsg;
+ }
+
res = 1;
- out:
+out:
+ return res;
+}
+
+
+/*
+ * Routine: ipc_mqueue_peek
+ * Purpose:
+ * Peek at a (non-set) message queue to see if it has a message
+ * matching the sequence number provided (if zero, then the
+ * first message in the queue) and return vital info about the
+ * message.
+ *
+ * Conditions:
+ * The ipc_mqueue_t is unlocked.
+ * Locks may be held by callers, so this routine cannot block.
+ * Caller holds reference on the message queue.
+ */
+unsigned
+ipc_mqueue_peek(ipc_mqueue_t mq,
+ mach_port_seqno_t * seqnop,
+ mach_msg_size_t * msg_sizep,
+ mach_msg_id_t * msg_idp,
+ mach_msg_max_trailer_t * msg_trailerp,
+ ipc_kmsg_t *kmsgp)
+{
+ unsigned res;
+
+ imq_lock(mq);
+
+ res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
+ msg_trailerp, kmsgp);
+
imq_unlock(mq);
- splx(s);
return res;
}
+/*
+ * Routine: ipc_mqueue_release_peek_ref
+ * Purpose:
+ * Release the reference on an mqueue's associated port which was
+ * granted to a thread in ipc_mqueue_peek_on_thread (on the
+ * MACH_PEEK_MSG thread wakeup path).
+ *
+ * Conditions:
+ * The ipc_mqueue_t should be locked on entry.
+ * The ipc_mqueue_t will be _unlocked_ on return
+ * (and potentially invalid!)
+ *
+ */
+void
+ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
+{
+ assert(!imq_is_set(mq));
+ assert(imq_held(mq));
+
+ /*
+ * clear any preposts this mq may have generated
+ * (which would cause subsequent immediate wakeups)
+ */
+ waitq_clear_prepost_locked(&mq->imq_wait_queue);
+
+ imq_unlock(mq);
+
+ /*
+ * release the port reference: we need to do this outside the lock
+ * because we might be holding the last port reference!
+ **/
+ ip_release_mq(mq);
+}
+
+/*
+ * peek at the contained port message queues, break prepost iteration as soon
+ * as we spot a message on one of the message queues referenced by the set's
+ * prepost list. No need to lock each message queue, as only the head of each
+ * queue is checked. If a message wasn't there before we entered here, no need
+ * to find it (if we do, great).
+ */
+static int
+mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
+{
+ ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
+ ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
+
+ (void)ctx;
+ (void)wqset;
+
+ if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
+ return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
+ }
+ return WQ_ITERATE_CONTINUE;
+}
+
/*
* Routine: ipc_mqueue_set_peek
* Purpose:
unsigned
ipc_mqueue_set_peek(ipc_mqueue_t mq)
{
- wait_queue_link_t wql;
- queue_t q;
- spl_t s;
- int res;
+ int ret;
- assert(imq_is_set(mq));
-
- s = splsched();
imq_lock(mq);
- /*
- * peek at the contained port message queues, return as soon as
- * we spot a message on one of the message queues linked on the
- * prepost list. No need to lock each message queue, as only the
- * head of each queue is checked. If a message wasn't there before
- * we entered here, no need to find it (if we do, great).
+ /*
+ * We may have raced with port destruction where the mqueue is marked
+ * as invalid. In that case, even though we don't have messages, we
+ * have an end-of-life event to deliver.
*/
- res = 0;
- q = &mq->imq_preposts;
- queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
- ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
- ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
-
- if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- res = 1;
- break;
- }
+ if (!imq_is_valid(mq)) {
+ return 1;
}
+
+ ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
+ mqueue_peek_iterator);
+
imq_unlock(mq);
- splx(s);
- return res;
+
+ return ret == WQ_ITERATE_BREAK;
}
/*
* Routine: ipc_mqueue_set_gather_member_names
* Purpose:
- * Iterate a message queue set to identify the member port
- * names. Actual returned names is limited to maxnames entries,
- * but we keep counting the actual number of members to let
- * the caller decide to retry if necessary.
+ * Discover all ports which are members of a given port set.
+ * Because the waitq linkage mechanism was redesigned to save
+ * significan amounts of memory, it no longer keeps back-pointers
+ * from a port set to a port. Therefore, we must iterate over all
+ * ports within a given IPC space and individually query them to
+ * see if they are members of the given set. Port names of ports
+ * found to be members of the given set will be gathered into the
+ * provided 'names' array. Actual returned names are limited to
+ * maxnames entries, but we keep counting the actual number of
+ * members to let the caller decide to retry if necessary.
*
* Conditions:
* Locks may be held by callers, so this routine cannot block.
- * Caller holds reference on the message queue.
+ * Caller holds reference on the message queue (via port set).
*/
void
ipc_mqueue_set_gather_member_names(
- ipc_mqueue_t mq,
- ipc_entry_num_t maxnames,
+ ipc_space_t space,
+ ipc_mqueue_t set_mq,
+ ipc_entry_num_t maxnames,
mach_port_name_t *names,
ipc_entry_num_t *actualp)
{
- wait_queue_link_t wql;
- queue_t q;
- spl_t s;
+ ipc_entry_t table;
+ ipc_entry_num_t tsize;
+ struct waitq_set *wqset;
ipc_entry_num_t actual = 0;
- assert(imq_is_set(mq));
+ assert(set_mq != IMQ_NULL);
+ wqset = &set_mq->imq_set_queue;
- s = splsched();
- imq_lock(mq);
+ assert(space != IS_NULL);
+ is_read_lock(space);
+ if (!is_active(space)) {
+ is_read_unlock(space);
+ goto out;
+ }
- /*
- * Iterate over the member ports through the mqueue set links
- * capturing as many names as we can.
- */
- q = &mq->imq_setlinks;
- queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
- ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
+ if (!waitq_set_is_valid(wqset)) {
+ is_read_unlock(space);
+ goto out;
+ }
- if (actual < maxnames)
- names[actual] = port_mq->imq_receiver_name;
- actual++;
+ table = space->is_table;
+ tsize = space->is_table_size;
+ for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
+ ipc_entry_t entry = &table[idx];
+
+ /* only receive rights can be members of port sets */
+ if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
+ ipc_port_t port = ip_object_to_port(entry->ie_object);
+ ipc_mqueue_t mq = &port->ip_messages;
+
+ assert(IP_VALID(port));
+ if (ip_active(port) &&
+ waitq_member(&mq->imq_wait_queue, wqset)) {
+ if (actual < maxnames) {
+ names[actual] = mq->imq_receiver_name;
+ }
+ actual++;
+ }
+ }
}
- imq_unlock(mq);
- splx(s);
+ is_read_unlock(space);
+
+out:
*actualp = actual;
}
/*
- * Routine: ipc_mqueue_destroy
+ * Routine: ipc_mqueue_destroy_locked
* Purpose:
* Destroy a (non-set) message queue.
* Set any blocked senders running.
- * Destroy the kmsgs in the queue.
+ * Destroy the kmsgs in the queue.
* Conditions:
- * Nothing locked.
+ * mqueue locked
* Receivers were removed when the receive right was "changed"
*/
-void
-ipc_mqueue_destroy(
- ipc_mqueue_t mqueue)
+boolean_t
+ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
{
ipc_kmsg_queue_t kmqueue;
ipc_kmsg_t kmsg;
boolean_t reap = FALSE;
- spl_t s;
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
+
+ assert(!imq_is_set(mqueue));
- s = splsched();
- imq_lock(mqueue);
/*
* rouse all blocked senders
+ * (don't boost anyone - we're tearing this queue down)
+ * (never preposts)
*/
mqueue->imq_fullwaiters = FALSE;
- wait_queue_wakeup64_all_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_RESTART,
- FALSE);
+
+ if (send_turnstile != TURNSTILE_NULL) {
+ waitq_wakeup64_all(&send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_RESTART,
+ WAITQ_ALL_PRIORITIES);
+ }
/*
* Move messages from the specified queue to the per-thread
*/
kmqueue = &mqueue->imq_messages;
while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
+#if MACH_FLIPC
+ if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
+ flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
+ }
+#endif
boolean_t first;
first = ipc_kmsg_delayed_destroy(kmsg);
- if (first)
+ if (first) {
reap = first;
+ }
}
- imq_unlock(mqueue);
- splx(s);
+ /*
+ * Wipe out message count, both for messages about to be
+ * reaped and for reserved space for (previously) woken senders.
+ * This is the indication to them that their reserved space is gone
+ * (the mqueue was destroyed).
+ */
+ mqueue->imq_msgcount = 0;
+
+ /* invalidate the waitq for subsequent mqueue operations */
+ waitq_invalidate_locked(&mqueue->imq_wait_queue);
+
+ /* clear out any preposting we may have done */
+ waitq_clear_prepost_locked(&mqueue->imq_wait_queue);
/*
- * Destroy the messages we enqueued if we aren't nested
- * inside some other attempt to drain the same queue.
+ * assert that we are destroying / invalidating a queue that's
+ * not a member of any other queue.
*/
- if (reap)
- ipc_kmsg_reap_delayed();
+ assert(mqueue->imq_preposts == 0);
+ assert(mqueue->imq_in_pset == 0);
+
+ return reap;
}
/*
void
ipc_mqueue_set_qlimit(
- ipc_mqueue_t mqueue,
- mach_port_msgcount_t qlimit)
+ ipc_mqueue_t mqueue,
+ mach_port_msgcount_t qlimit)
{
- spl_t s;
-
- assert(qlimit <= MACH_PORT_QLIMIT_MAX);
-
- /* wake up senders allowed by the new qlimit */
- s = splsched();
- imq_lock(mqueue);
- if (qlimit > mqueue->imq_qlimit) {
- mach_port_msgcount_t i, wakeup;
-
- /* caution: wakeup, qlimit are unsigned */
- wakeup = qlimit - mqueue->imq_qlimit;
-
- for (i = 0; i < wakeup; i++) {
- if (wait_queue_wakeup64_one_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) == KERN_NOT_WAITING) {
- mqueue->imq_fullwaiters = FALSE;
- break;
- }
- mqueue->imq_msgcount++; /* give it to the awakened thread */
- }
- }
+ assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
+ /* wake up senders allowed by the new qlimit */
+ imq_lock(mqueue);
+ if (qlimit > mqueue->imq_qlimit) {
+ mach_port_msgcount_t i, wakeup;
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
+
+ /* caution: wakeup, qlimit are unsigned */
+ wakeup = qlimit - mqueue->imq_qlimit;
+
+ for (i = 0; i < wakeup; i++) {
+ /*
+ * boost the priority of the awoken thread
+ * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+ * the message queue slot we've just reserved.
+ *
+ * NOTE: this will never prepost
+ */
+ if (send_turnstile == TURNSTILE_NULL ||
+ waitq_wakeup64_one(&send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
+ mqueue->imq_fullwaiters = FALSE;
+ break;
+ }
+ mqueue->imq_msgcount++; /* give it to the awakened thread */
+ }
+ }
mqueue->imq_qlimit = qlimit;
imq_unlock(mqueue);
- splx(s);
}
/*
*/
void
ipc_mqueue_set_seqno(
- ipc_mqueue_t mqueue,
- mach_port_seqno_t seqno)
+ ipc_mqueue_t mqueue,
+ mach_port_seqno_t seqno)
{
- spl_t s;
-
- s = splsched();
imq_lock(mqueue);
mqueue->imq_seqno = seqno;
imq_unlock(mqueue);
- splx(s);
}
mach_msg_return_t
ipc_mqueue_copyin(
- ipc_space_t space,
- mach_port_name_t name,
- ipc_mqueue_t *mqueuep,
- ipc_object_t *objectp)
+ ipc_space_t space,
+ mach_port_name_t name,
+ ipc_mqueue_t *mqueuep,
+ ipc_object_t *objectp)
{
ipc_entry_t entry;
+ ipc_entry_bits_t bits;
ipc_object_t object;
ipc_mqueue_t mqueue;
return MACH_RCV_INVALID_NAME;
}
+ bits = entry->ie_bits;
object = entry->ie_object;
- if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
- ipc_port_t port;
+ if (bits & MACH_PORT_TYPE_RECEIVE) {
+ ipc_port_t port = ip_object_to_port(object);
- port = (ipc_port_t) object;
assert(port != IP_NULL);
ip_lock(port);
- assert(ip_active(port));
+ require_ip_active(port);
assert(port->ip_receiver_name == name);
assert(port->ip_receiver == space);
is_read_unlock(space);
mqueue = &port->ip_messages;
+ } else if (bits & MACH_PORT_TYPE_PORT_SET) {
+ ipc_pset_t pset = ips_object_to_pset(object);
- } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
- ipc_pset_t pset;
-
- pset = (ipc_pset_t) object;
assert(pset != IPS_NULL);
ips_lock(pset);
assert(ips_active(pset));
- assert(pset->ips_local_name == name);
is_read_unlock(space);
mqueue = &pset->ips_messages;
} else {
is_read_unlock(space);
+ /* guard exception if we never held the receive right in this entry */
+ if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
+ mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
+ }
return MACH_RCV_INVALID_NAME;
}
return MACH_MSG_SUCCESS;
}
+void
+imq_lock(ipc_mqueue_t mq)
+{
+ ipc_object_t object = imq_to_object(mq);
+ ipc_object_validate(object);
+ waitq_lock(&(mq)->imq_wait_queue);
+}
+
+unsigned int
+imq_lock_try(ipc_mqueue_t mq)
+{
+ ipc_object_t object = imq_to_object(mq);
+ ipc_object_validate(object);
+ return waitq_lock_try(&(mq)->imq_wait_queue);
+}