X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/fe8ab488e9161c46dd9885d58fc52996dc0249ff..7e41aa883dd258f888d0470250eead40a53ef1f5:/osfmk/ipc/ipc_mqueue.c diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c index 1a98bab1f..1b5d82c19 100644 --- a/osfmk/ipc/ipc_mqueue.c +++ b/osfmk/ipc/ipc_mqueue.c @@ -82,7 +82,7 @@ #include #include #include -#include +#include #include #include @@ -108,12 +108,15 @@ void ipc_mqueue_receive_results(wait_result_t result); void ipc_mqueue_init( ipc_mqueue_t mqueue, - boolean_t is_set) + boolean_t is_set, + uint64_t *reserved_link) { if (is_set) { - wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST); + waitq_set_init(&mqueue->imq_set_queue, + SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST|SYNC_POLICY_DISABLE_IRQ, + reserved_link); } else { - wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO); + waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ); ipc_kmsg_queue_init(&mqueue->imq_messages); mqueue->imq_seqno = 0; mqueue->imq_msgcount = 0; @@ -122,6 +125,53 @@ ipc_mqueue_init( } } +void ipc_mqueue_deinit( + ipc_mqueue_t mqueue) +{ + boolean_t is_set = imq_is_set(mqueue); + + if (is_set) + waitq_set_deinit(&mqueue->imq_set_queue); + else + waitq_deinit(&mqueue->imq_wait_queue); +} + +/* + * Routine: imq_reserve_and_lock + * Purpose: + * Atomically lock an ipc_mqueue_t object and reserve + * an appropriate number of prepost linkage objects for + * use in wakeup operations. + * Conditions: + * mq is unlocked + */ +void +imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl) +{ + *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0, + WAITQ_KEEP_LOCKED, spl); + +} + + +/* + * Routine: imq_release_and_unlock + * Purpose: + * Unlock an ipc_mqueue_t object, re-enable interrupts, + * and release any unused prepost object reservations. 
+ * Conditions: + * mq is locked + */ +void +imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl) +{ + assert(imq_held(mq)); + waitq_unlock(&mq->imq_wait_queue); + splx(spl); + waitq_prepost_release_reserve(reserved_prepost); +} + + /* * Routine: ipc_mqueue_member * Purpose: @@ -139,10 +189,10 @@ ipc_mqueue_member( ipc_mqueue_t port_mqueue, ipc_mqueue_t set_mqueue) { - wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; + struct waitq *port_waitq = &port_mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; - return (wait_queue_member(port_waitq, set_waitq)); + return waitq_member(port_waitq, set_waitq); } @@ -156,13 +206,12 @@ ipc_mqueue_member( kern_return_t ipc_mqueue_remove( ipc_mqueue_t mqueue, - ipc_mqueue_t set_mqueue, - wait_queue_link_t *wqlp) + ipc_mqueue_t set_mqueue) { - wait_queue_t mq_waitq = &mqueue->imq_wait_queue; - wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; + struct waitq *mq_waitq = &mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; - return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp); + return waitq_unlink(mq_waitq, set_waitq); } /* @@ -173,13 +222,11 @@ ipc_mqueue_remove( * Nothing locked. */ void -ipc_mqueue_remove_from_all( - ipc_mqueue_t mqueue, - queue_t links) +ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue) { - wait_queue_t mq_waitq = &mqueue->imq_wait_queue; + struct waitq *mq_waitq = &mqueue->imq_wait_queue; - wait_queue_unlink_all_nofree(mq_waitq, links); + waitq_unlink_all(mq_waitq); return; } @@ -187,17 +234,15 @@ ipc_mqueue_remove_from_all( * Routine: ipc_mqueue_remove_all * Purpose: * Remove all the member queues from the specified set. + * Also removes the queue from any containing sets. * Conditions: * Nothing locked. 
*/ void -ipc_mqueue_remove_all( - ipc_mqueue_t mqueue, - queue_t links) +ipc_mqueue_remove_all(ipc_mqueue_t mqueue) { - wait_queue_set_t mq_setq = &mqueue->imq_set_queue; - - wait_queue_set_unlink_all_nofree(mq_setq, links); + struct waitq_set *mq_setq = &mqueue->imq_set_queue; + waitq_set_unlink_all(mq_setq); return; } @@ -215,28 +260,39 @@ ipc_mqueue_remove_all( */ kern_return_t ipc_mqueue_add( - ipc_mqueue_t port_mqueue, - ipc_mqueue_t set_mqueue, - wait_queue_link_t wql) + ipc_mqueue_t port_mqueue, + ipc_mqueue_t set_mqueue, + uint64_t *reserved_link, + uint64_t *reserved_prepost) { - wait_queue_t port_waitq = &port_mqueue->imq_wait_queue; - wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue; + struct waitq *port_waitq = &port_mqueue->imq_wait_queue; + struct waitq_set *set_waitq = &set_mqueue->imq_set_queue; ipc_kmsg_queue_t kmsgq; ipc_kmsg_t kmsg, next; kern_return_t kr; spl_t s; - kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql); - if (kr != KERN_SUCCESS) + assert(reserved_link && *reserved_link != 0); + + s = splsched(); + imq_lock(port_mqueue); + + /* + * The link operation is now under the same lock-hold as + * message iteration and thread wakeup, but doesn't have to be... + */ + kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link); + if (kr != KERN_SUCCESS) { + imq_unlock(port_mqueue); + splx(s); return kr; + } /* * Now that the set has been added to the port, there may be * messages queued on the port and threads waiting on the set * waitq. Lets get them together. 
*/ - s = splsched(); - imq_lock(port_mqueue); kmsgq = &port_mqueue->imq_messages; for (kmsg = ipc_kmsg_queue_first(kmsgq); kmsg != IKM_NULL; @@ -246,12 +302,13 @@ ipc_mqueue_add( for (;;) { thread_t th; mach_msg_size_t msize; + spl_t th_spl; - th = wait_queue_wakeup64_identity_locked( + th = waitq_wakeup64_identity_locked( port_waitq, IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + THREAD_AWAKENED, &th_spl, + reserved_prepost, WAITQ_KEEP_LOCKED); /* waitq/mqueue still locked, thread locked */ if (th == THREAD_NULL) @@ -265,6 +322,7 @@ ipc_mqueue_add( */ if (th->ith_state != MACH_RCV_IN_PROGRESS) { thread_unlock(th); + splx(th_spl); continue; } @@ -289,6 +347,7 @@ ipc_mqueue_add( th->ith_kmsg = IKM_NULL; th->ith_seqno = 0; thread_unlock(th); + splx(th_spl); continue; /* find another thread */ } } else { @@ -300,14 +359,14 @@ ipc_mqueue_add( * so give it to him. */ ipc_kmsg_rmqueue(kmsgq, kmsg); - ipc_mqueue_release_msgcount(port_mqueue); + ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL); th->ith_kmsg = kmsg; th->ith_seqno = port_mqueue->imq_seqno++; thread_unlock(th); + splx(th_spl); break; /* go to next message */ } - } leave: imq_unlock(port_mqueue); @@ -327,11 +386,12 @@ void ipc_mqueue_changed( ipc_mqueue_t mqueue) { - wait_queue_wakeup64_all_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - THREAD_RESTART, - FALSE); /* unlock waitq? 
*/ + waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + THREAD_RESTART, + NULL, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); } @@ -402,12 +462,12 @@ ipc_mqueue_send( clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline); else deadline = 0; - wresult = wait_queue_assert_wait64_locked( + wresult = waitq_assert_wait64_locked( &mqueue->imq_wait_queue, IPC_MQUEUE_FULL, THREAD_ABORTSAFE, TIMEOUT_URGENCY_USER_NORMAL, - deadline, 0, + deadline, TIMEOUT_NO_LEEWAY, cur_thread); thread_unlock(cur_thread); imq_unlock(mqueue); @@ -419,15 +479,19 @@ ipc_mqueue_send( } switch (wresult) { + + case THREAD_AWAKENED: + /* + * we can proceed - inherited msgcount from waker + * or the message queue has been destroyed and the msgcount + * has been reset to zero (will detect in ipc_mqueue_post()). + */ + break; + case THREAD_TIMED_OUT: assert(option & MACH_SEND_TIMEOUT); return MACH_SEND_TIMED_OUT; - case THREAD_AWAKENED: - /* we can proceed - inherited msgcount from waker */ - assert(mqueue->imq_msgcount > 0); - break; - case THREAD_INTERRUPTED: return MACH_SEND_INTERRUPTED; @@ -453,28 +517,43 @@ ipc_mqueue_send( * Conditions: * The message queue is locked. * The message corresponding to this reference is off the queue. 
+ * There is no need to pass reserved preposts because this will + * never prepost to anyone */ void -ipc_mqueue_release_msgcount( - ipc_mqueue_t mqueue) +ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq) { - assert(imq_held(mqueue)); - assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages)); + (void)set_mq; + assert(imq_held(port_mq)); + assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages)); - mqueue->imq_msgcount--; + port_mq->imq_msgcount--; - if (!imq_full(mqueue) && mqueue->imq_fullwaiters) { - if (wait_queue_wakeup64_one_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) != KERN_SUCCESS) { - mqueue->imq_fullwaiters = FALSE; + if (!imq_full(port_mq) && port_mq->imq_fullwaiters) { + /* + * boost the priority of the awoken thread + * (WAITQ_PROMOTE_PRIORITY) to ensure it uses + * the message queue slot we've just reserved. + * + * NOTE: this will never prepost + */ + if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + NULL, + WAITQ_PROMOTE_PRIORITY, + WAITQ_KEEP_LOCKED) != KERN_SUCCESS) { + port_mq->imq_fullwaiters = FALSE; } else { /* gave away our slot - add reference back */ - mqueue->imq_msgcount++; + port_mq->imq_msgcount++; } } + + if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) { + /* no more msgs: invalidate the port's prepost object */ + waitq_clear_prepost_locked(&port_mq->imq_wait_queue, NULL); + } } /* @@ -485,6 +564,7 @@ ipc_mqueue_release_msgcount( * the message queue. * * Conditions: + * mqueue is unlocked * If we need to queue, our space in the message queue is reserved. */ void @@ -493,6 +573,7 @@ ipc_mqueue_post( register ipc_kmsg_t kmsg) { spl_t s; + uint64_t reserved_prepost = 0; /* * While the msg queue is locked, we have control of the @@ -500,27 +581,41 @@ ipc_mqueue_post( * * Check for a receiver for the message. 
*/ - s = splsched(); - imq_lock(mqueue); + imq_reserve_and_lock(mqueue, &reserved_prepost, &s); for (;;) { - wait_queue_t waitq = &mqueue->imq_wait_queue; + struct waitq *waitq = &mqueue->imq_wait_queue; + spl_t th_spl; thread_t receiver; mach_msg_size_t msize; - receiver = wait_queue_wakeup64_identity_locked( - waitq, - IPC_MQUEUE_RECEIVE, - THREAD_AWAKENED, - FALSE); + receiver = waitq_wakeup64_identity_locked(waitq, + IPC_MQUEUE_RECEIVE, + THREAD_AWAKENED, + &th_spl, + &reserved_prepost, + WAITQ_KEEP_LOCKED); /* waitq still locked, thread locked */ if (receiver == THREAD_NULL) { + /* - * no receivers; queue kmsg + * no receivers; queue kmsg if space still reserved. */ - assert(mqueue->imq_msgcount > 0); - ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); - break; + if (mqueue->imq_msgcount > 0) { + ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg); + break; + } + + /* + * Otherwise, the message queue must belong to an inactive + * port, so just destroy the message and pretend it was posted. 
+ */ + /* clear the waitq boost we may have been given */ + waitq_clear_promotion_locked(waitq, current_thread()); + imq_release_and_unlock(mqueue, reserved_prepost, s); + ipc_kmsg_destroy(kmsg); + current_task()->messages_sent++; + return; } /* @@ -531,6 +626,7 @@ ipc_mqueue_post( */ if (receiver->ith_state != MACH_RCV_IN_PROGRESS) { thread_unlock(receiver); + splx(th_spl); continue; } @@ -560,9 +656,10 @@ ipc_mqueue_post( receiver->ith_kmsg = kmsg; receiver->ith_seqno = mqueue->imq_seqno++; thread_unlock(receiver); + splx(th_spl); /* we didn't need our reserved spot in the queue */ - ipc_mqueue_release_msgcount(mqueue); + ipc_mqueue_release_msgcount(mqueue, IMQ_NULL); break; } @@ -575,10 +672,12 @@ ipc_mqueue_post( receiver->ith_kmsg = IKM_NULL; receiver->ith_seqno = 0; thread_unlock(receiver); + splx(th_spl); } - imq_unlock(mqueue); - splx(s); + /* clear the waitq boost we may have been given */ + waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread()); + imq_release_and_unlock(mqueue, reserved_prepost, s); current_task()->messages_sent++; return; @@ -707,6 +806,32 @@ ipc_mqueue_receive( ipc_mqueue_receive_results(wresult); } +static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq, + struct waitq_set *wqset) +{ + ipc_mqueue_t port_mq, *pmq_ptr; + + (void)wqset; + port_mq = (ipc_mqueue_t)waitq; + + /* + * If there are no messages on this queue, skip it and remove + * it from the prepost list + */ + if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) + return WQ_ITERATE_INVALIDATE_CONTINUE; + + /* + * There are messages waiting on this port. + * Instruct the prepost iteration logic to break, but keep the + * waitq locked. 
+ */ + pmq_ptr = (ipc_mqueue_t *)ctx; + if (pmq_ptr) + *pmq_ptr = port_mq; + return WQ_ITERATE_BREAK_KEEP_LOCKED; +} + wait_result_t ipc_mqueue_receive_on_thread( ipc_mqueue_t mqueue, @@ -716,92 +841,65 @@ ipc_mqueue_receive_on_thread( int interruptible, thread_t thread) { - ipc_kmsg_queue_t kmsgs; wait_result_t wresult; uint64_t deadline; spl_t s; s = splsched(); imq_lock(mqueue); + /* no need to reserve anything: we never prepost to anyone */ if (imq_is_set(mqueue)) { - queue_t q; - - q = &mqueue->imq_preposts; + ipc_mqueue_t port_mq = IMQ_NULL; + spl_t set_spl; - /* - * If we are waiting on a portset mqueue, we need to see if - * any of the member ports have work for us. Ports that - * have (or recently had) messages will be linked in the - * prepost queue for the portset. By holding the portset's - * mqueue lock during the search, we tie up any attempts by - * mqueue_deliver or portset membership changes that may - * cross our path. - */ - search_set: - while(!queue_empty(q)) { - wait_queue_link_t wql; - ipc_mqueue_t port_mq; - - queue_remove_first(q, wql, wait_queue_link_t, wql_preposts); - assert(!wql_is_preposted(wql)); - - /* - * This is a lock order violation, so we have to do it - * "softly," putting the link back on the prepost list - * if it fails (at the tail is fine since the order of - * handling messages from different sources in a set is - * not guaranteed and we'd like to skip to the next source - * if one is available). 
- */ - port_mq = (ipc_mqueue_t)wql->wql_queue; - if (!imq_lock_try(port_mq)) { - queue_enter(q, wql, wait_queue_link_t, wql_preposts); - imq_unlock(mqueue); - splx(s); - mutex_pause(0); - s = splsched(); - imq_lock(mqueue); - goto search_set; /* start again at beginning - SMP */ - } + (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue, + &port_mq, + mqueue_process_prepost_receive, + &set_spl); + if (port_mq != IMQ_NULL) { /* - * If there are no messages on this queue, just skip it - * (we already removed the link from the set's prepost queue). + * We get here if there is at least one message + * waiting on port_mq. We have instructed the prepost + * iteration logic to leave both the port_mq and the + * set mqueue locked. + * + * TODO: previously, we would place this port at the + * back of the prepost list... */ - kmsgs = &port_mq->imq_messages; - if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) { - imq_unlock(port_mq); - continue; - } + imq_unlock(mqueue); - /* - * There are messages, so reinsert the link back - * at the tail of the preposted queue (for fairness) - * while we still have the portset mqueue locked. + /* TODO: if/when port mqueues become non irq safe, + * we won't need this spl, and we should be + * able to call splx(s) (if that's even + * necessary). + * For now, we've still disabled interrupts via + * imq_reserve_and_lock(); */ - queue_enter(q, wql, wait_queue_link_t, wql_preposts); - imq_unlock(mqueue); + splx(set_spl); /* * Continue on to handling the message with just * the port mqueue locked. */ - ipc_mqueue_select_on_thread(port_mq, option, max_size, thread); + ipc_mqueue_select_on_thread(port_mq, mqueue, option, + max_size, thread); + imq_unlock(port_mq); splx(s); return THREAD_NOT_WAITING; - } - } else { + ipc_kmsg_queue_t kmsgs; /* * Receive on a single port. Just try to get the messages. 
*/ kmsgs = &mqueue->imq_messages; if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { - ipc_mqueue_select_on_thread(mqueue, option, max_size, thread); + ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option, + max_size, thread); imq_unlock(mqueue); splx(s); return THREAD_NOT_WAITING; @@ -822,6 +920,7 @@ ipc_mqueue_receive_on_thread( } } + /* NOTE: need splsched() here if mqueue no longer needs irq disabled */ thread_lock(thread); thread->ith_state = MACH_RCV_IN_PROGRESS; thread->ith_option = option; @@ -832,12 +931,13 @@ ipc_mqueue_receive_on_thread( else deadline = 0; - wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue, - IPC_MQUEUE_RECEIVE, - interruptible, - TIMEOUT_URGENCY_USER_NORMAL, - deadline, 0, - thread); + wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_RECEIVE, + interruptible, + TIMEOUT_URGENCY_USER_NORMAL, + deadline, + TIMEOUT_NO_LEEWAY, + thread); /* preposts should be detected above, not here */ if (wresult == THREAD_AWAKENED) panic("ipc_mqueue_receive_on_thread: sleep walking"); @@ -859,13 +959,16 @@ ipc_mqueue_receive_on_thread( * mqueue locked. * thread not locked. * There is a message. + * No need to reserve prepost objects - it will never prepost + * * Returns: * MACH_MSG_SUCCESS Actually selected a message for ourselves. * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large */ void ipc_mqueue_select_on_thread( - ipc_mqueue_t mqueue, + ipc_mqueue_t port_mq, + ipc_mqueue_t set_mq, mach_msg_option_t option, mach_msg_size_t max_size, thread_t thread) @@ -878,7 +981,7 @@ ipc_mqueue_select_on_thread( * Do some sanity checking of our ability to receive * before pulling the message off the queue. 
*/ - kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages); + kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages); assert(kmsg != IKM_NULL); /* @@ -891,7 +994,7 @@ ipc_mqueue_select_on_thread( if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) { mr = MACH_RCV_TOO_LARGE; if (option & MACH_RCV_LARGE) { - thread->ith_receiver_name = mqueue->imq_receiver_name; + thread->ith_receiver_name = port_mq->imq_receiver_name; thread->ith_kmsg = IKM_NULL; thread->ith_msize = rcv_size; thread->ith_seqno = 0; @@ -900,9 +1003,9 @@ ipc_mqueue_select_on_thread( } } - ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg); - ipc_mqueue_release_msgcount(mqueue); - thread->ith_seqno = mqueue->imq_seqno++; + ipc_kmsg_rmqueue_first_macro(&port_mq->imq_messages, kmsg); + ipc_mqueue_release_msgcount(port_mq, set_mq); + thread->ith_seqno = port_mq->imq_seqno++; thread->ith_kmsg = kmsg; thread->ith_state = mr; @@ -923,14 +1026,14 @@ ipc_mqueue_select_on_thread( * Caller holds reference on the message queue. */ unsigned -ipc_mqueue_peek(ipc_mqueue_t mq, - mach_port_seqno_t *seqnop, - mach_msg_size_t *msg_sizep, - mach_msg_id_t *msg_idp, - mach_msg_max_trailer_t *msg_trailerp) +ipc_mqueue_peek(ipc_mqueue_t mq, + mach_port_seqno_t * seqnop, + mach_msg_size_t * msg_sizep, + mach_msg_id_t * msg_idp, + mach_msg_max_trailer_t * msg_trailerp) { ipc_kmsg_queue_t kmsgq; - ipc_kmsg_t kmsg; + ipc_kmsg_t kmsg; mach_port_seqno_t seqno, msgoff; int res = 0; spl_t s; @@ -940,7 +1043,9 @@ ipc_mqueue_peek(ipc_mqueue_t mq, s = splsched(); imq_lock(mq); - seqno = (seqnop != NULL) ? seqno = *seqnop : 0; + seqno = 0; + if (seqnop != NULL) + seqno = *seqnop; if (seqno == 0) { seqno = mq->imq_seqno; @@ -980,6 +1085,29 @@ ipc_mqueue_peek(ipc_mqueue_t mq, return res; } + +/* + * peek at the contained port message queues, break prepost iteration as soon + * as we spot a message on one of the message queues referenced by the set's + * prepost list. 
No need to lock each message queue, as only the head of each + * queue is checked. If a message wasn't there before we entered here, no need + * to find it (if we do, great). + */ +static int mqueue_peek_iterator(void *ctx, struct waitq *waitq, + struct waitq_set *wqset) +{ + ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq; + ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; + + (void)ctx; + (void)wqset; + + if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) + return WQ_ITERATE_BREAK; /* break out of the prepost iteration */ + + return WQ_ITERATE_CONTINUE; +} + /* * Routine: ipc_mqueue_set_peek * Purpose: @@ -993,83 +1121,91 @@ ipc_mqueue_peek(ipc_mqueue_t mq, unsigned ipc_mqueue_set_peek(ipc_mqueue_t mq) { - wait_queue_link_t wql; - queue_t q; spl_t s; - int res; + int ret; assert(imq_is_set(mq)); s = splsched(); imq_lock(mq); - /* - * peek at the contained port message queues, return as soon as - * we spot a message on one of the message queues linked on the - * prepost list. No need to lock each message queue, as only the - * head of each queue is checked. If a message wasn't there before - * we entered here, no need to find it (if we do, great). - */ - res = 0; - q = &mq->imq_preposts; - queue_iterate(q, wql, wait_queue_link_t, wql_preposts) { - ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue; - ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages; - - if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) { - res = 1; - break; - } - } + ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL, + mqueue_peek_iterator, NULL); + imq_unlock(mq); splx(s); - return res; + return (ret == WQ_ITERATE_BREAK); } /* * Routine: ipc_mqueue_set_gather_member_names * Purpose: - * Iterate a message queue set to identify the member port - * names. Actual returned names is limited to maxnames entries, - * but we keep counting the actual number of members to let - * the caller decide to retry if necessary. + * Discover all ports which are members of a given port set. 
+ * Because the waitq linkage mechanism was redesigned to save + * significant amounts of memory, it no longer keeps back-pointers + * from a port set to a port. Therefore, we must iterate over all + * ports within a given IPC space and individually query them to + * see if they are members of the given set. Port names of ports + * found to be members of the given set will be gathered into the + * provided 'names' array. Actual returned names are limited to + * maxnames entries, but we keep counting the actual number of + * members to let the caller decide to retry if necessary. * * Conditions: * Locks may be held by callers, so this routine cannot block. - * Caller holds reference on the message queue. + * Caller holds reference on the message queue (via port set). */ void ipc_mqueue_set_gather_member_names( - ipc_mqueue_t mq, - ipc_entry_num_t maxnames, + ipc_space_t space, + ipc_mqueue_t set_mq, + ipc_entry_num_t maxnames, mach_port_name_t *names, ipc_entry_num_t *actualp) { - wait_queue_link_t wql; - queue_t q; - spl_t s; + ipc_entry_t table; + ipc_entry_num_t tsize; + struct waitq_set *wqset; ipc_entry_num_t actual = 0; - assert(imq_is_set(mq)); + assert(set_mq != IMQ_NULL); + wqset = &set_mq->imq_set_queue; - s = splsched(); - imq_lock(mq); + assert(space != IS_NULL); + is_read_lock(space); + if (!is_active(space)) { + is_read_unlock(space); + goto out; + } - /* - * Iterate over the member ports through the mqueue set links - * capturing as many names as we can. 
- */ - q = &mq->imq_setlinks; - queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) { - ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue; + if (!waitq_set_is_valid(wqset)) { + is_read_unlock(space); + goto out; + } - if (actual < maxnames) - names[actual] = port_mq->imq_receiver_name; - actual++; + table = space->is_table; + tsize = space->is_table_size; + for (ipc_entry_num_t idx = 0; idx < tsize; idx++) { + ipc_entry_t entry = &table[idx]; + + /* only receive rights can be members of port sets */ + if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) { + __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object); + ipc_mqueue_t mq = &port->ip_messages; + + assert(IP_VALID(port)); + if (ip_active(port) && + waitq_member(&mq->imq_wait_queue, wqset)) { + if (actual < maxnames) + names[actual] = mq->imq_receiver_name; + actual++; + } + } } - imq_unlock(mq); - splx(s); + is_read_unlock(space); + +out: *actualp = actual; } @@ -1093,17 +1229,23 @@ ipc_mqueue_destroy( boolean_t reap = FALSE; spl_t s; + assert(!imq_is_set(mqueue)); + s = splsched(); imq_lock(mqueue); + /* * rouse all blocked senders + * (don't boost anyone - we're tearing this queue down) + * (never preposts) */ mqueue->imq_fullwaiters = FALSE; - wait_queue_wakeup64_all_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_RESTART, - FALSE); + waitq_wakeup64_all_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_RESTART, + NULL, + WAITQ_ALL_PRIORITIES, + WAITQ_KEEP_LOCKED); /* * Move messages from the specified queue to the per-thread @@ -1117,9 +1259,28 @@ ipc_mqueue_destroy( reap = first; } + /* + * Wipe out message count, both for messages about to be + * reaped and for reserved space for (previously) woken senders. + * This is the indication to them that their reserved space is gone + * (the mqueue was destroyed). 
+ */ + mqueue->imq_msgcount = 0; + + /* clear out any preposting we may have done */ + waitq_clear_prepost_locked(&mqueue->imq_wait_queue, &s); + imq_unlock(mqueue); splx(s); + /* + * assert that we're destroying a queue that's not a + * member of any other queue + */ + assert(mqueue->imq_wait_queue.waitq_prepost_id == 0); + assert(mqueue->imq_wait_queue.waitq_set_id == 0); + + /* * Destroy the messages we enqueued if we aren't nested * inside some other attempt to drain the same queue. @@ -1156,17 +1317,25 @@ ipc_mqueue_set_qlimit( wakeup = qlimit - mqueue->imq_qlimit; for (i = 0; i < wakeup; i++) { - if (wait_queue_wakeup64_one_locked( - &mqueue->imq_wait_queue, - IPC_MQUEUE_FULL, - THREAD_AWAKENED, - FALSE) == KERN_NOT_WAITING) { - mqueue->imq_fullwaiters = FALSE; - break; - } - mqueue->imq_msgcount++; /* give it to the awakened thread */ + /* + * boost the priority of the awoken thread + * (WAITQ_PROMOTE_PRIORITY) to ensure it uses + * the message queue slot we've just reserved. + * + * NOTE: this will never prepost + */ + if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue, + IPC_MQUEUE_FULL, + THREAD_AWAKENED, + NULL, + WAITQ_PROMOTE_PRIORITY, + WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) { + mqueue->imq_fullwaiters = FALSE; + break; + } + mqueue->imq_msgcount++; /* give it to the awakened thread */ } - } + } mqueue->imq_qlimit = qlimit; imq_unlock(mqueue); splx(s); @@ -1239,7 +1408,7 @@ ipc_mqueue_copyin( if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) { ipc_port_t port; - port = (ipc_port_t) object; + __IGNORE_WCASTALIGN(port = (ipc_port_t) object); assert(port != IP_NULL); ip_lock(port); @@ -1252,7 +1421,7 @@ ipc_mqueue_copyin( } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) { ipc_pset_t pset; - pset = (ipc_pset_t) object; + __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object); assert(pset != IPS_NULL); ips_lock(pset); @@ -1278,4 +1447,3 @@ ipc_mqueue_copyin( *mqueuep = mqueue; return MACH_MSG_SUCCESS; } -