#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
-#include <kern/wait_queue.h>
+#include <kern/waitq.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
void
ipc_mqueue_init(
ipc_mqueue_t mqueue,
- boolean_t is_set)
+ boolean_t is_set,
+ uint64_t *reserved_link)
{
if (is_set) {
- wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
+ waitq_set_init(&mqueue->imq_set_queue,
+ SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST|SYNC_POLICY_DISABLE_IRQ,
+ reserved_link);
} else {
- wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+ waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ);
ipc_kmsg_queue_init(&mqueue->imq_messages);
mqueue->imq_seqno = 0;
mqueue->imq_msgcount = 0;
}
}
+void ipc_mqueue_deinit(
+ ipc_mqueue_t mqueue)
+{
+ boolean_t is_set = imq_is_set(mqueue);
+
+ if (is_set)
+ waitq_set_deinit(&mqueue->imq_set_queue);
+ else
+ waitq_deinit(&mqueue->imq_wait_queue);
+}
+
+/*
+ * Routine: imq_reserve_and_lock
+ * Purpose:
+ * Atomically lock an ipc_mqueue_t object and reserve
+ * an appropriate number of prepost linkage objects for
+ * use in wakeup operations.
+ * Conditions:
+ * mq is unlocked
+ */
+void
+imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl)
+{
+ *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
+ WAITQ_KEEP_LOCKED, spl);
+
+}
+
+
+/*
+ * Routine: imq_release_and_unlock
+ * Purpose:
+ * Unlock an ipc_mqueue_t object, re-enable interrupts,
+ * and release any unused prepost object reservations.
+ * Conditions:
+ * mq is locked
+ */
+void
+imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl)
+{
+ assert(imq_held(mq));
+ waitq_unlock(&mq->imq_wait_queue);
+ splx(spl);
+ waitq_prepost_release_reserve(reserved_prepost);
+}
+
+
/*
* Routine: ipc_mqueue_member
* Purpose:
ipc_mqueue_t port_mqueue,
ipc_mqueue_t set_mqueue)
{
- wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+ struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
- return (wait_queue_member(port_waitq, set_waitq));
+ return waitq_member(port_waitq, set_waitq);
}
kern_return_t
ipc_mqueue_remove(
ipc_mqueue_t mqueue,
- ipc_mqueue_t set_mqueue,
- wait_queue_link_t *wqlp)
+ ipc_mqueue_t set_mqueue)
{
- wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+ struct waitq *mq_waitq = &mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
- return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
+ return waitq_unlink(mq_waitq, set_waitq);
}
/*
* Nothing locked.
*/
void
-ipc_mqueue_remove_from_all(
- ipc_mqueue_t mqueue,
- queue_t links)
+ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
- wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
+ struct waitq *mq_waitq = &mqueue->imq_wait_queue;
- wait_queue_unlink_all_nofree(mq_waitq, links);
+ waitq_unlink_all(mq_waitq);
return;
}
* Routine: ipc_mqueue_remove_all
* Purpose:
* Remove all the member queues from the specified set.
+ * Also removes the queue from any containing sets.
* Conditions:
* Nothing locked.
*/
void
-ipc_mqueue_remove_all(
- ipc_mqueue_t mqueue,
- queue_t links)
+ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
- wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
-
- wait_queue_set_unlink_all_nofree(mq_setq, links);
+ struct waitq_set *mq_setq = &mqueue->imq_set_queue;
+ waitq_set_unlink_all(mq_setq);
return;
}
*/
kern_return_t
ipc_mqueue_add(
- ipc_mqueue_t port_mqueue,
- ipc_mqueue_t set_mqueue,
- wait_queue_link_t wql)
+ ipc_mqueue_t port_mqueue,
+ ipc_mqueue_t set_mqueue,
+ uint64_t *reserved_link,
+ uint64_t *reserved_prepost)
{
- wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
- wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
+ struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
+ struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
ipc_kmsg_queue_t kmsgq;
ipc_kmsg_t kmsg, next;
kern_return_t kr;
spl_t s;
- kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
- if (kr != KERN_SUCCESS)
+ assert(reserved_link && *reserved_link != 0);
+
+ s = splsched();
+ imq_lock(port_mqueue);
+
+ /*
+ * The link operation is now under the same lock-hold as
+ * message iteration and thread wakeup, but doesn't have to be...
+ */
+ kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
+ if (kr != KERN_SUCCESS) {
+ imq_unlock(port_mqueue);
+ splx(s);
return kr;
+ }
/*
* Now that the set has been added to the port, there may be
* messages queued on the port and threads waiting on the set
* waitq. Lets get them together.
*/
- s = splsched();
- imq_lock(port_mqueue);
kmsgq = &port_mqueue->imq_messages;
for (kmsg = ipc_kmsg_queue_first(kmsgq);
kmsg != IKM_NULL;
for (;;) {
thread_t th;
mach_msg_size_t msize;
+ spl_t th_spl;
- th = wait_queue_wakeup64_identity_locked(
+ th = waitq_wakeup64_identity_locked(
port_waitq,
IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ THREAD_AWAKENED, &th_spl,
+ reserved_prepost, WAITQ_KEEP_LOCKED);
/* waitq/mqueue still locked, thread locked */
if (th == THREAD_NULL)
*/
if (th->ith_state != MACH_RCV_IN_PROGRESS) {
thread_unlock(th);
+ splx(th_spl);
continue;
}
th->ith_kmsg = IKM_NULL;
th->ith_seqno = 0;
thread_unlock(th);
+ splx(th_spl);
continue; /* find another thread */
}
} else {
* so give it to him.
*/
ipc_kmsg_rmqueue(kmsgq, kmsg);
- ipc_mqueue_release_msgcount(port_mqueue);
+ ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
th->ith_kmsg = kmsg;
th->ith_seqno = port_mqueue->imq_seqno++;
thread_unlock(th);
+ splx(th_spl);
break; /* go to next message */
}
-
}
leave:
imq_unlock(port_mqueue);
ipc_mqueue_changed(
ipc_mqueue_t mqueue)
{
- wait_queue_wakeup64_all_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- THREAD_RESTART,
- FALSE); /* unlock waitq? */
+ waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_RESTART,
+ NULL,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
}
clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
else
deadline = 0;
- wresult = wait_queue_assert_wait64_locked(
+ wresult = waitq_assert_wait64_locked(
&mqueue->imq_wait_queue,
IPC_MQUEUE_FULL,
THREAD_ABORTSAFE,
TIMEOUT_URGENCY_USER_NORMAL,
- deadline, 0,
+ deadline, TIMEOUT_NO_LEEWAY,
cur_thread);
thread_unlock(cur_thread);
imq_unlock(mqueue);
}
switch (wresult) {
+
+ case THREAD_AWAKENED:
+ /*
+ * we can proceed - inherited msgcount from waker
+ * or the message queue has been destroyed and the msgcount
+ * has been reset to zero (will detect in ipc_mqueue_post()).
+ */
+ break;
+
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
- case THREAD_AWAKENED:
- /* we can proceed - inherited msgcount from waker */
- assert(mqueue->imq_msgcount > 0);
- break;
-
case THREAD_INTERRUPTED:
return MACH_SEND_INTERRUPTED;
* Conditions:
* The message queue is locked.
* The message corresponding to this reference is off the queue.
+ * There is no need to pass reserved preposts because this will
+ * never prepost to anyone
*/
void
-ipc_mqueue_release_msgcount(
- ipc_mqueue_t mqueue)
+ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
- assert(imq_held(mqueue));
- assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
+ (void)set_mq;
+ assert(imq_held(port_mq));
+ assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
- mqueue->imq_msgcount--;
+ port_mq->imq_msgcount--;
- if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
- if (wait_queue_wakeup64_one_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) != KERN_SUCCESS) {
- mqueue->imq_fullwaiters = FALSE;
+ if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
+ /*
+ * boost the priority of the awoken thread
+ * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+ * the message queue slot we've just reserved.
+ *
+ * NOTE: this will never prepost
+ */
+ if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ NULL,
+ WAITQ_PROMOTE_PRIORITY,
+ WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
+ port_mq->imq_fullwaiters = FALSE;
} else {
/* gave away our slot - add reference back */
- mqueue->imq_msgcount++;
+ port_mq->imq_msgcount++;
}
}
+
+ if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
+ /* no more msgs: invalidate the port's prepost object */
+ waitq_clear_prepost_locked(&port_mq->imq_wait_queue, NULL);
+ }
}
/*
* the message queue.
*
* Conditions:
+ * mqueue is unlocked
* If we need to queue, our space in the message queue is reserved.
*/
void
register ipc_kmsg_t kmsg)
{
spl_t s;
+ uint64_t reserved_prepost = 0;
/*
* While the msg queue is locked, we have control of the
*
* Check for a receiver for the message.
*/
- s = splsched();
- imq_lock(mqueue);
+ imq_reserve_and_lock(mqueue, &reserved_prepost, &s);
for (;;) {
- wait_queue_t waitq = &mqueue->imq_wait_queue;
+ struct waitq *waitq = &mqueue->imq_wait_queue;
+ spl_t th_spl;
thread_t receiver;
mach_msg_size_t msize;
- receiver = wait_queue_wakeup64_identity_locked(
- waitq,
- IPC_MQUEUE_RECEIVE,
- THREAD_AWAKENED,
- FALSE);
+ receiver = waitq_wakeup64_identity_locked(waitq,
+ IPC_MQUEUE_RECEIVE,
+ THREAD_AWAKENED,
+ &th_spl,
+ &reserved_prepost,
+ WAITQ_KEEP_LOCKED);
/* waitq still locked, thread locked */
if (receiver == THREAD_NULL) {
+
/*
- * no receivers; queue kmsg
+ * no receivers; queue kmsg if space still reserved.
*/
- assert(mqueue->imq_msgcount > 0);
- ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
- break;
+ if (mqueue->imq_msgcount > 0) {
+ ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
+ break;
+ }
+
+ /*
+ * Otherwise, the message queue must belong to an inactive
+ * port, so just destroy the message and pretend it was posted.
+ */
+ /* clear the waitq boost we may have been given */
+ waitq_clear_promotion_locked(waitq, current_thread());
+ imq_release_and_unlock(mqueue, reserved_prepost, s);
+ ipc_kmsg_destroy(kmsg);
+ current_task()->messages_sent++;
+ return;
}
/*
*/
if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
thread_unlock(receiver);
+ splx(th_spl);
continue;
}
receiver->ith_kmsg = kmsg;
receiver->ith_seqno = mqueue->imq_seqno++;
thread_unlock(receiver);
+ splx(th_spl);
/* we didn't need our reserved spot in the queue */
- ipc_mqueue_release_msgcount(mqueue);
+ ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
break;
}
receiver->ith_kmsg = IKM_NULL;
receiver->ith_seqno = 0;
thread_unlock(receiver);
+ splx(th_spl);
}
- imq_unlock(mqueue);
- splx(s);
+ /* clear the waitq boost we may have been given */
+ waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
+ imq_release_and_unlock(mqueue, reserved_prepost, s);
current_task()->messages_sent++;
return;
ipc_mqueue_receive_results(wresult);
}
+static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
+{
+ ipc_mqueue_t port_mq, *pmq_ptr;
+
+ (void)wqset;
+ port_mq = (ipc_mqueue_t)waitq;
+
+ /*
+ * If there are no messages on this queue, skip it and remove
+ * it from the prepost list
+ */
+ if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
+ return WQ_ITERATE_INVALIDATE_CONTINUE;
+
+ /*
+ * There are messages waiting on this port.
+ * Instruct the prepost iteration logic to break, but keep the
+ * waitq locked.
+ */
+ pmq_ptr = (ipc_mqueue_t *)ctx;
+ if (pmq_ptr)
+ *pmq_ptr = port_mq;
+ return WQ_ITERATE_BREAK_KEEP_LOCKED;
+}
+
wait_result_t
ipc_mqueue_receive_on_thread(
ipc_mqueue_t mqueue,
int interruptible,
thread_t thread)
{
- ipc_kmsg_queue_t kmsgs;
wait_result_t wresult;
uint64_t deadline;
spl_t s;
s = splsched();
imq_lock(mqueue);
+ /* no need to reserve anything: we never prepost to anyone */
if (imq_is_set(mqueue)) {
- queue_t q;
-
- q = &mqueue->imq_preposts;
+ ipc_mqueue_t port_mq = IMQ_NULL;
+ spl_t set_spl;
- /*
- * If we are waiting on a portset mqueue, we need to see if
- * any of the member ports have work for us. Ports that
- * have (or recently had) messages will be linked in the
- * prepost queue for the portset. By holding the portset's
- * mqueue lock during the search, we tie up any attempts by
- * mqueue_deliver or portset membership changes that may
- * cross our path.
- */
- search_set:
- while(!queue_empty(q)) {
- wait_queue_link_t wql;
- ipc_mqueue_t port_mq;
-
- queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
- assert(!wql_is_preposted(wql));
-
- /*
- * This is a lock order violation, so we have to do it
- * "softly," putting the link back on the prepost list
- * if it fails (at the tail is fine since the order of
- * handling messages from different sources in a set is
- * not guaranteed and we'd like to skip to the next source
- * if one is available).
- */
- port_mq = (ipc_mqueue_t)wql->wql_queue;
- if (!imq_lock_try(port_mq)) {
- queue_enter(q, wql, wait_queue_link_t, wql_preposts);
- imq_unlock(mqueue);
- splx(s);
- mutex_pause(0);
- s = splsched();
- imq_lock(mqueue);
- goto search_set; /* start again at beginning - SMP */
- }
+ (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
+ &port_mq,
+ mqueue_process_prepost_receive,
+ &set_spl);
+ if (port_mq != IMQ_NULL) {
/*
- * If there are no messages on this queue, just skip it
- * (we already removed the link from the set's prepost queue).
+ * We get here if there is at least one message
+ * waiting on port_mq. We have instructed the prepost
+ * iteration logic to leave both the port_mq and the
+ * set mqueue locked.
+ *
+ * TODO: previously, we would place this port at the
+ * back of the prepost list...
*/
- kmsgs = &port_mq->imq_messages;
- if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
- imq_unlock(port_mq);
- continue;
- }
+ imq_unlock(mqueue);
- /*
- * There are messages, so reinsert the link back
- * at the tail of the preposted queue (for fairness)
- * while we still have the portset mqueue locked.
+ /* TODO: if/when port mqueues become non irq safe,
+ * we won't need this spl, and we should be
+ * able to call splx(s) (if that's even
+ * necessary).
+ * For now, we've still disabled interrupts via
+ * imq_reserve_and_lock();
*/
- queue_enter(q, wql, wait_queue_link_t, wql_preposts);
- imq_unlock(mqueue);
+ splx(set_spl);
/*
* Continue on to handling the message with just
* the port mqueue locked.
*/
- ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
+ ipc_mqueue_select_on_thread(port_mq, mqueue, option,
+ max_size, thread);
+
imq_unlock(port_mq);
splx(s);
return THREAD_NOT_WAITING;
-
}
-
} else {
+ ipc_kmsg_queue_t kmsgs;
/*
* Receive on a single port. Just try to get the messages.
*/
kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
+ ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
+ max_size, thread);
imq_unlock(mqueue);
splx(s);
return THREAD_NOT_WAITING;
}
}
+ /* NOTE: need splsched() here if mqueue no longer needs irq disabled */
thread_lock(thread);
thread->ith_state = MACH_RCV_IN_PROGRESS;
thread->ith_option = option;
else
deadline = 0;
- wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_RECEIVE,
- interruptible,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline, 0,
- thread);
+ wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_RECEIVE,
+ interruptible,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY,
+ thread);
/* preposts should be detected above, not here */
if (wresult == THREAD_AWAKENED)
panic("ipc_mqueue_receive_on_thread: sleep walking");
* mqueue locked.
* thread not locked.
* There is a message.
+ * No need to reserve prepost objects - it will never prepost
+ *
* Returns:
* MACH_MSG_SUCCESS Actually selected a message for ourselves.
* MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
*/
void
ipc_mqueue_select_on_thread(
- ipc_mqueue_t mqueue,
+ ipc_mqueue_t port_mq,
+ ipc_mqueue_t set_mq,
mach_msg_option_t option,
mach_msg_size_t max_size,
thread_t thread)
* Do some sanity checking of our ability to receive
* before pulling the message off the queue.
*/
- kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
+ kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
assert(kmsg != IKM_NULL);
/*
if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
mr = MACH_RCV_TOO_LARGE;
if (option & MACH_RCV_LARGE) {
- thread->ith_receiver_name = mqueue->imq_receiver_name;
+ thread->ith_receiver_name = port_mq->imq_receiver_name;
thread->ith_kmsg = IKM_NULL;
thread->ith_msize = rcv_size;
thread->ith_seqno = 0;
}
}
- ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
- ipc_mqueue_release_msgcount(mqueue);
- thread->ith_seqno = mqueue->imq_seqno++;
+ ipc_kmsg_rmqueue_first_macro(&port_mq->imq_messages, kmsg);
+ ipc_mqueue_release_msgcount(port_mq, set_mq);
+ thread->ith_seqno = port_mq->imq_seqno++;
thread->ith_kmsg = kmsg;
thread->ith_state = mr;
* Caller holds reference on the message queue.
*/
unsigned
-ipc_mqueue_peek(ipc_mqueue_t mq,
- mach_port_seqno_t *seqnop,
- mach_msg_size_t *msg_sizep,
- mach_msg_id_t *msg_idp,
- mach_msg_max_trailer_t *msg_trailerp)
+ipc_mqueue_peek(ipc_mqueue_t mq,
+ mach_port_seqno_t * seqnop,
+ mach_msg_size_t * msg_sizep,
+ mach_msg_id_t * msg_idp,
+ mach_msg_max_trailer_t * msg_trailerp)
{
ipc_kmsg_queue_t kmsgq;
- ipc_kmsg_t kmsg;
+ ipc_kmsg_t kmsg;
mach_port_seqno_t seqno, msgoff;
int res = 0;
spl_t s;
s = splsched();
imq_lock(mq);
- seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
+ seqno = 0;
+ if (seqnop != NULL)
+ seqno = *seqnop;
if (seqno == 0) {
seqno = mq->imq_seqno;
return res;
}
+
+/*
+ * peek at the contained port message queues, break prepost iteration as soon
+ * as we spot a message on one of the message queues referenced by the set's
+ * prepost list. No need to lock each message queue, as only the head of each
+ * queue is checked. If a message wasn't there before we entered here, no need
+ * to find it (if we do, great).
+ */
+static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
+ struct waitq_set *wqset)
+{
+ ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
+ ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
+
+ (void)ctx;
+ (void)wqset;
+
+ if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
+ return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
+
+ return WQ_ITERATE_CONTINUE;
+}
+
/*
* Routine: ipc_mqueue_set_peek
* Purpose:
unsigned
ipc_mqueue_set_peek(ipc_mqueue_t mq)
{
- wait_queue_link_t wql;
- queue_t q;
spl_t s;
- int res;
+ int ret;
assert(imq_is_set(mq));
s = splsched();
imq_lock(mq);
- /*
- * peek at the contained port message queues, return as soon as
- * we spot a message on one of the message queues linked on the
- * prepost list. No need to lock each message queue, as only the
- * head of each queue is checked. If a message wasn't there before
- * we entered here, no need to find it (if we do, great).
- */
- res = 0;
- q = &mq->imq_preposts;
- queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
- ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
- ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
-
- if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
- res = 1;
- break;
- }
- }
+ ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
+ mqueue_peek_iterator, NULL);
+
imq_unlock(mq);
splx(s);
- return res;
+ return (ret == WQ_ITERATE_BREAK);
}
/*
* Routine: ipc_mqueue_set_gather_member_names
* Purpose:
- * Iterate a message queue set to identify the member port
- * names. Actual returned names is limited to maxnames entries,
- * but we keep counting the actual number of members to let
- * the caller decide to retry if necessary.
+ * Discover all ports which are members of a given port set.
+ * Because the waitq linkage mechanism was redesigned to save
+ * significan amounts of memory, it no longer keeps back-pointers
+ * from a port set to a port. Therefore, we must iterate over all
+ * ports within a given IPC space and individually query them to
+ * see if they are members of the given set. Port names of ports
+ * found to be members of the given set will be gathered into the
+ * provided 'names' array. Actual returned names are limited to
+ * maxnames entries, but we keep counting the actual number of
+ * members to let the caller decide to retry if necessary.
*
* Conditions:
* Locks may be held by callers, so this routine cannot block.
- * Caller holds reference on the message queue.
+ * Caller holds reference on the message queue (via port set).
*/
void
ipc_mqueue_set_gather_member_names(
- ipc_mqueue_t mq,
- ipc_entry_num_t maxnames,
+ ipc_space_t space,
+ ipc_mqueue_t set_mq,
+ ipc_entry_num_t maxnames,
mach_port_name_t *names,
ipc_entry_num_t *actualp)
{
- wait_queue_link_t wql;
- queue_t q;
- spl_t s;
+ ipc_entry_t table;
+ ipc_entry_num_t tsize;
+ struct waitq_set *wqset;
ipc_entry_num_t actual = 0;
- assert(imq_is_set(mq));
+ assert(set_mq != IMQ_NULL);
+ wqset = &set_mq->imq_set_queue;
- s = splsched();
- imq_lock(mq);
+ assert(space != IS_NULL);
+ is_read_lock(space);
+ if (!is_active(space)) {
+ is_read_unlock(space);
+ goto out;
+ }
- /*
- * Iterate over the member ports through the mqueue set links
- * capturing as many names as we can.
- */
- q = &mq->imq_setlinks;
- queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
- ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
+ if (!waitq_set_is_valid(wqset)) {
+ is_read_unlock(space);
+ goto out;
+ }
- if (actual < maxnames)
- names[actual] = port_mq->imq_receiver_name;
- actual++;
+ table = space->is_table;
+ tsize = space->is_table_size;
+ for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
+ ipc_entry_t entry = &table[idx];
+
+ /* only receive rights can be members of port sets */
+ if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
+ __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
+ ipc_mqueue_t mq = &port->ip_messages;
+
+ assert(IP_VALID(port));
+ if (ip_active(port) &&
+ waitq_member(&mq->imq_wait_queue, wqset)) {
+ if (actual < maxnames)
+ names[actual] = mq->imq_receiver_name;
+ actual++;
+ }
+ }
}
- imq_unlock(mq);
- splx(s);
+ is_read_unlock(space);
+
+out:
*actualp = actual;
}
boolean_t reap = FALSE;
spl_t s;
+ assert(!imq_is_set(mqueue));
+
s = splsched();
imq_lock(mqueue);
+
/*
* rouse all blocked senders
+ * (don't boost anyone - we're tearing this queue down)
+ * (never preposts)
*/
mqueue->imq_fullwaiters = FALSE;
- wait_queue_wakeup64_all_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_RESTART,
- FALSE);
+ waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_RESTART,
+ NULL,
+ WAITQ_ALL_PRIORITIES,
+ WAITQ_KEEP_LOCKED);
/*
* Move messages from the specified queue to the per-thread
reap = first;
}
+ /*
+ * Wipe out message count, both for messages about to be
+ * reaped and for reserved space for (previously) woken senders.
+ * This is the indication to them that their reserved space is gone
+ * (the mqueue was destroyed).
+ */
+ mqueue->imq_msgcount = 0;
+
+ /* clear out any preposting we may have done */
+ waitq_clear_prepost_locked(&mqueue->imq_wait_queue, &s);
+
imq_unlock(mqueue);
splx(s);
+ /*
+ * assert that we're destroying a queue that's not a
+ * member of any other queue
+ */
+ assert(mqueue->imq_wait_queue.waitq_prepost_id == 0);
+ assert(mqueue->imq_wait_queue.waitq_set_id == 0);
+
+
/*
* Destroy the messages we enqueued if we aren't nested
* inside some other attempt to drain the same queue.
wakeup = qlimit - mqueue->imq_qlimit;
for (i = 0; i < wakeup; i++) {
- if (wait_queue_wakeup64_one_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- FALSE) == KERN_NOT_WAITING) {
- mqueue->imq_fullwaiters = FALSE;
- break;
- }
- mqueue->imq_msgcount++; /* give it to the awakened thread */
+ /*
+ * boost the priority of the awoken thread
+ * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
+ * the message queue slot we've just reserved.
+ *
+ * NOTE: this will never prepost
+ */
+ if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ NULL,
+ WAITQ_PROMOTE_PRIORITY,
+ WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
+ mqueue->imq_fullwaiters = FALSE;
+ break;
+ }
+ mqueue->imq_msgcount++; /* give it to the awakened thread */
}
- }
+ }
mqueue->imq_qlimit = qlimit;
imq_unlock(mqueue);
splx(s);
if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
ipc_port_t port;
- port = (ipc_port_t) object;
+ __IGNORE_WCASTALIGN(port = (ipc_port_t) object);
assert(port != IP_NULL);
ip_lock(port);
} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
ipc_pset_t pset;
- pset = (ipc_pset_t) object;
+ __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
assert(pset != IPS_NULL);
ips_lock(pset);
*mqueuep = mqueue;
return MACH_MSG_SUCCESS;
}
-