+static void firehose_client_handle_death(void *ctxt);
+
+#pragma mark -
+#pragma mark firehose client enqueueing
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_idx_is_for_io(size_t idx)
+{
+ return idx & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_io(fs_client_queue_t q)
+{
+ return (q - server_config.fs_queues) & 1;
+}
+
+OS_ALWAYS_INLINE
+static inline bool
+fs_queue_is_for_quarantined(fs_client_queue_t q)
+{
+ return (q - server_config.fs_queues) & 2;
+}
+
+OS_ALWAYS_INLINE
+static inline fs_client_queue_t
+fs_queue(bool quarantined, bool for_io)
+{
+ return &server_config.fs_queues[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline dispatch_source_t
+fs_source(bool quarantined, bool for_io)
+{
+ return server_config.fs_sources[quarantined * 2 + for_io];
+}
+
+OS_ALWAYS_INLINE
+static inline void
+firehose_client_push(firehose_client_t fc, pthread_priority_t pp,
+ bool quarantined, bool for_io)
+{
+ fs_client_queue_t queue = fs_queue(quarantined, for_io);
+ if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) {
+ os_mpsc_push_update_head(queue, fs_client, fc);
+ _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+ } else if (pp) {
+ _dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
+ }
+}
+
/*
 * Atomically marks one stream of the client (selected by for_io) as
 * enqueued, then pushes it onto the matching client queue via
 * firehose_client_push().
 *
 * Returns false and does nothing when the stream's CANCELED bit is
 * already set. When the ENQUEUED bit was already set, the client is not
 * pushed a second time (fc is passed as NULL), but `pp` is still merged
 * into the dispatch source by firehose_client_push() when non-zero.
 */
OS_ALWAYS_INLINE
static inline bool
firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp,
		bool for_io)
{
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		if (old_state & canceled_bit) {
			// Stream already canceled: refuse the wakeup entirely.
			os_atomic_rmw_loop_give_up(return false);
		}
		if (old_state & enqueued_bit) {
			// Already enqueued: fall through to the push below, which
			// sees NULL and only merges the priority.
			os_atomic_rmw_loop_give_up(break);
		}
		new_state = old_state | enqueued_bit;
	});
	firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp,
			fc->fc_quarantined, for_io);
	return true;
}
+
/*
 * Begins cancelation of one stream (selected by for_io) of the client.
 *
 * Atomically sets the CANCELING bit, and also forces the ENQUEUED bit so
 * the client gets pushed one more time — presumably so the drain path can
 * observe CANCELING and complete the cancelation in
 * firehose_client_dequeue() (TODO confirm against the drain code).
 *
 * No-op when cancelation of this stream has already started (CANCELING)
 * or finished (CANCELED).
 */
OS_ALWAYS_INLINE
static inline void
firehose_client_start_cancel(firehose_client_t fc, bool for_io)
{
	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		if (old_state & (canceled_bit | canceling_bit)) {
			os_atomic_rmw_loop_give_up(return);
		}
		new_state = old_state | enqueued_bit | canceling_bit;
	});
	// Push with pp == 0: if the client was already enqueued, pass NULL so
	// nothing is re-linked and no spurious source merge happens.
	firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0,
			fc->fc_quarantined, for_io);
}
+
/*
 * Clears the ENQUEUED and CANCELING bits for one stream (selected by
 * for_io) after the client has been taken off its queue. A pending
 * CANCELING bit is promoted to CANCELED in the same atomic transition.
 *
 * When this call is the one whose transition completes
 * FC_STATE_CANCELED_MASK (a canceled bit changed AND the mask is now
 * fully set), the death handler is scheduled exactly once on the io
 * drain queue. NOTE(review): this assumes FC_STATE_CANCELED_MASK covers
 * the canceled bits of both streams — confirm against the header.
 *
 * Returns true while this stream is still live (its canceled bit is not
 * set in the new state).
 */
OS_ALWAYS_INLINE
static inline bool
firehose_client_dequeue(firehose_client_t fc, bool for_io)
{
	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		new_state = old_state & ~(canceling_bit | enqueued_bit);
		if (old_state & canceling_bit) {
			// CANCELING -> CANCELED: cancelation of this stream is done.
			new_state |= canceled_bit;
		}
	});

	// Fire the death handler only on the edge where the last canceled bit
	// was just set by this transition (the XOR guards against firing on a
	// dequeue that did not change any canceled bit).
	if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) &&
			(new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) {
		dispatch_async_f(server_config.fs_io_drain_queue, fc,
				firehose_client_handle_death);
	}
	return !(new_state & canceled_bit);
}
+
+#pragma mark -
+#pragma mark firehose client state machine