X-Git-Url: https://git.saurik.com/apple/libdispatch.git/blobdiff_plain/beb15981c065ae4ed9a311077ec39909275640b6..refs/heads/master:/src/firehose/firehose_server.c

diff --git a/src/firehose/firehose_server.c b/src/firehose/firehose_server.c
index a6be2fa..ba335db 100644
--- a/src/firehose/firehose_server.c
+++ b/src/firehose/firehose_server.c
@@ -31,6 +31,11 @@ _Static_assert(offsetof(struct firehose_client_s, fc_mem_sent_flushed_pos)
 		% 8 == 0, "Make sure atomic fields are properly aligned");
 #endif
 
+typedef struct fs_client_queue_s {
+	struct firehose_client_s *volatile fs_client_head;
+	struct firehose_client_s *volatile fs_client_tail;
+} fs_client_queue_s, *fs_client_queue_t;
+
 static struct firehose_server_s {
 	mach_port_t			fs_bootstrap_port;
 	dispatch_mach_t		fs_mach_channel;
@@ -41,24 +46,161 @@ static struct firehose_server_s {
 	firehose_handler_t	fs_handler;
 
 	firehose_snapshot_t	fs_snapshot;
-
 	int					fs_kernel_fd;
 	firehose_client_t	fs_kernel_client;
 	TAILQ_HEAD(, firehose_client_s) fs_clients;
+	os_unfair_lock		fs_clients_lock;
+	fs_client_queue_s	fs_queues[4];
+	dispatch_source_t	fs_sources[4];
 } server_config = {
 	.fs_clients = TAILQ_HEAD_INITIALIZER(server_config.fs_clients),
+	.fs_clients_lock = OS_UNFAIR_LOCK_INIT,
 	.fs_kernel_fd = -1,
 };
 
-#pragma mark -
-#pragma mark firehose client state machine
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_lock(void)
+{
+	os_unfair_lock_lock_with_options(&server_config.fs_clients_lock,
+			OS_UNFAIR_LOCK_DATA_SYNCHRONIZATION);
+}
+
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_unlock(void)
+{
+	os_unfair_lock_unlock(&server_config.fs_clients_lock);
+}
 
 static void firehose_server_demux(firehose_client_t fc,
 		mach_msg_header_t *msg_hdr);
 static void firehose_client_cancel(firehose_client_t fc);
 static void firehose_client_snapshot_finish(firehose_client_t fc,
 		firehose_snapshot_t snapshot, bool for_io);
+static void firehose_client_handle_death(void *ctxt);
+
+#pragma mark -
+#pragma mark firehose client enqueueing
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_idx_is_for_io(size_t idx)
+{
+	return idx & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_io(fs_client_queue_t q)
+{
+	return (q - server_config.fs_queues) & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_quarantined(fs_client_queue_t q)
+{
+	return (q - server_config.fs_queues) & 2;
+}
+
+OS_ALWAYS_INLINE
+static inline fs_client_queue_t
+fs_queue(bool quarantined, bool for_io)
+{
+	return &server_config.fs_queues[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline dispatch_source_t
+fs_source(bool quarantined, bool for_io)
+{
+	return server_config.fs_sources[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_push(firehose_client_t fc, pthread_priority_t pp,
+		bool quarantined, bool for_io)
+{
+	fs_client_queue_t queue = fs_queue(quarantined, for_io);
+	if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) {
+		os_mpsc_push_update_head(queue, fs_client, fc);
+		_dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+	} else if (pp) {
+		_dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+	}
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp,
+		bool for_io)
+{
+	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+	uintptr_t old_state, new_state;
+
+	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+		if (old_state & canceled_bit) {
+			os_atomic_rmw_loop_give_up(return false);
+		}
+		if (old_state & enqueued_bit) {
+			os_atomic_rmw_loop_give_up(break);
+		}
+		new_state = old_state | enqueued_bit;
+	});
+	firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp,
+			fc->fc_quarantined, for_io);
+	return true;
+}
+
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_start_cancel(firehose_client_t fc, bool for_io)
+{
+	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+	uintptr_t old_state, new_state;
+
+	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+		if (old_state & (canceled_bit | canceling_bit)) {
+			os_atomic_rmw_loop_give_up(return);
+		}
+		new_state = old_state | enqueued_bit | canceling_bit;
+	});
+	firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0,
+			fc->fc_quarantined, for_io);
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_dequeue(firehose_client_t fc, bool for_io)
+{
+	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+	uintptr_t old_state, new_state;
+
+	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+		new_state = old_state & ~(canceling_bit | enqueued_bit);
+		if (old_state & canceling_bit) {
+			new_state |= canceled_bit;
+		}
+	});
+
+	if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) &&
+			(new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) {
+		dispatch_async_f(server_config.fs_io_drain_queue, fc,
+				firehose_client_handle_death);
+	}
+	return !(new_state & canceled_bit);
+}
+
+#pragma mark -
+#pragma mark firehose client state machine
 
 static void
 firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
@@ -74,15 +216,17 @@ firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
 	firehose_atomic_max2o(fc, fc_io_sent_flushed_pos,
 			push_reply.fpr_io_flushed_pos, relaxed);
 
