void
ipc_mqueue_init(
ipc_mqueue_t mqueue,
- boolean_t is_set,
- uint64_t *reserved_link)
+ boolean_t is_set)
{
if (is_set) {
waitq_set_init(&mqueue->imq_set_queue,
SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST,
- reserved_link, NULL);
+ NULL, NULL);
} else {
- waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
+ waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
ipc_kmsg_queue_init(&mqueue->imq_messages);
mqueue->imq_seqno = 0;
mqueue->imq_msgcount = 0;
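
For reference, the first hunk resolves to the shape below (a sketch; initialization of fields outside the hunk is elided). The new SYNC_POLICY_PORT flag is what lets the blocking paths later in this diff substitute a turnstile waitq for the port's own waitq.

    void
    ipc_mqueue_init(
        ipc_mqueue_t mqueue,
        boolean_t    is_set)
    {
        if (is_set) {
            waitq_set_init(&mqueue->imq_set_queue,
                SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
                NULL, NULL);
        } else {
            /* SYNC_POLICY_PORT marks this waitq as a port waitq so that
             * waiters can be redirected onto a turnstile */
            waitq_init(&mqueue->imq_wait_queue,
                SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
            ipc_kmsg_queue_init(&mqueue->imq_messages);
            mqueue->imq_seqno = 0;
            mqueue->imq_msgcount = 0;
            /* remaining field initialization unchanged by this diff */
        }
    }
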
kern_return_t kr;
- assert(reserved_link && *reserved_link != 0);
+ assert(waitqs_is_linked(set_waitq));
imq_lock(port_mqueue);
*/
msize = ipc_kmsg_copyout_size(kmsg, th->map);
if (th->ith_rsize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
th->ith_state = MACH_RCV_TOO_LARGE;
th->ith_msize = msize;
if (th->ith_option & MACH_RCV_LARGE) {
ipc_mqueue_changed(
ipc_mqueue_t mqueue)
{
- /* Indicate that this message queue is vanishing */
- knote_vanish(&mqueue->imq_klist);
+ if (IMQ_KLIST_VALID(mqueue)) {
+ /*
+ * Indicate that this message queue is vanishing
+ *
+ * When this is called, the associated receive right may be in flight
+ * between two tasks: the one it used to live in, and the one that armed
+ * a port destroyed notification for it.
+ *
+ * The new process may want to register the port it gets back with an
+ * EVFILT_MACHPORT filter again, and may already have sync IPC pending
+ * on this port, in which case we want the imq_klist field to be
+ * reusable for nefarious purposes (see IMQ_SET_INHERITOR).
+ *
+ * Fortunately, we really don't need this linkage anymore after this
+ * point, as EV_VANISHED / EV_EOF will be the last thing ever delivered.
+ */
+ knote_vanish(&mqueue->imq_klist);
+ klist_init(&mqueue->imq_klist);
+ }
waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
IPC_MQUEUE_RECEIVE,
}
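
The EV_VANISHED path above is observable from user space through EVFILT_MACHPORT. A minimal sketch, assuming a task that holds a receive right it wants to monitor (watch_port and its error handling are illustrative, not part of this diff):

    #include <sys/event.h>
    #include <mach/mach.h>

    /* Illustrative helper: register a Mach port with kqueue. If the
     * receive right later dies while registered, the knote_vanish()
     * call above delivers EV_VANISHED / EV_EOF as the final event. */
    static int
    watch_port(mach_port_t port)
    {
        int kq = kqueue();
        struct kevent64_s kev;

        EV_SET64(&kev, port, EVFILT_MACHPORT, EV_ADD | EV_ENABLE,
            0, 0, 0, 0, 0);
        if (kq < 0 || kevent64(kq, &kev, 1, NULL, 0, 0, NULL) < 0)
            return -1;
        return kq;
    }
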
-
+
/*
* Routine: ipc_mqueue_send
* Purpose:
* Send a message to a message queue. The message holds a reference
- * for the destination port for this message queue in the
+ * for the destination port for this message queue in the
* msgh_remote_port field.
*
* If unsuccessful, the caller still has possession of
* 3) Message is sent to a send-once right.
*/
if (!imq_full(mqueue) ||
- (!imq_full_kernel(mqueue) &&
+ (!imq_full_kernel(mqueue) &&
((option & MACH_SEND_ALWAYS) ||
(MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
imq_unlock(mqueue);
} else {
thread_t cur_thread = current_thread();
+ ipc_port_t port = ip_from_mq(mqueue);
+ struct turnstile *send_turnstile = TURNSTILE_NULL;
+ turnstile_inheritor_t inheritor = TURNSTILE_INHERITOR_NULL;
uint64_t deadline;
- /*
+ /*
* We have to wait for space to be granted to us.
*/
if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
deadline = 0;
thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
- wresult = waitq_assert_wait64_locked(
- &mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_ABORTSAFE,
- TIMEOUT_URGENCY_USER_NORMAL,
- deadline, TIMEOUT_NO_LEEWAY,
- cur_thread);
+
+ send_turnstile = turnstile_prepare((uintptr_t)port,
+ port_send_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+ /* Check if the port is in transit; if so, use the destination port's send turnstile */
+ if (ip_active(port) &&
+ port->ip_receiver_name == MACH_PORT_NULL &&
+ port->ip_destination != NULL) {
+ inheritor = port_send_turnstile(port->ip_destination);
+ } else {
+ inheritor = ipc_port_get_inheritor(port);
+ }
+
+ turnstile_update_inheritor(send_turnstile, inheritor,
+ TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
+
+ wresult = waitq_assert_wait64_leeway(
+ &send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_ABORTSAFE,
+ TIMEOUT_URGENCY_USER_NORMAL,
+ deadline,
+ TIMEOUT_NO_LEEWAY);
imq_unlock(mqueue);
-
+ turnstile_update_inheritor_complete(send_turnstile,
+ TURNSTILE_INTERLOCK_NOT_HELD);
+
if (wresult == THREAD_WAITING) {
wresult = thread_block(THREAD_CONTINUE_NULL);
counter(c_ipc_mqueue_send_block++);
}
-
+
+ /* Call turnstile complete with interlock held */
+ imq_lock(mqueue);
+ turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
+ imq_unlock(mqueue);
+
+ /* Call cleanup after dropping the interlock */
+ turnstile_cleanup();
+
switch (wresult) {
case THREAD_AWAKENED:
- /*
+ /*
* we can proceed - inherited msgcount from waker
* or the message queue has been destroyed and the msgcount
* has been reset to zero (will detect in ipc_mqueue_post()).
*/
break;
-
+
case THREAD_TIMED_OUT:
assert(option & MACH_SEND_TIMEOUT);
return MACH_SEND_TIMED_OUT;
-
+
case THREAD_INTERRUPTED:
return MACH_SEND_INTERRUPTED;
-
+
case THREAD_RESTART:
/* mqueue is being destroyed */
return MACH_SEND_INVALID_DEST;
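
The new send-side blocking path can be exercised from user space without any new API. A minimal sketch, assuming the default queue limit (MACH_PORT_QLIMIT_DEFAULT); send_empty and the driver loop are illustrative only:

    #include <mach/mach.h>
    #include <stdio.h>

    /* Illustrative: once the queue is full, a send with MACH_SEND_TIMEOUT
     * parks the thread on the port's send turnstile (per the hunk above)
     * and returns MACH_SEND_TIMED_OUT when the deadline expires. */
    static kern_return_t
    send_empty(mach_port_t dest, mach_msg_timeout_t timeout_ms)
    {
        mach_msg_header_t msg = {
            .msgh_bits        = MACH_MSGH_BITS(MACH_MSG_TYPE_MAKE_SEND, 0),
            .msgh_size        = sizeof(msg),
            .msgh_remote_port = dest,
        };
        return mach_msg(&msg, MACH_SEND_MSG | MACH_SEND_TIMEOUT,
            sizeof(msg), 0, MACH_PORT_NULL, timeout_ms, MACH_PORT_NULL);
    }

    int
    main(void)
    {
        mach_port_t port;
        mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);

        /* fill the queue; the first over-limit send times out */
        while (send_empty(port, 100) == MACH_MSG_SUCCESS)
            ;
        printf("queue full: sender timed out as expected\n");
        return 0;
    }
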
imq_lock(mqueue);
assert(imq_valid(mqueue));
assert(!imq_is_set(mqueue));
-
+
if (imq_full(mqueue)) {
ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
- if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override))
- KNOTE(&mqueue->imq_klist, 0);
+ if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
+ if (IMQ_KLIST_VALID(mqueue))
+ KNOTE(&mqueue->imq_klist, 0);
+ }
if (!first)
full_queue_empty = TRUE;
}
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
(void)set_mq;
assert(imq_held(port_mq));
assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
port_mq->imq_msgcount--;
- if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
+ if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
+ send_turnstile != TURNSTILE_NULL) {
/*
* boost the priority of the awoken thread
* (WAITQ_PROMOTE_PRIORITY) to ensure it uses
* the message queue slot we've just reserved.
*
* NOTE: this will never prepost
+ *
+ * The wakeup happens on a turnstile waitq,
+ * which wakes the highest-priority waiter.
+ * A potential downside is starving low-priority
+ * senders if there is a constant churn of
+ * high-priority threads trying to send to this port.
*/
- if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
+ if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
IPC_MQUEUE_FULL,
THREAD_AWAKENED,
- NULL,
- WAITQ_PROMOTE_PRIORITY,
- WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
+ WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
port_mq->imq_fullwaiters = FALSE;
} else {
/* gave away our slot - add reference back */
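
Conversely, draining a message is what triggers this wakeup. A minimal sketch of the consumer side (illustrative; pairs with the timeout probe above):

    /* Illustrative: receiving one message releases one reserved slot in
     * ipc_mqueue_release_msgcount(), waking exactly one blocked sender,
     * the highest-priority waiter on the send turnstile's waitq. */
    static kern_return_t
    drain_one(mach_port_t port)
    {
        struct {
            mach_msg_header_t       hdr;
            mach_msg_max_trailer_t  trailer;
        } buf;

        return mach_msg(&buf.hdr, MACH_RCV_MSG, 0, sizeof(buf),
            port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
    }
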
if (receiver == THREAD_NULL) {
- /*
+ /*
* no receivers; queue kmsg if space still reserved
* Reservations are cancelled when the port goes inactive.
* note that this will enqueue the message for any
- * "peeking" receivers.
+ * "peeking" receivers.
*
* Also, post the knote to wake up any threads waiting
* on that style of interface if this insertion is of
* note (first insertion, or adjusted override qos all
* the way to the head of the queue).
- *
+ *
 * This is just for ports (portset knotes are stay-active,
 * and their threads get awakened through the !MACH_RCV_IN_PROGRESS
 * logic below).
*/
if (mqueue->imq_msgcount > 0) {
- if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg))
- KNOTE(&mqueue->imq_klist, 0);
+ if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
+ if (IMQ_KLIST_VALID(mqueue))
+ KNOTE(&mqueue->imq_klist, 0);
+ }
break;
}
destroy_msg = TRUE;
goto out_unlock;
}
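
The KNOTE on first insertion is the kernel half of a kevent wakeup. Continuing the illustrative watch_port() sketch from earlier, the user-space half would look like:

    /* Illustrative: blocks until a message lands on the watched port and
     * the enqueue path above posts the knote. */
    static int
    wait_for_message(int kq)
    {
        struct kevent64_s out;

        return kevent64(kq, NULL, 0, &out, 1, 0, NULL);
    }
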
-
+
/*
* If a thread is attempting a "peek" into the message queue
* (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
continue;
}
-
+
/*
* We found a waiting thread.
* If the message is too large or the scatter list is too small
*/
msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
if (receiver->ith_rsize <
- (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
+ (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
receiver->ith_msize = msize;
receiver->ith_state = MACH_RCV_TOO_LARGE;
} else {
return;
if (wresult == THREAD_WAITING) {
- counter((interruptible == THREAD_ABORTSAFE) ?
+ counter((interruptible == THREAD_ABORTSAFE) ?
c_ipc_mqueue_receive_block_user++ :
c_ipc_mqueue_receive_block_kernel++);
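
The ith_rsize checks in this diff are the kernel side of the MACH_RCV_LARGE contract. A minimal user-space sketch of that contract (receive_grow is illustrative; allocation failures are not handled):

    #include <mach/mach.h>
    #include <stdlib.h>

    /* Illustrative: with MACH_RCV_LARGE, an undersized receive returns
     * MACH_RCV_TOO_LARGE, leaves the message queued, and reports the
     * required size in msgh_size so the caller can grow and retry. */
    static mach_msg_header_t *
    receive_grow(mach_port_t port)
    {
        mach_msg_size_t size = (mach_msg_size_t)sizeof(mach_msg_header_t) +
            MAX_TRAILER_SIZE;
        mach_msg_header_t *buf = malloc(size);

        while (mach_msg(buf, MACH_RCV_MSG | MACH_RCV_LARGE, 0, size,
            port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL) == MACH_RCV_TOO_LARGE) {
            size = buf->msgh_size + MAX_TRAILER_SIZE;
            buf = realloc(buf, size);
        }
        return buf;
    }
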
{
wait_result_t wresult;
uint64_t deadline;
+ struct turnstile *rcv_turnstile = TURNSTILE_NULL;
+ turnstile_inheritor_t inheritor = NULL;
/* called with mqueue locked */
*/
return THREAD_RESTART;
}
-
+
if (imq_is_set(mqueue)) {
ipc_mqueue_t port_mq = IMQ_NULL;
/*
* Receive on a single port. Just try to get the messages.
*/
- kmsgs = &mqueue->imq_messages;
+ kmsgs = &mqueue->imq_messages;
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
if (option & MACH_PEEK_MSG)
ipc_mqueue_peek_on_thread(mqueue, option, thread);
panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
mqueue->imq_wait_queue.waitq_type);
}
-
+
/*
* Looks like we'll have to block. The mqueue we will
* block on (whether the set's or the local port's) is
else
deadline = 0;
+ /*
+ * Threads waiting on a port (not a port set)
+ * wait on the port's receive turnstile.
+ * Donate the waiting thread's turnstile and
+ * set up the inheritor for special reply ports.
+ * Depending on the state of the special reply
+ * port, the inheritor is either the send
+ * turnstile of the connection port on which
+ * the sync IPC send will happen, or the
+ * workloop's turnstile that will reply to
+ * the sync IPC message.
+ *
+ * Pass the mqueue waitq into waitq_assert_wait to
+ * support port set wakeup: the port's mqueue waitq
+ * is converted to the turnstile waitq inside
+ * waitq_assert_wait instead of using the global waitqs.
+ */
+ if (imq_is_queue(mqueue)) {
+ ipc_port_t port = ip_from_mq(mqueue);
+ rcv_turnstile = turnstile_prepare((uintptr_t)port,
+ port_rcv_turnstile_address(port),
+ TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
+
+ if (port->ip_specialreply) {
+ inheritor = ipc_port_get_special_reply_port_inheritor(port);
+ }
+
+ turnstile_update_inheritor(rcv_turnstile, inheritor,
+ (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
+ }
+
thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
IPC_MQUEUE_RECEIVE,
imq_unlock(mqueue);
+ /* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
+ if (rcv_turnstile != TURNSTILE_NULL) {
+ turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
+ }
+ /* It's the caller's responsibility to call turnstile_complete to get the turnstile back */
+
return wresult;
}
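
Taken together, the receive-side turnstile lifecycle mirrors the send side earlier in this diff. Condensed from the hunk above (a kernel-internal sketch restating the diff's own calls, not standalone code):

    /* prepare: attach/allocate a receive turnstile for this port */
    rcv_turnstile = turnstile_prepare((uintptr_t)port,
        port_rcv_turnstile_address(port),
        TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

    /* special reply ports donate their push to whoever will answer the
     * sync IPC (a connection port's send turnstile or a workloop) */
    if (port->ip_specialreply)
        inheritor = ipc_port_get_special_reply_port_inheritor(port);

    turnstile_update_inheritor(rcv_turnstile, inheritor,
        TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE);

    /* the wait itself is still asserted on mqueue->imq_wait_queue so
     * that port-set wakeups keep working; SYNC_POLICY_PORT makes
     * waitq_assert_wait64_locked() redirect onto the turnstile waitq.
     * After imq_unlock(), turnstile_update_inheritor_complete() runs,
     * and the caller eventually hands the turnstile back via
     * turnstile_complete(). */
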
* (and size needed).
*/
msize = ipc_kmsg_copyout_size(kmsg, thread->map);
- if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
+ if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
mr = MACH_RCV_TOO_LARGE;
if (option & MACH_RCV_LARGE) {
thread->ith_receiver_name = port_mq->imq_receiver_name;
if (seqno == 0) {
seqno = mq->imq_seqno;
msgoff = 0;
- } else if (seqno >= mq->imq_seqno &&
+ } else if (seqno >= mq->imq_seqno &&
seqno < mq->imq_seqno + mq->imq_msgcount) {
msgoff = seqno - mq->imq_seqno;
} else
if (msg_idp != NULL)
*msg_idp = kmsg->ikm_header->msgh_id;
if (msg_trailerp != NULL)
- memcpy(msg_trailerp,
+ memcpy(msg_trailerp,
(mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
round_msg(kmsg->ikm_header->msgh_size)),
sizeof(mach_msg_max_trailer_t));
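
The peek path above copies out a full mach_msg_max_trailer_t; a normal receive gets trailer data by request. A minimal sketch, assuming an empty message body and a caller that wants the audit token (receive_with_audit is illustrative):

    #include <mach/mach.h>

    /* Illustrative: request a format-0 trailer up to the audit token.
     * REQUESTED_TRAILER_SIZE in the hunks above computes the kernel-side
     * space for exactly this kind of request. Note the kernel places the
     * trailer after the rounded message body, so this fixed layout only
     * holds for a header-only message. */
    struct rcv_with_trailer {
        mach_msg_header_t         hdr;
        mach_msg_audit_trailer_t  trailer;
    };

    static kern_return_t
    receive_with_audit(mach_port_t port, struct rcv_with_trailer *buf)
    {
        return mach_msg(&buf->hdr,
            MACH_RCV_MSG |
            MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) |
            MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AUDIT),
            0, sizeof(*buf), port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
    }
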
(void)ctx;
(void)wqset;
-
+
if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
ipc_kmsg_queue_t kmqueue;
ipc_kmsg_t kmsg;
boolean_t reap = FALSE;
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
assert(!imq_is_set(mqueue));
* (never preposts)
*/
mqueue->imq_fullwaiters = FALSE;
- waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_RESTART,
- NULL,
- WAITQ_ALL_PRIORITIES,
- WAITQ_KEEP_LOCKED);
+
+ if (send_turnstile != TURNSTILE_NULL) {
+ waitq_wakeup64_all(&send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_RESTART,
+ WAITQ_ALL_PRIORITIES);
+ }
/*
* Move messages from the specified queue to the per-thread
imq_lock(mqueue);
if (qlimit > mqueue->imq_qlimit) {
mach_port_msgcount_t i, wakeup;
+ struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
/* caution: wakeup, qlimit are unsigned */
wakeup = qlimit - mqueue->imq_qlimit;
*
* NOTE: this will never prepost
*/
- if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
- IPC_MQUEUE_FULL,
- THREAD_AWAKENED,
- NULL,
- WAITQ_PROMOTE_PRIORITY,
- WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
+ if (send_turnstile == TURNSTILE_NULL ||
+ waitq_wakeup64_one(&send_turnstile->ts_waitq,
+ IPC_MQUEUE_FULL,
+ THREAD_AWAKENED,
+ WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
mqueue->imq_fullwaiters = FALSE;
break;
}
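
Raising the limit is driven from user space by mach_port_set_attributes(). A minimal sketch (raise_qlimit is illustrative):

    #include <mach/mach.h>

    /* Illustrative: each slot added by a qlimit increase lets the loop
     * above wake one sender off the send turnstile, stopping early once
     * no one is left waiting. */
    static kern_return_t
    raise_qlimit(mach_port_t port, mach_port_msgcount_t qlimit)
    {
        mach_port_limits_t limits = { .mpl_qlimit = qlimit };

        return mach_port_set_attributes(mach_task_self(), port,
            MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
            MACH_PORT_LIMITS_INFO_COUNT);
    }
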