2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved. 
   4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ 
   6  * This file contains Original Code and/or Modifications of Original Code 
   7  * as defined in and that are subject to the Apple Public Source License 
   8  * Version 2.0 (the 'License'). You may not use this file except in 
   9  * compliance with the License. The rights granted to you under the License 
  10  * may not be used to create, or enable the creation or redistribution of, 
  11  * unlawful or unlicensed copies of an Apple operating system, or to 
  12  * circumvent, violate, or enable the circumvention or violation of, any 
  13  * terms of an Apple operating system software license agreement. 
  15  * Please obtain a copy of the License at 
  16  * http://www.opensource.apple.com/apsl/ and read it before using this file. 
  18  * The Original Code and all software distributed under the License are 
  19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 
  20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 
  21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 
  22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 
  23  * Please see the License for the specific language governing rights and 
  24  * limitations under the License. 
  26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ 
  29  * @OSF_FREE_COPYRIGHT@ 
  32  * Mach Operating System 
  33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University 
  34  * All Rights Reserved. 
  36  * Permission to use, copy, modify and distribute this software and its 
  37  * documentation is hereby granted, provided that both the copyright 
  38  * notice and this permission notice appear in all copies of the 
  39  * software, derivative works or modified versions, and any portions 
  40  * thereof, and that both notices appear in supporting documentation. 
  42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 
  43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR 
  44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 
  46  * Carnegie Mellon requests users of this software to return to 
  48  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU 
  49  *  School of Computer Science 
  50  *  Carnegie Mellon University 
  51  *  Pittsburgh PA 15213-3890 
  53  * any improvements or extensions that they make and grant Carnegie Mellon 
  54  * the rights to redistribute these changes. 
  59  *      File:   ipc/ipc_mqueue.c 
  63  *      Functions to manipulate IPC message queues. 
  66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce 
  67  * support for mandatory and extensible security protections.  This notice 
  68  * is included in support of clause 2.2 (b) of the Apple Public License, 
  73 #include <mach/port.h> 
  74 #include <mach/message.h> 
  75 #include <mach/sync_policy.h> 
  77 #include <kern/assert.h> 
  78 #include <kern/counter.h> 
  79 #include <kern/sched_prim.h> 
  80 #include <kern/ipc_kobject.h> 
  81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */ 
  82 #include <kern/misc_protos.h> 
  83 #include <kern/task.h> 
  84 #include <kern/thread.h> 
  85 #include <kern/waitq.h> 
  88 #include <ipc/ipc_mqueue.h> 
  89 #include <ipc/ipc_kmsg.h> 
  90 #include <ipc/ipc_port.h> 
  91 #include <ipc/ipc_pset.h> 
  92 #include <ipc/ipc_space.h> 
  95 #include <ipc/flipc.h> 
  99 #include <vm/vm_map.h> 
 102 #include <sys/event.h> 
 104 extern char     *proc_name_address(void *p
); 
 106 int ipc_mqueue_full
;            /* address is event for queue space */ 
 107 int ipc_mqueue_rcv
;             /* address is event for message arrival */ 
 109 /* forward declarations */ 
 110 static void ipc_mqueue_receive_results(wait_result_t result
); 
 111 static void ipc_mqueue_peek_on_thread( 
 112         ipc_mqueue_t        port_mq
, 
 113         mach_msg_option_t   option
, 
 117  *      Routine:        ipc_mqueue_init 
 119  *              Initialize a newly-allocated message queue. 
 124         ipc_mqueue_kind_t       kind
) 
 127         case IPC_MQUEUE_KIND_SET
: 
 128                 waitq_set_init(&mqueue
->imq_set_queue
, 
 129                     SYNC_POLICY_FIFO 
| SYNC_POLICY_PREPOST
, 
 132         case IPC_MQUEUE_KIND_NONE
: /* cheat: we really should have "no" mqueue */ 
 133         case IPC_MQUEUE_KIND_PORT
: 
 134                 waitq_init(&mqueue
->imq_wait_queue
, 
 135                     SYNC_POLICY_FIFO 
| SYNC_POLICY_TURNSTILE_PROXY
); 
 136                 ipc_kmsg_queue_init(&mqueue
->imq_messages
); 
 137                 mqueue
->imq_seqno 
= 0; 
 138                 mqueue
->imq_msgcount 
= 0; 
 139                 mqueue
->imq_qlimit 
= MACH_PORT_QLIMIT_DEFAULT
; 
 140                 mqueue
->imq_context 
= 0; 
 141                 mqueue
->imq_fullwaiters 
= FALSE
; 
 143                 mqueue
->imq_fport 
= FPORT_NULL
; 
 147         klist_init(&mqueue
->imq_klist
); 
 154         boolean_t is_set 
= imq_is_set(mqueue
); 
 157                 waitq_set_deinit(&mqueue
->imq_set_queue
); 
 159                 waitq_deinit(&mqueue
->imq_wait_queue
); 
 164  *      Routine:        imq_reserve_and_lock 
 166  *              Atomically lock an ipc_mqueue_t object and reserve 
 167  *              an appropriate number of prepost linkage objects for 
 168  *              use in wakeup operations. 
 173 imq_reserve_and_lock(ipc_mqueue_t mq
, uint64_t *reserved_prepost
) 
 175         *reserved_prepost 
= waitq_prepost_reserve(&mq
->imq_wait_queue
, 0, 
 181  *      Routine:        imq_release_and_unlock 
 183  *              Unlock an ipc_mqueue_t object, re-enable interrupts, 
 184  *              and release any unused prepost object reservations. 
 189 imq_release_and_unlock(ipc_mqueue_t mq
, uint64_t reserved_prepost
) 
 191         waitq_unlock(&mq
->imq_wait_queue
); 
 192         waitq_prepost_release_reserve(reserved_prepost
); 
 197  *      Routine:        ipc_mqueue_member 
 199  *              Indicate whether the (port) mqueue is a member of 
 200  *              this portset's mqueue.  We do this by checking 
 201  *              whether the portset mqueue's waitq is a member of 
 202  *              the port's mqueue waitq. 
 204  *              the portset's mqueue is not already a member 
 205  *              this may block while allocating linkage structures. 
 210         ipc_mqueue_t            port_mqueue
, 
 211         ipc_mqueue_t            set_mqueue
) 
 213         struct waitq 
*port_waitq 
= &port_mqueue
->imq_wait_queue
; 
 214         struct waitq_set 
*set_waitq 
= &set_mqueue
->imq_set_queue
; 
 216         return waitq_member(port_waitq
, set_waitq
); 
 220  *      Routine:        ipc_mqueue_remove 
 222  *              Remove the association between the queue and the specified 
 229         ipc_mqueue_t      set_mqueue
) 
 231         struct waitq 
*mq_waitq 
= &mqueue
->imq_wait_queue
; 
 232         struct waitq_set 
*set_waitq 
= &set_mqueue
->imq_set_queue
; 
 234         return waitq_unlink(mq_waitq
, set_waitq
); 
 238  *      Routine:        ipc_mqueue_remove_from_all 
 240  *              Remove the mqueue from all the sets it is a member of 
 244  *              mqueue unlocked and set links deallocated 
 247 ipc_mqueue_remove_from_all(ipc_mqueue_t mqueue
) 
 249         struct waitq 
*mq_waitq 
= &mqueue
->imq_wait_queue
; 
 254         assert(waitq_valid(mq_waitq
)); 
 255         kr 
