]> git.saurik.com Git - apple/xnu.git/blob - osfmk/ipc/ipc_mqueue.c
xnu-1228.3.13.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
1 /*
2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /*
29 * @OSF_FREE_COPYRIGHT@
30 */
31 /*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
49 * School of Computer Science
50 * Carnegie Mellon University
51 * Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56 /*
57 */
58 /*
59 * File: ipc/ipc_mqueue.c
60 * Author: Rich Draves
61 * Date: 1989
62 *
63 * Functions to manipulate IPC message queues.
64 */
65 /*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections. This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
72
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76
77 #include <kern/assert.h>
78 #include <kern/counters.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/wait_queue.h>
86
87 #include <ipc/ipc_mqueue.h>
88 #include <ipc/ipc_kmsg.h>
89 #include <ipc/ipc_port.h>
90 #include <ipc/ipc_pset.h>
91 #include <ipc/ipc_space.h>
92
93 #include <ddb/tr.h>
94
95 #if CONFIG_MACF_MACH
96 #include <security/mac_mach_internal.h>
97 #endif
98
99 int ipc_mqueue_full; /* address is event for queue space */
100 int ipc_mqueue_rcv; /* address is event for message arrival */
101
102 #define TR_ENABLE 0
103
104 /* forward declarations */
105 void ipc_mqueue_receive_results(wait_result_t result);
106
107 /*
108 * Routine: ipc_mqueue_init
109 * Purpose:
110 * Initialize a newly-allocated message queue.
111 */
112 void
113 ipc_mqueue_init(
114 ipc_mqueue_t mqueue,
115 boolean_t is_set)
116 {
117 if (is_set) {
118 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
119 } else {
120 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
121 ipc_kmsg_queue_init(&mqueue->imq_messages);
122 mqueue->imq_seqno = 0;
123 mqueue->imq_msgcount = 0;
124 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
125 mqueue->imq_fullwaiters = FALSE;
126 }
127 }
128
129 /*
130 * Routine: ipc_mqueue_member
131 * Purpose:
132 * Indicate whether the (port) mqueue is a member of
133 * this portset's mqueue. We do this by checking
134 * whether the portset mqueue's waitq is an member of
135 * the port's mqueue waitq.
136 * Conditions:
137 * the portset's mqueue is not already a member
138 * this may block while allocating linkage structures.
139 */
140
141 boolean_t
142 ipc_mqueue_member(
143 ipc_mqueue_t port_mqueue,
144 ipc_mqueue_t set_mqueue)
145 {
146 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
147 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
148
149 return (wait_queue_member(port_waitq, set_waitq));
150
151 }
152
153 /*
154 * Routine: ipc_mqueue_remove
155 * Purpose:
156 * Remove the association between the queue and the specified
157 * set message queue.
158 */
159
160 kern_return_t
161 ipc_mqueue_remove(
162 ipc_mqueue_t mqueue,
163 ipc_mqueue_t set_mqueue)
164 {
165 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
166 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
167
168 return wait_queue_unlink(mq_waitq, set_waitq);
169 }
170
171 /*
172 * Routine: ipc_mqueue_remove_from_all
173 * Purpose:
174 * Remove the mqueue from all the sets it is a member of
175 * Conditions:
176 * Nothing locked.
177 */
178 void
179 ipc_mqueue_remove_from_all(
180 ipc_mqueue_t mqueue)
181 {
182 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
183
184 wait_queue_unlink_all(mq_waitq);
185 return;
186 }
187
188 /*
189 * Routine: ipc_mqueue_remove_all
190 * Purpose:
191 * Remove all the member queues from the specified set.
192 * Conditions:
193 * Nothing locked.
194 */
195 void
196 ipc_mqueue_remove_all(
197 ipc_mqueue_t mqueue)
198 {
199 wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
200
201 wait_queue_set_unlink_all(mq_setq);
202 return;
203 }
204
205
206 /*
207 * Routine: ipc_mqueue_add
208 * Purpose:
209 * Associate the portset's mqueue with the port's mqueue.
210 * This has to be done so that posting the port will wakeup
211 * a portset waiter. If there are waiters on the portset
212 * mqueue and messages on the port mqueue, try to match them
213 * up now.
214 * Conditions:
215 * May block.
216 */
kern_return_t
ipc_mqueue_add(
	ipc_mqueue_t	 port_mqueue,
	ipc_mqueue_t	 set_mqueue)
{
	wait_queue_t	 port_waitq = &port_mqueue->imq_wait_queue;
	wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t       kmsg, next;
	kern_return_t	 kr;
	spl_t		 s;

	/* May block allocating the linkage structure (see header comment). */
	kr = wait_queue_link(port_waitq, set_waitq);
	if (kr != KERN_SUCCESS)
		return kr;

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 */
	s = splsched();
	imq_lock(port_mqueue);
	kmsgq = &port_mqueue->imq_messages;
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	     kmsg != IKM_NULL;
	     kmsg = next) {
		/* capture the successor before kmsg may be dequeued below */
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		/* inner loop: keep trying receivers until one takes kmsg */
		for (;;) {
			thread_t th;

			/*
			 * Pull one waiting receiver off the (now linked)
			 * waitq.  On return the waitq/mqueue are still
			 * locked and, if non-NULL, the thread is locked.
			 */
			th = wait_queue_wakeup64_identity_locked(
						port_waitq,
						IPC_MQUEUE_RECEIVE,
						THREAD_AWAKENED,
						FALSE);
			/* waitq/mqueue still locked, thread locked */

			if (th == THREAD_NULL)
				goto leave;	/* no receivers left; stop matching */

			/*
			 * Found a receiver. see if they can handle the message
			 * correctly (the message is not too large for them, or
			 * they didn't care to be informed that the message was
			 * too large).  If they can't handle it, take them off
			 * the list and let them go back and figure it out and
			 * just move onto the next.
			 */
			if (th->ith_msize <
			    kmsg->ikm_header->msgh_size +
			    REQUESTED_TRAILER_SIZE(th->ith_option)) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				/* report the size needed back to the receiver */
				th->ith_msize = kmsg->ikm_header->msgh_size;
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * let him go without message
					 */
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					continue; /* find another thread */
				}
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * This thread is going to take this message,
			 * so give it to him.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
			/* the receiver consumes the sender's reserved slot */
			ipc_mqueue_release_msgcount(port_mqueue);

			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			break;  /* go to next message */
		}

	}
 leave:
	imq_unlock(port_mqueue);
	splx(s);
	return KERN_SUCCESS;
}
304
305 /*
306 * Routine: ipc_mqueue_changed
307 * Purpose:
308 * Wake up receivers waiting in a message queue.
309 * Conditions:
310 * The message queue is locked.
311 */
312
313 void
314 ipc_mqueue_changed(
315 ipc_mqueue_t mqueue)
316 {
317 wait_queue_wakeup64_all_locked(
318 &mqueue->imq_wait_queue,
319 IPC_MQUEUE_RECEIVE,
320 THREAD_RESTART,
321 FALSE); /* unlock waitq? */
322 }
323
324
325
326
327 /*
328 * Routine: ipc_mqueue_send
329 * Purpose:
330 * Send a message to a message queue. The message holds a reference
331 * for the destination port for this message queue in the
332 * msgh_remote_port field.
333 *
334 * If unsuccessful, the caller still has possession of
335 * the message and must do something with it. If successful,
336 * the message is queued, given to a receiver, or destroyed.
337 * Conditions:
338 * Nothing locked.
339 * Returns:
340 * MACH_MSG_SUCCESS The message was accepted.
341 * MACH_SEND_TIMED_OUT Caller still has message.
342 * MACH_SEND_INTERRUPTED Caller still has message.
343 */
mach_msg_return_t
ipc_mqueue_send(
	ipc_mqueue_t		mqueue,
	ipc_kmsg_t		kmsg,
	mach_msg_option_t	option,
	mach_msg_timeout_t	send_timeout)
{
	int wresult;
	spl_t s;

	/*
	 * Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	s = splsched();
	imq_lock(mqueue);

	if (!imq_full(mqueue) ||
	    (option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	     MACH_MSG_TYPE_PORT_SEND_ONCE)) {
		/* reserve our slot in the queue before posting */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		imq_unlock(mqueue);
		splx(s);
	} else {
		thread_t cur_thread = current_thread();
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			/* zero timeout: fail immediately rather than block */
			imq_unlock(mqueue);
			splx(s);
			return MACH_SEND_TIMED_OUT;
		}
		mqueue->imq_fullwaiters = TRUE;
		thread_lock(cur_thread);
		if (option & MACH_SEND_TIMEOUT)
			clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
		else
			deadline = 0;
		wresult = wait_queue_assert_wait64_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_ABORTSAFE, deadline,
						cur_thread);
		thread_unlock(cur_thread);
		imq_unlock(mqueue);
		splx(s);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
			counter(c_ipc_mqueue_send_block++);
		}

		/* map the wakeup reason to a send result */
		switch (wresult) {
		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_AWAKENED:
			/* we can proceed - inherited msgcount from waker */
			assert(mqueue->imq_msgcount > 0);
			break;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
		default:
			panic("ipc_mqueue_send");
		}
	}

	/* our slot is reserved (or inherited); hand the message over */
	ipc_mqueue_post(mqueue, kmsg);
	return MACH_MSG_SUCCESS;
}
425
426 /*
427 * Routine: ipc_mqueue_release_msgcount
428 * Purpose:
429 * Release a message queue reference in the case where we
430 * found a waiter.
431 *
432 * Conditions:
433 * The message queue is locked.
434 * The message corresponding to this reference is off the queue.
435 */
void
ipc_mqueue_release_msgcount(
	ipc_mqueue_t	mqueue)
{
	assert(imq_held(mqueue));
	assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));

	mqueue->imq_msgcount--;

	/*
	 * Dropping the count may open a slot below the queue limit;
	 * if senders are blocked waiting for space, pass the slot on.
	 */
	if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
		if (wait_queue_wakeup64_one_locked(
						&mqueue->imq_wait_queue,
						IPC_MQUEUE_FULL,
						THREAD_AWAKENED,
						FALSE) != KERN_SUCCESS) {
			/* nobody was actually waiting after all */
			mqueue->imq_fullwaiters = FALSE;
		} else {
			/* gave away our slot - add reference back */
			mqueue->imq_msgcount++;
		}
	}
}
458
459 /*
460 * Routine: ipc_mqueue_post
461 * Purpose:
462 * Post a message to a waiting receiver or enqueue it. If a
463 * receiver is waiting, we can release our reserved space in
464 * the message queue.
465 *
466 * Conditions:
467 * If we need to queue, our space in the message queue is reserved.
468 */
void
ipc_mqueue_post(
	register ipc_mqueue_t mqueue,
	register ipc_kmsg_t kmsg)
{

	spl_t s;

	/*
	 * While the msg queue is locked, we have control of the
	 *  kmsg, so the ref in it for the port is still good.
	 *
	 * Check for a receiver for the message.
	 */
	s = splsched();
	imq_lock(mqueue);
	/* loop: keep offering the message until someone takes it or no one is left */
	for (;;) {
		wait_queue_t waitq = &mqueue->imq_wait_queue;
		thread_t receiver;

		receiver = wait_queue_wakeup64_identity_locked(
							waitq,
							IPC_MQUEUE_RECEIVE,
							THREAD_AWAKENED,
							FALSE);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg
			 */
			assert(mqueue->imq_msgcount > 0);
			ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
			break;
		}

		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		if (receiver->ith_msize <
				(kmsg->ikm_header->msgh_size) +
				REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
			/* tell the receiver how much space it actually needs */
			receiver->ith_msize = kmsg->ikm_header->msgh_size;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {

			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
			thread_unlock(receiver);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
	}

	imq_unlock(mqueue);
	splx(s);

	current_task()->messages_sent++;
	return;
}
552
553
/*
 * Routine:	ipc_mqueue_receive_results
 * Purpose:
 *	Translate the wait result from a blocked receive into the
 *	thread's ith_state so the receive path can report it.
 * Conditions:
 *	Called on the receiving thread itself after it unblocks.
 */
/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t     		self = current_thread();
	mach_msg_option_t	option = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			/* fallthrough - message was delivered despite its size */

		case MACH_MSG_SUCCESS:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}
