/*
- * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
+ * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
*
* @APPLE_LICENSE_HEADER_START@
*
- * Copyright (c) 1999-2003 Apple Computer, Inc. All Rights Reserved.
+ * The contents of this file constitute Original Code as defined in and
+ * are subject to the Apple Public Source License Version 1.1 (the
+ * "License"). You may not use this file except in compliance with the
+ * License. Please obtain a copy of the License at
+ * http://www.apple.com/publicsource and read it before using this file.
*
- * This file contains Original Code and/or Modifications of Original Code
- * as defined in and that are subject to the Apple Public Source License
- * Version 2.0 (the 'License'). You may not use this file except in
- * compliance with the License. Please obtain a copy of the License at
- * http://www.opensource.apple.com/apsl/ and read it before using this
- * file.
- *
- * The Original Code and all software distributed under the License are
- * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
+ * This Original Code and all software distributed under the License are
+ * distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
- * Please see the License for the specific language governing rights and
- * limitations under the License.
+ * FITNESS FOR A PARTICULAR PURPOSE OR NON-INFRINGEMENT. Please see the
+ * License for the specific language governing rights and limitations
+ * under the License.
*
* @APPLE_LICENSE_HEADER_END@
*/
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
+#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#define TR_ENABLE 0
+/* forward declarations */
+void ipc_mqueue_receive_results(wait_result_t result);
+
/*
* Routine: ipc_mqueue_init
* Purpose:
boolean_t
ipc_mqueue_member(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_t set_waitq = &set_mqueue->imq_wait_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
return (wait_queue_member(port_waitq, set_waitq));
* just move onto the next.
*/
if (th->ith_msize <
- kmsg->ikm_header.msgh_size +
+ kmsg->ikm_header->msgh_size +
REQUESTED_TRAILER_SIZE(th->ith_option)) {
th->ith_state = MACH_RCV_TOO_LARGE;
- th->ith_msize = kmsg->ikm_header.msgh_size;
+ th->ith_msize = kmsg->ikm_header->msgh_size;
if (th->ith_option & MACH_RCV_LARGE) {
/*
* let him go without message
* This thread is going to take this message,
* so give it to him.
*/
- ipc_mqueue_release_msgcount(port_mqueue);
ipc_kmsg_rmqueue(kmsgq, kmsg);
+ ipc_mqueue_release_msgcount(port_mqueue);
+
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
ipc_mqueue_t mqueue,
ipc_kmsg_t kmsg,
mach_msg_option_t option,
- mach_msg_timeout_t timeout)
+ mach_msg_timeout_t send_timeout)
{
int wresult;
spl_t s;
if (!imq_full(mqueue) ||
(option & MACH_SEND_ALWAYS) ||
- (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
+ (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
MACH_MSG_TYPE_PORT_SEND_ONCE)) {
mqueue->imq_msgcount++;
+ assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
splx(s);
} else {
+ thread_t cur_thread = current_thread();
+ uint64_t deadline;
/*
* We have to wait for space to be granted to us.
*/
- if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
+ if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
imq_unlock(mqueue);
splx(s);
return MACH_SEND_TIMED_OUT;
}
mqueue->imq_fullwaiters = TRUE;
+ thread_lock(cur_thread);
+ if (option & MACH_SEND_TIMEOUT)
+ clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
wresult = wait_queue_assert_wait64_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TRUE); /* unlock? */
- /* wait/mqueue is unlocked */
+ THREAD_ABORTSAFE, deadline,
+ cur_thread);
+ thread_unlock(cur_thread);
+ imq_unlock(mqueue);
splx(s);
if (wresult == THREAD_WAITING) {
- if (option & MACH_SEND_TIMEOUT) {
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
- wresult = thread_block(THREAD_CONTINUE_NULL);
- if (wresult != THREAD_TIMED_OUT)
- thread_cancel_timer();
- } else {
- wresult = thread_block(THREAD_CONTINUE_NULL);
- }
+ wresult = thread_block(THREAD_CONTINUE_NULL);
counter(c_ipc_mqueue_send_block++);
}
case THREAD_AWAKENED:
/* we can proceed - inherited msgcount from waker */
+ assert(mqueue->imq_msgcount > 0);
break;
case THREAD_INTERRUPTED:
* found a waiter.
*
* Conditions:
- * The message queue is locked
+ * The message queue is locked.
+ * The message corresponding to this reference is off the queue.
*/
void
ipc_mqueue_release_msgcount(
ipc_mqueue_t mqueue)
{
assert(imq_held(mqueue));
- assert(mqueue->imq_msgcount > 0);
+ assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages)); /* if messages are still queued, counts other than ours must remain */
mqueue->imq_msgcount--;
+
if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
if (wait_queue_wakeup64_one_locked(
&mqueue->imq_wait_queue,
FALSE) != KERN_SUCCESS) {
mqueue->imq_fullwaiters = FALSE;
} else {
- mqueue->imq_msgcount++; /* gave it away */
+ /*
+ * gave away our slot - add reference back: the wakeup
+ * succeeded, so the thread that was waiting for space
+ * now owns the count we just released.
+ */
+ mqueue->imq_msgcount++;
}
}
}
* the thread we wake up will get that as its status.
*/
if (receiver->ith_msize <
- (kmsg->ikm_header.msgh_size) +
+ (kmsg->ikm_header->msgh_size) +
REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
- receiver->ith_msize = kmsg->ikm_header.msgh_size;
+ receiver->ith_msize = kmsg->ikm_header->msgh_size;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
receiver->ith_state = MACH_MSG_SUCCESS;
}
-kern_return_t
-ipc_mqueue_receive_results(void)
+/* static */ void
+ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
thread_t self = current_thread();
mach_msg_option_t option = self->ith_option;
- kern_return_t saved_wait_result = self->wait_result;
kern_return_t mr;
/*
return;
case THREAD_INTERRUPTED:
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_INTERRUPTED;
return;
case THREAD_RESTART:
/* something bad happened to the port/set */
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_PORT_CHANGED;
return;
* We do not need to go select a message, somebody
* handed us one (or a too-large indication).
*/
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
-
mr = MACH_MSG_SUCCESS;
switch (self->ith_state) {
}
void
-ipc_mqueue_receive_continue(void)
+ipc_mqueue_receive_continue(
+ __unused void *param, /* ignored by this routine */
+ wait_result_t wresult) /* wait result, forwarded to ipc_mqueue_receive_results() */
{
- ipc_mqueue_receive_results();
+ ipc_mqueue_receive_results(wresult);
mach_msg_receive_continue(); /* hard-coded for now */
}
void
ipc_mqueue_receive(
- ipc_mqueue_t mqueue,
- mach_msg_option_t option,
- mach_msg_size_t max_size,
- mach_msg_timeout_t timeout,
- int interruptible)
+ ipc_mqueue_t mqueue,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
+ mach_msg_timeout_t rcv_timeout,
+ int interruptible)
{
- ipc_port_t port;
- mach_msg_return_t mr, mr2;
- ipc_kmsg_queue_t kmsgs;
- wait_result_t wresult;
- thread_t self;
- ipc_kmsg_t *kmsgp;
- mach_port_seqno_t *seqnop;
- spl_t s;
+ ipc_kmsg_queue_t kmsgs;
+ wait_result_t wresult;
+ thread_t self;
+ uint64_t deadline;
+ spl_t s;
s = splsched();
imq_lock(mqueue);
*/
self = current_thread();
if (option & MACH_RCV_TIMEOUT) {
- if (timeout == 0) {
+ if (rcv_timeout == 0) {
imq_unlock(mqueue);
splx(s);
self->ith_state = MACH_RCV_TIMED_OUT;
}
}
+ thread_lock(self);
self->ith_state = MACH_RCV_IN_PROGRESS;
self->ith_option = option;
self->ith_msize = max_size;
-
+
+ if (option & MACH_RCV_TIMEOUT)
+ clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
+
wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TRUE); /* unlock? */
- /* mqueue/waitq is unlocked */
+ IPC_MQUEUE_RECEIVE,
+ interruptible, deadline,
+ self);
+ thread_unlock(self);
+ imq_unlock(mqueue);
splx(s);
if (wresult == THREAD_WAITING) {
- if (option & MACH_RCV_TIMEOUT)
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-
- if (interruptible == THREAD_ABORTSAFE)
- counter(c_ipc_mqueue_receive_block_user++);
- else
- counter(c_ipc_mqueue_receive_block_kernel++);
+ counter((interruptible == THREAD_ABORTSAFE) ?
+ c_ipc_mqueue_receive_block_user++ :
+ c_ipc_mqueue_receive_block_kernel++);
if (self->ith_continuation)
thread_block(ipc_mqueue_receive_continue);
/* NOTREACHED */
- thread_block(THREAD_CONTINUE_NULL);
+ wresult = thread_block(THREAD_CONTINUE_NULL);
}
- ipc_mqueue_receive_results();
+ ipc_mqueue_receive_results(wresult);
}
{
thread_t self = current_thread();
ipc_kmsg_t kmsg;
- mach_port_seqno_t seqno;
mach_msg_return_t mr;
+ mach_msg_size_t rcv_size;
mr = MACH_MSG_SUCCESS;
* before pulling the message off the queue.
*/
kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
-
assert(kmsg != IKM_NULL);
- if (kmsg->ikm_header.msgh_size +
- REQUESTED_TRAILER_SIZE(option) > max_size) {
- mr = MACH_RCV_TOO_LARGE;
- }
-
/*
* If we really can't receive it, but we had the
* MACH_RCV_LARGE option set, then don't take it off
* the queue, instead return the appropriate error
* (and size needed).
*/
- if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) {
- self->ith_kmsg = IKM_NULL;
- self->ith_msize = kmsg->ikm_header.msgh_size;
- self->ith_seqno = 0;
- self->ith_state = mr;
- return;
+ rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
+ if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
+ mr = MACH_RCV_TOO_LARGE;
+ if (option & MACH_RCV_LARGE) {
+ self->ith_kmsg = IKM_NULL;
+ self->ith_msize = rcv_size;
+ self->ith_seqno = 0;
+ self->ith_state = mr;
+ return;
+ }
}
ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
{
spl_t s;
+ assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
/* wake up senders allowed by the new qlimit */
s = splsched();
imq_lock(mqueue);
mqueue->imq_fullwaiters = FALSE;
break;
}
+ mqueue->imq_msgcount++; /* give it to the awakened thread */
}
}
mqueue->imq_qlimit = qlimit;
if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
ipc_port_t port;
- ipc_pset_t pset;
port = (ipc_port_t) object;
assert(port != IP_NULL);