]> git.saurik.com Git - apple/xnu.git/blame - osfmk/ipc/ipc_mqueue.c
xnu-4903.241.1.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
CommitLineData
1c79356b 1/*
2d21ac55 2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
1c79356b 3 *
2d21ac55 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
1c79356b 5 *
2d21ac55
A
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
8f6c56a5 14 *
2d21ac55
A
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
8f6c56a5
A
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
2d21ac55
A
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
8f6c56a5 25 *
2d21ac55 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
1c79356b
A
27 */
28/*
29 * @OSF_FREE_COPYRIGHT@
30 */
31/*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
49 * School of Computer Science
50 * Carnegie Mellon University
51 * Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56/*
57 */
58/*
59 * File: ipc/ipc_mqueue.c
60 * Author: Rich Draves
61 * Date: 1989
62 *
63 * Functions to manipulate IPC message queues.
64 */
2d21ac55
A
65/*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections. This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
1c79356b
A
72
73#include <mach/port.h>
74#include <mach/message.h>
75#include <mach/sync_policy.h>
76
77#include <kern/assert.h>
78#include <kern/counters.h>
79#include <kern/sched_prim.h>
80#include <kern/ipc_kobject.h>
91447636 81#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
1c79356b
A
82#include <kern/misc_protos.h>
83#include <kern/task.h>
84#include <kern/thread.h>
3e170ce0 85#include <kern/waitq.h>
1c79356b
A
86
87#include <ipc/ipc_mqueue.h>
88#include <ipc/ipc_kmsg.h>
89#include <ipc/ipc_port.h>
90#include <ipc/ipc_pset.h>
91#include <ipc/ipc_space.h>
92
39037602
A
93#if MACH_FLIPC
94#include <ipc/flipc.h>
95#endif
96
b0d623f7
A
97#ifdef __LP64__
98#include <vm/vm_map.h>
99#endif
1c79356b 100
39037602
A
101#include <sys/event.h>
102
/*
 * NOTE(review): declared locally instead of through a header, and not
 * referenced in the part of this file reviewed here - confirm it is
 * still needed.
 */
103extern char *proc_name_address(void *p);
104
1c79356b
A
/*
 * Only the addresses of these two objects are meaningful (see the
 * per-variable comments).  Presumably they back the IPC_MQUEUE_FULL and
 * IPC_MQUEUE_RECEIVE wait events used below - verify in ipc_mqueue.h.
 */
