diff --git a/osfmk/ipc/ipc_mqueue.c b/osfmk/ipc/ipc_mqueue.c
index 38af0db4c..685950c90 100644
--- a/osfmk/ipc/ipc_mqueue.c
+++ b/osfmk/ipc/ipc_mqueue.c
@@ -120,15 +120,14 @@ static void ipc_mqueue_peek_on_thread(
 void
 ipc_mqueue_init(
     ipc_mqueue_t    mqueue,
-    boolean_t       is_set,
-    uint64_t        *reserved_link)
+    boolean_t       is_set)
 {
     if (is_set) {
         waitq_set_init(&mqueue->imq_set_queue,
                        SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST,
-                       reserved_link, NULL);
+                       NULL, NULL);
     } else {
-        waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+        waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
         ipc_kmsg_queue_init(&mqueue->imq_messages);
         mqueue->imq_seqno = 0;
         mqueue->imq_msgcount = 0;
@@ -298,6 +297,7 @@ ipc_mqueue_add(
     kern_return_t kr;
 
     assert(reserved_link && *reserved_link != 0);
+    assert(waitqs_is_linked(set_waitq));
 
     imq_lock(port_mqueue);
@@ -371,7 +371,7 @@
          */
         msize = ipc_kmsg_copyout_size(kmsg, th->map);
         if (th->ith_rsize <
-            (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
+            (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
             th->ith_state = MACH_RCV_TOO_LARGE;
             th->ith_msize = msize;
             if (th->ith_option & MACH_RCV_LARGE) {
@@ -427,8 +427,25 @@ void
 ipc_mqueue_changed(
     ipc_mqueue_t    mqueue)
 {
-    /* Indicate that this message queue is vanishing */
-    knote_vanish(&mqueue->imq_klist);
+    if (IMQ_KLIST_VALID(mqueue)) {
+        /*
+         * Indicate that this message queue is vanishing.
+         *
+         * When this is called, the associated receive right may be in flight
+         * between two tasks: the one it used to live in, and the one that armed
+         * a port destroyed notification for it.
+         *
+         * The new process may want to register the port it gets back with an
+         * EVFILT_MACHPORT filter again, and may already have pending sync IPC
+         * on this port, in which case we want the imq_klist field to be
+         * reusable for nefarious purposes (see IMQ_SET_INHERITOR).
+         *
+         * Fortunately, we really don't need this linkage anymore after this
+         * point, as EV_VANISHED / EV_EOF will be the last thing delivered ever.
+         */
+        knote_vanish(&mqueue->imq_klist);
+        klist_init(&mqueue->imq_klist);
+    }
 
     waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
                               IPC_MQUEUE_RECEIVE,
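The IMQ_KLIST_VALID() / klist_init() dance above works because imq_klist is a multiplexed field: it normally anchors the port's knote list, but sync IPC can repurpose it to stash a turnstile inheritor (see IMQ_SET_INHERITOR), after which it must not be walked as a klist. Re-initializing it after knote_vanish() hands the next owner of the receive right a field that is once again a valid, empty klist. A minimal sketch of this kind of field reuse, with an assumed low-bit tag standing in for whatever encoding xnu actually uses:

#include <stdbool.h>
#include <stdint.h>

#define INHERITOR_TAG 0x1ul     /* assumed tag bit, not the actual xnu encoding */

struct mq_like {
    uintptr_t klist_or_inheritor;   /* knote list head, or tagged turnstile */
};

/* in the spirit of IMQ_KLIST_VALID(): only an untagged field is a klist */
static bool
klist_valid(const struct mq_like *mq)
{
    return (mq->klist_or_inheritor & INHERITOR_TAG) == 0;
}

/* in the spirit of IMQ_SET_INHERITOR(): repurpose the field for sync IPC */
static void
set_inheritor(struct mq_like *mq, void *turnstile)
{
    mq->klist_or_inheritor = (uintptr_t)turnstile | INHERITOR_TAG;
}

Under a scheme like this, knote_vanish() followed by klist_init() leaves the field untagged and empty, which is exactly the state a freshly registered EVFILT_MACHPORT filter expects.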
@@ -439,13 +456,13 @@ ipc_mqueue_changed(
 }
 
-    
+
 /*
  *  Routine:    ipc_mqueue_send
  *  Purpose:
  *      Send a message to a message queue.  The message holds a reference
- *      for the destination port for this message queue in the 
+ *      for the destination port for this message queue in the
  *      msgh_remote_port field.
  *
  *      If unsuccessful, the caller still has possession of
@@ -474,7 +491,7 @@ ipc_mqueue_send(
      * 3) Message is sent to a send-once right.
      */
     if (!imq_full(mqueue) ||
-        (!imq_full_kernel(mqueue) && 
+        (!imq_full_kernel(mqueue) &&
          ((option & MACH_SEND_ALWAYS) ||
           (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
            MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
@@ -483,9 +500,12 @@ ipc_mqueue_send(
         imq_unlock(mqueue);
     } else {
         thread_t cur_thread = current_thread();
+        ipc_port_t port = ip_from_mq(mqueue);
+        struct turnstile *send_turnstile = TURNSTILE_NULL;
+        turnstile_inheritor_t inheritor = TURNSTILE_INHERITOR_NULL;
         uint64_t deadline;
 
-        /* 
+        /*
          * We have to wait for space to be granted to us.
          */
         if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
@@ -504,38 +524,65 @@
             deadline = 0;
 
         thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
-        wresult = waitq_assert_wait64_locked(
-                        &mqueue->imq_wait_queue,
-                        IPC_MQUEUE_FULL,
-                        THREAD_ABORTSAFE,
-                        TIMEOUT_URGENCY_USER_NORMAL,
-                        deadline, TIMEOUT_NO_LEEWAY,
-                        cur_thread);
+
+        send_turnstile = turnstile_prepare((uintptr_t)port,
+            port_send_turnstile_address(port),
+            TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+        /* If the port is in transit, use the destination port's send turnstile */
+        if (ip_active(port) &&
+            port->ip_receiver_name == MACH_PORT_NULL &&
+            port->ip_destination != NULL) {
+            inheritor = port_send_turnstile(port->ip_destination);
+        } else {
+            inheritor = ipc_port_get_inheritor(port);
+        }
+
+        turnstile_update_inheritor(send_turnstile, inheritor,
+            TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
+
+        wresult = waitq_assert_wait64_leeway(
+                    &send_turnstile->ts_waitq,
+                    IPC_MQUEUE_FULL,
+                    THREAD_ABORTSAFE,
+                    TIMEOUT_URGENCY_USER_NORMAL,
+                    deadline,
+                    TIMEOUT_NO_LEEWAY);
 
         imq_unlock(mqueue);
-        
+
+        turnstile_update_inheritor_complete(send_turnstile,
+            TURNSTILE_INTERLOCK_NOT_HELD);
+
         if (wresult == THREAD_WAITING) {
             wresult = thread_block(THREAD_CONTINUE_NULL);
             counter(c_ipc_mqueue_send_block++);
         }
-        
+
+        /* Call turnstile complete with interlock held */
+        imq_lock(mqueue);
+        turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
+        imq_unlock(mqueue);
+
+        /* Call cleanup after dropping the interlock */
+        turnstile_cleanup();
+
         switch (wresult) {
 
         case THREAD_AWAKENED:
-            /* 
+            /*
              * we can proceed - inherited msgcount from waker
              * or the message queue has been destroyed and the msgcount
              * has been reset to zero (will detect in ipc_mqueue_post()).
              */
             break;
-        
+
         case THREAD_TIMED_OUT:
             assert(option & MACH_SEND_TIMEOUT);
             return MACH_SEND_TIMED_OUT;
-        
+
         case THREAD_INTERRUPTED:
             return MACH_SEND_INTERRUPTED;
-        
+
         case THREAD_RESTART:
             /* mqueue is being destroyed */
             return MACH_SEND_INVALID_DEST;
@@ -569,12 +616,14 @@ extern void ipc_mqueue_override_send(
     imq_lock(mqueue);
     assert(imq_valid(mqueue));
     assert(!imq_is_set(mqueue));
-    
+
     if (imq_full(mqueue)) {
         ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
 
-        if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override))
-            KNOTE(&mqueue->imq_klist, 0);
+        if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
+            if (IMQ_KLIST_VALID(mqueue))
+                KNOTE(&mqueue->imq_klist, 0);
+        }
         if (!first)
             full_queue_empty = TRUE;
     }
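The inheritor selection in the send path above encodes a single rule: while the destination port is itself in transit (still active, no receiver name, but chained to a destination), a blocked sender's priority should push on whoever can drain the outer port, so the inheritor is that destination's send turnstile; otherwise the port's own inheritor is used. The same decision, distilled into a standalone function over stub types (illustrative only; the real xnu types and helpers differ):

#include <stdbool.h>
#include <stddef.h>

struct turnstile;                      /* opaque stand-in */

struct port_model {
    bool               active;         /* plays ip_active() */
    unsigned long      receiver_name;  /* 0 plays MACH_PORT_NULL */
    struct port_model *destination;    /* non-NULL while in transit */
    struct turnstile  *send_turnstile; /* plays port_send_turnstile() */
    struct turnstile  *fallback;       /* plays ipc_port_get_inheritor() */
};

static struct turnstile *
choose_send_inheritor(const struct port_model *port)
{
    /* in transit: still active, no receiver name, but chained onward */
    if (port->active && port->receiver_name == 0 && port->destination != NULL)
        return port->destination->send_turnstile;
    return port->fallback;
}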
@@ -608,26 +657,32 @@ extern void ipc_mqueue_override_send(
 void
 ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
 {
+    struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
+
     (void)set_mq;
     assert(imq_held(port_mq));
     assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
 
     port_mq->imq_msgcount--;
 
-    if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
+    if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
+        send_turnstile != TURNSTILE_NULL) {
         /*
          * boost the priority of the awoken thread
          * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
          * the message queue slot we've just reserved.
          *
          * NOTE: this will never prepost
+         *
+         * The wakeup happens on a turnstile waitq,
+         * which will wake up the highest priority waiter.
+         * A potential downside of this is starving low
+         * priority senders if there is a constant churn of
+         * high priority threads trying to send to this port.
          */
-        if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
+        if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
                                       IPC_MQUEUE_FULL,
                                       THREAD_AWAKENED,
-                                      NULL,
-                                      WAITQ_PROMOTE_PRIORITY,
-                                      WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
+                                      WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
             port_mq->imq_fullwaiters = FALSE;
         } else {
             /* gave away our slot - add reference back */
@@ -694,24 +749,26 @@ ipc_mqueue_post(
 
         if (receiver == THREAD_NULL) {
 
-            /* 
+            /*
              * no receivers; queue kmsg if space still reserved
              * Reservations are cancelled when the port goes inactive.
              * note that this will enqueue the message for any
-             * "peeking" receivers. 
+             * "peeking" receivers.
              *
              * Also, post the knote to wake up any threads waiting
              * on that style of interface if this insertion is of
              * note (first insertion, or adjusted override qos all
              * the way to the head of the queue).
-             * 
+             *
              * This is just for ports. portset knotes are stay-active,
              * and their threads get awakened through the
              * !MACH_RCV_IN_PROGRESS logic below.
              */
             if (mqueue->imq_msgcount > 0) {
-                if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg))
-                    KNOTE(&mqueue->imq_klist, 0);
+                if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
+                    if (IMQ_KLIST_VALID(mqueue))
+                        KNOTE(&mqueue->imq_klist, 0);
+                }
                 break;
             }
@@ -722,7 +779,7 @@ ipc_mqueue_post(
             destroy_msg = TRUE;
             goto out_unlock;
         }
-        
+
         /*
          * If a thread is attempting a "peek" into the message queue
          * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
@@ -753,7 +810,7 @@
             continue;
         }
-        
+
         /*
          * We found a waiting thread.
          * If the message is too large or the scatter list is too small
@@ -761,7 +818,7 @@
          */
         msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
         if (receiver->ith_rsize <
-            (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
+            (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
             receiver->ith_msize = msize;
             receiver->ith_state = MACH_RCV_TOO_LARGE;
         } else {
@@ -924,7 +981,7 @@ ipc_mqueue_receive(
         return;
 
     if (wresult == THREAD_WAITING) {
-        counter((interruptible == THREAD_ABORTSAFE) ? 
+        counter((interruptible == THREAD_ABORTSAFE) ?
             c_ipc_mqueue_receive_block_user++ :
             c_ipc_mqueue_receive_block_kernel++);
@@ -986,6 +1043,8 @@ ipc_mqueue_receive_on_thread(
 {
     wait_result_t   wresult;
     uint64_t        deadline;
+    struct turnstile *rcv_turnstile = TURNSTILE_NULL;
+    turnstile_inheritor_t inheritor = NULL;
 
     /* called with mqueue locked */
@@ -1001,7 +1060,7 @@
          */
         return THREAD_RESTART;
     }
-    
+
     if (imq_is_set(mqueue)) {
         ipc_mqueue_t port_mq = IMQ_NULL;
@@ -1040,7 +1099,7 @@
         /*
          * Receive on a single port. Just try to get the messages.
          */
-        kmsgs = &mqueue->imq_messages; 
+        kmsgs = &mqueue->imq_messages;
         if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
             if (option & MACH_PEEK_MSG)
                 ipc_mqueue_peek_on_thread(mqueue, option, thread);
@@ -1054,7 +1113,7 @@
         panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
               mqueue->imq_wait_queue.waitq_type);
     }
-    
+
     /*
      * Looks like we'll have to block.  The mqueue we will
      * block on (whether the set's or the local port's) is
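Much of the above revolves around imq_msgcount slot accounting: a sender that blocks on a full queue is woken only after a slot has been reserved on its behalf ("inherited msgcount from waker" in the send path), and ipc_mqueue_release_msgcount either hands its freed slot to exactly one waiter or takes the count down itself. A self-contained userspace analogue of that handoff, assuming plain pthreads (hypothetical names; xnu uses waitqs and turnstiles, not condition variables):

#include <pthread.h>

struct mailbox {
    pthread_mutex_t lock;        /* plays the mqueue interlock */
    pthread_cond_t  space;       /* plays IPC_MQUEUE_FULL */
    unsigned msgcount, qlimit;   /* slots in use / capacity */
    unsigned fullwaiters;        /* senders blocked on a full queue */
    unsigned handoffs;           /* slots already given to waiters */
};

/* sender: reserve a slot now, or sleep until one is handed to us */
static void
mailbox_send_reserve(struct mailbox *mb)
{
    pthread_mutex_lock(&mb->lock);
    if (mb->msgcount < mb->qlimit) {
        mb->msgcount++;                     /* slot reserved immediately */
    } else {
        mb->fullwaiters++;
        while (mb->handoffs == 0)           /* tolerate spurious wakeups */
            pthread_cond_wait(&mb->space, &mb->lock);
        mb->handoffs--;                     /* a slot was reserved for us */
        mb->fullwaiters--;
    }
    pthread_mutex_unlock(&mb->lock);
}

/* receiver: free one slot, preferring to hand it to a blocked sender */
static void
mailbox_release_slot(struct mailbox *mb)
{
    pthread_mutex_lock(&mb->lock);
    if (mb->fullwaiters > mb->handoffs) {
        mb->handoffs++;                     /* slot stays counted: handoff */
        pthread_cond_signal(&mb->space);
    } else {
        mb->msgcount--;                     /* nobody waiting: free it */
    }
    pthread_mutex_unlock(&mb->lock);
}

The kernel version gets one extra property for free: because the wakeup happens on a turnstile waitq with WAITQ_PROMOTE_PRIORITY, the handed-off slot goes to the highest-priority waiter rather than an arbitrary one.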
@@ -1082,6 +1141,37 @@
     else
         deadline = 0;
 
+    /*
+     * Threads waiting on a port (not a portset)
+     * wait on the port's receive turnstile.
+     * Donate the waiting thread's turnstile and
+     * set up the inheritor for the special reply port.
+     * Based on the state of the special reply
+     * port, the inheritor is either the send
+     * turnstile of the connection port on which
+     * the sync IPC send will happen, or the
+     * workloop's turnstile that will reply to
+     * the sync IPC message.
+     *
+     * Pass the mqueue waitq to waitq_assert_wait to
+     * support port set wakeup. The port's mqueue waitq
+     * will be converted to the turnstile waitq
+     * in waitq_assert_wait instead of the global waitqs.
+     */
+    if (imq_is_queue(mqueue)) {
+        ipc_port_t port = ip_from_mq(mqueue);
+        rcv_turnstile = turnstile_prepare((uintptr_t)port,
+            port_rcv_turnstile_address(port),
+            TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+        if (port->ip_specialreply) {
+            inheritor = ipc_port_get_special_reply_port_inheritor(port);
+        }
+
+        turnstile_update_inheritor(rcv_turnstile, inheritor,
+            (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
+    }
+
     thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
     wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
                                          IPC_MQUEUE_RECEIVE,
@@ -1096,6 +1186,12 @@
 
     imq_unlock(mqueue);
 
+    /* If this is a port mqueue, turnstile_update_inheritor_complete is needed */
+    if (rcv_turnstile != TURNSTILE_NULL) {
+        turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
+    }
+    /* It is the caller's responsibility to call turnstile_complete to get the turnstile back */
+
     return wresult;
 }
@@ -1175,7 +1271,7 @@ ipc_mqueue_select_on_thread(
      * (and size needed).
      */
     msize = ipc_kmsg_copyout_size(kmsg, thread->map);
-    if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
+    if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
         mr = MACH_RCV_TOO_LARGE;
         if (option & MACH_RCV_LARGE) {
             thread->ith_receiver_name = port_mq->imq_receiver_name;
@@ -1236,7 +1332,7 @@ ipc_mqueue_peek_locked(ipc_mqueue_t mq,
     if (seqno == 0) {
         seqno = mq->imq_seqno;
         msgoff = 0;
-    } else if (seqno >= mq->imq_seqno && 
+    } else if (seqno >= mq->imq_seqno &&
                seqno < mq->imq_seqno + mq->imq_msgcount) {
         msgoff = seqno - mq->imq_seqno;
     } else
@@ -1259,7 +1355,7 @@
     if (msg_idp != NULL)
         *msg_idp = kmsg->ikm_header->msgh_id;
     if (msg_trailerp != NULL)
-        memcpy(msg_trailerp, 
+        memcpy(msg_trailerp,
                (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
                                           round_msg(kmsg->ikm_header->msgh_size)),
                sizeof(mach_msg_max_trailer_t));
@@ -1353,7 +1449,7 @@ static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
 
     (void)ctx;
     (void)wqset;
-    
+
     if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
         return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
@@ -1482,6 +1578,7 @@ ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
     ipc_kmsg_queue_t kmqueue;
     ipc_kmsg_t kmsg;
     boolean_t reap = FALSE;
+    struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
 
     assert(!imq_is_set(mqueue));
@@ -1491,12 +1588,13 @@
      * (never preposts)
      */
     mqueue->imq_fullwaiters = FALSE;
-    waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
-                              IPC_MQUEUE_FULL,
-                              THREAD_RESTART,
-                              NULL,
-                              WAITQ_ALL_PRIORITIES,
-                              WAITQ_KEEP_LOCKED);
+
+    if (send_turnstile != TURNSTILE_NULL) {
+        waitq_wakeup64_all(&send_turnstile->ts_waitq,
+                           IPC_MQUEUE_FULL,
+                           THREAD_RESTART,
+                           WAITQ_ALL_PRIORITIES);
+    }
 
     /*
      * Move messages from the specified queue to the per-thread
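The seqno test in the ipc_mqueue_peek_locked hunk above is a pure window check: a requested sequence number is peekable only while it lies in [imq_seqno, imq_seqno + imq_msgcount), i.e. the message has been queued but not yet dequeued, and seqno 0 means "the current head". The same arithmetic as a standalone helper (hypothetical names and widened types for simplicity):

#include <stdbool.h>
#include <stdint.h>

/* Returns true and sets *msgoff if want_seqno is still peekable. */
static bool
peek_offset(uint64_t want_seqno, uint64_t queue_seqno, uint64_t msgcount,
            uint64_t *msgoff)
{
    if (want_seqno == 0) {          /* 0 means "the head message" */
        *msgoff = 0;
        return true;
    }
    if (want_seqno >= queue_seqno && want_seqno < queue_seqno + msgcount) {
        *msgoff = want_seqno - queue_seqno;
        return true;
    }
    return false;                   /* already consumed, or not yet queued */
}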
@@ -1559,6 +1657,7 @@ ipc_mqueue_set_qlimit(
     imq_lock(mqueue);
     if (qlimit > mqueue->imq_qlimit) {
         mach_port_msgcount_t i, wakeup;
+        struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
 
         /* caution: wakeup, qlimit are unsigned */
         wakeup = qlimit - mqueue->imq_qlimit;
@@ -1571,12 +1670,11 @@
              *
              * NOTE: this will never prepost
              */
-            if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
-                                          IPC_MQUEUE_FULL,
-                                          THREAD_AWAKENED,
-                                          NULL,
-                                          WAITQ_PROMOTE_PRIORITY,
-                                          WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
+            if (send_turnstile == TURNSTILE_NULL ||
+                waitq_wakeup64_one(&send_turnstile->ts_waitq,
+                                   IPC_MQUEUE_FULL,
+                                   THREAD_AWAKENED,
+                                   WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
                 mqueue->imq_fullwaiters = FALSE;
                 break;
             }
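Raising the limit frees qlimit - imq_qlimit slots, and the loop above hands each one to at most one blocked sender, crediting the slot to the awakened thread and stopping at the first KERN_NOT_WAITING, since no one else can be waiting. Expressed against the hypothetical pthread mailbox sketched earlier (an analogy, not xnu code):

/* raise the capacity and hand the new slots to blocked senders */
static void
mailbox_set_qlimit(struct mailbox *mb, unsigned qlimit)
{
    pthread_mutex_lock(&mb->lock);
    if (qlimit > mb->qlimit) {
        unsigned wakeup = qlimit - mb->qlimit;      /* newly created slots */
        while (wakeup-- > 0 && mb->fullwaiters > mb->handoffs) {
            mb->msgcount++;                         /* reserve the slot... */
            mb->handoffs++;                         /* ...for one waiter */
            pthread_cond_signal(&mb->space);
        }
    }
    mb->qlimit = qlimit;
    pthread_mutex_unlock(&mb->lock);
}

Slots not handed out simply become spare capacity, which is why the kernel loop can break early without any further bookkeeping.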