]> git.saurik.com Git - apple/xnu.git/blob - osfmk/ipc/ipc_mqueue.c
xnu-792.21.3.tar.gz
[apple/xnu.git] / osfmk / ipc / ipc_mqueue.c
1 /*
2 * Copyright (c) 2000-2004 Apple Computer, Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /*
29 * @OSF_FREE_COPYRIGHT@
30 */
31 /*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
49 * School of Computer Science
50 * Carnegie Mellon University
51 * Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56 /*
57 */
58 /*
59 * File: ipc/ipc_mqueue.c
60 * Author: Rich Draves
61 * Date: 1989
62 *
63 * Functions to manipulate IPC message queues.
64 */
65
66 #include <mach/port.h>
67 #include <mach/message.h>
68 #include <mach/sync_policy.h>
69
70 #include <kern/assert.h>
71 #include <kern/counters.h>
72 #include <kern/sched_prim.h>
73 #include <kern/ipc_kobject.h>
74 #include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
75 #include <kern/misc_protos.h>
76 #include <kern/task.h>
77 #include <kern/thread.h>
78 #include <kern/wait_queue.h>
79
80 #include <ipc/ipc_mqueue.h>
81 #include <ipc/ipc_kmsg.h>
82 #include <ipc/ipc_port.h>
83 #include <ipc/ipc_pset.h>
84 #include <ipc/ipc_space.h>
85
86 #include <ddb/tr.h>
87
88 int ipc_mqueue_full; /* address is event for queue space */
89 int ipc_mqueue_rcv; /* address is event for message arrival */
90
91 #define TR_ENABLE 0
92
93 /* forward declarations */
94 void ipc_mqueue_receive_results(wait_result_t result);
95
96 /*
97 * Routine: ipc_mqueue_init
98 * Purpose:
99 * Initialize a newly-allocated message queue.
100 */
101 void
102 ipc_mqueue_init(
103 ipc_mqueue_t mqueue,
104 boolean_t is_set)
105 {
106 if (is_set) {
107 wait_queue_set_init(&mqueue->imq_set_queue, SYNC_POLICY_FIFO);
108 } else {
109 wait_queue_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO);
110 ipc_kmsg_queue_init(&mqueue->imq_messages);
111 mqueue->imq_seqno = 0;
112 mqueue->imq_msgcount = 0;
113 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
114 mqueue->imq_fullwaiters = FALSE;
115 }
116 }
117
118 /*
119 * Routine: ipc_mqueue_member
120 * Purpose:
121 * Indicate whether the (port) mqueue is a member of
122 * this portset's mqueue. We do this by checking
123 * whether the portset mqueue's waitq is an member of
124 * the port's mqueue waitq.
125 * Conditions:
126 * the portset's mqueue is not already a member
127 * this may block while allocating linkage structures.
128 */
129
130 boolean_t
131 ipc_mqueue_member(
132 ipc_mqueue_t port_mqueue,
133 ipc_mqueue_t set_mqueue)
134 {
135 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
136 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
137
138 return (wait_queue_member(port_waitq, set_waitq));
139
140 }
141
142 /*
143 * Routine: ipc_mqueue_remove
144 * Purpose:
145 * Remove the association between the queue and the specified
146 * set message queue.
147 */
148
149 kern_return_t
150 ipc_mqueue_remove(
151 ipc_mqueue_t mqueue,
152 ipc_mqueue_t set_mqueue)
153 {
154 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
155 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
156
157 return wait_queue_unlink(mq_waitq, set_waitq);
158 }
159
160 /*
161 * Routine: ipc_mqueue_remove_from_all
162 * Purpose:
163 * Remove the mqueue from all the sets it is a member of
164 * Conditions:
165 * Nothing locked.
166 */
167 void
168 ipc_mqueue_remove_from_all(
169 ipc_mqueue_t mqueue)
170 {
171 wait_queue_t mq_waitq = &mqueue->imq_wait_queue;
172
173 wait_queue_unlink_all(mq_waitq);
174 return;
175 }
176
177 /*
178 * Routine: ipc_mqueue_remove_all
179 * Purpose:
180 * Remove all the member queues from the specified set.
181 * Conditions:
182 * Nothing locked.
183 */
184 void
185 ipc_mqueue_remove_all(
186 ipc_mqueue_t mqueue)
187 {
188 wait_queue_set_t mq_setq = &mqueue->imq_set_queue;
189
190 wait_queue_set_unlink_all(mq_setq);
191 return;
192 }
193
194
195 /*
196 * Routine: ipc_mqueue_add
197 * Purpose:
198 * Associate the portset's mqueue with the port's mqueue.
199 * This has to be done so that posting the port will wakeup
200 * a portset waiter. If there are waiters on the portset
201 * mqueue and messages on the port mqueue, try to match them
202 * up now.
203 * Conditions:
204 * May block.
205 */
206 kern_return_t
207 ipc_mqueue_add(
208 ipc_mqueue_t port_mqueue,
209 ipc_mqueue_t set_mqueue)
210 {
211 wait_queue_t port_waitq = &port_mqueue->imq_wait_queue;
212 wait_queue_set_t set_waitq = &set_mqueue->imq_set_queue;
213 ipc_kmsg_queue_t kmsgq;
214 ipc_kmsg_t kmsg, next;
215 kern_return_t kr;
216 spl_t s;
217
218 kr = wait_queue_link(port_waitq, set_waitq);
219 if (kr != KERN_SUCCESS)
220 return kr;
221
222 /*
223 * Now that the set has been added to the port, there may be
224 * messages queued on the port and threads waiting on the set
225 * waitq. Lets get them together.
226 */
227 s = splsched();
228 imq_lock(port_mqueue);
229 kmsgq = &port_mqueue->imq_messages;
230 for (kmsg = ipc_kmsg_queue_first(kmsgq);
231 kmsg != IKM_NULL;
232 kmsg = next) {
233 next = ipc_kmsg_queue_next(kmsgq, kmsg);
234
235 for (;;) {
236 thread_t th;
237
238 th = wait_queue_wakeup64_identity_locked(
239 port_waitq,
240 IPC_MQUEUE_RECEIVE,
241 THREAD_AWAKENED,
242 FALSE);
243 /* waitq/mqueue still locked, thread locked */
244
245 if (th == THREAD_NULL)
246 goto leave;
247
248 /*
249 * Found a receiver. see if they can handle the message
250 * correctly (the message is not too large for them, or
251 * they didn't care to be informed that the message was
252 * too large). If they can't handle it, take them off
253 * the list and let them go back and figure it out and
254 * just move onto the next.
255 */
256 if (th->ith_msize <
257 kmsg->ikm_header->msgh_size +
258 REQUESTED_TRAILER_SIZE(th->ith_option)) {
259 th->ith_state = MACH_RCV_TOO_LARGE;
260 th->ith_msize = kmsg->ikm_header->msgh_size;
261 if (th->ith_option & MACH_RCV_LARGE) {
262 /*
263 * let him go without message
264 */
265 th->ith_kmsg = IKM_NULL;
266 th->ith_seqno = 0;
267 thread_unlock(th);
268 continue; /* find another thread */
269 }
270 } else {
271 th->ith_state = MACH_MSG_SUCCESS;
272 }
273
274 /*
275 * This thread is going to take this message,
276 * so give it to him.
277 */
278 ipc_kmsg_rmqueue(kmsgq, kmsg);
279 ipc_mqueue_release_msgcount(port_mqueue);
280
281 th->ith_kmsg = kmsg;
282 th->ith_seqno = port_mqueue->imq_seqno++;
283 thread_unlock(th);
284 break; /* go to next message */
285 }
286
287 }
288 leave:
289 imq_unlock(port_mqueue);
290 splx(s);
291 return KERN_SUCCESS;
292 }
293
294 /*
295 * Routine: ipc_mqueue_changed
296 * Purpose:
297 * Wake up receivers waiting in a message queue.
298 * Conditions:
299 * The message queue is locked.
300 */
301
302 void
303 ipc_mqueue_changed(
304 ipc_mqueue_t mqueue)
305 {
306 wait_queue_wakeup64_all_locked(
307 &mqueue->imq_wait_queue,
308 IPC_MQUEUE_RECEIVE,
309 THREAD_RESTART,
310 FALSE); /* unlock waitq? */
311 }
312
313
314
315
316 /*
317 * Routine: ipc_mqueue_send
318 * Purpose:
319 * Send a message to a message queue. The message holds a reference
320 * for the destination port for this message queue in the
321 * msgh_remote_port field.
322 *
323 * If unsuccessful, the caller still has possession of
324 * the message and must do something with it. If successful,
325 * the message is queued, given to a receiver, or destroyed.
326 * Conditions:
327 * Nothing locked.
328 * Returns:
329 * MACH_MSG_SUCCESS The message was accepted.
330 * MACH_SEND_TIMED_OUT Caller still has message.
331 * MACH_SEND_INTERRUPTED Caller still has message.
332 */
333 mach_msg_return_t
334 ipc_mqueue_send(
335 ipc_mqueue_t mqueue,
336 ipc_kmsg_t kmsg,
337 mach_msg_option_t option,
338 mach_msg_timeout_t send_timeout)
339 {
340 int wresult;
341 spl_t s;
342
343 /*
344 * Don't block if:
345 * 1) We're under the queue limit.
346 * 2) Caller used the MACH_SEND_ALWAYS internal option.
347 * 3) Message is sent to a send-once right.
348 */
349 s = splsched();
350 imq_lock(mqueue);
351
352 if (!imq_full(mqueue) ||
353 (option & MACH_SEND_ALWAYS) ||
354 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
355 MACH_MSG_TYPE_PORT_SEND_ONCE)) {
356 mqueue->imq_msgcount++;
357 assert(mqueue->imq_msgcount > 0);
358 imq_unlock(mqueue);
359 splx(s);
360 } else {
361 thread_t cur_thread = current_thread();
362 uint64_t deadline;
363
364 /*
365 * We have to wait for space to be granted to us.
366 */
367 if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
368 imq_unlock(mqueue);
369 splx(s);
370 return MACH_SEND_TIMED_OUT;
371 }
372 mqueue->imq_fullwaiters = TRUE;
373 thread_lock(cur_thread);
374 if (option & MACH_SEND_TIMEOUT)
375 clock_interval_to_deadline(send_timeout, 1000*NSEC_PER_USEC, &deadline);
376 else
377 deadline = 0;
378 wresult = wait_queue_assert_wait64_locked(
379 &mqueue->imq_wait_queue,
380 IPC_MQUEUE_FULL,
381 THREAD_ABORTSAFE, deadline,
382 cur_thread);
383 thread_unlock(cur_thread);
384 imq_unlock(mqueue);
385 splx(s);
386
387 if (wresult == THREAD_WAITING) {
388 wresult = thread_block(THREAD_CONTINUE_NULL);
389 counter(c_ipc_mqueue_send_block++);
390 }
391
392 switch (wresult) {
393 case THREAD_TIMED_OUT:
394 assert(option & MACH_SEND_TIMEOUT);
395 return MACH_SEND_TIMED_OUT;
396
397 case THREAD_AWAKENED:
398 /* we can proceed - inherited msgcount from waker */
399 assert(mqueue->imq_msgcount > 0);
400 break;
401
402 case THREAD_INTERRUPTED:
403 return MACH_SEND_INTERRUPTED;
404
405 case THREAD_RESTART:
406 default:
407 panic("ipc_mqueue_send");
408 }
409 }
410
411 ipc_mqueue_post(mqueue, kmsg);
412 return MACH_MSG_SUCCESS;
413 }
414
415 /*
416 * Routine: ipc_mqueue_release_msgcount
417 * Purpose:
418 * Release a message queue reference in the case where we
419 * found a waiter.
420 *
421 * Conditions:
422 * The message queue is locked.
423 * The message corresponding to this reference is off the queue.
424 */
425 void
426 ipc_mqueue_release_msgcount(
427 ipc_mqueue_t mqueue)
428 {
429 assert(imq_held(mqueue));
430 assert(mqueue->imq_msgcount > 1 || ipc_kmsg_queue_empty(&mqueue->imq_messages));
431
432 mqueue->imq_msgcount--;
433
434 if (!imq_full(mqueue) && mqueue->imq_fullwaiters) {
435 if (wait_queue_wakeup64_one_locked(
436 &mqueue->imq_wait_queue,
437 IPC_MQUEUE_FULL,
438 THREAD_AWAKENED,
439 FALSE) != KERN_SUCCESS) {
440 mqueue->imq_fullwaiters = FALSE;
441 } else {
442 /* gave away our slot - add reference back */
443 mqueue->imq_msgcount++;
444 }
445 }
446 }
447
448 /*
449 * Routine: ipc_mqueue_post
450 * Purpose:
451 * Post a message to a waiting receiver or enqueue it. If a
452 * receiver is waiting, we can release our reserved space in
453 * the message queue.
454 *
455 * Conditions:
456 * If we need to queue, our space in the message queue is reserved.
457 */
458 void
459 ipc_mqueue_post(
460 register ipc_mqueue_t mqueue,
461 register ipc_kmsg_t kmsg)
462 {
463
464 spl_t s;
465
466 /*
467 * While the msg queue is locked, we have control of the
468 * kmsg, so the ref in it for the port is still good.
469 *
470 * Check for a receiver for the message.
471 */
472 s = splsched();
473 imq_lock(mqueue);
474 for (;;) {
475 wait_queue_t waitq = &mqueue->imq_wait_queue;
476 thread_t receiver;
477
478 receiver = wait_queue_wakeup64_identity_locked(
479 waitq,
480 IPC_MQUEUE_RECEIVE,
481 THREAD_AWAKENED,
482 FALSE);
483 /* waitq still locked, thread locked */
484
485 if (receiver == THREAD_NULL) {
486 /*
487 * no receivers; queue kmsg
488 */
489 assert(mqueue->imq_msgcount > 0);
490 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
491 break;
492 }
493
494 /*
495 * We found a waiting thread.
496 * If the message is too large or the scatter list is too small
497 * the thread we wake up will get that as its status.
498 */
499 if (receiver->ith_msize <
500 (kmsg->ikm_header->msgh_size) +
501 REQUESTED_TRAILER_SIZE(receiver->ith_option)) {
502 receiver->ith_msize = kmsg->ikm_header->msgh_size;
503 receiver->ith_state = MACH_RCV_TOO_LARGE;
504 } else {
505 receiver->ith_state = MACH_MSG_SUCCESS;
506 }
507
508 /*
509 * If there is no problem with the upcoming receive, or the
510 * receiver thread didn't specifically ask for special too
511 * large error condition, go ahead and select it anyway.
512 */
513 if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
514 !(receiver->ith_option & MACH_RCV_LARGE)) {
515
516 receiver->ith_kmsg = kmsg;
517 receiver->ith_seqno = mqueue->imq_seqno++;
518 thread_unlock(receiver);
519
520 /* we didn't need our reserved spot in the queue */
521 ipc_mqueue_release_msgcount(mqueue);
522 break;
523 }
524
525 /*
526 * Otherwise, this thread needs to be released to run
527 * and handle its error without getting the message. We
528 * need to go back and pick another one.
529 */
530 receiver->ith_kmsg = IKM_NULL;
531 receiver->ith_seqno = 0;
532 thread_unlock(receiver);
533 }
534
535 imq_unlock(mqueue);
536 splx(s);
537
538 current_task()->messages_sent++;
539 return;
540 }
541
542
543 /* static */ void
544 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
545 {
546 thread_t self = current_thread();
547 mach_msg_option_t option = self->ith_option;
548 kern_return_t mr;
549
550 /*
551 * why did we wake up?
552 */
553 switch (saved_wait_result) {
554 case THREAD_TIMED_OUT:
555 self->ith_state = MACH_RCV_TIMED_OUT;
556 return;
557
558 case THREAD_INTERRUPTED:
559 self->ith_state = MACH_RCV_INTERRUPTED;
560 return;
561
562 case THREAD_RESTART:
563 /* something bad happened to the port/set */
564 self->ith_state = MACH_RCV_PORT_CHANGED;
565 return;
566
567 case THREAD_AWAKENED:
568 /*
569 * We do not need to go select a message, somebody
570 * handed us one (or a too-large indication).
571 */
572 mr = MACH_MSG_SUCCESS;
573
574 switch (self->ith_state) {
575 case MACH_RCV_SCATTER_SMALL:
576 case MACH_RCV_TOO_LARGE:
577 /*
578 * Somebody tried to give us a too large
579 * message. If we indicated that we cared,
580 * then they only gave us the indication,
581 * otherwise they gave us the indication
582 * AND the message anyway.
583 */
584 if (option & MACH_RCV_LARGE) {
585 return;
586 }
587
588 case MACH_MSG_SUCCESS:
589 return;
590
591 default:
592 panic("ipc_mqueue_receive_results: strange ith_state");
593 }
594
595 default:
596 panic("ipc_mqueue_receive_results: strange wait_result");
597 }
598 }
599
600 void
601 ipc_mqueue_receive_continue(
602 __unused void *param,
603 wait_result_t wresult)
604 {
605 ipc_mqueue_receive_results(wresult);
606 mach_msg_receive_continue(); /* hard-coded for now */
607 }
608
609 /*
610 * Routine: ipc_mqueue_receive
611 * Purpose:
612 * Receive a message from a message queue.
613 *
614 * If continuation is non-zero, then we might discard
615 * our kernel stack when we block. We will continue
616 * after unblocking by executing continuation.
617 *
618 * If resume is true, then we are resuming a receive
619 * operation after a blocked receive discarded our stack.
620 * Conditions:
621 * Our caller must hold a reference for the port or port set
622 * to which this queue belongs, to keep the queue
623 * from being deallocated.
624 *
625 * The kmsg is returned with clean header fields
626 * and with the circular bit turned off.
627 * Returns:
628 * MACH_MSG_SUCCESS Message returned in kmsgp.
629 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
630 * MACH_RCV_TIMED_OUT No message obtained.
631 * MACH_RCV_INTERRUPTED No message obtained.
632 * MACH_RCV_PORT_DIED Port/set died; no message.
633 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
634 *
635 */
636
637 void
638 ipc_mqueue_receive(
639 ipc_mqueue_t mqueue,
640 mach_msg_option_t option,
641 mach_msg_size_t max_size,
642 mach_msg_timeout_t rcv_timeout,
643 int interruptible)
644 {
645 ipc_kmsg_queue_t kmsgs;
646 wait_result_t wresult;
647 thread_t self;
648 uint64_t deadline;
649 spl_t s;
650
651 s = splsched();
652 imq_lock(mqueue);
653
654 if (imq_is_set(mqueue)) {
655 wait_queue_link_t wql;
656 ipc_mqueue_t port_mq;
657 queue_t q;
658
659 q = &mqueue->imq_setlinks;
660
661 /*
662 * If we are waiting on a portset mqueue, we need to see if
663 * any of the member ports have work for us. If so, try to
664 * deliver one of those messages. By holding the portset's
665 * mqueue lock during the search, we tie up any attempts by
666 * mqueue_deliver or portset membership changes that may
667 * cross our path. But this is a lock order violation, so we
668 * have to do it "softly." If we don't find a message waiting
669 * for us, we will assert our intention to wait while still
670 * holding that lock. When we release the lock, the deliver/
671 * change will succeed and find us.
672 */
673 search_set:
674 queue_iterate(q, wql, wait_queue_link_t, wql_setlinks) {
675 port_mq = (ipc_mqueue_t)wql->wql_queue;
676 kmsgs = &port_mq->imq_messages;
677
678 if (!imq_lock_try(port_mq)) {
679 imq_unlock(mqueue);
680 splx(s);
681 delay(1);
682 s = splsched();
683 imq_lock(mqueue);
684 goto search_set; /* start again at beginning - SMP */
685 }
686
687 /*
688 * If there is still a message to be had, we will
689 * try to select it (may not succeed because of size
690 * and options). In any case, we deliver those
691 * results back to the user.
692 *
693 * We also move the port's linkage to the tail of the
694 * list for this set (fairness). Future versions will
695 * sort by timestamp or priority.
696 */
697 if (ipc_kmsg_queue_first(kmsgs) == IKM_NULL) {
698 imq_unlock(port_mq);
699 continue;
700 }
701 queue_remove(q, wql, wait_queue_link_t, wql_setlinks);
702 queue_enter(q, wql, wait_queue_link_t, wql_setlinks);
703 imq_unlock(mqueue);
704
705 ipc_mqueue_select(port_mq, option, max_size);
706 imq_unlock(port_mq);
707 splx(s);
708 return;
709
710 }
711
712 } else {
713
714 /*
715 * Receive on a single port. Just try to get the messages.
716 */
717 kmsgs = &mqueue->imq_messages;
718 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
719 ipc_mqueue_select(mqueue, option, max_size);
720 imq_unlock(mqueue);
721 splx(s);
722 return;
723 }
724 }
725
726 /*
727 * Looks like we'll have to block. The mqueue we will
728 * block on (whether the set's or the local port's) is
729 * still locked.
730 */
731 self = current_thread();
732 if (option & MACH_RCV_TIMEOUT) {
733 if (rcv_timeout == 0) {
734 imq_unlock(mqueue);
735 splx(s);
736 self->ith_state = MACH_RCV_TIMED_OUT;
737 return;
738 }
739 }
740
741 thread_lock(self);
742 self->ith_state = MACH_RCV_IN_PROGRESS;
743 self->ith_option = option;
744 self->ith_msize = max_size;
745
746 if (option & MACH_RCV_TIMEOUT)
747 clock_interval_to_deadline(rcv_timeout, 1000*NSEC_PER_USEC, &deadline);
748 else
749 deadline = 0;
750
751 wresult = wait_queue_assert_wait64_locked(&mqueue->imq_wait_queue,
752 IPC_MQUEUE_RECEIVE,
753 interruptible, deadline,
754 self);
755 thread_unlock(self);
756 imq_unlock(mqueue);
757 splx(s);
758
759 if (wresult == THREAD_WAITING) {
760 counter((interruptible == THREAD_ABORTSAFE) ?
761 c_ipc_mqueue_receive_block_user++ :
762 c_ipc_mqueue_receive_block_kernel++);
763
764 if (self->ith_continuation)
765 thread_block(ipc_mqueue_receive_continue);
766 /* NOTREACHED */
767
768 wresult = thread_block(THREAD_CONTINUE_NULL);
769 }
770 ipc_mqueue_receive_results(wresult);
771 }
772
773
774 /*
775 * Routine: ipc_mqueue_select
776 * Purpose:
777 * A receiver discovered that there was a message on the queue
778 * before he had to block. Pick the message off the queue and
779 * "post" it to himself.
780 * Conditions:
781 * mqueue locked.
782 * There is a message.
783 * Returns:
784 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
785 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
786 */
787 void
788 ipc_mqueue_select(
789 ipc_mqueue_t mqueue,
790 mach_msg_option_t option,
791 mach_msg_size_t max_size)
792 {
793 thread_t self = current_thread();
794 ipc_kmsg_t kmsg;
795 mach_msg_return_t mr;
796 mach_msg_size_t rcv_size;
797
798 mr = MACH_MSG_SUCCESS;
799
800
801 /*
802 * Do some sanity checking of our ability to receive
803 * before pulling the message off the queue.
804 */
805 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
806 assert(kmsg != IKM_NULL);
807
808 /*
809 * If we really can't receive it, but we had the
810 * MACH_RCV_LARGE option set, then don't take it off
811 * the queue, instead return the appropriate error
812 * (and size needed).
813 */
814 rcv_size = ipc_kmsg_copyout_size(kmsg, self->map);
815 if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
816 mr = MACH_RCV_TOO_LARGE;
817 if (option & MACH_RCV_LARGE) {
818 self->ith_kmsg = IKM_NULL;
819 self->ith_msize = rcv_size;
820 self->ith_seqno = 0;
821 self->ith_state = mr;
822 return;
823 }
824 }
825
826 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
827 ipc_mqueue_release_msgcount(mqueue);
828 self->ith_seqno = mqueue->imq_seqno++;
829 self->ith_kmsg = kmsg;
830 self->ith_state = mr;
831
832 current_task()->messages_received++;
833 return;
834 }
835
836 /*
837 * Routine: ipc_mqueue_destroy
838 * Purpose:
839 * Destroy a message queue. Set any blocked senders running.
840 * Destroy the kmsgs in the queue.
841 * Conditions:
842 * Nothing locked.
843 * Receivers were removed when the receive right was "changed"
844 */
845 void
846 ipc_mqueue_destroy(
847 ipc_mqueue_t mqueue)
848 {
849 ipc_kmsg_queue_t kmqueue;
850 ipc_kmsg_t kmsg;
851 spl_t s;
852
853
854 s = splsched();
855 imq_lock(mqueue);
856 /*
857 * rouse all blocked senders
858 */
859 mqueue->imq_fullwaiters = FALSE;
860 wait_queue_wakeup64_all_locked(
861 &mqueue->imq_wait_queue,
862 IPC_MQUEUE_FULL,
863 THREAD_AWAKENED,
864 FALSE);
865
866 kmqueue = &mqueue->imq_messages;
867
868 while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
869 imq_unlock(mqueue);
870 splx(s);
871
872 ipc_kmsg_destroy_dest(kmsg);
873
874 s = splsched();
875 imq_lock(mqueue);
876 }
877 imq_unlock(mqueue);
878 splx(s);
879 }
880
881 /*
882 * Routine: ipc_mqueue_set_qlimit
883 * Purpose:
884 * Changes a message queue limit; the maximum number
885 * of messages which may be queued.
886 * Conditions:
887 * Nothing locked.
888 */
889
890 void
891 ipc_mqueue_set_qlimit(
892 ipc_mqueue_t mqueue,
893 mach_port_msgcount_t qlimit)
894 {
895 spl_t s;
896
897 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
898
899 /* wake up senders allowed by the new qlimit */
900 s = splsched();
901 imq_lock(mqueue);
902 if (qlimit > mqueue->imq_qlimit) {
903 mach_port_msgcount_t i, wakeup;
904
905 /* caution: wakeup, qlimit are unsigned */
906 wakeup = qlimit - mqueue->imq_qlimit;
907
908 for (i = 0; i < wakeup; i++) {
909 if (wait_queue_wakeup64_one_locked(
910 &mqueue->imq_wait_queue,
911 IPC_MQUEUE_FULL,
912 THREAD_AWAKENED,
913 FALSE) == KERN_NOT_WAITING) {
914 mqueue->imq_fullwaiters = FALSE;
915 break;
916 }
917 mqueue->imq_msgcount++; /* give it to the awakened thread */
918 }
919 }
920 mqueue->imq_qlimit = qlimit;
921 imq_unlock(mqueue);
922 splx(s);
923 }
924
925 /*
926 * Routine: ipc_mqueue_set_seqno
927 * Purpose:
928 * Changes an mqueue's sequence number.
929 * Conditions:
930 * Caller holds a reference to the queue's containing object.
931 */
932 void
933 ipc_mqueue_set_seqno(
934 ipc_mqueue_t mqueue,
935 mach_port_seqno_t seqno)
936 {
937 spl_t s;
938
939 s = splsched();
940 imq_lock(mqueue);
941 mqueue->imq_seqno = seqno;
942 imq_unlock(mqueue);
943 splx(s);
944 }
945
946
947 /*
948 * Routine: ipc_mqueue_copyin
949 * Purpose:
950 * Convert a name in a space to a message queue.
951 * Conditions:
952 * Nothing locked. If successful, the caller gets a ref for
953 * for the object. This ref ensures the continued existence of
954 * the queue.
955 * Returns:
956 * MACH_MSG_SUCCESS Found a message queue.
957 * MACH_RCV_INVALID_NAME The space is dead.
958 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
959 * MACH_RCV_INVALID_NAME
960 * The denoted right is not receive or port set.
961 * MACH_RCV_IN_SET Receive right is a member of a set.
962 */
963
964 mach_msg_return_t
965 ipc_mqueue_copyin(
966 ipc_space_t space,
967 mach_port_name_t name,
968 ipc_mqueue_t *mqueuep,
969 ipc_object_t *objectp)
970 {
971 ipc_entry_t entry;
972 ipc_object_t object;
973 ipc_mqueue_t mqueue;
974
975 is_read_lock(space);
976 if (!space->is_active) {
977 is_read_unlock(space);
978 return MACH_RCV_INVALID_NAME;
979 }
980
981 entry = ipc_entry_lookup(space, name);
982 if (entry == IE_NULL) {
983 is_read_unlock(space);
984 return MACH_RCV_INVALID_NAME;
985 }
986
987 object = entry->ie_object;
988
989 if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
990 ipc_port_t port;
991
992 port = (ipc_port_t) object;
993 assert(port != IP_NULL);
994
995 ip_lock(port);
996 assert(ip_active(port));
997 assert(port->ip_receiver_name == name);
998 assert(port->ip_receiver == space);
999 is_read_unlock(space);
1000 mqueue = &port->ip_messages;
1001
1002 } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
1003 ipc_pset_t pset;
1004
1005 pset = (ipc_pset_t) object;
1006 assert(pset != IPS_NULL);
1007
1008 ips_lock(pset);
1009 assert(ips_active(pset));
1010 assert(pset->ips_local_name == name);
1011 is_read_unlock(space);
1012
1013 mqueue = &pset->ips_messages;
1014 } else {
1015 is_read_unlock(space);
1016 return MACH_RCV_INVALID_NAME;
1017 }
1018
1019 /*
1020 * At this point, the object is locked and active,
1021 * the space is unlocked, and mqueue is initialized.
1022 */
1023
1024 io_reference(object);
1025 io_unlock(object);
1026
1027 *objectp = object;
1028 *mqueuep = mqueue;
1029 return MACH_MSG_SUCCESS;
1030 }
1031