105int ipc_mqueue_full; /* address is event for queue space */
106int ipc_mqueue_rcv; /* address is event for message arrival */
107
91447636
A
108/* forward declarations */
/* translates a wait result into the current thread's ith_state */
109void ipc_mqueue_receive_results(wait_result_t result);
39037602
A
/* file-local helper; called by ipc_mqueue_add() and ipc_mqueue_post() with the thread locked */
110static void ipc_mqueue_peek_on_thread(
111 ipc_mqueue_t port_mq,
112 mach_msg_option_t option,
113 thread_t thread);
91447636 114
1c79356b
A
115/*
116 * Routine: ipc_mqueue_init
117 * Purpose:
118 * Initialize a newly-allocated message queue.
119 */
120void
121ipc_mqueue_init(
122 ipc_mqueue_t mqueue,
d9a64523 123 boolean_t is_set)
1c79356b
A
124{
125 if (is_set) {
3e170ce0 126 waitq_set_init(&mqueue->imq_set_queue,
39037602 127 SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST,
d9a64523 128 NULL, NULL);
1c79356b 129 } else {
d9a64523 130 waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
1c79356b
A
131 ipc_kmsg_queue_init(&mqueue->imq_messages);
132 mqueue->imq_seqno = 0;
133 mqueue->imq_msgcount = 0;
134 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
135 mqueue->imq_fullwaiters = FALSE;
39037602
A
136#if MACH_FLIPC
137 mqueue->imq_fport = FPORT_NULL;
138#endif
1c79356b 139 }
39037602 140 klist_init(&mqueue->imq_klist);
1c79356b
A
141}
142
3e170ce0
A
143void ipc_mqueue_deinit(
144 ipc_mqueue_t mqueue)
145{
146 boolean_t is_set = imq_is_set(mqueue);
147
148 if (is_set)
149 waitq_set_deinit(&mqueue->imq_set_queue);
150 else
151 waitq_deinit(&mqueue->imq_wait_queue);
152}
153
154/*
155 * Routine: imq_reserve_and_lock
156 * Purpose:
157 * Atomically lock an ipc_mqueue_t object and reserve
158 * an appropriate number of prepost linkage objects for
159 * use in wakeup operations.
160 * Conditions:
161 * mq is unlocked
162 */
163void
39037602 164imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
3e170ce0
A
165{
166 *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
39037602 167 WAITQ_KEEP_LOCKED);
3e170ce0
A
168
169}
170
171
172/*
173 * Routine: imq_release_and_unlock
174 * Purpose:
175 * Unlock an ipc_mqueue_t object, re-enable interrupts,
176 * and release any unused prepost object reservations.
177 * Conditions:
178 * mq is locked
179 */
180void
39037602 181imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost)
3e170ce0
A
182{
183 assert(imq_held(mq));
184 waitq_unlock(&mq->imq_wait_queue);
3e170ce0
A
185 waitq_prepost_release_reserve(reserved_prepost);
186}
187
188
1c79356b
A
189/*
190 * Routine: ipc_mqueue_member
191 * Purpose:
192 * Indicate whether the (port) mqueue is a member of
193 * this portset's mqueue. We do this by checking
194 * whether the portset mqueue's waitq is an member of
195 * the port's mqueue waitq.
196 * Conditions:
197 * the portset's mqueue is not already a member
198 * this may block while allocating linkage structures.
199 */
200
201boolean_t
202ipc_mqueue_member(
91447636
A
203 ipc_mqueue_t port_mqueue,
204 ipc_mqueue_t set_mqueue)
1c79356b 205{
3e170ce0
A
206 struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
207 struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
1c79356b 208
3e170ce0 209 return waitq_member(port_waitq, set_waitq);
1c79356b
A
210
211}
212
213/*
214 * Routine: ipc_mqueue_remove
215 * Purpose:
216 * Remove the association between the queue and the specified
9bccf70c 217 * set message queue.
1c79356b
A
218 */
219
220kern_return_t
221ipc_mqueue_remove(
316670eb 222 ipc_mqueue_t mqueue,
3e170ce0 223 ipc_mqueue_t set_mqueue)
1c79356b 224{
3e170ce0
A
225 struct waitq *mq_waitq = &mqueue->imq_wait_queue;
226 struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
1c79356b 227
3e170ce0 228 return waitq_unlink(mq_waitq, set_waitq);
1c79356b
A
229}
230
231/*
9bccf70c 232 * Routine: ipc_mqueue_remove_from_all
1c79356b 233 * Purpose:
9bccf70c 234 * Remove the mqueue from all the sets it is a member of
1c79356b 235 * Conditions:
9bccf70c 236 * Nothing locked.
39037602
A
237 * Returns:
238 * mqueue unlocked and set links deallocated
1c79356b
A
239 */
240void
3e170ce0 241ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
1c79356b 242{
3e170ce0 243 struct waitq *mq_waitq = &mqueue->imq_wait_queue;
39037602 244 kern_return_t kr;
1c79356b 245
39037602
A
246 imq_lock(mqueue);
247
248 assert(waitq_valid(mq_waitq));
249 kr = waitq_unlink_all_unlock(mq_waitq);
250 /* mqueue unlocked and set links deallocated */
9bccf70c
A
251}
252
253/*
254 * Routine: ipc_mqueue_remove_all
255 * Purpose:
256 * Remove all the member queues from the specified set.
3e170ce0 257 * Also removes the queue from any containing sets.
9bccf70c
A
258 * Conditions:
259 * Nothing locked.
39037602
A
260 * Returns:
261 * mqueue unlocked all set links deallocated
9bccf70c
A
262 */
263void
3e170ce0 264ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
9bccf70c 265{
3e170ce0 266 struct waitq_set *mq_setq = &mqueue->imq_set_queue;
39037602
A
267
268 imq_lock(mqueue);
269 assert(waitqs_is_set(mq_setq));
270 waitq_set_unlink_all_unlock(mq_setq);
271 /* mqueue unlocked set links deallocated */
1c79356b
A
272}
273
274
275/*
276 * Routine: ipc_mqueue_add
277 * Purpose:
278 * Associate the portset's mqueue with the port's mqueue.
279 * This has to be done so that posting the port will wakeup
280 * a portset waiter. If there are waiters on the portset
281 * mqueue and messages on the port mqueue, try to match them
282 * up now.
283 * Conditions:
284 * May block.
 * Parameters:
 * port_mqueue port queue; locked by this routine for the link and
 * the whole matching loop, unlocked before returning.
 * set_mqueue port-set queue; asserted to be already linked.
 * reserved_link asserted non-NULL and non-zero; handed to waitq_link().
 * reserved_prepost handed to waitq_wakeup64_identify_locked().
 * Returns:
 * The waitq_link() status if linking fails, otherwise KERN_SUCCESS.
285 */
286kern_return_t
287ipc_mqueue_add(
3e170ce0
A
288 ipc_mqueue_t port_mqueue,
289 ipc_mqueue_t set_mqueue,
290 uint64_t *reserved_link,
291 uint64_t *reserved_prepost)
1c79356b 292{
3e170ce0
A
293 struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
294 struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
1c79356b
A
295 ipc_kmsg_queue_t kmsgq;
296 ipc_kmsg_t kmsg, next;
297 kern_return_t kr;
1c79356b 298
3e170ce0 299 assert(reserved_link && *reserved_link != 0);
d9a64523 300 assert(waitqs_is_linked(set_waitq));
3e170ce0 301
3e170ce0
A
302 imq_lock(port_mqueue);
303
304 /*
305 * The link operation is now under the same lock-hold as
306 * message iteration and thread wakeup, but doesn't have to be...
307 */
308 kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
309 if (kr != KERN_SUCCESS) {
310 imq_unlock(port_mqueue);
1c79356b 311 return kr;
3e170ce0 312 }
1c79356b
A
313
314 /*
315 * Now that the set has been added to the port, there may be
316 * messages queued on the port and threads waiting on the set
317 * waitq. Lets get them together.
318 */
1c79356b
A
319 kmsgq = &port_mqueue->imq_messages;
320 for (kmsg = ipc_kmsg_queue_first(kmsgq);
321 kmsg != IKM_NULL;
322 kmsg = next) {
/* fetch the successor first: kmsg may be unlinked from the queue below */
323 next = ipc_kmsg_queue_next(kmsgq, kmsg);
324
/* inner loop: keep pulling waiters until one accepts kmsg or none are left */
325 for (;;) {
326 thread_t th;
b0d623f7 327 mach_msg_size_t msize;
3e170ce0 328 spl_t th_spl;
1c79356b 329
39037602 330 th = waitq_wakeup64_identify_locked(
9bccf70c
A
331 port_waitq,
332 IPC_MQUEUE_RECEIVE,
3e170ce0 333 THREAD_AWAKENED, &th_spl,
39037602
A
334 reserved_prepost, WAITQ_ALL_PRIORITIES,
335 WAITQ_KEEP_LOCKED);
1c79356b
A
336 /* waitq/mqueue still locked, thread locked */
337
/* no waiter left: stop matching, the remaining messages stay queued */
338 if (th == THREAD_NULL)
339 goto leave;
340
b0d623f7
A
341 /*
342 * If the receiver waited with a facility not directly
343 * related to Mach messaging, then it isn't prepared to get
344 * handed the message directly. Just set it running, and
345 * go look for another thread that can.
346 */
347 if (th->ith_state != MACH_RCV_IN_PROGRESS) {
39037602
A
348 if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
349 /*
350 * wakeup the peeking thread, but
351 * continue to loop over the threads
352 * waiting on the port's mqueue to see
353 * if there are any actual receivers
354 */
355 ipc_mqueue_peek_on_thread(port_mqueue,
356 th->ith_option,
357 th);
358 }
359 thread_unlock(th);
360 splx(th_spl);
361 continue;
b0d623f7
A
362 }
363
1c79356b
A
364 /*
365 * Found a receiver. see if they can handle the message
366 * correctly (the message is not too large for them, or
367 * they didn't care to be informed that the message was
368 * too large). If they can't handle it, take them off
369 * the list and let them go back and figure it out and
370 * just move onto the next.
371 */
b0d623f7 372 msize = ipc_kmsg_copyout_size(kmsg, th->map);
39037602 373 if (th->ith_rsize <
d9a64523 374 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
1c79356b 375 th->ith_state = MACH_RCV_TOO_LARGE;
b0d623f7 376 th->ith_msize = msize;
1c79356b
A
377 if (th->ith_option & MACH_RCV_LARGE) {
378 /*
379 * let him go without message
380 */
b0d623f7 381 th->ith_receiver_name = port_mqueue->imq_receiver_name;
1c79356b
A
382 th->ith_kmsg = IKM_NULL;
383 th->ith_seqno = 0;
384 thread_unlock(th);
3e170ce0 385 splx(th_spl);
1c79356b
A
386 continue; /* find another thread */
387 }
/* without MACH_RCV_LARGE the too-large message is still handed over below */
388 } else {
389 th->ith_state = MACH_MSG_SUCCESS;
390 }
391
392 /*
393 * This thread is going to take this message,
394 * so give it to him.
395 */
1c79356b 396 ipc_kmsg_rmqueue(kmsgq, kmsg);
39037602
A
397#if MACH_FLIPC
398 mach_node_t node = kmsg->ikm_node;
399#endif
3e170ce0 400 ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);
91447636 401
1c79356b
A
402 th->ith_kmsg = kmsg;
403 th->ith_seqno = port_mqueue->imq_seqno++;
404 thread_unlock(th);
3e170ce0 405 splx(th_spl);
39037602
A
406#if MACH_FLIPC
407 if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport))
408 flipc_msg_ack(node, port_mqueue, TRUE);
409#endif
1c79356b
A
410 break; /* go to next message */
411 }
1c79356b
A
412 }
413 leave:
414 imq_unlock(port_mqueue);
1c79356b
A
415 return KERN_SUCCESS;
416}
417
418/*
419 * Routine: ipc_mqueue_changed
420 * Purpose:
421 * Wake up receivers waiting in a message queue.
422 * Conditions:
423 * The message queue is locked.
424 */
425
426void
427ipc_mqueue_changed(
428 ipc_mqueue_t mqueue)
429{
d9a64523
A
430 if (IMQ_KLIST_VALID(mqueue)) {
431 /*
432 * Indicate that this message queue is vanishing
433 *
434 * When this is called, the associated receive right may be in flight
435 * between two tasks: the one it used to live in, and the one that armed
436 * a port destroyed notification for it.
437 *
438 * The new process may want to register the port it gets back with an
439 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
440 * port pending already, in which case we want the imq_klist field to be
441 * reusable for nefarious purposes (see IMQ_SET_INHERITOR).
442 *
443 * Fortunately, we really don't need this linkage anymore after this
444 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
445 */
446 knote_vanish(&mqueue->imq_klist);
447 klist_init(&mqueue->imq_klist);
448 }
39037602 449
3e170ce0
A
450 waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
451 IPC_MQUEUE_RECEIVE,
452 THREAD_RESTART,
453 NULL,
454 WAITQ_ALL_PRIORITIES,
455 WAITQ_KEEP_LOCKED);
1c79356b
A
456}
457
458
d9a64523 459
1c79356b
A
460
461/*
462 * Routine: ipc_mqueue_send
463 * Purpose:
464 * Send a message to a message queue. The message holds a reference
d9a64523 465 * for the destination port for this message queue in the
1c79356b
A
466 * msgh_remote_port field.
467 *
468 * If unsuccessful, the caller still has possession of
469 * the message and must do something with it. If successful,
470 * the message is queued, given to a receiver, or destroyed.
471 * Conditions:
39236c6e 472 * mqueue is locked.
1c79356b
A
473 * Returns:
474 * MACH_MSG_SUCCESS The message was accepted.
475 * MACH_SEND_TIMED_OUT Caller still has message.
476 * MACH_SEND_INTERRUPTED Caller still has message.
 * MACH_SEND_NO_BUFFER Queue is full per imq_full_kernel(); the message is not posted.
 * MACH_SEND_INVALID_DEST Wait ended with THREAD_RESTART (mqueue is being destroyed).
 *
 * Every return path drops the mqueue lock; ipc_mqueue_post() is
 * called with the mqueue unlocked.
477 */
478mach_msg_return_t
479ipc_mqueue_send(
480 ipc_mqueue_t mqueue,
b0d623f7 481 ipc_kmsg_t kmsg,
1c79356b 482 mach_msg_option_t option,
39037602 483 mach_msg_timeout_t send_timeout)
1c79356b 484{
9bccf70c 485 int wresult;
1c79356b
A
486
487 /*
488 * Don't block if:
489 * 1) We're under the queue limit.
490 * 2) Caller used the MACH_SEND_ALWAYS internal option.
491 * 3) Message is sent to a send-once right.
492 */
1c79356b 493 if (!imq_full(mqueue) ||
d9a64523 494 (!imq_full_kernel(mqueue) &&
c910b4d9
A
495 ((option & MACH_SEND_ALWAYS) ||
496 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
497 MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
/* take a queue slot for this message before dropping the lock */
1c79356b 498 mqueue->imq_msgcount++;
91447636 499 assert(mqueue->imq_msgcount > 0);
1c79356b 500 imq_unlock(mqueue);
1c79356b 501 } else {
55e303ae 502 thread_t cur_thread = current_thread();
d9a64523
A
503 ipc_port_t port = ip_from_mq(mqueue);
504 struct turnstile *send_turnstile = TURNSTILE_NULL;
505 turnstile_inheritor_t inheritor = TURNSTILE_INHERITOR_NULL;
91447636 506 uint64_t deadline;
1c79356b 507
d9a64523 508 /*
1c79356b
A
509 * We have to wait for space to be granted to us.
510 */
91447636 511 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
1c79356b 512 imq_unlock(mqueue);
1c79356b
A
513 return MACH_SEND_TIMED_OUT;
514 }
c910b4d9
A
515 if (imq_full_kernel(mqueue)) {
516 imq_unlock(mqueue);
c910b4d9
A
517 return MACH_SEND_NO_BUFFER;
518 }
/* this flag is tested by ipc_mqueue_release_msgcount() before it wakes a blocked sender */
1c79356b 519 mqueue->imq_fullwaiters = TRUE;
39037602 520
/*
 * send_timeout is scaled by 1000*NSEC_PER_USEC, i.e. it is taken in
 * milliseconds; a deadline of 0 is passed when MACH_SEND_TIMEOUT is not set.
 */
91447636
A
521 if (option & MACH_SEND_TIMEOUT)
522 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
523 else
524 deadline = 0;
813fb2f6
A
525
526 thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
d9a64523
A
527
528 send_turnstile = turnstile_prepare((uintptr_t)port,
529 port_send_turnstile_address(port),
530 TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
531
532 /* Check if the port is in transit, get the destination port's turnstile */
533 if (ip_active(port) &&
534 port->ip_receiver_name == MACH_PORT_NULL &&
535 port->ip_destination != NULL) {
536 inheritor = port_send_turnstile(port->ip_destination);
537 } else {
538 inheritor = ipc_port_get_inheritor(port);
539 }
540
541 turnstile_update_inheritor(send_turnstile, inheritor,
542 TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
543
/* the wait is asserted on the send turnstile's waitq while the mqueue lock is still held */
544 wresult = waitq_assert_wait64_leeway(
545 &send_turnstile->ts_waitq,
546 IPC_MQUEUE_FULL,
547 THREAD_ABORTSAFE,
548 TIMEOUT_URGENCY_USER_NORMAL,
549 deadline,
550 TIMEOUT_NO_LEEWAY);
39037602 551
55e303ae 552 imq_unlock(mqueue);
d9a64523
A
553 turnstile_update_inheritor_complete(send_turnstile,
554 TURNSTILE_INTERLOCK_NOT_HELD);
555
9bccf70c 556 if (wresult == THREAD_WAITING) {
91447636 557 wresult = thread_block(THREAD_CONTINUE_NULL);
9bccf70c
A
558 counter(c_ipc_mqueue_send_block++);
559 }
d9a64523
A
560
561 /* Call turnstile complete with interlock held */
562 imq_lock(mqueue);
563 turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
564 imq_unlock(mqueue);
565
566 /* Call cleanup after dropping the interlock */
567 turnstile_cleanup();
568
9bccf70c 569 switch (wresult) {
3e170ce0
A
570
571 case THREAD_AWAKENED:
d9a64523 572 /*
3e170ce0
A
573 * we can proceed - inherited msgcount from waker
574 * or the message queue has been destroyed and the msgcount
575 * has been reset to zero (will detect in ipc_mqueue_post()).
576 */
577 break;
d9a64523 578
1c79356b
A
579 case THREAD_TIMED_OUT:
580 assert(option & MACH_SEND_TIMEOUT);
581 return MACH_SEND_TIMED_OUT;
d9a64523 582
1c79356b 583 case THREAD_INTERRUPTED:
1c79356b 584 return MACH_SEND_INTERRUPTED;
d9a64523 585
1c79356b 586 case THREAD_RESTART:
b0d623f7
A
587 /* mqueue is being destroyed */
588 return MACH_SEND_INVALID_DEST;
1c79356b
A
589 default:
590 panic("ipc_mqueue_send");
591 }
592 }
593
/* reached only with a slot reserved or after THREAD_AWAKENED; mqueue is unlocked here */
39037602 594 ipc_mqueue_post(mqueue, kmsg, option);
1c79356b
A
595 return MACH_MSG_SUCCESS;
596}
597
39037602
A
598/*
599 * Routine: ipc_mqueue_override_send
600 * Purpose:
601 * Set an override qos on the first message in the queue
602 * (if the queue is full). This is a send-possible override
603 * that will go away as soon as we drain a message from the
604 * queue.
605 *
606 * Conditions:
607 * The message queue is not locked.
608 * The caller holds a reference on the message queue.
609 */
610extern void ipc_mqueue_override_send(
611 ipc_mqueue_t mqueue,
612 mach_msg_priority_t override)
613{
614 boolean_t __unused full_queue_empty = FALSE;
615
616 imq_lock(mqueue);
617 assert(imq_valid(mqueue));
618 assert(!imq_is_set(mqueue));
d9a64523 619
39037602
A
620 if (imq_full(mqueue)) {
621 ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
622
d9a64523
A
623 if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
624 if (IMQ_KLIST_VALID(mqueue))
625 KNOTE(&mqueue->imq_klist, 0);
626 }
39037602
A
627 if (!first)
628 full_queue_empty = TRUE;
629 }
630 imq_unlock(mqueue);
631
632#if DEVELOPMENT || DEBUG
633 if (full_queue_empty) {
634 ipc_port_t port = ip_from_mq(mqueue);
635 int dst_pid = 0;
636 if (ip_active(port) && !port->ip_tempowner &&
637 port->ip_receiver_name && port->ip_receiver &&
638 port->ip_receiver != ipc_space_kernel) {
639 dst_pid = task_pid(port->ip_receiver->is_task);
640 }
39037602
A
641 }
642#endif
643}
39236c6e 644
1c79356b
A
645/*
646 * Routine: ipc_mqueue_release_msgcount
647 * Purpose:
648 * Release a message queue reference in the case where we
649 * found a waiter.
650 *
651 * Conditions:
91447636
A
652 * The message queue is locked.
653 * The message corresponding to this reference is off the queue.
3e170ce0
A
654 * There is no need to pass reserved preposts because this will
655 * never prepost to anyone
1c79356b
A
656 */
657void
3e170ce0 658ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
1c79356b 659{
d9a64523 660 struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
3e170ce0
A
661 (void)set_mq;
662 assert(imq_held(port_mq));
663 assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
1c79356b 664
3e170ce0 665 port_mq->imq_msgcount--;
91447636 666
d9a64523
A
667 if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
668 send_turnstile != TURNSTILE_NULL) {
3e170ce0
A
669 /*
670 * boost the priority of the awoken thread
671 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
672 * the message queue slot we've just reserved.
673 *
674 * NOTE: this will never prepost
d9a64523
A
675 *
676 * The wakeup happens on a turnstile waitq
677 * which will wakeup the highest priority waiter.
678 * A potential downside of this would be starving low
679 * priority senders if there is a constant churn of
680 * high priority threads trying to send to this port.
3e170ce0 681 */
d9a64523 682 if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
3e170ce0
A
683 IPC_MQUEUE_FULL,
684 THREAD_AWAKENED,
d9a64523 685 WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
3e170ce0 686 port_mq->imq_fullwaiters = FALSE;
1c79356b 687 } else {
91447636 688 /* gave away our slot - add reference back */
3e170ce0 689 port_mq->imq_msgcount++;
1c79356b
A
690 }
691 }
3e170ce0
A
692
693 if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
694 /* no more msgs: invalidate the port's prepost object */
39037602 695 waitq_clear_prepost_locked(&port_mq->imq_wait_queue);
3e170ce0 696 }
1c79356b
A
697}
698
699/*
700 * Routine: ipc_mqueue_post
701 * Purpose:
702 * Post a message to a waiting receiver or enqueue it. If a
703 * receiver is waiting, we can release our reserved space in
704 * the message queue.
705 *
706 * Conditions:
3e170ce0 707 * mqueue is unlocked
1c79356b
A
708 * If we need to queue, our space in the message queue is reserved.
 *
 * The kmsg is destroyed here when the mqueue is no longer valid, or
 * when no receiver is found and imq_msgcount is 0.
 * The current task's messages_sent counter is incremented on every path.
709 */
710void
711ipc_mqueue_post(
39037602
A
712 ipc_mqueue_t mqueue,
713 ipc_kmsg_t kmsg,
714 mach_msg_option_t __unused option)
1c79356b 715{
3e170ce0 716 uint64_t reserved_prepost = 0;
39037602
A
717 boolean_t destroy_msg = FALSE;
718
/* NOTE(review): option is marked __unused, presumably because this trace call can compile away - confirm */
719 ipc_kmsg_trace_send(kmsg, option);
1c79356b
A
720
721 /*
722 * While the msg queue is locked, we have control of the
723 * kmsg, so the ref in it for the port is still good.
724 *
725 * Check for a receiver for the message.
726 */
39037602
A
727 imq_reserve_and_lock(mqueue, &reserved_prepost);
728
729 /* we may have raced with port destruction! */
730 if (!imq_valid(mqueue)) {
731 destroy_msg = TRUE;
732 goto out_unlock;
733 }
734
/* loop until the message is handed to a receiver, enqueued, or marked for destruction */
1c79356b 735 for (;;) {
3e170ce0
A
736 struct waitq *waitq = &mqueue->imq_wait_queue;
737 spl_t th_spl;
1c79356b 738 thread_t receiver;
b0d623f7 739 mach_msg_size_t msize;
1c79356b 740
39037602
A
741 receiver = waitq_wakeup64_identify_locked(waitq,
742 IPC_MQUEUE_RECEIVE,
743 THREAD_AWAKENED,
744 &th_spl,
745 &reserved_prepost,
746 WAITQ_ALL_PRIORITIES,
747 WAITQ_KEEP_LOCKED);
1c79356b
A
748 /* waitq still locked, thread locked */
749
750 if (receiver == THREAD_NULL) {
39037602 751
d9a64523 752 /*
39037602
A
753 * no receivers; queue kmsg if space still reserved
754 * Reservations are cancelled when the port goes inactive.
755 * note that this will enqueue the message for any
d9a64523 756 * "peeking" receivers.
39037602
A
757 *
758 * Also, post the knote to wake up any threads waiting
759 * on that style of interface if this insertion is of
760 * note (first insertion, or adjusted override qos all
761 * the way to the head of the queue).
d9a64523 762 *
39037602
A
763 * This is just for ports. portset knotes are stay-active,
764 * and their threads get awakened through the !MACH_RCV_IN_PROGRESS
765 * logic below).
1c79356b 766 */
3e170ce0 767 if (mqueue->imq_msgcount > 0) {
d9a64523
A
768 if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
769 if (IMQ_KLIST_VALID(mqueue))
770 KNOTE(&mqueue->imq_klist, 0);
771 }
3e170ce0
A
772 break;
773 }
774
775 /*
776 * Otherwise, the message queue must belong to an inactive
777 * port, so just destroy the message and pretend it was posted.
778 */
39037602
A
779 destroy_msg = TRUE;
780 goto out_unlock;
1c79356b 781 }
d9a64523 782
b0d623f7 783 /*
39037602
A
784 * If a thread is attempting a "peek" into the message queue
785 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
786 * thread running. A successful peek is essentially the same as
787 * message delivery since the peeking thread takes responsibility
788 * for delivering the message and (eventually) removing it from
789 * the mqueue. Only one thread can successfully use the peek
790 * facility on any given port, so we exit the waitq loop after
791 * encountering such a thread.
792 */
793 if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
/* NOTE(review): unlike the no-receiver path, the enqueue result is ignored and no KNOTE is posted here - confirm intentional */
794 ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
795 ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver);
796 thread_unlock(receiver);
797 splx(th_spl);
798 break; /* Message was posted, so break out of loop */
799 }
800
801 /*
802 * If the receiver waited with a facility not directly related
803 * to Mach messaging, then it isn't prepared to get handed the
804 * message directly. Just set it running, and go look for
805 * another thread that can.
b0d623f7
A
806 */
807 if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
39037602
A
808 thread_unlock(receiver);
809 splx(th_spl);
810 continue;
b0d623f7
A
811 }
812
d9a64523 813
1c79356b
A
814 /*
815 * We found a waiting thread.
816 * If the message is too large or the scatter list is too small
817 * the thread we wake up will get that as its status.
818 */
b0d623f7 819 msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
39037602 820 if (receiver->ith_rsize <
d9a64523 821 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
b0d623f7 822 receiver->ith_msize = msize;
1c79356b
A
823 receiver->ith_state = MACH_RCV_TOO_LARGE;
824 } else {
825 receiver->ith_state = MACH_MSG_SUCCESS;
826 }
827
828 /*
829 * If there is no problem with the upcoming receive, or the
830 * receiver thread didn't specifically ask for special too
831 * large error condition, go ahead and select it anyway.
832 */
833 if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
834 !(receiver->ith_option & MACH_RCV_LARGE)) {
1c79356b
A
835 receiver->ith_kmsg = kmsg;
836 receiver->ith_seqno = mqueue->imq_seqno++;
39037602
A
837#if MACH_FLIPC
838 mach_node_t node = kmsg->ikm_node;
839#endif
1c79356b 840 thread_unlock(receiver);
3e170ce0 841 splx(th_spl);
1c79356b
A
842
843 /* we didn't need our reserved spot in the queue */
3e170ce0 844 ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
39037602
A
845
846#if MACH_FLIPC
847 if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport))
848 flipc_msg_ack(node, mqueue, TRUE);
849#endif
1c79356b
A
850 break;
851 }
852
853 /*
854 * Otherwise, this thread needs to be released to run
855 * and handle its error without getting the message. We
856 * need to go back and pick another one.
857 */
39236c6e 858 receiver->ith_receiver_name = mqueue->imq_receiver_name;
1c79356b
A
859 receiver->ith_kmsg = IKM_NULL;
860 receiver->ith_seqno = 0;
861 thread_unlock(receiver);
3e170ce0 862 splx(th_spl);
1c79356b
A
863 }
864
39037602 865out_unlock:
3e170ce0
A
866 /* clear the waitq boost we may have been given */
867 waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
39037602
A
868 imq_release_and_unlock(mqueue, reserved_prepost);
/* the message is destroyed only after the mqueue lock has been dropped */
869 if (destroy_msg)
870 ipc_kmsg_destroy(kmsg);
871
1c79356b
A
872 current_task()->messages_sent++;
873 return;
874}
875
876
91447636
A
877/* static */ void
878ipc_mqueue_receive_results(wait_result_t saved_wait_result)
1c79356b
A
879{
880 thread_t self = current_thread();
881 mach_msg_option_t option = self->ith_option;
1c79356b
A
882
883 /*
884 * why did we wake up?
885 */
886 switch (saved_wait_result) {
887 case THREAD_TIMED_OUT:
888 self->ith_state = MACH_RCV_TIMED_OUT;
889 return;
890
891 case THREAD_INTERRUPTED:
1c79356b
A
892 self->ith_state = MACH_RCV_INTERRUPTED;
893 return;
894
895 case THREAD_RESTART:
896 /* something bad happened to the port/set */
1c79356b
A
897 self->ith_state = MACH_RCV_PORT_CHANGED;
898 return;
899
900 case THREAD_AWAKENED:
901 /*
902 * We do not need to go select a message, somebody
903 * handed us one (or a too-large indication).
904 */
1c79356b
A
905 switch (self->ith_state) {
906 case MACH_RCV_SCATTER_SMALL:
907 case MACH_RCV_TOO_LARGE:
908 /*
909 * Somebody tried to give us a too large
910 * message. If we indicated that we cared,
911 * then they only gave us the indication,
912 * otherwise they gave us the indication
913 * AND the message anyway.
914 */
915 if (option & MACH_RCV_LARGE) {
916 return;
917 }
918
919 case MACH_MSG_SUCCESS:
39037602 920 case MACH_PEEK_READY:
1c79356b
A
921 return;
922
923 default:
924 panic("ipc_mqueue_receive_results: strange ith_state");
925 }
926
927 default:
928 panic("ipc_mqueue_receive_results: strange wait_result");
929 }
930}
931
932void
91447636
A
933ipc_mqueue_receive_continue(
934 __unused void *param,
935 wait_result_t wresult)
1c79356b 936{
91447636 937 ipc_mqueue_receive_results(wresult);
1c79356b
A
938 mach_msg_receive_continue(); /* hard-coded for now */
939}
940
941/*
942 * Routine: ipc_mqueue_receive
943 * Purpose:
944 * Receive a message from a message queue.
945 *
1c79356b
A
946 * Conditions:
947 * Our caller must hold a reference for the port or port set
948 * to which this queue belongs, to keep the queue
949 * from being deallocated.
950 *
951 * The kmsg is returned with clean header fields
39037602
A
952 * and with the circular bit turned off through the ith_kmsg
953 * field of the thread's receive continuation state.
1c79356b 954 * Returns:
39037602
A
955 * MACH_MSG_SUCCESS Message returned in ith_kmsg.
956 * MACH_RCV_TOO_LARGE Message size returned in ith_msize.
1c79356b
A
957 * MACH_RCV_TIMED_OUT No message obtained.
958 * MACH_RCV_INTERRUPTED No message obtained.
959 * MACH_RCV_PORT_DIED Port/set died; no message.
960 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
961 *
962 */
963
964void
965ipc_mqueue_receive(
b0d623f7
A
966 ipc_mqueue_t mqueue,
967 mach_msg_option_t option,
968 mach_msg_size_t max_size,
969 mach_msg_timeout_t rcv_timeout,
970 int interruptible)
971{
972 wait_result_t wresult;
39037602
A
973 thread_t self = current_thread();
974
975 imq_lock(mqueue);
976 wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
977 rcv_timeout, interruptible,
978 self);
979 /* mqueue unlocked */
980 if (wresult == THREAD_NOT_WAITING)
981 return;
b0d623f7
A
982
983 if (wresult == THREAD_WAITING) {
d9a64523 984 counter((interruptible == THREAD_ABORTSAFE) ?
b0d623f7
A
985 c_ipc_mqueue_receive_block_user++ :
986 c_ipc_mqueue_receive_block_kernel++);
987
988 if (self->ith_continuation)
989 thread_block(ipc_mqueue_receive_continue);
990 /* NOTREACHED */
991
992 wresult = thread_block(THREAD_CONTINUE_NULL);
993 }
994 ipc_mqueue_receive_results(wresult);
995}
996
3e170ce0
A
997static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
998 struct waitq_set *wqset)
999{
1000 ipc_mqueue_t port_mq, *pmq_ptr;
1001
1002 (void)wqset;
1003 port_mq = (ipc_mqueue_t)waitq;
1004
1005 /*
1006 * If there are no messages on this queue, skip it and remove
1007 * it from the prepost list
1008 */
1009 if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
1010 return WQ_ITERATE_INVALIDATE_CONTINUE;
1011
1012 /*
1013 * There are messages waiting on this port.
1014 * Instruct the prepost iteration logic to break, but keep the
1015 * waitq locked.
1016 */
1017 pmq_ptr = (ipc_mqueue_t *)ctx;
1018 if (pmq_ptr)
1019 *pmq_ptr = port_mq;
1020 return WQ_ITERATE_BREAK_KEEP_LOCKED;
1021}
1022
39037602
A
1023/*
1024 * Routine: ipc_mqueue_receive_on_thread
1025 * Purpose:
1026 * Receive a message from a message queue using a specified thread.
1027 * If no message available, assert_wait on the appropriate waitq.
1028 *
1029 * Conditions:
1030 * Assumes thread is self.
1031 * Called with mqueue locked.
1032 * Returns with mqueue unlocked.
1033 * May have assert-waited. Caller must block in those cases.
1034 */
b0d623f7
A
1035wait_result_t
1036ipc_mqueue_receive_on_thread(
39037602 1037 ipc_mqueue_t mqueue,
b0d623f7
A
1038 mach_msg_option_t option,
1039 mach_msg_size_t max_size,
1040 mach_msg_timeout_t rcv_timeout,
1041 int interruptible,
1042 thread_t thread)
1c79356b 1043{
91447636 1044 wait_result_t wresult;
b0d623f7 1045 uint64_t deadline;
d9a64523
A
1046 struct turnstile *rcv_turnstile = TURNSTILE_NULL;
1047 turnstile_inheritor_t inheritor = NULL;
1c79356b 1048
39037602
A
1049 /* called with mqueue locked */
1050
3e170ce0 1051 /* no need to reserve anything: we never prepost to anyone */
39037602
A
1052
1053 if (!imq_valid(mqueue)) {
1054 /* someone raced us to destroy this mqueue/port! */
1055 imq_unlock(mqueue);
1056 /*
1057 * ipc_mqueue_receive_results updates the thread's ith_state
1058 * TODO: differentiate between rights being moved and
1059 * rights/ports being destroyed (21885327)
1060 */
1061 return THREAD_RESTART;
1062 }
d9a64523 1063
1c79356b 1064 if (imq_is_set(mqueue)) {
3e170ce0 1065 ipc_mqueue_t port_mq = IMQ_NULL;
1c79356b 1066
3e170ce0
A
1067 (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
1068 &port_mq,
39037602 1069 mqueue_process_prepost_receive);
1c79356b 1070
3e170ce0 1071 if (port_mq != IMQ_NULL) {
1c79356b 1072 /*
3e170ce0
A
1073 * We get here if there is at least one message
1074 * waiting on port_mq. We have instructed the prepost
1075 * iteration logic to leave both the port_mq and the
1076 * set mqueue locked.
1077 *
1078 * TODO: previously, we would place this port at the
1079 * back of the prepost list...
1c79356b 1080 */
3e170ce0 1081 imq_unlock(mqueue);
b0d623f7 1082
b0d623f7
A
1083 /*
1084 * Continue on to handling the message with just
1085 * the port mqueue locked.
1086 */
39037602
A
1087 if (option & MACH_PEEK_MSG)
1088 ipc_mqueue_peek_on_thread(port_mq, option, thread);
1089 else
1090 ipc_mqueue_select_on_thread(port_mq, mqueue, option,
1091 max_size, thread);
3e170ce0 1092
1c79356b 1093 imq_unlock(port_mq);
b0d623f7 1094 return THREAD_NOT_WAITING;
1c79356b 1095 }
39037602 1096 } else if (imq_is_queue(mqueue)) {
3e170ce0 1097 ipc_kmsg_queue_t kmsgs;
1c79356b
A
1098
1099 /*
1100 * Receive on a single port. Just try to get the messages.
1101 */
d9a64523 1102 kmsgs = &mqueue->imq_messages;
1c79356b 1103 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
39037602
A
1104 if (option & MACH_PEEK_MSG)
1105 ipc_mqueue_peek_on_thread(mqueue, option, thread);
1106 else
1107 ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
1108 max_size, thread);
1c79356b 1109 imq_unlock(mqueue);
b0d623f7 1110 return THREAD_NOT_WAITING;
1c79356b 1111 }
39037602
A
1112 } else {
1113 panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
1114 mqueue->imq_wait_queue.waitq_type);
1c79356b 1115 }
d9a64523 1116
1c79356b
A
1117 /*
1118 * Looks like we'll have to block. The mqueue we will
1119 * block on (whether the set's or the local port's) is
1120 * still locked.
1121 */
1c79356b 1122 if (option & MACH_RCV_TIMEOUT) {
91447636 1123 if (rcv_timeout == 0) {
1c79356b 1124 imq_unlock(mqueue);
b0d623f7
A
1125 thread->ith_state = MACH_RCV_TIMED_OUT;
1126 return THREAD_NOT_WAITING;
1c79356b
A
1127 }
1128 }
1129
b0d623f7 1130 thread->ith_option = option;
39037602
A
1131 thread->ith_rsize = max_size;
1132 thread->ith_msize = 0;
1133
1134 if (option & MACH_PEEK_MSG)
1135 thread->ith_state = MACH_PEEK_IN_PROGRESS;
1136 else
1137 thread->ith_state = MACH_RCV_IN_PROGRESS;
55e303ae 1138
91447636
A
1139 if (option & MACH_RCV_TIMEOUT)
1140 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
1141 else
1142 deadline = 0;
1143
d9a64523
A
1144 /*
1145 * Threads waiting on a port (not portset)
1146 * will wait on port's receive turnstile.
1147 * Donate waiting thread's turnstile and
1148 * setup inheritor for special reply port.
1149 * Based on the state of the special reply
1150 * port, the inheritor would be the send
1151 * turnstile of the connection port on which
1152 * the send of sync ipc would happen or
1153 * workloop's turnstile who would reply to
1154 * the sync ipc message.
1155 *
1156 * Pass in mqueue wait in waitq_assert_wait to
1157 * support port set wakeup. The mqueue waitq of port
1158 * will be converted to to turnstile waitq
1159 * in waitq_assert_wait instead of global waitqs.
1160 */
1161 if (imq_is_queue(mqueue)) {
1162 ipc_port_t port = ip_from_mq(mqueue);
1163 rcv_turnstile = turnstile_prepare((uintptr_t)port,
1164 port_rcv_turnstile_address(port),
1165 TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
1166
1167 if (port->ip_specialreply) {
1168 inheritor = ipc_port_get_special_reply_port_inheritor(port);
1169 }
1170
1171 turnstile_update_inheritor(rcv_turnstile, inheritor,
1172 (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
1173 }
1174
813fb2f6 1175 thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
3e170ce0
A
1176 wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
1177 IPC_MQUEUE_RECEIVE,
1178 interruptible,
1179 TIMEOUT_URGENCY_USER_NORMAL,
1180 deadline,
1181 TIMEOUT_NO_LEEWAY,
1182 thread);
b0d623f7
A
1183 /* preposts should be detected above, not here */
1184 if (wresult == THREAD_AWAKENED)
1185 panic("ipc_mqueue_receive_on_thread: sleep walking");
1186
55e303ae 1187 imq_unlock(mqueue);
39037602 1188
d9a64523
A
1189 /* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
1190 if (rcv_turnstile != TURNSTILE_NULL) {
1191 turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
1192 }
1193 /* Its callers responsibility to call turnstile_complete to get the turnstile back */
1194
b0d623f7 1195 return wresult;
1c79356b
A
1196}
1197
1198
39037602
A
1199/*
1200 * Routine: ipc_mqueue_peek_on_thread
1201 * Purpose:
1202 * A receiver discovered that there was a message on the queue
1203 * before he had to block. Tell a thread about the message queue,
1204 * but don't pick off any messages.
1205 * Conditions:
1206 * port_mq locked
1207 * at least one message on port_mq's message queue
1208 *
1209 * Returns: (on thread->ith_state)
1210 * MACH_PEEK_READY ith_peekq contains a message queue
1211 */
1212void
1213ipc_mqueue_peek_on_thread(
1214 ipc_mqueue_t port_mq,
1215 mach_msg_option_t option,
1216 thread_t thread)
1217{
1218 (void)option;
1219 assert(option & MACH_PEEK_MSG);
1220 assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
1221
1222 /*
1223 * Take a reference on the mqueue's associated port:
1224 * the peeking thread will be responsible to release this reference
1225 * using ip_release_mq()
1226 */
1227 ip_reference_mq(port_mq);
1228 thread->ith_peekq = port_mq;
1229 thread->ith_state = MACH_PEEK_READY;
1230}
1231
1c79356b 1232/*
b0d623f7 1233 * Routine: ipc_mqueue_select_on_thread
1c79356b
A
1234 * Purpose:
1235 * A receiver discovered that there was a message on the queue
1236 * before he had to block. Pick the message off the queue and
b0d623f7 1237 * "post" it to thread.
1c79356b
A
1238 * Conditions:
1239 * mqueue locked.
b0d623f7 1240 * thread not locked.
1c79356b 1241 * There is a message.
3e170ce0
A
1242 * No need to reserve prepost objects - it will never prepost
1243 *
1c79356b
A
1244 * Returns:
1245 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
1246 * MACH_RCV_TOO_LARGE May or may not have pulled it, but it is large
1247 */
1248void
b0d623f7 1249ipc_mqueue_select_on_thread(
3e170ce0
A
1250 ipc_mqueue_t port_mq,
1251 ipc_mqueue_t set_mq,
1c79356b 1252 mach_msg_option_t option,
b0d623f7
A
1253 mach_msg_size_t max_size,
1254 thread_t thread)
1c79356b 1255{
1c79356b 1256 ipc_kmsg_t kmsg;
b0d623f7 1257 mach_msg_return_t mr = MACH_MSG_SUCCESS;
39037602 1258 mach_msg_size_t msize;
1c79356b 1259
1c79356b
A
1260 /*
1261 * Do some sanity checking of our ability to receive
1262 * before pulling the message off the queue.
1263 */
3e170ce0 1264 kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1c79356b
A
1265 assert(kmsg != IKM_NULL);
1266
1c79356b
A
1267 /*
1268 * If we really can't receive it, but we had the
1269 * MACH_RCV_LARGE option set, then don't take it off
1270 * the queue, instead return the appropriate error
1271 * (and size needed).
1272 */
39037602 1273 msize = ipc_kmsg_copyout_size(kmsg, thread->map);
d9a64523 1274 if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
91447636
A
1275 mr = MACH_RCV_TOO_LARGE;
1276 if (option & MACH_RCV_LARGE) {
3e170ce0 1277 thread->ith_receiver_name = port_mq->imq_receiver_name;
b0d623f7 1278 thread->ith_kmsg = IKM_NULL;
39037602 1279 thread->ith_msize = msize;
b0d623f7
A
1280 thread->ith_seqno = 0;
1281 thread->ith_state = mr;
91447636
A
1282 return;
1283 }
1c79356b
A
1284 }
1285
39037602
A
1286 ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1287#if MACH_FLIPC
1288 if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport))
1289 flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
1290#endif
3e170ce0
A
1291 ipc_mqueue_release_msgcount(port_mq, set_mq);
1292 thread->ith_seqno = port_mq->imq_seqno++;
b0d623f7
A
1293 thread->ith_kmsg = kmsg;
1294 thread->ith_state = mr;
1c79356b
A
1295
1296 current_task()->messages_received++;
1297 return;
1298}
1299
b0d623f7 1300/*
39037602 1301 * Routine: ipc_mqueue_peek_locked
b0d623f7 1302 * Purpose:
39236c6e
A
1303 * Peek at a (non-set) message queue to see if it has a message
1304 * matching the sequence number provided (if zero, then the
1305 * first message in the queue) and return vital info about the
1306 * message.
1307 *
1308 * Conditions:
39037602
A
1309 * The ipc_mqueue_t is locked by callers.
1310 * Other locks may be held by callers, so this routine cannot block.
39236c6e
A
1311 * Caller holds reference on the message queue.
1312 */
1313unsigned
39037602
A
1314ipc_mqueue_peek_locked(ipc_mqueue_t mq,
1315 mach_port_seqno_t * seqnop,
1316 mach_msg_size_t * msg_sizep,
1317 mach_msg_id_t * msg_idp,
1318 mach_msg_max_trailer_t * msg_trailerp,
1319 ipc_kmsg_t *kmsgp)
39236c6e
A
1320{
1321 ipc_kmsg_queue_t kmsgq;
3e170ce0 1322 ipc_kmsg_t kmsg;
39236c6e 1323 mach_port_seqno_t seqno, msgoff;
39037602 1324 unsigned res = 0;
39236c6e
A
1325
1326 assert(!imq_is_set(mq));
1327
3e170ce0
A
1328 seqno = 0;
1329 if (seqnop != NULL)
1330 seqno = *seqnop;
39236c6e
A
1331
1332 if (seqno == 0) {
1333 seqno = mq->imq_seqno;
1334 msgoff = 0;
d9a64523 1335 } else if (seqno >= mq->imq_seqno &&
39236c6e
A
1336 seqno < mq->imq_seqno + mq->imq_msgcount) {
1337 msgoff = seqno - mq->imq_seqno;
1338 } else
1339 goto out;
1340
1341 /* look for the message that would match that seqno */
1342 kmsgq = &mq->imq_messages;
1343 kmsg = ipc_kmsg_queue_first(kmsgq);
1344 while (msgoff-- && kmsg != IKM_NULL) {
1345 kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1346 }
1347 if (kmsg == IKM_NULL)
1348 goto out;
1349
1350 /* found one - return the requested info */
1351 if (seqnop != NULL)
1352 *seqnop = seqno;
1353 if (msg_sizep != NULL)
1354 *msg_sizep = kmsg->ikm_header->msgh_size;
1355 if (msg_idp != NULL)
1356 *msg_idp = kmsg->ikm_header->msgh_id;
1357 if (msg_trailerp != NULL)
d9a64523 1358 memcpy(msg_trailerp,
39236c6e
A
1359 (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
1360 round_msg(kmsg->ikm_header->msgh_size)),
1361 sizeof(mach_msg_max_trailer_t));
39037602
A
1362 if (kmsgp != NULL)
1363 *kmsgp = kmsg;
1364
39236c6e
A
1365 res = 1;
1366
39037602
A
1367out:
1368 return res;
1369}
1370
1371
1372/*
1373 * Routine: ipc_mqueue_peek
1374 * Purpose:
1375 * Peek at a (non-set) message queue to see if it has a message
1376 * matching the sequence number provided (if zero, then the
1377 * first message in the queue) and return vital info about the
1378 * message.
1379 *
1380 * Conditions:
1381 * The ipc_mqueue_t is unlocked.
1382 * Locks may be held by callers, so this routine cannot block.
1383 * Caller holds reference on the message queue.
1384 */
1385unsigned
1386ipc_mqueue_peek(ipc_mqueue_t mq,
1387 mach_port_seqno_t * seqnop,
1388 mach_msg_size_t * msg_sizep,
1389 mach_msg_id_t * msg_idp,
1390 mach_msg_max_trailer_t * msg_trailerp,
1391 ipc_kmsg_t *kmsgp)
1392{
1393 unsigned res;
1394
1395 imq_lock(mq);
1396
1397 res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1398 msg_trailerp, kmsgp);
1399
39236c6e 1400 imq_unlock(mq);
39236c6e
A
1401 return res;
1402}
1403
39037602
A
1404/*
1405 * Routine: ipc_mqueue_release_peek_ref
1406 * Purpose:
1407 * Release the reference on an mqueue's associated port which was
1408 * granted to a thread in ipc_mqueue_peek_on_thread (on the
1409 * MACH_PEEK_MSG thread wakeup path).
1410 *
1411 * Conditions:
1412 * The ipc_mqueue_t should be locked on entry.
1413 * The ipc_mqueue_t will be _unlocked_ on return
1414 * (and potentially invalid!)
1415 *
1416 */
1417void ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
1418{
1419 assert(!imq_is_set(mq));
1420 assert(imq_held(mq));
1421
1422 /*
1423 * clear any preposts this mq may have generated
1424 * (which would cause subsequent immediate wakeups)
1425 */
1426 waitq_clear_prepost_locked(&mq->imq_wait_queue);
1427
1428 imq_unlock(mq);
1429
1430 /*
1431 * release the port reference: we need to do this outside the lock
1432 * because we might be holding the last port reference!
1433 **/
1434 ip_release_mq(mq);
1435}
3e170ce0
A
1436
1437/*
1438 * peek at the contained port message queues, break prepost iteration as soon
1439 * as we spot a message on one of the message queues referenced by the set's
1440 * prepost list. No need to lock each message queue, as only the head of each
1441 * queue is checked. If a message wasn't there before we entered here, no need
1442 * to find it (if we do, great).
1443 */
1444static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
1445 struct waitq_set *wqset)
1446{
1447 ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
1448 ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
1449
1450 (void)ctx;
1451 (void)wqset;
d9a64523 1452
3e170ce0
A
1453 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
1454 return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
1455
1456 return WQ_ITERATE_CONTINUE;
1457}
1458
39236c6e
A
1459/*
1460 * Routine: ipc_mqueue_set_peek
1461 * Purpose:
1462 * Peek at a message queue set to see if it has any ports
1463 * with messages.
b0d623f7
A
1464 *
1465 * Conditions:
1466 * Locks may be held by callers, so this routine cannot block.
1467 * Caller holds reference on the message queue.
1468 */
6d2010ae 1469unsigned
39236c6e 1470ipc_mqueue_set_peek(ipc_mqueue_t mq)
b0d623f7 1471{
3e170ce0 1472 int ret;
b0d623f7 1473
6d2010ae 1474 imq_lock(mq);
b0d623f7 1475
39037602
A
1476 /*
1477 * We may have raced with port destruction where the mqueue is marked
1478 * as invalid. In that case, even though we don't have messages, we
1479 * have an end-of-life event to deliver.
1480 */
1481 if (!imq_is_valid(mq))
1482 return 1;
1483
3e170ce0 1484 ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
39037602 1485 mqueue_peek_iterator);
3e170ce0 1486
b0d623f7 1487 imq_unlock(mq);
39037602 1488
3e170ce0 1489 return (ret == WQ_ITERATE_BREAK);
39236c6e
A
1490}
1491
1492/*
1493 * Routine: ipc_mqueue_set_gather_member_names
1494 * Purpose:
3e170ce0
A
1495 * Discover all ports which are members of a given port set.
1496 * Because the waitq linkage mechanism was redesigned to save
1497 * significant amounts of memory, it no longer keeps back-pointers
1498 * from a port set to a port. Therefore, we must iterate over all
1499 * ports within a given IPC space and individually query them to
1500 * see if they are members of the given set. Port names of ports
1501 * found to be members of the given set will be gathered into the
1502 * provided 'names' array. Actual returned names are limited to
1503 * maxnames entries, but we keep counting the actual number of
1504 * members to let the caller decide to retry if necessary.
39236c6e
A
1505 *
1506 * Conditions:
1507 * Locks may be held by callers, so this routine cannot block.
3e170ce0 1508 * Caller holds reference on the message queue (via port set).
39236c6e
A
1509 */
1510void
1511ipc_mqueue_set_gather_member_names(
3e170ce0
A
1512 ipc_space_t space,
1513 ipc_mqueue_t set_mq,
1514 ipc_entry_num_t maxnames,
39236c6e
A
1515 mach_port_name_t *names,
1516 ipc_entry_num_t *actualp)
1517{
3e170ce0
A
1518 ipc_entry_t table;
1519 ipc_entry_num_t tsize;
1520 struct waitq_set *wqset;
39236c6e
A
1521 ipc_entry_num_t actual = 0;
1522
3e170ce0
A
1523 assert(set_mq != IMQ_NULL);
1524 wqset = &set_mq->imq_set_queue;
39236c6e 1525
3e170ce0
A
1526 assert(space != IS_NULL);
1527 is_read_lock(space);
1528 if (!is_active(space)) {
1529 is_read_unlock(space);
1530 goto out;
1531 }
39236c6e 1532
3e170ce0
A
1533 if (!waitq_set_is_valid(wqset)) {
1534 is_read_unlock(space);
1535 goto out;
1536 }
39236c6e 1537
3e170ce0
A
1538 table = space->is_table;
1539 tsize = space->is_table_size;
1540 for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
1541 ipc_entry_t entry = &table[idx];
1542
1543 /* only receive rights can be members of port sets */
1544 if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
1545 __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
1546 ipc_mqueue_t mq = &port->ip_messages;
1547
1548 assert(IP_VALID(port));
1549 if (ip_active(port) &&
1550 waitq_member(&mq->imq_wait_queue, wqset)) {
1551 if (actual < maxnames)
1552 names[actual] = mq->imq_receiver_name;
1553 actual++;
1554 }
1555 }
39236c6e 1556 }
39236c6e 1557
3e170ce0
A
1558 is_read_unlock(space);
1559
1560out:
39236c6e 1561 *actualp = actual;
b0d623f7
A
1562}
1563
39236c6e 1564
1c79356b 1565/*
39037602 1566 * Routine: ipc_mqueue_destroy_locked
1c79356b 1567 * Purpose:
6d2010ae
A
1568 * Destroy a (non-set) message queue.
1569 * Set any blocked senders running.
1c79356b
A
1570 * Destroy the kmsgs in the queue.
1571 * Conditions:
39037602 1572 * mqueue locked
1c79356b
A
1573 * Receivers were removed when the receive right was "changed"
1574 */
39037602
A
1575boolean_t
1576ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
1c79356b
A
1577{
1578 ipc_kmsg_queue_t kmqueue;
1579 ipc_kmsg_t kmsg;
6d2010ae 1580 boolean_t reap = FALSE;
d9a64523 1581 struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
1c79356b 1582
3e170ce0
A
1583 assert(!imq_is_set(mqueue));
1584
1c79356b
A
1585 /*
1586 * rouse all blocked senders
3e170ce0
A
1587 * (don't boost anyone - we're tearing this queue down)
1588 * (never preposts)
1c79356b
A
1589 */
1590 mqueue->imq_fullwaiters = FALSE;
d9a64523
A
1591
1592 if (send_turnstile != TURNSTILE_NULL) {
1593 waitq_wakeup64_all(&send_turnstile->ts_waitq,
1594 IPC_MQUEUE_FULL,
1595 THREAD_RESTART,
1596 WAITQ_ALL_PRIORITIES);
1597 }
1c79356b 1598
6d2010ae
A
1599 /*
1600 * Move messages from the specified queue to the per-thread
1601 * clean/drain queue while we have the mqueue lock.
1602 */
1c79356b 1603 kmqueue = &mqueue->imq_messages;
1c79356b 1604 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
39037602
A
1605#if MACH_FLIPC
1606 if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport))
1607 flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
1608#endif
6d2010ae
A
1609 boolean_t first;
1610 first = ipc_kmsg_delayed_destroy(kmsg);
1611 if (first)
1612 reap = first;
1c79356b 1613 }
6d2010ae 1614
3e170ce0
A
1615 /*
1616 * Wipe out message count, both for messages about to be
1617 * reaped and for reserved space for (previously) woken senders.
1618 * This is the indication to them that their reserved space is gone
1619 * (the mqueue was destroyed).
1620 */
1621 mqueue->imq_msgcount = 0;
1622
39037602
A
1623 /* invalidate the waitq for subsequent mqueue operations */
1624 waitq_invalidate_locked(&mqueue->imq_wait_queue);
3e170ce0 1625
39037602
A
1626 /* clear out any preposting we may have done */
1627 waitq_clear_prepost_locked(&mqueue->imq_wait_queue);
6d2010ae 1628
3e170ce0 1629 /*
39037602
A
1630 * assert that we are destroying / invalidating a queue that's
1631 * not a member of any other queue.
3e170ce0 1632 */
39037602
A
1633 assert(mqueue->imq_preposts == 0);
1634 assert(mqueue->imq_in_pset == 0);
3e170ce0 1635
39037602 1636 return reap;
1c79356b
A
1637}
1638
1639/*
1640 * Routine: ipc_mqueue_set_qlimit
1641 * Purpose:
1642 * Changes a message queue limit; the maximum number
1643 * of messages which may be queued.
1644 * Conditions:
1645 * Nothing locked.
1646 */
1647
1648void
1649ipc_mqueue_set_qlimit(
1650 ipc_mqueue_t mqueue,
1651 mach_port_msgcount_t qlimit)
1652{
1c79356b 1653
91447636
A
1654 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1655
1c79356b 1656 /* wake up senders allowed by the new qlimit */
1c79356b
A
1657 imq_lock(mqueue);
1658 if (qlimit > mqueue->imq_qlimit) {
1659 mach_port_msgcount_t i, wakeup;
d9a64523 1660 struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));
1c79356b
A
1661
1662 /* caution: wakeup, qlimit are unsigned */
1663 wakeup = qlimit - mqueue->imq_qlimit;
1664
1665 for (i = 0; i < wakeup; i++) {
3e170ce0
A
1666 /*
1667 * boost the priority of the awoken thread
1668 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1669 * the message queue slot we've just reserved.
1670 *
1671 * NOTE: this will never prepost
1672 */
d9a64523
A
1673 if (send_turnstile == TURNSTILE_NULL ||
1674 waitq_wakeup64_one(&send_turnstile->ts_waitq,
1675 IPC_MQUEUE_FULL,
1676 THREAD_AWAKENED,
1677 WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
3e170ce0
A
1678 mqueue->imq_fullwaiters = FALSE;
1679 break;
1680 }
1681 mqueue->imq_msgcount++; /* give it to the awakened thread */
1c79356b 1682 }
3e170ce0 1683 }
1c79356b
A
1684 mqueue->imq_qlimit = qlimit;
1685 imq_unlock(mqueue);
1c79356b
A
1686}
1687
1688/*
1689 * Routine: ipc_mqueue_set_seqno
1690 * Purpose:
1691 * Changes an mqueue's sequence number.
1692 * Conditions:
1693 * Caller holds a reference to the queue's containing object.
1694 */
1695void
1696ipc_mqueue_set_seqno(
1697 ipc_mqueue_t mqueue,
1698 mach_port_seqno_t seqno)
1699{
1c79356b
A
1700 imq_lock(mqueue);
1701 mqueue->imq_seqno = seqno;
1702 imq_unlock(mqueue);
1c79356b
A
1703}
1704
1705
1706/*
1707 * Routine: ipc_mqueue_copyin
1708 * Purpose:
1709 * Convert a name in a space to a message queue.
1710 * Conditions:
1711 * Nothing locked. If successful, the caller gets a ref for
1712 * the object. This ref ensures the continued existence of
1713 * the queue.
1714 * Returns:
1715 * MACH_MSG_SUCCESS Found a message queue.
1716 * MACH_RCV_INVALID_NAME The space is dead.
1717 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
1718 * MACH_RCV_INVALID_NAME
1719 * The denoted right is not receive or port set.
1720 * MACH_RCV_IN_SET Receive right is a member of a set.
1721 */
1722
1723mach_msg_return_t
1724ipc_mqueue_copyin(
1725 ipc_space_t space,
1726 mach_port_name_t name,
1727 ipc_mqueue_t *mqueuep,
1728 ipc_object_t *objectp)
1729{
1730 ipc_entry_t entry;
1731 ipc_object_t object;
1732 ipc_mqueue_t mqueue;
1733
1734 is_read_lock(space);
316670eb 1735 if (!is_active(space)) {
1c79356b
A
1736 is_read_unlock(space);
1737 return MACH_RCV_INVALID_NAME;
1738 }
1739
1740 entry = ipc_entry_lookup(space, name);
1741 if (entry == IE_NULL) {
1742 is_read_unlock(space);
1743 return MACH_RCV_INVALID_NAME;
1744 }
1745
1746 object = entry->ie_object;
1747
1748 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
1749 ipc_port_t port;
1c79356b 1750
3e170ce0 1751 __IGNORE_WCASTALIGN(port = (ipc_port_t) object);
1c79356b
A
1752 assert(port != IP_NULL);
1753
1754 ip_lock(port);
1755 assert(ip_active(port));
1756 assert(port->ip_receiver_name == name);
1757 assert(port->ip_receiver == space);
1758 is_read_unlock(space);
1759 mqueue = &port->ip_messages;
1760
1761 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
1762 ipc_pset_t pset;
1763
3e170ce0 1764 __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
1c79356b
A
1765 assert(pset != IPS_NULL);
1766
1767 ips_lock(pset);
1768 assert(ips_active(pset));
1c79356b
A
1769 is_read_unlock(space);
1770
1771 mqueue = &pset->ips_messages;
1772 } else {
1773 is_read_unlock(space);
1774 return MACH_RCV_INVALID_NAME;
1775 }
1776
1777 /*
1778 * At this point, the object is locked and active,
1779 * the space is unlocked, and mqueue is initialized.
1780 */
1781
1782 io_reference(object);
1783 io_unlock(object);
1784
1785 *objectp = object;
1786 *mqueuep = mqueue;
1787 return MACH_MSG_SUCCESS;
1788}