/*
- * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
+ * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
*
- * @APPLE_LICENSE_HEADER_START@
+ * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
*
- * The contents of this file constitute Original Code as defined in and
- * are subject to the Apple Public Source License Version 1.1 (the
- * "License"). You may not use this file except in compliance with the
- * License. Please obtain a copy of the License at
- * http://www.apple.com/publicsource and read it before using this file.
+ * This file contains Original Code and/or Modifications of Original Code
+ * as defined in and that are subject to the Apple Public Source License
+ * Version 2.0 (the 'License'). You may not use this file except in
+ * compliance with the License. The rights granted to you under the License
+ * may not be used to create, or enable the creation or redistribution of,
+ * unlawful or unlicensed copies of an Apple operating system, or to
+ * circumvent, violate, or enable the circumvention or violation of, any
+ * terms of an Apple operating system software license agreement.
*
- * This Original Code and all software distributed under the License are
- * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
+ * Please obtain a copy of the License at
+ * http://www.opensource.apple.com/apsl/ and read it before using this file.
+ *
+ * The Original Code and all software distributed under the License are
+ * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the
- * License for the specific language governing rights and limitations
- * under the License.
+ * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
+ * Please see the License for the specific language governing rights and
+ * limitations under the License.
*
- * @APPLE_LICENSE_HEADER_END@
+ * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
*/
/*
* @OSF_FREE_COPYRIGHT@
*
* Functions to manipulate IPC message queues.
*/
+/*
+ * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
+ * support for mandatory and extensible security protections. This notice
+ * is included in support of clause 2.2 (b) of the Apple Public License,
+ * Version 2.0.
+ */
+
#include <mach/port.h>
#include <mach/message.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
+#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>
-#include <ddb/tr.h>
+#ifdef __LP64__
+#include <vm/vm_map.h>
+#endif
+
+#if CONFIG_MACF_MACH
+#include <security/mac_mach_internal.h>
+#endif
int ipc_mqueue_full; /* address is event for queue space */
int ipc_mqueue_rcv; /* address is event for message arrival */
-#define TR_ENABLE 0
+/* forward declarations */
+void ipc_mqueue_receive_results(wait_result_t result);
/*
* Routine: ipc_mqueue_init
boolean_t is_set)
{
if (is_set) {
- wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
+ wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
} else {
wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
ipc_kmsg_queue_init(&mqueue->imq_messages);
boolean_t
ipc_mqueue_member(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_t set_waitq = &set_mqueue->imq_wait_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
return (wait_queue_member(port_waitq, set_waitq));
* Routine: ipc_mqueue_remove
* Purpose:
* Remove the association between the queue and the specified
- * subordinate message queue.
+ * set message queue.
*/
kern_return_t
ipc_mqueue_remove(
ipc_mqueue_t mqueue,
- ipc_mqueue_t sub_mqueue)
+ ipc_mqueue_t set_mqueue) /* set mqueue whose imq_set_queue the port's wait queue is unlinked from */
{
wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
- if (wait_queue_member(mq_waitq, sub_waitq)) {
- wait_queue_unlink(mq_waitq, sub_waitq);
- return KERN_SUCCESS;
- }
- return KERN_NOT_IN_SET;
+ return wait_queue_unlink(mq_waitq, set_waitq); /* unlink status is returned as-is; NOTE(review): the explicit membership pre-check is gone, so presumably wait_queue_unlink() reports the not-a-member case itself - confirm */
}
/*
- * Routine: ipc_mqueue_remove_one
+ * Routine: ipc_mqueue_remove_from_all
* Purpose:
- * Find and remove one subqueue from the queue.
+ * Remove the mqueue from all the sets it is a member of
+ * (done by calling wait_queue_unlink_all() on the mqueue's
+ * imq_wait_queue).
* Conditions:
- * Will return the set mqueue that was removed
+ * Nothing locked.
+ * Returns:
+ * Nothing. Unlike the ipc_mqueue_remove_one() it replaces, no
+ * set mqueue is handed back to the caller.
*/
void
-ipc_mqueue_remove_one(
- ipc_mqueue_t mqueue,
- ipc_mqueue_t *sub_queuep)
+ipc_mqueue_remove_from_all(
+ ipc_mqueue_t mqueue)
{
wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
+ wait_queue_unlink_all(mq_waitq);
+ return;
+}
+
+/*
+ * Routine: ipc_mqueue_remove_all
+ * Purpose:
+ * Remove all the member queues from the specified set.
+ * Done by calling wait_queue_set_unlink_all() on the
+ * mqueue's imq_set_queue.
+ * Conditions:
+ * Nothing locked.
+ * Returns:
+ * Nothing.
+ */
+void
+ipc_mqueue_remove_all(
+ ipc_mqueue_t mqueue) /* NOTE(review): only imq_set_queue is touched, so presumably this must be a set mqueue - confirm against callers */
+{
+ wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
+
+ wait_queue_set_unlink_all(mq_setq);
return;
}
ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
kern_return_t kr;
for (;;) {
thread_t th;
+ mach_msg_size_t msize;
- th = wait_queue_wakeup_identity_locked(port_waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ th = wait_queue_wakeup64_identity_locked(
+ port_waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ FALSE);
/* waitq/mqueue still locked, thread locked */
if (th == THREAD_NULL)
goto leave;
+ /*
+ * If the receiver waited with a facility not directly
+ * related to Mach messaging, then it isn't prepared to get
+ * handed the message directly. Just set it running, and
+ * go look for another thread that can.
+ */
+ if (th->ith_state != MACH_RCV_IN_PROGRESS) {
+ thread_unlock(th);
+ continue;
+ }
+
/*
* Found a receiver. see if they can handle the message
* correctly (the message is not too large for them, or
* the list and let them go back and figure it out and
* just move onto the next.
*/
+ msize = ipc_kmsg_copyout_size(kmsg, th->map);
if (th->ith_msize <
- kmsg->ikm_header.msgh_size +
- REQUESTED_TRAILER_SIZE(th->ith_option)) {
+ (msize + REQUESTED_TRAILER_SIZE(th->ith_option))) {
th->ith_state = MACH_RCV_TOO_LARGE;
- th->ith_msize = kmsg->ikm_header.msgh_size;
+ th->ith_msize = msize;
if (th->ith_option & MACH_RCV_LARGE) {
/*
* let him go without message
*/
+ th->ith_receiver_name = port_mqueue->imq_receiver_name;
th->ith_kmsg = IKM_NULL;
th->ith_seqno = 0;
thread_unlock(th);
* This thread is going to take this message,
* so give it to him.
*/
- ipc_mqueue_release_msgcount(port_mqueue);
ipc_kmsg_rmqueue(kmsgq, kmsg);
+ ipc_mqueue_release_msgcount(port_mqueue);
+
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
ipc_mqueue_changed(
ipc_mqueue_t mqueue)
{
- wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- THREAD_RESTART,
- FALSE); /* unlock waitq? */
+ wait_queue_wakeup64_all_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_RESTART,
+ FALSE); /* unlock waitq? */
}
mach_msg_return_t
ipc_mqueue_send(
ipc_mqueue_t mqueue,
- ipc_kmsg_t kmsg,
+ ipc_kmsg_t kmsg,
mach_msg_option_t option,
- mach_msg_timeout_t timeout)
+ mach_msg_timeout_t send_timeout,
+ spl_t s)
{
- int save_wait_result;
- spl_t s;
+ int wresult;
/*
* Don't block if:
* 2) Caller used the MACH_SEND_ALWAYS internal option.
* 3) Message is sent to a send-once right.
*/
- s = splsched();
- imq_lock(mqueue);
-
if (!imq_full(mqueue) ||
- (option & MACH_SEND_ALWAYS) ||
- (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
- MACH_MSG_TYPE_PORT_SEND_ONCE)) {
+ (!imq_full_kernel(mqueue) &&
+ ((option & MACH_SEND_ALWAYS) ||
+ (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
+ MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
mqueue->imq_msgcount++;
+ assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
splx(s);
} else {
+ thread_t cur_thread = current_thread();
+ uint64_t deadline;
/*
* We have to wait for space to be granted to us.
*/
- if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
+ if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
imq_unlock(mqueue);
splx(s);
return MACH_SEND_TIMED_OUT;
}
+ if (imq_full_kernel(mqueue)) {
+ imq_unlock(mqueue);
+ splx(s);
+ return MACH_SEND_NO_BUFFER;
+ }
mqueue->imq_fullwaiters = TRUE;
- wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TRUE); /* unlock? */
- /* wait/mqueue is unlocked */
+ thread_lock(cur_thread);
+ if (option & MACH_SEND_TIMEOUT)
+ clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
+ wresult = wait_queue_assert_wait64_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_ABORTSAFE, deadline,
+ cur_thread);
+ thread_unlock(cur_thread);
+ imq_unlock(mqueue);
splx(s);
- if (option & MACH_SEND_TIMEOUT)
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-
- counter(c_ipc_mqueue_send_block++);
- save_wait_result = thread_block((void (*)(void)) 0);
+ if (wresult == THREAD_WAITING) {
+ wresult = thread_block(THREAD_CONTINUE_NULL);
+ counter(c_ipc_mqueue_send_block++);
+ }
- switch (save_wait_result) {
+ switch (wresult) {
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
case THREAD_AWAKENED:
/* we can proceed - inherited msgcount from waker */
- if (option & MACH_SEND_TIMEOUT)
- thread_cancel_timer();
+ assert(mqueue->imq_msgcount > 0);
break;
case THREAD_INTERRUPTED:
- if (option & MACH_SEND_TIMEOUT)
- thread_cancel_timer();
return MACH_SEND_INTERRUPTED;
case THREAD_RESTART:
+ /* mqueue is being destroyed */
+ return MACH_SEND_INVALID_DEST;
default:
panic("ipc_mqueue_send");
}
* found a waiter.
*
* Conditions:
- * The message queue is locked
+ * The message queue is locked.
+ * The message corresponding to this reference is off the queue.
*/
void
ipc_mqueue_release_msgcount(
ipc_mqueue_t mqueue)
{
assert(imq_held(mqueue));
- assert(mqueue->imq_msgcount > 0);
+ assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages)); /* the count may drop to zero only when no messages remain queued */
mqueue->imq_msgcount--;
+
if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
- if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) != KERN_SUCCESS) {
+ if (wait_queue_wakeup64_one_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL, /* the event ipc_mqueue_send() waits on when the queue is full */
+ THREAD_AWAKENED,
+ FALSE) != KERN_SUCCESS) { /* wakeup did not succeed: stop advertising full-waiters */
mqueue->imq_fullwaiters = FALSE;
} else {
- mqueue->imq_msgcount++; /* gave it away */
+ /* gave away our slot - add reference back */
+ mqueue->imq_msgcount++; /* the woken sender inherits this count (see the THREAD_AWAKENED case in ipc_mqueue_send) */
}
}
}
register ipc_mqueue_t mqueue,
register ipc_kmsg_t kmsg)
{
-
spl_t s;
/*
for (;;) {
wait_queue_t waitq = &mqueue->imq_wait_queue;
thread_t receiver;
+ mach_msg_size_t msize;
- receiver = wait_queue_wakeup_identity_locked(waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ receiver = wait_queue_wakeup64_identity_locked(
+ waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ FALSE);
/* waitq still locked, thread locked */
if (receiver == THREAD_NULL) {
ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
break;
}
-
+
+ /*
+ * If the receiver waited with a facility not directly
+ * related to Mach messaging, then it isn't prepared to get
+ * handed the message directly. Just set it running, and
+ * go look for another thread that can.
+ */
+ if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
+ thread_unlock(receiver);
+ continue;
+ }
+
+
/*
* We found a waiting thread.
* If the message is too large or the scatter list is too small
* the thread we wake up will get that as its status.
*/
+ msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
if (receiver->ith_msize <
- (kmsg->ikm_header.msgh_size) +
- REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
- receiver->ith_msize = kmsg->ikm_header.msgh_size;
+ (msize + REQUESTED_TRAILER_SIZE(receiver->ith_option))) {
+ receiver->ith_msize = msize;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
receiver->ith_state = MACH_MSG_SUCCESS;
}
-kern_return_t
-ipc_mqueue_receive_results(void)
+/* static */ void
+ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
thread_t self = current_thread();
mach_msg_option_t option = self->ith_option;
- kern_return_t saved_wait_result = self->wait_result;
- kern_return_t mr;
/*
* why did we wake up?
return;
case THREAD_INTERRUPTED:
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_INTERRUPTED;
return;
case THREAD_RESTART:
/* something bad happened to the port/set */
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_PORT_CHANGED;
return;
* We do not need to go select a message, somebody
* handed us one (or a too-large indication).
*/
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
-
- mr = MACH_MSG_SUCCESS;
-
switch (self->ith_state) {
case MACH_RCV_SCATTER_SMALL:
case MACH_RCV_TOO_LARGE:
}
void
-ipc_mqueue_receive_continue(void)
+ipc_mqueue_receive_continue(
+ __unused void *param, /* not used by this routine */
+ wait_result_t wresult) /* wait result; forwarded unchanged to ipc_mqueue_receive_results() */
{
- ipc_mqueue_receive_results();
+ ipc_mqueue_receive_results(wresult); /* the result is now passed in explicitly rather than read from self->wait_result */
mach_msg_receive_continue(); /* hard-coded for now */
}
void
ipc_mqueue_receive(
- ipc_mqueue_t mqueue,
- mach_msg_option_t option,
- mach_msg_size_t max_size,
- mach_msg_timeout_t timeout,
- int interruptible)
+ ipc_mqueue_t mqueue,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
+ mach_msg_timeout_t rcv_timeout,
+ int interruptible)
{
- ipc_port_t port;
- mach_msg_return_t mr, mr2;
- ipc_kmsg_queue_t kmsgs;
- kern_return_t save_wait_result;
- thread_t self;
- ipc_kmsg_t *kmsgp;
- mach_port_seqno_t *seqnop;
- spl_t s;
+ wait_result_t wresult;
+ thread_t self = current_thread();
+
+ wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size, /* does the queue search / wait assertion on behalf of the calling thread */
+ rcv_timeout, interruptible,
+ self);
+ if (wresult == THREAD_NOT_WAITING) /* no wait was set up: the helper already stored the outcome in self->ith_state */
+ return;
+
+ if (wresult == THREAD_WAITING) {
+ counter((interruptible == THREAD_ABORTSAFE) ?
+ c_ipc_mqueue_receive_block_user++ :
+ c_ipc_mqueue_receive_block_kernel++);
+
+ if (self->ith_continuation)
+ thread_block(ipc_mqueue_receive_continue); /* NOTE(review): the NOTREACHED below belongs to this branch; presumably thread_block() with a continuation never returns here - confirm */
+ /* NOTREACHED */
+
+ wresult = thread_block(THREAD_CONTINUE_NULL); /* no continuation registered: block and pick up the wait result on return */
+ }
+ ipc_mqueue_receive_results(wresult); /* turn the wait result into self->ith_state */
+}
+
+wait_result_t
+ipc_mqueue_receive_on_thread(
+ ipc_mqueue_t mqueue,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
+ mach_msg_timeout_t rcv_timeout,
+ int interruptible,
+ thread_t thread)
+{
+ ipc_kmsg_queue_t kmsgs;
+ wait_result_t wresult;
+ uint64_t deadline;
+ spl_t s;
+#if CONFIG_MACF_MACH
+ ipc_labelh_t lh;
+ task_t task;
+ int rc;
+#endif
s = splsched();
imq_lock(mqueue);
if (imq_is_set(mqueue)) {
- wait_queue_link_t wql;
- ipc_mqueue_t port_mq;
queue_t q;
- q = &mqueue->imq_setlinks;
+ q = &mqueue->imq_preposts;
/*
* If we are waiting on a portset mqueue, we need to see if
- * any of the member ports have work for us. If so, try to
- * deliver one of those messages. By holding the portset's
+ * any of the member ports have work for us. Ports that
+ * have (or recently had) messages will be linked in the
+ * prepost queue for the portset. By holding the portset's
* mqueue lock during the search, we tie up any attempts by
* mqueue_deliver or portset membership changes that may
- * cross our path. But this is a lock order violation, so we
- * have to do it "softly." If we don't find a message waiting
- * for us, we will assert our intention to wait while still
- * holding that lock. When we release the lock, the deliver/
- * change will succeed and find us.
+ * cross our path.
*/
search_set:
- queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
+ while(!queue_empty(q)) {
+ wait_queue_link_t wql;
+ ipc_mqueue_t port_mq;
+
+ queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
+ assert(!wql_is_preposted(wql));
+
+ /*
+ * This is a lock order violation, so we have to do it
+ * "softly," putting the link back on the prepost list
+ * if it fails (at the tail is fine since the order of
+ * handling messages from different sources in a set is
+ * not guaranteed and we'd like to skip to the next source
+ * if one is available).
+ */
port_mq = (ipc_mqueue_t)wql->wql_queue;
- kmsgs = &port_mq->imq_messages;
-
if (!imq_lock_try(port_mq)) {
+ queue_enter(q, wql, wait_queue_link_t, wql_preposts);
imq_unlock(mqueue);
splx(s);
- delay(1);
+ mutex_pause(0);
s = splsched();
imq_lock(mqueue);
goto search_set; /* start again at beginning - SMP */
}
/*
- * If there is still a message to be had, we will
- * try to select it (may not succeed because of size
- * and options). In any case, we deliver those
- * results back to the user.
- *
- * We also move the port's linkage to the tail of the
- * list for this set (fairness). Future versions will
- * sort by timestamp or priority.
+ * If there are no messages on this queue, just skip it
+ * (we already removed the link from the set's prepost queue).
*/
+ kmsgs = &port_mq->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
imq_unlock(port_mq);
continue;
}
- queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
- queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
+
+ /*
+ * There are messages, so reinsert the link back
+ * at the tail of the preposted queue (for fairness)
+ * while we still have the portset mqueue locked.
+ */
+ queue_enter(q, wql, wait_queue_link_t, wql_preposts);
imq_unlock(mqueue);
- ipc_mqueue_select(port_mq, option, max_size);
+ /*
+ * Continue on to handling the message with just
+ * the port mqueue locked.
+ */
+ ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
imq_unlock(port_mq);
+#if CONFIG_MACF_MACH
+ if (thread->task != TASK_NULL &&
+ thread->ith_kmsg != NULL &&
+ thread->ith_kmsg->ikm_sender != NULL) {
+ lh = thread->ith_kmsg->ikm_sender->label;
+ tasklabel_lock(thread->task);
+ ip_lock(lh->lh_port);
+ rc = mac_port_check_receive(&thread->task->maclabel,
+ &lh->lh_label);
+ ip_unlock(lh->lh_port);
+ tasklabel_unlock(thread->task);
+ if (rc)
+ thread->ith_state = MACH_RCV_INVALID_DATA;
+ }
+#endif
splx(s);
- return;
+ return THREAD_NOT_WAITING;
}
*/
kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- ipc_mqueue_select(mqueue, option, max_size);
+ ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
imq_unlock(mqueue);
+#if CONFIG_MACF_MACH
+ if (thread->task != TASK_NULL &&
+ thread->ith_kmsg != NULL &&
+ thread->ith_kmsg->ikm_sender != NULL) {
+ lh = thread->ith_kmsg->ikm_sender->label;
+ tasklabel_lock(thread->task);
+ ip_lock(lh->lh_port);
+ rc = mac_port_check_receive(&thread->task->maclabel,
+ &lh->lh_label);
+ ip_unlock(lh->lh_port);
+ tasklabel_unlock(thread->task);
+ if (rc)
+ thread->ith_state = MACH_RCV_INVALID_DATA;
+ }
+#endif
splx(s);
- return;
+ return THREAD_NOT_WAITING;
}
}
-
+
/*
* Looks like we'll have to block. The mqueue we will
* block on (whether the set's or the local port's) is
* still locked.
*/
- self = current_thread();
if (option & MACH_RCV_TIMEOUT) {
- if (timeout == 0) {
+ if (rcv_timeout == 0) {
imq_unlock(mqueue);
splx(s);
- self->ith_state = MACH_RCV_TIMED_OUT;
- return;
+ thread->ith_state = MACH_RCV_TIMED_OUT;
+ return THREAD_NOT_WAITING;
}
}
- self->ith_state = MACH_RCV_IN_PROGRESS;
- self->ith_option = option;
- self->ith_msize = max_size;
-
- wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TRUE); /* unlock? */
- /* mqueue/waitq is unlocked */
+ thread_lock(thread);
+ thread->ith_state = MACH_RCV_IN_PROGRESS;
+ thread->ith_option = option;
+ thread->ith_msize = max_size;
+
+ if (option & MACH_RCV_TIMEOUT)
+ clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
+
+ wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ interruptible, deadline,
+ thread);
+ /* preposts should be detected above, not here */
+ if (wresult == THREAD_AWAKENED)
+ panic("ipc_mqueue_receive_on_thread: sleep walking");
+
+ thread_unlock(thread);
+ imq_unlock(mqueue);
splx(s);
-
- if (option & MACH_RCV_TIMEOUT) {
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
- }
-
- if (interruptible == THREAD_ABORTSAFE) {
- counter(c_ipc_mqueue_receive_block_user++);
- } else {
- counter(c_ipc_mqueue_receive_block_kernel++);
- }
-
-#if defined (__i386__)
- thread_block((void (*)(void))0);
-#else
- if (self->ith_continuation) {
- thread_block(ipc_mqueue_receive_continue);
- } else {
- thread_block((void (*)(void))0);
- }
-#endif
-
- ipc_mqueue_receive_results(); /* if we fell thru */
+ return wresult;
}
/*
- * Routine: ipc_mqueue_select
+ * Routine: ipc_mqueue_select_on_thread
* Purpose:
* A receiver discovered that there was a message on the queue
* before he had to block. Pick the message off the queue and
- * "post" it to himself.
+ * "post" it to thread.
* Conditions:
* mqueue locked.
+ * thread not locked.
* There is a message.
* Returns:
* MACH_MSG_SUCCESS Actually selected a message for ourselves.
- * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
+ * MACH_RCV_TOO_LARGE May or may not have pulled it, but it is large
*/
void
-ipc_mqueue_select(
+ipc_mqueue_select_on_thread(
ipc_mqueue_t mqueue,
mach_msg_option_t option,
- mach_msg_size_t max_size)
+ mach_msg_size_t max_size,
+ thread_t thread) /* thread whose ith_* fields receive the result; replaces the implicit current_thread() */
{
- thread_t self = current_thread();
ipc_kmsg_t kmsg;
- mach_port_seqno_t seqno;
- mach_msg_return_t mr;
-
- mr = MACH_MSG_SUCCESS;
-
+ mach_msg_return_t mr = MACH_MSG_SUCCESS;
+ mach_msg_size_t rcv_size; /* size reported by ipc_kmsg_copyout_size() for thread->map */
/*
* Do some sanity checking of our ability to receive
* before pulling the message off the queue.
*/
kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
-
assert(kmsg != IKM_NULL);
- if (kmsg->ikm_header.msgh_size +
- REQUESTED_TRAILER_SIZE(option) > max_size) {
- mr = MACH_RCV_TOO_LARGE;
- }
-
/*
* If we really can't receive it, but we had the
* MACH_RCV_LARGE option set, then don't take it off
* the queue, instead return the appropriate error
* (and size needed).
*/
- if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) {
- self->ith_kmsg = IKM_NULL;
- self->ith_msize = kmsg->ikm_header.msgh_size;
- self->ith_seqno = 0;
- self->ith_state = mr;
- return;
+ rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
+ if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) { /* message plus requested trailer does not fit in max_size */
+ mr = MACH_RCV_TOO_LARGE;
+ if (option & MACH_RCV_LARGE) { /* leave the message queued and report only the name and size */
+ thread->ith_receiver_name = mqueue->imq_receiver_name;
+ thread->ith_kmsg = IKM_NULL;
+ thread->ith_msize = rcv_size;
+ thread->ith_seqno = 0;
+ thread->ith_state = mr;
+ return;
+ }
}
ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
ipc_mqueue_release_msgcount(mqueue);
- self->ith_seqno = mqueue->imq_seqno++;
- self->ith_kmsg = kmsg;
- self->ith_state = mr;
+ thread->ith_seqno = mqueue->imq_seqno++;
+ thread->ith_kmsg = kmsg;
+ thread->ith_state = mr; /* MACH_MSG_SUCCESS, or MACH_RCV_TOO_LARGE when the message was dequeued without MACH_RCV_LARGE.
+ * NOTE(review): the counter bumped below belongs to current_task(), not
+ * thread->task - confirm that is intended when thread is not the caller. */
current_task()->messages_received++;
return;
}
+/*
+ * Routine: ipc_mqueue_peek
+ * Purpose:
+ * Peek at a message queue to see if it has any messages
+ * (in it or contained message queues for a set).
+ *
+ * Conditions:
+ * Locks may be held by callers, so this routine cannot block.
+ * Caller holds reference on the message queue.
+ * Returns:
+ * For a non-set mqueue: non-zero if a message is queued, else 0
+ * (the queue head is read without taking the mqueue lock).
+ * For a set mqueue: 1 if some port linked on the set's prepost
+ * list has a queued message, 0 if none does, and -1 if the
+ * set's lock could not be taken without blocking.
+ */
+int
+ipc_mqueue_peek(ipc_mqueue_t mq)
+{
+ wait_queue_link_t wql;
+ queue_t q;
+ spl_t s;
+
+ if (!imq_is_set(mq))
+ return (ipc_kmsg_queue_first(&mq->imq_messages) != IKM_NULL);
+
+ /*
+ * Don't block trying to get the lock.
+ */
+ s = splsched();
+ if (!imq_lock_try(mq)) {
+ splx(s);
+ return -1; /* lock busy: no answer either way */
+ }
+
+ /*
+ * peek at the contained port message queues, return as soon as
+ * we spot a message on one of the message queues linked on the
+ * prepost list.
+ */
+ q = &mq->imq_preposts;
+ queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
+ ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
+ ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; /* only the set's lock is held here; port_mq itself is not locked */
+
+ if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
+ imq_unlock(mq);
+ splx(s);
+ return 1;
+ }
+ }
+ imq_unlock(mq);
+ splx(s);
+ return 0;
+}
+
/*
* Routine: ipc_mqueue_destroy
* Purpose:
* rouse all blocked senders
*/
mqueue->imq_fullwaiters = FALSE;
- wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE);
+ wait_queue_wakeup64_all_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_RESTART,
+ FALSE);
kmqueue = &mqueue->imq_messages;
{
spl_t s;
+ assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
/* wake up senders allowed by the new qlimit */
s = splsched();
imq_lock(mqueue);
wakeup = qlimit - mqueue->imq_qlimit;
for (i = 0; i < wakeup; i++) {
- if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) == KERN_NOT_WAITING) {
+ if (wait_queue_wakeup64_one_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ FALSE) == KERN_NOT_WAITING) {
mqueue->imq_fullwaiters = FALSE;
break;
}
+ mqueue->imq_msgcount++; /* give it to the awakened thread */
}
}
mqueue->imq_qlimit = qlimit;
if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
ipc_port_t port;
- ipc_pset_t pset;
port = (ipc_port_t) object;
assert(port != IP_NULL);