/*
 * Copyright (c) 2015 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include <servers/bootstrap.h>
#include <sys/ioctl.h>
#include <sys/ttycom.h>

#include "firehoseServer.h" // MiG
#include "firehose_reply.h" // MiG
#if __has_feature(c_static_assert)
_Static_assert(offsetof(struct firehose_client_s, fc_mem_sent_flushed_pos)
		% 8 == 0, "Make sure atomic fields are properly aligned");
#endif
typedef struct fs_client_queue_s {
	struct firehose_client_s *volatile fs_client_head;
	struct firehose_client_s *volatile fs_client_tail;
} fs_client_queue_s, *fs_client_queue_t;
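
// The four fs_queues are multi-producer, single-consumer (MPSC) lists:
// producers swap fs_client_tail and link the previous tail, while the single
// consumer (the drain source handler) walks from fs_client_head. Clients are
// linked intrusively through fc_next[for_io], so one client can sit on the
// mem and io queues at the same time. (This summarizes the intended use of
// the os_mpsc_* macros below, assuming their usual libdispatch semantics.)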
static struct firehose_server_s {
	mach_port_t			fs_bootstrap_port;
	dispatch_mach_t		fs_mach_channel;
	dispatch_queue_t	fs_ipc_queue;
	dispatch_queue_t	fs_snapshot_gate_queue;
	dispatch_queue_t	fs_io_drain_queue;
	dispatch_queue_t	fs_mem_drain_queue;
	firehose_handler_t	fs_handler;

	firehose_snapshot_t	fs_snapshot;
	int					fs_kernel_fd;
	firehose_client_t	fs_kernel_client;

	TAILQ_HEAD(, firehose_client_s) fs_clients;
	os_unfair_lock		fs_clients_lock;
	fs_client_queue_s	fs_queues[4];
	dispatch_source_t	fs_sources[4];
} server_config = {
	.fs_clients = TAILQ_HEAD_INITIALIZER(server_config.fs_clients),
	.fs_clients_lock = OS_UNFAIR_LOCK_INIT,
	.fs_kernel_fd = -1,
};
static void
fs_clients_lock(void)
{
	os_unfair_lock_lock_with_options(&server_config.fs_clients_lock,
			OS_UNFAIR_LOCK_DATA_SYNCHRONIZATION);
}

static void
fs_clients_unlock(void)
{
	os_unfair_lock_unlock(&server_config.fs_clients_lock);
}
static void firehose_server_demux(firehose_client_t fc,
		mach_msg_header_t *msg_hdr);
static void firehose_client_cancel(firehose_client_t fc);
static void firehose_client_snapshot_finish(firehose_client_t fc,
		firehose_snapshot_t snapshot, bool for_io);
static void firehose_client_handle_death(void *ctxt);
#pragma mark firehose client enqueueing
static inline bool
fs_idx_is_for_io(size_t idx)
{
	return idx & 1;
}

static inline bool
fs_queue_is_for_io(fs_client_queue_t q)
{
	return (q - server_config.fs_queues) & 1;
}
static inline bool
fs_queue_is_for_quarantined(fs_client_queue_t q)
{
	return (q - server_config.fs_queues) & 2;
}
static inline fs_client_queue_t
fs_queue(bool quarantined, bool for_io)
{
	return &server_config.fs_queues[quarantined * 2 + for_io];
}
static inline dispatch_source_t
fs_source(bool quarantined, bool for_io)
{
	return server_config.fs_sources[quarantined * 2 + for_io];
}
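
// Queue and source index layout implied by the helpers above:
//   index = quarantined * 2 + for_io
// so bit 0 selects io (1) vs mem (0), and bit 1 selects quarantined clients.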
static void
firehose_client_push(firehose_client_t fc, pthread_priority_t pp,
		bool quarantined, bool for_io)
{
	fs_client_queue_t queue = fs_queue(quarantined, for_io);
	if (fc && os_mpsc_push_update_tail(queue, fs_client, fc, fc_next[for_io])) {
		os_mpsc_push_update_head(queue, fs_client, fc);
		_dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
	} else if (pp) {
		_dispatch_source_merge_data(fs_source(quarantined, for_io), pp, 1);
	}
}
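
// Both branches above merge into the DATA_OR source: the first fires the
// drain handler because the queue went from empty to non-empty, while the
// second only forwards the pthread priority `pp` for a client that is
// already enqueued (assuming _dispatch_source_merge_data propagates the QoS
// override as in libdispatch internals).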
static bool
firehose_client_wakeup(firehose_client_t fc, pthread_priority_t pp,
		bool for_io)
{
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		if (old_state & canceled_bit) {
			os_atomic_rmw_loop_give_up(return false);
		}
		if (old_state & enqueued_bit) {
			os_atomic_rmw_loop_give_up(break);
		}
		new_state = old_state | enqueued_bit;
	});
	firehose_client_push(old_state & enqueued_bit ? NULL : fc, pp,
			fc->fc_quarantined, for_io);
	return true;
}
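
// fc_state packs three bits per buffer kind (mem/io): ENQUEUED, CANCELING
// and CANCELED. firehose_client_wakeup() returns false when the given side
// is already canceled, and pushes the client at most once: when ENQUEUED was
// already set, only the priority is forwarded (fc passed as NULL).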
static void
firehose_client_start_cancel(firehose_client_t fc, bool for_io)
{
	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		if (old_state & (canceled_bit | canceling_bit)) {
			os_atomic_rmw_loop_give_up(return);
		}
		new_state = old_state | enqueued_bit | canceling_bit;
	});
	firehose_client_push(old_state & enqueued_bit ? NULL : fc, 0,
			fc->fc_quarantined, for_io);
}
static bool
firehose_client_dequeue(firehose_client_t fc, bool for_io)
{
	uintptr_t canceling_bit = FC_STATE_CANCELING(for_io);
	uintptr_t canceled_bit = FC_STATE_CANCELED(for_io);
	uintptr_t enqueued_bit = FC_STATE_ENQUEUED(for_io);
	uintptr_t old_state, new_state;

	os_atomic_rmw_loop(&fc->fc_state, old_state, new_state, relaxed, {
		new_state = old_state & ~(canceling_bit | enqueued_bit);
		if (old_state & canceling_bit) {
			new_state |= canceled_bit;
		}
	});

	if (((old_state ^ new_state) & FC_STATE_CANCELED_MASK) &&
			(new_state & FC_STATE_CANCELED_MASK) == FC_STATE_CANCELED_MASK) {
		dispatch_async_f(server_config.fs_io_drain_queue, fc,
				firehose_client_handle_death);
	}
	return !(new_state & canceled_bit);
}
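
// Dequeueing clears ENQUEUED and promotes CANCELING to CANCELED. Once both
// the mem and io sides are canceled (FC_STATE_CANCELED_MASK fully set), the
// client death is handled exactly once, on the io drain queue.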
#pragma mark firehose client state machine
static void
firehose_client_notify(firehose_client_t fc, mach_port_t reply_port)
{
	firehose_push_reply_t push_reply = {
		.fpr_mem_flushed_pos = os_atomic_load2o(fc, fc_mem_flushed_pos, relaxed),
		.fpr_io_flushed_pos = os_atomic_load2o(fc, fc_io_flushed_pos, relaxed),
	};
	kern_return_t kr;

	firehose_atomic_max2o(fc, fc_mem_sent_flushed_pos,
			push_reply.fpr_mem_flushed_pos, relaxed);
	firehose_atomic_max2o(fc, fc_io_sent_flushed_pos,
			push_reply.fpr_io_flushed_pos, relaxed);

	if (!fc->fc_pid) {
		if (ioctl(server_config.fs_kernel_fd, LOGFLUSHED, &push_reply) < 0) {
			dispatch_assume_zero(errno);
		}
	} else {
		if (reply_port == fc->fc_sendp) {
			kr = firehose_send_push_notify_async(reply_port, push_reply,
					fc->fc_quarantined, 0);
		} else {
			kr = firehose_send_push_reply(reply_port, KERN_SUCCESS, push_reply,
					fc->fc_quarantined);
		}
		if (kr != MACH_SEND_INVALID_DEST) {
			DISPATCH_VERIFY_MIG(kr);
			dispatch_assume_zero(kr);
		}
	}
}
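
// MACH_SEND_INVALID_DEST is deliberately not treated as an error above: it
// just means the client's reply port is already dead, a race that the
// disconnect/no-senders handling covers separately.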
static inline uint16_t
firehose_client_acquire_head(firehose_buffer_t fb, bool for_io)
{
	uint16_t head;
	if (for_io) {
		head = os_atomic_load2o(&fb->fb_header, fbh_ring_io_head, acquire);
	} else {
		head = os_atomic_load2o(&fb->fb_header, fbh_ring_mem_head, acquire);
	}
	return head;
}
static void
firehose_client_mark_corrupted(firehose_client_t fc, mach_port_t reply_port)
{
	// this client is really confused, do *not* answer to asyncs anymore
	fc->fc_memory_corrupted = true;
	fc->fc_use_notifs = false;

	// XXX: do not cancel the data sources or a corrupted client could
	// prevent snapshots from being taken if unlucky with ordering

	if (reply_port) {
		kern_return_t kr = firehose_send_push_reply(reply_port, 0,
				FIREHOSE_PUSH_REPLY_CORRUPTED, false);
		DISPATCH_VERIFY_MIG(kr);
		dispatch_assume_zero(kr);
	}
}
static void
firehose_client_snapshot_mark_done(firehose_client_t fc,
		firehose_snapshot_t snapshot, bool for_io)
{
	if (for_io) {
		fc->fc_needs_io_snapshot = false;
	} else {
		fc->fc_needs_mem_snapshot = false;
	}
	dispatch_group_leave(snapshot->fs_group);
}
#define DRAIN_BATCH_SIZE	4
#define FIREHOSE_DRAIN_FOR_IO	0x1
#define FIREHOSE_DRAIN_POLL	0x2
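
// DRAIN_BATCH_SIZE bounds how many chunks are handed to the handler per
// drain pass for clients that use notifications; see the batching logic in
// firehose_client_drain_one() below.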
static void
firehose_client_drain_one(firehose_client_t fc, mach_port_t port, uint32_t flags)
{
	firehose_buffer_t fb = fc->fc_buffer;
	firehose_chunk_t fbc;
	firehose_event_t evt;
	uint16_t volatile *fbh_ring;
	uint16_t flushed, ref, count = 0;
	uint16_t client_head, client_flushed, sent_flushed;
	firehose_snapshot_t snapshot = NULL;
	bool for_io = (flags & FIREHOSE_DRAIN_FOR_IO);

	if (for_io) {
		evt = FIREHOSE_EVENT_IO_BUFFER_RECEIVED;
		_Static_assert(FIREHOSE_EVENT_IO_BUFFER_RECEIVED ==
				FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, "");
		fbh_ring = fb->fb_header.fbh_io_ring;
		sent_flushed = (uint16_t)fc->fc_io_sent_flushed_pos;
		flushed = (uint16_t)fc->fc_io_flushed_pos;
		if (fc->fc_needs_io_snapshot) snapshot = server_config.fs_snapshot;
	} else {
		evt = FIREHOSE_EVENT_MEM_BUFFER_RECEIVED;
		_Static_assert(FIREHOSE_EVENT_MEM_BUFFER_RECEIVED ==
				FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, "");
		fbh_ring = fb->fb_header.fbh_mem_ring;
		sent_flushed = (uint16_t)fc->fc_mem_sent_flushed_pos;
		flushed = (uint16_t)fc->fc_mem_flushed_pos;
		if (fc->fc_needs_mem_snapshot) snapshot = server_config.fs_snapshot;
	}

	if (slowpath(fc->fc_memory_corrupted)) {
		goto corrupt;
	}

	client_head = flushed;
	do {
		if ((uint16_t)(flushed + count) == client_head) {
			client_head = firehose_client_acquire_head(fb, for_io);
			if ((uint16_t)(flushed + count) == client_head) {
				break;
			}
			if ((uint16_t)(client_head - sent_flushed) >=
					FIREHOSE_BUFFER_CHUNK_COUNT) {
				goto corrupt;
			}
		}

		// see firehose_buffer_ring_enqueue
		do {
			ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
			ref = os_atomic_load(&fbh_ring[ref], relaxed);
			ref &= FIREHOSE_RING_POS_IDX_MASK;
		} while (!fc->fc_pid && !ref);
		count++;
		if (!ref) {
			_dispatch_debug("Ignoring invalid page reference in ring: %d", ref);
			continue;
		}

		fbc = firehose_buffer_ref_to_chunk(fb, ref);
		if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
			// serialize with firehose_client_metadata_stream_peek
			os_unfair_lock_lock(&fc->fc_lock);
		}
		server_config.fs_handler(fc, evt, fbc);
		if (slowpath(snapshot)) {
			snapshot->handler(fc, evt, fbc);
		}
		if (fbc->fc_pos.fcp_stream == firehose_stream_metadata) {
			os_unfair_lock_unlock(&fc->fc_lock);
		}
		// clients not using notifications (single threaded) always drain fully
		// because they use all their limit, always
	} while (!fc->fc_use_notifs || count < DRAIN_BATCH_SIZE || snapshot);

	if (count) {
		// we don't load the full fbh_ring_tail because that is a 64bit quantity
		// and we only need 16 bits from it. and on 32bit arm, there's no way to
		// perform an atomic load of a 64bit quantity on read-only memory.
		if (for_io) {
			os_atomic_add2o(fc, fc_io_flushed_pos, count, relaxed);
			client_flushed = os_atomic_load2o(&fb->fb_header,
					fbh_ring_tail.frp_io_flushed, relaxed);
		} else {
			os_atomic_add2o(fc, fc_mem_flushed_pos, count, relaxed);
			client_flushed = os_atomic_load2o(&fb->fb_header,
					fbh_ring_tail.frp_mem_flushed, relaxed);
		}
		if (!fc->fc_pid) {
			// will fire firehose_client_notify() because port is MACH_PORT_DEAD
			port = fc->fc_sendp;
		} else if (!port && client_flushed == sent_flushed && fc->fc_use_notifs) {
			port = fc->fc_sendp;
		}
	}

	if (slowpath(snapshot)) {
		firehose_client_snapshot_finish(fc, snapshot, for_io);
		firehose_client_snapshot_mark_done(fc, snapshot, for_io);
	}
	if (port) {
		firehose_client_notify(fc, port);
	}
	if (!fc->fc_pid) {
		if (!(flags & FIREHOSE_DRAIN_POLL)) {
			// see firehose_client_kernel_source_handle_event
			dispatch_resume(fc->fc_kernel_source);
		}
	} else {
		if (fc->fc_use_notifs && count >= DRAIN_BATCH_SIZE) {
			// if we hit the drain batch size, the client probably logs a lot
			// and there's more to drain, so optimistically schedule draining
			// again. this is cheap since the queue is hot, and is fair for
			// other clients
			firehose_client_wakeup(fc, 0, for_io);
		}
		if (count && server_config.fs_kernel_client) {
			// the kernel is special because it can drop messages, so if we're
			// draining, poll the kernel each time while we're bound to a thread
			firehose_client_drain_one(server_config.fs_kernel_client,
					MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
		}
	}
	return;

corrupt:
	if (snapshot) {
		firehose_client_snapshot_mark_done(fc, snapshot, for_io);
	}
	firehose_client_mark_corrupted(fc, port);
	// from now on all IO/mem drains depending on `for_io` will be no-op
	// (needs_<for_io>_snapshot: false, memory_corrupted: true). we can safely
	// silence the corresponding source of drain wake-ups.
	if (fc->fc_pid) {
		firehose_client_start_cancel(fc, for_io);
	}
}
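
// Note that ring positions are deliberately 16-bit and compared after a
// (uint16_t) truncation, so the arithmetic above is wrap-around safe: e.g.
// with flushed == 0xfffe and count == 3, (uint16_t)(flushed + count) == 1.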
static void
firehose_client_drain(void *ctxt)
{
	fs_client_queue_t queue = ctxt;
	bool for_io = fs_queue_is_for_io(queue);
	bool quarantined = fs_queue_is_for_quarantined(queue);
	firehose_client_t fc, fc_next;
	size_t clients = 0;

	while (queue->fs_client_tail) {
		fc = os_mpsc_get_head(queue, fs_client);
		do {
			fc_next = os_mpsc_pop_head(queue, fs_client, fc, fc_next[for_io]);
			if (firehose_client_dequeue(fc, for_io)) {
				firehose_client_drain_one(fc, MACH_PORT_NULL,
						for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
			}
			// process quarantined clients 4 times as slowly as the others;
			// re-asyncing every 4 clients also allows discovering quarantined
			// suspension faster
			if (++clients == (quarantined ? 1 : 4)) {
				dispatch_source_merge_data(fs_source(quarantined, for_io), 1);
				return;
			}
		} while ((fc = fc_next));
	}
}
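
// The handler re-arms itself by merging 1 back into its own source after a
// batch (every client for quarantined queues, every 4 otherwise) and
// returning, which yields the drain queue to other work instead of looping
// over a long client list in one invocation.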
static void
firehose_client_finalize(firehose_client_t fc OS_OBJECT_CONSUMED)
{
	firehose_snapshot_t snapshot = server_config.fs_snapshot;
	firehose_buffer_t fb = fc->fc_buffer;

	dispatch_assert_queue(server_config.fs_io_drain_queue);

	// if a client dies between phase 1 and 2 of starting the snapshot
	// (see firehose_snapshot_start) there's no handler to call, but the
	// dispatch group has to be adjusted for this client going away.
	if (fc->fc_needs_io_snapshot) {
		dispatch_group_leave(snapshot->fs_group);
		fc->fc_needs_io_snapshot = false;
	}
	if (fc->fc_needs_mem_snapshot) {
		dispatch_group_leave(snapshot->fs_group);
		fc->fc_needs_mem_snapshot = false;
	}
	if (fc->fc_memory_corrupted) {
		server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CORRUPTED,
				&fb->fb_chunks[0]);
	}
	server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_DIED, NULL);

	fs_clients_lock();
	TAILQ_REMOVE(&server_config.fs_clients, fc, fc_entry);
	fs_clients_unlock();

	dispatch_release(fc->fc_mach_channel);
	fc->fc_mach_channel = NULL;
	fc->fc_entry.tqe_next = DISPATCH_OBJECT_LISTLESS;
	fc->fc_entry.tqe_prev = DISPATCH_OBJECT_LISTLESS;
	_os_object_release(&fc->fc_as_os_object);
}
static void
firehose_client_handle_death(void *ctxt)
{
	firehose_client_t fc = ctxt;
	firehose_buffer_t fb = fc->fc_buffer;
	firehose_buffer_header_t fbh = &fb->fb_header;
	uint64_t mem_bitmap = 0, bitmap;

	if (fc->fc_memory_corrupted) {
		return firehose_client_finalize(fc);
	}

	dispatch_assert_queue(server_config.fs_io_drain_queue);

	// acquire to match release barriers from threads that died
	os_atomic_thread_fence(acquire);

	bitmap = fbh->fbh_bank.fbb_bitmap & ~1ULL;
	for (int for_io = 0; for_io < 2; for_io++) {
		uint16_t volatile *fbh_ring;
		uint16_t tail, flushed;

		if (for_io) {
			fbh_ring = fbh->fbh_io_ring;
			tail = fbh->fbh_ring_tail.frp_io_tail;
			flushed = (uint16_t)fc->fc_io_flushed_pos;
		} else {
			fbh_ring = fbh->fbh_mem_ring;
			tail = fbh->fbh_ring_tail.frp_mem_tail;
			flushed = (uint16_t)fc->fc_mem_flushed_pos;
		}
		if ((uint16_t)(flushed - tail) >= FIREHOSE_BUFFER_CHUNK_COUNT) {
			fc->fc_memory_corrupted = true;
			return firehose_client_finalize(fc);
		}

		// remove the pages that we flushed already from the bitmap
		for (; tail != flushed; tail++) {
			uint16_t ring_pos = tail & FIREHOSE_RING_POS_IDX_MASK;
			uint16_t ref = fbh_ring[ring_pos] & FIREHOSE_RING_POS_IDX_MASK;

			bitmap &= ~(1ULL << ref);
		}
	}

	firehose_snapshot_t snapshot = server_config.fs_snapshot;

	// Then look at all the allocated pages not seen in the ring
	while (bitmap) {
		uint16_t ref = firehose_bitmap_first_set(bitmap);
		firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
		uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;

		bitmap &= ~(1ULL << ref);
		if (fbc->fc_start + fbc_length <= fbc->fc_data) {
			// this page has its "recycle-requeue" done, but hasn't gone
			// through "recycle-reuse", or it has no data, ditch it
			continue;
		}
		if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
			// this thing has data, but the first tracepoint is unreadable
			// so also just ditch it
			continue;
		}
		if (!fbc->fc_pos.fcp_flag_io) {
			mem_bitmap |= 1ULL << ref;
			continue;
		}
		server_config.fs_handler(fc, FIREHOSE_EVENT_IO_BUFFER_RECEIVED, fbc);
		if (fc->fc_needs_io_snapshot) {
			snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER, fbc);
		}
	}

	if (!mem_bitmap) {
		return firehose_client_finalize(fc);
	}

	dispatch_async(server_config.fs_mem_drain_queue, ^{
		uint64_t mem_bitmap_copy = mem_bitmap;

		while (mem_bitmap_copy) {
			uint16_t ref = firehose_bitmap_first_set(mem_bitmap_copy);
			firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);

			mem_bitmap_copy &= ~(1ULL << ref);
			server_config.fs_handler(fc, FIREHOSE_EVENT_MEM_BUFFER_RECEIVED, fbc);
			if (fc->fc_needs_mem_snapshot) {
				snapshot->handler(fc, FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER, fbc);
			}
		}

		dispatch_async_f(server_config.fs_io_drain_queue, fc,
				(dispatch_function_t)firehose_client_finalize);
	});
}
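
// Death handling recovers chunks in two steps: refs visible in the rings are
// flushed through the normal path and cleared from the bank bitmap (bit 0,
// the header chunk, is excluded via ~1ULL); whatever remains set is an
// allocated chunk that never made it to a ring and is replayed to the
// handlers, io chunks first, then mem chunks on their own queue.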
static void
firehose_client_handle_mach_event(void *ctx, dispatch_mach_reason_t reason,
		dispatch_mach_msg_t dmsg, mach_error_t error OS_UNUSED)
{
	mach_msg_header_t *msg_hdr = NULL;
	firehose_client_t fc = ctx;
	mach_port_t port;

	switch (reason) {
	case DISPATCH_MACH_MESSAGE_RECEIVED:
		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
		if (msg_hdr->msgh_id == MACH_NOTIFY_NO_SENDERS) {
			_dispatch_debug("FIREHOSE NO_SENDERS (unique_pid: 0x%llx)",
					firehose_client_get_unique_pid(fc, NULL));
			dispatch_mach_cancel(fc->fc_mach_channel);
		} else {
			firehose_server_demux(fc, msg_hdr);
		}
		break;

	case DISPATCH_MACH_DISCONNECTED:
		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
		port = msg_hdr->msgh_remote_port;
		if (MACH_PORT_VALID(port)) {
			if (port != fc->fc_sendp) {
				DISPATCH_INTERNAL_CRASH(port, "Unknown send-right");
			}
			firehose_mach_port_send_release(fc->fc_sendp);
			fc->fc_sendp = MACH_PORT_NULL;
		}
		port = msg_hdr->msgh_local_port;
		if (MACH_PORT_VALID(port)) {
			if (port != fc->fc_recvp) {
				DISPATCH_INTERNAL_CRASH(port, "Unknown recv-right");
			}
			firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
			fc->fc_recvp = MACH_PORT_NULL;
		}
		break;

	case DISPATCH_MACH_CANCELED:
		if (MACH_PORT_VALID(fc->fc_sendp)) {
			DISPATCH_INTERNAL_CRASH(fc->fc_sendp, "send-right leak");
		}
		if (MACH_PORT_VALID(fc->fc_recvp)) {
			DISPATCH_INTERNAL_CRASH(fc->fc_recvp, "recv-right leak");
		}
		firehose_client_cancel(fc);
		break;
	}
}
#if !TARGET_OS_SIMULATOR
static void
firehose_client_kernel_source_handle_event(void *ctxt)
{
	firehose_client_t fc = ctxt;

	// resumed in firehose_client_drain for both memory and I/O
	dispatch_suspend(fc->fc_kernel_source);
	dispatch_suspend(fc->fc_kernel_source);
	firehose_client_wakeup(fc, 0, false);
	firehose_client_wakeup(fc, 0, true);
}
#endif
static void
firehose_client_resume(firehose_client_t fc,
		const struct firehose_client_connected_info_s *fcci)
{
	dispatch_assert_queue(server_config.fs_io_drain_queue);

	fs_clients_lock();
	TAILQ_INSERT_TAIL(&server_config.fs_clients, fc, fc_entry);
	fs_clients_unlock();

	server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_CONNECTED, (void *)fcci);
	if (!fc->fc_pid) {
		dispatch_activate(fc->fc_kernel_source);
	} else {
		dispatch_mach_connect(fc->fc_mach_channel,
				fc->fc_recvp, fc->fc_sendp, NULL);
	}
}
static void
firehose_client_cancel(firehose_client_t fc)
{
	_dispatch_debug("client died (unique_pid: 0x%llx)",
			firehose_client_get_unique_pid(fc, NULL));

	if (MACH_PORT_VALID(fc->fc_sendp)) {
		firehose_mach_port_send_release(fc->fc_sendp);
		fc->fc_sendp = MACH_PORT_NULL;
	}
	if (MACH_PORT_VALID(fc->fc_recvp)) {
		firehose_mach_port_recv_dispose(fc->fc_recvp, fc);
		fc->fc_recvp = MACH_PORT_NULL;
	}
	fc->fc_use_notifs = false;
	firehose_client_start_cancel(fc, false);
	firehose_client_start_cancel(fc, true);
}
static firehose_client_t
_firehose_client_create(firehose_buffer_t fb)
{
	firehose_client_t fc;

	fc = (firehose_client_t)_os_object_alloc_realized(FIREHOSE_CLIENT_CLASS,
			sizeof(struct firehose_client_s));
	fc->fc_buffer = fb;
	fc->fc_mem_flushed_pos = fb->fb_header.fbh_bank.fbb_mem_flushed;
	fc->fc_mem_sent_flushed_pos = fc->fc_mem_flushed_pos;
	fc->fc_io_flushed_pos = fb->fb_header.fbh_bank.fbb_io_flushed;
	fc->fc_io_sent_flushed_pos = fc->fc_io_flushed_pos;
	return fc;
}
// mirrors the layout of audit_token_t (see firehose_server_register)
typedef struct firehose_token_s {
	uid_t auid;
	uid_t euid;
	gid_t egid;
	uid_t ruid;
	gid_t rgid;
	pid_t pid;
	au_asid_t asid;
	dev_t execcnt;
} *firehose_token_t;
static firehose_client_t
firehose_client_create(firehose_buffer_t fb, firehose_token_t token,
		mach_port_t comm_recvp, mach_port_t comm_sendp)
{
	uint64_t unique_pid = fb->fb_header.fbh_uniquepid;
	firehose_client_t fc = _firehose_client_create(fb);
	dispatch_mach_t dm;

	fc->fc_pid = token->pid ? token->pid : ~0;
	fc->fc_euid = token->euid;
	fc->fc_pidversion = token->execcnt;

	_dispatch_debug("FIREHOSE_REGISTER (unique_pid: 0x%llx)", unique_pid);
	fc->fc_recvp = comm_recvp;
	fc->fc_sendp = comm_sendp;
	firehose_mach_port_guard(comm_recvp, true, fc);
	dm = dispatch_mach_create_f("com.apple.firehose.peer",
			server_config.fs_ipc_queue,
			fc, firehose_client_handle_mach_event);
	fc->fc_mach_channel = dm;
	return fc;
}
static void
firehose_kernel_client_create(void)
{
#if !TARGET_OS_SIMULATOR
	struct firehose_server_s *fs = &server_config;
	firehose_buffer_map_info_t fb_map;
	firehose_client_t fc;
	dispatch_source_t ds;
	int fd;

	while ((fd = open("/dev/oslog", O_RDWR)) < 0) {
		if (errno == EINTR) {
			continue;
		}
		if (errno == ENOENT) {
			return;
		}
		DISPATCH_INTERNAL_CRASH(errno, "Unable to open /dev/oslog");
	}

	while (ioctl(fd, LOGBUFFERMAP, &fb_map) < 0) {
		if (errno == EINTR) {
			continue;
		}
		DISPATCH_INTERNAL_CRASH(errno, "Unable to map kernel buffer");
	}
	if (fb_map.fbmi_size !=
			FIREHOSE_BUFFER_KERNEL_CHUNK_COUNT * FIREHOSE_CHUNK_SIZE) {
		DISPATCH_INTERNAL_CRASH(fb_map.fbmi_size, "Unexpected kernel buffer size");
	}

	fc = _firehose_client_create((firehose_buffer_t)(uintptr_t)fb_map.fbmi_addr);
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
			fs->fs_ipc_queue);
	dispatch_set_context(ds, fc);
	dispatch_source_set_event_handler_f(ds,
			firehose_client_kernel_source_handle_event);
	fc->fc_kernel_source = ds;
	fc->fc_use_notifs = true;
	fc->fc_sendp = MACH_PORT_DEAD; // causes drain() to call notify

	fs->fs_kernel_fd = fd;
	fs->fs_kernel_client = fc;
#endif
}
void
_firehose_client_dispose(firehose_client_t fc)
{
	vm_deallocate(mach_task_self(), (vm_address_t)fc->fc_buffer,
			sizeof(*fc->fc_buffer));
	fc->fc_buffer = NULL;
	server_config.fs_handler(fc, FIREHOSE_EVENT_CLIENT_FINALIZE, NULL);
}
void
_firehose_client_xref_dispose(firehose_client_t fc)
{
	_dispatch_debug("Cleaning up client info for unique_pid 0x%llx",
			firehose_client_get_unique_pid(fc, NULL));
}
uint64_t
firehose_client_get_unique_pid(firehose_client_t fc, pid_t *pid_out)
{
	firehose_buffer_header_t fbh = &fc->fc_buffer->fb_header;
	if (pid_out) *pid_out = fc->fc_pid;
	if (!fc->fc_pid) return 0;
	return fbh->fbh_uniquepid ? fbh->fbh_uniquepid : ~0ull;
}
uid_t
firehose_client_get_euid(firehose_client_t fc)
{
	return fc->fc_euid;
}

int
firehose_client_get_pid_version(firehose_client_t fc)
{
	return fc->fc_pidversion;
}
void *
firehose_client_get_metadata_buffer(firehose_client_t client, size_t *size)
{
	firehose_buffer_header_t fbh = &client->fc_buffer->fb_header;

	*size = FIREHOSE_BUFFER_LIBTRACE_HEADER_SIZE;
	return (void *)((uintptr_t)(fbh + 1) - *size);
}
void *
firehose_client_get_context(firehose_client_t fc)
{
	return os_atomic_load2o(fc, fc_ctxt, relaxed);
}

void *
firehose_client_set_context(firehose_client_t fc, void *ctxt)
{
	return os_atomic_xchg2o(fc, fc_ctxt, ctxt, relaxed);
}

void
firehose_client_initiate_quarantine(firehose_client_t fc)
{
	fc->fc_quarantined = true;
}
#pragma mark firehose server

/*
 * The current_message context stores the client info for the current message
 * being handled. The only reason this works is because currently the message
 * processing is serial. If that changes, this would not work.
 */
static firehose_client_t cur_client_info;
static void
firehose_server_handle_mach_event(void *ctx OS_UNUSED,
		dispatch_mach_reason_t reason, dispatch_mach_msg_t dmsg,
		mach_error_t error OS_UNUSED)
{
	mach_msg_header_t *msg_hdr = NULL;

	if (reason == DISPATCH_MACH_MESSAGE_RECEIVED) {
		msg_hdr = dispatch_mach_msg_get_msg(dmsg, NULL);
		/* TODO: Assert this should be a register message */
		firehose_server_demux(NULL, msg_hdr);
	}
}
void
firehose_server_init(mach_port_t comm_port, firehose_handler_t handler)
{
	struct firehose_server_s *fs = &server_config;
	dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL;
	dispatch_queue_attr_t attr_ui;
	dispatch_mach_t dm;
	dispatch_source_t ds;

	// just reference the string so that it's captured
	(void)os_atomic_load(&__libfirehose_serverVersionString[0], relaxed);

	attr_ui = dispatch_queue_attr_make_with_qos_class(attr,
			QOS_CLASS_USER_INITIATED, 0);
	fs->fs_ipc_queue = dispatch_queue_create_with_target(
			"com.apple.firehose.ipc", attr_ui, NULL);
	fs->fs_snapshot_gate_queue = dispatch_queue_create_with_target(
			"com.apple.firehose.snapshot-gate", attr, NULL);
	fs->fs_io_drain_queue = dispatch_queue_create_with_target(
			"com.apple.firehose.drain-io", attr, NULL);
	fs->fs_mem_drain_queue = dispatch_queue_create_with_target(
			"com.apple.firehose.drain-mem", attr, NULL);

	dm = dispatch_mach_create_f("com.apple.firehose.listener",
			fs->fs_ipc_queue, NULL, firehose_server_handle_mach_event);
	fs->fs_bootstrap_port = comm_port;
	fs->fs_mach_channel = dm;
	fs->fs_handler = _Block_copy(handler);
	firehose_kernel_client_create();

	for (size_t i = 0; i < countof(fs->fs_sources); i++) {
		ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_OR, 0, 0,
				fs_idx_is_for_io(i) ? server_config.fs_io_drain_queue :
				server_config.fs_mem_drain_queue);
		dispatch_set_context(ds, &fs->fs_queues[i]);
		dispatch_source_set_event_handler_f(ds, firehose_client_drain);
		fs->fs_sources[i] = ds;
	}
}
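
// One DATA_OR source per (quarantined, for_io) queue: io-indexed sources
// target the io drain queue and mem-indexed ones the mem drain queue, and
// merged data coalesces while a drain is already pending.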
void
firehose_server_assert_spi_version(uint32_t spi_version)
{
	if (spi_version != OS_FIREHOSE_SPI_VERSION) {
		DISPATCH_CLIENT_CRASH(spi_version, "firehose server version mismatch ("
				OS_STRINGIFY(OS_FIREHOSE_SPI_VERSION) ")");
	}
	if (_firehose_spi_version != OS_FIREHOSE_SPI_VERSION) {
		DISPATCH_CLIENT_CRASH(_firehose_spi_version,
				"firehose libdispatch version mismatch ("
				OS_STRINGIFY(OS_FIREHOSE_SPI_VERSION) ")");
	}
}
bool
firehose_server_has_ever_flushed_pages(void)
{
	// Use the IO pages flushed count from the kernel client as an
	// approximation for whether the firehose has ever flushed pages during
	// this boot. logd uses this to detect the first time it starts after a
	// fresh boot.
	firehose_client_t fhc = server_config.fs_kernel_client;
	return !fhc || fhc->fc_io_flushed_pos > 0;
}
void
firehose_server_resume(void)
{
	struct firehose_server_s *fs = &server_config;

	if (fs->fs_kernel_client) {
		dispatch_async(fs->fs_io_drain_queue, ^{
			struct firehose_client_connected_info_s fcci = {
				.fcci_version = FIREHOSE_CLIENT_CONNECTED_INFO_VERSION,
			};
			firehose_client_resume(fs->fs_kernel_client, &fcci);
		});
	}
	dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
			MACH_PORT_NULL, NULL);
	for (size_t i = 0; i < countof(fs->fs_sources); i++) {
		dispatch_activate(fs->fs_sources[i]);
	}
}
void
firehose_server_cancel(void)
{
	firehose_client_t fc;

	dispatch_mach_cancel(server_config.fs_mach_channel);

	fs_clients_lock();
	TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
		dispatch_mach_cancel(fc->fc_mach_channel);
	}
	fs_clients_unlock();
}
dispatch_queue_t
firehose_server_copy_queue(firehose_server_queue_t which)
{
	dispatch_queue_t dq;

	switch (which) {
	case FIREHOSE_SERVER_QUEUE_IO:
		dq = server_config.fs_io_drain_queue;
		break;
	case FIREHOSE_SERVER_QUEUE_MEMORY:
		dq = server_config.fs_mem_drain_queue;
		break;
	default:
		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
	}
	dispatch_retain(dq);
	return dq;
}
void
firehose_server_quarantined_suspend(firehose_server_queue_t which)
{
	switch (which) {
	case FIREHOSE_SERVER_QUEUE_IO:
		dispatch_suspend(fs_source(true, true));
		break;
	case FIREHOSE_SERVER_QUEUE_MEMORY:
		dispatch_suspend(fs_source(true, false));
		break;
	default:
		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
	}
}
void
firehose_server_quarantined_resume(firehose_server_queue_t which)
{
	switch (which) {
	case FIREHOSE_SERVER_QUEUE_IO:
		dispatch_resume(fs_source(true, true));
		break;
	case FIREHOSE_SERVER_QUEUE_MEMORY:
		dispatch_resume(fs_source(true, false));
		break;
	default:
		DISPATCH_INTERNAL_CRASH(which, "Invalid firehose server queue type");
	}
}
#pragma mark firehose snapshot and peeking
void
firehose_client_metadata_stream_peek(firehose_client_t fc,
		OS_UNUSED firehose_event_t context, bool (^peek_should_start)(void),
		bool (^peek)(firehose_chunk_t fbc))
{
	os_unfair_lock_lock(&fc->fc_lock);

	if (peek_should_start && peek_should_start()) {
		firehose_buffer_t fb = fc->fc_buffer;
		firehose_buffer_header_t fbh = &fb->fb_header;
		uint64_t bitmap = fbh->fbh_bank.fbb_metadata_bitmap;

		while (bitmap) {
			uint16_t ref = firehose_bitmap_first_set(bitmap);
			firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
			uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;

			bitmap &= ~(1ULL << ref);
			if (fbc->fc_start + fbc_length <= fbc->fc_data) {
				// this page has its "recycle-requeue" done, but hasn't gone
				// through "recycle-reuse", or it has no data, ditch it
				continue;
			}
			if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
				// this thing has data, but the first tracepoint is unreadable
				// so also just ditch it
				continue;
			}
			if (fbc->fc_pos.fcp_stream != firehose_stream_metadata) {
				continue;
			}
			if (!peek(fbc)) {
				break;
			}
		}
	}

	os_unfair_lock_unlock(&fc->fc_lock);
}
static void
firehose_client_snapshot_finish(firehose_client_t fc,
		firehose_snapshot_t snapshot, bool for_io)
{
	firehose_buffer_t fb = fc->fc_buffer;
	firehose_buffer_header_t fbh = &fb->fb_header;
	firehose_snapshot_event_t evt;
	uint16_t volatile *fbh_ring;
	uint16_t tail, flushed;
	uint64_t bitmap;

	bitmap = ~1ULL;

	if (for_io) {
		fbh_ring = fbh->fbh_io_ring;
		tail = fbh->fbh_ring_tail.frp_io_tail;
		flushed = (uint16_t)fc->fc_io_flushed_pos;
		evt = FIREHOSE_SNAPSHOT_EVENT_IO_BUFFER;
	} else {
		fbh_ring = fbh->fbh_mem_ring;
		tail = fbh->fbh_ring_tail.frp_mem_tail;
		flushed = (uint16_t)fc->fc_mem_flushed_pos;
		evt = FIREHOSE_SNAPSHOT_EVENT_MEM_BUFFER;
	}
	if ((uint16_t)(flushed - tail) >= FIREHOSE_BUFFER_CHUNK_COUNT) {
		fc->fc_memory_corrupted = true;
		return;
	}

	// remove the pages that we flushed already from the bitmap
	for (; tail != flushed; tail++) {
		uint16_t idx = tail & FIREHOSE_RING_POS_IDX_MASK;
		uint16_t ref = fbh_ring[idx] & FIREHOSE_RING_POS_IDX_MASK;

		bitmap &= ~(1ULL << ref);
	}

	// Remove pages that are free by AND-ing with the allocating bitmap.
	// The load of fbb_bitmap may not be atomic, but it's ok because bits
	// being flipped are pages we don't care about snapshotting. The worst thing
	// that can happen is that we go peek at an unmapped page and we fault it in
	bitmap &= fbh->fbh_bank.fbb_bitmap;

	// Then look at all the allocated pages not seen in the ring
	while (bitmap) {
		uint16_t ref = firehose_bitmap_first_set(bitmap);
		firehose_chunk_t fbc = firehose_buffer_ref_to_chunk(fb, ref);
		uint16_t fbc_length = fbc->fc_pos.fcp_next_entry_offs;

		bitmap &= ~(1ULL << ref);
		if (fbc->fc_start + fbc_length <= fbc->fc_data) {
			// this page has its "recycle-requeue" done, but hasn't gone
			// through "recycle-reuse", or it has no data, ditch it
			continue;
		}
		if (!((firehose_tracepoint_t)fbc->fc_data)->ft_length) {
			// this thing has data, but the first tracepoint is unreadable
			// so also just ditch it
			continue;
		}
		if (fbc->fc_pos.fcp_flag_io != for_io) {
			continue;
		}
		snapshot->handler(fc, evt, fbc);
	}
}
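
// As in firehose_client_handle_death(), the snapshot walks the ring tails
// first, then replays any allocated chunk not seen in the ring, filtering on
// fcp_flag_io so each pass only reports buffers of the requested kind.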
static void
firehose_snapshot_tickle_clients(firehose_snapshot_t fs, bool for_io)
{
	firehose_client_t fc;
	long n = 0;

	fs_clients_lock();
	TAILQ_FOREACH(fc, &server_config.fs_clients, fc_entry) {
		if (slowpath(fc->fc_memory_corrupted)) {
			continue;
		}
		if (!fc->fc_pid) {
#if TARGET_OS_SIMULATOR
			continue;
#endif
		} else if (!firehose_client_wakeup(fc, 0, for_io)) {
			continue;
		}
		n++;
		if (for_io) {
			fc->fc_needs_io_snapshot = true;
		} else {
			fc->fc_needs_mem_snapshot = true;
		}
	}
	fs_clients_unlock();

	// cheating: equivalent to dispatch_group_enter() n times
	// without the acquire barriers that we don't need
	if (n) os_atomic_add2o(fs->fs_group, dg_value, n, relaxed);
}
static void
firehose_snapshot_finish(void *ctxt)
{
	firehose_snapshot_t fs = ctxt;

	fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_COMPLETE, NULL);
	server_config.fs_snapshot = NULL;

	dispatch_release(fs->fs_group);
	Block_release(fs->handler);
	free(fs);

	// resume the snapshot gate queue to maybe handle the next snapshot
	dispatch_resume(server_config.fs_snapshot_gate_queue);
}
static void
firehose_snapshot_gate(void *ctxt)
{
	firehose_snapshot_t fs = ctxt;

	// prevent other snapshots from running until done
	dispatch_suspend(server_config.fs_snapshot_gate_queue);

	server_config.fs_snapshot = fs;
	dispatch_group_async(fs->fs_group, server_config.fs_mem_drain_queue, ^{
		// start the fs_mem_snapshot, this is what triggers the snapshot
		// logic from _drain() or handle_death()
		fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_MEM_START, NULL);
		firehose_snapshot_tickle_clients(fs, false);

		dispatch_group_async(fs->fs_group, server_config.fs_io_drain_queue, ^{
			// start the fs_io_snapshot, this is what triggers the snapshot
			// logic from _drain() or handle_death()
			// 29868879: must always happen after the memory snapshot started
			fs->handler(NULL, FIREHOSE_SNAPSHOT_EVENT_IO_START, NULL);
			firehose_snapshot_tickle_clients(fs, true);

#if !TARGET_OS_SIMULATOR
			if (server_config.fs_kernel_client) {
				firehose_client_kernel_source_handle_event(
						server_config.fs_kernel_client);
			}
#endif
		});
	});

	dispatch_group_notify_f(fs->fs_group, server_config.fs_io_drain_queue,
			fs, firehose_snapshot_finish);
}
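
// The gate queue acts as a lock: it is suspended above for the whole
// lifetime of a snapshot and only resumed by firehose_snapshot_finish(), so
// at most one snapshot can be in flight at any time.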
void
firehose_snapshot(firehose_snapshot_handler_t handler)
{
	firehose_snapshot_t snapshot = malloc(sizeof(struct firehose_snapshot_s));

	snapshot->handler = Block_copy(handler);
	snapshot->fs_group = dispatch_group_create();

	dispatch_async_f(server_config.fs_snapshot_gate_queue, snapshot,
			firehose_snapshot_gate);
}
#pragma mark MiG handler routines
kern_return_t
firehose_server_register(mach_port_t server_port OS_UNUSED,
		mach_port_t mem_port, mach_vm_size_t mem_size,
		mach_port_t comm_recvp, mach_port_t comm_sendp,
		mach_port_t extra_info_port, mach_vm_size_t extra_info_size,
		audit_token_t atoken)
{
	mach_vm_address_t base_addr = 0;
	firehose_client_t fc = NULL;
	kern_return_t kr;
	struct firehose_client_connected_info_s fcci = {
		.fcci_version = FIREHOSE_CLIENT_CONNECTED_INFO_VERSION,
	};

	if (mem_size != sizeof(union firehose_buffer_u)) {
		return KERN_INVALID_VALUE;
	}

	/*
	 * Request a MACH_NOTIFY_NO_SENDERS notification for recvp. That should
	 * indicate the client going away.
	 */
	mach_port_t previous = MACH_PORT_NULL;
	kr = mach_port_request_notification(mach_task_self(), comm_recvp,
			MACH_NOTIFY_NO_SENDERS, 0, comm_recvp,
			MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		return KERN_FAILURE;
	}
	dispatch_assert(previous == MACH_PORT_NULL);

	/* Map the memory handle into the server address space */
	kr = mach_vm_map(mach_task_self(), &base_addr, mem_size, 0,
			VM_FLAGS_ANYWHERE, mem_port, 0, FALSE,
			VM_PROT_READ, VM_PROT_READ, VM_INHERIT_NONE);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		return KERN_NO_SPACE;
	}

	if (extra_info_port && extra_info_size) {
		mach_vm_address_t addr = 0;
		kr = mach_vm_map(mach_task_self(), &addr, extra_info_size, 0,
				VM_FLAGS_ANYWHERE, extra_info_port, 0, FALSE,
				VM_PROT_READ, VM_PROT_READ, VM_INHERIT_NONE);
		if (dispatch_assume_zero(kr)) {
			mach_vm_deallocate(mach_task_self(), base_addr, mem_size);
			return KERN_NO_SPACE;
		}
		fcci.fcci_data = (void *)(uintptr_t)addr;
		fcci.fcci_size = (size_t)extra_info_size;
	}

	fc = firehose_client_create((firehose_buffer_t)base_addr,
			(firehose_token_t)&atoken, comm_recvp, comm_sendp);
	dispatch_async(server_config.fs_io_drain_queue, ^{
		firehose_client_resume(fc, &fcci);
		if (fcci.fcci_size) {
			vm_deallocate(mach_task_self(), (vm_address_t)fcci.fcci_data,
					fcci.fcci_size);
		}
	});

	if (extra_info_port) firehose_mach_port_send_release(extra_info_port);
	firehose_mach_port_send_release(mem_port);
	return KERN_SUCCESS;
}
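
// On success the send rights for mem_port and extra_info_port are consumed
// here; the mapped extra info itself is deallocated on the io drain queue
// right after the connected event was delivered.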
kern_return_t
firehose_server_push_async(mach_port_t server_port OS_UNUSED,
		qos_class_t qos, boolean_t for_io, boolean_t expects_notifs)
{
	firehose_client_t fc = cur_client_info;
	pthread_priority_t pp = _pthread_qos_class_encode(qos, 0,
			_PTHREAD_PRIORITY_ENFORCE_FLAG);

	_dispatch_debug("FIREHOSE_PUSH_ASYNC (unique_pid %llx)",
			firehose_client_get_unique_pid(fc, NULL));
	if (!slowpath(fc->fc_memory_corrupted)) {
		if (expects_notifs && !fc->fc_use_notifs) {
			fc->fc_use_notifs = true;
		}
		firehose_client_wakeup(fc, pp, for_io);
	}
	return KERN_SUCCESS;
}
kern_return_t
firehose_server_push_and_wait(mach_port_t server_port OS_UNUSED,
		mach_port_t reply_port, qos_class_t qos, boolean_t for_io,
		firehose_push_reply_t *push_reply OS_UNUSED,
		boolean_t *quarantinedOut OS_UNUSED)
{
	firehose_client_t fc = cur_client_info;
	dispatch_block_flags_t flags = DISPATCH_BLOCK_ENFORCE_QOS_CLASS;
	dispatch_block_t block;
	dispatch_queue_t q;

	_dispatch_debug("FIREHOSE_PUSH (unique_pid %llx)",
			firehose_client_get_unique_pid(fc, NULL));

	if (slowpath(fc->fc_memory_corrupted)) {
		firehose_client_mark_corrupted(fc, reply_port);
		return MIG_NO_REPLY;
	}

	if (for_io) {
		q = server_config.fs_io_drain_queue;
	} else {
		q = server_config.fs_mem_drain_queue;
	}

	block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
		firehose_client_drain_one(fc, reply_port,
				for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
	});
	dispatch_async(q, block);
	_Block_release(block);
	return MIG_NO_REPLY;
}
static void
firehose_server_demux(firehose_client_t fc, mach_msg_header_t *msg_hdr)
{
	const size_t reply_size =
			sizeof(union __ReplyUnion__firehose_server_firehose_subsystem);

	cur_client_info = fc;
	firehose_mig_server(firehose_server, reply_size, msg_hdr);
}