/*
 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/*
 * @OSF_FREE_COPYRIGHT@
 */
/*
 * Mach Operating System
 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 */
/*
 *	File:	ipc/ipc_mqueue.c
 *	Author:	Rich Draves
 *	Date:	1989
 *
 *	Functions to manipulate IPC message queues.
 */
/*
 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
 * support for mandatory and extensible security protections.  This notice
 * is included in support of clause 2.2 (b) of the Apple Public License,
 * Version 2.0.
 */

#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>

#include <kern/assert.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>	/* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>

#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>

#ifdef __LP64__
#include <vm/vm_map.h>
#endif

int ipc_mqueue_full;		/* address is event for queue space */
int ipc_mqueue_rcv;		/* address is event for message arrival */

/* forward declarations */
void ipc_mqueue_receive_results(wait_result_t result);

/*
 *	Routine:	ipc_mqueue_init
 *	Purpose:
 *		Initialize a newly-allocated message queue.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t	mqueue,
	boolean_t	is_set,
	uint64_t	*reserved_link)
{
	if (is_set) {
		waitq_set_init(&mqueue->imq_set_queue,
			       SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST|SYNC_POLICY_DISABLE_IRQ,
			       reserved_link);
	} else {
		waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ);
		ipc_kmsg_queue_init(&mqueue->imq_messages);
		mqueue->imq_seqno = 0;
		mqueue->imq_msgcount = 0;
		mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
		mqueue->imq_fullwaiters = FALSE;
	}
}
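/*
 * Illustrative sketch (not original xnu code): a port's message queue and
 * a port set's message queue are both ipc_mqueue_t, distinguished only by
 * the is_set flag at init time.  A hypothetical caller setting up one of
 * each might look like the following; waitq_link_reserve() is assumed to
 * be the waitq-subsystem call that produces the reserved link id.
 *
 *	uint64_t link = waitq_link_reserve(NULL);	// assumed waitq API
 *
 *	ipc_mqueue_init(&port->ip_messages, FALSE, NULL);	// port queue
 *	ipc_mqueue_init(&pset->ips_messages, TRUE, &link);	// set queue
 *
 * Only the set variant consumes a reserved link; the port variant instead
 * initializes the kmsg queue, seqno, msgcount, and qlimit fields.
 */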
void ipc_mqueue_deinit(
	ipc_mqueue_t		mqueue)
{
	boolean_t is_set = imq_is_set(mqueue);

	if (is_set)
		waitq_set_deinit(&mqueue->imq_set_queue);
	else
		waitq_deinit(&mqueue->imq_wait_queue);
}

/*
 *	Routine:	imq_reserve_and_lock
 *	Purpose:
 *		Atomically lock an ipc_mqueue_t object and reserve
 *		an appropriate number of prepost linkage objects for
 *		use in wakeup operations.
 *	Conditions:
 *		mq is unlocked
 */
void
imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost, spl_t *spl)
{
	*reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
						  WAITQ_KEEP_LOCKED, spl);
}

/*
 *	Routine:	imq_release_and_unlock
 *	Purpose:
 *		Unlock an ipc_mqueue_t object, re-enable interrupts,
 *		and release any unused prepost object reservations.
 *	Conditions:
 *		mq is locked
 */
void
imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost, spl_t spl)
{
	assert(imq_held(mq));
	waitq_unlock(&mq->imq_wait_queue);
	splx(spl);
	waitq_prepost_release_reserve(reserved_prepost);
}

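/*
 * Usage sketch: the reserve/lock and release/unlock calls above always
 * bracket the wakeup work so that prepost objects are never allocated
 * while the queue lock is held.  This mirrors the pattern actually used
 * by ipc_mqueue_post() later in this file:
 *
 *	uint64_t reserved_prepost = 0;
 *	spl_t s;
 *
 *	imq_reserve_and_lock(mq, &reserved_prepost, &s);
 *	... wake receivers, possibly consuming the reservation ...
 *	imq_release_and_unlock(mq, reserved_prepost, s);
 */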
/*
 *	Routine:	ipc_mqueue_member
 *	Purpose:
 *		Indicate whether the (port) mqueue is a member of
 *		this portset's mqueue.  We do this by checking
 *		whether the port mqueue's waitq is a member of
 *		the portset mqueue's waitq set.
 *	Conditions:
 *		the portset's mqueue is not already a member
 *		this may block while allocating linkage structures.
 */

boolean_t
ipc_mqueue_member(
	ipc_mqueue_t		port_mqueue,
	ipc_mqueue_t		set_mqueue)
{
	struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
	struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;

	return waitq_member(port_waitq, set_waitq);
}

/*
 *	Routine:	ipc_mqueue_remove
 *	Purpose:
 *		Remove the association between the queue and the specified
 *		set message queue.
 */

kern_return_t
ipc_mqueue_remove(
	ipc_mqueue_t	  mqueue,
	ipc_mqueue_t	  set_mqueue)
{
	struct waitq *mq_waitq = &mqueue->imq_wait_queue;
	struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;

	return waitq_unlink(mq_waitq, set_waitq);
}

/*
 *	Routine:	ipc_mqueue_remove_from_all
 *	Purpose:
 *		Remove the mqueue from all the sets it is a member of.
 *	Conditions:
 *		Nothing locked.
 */
void
ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
	struct waitq *mq_waitq = &mqueue->imq_wait_queue;

	waitq_unlink_all(mq_waitq);
	return;
}

/*
 *	Routine:	ipc_mqueue_remove_all
 *	Purpose:
 *		Remove all the member queues from the specified set.
 *		Also removes the queue from any containing sets.
 *	Conditions:
 *		Nothing locked.
 */
void
ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
	struct waitq_set *mq_setq = &mqueue->imq_set_queue;

	waitq_set_unlink_all(mq_setq);
	return;
}


/*
 *	Routine:	ipc_mqueue_add
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting to the port will wake up
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		May block.
 */
kern_return_t
ipc_mqueue_add(
	ipc_mqueue_t	port_mqueue,
	ipc_mqueue_t	set_mqueue,
	uint64_t	*reserved_link,
	uint64_t	*reserved_prepost)
{
	struct waitq     *port_waitq = &port_mqueue->imq_wait_queue;
	struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t       kmsg, next;
	kern_return_t	 kr;
	spl_t		 s;

	assert(reserved_link && *reserved_link != 0);

	s = splsched();
	imq_lock(port_mqueue);

	/*
	 * The link operation is now under the same lock-hold as
	 * message iteration and thread wakeup, but doesn't have to be...
	 */
	kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
	if (kr != KERN_SUCCESS) {
		imq_unlock(port_mqueue);
		splx(s);
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Let's get them together.
	 */
	kmsgq = &port_mqueue->imq_messages;
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	     kmsg != IKM_NULL;
	     kmsg = next) {
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		for (;;) {
			thread_t th;
			mach_msg_size_t msize;
			spl_t th_spl;

			th = waitq_wakeup64_identity_locked(
						port_waitq,
						IPC_MQUEUE_RECEIVE,
						THREAD_AWAKENED, &th_spl,
						reserved_prepost, WAITQ_KEEP_LOCKED);
			/* waitq/mqueue still locked, thread locked */

			if (th == THREAD_NULL)
				goto leave;

			/*
			 * If the receiver waited with a facility not directly
			 * related to Mach messaging, then it isn't prepared to get
			 * handed the message directly.  Just set it running, and
			 * go look for another thread that can.
			 */
			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
				thread_unlock(th);
				splx(th_spl);
				continue;
			}

			/*
			 * Found a receiver.  See if it can handle the message
			 * correctly (the message is not too large for it, or
			 * it didn't care to be informed that the message was
			 * too large).  If it can't handle it, take it off
			 * the list and let it go back and figure things out,
			 * and just move on to the next.
			 */
			msize = ipc_kmsg_copyout_size(kmsg, th->map);
			if (th->ith_msize <
					(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				th->ith_msize = msize;
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * let it go without the message
					 */
					th->ith_receiver_name = port_mqueue->imq_receiver_name;
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					splx(th_spl);
					continue; /* find another thread */
				}
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * This thread is going to take this message,
			 * so hand it over.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
			ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);

			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			splx(th_spl);
			break;  /* go to next message */
		}
	}
 leave:
	imq_unlock(port_mqueue);
	splx(s);
	return KERN_SUCCESS;
}

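/*
 * Worked example of the receiver-fit check above (numbers are purely
 * illustrative): if a queued kmsg copies out to msize = 128 bytes, the
 * receiver's buffer must satisfy
 *
 *	ith_msize >= msize + REQUESTED_TRAILER_SIZE(...)
 *
 * e.g. 128 bytes plus the size of the trailer the receiver requested.
 * A receiver that supplied only 100 bytes is marked MACH_RCV_TOO_LARGE;
 * with MACH_RCV_LARGE it is released without the message (and told the
 * size it needs), otherwise it is handed the message anyway and learns
 * of the truncation from its receive status.
 */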
/*
 *	Routine:	ipc_mqueue_changed
 *	Purpose:
 *		Wake up receivers waiting in a message queue.
 *	Conditions:
 *		The message queue is locked.
 */

void
ipc_mqueue_changed(
	ipc_mqueue_t	mqueue)
{
	waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
				  IPC_MQUEUE_RECEIVE,
				  THREAD_RESTART,
				  NULL,
				  WAITQ_ALL_PRIORITIES,
				  WAITQ_KEEP_LOCKED);
}


/*
 *	Routine:	ipc_mqueue_send
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		mqueue is locked.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 *		MACH_SEND_NO_BUFFER	Kernel queue is full; caller still has message.
 *		MACH_SEND_INVALID_DEST	Queue is being destroyed; caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send(
	ipc_mqueue_t		mqueue,
	ipc_kmsg_t		kmsg,
	mach_msg_option_t	option,
	mach_msg_timeout_t	send_timeout,
	spl_t			s)
{
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	     ((option & MACH_SEND_ALWAYS) ||
	      (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	       MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		imq_unlock(mqueue);
		splx(s);
	} else {
		thread_t cur_thread = current_thread();
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			imq_unlock(mqueue);
			splx(s);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			imq_unlock(mqueue);
			splx(s);
			return MACH_SEND_NO_BUFFER;
		}
		mqueue->imq_fullwaiters = TRUE;
		thread_lock(cur_thread);
		if (option & MACH_SEND_TIMEOUT)
			clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
		else
			deadline = 0;
		wresult = waitq_assert_wait64_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_ABORTSAFE,
						TIMEOUT_URGENCY_USER_NORMAL,
						deadline, TIMEOUT_NO_LEEWAY,
						cur_thread);
		thread_unlock(cur_thread);
		imq_unlock(mqueue);
		splx(s);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
			counter(c_ipc_mqueue_send_block++);
		}

		switch (wresult) {

		case THREAD_AWAKENED:
			/*
			 * We can proceed: we inherited the msgcount from the
			 * waker, or the message queue has been destroyed and
			 * the msgcount has been reset to zero (which we will
			 * detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;

		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg);
	return MACH_MSG_SUCCESS;
}

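/*
 * Calling-convention sketch for ipc_mqueue_send() (illustrative; the real
 * callers live in the kmsg send path): the routine is entered with the
 * mqueue locked at splsched and consumes the lock on every path, so a
 * hypothetical caller looks like:
 *
 *	spl_t s = splsched();
 *	imq_lock(mqueue);
 *	mr = ipc_mqueue_send(mqueue, kmsg, option, send_timeout, s);
 *	// no unlock here: ipc_mqueue_send() already unlocked and restored spl
 *	if (mr != MACH_MSG_SUCCESS)
 *		... caller still owns kmsg and must dispose of it ...
 */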
/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.
 *
 *	Conditions:
 *		The message queue is locked.
 *		The message corresponding to this reference is off the queue.
 *		There is no need to pass reserved preposts because this will
 *		never prepost to anyone.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
	(void)set_mq;
	assert(imq_held(port_mq));
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port_mq->imq_fullwaiters) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 */
		if (waitq_wakeup64_one_locked(&port_mq->imq_wait_queue,
					      IPC_MQUEUE_FULL,
					      THREAD_AWAKENED,
					      NULL,
					      WAITQ_PROMOTE_PRIORITY,
					      WAITQ_KEEP_LOCKED) != KERN_SUCCESS) {
			port_mq->imq_fullwaiters = FALSE;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		/* no more msgs: invalidate the port's prepost object */
		waitq_clear_prepost_locked(&port_mq->imq_wait_queue, NULL);
	}
}

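/*
 * Worked example of the slot handoff above (illustrative): with
 * qlimit = 1, a queue holding one message has imq_msgcount = 1 and may
 * have a sender blocked on IPC_MQUEUE_FULL.  When a receiver pulls the
 * message, this routine drops imq_msgcount to 0, sees the queue is no
 * longer full, and wakes one sender; because that wakeup succeeded,
 * imq_msgcount goes straight back to 1.  The slot is thus handed
 * directly to the awakened sender, which will not re-check the limit
 * when it posts.
 */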
/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		mqueue is unlocked
 *		If we need to queue, our space in the message queue is reserved.
 */
void
ipc_mqueue_post(
	register ipc_mqueue_t	mqueue,
	register ipc_kmsg_t	kmsg)
{
	spl_t s;
	uint64_t reserved_prepost = 0;

	/*
	 * While the msg queue is locked, we have control of the
	 * kmsg, so the ref in it for the port is still good.
	 *
	 * Check for a receiver for the message.
	 */
	imq_reserve_and_lock(mqueue, &reserved_prepost, &s);
	for (;;) {
		struct waitq *waitq = &mqueue->imq_wait_queue;
		spl_t th_spl;
		thread_t receiver;
		mach_msg_size_t msize;

		receiver = waitq_wakeup64_identity_locked(waitq,
							  IPC_MQUEUE_RECEIVE,
							  THREAD_AWAKENED,
							  &th_spl,
							  &reserved_prepost,
							  WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved.
			 */
			if (mqueue->imq_msgcount > 0) {
				ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
				break;
			}

			/*
			 * Otherwise, the message queue must belong to an inactive
			 * port, so just destroy the message and pretend it was posted.
			 */
			/* clear the waitq boost we may have been given */
			waitq_clear_promotion_locked(waitq, current_thread());
			imq_release_and_unlock(mqueue, reserved_prepost, s);
			ipc_kmsg_destroy(kmsg);
			current_task()->messages_sent++;
			return;
		}

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			thread_unlock(receiver);
			splx(th_spl);
			continue;
		}

		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_msize <
				(msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for the special
		 * too-large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
			thread_unlock(receiver);
			splx(th_spl);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
		splx(th_spl);
	}

	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
	imq_release_and_unlock(mqueue, reserved_prepost, s);

	current_task()->messages_sent++;
	return;
}

/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t		self = current_thread();
	mach_msg_option_t	option = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too-large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			/* fall through: we were handed the message anyway */

		case MACH_MSG_SUCCESS:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}

void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}

/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *		If continuation is non-zero, then we might discard
 *		our kernel stack when we block.  We will continue
 *		after unblocking by executing continuation.
 *
 *		If resume is true, then we are resuming a receive
 *		operation after a blocked receive discarded our stack.
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in kmsgp.
 *		MACH_RCV_TOO_LARGE	Message size returned in kmsgp.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 */

void
ipc_mqueue_receive(
	ipc_mqueue_t            mqueue,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible)
{
	wait_result_t           wresult;
	thread_t                self = current_thread();

	wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
					       rcv_timeout, interruptible,
					       self);
	if (wresult == THREAD_NOT_WAITING)
		return;

	if (wresult == THREAD_WAITING) {
		counter((interruptible == THREAD_ABORTSAFE) ?
			c_ipc_mqueue_receive_block_user++ :
			c_ipc_mqueue_receive_block_kernel++);

		if (self->ith_continuation)
			thread_block(ipc_mqueue_receive_continue);
		/* NOTREACHED */

		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}

static int mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
					  struct waitq_set *wqset)
{
	ipc_mqueue_t port_mq, *pmq_ptr;

	(void)wqset;
	port_mq = (ipc_mqueue_t)waitq;

	/*
	 * If there are no messages on this queue, skip it and remove
	 * it from the prepost list.
	 */
	if (ipc_kmsg_queue_empty(&port_mq->imq_messages))
		return WQ_ITERATE_INVALIDATE_CONTINUE;

	/*
	 * There are messages waiting on this port.
	 * Instruct the prepost iteration logic to break, but keep the
	 * waitq locked.
	 */
	pmq_ptr = (ipc_mqueue_t *)ctx;
	if (pmq_ptr)
		*pmq_ptr = port_mq;
	return WQ_ITERATE_BREAK_KEEP_LOCKED;
}

wait_result_t
ipc_mqueue_receive_on_thread(
	ipc_mqueue_t            mqueue,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	wait_result_t           wresult;
	uint64_t                deadline;
	spl_t                   s;

	s = splsched();
	imq_lock(mqueue);
	/* no need to reserve anything: we never prepost to anyone */

	if (imq_is_set(mqueue)) {
		ipc_mqueue_t port_mq = IMQ_NULL;
		spl_t set_spl;

		(void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
						 &port_mq,
						 mqueue_process_prepost_receive,
						 &set_spl);

		if (port_mq != IMQ_NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_mq.  We have instructed the prepost
			 * iteration logic to leave both the port_mq and the
			 * set mqueue locked.
			 *
			 * TODO: previously, we would place this port at the
			 *       back of the prepost list...
			 */
			imq_unlock(mqueue);

			/* TODO: if/when port mqueues become non irq safe,
			 *       we won't need this spl, and we should be
			 *       able to call splx(s) (if that's even
			 *       necessary).
			 * For now, we've still disabled interrupts via
			 * imq_reserve_and_lock();
			 */
			splx(set_spl);

			/*
			 * Continue on to handling the message with just
			 * the port mqueue locked.
			 */
			ipc_mqueue_select_on_thread(port_mq, mqueue, option,
						    max_size, thread);

			imq_unlock(port_mq);
			splx(s);
			return THREAD_NOT_WAITING;
		}
	} else {
		ipc_kmsg_queue_t kmsgs;

		/*
		 * Receive on a single port.  Just try to get the messages.
		 */
		kmsgs = &mqueue->imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
						    max_size, thread);
			imq_unlock(mqueue);
			splx(s);
			return THREAD_NOT_WAITING;
		}
	}

	/*
	 * Looks like we'll have to block.  The mqueue we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		if (rcv_timeout == 0) {
			imq_unlock(mqueue);
			splx(s);
			thread->ith_state = MACH_RCV_TIMED_OUT;
			return THREAD_NOT_WAITING;
		}
	}

	/* NOTE: need splsched() here if mqueue no longer needs irq disabled */
	thread_lock(thread);
	thread->ith_state = MACH_RCV_IN_PROGRESS;
	thread->ith_option = option;
	thread->ith_msize = max_size;

	if (option & MACH_RCV_TIMEOUT)
		clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
	else
		deadline = 0;

	wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
					     IPC_MQUEUE_RECEIVE,
					     interruptible,
					     TIMEOUT_URGENCY_USER_NORMAL,
					     deadline,
					     TIMEOUT_NO_LEEWAY,
					     thread);
	/* preposts should be detected above, not here */
	if (wresult == THREAD_AWAKENED)
		panic("ipc_mqueue_receive_on_thread: sleep walking");

	thread_unlock(thread);
	imq_unlock(mqueue);
	splx(s);
	return wresult;
}


/*
 *	Routine:	ipc_mqueue_select_on_thread
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before it had to block.  Pick the message off the queue and
 *		"post" it to thread.
 *	Conditions:
 *		mqueue locked.
 *		thread not locked.
 *		There is a message.
 *		No need to reserve prepost objects - it will never prepost.
 *
 *	Returns:
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE	May or may not have pulled it, but it is large.
 */
void
ipc_mqueue_select_on_thread(
	ipc_mqueue_t		port_mq,
	ipc_mqueue_t		set_mq,
	mach_msg_option_t	option,
	mach_msg_size_t		max_size,
	thread_t                thread)
{
	ipc_kmsg_t kmsg;
	mach_msg_return_t mr = MACH_MSG_SUCCESS;
	mach_msg_size_t rcv_size;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue; instead return the appropriate error
	 * (and size needed).
	 */
	rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
	if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
		mr = MACH_RCV_TOO_LARGE;
		if (option & MACH_RCV_LARGE) {
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = rcv_size;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	ipc_kmsg_rmqueue_first_macro(&port_mq->imq_messages, kmsg);
	ipc_mqueue_release_msgcount(port_mq, set_mq);
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	current_task()->messages_received++;
	return;
}

/*
 *	Routine:	ipc_mqueue_peek
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,
		mach_port_seqno_t * seqnop,
		mach_msg_size_t * msg_sizep,
		mach_msg_id_t * msg_idp,
		mach_msg_max_trailer_t * msg_trailerp)
{
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno, msgoff;
	int res = 0;
	spl_t s;

	assert(!imq_is_set(mq));

	s = splsched();
	imq_lock(mq);

	seqno = 0;
	if (seqnop != NULL)
		seqno = *seqnop;

	if (seqno == 0) {
		seqno = mq->imq_seqno;
		msgoff = 0;
	} else if (seqno >= mq->imq_seqno &&
		   seqno < mq->imq_seqno + mq->imq_msgcount) {
		msgoff = seqno - mq->imq_seqno;
	} else
		goto out;

	/* look for the message that would match that seqno */
	kmsgq = &mq->imq_messages;
	kmsg = ipc_kmsg_queue_first(kmsgq);
	while (msgoff-- && kmsg != IKM_NULL) {
		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
	}
	if (kmsg == IKM_NULL)
		goto out;

	/* found one - return the requested info */
	if (seqnop != NULL)
		*seqnop = seqno;
	if (msg_sizep != NULL)
		*msg_sizep = kmsg->ikm_header->msgh_size;
	if (msg_idp != NULL)
		*msg_idp = kmsg->ikm_header->msgh_id;
	if (msg_trailerp != NULL)
		memcpy(msg_trailerp,
		       (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
						  round_msg(kmsg->ikm_header->msgh_size)),
		       sizeof(mach_msg_max_trailer_t));
	res = 1;

 out:
	imq_unlock(mq);
	splx(s);
	return res;
}

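/*
 * Worked example of the sequence-number window above (numbers are
 * illustrative): if mq->imq_seqno = 37 and mq->imq_msgcount = 3, the
 * peekable messages are those that would be received with seqnos 37,
 * 38, and 39.
 *
 *	seqno == 0  -> peek the head (seqno 37, msgoff 0)
 *	seqno == 39 -> msgoff = 39 - 37 = 2, the third message in queue
 *	seqno == 40 -> outside [37, 40), so the routine returns 0
 */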
/*
 * Peek at the contained port message queues; break prepost iteration as soon
 * as we spot a message on one of the message queues referenced by the set's
 * prepost list.  No need to lock each message queue, as only the head of each
 * queue is checked.  If a message wasn't there before we entered here, no need
 * to find it (if we do, great).
 */
static int mqueue_peek_iterator(void *ctx, struct waitq *waitq,
				struct waitq_set *wqset)
{
	ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
	ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;

	(void)ctx;
	(void)wqset;

	if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL)
		return WQ_ITERATE_BREAK; /* break out of the prepost iteration */

	return WQ_ITERATE_CONTINUE;
}

/*
 *	Routine:	ipc_mqueue_set_peek
 *	Purpose:
 *		Peek at a message queue set to see if it has any ports
 *		with messages.
 *
 *	Conditions:
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_set_peek(ipc_mqueue_t mq)
{
	spl_t s;
	int ret;

	assert(imq_is_set(mq));

	s = splsched();
	imq_lock(mq);

	ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
					 mqueue_peek_iterator, NULL);

	imq_unlock(mq);
	splx(s);
	return (ret == WQ_ITERATE_BREAK);
}

/*
 *	Routine:	ipc_mqueue_set_gather_member_names
 *	Purpose:
 *		Discover all ports which are members of a given port set.
 *		Because the waitq linkage mechanism was redesigned to save
 *		significant amounts of memory, it no longer keeps back-pointers
 *		from a port set to a port.  Therefore, we must iterate over all
 *		ports within a given IPC space and individually query them to
 *		see if they are members of the given set.  Port names of ports
 *		found to be members of the given set will be gathered into the
 *		provided 'names' array.  Actual returned names are limited to
 *		maxnames entries, but we keep counting the actual number of
 *		members to let the caller decide to retry if necessary.
 *
 *	Conditions:
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue (via port set).
 */
void
ipc_mqueue_set_gather_member_names(
	ipc_space_t space,
	ipc_mqueue_t set_mq,
	ipc_entry_num_t maxnames,
	mach_port_name_t *names,
	ipc_entry_num_t *actualp)
{
	ipc_entry_t table;
	ipc_entry_num_t tsize;
	struct waitq_set *wqset;
	ipc_entry_num_t actual = 0;

	assert(set_mq != IMQ_NULL);
	wqset = &set_mq->imq_set_queue;

	assert(space != IS_NULL);
	is_read_lock(space);
	if (!is_active(space)) {
		is_read_unlock(space);
		goto out;
	}

	if (!waitq_set_is_valid(wqset)) {
		is_read_unlock(space);
		goto out;
	}

	table = space->is_table;
	tsize = space->is_table_size;
	for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
		ipc_entry_t entry = &table[idx];

		/* only receive rights can be members of port sets */
		if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
			__IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
			ipc_mqueue_t mq = &port->ip_messages;

			assert(IP_VALID(port));
			if (ip_active(port) &&
			    waitq_member(&mq->imq_wait_queue, wqset)) {
				if (actual < maxnames)
					names[actual] = mq->imq_receiver_name;
				actual++;
			}
		}
	}

	is_read_unlock(space);

out:
	*actualp = actual;
}

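/*
 * Usage sketch for the overflow/retry contract described above
 * (hypothetical caller; buffer management is the caller's problem):
 *
 *	ipc_entry_num_t actual;
 *
 *	ipc_mqueue_set_gather_member_names(space, set_mq, maxnames,
 *					   names, &actual);
 *	if (actual > maxnames) {
 *		... reallocate 'names' with room for 'actual' entries and
 *		    call again; 'actual' kept counting members even after
 *		    the array filled up, precisely to enable this retry ...
 *	}
 */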
/*
 *	Routine:	ipc_mqueue_destroy
 *	Purpose:
 *		Destroy a (non-set) message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		Nothing locked.
 *		Receivers were removed when the receive right was "changed"
 */
void
ipc_mqueue_destroy(
	ipc_mqueue_t	mqueue)
{
	ipc_kmsg_queue_t kmqueue;
	ipc_kmsg_t kmsg;
	boolean_t reap = FALSE;
	spl_t s;

	assert(!imq_is_set(mqueue));

	s = splsched();
	imq_lock(mqueue);

	/*
	 * rouse all blocked senders
	 * (don't boost anyone - we're tearing this queue down)
	 * (never preposts)
	 */
	mqueue->imq_fullwaiters = FALSE;
	waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
				  IPC_MQUEUE_FULL,
				  THREAD_RESTART,
				  NULL,
				  WAITQ_ALL_PRIORITIES,
				  WAITQ_KEEP_LOCKED);

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	kmqueue = &mqueue->imq_messages;
	while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
		boolean_t first;
		first = ipc_kmsg_delayed_destroy(kmsg);
		if (first)
			reap = first;
	}

	/*
	 * Wipe out the message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/* clear out any preposting we may have done */
	waitq_clear_prepost_locked(&mqueue->imq_wait_queue, &s);

	imq_unlock(mqueue);
	splx(s);

	/*
	 * assert that we're destroying a queue that's not a
	 * member of any other queue
	 */
	assert(mqueue->imq_wait_queue.waitq_prepost_id == 0);
	assert(mqueue->imq_wait_queue.waitq_set_id == 0);

	/*
	 * Destroy the messages we enqueued if we aren't nested
	 * inside some other attempt to drain the same queue.
	 */
	if (reap)
		ipc_kmsg_reap_delayed();
}

/*
 *	Routine:	ipc_mqueue_set_qlimit
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.
 *	Conditions:
 *		Nothing locked.
 */

void
ipc_mqueue_set_qlimit(
	ipc_mqueue_t			mqueue,
	mach_port_msgcount_t		qlimit)
{
	spl_t s;

	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	/* wake up senders allowed by the new qlimit */
	s = splsched();
	imq_lock(mqueue);
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;

		/* caution: wakeup, qlimit are unsigned */
		wakeup = qlimit - mqueue->imq_qlimit;

		for (i = 0; i < wakeup; i++) {
			/*
			 * boost the priority of the awoken thread
			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
			 * the message queue slot we've just reserved.
			 *
			 * NOTE: this will never prepost
			 */
			if (waitq_wakeup64_one_locked(&mqueue->imq_wait_queue,
						      IPC_MQUEUE_FULL,
						      THREAD_AWAKENED,
						      NULL,
						      WAITQ_PROMOTE_PRIORITY,
						      WAITQ_KEEP_LOCKED) == KERN_NOT_WAITING) {
				mqueue->imq_fullwaiters = FALSE;
				break;
			}
			mqueue->imq_msgcount++;  /* give it to the awakened thread */
		}
	}
	mqueue->imq_qlimit = qlimit;
	imq_unlock(mqueue);
	splx(s);
}

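/*
 * Worked example of the wakeup loop above (illustrative): raising qlimit
 * from 5 to 8 allows wakeup = 3 more messages.  Up to three blocked
 * senders are awakened, and imq_msgcount is incremented once per
 * successful wakeup, pre-charging the slot each sender will use when it
 * posts.  If the waitq runs out of waiters early (KERN_NOT_WAITING),
 * imq_fullwaiters is cleared and the remaining slots simply become free
 * queue space.
 */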
/*
 *	Routine:	ipc_mqueue_set_seqno
 *	Purpose:
 *		Changes an mqueue's sequence number.
 *	Conditions:
 *		Caller holds a reference to the queue's containing object.
 */
void
ipc_mqueue_set_seqno(
	ipc_mqueue_t		mqueue,
	mach_port_seqno_t	seqno)
{
	spl_t s;

	s = splsched();
	imq_lock(mqueue);
	mqueue->imq_seqno = seqno;
	imq_unlock(mqueue);
	splx(s);
}


/*
 *	Routine:	ipc_mqueue_copyin
 *	Purpose:
 *		Convert a name in a space to a message queue.
 *	Conditions:
 *		Nothing locked.  If successful, the caller gets a ref
 *		for the object.  This ref ensures the continued existence of
 *		the queue.
 *	Returns:
 *		MACH_MSG_SUCCESS	Found a message queue.
 *		MACH_RCV_INVALID_NAME	The space is dead.
 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
 *		MACH_RCV_INVALID_NAME
 *			The denoted right is not receive or port set.
 *		MACH_RCV_IN_SET		Receive right is a member of a set.
 */

mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t		space,
	mach_port_name_t	name,
	ipc_mqueue_t		*mqueuep,
	ipc_object_t		*objectp)
{
	ipc_entry_t entry;
	ipc_object_t object;
	ipc_mqueue_t mqueue;

	is_read_lock(space);
	if (!is_active(space)) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	entry = ipc_entry_lookup(space, name);
	if (entry == IE_NULL) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	object = entry->ie_object;

	if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
		ipc_port_t port;

		__IGNORE_WCASTALIGN(port = (ipc_port_t) object);
		assert(port != IP_NULL);

		ip_lock(port);
		assert(ip_active(port));
		assert(port->ip_receiver_name == name);
		assert(port->ip_receiver == space);
		is_read_unlock(space);
		mqueue = &port->ip_messages;

	} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
		ipc_pset_t pset;

		__IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
		assert(pset != IPS_NULL);

		ips_lock(pset);
		assert(ips_active(pset));
		assert(pset->ips_local_name == name);
		is_read_unlock(space);

		mqueue = &pset->ips_messages;
	} else {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	/*
	 * At this point, the object is locked and active,
	 * the space is unlocked, and mqueue is initialized.
	 */

	io_reference(object);
	io_unlock(object);

	*objectp = object;
	*mqueuep = mqueue;
	return MACH_MSG_SUCCESS;
}
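/*
 * Usage sketch (hypothetical caller, modeled on the mach_msg receive
 * path): the object reference returned by ipc_mqueue_copyin() keeps the
 * queue alive across a blocking receive and must be dropped afterward.
 *
 *	ipc_object_t object;
 *	ipc_mqueue_t mqueue;
 *
 *	mr = ipc_mqueue_copyin(space, name, &mqueue, &object);
 *	if (mr != MACH_MSG_SUCCESS)
 *		return mr;
 *	ipc_mqueue_receive(mqueue, option, max_size, timeout,
 *			   THREAD_ABORTSAFE);
 *	io_release(object);	// assumed ipc_object reference-drop macro
 */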