-	if (fc->fc_is_kernel) {
+	if (!fc->fc_pid) {
 		if (ioctl(server_config.fs_kernel_fd, LOGFLUSHED, &push_reply) < 0) {
 			dispatch_assume_zero(errno);
 		}
 	} else {
 		if (reply_port == fc->fc_sendp) {
-			kr = firehose_send_push_notify_async(reply_port, push_reply, 0);
+			kr = firehose_send_push_notify_async(reply_port, push_reply,
+					fc->fc_quarantined, 0);
 		} else {
-			kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply);
+			kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply,
+					fc->fc_quarantined);
 		}
 		if (kr != MACH_SEND_INVALID_DEST) {
 			DISPATCH_VERIFY_MIG(kr);
@@ -104,18 +248,6 @@ firehose_client_acquire_head(firehose_buffer_t fb, bool for_io)
 	return head;
 }
 
-OS_ALWAYS_INLINE
-static inline void
-firehose_client_push_async_merge(firehose_client_t fc, pthread_priority_t pp,
-		bool for_io)
-{
-	if (for_io) {
-		_dispatch_source_merge_data(fc->fc_io_source, pp, 1);
-	} else {
-		_dispatch_source_merge_data(fc->fc_mem_source, pp, 1);
-	}
-}
-
 OS_NOINLINE OS_COLD
 static void
 firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
@@ -129,7 +261,7 @@ firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
 
 	if (reply_port) {
 		kern_return_t kr = firehose_send_push_reply(reply_port, 0,
-				FIREHOSE_PUSH_REPLY_CORRUPTED);
+				FIREHOSE_PUSH_REPLY_CORRUPTED, false);
 		DISPATCH_VERIFY_MIG(kr);
 		dispatch_assume_zero(kr);
 	}
@@ -154,10 +286,10 @@ firehose_client_snapshot_mark_done(firehose_client_t fc,
 
 OS_NOINLINE
 static void
-firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
+firehose_client_drain_one(firehose_client_t fc, mach_port_t port, uint32_t flags)
 {
 	firehose_buffer_t fb = fc->fc_buffer;
-	firehose_buffer_chunk_t fbc;
+	firehose_chunk_t fbc;
 	firehose_event_t evt;
 	uint16_t volatile *fbh_ring;
 	uint16_t flushed, ref, count = 0;
@@ -172,9 +304,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 		fbh_ring = fb->fb_header.fbh_io_ring;
 		sent_flushed = (uint16_t)fc->fc_io_sent_flushed_pos;
 		flushed = (uint16_t)fc->fc_io_flushed_pos;
-		if (fc->fc_needs_io_snapshot) {
-			snapshot = server_config.fs_snapshot;
-		}
+		if (fc->fc_needs_io_snapshot) snapshot = server_config.fs_snapshot;
 	} else {
 		evt = FIREHOSE_EVENT_MEM_BUFFER_RECEIVED;
 		_Static_assert(FIREHOSE_EVENT_MEM_BUFFER_RECEIVED ==
@@ -182,9 +312,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 		fbh_ring = fb->fb_header.fbh_mem_ring;
 		sent_flushed = (uint16_t)fc->fc_mem_sent_flushed_pos;
 		flushed = (uint16_t)fc->fc_mem_flushed_pos;
-		if (fc->fc_needs_mem_snapshot) {
-			snapshot = server_config.fs_snapshot;
-		}
+		if (fc->fc_needs_mem_snapshot) snapshot = server_config.fs_snapshot;
 	}
 
 	if (slowpath(fc->fc_memory_corrupted)) {
@@ -209,7 +337,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 			ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
 			ref = os_atomic_load(&fbh_ring[ref], relaxed);
 			ref &= FIREHOSE_RING_POS_IDX_MASK;
-		} while (fc->fc_is_kernel && !ref);
+		} while (!fc->fc_pid && !ref);
 		count++;
 		if (!ref) {
 			_dispatch_debug("Ignoring invalid page reference in ring: %d", ref);
@@ -217,10 +345,17 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 		}
 
 		fbc = firehose_buffer_ref_to_chunk(fb, ref);
+		if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
+			// serialize with firehose_client_metadata_stream_peek
+			os_unfair_lock_lock(&fc->fc_lock);
+		}
 		server_config.fs_handler(fc, evt, fbc);
 		if (slowpath(snapshot)) {
 			snapshot->handler(fc, evt, fbc);
 		}
+		if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
+			os_unfair_lock_unlock(&fc->fc_lock);
+		}
 		// clients not using notifications (single threaded) always drain fully
 		// because they use all their limit, always
 	} while (!fc->fc_use_notifs || count < DRAIN_BATCH_SIZE || snapshot);
@@ -238,7 +373,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 		client_flushed = os_atomic_load2o(&fb->fb_header,
 				fbh_ring_tail.frp_mem_flushed, relaxed);
 	}
