diff --git a/src/firehose/firehose_server.c b/src/firehose/firehose_server.c
index a6be2fab75698bc10b0f6908f9a75e6e4f607fa6..ba335dbe30631088c93a7e3edc75c763643d78b5 100644 (file)
@@ -31,6 +31,11 @@ _Static_assert(offsetof(struct firehose_client_s, fc_mem_sent_flushed_pos)
                % 8 == 0, "Make sure atomic fields are properly aligned");
 #endif
 
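+// An fs_client_queue_s is a multi-producer single-consumer list of clients
+// waiting to be drained, manipulated with the os_mpsc_* macros below.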
+typedef struct fs_client_queue_s {
+       struct firehose_client_s *volatile fs_client_head;
+       struct firehose_client_s *volatile fs_client_tail;
+} fs_client_queue_s, *fs_client_queue_t;
+
 static struct firehose_server_s {
        mach_port_t                     fs_bootstrap_port;
        dispatch_mach_t         fs_mach_channel;
@@ -41,24 +46,161 @@ static struct firehose_server_s {
        firehose_handler_t      fs_handler;
 
        firehose_snapshot_t fs_snapshot;
-
        int                                     fs_kernel_fd;
        firehose_client_t       fs_kernel_client;
 
        TAILQ_HEAD(, firehose_client_s) fs_clients;
+       os_unfair_lock      fs_clients_lock;
+       fs_client_queue_s       fs_queues[4];
+       dispatch_source_t       fs_sources[4];
 } server_config = {
        .fs_clients = TAILQ_HEAD_INITIALIZER(server_config.fs_clients),
+       .fs_clients_lock = OS_UNFAIR_LOCK_INIT,
        .fs_kernel_fd = -1,
 };
 
-#pragma mark -
-#pragma mark firehose client state machine
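+// fs_clients is no longer touched only from fs_io_drain_queue: the snapshot
+// tickle and firehose_server_cancel() walk it from other contexts, so the
+// list is now guarded by an unfair lock.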
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_lock(void)
+{
+       os_unfair_lock_lock_with_options(&server_config.fs_clients_lock,
+                       OS_UNFAIR_LOCK_DATA_SYNCHRONIZATION);
+}
+
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_unlock(void)
+{
+       os_unfair_lock_unlock(&server_config.fs_clients_lock);
+}
 
 static void firehose_server_demux(firehose_client_t fc,
                mach_msg_header_t *msg_hdr);
 static void firehose_client_cancel(firehose_client_t fc);
 static void firehose_client_snapshot_finish(firehose_client_t fc,
                firehose_snapshot_t snapshot, bool for_io);
+static void firehose_client_handle_death(void *ctxt);
+
+#pragma mark -
+#pragma mark firehose client enqueueing
+
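+// fs_queues/fs_sources index layout: bit 0 selects I/O vs memory, bit 1
+// selects quarantined vs regular, i.e. { mem, io, quarantined-mem,
+// quarantined-io } (see fs_queue() below).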
+OS_ALWAYS_INLINE
+static inline bool
+fs_idx_is_for_io(size_t idx)
+{
+       return idx & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_io(fs_client_queue_t q)
+{
+       return (q - server_config.fs_queues) & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_quarantined(fs_client_queue_t q)
+{
+       return (q - server_config.fs_queues) & 2;
+}
+
+OS_ALWAYS_INLINE
+static inline fs_client_queue_t
+fs_queue(bool quarantined, bool for_io)
+{
+       return &server_config.fs_queues[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline dispatch_source_t
+fs_source(bool quarantined, bool for_io)
+{
+       return server_config.fs_sources[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_push(firehose_client_t fc, pthread_priority_t pp,
+               bool quarantined, bool for_io)
+{
+       fs_client_queue_t queue = fs_queue(quarantined, for_io);
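+       // os_mpsc_push_update_tail returns true when the queue was empty, in
+       // which case the pusher publishes the head and wakes the drain source;
+       // otherwise only merge a non-zero priority override (pp)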
+       if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) {
+               os_mpsc_push_update_head(queue, fs_client, fc);
+               _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+       } else if (pp) {
+               _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+       }
+}
+
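+// Enqueue the mem or I/O side of a client for draining exactly once; returns
+// false if that side has already been canceled.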
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp,
+               bool for_io)
+{
+       uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+       uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+       uintptr_t old_state, new_state;
+
+       os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+               if (old_state & canceled_bit) {
+                       os_atomic_rmw_loop_give_up(return false);
+               }
+               if (old_state & enqueued_bit) {
+                       os_atomic_rmw_loop_give_up(break);
+               }
+               new_state = old_state | enqueued_bit;
+       });
+       firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp,
+                       fc->fc_quarantined, for_io);
+       return true;
+}
+
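+// First half of a two-phase cancel: set CANCELING and force one last enqueue
+// so that firehose_client_dequeue() below promotes CANCELING to CANCELED.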
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_start_cancel(firehose_client_t fc, bool for_io)
+{
+       uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+       uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+       uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+       uintptr_t old_state, new_state;
+
+       os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+               if (old_state & (canceled_bit | canceling_bit)) {
+                       os_atomic_rmw_loop_give_up(return);
+               }
+               new_state = old_state | enqueued_bit | canceling_bit;
+       });
+       firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0,
+                       fc->fc_quarantined, for_io);
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_dequeue(firehose_client_t fc, bool for_io)
+{
+       uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+       uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+       uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+       uintptr_t old_state, new_state;
+
+       os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+               new_state = old_state & ~(canceling_bit | enqueued_bit);
+               if (old_state & canceling_bit) {
+                       new_state |= canceled_bit;
+               }
+       });
+
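+       // if this dequeue just set the last of the two CANCELED bits, the
+       // client is fully canceled and can be torn down on the I/O drain queue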
+       if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) &&
+                       (new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) {
+               dispatch_async_f(server_config.fs_io_drain_queue, fc,
+                               firehose_client_handle_death);
+       }
+       return !(new_state & canceled_bit);
+}
+
+#pragma mark -
+#pragma mark firehose client state machine
 
 static void
 firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
@@ -74,15 +216,17 @@ firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
        firehose_atomic_max2o(fc, fc_io_sent_flushed_pos,
                        push_reply.fpr_io_flushed_pos, relaxed);
 
-       if (fc->fc_is_kernel) {
+       if (!fc->fc_pid) {
                if (ioctl(server_config.fs_kernel_fd, LOGFLUSHED, &push_reply) < 0) {
                        dispatch_assume_zero(errno);
                }
        } else {
                if (reply_port == fc->fc_sendp) {
-                       kr = firehose_send_push_notify_async(reply_port, push_reply, 0);
+                       kr = firehose_send_push_notify_async(reply_port, push_reply,
+                                       fc->fc_quarantined, 0);
                } else {
-                       kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply);
+                       kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply,
+                                       fc->fc_quarantined);
                }
                if (kr != MACH_SEND_INVALID_DEST) {
                        DISPATCH_VERIFY_MIG(kr);
@@ -104,18 +248,6 @@ firehose_client_acquire_head(firehose_buffer_t fb, bool for_io)
        return head;
 }
 
-OS_ALWAYS_INLINE
-static inline void
-firehose_client_push_async_merge(firehose_client_t fc, pthread_priority_t pp,
-               bool for_io)
-{
-       if (for_io) {
-               _dispatch_source_merge_data(fc->fc_io_source, pp, 1);
-       } else {
-               _dispatch_source_merge_data(fc->fc_mem_source, pp, 1);
-       }
-}
-
 OS_NOINLINE OS_COLD
 static void
 firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
@@ -129,7 +261,7 @@ firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
 
        if (reply_port) {
                kern_return_t kr = firehose_send_push_reply(reply_port, 0,
-                               FIREHOSE_PUSH_REPLY_CORRUPTED);
+                               FIREHOSE_PUSH_REPLY_CORRUPTED, false);
                DISPATCH_VERIFY_MIG(kr);
                dispatch_assume_zero(kr);
        }
@@ -154,10 +286,10 @@ firehose_client_snapshot_mark_done(firehose_client_t fc,
 
 OS_NOINLINE
 static void
-firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
+firehose_client_drain_one(firehose_client_t fc, mach_port_t port, uint32_t flags)
 {
        firehose_buffer_t fb = fc->fc_buffer;
-       firehose_buffer_chunk_t fbc;
+       firehose_chunk_t fbc;
        firehose_event_t evt;
        uint16_t volatile *fbh_ring;
        uint16_t flushed, ref, count = 0;
@@ -172,9 +304,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                fbh_ring = fb->fb_header.fbh_io_ring;
                sent_flushed = (uint16_t)fc->fc_io_sent_flushed_pos;
                flushed = (uint16_t)fc->fc_io_flushed_pos;
-               if (fc->fc_needs_io_snapshot) {
-                       snapshot = server_config.fs_snapshot;
-               }
+               if (fc->fc_needs_io_snapshot) snapshot = server_config.fs_snapshot;
        } else {
                evt = FIREHOSE_EVENT_MEM_BUFFER_RECEIVED;
                _Static_assert(FIREHOSE_EVENT_MEM_BUFFER_RECEIVED ==
@@ -182,9 +312,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                fbh_ring = fb->fb_header.fbh_mem_ring;
                sent_flushed = (uint16_t)fc->fc_mem_sent_flushed_pos;
                flushed = (uint16_t)fc->fc_mem_flushed_pos;
-               if (fc->fc_needs_mem_snapshot) {
-                       snapshot = server_config.fs_snapshot;
-               }
+               if (fc->fc_needs_mem_snapshot) snapshot = server_config.fs_snapshot;
        }
 
        if (slowpath(fc->fc_memory_corrupted)) {
@@ -209,7 +337,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                        ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
                        ref = os_atomic_load(&fbh_ring[ref], relaxed);
                        ref &= FIREHOSE_RING_POS_IDX_MASK;
-               } while (fc->fc_is_kernel && !ref);
+               } while (!fc->fc_pid && !ref);
                count++;
                if (!ref) {
                        _dispatch_debug("Ignoring invalid page reference in ring: %d", ref);
@@ -217,10 +345,17 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                }
 
                fbc = firehose_buffer_ref_to_chunk(fb, ref);
+               if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
+                       // serialize with firehose_client_metadata_stream_peek
+                       os_unfair_lock_lock(&fc->fc_lock);
+               }
                server_config.fs_handler(fc, evt, fbc);
                if (slowpath(snapshot)) {
                        snapshot->handler(fc, evt, fbc);
                }
+               if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
+                       os_unfair_lock_unlock(&fc->fc_lock);
+               }
                // clients not using notifications (single threaded) always drain fully
                // because they use all their limit, always
        } while (!fc->fc_use_notifs || count < DRAIN_BATCH_SIZE || snapshot);
@@ -238,7 +373,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                        client_flushed = os_atomic_load2o(&fb->fb_header,
                                fbh_ring_tail.frp_mem_flushed, relaxed);
                }
-               if (fc->fc_is_kernel) {
+               if (!fc->fc_pid) {
                        // will fire firehose_client_notify() because port is MACH_PORT_DEAD
                        port = fc->fc_sendp;
                } else if (!port && client_flushed == sent_flushed && fc->fc_use_notifs) {
@@ -253,7 +388,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
        if (port) {
                firehose_client_notify(fc, port);
        }
-       if (fc->fc_is_kernel) {
+       if (!fc->fc_pid) {
                if (!(flags & FIREHOSE_DRAIN_POLL)) {
                        // see firehose_client_kernel_source_handle_event
                        dispatch_resume(fc->fc_kernel_source);
@@ -264,12 +399,12 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
                        // and there's more to drain, so optimistically schedule draining
                        // again this is cheap since the queue is hot, and is fair for other
                        // clients
-                       firehose_client_push_async_merge(fc, 0, for_io);
+                       firehose_client_wakeup(fc, 0, for_io);
                }
                if (count && server_config.fs_kernel_client) {
                        // the kernel is special because it can drop messages, so if we're
                        // draining, poll the kernel each time while we're bound to a thread
-                       firehose_client_drain(server_config.fs_kernel_client,
+                       firehose_client_drain_one(server_config.fs_kernel_client,
                                        MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
                }
        }
@@ -283,21 +418,37 @@ corrupt:
        // from now on all IO/mem drains depending on `for_io` will be no-op
        // (needs_<for_io>_snapshot: false, memory_corrupted: true). we can safely
        // silence the corresponding source of drain wake-ups.
-       if (!fc->fc_is_kernel) {
-               dispatch_source_cancel(for_io ? fc->fc_io_source : fc->fc_mem_source);
+       if (fc->fc_pid) {
+               firehose_client_start_cancel(fc, for_io);
        }
 }
 
 static void
-firehose_client_drain_io_async(void *ctx)
-{
-       firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO);
-}
-
-static void
-firehose_client_drain_mem_async(void *ctx)
+firehose_client_drain(void *ctxt)
 {
-       firehose_client_drain(ctx, MACH_PORT_NULL, 0);
+       fs_client_queue_t queue = ctxt;
+       bool for_io = fs_queue_is_for_io(queue);
+       bool quarantined = fs_queue_is_for_quarantined(queue);
+       firehose_client_t fc, fc_next;
+       size_t clients = 0;
+
+       while (queue->fs_client_tail) {
+               fc = os_mpsc_get_head(queue, fs_client);
+               do {
+                       fc_next = os_mpsc_pop_head(queue, fs_client, fc, fc_next[for_io]);
+                       if (firehose_client_dequeue(fc, for_io)) {
+                               firehose_client_drain_one(fc, MACH_PORT_NULL,
+                                               for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
+                       }
+                       // process quarantined clients 4 times slower than the
+                       // other ones; re-asyncing after each batch also allows
+                       // a suspension of the quarantined source to be
+                       // discovered faster
+                       if (++clients == (quarantined ? 1 : 4)) {
+                               dispatch_source_merge_data(fs_source(quarantined, for_io), 1);
+                               return;
+                       }
+               } while ((fc = fc_next));
+       }
 }
 
 OS_NOINLINE
@@ -326,7 +477,12 @@ firehose_client_finalize(firehose_client_t fc OS_OBJECT_CONSUMED)
        }
        server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_DIED, NULL);
 
+       fs_clients_lock();
        TAILQ_REMOVE(&server_config.fs_clients, fc, fc_entry);
+       fs_clients_unlock();
+
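+       // the channel release moved here from firehose_client_cancel() so that
+       // firehose_server_cancel() can still call dispatch_mach_cancel() on the
+       // channel of a client that is going away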
+       dispatch_release(fc->fc_mach_channel);
+       fc->fc_mach_channel = NULL;
        fc->fc_entry.tqe_next = DISPATCH_OBJECT_LISTLESS;
        fc->fc_entry.tqe_prev = DISPATCH_OBJECT_LISTLESS;
        _os_object_release(&fc->fc_as_os_object);
@@ -383,26 +539,26 @@ firehose_client_handle_death(void *ctxt)
        // Then look at all the allocated pages not seen in the ring
        while (bitmap) {
                uint16_t ref = firehose_bitmap_first_set(bitmap);
-               firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-               uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+               firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+               uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
 
                bitmap &= ~(1ULL << ref);
-               if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
+               if (fbc->fc_start + fbc_length <= fbc->fc_data) {
                        // this page has its "recycle-requeue" done, but hasn't gone
                        // through "recycle-reuse", or it has no data, ditch it
                        continue;
                }
-               if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
+               if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
                        // this thing has data, but the first tracepoint is unreadable
                        // so also just ditch it
                        continue;
                }
-               if (!fbc->fbc_pos.fbc_flag_io) {
+               if (!fbc->fc_pos.fcp_flag_io) {
                        mem_bitmap |= 1ULL << ref;
                        continue;
                }
                server_config.fs_handler(fc, FIREHOSE_EVENT_IO_BUFFER_RECEIVED, fbc);
-               if (fc->fc_needs_io_snapshot && snapshot) {
+               if (fc->fc_needs_io_snapshot) {
                        snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, fbc);
                }
        }
@@ -416,11 +572,11 @@ firehose_client_handle_death(void *ctxt)
 
                while (mem_bitmap_copy) {
                        uint16_t ref = firehose_bitmap_first_set(mem_bitmap_copy);
-                       firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+                       firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
 
                        mem_bitmap_copy &= ~(1ULL << ref);
                        server_config.fs_handler(fc, FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, fbc);
-                       if (fc->fc_needs_mem_snapshot && snapshot) {
+                       if (fc->fc_needs_mem_snapshot) {
                                snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, fbc);
                        }
                }
@@ -434,18 +590,13 @@ static void
 firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
                dispatch_mach_msg_t dmsg, mach_error_t error OS_UNUSED)
 {
-       mach_msg_header_t *msg_hdr;
+       mach_msg_header_t *msg_hdr = NULL;
        firehose_client_t fc = ctx;
-       mach_port_t oldsendp, oldrecvp;
-
-       if (dmsg) {
-               msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
-               oldsendp = msg_hdr->msgh_remote_port;
-               oldrecvp = msg_hdr->msgh_local_port;
-       }
+       mach_port_t port;
 
        switch (reason) {
        case DISPATCH_MACH_MESSAGE_RECEIVED:
+               msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
                if (msg_hdr->msgh_id == MACH_NOTIFY_NO_SENDERS) {
                        _dispatch_debug("FIREHOSE NO_SENDERS (unique_pid: 0x%llx)",
                                        firehose_client_get_unique_pid(fc, NULL));
@@ -456,25 +607,33 @@ firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
                break;
 
        case DISPATCH_MACH_DISCONNECTED:
-               if (oldsendp) {
-                       if (slowpath(oldsendp != fc->fc_sendp)) {
-                               DISPATCH_INTERNAL_CRASH(oldsendp,
-                                               "disconnect event about unknown send-right");
+               msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
+               port = msg_hdr->msgh_remote_port;
+               if (MACH_PORT_VALID(port)) {
+                       if (port != fc->fc_sendp) {
+                               DISPATCH_INTERNAL_CRASH(port, "Unknown send-right");
                        }
                        firehose_mach_port_send_release(fc->fc_sendp);
                        fc->fc_sendp = MACH_PORT_NULL;
                }
-               if (oldrecvp) {
-                       if (slowpath(oldrecvp != fc->fc_recvp)) {
-                               DISPATCH_INTERNAL_CRASH(oldrecvp,
-                                               "disconnect event about unknown receive-right");
+               port = msg_hdr->msgh_local_port;
+               if (MACH_PORT_VALID(port)) {
+                       if (port != fc->fc_recvp) {
+                               DISPATCH_INTERNAL_CRASH(port, "Unknown recv-right");
                        }
                        firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
                        fc->fc_recvp = MACH_PORT_NULL;
                }
-               if (fc->fc_recvp == MACH_PORT_NULL && fc->fc_sendp == MACH_PORT_NULL) {
-                       firehose_client_cancel(fc);
+               break;
+
+       case DISPATCH_MACH_CANCELED:
+               if (MACH_PORT_VALID(fc->fc_sendp)) {
+                       DISPATCH_INTERNAL_CRASH(fc->fc_sendp, "send-right leak");
+               }
+               if (MACH_PORT_VALID(fc->fc_recvp)) {
+                       DISPATCH_INTERNAL_CRASH(fc->fc_recvp, "recv-right leak");
                }
+               firehose_client_cancel(fc);
                break;
        }
 }
@@ -488,10 +647,8 @@ firehose_client_kernel_source_handle_event(void *ctxt)
        // resumed in firehose_client_drain for both memory and I/O
        dispatch_suspend(fc->fc_kernel_source);
        dispatch_suspend(fc->fc_kernel_source);
-       dispatch_async_f(server_config.fs_mem_drain_queue,
-                       fc, firehose_client_drain_mem_async);
-       dispatch_async_f(server_config.fs_io_drain_queue,
-                       fc, firehose_client_drain_io_async);
+       firehose_client_wakeup(fc, 0, false);
+       firehose_client_wakeup(fc, 0, true);
 }
 #endif
 
@@ -500,41 +657,37 @@ firehose_client_resume(firehose_client_t fc,
                const struct firehose_client_connected_info_s *fcci)
 {
        dispatch_assert_queue(server_config.fs_io_drain_queue);
+
+       fs_clients_lock();
        TAILQ_INSERT_TAIL(&server_config.fs_clients, fc, fc_entry);
+       fs_clients_unlock();
+
        server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CONNECTED, (void *)fcci);
-       if (fc->fc_is_kernel) {
+       if (!fc->fc_pid) {
                dispatch_activate(fc->fc_kernel_source);
        } else {
                dispatch_mach_connect(fc->fc_mach_channel,
                                fc->fc_recvp, fc->fc_sendp, NULL);
-               dispatch_activate(fc->fc_io_source);
-               dispatch_activate(fc->fc_mem_source);
        }
 }
 
 static void
 firehose_client_cancel(firehose_client_t fc)
 {
-       dispatch_mach_t dm;
-       dispatch_block_t block;
-
        _dispatch_debug("client died (unique_pid: 0x%llx",
                        firehose_client_get_unique_pid(fc, NULL));
 
-       dm = fc->fc_mach_channel;
-       fc->fc_mach_channel = NULL;
-       dispatch_release(dm);
-
+       if (MACH_PORT_VALID(fc->fc_sendp)) {
+               firehose_mach_port_send_release(fc->fc_sendp);
+               fc->fc_sendp = MACH_PORT_NULL;
+       }
+       if (MACH_PORT_VALID(fc->fc_recvp)) {
+               firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
+               fc->fc_recvp = MACH_PORT_NULL;
+       }
        fc->fc_use_notifs = false;
-       dispatch_source_cancel(fc->fc_io_source);
-       dispatch_source_cancel(fc->fc_mem_source);
-
-       block = dispatch_block_create(DISPATCH_BLOCK_DETACHED, ^{
-               dispatch_async_f(server_config.fs_io_drain_queue, fc,
-                               firehose_client_handle_death);
-       });
-       dispatch_async(server_config.fs_mem_drain_queue, block);
-       _Block_release(block);
+       firehose_client_start_cancel(fc, false);
+       firehose_client_start_cancel(fc, true);
 }
 
 static firehose_client_t
@@ -552,32 +705,30 @@ _firehose_client_create(firehose_buffer_t fb)
        return fc;
 }
 
+#pragma pack(4)
+typedef struct firehose_token_s {
+       uid_t auid;
+       uid_t euid;
+       gid_t egid;
+       uid_t ruid;
+       gid_t rgid;
+       pid_t pid;
+       au_asid_t asid;
+       dev_t execcnt;
+} *firehose_token_t;
+#pragma pack()
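+// firehose_token_s mirrors the audit token layout so that the audit_token_t
+// received by firehose_server_register() can be read field by field (see the
+// (firehose_token_t)&atoken cast below).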
+
 static firehose_client_t
-firehose_client_create(firehose_buffer_t fb,
+firehose_client_create(firehose_buffer_t fb, firehose_token_t token,
                mach_port_t comm_recvp, mach_port_t comm_sendp)
 {
        uint64_t unique_pid = fb->fb_header.fbh_uniquepid;
        firehose_client_t fc = _firehose_client_create(fb);
        dispatch_mach_t dm;
-       dispatch_source_t ds;
 
-       ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
-                       server_config.fs_mem_drain_queue);
-       _os_object_retain_internal_inline(&fc->fc_as_os_object);
-       dispatch_set_context(ds, fc);
-       dispatch_set_finalizer_f(ds,
-                       (dispatch_function_t)_os_object_release_internal);
-       dispatch_source_set_event_handler_f(ds, firehose_client_drain_mem_async);
-       fc->fc_mem_source = ds;
-
-       ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
-                       server_config.fs_io_drain_queue);
-       _os_object_retain_internal_inline(&fc->fc_as_os_object);
-       dispatch_set_context(ds, fc);
-       dispatch_set_finalizer_f(ds,
-                       (dispatch_function_t)_os_object_release_internal);
-       dispatch_source_set_event_handler_f(ds, firehose_client_drain_io_async);
-       fc->fc_io_source = ds;
+       fc->fc_pid = token->pid ? token->pid : ~0;
+       fc->fc_euid = token->euid;
+       fc->fc_pidversion = token->execcnt;
 
        _dispatch_debug("FIREHOSE_REGISTER (unique_pid: 0x%llx)", unique_pid);
        fc->fc_recvp = comm_recvp;
@@ -617,12 +768,11 @@ firehose_kernel_client_create(void)
                DISPATCH_INTERNAL_CRASH(errno, "Unable to map kernel buffer");
        }
        if (fb_map.fbmi_size !=
-                       FIREHOSE_BUFFER_KERNEL_CHUNK_COUNT * FIREHOSE_BUFFER_CHUNK_SIZE) {
+                       FIREHOSE_BUFFER_KERNEL_CHUNK_COUNT * FIREHOSE_CHUNK_SIZE) {
                DISPATCH_INTERNAL_CRASH(fb_map.fbmi_size, "Unexpected kernel buffer size");
        }
 
        fc = _firehose_client_create((firehose_buffer_t)(uintptr_t)fb_map.fbmi_addr);
-       fc->fc_is_kernel = true;
        ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
                        fs->fs_ipc_queue);
        dispatch_set_context(ds, fc);
@@ -651,24 +801,27 @@ _firehose_client_xref_dispose(firehose_client_t fc)
 {
        _dispatch_debug("Cleaning up client info for unique_pid 0x%llx",
                        firehose_client_get_unique_pid(fc, NULL));
-
-       dispatch_release(fc->fc_io_source);
-       fc->fc_io_source = NULL;
-
-       dispatch_release(fc->fc_mem_source);
-       fc->fc_mem_source = NULL;
 }
 
 uint64_t
 firehose_client_get_unique_pid(firehose_client_t fc, pid_t *pid_out)
 {
        firehose_buffer_header_t fbh = &fc->fc_buffer->fb_header;
-       if (fc->fc_is_kernel) {
-               if (pid_out) *pid_out = 0;
-               return 0;
-       }
-       if (pid_out) *pid_out = fbh->fbh_pid ?: ~(pid_t)0;
-       return fbh->fbh_uniquepid ?: ~0ull;
+       if (pid_out) *pid_out = fc->fc_pid;
+       if (!fc->fc_pid) return 0;
+       return fbh->fbh_uniquepid ? fbh->fbh_uniquepid : ~0ull;
+}
+
+uid_t
+firehose_client_get_euid(firehose_client_t fc)
+{
+       return fc->fc_euid;
+}
+
+int
+firehose_client_get_pid_version(firehose_client_t fc)
+{
+       return fc->fc_pidversion;
 }
 
 void *
@@ -692,6 +845,12 @@ firehose_client_set_context(firehose_client_t fc, void *ctxt)
        return os_atomic_xchg2o(fc, fc_ctxt, ctxt, relaxed);
 }
 
+void
+firehose_client_initiate_quarantine(firehose_client_t fc)
+{
+       fc->fc_quarantined = true;
+}
+
 #pragma mark -
 #pragma mark firehose server
 
@@ -720,22 +879,24 @@ void
 firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
 {
        struct firehose_server_s *fs = &server_config;
-       dispatch_queue_attr_t attr;
+       dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL;
+       dispatch_queue_attr_t attr_ui;
        dispatch_mach_t dm;
+       dispatch_source_t ds;
 
        // just reference the string so that it's captured
        (void)os_atomic_load(&__libfirehose_serverVersionString[0], relaxed);
 
-       attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL,
+       attr_ui = dispatch_queue_attr_make_with_qos_class(attr,
                        QOS_CLASS_USER_INITIATED, 0);
        fs->fs_ipc_queue = dispatch_queue_create_with_target(
-                       "com.apple.firehose.ipc", attr, NULL);
+                       "com.apple.firehose.ipc", attr_ui, NULL);
        fs->fs_snapshot_gate_queue = dispatch_queue_create_with_target(
-                       "com.apple.firehose.snapshot-gate", DISPATCH_QUEUE_SERIAL, NULL);
+                       "com.apple.firehose.snapshot-gate", attr, NULL);
        fs->fs_io_drain_queue = dispatch_queue_create_with_target(
-                       "com.apple.firehose.drain-io", DISPATCH_QUEUE_SERIAL, NULL);
+                       "com.apple.firehose.drain-io", attr, NULL);
        fs->fs_mem_drain_queue = dispatch_queue_create_with_target(
-                       "com.apple.firehose.drain-mem", DISPATCH_QUEUE_SERIAL, NULL);
+                       "com.apple.firehose.drain-mem", attr, NULL);
 
        dm = dispatch_mach_create_f("com.apple.firehose.listener",
                        fs->fs_ipc_queue, NULL, firehose_server_handle_mach_event);
@@ -743,6 +904,15 @@ firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
        fs->fs_mach_channel = dm;
        fs->fs_handler = _Block_copy(handler);
        firehose_kernel_client_create();
+
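+       // one DATA_OR source per (quarantined, for_io) queue; the queue is the
+       // source's context so firehose_client_drain() knows what to drain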
+       for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+               ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
+                               fs_idx_is_for_io(i) ? server_config.fs_io_drain_queue :
+                               server_config.fs_mem_drain_queue);
+               dispatch_set_context(ds, &fs->fs_queues[i]);
+               dispatch_source_set_event_handler_f(ds, firehose_client_drain);
+               fs->fs_sources[i] = ds;
+       }
 }
 
 void
@@ -760,6 +930,17 @@ firehose_server_assert_spi_version(uint32_t spi_version)
        }
 }
 
+bool
+firehose_server_has_ever_flushed_pages(void)
+{
+       // Use the IO pages flushed count from the kernel client as an
+       // approximation for whether the firehose has ever flushed pages during
+       // this boot. logd uses this to detect the first time it starts after a
+       // fresh boot.
+       firehose_client_t fhc = server_config.fs_kernel_client;
+       return !fhc || fhc->fc_io_flushed_pos > 0;
+}
+
 void
 firehose_server_resume(void)
 {
@@ -775,54 +956,115 @@ firehose_server_resume(void)
        }
        dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
                        MACH_PORT_NULL, NULL);
+       for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+               dispatch_activate(fs->fs_sources[i]);
+       }
 }
 
-#pragma mark -
-#pragma mark firehose snapshot and peeking
+void
+firehose_server_cancel(void)
+{
+       firehose_client_t fc;
+
+       dispatch_mach_cancel(server_config.fs_mach_channel);
+
+       fs_clients_lock();
+       TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+               dispatch_mach_cancel(fc->fc_mach_channel);
+       }
+       fs_clients_unlock();
+}
+
+dispatch_queue_t
+firehose_server_copy_queue(firehose_server_queue_t which)
+{
+       dispatch_queue_t dq;
+       switch (which) {
+       case FIREHOSE_SERVER_QUEUE_IO:
+               dq = server_config.fs_io_drain_queue;
+               break;
+       case FIREHOSE_SERVER_QUEUE_MEMORY:
+               dq = server_config.fs_mem_drain_queue;
+               break;
+       default:
+               DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
+       }
+       dispatch_retain(dq);
+       return dq;
+}
 
 void
-firehose_client_metadata_stream_peek(firehose_client_t fc,
-               firehose_event_t context, bool (^peek_should_start)(void),
-               bool (^peek)(firehose_buffer_chunk_t fbc))
+firehose_server_quarantined_suspend(firehose_server_queue_t which)
 {
-       if (context != FIREHOSE_EVENT_MEM_BUFFER_RECEIVED) {
-               return dispatch_sync(server_config.fs_mem_drain_queue, ^{
-                       firehose_client_metadata_stream_peek(fc,
-                                       FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, peek_should_start, peek);
-               });
+       switch (which) {
+       case FIREHOSE_SERVER_QUEUE_IO:
+               dispatch_suspend(fs_source(true, true));
+               break;
+       case FIREHOSE_SERVER_QUEUE_MEMORY:
+               dispatch_suspend(fs_source(true, false));
+               break;
+       default:
+               DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
        }
+}
 
-       if (peek_should_start && !peek_should_start()) {
-               return;
+void
+firehose_server_quarantined_resume(firehose_server_queue_t which)
+{
+       switch (which) {
+       case FIREHOSE_SERVER_QUEUE_IO:
+               dispatch_resume(fs_source(true, true));
+               break;
+       case FIREHOSE_SERVER_QUEUE_MEMORY:
+               dispatch_resume(fs_source(true, false));
+               break;
+       default:
+               DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
        }
+}
 
-       firehose_buffer_t fb = fc->fc_buffer;
-       firehose_buffer_header_t fbh = &fb->fb_header;
-       uint64_t bitmap = fbh->fbh_bank.fbb_metadata_bitmap;
 
-       while (bitmap) {
-               uint16_t ref = firehose_bitmap_first_set(bitmap);
-               firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-               uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+#pragma mark -
+#pragma mark firehose snapshot and peeking
 
-               bitmap &= ~(1ULL << ref);
-               if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
-                       // this page has its "recycle-requeue" done, but hasn't gone
-                       // through "recycle-reuse", or it has no data, ditch it
-                       continue;
-               }
-               if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
-                       // this thing has data, but the first tracepoint is unreadable
-                       // so also just ditch it
-                       continue;
-               }
-               if (fbc->fbc_pos.fbc_stream != firehose_stream_metadata) {
-                       continue;
-               }
-               if (!peek(fbc)) {
-                       break;
+void
+firehose_client_metadata_stream_peek(firehose_client_t fc,
+               OS_UNUSED firehose_event_t context, bool (^peek_should_start)(void),
+               bool (^peek)(firehose_chunk_t fbc))
+{
+       os_unfair_lock_lock(&fc->fc_lock);
+
+       if (peek_should_start && peek_should_start()) {
+               firehose_buffer_t fb = fc->fc_buffer;
+               firehose_buffer_header_t fbh = &fb->fb_header;
+               uint64_t bitmap = fbh->fbh_bank.fbb_metadata_bitmap;
+
+               while (bitmap) {
+                       uint16_t ref = firehose_bitmap_first_set(bitmap);
+                       firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+                       uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
+
+                       bitmap &= ~(1ULL << ref);
+                       if (fbc->fc_start + fbc_length <= fbc->fc_data) {
+                               // this page has its "recycle-requeue" done, but hasn't gone
+                               // through "recycle-reuse", or it has no data, ditch it
+                               continue;
+                       }
+                       if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
+                               // this thing has data, but the first tracepoint is unreadable
+                               // so also just ditch it
+                               continue;
+                       }
+                       if (fbc->fc_pos.fcp_stream != firehose_stream_metadata) {
+                               continue;
+                       }
+                       if (!peek(fbc)) {
+                               break;
+                       }
                }
        }
+
+       os_unfair_lock_unlock(&fc->fc_lock);
 }
 
 OS_NOINLINE OS_COLD
@@ -872,21 +1114,21 @@ firehose_client_snapshot_finish(firehose_client_t fc,
        // Then look at all the allocated pages not seen in the ring
        while (bitmap) {
                uint16_t ref = firehose_bitmap_first_set(bitmap);
-               firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-               uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+               firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+               uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
 
                bitmap &= ~(1ULL << ref);
-               if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
+               if (fbc->fc_start + fbc_length <= fbc->fc_data) {
                        // this page has its "recycle-requeue" done, but hasn't gone
                        // through "recycle-reuse", or it has no data, ditch it
                        continue;
                }
-               if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
+               if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
                        // this thing has data, but the first tracepoint is unreadable
                        // so also just ditch it
                        continue;
                }
-               if (fbc->fbc_pos.fbc_flag_io != for_io) {
+               if (fbc->fc_pos.fcp_flag_io != for_io) {
                        continue;
                }
                snapshot->handler(fc, evt, fbc);
@@ -894,70 +1136,35 @@ firehose_client_snapshot_finish(firehose_client_t fc,
 }
 
 static void
-firehose_snapshot_start(void *ctxt)
+firehose_snapshot_tickle_clients(firehose_snapshot_t fs, bool for_io)
 {
-       firehose_snapshot_t snapshot = ctxt;
-       firehose_client_t fci;
+       firehose_client_t fc;
        long n = 0;
 
-       // 0. we need to be on the IO queue so that client connection and/or death
-       //    cannot happen concurrently
-       dispatch_assert_queue(server_config.fs_io_drain_queue);
-
-       // 1. mark all the clients participating in the current snapshot
-       //    and enter the group for each bit set
-       TAILQ_FOREACH(fci, &server_config.fs_clients, fc_entry) {
-               if (fci->fc_is_kernel) {
+       fs_clients_lock();
+       TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+               if (slowpath(fc->fc_memory_corrupted)) {
+                       continue;
+               }
+               if (!fc->fc_pid) {
 #if TARGET_OS_SIMULATOR
                        continue;
 #endif
-               }
-               if (slowpath(fci->fc_memory_corrupted)) {
+               } else if (!firehose_client_wakeup(fc, 0, for_io)) {
                        continue;
                }
-               fci->fc_needs_io_snapshot = true;
-               fci->fc_needs_mem_snapshot = true;
-               n += 2;
-       }
-       if (n) {
-               // cheating: equivalent to dispatch_group_enter() n times
-               // without the acquire barriers that we don't need
-               os_atomic_add2o(snapshot->fs_group, dg_value, n, relaxed);
+               n++;
+               if (for_io) {
+                       fc->fc_needs_io_snapshot = true;
+               } else {
+                       fc->fc_needs_mem_snapshot = true;
+               }
        }
+       fs_clients_unlock();
 
-       dispatch_async(server_config.fs_mem_drain_queue, ^{
-               // 2. make fs_snapshot visible, this is what triggers the snapshot
-               //    logic from _drain() or handle_death(). until fs_snapshot is
-               //    published, the bits set above are mostly ignored
-               server_config.fs_snapshot = snapshot;
-
-               snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
-
-               dispatch_async(server_config.fs_io_drain_queue, ^{
-                       firehose_client_t fcj;
-
-                       snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
-
-                       // match group_enter from firehose_snapshot() after MEM+IO_START
-                       dispatch_group_leave(snapshot->fs_group);
-
-                       // 3. tickle all the clients. the list of clients may have changed
-                       //    since step 1, but worry not - new clients don't have
-                       //    fc_needs_*_snapshot set so drain is harmless; clients that
-                       //    were removed from the list have already left the group
-                       //    (see firehose_client_finalize())
-                       TAILQ_FOREACH(fcj, &server_config.fs_clients, fc_entry) {
-                               if (fcj->fc_is_kernel) {
-#if !TARGET_OS_SIMULATOR
-                                       firehose_client_kernel_source_handle_event(fcj);
-#endif
-                               } else {
-                                       dispatch_source_merge_data(fcj->fc_io_source, 1);
-                                       dispatch_source_merge_data(fcj->fc_mem_source, 1);
-                               }
-                       }
-               });
-       });
+       // cheating: equivalent to dispatch_group_enter() n times
+       // without the acquire barriers that we don't need
+       if (n) os_atomic_add2o(fs->fs_group, dg_value, n, relaxed);
 }
 
 static void
@@ -979,10 +1186,37 @@ firehose_snapshot_finish(void *ctxt)
 static void
 firehose_snapshot_gate(void *ctxt)
 {
+       firehose_snapshot_t fs = ctxt;
+
        // prevent other snapshots from running until done
+
        dispatch_suspend(server_config.fs_snapshot_gate_queue);
-       dispatch_async_f(server_config.fs_io_drain_queue, ctxt,
-                       firehose_snapshot_start);
+
+       server_config.fs_snapshot = fs;
+       dispatch_group_async(fs->fs_group, server_config.fs_mem_drain_queue, ^{
+               // start the fs_mem_snapshot, this is what triggers the snapshot
+               // logic from _drain() or handle_death()
+               fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
+               firehose_snapshot_tickle_clients(fs, false);
+
+               dispatch_group_async(fs->fs_group, server_config.fs_io_drain_queue, ^{
+                       // start the fs_io_snapshot, this is what triggers the snapshot
+                       // logic from _drain() or handle_death()
+                       // 29868879: must always happen after the memory snapshot started
+                       fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
+                       firehose_snapshot_tickle_clients(fs, true);
+
+#if !TARGET_OS_SIMULATOR
+                       if (server_config.fs_kernel_client) {
+                               firehose_client_kernel_source_handle_event(
+                                               server_config.fs_kernel_client);
+                       }
+#endif
+               });
+       });
+
+       dispatch_group_notify_f(fs->fs_group, server_config.fs_io_drain_queue,
+                       fs, firehose_snapshot_finish);
 }
 
 void
@@ -993,12 +1227,6 @@ firehose_snapshot(firehose_snapshot_handler_t handler)
        snapshot->handler = Block_copy(handler);
        snapshot->fs_group = dispatch_group_create();
 
-       // keep the group entered until IO_START and MEM_START have been sent
-       // See firehose_snapshot_start()
-       dispatch_group_enter(snapshot->fs_group);
-       dispatch_group_notify_f(snapshot->fs_group, server_config.fs_io_drain_queue,
-                       snapshot, firehose_snapshot_finish);
-
        dispatch_async_f(server_config.fs_snapshot_gate_queue, snapshot,
                        firehose_snapshot_gate);
 }
@@ -1010,7 +1238,8 @@ kern_return_t
 firehose_server_register(mach_port_t server_port OS_UNUSED,
                mach_port_t mem_port, mach_vm_size_t mem_size,
                mach_port_t comm_recvp, mach_port_t comm_sendp,
-               mach_port_t extra_info_port, mach_vm_size_t extra_info_size)
+               mach_port_t extra_info_port, mach_vm_size_t extra_info_size,
+               audit_token_t atoken)
 {
        mach_vm_address_t base_addr = 0;
        firehose_client_t fc = NULL;
@@ -1060,7 +1289,7 @@ firehose_server_register(mach_port_t server_port OS_UNUSED,
        }
 
        fc = firehose_client_create((firehose_buffer_t)base_addr,
-                       comm_recvp, comm_sendp);
+                       (firehose_token_t)&atoken, comm_recvp, comm_sendp);
        dispatch_async(server_config.fs_io_drain_queue, ^{
                firehose_client_resume(fc, &fcci);
                if (fcci.fcci_size) {
@@ -1088,15 +1317,16 @@ firehose_server_push_async(mach_port_t server_port OS_UNUSED,
                if (expects_notifs && !fc->fc_use_notifs) {
                        fc->fc_use_notifs = true;
                }
-               firehose_client_push_async_merge(fc, pp, for_io);
+               firehose_client_wakeup(fc, pp, for_io);
        }
        return KERN_SUCCESS;
 }
 
 kern_return_t
-firehose_server_push(mach_port_t server_port OS_UNUSED,
+firehose_server_push_and_wait(mach_port_t server_port OS_UNUSED,
                mach_port_t reply_port, qos_class_t qos, boolean_t for_io,
-               firehose_push_reply_t *push_reply OS_UNUSED)
+               firehose_push_reply_t *push_reply OS_UNUSED,
+               boolean_t *quarantinedOut OS_UNUSED)
 {
        firehose_client_t fc = cur_client_info;
        dispatch_block_flags_t flags = DISPATCH_BLOCK_ENFORCE_QOS_CLASS;
@@ -1118,7 +1348,7 @@ firehose_server_push(mach_port_t server_port OS_UNUSED,
        }
 
        block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
-               firehose_client_drain(fc, reply_port,
+               firehose_client_drain_one(fc, reply_port,
                                for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
        });
        dispatch_async(q, block);