% 8 == 0, "Make sure atomic fields are properly aligned");
#endif
+typedef struct fs_client_queue_s {
+ struct firehose_client_s *volatile fs_client_head;
+ struct firehose_client_s *volatile fs_client_tail;
+} fs_client_queue_s, *fs_client_queue_t;
+
static struct firehose_server_s {
mach_port_t fs_bootstrap_port;
dispatch_mach_t fs_mach_channel;
firehose_handler_t fs_handler;
firehose_snapshot_t fs_snapshot;
- bool fs_io_snapshot_started;
- bool fs_mem_snapshot_started;
-
int fs_kernel_fd;
firehose_client_t fs_kernel_client;
TAILQ_HEAD(, firehose_client_s) fs_clients;
+ os_unfair_lock fs_clients_lock;
+ fs_client_queue_s fs_queues[4];
+ dispatch_source_t fs_sources[4];
} server_config = {
.fs_clients = TAILQ_HEAD_INITIALIZER(server_config.fs_clients),
+ .fs_clients_lock = OS_UNFAIR_LOCK_INIT,
.fs_kernel_fd = -1,
};
-#pragma mark -
-#pragma mark firehose client state machine
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_lock(void)
+{
+ os_unfair_lock_lock_with_options(&server_config.fs_clients_lock,
+ OS_UNFAIR_LOCK_DATA_SYNCHRONIZATION);
+}
+
+OS_ALWAYS_INLINE
+static inline void
+fs_clients_unlock(void)
+{
+ os_unfair_lock_unlock(&server_config.fs_clients_lock);
+}
static void firehose_server_demux(firehose_client_t fc,
mach_msg_header_t *msg_hdr);
static void firehose_client_cancel(firehose_client_t fc);
static void firehose_client_snapshot_finish(firehose_client_t fc,
firehose_snapshot_t snapshot, bool for_io);
+static void firehose_client_handle_death(void *ctxt);
+
+#pragma mark -
+#pragma mark firehose client enqueueing
+
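+// Clients waiting to be drained sit on four MPSC queues, indexed by
+// (quarantined, for_io) as fs_queues[quarantined * 2 + for_io]. Each queue
+// is paired with the DATA_OR dispatch source at the same index in
+// fs_sources, which targets the I/O or memory drain queue and runs
+// firehose_client_drain with the queue as its context.
+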
+OS_ALWAYS_INLINE
+static inline bool
+fs_idx_is_for_io(size_t idx)
+{
+ return idx & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_io(fs_client_queue_t q)
+{
+ return (q - server_config.fs_queues) & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_quarantined(fs_client_queue_t q)
+{
+ return (q - server_config.fs_queues) & 2;
+}
+
+OS_ALWAYS_INLINE
+static inline fs_client_queue_t
+fs_queue(bool quarantined, bool for_io)
+{
+ return &server_config.fs_queues[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline dispatch_source_t
+fs_source(bool quarantined, bool for_io)
+{
+ return server_config.fs_sources[quarantined * 2 + for_io];
+}
+
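+// Push one client onto the (quarantined, for_io) MPSC queue. When the push
+// finds the queue empty, publish the head and merge a wakeup (plus the
+// caller's priority) into the matching drain source; otherwise only the
+// priority, if any, is merged. Passing fc == NULL forwards the priority
+// without enqueueing anything.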
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_push(firehose_client_t fc, pthread_priority_t pp,
+ bool quarantined, bool for_io)
+{
+ fs_client_queue_t queue = fs_queue(quarantined, for_io);
+ if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) {
+ os_mpsc_push_update_head(queue, fs_client, fc);
+ _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+ } else if (pp) {
+ _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+ }
+}
+
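+// Set the per-stream ENQUEUED bit and enqueue the client for draining.
+// Returns false if the stream was already canceled. If the client was
+// already enqueued, only the priority is forwarded to the drain source.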
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp,
+ bool for_io)
+{
+ uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+ uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+ uintptr_t old_state, new_state;
+
+ os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+ if (old_state & canceled_bit) {
+ os_atomic_rmw_loop_give_up(return false);
+ }
+ if (old_state & enqueued_bit) {
+ os_atomic_rmw_loop_give_up(break);
+ }
+ new_state = old_state | enqueued_bit;
+ });
+ firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp,
+ fc->fc_quarantined, for_io);
+ return true;
+}
+
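+// Begin cancelation of one stream: mark it CANCELING and force a final
+// enqueue so that firehose_client_dequeue can observe the flag and promote
+// it to CANCELED. Does nothing if cancelation has already started.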
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_start_cancel(firehose_client_t fc, bool for_io)
+{
+ uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+ uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+ uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+ uintptr_t old_state, new_state;
+
+ os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+ if (old_state & (canceled_bit | canceling_bit)) {
+ os_atomic_rmw_loop_give_up(return);
+ }
+ new_state = old_state | enqueued_bit | canceling_bit;
+ });
+ firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0,
+ fc->fc_quarantined, for_io);
+}
+
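+// Clear the ENQUEUED and CANCELING bits for one stream, promoting
+// CANCELING to CANCELED. Once both streams are CANCELED, schedule
+// firehose_client_handle_death on the I/O drain queue. Returns true when
+// the stream is still live and should be drained.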
+OS_ALWAYS_INLINE
+static inline bool
+firehose_client_dequeue(firehose_client_t fc, bool for_io)
+{
+ uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
+ uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
+ uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
+ uintptr_t old_state, new_state;
+
+ os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
+ new_state = old_state & ~(canceling_bit | enqueued_bit);
+ if (old_state & canceling_bit) {
+ new_state |= canceled_bit;
+ }
+ });
+
+ if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) &&
+ (new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) {
+ dispatch_async_f(server_config.fs_io_drain_queue, fc,
+ firehose_client_handle_death);
+ }
+ return !(new_state & canceled_bit);
+}
+
+#pragma mark -
+#pragma mark firehose client state machine
static void
firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
}
} else {
if (reply_port == fc->fc_sendp) {
- kr = firehose_send_push_notify_async(reply_port, push_reply, 0);
+ kr = firehose_send_push_notify_async(reply_port, push_reply,
+ fc->fc_quarantined, 0);
} else {
- kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply);
+ kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply,
+ fc->fc_quarantined);
}
if (kr != MACH_SEND_INVALID_DEST) {
DISPATCH_VERIFY_MIG(kr);
return head;
}
-OS_ALWAYS_INLINE
-static inline void
-firehose_client_push_async_merge(firehose_client_t fc, pthread_priority_t pp,
- bool for_io)
-{
- if (for_io) {
- _dispatch_source_merge_data(fc->fc_io_source, pp, 1);
- } else {
- _dispatch_source_merge_data(fc->fc_mem_source, pp, 1);
- }
-}
-
OS_NOINLINE OS_COLD
static void
firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
if (reply_port) {
kern_return_t kr = firehose_send_push_reply(reply_port, 0,
- FIREHOSE_PUSH_REPLY_CORRUPTED);
+ FIREHOSE_PUSH_REPLY_CORRUPTED, false);
DISPATCH_VERIFY_MIG(kr);
dispatch_assume_zero(kr);
}
OS_NOINLINE
static void
-firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
+firehose_client_drain_one(firehose_client_t fc, mach_port_t port, uint32_t flags)
{
firehose_buffer_t fb = fc->fc_buffer;
firehose_chunk_t fbc;
fbh_ring = fb->fb_header.fbh_io_ring;
sent_flushed = (uint16_t)fc->fc_io_sent_flushed_pos;
flushed = (uint16_t)fc->fc_io_flushed_pos;
- if (fc->fc_needs_io_snapshot && server_config.fs_io_snapshot_started) {
- snapshot = server_config.fs_snapshot;
- }
+ if (fc->fc_needs_io_snapshot) snapshot = server_config.fs_snapshot;
} else {
evt = FIREHOSE_EVENT_MEM_BUFFER_RECEIVED;
_Static_assert(FIREHOSE_EVENT_MEM_BUFFER_RECEIVED ==
fbh_ring = fb->fb_header.fbh_mem_ring;
sent_flushed = (uint16_t)fc->fc_mem_sent_flushed_pos;
flushed = (uint16_t)fc->fc_mem_flushed_pos;
- if (fc->fc_needs_mem_snapshot && server_config.fs_mem_snapshot_started) {
- snapshot = server_config.fs_snapshot;
- }
+ if (fc->fc_needs_mem_snapshot) snapshot = server_config.fs_snapshot;
}
if (slowpath(fc->fc_memory_corrupted)) {
// and there's more to drain, so optimistically schedule draining
// again this is cheap since the queue is hot, and is fair for other
// clients
- firehose_client_push_async_merge(fc, 0, for_io);
+ firehose_client_wakeup(fc, 0, for_io);
}
if (count && server_config.fs_kernel_client) {
// the kernel is special because it can drop messages, so if we're
// draining, poll the kernel each time while we're bound to a thread
- firehose_client_drain(server_config.fs_kernel_client,
+ firehose_client_drain_one(server_config.fs_kernel_client,
MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
}
}
// (needs_<for_io>_snapshot: false, memory_corrupted: true). we can safely
// silence the corresponding source of drain wake-ups.
if (fc->fc_pid) {
- dispatch_source_cancel(for_io ? fc->fc_io_source : fc->fc_mem_source);
+ firehose_client_start_cancel(fc, for_io);
}
}
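+// Event handler for the per-(quarantined, for_io) drain sources: the source
+// context is the fs_client_queue. Pop clients off the MPSC list and drain
+// each one that is still live, re-asyncing periodically so that other work
+// (and a quarantine suspension) can interleave.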
static void
-firehose_client_drain_io_async(void *ctx)
-{
- firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO);
-}
-
-static void
-firehose_client_drain_mem_async(void *ctx)
+firehose_client_drain(void *ctxt)
{
- firehose_client_drain(ctx, MACH_PORT_NULL, 0);
+ fs_client_queue_t queue = ctxt;
+ bool for_io = fs_queue_is_for_io(queue);
+ bool quarantined = fs_queue_is_for_quarantined(queue);
+ firehose_client_t fc, fc_next;
+ size_t clients = 0;
+
+ while (queue->fs_client_tail) {
+ fc = os_mpsc_get_head(queue, fs_client);
+ do {
+ fc_next = os_mpsc_pop_head(queue, fs_client, fc, fc_next[for_io]);
+ if (firehose_client_dequeue(fc, for_io)) {
+ firehose_client_drain_one(fc, MACH_PORT_NULL,
+ for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
+ }
+			// process quarantined clients 4 times more slowly than the others;
+			// re-asyncing every 4 clients also allows a quarantine suspension
+			// to be noticed faster
+ if (++clients == (quarantined ? 1 : 4)) {
+ dispatch_source_merge_data(fs_source(quarantined, for_io), 1);
+ return;
+ }
+ } while ((fc = fc_next));
+ }
}
OS_NOINLINE
}
server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_DIED, NULL);
+ fs_clients_lock();
TAILQ_REMOVE(&server_config.fs_clients, fc, fc_entry);
+ fs_clients_unlock();
+
dispatch_release(fc->fc_mach_channel);
fc->fc_mach_channel = NULL;
fc->fc_entry.tqe_next = DISPATCH_OBJECT_LISTLESS;
continue;
}
server_config.fs_handler(fc, FIREHOSE_EVENT_IO_BUFFER_RECEIVED, fbc);
- if (fc->fc_needs_io_snapshot && server_config.fs_io_snapshot_started) {
+ if (fc->fc_needs_io_snapshot) {
snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, fbc);
}
}
mem_bitmap_copy &= ~(1ULL << ref);
server_config.fs_handler(fc, FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, fbc);
- if (fc->fc_needs_mem_snapshot && server_config.fs_mem_snapshot_started) {
+ if (fc->fc_needs_mem_snapshot) {
snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, fbc);
}
}
firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
dispatch_mach_msg_t dmsg, mach_error_t error OS_UNUSED)
{
- mach_msg_header_t *msg_hdr;
+ mach_msg_header_t *msg_hdr = NULL;
firehose_client_t fc = ctx;
+ mach_port_t port;
switch (reason) {
case DISPATCH_MACH_MESSAGE_RECEIVED:
}
break;
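+
+	// The disconnect message carries the send and receive rights the channel
+	// was using; dispose of them here so that DISPATCH_MACH_CANCELED below
+	// can verify that neither of them leaked.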
+ case DISPATCH_MACH_DISCONNECTED:
+ msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
+ port = msg_hdr->msgh_remote_port;
+ if (MACH_PORT_VALID(port)) {
+ if (port != fc->fc_sendp) {
+ DISPATCH_INTERNAL_CRASH(port, "Unknown send-right");
+ }
+ firehose_mach_port_send_release(fc->fc_sendp);
+ fc->fc_sendp = MACH_PORT_NULL;
+ }
+ port = msg_hdr->msgh_local_port;
+ if (MACH_PORT_VALID(port)) {
+ if (port != fc->fc_recvp) {
+ DISPATCH_INTERNAL_CRASH(port, "Unknown recv-right");
+ }
+ firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
+ fc->fc_recvp = MACH_PORT_NULL;
+ }
+ break;
+
case DISPATCH_MACH_CANCELED:
+ if (MACH_PORT_VALID(fc->fc_sendp)) {
+ DISPATCH_INTERNAL_CRASH(fc->fc_sendp, "send-right leak");
+ }
+ if (MACH_PORT_VALID(fc->fc_recvp)) {
+ DISPATCH_INTERNAL_CRASH(fc->fc_recvp, "recv-right leak");
+ }
firehose_client_cancel(fc);
break;
}
// resumed in firehose_client_drain for both memory and I/O
dispatch_suspend(fc->fc_kernel_source);
dispatch_suspend(fc->fc_kernel_source);
- dispatch_async_f(server_config.fs_mem_drain_queue,
- fc, firehose_client_drain_mem_async);
- dispatch_async_f(server_config.fs_io_drain_queue,
- fc, firehose_client_drain_io_async);
+ firehose_client_wakeup(fc, 0, false);
+ firehose_client_wakeup(fc, 0, true);
}
#endif
const struct firehose_client_connected_info_s *fcci)
{
dispatch_assert_queue(server_config.fs_io_drain_queue);
+
+ fs_clients_lock();
TAILQ_INSERT_TAIL(&server_config.fs_clients, fc, fc_entry);
+ fs_clients_unlock();
+
server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CONNECTED, (void *)fcci);
if (!fc->fc_pid) {
dispatch_activate(fc->fc_kernel_source);
} else {
dispatch_mach_connect(fc->fc_mach_channel,
fc->fc_recvp, fc->fc_sendp, NULL);
- dispatch_activate(fc->fc_io_source);
- dispatch_activate(fc->fc_mem_source);
}
}
static void
firehose_client_cancel(firehose_client_t fc)
{
- dispatch_block_t block;
-
_dispatch_debug("client died (unique_pid: 0x%llx",
firehose_client_get_unique_pid(fc, NULL));
fc->fc_recvp = MACH_PORT_NULL;
}
fc->fc_use_notifs = false;
- dispatch_source_cancel(fc->fc_io_source);
- dispatch_source_cancel(fc->fc_mem_source);
-
- block = dispatch_block_create(DISPATCH_BLOCK_DETACHED, ^{
- dispatch_async_f(server_config.fs_io_drain_queue, fc,
- firehose_client_handle_death);
- });
- dispatch_async(server_config.fs_mem_drain_queue, block);
- _Block_release(block);
+ firehose_client_start_cancel(fc, false);
+ firehose_client_start_cancel(fc, true);
}
static firehose_client_t
uint64_t unique_pid = fb->fb_header.fbh_uniquepid;
firehose_client_t fc = _firehose_client_create(fb);
dispatch_mach_t dm;
- dispatch_source_t ds;
fc->fc_pid = token->pid ? token->pid : ~0;
fc->fc_euid = token->euid;
fc->fc_pidversion = token->execcnt;
- ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
- server_config.fs_mem_drain_queue);
- _os_object_retain_internal_inline(&fc->fc_as_os_object);
- dispatch_set_context(ds, fc);
- dispatch_set_finalizer_f(ds,
- (dispatch_function_t)_os_object_release_internal);
- dispatch_source_set_event_handler_f(ds, firehose_client_drain_mem_async);
- fc->fc_mem_source = ds;
-
- ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
- server_config.fs_io_drain_queue);
- _os_object_retain_internal_inline(&fc->fc_as_os_object);
- dispatch_set_context(ds, fc);
- dispatch_set_finalizer_f(ds,
- (dispatch_function_t)_os_object_release_internal);
- dispatch_source_set_event_handler_f(ds, firehose_client_drain_io_async);
- fc->fc_io_source = ds;
_dispatch_debug("FIREHOSE_REGISTER (unique_pid: 0x%llx)", unique_pid);
fc->fc_recvp = comm_recvp;
{
_dispatch_debug("Cleaning up client info for unique_pid 0x%llx",
firehose_client_get_unique_pid(fc, NULL));
-
- dispatch_release(fc->fc_io_source);
- fc->fc_io_source = NULL;
-
- dispatch_release(fc->fc_mem_source);
- fc->fc_mem_source = NULL;
}
uint64_t
return os_atomic_xchg2o(fc, fc_ctxt, ctxt, relaxed);
}
+void
+firehose_client_initiate_quarantine(firehose_client_t fc)
+{
+ fc->fc_quarantined = true;
+}
+
#pragma mark -
#pragma mark firehose server
firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
{
struct firehose_server_s *fs = &server_config;
- dispatch_queue_attr_t attr;
+ dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL;
+ dispatch_queue_attr_t attr_ui;
dispatch_mach_t dm;
+ dispatch_source_t ds;
// just reference the string so that it's captured
(void)os_atomic_load(&__libfirehose_serverVersionString[0], relaxed);
- attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL,
+ attr_ui = dispatch_queue_attr_make_with_qos_class(attr,
QOS_CLASS_USER_INITIATED, 0);
fs->fs_ipc_queue = dispatch_queue_create_with_target(
- "com.apple.firehose.ipc", attr, NULL);
+ "com.apple.firehose.ipc", attr_ui, NULL);
fs->fs_snapshot_gate_queue = dispatch_queue_create_with_target(
- "com.apple.firehose.snapshot-gate", DISPATCH_QUEUE_SERIAL, NULL);
+ "com.apple.firehose.snapshot-gate", attr, NULL);
fs->fs_io_drain_queue = dispatch_queue_create_with_target(
- "com.apple.firehose.drain-io", DISPATCH_QUEUE_SERIAL, NULL);
+ "com.apple.firehose.drain-io", attr, NULL);
fs->fs_mem_drain_queue = dispatch_queue_create_with_target(
- "com.apple.firehose.drain-mem", DISPATCH_QUEUE_SERIAL, NULL);
+ "com.apple.firehose.drain-mem", attr, NULL);
dm = dispatch_mach_create_f("com.apple.firehose.listener",
fs->fs_ipc_queue, NULL, firehose_server_handle_mach_event);
fs->fs_mach_channel = dm;
fs->fs_handler = _Block_copy(handler);
firehose_kernel_client_create();
+
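+	// Create one DATA_OR drain source per (quarantined, for_io) pair, each
+	// targeting the matching I/O or memory drain queue with its client
+	// queue as the handler context.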
+ for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+ ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
+ fs_idx_is_for_io(i) ? server_config.fs_io_drain_queue :
+ server_config.fs_mem_drain_queue);
+ dispatch_set_context(ds, &fs->fs_queues[i]);
+ dispatch_source_set_event_handler_f(ds, firehose_client_drain);
+ fs->fs_sources[i] = ds;
+ }
}
void
}
dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
MACH_PORT_NULL, NULL);
-}
-
-OS_NOINLINE
-static void
-_firehose_server_cancel(void *ctxt OS_UNUSED)
-{
- firehose_client_t fc;
- TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
- dispatch_mach_cancel(fc->fc_mach_channel);
+ for (size_t i = 0; i < countof(fs->fs_sources); i++) {
+ dispatch_activate(fs->fs_sources[i]);
}
}
void
firehose_server_cancel(void)
{
+ firehose_client_t fc;
+
dispatch_mach_cancel(server_config.fs_mach_channel);
- dispatch_async_f(server_config.fs_io_drain_queue, NULL,
- _firehose_server_cancel);
+
+ fs_clients_lock();
+ TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+ dispatch_mach_cancel(fc->fc_mach_channel);
+ }
+ fs_clients_unlock();
}
dispatch_queue_t
return dq;
}
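+// Suspending a quarantined stream suspends the DATA_OR source that drives
+// draining of quarantined clients for that stream; clients already pushed
+// onto the corresponding queue simply stay there until it is resumed.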
+void
+firehose_server_quarantined_suspend(firehose_server_queue_t which)
+{
+ switch (which) {
+ case FIREHOSE_SERVER_QUEUE_IO:
+ dispatch_suspend(fs_source(true, true));
+ break;
+ case FIREHOSE_SERVER_QUEUE_MEMORY:
+ dispatch_suspend(fs_source(true, false));
+ break;
+ default:
+ DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
+ }
+}
+
+void
+firehose_server_quarantined_resume(firehose_server_queue_t which)
+{
+ switch (which) {
+ case FIREHOSE_SERVER_QUEUE_IO:
+ dispatch_resume(fs_source(true, true));
+ break;
+ case FIREHOSE_SERVER_QUEUE_MEMORY:
+ dispatch_resume(fs_source(true, false));
+ break;
+ default:
+ DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
+ }
+}
+
#pragma mark -
#pragma mark firehose snapshot and peeking
}
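+// Walk the client list under fs_clients_lock, mark every eligible client
+// as needing a snapshot for the given stream and wake it for draining,
+// then account for the woken clients in the snapshot group.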
static void
-firehose_snapshot_start(void *ctxt)
+firehose_snapshot_tickle_clients(firehose_snapshot_t fs, bool for_io)
{
- firehose_snapshot_t snapshot = ctxt;
- firehose_client_t fci;
+ firehose_client_t fc;
long n = 0;
- // 0. we need to be on the IO queue so that client connection and/or death
- // cannot happen concurrently
- dispatch_assert_queue(server_config.fs_io_drain_queue);
- server_config.fs_snapshot = snapshot;
-
- // 1. mark all the clients participating in the current snapshot
- // and enter the group for each bit set
- TAILQ_FOREACH(fci, &server_config.fs_clients, fc_entry) {
- if (!fci->fc_pid) {
+ fs_clients_lock();
+ TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
+ if (slowpath(fc->fc_memory_corrupted)) {
+ continue;
+ }
+ if (!fc->fc_pid) {
#if TARGET_OS_SIMULATOR
continue;
#endif
- }
- if (slowpath(fci->fc_memory_corrupted)) {
+ } else if (!firehose_client_wakeup(fc, 0, for_io)) {
continue;
}
- fci->fc_needs_io_snapshot = true;
- fci->fc_needs_mem_snapshot = true;
- n += 2;
- }
- if (n) {
- // cheating: equivalent to dispatch_group_enter() n times
- // without the acquire barriers that we don't need
- os_atomic_add2o(snapshot->fs_group, dg_value, n, relaxed);
+ n++;
+ if (for_io) {
+ fc->fc_needs_io_snapshot = true;
+ } else {
+ fc->fc_needs_mem_snapshot = true;
+ }
}
+ fs_clients_unlock();
- dispatch_async(server_config.fs_mem_drain_queue, ^{
- // 2. start the fs_mem_snapshot, this is what triggers the snapshot
- // logic from _drain() or handle_death()
- server_config.fs_mem_snapshot_started = true;
- snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
-
- dispatch_async(server_config.fs_io_drain_queue, ^{
- firehose_client_t fcj;
-
- // 3. start the fs_io_snapshot, this is what triggers the snapshot
- // logic from _drain() or handle_death()
- // 29868879: must always happen after the memory snapshot started
- server_config.fs_io_snapshot_started = true;
- snapshot->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
-
- // match group_enter from firehose_snapshot() after MEM+IO_START
- dispatch_group_leave(snapshot->fs_group);
-
- // 3. tickle all the clients. the list of clients may have changed
- // since step 1, but worry not - new clients don't have
- // fc_needs_*_snapshot set so drain is harmless; clients that
- // were removed from the list have already left the group
- // (see firehose_client_finalize())
- TAILQ_FOREACH(fcj, &server_config.fs_clients, fc_entry) {
- if (!fcj->fc_pid) {
-#if !TARGET_OS_SIMULATOR
- firehose_client_kernel_source_handle_event(fcj);
-#endif
- } else {
- dispatch_source_merge_data(fcj->fc_io_source, 1);
- dispatch_source_merge_data(fcj->fc_mem_source, 1);
- }
- }
- });
- });
+ // cheating: equivalent to dispatch_group_enter() n times
+ // without the acquire barriers that we don't need
+ if (n) os_atomic_add2o(fs->fs_group, dg_value, n, relaxed);
}
static void
fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_COMPLETE, NULL);
server_config.fs_snapshot = NULL;
- server_config.fs_mem_snapshot_started = false;
- server_config.fs_io_snapshot_started = false;
dispatch_release(fs->fs_group);
Block_release(fs->handler);
static void
firehose_snapshot_gate(void *ctxt)
{
+ firehose_snapshot_t fs = ctxt;
+
// prevent other snapshots from running until done
dispatch_suspend(server_config.fs_snapshot_gate_queue);
- dispatch_async_f(server_config.fs_io_drain_queue, ctxt,
- firehose_snapshot_start);
+
+ server_config.fs_snapshot = fs;
+ dispatch_group_async(fs->fs_group, server_config.fs_mem_drain_queue, ^{
+ // start the fs_mem_snapshot, this is what triggers the snapshot
+ // logic from _drain() or handle_death()
+ fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
+ firehose_snapshot_tickle_clients(fs, false);
+
+ dispatch_group_async(fs->fs_group, server_config.fs_io_drain_queue, ^{
+ // start the fs_io_snapshot, this is what triggers the snapshot
+ // logic from _drain() or handle_death()
+ // 29868879: must always happen after the memory snapshot started
+ fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
+ firehose_snapshot_tickle_clients(fs, true);
+
+#if !TARGET_OS_SIMULATOR
+ if (server_config.fs_kernel_client) {
+ firehose_client_kernel_source_handle_event(
+ server_config.fs_kernel_client);
+ }
+#endif
+ });
+ });
+
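+	// runs once the snapshot group empties: after both START events have
+	// been delivered and every tickled client has finished its snapshot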
+ dispatch_group_notify_f(fs->fs_group, server_config.fs_io_drain_queue,
+ fs, firehose_snapshot_finish);
}
void
snapshot->handler = Block_copy(handler);
snapshot->fs_group = dispatch_group_create();
- // keep the group entered until IO_START and MEM_START have been sent
- // See firehose_snapshot_start()
- dispatch_group_enter(snapshot->fs_group);
- dispatch_group_notify_f(snapshot->fs_group, server_config.fs_io_drain_queue,
- snapshot, firehose_snapshot_finish);
-
dispatch_async_f(server_config.fs_snapshot_gate_queue, snapshot,
firehose_snapshot_gate);
}
if (expects_notifs && !fc->fc_use_notifs) {
fc->fc_use_notifs = true;
}
- firehose_client_push_async_merge(fc, pp, for_io);
+ firehose_client_wakeup(fc, pp, for_io);
}
return KERN_SUCCESS;
}
kern_return_t
-firehose_server_push(mach_port_t server_port OS_UNUSED,
+firehose_server_push_and_wait(mach_port_t server_port OS_UNUSED,
mach_port_t reply_port, qos_class_t qos, boolean_t for_io,
- firehose_push_reply_t *push_reply OS_UNUSED)
+ firehose_push_reply_t *push_reply OS_UNUSED,
+ boolean_t *quarantinedOut OS_UNUSED)
{
firehose_client_t fc = cur_client_info;
dispatch_block_flags_t flags = DISPATCH_BLOCK_ENFORCE_QOS_CLASS;
}
block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
- firehose_client_drain(fc, reply_port,
+ firehose_client_drain_one(fc, reply_port,
for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
});
dispatch_async(q, block);