-	if (fc->fc_is_kernel) {
+	if (!fc->fc_pid) {
 		// will fire firehose_client_notify() because port is MACH_PORT_DEAD
 		port = fc->fc_sendp;
 	} else if (!port && client_flushed == sent_flushed && fc->fc_use_notifs) {
@@ -253,7 +388,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 	if (port) {
 		firehose_client_notify(fc, port);
 	}
-	if (fc->fc_is_kernel) {
+	if (!fc->fc_pid) {
 		if (!(flags & FIREHOSE_DRAIN_POLL)) {
 			// see firehose_client_kernel_source_handle_event
 			dispatch_resume(fc->fc_kernel_source);
@@ -264,12 +399,12 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
 		// and there's more to drain, so optimistically schedule draining
 		// again this is cheap since the queue is hot, and is fair for other
 		// clients
-		firehose_client_push_async_merge(fc, 0, for_io);
+		firehose_client_wakeup(fc, 0, for_io);
 	}
 	if (count && server_config.fs_kernel_client) {
 		// the kernel is special because it can drop messages, so if we're
 		// draining, poll the kernel each time while we're bound to a thread
-		firehose_client_drain(server_config.fs_kernel_client,
+		firehose_client_drain_one(server_config.fs_kernel_client,
 				MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
 	}
 }
@@ -283,21 +418,37 @@ corrupt:
 	// from now on all IO/mem drains depending on `for_io` will be no-op
 	// (needs_<for_io>_snapshot: false, memory_corrupted: true). we can safely
 	// silence the corresponding source of drain wake-ups.
-	if (!fc->fc_is_kernel) {
-		dispatch_source_cancel(for_io ? fc->fc_io_source : fc->fc_mem_source);
+	if (fc->fc_pid) {
+		firehose_client_start_cancel(fc, for_io);
 	}
 }
 
 static void
-firehose_client_drain_io_async(void *ctx)
-{
-	firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO);
-}
-
-static void
-firehose_client_drain_mem_async(void *ctx)
+firehose_client_drain(void *ctxt)
 {
-	firehose_client_drain(ctx, MACH_PORT_NULL, 0);
+	fs_client_queue_t queue = ctxt;
+	bool for_io = fs_queue_is_for_io(queue);
+	bool quarantined = fs_queue_is_for_quarantined(queue);
+	firehose_client_t fc, fc_next;
+	size_t clients = 0;
+
+	while (queue->fs_client_tail) {
+		fc = os_mpsc_get_head(queue, fs_client);
+		do {
+			fc_next = os_mpsc_pop_head(queue, fs_client, fc, fc_next[for_io]);
+			if (firehose_client_dequeue(fc, for_io)) {
+				firehose_client_drain_one(fc, MACH_PORT_NULL,
+						for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
+			}
+			// process quarantined clients 4 times as slow as the other ones
+			// also reasyncing every 4 clients allows for discovering
+			// quarantined suspension faster
+			if (++clients == (quarantined ? 1 : 4)) {
+				dispatch_source_merge_data(fs_source(quarantined, for_io), 1);
+				return;
+			}
+		} while ((fc = fc_next));
+	}
 }
 
 OS_NOINLINE
@@ -326,7 +477,12 @@ firehose_client_finalize(firehose_client_t fc OS_OBJECT_CONSUMED)
 	}
 	server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_DIED, NULL);
 
+	fs_clients_lock();
 	TAILQ_REMOVE(&server_config.fs_clients, fc, fc_entry);
+	fs_clients_unlock();
+
+	dispatch_release(fc->fc_mach_channel);
+	fc->fc_mach_channel = NULL;
 	fc->fc_entry.tqe_next = DISPATCH_OBJECT_LISTLESS;
 	fc->fc_entry.tqe_prev = DISPATCH_OBJECT_LISTLESS;
 	_os_object_release(&fc->fc_as_os_object);
@@ -383,26 +539,26 @@ firehose_client_handle_death(void *ctxt)
 	// Then look at all the allocated pages not seen in the ring
 	while (bitmap) {
 		uint16_t ref = firehose_bitmap_first_set(bitmap);
-		firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-		uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+		firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+		uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
 		bitmap &= ~(1ULL << ref);
-		if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
+		if (fbc->fc_start + fbc_length <= fbc->fc_data) {
 			// this page has its "recycle-requeue" done, but hasn't gone
 			// through "recycle-reuse", or it has no data, ditch it
 			continue;
 		}
-		if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
+		if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
 			// this thing has data, but the first tracepoint is unreadable
 			// so also just ditch it
 			continue;
 		}
-		if (!fbc->fbc_pos.fbc_flag_io) {
+		if (!fbc->fc_pos.fcp_flag_io) {
 			mem_bitmap |= 1ULL << ref;
 			continue;
 		}
 		server_config.fs_handler(fc, FIREHOSE_EVENT_IO_BUFFER_RECEIVED, fbc);
-		if (fc->fc_needs_io_snapshot && snapshot) {
+		if (fc->fc_needs_io_snapshot) {
 			snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, fbc);
 		}
 	}
@@ -416,11 +572,11 @@ firehose_client_handle_death(void *ctxt)
 
 	while (mem_bitmap_copy) {
 		uint16_t ref = firehose_bitmap_first_set(mem_bitmap_copy);
-		firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+		firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
 		mem_bitmap_copy &= ~(1ULL << ref);
 
 		server_config.fs_handler(fc, FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, fbc);
-		if (fc->fc_needs_mem_snapshot && snapshot) {
+		if (fc->fc_needs_mem_snapshot) {
 			snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, fbc);
 		}
 	}
@@ -434,18 +590,13 @@ static void
 firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
 		dispatch_mach_msg_t dmsg, mach_error_t error OS_UNUSED)
 {
-	mach_msg_header_t *msg_hdr;
+	mach_msg_header_t *msg_hdr = NULL;
 	firehose_client_t fc = ctx;
-	mach_port_t oldsendp, oldrecvp;
-
-	if (dmsg) {
-		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
-		oldsendp = msg_hdr->msgh_remote_port;
-		oldrecvp = msg_hdr->msgh_local_port;
-	}
+	mach_port_t port;
 
 	switch (reason) {
 	case DISPATCH_MACH_MESSAGE_RECEIVED:
+		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
 		if (msg_hdr->msgh_id == MACH_NOTIFY_NO_SENDERS) {
 			_dispatch_debug("FIREHOSE NO_SENDERS (unique_pid: 0x%llx)",
 					firehose_client_get_unique_pid(fc, NULL));
@@ -456,25 +607,33 @@ firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
 		break;
 
 	case DISPATCH_MACH_DISCONNECTED:
-		if (oldsendp) {
-			if (slowpath(oldsendp != fc->fc_sendp)) {
-				DISPATCH_INTERNAL_CRASH(oldsendp,
-						"disconnect event about unknown send-right");
+		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
+		port = msg_hdr->msgh_remote_port;
+		if (MACH_PORT_VALID(port)) {
+			if (port != fc->fc_sendp) {
+				DISPATCH_INTERNAL_CRASH(port, "Unknown send-right");
 			}
 			firehose_mach_port_send_release(fc->fc_sendp);
 			fc->fc_sendp = MACH_PORT_NULL;
 		}
-		if (oldrecvp) {
-			if (slowpath(oldrecvp != fc->fc_recvp)) {
-				DISPATCH_INTERNAL_CRASH(oldrecvp,
-						"disconnect event about unknown receive-right");
+		port = msg_hdr->msgh_local_port;
+		if (MACH_PORT_VALID(port)) {
+			if (port != fc->fc_recvp) {
+				DISPATCH_INTERNAL_CRASH(port, "Unknown recv-right");
 			}
 			firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
 			fc->fc_recvp = MACH_PORT_NULL;
 		}
-		if (fc->fc_recvp == MACH_PORT_NULL && fc->fc_sendp == MACH_PORT_NULL) {
-			firehose_client_cancel(fc);
+		break;
+
+	case DISPATCH_MACH_CANCELED:
+		if (MACH_PORT_VALID(fc->fc_sendp)) {
+			DISPATCH_INTERNAL_CRASH(fc->fc_sendp, "send-right leak");
+		}
+		if (MACH_PORT_VALID(fc->fc_recvp)) {
+			DISPATCH_INTERNAL_CRASH(fc->fc_recvp, "recv-right leak");
 		}
+		firehose_client_cancel(fc);
 		break;
 	}
 }
@@ -488,10 +647,8 @@ firehose_client_kernel_source_handle_event(void *ctxt)
 	// resumed in firehose_client_drain for both memory and I/O
 	dispatch_suspend(fc->fc_kernel_source);
 	dispatch_suspend(fc->fc_kernel_source);
-	dispatch_async_f(server_config.fs_mem_drain_queue,
-			fc, firehose_client_drain_mem_async);
-	dispatch_async_f(server_config.fs_io_drain_queue,
-			fc, firehose_client_drain_io_async);
+	firehose_client_wakeup(fc, 0, false);
+	firehose_client_wakeup(fc, 0, true);
 }
 #endif
 
@@ -500,41 +657,37 @@ firehose_client_resume(firehose_client_t fc,
 		const struct firehose_client_connected_info_s *fcci)
 {
 	dispatch_assert_queue(server_config.fs_io_drain_queue);
+
+	fs_clients_lock();
 	TAILQ_INSERT_TAIL(&server_config.fs_clients, fc, fc_entry);
+	fs_clients_unlock();
+
 	server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CONNECTED, (void *)fcci);
-	if (fc->fc_is_kernel) {
+	if (!fc->fc_pid) {
 		dispatch_activate(fc->fc_kernel_source);
 	} else {
 		dispatch_mach_connect(fc->fc_mach_channel, fc->fc_recvp,
 				fc->fc_sendp, NULL);
-		dispatch_activate(fc->fc_io_source);
-		dispatch_activate(fc->fc_mem_source);
 	}
 }
 
 static void
 firehose_client_cancel(firehose_client_t fc)
 {
-	dispatch_mach_t dm;
-	dispatch_block_t block;
-
 	_dispatch_debug("client died (unique_pid: 0x%llx",
 			firehose_client_get_unique_pid(fc, NULL));
 
-	dm = fc->fc_mach_channel;
-	fc->fc_mach_channel = NULL;
-	dispatch_release(dm);
-
+	if (MACH_PORT_VALID(fc->fc_sendp)) {
+		firehose_mach_port_send_release(fc->fc_sendp);
+		fc->fc_sendp = MACH_PORT_NULL;
+	}
+	if (MACH_PORT_VALID(fc->fc_recvp)) {
+		firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
+		fc->fc_recvp = MACH_PORT_NULL;
+	}
 	fc->fc_use_notifs = false;
-	dispatch_source_cancel(fc->fc_io_source);
-	dispatch_source_cancel(fc->fc_mem_source);
-
-	block = dispatch_block_create(DISPATCH_BLOCK_DETACHED, ^{
-		dispatch_async_f(server_config.fs_io_drain_queue, fc,
-				firehose_client_handle_death);
-	});
-	dispatch_async(server_config.fs_mem_drain_queue, block);
-	_Block_release(block);
+	firehose_client_start_cancel(fc, false);
+	firehose_client_start_cancel(fc, true);
 }
 
 static firehose_client_t
@@ -552,32 +705,30 @@ _firehose_client_create(firehose_buffer_t fb)
 	return fc;
 }
 
+#pragma pack(4)
+typedef struct firehose_token_s {
+	uid_t auid;
+	uid_t euid;
+	gid_t egid;
+	uid_t ruid;
+	gid_t rgid;
+	pid_t pid;
+	au_asid_t asid;
+	dev_t execcnt;
+} *firehose_token_t;
+#pragma pack()
+
 static firehose_client_t
-firehose_client_create(firehose_buffer_t fb,
+firehose_client_create(firehose_buffer_t fb, firehose_token_t token,
 		mach_port_t comm_recvp, mach_port_t comm_sendp)
 {
 	uint64_t unique_pid = fb->fb_header.fbh_uniquepid;
 	firehose_client_t fc = _firehose_client_create(fb);
 	dispatch_mach_t dm;
-	dispatch_source_t ds;
 
-	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
-			server_config.fs_mem_drain_queue);
-	_os_object_retain_internal_inline(&fc->fc_as_os_object);
-	dispatch_set_context(ds, fc);
-	dispatch_set_finalizer_f(ds,
-			(dispatch_function_t)_os_object_release_internal);
-	dispatch_source_set_event_handler_f(ds, firehose_client_drain_mem_async);
-	fc->fc_mem_source = ds;
-
-	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
-			server_config.fs_io_drain_queue);
-	_os_object_retain_internal_inline(&fc->fc_as_os_object);
-	dispatch_set_context(ds, fc);
-	dispatch_set_finalizer_f(ds,
-			(dispatch_function_t)_os_object_release_internal);
-	dispatch_source_set_event_handler_f(ds, firehose_client_drain_io_async);
-	fc->fc_io_source = ds;
+	fc->fc_pid = token->pid ? token->pid : ~0;
+	fc->fc_euid = token->euid;
+	fc->fc_pidversion = token->execcnt;
 
 	_dispatch_debug("FIREHOSE_REGISTER (unique_pid: 0x%llx)", unique_pid);
 	fc->fc_recvp = comm_recvp;
@@ -617,12 +768,11 @@ firehose_kernel_client_create(void)
 		DISPATCH_INTERNAL_CRASH(errno, "Unable to map kernel buffer");
 	}
 	if (fb_map.fbmi_size !=
-			FIREHOSE_BUFFER_KERNEL_CHUNK_COUNT * FIREHOSE_BUFFER_CHUNK_SIZE) {
+			FIREHOSE_BUFFER_KERNEL_CHUNK_COUNT * FIREHOSE_CHUNK_SIZE) {
 		DISPATCH_INTERNAL_CRASH(fb_map.fbmi_size, "Unexpected kernel buffer size");
 	}
 
 	fc = _firehose_client_create((firehose_buffer_t)(uintptr_t)fb_map.fbmi_addr);
-	fc->fc_is_kernel = true;
 	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
 			fs->fs_ipc_queue);
 	dispatch_set_context(ds, fc);
@@ -651,24 +801,27 @@ _firehose_client_xref_dispose(firehose_client_t fc)
 {
 	_dispatch_debug("Cleaning up client info for unique_pid 0x%llx",
 			firehose_client_get_unique_pid(fc, NULL));
-
-	dispatch_release(fc->fc_io_source);
-	fc->fc_io_source = NULL;
-
-	dispatch_release(fc->fc_mem_source);
-	fc->fc_mem_source = NULL;
 }
 
 uint64_t
 firehose_client_get_unique_pid(firehose_client_t fc, pid_t *pid_out)
 {
 	firehose_buffer_header_t fbh = &fc->fc_buffer->fb_header;
-	if (fc->fc_is_kernel) {
-		if (pid_out) *pid_out = 0;
-		return 0;
-	}
-	if (pid_out) *pid_out = fbh->fbh_pid ?: ~(pid_t)0;
-	return fbh->fbh_uniquepid ?: ~0ull;
+
+	if (pid_out) *pid_out = fc->fc_pid;
+	if (!fc->fc_pid) return 0;
+	return fbh->fbh_uniquepid ? fbh->fbh_uniquepid : ~0ull;
+}
+
+uid_t
+firehose_client_get_euid(firehose_client_t fc)
+{
+	return fc->fc_euid;
+}
+
+int
+firehose_client_get_pid_version(firehose_client_t fc)
+{
+	return fc->fc_pidversion;
 }
 
 void *
@@ -692,6 +845,12 @@ firehose_client_set_context(firehose_client_t fc, void *ctxt)
 	return os_atomic_xchg2o(fc, fc_ctxt, ctxt, relaxed);
 }
 
+void
+firehose_client_initiate_quarantine(firehose_client_t fc)
+{
+	fc->fc_quarantined = true;
+}
+
 #pragma mark -
 #pragma mark firehose server
 
@@ -720,22 +879,24 @@ void
 firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
 {
 	struct firehose_server_s *fs = &server_config;
-	dispatch_queue_attr_t attr;
+	dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL;
+	dispatch_queue_attr_t attr_ui;
 	dispatch_mach_t dm;
+	dispatch_source_t ds;
 
 	// just reference the string so that it's captured
 	(void)os_atomic_load(&__libfirehose_serverVersionString[0], relaxed);
 
-	attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL,
+	attr_ui = dispatch_queue_attr_make_with_qos_class(attr,
 			QOS_CLASS_USER_INITIATED, 0);
 	fs->fs_ipc_queue = dispatch_queue_create_with_target(
-			"com.apple.firehose.ipc", attr, NULL);
+			"com.apple.firehose.ipc", attr_ui, NULL);
 	fs->fs_snapshot_gate_queue = dispatch_queue_create_with_target(
-			"com.apple.firehose.snapshot-gate", DISPATCH_QUEUE_SERIAL, NULL);
+			"com.apple.firehose.snapshot-gate", attr, NULL);
 	fs->fs_io_drain_queue = dispatch_queue_create_with_target(
-			"com.apple.firehose.drain-io", DISPATCH_QUEUE_SERIAL, NULL);
+			"com.apple.firehose.drain-io", attr, NULL);
 	fs->fs_mem_drain_queue = dispatch_queue_create_with_target(
-			"com.apple.firehose.drain-mem", DISPATCH_QUEUE_SERIAL, NULL);
+			"com.apple.firehose.drain-mem", attr, NULL);
 
 	dm = dispatch_mach_create_f("com.apple.firehose.listener", fs->fs_ipc_queue,
 			NULL, firehose_server_handle_mach_event);
