* Copyright (c) 2000-2007 Apple Inc. All rights reserved.
*
* @APPLE_OSREFERENCE_LICENSE_HEADER_START@
- *
+ *
* This file contains Original Code and/or Modifications of Original Code
* as defined in and that are subject to the Apple Public Source License
* Version 2.0 (the 'License'). You may not use this file except in
* unlawful or unlicensed copies of an Apple operating system, or to
* circumvent, violate, or enable the circumvention or violation of, any
* terms of an Apple operating system software license agreement.
- *
+ *
* Please obtain a copy of the License at
* http://www.opensource.apple.com/apsl/ and read it before using this file.
- *
+ *
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
* Please see the License for the specific language governing rights and
* limitations under the License.
- *
+ *
* @APPLE_OSREFERENCE_LICENSE_HEADER_END@
*/
/*
* @OSF_FREE_COPYRIGHT@
*/
-/*
+/*
* Mach Operating System
* Copyright (c) 1991,1990,1989 Carnegie Mellon University
* All Rights Reserved.
- *
+ *
* Permission to use, copy, modify and distribute this software and its
* documentation is hereby granted, provided that both the copyright
* notice and this permission notice appear in all copies of the
* software, derivative works or modified versions, and any portions
* thereof, and that both notices appear in supporting documentation.
- *
+ *
* CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
* CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
* ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
- *
+ *
* Carnegie Mellon requests users of this software to return to
- *
+ *
* Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
* School of Computer Science
* Carnegie Mellon University
* Pittsburgh PA 15213-3890
- *
+ *
* any improvements or extensions that they make and grant Carnegie Mellon
* the rights to redistribute these changes.
*/
* is included in support of clause 2.2 (b) of the Apple Public License,
* Version 2.0.
*/
-
+
#include <mach/port.h>
#include <mach/message.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
-#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
+#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <sys/event.h>
-extern char *proc_name_address(void *p);
+extern char *proc_name_address(void *p);
-int ipc_mqueue_full; /* address is event for queue space */
-int ipc_mqueue_rcv; /* address is event for message arrival */
+int ipc_mqueue_full; /* address is event for queue space */
+int ipc_mqueue_rcv; /* address is event for message arrival */
/* forward declarations */
void ipc_mqueue_receive_results(wait_result_t result);
*/
void
ipc_mqueue_init(
- ipc_mqueue_t mqueue,
- boolean_t is_set)
+ ipc_mqueue_t mqueue,
+ boolean_t is_set)
{
if (is_set) {
waitq_set_init(&mqueue->imq_set_queue,
- SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST,
- NULL, NULL);
+ SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
+ NULL, NULL);
} else {
waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
ipc_kmsg_queue_init(&mqueue->imq_messages);
klist_init(&mqueue->imq_klist);
}
-void ipc_mqueue_deinit(
- ipc_mqueue_t mqueue)
+void
+ipc_mqueue_deinit(
+ ipc_mqueue_t mqueue)
{
boolean_t is_set = imq_is_set(mqueue);
- if (is_set)
+ if (is_set) {
waitq_set_deinit(&mqueue->imq_set_queue);
- else
+ } else {
waitq_deinit(&mqueue->imq_wait_queue);
+ }
}
/*
imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
{
*reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
- WAITQ_KEEP_LOCKED);
-
+ WAITQ_KEEP_LOCKED);
}
boolean_t
ipc_mqueue_member(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue)
{
struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
return waitq_member(port_waitq, set_waitq);
-
}
/*
kern_return_t
ipc_mqueue_remove(
- ipc_mqueue_t mqueue,
- ipc_mqueue_t set_mqueue)
+ ipc_mqueue_t mqueue,
+ ipc_mqueue_t set_mqueue)
{
struct waitq *mq_waitq = &mqueue->imq_wait_queue;
struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
* mqueue unlocked and set links deallocated
*/
void
-ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
+ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
struct waitq *mq_waitq = &mqueue->imq_wait_queue;
kern_return_t kr;
* mqueue unlocked, all set links deallocated
*/
void
-ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
+ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
struct waitq_set *mq_setq = &mqueue->imq_set_queue;
*/
kern_return_t
ipc_mqueue_add(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue,
- uint64_t *reserved_link,
- uint64_t *reserved_prepost)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue,
+ uint64_t *reserved_link,
+ uint64_t *reserved_prepost)
{
struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
- kern_return_t kr;
+ kern_return_t kr;
assert(reserved_link && *reserved_link != 0);
assert(waitqs_is_linked(set_waitq));
*/
kmsgq = &port_mqueue->imq_messages;
for (kmsg = ipc_kmsg_queue_first(kmsgq);
- kmsg != IKM_NULL;
- kmsg = next) {
+ kmsg != IKM_NULL;
+ kmsg = next) {
next = ipc_kmsg_queue_next(kmsgq, kmsg);
for (;;) {
spl_t th_spl;
th = waitq_wakeup64_identify_locked(
- port_waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED, &th_spl,
- reserved_prepost, WAITQ_ALL_PRIORITIES,
- WAITQ_KEEP_LOCKED);
+ port_waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED, &th_spl,
+ reserved_prepost, WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
/* waitq/mqueue still locked, thread locked */
- if (th == THREAD_NULL)
+ if (th == THREAD_NULL) {
goto leave;
+ }
/*
* If the receiver waited with a facility not directly
* if there are any actual receivers
*/
ipc_mqueue_peek_on_thread(port_mqueue,
- th->ith_option,
- th);
+ th->ith_option,
+ th);
}
thread_unlock(th);
splx(th_spl);
*/
msize = ipc_kmsg_copyout_size(kmsg, th->map);
if (th->ith_rsize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
th->ith_state = MACH_RCV_TOO_LARGE;
th->ith_msize = msize;
if (th->ith_option & MACH_RCV_LARGE) {
thread_unlock(th);
splx(th_spl);
#if MACH_FLIPC
- if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport))
- flipc_msg_ack(node, port_mqueue, TRUE);
+ if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
+ flipc_msg_ack(node, port_mqueue, TRUE);
+ }
#endif
break; /* go to next message */
}
}
- leave:
+leave:
imq_unlock(port_mqueue);
return KERN_SUCCESS;
}
* Conditions:
* The message queue is locked.
*/
-
void
ipc_mqueue_changed(
- ipc_mqueue_t mqueue)
+ ipc_space_t space,
+ ipc_mqueue_t mqueue)
{
- if (IMQ_KLIST_VALID(mqueue)) {
+ if (IMQ_KLIST_VALID(mqueue) && SLIST_FIRST(&mqueue->imq_klist)) {
/*
* Indicate that this message queue is vanishing
*
*
* Fortunately, we really don't need this linkage anymore after this
* point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
+ *
+	 * Note: we don't have the space lock here; however, this covers the
+	 * case where a task is terminating the space, triggering
+	 * several knote_vanish() calls.
+ *
+ * We don't need the lock to observe that the space is inactive as
+ * we just deactivated it on the same thread.
+ *
+ * We still need to call knote_vanish() so that the knote is
+ * marked with EV_VANISHED or EV_EOF so that the detach step
+ * in filt_machportdetach is skipped correctly.
*/
- knote_vanish(&mqueue->imq_klist);
+ assert(space);
+ knote_vanish(&mqueue->imq_klist, is_active(space));
klist_init(&mqueue->imq_klist);
}
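	/* wake up all receivers with THREAD_RESTART so they see the change */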
waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- THREAD_RESTART,
- NULL,
- WAITQ_ALL_PRIORITIES,
- WAITQ_KEEP_LOCKED);
+ IPC_MQUEUE_RECEIVE,
+ THREAD_RESTART,
+ NULL,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
}
*/
mach_msg_return_t
ipc_mqueue_send(
- ipc_mqueue_t mqueue,
- ipc_kmsg_t kmsg,
- mach_msg_option_t option,
+ ipc_mqueue_t mqueue,
+ ipc_kmsg_t kmsg,
+ mach_msg_option_t option,
mach_msg_timeout_t send_timeout)
{
int wresult;
*/
if (!imq_full(mqueue) ||
(!imq_full_kernel(mqueue) &&
- ((option & MACH_SEND_ALWAYS) ||
- (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
- MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
+ ((option & MACH_SEND_ALWAYS) ||
+ (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
+ MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
mqueue->imq_msgcount++;
assert(mqueue->imq_msgcount > 0);
imq_unlock(mqueue);
}
mqueue->imq_fullwaiters = TRUE;
- if (option & MACH_SEND_TIMEOUT)
- clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
- else
+ if (option & MACH_SEND_TIMEOUT) {
+ clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
+ } else {
deadline = 0;
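		/* a zero deadline means wait for queue space with no timeout */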
+ }
thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
send_turnstile = turnstile_prepare((uintptr_t)port,
- port_send_turnstile_address(port),
- TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+ port_send_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
	/* Check if the port is in transit, get the destination port's turnstile */
if (ip_active(port) &&
}
turnstile_update_inheritor(send_turnstile, inheritor,
- TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
+ TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
wresult = waitq_assert_wait64_leeway(
- &send_turnstile->ts_waitq,
- IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline,
- TIMEOUT_NO_LEEWAY);
+ &send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_ABORTSAFE,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY);
imq_unlock(mqueue);
turnstile_update_inheritor_complete(send_turnstile,
- TURNSTILE_INTERLOCK_NOT_HELD);
+ TURNSTILE_INTERLOCK_NOT_HELD);
if (wresult == THREAD_WAITING) {
wresult = thread_block(THREAD_CONTINUE_NULL);
turnstile_cleanup();
switch (wresult) {
-
case THREAD_AWAKENED:
/*
* we can proceed - inherited msgcount from waker
* The message queue is not locked.
* The caller holds a reference on the message queue.
*/
-extern void ipc_mqueue_override_send(
+extern void
+ipc_mqueue_override_send(
ipc_mqueue_t mqueue,
mach_msg_priority_t override)
{
ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
- if (IMQ_KLIST_VALID(mqueue))
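+	/* if the port or its receiving space is dead there is no point calling KNOTE */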
+ ipc_port_t port = ip_from_mq(mqueue);
+ if (ip_active(port) &&
+ port->ip_receiver_name != MACH_PORT_NULL &&
+ is_active(port->ip_receiver) &&
+ IMQ_KLIST_VALID(mqueue)) {
KNOTE(&mqueue->imq_klist, 0);
+ }
}
- if (!first)
+ if (!first) {
full_queue_empty = TRUE;
+ }
}
imq_unlock(mqueue);
port_mq->imq_msgcount--;
if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
- send_turnstile != TURNSTILE_NULL) {
+ send_turnstile != TURNSTILE_NULL) {
/*
* boost the priority of the awoken thread
* (WAITQ_PROMOTE_PRIORITY) to ensure it uses
* high priority threads trying to send to this port.
*/
if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
port_mq->imq_fullwaiters = FALSE;
} else {
/* gave away our slot - add reference back */
mach_msg_size_t msize;
receiver = waitq_wakeup64_identify_locked(waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- &th_spl,
- &reserved_prepost,
- WAITQ_ALL_PRIORITIES,
- WAITQ_KEEP_LOCKED);
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ &th_spl,
+ &reserved_prepost,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
/* waitq still locked, thread locked */
if (receiver == THREAD_NULL) {
-
/*
* no receivers; queue kmsg if space still reserved
* Reservations are cancelled when the port goes inactive.
*/
if (mqueue->imq_msgcount > 0) {
if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
- if (IMQ_KLIST_VALID(mqueue))
+ /* if the space is dead there is no point calling KNOTE */
+ ipc_port_t port = ip_from_mq(mqueue);
+ if (ip_active(port) &&
+ port->ip_receiver_name != MACH_PORT_NULL &&
+ is_active(port->ip_receiver) &&
+ IMQ_KLIST_VALID(mqueue)) {
KNOTE(&mqueue->imq_klist, 0);
+ }
}
break;
}
* If the message is too large or the scatter list is too small
* the thread we wake up will get that as its status.
*/
- msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
+ msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
if (receiver->ith_rsize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
receiver->ith_msize = msize;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
#if MACH_FLIPC
- if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport))
- flipc_msg_ack(node, mqueue, TRUE);
+ if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
+ flipc_msg_ack(node, mqueue, TRUE);
+ }
#endif
break;
}
/* clear the waitq boost we may have been given */
waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
imq_release_and_unlock(mqueue, reserved_prepost);
- if (destroy_msg)
+ if (destroy_msg) {
ipc_kmsg_destroy(kmsg);
+ }
current_task()->messages_sent++;
return;
/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
- thread_t self = current_thread();
- mach_msg_option_t option = self->ith_option;
+ thread_t self = current_thread();
+ mach_msg_option_t option = self->ith_option;
/*
* why did we wake up?
imq_lock(mqueue);
wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
- rcv_timeout, interruptible,
- self);
+ rcv_timeout, interruptible,
+ self);
/* mqueue unlocked */
- if (wresult == THREAD_NOT_WAITING)
+ if (wresult == THREAD_NOT_WAITING) {
return;
+ }
if (wresult == THREAD_WAITING) {
counter((interruptible == THREAD_ABORTSAFE) ?
- c_ipc_mqueue_receive_block_user++ :
- c_ipc_mqueue_receive_block_kernel++);
+ c_ipc_mqueue_receive_block_user++ :
+ c_ipc_mqueue_receive_block_kernel++);
- if (self->ith_continuation)
+ if (self->ith_continuation) {
thread_block(ipc_mqueue_receive_continue);
- /* NOTREACHED */
+ }
+ /* NOTREACHED */
wresult = thread_block(THREAD_CONTINUE_NULL);
}
ipc_mqueue_receive_results(wresult);
}
-static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
- struct waitq_set *wqset)
+static int
+mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
{
ipc_mqueue_t port_mq, *pmq_ptr;
* If there are no messages on this queue, skip it and remove
* it from the prepost list
*/
- if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
+ if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
return WQ_ITERATE_INVALIDATE_CONTINUE;
+ }
/*
* There are messages waiting on this port.
* waitq locked.
*/
pmq_ptr = (ipc_mqueue_t *)ctx;
- if (pmq_ptr)
+ if (pmq_ptr) {
*pmq_ptr = port_mq;
+ }
return WQ_ITERATE_BREAK_KEEP_LOCKED;
}
thread_t thread)
{
wait_result_t wresult;
- uint64_t deadline;
+ uint64_t deadline;
struct turnstile *rcv_turnstile = TURNSTILE_NULL;
turnstile_inheritor_t inheritor = NULL;
ipc_mqueue_t port_mq = IMQ_NULL;
(void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
- &port_mq,
- mqueue_process_prepost_receive);
+ &port_mq,
+ mqueue_process_prepost_receive);
if (port_mq != IMQ_NULL) {
/*
* Continue on to handling the message with just
* the port mqueue locked.
*/
- if (option & MACH_PEEK_MSG)
+ if (option & MACH_PEEK_MSG) {
ipc_mqueue_peek_on_thread(port_mq, option, thread);
- else
+ } else {
ipc_mqueue_select_on_thread(port_mq, mqueue, option,
- max_size, thread);
+ max_size, thread);
+ }
imq_unlock(port_mq);
return THREAD_NOT_WAITING;
*/
kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- if (option & MACH_PEEK_MSG)
+ if (option & MACH_PEEK_MSG) {
ipc_mqueue_peek_on_thread(mqueue, option, thread);
- else
+ } else {
ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
- max_size, thread);
+ max_size, thread);
+ }
imq_unlock(mqueue);
return THREAD_NOT_WAITING;
}
} else {
panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
- mqueue->imq_wait_queue.waitq_type);
+ mqueue->imq_wait_queue.waitq_type);
}
/*
thread->ith_rsize = max_size;
thread->ith_msize = 0;
- if (option & MACH_PEEK_MSG)
+ if (option & MACH_PEEK_MSG) {
thread->ith_state = MACH_PEEK_IN_PROGRESS;
- else
+ } else {
thread->ith_state = MACH_RCV_IN_PROGRESS;
+ }
- if (option & MACH_RCV_TIMEOUT)
- clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
- else
+ if (option & MACH_RCV_TIMEOUT) {
+ clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
+ } else {
deadline = 0;
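		/* a zero deadline means wait for a message with no timeout */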
+ }
/*
* Threads waiting on a port (not portset)
if (imq_is_queue(mqueue)) {
ipc_port_t port = ip_from_mq(mqueue);
rcv_turnstile = turnstile_prepare((uintptr_t)port,
- port_rcv_turnstile_address(port),
- TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+ port_rcv_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
if (port->ip_specialreply) {
inheritor = ipc_port_get_special_reply_port_inheritor(port);
}
turnstile_update_inheritor(rcv_turnstile, inheritor,
- (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
+ (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
}
thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline,
- TIMEOUT_NO_LEEWAY,
- thread);
+ IPC_MQUEUE_RECEIVE,
+ interruptible,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY,
+ thread);
/* preposts should be detected above, not here */
- if (wresult == THREAD_AWAKENED)
+ if (wresult == THREAD_AWAKENED) {
panic("ipc_mqueue_receive_on_thread: sleep walking");
+ }
imq_unlock(mqueue);
*/
void
ipc_mqueue_select_on_thread(
- ipc_mqueue_t port_mq,
- ipc_mqueue_t set_mq,
- mach_msg_option_t option,
- mach_msg_size_t max_size,
+ ipc_mqueue_t port_mq,
+ ipc_mqueue_t set_mq,
+ mach_msg_option_t option,
+ mach_msg_size_t max_size,
thread_t thread)
{
ipc_kmsg_t kmsg;
ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
- if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport))
- flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
+ if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
+ flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
+ }
#endif
ipc_mqueue_release_msgcount(port_mq, set_mq);
thread->ith_seqno = port_mq->imq_seqno++;
*/
unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,
- mach_port_seqno_t * seqnop,
- mach_msg_size_t * msg_sizep,
- mach_msg_id_t * msg_idp,
- mach_msg_max_trailer_t * msg_trailerp,
- ipc_kmsg_t *kmsgp)
+ mach_port_seqno_t * seqnop,
+ mach_msg_size_t * msg_sizep,
+ mach_msg_id_t * msg_idp,
+ mach_msg_max_trailer_t * msg_trailerp,
+ ipc_kmsg_t *kmsgp)
{
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg;
assert(!imq_is_set(mq));
seqno = 0;
- if (seqnop != NULL)
+ if (seqnop != NULL) {
seqno = *seqnop;
+ }
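	/* a zero seqno means peek at the first queued message; otherwise it must name a queued message */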
if (seqno == 0) {
seqno = mq->imq_seqno;
msgoff = 0;
} else if (seqno >= mq->imq_seqno &&
- seqno < mq->imq_seqno + mq->imq_msgcount) {
+ seqno < mq->imq_seqno + mq->imq_msgcount) {
msgoff = seqno - mq->imq_seqno;
- } else
+ } else {
goto out;
+ }
/* look for the message that would match that seqno */
kmsgq = &mq->imq_messages;
while (msgoff-- && kmsg != IKM_NULL) {
kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
}
- if (kmsg == IKM_NULL)
+ if (kmsg == IKM_NULL) {
goto out;
+ }
/* found one - return the requested info */
- if (seqnop != NULL)
+ if (seqnop != NULL) {
*seqnop = seqno;
- if (msg_sizep != NULL)
+ }
+ if (msg_sizep != NULL) {
*msg_sizep = kmsg->ikm_header->msgh_size;
- if (msg_idp != NULL)
+ }
+ if (msg_idp != NULL) {
*msg_idp = kmsg->ikm_header->msgh_id;
- if (msg_trailerp != NULL)
+ }
+ if (msg_trailerp != NULL) {
memcpy(msg_trailerp,
- (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
- round_msg(kmsg->ikm_header->msgh_size)),
- sizeof(mach_msg_max_trailer_t));
- if (kmsgp != NULL)
+ (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
+ round_msg(kmsg->ikm_header->msgh_size)),
+ sizeof(mach_msg_max_trailer_t));
+ }
+ if (kmsgp != NULL) {
*kmsgp = kmsg;
+ }
res = 1;
*/
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,
- mach_port_seqno_t * seqnop,
- mach_msg_size_t * msg_sizep,
- mach_msg_id_t * msg_idp,
- mach_msg_max_trailer_t * msg_trailerp,
- ipc_kmsg_t *kmsgp)
+ mach_port_seqno_t * seqnop,
+ mach_msg_size_t * msg_sizep,
+ mach_msg_id_t * msg_idp,
+ mach_msg_max_trailer_t * msg_trailerp,
+ ipc_kmsg_t *kmsgp)
{
unsigned res;
imq_lock(mq);
res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
- msg_trailerp, kmsgp);
+ msg_trailerp, kmsgp);
imq_unlock(mq);
return res;
* (and potentially invalid!)
*
*/
-void ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
+void
+ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
{
assert(!imq_is_set(mq));
assert(imq_held(mq));
* queue is checked. If a message wasn't there before we entered here, no need
* to find it (if we do, great).
*/
-static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
- struct waitq_set *wqset)
+static int
+mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
{
ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
(void)ctx;
(void)wqset;
- if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
+ if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
-
+ }
return WQ_ITERATE_CONTINUE;
}
* as invalid. In that case, even though we don't have messages, we
* have an end-of-life event to deliver.
*/
- if (!imq_is_valid(mq))
+ if (!imq_is_valid(mq)) {
return 1;
+ }
ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
- mqueue_peek_iterator);
+ mqueue_peek_iterator);
imq_unlock(mq);
- return (ret == WQ_ITERATE_BREAK);
+ return ret == WQ_ITERATE_BREAK;
}
/*
assert(IP_VALID(port));
if (ip_active(port) &&
waitq_member(&mq->imq_wait_queue, wqset)) {
- if (actual < maxnames)
+ if (actual < maxnames) {
names[actual] = mq->imq_receiver_name;
+ }
actual++;
}
}
* Purpose:
* Destroy a (non-set) message queue.
* Set any blocked senders running.
- * Destroy the kmsgs in the queue.
+ * Destroy the kmsgs in the queue.
* Conditions:
* mqueue locked
* Receivers were removed when the receive right was "changed"
if (send_turnstile != TURNSTILE_NULL) {
waitq_wakeup64_all(&send_turnstile->ts_waitq,
- IPC_MQUEUE_FULL,
- THREAD_RESTART,
- WAITQ_ALL_PRIORITIES);
+ IPC_MQUEUE_FULL,
+ THREAD_RESTART,
+ WAITQ_ALL_PRIORITIES);
}
/*
kmqueue = &mqueue->imq_messages;
while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
#if MACH_FLIPC
- if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport))
- flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
+ if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
+ flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
+ }
#endif
boolean_t first;
first = ipc_kmsg_delayed_destroy(kmsg);
- if (first)
+ if (first) {
reap = first;
+ }
}
/*
void
ipc_mqueue_set_qlimit(
- ipc_mqueue_t mqueue,
- mach_port_msgcount_t qlimit)
+ ipc_mqueue_t mqueue,
+ mach_port_msgcount_t qlimit)
{
-	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
-
-	/* wake up senders allowed by the new qlimit */
-	imq_lock(mqueue);
-	if (qlimit > mqueue->imq_qlimit) {
-		mach_port_msgcount_t i, wakeup;
-		struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
+	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
+
+	/* wake up senders allowed by the new qlimit */
+	imq_lock(mqueue);
+	if (qlimit > mqueue->imq_qlimit) {
+		mach_port_msgcount_t i, wakeup;
+		struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
- /* caution: wakeup, qlimit are unsigned */
- wakeup = qlimit - mqueue->imq_qlimit;
+ /* caution: wakeup, qlimit are unsigned */
+ wakeup = qlimit - mqueue->imq_qlimit;
- for (i = 0; i < wakeup; i++) {
+ for (i = 0; i < wakeup; i++) {
/*
* boost the priority of the awoken thread
* (WAITQ_PROMOTE_PRIORITY) to ensure it uses
*/
if (send_turnstile == TURNSTILE_NULL ||
waitq_wakeup64_one(&send_turnstile->ts_waitq,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
mqueue->imq_fullwaiters = FALSE;
break;
}
mqueue->imq_msgcount++; /* give it to the awakened thread */
- }
+ }
}
mqueue->imq_qlimit = qlimit;
imq_unlock(mqueue);
*/
void
ipc_mqueue_set_seqno(
- ipc_mqueue_t mqueue,
- mach_port_seqno_t seqno)
+ ipc_mqueue_t mqueue,
+ mach_port_seqno_t seqno)
{
imq_lock(mqueue);
mqueue->imq_seqno = seqno;
mach_msg_return_t
ipc_mqueue_copyin(
- ipc_space_t space,
- mach_port_name_t name,
- ipc_mqueue_t *mqueuep,
- ipc_object_t *objectp)
+ ipc_space_t space,
+ mach_port_name_t name,
+ ipc_mqueue_t *mqueuep,
+ ipc_object_t *objectp)
{
ipc_entry_t entry;
ipc_object_t object;
assert(port->ip_receiver == space);
is_read_unlock(space);
mqueue = &port->ip_messages;
-
} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
ipc_pset_t pset;