/*
- * Copyright (c) 2000 Apple Computer, Inc. All rights reserved.
+ * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
*
* @APPLE_LICENSE_HEADER_START@
*
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
+#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#define TR_ENABLE 0
+/* forward declarations */
+void ipc_mqueue_receive_results(wait_result_t result);
+
/*
* Routine: ipc_mqueue_init
* Purpose:
boolean_t is_set)
{
if (is_set) {
- wait_queue_sub_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
+ wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
} else {
wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
ipc_kmsg_queue_init(&mqueue->imq_messages);
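/*
 * Illustrative note (not part of this change): the is_set split above is
 * what backs the two user-visible port flavors. A minimal user-level
 * sketch, assuming only the standard Mach API; example_alloc is a
 * hypothetical name.
 */
#include <mach/mach.h>

static void
example_alloc(void)
{
	mach_port_t port = MACH_PORT_NULL, pset = MACH_PORT_NULL;

	/* a receive right is backed by imq_wait_queue + imq_messages */
	(void) mach_port_allocate(mach_task_self(),
			MACH_PORT_RIGHT_RECEIVE, &port);

	/* a port set is backed by the imq_set_queue flavor */
	(void) mach_port_allocate(mach_task_self(),
			MACH_PORT_RIGHT_PORT_SET, &pset);
}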
boolean_t
ipc_mqueue_member(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_t set_waitq = &set_mqueue->imq_wait_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
return (wait_queue_member(port_waitq, set_waitq));
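/*
 * User-level counterpart (sketch, not part of this change): membership
 * can be observed with mach_port_get_set_status(), which returns the
 * member names of a port set; example_is_member is a hypothetical helper.
 */
#include <mach/mach.h>

static boolean_t
example_is_member(mach_port_t port, mach_port_t pset)
{
	mach_port_name_array_t members;
	mach_msg_type_number_t count, i;
	boolean_t found = FALSE;

	if (mach_port_get_set_status(mach_task_self(), pset,
			&members, &count) != KERN_SUCCESS)
		return FALSE;
	for (i = 0; i < count; i++)
		if (members[i] == port)
			found = TRUE;
	/* the names array comes back out-of-line; release it */
	(void) vm_deallocate(mach_task_self(), (vm_address_t) members,
			count * sizeof (mach_port_name_t));
	return found;
}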
* Routine: ipc_mqueue_remove
* Purpose:
* Remove the association between the queue and the specified
- * subordinate message queue.
+ * set message queue.
*/
kern_return_t
ipc_mqueue_remove(
ipc_mqueue_t mqueue,
- ipc_mqueue_t sub_mqueue)
+ ipc_mqueue_t set_mqueue)
{
wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_sub_t sub_waitq = &sub_mqueue->imq_set_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
- if (wait_queue_member(mq_waitq, sub_waitq)) {
- wait_queue_unlink(mq_waitq, sub_waitq);
- return KERN_SUCCESS;
- }
- return KERN_NOT_IN_SET;
+ return wait_queue_unlink(mq_waitq, set_waitq);
}
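/*
 * The usual user-level path into this routine (sketch): moving a member
 * to MACH_PORT_NULL removes the port from its current set.
 * example_remove_from_set is a hypothetical name.
 */
#include <mach/mach.h>

static kern_return_t
example_remove_from_set(mach_port_t port)
{
	/* "after" == MACH_PORT_NULL: remove, don't re-insert anywhere */
	return mach_port_move_member(mach_task_self(), port, MACH_PORT_NULL);
}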
/*
- * Routine: ipc_mqueue_remove_one
+ * Routine: ipc_mqueue_remove_from_all
* Purpose:
- * Find and remove one subqueue from the queue.
+ * Remove the mqueue from all the sets it is a member of.
* Conditions:
- * Will return the set mqueue that was removed
+ * Nothing locked.
*/
void
-ipc_mqueue_remove_one(
- ipc_mqueue_t mqueue,
- ipc_mqueue_t *sub_queuep)
+ipc_mqueue_remove_from_all(
+ ipc_mqueue_t mqueue)
{
wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_unlink_one(mq_waitq, (wait_queue_sub_t *)sub_queuep);
+ wait_queue_unlink_all(mq_waitq);
+ return;
+}
+
+/*
+ * Routine: ipc_mqueue_remove_all
+ * Purpose:
+ * Remove all the member queues from the specified set.
+ * Conditions:
+ * Nothing locked.
+ */
+void
+ipc_mqueue_remove_all(
+ ipc_mqueue_t mqueue)
+{
+ wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
+
+ wait_queue_set_unlink_all(mq_setq);
return;
}
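/*
 * Sketch of the user-visible operations that should end up in the two
 * unlink routines above (an assumption about the port/pset teardown
 * paths; example_teardown is a hypothetical name).
 */
#include <mach/mach.h>

static void
example_teardown(void)
{
	mach_port_t port = MACH_PORT_NULL, pset = MACH_PORT_NULL;

	(void) mach_port_allocate(mach_task_self(),
			MACH_PORT_RIGHT_RECEIVE, &port);
	(void) mach_port_allocate(mach_task_self(),
			MACH_PORT_RIGHT_PORT_SET, &pset);
	(void) mach_port_insert_member(mach_task_self(), port, pset);

	/* destroying the set unlinks every member (remove_all side) */
	(void) mach_port_mod_refs(mach_task_self(), pset,
			MACH_PORT_RIGHT_PORT_SET, -1);

	/* destroying the right unlinks it from all sets (remove_from_all) */
	(void) mach_port_mod_refs(mach_task_self(), port,
			MACH_PORT_RIGHT_RECEIVE, -1);
}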
ipc_mqueue_t set_mqueue)
{
wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_sub_t set_waitq = &set_mqueue->imq_set_queue;
+ wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
kern_return_t kr;
for (;;) {
thread_t th;
- th = wait_queue_wakeup_identity_locked(port_waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ th = wait_queue_wakeup64_identity_locked(
+ port_waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ FALSE);
/* waitq/mqueue still locked, thread locked */
if (th == THREAD_NULL)
* just move onto the next.
*/
if (th->ith_msize <
- kmsg->ikm_header.msgh_size +
+ kmsg->ikm_header->msgh_size +
REQUESTED_TRAILER_SIZE(th->ith_option)) {
th->ith_state = MACH_RCV_TOO_LARGE;
- th->ith_msize = kmsg->ikm_header.msgh_size;
+ th->ith_msize = kmsg->ikm_header->msgh_size;
if (th->ith_option & MACH_RCV_LARGE) {
/*
* let him go without message
* This thread is going to take this message,
* so give it to him.
*/
- ipc_mqueue_release_msgcount(port_mqueue);
ipc_kmsg_rmqueue(kmsgq, kmsg);
+ ipc_mqueue_release_msgcount(port_mqueue);
+
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
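/*
 * The REQUESTED_TRAILER_SIZE(th->ith_option) head-room above is driven
 * by the receiver's trailer request. User-level sketch (standard
 * mach_msg options only; example_recv_with_trailer is hypothetical):
 */
#include <mach/mach.h>

static mach_msg_return_t
example_recv_with_trailer(mach_port_t port, mach_msg_header_t *buf,
		mach_msg_size_t buf_size)
{
	/* request the audit trailer; buf_size must leave room for it */
	mach_msg_option_t opts = MACH_RCV_MSG |
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) |
		MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AUDIT);

	return mach_msg(buf, opts, 0, buf_size, port,
			MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
}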
ipc_mqueue_changed(
ipc_mqueue_t mqueue)
{
- wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- THREAD_RESTART,
- FALSE); /* unlock waitq? */
+ wait_queue_wakeup64_all_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_RESTART,
+ FALSE); /* unlock waitq? */
}
ipc_mqueue_t mqueue,
ipc_kmsg_t kmsg,
mach_msg_option_t option,
- mach_msg_timeout_t timeout)
+ mach_msg_timeout_t send_timeout)
{
- int save_wait_result;
+ int wresult;
spl_t s;
/*
if (!imq_full(mqueue) ||
(option & MACH_SEND_ALWAYS) ||
- (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
+ (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
MACH_MSG_TYPE_PORT_SEND_ONCE)) {
mqueue->imq_msgcount++;
+ assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
splx(s);
} else {
+ thread_t cur_thread = current_thread();
+ uint64_t deadline;
/*
* We have to wait for space to be granted to us.
*/
- if ((option & MACH_SEND_TIMEOUT) && (timeout == 0)) {
+ if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
imq_unlock(mqueue);
splx(s);
return MACH_SEND_TIMED_OUT;
}
mqueue->imq_fullwaiters = TRUE;
- wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TRUE); /* unlock? */
- /* wait/mqueue is unlocked */
+ thread_lock(cur_thread);
+ if (option & MACH_SEND_TIMEOUT)
+ clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
+ wresult = wait_queue_assert_wait64_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_ABORTSAFE, deadline,
+ cur_thread);
+ thread_unlock(cur_thread);
+ imq_unlock(mqueue);
splx(s);
- if (option & MACH_SEND_TIMEOUT)
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
-
- counter(c_ipc_mqueue_send_block++);
- save_wait_result = thread_block((void (*)(void)) 0);
+ if (wresult == THREAD_WAITING) {
+ wresult = thread_block(THREAD_CONTINUE_NULL);
+ counter(c_ipc_mqueue_send_block++);
+ }
- switch (save_wait_result) {
+ switch (wresult) {
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
case THREAD_AWAKENED:
/* we can proceed - inherited msgcount from waker */
- if (option & MACH_SEND_TIMEOUT)
- thread_cancel_timer();
+ assert(mqueue->imq_msgcount > 0);
break;
case THREAD_INTERRUPTED:
- if (option & MACH_SEND_TIMEOUT)
- thread_cancel_timer();
return MACH_SEND_INTERRUPTED;
case THREAD_RESTART:
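/*
 * Sender's view of the wait above (sketch): MACH_SEND_TIMEOUT supplies
 * the interval (milliseconds, matching the 1000*NSEC_PER_USEC scale)
 * that becomes the deadline, and THREAD_TIMED_OUT surfaces as
 * MACH_SEND_TIMED_OUT. example_send_with_timeout is hypothetical; msg
 * is assumed fully initialized.
 */
#include <mach/mach.h>

static mach_msg_return_t
example_send_with_timeout(mach_msg_header_t *msg,
		mach_msg_timeout_t timeout_ms)
{
	return mach_msg(msg, MACH_SEND_MSG | MACH_SEND_TIMEOUT,
			msg->msgh_size, 0, MACH_PORT_NULL,
			timeout_ms, MACH_PORT_NULL);
}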
* found a waiter.
*
* Conditions:
- * The message queue is locked
+ * The message queue is locked.
+ * The message corresponding to this reference is off the queue.
*/
void
ipc_mqueue_release_msgcount(
ipc_mqueue_t mqueue)
{
assert(imq_held(mqueue));
- assert(mqueue->imq_msgcount > 0);
+ assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
mqueue->imq_msgcount--;
+
if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
- if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) != KERN_SUCCESS) {
+ if (wait_queue_wakeup64_one_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ FALSE) != KERN_SUCCESS) {
mqueue->imq_fullwaiters = FALSE;
} else {
- mqueue->imq_msgcount++; /* gave it away */
+ /* gave away our slot - add reference back */
+ mqueue->imq_msgcount++;
}
}
}
wait_queue_t waitq = &mqueue->imq_wait_queue;
thread_t receiver;
- receiver = wait_queue_wakeup_identity_locked(waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ receiver = wait_queue_wakeup64_identity_locked(
+ waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ FALSE);
/* waitq still locked, thread locked */
if (receiver == THREAD_NULL) {
* the thread we wake up will get that as its status.
*/
if (receiver->ith_msize <
- (kmsg->ikm_header.msgh_size) +
+ (kmsg->ikm_header->msgh_size) +
REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
- receiver->ith_msize = kmsg->ikm_header.msgh_size;
+ receiver->ith_msize = kmsg->ikm_header->msgh_size;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
receiver->ith_state = MACH_MSG_SUCCESS;
}
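/*
 * The ith_msize comparison above is why user receive buffers are
 * conventionally declared with MAX_TRAILER_SIZE of slack, enough for
 * any trailer REQUESTED_TRAILER_SIZE() can yield. Illustrative layout
 * only (example_rcv_buf_t is a hypothetical type):
 */
#include <mach/mach.h>

typedef struct {
	mach_msg_header_t	header;
	/* ... expected body ... */
	mach_msg_max_trailer_t	trailer;	/* MAX_TRAILER_SIZE slack */
} example_rcv_buf_t;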
-kern_return_t
-ipc_mqueue_receive_results(void)
+/* static */ void
+ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
thread_t self = current_thread();
mach_msg_option_t option = self->ith_option;
- kern_return_t saved_wait_result = self->wait_result;
kern_return_t mr;
/*
return;
case THREAD_INTERRUPTED:
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_INTERRUPTED;
return;
case THREAD_RESTART:
/* something bad happened to the port/set */
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
self->ith_state = MACH_RCV_PORT_CHANGED;
return;
* We do not need to go select a message, somebody
* handed us one (or a too-large indication).
*/
- if (option & MACH_RCV_TIMEOUT)
- thread_cancel_timer();
-
mr = MACH_MSG_SUCCESS;
switch (self->ith_state) {
}
void
-ipc_mqueue_receive_continue(void)
+ipc_mqueue_receive_continue(
+ __unused void *param,
+ wait_result_t wresult)
{
- ipc_mqueue_receive_results();
+ ipc_mqueue_receive_results(wresult);
mach_msg_receive_continue(); /* hard-coded for now */
}
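/*
 * Shape of the continuation idiom used above (sketch with a stand-in
 * type and hypothetical example_* names): blocking with a continuation
 * discards the kernel stack, and the thread resumes in the continuation
 * with the wait result rather than returning from thread_block().
 */
typedef int example_wait_result_t;	/* stand-in for wait_result_t */

static void example_results(example_wait_result_t wresult) { (void) wresult; }

static void
example_continue(void *param, example_wait_result_t wresult)
{
	(void) param;
	/* fresh stack: consume the wait result, then finish the operation */
	example_results(wresult);
	/* NOTREACHED - would run the receive finish path here */
}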
void
ipc_mqueue_receive(
- ipc_mqueue_t mqueue,
- mach_msg_option_t option,
- mach_msg_size_t max_size,
- mach_msg_timeout_t timeout,
- int interruptible)
+ ipc_mqueue_t mqueue,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
+ mach_msg_timeout_t rcv_timeout,
+ int interruptible)
{
- ipc_port_t port;
- mach_msg_return_t mr, mr2;
- ipc_kmsg_queue_t kmsgs;
- kern_return_t save_wait_result;
- thread_t self;
- ipc_kmsg_t *kmsgp;
- mach_port_seqno_t *seqnop;
- spl_t s;
+ ipc_kmsg_queue_t kmsgs;
+ wait_result_t wresult;
+ thread_t self;
+ uint64_t deadline;
+ spl_t s;
s = splsched();
imq_lock(mqueue);
* change will succeed and find us.
*/
search_set:
- queue_iterate(q, wql, wait_queue_link_t, wql_sublinks) {
+ queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
port_mq = (ipc_mqueue_t)wql->wql_queue;
kmsgs = &port_mq->imq_messages;
imq_unlock(port_mq);
continue;
}
- queue_remove(q, wql, wait_queue_link_t, wql_sublinks);
- queue_enter(q, wql, wait_queue_link_t, wql_sublinks);
+ queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
+ queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
imq_unlock(mqueue);
ipc_mqueue_select(port_mq, option, max_size);
*/
self = current_thread();
if (option & MACH_RCV_TIMEOUT) {
- if (timeout == 0) {
+ if (rcv_timeout == 0) {
imq_unlock(mqueue);
splx(s);
self->ith_state = MACH_RCV_TIMED_OUT;
}
}
+ thread_lock(self);
self->ith_state = MACH_RCV_IN_PROGRESS;
self->ith_option = option;
self->ith_msize = max_size;
-
- wait_queue_assert_wait_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TRUE); /* unlock? */
- /* mqueue/waitq is unlocked */
+
+ if (option & MACH_RCV_TIMEOUT)
+ clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
+ else
+ deadline = 0;
+
+ wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ interruptible, deadline,
+ self);
+ thread_unlock(self);
+ imq_unlock(mqueue);
splx(s);
- if (option & MACH_RCV_TIMEOUT) {
- thread_set_timer(timeout, 1000*NSEC_PER_USEC);
- }
-	if (interruptible == THREAD_ABORTSAFE) {
-		counter(c_ipc_mqueue_receive_block_user++);
-	} else {
-		counter(c_ipc_mqueue_receive_block_kernel++);
-	}
-#if defined (__i386__)
-	thread_block((void (*)(void))0);
-#else
-	if (self->ith_continuation) {
-		thread_block(ipc_mqueue_receive_continue);
-	} else {
-		thread_block((void (*)(void))0);
-	}
-#endif
-
-	ipc_mqueue_receive_results();	/* if we fell thru */
+	if (wresult == THREAD_WAITING) {
+		counter((interruptible == THREAD_ABORTSAFE) ?
+			c_ipc_mqueue_receive_block_user++ :
+			c_ipc_mqueue_receive_block_kernel++);
+
+		if (self->ith_continuation)
+			thread_block(ipc_mqueue_receive_continue);
+		/* NOTREACHED */
+
+		wresult = thread_block(THREAD_CONTINUE_NULL);
+	}
+	ipc_mqueue_receive_results(wresult);
}
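/*
 * Receiver's view of the wait above (sketch): MACH_RCV_TIMEOUT feeds
 * the deadline, and an expired wait surfaces as MACH_RCV_TIMED_OUT.
 * example_recv_with_timeout is a hypothetical name.
 */
#include <mach/mach.h>

static mach_msg_return_t
example_recv_with_timeout(mach_port_t port, mach_msg_header_t *buf,
		mach_msg_size_t buf_size, mach_msg_timeout_t timeout_ms)
{
	return mach_msg(buf, MACH_RCV_MSG | MACH_RCV_TIMEOUT,
			0, buf_size, port, timeout_ms, MACH_PORT_NULL);
}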
{
thread_t self = current_thread();
ipc_kmsg_t kmsg;
- mach_port_seqno_t seqno;
mach_msg_return_t mr;
+ mach_msg_size_t rcv_size;
mr = MACH_MSG_SUCCESS;
* before pulling the message off the queue.
*/
kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
-
assert(kmsg != IKM_NULL);
- if (kmsg->ikm_header.msgh_size +
- REQUESTED_TRAILER_SIZE(option) > max_size) {
- mr = MACH_RCV_TOO_LARGE;
- }
-
/*
* If we really can't receive it, but we had the
* MACH_RCV_LARGE option set, then don't take it off
* the queue, instead return the appropriate error
* (and size needed).
*/
- if ((mr == MACH_RCV_TOO_LARGE) && (option & MACH_RCV_LARGE)) {
- self->ith_kmsg = IKM_NULL;
- self->ith_msize = kmsg->ikm_header.msgh_size;
- self->ith_seqno = 0;
- self->ith_state = mr;
- return;
+ rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
+ if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
+ mr = MACH_RCV_TOO_LARGE;
+ if (option & MACH_RCV_LARGE) {
+ self->ith_kmsg = IKM_NULL;
+ self->ith_msize = rcv_size;
+ self->ith_seqno = 0;
+ self->ith_state = mr;
+ return;
+ }
}
ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
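/*
 * User-level counterpart of the MACH_RCV_LARGE branch above (sketch):
 * on MACH_RCV_TOO_LARGE the message stays queued and the header's
 * msgh_size reports the needed size, so the caller can grow the buffer
 * and retry. example_recv_grow is hypothetical; allocation failures are
 * elided.
 */
#include <mach/mach.h>
#include <stdlib.h>

static mach_msg_header_t *
example_recv_grow(mach_port_t port)
{
	mach_msg_size_t size = sizeof (mach_msg_empty_rcv_t);
	mach_msg_header_t *buf = malloc(size);
	mach_msg_return_t mr;

	for (;;) {
		mr = mach_msg(buf, MACH_RCV_MSG | MACH_RCV_LARGE, 0, size,
				port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
		if (mr != MACH_RCV_TOO_LARGE)
			break;
		/* message left queued; msgh_size holds the required size */
		size = buf->msgh_size + MAX_TRAILER_SIZE;
		buf = realloc(buf, size);
	}
	if (mr != MACH_MSG_SUCCESS) {
		free(buf);
		return NULL;
	}
	return buf;
}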
* rouse all blocked senders
*/
mqueue->imq_fullwaiters = FALSE;
- wait_queue_wakeup_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE);
+ wait_queue_wakeup64_all_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ FALSE);
kmqueue = &mqueue->imq_messages;
{
spl_t s;
+ assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
/* wake up senders allowed by the new qlimit */
s = splsched();
imq_lock(mqueue);
wakeup = qlimit - mqueue->imq_qlimit;
for (i = 0; i < wakeup; i++) {
- if (wait_queue_wakeup_one_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) == KERN_NOT_WAITING) {
+ if (wait_queue_wakeup64_one_locked(
+ &mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ FALSE) == KERN_NOT_WAITING) {
mqueue->imq_fullwaiters = FALSE;
break;
}
+ mqueue->imq_msgcount++; /* give it to the awakened thread */
}
}
mqueue->imq_qlimit = qlimit;
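/*
 * The qlimit adjusted here is set from user space roughly as follows
 * (sketch; example_set_qlimit is a hypothetical name):
 */
#include <mach/mach.h>

static kern_return_t
example_set_qlimit(mach_port_t port, natural_t qlimit)
{
	mach_port_limits_t limits;

	limits.mpl_qlimit = qlimit;	/* must be <= MACH_PORT_QLIMIT_MAX */
	return mach_port_set_attributes(mach_task_self(), port,
			MACH_PORT_LIMITS_INFO, (mach_port_info_t) &limits,
			MACH_PORT_LIMITS_INFO_COUNT);
}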
if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
ipc_port_t port;
- ipc_pset_t pset;
port = (ipc_port_t) object;
assert(port != IP_NULL);