= waitq_unlink_all_unlock(mq_waitq
); 
 256         /* mqueue unlocked and set links deallocated */ 
 260  *      Routine:        ipc_mqueue_remove_all 
 262  *              Remove all the member queues from the specified set. 
 263  *              Also removes the queue from any containing sets. 
 267  *              mqueue unlocked and all set links deallocated 
 270 ipc_mqueue_remove_all(ipc_mqueue_t      mqueue
) 
 272         struct waitq_set 
*mq_setq 
= &mqueue
->imq_set_queue
; 
 275         assert(waitqs_is_set(mq_setq
)); 
 276         waitq_set_unlink_all_unlock(mq_setq
); 
 277         /* mqueue unlocked and set links deallocated */ 
 282  *      Routine:        ipc_mqueue_add 
 284  *              Associate the portset's mqueue with the port's mqueue. 
 285  *              This has to be done so that posting the port will wakeup 
 286  *              a portset waiter.  If there are waiters on the portset 
 287  *              mqueue and messages on the port mqueue, try to match them 
 294         ipc_mqueue_t    port_mqueue
, 
 295         ipc_mqueue_t    set_mqueue
, 
 296         uint64_t        *reserved_link
, 
 297         uint64_t        *reserved_prepost
) 
 299         struct waitq     
*port_waitq 
= &port_mqueue
->imq_wait_queue
; 
 300         struct waitq_set 
*set_waitq 
= &set_mqueue
->imq_set_queue
; 
 301         ipc_kmsg_queue_t kmsgq
; 
 302         ipc_kmsg_t       kmsg
, next
; 
 305         assert(reserved_link 
&& *reserved_link 
!= 0); 
 306         assert(waitqs_is_linked(set_waitq
)); 
 308         imq_lock(port_mqueue
); 
 311          * The link operation is now under the same lock-hold as 
 312          * message iteration and thread wakeup, but doesn't have to be... 
 314         kr 
