]> git.saurik.com Git - apple/xnu.git/blob - osfmk/ipc/ipc_mqueue.c
e45388c494c5021fd974f087b7c6134b9e2c22c3
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
1 /*
2 * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_OSREFERENCE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the
10 * License may not be used to create, or enable the creation or
11 * redistribution of, unlawful or unlicensed copies of an Apple operating
12 * system, or to circumvent, violate, or enable the circumvention or
13 * violation of, any terms of an Apple operating system software license
14 * agreement.
15 *
16 * Please obtain a copy of the License at
17 * http://www.opensource.apple.com/apsl/ and read it before using this
18 * file.
19 *
20 * The Original Code and all software distributed under the License are
21 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
22 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
23 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
25 * Please see the License for the specific language governing rights and
26 * limitations under the License.
27 *
28 * @APPLE_LICENSE_OSREFERENCE_HEADER_END@
29 */
30 /*
31 * @OSF_FREE_COPYRIGHT@
32 */
33 /*
34 * Mach Operating System
35 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
36 * All Rights Reserved.
37 *
38 * Permission to use, copy, modify and distribute this software and its
39 * documentation is hereby granted, provided that both the copyright
40 * notice and this permission notice appear in all copies of the
41 * software, derivative works or modified versions, and any portions
42 * thereof, and that both notices appear in supporting documentation.
43 *
44 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
45 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
46 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
47 *
48 * Carnegie Mellon requests users of this software to return to
49 *
50 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
51 * School of Computer Science
52 * Carnegie Mellon University
53 * Pittsburgh PA 15213-3890
54 *
55 * any improvements or extensions that they make and grant Carnegie Mellon
56 * the rights to redistribute these changes.
57 */
58 /*
59 */
60 /*
61 * File: ipc/ipc_mqueue.c
62 * Author: Rich Draves
63 * Date: 1989
64 *
65 * Functions to manipulate IPC message queues.
66 */
67
68 #include <mach/port.h>
69 #include <mach/message.h>
70 #include <mach/sync_policy.h>
71
72 #include <kern/assert.h>
73 #include <kern/counters.h>
74 #include <kern/sched_prim.h>
75 #include <kern/ipc_kobject.h>
76 #include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
77 #include <kern/misc_protos.h>
78 #include <kern/task.h>
79 #include <kern/thread.h>
80 #include <kern/wait_queue.h>
81
82 #include <ipc/ipc_mqueue.h>
83 #include <ipc/ipc_kmsg.h>
84 #include <ipc/ipc_port.h>
85 #include <ipc/ipc_pset.h>
86 #include <ipc/ipc_space.h>
87
88 #include <ddb/tr.h>
89
90 int ipc_mqueue_full; /* address is event for queue space */
91 int ipc_mqueue_rcv; /* address is event for message arrival */
92
93 #define TR_ENABLE 0
94
95 /* forward declarations */
96 void ipc_mqueue_receive_results(wait_result_t result);
97
98 /*
99 * Routine: ipc_mqueue_init
100 * Purpose:
101 * Initialize a newly-allocated message queue.
102 */
103 void
104 ipc_mqueue_init(
105 ipc_mqueue_t mqueue,
106 boolean_t is_set)
107 {
108 if (is_set) {
109 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
110 } else {
111 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
112 ipc_kmsg_queue_init(&mqueue->imq_messages);
113 mqueue->imq_seqno = 0;
114 mqueue->imq_msgcount = 0;
115 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
116 mqueue->imq_fullwaiters = FALSE;
117 }
118 }
119
120 /*
121 * Routine: ipc_mqueue_member
122 * Purpose:
123 * Indicate whether the (port) mqueue is a member of
124 * this portset's mqueue. We do this by checking
125 * whether the portset mqueue's waitq is an member of
126 * the port's mqueue waitq.
127 * Conditions:
128 * the portset's mqueue is not already a member
129 * this may block while allocating linkage structures.
130 */
131
132 boolean_t
133 ipc_mqueue_member(
134 ipc_mqueue_t port_mqueue,
135 ipc_mqueue_t set_mqueue)
136 {
137 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
138 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
139
140 return (wait_queue_member(port_waitq, set_waitq));
141
142 }
143
144 /*
145 * Routine: ipc_mqueue_remove
146 * Purpose:
147 * Remove the association between the queue and the specified
148 * set message queue.
149 */
150
151 kern_return_t
152 ipc_mqueue_remove(
153 ipc_mqueue_t mqueue,
154 ipc_mqueue_t set_mqueue)
155 {
156 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
157 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
158
159 return wait_queue_unlink(mq_waitq, set_waitq);
160 }
161
162 /*
163 * Routine: ipc_mqueue_remove_from_all
164 * Purpose:
165 * Remove the mqueue from all the sets it is a member of
166 * Conditions:
167 * Nothing locked.
168 */
169 void
170 ipc_mqueue_remove_from_all(
171 ipc_mqueue_t mqueue)
172 {
173 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
174
175 wait_queue_unlink_all(mq_waitq);
176 return;
177 }
178
179 /*
180 * Routine: ipc_mqueue_remove_all
181 * Purpose:
182 * Remove all the member queues from the specified set.
183 * Conditions:
184 * Nothing locked.
185 */
186 void
187 ipc_mqueue_remove_all(
188 ipc_mqueue_t mqueue)
189 {
190 wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
191
192 wait_queue_set_unlink_all(mq_setq);
193 return;
194 }
195
196
197 /*
198 * Routine: ipc_mqueue_add
199 * Purpose:
200 * Associate the portset's mqueue with the port's mqueue.
201 * This has to be done so that posting the port will wakeup
202 * a portset waiter. If there are waiters on the portset
203 * mqueue and messages on the port mqueue, try to match them
204 * up now.
205 * Conditions:
206 * May block.
207 */
208 kern_return_t
209 ipc_mqueue_add(
210 ipc_mqueue_t port_mqueue,
211 ipc_mqueue_t set_mqueue)
212 {
213 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
214 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
215 ipc_kmsg_queue_t kmsgq;
216 ipc_kmsg_t kmsg, next;
217 kern_return_t kr;
218 spl_t s;
219
220 kr = wait_queue_link(port_waitq, set_waitq);
221 if (kr != KERN_SUCCESS)
222 return kr;
223
224 /*
225 * Now that the set has been added to the port, there may be
226 * messages queued on the port and threads waiting on the set
227 * waitq. Lets get them together.
228 */
229 s = splsched();
230 imq_lock(port_mqueue);
231 kmsgq = &port_mqueue->imq_messages;
232 for (kmsg = ipc_kmsg_queue_first(kmsgq);
233 kmsg != IKM_NULL;
234 kmsg = next) {
235 next = ipc_kmsg_queue_next(kmsgq, kmsg);
236
237 for (;;) {
238 thread_t th;
239
240 th = wait_queue_wakeup64_identity_locked(
241 port_waitq,
242 IPC_MQUEUE_RECEIVE,
243 THREAD_AWAKENED,
244 FALSE);
245 /* waitq/mqueue still locked, thread locked */
246
247 if (th == THREAD_NULL)
248 goto leave;
249
250 /*
251 * Found a receiver. see if they can handle the message
252 * correctly (the message is not too large for them, or
253 * they didn't care to be informed that the message was
254 * too large). If they can't handle it, take them off
255 * the list and let them go back and figure it out and
256 * just move onto the next.
257 */
258 if (th->ith_msize <
259 kmsg->ikm_header->msgh_size +
260 REQUESTED_TRAILER_SIZE(th->ith_option)) {
261 th->ith_state = MACH_RCV_TOO_LARGE;
262 th->ith_msize = kmsg->ikm_header->msgh_size;
263 if (th->ith_option & MACH_RCV_LARGE) {
264 /*
265 * let him go without message
266 */
267 th->ith_kmsg = IKM_NULL;
268 th->ith_seqno = 0;
269 thread_unlock(th);
270 continue; /* find another thread */
271 }
272 } else {
273 th->ith_state = MACH_MSG_SUCCESS;
274 }
275
276 /*
277 * This thread is going to take this message,
278 * so give it to him.
279 */
280 ipc_kmsg_rmqueue(kmsgq, kmsg);
281 ipc_mqueue_release_msgcount(port_mqueue);
282
283 th->ith_kmsg = kmsg;
284 th->ith_seqno = port_mqueue->imq_seqno++;
285 thread_unlock(th);
286 break; /* go to next message */
287 }
288
289 }
290 leave:
291 imq_unlock(port_mqueue);
292 splx(s);
293 return KERN_SUCCESS;
294 }
295
296 /*
297 * Routine: ipc_mqueue_changed
298 * Purpose:
299 * Wake up receivers waiting in a message queue.
300 * Conditions:
301 * The message queue is locked.
302 */
303
304 void
305 ipc_mqueue_changed(
306 ipc_mqueue_t mqueue)
307 {
308 wait_queue_wakeup64_all_locked(
309 &mqueue->imq_wait_queue,
310 IPC_MQUEUE_RECEIVE,
311 THREAD_RESTART,
312 FALSE); /* unlock waitq? */
313 }
314
315
316
317
318 /*
319 * Routine: ipc_mqueue_send
320 * Purpose:
321 * Send a message to a message queue. The message holds a reference
322 * for the destination port for this message queue in the
323 * msgh_remote_port field.
324 *
325 * If unsuccessful, the caller still has possession of
326 * the message and must do something with it. If successful,
327 * the message is queued, given to a receiver, or destroyed.
328 * Conditions:
329 * Nothing locked.
330 * Returns:
331 * MACH_MSG_SUCCESS The message was accepted.
332 * MACH_SEND_TIMED_OUT Caller still has message.
333 * MACH_SEND_INTERRUPTED Caller still has message.
334 */
335 mach_msg_return_t
336 ipc_mqueue_send(
337 ipc_mqueue_t mqueue,
338 ipc_kmsg_t kmsg,
339 mach_msg_option_t option,
340 mach_msg_timeout_t send_timeout)
341 {
342 int wresult;
343 spl_t s;
344
345 /*
346 * Don't block if:
347 * 1) We're under the queue limit.
348 * 2) Caller used the MACH_SEND_ALWAYS internal option.
349 * 3) Message is sent to a send-once right.
350 */
351 s = splsched();
352 imq_lock(mqueue);
353
354 if (!imq_full(mqueue) ||
355 (option & MACH_SEND_ALWAYS) ||
356 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
357 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
358 mqueue->imq_msgcount++;
359 assert(mqueue->imq_msgcount > 0);
360 imq_unlock(mqueue);
361 splx(s);
362 } else {
363 thread_t cur_thread = current_thread();
364 uint64_t deadline;
365
366 /*
367 * We have to wait for space to be granted to us.
368 */
369 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
370 imq_unlock(mqueue);
371 splx(s);
372 return MACH_SEND_TIMED_OUT;
373 }
374 mqueue->imq_fullwaiters = TRUE;
375 thread_lock(cur_thread);
376 if (option & MACH_SEND_TIMEOUT)
377 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
378 else
379 deadline = 0;
380 wresult = wait_queue_assert_wait64_locked(
381 &mqueue->imq_wait_queue,
382 IPC_MQUEUE_FULL,
383 THREAD_ABORTSAFE, deadline,
384 cur_thread);
385 thread_unlock(cur_thread);
386 imq_unlock(mqueue);
387 splx(s);
388
389 if (wresult == THREAD_WAITING) {
390 wresult = thread_block(THREAD_CONTINUE_NULL);
391 counter(c_ipc_mqueue_send_block++);
392 }
393
394 switch (wresult) {
395 case THREAD_TIMED_OUT:
396 assert(option & MACH_SEND_TIMEOUT);
397 return MACH_SEND_TIMED_OUT;
398
399 case THREAD_AWAKENED:
400 /* we can proceed - inherited msgcount from waker */
401 assert(mqueue->imq_msgcount > 0);
402 break;
403
404 case THREAD_INTERRUPTED:
405 return MACH_SEND_INTERRUPTED;
406
407 case THREAD_RESTART:
408 default:
409 panic("ipc_mqueue_send");
410 }
411 }
412
413 ipc_mqueue_post(mqueue, kmsg);
414 return MACH_MSG_SUCCESS;
415 }
416
417 /*
418 * Routine: ipc_mqueue_release_msgcount
419 * Purpose:
420 * Release a message queue reference in the case where we
421 * found a waiter.
422 *
423 * Conditions:
424 * The message queue is locked.
425 * The message corresponding to this reference is off the queue.
426 */
427 void
428 ipc_mqueue_release_msgcount(
429 ipc_mqueue_t mqueue)
430 {
431 assert(imq_held(mqueue));
432 assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
433
434 mqueue->imq_msgcount--;
435
436 if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
437 if (wait_queue_wakeup64_one_locked(
438 &mqueue->imq_wait_queue,
439 IPC_MQUEUE_FULL,
440 THREAD_AWAKENED,
441 FALSE) != KERN_SUCCESS) {
442 mqueue->imq_fullwaiters = FALSE;
443 } else {
444 /* gave away our slot - add reference back */
445 mqueue->imq_msgcount++;
446 }
447 }
448 }
449
450 /*
451 * Routine: ipc_mqueue_post
452 * Purpose:
453 * Post a message to a waiting receiver or enqueue it. If a
454 * receiver is waiting, we can release our reserved space in
455 * the message queue.
456 *
457 * Conditions:
458 * If we need to queue, our space in the message queue is reserved.
459 */
460 void
461 ipc_mqueue_post(
462 register ipc_mqueue_t mqueue,
463 register ipc_kmsg_t kmsg)
464 {
465
466 spl_t s;
467
468 /*
469 * While the msg queue is locked, we have control of the
470 * kmsg, so the ref in it for the port is still good.
471 *
472 * Check for a receiver for the message.
473 */
474 s = splsched();
475 imq_lock(mqueue);
476 for (;;) {
477 wait_queue_t waitq = &mqueue->imq_wait_queue;
478 thread_t receiver;
479
480 receiver = wait_queue_wakeup64_identity_locked(
481 waitq,
482 IPC_MQUEUE_RECEIVE,
483 THREAD_AWAKENED,
484 FALSE);
485 /* waitq still locked, thread locked */
486
487 if (receiver == THREAD_NULL) {
488 /*
489 * no receivers; queue kmsg
490 */
491 assert(mqueue->imq_msgcount > 0);
492 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
493 break;
494 }
495
496 /*
497 * We found a waiting thread.
498 * If the message is too large or the scatter list is too small
499 * the thread we wake up will get that as its status.
500 */
501 if (receiver->ith_msize <
502 (kmsg->ikm_header->msgh_size) +
503 REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
504 receiver->ith_msize = kmsg->ikm_header->msgh_size;
505 receiver->ith_state = MACH_RCV_TOO_LARGE;
506 } else {
507 receiver->ith_state = MACH_MSG_SUCCESS;
508 }
509
510 /*
511 * If there is no problem with the upcoming receive, or the
512 * receiver thread didn't specifically ask for special too
513 * large error condition, go ahead and select it anyway.
514 */
515 if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
516 !(receiver->ith_option & MACH_RCV_LARGE)) {
517
518 receiver->ith_kmsg = kmsg;
519 receiver->ith_seqno = mqueue->imq_seqno++;
520 thread_unlock(receiver);
521
522 /* we didn't need our reserved spot in the queue */
523 ipc_mqueue_release_msgcount(mqueue);
524 break;
525 }
526
527 /*
528 * Otherwise, this thread needs to be released to run
529 * and handle its error without getting the message. We
530 * need to go back and pick another one.
531 */
532 receiver->ith_kmsg = IKM_NULL;
533 receiver->ith_seqno = 0;
534 thread_unlock(receiver);
535 }
536
537 imq_unlock(mqueue);
538 splx(s);
539
540 current_task()->messages_sent++;
541 return;
542 }
543
544
545 /* static */ void
546 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
547 {
548 thread_t self = current_thread();
549 mach_msg_option_t option = self->ith_option;
550 kern_return_t mr;
551
552 /*
553 * why did we wake up?
554 */
555 switch (saved_wait_result) {
556 case THREAD_TIMED_OUT:
557 self->ith_state = MACH_RCV_TIMED_OUT;
558 return;
559
560 case THREAD_INTERRUPTED:
561 self->ith_state = MACH_RCV_INTERRUPTED;
562 return;
563
564 case THREAD_RESTART:
565 /* something bad happened to the port/set */
566 self->ith_state = MACH_RCV_PORT_CHANGED;
567 return;
568
569 case THREAD_AWAKENED:
570 /*
571 * We do not need to go select a message, somebody
572 * handed us one (or a too-large indication).
573 */
574 mr = MACH_MSG_SUCCESS;
575
576 switch (self->ith_state) {
577 case MACH_RCV_SCATTER_SMALL:
578 case MACH_RCV_TOO_LARGE:
579 /*
580 * Somebody tried to give us a too large
581 * message. If we indicated that we cared,
582 * then they only gave us the indication,
583 * otherwise they gave us the indication
584 * AND the message anyway.
585 */
586 if (option & MACH_RCV_LARGE) {
587 return;
588 }
589
590 case MACH_MSG_SUCCESS:
591 return;
592
593 default:
594 panic("ipc_mqueue_receive_results: strange ith_state");
595 }
596
597 default:
598 panic("ipc_mqueue_receive_results: strange wait_result");
599 }
600 }
601
602 void
603 ipc_mqueue_receive_continue(
604 __unused void *param,
605 wait_result_t wresult)
606 {
607 ipc_mqueue_receive_results(wresult);
608 mach_msg_receive_continue(); /* hard-coded for now */
609 }
610
611 /*
612 * Routine: ipc_mqueue_receive
613 * Purpose:
614 * Receive a message from a message queue.
615 *
616 * If continuation is non-zero, then we might discard
617 * our kernel stack when we block. We will continue
618 * after unblocking by executing continuation.
619 *
620 * If resume is true, then we are resuming a receive
621 * operation after a blocked receive discarded our stack.
622 * Conditions:
623 * Our caller must hold a reference for the port or port set
624 * to which this queue belongs, to keep the queue
625 * from being deallocated.
626 *
627 * The kmsg is returned with clean header fields
628 * and with the circular bit turned off.
629 * Returns:
630 * MACH_MSG_SUCCESS Message returned in kmsgp.
631 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
632 * MACH_RCV_TIMED_OUT No message obtained.
633 * MACH_RCV_INTERRUPTED No message obtained.
634 * MACH_RCV_PORT_DIED Port/set died; no message.
635 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
636 *
637 */
638
639 void
640 ipc_mqueue_receive(
641 ipc_mqueue_t mqueue,
642 mach_msg_option_t option,
643 mach_msg_size_t max_size,
644 mach_msg_timeout_t rcv_timeout,
645 int interruptible)
646 {
647 ipc_kmsg_queue_t kmsgs;
648 wait_result_t wresult;
649 thread_t self;
650 uint64_t deadline;
651 spl_t s;
652
653 s = splsched();
654 imq_lock(mqueue);
655
656 if (imq_is_set(mqueue)) {
657 wait_queue_link_t wql;
658 ipc_mqueue_t port_mq;
659 queue_t q;
660
661 q = &mqueue->imq_setlinks;
662
663 /*
664 * If we are waiting on a portset mqueue, we need to see if
665 * any of the member ports have work for us. If so, try to
666 * deliver one of those messages. By holding the portset's
667 * mqueue lock during the search, we tie up any attempts by
668 * mqueue_deliver or portset membership changes that may
669 * cross our path. But this is a lock order violation, so we
670 * have to do it "softly." If we don't find a message waiting
671 * for us, we will assert our intention to wait while still
672 * holding that lock. When we release the lock, the deliver/
673 * change will succeed and find us.
674 */
675 search_set:
676 queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
677 port_mq = (ipc_mqueue_t)wql->wql_queue;
678 kmsgs = &port_mq->imq_messages;
679
680 if (!imq_lock_try(port_mq)) {
681 imq_unlock(mqueue);
682 splx(s);
683 delay(1);
684 s = splsched();
685 imq_lock(mqueue);
686 goto search_set; /* start again at beginning - SMP */
687 }
688
689 /*
690 * If there is still a message to be had, we will
691 * try to select it (may not succeed because of size
692 * and options). In any case, we deliver those
693 * results back to the user.
694 *
695 * We also move the port's linkage to the tail of the
696 * list for this set (fairness). Future versions will
697 * sort by timestamp or priority.
698 */
699 if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
700 imq_unlock(port_mq);
701 continue;
702 }
703 queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
704 queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
705 imq_unlock(mqueue);
706
707 ipc_mqueue_select(port_mq, option, max_size);
708 imq_unlock(port_mq);
709 splx(s);
710 return;
711
712 }
713
714 } else {
715
716 /*
717 * Receive on a single port. Just try to get the messages.
718 */
719 kmsgs = &mqueue->imq_messages;
720 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
721 ipc_mqueue_select(mqueue, option, max_size);
722 imq_unlock(mqueue);
723 splx(s);
724 return;
725 }
726 }
727
728 /*
729 * Looks like we'll have to block. The mqueue we will
730 * block on (whether the set's or the local port's) is
731 * still locked.
732 */
733 self = current_thread();
734 if (option & MACH_RCV_TIMEOUT) {
735 if (rcv_timeout == 0) {
736 imq_unlock(mqueue);
737 splx(s);
738 self->ith_state = MACH_RCV_TIMED_OUT;
739 return;
740 }
741 }
742
743 thread_lock(self);
744 self->ith_state = MACH_RCV_IN_PROGRESS;
745 self->ith_option = option;
746 self->ith_msize = max_size;
747
748 if (option & MACH_RCV_TIMEOUT)
749 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
750 else
751 deadline = 0;
752
753 wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
754 IPC_MQUEUE_RECEIVE,
755 interruptible, deadline,
756 self);
757 thread_unlock(self);
758 imq_unlock(mqueue);
759 splx(s);
760
761 if (wresult == THREAD_WAITING) {
762 counter((interruptible == THREAD_ABORTSAFE) ?
763 c_ipc_mqueue_receive_block_user++ :
764 c_ipc_mqueue_receive_block_kernel++);
765
766 if (self->ith_continuation)
767 thread_block(ipc_mqueue_receive_continue);
768 /* NOTREACHED */
769
770 wresult = thread_block(THREAD_CONTINUE_NULL);
771 }
772 ipc_mqueue_receive_results(wresult);
773 }
774
775
776 /*
777 * Routine: ipc_mqueue_select
778 * Purpose:
779 * A receiver discovered that there was a message on the queue
780 * before he had to block. Pick the message off the queue and
781 * "post" it to himself.
782 * Conditions:
783 * mqueue locked.
784 * There is a message.
785 * Returns:
786 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
787 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
788 */
789 void
790 ipc_mqueue_select(
791 ipc_mqueue_t mqueue,
792 mach_msg_option_t option,
793 mach_msg_size_t max_size)
794 {
795 thread_t self = current_thread();
796 ipc_kmsg_t kmsg;
797 mach_msg_return_t mr;
798 mach_msg_size_t rcv_size;
799
800 mr = MACH_MSG_SUCCESS;
801
802
803 /*
804 * Do some sanity checking of our ability to receive
805 * before pulling the message off the queue.
806 */
807 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
808 assert(kmsg != IKM_NULL);
809
810 /*
811 * If we really can't receive it, but we had the
812 * MACH_RCV_LARGE option set, then don't take it off
813 * the queue, instead return the appropriate error
814 * (and size needed).
815 */
816 rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
817 if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
818 mr = MACH_RCV_TOO_LARGE;
819 if (option & MACH_RCV_LARGE) {
820 self->ith_kmsg = IKM_NULL;
821 self->ith_msize = rcv_size;
822 self->ith_seqno = 0;
823 self->ith_state = mr;
824 return;
825 }
826 }
827
828 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
829 ipc_mqueue_release_msgcount(mqueue);
830 self->ith_seqno = mqueue->imq_seqno++;
831 self->ith_kmsg = kmsg;
832 self->ith_state = mr;
833
834 current_task()->messages_received++;
835 return;
836 }
837
838 /*
839 * Routine: ipc_mqueue_destroy
840 * Purpose:
841 * Destroy a message queue. Set any blocked senders running.
842 * Destroy the kmsgs in the queue.
843 * Conditions:
844 * Nothing locked.
845 * Receivers were removed when the receive right was "changed"
846 */
847 void
848 ipc_mqueue_destroy(
849 ipc_mqueue_t mqueue)
850 {
851 ipc_kmsg_queue_t kmqueue;
852 ipc_kmsg_t kmsg;
853 spl_t s;
854
855
856 s = splsched();
857 imq_lock(mqueue);
858 /*
859 * rouse all blocked senders
860 */
861 mqueue->imq_fullwaiters = FALSE;
862 wait_queue_wakeup64_all_locked(
863 &mqueue->imq_wait_queue,
864 IPC_MQUEUE_FULL,
865 THREAD_AWAKENED,
866 FALSE);
867
868 kmqueue = &mqueue->imq_messages;
869
870 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
871 imq_unlock(mqueue);
872 splx(s);
873
874 ipc_kmsg_destroy_dest(kmsg);
875
876 s = splsched();
877 imq_lock(mqueue);
878 }
879 imq_unlock(mqueue);
880 splx(s);
881 }
882
883 /*
884 * Routine: ipc_mqueue_set_qlimit
885 * Purpose:
886 * Changes a message queue limit; the maximum number
887 * of messages which may be queued.
888 * Conditions:
889 * Nothing locked.
890 */
891
892 void
893 ipc_mqueue_set_qlimit(
894 ipc_mqueue_t mqueue,
895 mach_port_msgcount_t qlimit)
896 {
897 spl_t s;
898
899 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
900
901 /* wake up senders allowed by the new qlimit */
902 s = splsched();
903 imq_lock(mqueue);
904 if (qlimit > mqueue->imq_qlimit) {
905 mach_port_msgcount_t i, wakeup;
906
907 /* caution: wakeup, qlimit are unsigned */
908 wakeup = qlimit - mqueue->imq_qlimit;
909
910 for (i = 0; i < wakeup; i++) {
911 if (wait_queue_wakeup64_one_locked(
912 &mqueue->imq_wait_queue,
913 IPC_MQUEUE_FULL,
914 THREAD_AWAKENED,
915 FALSE) == KERN_NOT_WAITING) {
916 mqueue->imq_fullwaiters = FALSE;
917 break;
918 }
919 mqueue->imq_msgcount++; /* give it to the awakened thread */
920 }
921 }
922 mqueue->imq_qlimit = qlimit;
923 imq_unlock(mqueue);
924 splx(s);
925 }
926
927 /*
928 * Routine: ipc_mqueue_set_seqno
929 * Purpose:
930 * Changes an mqueue's sequence number.
931 * Conditions:
932 * Caller holds a reference to the queue's containing object.
933 */
934 void
935 ipc_mqueue_set_seqno(
936 ipc_mqueue_t mqueue,
937 mach_port_seqno_t seqno)
938 {
939 spl_t s;
940
941 s = splsched();
942 imq_lock(mqueue);
943 mqueue->imq_seqno = seqno;
944 imq_unlock(mqueue);
945 splx(s);
946 }
947
948
949 /*
950 * Routine: ipc_mqueue_copyin
951 * Purpose:
952 * Convert a name in a space to a message queue.
953 * Conditions:
954 * Nothing locked. If successful, the caller gets a ref for
955 * for the object. This ref ensures the continued existence of
956 * the queue.
957 * Returns:
958 * MACH_MSG_SUCCESS Found a message queue.
959 * MACH_RCV_INVALID_NAME The space is dead.
960 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
961 * MACH_RCV_INVALID_NAME
962 * The denoted right is not receive or port set.
963 * MACH_RCV_IN_SET Receive right is a member of a set.
964 */
965
966 mach_msg_return_t
967 ipc_mqueue_copyin(
968 ipc_space_t space,
969 mach_port_name_t name,
970 ipc_mqueue_t *mqueuep,
971 ipc_object_t *objectp)
972 {
973 ipc_entry_t entry;
974 ipc_object_t object;
975 ipc_mqueue_t mqueue;
976
977 is_read_lock(space);
978 if (!space->is_active) {
979 is_read_unlock(space);
980 return MACH_RCV_INVALID_NAME;
981 }
982
983 entry = ipc_entry_lookup(space, name);
984 if (entry == IE_NULL) {
985 is_read_unlock(space);
986 return MACH_RCV_INVALID_NAME;
987 }
988
989 object = entry->ie_object;
990
991 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
992 ipc_port_t port;
993
994 port = (ipc_port_t) object;
995 assert(port != IP_NULL);
996
997 ip_lock(port);
998 assert(ip_active(port));
999 assert(port->ip_receiver_name == name);
1000 assert(port->ip_receiver == space);
1001 is_read_unlock(space);
1002 mqueue = &port->ip_messages;
1003
1004 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
1005 ipc_pset_t pset;
1006
1007 pset = (ipc_pset_t) object;
1008 assert(pset != IPS_NULL);
1009
1010 ips_lock(pset);
1011 assert(ips_active(pset));
1012 assert(pset->ips_local_name == name);
1013 is_read_unlock(space);
1014
1015 mqueue = &pset->ips_messages;
1016 } else {
1017 is_read_unlock(space);
1018 return MACH_RCV_INVALID_NAME;
1019 }
1020
1021 /*
1022 * At this point, the object is locked and active,
1023 * the space is unlocked, and mqueue is initialized.
1024 */
1025
1026 io_reference(object);
1027 io_unlock(object);
1028
1029 *objectp = object;
1030 *mqueuep = mqueue;
1031 return MACH_MSG_SUCCESS;
1032 }
1033