/*
 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/*
 * @OSF_FREE_COPYRIGHT@
 */
/*
 * Mach Operating System
 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 */
/*
 * File:    ipc/ipc_mqueue.c
 * Author:  Rich Draves
 * Date:    1989
 *
 * Functions to manipulate IPC message queues.
 */
/*
 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
 * support for mandatory and extensible security protections. This notice
 * is included in support of clause 2.2 (b) of the Apple Public License,
 * Version 2.0.
 */


#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>

#include <kern/assert.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>   /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>

#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>

#if MACH_FLIPC
#include <ipc/flipc.h>
#endif

#ifdef __LP64__
#include <vm/vm_map.h>
#endif

#include <sys/event.h>

extern char *proc_name_address(void *p);

int ipc_mqueue_full;    /* address is event for queue space */
int ipc_mqueue_rcv;     /* address is event for message arrival */

/* forward declarations */
void ipc_mqueue_receive_results(wait_result_t result);
static void ipc_mqueue_peek_on_thread(
    ipc_mqueue_t      port_mq,
    mach_msg_option_t option,
    thread_t          thread);

/*
 * Routine: ipc_mqueue_init
 * Purpose:
 *     Initialize a newly-allocated message queue.
 */
void
ipc_mqueue_init(
    ipc_mqueue_t mqueue,
    boolean_t    is_set)
{
    if (is_set) {
        waitq_set_init(&mqueue->imq_set_queue,
            SYNC_POLICY_FIFO | SYNC_POLICY_PREPOST,
            NULL, NULL);
    } else {
        waitq_init(&mqueue->imq_wait_queue, SYNC_POLICY_FIFO | SYNC_POLICY_PORT);
        ipc_kmsg_queue_init(&mqueue->imq_messages);
        mqueue->imq_seqno = 0;
        mqueue->imq_msgcount = 0;
        mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
        mqueue->imq_fullwaiters = FALSE;
#if MACH_FLIPC
        mqueue->imq_fport = FPORT_NULL;
#endif
    }
    klist_init(&mqueue->imq_klist);
}
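
/*
 * The set and non-set flavors overlay each other in the ipc_mqueue
 * data union (see ipc_mqueue.h), so only the fields of the flavor
 * selected by 'is_set' are initialized above; the klist is common to
 * both flavors.
 */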

void
ipc_mqueue_deinit(
    ipc_mqueue_t mqueue)
{
    boolean_t is_set = imq_is_set(mqueue);

    if (is_set) {
        waitq_set_deinit(&mqueue->imq_set_queue);
    } else {
        waitq_deinit(&mqueue->imq_wait_queue);
    }
}

/*
 * Routine: imq_reserve_and_lock
 * Purpose:
 *     Atomically lock an ipc_mqueue_t object and reserve
 *     an appropriate number of prepost linkage objects for
 *     use in wakeup operations.
 * Conditions:
 *     mq is unlocked
 */
void
imq_reserve_and_lock(ipc_mqueue_t mq, uint64_t *reserved_prepost)
{
    *reserved_prepost = waitq_prepost_reserve(&mq->imq_wait_queue, 0,
        WAITQ_KEEP_LOCKED);
}


/*
 * Routine: imq_release_and_unlock
 * Purpose:
 *     Unlock an ipc_mqueue_t object and release any unused
 *     prepost object reservations.
 * Conditions:
 *     mq is locked
 */
void
imq_release_and_unlock(ipc_mqueue_t mq, uint64_t reserved_prepost)
{
    assert(imq_held(mq));
    waitq_unlock(&mq->imq_wait_queue);
    waitq_prepost_release_reserve(reserved_prepost);
}
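
/*
 * Typical usage of this reserve/release pair (ipc_mqueue_post below is
 * an in-file example):
 *
 *     uint64_t reserved_prepost = 0;
 *
 *     imq_reserve_and_lock(mq, &reserved_prepost);
 *     ... wake up receivers, consuming reservations as needed ...
 *     imq_release_and_unlock(mq, reserved_prepost);
 */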


/*
 * Routine: ipc_mqueue_member
 * Purpose:
 *     Indicate whether the (port) mqueue is a member of
 *     this portset's mqueue. We do this by checking
 *     whether the port mqueue's waitq is a member of
 *     the portset mqueue's waitq-set.
 * Conditions:
 *     the portset's mqueue is not already a member
 *     this may block while allocating linkage structures.
 */

boolean_t
ipc_mqueue_member(
    ipc_mqueue_t port_mqueue,
    ipc_mqueue_t set_mqueue)
{
    struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
    struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;

    return waitq_member(port_waitq, set_waitq);
}

/*
 * Routine: ipc_mqueue_remove
 * Purpose:
 *     Remove the association between the queue and the specified
 *     set message queue.
 */

kern_return_t
ipc_mqueue_remove(
    ipc_mqueue_t mqueue,
    ipc_mqueue_t set_mqueue)
{
    struct waitq *mq_waitq = &mqueue->imq_wait_queue;
    struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;

    return waitq_unlink(mq_waitq, set_waitq);
}

/*
 * Routine: ipc_mqueue_remove_from_all
 * Purpose:
 *     Remove the mqueue from all the sets it is a member of
 * Conditions:
 *     Nothing locked.
 * Returns:
 *     mqueue unlocked and set links deallocated
 */
void
ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue)
{
    struct waitq *mq_waitq = &mqueue->imq_wait_queue;
    kern_return_t kr;

    imq_lock(mqueue);

    assert(waitq_valid(mq_waitq));
    kr = waitq_unlink_all_unlock(mq_waitq);
    /* mqueue unlocked and set links deallocated */
}

/*
 * Routine: ipc_mqueue_remove_all
 * Purpose:
 *     Remove all the member queues from the specified set.
 *     Also removes the queue from any containing sets.
 * Conditions:
 *     Nothing locked.
 * Returns:
 *     mqueue unlocked, all set links deallocated
 */
void
ipc_mqueue_remove_all(ipc_mqueue_t mqueue)
{
    struct waitq_set *mq_setq = &mqueue->imq_set_queue;

    imq_lock(mqueue);
    assert(waitqs_is_set(mq_setq));
    waitq_set_unlink_all_unlock(mq_setq);
    /* mqueue unlocked, set links deallocated */
}


/*
 * Routine: ipc_mqueue_add
 * Purpose:
 *     Associate the portset's mqueue with the port's mqueue.
 *     This has to be done so that posting the port will wake up
 *     a portset waiter. If there are waiters on the portset
 *     mqueue and messages on the port mqueue, try to match them
 *     up now.
 * Conditions:
 *     May block.
 */
kern_return_t
ipc_mqueue_add(
    ipc_mqueue_t port_mqueue,
    ipc_mqueue_t set_mqueue,
    uint64_t    *reserved_link,
    uint64_t    *reserved_prepost)
{
    struct waitq *port_waitq = &port_mqueue->imq_wait_queue;
    struct waitq_set *set_waitq = &set_mqueue->imq_set_queue;
    ipc_kmsg_queue_t kmsgq;
    ipc_kmsg_t kmsg, next;
    kern_return_t kr;

    assert(reserved_link && *reserved_link != 0);
    assert(waitqs_is_linked(set_waitq));

    imq_lock(port_mqueue);

    /*
     * The link operation is now under the same lock-hold as
     * message iteration and thread wakeup, but doesn't have to be...
     */
    kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
    if (kr != KERN_SUCCESS) {
        imq_unlock(port_mqueue);
        return kr;
    }

    /*
     * Now that the set has been added to the port, there may be
     * messages queued on the port and threads waiting on the set
     * waitq. Let's get them together.
     */
    kmsgq = &port_mqueue->imq_messages;
    for (kmsg = ipc_kmsg_queue_first(kmsgq);
        kmsg != IKM_NULL;
        kmsg = next) {
        next = ipc_kmsg_queue_next(kmsgq, kmsg);

        for (;;) {
            thread_t th;
            mach_msg_size_t msize;
            spl_t th_spl;

            th = waitq_wakeup64_identify_locked(
                port_waitq,
                IPC_MQUEUE_RECEIVE,
                THREAD_AWAKENED, &th_spl,
                reserved_prepost, WAITQ_ALL_PRIORITIES,
                WAITQ_KEEP_LOCKED);
            /* waitq/mqueue still locked, thread locked */

            if (th == THREAD_NULL) {
                goto leave;
            }

            /*
             * If the receiver waited with a facility not directly
             * related to Mach messaging, then it isn't prepared to get
             * handed the message directly. Just set it running, and
             * go look for another thread that can.
             */
            if (th->ith_state != MACH_RCV_IN_PROGRESS) {
                if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
                    /*
                     * wake up the peeking thread, but
                     * continue to loop over the threads
                     * waiting on the port's mqueue to see
                     * if there are any actual receivers
                     */
                    ipc_mqueue_peek_on_thread(port_mqueue,
                        th->ith_option,
                        th);
                }
                thread_unlock(th);
                splx(th_spl);
                continue;
            }

            /*
             * Found a receiver. See if it can handle the message
             * correctly (the message is not too large for it, or
             * it didn't care to be informed that the message was
             * too large). If it can't handle it, take it off
             * the list, let it go back and figure things out, and
             * just move on to the next.
             */
            msize = ipc_kmsg_copyout_size(kmsg, th->map);
            if (th->ith_rsize <
                (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
                th->ith_state = MACH_RCV_TOO_LARGE;
                th->ith_msize = msize;
                if (th->ith_option & MACH_RCV_LARGE) {
                    /*
                     * let it go without the message
                     */
                    th->ith_receiver_name = port_mqueue->imq_receiver_name;
                    th->ith_kmsg = IKM_NULL;
                    th->ith_seqno = 0;
                    thread_unlock(th);
                    splx(th_spl);
                    continue; /* find another thread */
                }
            } else {
                th->ith_state = MACH_MSG_SUCCESS;
            }

            /*
             * This thread is going to take this message,
             * so give it to the thread.
             */
            ipc_kmsg_rmqueue(kmsgq, kmsg);
#if MACH_FLIPC
            mach_node_t node = kmsg->ikm_node;
#endif
            ipc_mqueue_release_msgcount(port_mqueue, IMQ_NULL);

            th->ith_kmsg = kmsg;
            th->ith_seqno = port_mqueue->imq_seqno++;
            thread_unlock(th);
            splx(th_spl);
#if MACH_FLIPC
            if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
                flipc_msg_ack(node, port_mqueue, TRUE);
            }
#endif
            break; /* go to next message */
        }
    }
leave:
    imq_unlock(port_mqueue);
    return KERN_SUCCESS;
}
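
/*
 * Note: the reserved_link and reserved_prepost arguments are expected to
 * be obtained by the caller up front (typically via waitq_link_reserve()
 * and waitq_prepost_reserve()), because those allocations may block and
 * so must happen before the mqueue lock is taken.
 */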

/*
 * Routine: ipc_mqueue_changed
 * Purpose:
 *     Wake up receivers waiting in a message queue.
 * Conditions:
 *     The message queue is locked.
 */
void
ipc_mqueue_changed(
    ipc_space_t  space,
    ipc_mqueue_t mqueue)
{
    if (IMQ_KLIST_VALID(mqueue) && SLIST_FIRST(&mqueue->imq_klist)) {
        /*
         * Indicate that this message queue is vanishing
         *
         * When this is called, the associated receive right may be in flight
         * between two tasks: the one it used to live in, and the one that armed
         * a port destroyed notification for it.
         *
         * The new process may want to register the port it gets back with an
         * EVFILT_MACHPORT filter again, and may already have pending sync IPC
         * on this port, in which case we want the imq_klist field to be
         * reusable for nefarious purposes (see IMQ_SET_INHERITOR).
         *
         * Fortunately, we really don't need this linkage anymore after this
         * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
         *
         * Note: we don't have the space lock here, however, this covers the
         *       case of when a task is terminating the space, triggering
         *       several knote_vanish() calls.
         *
         *       We don't need the lock to observe that the space is inactive as
         *       we just deactivated it on the same thread.
         *
         *       We still need to call knote_vanish() so that the knote is
         *       marked with EV_VANISHED or EV_EOF so that the detach step
         *       in filt_machportdetach is skipped correctly.
         */
        assert(space);
        knote_vanish(&mqueue->imq_klist, is_active(space));
        klist_init(&mqueue->imq_klist);
    }

    waitq_wakeup64_all_locked(&mqueue->imq_wait_queue,
        IPC_MQUEUE_RECEIVE,
        THREAD_RESTART,
        NULL,
        WAITQ_ALL_PRIORITIES,
        WAITQ_KEEP_LOCKED);
}


/*
 * Routine: ipc_mqueue_send
 * Purpose:
 *     Send a message to a message queue. The message holds a reference
 *     for the destination port for this message queue in the
 *     msgh_remote_port field.
 *
 *     If unsuccessful, the caller still has possession of
 *     the message and must do something with it. If successful,
 *     the message is queued, given to a receiver, or destroyed.
 * Conditions:
 *     mqueue is locked.
 * Returns:
 *     MACH_MSG_SUCCESS       The message was accepted.
 *     MACH_SEND_TIMED_OUT    Caller still has message.
 *     MACH_SEND_INTERRUPTED  Caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send(
    ipc_mqueue_t       mqueue,
    ipc_kmsg_t         kmsg,
    mach_msg_option_t  option,
    mach_msg_timeout_t send_timeout)
{
    int wresult;

    /*
     * Don't block if:
     *  1) We're under the queue limit.
     *  2) Caller used the MACH_SEND_ALWAYS internal option.
     *  3) Message is sent to a send-once right.
     */
    if (!imq_full(mqueue) ||
        (!imq_full_kernel(mqueue) &&
        ((option & MACH_SEND_ALWAYS) ||
        (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
        MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
        mqueue->imq_msgcount++;
        assert(mqueue->imq_msgcount > 0);
        imq_unlock(mqueue);
    } else {
        thread_t cur_thread = current_thread();
        ipc_port_t port = ip_from_mq(mqueue);
        struct turnstile *send_turnstile = TURNSTILE_NULL;
        turnstile_inheritor_t inheritor = TURNSTILE_INHERITOR_NULL;
        uint64_t deadline;

        /*
         * We have to wait for space to be granted to us.
         */
        if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
            imq_unlock(mqueue);
            return MACH_SEND_TIMED_OUT;
        }
        if (imq_full_kernel(mqueue)) {
            imq_unlock(mqueue);
            return MACH_SEND_NO_BUFFER;
        }
        mqueue->imq_fullwaiters = TRUE;

        if (option & MACH_SEND_TIMEOUT) {
            clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
        } else {
            deadline = 0;
        }

        thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

        send_turnstile = turnstile_prepare((uintptr_t)port,
            port_send_turnstile_address(port),
            TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

        /* If the port is in transit, get the destination port's turnstile */
        if (ip_active(port) &&
            port->ip_receiver_name == MACH_PORT_NULL &&
            port->ip_destination != NULL) {
            inheritor = port_send_turnstile(port->ip_destination);
        } else {
            inheritor = ipc_port_get_inheritor(port);
        }

        turnstile_update_inheritor(send_turnstile, inheritor,
            TURNSTILE_DELAYED_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);

        wresult = waitq_assert_wait64_leeway(
            &send_turnstile->ts_waitq,
            IPC_MQUEUE_FULL,
            THREAD_ABORTSAFE,
            TIMEOUT_URGENCY_USER_NORMAL,
            deadline,
            TIMEOUT_NO_LEEWAY);

        imq_unlock(mqueue);
        turnstile_update_inheritor_complete(send_turnstile,
            TURNSTILE_INTERLOCK_NOT_HELD);

        if (wresult == THREAD_WAITING) {
            wresult = thread_block(THREAD_CONTINUE_NULL);
            counter(c_ipc_mqueue_send_block++);
        }

        /* Call turnstile complete with interlock held */
        imq_lock(mqueue);
        turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL);
        imq_unlock(mqueue);

        /* Call cleanup after dropping the interlock */
        turnstile_cleanup();

        switch (wresult) {
        case THREAD_AWAKENED:
            /*
             * we can proceed - inherited msgcount from waker
             * or the message queue has been destroyed and the msgcount
             * has been reset to zero (will detect in ipc_mqueue_post()).
             */
            break;

        case THREAD_TIMED_OUT:
            assert(option & MACH_SEND_TIMEOUT);
            return MACH_SEND_TIMED_OUT;

        case THREAD_INTERRUPTED:
            return MACH_SEND_INTERRUPTED;

        case THREAD_RESTART:
            /* mqueue is being destroyed */
            return MACH_SEND_INVALID_DEST;
        default:
            panic("ipc_mqueue_send");
        }
    }

    ipc_mqueue_post(mqueue, kmsg, option);
    return MACH_MSG_SUCCESS;
}
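
/*
 * Illustrative call pattern (the actual senders live outside this file,
 * e.g. ipc_kmsg_send()):
 *
 *     imq_lock(&port->ip_messages);
 *     mr = ipc_mqueue_send(&port->ip_messages, kmsg, option, timeout);
 *
 * The mqueue is unlocked on every return path; on MACH_SEND_TIMED_OUT or
 * MACH_SEND_INTERRUPTED the caller still owns kmsg and must dispose of it.
 */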

/*
 * Routine: ipc_mqueue_override_send
 * Purpose:
 *     Set an override qos on the first message in the queue
 *     (if the queue is full). This is a send-possible override
 *     that will go away as soon as we drain a message from the
 *     queue.
 *
 * Conditions:
 *     The message queue is not locked.
 *     The caller holds a reference on the message queue.
 */
extern void
ipc_mqueue_override_send(
    ipc_mqueue_t        mqueue,
    mach_msg_priority_t override)
{
    boolean_t __unused full_queue_empty = FALSE;

    imq_lock(mqueue);
    assert(imq_valid(mqueue));
    assert(!imq_is_set(mqueue));

    if (imq_full(mqueue)) {
        ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

        if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, override)) {
            ipc_port_t port = ip_from_mq(mqueue);
            if (ip_active(port) &&
                port->ip_receiver_name != MACH_PORT_NULL &&
                is_active(port->ip_receiver) &&
                IMQ_KLIST_VALID(mqueue)) {
                KNOTE(&mqueue->imq_klist, 0);
            }
        }
        if (!first) {
            full_queue_empty = TRUE;
        }
    }
    imq_unlock(mqueue);

#if DEVELOPMENT || DEBUG
    if (full_queue_empty) {
        ipc_port_t port = ip_from_mq(mqueue);
        int dst_pid = 0;
        if (ip_active(port) && !port->ip_tempowner &&
            port->ip_receiver_name && port->ip_receiver &&
            port->ip_receiver != ipc_space_kernel) {
            dst_pid = task_pid(port->ip_receiver->is_task);
        }
    }
#endif
}

/*
 * Routine: ipc_mqueue_release_msgcount
 * Purpose:
 *     Release a message queue reference in the case where we
 *     found a waiter.
 *
 * Conditions:
 *     The message queue is locked.
 *     The message corresponding to this reference is off the queue.
 *     There is no need to pass reserved preposts because this will
 *     never prepost to anyone
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq, ipc_mqueue_t set_mq)
{
    struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(port_mq));
    (void)set_mq;
    assert(imq_held(port_mq));
    assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

    port_mq->imq_msgcount--;

    if (!imq_full(port_mq) && port_mq->imq_fullwaiters &&
        send_turnstile != TURNSTILE_NULL) {
        /*
         * boost the priority of the awoken thread
         * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
         * the message queue slot we've just reserved.
         *
         * NOTE: this will never prepost
         *
         * The wakeup happens on a turnstile waitq
         * which will wake up the highest priority waiter.
         * A potential downside of this would be starving low
         * priority senders if there is a constant churn of
         * high priority threads trying to send to this port.
         */
        if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
            IPC_MQUEUE_FULL,
            THREAD_AWAKENED,
            WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
            port_mq->imq_fullwaiters = FALSE;
        } else {
            /* gave away our slot - add reference back */
            port_mq->imq_msgcount++;
        }
    }

    if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
        /* no more msgs: invalidate the port's prepost object */
        waitq_clear_prepost_locked(&port_mq->imq_wait_queue);
    }
}

/*
 * Routine: ipc_mqueue_post
 * Purpose:
 *     Post a message to a waiting receiver or enqueue it. If a
 *     receiver is waiting, we can release our reserved space in
 *     the message queue.
 *
 * Conditions:
 *     mqueue is unlocked
 *     If we need to queue, our space in the message queue is reserved.
 */
void
ipc_mqueue_post(
    ipc_mqueue_t mqueue,
    ipc_kmsg_t   kmsg,
    mach_msg_option_t __unused option)
{
    uint64_t reserved_prepost = 0;
    boolean_t destroy_msg = FALSE;

    ipc_kmsg_trace_send(kmsg, option);

    /*
     * While the msg queue is locked, we have control of the
     * kmsg, so the ref in it for the port is still good.
     *
     * Check for a receiver for the message.
     */
    imq_reserve_and_lock(mqueue, &reserved_prepost);

    /* we may have raced with port destruction! */
    if (!imq_valid(mqueue)) {
        destroy_msg = TRUE;
        goto out_unlock;
    }

    for (;;) {
        struct waitq *waitq = &mqueue->imq_wait_queue;
        spl_t th_spl;
        thread_t receiver;
        mach_msg_size_t msize;

        receiver = waitq_wakeup64_identify_locked(waitq,
            IPC_MQUEUE_RECEIVE,
            THREAD_AWAKENED,
            &th_spl,
            &reserved_prepost,
            WAITQ_ALL_PRIORITIES,
            WAITQ_KEEP_LOCKED);
        /* waitq still locked, thread locked */

        if (receiver == THREAD_NULL) {
            /*
             * no receivers; queue kmsg if space still reserved
             * Reservations are cancelled when the port goes inactive.
             * note that this will enqueue the message for any
             * "peeking" receivers.
             *
             * Also, post the knote to wake up any threads waiting
             * on that style of interface if this insertion is of
             * note (first insertion, or adjusted override qos all
             * the way to the head of the queue).
             *
             * This is just for ports; portset knotes are stay-active,
             * and their threads get awakened through the
             * !MACH_RCV_IN_PROGRESS logic below.
             */
            if (mqueue->imq_msgcount > 0) {
                if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
                    /* if the space is dead there is no point calling KNOTE */
                    ipc_port_t port = ip_from_mq(mqueue);
                    if (ip_active(port) &&
                        port->ip_receiver_name != MACH_PORT_NULL &&
                        is_active(port->ip_receiver) &&
                        IMQ_KLIST_VALID(mqueue)) {
                        KNOTE(&mqueue->imq_klist, 0);
                    }
                }
                break;
            }

            /*
             * Otherwise, the message queue must belong to an inactive
             * port, so just destroy the message and pretend it was posted.
             */
            destroy_msg = TRUE;
            goto out_unlock;
        }

        /*
         * If a thread is attempting a "peek" into the message queue
         * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
         * thread running. A successful peek is essentially the same as
         * message delivery since the peeking thread takes responsibility
         * for delivering the message and (eventually) removing it from
         * the mqueue. Only one thread can successfully use the peek
         * facility on any given port, so we exit the waitq loop after
         * encountering such a thread.
         */
        if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
            ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
            ipc_mqueue_peek_on_thread(mqueue, receiver->ith_option, receiver);
            thread_unlock(receiver);
            splx(th_spl);
            break; /* Message was posted, so break out of loop */
        }

        /*
         * If the receiver waited with a facility not directly related
         * to Mach messaging, then it isn't prepared to get handed the
         * message directly. Just set it running, and go look for
         * another thread that can.
         */
        if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
            thread_unlock(receiver);
            splx(th_spl);
            continue;
        }


        /*
         * We found a waiting thread.
         * If the message is too large or the scatter list is too small
         * the thread we wake up will get that as its status.
         */
        msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
        if (receiver->ith_rsize <
            (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
            receiver->ith_msize = msize;
            receiver->ith_state = MACH_RCV_TOO_LARGE;
        } else {
            receiver->ith_state = MACH_MSG_SUCCESS;
        }

        /*
         * If there is no problem with the upcoming receive, or the
         * receiver thread didn't specifically ask for the special
         * too-large error indication, go ahead and select it anyway.
         */
        if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
            !(receiver->ith_option & MACH_RCV_LARGE)) {
            receiver->ith_kmsg = kmsg;
            receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
            mach_node_t node = kmsg->ikm_node;
#endif
            thread_unlock(receiver);
            splx(th_spl);

            /* we didn't need our reserved spot in the queue */
            ipc_mqueue_release_msgcount(mqueue, IMQ_NULL);

#if MACH_FLIPC
            if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
                flipc_msg_ack(node, mqueue, TRUE);
            }
#endif
            break;
        }

        /*
         * Otherwise, this thread needs to be released to run
         * and handle its error without getting the message. We
         * need to go back and pick another one.
         */
        receiver->ith_receiver_name = mqueue->imq_receiver_name;
        receiver->ith_kmsg = IKM_NULL;
        receiver->ith_seqno = 0;
        thread_unlock(receiver);
        splx(th_spl);
    }

out_unlock:
    /* clear the waitq boost we may have been given */
    waitq_clear_promotion_locked(&mqueue->imq_wait_queue, current_thread());
    imq_release_and_unlock(mqueue, reserved_prepost);
    if (destroy_msg) {
        ipc_kmsg_destroy(kmsg);
    }

    current_task()->messages_sent++;
    return;
}


/* static */ void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
    thread_t self = current_thread();
    mach_msg_option_t option = self->ith_option;

    /*
     * why did we wake up?
     */
    switch (saved_wait_result) {
    case THREAD_TIMED_OUT:
        self->ith_state = MACH_RCV_TIMED_OUT;
        return;

    case THREAD_INTERRUPTED:
        self->ith_state = MACH_RCV_INTERRUPTED;
        return;

    case THREAD_RESTART:
        /* something bad happened to the port/set */
        self->ith_state = MACH_RCV_PORT_CHANGED;
        return;

    case THREAD_AWAKENED:
        /*
         * We do not need to go select a message, somebody
         * handed us one (or a too-large indication).
         */
        switch (self->ith_state) {
        case MACH_RCV_SCATTER_SMALL:
        case MACH_RCV_TOO_LARGE:
            /*
             * Somebody tried to give us a too large
             * message. If we indicated that we cared,
             * then they only gave us the indication,
             * otherwise they gave us the indication
             * AND the message anyway.
             */
            if (option & MACH_RCV_LARGE) {
                return;
            }
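            /* fall through */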

        case MACH_MSG_SUCCESS:
        case MACH_PEEK_READY:
            return;

        default:
            panic("ipc_mqueue_receive_results: strange ith_state");
        }

    default:
        panic("ipc_mqueue_receive_results: strange wait_result");
    }
}

void
ipc_mqueue_receive_continue(
    __unused void *param,
    wait_result_t  wresult)
{
    ipc_mqueue_receive_results(wresult);
    mach_msg_receive_continue(); /* hard-coded for now */
}

/*
 * Routine: ipc_mqueue_receive
 * Purpose:
 *     Receive a message from a message queue.
 *
 * Conditions:
 *     Our caller must hold a reference for the port or port set
 *     to which this queue belongs, to keep the queue
 *     from being deallocated.
 *
 *     The kmsg is returned with clean header fields
 *     and with the circular bit turned off through the ith_kmsg
 *     field of the thread's receive continuation state.
 * Returns:
 *     MACH_MSG_SUCCESS      Message returned in ith_kmsg.
 *     MACH_RCV_TOO_LARGE    Message size returned in ith_msize.
 *     MACH_RCV_TIMED_OUT    No message obtained.
 *     MACH_RCV_INTERRUPTED  No message obtained.
 *     MACH_RCV_PORT_DIED    Port/set died; no message.
 *     MACH_RCV_PORT_CHANGED Port moved into set; no msg.
 *
 */

void
ipc_mqueue_receive(
    ipc_mqueue_t       mqueue,
    mach_msg_option_t  option,
    mach_msg_size_t    max_size,
    mach_msg_timeout_t rcv_timeout,
    int                interruptible)
{
    wait_result_t wresult;
    thread_t self = current_thread();

    imq_lock(mqueue);
    wresult = ipc_mqueue_receive_on_thread(mqueue, option, max_size,
        rcv_timeout, interruptible,
        self);
    /* mqueue unlocked */
    if (wresult == THREAD_NOT_WAITING) {
        return;
    }

    if (wresult == THREAD_WAITING) {
        counter((interruptible == THREAD_ABORTSAFE) ?
            c_ipc_mqueue_receive_block_user++ :
            c_ipc_mqueue_receive_block_kernel++);

        if (self->ith_continuation) {
            thread_block(ipc_mqueue_receive_continue);
            /* NOTREACHED */
        }

        wresult = thread_block(THREAD_CONTINUE_NULL);
    }
    ipc_mqueue_receive_results(wresult);
}

static int
mqueue_process_prepost_receive(void *ctx, struct waitq *waitq,
    struct waitq_set *wqset)
{
    ipc_mqueue_t port_mq, *pmq_ptr;

    (void)wqset;
    port_mq = (ipc_mqueue_t)waitq;

    /*
     * If there are no messages on this queue, skip it and remove
     * it from the prepost list
     */
    if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
        return WQ_ITERATE_INVALIDATE_CONTINUE;
    }

    /*
     * There are messages waiting on this port.
     * Instruct the prepost iteration logic to break, but keep the
     * waitq locked.
     */
    pmq_ptr = (ipc_mqueue_t *)ctx;
    if (pmq_ptr) {
        *pmq_ptr = port_mq;
    }
    return WQ_ITERATE_BREAK_KEEP_LOCKED;
}

/*
 * Routine: ipc_mqueue_receive_on_thread
 * Purpose:
 *     Receive a message from a message queue using a specified thread.
 *     If no message available, assert_wait on the appropriate waitq.
 *
 * Conditions:
 *     Assumes thread is self.
 *     Called with mqueue locked.
 *     Returns with mqueue unlocked.
 *     May have assert-waited. Caller must block in those cases.
 */
wait_result_t
ipc_mqueue_receive_on_thread(
    ipc_mqueue_t       mqueue,
    mach_msg_option_t  option,
    mach_msg_size_t    max_size,
    mach_msg_timeout_t rcv_timeout,
    int                interruptible,
    thread_t           thread)
{
    wait_result_t wresult;
    uint64_t deadline;
    struct turnstile *rcv_turnstile = TURNSTILE_NULL;
    turnstile_inheritor_t inheritor = NULL;

    /* called with mqueue locked */

    /* no need to reserve anything: we never prepost to anyone */

    if (!imq_valid(mqueue)) {
        /* someone raced us to destroy this mqueue/port! */
        imq_unlock(mqueue);
        /*
         * ipc_mqueue_receive_results updates the thread's ith_state
         * TODO: differentiate between rights being moved and
         * rights/ports being destroyed (21885327)
         */
        return THREAD_RESTART;
    }

    if (imq_is_set(mqueue)) {
        ipc_mqueue_t port_mq = IMQ_NULL;

        (void)waitq_set_iterate_preposts(&mqueue->imq_set_queue,
            &port_mq,
            mqueue_process_prepost_receive);

        if (port_mq != IMQ_NULL) {
            /*
             * We get here if there is at least one message
             * waiting on port_mq. We have instructed the prepost
             * iteration logic to leave both the port_mq and the
             * set mqueue locked.
             *
             * TODO: previously, we would place this port at the
             *       back of the prepost list...
             */
            imq_unlock(mqueue);

            /*
             * Continue on to handling the message with just
             * the port mqueue locked.
             */
            if (option & MACH_PEEK_MSG) {
                ipc_mqueue_peek_on_thread(port_mq, option, thread);
            } else {
                ipc_mqueue_select_on_thread(port_mq, mqueue, option,
                    max_size, thread);
            }

            imq_unlock(port_mq);
            return THREAD_NOT_WAITING;
        }
    } else if (imq_is_queue(mqueue)) {
        ipc_kmsg_queue_t kmsgs;

        /*
         * Receive on a single port. Just try to get the messages.
         */
        kmsgs = &mqueue->imq_messages;
        if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
            if (option & MACH_PEEK_MSG) {
                ipc_mqueue_peek_on_thread(mqueue, option, thread);
            } else {
                ipc_mqueue_select_on_thread(mqueue, IMQ_NULL, option,
                    max_size, thread);
            }
            imq_unlock(mqueue);
            return THREAD_NOT_WAITING;
        }
    } else {
        panic("Unknown mqueue type 0x%x: likely memory corruption!\n",
            mqueue->imq_wait_queue.waitq_type);
    }

    /*
     * Looks like we'll have to block. The mqueue we will
     * block on (whether the set's or the local port's) is
     * still locked.
     */
    if (option & MACH_RCV_TIMEOUT) {
        if (rcv_timeout == 0) {
            imq_unlock(mqueue);
            thread->ith_state = MACH_RCV_TIMED_OUT;
            return THREAD_NOT_WAITING;
        }
    }

    thread->ith_option = option;
    thread->ith_rsize = max_size;
    thread->ith_msize = 0;

    if (option & MACH_PEEK_MSG) {
        thread->ith_state = MACH_PEEK_IN_PROGRESS;
    } else {
        thread->ith_state = MACH_RCV_IN_PROGRESS;
    }

    if (option & MACH_RCV_TIMEOUT) {
        clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
    } else {
        deadline = 0;
    }

    /*
     * Threads waiting on a port (not a portset) wait on the port's
     * receive turnstile. Donate the waiting thread's turnstile and
     * set up an inheritor for the special reply port. Based on the
     * state of the special reply port, the inheritor is either the
     * send turnstile of the connection port on which the sync IPC
     * send will happen, or the turnstile of the workloop that will
     * reply to the sync IPC message.
     *
     * Pass the mqueue's waitq to waitq_assert_wait to support
     * port-set wakeup. The port's mqueue waitq is converted to a
     * turnstile waitq in waitq_assert_wait instead of using the
     * global waitqs.
     */
    if (imq_is_queue(mqueue)) {
        ipc_port_t port = ip_from_mq(mqueue);
        rcv_turnstile = turnstile_prepare((uintptr_t)port,
            port_rcv_turnstile_address(port),
            TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

        if (port->ip_specialreply) {
            inheritor = ipc_port_get_special_reply_port_inheritor(port);
        }

        turnstile_update_inheritor(rcv_turnstile, inheritor,
            (TURNSTILE_INHERITOR_TURNSTILE | TURNSTILE_DELAYED_UPDATE));
    }

    thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
    wresult = waitq_assert_wait64_locked(&mqueue->imq_wait_queue,
        IPC_MQUEUE_RECEIVE,
        interruptible,
        TIMEOUT_URGENCY_USER_NORMAL,
        deadline,
        TIMEOUT_NO_LEEWAY,
        thread);
    /* preposts should be detected above, not here */
    if (wresult == THREAD_AWAKENED) {
        panic("ipc_mqueue_receive_on_thread: sleep walking");
    }

    imq_unlock(mqueue);

    /* If this is a port mqueue, complete the turnstile inheritor update */
    if (rcv_turnstile != TURNSTILE_NULL) {
        turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
    }
    /* It is the caller's responsibility to call turnstile_complete to get the turnstile back */

    return wresult;
}


/*
 * Routine: ipc_mqueue_peek_on_thread
 * Purpose:
 *     A receiver discovered that there was a message on the queue
 *     before it had to block. Tell a thread about the message queue,
 *     but don't pick off any messages.
 * Conditions:
 *     port_mq locked
 *     at least one message on port_mq's message queue
 *
 * Returns: (on thread->ith_state)
 *     MACH_PEEK_READY       ith_peekq contains a message queue
 */
void
ipc_mqueue_peek_on_thread(
    ipc_mqueue_t      port_mq,
    mach_msg_option_t option,
    thread_t          thread)
{
    (void)option;
    assert(option & MACH_PEEK_MSG);
    assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

    /*
     * Take a reference on the mqueue's associated port:
     * the peeking thread will be responsible for releasing this
     * reference using ip_release_mq()
     */
    ip_reference_mq(port_mq);
    thread->ith_peekq = port_mq;
    thread->ith_state = MACH_PEEK_READY;
}
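
/*
 * (The matching release of that port reference happens in
 * ipc_mqueue_release_peek_ref(), below, on the MACH_PEEK_MSG wakeup
 * path.)
 */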

/*
 * Routine: ipc_mqueue_select_on_thread
 * Purpose:
 *     A receiver discovered that there was a message on the queue
 *     before it had to block. Pick the message off the queue and
 *     "post" it to the thread.
 * Conditions:
 *     mqueue locked.
 *     thread not locked.
 *     There is a message.
 *     No need to reserve prepost objects - it will never prepost
 *
 * Returns:
 *     MACH_MSG_SUCCESS      Actually selected a message for ourselves.
 *     MACH_RCV_TOO_LARGE    May or may not have pulled it, but it is
 *                           too large.
 */
void
ipc_mqueue_select_on_thread(
    ipc_mqueue_t      port_mq,
    ipc_mqueue_t      set_mq,
    mach_msg_option_t option,
    mach_msg_size_t   max_size,
    thread_t          thread)
{
    ipc_kmsg_t kmsg;
    mach_msg_return_t mr = MACH_MSG_SUCCESS;
    mach_msg_size_t msize;

    /*
     * Do some sanity checking of our ability to receive
     * before pulling the message off the queue.
     */
    kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
    assert(kmsg != IKM_NULL);

    /*
     * If we really can't receive it, but we had the
     * MACH_RCV_LARGE option set, then don't take it off
     * the queue, instead return the appropriate error
     * (and size needed).
     */
    msize = ipc_kmsg_copyout_size(kmsg, thread->map);
    if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
        mr = MACH_RCV_TOO_LARGE;
        if (option & MACH_RCV_LARGE) {
            thread->ith_receiver_name = port_mq->imq_receiver_name;
            thread->ith_kmsg = IKM_NULL;
            thread->ith_msize = msize;
            thread->ith_seqno = 0;
            thread->ith_state = mr;
            return;
        }
    }

    ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
    if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
        flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
    }
#endif
    ipc_mqueue_release_msgcount(port_mq, set_mq);
    thread->ith_seqno = port_mq->imq_seqno++;
    thread->ith_kmsg = kmsg;
    thread->ith_state = mr;

    current_task()->messages_received++;
    return;
}

/*
 * Routine: ipc_mqueue_peek_locked
 * Purpose:
 *     Peek at a (non-set) message queue to see if it has a message
 *     matching the sequence number provided (if zero, then the
 *     first message in the queue) and return vital info about the
 *     message.
 *
 * Conditions:
 *     The ipc_mqueue_t is locked by callers.
 *     Other locks may be held by callers, so this routine cannot block.
 *     Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,
    mach_port_seqno_t      *seqnop,
    mach_msg_size_t        *msg_sizep,
    mach_msg_id_t          *msg_idp,
    mach_msg_max_trailer_t *msg_trailerp,
    ipc_kmsg_t             *kmsgp)
{
    ipc_kmsg_queue_t kmsgq;
    ipc_kmsg_t kmsg;
    mach_port_seqno_t seqno, msgoff;
    unsigned res = 0;

    assert(!imq_is_set(mq));

    seqno = 0;
    if (seqnop != NULL) {
        seqno = *seqnop;
    }

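    /*
     * Map the requested seqno onto an offset into the queue: e.g. with
     * imq_seqno == 5 and three messages queued, seqnos 5..7 can be
     * peeked, and msgoff = seqno - imq_seqno selects the message.
     */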
    if (seqno == 0) {
        seqno = mq->imq_seqno;
        msgoff = 0;
    } else if (seqno >= mq->imq_seqno &&
        seqno < mq->imq_seqno + mq->imq_msgcount) {
        msgoff = seqno - mq->imq_seqno;
    } else {
        goto out;
    }

    /* look for the message that would match that seqno */
    kmsgq = &mq->imq_messages;
    kmsg = ipc_kmsg_queue_first(kmsgq);
    while (msgoff-- && kmsg != IKM_NULL) {
        kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
    }
    if (kmsg == IKM_NULL) {
        goto out;
    }

    /* found one - return the requested info */
    if (seqnop != NULL) {
        *seqnop = seqno;
    }
    if (msg_sizep != NULL) {
        *msg_sizep = kmsg->ikm_header->msgh_size;
    }
    if (msg_idp != NULL) {
        *msg_idp = kmsg->ikm_header->msgh_id;
    }
    if (msg_trailerp != NULL) {
        memcpy(msg_trailerp,
            (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
            round_msg(kmsg->ikm_header->msgh_size)),
            sizeof(mach_msg_max_trailer_t));
    }
    if (kmsgp != NULL) {
        *kmsgp = kmsg;
    }

    res = 1;

out:
    return res;
}


/*
 * Routine: ipc_mqueue_peek
 * Purpose:
 *     Peek at a (non-set) message queue to see if it has a message
 *     matching the sequence number provided (if zero, then the
 *     first message in the queue) and return vital info about the
 *     message.
 *
 * Conditions:
 *     The ipc_mqueue_t is unlocked.
 *     Locks may be held by callers, so this routine cannot block.
 *     Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,
    mach_port_seqno_t      *seqnop,
    mach_msg_size_t        *msg_sizep,
    mach_msg_id_t          *msg_idp,
    mach_msg_max_trailer_t *msg_trailerp,
    ipc_kmsg_t             *kmsgp)
{
    unsigned res;

    imq_lock(mq);

    res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
        msg_trailerp, kmsgp);

    imq_unlock(mq);
    return res;
}

/*
 * Routine: ipc_mqueue_release_peek_ref
 * Purpose:
 *     Release the reference on an mqueue's associated port which was
 *     granted to a thread in ipc_mqueue_peek_on_thread (on the
 *     MACH_PEEK_MSG thread wakeup path).
 *
 * Conditions:
 *     The ipc_mqueue_t should be locked on entry.
 *     The ipc_mqueue_t will be _unlocked_ on return
 *     (and potentially invalid!)
 *
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mq)
{
    assert(!imq_is_set(mq));
    assert(imq_held(mq));

    /*
     * clear any preposts this mq may have generated
     * (which would cause subsequent immediate wakeups)
     */
    waitq_clear_prepost_locked(&mq->imq_wait_queue);

    imq_unlock(mq);

    /*
     * release the port reference: we need to do this outside the lock
     * because we might be holding the last port reference!
     */
    ip_release_mq(mq);
}

/*
 * peek at the contained port message queues, break prepost iteration as soon
 * as we spot a message on one of the message queues referenced by the set's
 * prepost list. No need to lock each message queue, as only the head of each
 * queue is checked. If a message wasn't there before we entered here, no need
 * to find it (if we do, great).
 */
static int
mqueue_peek_iterator(void *ctx, struct waitq *waitq,
    struct waitq_set *wqset)
{
    ipc_mqueue_t port_mq = (ipc_mqueue_t)waitq;
    ipc_kmsg_queue_t kmsgs = &port_mq->imq_messages;

    (void)ctx;
    (void)wqset;

    if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
        return WQ_ITERATE_BREAK; /* break out of the prepost iteration */
    }
    return WQ_ITERATE_CONTINUE;
}

/*
 * Routine: ipc_mqueue_set_peek
 * Purpose:
 *     Peek at a message queue set to see if it has any ports
 *     with messages.
 *
 * Conditions:
 *     Locks may be held by callers, so this routine cannot block.
 *     Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_set_peek(ipc_mqueue_t mq)
{
    int ret;

    imq_lock(mq);

    /*
     * We may have raced with port destruction where the mqueue is marked
     * as invalid. In that case, even though we don't have messages, we
     * have an end-of-life event to deliver.
     */
    if (!imq_is_valid(mq)) {
        imq_unlock(mq); /* drop the lock taken above before returning */
        return 1;
    }

    ret = waitq_set_iterate_preposts(&mq->imq_set_queue, NULL,
        mqueue_peek_iterator);

    imq_unlock(mq);

    return ret == WQ_ITERATE_BREAK;
}

/*
 * Routine: ipc_mqueue_set_gather_member_names
 * Purpose:
 *     Discover all ports which are members of a given port set.
 *     Because the waitq linkage mechanism was redesigned to save
 *     significant amounts of memory, it no longer keeps back-pointers
 *     from a port set to a port. Therefore, we must iterate over all
 *     ports within a given IPC space and individually query them to
 *     see if they are members of the given set. Port names of ports
 *     found to be members of the given set will be gathered into the
 *     provided 'names' array. Actual returned names are limited to
 *     maxnames entries, but we keep counting the actual number of
 *     members to let the caller decide to retry if necessary.
 *
 * Conditions:
 *     Locks may be held by callers, so this routine cannot block.
 *     Caller holds reference on the message queue (via port set).
 */
void
ipc_mqueue_set_gather_member_names(
    ipc_space_t       space,
    ipc_mqueue_t      set_mq,
    ipc_entry_num_t   maxnames,
    mach_port_name_t *names,
    ipc_entry_num_t  *actualp)
{
    ipc_entry_t table;
    ipc_entry_num_t tsize;
    struct waitq_set *wqset;
    ipc_entry_num_t actual = 0;

    assert(set_mq != IMQ_NULL);
    wqset = &set_mq->imq_set_queue;

    assert(space != IS_NULL);
    is_read_lock(space);
    if (!is_active(space)) {
        is_read_unlock(space);
        goto out;
    }

    if (!waitq_set_is_valid(wqset)) {
        is_read_unlock(space);
        goto out;
    }

    table = space->is_table;
    tsize = space->is_table_size;
    for (ipc_entry_num_t idx = 0; idx < tsize; idx++) {
        ipc_entry_t entry = &table[idx];

        /* only receive rights can be members of port sets */
        if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
            __IGNORE_WCASTALIGN(ipc_port_t port = (ipc_port_t)entry->ie_object);
            ipc_mqueue_t mq = &port->ip_messages;

            assert(IP_VALID(port));
            if (ip_active(port) &&
                waitq_member(&mq->imq_wait_queue, wqset)) {
                if (actual < maxnames) {
                    names[actual] = mq->imq_receiver_name;
                }
                actual++;
            }
        }
    }

    is_read_unlock(space);

out:
    *actualp = actual;
}
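
/*
 * Illustrative caller pattern (callers live outside this file): since
 * 'actual' counts all members even past 'maxnames', a caller can retry
 * with a larger buffer when the set turns out to be bigger than expected:
 *
 *     ipc_mqueue_set_gather_member_names(space, set_mq, maxnames, names, &actual);
 *     if (actual > maxnames)
 *         ... grow 'names' to hold 'actual' entries and call again ...
 */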


/*
 * Routine: ipc_mqueue_destroy_locked
 * Purpose:
 *     Destroy a (non-set) message queue.
 *     Set any blocked senders running.
 *     Destroy the kmsgs in the queue.
 * Conditions:
 *     mqueue locked
 *     Receivers were removed when the receive right was "changed"
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
{
    ipc_kmsg_queue_t kmqueue;
    ipc_kmsg_t kmsg;
    boolean_t reap = FALSE;
    struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));

    assert(!imq_is_set(mqueue));

    /*
     * rouse all blocked senders
     * (don't boost anyone - we're tearing this queue down)
     * (never preposts)
     */
    mqueue->imq_fullwaiters = FALSE;

    if (send_turnstile != TURNSTILE_NULL) {
        waitq_wakeup64_all(&send_turnstile->ts_waitq,
            IPC_MQUEUE_FULL,
            THREAD_RESTART,
            WAITQ_ALL_PRIORITIES);
    }

    /*
     * Move messages from the specified queue to the per-thread
     * clean/drain queue while we have the mqueue lock.
     */
    kmqueue = &mqueue->imq_messages;
    while ((kmsg = ipc_kmsg_dequeue(kmqueue)) != IKM_NULL) {
#if MACH_FLIPC
        if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
            flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
        }
#endif
        boolean_t first;
        first = ipc_kmsg_delayed_destroy(kmsg);
        if (first) {
            reap = first;
        }
    }

    /*
     * Wipe out message count, both for messages about to be
     * reaped and for reserved space for (previously) woken senders.
     * This is the indication to them that their reserved space is gone
     * (the mqueue was destroyed).
     */
    mqueue->imq_msgcount = 0;

    /* invalidate the waitq for subsequent mqueue operations */
    waitq_invalidate_locked(&mqueue->imq_wait_queue);

    /* clear out any preposting we may have done */
    waitq_clear_prepost_locked(&mqueue->imq_wait_queue);

    /*
     * assert that we are destroying / invalidating a queue that's
     * not a member of any other queue.
     */
    assert(mqueue->imq_preposts == 0);
    assert(mqueue->imq_in_pset == 0);

    return reap;
}

/*
 * Routine: ipc_mqueue_set_qlimit
 * Purpose:
 *     Changes a message queue limit; the maximum number
 *     of messages which may be queued.
 * Conditions:
 *     Nothing locked.
 */

void
ipc_mqueue_set_qlimit(
    ipc_mqueue_t         mqueue,
    mach_port_msgcount_t qlimit)
{
    assert(qlimit <= MACH_PORT_QLIMIT_MAX);

    /* wake up senders allowed by the new qlimit */
    imq_lock(mqueue);
    if (qlimit > mqueue->imq_qlimit) {
        mach_port_msgcount_t i, wakeup;
        struct turnstile *send_turnstile = port_send_turnstile(ip_from_mq(mqueue));

        /* caution: wakeup, qlimit are unsigned */
        wakeup = qlimit - mqueue->imq_qlimit;

        for (i = 0; i < wakeup; i++) {
            /*
             * boost the priority of the awoken thread
             * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
             * the message queue slot we've just reserved.
             *
             * NOTE: this will never prepost
             */
            if (send_turnstile == TURNSTILE_NULL ||
                waitq_wakeup64_one(&send_turnstile->ts_waitq,
                IPC_MQUEUE_FULL,
                THREAD_AWAKENED,
                WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
                mqueue->imq_fullwaiters = FALSE;
                break;
            }
            mqueue->imq_msgcount++; /* give it to the awakened thread */
        }
    }
    mqueue->imq_qlimit = qlimit;
    imq_unlock(mqueue);
}
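
/*
 * (The usual route to this routine is mach_port_set_attributes() with
 * MACH_PORT_LIMITS_INFO, from outside this file.)
 */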

/*
 * Routine: ipc_mqueue_set_seqno
 * Purpose:
 *     Changes an mqueue's sequence number.
 * Conditions:
 *     Caller holds a reference to the queue's containing object.
 */
void
ipc_mqueue_set_seqno(
    ipc_mqueue_t      mqueue,
    mach_port_seqno_t seqno)
{
    imq_lock(mqueue);
    mqueue->imq_seqno = seqno;
    imq_unlock(mqueue);
}


/*
 * Routine: ipc_mqueue_copyin
 * Purpose:
 *     Convert a name in a space to a message queue.
 * Conditions:
 *     Nothing locked. If successful, the caller gets a ref
 *     for the object. This ref ensures the continued existence of
 *     the queue.
 * Returns:
 *     MACH_MSG_SUCCESS      Found a message queue.
 *     MACH_RCV_INVALID_NAME The space is dead.
 *     MACH_RCV_INVALID_NAME The name doesn't denote a right.
 *     MACH_RCV_INVALID_NAME
 *         The denoted right is not receive or port set.
 *     MACH_RCV_IN_SET       Receive right is a member of a set.
 */

mach_msg_return_t
ipc_mqueue_copyin(
    ipc_space_t      space,
    mach_port_name_t name,
    ipc_mqueue_t    *mqueuep,
    ipc_object_t    *objectp)
{
    ipc_entry_t entry;
    ipc_object_t object;
    ipc_mqueue_t mqueue;

    is_read_lock(space);
    if (!is_active(space)) {
        is_read_unlock(space);
        return MACH_RCV_INVALID_NAME;
    }

    entry = ipc_entry_lookup(space, name);
    if (entry == IE_NULL) {
        is_read_unlock(space);
        return MACH_RCV_INVALID_NAME;
    }

    object = entry->ie_object;

    if (entry->ie_bits & MACH_PORT_TYPE_RECEIVE) {
        ipc_port_t port;

        __IGNORE_WCASTALIGN(port = (ipc_port_t) object);
        assert(port != IP_NULL);

        ip_lock(port);
        assert(ip_active(port));
        assert(port->ip_receiver_name == name);
        assert(port->ip_receiver == space);
        is_read_unlock(space);
        mqueue = &port->ip_messages;
    } else if (entry->ie_bits & MACH_PORT_TYPE_PORT_SET) {
        ipc_pset_t pset;

        __IGNORE_WCASTALIGN(pset = (ipc_pset_t) object);
        assert(pset != IPS_NULL);

        ips_lock(pset);
        assert(ips_active(pset));
        is_read_unlock(space);

        mqueue = &pset->ips_messages;
    } else {
        is_read_unlock(space);
        return MACH_RCV_INVALID_NAME;
    }

    /*
     * At this point, the object is locked and active,
     * the space is unlocked, and mqueue is initialized.
     */

    io_reference(object);
    io_unlock(object);

    *objectp = object;
    *mqueuep = mqueue;
    return MACH_MSG_SUCCESS;
}