= waitq_link(port_waitq
, set_waitq
, WAITQ_ALREADY_LOCKED
, reserved_link
); 
 315         if (kr 
!= KERN_SUCCESS
) { 
 316                 imq_unlock(port_mqueue
); 
 321          * Now that the set has been added to the port, there may be 
 322          * messages queued on the port and threads waiting on the set 
 323          * waitq.  Let's get them together. 
 325         kmsgq 
= &port_mqueue
->imq_messages
; 
 326         for (kmsg 
= ipc_kmsg_queue_first(kmsgq
); 
 329                 next 
= ipc_kmsg_queue_next(kmsgq
, kmsg
); 
 333                         mach_msg_size_t msize
; 
 336                         th 
= waitq_wakeup64_identify_locked( 
 339                                 THREAD_AWAKENED
, &th_spl
, 
 340                                 reserved_prepost
, WAITQ_ALL_PRIORITIES
, 
 342                         /* waitq/mqueue still locked, thread locked */ 
 344                         if (th 
== THREAD_NULL
) { 
 349                          * If the receiver waited with a facility not directly 
 350                          * related to Mach messaging, then it isn't prepared to get 
 351                          * handed the message directly.  Just set it running, and 
 352                          * go look for another thread that can. 
 354                         if (th
->ith_state 
!= MACH_RCV_IN_PROGRESS
) { 
 355                                 if (th
->ith_state 
== MACH_PEEK_IN_PROGRESS
) { 
 357                                          * wakeup the peeking thread, but 
 358                                          * continue to loop over the threads 
 359                                          * waiting on the port's mqueue to see 
 360                                          * if there are any actual receivers 
 362                                         ipc_mqueue_peek_on_thread(port_mqueue
, 
 372                          * Found a receiver. see if they can handle the message 
 373                          * correctly (the message is not too large for them, or 
 374                          * they didn't care to be informed that the message was 
 375                          * too large).  If they can't handle it, take them off 
 376                          * the list and let them go back and figure it out and 
 377                          * just move onto the next. 
 379                         msize 
= ipc_kmsg_copyout_size(kmsg
, th
->map
); 
 381                             (msize 
+ REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th
), th
->ith_option
))) { 
 382                                 th
->ith_state 
= MACH_RCV_TOO_LARGE
; 
 383                                 th
->ith_msize 
= msize
; 
 384                                 if (th
->ith_option 
& MACH_RCV_LARGE
) { 
 386                                          * let him go without message 
 388                                         th
->ith_receiver_name 
= port_mqueue
->imq_receiver_name
; 
 389                                         th
->ith_kmsg 
= IKM_NULL
; 
 393                                         continue; /* find another thread */ 
 396                                 th
->ith_state 
= MACH_MSG_SUCCESS
; 
 400                          * This thread is going to take this message, 
 403                         ipc_kmsg_rmqueue(kmsgq
, kmsg
); 
 405                         mach_node_t  node 
= kmsg
->ikm_node
; 
 407                         ipc_mqueue_release_msgcount(port_mqueue
, IMQ_NULL
); 
 410                         th
->ith_seqno 
= port_mqueue
->imq_seqno
++; 
 414                         if (MACH_NODE_VALID(node
) && FPORT_VALID(port_mqueue
->imq_fport
)) { 
 415                                 flipc_msg_ack(node
, port_mqueue
, TRUE
); 
 418                         break;  /* go to next message */ 
 422         imq_unlock(port_mqueue
); 
 428  *      Routine:        ipc_mqueue_has_klist 
 430  *              Returns whether the given mqueue imq_klist field can be used as a klist. 
 433 ipc_mqueue_has_klist(ipc_mqueue_t mqueue
) 
 435         ipc_object_t object 
= imq_to_object(mqueue
); 
 436         if (io_otype(object
) != IOT_PORT
) { 
 439         ipc_port_t port 
= ip_from_mq(mqueue
); 
 440         if (port
->ip_specialreply
) { 
 443         return port
->ip_sync_link_state 
== PORT_SYNC_LINK_ANY
; 
 447  *      Routine:        ipc_mqueue_changed 
 449  *              Wake up receivers waiting in a message queue. 
 451  *              The message queue is locked. 
 458         if (ipc_mqueue_has_klist(mqueue
) && SLIST_FIRST(&mqueue
->imq_klist
)) { 
 460                  * Indicate that this message queue is vanishing 
 462                  * When this is called, the associated receive right may be in flight 
 463                  * between two tasks: the one it used to live in, and the one that armed 
 464                  * a port destroyed notification for it. 
 466                  * The new process may want to register the port it gets back with an 
 467                  * EVFILT_MACHPORT filter again, and may already have sync IPC pending 
 468                  * on this port, in which case we want the imq_klist field to be 
 469                  * reusable for nefarious purposes. 
 471                  * Fortunately, we really don't need this linkage anymore after this 
 472                  * point as EV_VANISHED / EV_EOF will be the last thing delivered ever. 
 474                  * Note: we don't have the space lock here, however, this covers the 
 475                  *       case of when a task is terminating the space, triggering 
 476                  *       several knote_vanish() calls. 
 478                  *       We don't need the lock to observe that the space is inactive as 
 479                  *       we just deactivated it on the same thread. 
 481                  *       We still need to call knote_vanish() so that the knote is 
 482                  *       marked with EV_VANISHED or EV_EOF so that the detach step 
 483                  *       in filt_machportdetach is skipped correctly. 
 486                 knote_vanish(&mqueue
->imq_klist
, is_active(space
)); 
 489         if (io_otype(imq_to_object(mqueue
)) == IOT_PORT
) { 
 490                 ipc_port_adjust_sync_link_state_locked(ip_from_mq(mqueue
), PORT_SYNC_LINK_ANY
, NULL
); 
 492                 klist_init(&mqueue
->imq_klist
); 
 495         waitq_wakeup64_all_locked(&mqueue
->imq_wait_queue
, 
 499             WAITQ_ALL_PRIORITIES
, 
 507  *      Routine:        ipc_mqueue_send 
 509  *              Send a message to a message queue.  The message holds a reference 
 510  *              for the destination port for this message queue in the 
 511  *              msgh_remote_port field. 
 513  *              If unsuccessful, the caller still has possession of 
 514  *              the message and must do something with it.  If successful, 
 515  *              the message is queued, given to a receiver, or destroyed. 
 519  *              MACH_MSG_SUCCESS        The message was accepted. 
 520  *              MACH_SEND_TIMED_OUT     Caller still has message. 
 521  *              MACH_SEND_INTERRUPTED   Caller still has message. 
 527         mach_msg_option_t       option
, 
 528         mach_msg_timeout_t  send_timeout
) 
 534          *      1) We're under the queue limit. 
 535          *      2) Caller used the MACH_SEND_ALWAYS internal option. 
 536          *      3) Message is sent to a send-once right. 
 538         if (!imq_full(mqueue
) || 
 539             (!imq_full_kernel(mqueue
) && 
 540             ((option 
& MACH_SEND_ALWAYS
) || 
 541             (MACH_MSGH_BITS_REMOTE(kmsg
->ikm_header
->msgh_bits
) == 
 542             MACH_MSG_TYPE_PORT_SEND_ONCE
)))) { 
 543                 mqueue
->imq_msgcount
++; 
 544                 assert(mqueue
->imq_msgcount 
> 0); 
 547                 thread_t cur_thread 
= current_thread(); 
 548                 ipc_port_t port 
= ip_from_mq(mqueue
); 
 549                 struct turnstile 
*send_turnstile 
= TURNSTILE_NULL
; 
 553                  * We have to wait for space to be granted to us. 
 555                 if ((option 
& MACH_SEND_TIMEOUT
) && (send_timeout 
== 0)) { 
 557                         return MACH_SEND_TIMED_OUT
; 
 559                 if (imq_full_kernel(mqueue
)) { 
 561                         return MACH_SEND_NO_BUFFER
; 
 563                 mqueue
->imq_fullwaiters 
= TRUE
; 
 565                 if (option 
& MACH_SEND_TIMEOUT
) { 
 566                         clock_interval_to_deadline(send_timeout
, 1000 * NSEC_PER_USEC
, &deadline
); 
 571                 thread_set_pending_block_hint(cur_thread
, kThreadWaitPortSend
); 
 573                 send_turnstile 
= turnstile_prepare((uintptr_t)port
, 
 574                     port_send_turnstile_address(port
), 
 575                     TURNSTILE_NULL
, TURNSTILE_SYNC_IPC
); 
 577                 ipc_port_send_update_inheritor(port
, send_turnstile
, 
 578                     TURNSTILE_DELAYED_UPDATE
); 
 580                 wresult 
= waitq_assert_wait64_leeway( 
 581                         &send_turnstile
->ts_waitq
, 
 584                         TIMEOUT_URGENCY_USER_NORMAL
, 
 589                 turnstile_update_inheritor_complete(send_turnstile
, 
 590                     TURNSTILE_INTERLOCK_NOT_HELD
); 
 592                 if (wresult 
== THREAD_WAITING
) { 
 593                         wresult 
= thread_block(THREAD_CONTINUE_NULL
); 
 596                 /* Call turnstile complete with interlock held */ 
 598                 turnstile_complete((uintptr_t)port
, port_send_turnstile_address(port
), NULL
, TURNSTILE_SYNC_IPC
); 
 601                 /* Call cleanup after dropping the interlock */ 
 605                 case THREAD_AWAKENED
: 
 607                          * we can proceed - inherited msgcount from waker 
 608                          * or the message queue has been destroyed and the msgcount 
 609                          * has been reset to zero (will detect in ipc_mqueue_post()). 
 613                 case THREAD_TIMED_OUT
: 
 614                         assert(option 
& MACH_SEND_TIMEOUT
); 
 615                         return MACH_SEND_TIMED_OUT
; 
 617                 case THREAD_INTERRUPTED
: 
 618                         return MACH_SEND_INTERRUPTED
; 
 621                         /* mqueue is being destroyed */ 
 622                         return MACH_SEND_INVALID_DEST
; 
 624                         panic("ipc_mqueue_send"); 
 628         ipc_mqueue_post(mqueue
, kmsg
, option
); 
 629         return MACH_MSG_SUCCESS
; 
 633  *      Routine:        ipc_mqueue_override_send 
 635  *              Set an override qos on the first message in the queue 
 636  *              (if the queue is full). This is a send-possible override 
 637  *              that will go away as soon as we drain a message from the 
 641  *              The message queue is not locked. 
 642  *              The caller holds a reference on the message queue. 
 645 ipc_mqueue_override_send( 
 647         mach_msg_qos_t      qos_ovr
) 
 649         boolean_t __unused full_queue_empty 
= FALSE
; 
 652         assert(imq_valid(mqueue
)); 
 653         assert(!imq_is_set(mqueue
)); 
 655         if (imq_full(mqueue
)) { 
 656                 ipc_kmsg_t first 
= ipc_kmsg_queue_first(&mqueue
->imq_messages
); 
 658                 if (first 
&& ipc_kmsg_override_qos(&mqueue
->imq_messages
, first
, qos_ovr
)) { 
 659                         ipc_object_t object 
= imq_to_object(mqueue
); 
 660                         assert(io_otype(object
) == IOT_PORT
); 
 661                         ipc_port_t port 
= ip_object_to_port(object
); 
 662                         if (ip_active(port
) && 
 663                             port
->ip_receiver_name 
!= MACH_PORT_NULL 
&& 
 664                             is_active(port
->ip_receiver
) && 
 665                             ipc_mqueue_has_klist(mqueue
)) { 
 666                                 KNOTE(&mqueue
->imq_klist
, 0); 
 670                         full_queue_empty 
= TRUE
; 
 675 #if DEVELOPMENT || DEBUG 
 676         if (full_queue_empty
) { 
 677                 ipc_port_t port 
= ip_from_mq(mqueue
); 
 679                 dst_pid 
= ipc_port_get_receiver_task(port
, NULL
); 
 685  *      Routine:        ipc_mqueue_release_msgcount 
 687  *              Release a message queue reference in the case where we 
 691  *              The message queue is locked. 
 692  *              The message corresponding to this reference is off the queue. 
 693  *              There is no need to pass reserved preposts because this will 
 694  *              never prepost to anyone 
 697 ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq
, ipc_mqueue_t set_mq
) 
 699         struct turnstile 
*send_turnstile 
= port_send_turnstile(ip_from_mq(port_mq
)); 
 702         assert(port_mq
->imq_msgcount 
> 1 || ipc_kmsg_queue_empty(&port_mq
->imq_messages
)); 
 704         port_mq
->imq_msgcount
--; 
 706         if (!imq_full(port_mq
) && port_mq
->imq_fullwaiters 
&& 
 707             send_turnstile 
!= TURNSTILE_NULL
) { 
 709                  * boost the priority of the awoken thread 
 710                  * (WAITQ_PROMOTE_PRIORITY) to ensure it uses 
 711                  * the message queue slot we've just reserved. 
 713                  * NOTE: this will never prepost 
 715                  * The wakeup happens on a turnstile waitq 
 716                  * which will wakeup the highest priority waiter. 
 717                  * A potential downside of this would be starving low 
 718                  * priority senders if there is a constant churn of 
 719                  * high priority threads trying to send to this port. 
 721                 if (waitq_wakeup64_one(&send_turnstile
->ts_waitq
, 
 724                     WAITQ_PROMOTE_PRIORITY
) != KERN_SUCCESS
) { 
 725                         port_mq
->imq_fullwaiters 
= FALSE
; 
 727                         /* gave away our slot - add reference back */ 
 728                         port_mq
->imq_msgcount
++; 
 732         if (ipc_kmsg_queue_empty(&port_mq
->imq_messages
)) { 
 733                 /* no more msgs: invalidate the port's prepost object */ 
 734                 waitq_clear_prepost_locked(&port_mq
->imq_wait_queue
); 
 739  *      Routine:        ipc_mqueue_post 
 741  *              Post a message to a waiting receiver or enqueue it.  If a 
 742  *              receiver is waiting, we can release our reserved space in 
 747  *              If we need to queue, our space in the message queue is reserved. 
 753         mach_msg_option_t __unused option
) 
 755         uint64_t reserved_prepost 
= 0; 
 756         boolean_t destroy_msg 
= FALSE
; 
 758         ipc_kmsg_trace_send(kmsg
, option
); 
 761          *      While the msg queue is locked, we have control of the 
 762          *  kmsg, so the ref in it for the port is still good. 
 764          *      Check for a receiver for the message. 
 766         imq_reserve_and_lock(mqueue
, &reserved_prepost
); 
 768         /* we may have raced with port destruction! */ 
 769         if (!imq_valid(mqueue
)) { 
 775                 struct waitq 
*waitq 
= &mqueue
->imq_wait_queue
; 
 778                 mach_msg_size_t msize
; 
 780                 receiver 
= waitq_wakeup64_identify_locked(waitq
, 
 785                     WAITQ_ALL_PRIORITIES
, 
 787                 /* waitq still locked, thread locked */ 
 789                 if (receiver 
== THREAD_NULL
) { 
 791                          * no receivers; queue kmsg if space still reserved 
 792                          * Reservations are cancelled when the port goes inactive. 
 793                          * note that this will enqueue the message for any 
 794                          * "peeking" receivers. 
 796                          * Also, post the knote to wake up any threads waiting 
 797                          * on that style of interface if this insertion is of 
 798                          * note (first insertion, or adjusted override qos all 
 799                          * the way to the head of the queue). 
 801                          * This is just for ports. portset knotes are stay-active, 
 802                          * and their threads get awakened through the !MACH_RCV_IN_PROGRESS 
 805                         if (mqueue
->imq_msgcount 
> 0) { 
 806                                 if (ipc_kmsg_enqueue_qos(&mqueue
->imq_messages
, kmsg
)) { 
 807                                         /* if the space is dead there is no point calling KNOTE */ 
 808                                         ipc_object_t object 
= imq_to_object(mqueue
); 
 809                                         assert(io_otype(object
) == IOT_PORT
); 
 810                                         ipc_port_t port 
= ip_object_to_port(object
); 
 811                                         if (ip_active(port
) && 
 812                                             port
->ip_receiver_name 
!= MACH_PORT_NULL 
&& 
 813                                             is_active(port
->ip_receiver
) && 
 814                                             ipc_mqueue_has_klist(mqueue
)) { 
 815                                                 KNOTE(&mqueue
->imq_klist
, 0); 
 822                          * Otherwise, the message queue must belong to an inactive 
 823                          * port, so just destroy the message and pretend it was posted. 
 830                  * If a thread is attempting a "peek" into the message queue 
 831                  * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the 
 832                  * thread running.  A successful peek is essentially the same as 
 833                  * message delivery since the peeking thread takes responsibility 
 834                  * for delivering the message and (eventually) removing it from 
 835                  * the mqueue.  Only one thread can successfully use the peek 
 836                  * facility on any given port, so we exit the waitq loop after 
 837                  * encountering such a thread. 
 839                 if (receiver
->ith_state 
== MACH_PEEK_IN_PROGRESS 
&& mqueue
->imq_msgcount 
> 0) { 
 840                         ipc_kmsg_enqueue_qos(&mqueue
->imq_messages
, kmsg
); 
 841                         ipc_mqueue_peek_on_thread(mqueue
, receiver
->ith_option
, receiver
); 
 842                         thread_unlock(receiver
); 
 844                         break; /* Message was posted, so break out of loop */ 
 848                  * If the receiver waited with a facility not directly related 
 849                  * to Mach messaging, then it isn't prepared to get handed the 
 850                  * message directly. Just set it running, and go look for 
 851                  * another thread that can. 
 853                 if (receiver
->ith_state 
!= MACH_RCV_IN_PROGRESS
) { 
 854                         thread_unlock(receiver
); 
 861                  * We found a waiting thread. 
 862                  * If the message is too large or the scatter list is too small 
 863                  * the thread we wake up will get that as its status. 
 865                 msize 
= ipc_kmsg_copyout_size(kmsg
, receiver
->map
); 
 866                 if (receiver
->ith_rsize 
< 
 867                     (msize 
+ REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver
), receiver
->ith_option
))) { 
 868                         receiver
->ith_msize 
= msize
; 
 869                         receiver
->ith_state 
= MACH_RCV_TOO_LARGE
; 
 871                         receiver
->ith_state 
= MACH_MSG_SUCCESS
; 
 875                  * If there is no problem with the upcoming receive, or the 
 876                  * receiver thread didn't specifically ask for special too 
 877                  * large error condition, go ahead and select it anyway. 
 879                 if ((receiver
->ith_state 
== MACH_MSG_SUCCESS
) || 
 880                     !(receiver
->ith_option 
& MACH_RCV_LARGE
)) { 
 881                         receiver
->ith_kmsg 
= kmsg
; 
 882                         receiver
->ith_seqno 
= mqueue
->imq_seqno
++; 
 884                         mach_node_t node 
= kmsg
->ikm_node
; 
 886                         thread_unlock(receiver
); 
 889                         /* we didn't need our reserved spot in the queue */ 
 890                         ipc_mqueue_release_msgcount(mqueue
, IMQ_NULL
); 
 893                         if (MACH_NODE_VALID(node
) && FPORT_VALID(mqueue
->imq_fport
)) { 
 894                                 flipc_msg_ack(node
, mqueue
, TRUE
); 
 901                  * Otherwise, this thread needs to be released to run 
 902                  * and handle its error without getting the message.  We 
 903                  * need to go back and pick another one. 
 905                 receiver
->ith_receiver_name 
= mqueue
->imq_receiver_name
; 
 906                 receiver
->ith_kmsg 
= IKM_NULL
; 
 907                 receiver
->ith_seqno 
= 0; 
 908                 thread_unlock(receiver
); 
 913         /* clear the waitq boost we may have been given */ 
 914         waitq_clear_promotion_locked(&mqueue
->imq_wait_queue
, current_thread()); 
 915         imq_release_and_unlock(mqueue
, reserved_prepost
); 
 917                 ipc_kmsg_destroy(kmsg
); 
 920         current_task()->messages_sent
++; 
 926 ipc_mqueue_receive_results(wait_result_t saved_wait_result
) 
 928         thread_t                self 
= current_thread(); 
 929         mach_msg_option_t       option 
= self
->ith_option
; 
 932          * why did we wake up? 
 934         switch (saved_wait_result
) { 
 935         case THREAD_TIMED_OUT
: 
 936                 self
->ith_state 
= MACH_RCV_TIMED_OUT
; 
 939         case THREAD_INTERRUPTED
: 
 940                 self
->ith_state 
= MACH_RCV_INTERRUPTED
; 
 944                 /* something bad happened to the port/set */ 
 945                 self
->ith_state 
= MACH_RCV_PORT_CHANGED
; 
 948         case THREAD_AWAKENED
: 
 950                  * We do not need to go select a message, somebody 
 951                  * handed us one (or a too-large indication). 
 953                 switch (self
->ith_state
) { 
 954                 case MACH_RCV_SCATTER_SMALL
: 
 955                 case MACH_RCV_TOO_LARGE
: 
 957                          * Somebody tried to give us a too large 
 958                          * message. If we indicated that we cared, 
 959                          * then they only gave us the indication, 
 960                          * otherwise they gave us the indication 
 961                          * AND the message anyway. 
 963                         if (option 
& MACH_RCV_LARGE
) { 
 967                 case MACH_MSG_SUCCESS
: 
 969                 case MACH_PEEK_READY
: 
 973                         panic("ipc_mqueue_receive_results: strange ith_state"); 
 977                 panic("ipc_mqueue_receive_results: strange wait_result"); 
 982 ipc_mqueue_receive_continue( 
 983         __unused 
void *param
, 
 984         wait_result_t wresult
) 
 986         ipc_mqueue_receive_results(wresult
); 
 987         mach_msg_receive_continue();  /* hard-coded for now */ 
 991  *      Routine:        ipc_mqueue_receive 
 993  *              Receive a message from a message queue. 
 996  *              Our caller must hold a reference for the port or port set 
 997  *              to which this queue belongs, to keep the queue 
 998  *              from being deallocated. 
1000  *              The kmsg is returned with clean header fields 
1001  *              and with the circular bit turned off through the ith_kmsg 
1002  *              field of the thread's receive continuation state. 
1004  *              MACH_MSG_SUCCESS        Message returned in ith_kmsg. 
1005  *              MACH_RCV_TOO_LARGE      Message size returned in ith_msize. 
1006  *              MACH_RCV_TIMED_OUT      No message obtained. 
1007  *              MACH_RCV_INTERRUPTED    No message obtained. 
1008  *              MACH_RCV_PORT_DIED      Port/set died; no message. 
1009  *              MACH_RCV_PORT_CHANGED   Port moved into set; no msg. 
1015         ipc_mqueue_t            mqueue
, 
1016         mach_msg_option_t       option
, 
1017         mach_msg_size_t         max_size
, 
1018         mach_msg_timeout_t      rcv_timeout
, 
1021         wait_result_t           wresult
; 
1022         thread_t                self 
= current_thread(); 
1025         wresult 
= ipc_mqueue_receive_on_thread(mqueue
, option
, max_size
, 
1026             rcv_timeout
, interruptible
, 
1028         /* mqueue unlocked */ 
1029         if (wresult 
== THREAD_NOT_WAITING
) { 
1033         if (wresult 
== THREAD_WAITING
) { 
1034                 if (self
->ith_continuation
) { 
1035                         thread_block(ipc_mqueue_receive_continue
); 
1039                 wresult 
= thread_block(THREAD_CONTINUE_NULL
); 
1041         ipc_mqueue_receive_results(wresult
); 
1045 mqueue_process_prepost_receive(void *ctx
, struct waitq 
*waitq
, 
1046     struct waitq_set 
*wqset
) 
1048         ipc_mqueue_t     port_mq
, *pmq_ptr
; 
1051         port_mq 
= (ipc_mqueue_t
)waitq
; 
1054          * If there are no messages on this queue, skip it and remove 
1055          * it from the prepost list 
1057         if (ipc_kmsg_queue_empty(&port_mq
->imq_messages
)) { 
1058                 return WQ_ITERATE_INVALIDATE_CONTINUE
; 
1062          * There are messages waiting on this port. 
1063          * Instruct the prepost iteration logic to break, but keep the 
1066         pmq_ptr 
= (ipc_mqueue_t 
*)ctx
; 
1070         return WQ_ITERATE_BREAK_KEEP_LOCKED
; 
1074  *      Routine:        ipc_mqueue_receive_on_thread 
1076  *              Receive a message from a message queue using a specified thread. 
1077  *              If no message available, assert_wait on the appropriate waitq. 
1080  *              Assumes thread is self. 
1081  *              Called with mqueue locked. 
1082  *              Returns with mqueue unlocked. 
1083  *              May have assert-waited. Caller must block in those cases. 
1086 ipc_mqueue_receive_on_thread( 
1087         ipc_mqueue_t            mqueue
, 
1088         mach_msg_option_t       option
, 
1089         mach_msg_size_t         max_size
, 
1090         mach_msg_timeout_t      rcv_timeout
, 
1094         wait_result_t           wresult
; 
1096         struct turnstile        
*rcv_turnstile 
= TURNSTILE_NULL
; 
1098         /* called with mqueue locked */ 
1100         /* no need to reserve anything: we never prepost to anyone */ 
1102         if (!imq_valid(mqueue
)) { 
1103                 /* someone raced us to destroy this mqueue/port! */ 
1106                  * ipc_mqueue_receive_results updates the thread's ith_state 
1107                  * TODO: differentiate between rights being moved and 
1108                  * rights/ports being destroyed (21885327) 
1110                 return THREAD_RESTART
; 
1113         if (imq_is_set(mqueue
)) { 
1114                 ipc_mqueue_t port_mq 
= IMQ_NULL
; 
1116                 (void)waitq_set_iterate_preposts(&mqueue
->imq_set_queue
, 
1118                     mqueue_process_prepost_receive
); 
1120                 if (port_mq 
!= IMQ_NULL
) { 
1122                          * We get here if there is at least one message 
1123                          * waiting on port_mq. We have instructed the prepost 
1124                          * iteration logic to leave both the port_mq and the 
1125                          * set mqueue locked. 
1127                          * TODO: previously, we would place this port at the 
1128                          *       back of the prepost list... 
1133                          * Continue on to handling the message with just 
1134                          * the port mqueue locked. 
1136                         if (option 
& MACH_PEEK_MSG
) { 
1137                                 ipc_mqueue_peek_on_thread(port_mq
, option
, thread
); 
1139                                 ipc_mqueue_select_on_thread(port_mq
, mqueue
, option
, 
1143                         imq_unlock(port_mq
); 
1144                         return THREAD_NOT_WAITING
; 
1146         } else if (imq_is_queue(mqueue
) || imq_is_turnstile_proxy(mqueue
)) { 
1147                 ipc_kmsg_queue_t kmsgs
; 
1150                  * Receive on a single port. Just try to get the messages. 
1152                 kmsgs 
= &mqueue
->imq_messages
; 
1153                 if (ipc_kmsg_queue_first(kmsgs
) != IKM_NULL
) { 
1154                         if (option 
& MACH_PEEK_MSG
) { 
1155                                 ipc_mqueue_peek_on_thread(mqueue
, option
, thread
); 
1157                                 ipc_mqueue_select_on_thread(mqueue
, IMQ_NULL
, option
, 
1161                         return THREAD_NOT_WAITING
; 
1164                 panic("Unknown mqueue type 0x%x: likely memory corruption!\n", 
1165                     mqueue
->imq_wait_queue
.waitq_type
); 
1169          * Looks like we'll have to block.  The mqueue we will 
1170          * block on (whether the set's or the local port's) is 
1173         if (option 
& MACH_RCV_TIMEOUT
) { 
1174                 if (rcv_timeout 
== 0) { 
1176                         thread
->ith_state 
= MACH_RCV_TIMED_OUT
; 
1177                         return THREAD_NOT_WAITING
; 
1181         thread
->ith_option 
= option
; 
1182         thread
->ith_rsize 
= max_size
; 
1183         thread
->ith_msize 
= 0; 
1185         if (option 
& MACH_PEEK_MSG
) { 
1186                 thread
->ith_state 
= MACH_PEEK_IN_PROGRESS
; 
1188                 thread
->ith_state 
= MACH_RCV_IN_PROGRESS
; 
1191         if (option 
& MACH_RCV_TIMEOUT
) { 
1192                 clock_interval_to_deadline(rcv_timeout
, 1000 * NSEC_PER_USEC
, &deadline
); 
1198          * Threads waiting on a reply port (not portset) 
1199          * will wait on its receive turnstile. 
1201          * Donate waiting thread's turnstile and 
1202          * setup inheritor for special reply port. 
1203          * Based on the state of the special reply 
1204          * port, the inheritor would be the send 
1205          * turnstile of the connection port on which 
1206          * the send of sync ipc would happen or 
1207          * workloop's turnstile who would reply to 
1208          * the sync ipc message. 
1210          * Pass in mqueue wait in waitq_assert_wait to 
1211          * support port set wakeup. The mqueue waitq of port 
1212          * will be converted to to turnstile waitq 
1213          * in waitq_assert_wait instead of global waitqs. 
1215         if (imq_is_turnstile_proxy(mqueue
)) { 
1216                 ipc_port_t port 
= ip_from_mq(mqueue
); 
1217                 rcv_turnstile 
= turnstile_prepare((uintptr_t)port
, 
1218                     port_rcv_turnstile_address(port
), 
1219                     TURNSTILE_NULL
, TURNSTILE_SYNC_IPC
); 
1221                 ipc_port_recv_update_inheritor(port
, rcv_turnstile
, 
1222                     TURNSTILE_DELAYED_UPDATE
); 
1225         thread_set_pending_block_hint(thread
, kThreadWaitPortReceive
); 
1226         wresult 
= waitq_assert_wait64_locked(&mqueue
->imq_wait_queue
, 
1229             TIMEOUT_URGENCY_USER_NORMAL
, 
1233         /* preposts should be detected above, not here */ 
1234         if (wresult 
== THREAD_AWAKENED
) { 
1235                 panic("ipc_mqueue_receive_on_thread: sleep walking"); 
1240         /* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */ 
1241         if (rcv_turnstile 
!= TURNSTILE_NULL
) { 
1242                 turnstile_update_inheritor_complete(rcv_turnstile
, TURNSTILE_INTERLOCK_NOT_HELD
); 
1244         /* Its callers responsibility to call turnstile_complete to get the turnstile back */ 
1251  *      Routine:        ipc_mqueue_peek_on_thread 
1253  *              A receiver discovered that there was a message on the queue 
1254  *              before he had to block. Tell a thread about the message queue, 
1255  *              but don't pick off any messages. 
1258  *              at least one message on port_mq's message queue 
1260  *      Returns: (on thread->ith_state) 
1261  *              MACH_PEEK_READY         ith_peekq contains a message queue 
1264 ipc_mqueue_peek_on_thread( 
1265         ipc_mqueue_t        port_mq
, 
1266         mach_msg_option_t   option
, 
1270         assert(option 
& MACH_PEEK_MSG
); 
1271         assert(ipc_kmsg_queue_first(&port_mq
->imq_messages
) != IKM_NULL
); 
1274          * Take a reference on the mqueue's associated port: 
1275          * the peeking thread will be responsible to release this reference 
1276          * using ip_release_mq() 
1278         ip_reference_mq(port_mq
); 
1279         thread
->ith_peekq 
= port_mq
; 
1280         thread
->ith_state 
= MACH_PEEK_READY
; 
1284  *      Routine:        ipc_mqueue_select_on_thread 
1286  *              A receiver discovered that there was a message on the queue 
1287  *              before he had to block.  Pick the message off the queue and 
1288  *              "post" it to thread. 
1291  *              thread not locked. 
1292  *              There is a message. 
1293  *              No need to reserve prepost objects - it will never prepost 
1296  *              MACH_MSG_SUCCESS        Actually selected a message for ourselves. 
1297  *              MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large 
1300 ipc_mqueue_select_on_thread( 
1301         ipc_mqueue_t            port_mq
, 
1302         ipc_mqueue_t            set_mq
, 
1303         mach_msg_option_t       option
, 
1304         mach_msg_size_t         max_size
, 
1308         mach_msg_return_t mr 
= MACH_MSG_SUCCESS
; 
1309         mach_msg_size_t msize
; 
1312          * Do some sanity checking of our ability to receive 
1313          * before pulling the message off the queue. 
1315         kmsg 
= ipc_kmsg_queue_first(&port_mq
->imq_messages
); 
1316         assert(kmsg 
!= IKM_NULL
); 
1319          * If we really can't receive it, but we had the 
1320          * MACH_RCV_LARGE option set, then don't take it off 
1321          * the queue, instead return the appropriate error 
1322          * (and size needed). 
1324         msize 
= ipc_kmsg_copyout_size(kmsg
, thread
->map
); 
1325         if (msize 
+ REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread
), option
) > max_size
) { 
1326                 mr 
= MACH_RCV_TOO_LARGE
; 
1327                 if (option 
& MACH_RCV_LARGE
) { 
1328                         thread
->ith_receiver_name 
= port_mq
->imq_receiver_name
; 
1329                         thread
->ith_kmsg 
= IKM_NULL
; 
1330                         thread
->ith_msize 
= msize
; 
1331                         thread
->ith_seqno 
= 0; 
1332                         thread
->ith_state 
= mr
; 
1337         ipc_kmsg_rmqueue(&port_mq
->imq_messages
, kmsg
); 
1339         if (MACH_NODE_VALID(kmsg
->ikm_node
) && FPORT_VALID(port_mq
->imq_fport
)) { 
1340                 flipc_msg_ack(kmsg
->ikm_node
, port_mq
, TRUE
); 
1343         ipc_mqueue_release_msgcount(port_mq
, set_mq
); 
1344         thread
->ith_seqno 
= port_mq
->imq_seqno
++; 
1345         thread
->ith_kmsg 
= kmsg
; 
1346         thread
->ith_state 
= mr
; 
1348         current_task()->messages_received
++; 
1353  *      Routine:        ipc_mqueue_peek_locked 
1355  *              Peek at a (non-set) message queue to see if it has a message 
1356  *              matching the sequence number provided (if zero, then the 
1357  *              first message in the queue) and return vital info about the 
1361  *              The ipc_mqueue_t is locked by callers. 
1362  *              Other locks may be held by callers, so this routine cannot block. 
1363  *              Caller holds reference on the message queue. 
1366 ipc_mqueue_peek_locked(ipc_mqueue_t mq
, 
1367     mach_port_seqno_t 
* seqnop
, 
1368     mach_msg_size_t 
* msg_sizep
, 
1369     mach_msg_id_t 
* msg_idp
, 
1370     mach_msg_max_trailer_t 
* msg_trailerp
, 
1373         ipc_kmsg_queue_t kmsgq
; 
1375         mach_port_seqno_t seqno
, msgoff
; 
1378         assert(!imq_is_set(mq
)); 
1381         if (seqnop 
!= NULL
) { 
1386                 seqno 
= mq
->imq_seqno
; 
1388         } else if (seqno 
>= mq
->imq_seqno 
&& 
1389             seqno 
< mq
->imq_seqno 
+ mq
->imq_msgcount
) { 
1390                 msgoff 
= seqno 
- mq
->imq_seqno
; 
1395         /* look for the message that would match that seqno */ 
1396         kmsgq 
= &mq
->imq_messages
; 
1397         kmsg 
= ipc_kmsg_queue_first(kmsgq
); 
1398         while (msgoff
-- && kmsg 
!= IKM_NULL
) { 
1399                 kmsg 
= ipc_kmsg_queue_next(kmsgq
, kmsg
); 
1401         if (kmsg 
== IKM_NULL
) { 
1405         /* found one - return the requested info */ 
1406         if (seqnop 
!= NULL
) { 
1409         if (msg_sizep 
!= NULL
) { 
1410                 *msg_sizep 
= kmsg
->ikm_header
->msgh_size
; 
1412         if (msg_idp 
!= NULL
) { 
1413                 *msg_idp 
= kmsg
->ikm_header
->msgh_id
; 
1415         if (msg_trailerp 
!= NULL
) { 
1416                 memcpy(msg_trailerp
, 
1417                     (mach_msg_max_trailer_t 
*)((vm_offset_t
)kmsg
->ikm_header 
+ 
1418                     mach_round_msg(kmsg
->ikm_header
->msgh_size
)), 
1419                     sizeof(mach_msg_max_trailer_t
)); 
1421         if (kmsgp 
!= NULL
) { 
1433  *      Routine:        ipc_mqueue_peek 
1435  *              Peek at a (non-set) message queue to see if it has a message 
1436  *              matching the sequence number provided (if zero, then the 
1437  *              first message in the queue) and return vital info about the 
1441  *              The ipc_mqueue_t is unlocked. 
1442  *              Locks may be held by callers, so this routine cannot block. 
1443  *              Caller holds reference on the message queue. 
1446 ipc_mqueue_peek(ipc_mqueue_t mq
, 
1447     mach_port_seqno_t 
* seqnop
, 
1448     mach_msg_size_t 
* msg_sizep
, 
1449     mach_msg_id_t 
* msg_idp
, 
1450     mach_msg_max_trailer_t 
* msg_trailerp
, 
1457         res 
= ipc_mqueue_peek_locked(mq
, seqnop
, msg_sizep
, msg_idp
, 
1458             msg_trailerp
, kmsgp
); 
1465  *      Routine:        ipc_mqueue_release_peek_ref 
1467  *              Release the reference on an mqueue's associated port which was 
1468  *              granted to a thread in ipc_mqueue_peek_on_thread (on the 
1469  *              MACH_PEEK_MSG thread wakeup path). 
1472  *              The ipc_mqueue_t should be locked on entry. 
1473  *              The ipc_mqueue_t will be _unlocked_ on return 
1474  *                      (and potentially invalid!) 
1478 ipc_mqueue_release_peek_ref(ipc_mqueue_t mq
) 
1480         assert(!imq_is_set(mq
)); 
1484          * clear any preposts this mq may have generated 
1485          * (which would cause subsequent immediate wakeups) 
1487         waitq_clear_prepost_locked(&mq
->imq_wait_queue
); 
1492          * release the port reference: we need to do this outside the lock 
1493          * because we might be holding the last port reference! 
1499  * peek at the contained port message queues, break prepost iteration as soon 
1500  * as we spot a message on one of the message queues referenced by the set's 
1501  * prepost list.  No need to lock each message queue, as only the head of each 
1502  * queue is checked. If a message wasn't there before we entered here, no need 
1503  * to find it (if we do, great). 
1506 mqueue_peek_iterator(void *ctx
, struct waitq 
*waitq
, 
1507     struct waitq_set 
*wqset
) 
1509         ipc_mqueue_t port_mq 
= (ipc_mqueue_t
)waitq
; 
1510         ipc_kmsg_queue_t kmsgs 
= &port_mq
->imq_messages
; 
1515         if (ipc_kmsg_queue_first(kmsgs
) != IKM_NULL
) { 
1516                 return WQ_ITERATE_BREAK
; /* break out of the prepost iteration */ 
1518         return WQ_ITERATE_CONTINUE
; 
1522  *      Routine:        ipc_mqueue_set_peek 
1524  *              Peek at a message queue set to see if it has any ports 
1528  *              Locks may be held by callers, so this routine cannot block. 
1529  *              Caller holds reference on the message queue. 
1532 ipc_mqueue_set_peek(ipc_mqueue_t mq
) 
1539          * We may have raced with port destruction where the mqueue is marked 
1540          * as invalid. In that case, even though we don't have messages, we 
1541          * have an end-of-life event to deliver. 
1543         if (!imq_is_valid(mq
)) { 
1547         ret 
= waitq_set_iterate_preposts(&mq
->imq_set_queue
, NULL
, 
1548             mqueue_peek_iterator
); 
1552         return ret 
== WQ_ITERATE_BREAK
; 
1556  *      Routine:        ipc_mqueue_set_gather_member_names 
1558  *              Discover all ports which are members of a given port set. 
1559  *              Because the waitq linkage mechanism was redesigned to save 
1560  *              significan amounts of memory, it no longer keeps back-pointers 
1561  *              from a port set to a port. Therefore, we must iterate over all 
1562  *              ports within a given IPC space and individually query them to 
1563  *              see if they are members of the given set. Port names of ports 
1564  *              found to be members of the given set will be gathered into the 
1565  *              provided 'names' array.  Actual returned names are limited to 
1566  *              maxnames entries, but we keep counting the actual number of 
1567  *              members to let the caller decide to retry if necessary. 
1570  *              Locks may be held by callers, so this routine cannot block. 
1571  *              Caller holds reference on the message queue (via port set). 
1574 ipc_mqueue_set_gather_member_names( 
1576         ipc_mqueue_t set_mq
, 
1577         ipc_entry_num_t maxnames
, 
1578         mach_port_name_t 
*names
, 
1579         ipc_entry_num_t 
*actualp
) 
1582         ipc_entry_num_t tsize
; 
1583         struct waitq_set 
*wqset
; 
1584         ipc_entry_num_t actual 
= 0; 
1586         assert(set_mq 
!= IMQ_NULL
); 
1587         wqset 
= &set_mq
->imq_set_queue
; 
1589         assert(space 
!= IS_NULL
); 
1590         is_read_lock(space
); 
1591         if (!is_active(space
)) { 
1592                 is_read_unlock(space
); 
1596         if (!waitq_set_is_valid(wqset
)) { 
1597                 is_read_unlock(space
); 
1601         table 
= space
->is_table
; 
1602         tsize 
= space
->is_table_size
; 
1603         for (ipc_entry_num_t idx 
= 0; idx 
< tsize
; idx
++) { 
1604                 ipc_entry_t entry 
= &table
[idx
]; 
1606                 /* only receive rights can be members of port sets */ 
1607                 if ((entry
->ie_bits 
& MACH_PORT_TYPE_RECEIVE
) != MACH_PORT_TYPE_NONE
) { 
1608                         ipc_port_t port 
= ip_object_to_port(entry
->ie_object
); 
1609                         ipc_mqueue_t mq 
= &port
->ip_messages
; 
1611                         assert(IP_VALID(port
)); 
1612                         if (ip_active(port
) && 
1613                             waitq_member(&mq
->imq_wait_queue
, wqset
)) { 
1614                                 if (actual 
< maxnames
) { 
1615                                         names
[actual
] = mq
->imq_receiver_name
; 
1622         is_read_unlock(space
); 
1630  *      Routine:        ipc_mqueue_destroy_locked 
1632  *              Destroy a (non-set) message queue. 
1633  *              Set any blocked senders running. 
1634  *              Destroy the kmsgs in the queue. 
1637  *              Receivers were removed when the receive right was "changed" 
1640 ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue
) 
1642         ipc_kmsg_queue_t kmqueue
; 
1644         boolean_t reap 
= FALSE
; 
1645         struct turnstile 
*send_turnstile 
= port_send_turnstile(ip_from_mq(mqueue
)); 
1647         assert(!imq_is_set(mqueue
)); 
1650          *      rouse all blocked senders 
1651          *      (don't boost anyone - we're tearing this queue down) 
1654         mqueue
->imq_fullwaiters 
= FALSE
; 
1656         if (send_turnstile 
!= TURNSTILE_NULL
) { 
1657                 waitq_wakeup64_all(&send_turnstile
->ts_waitq
, 
1660                     WAITQ_ALL_PRIORITIES
); 
1664          * Move messages from the specified queue to the per-thread 
1665          * clean/drain queue while we have the mqueue lock. 
1667         kmqueue 
= &mqueue
->imq_messages
; 
1668         while ((kmsg 
= ipc_kmsg_dequeue(kmqueue
)) != IKM_NULL
) { 
1670                 if (MACH_NODE_VALID(kmsg
->ikm_node
) && FPORT_VALID(mqueue
->imq_fport
)) { 
1671                         flipc_msg_ack(kmsg
->ikm_node
, mqueue
, TRUE
); 
1675                 first 
= ipc_kmsg_delayed_destroy(kmsg
); 
1682          * Wipe out message count, both for messages about to be 
1683          * reaped and for reserved space for (previously) woken senders. 
1684          * This is the indication to them that their reserved space is gone 
1685          * (the mqueue was destroyed). 
1687         mqueue
->imq_msgcount 
= 0; 
1689         /* invalidate the waitq for subsequent mqueue operations */ 
1690         waitq_invalidate_locked(&mqueue
->imq_wait_queue
); 
1692         /* clear out any preposting we may have done */ 
1693         waitq_clear_prepost_locked(&mqueue
->imq_wait_queue
); 
1696          * assert that we are destroying / invalidating a queue that's 
1697          * not a member of any other queue. 
1699         assert(mqueue
->imq_preposts 
== 0); 
1700         assert(mqueue
->imq_in_pset 
== 0); 
1706  *      Routine:        ipc_mqueue_set_qlimit 
1708  *              Changes a message queue limit; the maximum number 
1709  *              of messages which may be queued. 
1715 ipc_mqueue_set_qlimit( 
1716         ipc_mqueue_t                   mqueue
, 
1717         mach_port_msgcount_t   qlimit
) 
1719         assert(qlimit 
<= MACH_PORT_QLIMIT_MAX
); 
1721         /* wake up senders allowed by the new qlimit */ 
1723         if (qlimit 
> mqueue
->imq_qlimit
) { 
1724                 mach_port_msgcount_t i
, wakeup
; 
1725                 struct turnstile 
*send_turnstile 
= port_send_turnstile(ip_from_mq(mqueue
)); 
1727                 /* caution: wakeup, qlimit are unsigned */ 
1728                 wakeup 
= qlimit 
- mqueue
->imq_qlimit
; 
1730                 for (i 
= 0; i 
< wakeup
; i
++) { 
1732                          * boost the priority of the awoken thread 
1733                          * (WAITQ_PROMOTE_PRIORITY) to ensure it uses 
1734                          * the message queue slot we've just reserved. 
1736                          * NOTE: this will never prepost 
1738                         if (send_turnstile 
== TURNSTILE_NULL 
|| 
1739                             waitq_wakeup64_one(&send_turnstile
->ts_waitq
, 
1742                             WAITQ_PROMOTE_PRIORITY
) == KERN_NOT_WAITING
) { 
1743                                 mqueue
->imq_fullwaiters 
= FALSE
; 
1746                         mqueue
->imq_msgcount
++;  /* give it to the awakened thread */ 
1749         mqueue
->imq_qlimit 
= (uint16_t)qlimit
; 
1754  *      Routine:        ipc_mqueue_set_seqno 
1756  *              Changes an mqueue's sequence number. 
1758  *              Caller holds a reference to the queue's containing object. 
1761 ipc_mqueue_set_seqno( 
1762         ipc_mqueue_t            mqueue
, 
1763         mach_port_seqno_t       seqno
) 
1766         mqueue
->imq_seqno 
= seqno
; 
1772  *      Routine:        ipc_mqueue_copyin 
1774  *              Convert a name in a space to a message queue. 
1776  *              Nothing locked.  If successful, the caller gets a ref for 
1777  *              for the object. This ref ensures the continued existence of 
1780  *              MACH_MSG_SUCCESS        Found a message queue. 
1781  *              MACH_RCV_INVALID_NAME   The space is dead. 
1782  *              MACH_RCV_INVALID_NAME   The name doesn't denote a right. 
1783  *              MACH_RCV_INVALID_NAME 
1784  *                      The denoted right is not receive or port set. 
1785  *              MACH_RCV_IN_SET         Receive right is a member of a set. 
1791         mach_port_name_t        name
, 
1792         ipc_mqueue_t            
*mqueuep
, 
1793         ipc_object_t            
*objectp
) 
1796         ipc_entry_bits_t bits
; 
1797         ipc_object_t object
; 
1798         ipc_mqueue_t mqueue
; 
1800         is_read_lock(space
); 
1801         if (!is_active(space
)) { 
1802                 is_read_unlock(space
); 
1803                 return MACH_RCV_INVALID_NAME
; 
1806         entry 
= ipc_entry_lookup(space
, name
); 
1807         if (entry 
== IE_NULL
) { 
1808                 is_read_unlock(space
); 
1809                 return MACH_RCV_INVALID_NAME
; 
1812         bits 
= entry
->ie_bits
; 
1813         object 
= entry
->ie_object
; 
1815         if (bits 
& MACH_PORT_TYPE_RECEIVE
) { 
1816                 ipc_port_t port 
= ip_object_to_port(object
); 
1818                 assert(port 
!= IP_NULL
); 
1821                 require_ip_active(port
); 
1822                 assert(port
->ip_receiver_name 
== name
); 
1823                 assert(port
->ip_receiver 
== space
); 
1824                 is_read_unlock(space
); 
1825                 mqueue 
= &port
->ip_messages
; 
1826         } else if (bits 
& MACH_PORT_TYPE_PORT_SET
) { 
1827                 ipc_pset_t pset 
= ips_object_to_pset(object
); 
1829                 assert(pset 
!= IPS_NULL
); 
1832                 assert(ips_active(pset
)); 
1833                 is_read_unlock(space
); 
1835                 mqueue 
= &pset
->ips_messages
; 
1837                 is_read_unlock(space
); 
1838                 /* guard exception if we never held the receive right in this entry */ 
1839                 if ((bits 
& MACH_PORT_TYPE_EX_RECEIVE
) == 0) { 
1840                         mach_port_guard_exception(name
, 0, 0, kGUARD_EXC_RCV_INVALID_NAME
); 
1842                 return MACH_RCV_INVALID_NAME
; 
1846          *      At this point, the object is locked and active, 
1847          *      the space is unlocked, and mqueue is initialized. 
1850         io_reference(object
); 
1855         return MACH_MSG_SUCCESS
; 
1859 imq_lock(ipc_mqueue_t mq
) 
1861         ipc_object_t object 
= imq_to_object(mq
); 
1862         ipc_object_validate(object
); 
1863         waitq_lock(&(mq
)->imq_wait_queue
); 
1867 imq_lock_try(ipc_mqueue_t mq
) 
1869         ipc_object_t object 
= imq_to_object(mq
); 
1870         ipc_object_validate(object
); 
1871         return waitq_lock_try(&(mq
)->imq_wait_queue
);