]> git.saurik.com Git - apple/xnu.git/blame - osfmk/ipc/ipc_mqueue.c
xnu-2422.1.72.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
CommitLineData
1c79356b 1/*
2d21ac55 2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
1c79356b 3 *
2d21ac55 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
1c79356b 5 *
2d21ac55
A
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
8f6c56a5 14 *
2d21ac55
A
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
8f6c56a5
A
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
2d21ac55
A
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
8f6c56a5 25 *
2d21ac55 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
1c79356b
A
27 */
28/*
29 * @OSF_FREE_COPYRIGHT@
30 */
31/*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
49 * School of Computer Science
50 * Carnegie Mellon University
51 * Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56/*
57 */
58/*
59 * File: ipc/ipc_mqueue.c
60 * Author: Rich Draves
61 * Date: 1989
62 *
63 * Functions to manipulate IPC message queues.
64 */
2d21ac55
A
65/*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections. This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
1c79356b
A
72
73#include <mach/port.h>
74#include <mach/message.h>
75#include <mach/sync_policy.h>
76
77#include <kern/assert.h>
78#include <kern/counters.h>
79#include <kern/sched_prim.h>
80#include <kern/ipc_kobject.h>
91447636 81#include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
1c79356b
A
82#include <kern/misc_protos.h>
83#include <kern/task.h>
84#include <kern/thread.h>
85#include <kern/wait_queue.h>
86
87#include <ipc/ipc_mqueue.h>
88#include <ipc/ipc_kmsg.h>
89#include <ipc/ipc_port.h>
90#include <ipc/ipc_pset.h>
91#include <ipc/ipc_space.h>
92
b0d623f7
A
93#ifdef __LP64__
94#include <vm/vm_map.h>
95#endif
1c79356b 96
2d21ac55
A
97#if CONFIG_MACF_MACH
98#include <security/mac_mach_internal.h>
99#endif
100
1c79356b
A
101int ipc_mqueue_full; /* address is event for queue space */
102int ipc_mqueue_rcv; /* address is event for message arrival */
103
91447636
A
104/* forward declarations */
105void ipc_mqueue_receive_results(wait_result_t result);
106
1c79356b
A
107/*
108 * Routine: ipc_mqueue_init
109 * Purpose:
110 * Initialize a newly-allocated message queue.
111 */
112void
113ipc_mqueue_init(
114 ipc_mqueue_t mqueue,
115 boolean_t is_set)
116{
117 if (is_set) {
b0d623f7 118 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO|SYNC_POLICY_PREPOST);
1c79356b
A
119 } else {
120 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
121 ipc_kmsg_queue_init(&mqueue->imq_messages);
122 mqueue->imq_seqno = 0;
123 mqueue->imq_msgcount = 0;
124 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
125 mqueue->imq_fullwaiters = FALSE;
126 }
127}
128
129/*
130 * Routine: ipc_mqueue_member
131 * Purpose:
132 * Indicate whether the (port) mqueue is a member of
133 * this portset's mqueue. We do this by checking
134 * whether the portset mqueue's waitq is an member of
135 * the port's mqueue waitq.
136 * Conditions:
137 * the portset's mqueue is not already a member
138 * this may block while allocating linkage structures.
139 */
140
141boolean_t
142ipc_mqueue_member(
91447636
A
143 ipc_mqueue_t port_mqueue,
144 ipc_mqueue_t set_mqueue)
1c79356b
A
145{
146 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
91447636 147 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
1c79356b
A
148
149 return (wait_queue_member(port_waitq, set_waitq));
150
151}
152
153/*
154 * Routine: ipc_mqueue_remove
155 * Purpose:
156 * Remove the association between the queue and the specified
9bccf70c 157 * set message queue.
1c79356b
A
158 */
159
160kern_return_t
161ipc_mqueue_remove(
316670eb
A
162 ipc_mqueue_t mqueue,
163 ipc_mqueue_t set_mqueue,
164 wait_queue_link_t *wqlp)
1c79356b
A
165{
166 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
9bccf70c 167 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
1c79356b 168
316670eb 169 return wait_queue_unlink_nofree(mq_waitq, set_waitq, wqlp);
1c79356b
A
170}
171
172/*
9bccf70c 173 * Routine: ipc_mqueue_remove_from_all
1c79356b 174 * Purpose:
9bccf70c 175 * Remove the mqueue from all the sets it is a member of
1c79356b 176 * Conditions:
9bccf70c 177 * Nothing locked.
1c79356b
A
178 */
179void
9bccf70c 180ipc_mqueue_remove_from_all(
316670eb
A
181 ipc_mqueue_t mqueue,
182 queue_t links)
1c79356b
A
183{
184 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
185
316670eb 186 wait_queue_unlink_all_nofree(mq_waitq, links);
9bccf70c
A
187 return;
188}
189
190/*
191 * Routine: ipc_mqueue_remove_all
192 * Purpose:
193 * Remove all the member queues from the specified set.
194 * Conditions:
195 * Nothing locked.
196 */
197void
198ipc_mqueue_remove_all(
316670eb
A
199 ipc_mqueue_t mqueue,
200 queue_t links)
9bccf70c
A
201{
202 wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
203
316670eb 204 wait_queue_set_unlink_all_nofree(mq_setq, links);
1c79356b
A
205 return;
206}
207
208
209/*
210 * Routine: ipc_mqueue_add
211 * Purpose:
212 * Associate the portset's mqueue with the port's mqueue.
213 * This has to be done so that posting the port will wakeup
214 * a portset waiter. If there are waiters on the portset
215 * mqueue and messages on the port mqueue, try to match them
216 * up now.
217 * Conditions:
218 * May block.
219 */
220kern_return_t
221ipc_mqueue_add(
222 ipc_mqueue_t port_mqueue,
316670eb
A
223 ipc_mqueue_t set_mqueue,
224 wait_queue_link_t wql)
1c79356b
A
225{
226 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
9bccf70c 227 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
1c79356b
A
228 ipc_kmsg_queue_t kmsgq;
229 ipc_kmsg_t kmsg, next;
230 kern_return_t kr;
231 spl_t s;
232
316670eb 233 kr = wait_queue_link_noalloc(port_waitq, set_waitq, wql);
1c79356b
A
234 if (kr != KERN_SUCCESS)
235 return kr;
236
237 /*
238 * Now that the set has been added to the port, there may be
239 * messages queued on the port and threads waiting on the set
240 * waitq. Lets get them together.
241 */
242 s = splsched();
243 imq_lock(port_mqueue);
244 kmsgq = &port_mqueue->imq_messages;
245 for (kmsg = ipc_kmsg_queue_first(kmsgq);
246 kmsg != IKM_NULL;
247 kmsg = next) {
248 next = ipc_kmsg_queue_next(kmsgq, kmsg);
249
250 for (;;) {
251 thread_t th;
b0d623f7 252 mach_msg_size_t msize;
1c79356b 253
9bccf70c
A
254 th = wait_queue_wakeup64_identity_locked(
255 port_waitq,
256 IPC_MQUEUE_RECEIVE,
257 THREAD_AWAKENED,
258 FALSE);
1c79356b
A
259 /* waitq/mqueue still locked, thread locked */
260
261 if (th == THREAD_NULL)
262 goto leave;
263
b0d623f7
A
264 /*
265 * If the receiver waited with a facility not directly
266 * related to Mach messaging, then it isn't prepared to get
267 * handed the message directly. Just set it running, and
268 * go look for another thread that can.
269 */
270 if (th->ith_state != MACH_RCV_IN_PROGRESS) {
271 thread_unlock(th);
272 continue;
273 }
274
1c79356b
A
275 /*
276 * Found a receiver. see if they can handle the message
277 * correctly (the message is not too large for them, or
278 * they didn't care to be informed that the message was
279 * too large). If they can't handle it, take them off
280 * the list and let them go back and figure it out and
281 * just move onto the next.
282 */
b0d623f7 283 msize = ipc_kmsg_copyout_size(kmsg, th->map);
1c79356b 284 if (th->ith_msize <
316670eb 285 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(th), th->ith_option))) {
1c79356b 286 th->ith_state = MACH_RCV_TOO_LARGE;
b0d623f7 287 th->ith_msize = msize;
1c79356b
A
288 if (th->ith_option & MACH_RCV_LARGE) {
289 /*
290 * let him go without message
291 */
b0d623f7 292 th->ith_receiver_name = port_mqueue->imq_receiver_name;
1c79356b
A
293 th->ith_kmsg = IKM_NULL;
294 th->ith_seqno = 0;
295 thread_unlock(th);
296 continue; /* find another thread */
297 }
298 } else {
299 th->ith_state = MACH_MSG_SUCCESS;
300 }
301
302 /*
303 * This thread is going to take this message,
304 * so give it to him.
305 */
1c79356b 306 ipc_kmsg_rmqueue(kmsgq, kmsg);
91447636
A
307 ipc_mqueue_release_msgcount(port_mqueue);
308
1c79356b
A
309 th->ith_kmsg = kmsg;
310 th->ith_seqno = port_mqueue->imq_seqno++;
311 thread_unlock(th);
312 break; /* go to next message */
313 }
314
315 }
316 leave:
317 imq_unlock(port_mqueue);
318 splx(s);
319 return KERN_SUCCESS;
320}
321
322/*
323 * Routine: ipc_mqueue_changed
324 * Purpose:
325 * Wake up receivers waiting in a message queue.
326 * Conditions:
327 * The message queue is locked.
328 */
329
330void
331ipc_mqueue_changed(
332 ipc_mqueue_t mqueue)
333{
9bccf70c
A
334 wait_queue_wakeup64_all_locked(
335 &mqueue->imq_wait_queue,
336 IPC_MQUEUE_RECEIVE,
337 THREAD_RESTART,
338 FALSE); /* unlock waitq? */
1c79356b
A
339}
340
341
342
343
344/*
345 * Routine: ipc_mqueue_send
346 * Purpose:
347 * Send a message to a message queue. The message holds a reference
348 * for the destination port for this message queue in the
349 * msgh_remote_port field.
350 *
351 * If unsuccessful, the caller still has possession of
352 * the message and must do something with it. If successful,
353 * the message is queued, given to a receiver, or destroyed.
354 * Conditions:
39236c6e 355 * mqueue is locked.
1c79356b
A
356 * Returns:
357 * MACH_MSG_SUCCESS The message was accepted.
358 * MACH_SEND_TIMED_OUT Caller still has message.
359 * MACH_SEND_INTERRUPTED Caller still has message.
360 */
361mach_msg_return_t
362ipc_mqueue_send(
363 ipc_mqueue_t mqueue,
b0d623f7 364 ipc_kmsg_t kmsg,
1c79356b 365 mach_msg_option_t option,
b0d623f7
A
366 mach_msg_timeout_t send_timeout,
367 spl_t s)
1c79356b 368{
9bccf70c 369 int wresult;
1c79356b
A
370
371 /*
372 * Don't block if:
373 * 1) We're under the queue limit.
374 * 2) Caller used the MACH_SEND_ALWAYS internal option.
375 * 3) Message is sent to a send-once right.
376 */
1c79356b 377 if (!imq_full(mqueue) ||
c910b4d9
A
378 (!imq_full_kernel(mqueue) &&
379 ((option & MACH_SEND_ALWAYS) ||
380 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
381 MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
1c79356b 382 mqueue->imq_msgcount++;
91447636 383 assert(mqueue->imq_msgcount > 0);
1c79356b
A
384 imq_unlock(mqueue);
385 splx(s);
386 } else {
55e303ae 387 thread_t cur_thread = current_thread();
91447636 388 uint64_t deadline;
1c79356b
A
389
390 /*
391 * We have to wait for space to be granted to us.
392 */
91447636 393 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
1c79356b
A
394 imq_unlock(mqueue);
395 splx(s);
396 return MACH_SEND_TIMED_OUT;
397 }
c910b4d9
A
398 if (imq_full_kernel(mqueue)) {
399 imq_unlock(mqueue);
400 splx(s);
401 return MACH_SEND_NO_BUFFER;
402 }
1c79356b 403 mqueue->imq_fullwaiters = TRUE;
55e303ae 404 thread_lock(cur_thread);
91447636
A
405 if (option & MACH_SEND_TIMEOUT)
406 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
407 else
408 deadline = 0;
9bccf70c
A
409 wresult = wait_queue_assert_wait64_locked(
410 &mqueue->imq_wait_queue,
411 IPC_MQUEUE_FULL,
39236c6e
A
412 THREAD_ABORTSAFE,
413 TIMEOUT_URGENCY_USER_NORMAL,
414 deadline, 0,
55e303ae
A
415 cur_thread);
416 thread_unlock(cur_thread);
417 imq_unlock(mqueue);
1c79356b
A
418 splx(s);
419
9bccf70c 420 if (wresult == THREAD_WAITING) {
91447636 421 wresult = thread_block(THREAD_CONTINUE_NULL);
9bccf70c
A
422 counter(c_ipc_mqueue_send_block++);
423 }
1c79356b 424
9bccf70c 425 switch (wresult) {
1c79356b
A
426 case THREAD_TIMED_OUT:
427 assert(option & MACH_SEND_TIMEOUT);
428 return MACH_SEND_TIMED_OUT;
429
430 case THREAD_AWAKENED:
431 /* we can proceed - inherited msgcount from waker */
91447636 432 assert(mqueue->imq_msgcount > 0);
1c79356b
A
433 break;
434
435 case THREAD_INTERRUPTED:
1c79356b
A
436 return MACH_SEND_INTERRUPTED;
437
438 case THREAD_RESTART:
b0d623f7
A
439 /* mqueue is being destroyed */
440 return MACH_SEND_INVALID_DEST;
1c79356b
A
441 default:
442 panic("ipc_mqueue_send");
443 }
444 }
445
446 ipc_mqueue_post(mqueue, kmsg);
447 return MACH_MSG_SUCCESS;
448}
449
39236c6e 450
1c79356b
A
451/*
452 * Routine: ipc_mqueue_release_msgcount
453 * Purpose:
454 * Release a message queue reference in the case where we
455 * found a waiter.
456 *
457 * Conditions:
91447636
A
458 * The message queue is locked.
459 * The message corresponding to this reference is off the queue.
1c79356b
A
460 */
461void
462ipc_mqueue_release_msgcount(
463 ipc_mqueue_t mqueue)
464{
465 assert(imq_held(mqueue));
91447636 466 assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
1c79356b
A
467
468 mqueue->imq_msgcount--;
91447636 469
1c79356b 470 if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
9bccf70c
A
471 if (wait_queue_wakeup64_one_locked(
472 &mqueue->imq_wait_queue,
473 IPC_MQUEUE_FULL,
474 THREAD_AWAKENED,
475 FALSE) != KERN_SUCCESS) {
1c79356b
A
476 mqueue->imq_fullwaiters = FALSE;
477 } else {
91447636
A
478 /* gave away our slot - add reference back */
479 mqueue->imq_msgcount++;
1c79356b
A
480 }
481 }
482}
483
484/*
485 * Routine: ipc_mqueue_post
486 * Purpose:
487 * Post a message to a waiting receiver or enqueue it. If a
488 * receiver is waiting, we can release our reserved space in
489 * the message queue.
490 *
491 * Conditions:
492 * If we need to queue, our space in the message queue is reserved.
493 */
494void
495ipc_mqueue_post(
496 register ipc_mqueue_t mqueue,
497 register ipc_kmsg_t kmsg)
498{
1c79356b
A
499 spl_t s;
500
501 /*
502 * While the msg queue is locked, we have control of the
503 * kmsg, so the ref in it for the port is still good.
504 *
505 * Check for a receiver for the message.
506 */
507 s = splsched();
508 imq_lock(mqueue);
509 for (;;) {
510 wait_queue_t waitq = &mqueue->imq_wait_queue;
511 thread_t receiver;
b0d623f7 512 mach_msg_size_t msize;
1c79356b 513
9bccf70c
A
514 receiver = wait_queue_wakeup64_identity_locked(
515 waitq,
516 IPC_MQUEUE_RECEIVE,
517 THREAD_AWAKENED,
518 FALSE);
1c79356b
A
519 /* waitq still locked, thread locked */
520
521 if (receiver == THREAD_NULL) {
522 /*
523 * no receivers; queue kmsg
524 */
525 assert(mqueue->imq_msgcount > 0);
526 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
527 break;
528 }
b0d623f7
A
529
530 /*
531 * If the receiver waited with a facility not directly
532 * related to Mach messaging, then it isn't prepared to get
533 * handed the message directly. Just set it running, and
534 * go look for another thread that can.
535 */
536 if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
537 thread_unlock(receiver);
538 continue;
539 }
540
541
1c79356b
A
542 /*
543 * We found a waiting thread.
544 * If the message is too large or the scatter list is too small
545 * the thread we wake up will get that as its status.
546 */
b0d623f7 547 msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
1c79356b 548 if (receiver->ith_msize <
316670eb 549 (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit(receiver), receiver->ith_option))) {
b0d623f7 550 receiver->ith_msize = msize;
1c79356b
A
551 receiver->ith_state = MACH_RCV_TOO_LARGE;
552 } else {
553 receiver->ith_state = MACH_MSG_SUCCESS;
554 }
555
556 /*
557 * If there is no problem with the upcoming receive, or the
558 * receiver thread didn't specifically ask for special too
559 * large error condition, go ahead and select it anyway.
560 */
561 if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
562 !(receiver->ith_option & MACH_RCV_LARGE)) {
563
564 receiver->ith_kmsg = kmsg;
565 receiver->ith_seqno = mqueue->imq_seqno++;
566 thread_unlock(receiver);
567
568 /* we didn't need our reserved spot in the queue */
569 ipc_mqueue_release_msgcount(mqueue);
570 break;
571 }
572
573 /*
574 * Otherwise, this thread needs to be released to run
575 * and handle its error without getting the message. We
576 * need to go back and pick another one.
577 */
39236c6e 578 receiver->ith_receiver_name = mqueue->imq_receiver_name;
1c79356b
A
579 receiver->ith_kmsg = IKM_NULL;
580 receiver->ith_seqno = 0;
581 thread_unlock(receiver);
582 }
583
584 imq_unlock(mqueue);
585 splx(s);
586
587 current_task()->messages_sent++;
588 return;
589}
590
591
91447636
A
592/* static */ void
593ipc_mqueue_receive_results(wait_result_t saved_wait_result)
1c79356b
A
594{
595 thread_t self = current_thread();
596 mach_msg_option_t option = self->ith_option;
1c79356b
A
597
598 /*
599 * why did we wake up?
600 */
601 switch (saved_wait_result) {
602 case THREAD_TIMED_OUT:
603 self->ith_state = MACH_RCV_TIMED_OUT;
604 return;
605
606 case THREAD_INTERRUPTED:
1c79356b
A
607 self->ith_state = MACH_RCV_INTERRUPTED;
608 return;
609
610 case THREAD_RESTART:
611 /* something bad happened to the port/set */
1c79356b
A
612 self->ith_state = MACH_RCV_PORT_CHANGED;
613 return;
614
615 case THREAD_AWAKENED:
616 /*
617 * We do not need to go select a message, somebody
618 * handed us one (or a too-large indication).
619 */
1c79356b
A
620 switch (self->ith_state) {
621 case MACH_RCV_SCATTER_SMALL:
622 case MACH_RCV_TOO_LARGE:
623 /*
624 * Somebody tried to give us a too large
625 * message. If we indicated that we cared,
626 * then they only gave us the indication,
627 * otherwise they gave us the indication
628 * AND the message anyway.
629 */
630 if (option & MACH_RCV_LARGE) {
631 return;
632 }
633
634 case MACH_MSG_SUCCESS:
635 return;
636
637 default:
638 panic("ipc_mqueue_receive_results: strange ith_state");
639 }
640
641 default:
642 panic("ipc_mqueue_receive_results: strange wait_result");
643 }
644}
645
646void
91447636
A
647ipc_mqueue_receive_continue(
648 __unused void *param,
649 wait_result_t wresult)
1c79356b 650{
91447636 651 ipc_mqueue_receive_results(wresult);
1c79356b
A
652 mach_msg_receive_continue(); /* hard-coded for now */
653}
654
655/*
656 * Routine: ipc_mqueue_receive
657 * Purpose:
658 * Receive a message from a message queue.
659 *
660 * If continuation is non-zero, then we might discard
661 * our kernel stack when we block. We will continue
662 * after unblocking by executing continuation.
663 *
664 * If resume is true, then we are resuming a receive
665 * operation after a blocked receive discarded our stack.
666 * Conditions:
667 * Our caller must hold a reference for the port or port set
668 * to which this queue belongs, to keep the queue
669 * from being deallocated.
670 *
671 * The kmsg is returned with clean header fields
672 * and with the circular bit turned off.
673 * Returns:
674 * MACH_MSG_SUCCESS Message returned in kmsgp.
675 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
676 * MACH_RCV_TIMED_OUT No message obtained.
677 * MACH_RCV_INTERRUPTED No message obtained.
678 * MACH_RCV_PORT_DIED Port/set died; no message.
679 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
680 *
681 */
682
683void
684ipc_mqueue_receive(
b0d623f7
A
685 ipc_mqueue_t mqueue,
686 mach_msg_option_t option,
687 mach_msg_size_t max_size,
688 mach_msg_timeout_t rcv_timeout,
689 int interruptible)
690{
691 wait_result_t wresult;
692 thread_t self = current_thread();
693
694 wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
695 rcv_timeout, interruptible,
696 self);
697 if (wresult == THREAD_NOT_WAITING)
698 return;
699
700 if (wresult == THREAD_WAITING) {
701 counter((interruptible == THREAD_ABORTSAFE) ?
702 c_ipc_mqueue_receive_block_user++ :
703 c_ipc_mqueue_receive_block_kernel++);
704
705 if (self->ith_continuation)
706 thread_block(ipc_mqueue_receive_continue);
707 /* NOTREACHED */
708
709 wresult = thread_block(THREAD_CONTINUE_NULL);
710 }
711 ipc_mqueue_receive_results(wresult);
712}
713
714wait_result_t
715ipc_mqueue_receive_on_thread(
716 ipc_mqueue_t mqueue,
717 mach_msg_option_t option,
718 mach_msg_size_t max_size,
719 mach_msg_timeout_t rcv_timeout,
720 int interruptible,
721 thread_t thread)
1c79356b 722{
91447636
A
723 ipc_kmsg_queue_t kmsgs;
724 wait_result_t wresult;
b0d623f7 725 uint64_t deadline;
91447636 726 spl_t s;
2d21ac55
A
727#if CONFIG_MACF_MACH
728 ipc_labelh_t lh;
729 task_t task;
730 int rc;
731#endif
1c79356b
A
732
733 s = splsched();
734 imq_lock(mqueue);
735
736 if (imq_is_set(mqueue)) {
1c79356b
A
737 queue_t q;
738
b0d623f7 739 q = &mqueue->imq_preposts;
1c79356b
A
740
741 /*
742 * If we are waiting on a portset mqueue, we need to see if
b0d623f7
A
743 * any of the member ports have work for us. Ports that
744 * have (or recently had) messages will be linked in the
745 * prepost queue for the portset. By holding the portset's
1c79356b
A
746 * mqueue lock during the search, we tie up any attempts by
747 * mqueue_deliver or portset membership changes that may
b0d623f7 748 * cross our path.
1c79356b
A
749 */
750 search_set:
b0d623f7
A
751 while(!queue_empty(q)) {
752 wait_queue_link_t wql;
753 ipc_mqueue_t port_mq;
754
755 queue_remove_first(q, wql, wait_queue_link_t, wql_preposts);
756 assert(!wql_is_preposted(wql));
757
758 /*
759 * This is a lock order violation, so we have to do it
760 * "softly," putting the link back on the prepost list
761 * if it fails (at the tail is fine since the order of
762 * handling messages from different sources in a set is
763 * not guaranteed and we'd like to skip to the next source
764 * if one is available).
765 */
1c79356b 766 port_mq = (ipc_mqueue_t)wql->wql_queue;
1c79356b 767 if (!imq_lock_try(port_mq)) {
b0d623f7 768 queue_enter(q, wql, wait_queue_link_t, wql_preposts);
1c79356b
A
769 imq_unlock(mqueue);
770 splx(s);
2d21ac55 771 mutex_pause(0);
1c79356b
A
772 s = splsched();
773 imq_lock(mqueue);
774 goto search_set; /* start again at beginning - SMP */
775 }
776
777 /*
b0d623f7
A
778 * If there are no messages on this queue, just skip it
779 * (we already removed the link from the set's prepost queue).
1c79356b 780 */
b0d623f7 781 kmsgs = &port_mq->imq_messages;
1c79356b
A
782 if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
783 imq_unlock(port_mq);
784 continue;
785 }
b0d623f7
A
786
787 /*
788 * There are messages, so reinsert the link back
789 * at the tail of the preposted queue (for fairness)
790 * while we still have the portset mqueue locked.
791 */
792 queue_enter(q, wql, wait_queue_link_t, wql_preposts);
1c79356b
A
793 imq_unlock(mqueue);
794
b0d623f7
A
795 /*
796 * Continue on to handling the message with just
797 * the port mqueue locked.
798 */
799 ipc_mqueue_select_on_thread(port_mq, option, max_size, thread);
1c79356b 800 imq_unlock(port_mq);
2d21ac55 801#if CONFIG_MACF_MACH
b0d623f7
A
802 if (thread->task != TASK_NULL &&
803 thread->ith_kmsg != NULL &&
804 thread->ith_kmsg->ikm_sender != NULL) {
805 lh = thread->ith_kmsg->ikm_sender->label;
806 tasklabel_lock(thread->task);
2d21ac55 807 ip_lock(lh->lh_port);
b0d623f7
A
808 rc = mac_port_check_receive(&thread->task->maclabel,
809 &lh->lh_label);
2d21ac55 810 ip_unlock(lh->lh_port);
b0d623f7 811 tasklabel_unlock(thread->task);
2d21ac55 812 if (rc)
b0d623f7 813 thread->ith_state = MACH_RCV_INVALID_DATA;
2d21ac55
A
814 }
815#endif
1c79356b 816 splx(s);
b0d623f7 817 return THREAD_NOT_WAITING;
1c79356b
A
818
819 }
820
821 } else {
822
823 /*
824 * Receive on a single port. Just try to get the messages.
825 */
826 kmsgs = &mqueue->imq_messages;
827 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
b0d623f7 828 ipc_mqueue_select_on_thread(mqueue, option, max_size, thread);
1c79356b 829 imq_unlock(mqueue);
2d21ac55 830#if CONFIG_MACF_MACH
b0d623f7
A
831 if (thread->task != TASK_NULL &&
832 thread->ith_kmsg != NULL &&
833 thread->ith_kmsg->ikm_sender != NULL) {
834 lh = thread->ith_kmsg->ikm_sender->label;
835 tasklabel_lock(thread->task);
2d21ac55 836 ip_lock(lh->lh_port);
b0d623f7
A
837 rc = mac_port_check_receive(&thread->task->maclabel,
838 &lh->lh_label);
2d21ac55 839 ip_unlock(lh->lh_port);
b0d623f7 840 tasklabel_unlock(thread->task);
2d21ac55 841 if (rc)
b0d623f7 842 thread->ith_state = MACH_RCV_INVALID_DATA;
2d21ac55
A
843 }
844#endif
1c79356b 845 splx(s);
b0d623f7 846 return THREAD_NOT_WAITING;
1c79356b
A
847 }
848 }
b0d623f7 849
1c79356b
A
850 /*
851 * Looks like we'll have to block. The mqueue we will
852 * block on (whether the set's or the local port's) is
853 * still locked.
854 */
1c79356b 855 if (option & MACH_RCV_TIMEOUT) {
91447636 856 if (rcv_timeout == 0) {
1c79356b
A
857 imq_unlock(mqueue);
858 splx(s);
b0d623f7
A
859 thread->ith_state = MACH_RCV_TIMED_OUT;
860 return THREAD_NOT_WAITING;
1c79356b
A
861 }
862 }
863
b0d623f7
A
864 thread_lock(thread);
865 thread->ith_state = MACH_RCV_IN_PROGRESS;
866 thread->ith_option = option;
867 thread->ith_msize = max_size;
55e303ae 868
91447636
A
869 if (option & MACH_RCV_TIMEOUT)
870 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
871 else
872 deadline = 0;
873
9bccf70c 874 wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
b0d623f7 875 IPC_MQUEUE_RECEIVE,
39236c6e
A
876 interruptible,
877 TIMEOUT_URGENCY_USER_NORMAL,
878 deadline, 0,
b0d623f7
A
879 thread);
880 /* preposts should be detected above, not here */
881 if (wresult == THREAD_AWAKENED)
882 panic("ipc_mqueue_receive_on_thread: sleep walking");
883
884 thread_unlock(thread);
55e303ae 885 imq_unlock(mqueue);
1c79356b 886 splx(s);
b0d623f7 887 return wresult;
1c79356b
A
888}
889
890
891/*
b0d623f7 892 * Routine: ipc_mqueue_select_on_thread
1c79356b
A
893 * Purpose:
894 * A receiver discovered that there was a message on the queue
895 * before he had to block. Pick the message off the queue and
b0d623f7 896 * "post" it to thread.
1c79356b
A
897 * Conditions:
898 * mqueue locked.
b0d623f7 899 * thread not locked.
1c79356b
A
900 * There is a message.
901 * Returns:
902 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
903 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
904 */
905void
b0d623f7 906ipc_mqueue_select_on_thread(
1c79356b
A
907 ipc_mqueue_t mqueue,
908 mach_msg_option_t option,
b0d623f7
A
909 mach_msg_size_t max_size,
910 thread_t thread)
1c79356b 911{
1c79356b 912 ipc_kmsg_t kmsg;
b0d623f7 913 mach_msg_return_t mr = MACH_MSG_SUCCESS;
91447636 914 mach_msg_size_t rcv_size;
1c79356b 915
1c79356b
A
916 /*
917 * Do some sanity checking of our ability to receive
918 * before pulling the message off the queue.
919 */
920 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
1c79356b
A
921 assert(kmsg != IKM_NULL);
922
1c79356b
A
923 /*
924 * If we really can't receive it, but we had the
925 * MACH_RCV_LARGE option set, then don't take it off
926 * the queue, instead return the appropriate error
927 * (and size needed).
928 */
b0d623f7 929 rcv_size = ipc_kmsg_copyout_size(kmsg, thread->map);
316670eb 930 if (rcv_size + REQUESTED_TRAILER_SIZE(thread_is_64bit(thread), option) > max_size) {
91447636
A
931 mr = MACH_RCV_TOO_LARGE;
932 if (option & MACH_RCV_LARGE) {
b0d623f7
A
933 thread->ith_receiver_name = mqueue->imq_receiver_name;
934 thread->ith_kmsg = IKM_NULL;
935 thread->ith_msize = rcv_size;
936 thread->ith_seqno = 0;
937 thread->ith_state = mr;
91447636
A
938 return;
939 }
1c79356b
A
940 }
941
942 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
943 ipc_mqueue_release_msgcount(mqueue);
b0d623f7
A
944 thread->ith_seqno = mqueue->imq_seqno++;
945 thread->ith_kmsg = kmsg;
946 thread->ith_state = mr;
1c79356b
A
947
948 current_task()->messages_received++;
949 return;
950}
951
b0d623f7
A
952/*
953 * Routine: ipc_mqueue_peek
954 * Purpose:
39236c6e
A
955 * Peek at a (non-set) message queue to see if it has a message
956 * matching the sequence number provided (if zero, then the
957 * first message in the queue) and return vital info about the
958 * message.
959 *
960 * Conditions:
961 * Locks may be held by callers, so this routine cannot block.
962 * Caller holds reference on the message queue.
963 */
964unsigned
965ipc_mqueue_peek(ipc_mqueue_t mq,
966 mach_port_seqno_t *seqnop,
967 mach_msg_size_t *msg_sizep,
968 mach_msg_id_t *msg_idp,
969 mach_msg_max_trailer_t *msg_trailerp)
970{
971 ipc_kmsg_queue_t kmsgq;
972 ipc_kmsg_t kmsg;
973 mach_port_seqno_t seqno, msgoff;
974 int res = 0;
975 spl_t s;
976
977 assert(!imq_is_set(mq));
978
979 s = splsched();
980 imq_lock(mq);
981
982 seqno = (seqnop != NULL) ? seqno = *seqnop : 0;
983
984 if (seqno == 0) {
985 seqno = mq->imq_seqno;
986 msgoff = 0;
987 } else if (seqno >= mq->imq_seqno &&
988 seqno < mq->imq_seqno + mq->imq_msgcount) {
989 msgoff = seqno - mq->imq_seqno;
990 } else
991 goto out;
992
993 /* look for the message that would match that seqno */
994 kmsgq = &mq->imq_messages;
995 kmsg = ipc_kmsg_queue_first(kmsgq);
996 while (msgoff-- && kmsg != IKM_NULL) {
997 kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
998 }
999 if (kmsg == IKM_NULL)
1000 goto out;
1001
1002 /* found one - return the requested info */
1003 if (seqnop != NULL)
1004 *seqnop = seqno;
1005 if (msg_sizep != NULL)
1006 *msg_sizep = kmsg->ikm_header->msgh_size;
1007 if (msg_idp != NULL)
1008 *msg_idp = kmsg->ikm_header->msgh_id;
1009 if (msg_trailerp != NULL)
1010 memcpy(msg_trailerp,
1011 (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
1012 round_msg(kmsg->ikm_header->msgh_size)),
1013 sizeof(mach_msg_max_trailer_t));
1014 res = 1;
1015
1016 out:
1017 imq_unlock(mq);
1018 splx(s);
1019 return res;
1020}
1021
1022/*
1023 * Routine: ipc_mqueue_set_peek
1024 * Purpose:
1025 * Peek at a message queue set to see if it has any ports
1026 * with messages.
b0d623f7
A
1027 *
1028 * Conditions:
1029 * Locks may be held by callers, so this routine cannot block.
1030 * Caller holds reference on the message queue.
1031 */
6d2010ae 1032unsigned
39236c6e 1033ipc_mqueue_set_peek(ipc_mqueue_t mq)
b0d623f7
A
1034{
1035 wait_queue_link_t wql;
1036 queue_t q;
1037 spl_t s;
39236c6e 1038 int res;
b0d623f7 1039
39236c6e 1040 assert(imq_is_set(mq));
b0d623f7 1041
b0d623f7 1042 s = splsched();
6d2010ae 1043 imq_lock(mq);
b0d623f7
A
1044
1045 /*
1046 * peek at the contained port message queues, return as soon as
1047 * we spot a message on one of the message queues linked on the
39236c6e
A
1048 * prepost list. No need to lock each message queue, as only the
1049 * head of each queue is checked. If a message wasn't there before
1050 * we entered here, no need to find it (if we do, great).
b0d623f7 1051 */
39236c6e 1052 res = 0;
b0d623f7
A
1053 q = &mq->imq_preposts;
1054 queue_iterate(q, wql, wait_queue_link_t, wql_preposts) {
1055 ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
1056 ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;
1057
1058 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
39236c6e
A
1059 res = 1;
1060 break;
b0d623f7
A
1061 }
1062 }
1063 imq_unlock(mq);
1064 splx(s);
39236c6e
A
1065 return res;
1066}
1067
1068/*
1069 * Routine: ipc_mqueue_set_gather_member_names
1070 * Purpose:
1071 * Iterate a message queue set to identify the member port
1072 * names. Actual returned names is limited to maxnames entries,
1073 * but we keep counting the actual number of members to let
1074 * the caller decide to retry if necessary.
1075 *
1076 * Conditions:
1077 * Locks may be held by callers, so this routine cannot block.
1078 * Caller holds reference on the message queue.
1079 */
1080void
1081ipc_mqueue_set_gather_member_names(
1082 ipc_mqueue_t mq,
1083 ipc_entry_num_t maxnames,
1084 mach_port_name_t *names,
1085 ipc_entry_num_t *actualp)
1086{
1087 wait_queue_link_t wql;
1088 queue_t q;
1089 spl_t s;
1090 ipc_entry_num_t actual = 0;
1091
1092 assert(imq_is_set(mq));
1093
1094 s = splsched();
1095 imq_lock(mq);
1096
1097 /*
1098 * Iterate over the member ports through the mqueue set links
1099 * capturing as many names as we can.
1100 */
1101 q = &mq->imq_setlinks;
1102 queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
1103 ipc_mqueue_t port_mq = (ipc_mqueue_t)wql->wql_queue;
1104
1105 if (actual < maxnames)
1106 names[actual] = port_mq->imq_receiver_name;
1107 actual++;
1108 }
1109 imq_unlock(mq);
1110 splx(s);
1111
1112 *actualp = actual;
b0d623f7
A
1113}
1114
39236c6e 1115
1c79356b
A
1116/*
1117 * Routine: ipc_mqueue_destroy
1118 * Purpose:
6d2010ae
A
1119 * Destroy a (non-set) message queue.
1120 * Set any blocked senders running.
1c79356b
A
1121 * Destroy the kmsgs in the queue.
1122 * Conditions:
1123 * Nothing locked.
1124 * Receivers were removed when the receive right was "changed"
1125 */
1126void
1127ipc_mqueue_destroy(
6d2010ae 1128 ipc_mqueue_t mqueue)
1c79356b
A
1129{
1130 ipc_kmsg_queue_t kmqueue;
1131 ipc_kmsg_t kmsg;
6d2010ae 1132 boolean_t reap = FALSE;
1c79356b
A
1133 spl_t s;
1134
1c79356b
A
1135 s = splsched();
1136 imq_lock(mqueue);
1137 /*
1138 * rouse all blocked senders
1139 */
1140 mqueue->imq_fullwaiters = FALSE;
9bccf70c
A
1141 wait_queue_wakeup64_all_locked(
1142 &mqueue->imq_wait_queue,
1143 IPC_MQUEUE_FULL,
b0d623f7 1144 THREAD_RESTART,
9bccf70c 1145 FALSE);
1c79356b 1146
6d2010ae
A
1147 /*
1148 * Move messages from the specified queue to the per-thread
1149 * clean/drain queue while we have the mqueue lock.
1150 */
1c79356b 1151 kmqueue = &mqueue->imq_messages;
1c79356b 1152 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
6d2010ae
A
1153 boolean_t first;
1154 first = ipc_kmsg_delayed_destroy(kmsg);
1155 if (first)
1156 reap = first;
1c79356b 1157 }
6d2010ae 1158
1c79356b
A
1159 imq_unlock(mqueue);
1160 splx(s);
6d2010ae
A
1161
1162 /*
1163 * Destroy the messages we enqueued if we aren't nested
1164 * inside some other attempt to drain the same queue.
1165 */
1166 if (reap)
1167 ipc_kmsg_reap_delayed();
1c79356b
A
1168}
1169
1170/*
1171 * Routine: ipc_mqueue_set_qlimit
1172 * Purpose:
1173 * Changes a message queue limit; the maximum number
1174 * of messages which may be queued.
1175 * Conditions:
1176 * Nothing locked.
1177 */
1178
1179void
1180ipc_mqueue_set_qlimit(
1181 ipc_mqueue_t mqueue,
1182 mach_port_msgcount_t qlimit)
1183{
1184 spl_t s;
1185
91447636
A
1186 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1187
1c79356b
A
1188 /* wake up senders allowed by the new qlimit */
1189 s = splsched();
1190 imq_lock(mqueue);
1191 if (qlimit > mqueue->imq_qlimit) {
1192 mach_port_msgcount_t i, wakeup;
1193
1194 /* caution: wakeup, qlimit are unsigned */
1195 wakeup = qlimit - mqueue->imq_qlimit;
1196
1197 for (i = 0; i < wakeup; i++) {
9bccf70c
A
1198 if (wait_queue_wakeup64_one_locked(
1199 &mqueue->imq_wait_queue,
1200 IPC_MQUEUE_FULL,
1201 THREAD_AWAKENED,
1202 FALSE) == KERN_NOT_WAITING) {
1c79356b
A
1203 mqueue->imq_fullwaiters = FALSE;
1204 break;
1205 }
91447636 1206 mqueue->imq_msgcount++; /* give it to the awakened thread */
1c79356b
A
1207 }
1208 }
1209 mqueue->imq_qlimit = qlimit;
1210 imq_unlock(mqueue);
1211 splx(s);
1212}
1213
1214/*
1215 * Routine: ipc_mqueue_set_seqno
1216 * Purpose:
1217 * Changes an mqueue's sequence number.
1218 * Conditions:
1219 * Caller holds a reference to the queue's containing object.
1220 */
1221void
1222ipc_mqueue_set_seqno(
1223 ipc_mqueue_t mqueue,
1224 mach_port_seqno_t seqno)
1225{
1226 spl_t s;
1227
1228 s = splsched();
1229 imq_lock(mqueue);
1230 mqueue->imq_seqno = seqno;
1231 imq_unlock(mqueue);
1232 splx(s);
1233}
1234
1235
1236/*
1237 * Routine: ipc_mqueue_copyin
1238 * Purpose:
1239 * Convert a name in a space to a message queue.
1240 * Conditions:
1241 * Nothing locked. If successful, the caller gets a ref for
1242 * for the object. This ref ensures the continued existence of
1243 * the queue.
1244 * Returns:
1245 * MACH_MSG_SUCCESS Found a message queue.
1246 * MACH_RCV_INVALID_NAME The space is dead.
1247 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
1248 * MACH_RCV_INVALID_NAME
1249 * The denoted right is not receive or port set.
1250 * MACH_RCV_IN_SET Receive right is a member of a set.
1251 */
1252
1253mach_msg_return_t
1254ipc_mqueue_copyin(
1255 ipc_space_t space,
1256 mach_port_name_t name,
1257 ipc_mqueue_t *mqueuep,
1258 ipc_object_t *objectp)
1259{
1260 ipc_entry_t entry;
1261 ipc_object_t object;
1262 ipc_mqueue_t mqueue;
1263
1264 is_read_lock(space);
316670eb 1265 if (!is_active(space)) {
1c79356b
A
1266 is_read_unlock(space);
1267 return MACH_RCV_INVALID_NAME;
1268 }
1269
1270 entry = ipc_entry_lookup(space, name);
1271 if (entry == IE_NULL) {
1272 is_read_unlock(space);
1273 return MACH_RCV_INVALID_NAME;
1274 }
1275
1276 object = entry->ie_object;
1277
1278 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
1279 ipc_port_t port;
1c79356b
A
1280
1281 port = (ipc_port_t) object;
1282 assert(port != IP_NULL);
1283
1284 ip_lock(port);
1285 assert(ip_active(port));
1286 assert(port->ip_receiver_name == name);
1287 assert(port->ip_receiver == space);
1288 is_read_unlock(space);
1289 mqueue = &port->ip_messages;
1290
1291 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
1292 ipc_pset_t pset;
1293
1294 pset = (ipc_pset_t) object;
1295 assert(pset != IPS_NULL);
1296
1297 ips_lock(pset);
1298 assert(ips_active(pset));
1299 assert(pset->ips_local_name == name);
1300 is_read_unlock(space);
1301
1302 mqueue = &pset->ips_messages;
1303 } else {
1304 is_read_unlock(space);
1305 return MACH_RCV_INVALID_NAME;
1306 }
1307
1308 /*
1309 * At this point, the object is locked and active,
1310 * the space is unlocked, and mqueue is initialized.
1311 */
1312
1313 io_reference(object);
1314 io_unlock(object);
1315
1316 *objectp = object;
1317 *mqueuep = mqueue;
1318 return MACH_MSG_SUCCESS;
1319}
1320