607
/*
 * Routine:	ipc_mqueue_receive_continue
 * Purpose:
 *	Continuation run when a stack-discarding blocked receive
 *	resumes: record the wakeup reason, then re-enter the
 *	message receive path.
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
616
617 /*
618 * Routine: ipc_mqueue_receive
619 * Purpose:
620 * Receive a message from a message queue.
621 *
622 * If continuation is non-zero, then we might discard
623 * our kernel stack when we block. We will continue
624 * after unblocking by executing continuation.
625 *
626 * If resume is true, then we are resuming a receive
627 * operation after a blocked receive discarded our stack.
628 * Conditions:
629 * Our caller must hold a reference for the port or port set
630 * to which this queue belongs, to keep the queue
631 * from being deallocated.
632 *
633 * The kmsg is returned with clean header fields
634 * and with the circular bit turned off.
635 * Returns:
636 * MACH_MSG_SUCCESS Message returned in kmsgp.
637 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
638 * MACH_RCV_TIMED_OUT No message obtained.
639 * MACH_RCV_INTERRUPTED No message obtained.
640 * MACH_RCV_PORT_DIED Port/set died; no message.
641 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
642 *
643 */
644
void
ipc_mqueue_receive(
	ipc_mqueue_t		mqueue,
	mach_msg_option_t	option,
	mach_msg_size_t		max_size,
	mach_msg_timeout_t	rcv_timeout,
	int			interruptible)
{
	ipc_kmsg_queue_t	kmsgs;
	wait_result_t		wresult;
	thread_t		self;
	uint64_t		deadline;
	spl_t			s;
#if CONFIG_MACF_MACH
	ipc_labelh_t lh;
	task_t task;
	int rc;
#endif

	s = splsched();
	imq_lock(mqueue);
	self = current_thread();

	if (imq_is_set(mqueue)) {
		wait_queue_link_t wql;
		ipc_mqueue_t port_mq;
		queue_t q;

		q = &mqueue->imq_setlinks;

		/*
		 * If we are waiting on a portset mqueue, we need to see if
		 * any of the member ports have work for us.  If so, try to
		 * deliver one of those messages. By holding the portset's
		 * mqueue lock during the search, we tie up any attempts by
		 * mqueue_deliver or portset membership changes that may
		 * cross our path. But this is a lock order violation, so we
		 * have to do it "softly."  If we don't find a message waiting
		 * for us, we will assert our intention to wait while still
		 * holding that lock.  When we release the lock, the deliver/
		 * change will succeed and find us.
		 */
	search_set:
		queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
			port_mq = (ipc_mqueue_t)wql->wql_queue;
			kmsgs = &port_mq->imq_messages;

			/*
			 * Soft lock: taking a member port's lock while
			 * holding the set's lock inverts the normal order,
			 * so on contention back off completely and restart.
			 */
			if (!imq_lock_try(port_mq)) {
				imq_unlock(mqueue);
				splx(s);
				mutex_pause(0);
				s = splsched();
				imq_lock(mqueue);
				goto search_set; /* start again at beginning - SMP */
			}

			/*
			 * If there is still a message to be had, we will
			 * try to select it (may not succeed because of size
			 * and options).  In any case, we deliver those
			 * results back to the user.
			 *
			 * We also move the port's linkage to the tail of the
			 * list for this set (fairness). Future versions will
			 * sort by timestamp or priority.
			 */
			if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
				imq_unlock(port_mq);
				continue;
			}
			/* rotate this port to the tail for round-robin fairness */
			queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
			queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
			imq_unlock(mqueue);

			ipc_mqueue_select(port_mq, option, max_size);
			imq_unlock(port_mq);
#if CONFIG_MACF_MACH
			/* MAC hook: may veto delivery of the selected message */
			if (self->ith_kmsg != NULL &&
			    self->ith_kmsg->ikm_sender != NULL) {
				lh = self->ith_kmsg->ikm_sender->label;
				task = current_task();
				tasklabel_lock(task);
				ip_lock(lh->lh_port);
				rc = mac_port_check_receive(&task->maclabel,
							    &lh->lh_label);
				ip_unlock(lh->lh_port);
				tasklabel_unlock(task);
				if (rc)
					self->ith_state = MACH_RCV_INVALID_DATA;
			}
#endif
			splx(s);
			return;

		}

	} else {

		/*
		 * Receive on a single port. Just try to get the messages.
		 */
	  	kmsgs = &mqueue->imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			ipc_mqueue_select(mqueue, option, max_size);
			imq_unlock(mqueue);
#if CONFIG_MACF_MACH
			/* MAC hook: may veto delivery of the selected message */
			if (self->ith_kmsg != NULL &&
			    self->ith_kmsg->ikm_sender != NULL) {
				lh = self->ith_kmsg->ikm_sender->label;
				task = current_task();
				tasklabel_lock(task);
				ip_lock(lh->lh_port);
				rc = mac_port_check_receive(&task->maclabel,
							    &lh->lh_label);
				ip_unlock(lh->lh_port);
				tasklabel_unlock(task);
				if (rc)
					self->ith_state = MACH_RCV_INVALID_DATA;
			}
#endif
			splx(s);
			return;
		}
	}

	/*
	 * Looks like we'll have to block.  The mqueue we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		if (rcv_timeout == 0) {
			/* zero timeout: poll only, never block */
			imq_unlock(mqueue);
			splx(s);
			self->ith_state = MACH_RCV_TIMED_OUT;
			return;
		}
	}

	/* record receive parameters where a poster/waker can find them */
	thread_lock(self);
	self->ith_state = MACH_RCV_IN_PROGRESS;
	self->ith_option = option;
	self->ith_msize = max_size;

	if (option & MACH_RCV_TIMEOUT)
		clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
	else
		deadline = 0;

	wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
						  IPC_MQUEUE_RECEIVE,
						  interruptible, deadline,
						  self);
	thread_unlock(self);
	imq_unlock(mqueue);
	splx(s);

	if (wresult == THREAD_WAITING) {
		counter((interruptible == THREAD_ABORTSAFE) ?
			c_ipc_mqueue_receive_block_user++ :
			c_ipc_mqueue_receive_block_kernel++);

		/* if a continuation was set up, the stack may be discarded */
		if (self->ith_continuation)
			thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */

		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
815
816
817 /*
818 * Routine: ipc_mqueue_select
819 * Purpose:
820 * A receiver discovered that there was a message on the queue
821 * before he had to block. Pick the message off the queue and
822 * "post" it to himself.
823 * Conditions:
824 * mqueue locked.
825 * There is a message.
826 * Returns:
827 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
828 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
829 */
void
ipc_mqueue_select(
	ipc_mqueue_t		mqueue,
	mach_msg_option_t	option,
	mach_msg_size_t		max_size)
{
	thread_t self = current_thread();
	ipc_kmsg_t kmsg;
	mach_msg_return_t mr;
	mach_msg_size_t rcv_size;

	mr = MACH_MSG_SUCCESS;


	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).
	 */
	rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
	if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
		mr = MACH_RCV_TOO_LARGE;
		if (option & MACH_RCV_LARGE) {
			/* leave the message queued; report size needed */
			self->ith_kmsg = IKM_NULL;
			self->ith_msize = rcv_size;
			self->ith_seqno = 0;
			self->ith_state = mr;
			return;
		}
	}

	/* take the message (even if TOO_LARGE without MACH_RCV_LARGE) */
	ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
	ipc_mqueue_release_msgcount(mqueue);
	self->ith_seqno = mqueue->imq_seqno++;
	self->ith_kmsg = kmsg;
	self->ith_state = mr;

	current_task()->messages_received++;
	return;
}
878
879 /*
880 * Routine: ipc_mqueue_destroy
881 * Purpose:
882 * Destroy a message queue. Set any blocked senders running.
883 * Destroy the kmsgs in the queue.
884 * Conditions:
885 * Nothing locked.
886 * Receivers were removed when the receive right was "changed"
887 */
void
ipc_mqueue_destroy(
	ipc_mqueue_t	mqueue)
{
	ipc_kmsg_queue_t kmqueue;
	ipc_kmsg_t kmsg;
	spl_t s;


	s = splsched();
	imq_lock(mqueue);
	/*
	 * rouse all blocked senders
	 */
	mqueue->imq_fullwaiters = FALSE;
	wait_queue_wakeup64_all_locked(
				&mqueue->imq_wait_queue,
				IPC_MQUEUE_FULL,
				THREAD_AWAKENED,
				FALSE);

	kmqueue = &mqueue->imq_messages;

	/*
	 * Destroy each queued message.  The lock (and spl) must be
	 * dropped around ipc_kmsg_destroy_dest since it may block,
	 * then reacquired before dequeuing the next message.
	 */
	while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
		imq_unlock(mqueue);
		splx(s);

		ipc_kmsg_destroy_dest(kmsg);

		s = splsched();
		imq_lock(mqueue);
	}
	imq_unlock(mqueue);
	splx(s);
}
923
924 /*
925 * Routine: ipc_mqueue_set_qlimit
926 * Purpose:
927 * Changes a message queue limit; the maximum number
928 * of messages which may be queued.
929 * Conditions:
930 * Nothing locked.
931 */
932
void
ipc_mqueue_set_qlimit(
	 ipc_mqueue_t			mqueue,
	 mach_port_msgcount_t		qlimit)
{
	 spl_t s;

	 assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	 /* wake up senders allowed by the new qlimit */
	 s = splsched();
	 imq_lock(mqueue);
	 if (qlimit > mqueue->imq_qlimit) {
		 mach_port_msgcount_t i, wakeup;

		 /* caution: wakeup, qlimit are unsigned */
		 wakeup = qlimit - mqueue->imq_qlimit;

		 /* release one blocked sender per newly-opened slot */
		 for (i = 0; i < wakeup; i++) {
			 if (wait_queue_wakeup64_one_locked(
							&mqueue->imq_wait_queue,
							IPC_MQUEUE_FULL,
							THREAD_AWAKENED,
							FALSE) == KERN_NOT_WAITING) {
					/* no more blocked senders; stop early */
					mqueue->imq_fullwaiters = FALSE;
					break;
			 }
			 mqueue->imq_msgcount++;  /* give it to the awakened thread */
		 }
	}
	mqueue->imq_qlimit = qlimit;
	imq_unlock(mqueue);
	splx(s);
}
967
968 /*
969 * Routine: ipc_mqueue_set_seqno
970 * Purpose:
971 * Changes an mqueue's sequence number.
972 * Conditions:
973 * Caller holds a reference to the queue's containing object.
974 */
975 void
976 ipc_mqueue_set_seqno(
977 ipc_mqueue_t mqueue,
978 mach_port_seqno_t seqno)
979 {
980 spl_t s;
981
982 s = splsched();
983 imq_lock(mqueue);
984 mqueue->imq_seqno = seqno;
985 imq_unlock(mqueue);
986 splx(s);
987 }
988
989
990 /*
991 * Routine: ipc_mqueue_copyin
992 * Purpose:
993 * Convert a name in a space to a message queue.
994 * Conditions:
995 * Nothing locked. If successful, the caller gets a ref for
996 * for the object. This ref ensures the continued existence of
997 * the queue.
998 * Returns:
999 * MACH_MSG_SUCCESS Found a message queue.
1000 * MACH_RCV_INVALID_NAME The space is dead.
1001 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
1002 * MACH_RCV_INVALID_NAME
1003 * The denoted right is not receive or port set.
1004 * MACH_RCV_IN_SET Receive right is a member of a set.
1005 */
1006
mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t		space,
	mach_port_name_t	name,
	ipc_mqueue_t		*mqueuep,
	ipc_object_t		*objectp)
{
	ipc_entry_t entry;
	ipc_object_t object;
	ipc_mqueue_t mqueue;

	is_read_lock(space);
	if (!space->is_active) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	entry = ipc_entry_lookup(space, name);
	if (entry == IE_NULL) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	object = entry->ie_object;

	if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
		ipc_port_t port;

		port = (ipc_port_t) object;
		assert(port != IP_NULL);

		/* lock the port before dropping the space lock */
		ip_lock(port);
		assert(ip_active(port));
		assert(port->ip_receiver_name == name);
		assert(port->ip_receiver == space);
		is_read_unlock(space);
		mqueue = &port->ip_messages;

	} else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
		ipc_pset_t pset;

		pset = (ipc_pset_t) object;
		assert(pset != IPS_NULL);

		/* lock the pset before dropping the space lock */
		ips_lock(pset);
		assert(ips_active(pset));
		assert(pset->ips_local_name == name);
		is_read_unlock(space);

		mqueue = &pset->ips_messages;
	} else {
		/* name denotes neither a receive right nor a port set */
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	/*
	 * At this point, the object is locked and active,
	 * the space is unlocked, and mqueue is initialized.
	 */

	/* take the ref promised to the caller, then drop the object lock */
	io_reference(object);
	io_unlock(object);

	*objectp = object;
	*mqueuep = mqueue;
	return MACH_MSG_SUCCESS;
}
1074