X-Git-Url: https://git.saurik.com/apple/libdispatch.git/blobdiff_plain/fa22f35b3ccab0081bb7090c32773dcd7463a045..refs/heads/master:/src/firehose/firehose_server.c diff --git a/src/firehose/firehose_server.c b/src/firehose/firehose_server.c index e27293e..ba335db 100644 --- a/src/firehose/firehose_server.c +++ b/src/firehose/firehose_server.c @@ -31,6 +31,11 @@ _Static_assert(offsetof(struct firehose_client_s, fc_mem_sent_flushed_pos) % 8 == 0, "Make sure atomic fields are properly aligned"); #endif +typedef struct fs_client_queue_s { + struct firehose_client_s *volatile fs_client_head; + struct firehose_client_s *volatile fs_client_tail; +} fs_client_queue_s, *fs_client_queue_t; + static struct firehose_server_s { mach_port_t fs_bootstrap_port; dispatch_mach_t fs_mach_channel; @@ -41,26 +46,161 @@ static struct firehose_server_s { firehose_handler_t fs_handler; firehose_snapshot_t fs_snapshot; - bool fs_io_snapshot_started; - bool fs_mem_snapshot_started; - int fs_kernel_fd; firehose_client_t fs_kernel_client; TAILQ_HEAD(, firehose_client_s) fs_clients; + os_unfair_lock fs_clients_lock; + fs_client_queue_s fs_queues[4]; + dispatch_source_t fs_sources[4]; } server_config = { .fs_clients = TAILQ_HEAD_INITIALIZER(server_config.fs_clients), + .fs_clients_lock = OS_UNFAIR_LOCK_INIT, .fs_kernel_fd = -1, }; -#pragma mark - -#pragma mark firehose client state machine +OS_ALWAYS_INLINE +static inline void +fs_clients_lock(void) +{ + os_unfair_lock_lock_with_options(&server_config.fs_clients_lock, + OS_UNFAIR_LOCK_DATA_SYNCHRONIZATION); +} + +OS_ALWAYS_INLINE +static inline void +fs_clients_unlock(void) +{ + os_unfair_lock_unlock(&server_config.fs_clients_lock); +} static void firehose_server_demux(firehose_client_t fc, mach_msg_header_t *msg_hdr); static void firehose_client_cancel(firehose_client_t fc); static void firehose_client_snapshot_finish(firehose_client_t fc, firehose_snapshot_t snapshot, bool for_io); +static void firehose_client_handle_death(void *ctxt); + +#pragma mark - +#pragma mark firehose client enqueueing + +OS_ALWAYS_INLINE +static inline bool +fs_idx_is_for_io(size_t idx) +{ + return idx & 1; +} + +OS_ALWAYS_INLINE +static inline bool +fs_queue_is_for_io(fs_client_queue_t q) +{ + return (q - server_config.fs_queues) & 1; +} + +OS_ALWAYS_INLINE +static inline bool +fs_queue_is_for_quarantined(fs_client_queue_t q) +{ + return (q - server_config.fs_queues) & 2; +} + +OS_ALWAYS_INLINE +static inline fs_client_queue_t +fs_queue(bool quarantined, bool for_io) +{ + return &server_config.fs_queues[quarantined * 2 + for_io]; +} + +OS_ALWAYS_INLINE +static inline dispatch_source_t +fs_source(bool quarantined, bool for_io) +{ + return server_config.fs_sources[quarantined * 2 + for_io]; +} + +OS_ALWAYS_INLINE +static inline void +firehose_client_push(firehose_client_t fc, pthread_priority_t pp, + bool quarantined, bool for_io) +{ + fs_client_queue_t queue = fs_queue(quarantined, for_io); + if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) { + os_mpsc_push_update_head(queue, fs_client, fc); + _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1); + } else if (pp) { + _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1); + } +} + +OS_ALWAYS_INLINE +static inline bool +firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp, + bool for_io) +{ + uintptr_t canceled_bit = FC_STATE_CANCELED(for_io); + uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io); + uintptr_t old_state, new_state; + + os_atomic_rmw_loop(&fc->fc_state, old_state, 
new_state, relaxed, { + if (old_state & canceled_bit) { + os_atomic_rmw_loop_give_up(return false); + } + if (old_state & enqueued_bit) { + os_atomic_rmw_loop_give_up(break); + } + new_state = old_state | enqueued_bit; + }); + firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp, + fc->fc_quarantined, for_io); + return true; +} + +OS_ALWAYS_INLINE +static inline void +firehose_client_start_cancel(firehose_client_t fc, bool for_io) +{ + uintptr_t canceling_bit = FC_STATE_CANCELING(for_io); + uintptr_t canceled_bit = FC_STATE_CANCELED(for_io); + uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io); + uintptr_t old_state, new_state; + + os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, { + if (old_state & (canceled_bit | canceling_bit)) { + os_atomic_rmw_loop_give_up(return); + } + new_state = old_state | enqueued_bit | canceling_bit; + }); + firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0, + fc->fc_quarantined, for_io); +} + +OS_ALWAYS_INLINE +static inline bool +firehose_client_dequeue(firehose_client_t fc, bool for_io) +{ + uintptr_t canceling_bit = FC_STATE_CANCELING(for_io); + uintptr_t canceled_bit = FC_STATE_CANCELED(for_io); + uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io); + uintptr_t old_state, new_state; + + os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, { + new_state = old_state & ~(canceling_bit | enqueued_bit); + if (old_state & canceling_bit) { + new_state |= canceled_bit; + } + }); + + if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) && + (new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) { + dispatch_async_f(server_config.fs_io_drain_queue, fc, + firehose_client_handle_death); + } + return !(new_state & canceled_bit); +} + +#pragma mark - +#pragma mark firehose client state machine static void firehose_client_notify(firehose_client_t fc, mach_port_t reply_port) @@ -82,9 +222,11 @@ firehose_client_notify(firehose_client_t fc, mach_port_t reply_port) } } else { if (reply_port == fc->fc_sendp) { - kr = firehose_send_push_notify_async(reply_port, push_reply, 0); + kr = firehose_send_push_notify_async(reply_port, push_reply, + fc->fc_quarantined, 0); } else { - kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply); + kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply, + fc->fc_quarantined); } if (kr != MACH_SEND_INVALID_DEST) { DISPATCH_VERIFY_MIG(kr); @@ -106,18 +248,6 @@ firehose_client_acquire_head(firehose_buffer_t fb, bool for_io) return head; } -OS_ALWAYS_INLINE -static inline void -firehose_client_push_async_merge(firehose_client_t fc, pthread_priority_t pp, - bool for_io) -{ - if (for_io) { - _dispatch_source_merge_data(fc->fc_io_source, pp, 1); - } else { - _dispatch_source_merge_data(fc->fc_mem_source, pp, 1); - } -} - OS_NOINLINE OS_COLD static void firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port) @@ -131,7 +261,7 @@ firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port) if (reply_port) { kern_return_t kr = firehose_send_push_reply(reply_port, 0, - FIREHOSE_PUSH_REPLY_CORRUPTED); + FIREHOSE_PUSH_REPLY_CORRUPTED, false); DISPATCH_VERIFY_MIG(kr); dispatch_assume_zero(kr); } @@ -156,7 +286,7 @@ firehose_client_snapshot_mark_done(firehose_client_t fc, OS_NOINLINE static void -firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags) +firehose_client_drain_one(firehose_client_t fc, mach_port_t port, uint32_t flags) { firehose_buffer_t fb = fc->fc_buffer; firehose_chunk_t fbc; @@ -174,9 +304,7 @@ 
firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags) fbh_ring = fb->fb_header.fbh_io_ring; sent_flushed = (uint16_t)fc->fc_io_sent_flushed_pos; flushed = (uint16_t)fc->fc_io_flushed_pos; - if (fc->fc_needs_io_snapshot && server_config.fs_io_snapshot_started) { - snapshot = server_config.fs_snapshot; - } + if (fc->fc_needs_io_snapshot) snapshot = server_config.fs_snapshot; } else { evt = FIREHOSE_EVENT_MEM_BUFFER_RECEIVED; _Static_assert(FIREHOSE_EVENT_MEM_BUFFER_RECEIVED == @@ -184,9 +312,7 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags) fbh_ring = fb->fb_header.fbh_mem_ring; sent_flushed = (uint16_t)fc->fc_mem_sent_flushed_pos; flushed = (uint16_t)fc->fc_mem_flushed_pos; - if (fc->fc_needs_mem_snapshot && server_config.fs_mem_snapshot_started) { - snapshot = server_config.fs_snapshot; - } + if (fc->fc_needs_mem_snapshot) snapshot = server_config.fs_snapshot; } if (slowpath(fc->fc_memory_corrupted)) { @@ -273,12 +399,12 @@ firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags) // and there's more to drain, so optimistically schedule draining // again this is cheap since the queue is hot, and is fair for other // clients - firehose_client_push_async_merge(fc, 0, for_io); + firehose_client_wakeup(fc, 0, for_io); } if (count && server_config.fs_kernel_client) { // the kernel is special because it can drop messages, so if we're // draining, poll the kernel each time while we're bound to a thread - firehose_client_drain(server_config.fs_kernel_client, + firehose_client_drain_one(server_config.fs_kernel_client, MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL); } } @@ -293,20 +419,36 @@ corrupt: // (needs__snapshot: false, memory_corrupted: true). we can safely // silence the corresponding source of drain wake-ups. if (fc->fc_pid) { - dispatch_source_cancel(for_io ? fc->fc_io_source : fc->fc_mem_source); + firehose_client_start_cancel(fc, for_io); } } static void -firehose_client_drain_io_async(void *ctx) -{ - firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO); -} - -static void -firehose_client_drain_mem_async(void *ctx) +firehose_client_drain(void *ctxt) { - firehose_client_drain(ctx, MACH_PORT_NULL, 0); + fs_client_queue_t queue = ctxt; + bool for_io = fs_queue_is_for_io(queue); + bool quarantined = fs_queue_is_for_quarantined(queue); + firehose_client_t fc, fc_next; + size_t clients = 0; + + while (queue->fs_client_tail) { + fc = os_mpsc_get_head(queue, fs_client); + do { + fc_next = os_mpsc_pop_head(queue, fs_client, fc, fc_next[for_io]); + if (firehose_client_dequeue(fc, for_io)) { + firehose_client_drain_one(fc, MACH_PORT_NULL, + for_io ? FIREHOSE_DRAIN_FOR_IO : 0); + } + // process quarantined clients 4 times as slow as the other ones + // also reasyncing every 4 clients allows for discovering + // quarantined suspension faster + if (++clients == (quarantined ? 
1 : 4)) { + dispatch_source_merge_data(fs_source(quarantined, for_io), 1); + return; + } + } while ((fc = fc_next)); + } } OS_NOINLINE @@ -335,7 +477,10 @@ firehose_client_finalize(firehose_client_t fc OS_OBJECT_CONSUMED) } server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_DIED, NULL); + fs_clients_lock(); TAILQ_REMOVE(&server_config.fs_clients, fc, fc_entry); + fs_clients_unlock(); + dispatch_release(fc->fc_mach_channel); fc->fc_mach_channel = NULL; fc->fc_entry.tqe_next = DISPATCH_OBJECT_LISTLESS; @@ -413,7 +558,7 @@ firehose_client_handle_death(void *ctxt) continue; } server_config.fs_handler(fc, FIREHOSE_EVENT_IO_BUFFER_RECEIVED, fbc); - if (fc->fc_needs_io_snapshot && server_config.fs_io_snapshot_started) { + if (fc->fc_needs_io_snapshot) { snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, fbc); } } @@ -431,7 +576,7 @@ firehose_client_handle_death(void *ctxt) mem_bitmap_copy &= ~(1ULL << ref); server_config.fs_handler(fc, FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, fbc); - if (fc->fc_needs_mem_snapshot && server_config.fs_mem_snapshot_started) { + if (fc->fc_needs_mem_snapshot) { snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, fbc); } } @@ -445,8 +590,9 @@ static void firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason, dispatch_mach_msg_t dmsg, mach_error_t error OS_UNUSED) { - mach_msg_header_t *msg_hdr; + mach_msg_header_t *msg_hdr = NULL; firehose_client_t fc = ctx; + mach_port_t port; switch (reason) { case DISPATCH_MACH_MESSAGE_RECEIVED: @@ -460,7 +606,33 @@ firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason, } break; + case DISPATCH_MACH_DISCONNECTED: + msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL); + port = msg_hdr->msgh_remote_port; + if (MACH_PORT_VALID(port)) { + if (port != fc->fc_sendp) { + DISPATCH_INTERNAL_CRASH(port, "Unknown send-right"); + } + firehose_mach_port_send_release(fc->fc_sendp); + fc->fc_sendp = MACH_PORT_NULL; + } + port = msg_hdr->msgh_local_port; + if (MACH_PORT_VALID(port)) { + if (port != fc->fc_recvp) { + DISPATCH_INTERNAL_CRASH(port, "Unknown recv-right"); + } + firehose_mach_port_recv_dispose(fc->fc_recvp, fc); + fc->fc_recvp = MACH_PORT_NULL; + } + break; + case DISPATCH_MACH_CANCELED: + if (MACH_PORT_VALID(fc->fc_sendp)) { + DISPATCH_INTERNAL_CRASH(fc->fc_sendp, "send-right leak"); + } + if (MACH_PORT_VALID(fc->fc_recvp)) { + DISPATCH_INTERNAL_CRASH(fc->fc_recvp, "recv-right leak"); + } firehose_client_cancel(fc); break; } @@ -475,10 +647,8 @@ firehose_client_kernel_source_handle_event(void *ctxt) // resumed in firehose_client_drain for both memory and I/O dispatch_suspend(fc->fc_kernel_source); dispatch_suspend(fc->fc_kernel_source); - dispatch_async_f(server_config.fs_mem_drain_queue, - fc, firehose_client_drain_mem_async); - dispatch_async_f(server_config.fs_io_drain_queue, - fc, firehose_client_drain_io_async); + firehose_client_wakeup(fc, 0, false); + firehose_client_wakeup(fc, 0, true); } #endif @@ -487,23 +657,23 @@ firehose_client_resume(firehose_client_t fc, const struct firehose_client_connected_info_s *fcci) { dispatch_assert_queue(server_config.fs_io_drain_queue); + + fs_clients_lock(); TAILQ_INSERT_TAIL(&server_config.fs_clients, fc, fc_entry); + fs_clients_unlock(); + server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CONNECTED, (void *)fcci); if (!fc->fc_pid) { dispatch_activate(fc->fc_kernel_source); } else { dispatch_mach_connect(fc->fc_mach_channel, fc->fc_recvp, fc->fc_sendp, NULL); - dispatch_activate(fc->fc_io_source); - dispatch_activate(fc->fc_mem_source); } } 
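/*
 * [Editor's sketch -- annotation, not part of the upstream patch.]
 * The cancelation handshake used by firehose_client_cancel() below rests on
 * the per-stream ENQUEUED/CANCELING/CANCELED bits driven by
 * firehose_client_wakeup(), firehose_client_start_cancel() and
 * firehose_client_dequeue() above.  The standalone C11 model that follows
 * shows the intended life cycle.  The FC_STATE_* bit layout is assumed here
 * (mem = bit 0, io = bit 1, one nibble per role); the real constants live in
 * firehose_server_internal.h, which this diff does not include.
 * Build with: cc -std=c11 fc_state_model.c
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define ENQUEUED(io)   (0x0001u << (io))   /* client sits on fs_queues[]   */
#define CANCELING(io)  (0x0010u << (io))   /* one last drain pass wanted   */
#define CANCELED(io)   (0x0100u << (io))   /* stream is fully dead         */
#define CANCELED_MASK  (CANCELED(0) | CANCELED(1))

static _Atomic uintptr_t fc_state;

/* firehose_client_wakeup(): enqueue at most once, never after cancelation. */
static bool wakeup(bool io)
{
	uintptr_t os = atomic_load_explicit(&fc_state, memory_order_relaxed), ns;
	do {
		if (os & CANCELED(io)) return false; /* too late to enqueue       */
		if (os & ENQUEUED(io)) return true;  /* someone already pushed us */
		ns = os | ENQUEUED(io);
	} while (!atomic_compare_exchange_weak_explicit(&fc_state, &os, ns,
			memory_order_relaxed, memory_order_relaxed));
	return true; /* the winner pushes the client onto the MPSC queue */
}

/* firehose_client_start_cancel(): request one final, canceling drain pass. */
static void start_cancel(bool io)
{
	uintptr_t os = atomic_load_explicit(&fc_state, memory_order_relaxed), ns;
	do {
		if (os & (CANCELED(io) | CANCELING(io))) return; /* idempotent */
		ns = os | ENQUEUED(io) | CANCELING(io);
	} while (!atomic_compare_exchange_weak_explicit(&fc_state, &os, ns,
			memory_order_relaxed, memory_order_relaxed));
}

/*
 * firehose_client_dequeue(): clear ENQUEUED, promote CANCELING to CANCELED,
 * and fire the death handler exactly once, when the second stream dies.
 */
static bool dequeue(bool io)
{
	uintptr_t os = atomic_load_explicit(&fc_state, memory_order_relaxed), ns;
	do {
		ns = os & ~(CANCELING(io) | ENQUEUED(io));
		if (os & CANCELING(io)) ns |= CANCELED(io);
	} while (!atomic_compare_exchange_weak_explicit(&fc_state, &os, ns,
			memory_order_relaxed, memory_order_relaxed));
	if (((os ^ ns) & CANCELED_MASK) &&
			(ns & CANCELED_MASK) == CANCELED_MASK) {
		puts("both streams canceled: schedule firehose_client_handle_death");
	}
	return !(ns & CANCELED(io)); /* false: skip draining a dead stream */
}

int main(void)
{
	wakeup(false);       /* async push enqueues the mem stream        */
	dequeue(false);      /* ... drained normally                      */
	start_cancel(false); /* client port died: cancel both streams     */
	start_cancel(true);
	dequeue(false);      /* final mem pass: mem becomes CANCELED      */
	dequeue(true);       /* final io pass: death handler fires once   */
	return 0;
}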
static void firehose_client_cancel(firehose_client_t fc) { - dispatch_block_t block; - _dispatch_debug("client died (unique_pid: 0x%llx", firehose_client_get_unique_pid(fc, NULL)); @@ -516,15 +686,8 @@ firehose_client_cancel(firehose_client_t fc) fc->fc_recvp = MACH_PORT_NULL; } fc->fc_use_notifs = false; - dispatch_source_cancel(fc->fc_io_source); - dispatch_source_cancel(fc->fc_mem_source); - - block = dispatch_block_create(DISPATCH_BLOCK_DETACHED, ^{ - dispatch_async_f(server_config.fs_io_drain_queue, fc, - firehose_client_handle_death); - }); - dispatch_async(server_config.fs_mem_drain_queue, block); - _Block_release(block); + firehose_client_start_cancel(fc, false); + firehose_client_start_cancel(fc, true); } static firehose_client_t @@ -562,28 +725,10 @@ firehose_client_create(firehose_buffer_t fb, firehose_token_t token, uint64_t unique_pid = fb->fb_header.fbh_uniquepid; firehose_client_t fc = _firehose_client_create(fb); dispatch_mach_t dm; - dispatch_source_t ds; fc->fc_pid = token->pid ? token->pid : ~0; fc->fc_euid = token->euid; fc->fc_pidversion = token->execcnt; - ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, - server_config.fs_mem_drain_queue); - _os_object_retain_internal_inline(&fc->fc_as_os_object); - dispatch_set_context(ds, fc); - dispatch_set_finalizer_f(ds, - (dispatch_function_t)_os_object_release_internal); - dispatch_source_set_event_handler_f(ds, firehose_client_drain_mem_async); - fc->fc_mem_source = ds; - - ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, - server_config.fs_io_drain_queue); - _os_object_retain_internal_inline(&fc->fc_as_os_object); - dispatch_set_context(ds, fc); - dispatch_set_finalizer_f(ds, - (dispatch_function_t)_os_object_release_internal); - dispatch_source_set_event_handler_f(ds, firehose_client_drain_io_async); - fc->fc_io_source = ds; _dispatch_debug("FIREHOSE_REGISTER (unique_pid: 0x%llx)", unique_pid); fc->fc_recvp = comm_recvp; @@ -656,12 +801,6 @@ _firehose_client_xref_dispose(firehose_client_t fc) { _dispatch_debug("Cleaning up client info for unique_pid 0x%llx", firehose_client_get_unique_pid(fc, NULL)); - - dispatch_release(fc->fc_io_source); - fc->fc_io_source = NULL; - - dispatch_release(fc->fc_mem_source); - fc->fc_mem_source = NULL; } uint64_t @@ -706,6 +845,12 @@ firehose_client_set_context(firehose_client_t fc, void *ctxt) return os_atomic_xchg2o(fc, fc_ctxt, ctxt, relaxed); } +void +firehose_client_initiate_quarantine(firehose_client_t fc) +{ + fc->fc_quarantined = true; +} + #pragma mark - #pragma mark firehose server @@ -734,22 +879,24 @@ void firehose_server_init(mach_port_t comm_port, firehose_handler_t handler) { struct firehose_server_s *fs = &server_config; - dispatch_queue_attr_t attr; + dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL; + dispatch_queue_attr_t attr_ui; dispatch_mach_t dm; + dispatch_source_t ds; // just reference the string so that it's captured (void)os_atomic_load(&__libfirehose_serverVersionString[0], relaxed); - attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, + attr_ui = dispatch_queue_attr_make_with_qos_class(attr, QOS_CLASS_USER_INITIATED, 0); fs->fs_ipc_queue = dispatch_queue_create_with_target( - "com.apple.firehose.ipc", attr, NULL); + "com.apple.firehose.ipc", attr_ui, NULL); fs->fs_snapshot_gate_queue = dispatch_queue_create_with_target( - "com.apple.firehose.snapshot-gate", DISPATCH_QUEUE_SERIAL, NULL); + "com.apple.firehose.snapshot-gate", attr, NULL); fs->fs_io_drain_queue = 
dispatch_queue_create_with_target( - "com.apple.firehose.drain-io", DISPATCH_QUEUE_SERIAL, NULL); + "com.apple.firehose.drain-io", attr, NULL); fs->fs_mem_drain_queue = dispatch_queue_create_with_target( - "com.apple.firehose.drain-mem", DISPATCH_QUEUE_SERIAL, NULL); + "com.apple.firehose.drain-mem", attr, NULL); dm = dispatch_mach_create_f("com.apple.firehose.listener", fs->fs_ipc_queue, NULL, firehose_server_handle_mach_event); @@ -757,6 +904,15 @@ firehose_server_init(mach_port_t comm_port, firehose_handler_t handler) fs->fs_mach_channel = dm; fs->fs_handler = _Block_copy(handler); firehose_kernel_client_create(); + + for (size_t i = 0; i < countof(fs->fs_sources); i++) { + ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0, + fs_idx_is_for_io(i) ? server_config.fs_io_drain_queue : + server_config.fs_mem_drain_queue); + dispatch_set_context(ds, &fs->fs_queues[i]); + dispatch_source_set_event_handler_f(ds, firehose_client_drain); + fs->fs_sources[i] = ds; + } } void @@ -800,24 +956,23 @@ firehose_server_resume(void) } dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port, MACH_PORT_NULL, NULL); -} - -OS_NOINLINE -static void -_firehose_server_cancel(void *ctxt OS_UNUSED) -{ - firehose_client_t fc; - TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) { - dispatch_mach_cancel(fc->fc_mach_channel); + for (size_t i = 0; i < countof(fs->fs_sources); i++) { + dispatch_activate(fs->fs_sources[i]); } } void firehose_server_cancel(void) { + firehose_client_t fc; + dispatch_mach_cancel(server_config.fs_mach_channel); - dispatch_async_f(server_config.fs_io_drain_queue, NULL, - _firehose_server_cancel); + + fs_clients_lock(); + TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) { + dispatch_mach_cancel(fc->fc_mach_channel); + } + fs_clients_unlock(); } dispatch_queue_t @@ -838,6 +993,37 @@ firehose_server_copy_queue(firehose_server_queue_t which) return dq; } +void +firehose_server_quarantined_suspend(firehose_server_queue_t which) +{ + switch (which) { + case FIREHOSE_SERVER_QUEUE_IO: + dispatch_suspend(fs_source(true, true)); + break; + case FIREHOSE_SERVER_QUEUE_MEMORY: + dispatch_suspend(fs_source(true, false)); + break; + default: + DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type"); + } +} + +void +firehose_server_quarantined_resume(firehose_server_queue_t which) +{ + switch (which) { + case FIREHOSE_SERVER_QUEUE_IO: + dispatch_resume(fs_source(true, true)); + break; + case FIREHOSE_SERVER_QUEUE_MEMORY: + dispatch_resume(fs_source(true, false)); + break; + default: + DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type"); + } +} + + #pragma mark - #pragma mark firehose snapshot and peeking @@ -950,73 +1136,35 @@ firehose_client_snapshot_finish(firehose_client_t fc, } static void -firehose_snapshot_start(void *ctxt) +firehose_snapshot_tickle_clients(firehose_snapshot_t fs, bool for_io) { - firehose_snapshot_t snapshot = ctxt; - firehose_client_t fci; + firehose_client_t fc; long n = 0; - // 0. we need to be on the IO queue so that client connection and/or death - // cannot happen concurrently - dispatch_assert_queue(server_config.fs_io_drain_queue); - server_config.fs_snapshot = snapshot; - - // 1. 
mark all the clients participating in the current snapshot - // and enter the group for each bit set - TAILQ_FOREACH(fci, &server_config.fs_clients, fc_entry) { - if (!fci->fc_pid) { + fs_clients_lock(); + TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) { + if (slowpath(fc->fc_memory_corrupted)) { + continue; + } + if (!fc->fc_pid) { #if TARGET_OS_SIMULATOR continue; #endif - } - if (slowpath(fci->fc_memory_corrupted)) { + } else if (!firehose_client_wakeup(fc, 0, for_io)) { continue; } - fci->fc_needs_io_snapshot = true; - fci->fc_needs_mem_snapshot = true; - n += 2; - } - if (n) { - // cheating: equivalent to dispatch_group_enter() n times - // without the acquire barriers that we don't need - os_atomic_add2o(snapshot->fs_group, dg_value, n, relaxed); + n++; + if (for_io) { + fc->fc_needs_io_snapshot = true; + } else { + fc->fc_needs_mem_snapshot = true; + } } + fs_clients_unlock(); - dispatch_async(server_config.fs_mem_drain_queue, ^{ - // 2. start the fs_mem_snapshot, this is what triggers the snapshot - // logic from _drain() or handle_death() - server_config.fs_mem_snapshot_started = true; - snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL); - - dispatch_async(server_config.fs_io_drain_queue, ^{ - firehose_client_t fcj; - - // 3. start the fs_io_snapshot, this is what triggers the snapshot - // logic from _drain() or handle_death() - // 29868879: must always happen after the memory snapshot started - server_config.fs_io_snapshot_started = true; - snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL); - - // match group_enter from firehose_snapshot() after MEM+IO_START - dispatch_group_leave(snapshot->fs_group); - - // 3. tickle all the clients. the list of clients may have changed - // since step 1, but worry not - new clients don't have - // fc_needs_*_snapshot set so drain is harmless; clients that - // were removed from the list have already left the group - // (see firehose_client_finalize()) - TAILQ_FOREACH(fcj, &server_config.fs_clients, fc_entry) { - if (!fcj->fc_pid) { -#if !TARGET_OS_SIMULATOR - firehose_client_kernel_source_handle_event(fcj); -#endif - } else { - dispatch_source_merge_data(fcj->fc_io_source, 1); - dispatch_source_merge_data(fcj->fc_mem_source, 1); - } - } - }); - }); + // cheating: equivalent to dispatch_group_enter() n times + // without the acquire barriers that we don't need + if (n) os_atomic_add2o(fs->fs_group, dg_value, n, relaxed); } static void @@ -1026,8 +1174,6 @@ firehose_snapshot_finish(void *ctxt) fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_COMPLETE, NULL); server_config.fs_snapshot = NULL; - server_config.fs_mem_snapshot_started = false; - server_config.fs_io_snapshot_started = false; dispatch_release(fs->fs_group); Block_release(fs->handler); @@ -1040,10 +1186,37 @@ firehose_snapshot_finish(void *ctxt) static void firehose_snapshot_gate(void *ctxt) { + firehose_snapshot_t fs = ctxt; + // prevent other snapshots from running until done + dispatch_suspend(server_config.fs_snapshot_gate_queue); - dispatch_async_f(server_config.fs_io_drain_queue, ctxt, - firehose_snapshot_start); + + server_config.fs_snapshot = fs; + dispatch_group_async(fs->fs_group, server_config.fs_mem_drain_queue, ^{ + // start the fs_mem_snapshot, this is what triggers the snapshot + // logic from _drain() or handle_death() + fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL); + firehose_snapshot_tickle_clients(fs, false); + + dispatch_group_async(fs->fs_group, server_config.fs_io_drain_queue, ^{ + // start the fs_io_snapshot, this is 
what triggers the snapshot + // logic from _drain() or handle_death() + // 29868879: must always happen after the memory snapshot started + fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL); + firehose_snapshot_tickle_clients(fs, true); + +#if !TARGET_OS_SIMULATOR + if (server_config.fs_kernel_client) { + firehose_client_kernel_source_handle_event( + server_config.fs_kernel_client); + } +#endif + }); + }); + + dispatch_group_notify_f(fs->fs_group, server_config.fs_io_drain_queue, + fs, firehose_snapshot_finish); } void @@ -1054,12 +1227,6 @@ firehose_snapshot(firehose_snapshot_handler_t handler) snapshot->handler = Block_copy(handler); snapshot->fs_group = dispatch_group_create(); - // keep the group entered until IO_START and MEM_START have been sent - // See firehose_snapshot_start() - dispatch_group_enter(snapshot->fs_group); - dispatch_group_notify_f(snapshot->fs_group, server_config.fs_io_drain_queue, - snapshot, firehose_snapshot_finish); - dispatch_async_f(server_config.fs_snapshot_gate_queue, snapshot, firehose_snapshot_gate); } @@ -1150,15 +1317,16 @@ firehose_server_push_async(mach_port_t server_port OS_UNUSED, if (expects_notifs && !fc->fc_use_notifs) { fc->fc_use_notifs = true; } - firehose_client_push_async_merge(fc, pp, for_io); + firehose_client_wakeup(fc, pp, for_io); } return KERN_SUCCESS; } kern_return_t -firehose_server_push(mach_port_t server_port OS_UNUSED, +firehose_server_push_and_wait(mach_port_t server_port OS_UNUSED, mach_port_t reply_port, qos_class_t qos, boolean_t for_io, - firehose_push_reply_t *push_reply OS_UNUSED) + firehose_push_reply_t *push_reply OS_UNUSED, + boolean_t *quarantinedOut OS_UNUSED) { firehose_client_t fc = cur_client_info; dispatch_block_flags_t flags = DISPATCH_BLOCK_ENFORCE_QOS_CLASS; @@ -1180,7 +1348,7 @@ firehose_server_push(mach_port_t server_port OS_UNUSED, } block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{ - firehose_client_drain(fc, reply_port, + firehose_client_drain_one(fc, reply_port, for_io ? FIREHOSE_DRAIN_FOR_IO : 0); }); dispatch_async(q, block);
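/*
 * [Editor's sketch -- annotation, not part of the upstream patch.]
 * The fan-in design introduced by this diff replaces the two per-client
 * DATA_OR sources with four global fs_queues[] drained by four shared
 * sources.  Producers link clients into an intrusive MPSC list through
 * os_mpsc_push_update_tail()/os_mpsc_push_update_head(), and the single
 * consumer walks it with os_mpsc_get_head()/os_mpsc_pop_head().  Those
 * macros live in libdispatch internals and are not shown in this diff; the
 * self-contained C11 sketch below models the same pattern (names and memory
 * orders are the editor's, not libdispatch's).  Note the pop order: the new
 * head is published before the tail CAS, so a racing producer's head store
 * is never clobbered.  Build with: cc -std=c11 mpsc_sketch.c
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct node_s {
	struct node_s *_Atomic next; /* plays the role of fc_next[for_io] */
	int value;
} node_s;

typedef struct queue_s {
	node_s *_Atomic head; /* consumer end, like fs_client_head */
	node_s *_Atomic tail; /* producer end, like fs_client_tail */
} queue_s;

/*
 * Producer side.  Returns true when the queue goes from empty to non-empty,
 * which is exactly when firehose_client_push() must merge into the DATA_OR
 * source to wake the drain handler.
 */
static bool mpsc_push(queue_s *q, node_s *n)
{
	atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
	node_s *prev = atomic_exchange_explicit(&q->tail, n,
			memory_order_acq_rel);
	if (prev == NULL) {
		atomic_store_explicit(&q->head, n, memory_order_release);
		return true; /* was empty: caller wakes the consumer */
	}
	atomic_store_explicit(&prev->next, n, memory_order_release);
	return false;
}

/* Consumer side: single consumer only, like firehose_client_drain(). */
static node_s *mpsc_pop(queue_s *q)
{
	node_s *head = atomic_load_explicit(&q->head, memory_order_acquire);
	if (head == NULL) return NULL;
	node_s *next = atomic_load_explicit(&head->next, memory_order_acquire);
	/* publish the new head first so a racing producer is never clobbered */
	atomic_store_explicit(&q->head, next, memory_order_relaxed);
	if (next == NULL) {
		node_s *expected = head;
		if (!atomic_compare_exchange_strong_explicit(&q->tail, &expected,
				NULL, memory_order_release, memory_order_relaxed)) {
			/* a producer already swapped the tail: wait for the link */
			while ((next = atomic_load_explicit(&head->next,
					memory_order_acquire)) == NULL) { /* spin */ }
			atomic_store_explicit(&q->head, next, memory_order_relaxed);
		}
	}
	return head;
}

int main(void)
{
	queue_s q = { NULL, NULL };
	node_s a = { .value = 1 }, b = { .value = 2 };
	if (mpsc_push(&q, &a)) puts("queue was empty: wake the drain source");
	mpsc_push(&q, &b);
	for (node_s *n; (n = mpsc_pop(&q)) != NULL; ) {
		printf("drained client %d\n", n->value);
	}
	return 0;
}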