@@ -743,6 +904,15 @@ firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
 	fs->fs_mach_channel = dm;
 	fs->fs_handler = _Block_copy(handler);
 	firehose_kernel_client_create();
+
+	for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+		ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
+				fs_idx_is_for_io(i) ? server_config.fs_io_drain_queue :
+				server_config.fs_mem_drain_queue);
+		dispatch_set_context(ds, &fs->fs_queues[i]);
+		dispatch_source_set_event_handler_f(ds, firehose_client_drain);
+		fs->fs_sources[i] = ds;
+	}
 }
 
 void
@@ -760,6 +930,17 @@ firehose_server_assert_spi_version(uint32_t spi_version)
 	}
 }
 
+bool
+firehose_server_has_ever_flushed_pages(void)
+{
+	// Use the IO pages flushed count from the kernel client as an
+	// approximation for whether the firehose has ever flushed pages during
+	// this boot. logd uses this to detect the first time it starts after a
+	// fresh boot.
+	firehose_client_t fhc = server_config.fs_kernel_client;
+	return !fhc || fhc->fc_io_flushed_pos > 0;
+}
+
 void
 firehose_server_resume(void)
 {
@@ -775,54 +956,115 @@ firehose_server_resume(void)
 	}
 	dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
 			MACH_PORT_NULL, NULL);
+	for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+		dispatch_activate(fs->fs_sources[i]);
+	}
 }
 
-#pragma mark -
-#pragma mark firehose snapshot and peeking
+void
+firehose_server_cancel(void)
+{
+	firehose_client_t fc;
+
+	dispatch_mach_cancel(server_config.fs_mach_channel);
+
+	fs_clients_lock();
+	TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+		dispatch_mach_cancel(fc->fc_mach_channel);
+	}
+	fs_clients_unlock();
+}
+
+dispatch_queue_t
+firehose_server_copy_queue(firehose_server_queue_t which)
+{
+	dispatch_queue_t dq;
+	switch (which) {
+	case FIREHOSE_SERVER_QUEUE_IO:
+		dq = server_config.fs_io_drain_queue;
+		break;
+	case FIREHOSE_SERVER_QUEUE_MEMORY:
+		dq = server_config.fs_mem_drain_queue;
+		break;
+	default:
+		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
+	}
+	dispatch_retain(dq);
+	return dq;
+}
 
 void
-firehose_client_metadata_stream_peek(firehose_client_t fc,
-		firehose_event_t context, bool (^peek_should_start)(void),
-		bool (^peek)(firehose_buffer_chunk_t fbc))
+firehose_server_quarantined_suspend(firehose_server_queue_t which)
 {
-	if (context != FIREHOSE_EVENT_MEM_BUFFER_RECEIVED) {
-		return dispatch_sync(server_config.fs_mem_drain_queue, ^{
-			firehose_client_metadata_stream_peek(fc,
-					FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, peek_should_start, peek);
-		});
+	switch (which) {
+	case FIREHOSE_SERVER_QUEUE_IO:
+		dispatch_suspend(fs_source(true, true));
+		break;
+	case FIREHOSE_SERVER_QUEUE_MEMORY:
+		dispatch_suspend(fs_source(true, false));
+		break;
+	default:
+		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
 	}
+}
 
-	if (peek_should_start && !peek_should_start()) {
-		return;
+void
+firehose_server_quarantined_resume(firehose_server_queue_t which)
+{
+	switch (which) {
+	case FIREHOSE_SERVER_QUEUE_IO:
+		dispatch_resume(fs_source(true, true));
+		break;
+	case FIREHOSE_SERVER_QUEUE_MEMORY:
+		dispatch_resume(fs_source(true, false));
+		break;
+	default:
+		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
 	}
+}
 
-	firehose_buffer_t fb = fc->fc_buffer;
-	firehose_buffer_header_t fbh = &fb->fb_header;
-	uint64_t bitmap = fbh->fbh_bank.fbb_metadata_bitmap;
 
-	while (bitmap) {
-		uint16_t ref = firehose_bitmap_first_set(bitmap);
-		firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-		uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+#pragma mark -
+#pragma mark firehose snapshot and peeking
 
-		bitmap &= ~(1ULL << ref);
-		if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
-			// this page has its "recycle-requeue" done, but hasn't gone
-			// through "recycle-reuse", or it has no data, ditch it
-			continue;
-		}
-		if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
-			// this thing has data, but the first tracepoint is unreadable
-			// so also just ditch it
-			continue;
-		}
-		if (fbc->fbc_pos.fbc_stream != firehose_stream_metadata) {
-			continue;
-		}
-		if (!peek(fbc)) {
-			break;
+void
+firehose_client_metadata_stream_peek(firehose_client_t fc,
+		OS_UNUSED firehose_event_t context, bool (^peek_should_start)(void),
+		bool (^peek)(firehose_chunk_t fbc))
+{
+	os_unfair_lock_lock(&fc->fc_lock);
+
+	if (peek_should_start && peek_should_start()) {
+		firehose_buffer_t fb = fc->fc_buffer;
+		firehose_buffer_header_t fbh = &fb->fb_header;
+		uint64_t bitmap = fbh->fbh_bank.fbb_metadata_bitmap;
+
+		while (bitmap) {
+			uint16_t ref = firehose_bitmap_first_set(bitmap);
+			firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+			uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
+
+			bitmap &= ~(1ULL << ref);
+			if (fbc->fc_start + fbc_length <= fbc->fc_data) {
+				// this page has its "recycle-requeue" done, but hasn't gone
+				// through "recycle-reuse", or it has no data, ditch it
+				continue;
+			}
+			if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
+				// this thing has data, but the first tracepoint is unreadable
+				// so also just ditch it
+				continue;
+			}
+			if (fbc->fc_pos.fcp_stream != firehose_stream_metadata) {
+				continue;
+			}
+			if (!peek(fbc)) {
+				break;
+			}
 		}
 	}
+
+	os_unfair_lock_unlock(&fc->fc_lock);
 }
 
 OS_NOINLINE OS_COLD
@@ -872,21 +1114,21 @@ firehose_client_snapshot_finish(firehose_client_t fc,
 	// Then look at all the allocated pages not seen in the ring
 	while (bitmap) {
 		uint16_t ref = firehose_bitmap_first_set(bitmap);
-		firehose_buffer_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
-		uint16_t fbc_length = fbc->fbc_pos.fbc_next_entry_offs;
+		firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
+		uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;
 		bitmap &= ~(1ULL << ref);
-		if (fbc->fbc_start + fbc_length <= fbc->fbc_data) {
+		if (fbc->fc_start + fbc_length <= fbc->fc_data) {
 			// this page has its "recycle-requeue" done, but hasn't gone
 			// through "recycle-reuse", or it has no data, ditch it
 			continue;
 		}
-		if (!((firehose_tracepoint_t)fbc->fbc_data)->ft_length) {
+		if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
 			// this thing has data, but the first tracepoint is unreadable
 			// so also just ditch it
 			continue;
 		}
-		if (fbc->fbc_pos.fbc_flag_io != for_io) {
+		if (fbc->fc_pos.fcp_flag_io != for_io) {
 			continue;
 		}
 		snapshot->handler(fc, evt, fbc);
@@ -894,70 +1136,35 @@ static void
-firehose_snapshot_start(void *ctxt)
+firehose_snapshot_tickle_clients(firehose_snapshot_t fs, bool for_io)
 {
-	firehose_snapshot_t snapshot = ctxt;
-	firehose_client_t fci;
+	firehose_client_t fc;
 	long n = 0;
 
-	// 0. we need to be on the IO queue so that client connection and/or death
-	//    cannot happen concurrently
-	dispatch_assert_queue(server_config.fs_io_drain_queue);
-
-	// 1. mark all the clients participating in the current snapshot
-	//    and enter the group for each bit set
-	TAILQ_FOREACH(fci, &server_config.fs_clients, fc_entry) {
-		if (fci->fc_is_kernel) {
+	fs_clients_lock();
+	TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+		if (slowpath(fc->fc_memory_corrupted)) {
+			continue;
+		}
+		if (!fc->fc_pid) {
 #if TARGET_OS_SIMULATOR
 			continue;
 #endif
-		}
-		if (slowpath(fci->fc_memory_corrupted)) {
+		} else if (!firehose_client_wakeup(fc, 0, for_io)) {
 			continue;
 		}
-		fci->fc_needs_io_snapshot = true;
-		fci->fc_needs_mem_snapshot = true;
-		n += 2;
-	}
-	if (n) {
-		// cheating: equivalent to dispatch_group_enter() n times
-		// without the acquire barriers that we don't need
-		os_atomic_add2o(snapshot->fs_group, dg_value, n, relaxed);
+		n++;
+		if (for_io) {
+			fc->fc_needs_io_snapshot = true;
+		} else {
+			fc->fc_needs_mem_snapshot = true;
+		}
 	}
+	fs_clients_unlock();
 
-	dispatch_async(server_config.fs_mem_drain_queue, ^{
-		// 2. make fs_snapshot visible, this is what triggers the snapshot
-		//    logic from _drain() or handle_death(). until fs_snapshot is
-		//    published, the bits set above are mostly ignored
-		server_config.fs_snapshot = snapshot;
-
-		snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
-
-		dispatch_async(server_config.fs_io_drain_queue, ^{
-			firehose_client_t fcj;
-
-			snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
-
-			// match group_enter from firehose_snapshot() after MEM+IO_START
-			dispatch_group_leave(snapshot->fs_group);
-
-			// 3. tickle all the clients. the list of clients may have changed
-			//    since step 1, but worry not - new clients don't have
-			//    fc_needs_*_snapshot set so drain is harmless; clients that
-			//    were removed from the list have already left the group
-			//    (see firehose_client_finalize())
-			TAILQ_FOREACH(fcj, &server_config.fs_clients, fc_entry) {
-				if (fcj->fc_is_kernel) {
-#if !TARGET_OS_SIMULATOR
-					firehose_client_kernel_source_handle_event(fcj);
-#endif
-				} else {
-					dispatch_source_merge_data(fcj->fc_io_source, 1);
-					dispatch_source_merge_data(fcj->fc_mem_source, 1);
-				}
-			}
-		});
-	});
+	// cheating: equivalent to dispatch_group_enter() n times
+	// without the acquire barriers that we don't need
+	if (n) os_atomic_add2o(fs->fs_group, dg_value, n, relaxed);
 }
 
 static void
@@ -979,10 +1186,37 @@ firehose_snapshot_finish(void *ctxt)
 
 static void
 firehose_snapshot_gate(void *ctxt)
 {
+	firehose_snapshot_t fs = ctxt;
+
 	// prevent other snapshots from running until done
 	dispatch_suspend(server_config.fs_snapshot_gate_queue);
-	dispatch_async_f(server_config.fs_io_drain_queue, ctxt,
-			firehose_snapshot_start);
+
+	server_config.fs_snapshot = fs;
+	dispatch_group_async(fs->fs_group, server_config.fs_mem_drain_queue, ^{
+		// start the fs_mem_snapshot, this is what triggers the snapshot
+		// logic from _drain() or handle_death()
+		fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
+		firehose_snapshot_tickle_clients(fs, false);
+
+		dispatch_group_async(fs->fs_group, server_config.fs_io_drain_queue, ^{
+			// start the fs_io_snapshot, this is what triggers the snapshot
+			// logic from _drain() or handle_death()
+			// 29868879: must always happen after the memory snapshot started
+			fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
+			firehose_snapshot_tickle_clients(fs, true);
+
+#if !TARGET_OS_SIMULATOR
+			if (server_config.fs_kernel_client) {
+				firehose_client_kernel_source_handle_event(
+						server_config.fs_kernel_client);
+			}
+#endif
+		});
+	});
+
+	dispatch_group_notify_f(fs->fs_group, server_config.fs_io_drain_queue,
+			fs, firehose_snapshot_finish);
 }
 
 void
@@ -993,12 +1227,6 @@ firehose_snapshot(firehose_snapshot_handler_t handler)
 	snapshot->handler = Block_copy(handler);
 	snapshot->fs_group = dispatch_group_create();
 
-	// keep the group entered until IO_START and MEM_START have been sent
-	// See firehose_snapshot_start()
-	dispatch_group_enter(snapshot->fs_group);
-	dispatch_group_notify_f(snapshot->fs_group, server_config.fs_io_drain_queue,
-			snapshot, firehose_snapshot_finish);
-
 	dispatch_async_f(server_config.fs_snapshot_gate_queue, snapshot,
 			firehose_snapshot_gate);
 }
@@ -1010,7 +1238,8 @@ kern_return_t
 firehose_server_register(mach_port_t server_port OS_UNUSED,
 		mach_port_t mem_port, mach_vm_size_t mem_size,
 		mach_port_t comm_recvp, mach_port_t comm_sendp,
-		mach_port_t extra_info_port, mach_vm_size_t extra_info_size)
+		mach_port_t extra_info_port, mach_vm_size_t extra_info_size,
+		audit_token_t atoken)
 {
 	mach_vm_address_t base_addr = 0;
 	firehose_client_t fc = NULL;
@@ -1060,7 +1289,7 @@ firehose_server_register(mach_port_t server_port OS_UNUSED,
 	}
 
 	fc = firehose_client_create((firehose_buffer_t)base_addr,
-			comm_recvp, comm_sendp);
+			(firehose_token_t)&atoken, comm_recvp, comm_sendp);
 	dispatch_async(server_config.fs_io_drain_queue, ^{
 		firehose_client_resume(fc, &fcci);
 		if (fcci.fcci_size) {
@@ -1088,15 +1317,16 @@ firehose_server_push_async(mach_port_t server_port OS_UNUSED,
 		if (expects_notifs && !fc->fc_use_notifs) {
 			fc->fc_use_notifs = true;
 		}
-		firehose_client_push_async_merge(fc, pp, for_io);
+		firehose_client_wakeup(fc, pp, for_io);
 	}
 	return KERN_SUCCESS;
 }
 
 kern_return_t
-firehose_server_push(mach_port_t server_port OS_UNUSED,
+firehose_server_push_and_wait(mach_port_t server_port OS_UNUSED,
 		mach_port_t reply_port, qos_class_t qos, boolean_t for_io,
-		firehose_push_reply_t *push_reply OS_UNUSED)
+		firehose_push_reply_t *push_reply OS_UNUSED,
+		boolean_t *quarantinedOut OS_UNUSED)
 {
 	firehose_client_t fc = cur_client_info;
 	dispatch_block_flags_t flags = DISPATCH_BLOCK_ENFORCE_QOS_CLASS;
@@ -1118,7 +1348,7 @@ firehose_server_push(mach_port_t server_port OS_UNUSED,
 	}
 
 	block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
-		firehose_client_drain(fc, reply_port,
+		firehose_client_drain_one(fc, reply_port,
 				for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
 	});
 	dispatch_async(q, block);