2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
24 #include "protocolServer.h"
26 #include <sys/mount.h>
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds
,
29 const struct kevent64_s
*ke
);
30 static bool _dispatch_kevent_register(dispatch_kevent_t
*dkp
, uint32_t *flgp
);
31 static void _dispatch_kevent_unregister(dispatch_kevent_t dk
, uint32_t flg
);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
34 static void _dispatch_kevent_drain(struct kevent64_s
*ke
);
35 static void _dispatch_kevent_merge(struct kevent64_s
*ke
);
36 static void _dispatch_timers_kevent(struct kevent64_s
*ke
);
37 static void _dispatch_timers_unregister(dispatch_source_t ds
,
38 dispatch_kevent_t dk
);
39 static void _dispatch_timers_update(dispatch_source_t ds
);
40 static void _dispatch_timer_aggregates_check(void);
41 static void _dispatch_timer_aggregates_register(dispatch_source_t ds
);
42 static void _dispatch_timer_aggregates_update(dispatch_source_t ds
,
44 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds
,
46 static inline unsigned long _dispatch_source_timer_data(
47 dispatch_source_refs_t dr
, unsigned long prev
);
48 static long _dispatch_kq_update(const struct kevent64_s
*);
49 static void _dispatch_memorystatus_init(void);
51 static void _dispatch_mach_host_calendar_change_register(void);
52 static void _dispatch_mach_recv_msg_buf_init(void);
53 static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk
,
54 uint32_t new_flags
, uint32_t del_flags
);
55 static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk
,
56 uint32_t new_flags
, uint32_t del_flags
);
57 static inline void _dispatch_kevent_mach_portset(struct kevent64_s
*ke
);
59 static inline void _dispatch_mach_host_calendar_change_register(void) {}
60 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
62 static const char * _evfiltstr(short filt
);
64 static void _dispatch_kevent_debug(struct kevent64_s
* kev
, const char* str
);
65 static void _dispatch_kevent_debugger(void *context
);
66 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
70 _dispatch_kevent_debug(struct kevent64_s
* kev DISPATCH_UNUSED
,
71 const char* str DISPATCH_UNUSED
) {}
72 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
76 #pragma mark dispatch_source_t
79 dispatch_source_create(dispatch_source_type_t type
,
84 const struct kevent64_s
*proto_kev
= &type
->ke
;
89 if (type
== NULL
|| (mask
& ~type
->mask
)) {
93 switch (type
->ke
.filter
) {
100 #if DISPATCH_USE_VM_PRESSURE
103 #if DISPATCH_USE_MEMORYSTATUS
104 case EVFILT_MEMORYSTATUS
:
106 case DISPATCH_EVFILT_CUSTOM_ADD
:
107 case DISPATCH_EVFILT_CUSTOM_OR
:
112 case DISPATCH_EVFILT_TIMER
:
113 if (!!handle
^ !!type
->ke
.ident
) {
121 ds
= _dispatch_alloc(DISPATCH_VTABLE(source
),
122 sizeof(struct dispatch_source_s
));
123 // Initialize as a queue first, then override some settings below.
124 _dispatch_queue_init((dispatch_queue_t
)ds
);
125 ds
->dq_label
= "source";
127 ds
->do_ref_cnt
++; // the reference the manager queue holds
128 ds
->do_ref_cnt
++; // since source is created suspended
129 ds
->do_suspend_cnt
= DISPATCH_OBJECT_SUSPEND_INTERVAL
;
130 // The initial target queue is the manager queue, in order to get
131 // the source installed. <rdar://problem/8928171>
132 ds
->do_targetq
= &_dispatch_mgr_q
;
134 dk
= _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s
));
135 dk
->dk_kevent
= *proto_kev
;
136 dk
->dk_kevent
.ident
= handle
;
137 dk
->dk_kevent
.flags
|= EV_ADD
|EV_ENABLE
;
138 dk
->dk_kevent
.fflags
|= (uint32_t)mask
;
139 dk
->dk_kevent
.udata
= (uintptr_t)dk
;
140 TAILQ_INIT(&dk
->dk_sources
);
143 ds
->ds_pending_data_mask
= dk
->dk_kevent
.fflags
;
144 ds
->ds_ident_hack
= (uintptr_t)dk
->dk_kevent
.ident
;
145 if ((EV_DISPATCH
|EV_ONESHOT
) & proto_kev
->flags
) {
146 ds
->ds_is_level
= true;
147 ds
->ds_needs_rearm
= true;
148 } else if (!(EV_CLEAR
& proto_kev
->flags
)) {
149 // we cheat and use EV_CLEAR to mean a "flag thingy"
150 ds
->ds_is_adder
= true;
152 // Some sources require special processing
153 if (type
->init
!= NULL
) {
154 type
->init(ds
, type
, handle
, mask
, q
);
156 dispatch_assert(!(ds
->ds_is_level
&& ds
->ds_is_adder
));
158 if (fastpath(!ds
->ds_refs
)) {
159 ds
->ds_refs
= _dispatch_calloc(1ul,
160 sizeof(struct dispatch_source_refs_s
));
162 ds
->ds_refs
->dr_source_wref
= _dispatch_ptr2wref(ds
);
164 // First item on the queue sets the user-specified target queue
165 dispatch_set_target_queue(ds
, q
);
166 _dispatch_object_debug(ds
, "%s", __func__
);
171 _dispatch_source_dispose(dispatch_source_t ds
)
173 _dispatch_object_debug(ds
, "%s", __func__
);
175 _dispatch_queue_destroy(ds
);
179 _dispatch_source_xref_dispose(dispatch_source_t ds
)
181 _dispatch_wakeup(ds
);
185 dispatch_source_cancel(dispatch_source_t ds
)
187 _dispatch_object_debug(ds
, "%s", __func__
);
188 // Right after we set the cancel flag, someone else
189 // could potentially invoke the source, do the cancelation,
190 // unregister the source, and deallocate it. We would
191 // need to therefore retain/release before setting the bit
193 _dispatch_retain(ds
);
194 (void)dispatch_atomic_or2o(ds
, ds_atomic_flags
, DSF_CANCELED
, relaxed
);
195 _dispatch_wakeup(ds
);
196 _dispatch_release(ds
);
200 dispatch_source_testcancel(dispatch_source_t ds
)
202 return (bool)(ds
->ds_atomic_flags
& DSF_CANCELED
);
207 dispatch_source_get_mask(dispatch_source_t ds
)
209 return ds
->ds_pending_data_mask
;
213 dispatch_source_get_handle(dispatch_source_t ds
)
215 return (unsigned int)ds
->ds_ident_hack
;
219 dispatch_source_get_data(dispatch_source_t ds
)
225 dispatch_source_merge_data(dispatch_source_t ds
, unsigned long val
)
227 struct kevent64_s kev
= {
228 .fflags
= (typeof(kev
.fflags
))val
,
229 .data
= (typeof(kev
.data
))val
,
233 ds
->ds_dkev
->dk_kevent
.filter
== DISPATCH_EVFILT_CUSTOM_ADD
||
234 ds
->ds_dkev
->dk_kevent
.filter
== DISPATCH_EVFILT_CUSTOM_OR
);
236 _dispatch_source_merge_kevent(ds
, &kev
);
240 #pragma mark dispatch_source_handler
243 // 6618342 Contact the team that owns the Instrument DTrace probe before
244 // renaming this symbol
246 _dispatch_source_set_event_handler2(void *context
)
248 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
249 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
250 dispatch_source_refs_t dr
= ds
->ds_refs
;
252 if (ds
->ds_handler_is_block
&& dr
->ds_handler_ctxt
) {
253 Block_release(dr
->ds_handler_ctxt
);
255 dr
->ds_handler_func
= context
? _dispatch_Block_invoke(context
) : NULL
;
256 dr
->ds_handler_ctxt
= context
;
257 ds
->ds_handler_is_block
= true;
261 dispatch_source_set_event_handler(dispatch_source_t ds
,
262 dispatch_block_t handler
)
264 handler
= _dispatch_Block_copy(handler
);
265 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
266 _dispatch_source_set_event_handler2
);
268 #endif /* __BLOCKS__ */
271 _dispatch_source_set_event_handler_f(void *context
)
273 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
274 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
275 dispatch_source_refs_t dr
= ds
->ds_refs
;
278 if (ds
->ds_handler_is_block
&& dr
->ds_handler_ctxt
) {
279 Block_release(dr
->ds_handler_ctxt
);
282 dr
->ds_handler_func
= context
;
283 dr
->ds_handler_ctxt
= ds
->do_ctxt
;
284 ds
->ds_handler_is_block
= false;
288 dispatch_source_set_event_handler_f(dispatch_source_t ds
,
289 dispatch_function_t handler
)
291 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
292 _dispatch_source_set_event_handler_f
);
296 // 6618342 Contact the team that owns the Instrument DTrace probe before
297 // renaming this symbol
299 _dispatch_source_set_cancel_handler2(void *context
)
301 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
302 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
303 dispatch_source_refs_t dr
= ds
->ds_refs
;
305 if (ds
->ds_cancel_is_block
&& dr
->ds_cancel_handler
) {
306 Block_release(dr
->ds_cancel_handler
);
308 dr
->ds_cancel_handler
= context
;
309 ds
->ds_cancel_is_block
= true;
313 dispatch_source_set_cancel_handler(dispatch_source_t ds
,
314 dispatch_block_t handler
)
316 handler
= _dispatch_Block_copy(handler
);
317 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
318 _dispatch_source_set_cancel_handler2
);
320 #endif /* __BLOCKS__ */
323 _dispatch_source_set_cancel_handler_f(void *context
)
325 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
326 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
327 dispatch_source_refs_t dr
= ds
->ds_refs
;
330 if (ds
->ds_cancel_is_block
&& dr
->ds_cancel_handler
) {
331 Block_release(dr
->ds_cancel_handler
);
334 dr
->ds_cancel_handler
= context
;
335 ds
->ds_cancel_is_block
= false;
339 dispatch_source_set_cancel_handler_f(dispatch_source_t ds
,
340 dispatch_function_t handler
)
342 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
343 _dispatch_source_set_cancel_handler_f
);
348 _dispatch_source_set_registration_handler2(void *context
)
350 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
351 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
352 dispatch_source_refs_t dr
= ds
->ds_refs
;
354 if (ds
->ds_registration_is_block
&& dr
->ds_registration_handler
) {
355 Block_release(dr
->ds_registration_handler
);
357 dr
->ds_registration_handler
= context
;
358 ds
->ds_registration_is_block
= true;
362 dispatch_source_set_registration_handler(dispatch_source_t ds
,
363 dispatch_block_t handler
)
365 handler
= _dispatch_Block_copy(handler
);
366 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
367 _dispatch_source_set_registration_handler2
);
369 #endif /* __BLOCKS__ */
372 _dispatch_source_set_registration_handler_f(void *context
)
374 dispatch_source_t ds
= (dispatch_source_t
)_dispatch_queue_get_current();
375 dispatch_assert(dx_type(ds
) == DISPATCH_SOURCE_KEVENT_TYPE
);
376 dispatch_source_refs_t dr
= ds
->ds_refs
;
379 if (ds
->ds_registration_is_block
&& dr
->ds_registration_handler
) {
380 Block_release(dr
->ds_registration_handler
);
383 dr
->ds_registration_handler
= context
;
384 ds
->ds_registration_is_block
= false;
388 dispatch_source_set_registration_handler_f(dispatch_source_t ds
,
389 dispatch_function_t handler
)
391 _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, handler
,
392 _dispatch_source_set_registration_handler_f
);
396 #pragma mark dispatch_source_invoke
399 _dispatch_source_registration_callout(dispatch_source_t ds
)
401 dispatch_source_refs_t dr
= ds
->ds_refs
;
403 if ((ds
->ds_atomic_flags
& DSF_CANCELED
) || (ds
->do_xref_cnt
== -1)) {
404 // no registration callout if source is canceled rdar://problem/8955246
406 if (ds
->ds_registration_is_block
) {
407 Block_release(dr
->ds_registration_handler
);
409 } else if (ds
->ds_registration_is_block
) {
410 dispatch_block_t b
= dr
->ds_registration_handler
;
411 _dispatch_client_callout_block(b
);
412 Block_release(dr
->ds_registration_handler
);
415 dispatch_function_t f
= dr
->ds_registration_handler
;
416 _dispatch_client_callout(ds
->do_ctxt
, f
);
418 ds
->ds_registration_is_block
= false;
419 dr
->ds_registration_handler
= NULL
;
423 _dispatch_source_cancel_callout(dispatch_source_t ds
)
425 dispatch_source_refs_t dr
= ds
->ds_refs
;
427 ds
->ds_pending_data_mask
= 0;
428 ds
->ds_pending_data
= 0;
432 if (ds
->ds_handler_is_block
) {
433 Block_release(dr
->ds_handler_ctxt
);
434 ds
->ds_handler_is_block
= false;
435 dr
->ds_handler_func
= NULL
;
436 dr
->ds_handler_ctxt
= NULL
;
438 if (ds
->ds_registration_is_block
) {
439 Block_release(dr
->ds_registration_handler
);
440 ds
->ds_registration_is_block
= false;
441 dr
->ds_registration_handler
= NULL
;
445 if (!dr
->ds_cancel_handler
) {
448 if (ds
->ds_cancel_is_block
) {
450 dispatch_block_t b
= dr
->ds_cancel_handler
;
451 if (ds
->ds_atomic_flags
& DSF_CANCELED
) {
452 _dispatch_client_callout_block(b
);
454 Block_release(dr
->ds_cancel_handler
);
455 ds
->ds_cancel_is_block
= false;
458 dispatch_function_t f
= dr
->ds_cancel_handler
;
459 if (ds
->ds_atomic_flags
& DSF_CANCELED
) {
460 _dispatch_client_callout(ds
->do_ctxt
, f
);
463 dr
->ds_cancel_handler
= NULL
;
467 _dispatch_source_latch_and_call(dispatch_source_t ds
)
471 if ((ds
->ds_atomic_flags
& DSF_CANCELED
) || (ds
->do_xref_cnt
== -1)) {
474 dispatch_source_refs_t dr
= ds
->ds_refs
;
475 prev
= dispatch_atomic_xchg2o(ds
, ds_pending_data
, 0, relaxed
);
476 if (ds
->ds_is_level
) {
478 } else if (ds
->ds_is_timer
&& ds_timer(dr
).target
&& prev
) {
479 ds
->ds_data
= _dispatch_source_timer_data(dr
, prev
);
483 if (dispatch_assume(prev
) && dr
->ds_handler_func
) {
484 _dispatch_client_callout(dr
->ds_handler_ctxt
, dr
->ds_handler_func
);
489 _dispatch_source_kevent_unregister(dispatch_source_t ds
)
491 _dispatch_object_debug(ds
, "%s", __func__
);
492 dispatch_kevent_t dk
= ds
->ds_dkev
;
494 switch (dk
->dk_kevent
.filter
) {
495 case DISPATCH_EVFILT_TIMER
:
496 _dispatch_timers_unregister(ds
, dk
);
499 TAILQ_REMOVE(&dk
->dk_sources
, ds
->ds_refs
, dr_list
);
500 _dispatch_kevent_unregister(dk
, (uint32_t)ds
->ds_pending_data_mask
);
504 (void)dispatch_atomic_and2o(ds
, ds_atomic_flags
, ~DSF_ARMED
, relaxed
);
505 ds
->ds_needs_rearm
= false; // re-arm is pointless and bad now
506 _dispatch_release(ds
); // the retain is done at creation time
510 _dispatch_source_kevent_resume(dispatch_source_t ds
, uint32_t new_flags
)
512 switch (ds
->ds_dkev
->dk_kevent
.filter
) {
513 case DISPATCH_EVFILT_TIMER
:
514 return _dispatch_timers_update(ds
);
515 case EVFILT_MACHPORT
:
516 if (ds
->ds_pending_data_mask
& DISPATCH_MACH_RECV_MESSAGE
) {
517 new_flags
|= DISPATCH_MACH_RECV_MESSAGE
; // emulate EV_DISPATCH
521 if (_dispatch_kevent_resume(ds
->ds_dkev
, new_flags
, 0)) {
522 _dispatch_source_kevent_unregister(ds
);
527 _dispatch_source_kevent_register(dispatch_source_t ds
)
529 dispatch_assert_zero(ds
->ds_is_installed
);
530 switch (ds
->ds_dkev
->dk_kevent
.filter
) {
531 case DISPATCH_EVFILT_TIMER
:
532 return _dispatch_timers_update(ds
);
535 bool do_resume
= _dispatch_kevent_register(&ds
->ds_dkev
, &flags
);
536 TAILQ_INSERT_TAIL(&ds
->ds_dkev
->dk_sources
, ds
->ds_refs
, dr_list
);
537 if (do_resume
|| ds
->ds_needs_rearm
) {
538 _dispatch_source_kevent_resume(ds
, flags
);
540 (void)dispatch_atomic_or2o(ds
, ds_atomic_flags
, DSF_ARMED
, relaxed
);
541 _dispatch_object_debug(ds
, "%s", __func__
);
544 DISPATCH_ALWAYS_INLINE
545 static inline dispatch_queue_t
546 _dispatch_source_invoke2(dispatch_object_t dou
,
547 _dispatch_thread_semaphore_t
*sema_ptr DISPATCH_UNUSED
)
549 dispatch_source_t ds
= dou
._ds
;
550 if (slowpath(_dispatch_queue_drain(ds
))) {
551 DISPATCH_CLIENT_CRASH("Sync onto source");
554 // This function performs all source actions. Each action is responsible
555 // for verifying that it takes place on the appropriate queue. If the
556 // current queue is not the correct queue for this action, the correct queue
557 // will be returned and the invoke will be re-driven on that queue.
559 // The order of tests here in invoke and in probe should be consistent.
561 dispatch_queue_t dq
= _dispatch_queue_get_current();
562 dispatch_source_refs_t dr
= ds
->ds_refs
;
564 if (!ds
->ds_is_installed
) {
565 // The source needs to be installed on the manager queue.
566 if (dq
!= &_dispatch_mgr_q
) {
567 return &_dispatch_mgr_q
;
569 _dispatch_source_kevent_register(ds
);
570 ds
->ds_is_installed
= true;
571 if (dr
->ds_registration_handler
) {
572 return ds
->do_targetq
;
574 if (slowpath(ds
->do_xref_cnt
== -1)) {
575 return &_dispatch_mgr_q
; // rdar://problem/9558246
577 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds
))) {
578 // Source suspended by an item drained from the source queue.
580 } else if (dr
->ds_registration_handler
) {
581 // The source has been registered and the registration handler needs
582 // to be delivered on the target queue.
583 if (dq
!= ds
->do_targetq
) {
584 return ds
->do_targetq
;
586 // clears ds_registration_handler
587 _dispatch_source_registration_callout(ds
);
588 if (slowpath(ds
->do_xref_cnt
== -1)) {
589 return &_dispatch_mgr_q
; // rdar://problem/9558246
591 } else if ((ds
->ds_atomic_flags
& DSF_CANCELED
) || (ds
->do_xref_cnt
== -1)){
592 // The source has been cancelled and needs to be uninstalled from the
593 // manager queue. After uninstallation, the cancellation handler needs
594 // to be delivered to the target queue.
596 if (dq
!= &_dispatch_mgr_q
) {
597 return &_dispatch_mgr_q
;
599 _dispatch_source_kevent_unregister(ds
);
601 if (dr
->ds_cancel_handler
|| ds
->ds_handler_is_block
||
602 ds
->ds_registration_is_block
) {
603 if (dq
!= ds
->do_targetq
) {
604 return ds
->do_targetq
;
607 _dispatch_source_cancel_callout(ds
);
608 } else if (ds
->ds_pending_data
) {
609 // The source has pending data to deliver via the event handler callback
610 // on the target queue. Some sources need to be rearmed on the manager
611 // queue after event delivery.
612 if (dq
!= ds
->do_targetq
) {
613 return ds
->do_targetq
;
615 _dispatch_source_latch_and_call(ds
);
616 if (ds
->ds_needs_rearm
) {
617 return &_dispatch_mgr_q
;
619 } else if (ds
->ds_needs_rearm
&& !(ds
->ds_atomic_flags
& DSF_ARMED
)) {
620 // The source needs to be rearmed on the manager queue.
621 if (dq
!= &_dispatch_mgr_q
) {
622 return &_dispatch_mgr_q
;
624 _dispatch_source_kevent_resume(ds
, 0);
625 (void)dispatch_atomic_or2o(ds
, ds_atomic_flags
, DSF_ARMED
, relaxed
);
633 _dispatch_source_invoke(dispatch_source_t ds
)
635 _dispatch_queue_class_invoke(ds
, _dispatch_source_invoke2
);
639 _dispatch_source_probe(dispatch_source_t ds
)
641 // This function determines whether the source needs to be invoked.
642 // The order of tests here in probe and in invoke should be consistent.
644 dispatch_source_refs_t dr
= ds
->ds_refs
;
645 if (!ds
->ds_is_installed
) {
646 // The source needs to be installed on the manager queue.
648 } else if (dr
->ds_registration_handler
) {
649 // The registration handler needs to be delivered to the target queue.
651 } else if ((ds
->ds_atomic_flags
& DSF_CANCELED
) || (ds
->do_xref_cnt
== -1)){
652 // The source needs to be uninstalled from the manager queue, or the
653 // cancellation handler needs to be delivered to the target queue.
654 // Note: cancellation assumes installation.
655 if (ds
->ds_dkev
|| dr
->ds_cancel_handler
657 || ds
->ds_handler_is_block
|| ds
->ds_registration_is_block
662 } else if (ds
->ds_pending_data
) {
663 // The source has pending data to deliver to the target queue.
665 } else if (ds
->ds_needs_rearm
&& !(ds
->ds_atomic_flags
& DSF_ARMED
)) {
666 // The source needs to be rearmed on the manager queue.
669 return (ds
->dq_items_tail
!= NULL
);
673 _dispatch_source_merge_kevent(dispatch_source_t ds
, const struct kevent64_s
*ke
)
675 if ((ds
->ds_atomic_flags
& DSF_CANCELED
) || (ds
->do_xref_cnt
== -1)) {
678 if (ds
->ds_is_level
) {
679 // ke->data is signed and "negative available data" makes no sense
680 // zero bytes happens when EV_EOF is set
681 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
682 dispatch_assert(ke
->data
>= 0l);
683 dispatch_atomic_store2o(ds
, ds_pending_data
, ~(unsigned long)ke
->data
,
685 } else if (ds
->ds_is_adder
) {
686 (void)dispatch_atomic_add2o(ds
, ds_pending_data
,
687 (unsigned long)ke
->data
, relaxed
);
688 } else if (ke
->fflags
& ds
->ds_pending_data_mask
) {
689 (void)dispatch_atomic_or2o(ds
, ds_pending_data
,
690 ke
->fflags
& ds
->ds_pending_data_mask
, relaxed
);
692 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
693 if (ds
->ds_needs_rearm
) {
694 (void)dispatch_atomic_and2o(ds
, ds_atomic_flags
, ~DSF_ARMED
, relaxed
);
697 _dispatch_wakeup(ds
);
701 #pragma mark dispatch_kevent_t
703 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
704 static void _dispatch_kevent_guard(dispatch_kevent_t dk
);
705 static void _dispatch_kevent_unguard(dispatch_kevent_t dk
);
707 static inline void _dispatch_kevent_guard(dispatch_kevent_t dk
) { (void)dk
; }
708 static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk
) { (void)dk
; }
711 static struct dispatch_kevent_s _dispatch_kevent_data_or
= {
713 .filter
= DISPATCH_EVFILT_CUSTOM_OR
,
716 .dk_sources
= TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or
.dk_sources
),
718 static struct dispatch_kevent_s _dispatch_kevent_data_add
= {
720 .filter
= DISPATCH_EVFILT_CUSTOM_ADD
,
722 .dk_sources
= TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add
.dk_sources
),
725 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
727 DISPATCH_CACHELINE_ALIGN
728 static TAILQ_HEAD(, dispatch_kevent_s
) _dispatch_sources
[DSL_HASH_SIZE
];
731 _dispatch_kevent_init()
734 for (i
= 0; i
< DSL_HASH_SIZE
; i
++) {
735 TAILQ_INIT(&_dispatch_sources
[i
]);
738 TAILQ_INSERT_TAIL(&_dispatch_sources
[0],
739 &_dispatch_kevent_data_or
, dk_list
);
740 TAILQ_INSERT_TAIL(&_dispatch_sources
[0],
741 &_dispatch_kevent_data_add
, dk_list
);
742 _dispatch_kevent_data_or
.dk_kevent
.udata
=
743 (uintptr_t)&_dispatch_kevent_data_or
;
744 _dispatch_kevent_data_add
.dk_kevent
.udata
=
745 (uintptr_t)&_dispatch_kevent_data_add
;
748 static inline uintptr_t
749 _dispatch_kevent_hash(uint64_t ident
, short filter
)
753 value
= (filter
== EVFILT_MACHPORT
||
754 filter
== DISPATCH_EVFILT_MACH_NOTIFICATION
?
755 MACH_PORT_INDEX(ident
) : ident
);
759 return DSL_HASH((uintptr_t)value
);
762 static dispatch_kevent_t
763 _dispatch_kevent_find(uint64_t ident
, short filter
)
765 uintptr_t hash
= _dispatch_kevent_hash(ident
, filter
);
766 dispatch_kevent_t dki
;
768 TAILQ_FOREACH(dki
, &_dispatch_sources
[hash
], dk_list
) {
769 if (dki
->dk_kevent
.ident
== ident
&& dki
->dk_kevent
.filter
== filter
) {
777 _dispatch_kevent_insert(dispatch_kevent_t dk
)
779 _dispatch_kevent_guard(dk
);
780 uintptr_t hash
= _dispatch_kevent_hash(dk
->dk_kevent
.ident
,
781 dk
->dk_kevent
.filter
);
782 TAILQ_INSERT_TAIL(&_dispatch_sources
[hash
], dk
, dk_list
);
785 // Find existing kevents, and merge any new flags if necessary
787 _dispatch_kevent_register(dispatch_kevent_t
*dkp
, uint32_t *flgp
)
789 dispatch_kevent_t dk
, ds_dkev
= *dkp
;
791 bool do_resume
= false;
793 dk
= _dispatch_kevent_find(ds_dkev
->dk_kevent
.ident
,
794 ds_dkev
->dk_kevent
.filter
);
796 // If an existing dispatch kevent is found, check to see if new flags
797 // need to be added to the existing kevent
798 new_flags
= ~dk
->dk_kevent
.fflags
& ds_dkev
->dk_kevent
.fflags
;
799 dk
->dk_kevent
.fflags
|= ds_dkev
->dk_kevent
.fflags
;
802 do_resume
= new_flags
;
805 _dispatch_kevent_insert(dk
);
806 new_flags
= dk
->dk_kevent
.fflags
;
809 // Re-register the kevent with the kernel if new flags were added
810 // by the dispatch kevent
812 dk
->dk_kevent
.flags
|= EV_ADD
;
819 _dispatch_kevent_resume(dispatch_kevent_t dk
, uint32_t new_flags
,
823 switch (dk
->dk_kevent
.filter
) {
824 case DISPATCH_EVFILT_TIMER
:
825 case DISPATCH_EVFILT_CUSTOM_ADD
:
826 case DISPATCH_EVFILT_CUSTOM_OR
:
827 // these types not registered with kevent
830 case EVFILT_MACHPORT
:
831 return _dispatch_kevent_machport_resume(dk
, new_flags
, del_flags
);
832 case DISPATCH_EVFILT_MACH_NOTIFICATION
:
833 return _dispatch_kevent_mach_notify_resume(dk
, new_flags
, del_flags
);
836 if (dk
->dk_kevent
.flags
& EV_ONESHOT
) {
841 r
= _dispatch_kq_update(&dk
->dk_kevent
);
842 if (dk
->dk_kevent
.flags
& EV_DISPATCH
) {
843 dk
->dk_kevent
.flags
&= ~EV_ADD
;
850 _dispatch_kevent_dispose(dispatch_kevent_t dk
)
854 switch (dk
->dk_kevent
.filter
) {
855 case DISPATCH_EVFILT_TIMER
:
856 case DISPATCH_EVFILT_CUSTOM_ADD
:
857 case DISPATCH_EVFILT_CUSTOM_OR
:
858 // these sources live on statically allocated lists
861 case EVFILT_MACHPORT
:
862 _dispatch_kevent_machport_resume(dk
, 0, dk
->dk_kevent
.fflags
);
864 case DISPATCH_EVFILT_MACH_NOTIFICATION
:
865 _dispatch_kevent_mach_notify_resume(dk
, 0, dk
->dk_kevent
.fflags
);
869 if (dk
->dk_kevent
.flags
& EV_ONESHOT
) {
870 break; // implicitly deleted
874 if (~dk
->dk_kevent
.flags
& EV_DELETE
) {
875 dk
->dk_kevent
.flags
|= EV_DELETE
;
876 dk
->dk_kevent
.flags
&= ~(EV_ADD
|EV_ENABLE
);
877 _dispatch_kq_update(&dk
->dk_kevent
);
882 hash
= _dispatch_kevent_hash(dk
->dk_kevent
.ident
,
883 dk
->dk_kevent
.filter
);
884 TAILQ_REMOVE(&_dispatch_sources
[hash
], dk
, dk_list
);
885 _dispatch_kevent_unguard(dk
);
890 _dispatch_kevent_unregister(dispatch_kevent_t dk
, uint32_t flg
)
892 dispatch_source_refs_t dri
;
893 uint32_t del_flags
, fflags
= 0;
895 if (TAILQ_EMPTY(&dk
->dk_sources
)) {
896 _dispatch_kevent_dispose(dk
);
898 TAILQ_FOREACH(dri
, &dk
->dk_sources
, dr_list
) {
899 dispatch_source_t dsi
= _dispatch_source_from_refs(dri
);
900 uint32_t mask
= (uint32_t)dsi
->ds_pending_data_mask
;
903 del_flags
= flg
& ~fflags
;
905 dk
->dk_kevent
.flags
|= EV_ADD
;
906 dk
->dk_kevent
.fflags
= fflags
;
907 _dispatch_kevent_resume(dk
, 0, del_flags
);
914 _dispatch_kevent_proc_exit(struct kevent64_s
*ke
)
916 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
917 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
918 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
919 struct kevent64_s fake
;
921 fake
.flags
&= ~EV_ERROR
;
922 fake
.fflags
= NOTE_EXIT
;
924 _dispatch_kevent_drain(&fake
);
929 _dispatch_kevent_error(struct kevent64_s
*ke
)
931 _dispatch_kevent_debug(ke
, __func__
);
933 // log the unexpected error
934 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke
->filter
),
935 ke
->flags
& EV_DELETE
? "delete" :
936 ke
->flags
& EV_ADD
? "add" :
937 ke
->flags
& EV_ENABLE
? "enable" : "monitor",
943 _dispatch_kevent_drain(struct kevent64_s
*ke
)
946 static dispatch_once_t pred
;
947 dispatch_once_f(&pred
, NULL
, _dispatch_kevent_debugger
);
949 if (ke
->filter
== EVFILT_USER
) {
952 if (slowpath(ke
->flags
& EV_ERROR
)) {
953 if (ke
->filter
== EVFILT_PROC
) {
954 if (ke
->flags
& EV_DELETE
) {
955 // Process exited while monitored
957 } else if (ke
->data
== ESRCH
) {
958 return _dispatch_kevent_proc_exit(ke
);
960 #if DISPATCH_USE_VM_PRESSURE
961 } else if (ke
->filter
== EVFILT_VM
&& ke
->data
== ENOTSUP
) {
962 // Memory pressure kevent is not supported on all platforms
963 // <rdar://problem/8636227>
966 #if DISPATCH_USE_MEMORYSTATUS
967 } else if (ke
->filter
== EVFILT_MEMORYSTATUS
&&
968 (ke
->data
== EINVAL
|| ke
->data
== ENOTSUP
)) {
969 // Memory status kevent is not supported on all platforms
973 return _dispatch_kevent_error(ke
);
975 _dispatch_kevent_debug(ke
, __func__
);
976 if (ke
->filter
== EVFILT_TIMER
) {
977 return _dispatch_timers_kevent(ke
);
980 if (ke
->filter
== EVFILT_MACHPORT
) {
981 return _dispatch_kevent_mach_portset(ke
);
984 return _dispatch_kevent_merge(ke
);
989 _dispatch_kevent_merge(struct kevent64_s
*ke
)
991 dispatch_kevent_t dk
;
992 dispatch_source_refs_t dri
;
994 dk
= (void*)ke
->udata
;
997 if (ke
->flags
& EV_ONESHOT
) {
998 dk
->dk_kevent
.flags
|= EV_ONESHOT
;
1000 TAILQ_FOREACH(dri
, &dk
->dk_sources
, dr_list
) {
1001 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri
), ke
);
1005 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1007 _dispatch_kevent_guard(dispatch_kevent_t dk
)
1010 const unsigned int guard_flags
= GUARD_CLOSE
;
1011 int r
, fd_flags
= 0;
1012 switch (dk
->dk_kevent
.filter
) {
1016 guard
= &dk
->dk_kevent
;
1017 r
= change_fdguard_np((int)dk
->dk_kevent
.ident
, NULL
, 0,
1018 &guard
, guard_flags
, &fd_flags
);
1019 if (slowpath(r
== -1)) {
1022 (void)dispatch_assume_zero(err
);
1026 dk
->dk_kevent
.ext
[0] = guard_flags
;
1027 dk
->dk_kevent
.ext
[1] = fd_flags
;
1033 _dispatch_kevent_unguard(dispatch_kevent_t dk
)
1036 unsigned int guard_flags
;
1038 switch (dk
->dk_kevent
.filter
) {
1042 guard_flags
= (unsigned int)dk
->dk_kevent
.ext
[0];
1046 guard
= &dk
->dk_kevent
;
1047 fd_flags
= (int)dk
->dk_kevent
.ext
[1];
1048 r
= change_fdguard_np((int)dk
->dk_kevent
.ident
, &guard
,
1049 guard_flags
, NULL
, 0, &fd_flags
);
1050 if (slowpath(r
== -1)) {
1051 (void)dispatch_assume_zero(errno
);
1054 dk
->dk_kevent
.ext
[0] = 0;
1058 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1061 #pragma mark dispatch_source_timer
1063 #if DISPATCH_USE_DTRACE && DISPATCH_USE_DTRACE_INTROSPECTION
1064 static dispatch_source_refs_t
1065 _dispatch_trace_next_timer
[DISPATCH_TIMER_QOS_COUNT
];
1066 #define _dispatch_trace_next_timer_set(x, q) \
1067 _dispatch_trace_next_timer[(q)] = (x)
1068 #define _dispatch_trace_next_timer_program(d, q) \
1069 _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1070 #define _dispatch_trace_next_timer_wake(q) \
1071 _dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
1073 #define _dispatch_trace_next_timer_set(x, q)
1074 #define _dispatch_trace_next_timer_program(d, q)
1075 #define _dispatch_trace_next_timer_wake(q)
1078 #define _dispatch_source_timer_telemetry_enabled() false
1082 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds
,
1083 uintptr_t ident
, struct dispatch_timer_source_s
*values
)
1085 if (_dispatch_trace_timer_configure_enabled()) {
1086 _dispatch_trace_timer_configure(ds
, ident
, values
);
1090 DISPATCH_ALWAYS_INLINE
1092 _dispatch_source_timer_telemetry(dispatch_source_t ds
, uintptr_t ident
,
1093 struct dispatch_timer_source_s
*values
)
1095 if (_dispatch_trace_timer_configure_enabled() ||
1096 _dispatch_source_timer_telemetry_enabled()) {
1097 _dispatch_source_timer_telemetry_slow(ds
, ident
, values
);
1098 asm(""); // prevent tailcall
1102 // approx 1 year (60s * 60m * 24h * 365d)
1103 #define FOREVER_NSEC 31536000000000000ull
1105 DISPATCH_ALWAYS_INLINE
1106 static inline uint64_t
1107 _dispatch_source_timer_now(uint64_t nows
[], unsigned int tidx
)
1109 unsigned int tk
= DISPATCH_TIMER_KIND(tidx
);
1110 if (nows
&& fastpath(nows
[tk
])) {
1115 case DISPATCH_TIMER_KIND_MACH
:
1116 now
= _dispatch_absolute_time();
1118 case DISPATCH_TIMER_KIND_WALL
:
1119 now
= _dispatch_get_nanoseconds();
1128 static inline unsigned long
1129 _dispatch_source_timer_data(dispatch_source_refs_t dr
, unsigned long prev
)
1131 // calculate the number of intervals since last fire
1132 unsigned long data
, missed
;
1134 now
= _dispatch_source_timer_now(NULL
, _dispatch_source_timer_idx(dr
));
1135 missed
= (unsigned long)((now
- ds_timer(dr
).last_fire
) /
1136 ds_timer(dr
).interval
);
1137 // correct for missed intervals already delivered last time
1138 data
= prev
- ds_timer(dr
).missed
+ missed
;
1139 ds_timer(dr
).missed
= missed
;
1143 struct dispatch_set_timer_params
{
1144 dispatch_source_t ds
;
1146 struct dispatch_timer_source_s values
;
1150 _dispatch_source_set_timer3(void *context
)
1152 // Called on the _dispatch_mgr_q
1153 struct dispatch_set_timer_params
*params
= context
;
1154 dispatch_source_t ds
= params
->ds
;
1155 ds
->ds_ident_hack
= params
->ident
;
1156 ds_timer(ds
->ds_refs
) = params
->values
;
1157 // Clear any pending data that might have accumulated on
1158 // older timer params <rdar://problem/8574886>
1159 ds
->ds_pending_data
= 0;
1160 // Re-arm in case we got disarmed because of pending set_timer suspension
1161 (void)dispatch_atomic_or2o(ds
, ds_atomic_flags
, DSF_ARMED
, release
);
1162 dispatch_resume(ds
);
1163 // Must happen after resume to avoid getting disarmed due to suspension
1164 _dispatch_timers_update(ds
);
1165 dispatch_release(ds
);
1166 if (params
->values
.flags
& DISPATCH_TIMER_WALL_CLOCK
) {
1167 _dispatch_mach_host_calendar_change_register();
1173 _dispatch_source_set_timer2(void *context
)
1175 // Called on the source queue
1176 struct dispatch_set_timer_params
*params
= context
;
1177 dispatch_suspend(params
->ds
);
1178 dispatch_barrier_async_f(&_dispatch_mgr_q
, params
,
1179 _dispatch_source_set_timer3
);
1183 static struct dispatch_set_timer_params
*
1184 _dispatch_source_timer_params(dispatch_source_t ds
, dispatch_time_t start
,
1185 uint64_t interval
, uint64_t leeway
)
1187 struct dispatch_set_timer_params
*params
;
1188 params
= _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params
));
1190 params
->values
.flags
= ds_timer(ds
->ds_refs
).flags
;
1192 if (interval
== 0) {
1193 // we use zero internally to mean disabled
1195 } else if ((int64_t)interval
< 0) {
1196 // 6866347 - make sure nanoseconds won't overflow
1197 interval
= INT64_MAX
;
1199 if ((int64_t)leeway
< 0) {
1202 if (start
== DISPATCH_TIME_NOW
) {
1203 start
= _dispatch_absolute_time();
1204 } else if (start
== DISPATCH_TIME_FOREVER
) {
1208 if ((int64_t)start
< 0) {
1210 start
= (dispatch_time_t
)-((int64_t)start
);
1211 params
->values
.flags
|= DISPATCH_TIMER_WALL_CLOCK
;
1214 interval
= _dispatch_time_nano2mach(interval
);
1216 // rdar://problem/7287561 interval must be at least one in
1217 // in order to avoid later division by zero when calculating
1218 // the missed interval count. (NOTE: the wall clock's
1219 // interval is already "fixed" to be 1 or more)
1222 leeway
= _dispatch_time_nano2mach(leeway
);
1223 params
->values
.flags
&= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK
;
1225 params
->ident
= DISPATCH_TIMER_IDENT(params
->values
.flags
);
1226 params
->values
.target
= start
;
1227 params
->values
.deadline
= (start
< UINT64_MAX
- leeway
) ?
1228 start
+ leeway
: UINT64_MAX
;
1229 params
->values
.interval
= interval
;
1230 params
->values
.leeway
= (interval
== INT64_MAX
|| leeway
< interval
/ 2) ?
1231 leeway
: interval
/ 2;
1235 DISPATCH_ALWAYS_INLINE
1237 _dispatch_source_set_timer(dispatch_source_t ds
, dispatch_time_t start
,
1238 uint64_t interval
, uint64_t leeway
, bool source_sync
)
1240 if (slowpath(!ds
->ds_is_timer
) ||
1241 slowpath(ds_timer(ds
->ds_refs
).flags
& DISPATCH_TIMER_INTERVAL
)) {
1242 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1245 struct dispatch_set_timer_params
*params
;
1246 params
= _dispatch_source_timer_params(ds
, start
, interval
, leeway
);
1248 _dispatch_source_timer_telemetry(ds
, params
->ident
, ¶ms
->values
);
1249 // Suspend the source so that it doesn't fire with pending changes
1250 // The use of suspend/resume requires the external retain/release
1251 dispatch_retain(ds
);
1253 return _dispatch_barrier_trysync_f((dispatch_queue_t
)ds
, params
,
1254 _dispatch_source_set_timer2
);
1256 return _dispatch_source_set_timer2(params
);
1261 dispatch_source_set_timer(dispatch_source_t ds
, dispatch_time_t start
,
1262 uint64_t interval
, uint64_t leeway
)
1264 _dispatch_source_set_timer(ds
, start
, interval
, leeway
, true);
1268 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds
,
1269 dispatch_time_t start
, uint64_t interval
, uint64_t leeway
)
1271 // Don't serialize through the source queue for CF timers <rdar://13833190>
1272 _dispatch_source_set_timer(ds
, start
, interval
, leeway
, false);
1276 _dispatch_source_set_interval(dispatch_source_t ds
, uint64_t interval
)
1278 dispatch_source_refs_t dr
= ds
->ds_refs
;
1279 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1280 const bool animation
= ds_timer(dr
).flags
& DISPATCH_INTERVAL_UI_ANIMATION
;
1281 if (fastpath(interval
<= (animation
? FOREVER_NSEC
/NSEC_PER_FRAME
:
1282 FOREVER_NSEC
/NSEC_PER_MSEC
))) {
1283 interval
*= animation
? NSEC_PER_FRAME
: NSEC_PER_MSEC
;
1285 interval
= FOREVER_NSEC
;
1287 interval
= _dispatch_time_nano2mach(interval
);
1288 uint64_t target
= _dispatch_absolute_time() + interval
;
1289 target
= (target
/ interval
) * interval
;
1290 const uint64_t leeway
= animation
?
1291 _dispatch_time_nano2mach(NSEC_PER_FRAME
) : interval
/ 2;
1292 ds_timer(dr
).target
= target
;
1293 ds_timer(dr
).deadline
= target
+ leeway
;
1294 ds_timer(dr
).interval
= interval
;
1295 ds_timer(dr
).leeway
= leeway
;
1296 _dispatch_source_timer_telemetry(ds
, ds
->ds_ident_hack
, &ds_timer(dr
));
1300 #pragma mark dispatch_timers
1302 #define DISPATCH_TIMER_STRUCT(refs) \
1303 uint64_t target, deadline; \
1304 TAILQ_HEAD(, refs) dt_sources
1306 typedef struct dispatch_timer_s
{
1307 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s
);
1308 } *dispatch_timer_t
;
1310 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1312 .target = UINT64_MAX, \
1313 .deadline = UINT64_MAX, \
1314 .dt_sources = TAILQ_HEAD_INITIALIZER( \
1315 _dispatch_timer[tidx].dt_sources), \
1317 #define DISPATCH_TIMER_INIT(kind, qos) \
1318 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1319 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1321 struct dispatch_timer_s _dispatch_timer
[] = {
1322 DISPATCH_TIMER_INIT(WALL
, NORMAL
),
1323 DISPATCH_TIMER_INIT(WALL
, CRITICAL
),
1324 DISPATCH_TIMER_INIT(WALL
, BACKGROUND
),
1325 DISPATCH_TIMER_INIT(MACH
, NORMAL
),
1326 DISPATCH_TIMER_INIT(MACH
, CRITICAL
),
1327 DISPATCH_TIMER_INIT(MACH
, BACKGROUND
),
1329 #define DISPATCH_TIMER_COUNT \
1330 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1332 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1333 (uintptr_t)&_dispatch_kevent_timer[tidx]
1335 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1336 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1338 // dynamic initialization in _dispatch_timers_init()
1339 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1342 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1346 .filter = DISPATCH_EVFILT_TIMER, \
1347 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1349 .dk_sources = TAILQ_HEAD_INITIALIZER( \
1350 _dispatch_kevent_timer[tidx].dk_sources), \
1352 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1353 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1354 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1356 struct dispatch_kevent_s _dispatch_kevent_timer
[] = {
1357 DISPATCH_KEVENT_TIMER_INIT(WALL
, NORMAL
),
1358 DISPATCH_KEVENT_TIMER_INIT(WALL
, CRITICAL
),
1359 DISPATCH_KEVENT_TIMER_INIT(WALL
, BACKGROUND
),
1360 DISPATCH_KEVENT_TIMER_INIT(MACH
, NORMAL
),
1361 DISPATCH_KEVENT_TIMER_INIT(MACH
, CRITICAL
),
1362 DISPATCH_KEVENT_TIMER_INIT(MACH
, BACKGROUND
),
1363 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM
),
1365 #define DISPATCH_KEVENT_TIMER_COUNT \
1366 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1368 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1369 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1371 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1372 .filter = EVFILT_TIMER, \
1373 .flags = EV_ONESHOT, \
1374 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1376 #define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1377 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1379 struct kevent64_s _dispatch_kevent_timeout
[] = {
1380 DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL
, 0),
1381 DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL
, NOTE_CRITICAL
),
1382 DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND
, NOTE_BACKGROUND
),
1385 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1386 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1388 static const uint64_t _dispatch_kevent_coalescing_window
[] = {
1389 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL
, 75),
1390 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL
, 1),
1391 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND
, 100),
1394 #define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1395 typeof(dr) dri = NULL; typeof(dt) dti; \
1396 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1397 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1398 if (ds_timer(dr).target < ds_timer(dri).target) { \
1402 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1403 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1408 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1410 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1414 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1416 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1420 #define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1422 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1423 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1425 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1428 #define _dispatch_timers_check(dra, dta) ({ \
1429 unsigned int qosm = _dispatch_timers_qos_mask; \
1430 bool update = false; \
1431 unsigned int tidx; \
1432 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1433 if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
1436 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1437 TAILQ_FIRST(&dra[tidx].dk_sources); \
1438 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1439 TAILQ_FIRST(&dta[tidx].dt_sources); \
1440 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1441 uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
1442 if (target != dta[tidx].target) { \
1443 dta[tidx].target = target; \
1446 if (deadline != dta[tidx].deadline) { \
1447 dta[tidx].deadline = deadline; \
1453 static bool _dispatch_timers_reconfigure
, _dispatch_timer_expired
;
1454 static unsigned int _dispatch_timers_qos_mask
;
1455 static bool _dispatch_timers_force_max_leeway
;
1458 _dispatch_timers_init(void)
1462 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
1463 _dispatch_kevent_timer
[tidx
].dk_kevent
.udata
= \
1464 DISPATCH_KEVENT_TIMER_UDATA(tidx
);
1467 _dispatch_timers_force_max_leeway
=
1468 getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY");
1472 _dispatch_timers_unregister(dispatch_source_t ds
, dispatch_kevent_t dk
)
1474 dispatch_source_refs_t dr
= ds
->ds_refs
;
1475 unsigned int tidx
= (unsigned int)dk
->dk_kevent
.ident
;
1477 if (slowpath(ds_timer_aggregate(ds
))) {
1478 _dispatch_timer_aggregates_unregister(ds
, tidx
);
1480 _dispatch_timers_remove(tidx
, dk
, _dispatch_kevent_timer
, dr
, dr_list
,
1481 _dispatch_timer
, (dispatch_timer_source_refs_t
)dr
, dt_list
);
1482 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
1483 _dispatch_timers_reconfigure
= true;
1484 _dispatch_timers_qos_mask
|= 1 << DISPATCH_TIMER_QOS(tidx
);
1488 // Updates the ordered list of timers based on next fire date for changes to ds.
1489 // Should only be called from the context of _dispatch_mgr_q.
1491 _dispatch_timers_update(dispatch_source_t ds
)
1493 dispatch_kevent_t dk
= ds
->ds_dkev
;
1494 dispatch_source_refs_t dr
= ds
->ds_refs
;
1497 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1499 // Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1500 if (slowpath(!dk
)) {
1503 // Move timers that are disabled, suspended or have missed intervals to the
1504 // disarmed list, rearm after resume resp. source invoke will reenable them
1505 if (!ds_timer(dr
).target
|| DISPATCH_OBJECT_SUSPENDED(ds
) ||
1506 ds
->ds_pending_data
) {
1507 tidx
= DISPATCH_TIMER_INDEX_DISARM
;
1508 (void)dispatch_atomic_and2o(ds
, ds_atomic_flags
, ~DSF_ARMED
, relaxed
);
1510 tidx
= _dispatch_source_timer_idx(dr
);
1512 if (slowpath(ds_timer_aggregate(ds
))) {
1513 _dispatch_timer_aggregates_register(ds
);
1515 if (slowpath(!ds
->ds_is_installed
)) {
1516 ds
->ds_is_installed
= true;
1517 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
1518 (void)dispatch_atomic_or2o(ds
, ds_atomic_flags
, DSF_ARMED
, relaxed
);
1521 _dispatch_object_debug(ds
, "%s", __func__
);
1523 _dispatch_timers_unregister(ds
, dk
);
1525 if (tidx
!= DISPATCH_TIMER_INDEX_DISARM
) {
1526 _dispatch_timers_reconfigure
= true;
1527 _dispatch_timers_qos_mask
|= 1 << DISPATCH_TIMER_QOS(tidx
);
1529 if (dk
!= &_dispatch_kevent_timer
[tidx
]){
1530 ds
->ds_dkev
= &_dispatch_kevent_timer
[tidx
];
1532 _dispatch_timers_insert(tidx
, _dispatch_kevent_timer
, dr
, dr_list
,
1533 _dispatch_timer
, (dispatch_timer_source_refs_t
)dr
, dt_list
);
1534 if (slowpath(ds_timer_aggregate(ds
))) {
1535 _dispatch_timer_aggregates_update(ds
, tidx
);
1540 _dispatch_timers_run2(uint64_t nows
[], unsigned int tidx
)
1542 dispatch_source_refs_t dr
;
1543 dispatch_source_t ds
;
1544 uint64_t now
, missed
;
1546 now
= _dispatch_source_timer_now(nows
, tidx
);
1547 while ((dr
= TAILQ_FIRST(&_dispatch_kevent_timer
[tidx
].dk_sources
))) {
1548 ds
= _dispatch_source_from_refs(dr
);
1549 // We may find timers on the wrong list due to a pending update from
1550 // dispatch_source_set_timer. Force an update of the list in that case.
1551 if (tidx
!= ds
->ds_ident_hack
) {
1552 _dispatch_timers_update(ds
);
1555 if (!ds_timer(dr
).target
) {
1556 // No configured timers on the list
1559 if (ds_timer(dr
).target
> now
) {
1560 // Done running timers for now.
1563 // Remove timers that are suspended or have missed intervals from the
1564 // list, rearm after resume resp. source invoke will reenable them
1565 if (DISPATCH_OBJECT_SUSPENDED(ds
) || ds
->ds_pending_data
) {
1566 _dispatch_timers_update(ds
);
1569 // Calculate number of missed intervals.
1570 missed
= (now
- ds_timer(dr
).target
) / ds_timer(dr
).interval
;
1571 if (++missed
> INT_MAX
) {
1574 if (ds_timer(dr
).interval
< INT64_MAX
) {
1575 ds_timer(dr
).target
+= missed
* ds_timer(dr
).interval
;
1576 ds_timer(dr
).deadline
= ds_timer(dr
).target
+ ds_timer(dr
).leeway
;
1578 ds_timer(dr
).target
= UINT64_MAX
;
1579 ds_timer(dr
).deadline
= UINT64_MAX
;
1581 _dispatch_timers_update(ds
);
1582 ds_timer(dr
).last_fire
= now
;
1585 data
= dispatch_atomic_add2o(ds
, ds_pending_data
,
1586 (unsigned long)missed
, relaxed
);
1587 _dispatch_trace_timer_fire(dr
, data
, (unsigned long)missed
);
1588 _dispatch_wakeup(ds
);
1594 _dispatch_timers_run(uint64_t nows
[])
1597 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
1598 if (!TAILQ_EMPTY(&_dispatch_kevent_timer
[tidx
].dk_sources
)) {
1599 _dispatch_timers_run2(nows
, tidx
);
1604 static inline unsigned int
1605 _dispatch_timers_get_delay(uint64_t nows
[], struct dispatch_timer_s timer
[],
1606 uint64_t *delay
, uint64_t *leeway
, int qos
)
1608 unsigned int tidx
, ridx
= DISPATCH_TIMER_COUNT
;
1609 uint64_t tmp
, delta
= UINT64_MAX
, dldelta
= UINT64_MAX
;
1611 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
1612 if (qos
>= 0 && qos
!= DISPATCH_TIMER_QOS(tidx
)){
1615 uint64_t target
= timer
[tidx
].target
;
1616 if (target
== UINT64_MAX
) {
1619 uint64_t deadline
= timer
[tidx
].deadline
;
1621 // Timer pre-coalescing <rdar://problem/13222034>
1622 uint64_t window
= _dispatch_kevent_coalescing_window
[qos
];
1623 uint64_t latest
= deadline
> window
? deadline
- window
: 0;
1624 dispatch_source_refs_t dri
;
1625 TAILQ_FOREACH(dri
, &_dispatch_kevent_timer
[tidx
].dk_sources
,
1627 tmp
= ds_timer(dri
).target
;
1628 if (tmp
> latest
) break;
1632 uint64_t now
= _dispatch_source_timer_now(nows
, tidx
);
1633 if (target
<= now
) {
1638 if (DISPATCH_TIMER_KIND(tidx
) != DISPATCH_TIMER_KIND_WALL
) {
1639 tmp
= _dispatch_time_mach2nano(tmp
);
1641 if (tmp
< INT64_MAX
&& tmp
< delta
) {
1645 dispatch_assert(target
<= deadline
);
1646 tmp
= deadline
- now
;
1647 if (DISPATCH_TIMER_KIND(tidx
) != DISPATCH_TIMER_KIND_WALL
) {
1648 tmp
= _dispatch_time_mach2nano(tmp
);
1650 if (tmp
< INT64_MAX
&& tmp
< dldelta
) {
1655 *leeway
= delta
&& delta
< UINT64_MAX
? dldelta
- delta
: UINT64_MAX
;
1660 _dispatch_timers_program2(uint64_t nows
[], struct kevent64_s
*ke
,
1665 uint64_t delay
, leeway
;
1667 tidx
= _dispatch_timers_get_delay(nows
, _dispatch_timer
, &delay
, &leeway
,
1669 poll
= (delay
== 0);
1670 if (poll
|| delay
== UINT64_MAX
) {
1671 _dispatch_trace_next_timer_set(NULL
, qos
);
1676 ke
->flags
|= EV_DELETE
;
1677 ke
->flags
&= ~(EV_ADD
|EV_ENABLE
);
1679 _dispatch_trace_next_timer_set(
1680 TAILQ_FIRST(&_dispatch_kevent_timer
[tidx
].dk_sources
), qos
);
1681 _dispatch_trace_next_timer_program(delay
, qos
);
1682 delay
+= _dispatch_source_timer_now(nows
, DISPATCH_TIMER_KIND_WALL
);
1683 if (slowpath(_dispatch_timers_force_max_leeway
)) {
1684 ke
->data
= (int64_t)(delay
+ leeway
);
1687 ke
->data
= (int64_t)delay
;
1688 ke
->ext
[1] = leeway
;
1690 ke
->flags
|= EV_ADD
|EV_ENABLE
;
1691 ke
->flags
&= ~EV_DELETE
;
1693 _dispatch_kq_update(ke
);
1699 _dispatch_timers_program(uint64_t nows
[])
1702 unsigned int qos
, qosm
= _dispatch_timers_qos_mask
;
1703 for (qos
= 0; qos
< DISPATCH_TIMER_QOS_COUNT
; qos
++) {
1704 if (!(qosm
& 1 << qos
)){
1707 poll
|= _dispatch_timers_program2(nows
, &_dispatch_kevent_timeout
[qos
],
1715 _dispatch_timers_configure(void)
1717 _dispatch_timer_aggregates_check();
1718 // Find out if there is a new target/deadline on the timer lists
1719 return _dispatch_timers_check(_dispatch_kevent_timer
, _dispatch_timer
);
1723 _dispatch_timers_calendar_change(void)
1725 // calendar change may have gone past the wallclock deadline
1726 _dispatch_timer_expired
= true;
1727 _dispatch_timers_qos_mask
= ~0u;
1731 _dispatch_timers_kevent(struct kevent64_s
*ke
)
1733 dispatch_assert(ke
->data
> 0);
1734 dispatch_assert((ke
->ident
& DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
) ==
1735 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
);
1736 unsigned int qos
= ke
->ident
& ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK
;
1737 dispatch_assert(qos
< DISPATCH_TIMER_QOS_COUNT
);
1738 dispatch_assert(_dispatch_kevent_timeout
[qos
].data
);
1739 _dispatch_kevent_timeout
[qos
].data
= 0; // kevent deleted via EV_ONESHOT
1740 _dispatch_timer_expired
= true;
1741 _dispatch_timers_qos_mask
|= 1 << qos
;
1742 _dispatch_trace_next_timer_wake(qos
);
1746 _dispatch_mgr_timers(void)
1748 uint64_t nows
[DISPATCH_TIMER_KIND_COUNT
] = {};
1749 bool expired
= slowpath(_dispatch_timer_expired
);
1751 _dispatch_timers_run(nows
);
1753 bool reconfigure
= slowpath(_dispatch_timers_reconfigure
);
1754 if (reconfigure
|| expired
) {
1756 reconfigure
= _dispatch_timers_configure();
1757 _dispatch_timers_reconfigure
= false;
1759 if (reconfigure
|| expired
) {
1760 expired
= _dispatch_timer_expired
= _dispatch_timers_program(nows
);
1761 expired
= expired
|| _dispatch_mgr_q
.dq_items_tail
;
1763 _dispatch_timers_qos_mask
= 0;
1769 #pragma mark dispatch_timer_aggregate
1772 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s
) dk_sources
;
1773 } dispatch_timer_aggregate_refs_s
;
1775 typedef struct dispatch_timer_aggregate_s
{
1776 DISPATCH_STRUCT_HEADER(queue
);
1777 DISPATCH_QUEUE_HEADER
;
1778 TAILQ_ENTRY(dispatch_timer_aggregate_s
) dta_list
;
1779 dispatch_timer_aggregate_refs_s
1780 dta_kevent_timer
[DISPATCH_KEVENT_TIMER_COUNT
];
1782 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s
);
1783 } dta_timer
[DISPATCH_TIMER_COUNT
];
1784 struct dispatch_timer_s dta_timer_data
[DISPATCH_TIMER_COUNT
];
1785 unsigned int dta_refcount
;
1786 } dispatch_timer_aggregate_s
;
1788 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s
) dispatch_timer_aggregates_s
;
1789 static dispatch_timer_aggregates_s _dispatch_timer_aggregates
=
1790 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates
);
1792 dispatch_timer_aggregate_t
1793 dispatch_timer_aggregate_create(void)
1796 dispatch_timer_aggregate_t dta
= _dispatch_alloc(DISPATCH_VTABLE(queue
),
1797 sizeof(struct dispatch_timer_aggregate_s
));
1798 _dispatch_queue_init((dispatch_queue_t
)dta
);
1799 dta
->do_targetq
= _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH
,
1801 dta
->dq_width
= UINT32_MAX
;
1802 //FIXME: aggregates need custom vtable
1803 //dta->dq_label = "timer-aggregate";
1804 for (tidx
= 0; tidx
< DISPATCH_KEVENT_TIMER_COUNT
; tidx
++) {
1805 TAILQ_INIT(&dta
->dta_kevent_timer
[tidx
].dk_sources
);
1807 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
1808 TAILQ_INIT(&dta
->dta_timer
[tidx
].dt_sources
);
1809 dta
->dta_timer
[tidx
].target
= UINT64_MAX
;
1810 dta
->dta_timer
[tidx
].deadline
= UINT64_MAX
;
1811 dta
->dta_timer_data
[tidx
].target
= UINT64_MAX
;
1812 dta
->dta_timer_data
[tidx
].deadline
= UINT64_MAX
;
1814 return (dispatch_timer_aggregate_t
)_dispatch_introspection_queue_create(
1815 (dispatch_queue_t
)dta
);
1818 typedef struct dispatch_timer_delay_s
{
1819 dispatch_timer_t timer
;
1820 uint64_t delay
, leeway
;
1821 } *dispatch_timer_delay_t
;
1824 _dispatch_timer_aggregate_get_delay(void *ctxt
)
1826 dispatch_timer_delay_t dtd
= ctxt
;
1827 struct { uint64_t nows
[DISPATCH_TIMER_KIND_COUNT
]; } dtn
= {};
1828 _dispatch_timers_get_delay(dtn
.nows
, dtd
->timer
, &dtd
->delay
, &dtd
->leeway
,
1833 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta
,
1834 uint64_t *leeway_ptr
)
1836 struct dispatch_timer_delay_s dtd
= {
1837 .timer
= dta
->dta_timer_data
,
1839 dispatch_sync_f((dispatch_queue_t
)dta
, &dtd
,
1840 _dispatch_timer_aggregate_get_delay
);
1842 *leeway_ptr
= dtd
.leeway
;
1848 _dispatch_timer_aggregate_update(void *ctxt
)
1850 dispatch_timer_aggregate_t dta
= (void*)_dispatch_queue_get_current();
1851 dispatch_timer_t dtau
= ctxt
;
1853 for (tidx
= 0; tidx
< DISPATCH_TIMER_COUNT
; tidx
++) {
1854 dta
->dta_timer_data
[tidx
].target
= dtau
[tidx
].target
;
1855 dta
->dta_timer_data
[tidx
].deadline
= dtau
[tidx
].deadline
;
1862 _dispatch_timer_aggregates_configure(void)
1864 dispatch_timer_aggregate_t dta
;
1865 dispatch_timer_t dtau
;
1866 TAILQ_FOREACH(dta
, &_dispatch_timer_aggregates
, dta_list
) {
1867 if (!_dispatch_timers_check(dta
->dta_kevent_timer
, dta
->dta_timer
)) {
1870 dtau
= _dispatch_calloc(DISPATCH_TIMER_COUNT
, sizeof(*dtau
));
1871 memcpy(dtau
, dta
->dta_timer
, sizeof(dta
->dta_timer
));
1872 dispatch_barrier_async_f((dispatch_queue_t
)dta
, dtau
,
1873 _dispatch_timer_aggregate_update
);
1878 _dispatch_timer_aggregates_check(void)
1880 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates
))) {
1883 _dispatch_timer_aggregates_configure();
1887 _dispatch_timer_aggregates_register(dispatch_source_t ds
)
1889 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
1890 if (!dta
->dta_refcount
++) {
1891 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates
, dta
, dta_list
);
1897 _dispatch_timer_aggregates_update(dispatch_source_t ds
, unsigned int tidx
)
1899 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
1900 dispatch_timer_source_aggregate_refs_t dr
;
1901 dr
= (dispatch_timer_source_aggregate_refs_t
)ds
->ds_refs
;
1902 _dispatch_timers_insert(tidx
, dta
->dta_kevent_timer
, dr
, dra_list
,
1903 dta
->dta_timer
, dr
, dta_list
);
1908 _dispatch_timer_aggregates_unregister(dispatch_source_t ds
, unsigned int tidx
)
1910 dispatch_timer_aggregate_t dta
= ds_timer_aggregate(ds
);
1911 dispatch_timer_source_aggregate_refs_t dr
;
1912 dr
= (dispatch_timer_source_aggregate_refs_t
)ds
->ds_refs
;
1913 _dispatch_timers_remove(tidx
, (dispatch_timer_aggregate_refs_s
*)NULL
,
1914 dta
->dta_kevent_timer
, dr
, dra_list
, dta
->dta_timer
, dr
, dta_list
);
1915 if (!--dta
->dta_refcount
) {
1916 TAILQ_REMOVE(&_dispatch_timer_aggregates
, dta
, dta_list
);
1921 #pragma mark dispatch_select
1923 static int _dispatch_kq
;
1925 static unsigned int _dispatch_select_workaround
;
1926 static fd_set _dispatch_rfds
;
1927 static fd_set _dispatch_wfds
;
1928 static uint64_t*_dispatch_rfd_ptrs
;
1929 static uint64_t*_dispatch_wfd_ptrs
;
static bool
_dispatch_select_register(struct kevent64_s *kev)
{
	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
	// write kevent, assume it was due to a type of file descriptor not
	// supported by kqueue and fall back to select
	switch (kev->filter) {
	case EVFILT_READ:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_rfds);
			if (slowpath(!_dispatch_rfd_ptrs)) {
				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_rfd_ptrs));
			}
			if (!_dispatch_rfd_ptrs[kev->ident]) {
				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
			return true;
		}
		break;
	case EVFILT_WRITE:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_wfds);
			if (slowpath(!_dispatch_wfd_ptrs)) {
				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_wfd_ptrs));
			}
			if (!_dispatch_wfd_ptrs[kev->ident]) {
				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
			return true;
		}
		break;
	}
	return false;
}

static bool
_dispatch_select_unregister(const struct kevent64_s *kev)
{
	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	switch (kev->filter) {
	case EVFILT_READ:
		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_rfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_rfds);
			_dispatch_rfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	case EVFILT_WRITE:
		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_wfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_wfds);
			_dispatch_wfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	}
	return false;
}

static bool
_dispatch_mgr_select(bool poll)
{
	static const struct timeval timeout_immediately = { 0, 0 };
	fd_set tmp_rfds, tmp_wfds;
	struct kevent64_s kev;
	int err, i, r;
	bool kevent_avail = false;

	FD_COPY(&_dispatch_rfds, &tmp_rfds);
	FD_COPY(&_dispatch_wfds, &tmp_wfds);
	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
			poll ? (struct timeval*)&timeout_immediately : NULL);
	if (slowpath(r == -1)) {
		err = errno;
		if (err != EBADF) {
			if (err != EINTR) {
				(void)dispatch_assume_zero(err);
			}
			return false;
		}
		for (i = 0; i < FD_SETSIZE; i++) {
			if (i == _dispatch_kq) {
				continue;
			}
			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
				continue;
			}
			r = dup(i);
			if (dispatch_assume(r != -1)) {
				close(r);
			} else {
				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_rfds);
					_dispatch_rfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_wfds);
					_dispatch_wfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
			}
		}
		return false;
	}
	for (i = 0; i < FD_SETSIZE; i++) {
		if (FD_ISSET(i, &tmp_rfds)) {
			if (i == _dispatch_kq) {
				kevent_avail = true;
				continue;
			}
			FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
			EV_SET64(&kev, i, EVFILT_READ,
					EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
					_dispatch_rfd_ptrs[i], 0, 0);
			_dispatch_kevent_drain(&kev);
		}
		if (FD_ISSET(i, &tmp_wfds)) {
			FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
			EV_SET64(&kev, i, EVFILT_WRITE,
					EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
					_dispatch_wfd_ptrs[i], 0, 0);
			_dispatch_kevent_drain(&kev);
		}
	}
	return kevent_avail;
}
#pragma mark dispatch_kqueue

static void
_dispatch_kq_init(void *context DISPATCH_UNUSED)
{
	static const struct kevent64_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_safe_fork = false;
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)&kev;
	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
#else
	_dispatch_kq = kqueue();
#endif
	if (_dispatch_kq == -1) {
		DISPATCH_CLIENT_CRASH("kqueue() create failed: "
				"probably out of file descriptors");
	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
		// in case we fall back to select()
		FD_SET(_dispatch_kq, &_dispatch_rfds);
	}

	(void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
			NULL));
	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_kq_init);

	return _dispatch_kq;
}

static long
_dispatch_kq_update(const struct kevent64_s *kev)
{
	int r;
	struct kevent64_s kev_copy;

	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
		if (_dispatch_select_unregister(kev)) {
			return 0;
		}
	}
	kev_copy = *kev;
	// This ensures we don't get a pending kevent back while registering
	// a new kevent
	kev_copy.flags |= EV_RECEIPT;
retry:
	r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
			&kev_copy, 1, 0, NULL));
	if (slowpath(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		return err;
	}
	switch (kev_copy.data) {
	case 0:
		return 0;
	case EINVAL:
	case ENOENT:
		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
			if (_dispatch_select_register(&kev_copy)) {
				return 0;
			}
		}
		// fall through
	default:
		kev_copy.flags |= kev->flags;
		_dispatch_kevent_drain(&kev_copy);
		break;
	}
	return (long)kev_copy.data;
}

#pragma mark dispatch_mgr

static struct kevent64_s *_dispatch_kevent_enable;

void
_dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
{
	dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
	_dispatch_kevent_enable = ke;
}

unsigned long
_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
{
	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
		return false;
	}

	static const struct kevent64_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.fflags = NOTE_TRIGGER,
	};

#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
#endif

	_dispatch_kq_update(&kev);

	return false;
}

static void
_dispatch_mgr_init(void)
{
	(void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
	_dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
	_dispatch_mgr_priority_init();
	_dispatch_kevent_init();
	_dispatch_timers_init();
	_dispatch_mach_recv_msg_buf_init();
	_dispatch_memorystatus_init();
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_mgr_invoke(void)
{
	static const struct timespec timeout_immediately = { 0, 0 };
	struct kevent64_s kev;
	bool poll;
	int r;

	for (;;) {
		_dispatch_mgr_queue_drain();
		poll = _dispatch_mgr_timers();
		if (slowpath(_dispatch_select_workaround)) {
			poll = _dispatch_mgr_select(poll);
			if (!poll) continue;
		}
		r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
				_dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
				poll ? &timeout_immediately : NULL);
		_dispatch_kevent_enable = NULL;
		if (slowpath(r == -1)) {
			int err = errno;
			switch (err) {
			case EINTR:
				break;
			case EBADF:
				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
				break;
			default:
				(void)dispatch_assume_zero(err);
				break;
			}
		} else if (r) {
			_dispatch_kevent_drain(&kev);
		}
	}
}

DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
{
	_dispatch_mgr_init();
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
}
#pragma mark dispatch_memorystatus

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
#endif

#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
static dispatch_source_t _dispatch_memorystatus_source;

static void
_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
{
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
	unsigned long memorystatus;
	memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
	if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
		return;
	}
	_dispatch_continuation_cache_limit =
			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
#endif
	malloc_zone_pressure_relief(0,0);
}

static void
_dispatch_memorystatus_init(void)
{
	_dispatch_memorystatus_source = dispatch_source_create(
			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
			DISPATCH_MEMORYSTATUS_SOURCE_MASK,
			_dispatch_get_root_queue(0, true));
	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
			_dispatch_memorystatus_handler);
	dispatch_resume(_dispatch_memorystatus_source);
}
#else
static inline void _dispatch_memorystatus_init(void) {}
#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
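/*
 * Illustrative sketch (not part of this file): a client can observe the same
 * kind of memory pressure events with a public dispatch source. The public
 * source type and mask names below come from the SDK headers and are
 * assumptions for illustration, not definitions made here.
 *
 *	dispatch_source_t s = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_MEMORYPRESSURE, 0,
 *			DISPATCH_MEMORYPRESSURE_NORMAL | DISPATCH_MEMORYPRESSURE_WARN,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
 *	dispatch_source_set_event_handler(s, ^{
 *		unsigned long pressure = dispatch_source_get_data(s);
 *		if (pressure & DISPATCH_MEMORYPRESSURE_WARN) {
 *			// trim caches, release reusable buffers, etc.
 *		}
 *	});
 *	dispatch_resume(s);
 */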
#pragma mark dispatch_mach

#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
#define _dispatch_debug_machport(name) \
		dispatch_debug_machport((name), __func__)
#else
#define _dispatch_debug_machport(name) ((void)(name))
#endif

// Flags for all notifications that are registered/unregistered when a
// send-possible notification is requested/delivered
#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) \
		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
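// _DISPATCH_HASH reduces a port's index into the table with a bitwise AND
// when the table size is a power of two (MACH_PORT_INDEX(x) & 31 for the
// 32-entry table above); that is equivalent to the modulo in the other branch
// but avoids a divide, and since _DISPATCH_MACHPORT_HASH_SIZE is a constant
// 32 the conditional folds away at compile time.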
#ifndef MACH_RCV_LARGE_IDENTITY
#define MACH_RCV_LARGE_IDENTITY 0x00000008
#endif
#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
#define DISPATCH_MACH_RCV_OPTIONS ( \
		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0))

#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])

static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
		dispatch_source_refs_t dr, dispatch_kevent_t dk,
		mach_msg_header_t *hdr, mach_msg_size_t siz);
static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected);
static void _dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
		mach_msg_size_t siz);
static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
		const struct kevent64_s *ke);
static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);

static const size_t _dispatch_mach_recv_msg_size =
		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
static const size_t dispatch_mach_trailer_size =
		sizeof(dispatch_mach_trailer_t);
static const size_t _dispatch_mach_recv_msg_buf_size = mach_vm_round_page(
		_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
static mach_port_t _dispatch_mach_notify_port;
static struct kevent64_s _dispatch_mach_recv_kevent = {
	.filter = EVFILT_MACHPORT,
	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
	.fflags = DISPATCH_MACH_RCV_OPTIONS,
};
static dispatch_source_t _dispatch_mach_notify_source;

struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
	},
};

static void
_dispatch_mach_recv_msg_buf_init(void)
{
	mach_vm_size_t vm_size = _dispatch_mach_recv_msg_buf_size;
	mach_vm_address_t vm_addr = vm_page_size;
	kern_return_t kr;

	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
			VM_FLAGS_ANYWHERE))) {
		if (kr != KERN_NO_SPACE) {
			(void)dispatch_assume_zero(kr);
			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
		}
		_dispatch_temporary_resource_shortage();
		vm_addr = vm_page_size;
	}
	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
	_dispatch_mach_recv_kevent.ext[1] = _dispatch_mach_recv_msg_buf_size;
}

static void*
_dispatch_get_mach_recv_msg_buf(void)
{
	return (void*)_dispatch_mach_recv_kevent.ext[0];
}
static void
_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
{
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_recv_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create port set");
	}
	dispatch_assert(_dispatch_get_mach_recv_msg_buf());
	dispatch_assert(dispatch_mach_trailer_size ==
			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
			DISPATCH_MACH_RCV_TRAILER)));
	_dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
	_dispatch_kq_update(&_dispatch_mach_recv_kevent);

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
			&_dispatch_mach_notify_port);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create receive right");
	}
	_dispatch_mach_notify_source = dispatch_source_create(
			&_dispatch_source_type_mach_recv_direct,
			_dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
	_dispatch_mach_notify_source->ds_refs->ds_handler_func =
			(void*)_dispatch_mach_notify_source_invoke;
	dispatch_assert(_dispatch_mach_notify_source);
	dispatch_resume(_dispatch_mach_notify_source);
}

static mach_port_t
_dispatch_get_mach_recv_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
	return _dispatch_mach_recv_portset;
}

static void
_dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
{
	struct kevent64_s kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
			&_dispatch_mach_portset);
	DISPATCH_VERIFY_MIG(kr);
	if (dispatch_assume_zero(kr)) {
		DISPATCH_CLIENT_CRASH(
				"mach_port_allocate() failed: cannot create port set");
	}
	kev.ident = _dispatch_mach_portset;
	_dispatch_kq_update(&kev);
}

static mach_port_t
_dispatch_get_mach_portset(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
	return _dispatch_mach_portset;
}

static kern_return_t
_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	_dispatch_debug_machport(mp);
	kr = mach_port_move_member(mach_task_self(), mp, mps);
	if (slowpath(kr)) {
		DISPATCH_VERIFY_MIG(kr);
		switch (kr) {
		case KERN_INVALID_RIGHT:
			if (mps) {
				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
						"mach_port_move_member() failed ", kr);
				break;
			}
			// fall through
		case KERN_INVALID_NAME:
			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
					"prematurely", mp);
			break;
		default:
			(void)dispatch_assume_zero(kr);
			break;
		}
	}
	return mps ? kr : 0;
}

static void
_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
{
#if (TARGET_IPHONE_SIMULATOR && \
		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
	// delete and re-add kevent to workaround <rdar://problem/13924256>
	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
		struct kevent64_s kev = _dispatch_mach_recv_kevent;
		kev.flags = EV_DELETE;
		_dispatch_kq_update(&kev);
	}
#endif
	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
}
static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
		mach_port_t mps;
		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			mps = _dispatch_get_mach_recv_portset();
		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
			mps = _dispatch_get_mach_portset();
		} else {
			mps = MACH_PORT_NULL;
		}
		kr = _dispatch_mach_portset_update(dk, mps);
	}
	return kr;
}

static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
		// Requesting a (delayed) non-sync send-possible notification
		// registers for both immediate dead-name notification and delayed-arm
		// send-possible notification for the port.
		// The send-possible notification is armed when a mach_msg() with
		// MACH_SEND_NOTIFY to the port times out.
		// If send-possible is unavailable, fall back to immediate dead-name
		// registration rdar://problem/2527840&9008724
		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
	}
	return kr;
}

static inline void
_dispatch_kevent_mach_portset(struct kevent64_s *ke)
{
	if (ke->ident == _dispatch_mach_recv_portset) {
		return _dispatch_kevent_mach_msg_drain(ke);
	} else if (ke->ident == _dispatch_mach_portset) {
		return _dispatch_kevent_machport_drain(ke);
	} else {
		return _dispatch_kevent_error(ke);
	}
}

static void
_dispatch_kevent_machport_drain(struct kevent64_s *ke)
{
	mach_port_t name = (mach_port_name_t)ke->data;
	dispatch_kevent_t dk;
	struct kevent64_s kev;

	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH

	EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
			DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
	_dispatch_kevent_debug(&kev, __func__);
	_dispatch_kevent_merge(&kev);
}
static void
_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
{
	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
	mach_msg_size_t siz, msgsiz;
	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;

	_dispatch_kevent_mach_recv_reenable(ke);
	if (!dispatch_assume(hdr)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
	}
	if (fastpath(!kr)) {
		return _dispatch_kevent_mach_msg_recv(hdr);
	} else if (kr != MACH_RCV_TOO_LARGE) {
		goto out;
	}
	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
	}
	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
	hdr = malloc(siz);
	if (ke->data) {
		if (!dispatch_assume(hdr)) {
			// Kernel will discard message too large to fit
			hdr = _dispatch_get_mach_recv_msg_buf();
			siz = _dispatch_mach_recv_msg_buf_size;
		}
		mach_port_t name = (mach_port_name_t)ke->data;
		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
				MACH_PORT_NULL);
		if (fastpath(!kr)) {
			return _dispatch_kevent_mach_msg_recv(hdr);
		} else if (kr == MACH_RCV_TOO_LARGE) {
			_dispatch_log("BUG in libdispatch client: "
					"_dispatch_kevent_mach_msg_drain: dropped message too "
					"large to fit in memory: id = 0x%x, size = %lld",
					hdr->msgh_id, ke->ext[1]);
			kr = MACH_MSG_SUCCESS;
		}
	} else {
		// We don't know which port in the portset contains the large message,
		// so need to receive all messages pending on the portset to ensure the
		// large message is drained. <rdar://problem/13950432>
		bool received = false;
		for (;;) {
			if (!dispatch_assume(hdr)) {
				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
			}
			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
					MACH_RCV_TIMEOUT);
			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
				DISPATCH_CRASH("Overlarge message");
			}
			if (fastpath(!kr)) {
				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
				if (msgsiz < siz) {
					void *shrink = realloc(hdr, msgsiz);
					if (shrink) hdr = shrink;
				}
				_dispatch_kevent_mach_msg_recv(hdr);
				hdr = NULL;
				received = true;
			} else if (kr == MACH_RCV_TOO_LARGE) {
				siz = hdr->msgh_size + dispatch_mach_trailer_size;
			} else {
				if (kr == MACH_RCV_TIMED_OUT && received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			}
			hdr = reallocf(hdr, siz);
		}
	}
	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
		free(hdr);
	}
out:
	if (slowpath(kr)) {
		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
				"message reception failed", kr);
	}
}
static void
_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
{
	dispatch_source_refs_t dri;
	dispatch_kevent_t dk;
	mach_port_t name = hdr->msgh_local_port;
	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;

	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received overlarge message");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	if (!dispatch_assume(name)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with MACH_PORT_NULL port");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with unknown kevent");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
		}
	}
	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
			"received message with no listeners");
	return _dispatch_kevent_mach_msg_destroy(hdr);
}

static void
_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
{
	if (hdr) {
		mach_msg_destroy(hdr);
		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
			free(hdr);
		}
	}
}

static void
_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	if (ds == _dispatch_mach_notify_source) {
		_dispatch_mach_notify_source_invoke(hdr);
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
		_dispatch_mach_reply_kevent_unregister((dispatch_mach_t)ds,
				(dispatch_mach_reply_refs_t)dr, false);
	}
	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, hdr, siz);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
{
	dispatch_source_refs_t dri, dr_next;
	dispatch_kevent_t dk;
	struct kevent64_s kev;
	bool unreg;

	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
	if (!dk) {
		return;
	}

	// Update notification registration state.
	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
	EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
			flag, 0, (uintptr_t)dk, 0, 0);
	if (final) {
		// This can never happen again
		unreg = true;
	} else {
		// Re-register for notification before delivery
		unreg = _dispatch_kevent_resume(dk, flag, 0);
	}
	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
			dispatch_mach_t dm = (dispatch_mach_t)dsi;
			_dispatch_mach_merge_kevent(dm, &kev);
			if (unreg && dm->dm_dkev) {
				_dispatch_mach_kevent_unregister(dm);
			}
		} else {
			_dispatch_source_merge_kevent(dsi, &kev);
			if (unreg) {
				_dispatch_source_kevent_unregister(dsi);
			}
		}
		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
			// current merge is last in list (dk might have been freed)
			// or it re-armed the notification
			return;
		}
	}
}
static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
		mach_port_mscount_t notify_sync)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
	kern_return_t kr, krr = 0;

	// Update notification registration state.
	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
	dk->dk_kevent.data &= ~(del_flags & mask);

	_dispatch_debug_machport(port);
	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
		// initialize _dispatch_mach_notify_port:
		(void)_dispatch_get_mach_recv_portset();
		_dispatch_debug("machport[0x%08x]: registering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		krr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, _dispatch_mach_notify_port,
				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(krr);

		switch (krr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// Suppress errors & clear registration state
			dk->dk_kevent.data &= ~mask;
			break;
		default:
			// Else, we don't expect any errors from mach. Log any errors
			if (dispatch_assume_zero(krr)) {
				// log the error & clear registration state
				dk->dk_kevent.data &= ~mask;
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beaten libdispatch to requesting the
				// specified Mach notification on this port. We should
				// technically cache the previous port and message it when the
				// kernel messages our port. Or we can just say screw those
				// subsystems and deallocate the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				(void)dispatch_assume_zero(kr);
				previous = MACH_PORT_NULL;
			}
		}
	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		kr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, MACH_PORT_NULL,
				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// log the error
			}
		}
	} else {
		return 0;
	}
	if (slowpath(previous)) {
		// the kernel has not consumed the send-once right yet
		(void)dispatch_assume_zero(
				_dispatch_send_consume_send_once_right(previous));
	}
	return krr;
}

static void
_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
{
	(void)_dispatch_get_mach_recv_portset();
	_dispatch_debug("registering for calendar-change notification");
	kern_return_t kr = host_request_notification(mach_host_self(),
			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

static void
_dispatch_mach_host_calendar_change_register(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
}
static void
_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
{
	mig_reply_error_t reply;
	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
			__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
		// host_notify_reply.defs: host_calendar_changed
		_dispatch_debug("calendar-change notification");
		_dispatch_timers_calendar_change();
		_dispatch_mach_host_notify_update(NULL);
		success = TRUE;
		reply.RetCode = KERN_SUCCESS;
	}
	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
		(void)dispatch_assume_zero(reply.RetCode);
	}
}

kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
			"deleted prematurely", name);

	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	kern_return_t kr;

	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);

	// the act of receiving a dead name notification allocates a dead-name
	// right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//(void)dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);

	return KERN_SUCCESS;
}

#pragma mark dispatch_mach_t

#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
#define DISPATCH_MACH_PSEUDO_RECEIVED 0x1
#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
#define DISPATCH_MACH_OPTIONS_MASK 0xffff

static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
		mach_port_t local_port, mach_port_t remote_port);
static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
		dispatch_object_t dou);
static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
		dispatch_mach_msg_t dmsg);
static dispatch_mach_t
_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler, bool handler_is_block)
{
	dispatch_mach_t dm;
	dispatch_mach_refs_t dr;

	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
			sizeof(struct dispatch_mach_s));
	_dispatch_queue_init((dispatch_queue_t)dm);
	dm->dq_label = label;

	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
	dm->do_ref_cnt++; // since channel is created suspended
	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	dm->do_targetq = &_dispatch_mgr_q;

	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
	dr->dr_source_wref = _dispatch_ptr2wref(dm);
	dr->dm_handler_func = handler;
	dr->dm_handler_ctxt = context;
	dm->ds_refs = dr;
	dm->ds_handler_is_block = handler_is_block;

	dm->dm_refs = _dispatch_calloc(1ul,
			sizeof(struct dispatch_mach_send_refs_s));
	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
	TAILQ_INIT(&dm->dm_refs->dm_replies);

	// First item on the channel sets the user-specified target queue
	dispatch_set_target_queue(dm, q);
	_dispatch_object_debug(dm, "%s", __func__);
	return dm;
}

dispatch_mach_t
dispatch_mach_create(const char *label, dispatch_queue_t q,
		dispatch_mach_handler_t handler)
{
	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
	return _dispatch_mach_create(label, q, bb,
			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
}

dispatch_mach_t
dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler)
{
	return _dispatch_mach_create(label, q, context, handler, false);
}
void
_dispatch_mach_dispose(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (dm->ds_handler_is_block && dr->dm_handler_ctxt) {
		Block_release(dr->dm_handler_ctxt);
	}
	free(dr);
	free(dm->dm_refs);
	_dispatch_queue_destroy(dm);
}

void
dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
		mach_port_t send, dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_kevent_t dk;

	if (MACH_PORT_VALID(receive)) {
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
		dk->dk_kevent.ident = receive;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
		dk->dk_kevent.udata = (uintptr_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		dm->ds_dkev = dk;
		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
		_dispatch_retain(dm); // the reference the manager queue holds
	}
	dr->dm_send = send;
	if (MACH_PORT_VALID(send)) {
		if (checkin) {
			dispatch_retain(checkin);
			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
		}
		dr->dm_checkin = checkin;
	}
	// monitor message reply ports
	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
		DISPATCH_CLIENT_CRASH("Channel already connected");
	}
	_dispatch_object_debug(dm, "%s", __func__);
	return dispatch_resume(dm);
}

static void
_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected)
{
	dispatch_kevent_t dk = dmr->dm_dkev;
	mach_port_t local_port = (mach_port_t)dk->dk_kevent.ident;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dm_list);
	free(dmr);
	if (disconnected) {
		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
	}
}

static void
_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
		void *ctxt)
{
	dispatch_kevent_t dk;
	dispatch_mach_reply_refs_t dmr;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
	dk->dk_kevent.ident = reply;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	dk->dk_kevent.udata = (uintptr_t)dk;
	// make reply context visible to leaks rdar://11777199
	dk->dk_kevent.ext[1] = (uintptr_t)ctxt;
	TAILQ_INIT(&dk->dk_sources);

	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
	dmr->dm_dkev = dk;

	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
			ctxt);
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dmr->dm_dkev, &flags);
	TAILQ_INSERT_TAIL(&dmr->dm_dkev->dk_sources, (dispatch_source_refs_t)dmr,
			dr_list);
	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dm_list);
	if (do_resume && _dispatch_kevent_resume(dmr->dm_dkev, flags, 0)) {
		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
	}
}
static void
_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
{
	dispatch_kevent_t dk = dm->dm_dkev;
	dm->dm_dkev = NULL;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
			dr_list);
	dm->ds_pending_data_mask &= ~(unsigned long)
			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
	_dispatch_kevent_unregister(dk,
			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
}

static void
_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
{
	dispatch_kevent_t dk;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
	dk->dk_kevent.ident = send;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;

	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&dk, &flags);
	TAILQ_INSERT_TAIL(&dk->dk_sources,
			(dispatch_source_refs_t)dm->dm_refs, dr_list);
	dm->dm_dkev = dk;
	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
		_dispatch_mach_kevent_unregister(dm);
	}
}

static inline void
_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou)
{
	return _dispatch_queue_push(dm._dq, dou);
}

static inline void
_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
{
	dou._do->do_suspend_cnt = (unsigned int)options;
}

static inline mach_msg_option_t
_dispatch_mach_msg_get_options(dispatch_object_t dou)
{
	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
	return options;
}

static inline void
_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
		unsigned long reason)
{
	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
	dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
			err_local|err_sub(0x3e0)|(mach_error_t)reason);
}

static inline unsigned long
_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
{
	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
	dou._do->do_suspend_cnt = 0;
	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
		*err_ptr = 0;
		return err_get_code(err);
	}
	*err_ptr = err;
	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
}
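// Note: while a dispatch_mach_msg_t is owned by the channel machinery, its
// otherwise unused do_suspend_cnt field is borrowed as scratch space by the
// setters/getters above: it holds either the pending mach_msg_option_t for a
// queued send, or a delivery reason encoded as a mach error
// (err_local | err_sub(0x3e0) | code), so send status and reason can travel
// with the message object to the handler.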
static void
_dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
		mach_msg_size_t siz)
{
	_dispatch_debug_machport(hdr->msgh_remote_port);
	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	dispatch_mach_msg_t dmsg;
	dispatch_mach_msg_destructor_t destructor;
	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
	return _dispatch_mach_push(dm, dmsg);
}

static inline mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t remote = hdr->msgh_remote_port;
	return remote;
}

static inline mach_port_t
_dispatch_mach_msg_get_reply_port(dispatch_mach_t dm, dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t reply = MACH_PORT_NULL;
	mach_msg_option_t msg_opts = _dispatch_mach_msg_get_options(dou);
	if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
		reply = hdr->msgh_reserved;
		hdr->msgh_reserved = 0;
	} else if (MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
			MACH_PORT_VALID(hdr->msgh_local_port) && (!dm->ds_dkev ||
			dm->ds_dkev->dk_kevent.ident != hdr->msgh_local_port)) {
		reply = hdr->msgh_local_port;
	}
	return reply;
}

static void
_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
		mach_port_t remote_port)
{
	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg;
	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (local_port) hdr->msgh_local_port = local_port;
	if (remote_port) hdr->msgh_remote_port = remote_port;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
	return _dispatch_mach_push(dm, dmsg);
}

static void
_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
{
	mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dou);
	_dispatch_mach_msg_set_reason(dou, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
	_dispatch_mach_push(dm, dou);
	if (reply) {
		_dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
	}
}
static dispatch_object_t
_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_mach_msg_t dmsg = dou._dmsg;
	dr->dm_needs_mgr = 0;
	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
		// send initial checkin message
		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
				&_dispatch_mgr_q)) {
			// send kevent must be uninstalled on the manager queue
			dr->dm_needs_mgr = 1;
			goto out;
		}
		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
		if (slowpath(dr->dm_checkin)) {
			goto out;
		}
	}
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	mach_msg_return_t kr = 0;
	mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dmsg);
	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
		opts = MACH_SEND_MSG | (msg_opts & DISPATCH_MACH_OPTIONS_MASK);
		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
			if (dmsg != dr->dm_checkin) {
				msg->msgh_remote_port = dr->dm_send;
			}
			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
				if (slowpath(!dm->dm_dkev)) {
					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
				}
				if (fastpath(dm->dm_dkev)) {
					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
						goto out;
					}
					opts |= MACH_SEND_NOTIFY;
				}
			}
			opts |= MACH_SEND_TIMEOUT;
		}
		_dispatch_debug_machport(msg->msgh_remote_port);
		if (reply) _dispatch_debug_machport(reply);
		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
				MACH_PORT_NULL);
	}
	_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, opts 0x%x, "
			"msg_opts 0x%x, reply on 0x%08x: %s - 0x%x", msg->msgh_remote_port,
			msg->msgh_id, dmsg->do_ctxt, opts, msg_opts, reply,
			mach_error_string(kr), kr);
	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
		if (opts & MACH_SEND_NOTIFY) {
			_dispatch_debug("machport[0x%08x]: send-possible notification "
					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
		} else {
			// send kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
		}
		if (reply) {
			_dispatch_mach_msg_set_options(dmsg, msg_opts |
					DISPATCH_MACH_PSEUDO_RECEIVED);
			msg->msgh_reserved = reply; // Remember the original reply port
		}
		goto out;
	}
	if (fastpath(!kr) && reply) {
		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
			// reply receive kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
			_dispatch_mach_msg_set_options(dmsg, msg_opts |
					DISPATCH_MACH_REGISTER_FOR_REPLY);
			if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
				msg->msgh_reserved = reply; // Remember the original reply port
			}
			goto out;
		}
		_dispatch_mach_reply_kevent_register(dm, reply, dmsg->do_ctxt);
	}
	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
	_dispatch_mach_push(dm, dmsg);
	dmsg = NULL;
	if (slowpath(kr) && reply) {
		// Send failed, so reply was never connected <rdar://problem/14309159>
		_dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
	}
out:
	return (dispatch_object_t)dmsg;
}
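// Note on the MACH_SEND_TIMED_OUT path above: when a send armed with
// MACH_SEND_TIMEOUT times out, the message is marked
// DISPATCH_MACH_PSEUDO_RECEIVED and the original reply port is stashed in
// msgh_reserved so that _dispatch_mach_msg_get_reply_port() can recover it
// when the message is retried (typically from the manager queue). The send is
// then either left for the send-possible notification to re-drive or handed
// back to the caller for re-enqueueing.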
static void
_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *prev, *dc = dou._do;
	dc->do_next = NULL;

	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
	if (fastpath(prev)) {
		prev->do_next = dc;
	} else {
		dr->dm_head = dc;
	}
	_dispatch_wakeup(dm);
}

static void
_dispatch_mach_send_drain(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dr->dm_tail) {
		while (!(dc = fastpath(dr->dm_head))) {
			dispatch_hardware_pause();
		}
		do {
			next_dc = fastpath(dc->do_next);
			dr->dm_head = next_dc;
			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
					relaxed)) {
				// Enqueue is TIGHTLY controlled, we won't wait long.
				while (!(next_dc = fastpath(dc->do_next))) {
					dispatch_hardware_pause();
				}
				dr->dm_head = next_dc;
			}
			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					// leave send queue locked until barrier has completed
					return _dispatch_mach_push(dm, dc);
				}
#if DISPATCH_MACH_SEND_SYNC
				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
					_dispatch_thread_semaphore_signal(
							(_dispatch_thread_semaphore_t)dc->do_ctxt);
					continue;
				}
#endif // DISPATCH_MACH_SEND_SYNC
				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
					goto out;
				}
				continue;
			}
			if (slowpath(dr->dm_disconnect_cnt) ||
					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
				_dispatch_mach_msg_not_sent(dm, dc);
				continue;
			}
			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
				goto out;
			}
		} while ((dc = next_dc));
	}
out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		if (!next_dc &&
				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
			// wait for enqueue slow path to finish
			while (!(next_dc = fastpath(dr->dm_head))) {
				dispatch_hardware_pause();
			}
			dc->do_next = next_dc;
		}
		dr->dm_head = dc;
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}

static void
_dispatch_mach_send(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_send_drain(dm);
}

static void
_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
{
	if (!(ke->fflags & dm->ds_pending_data_mask)) {
		return;
	}
	_dispatch_mach_send(dm);
}

void
dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		mach_msg_option_t options)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
		DISPATCH_CLIENT_CRASH("Message already enqueued");
	}
	dispatch_retain(dmsg);
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
	if (slowpath(dr->dm_tail) || slowpath(dr->dm_disconnect_cnt) ||
			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
					acquire))) {
		return _dispatch_mach_send_push(dm, dmsg);
	}
	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
		return _dispatch_mach_send_push(dm, dmsg);
	}
	if (slowpath(dr->dm_tail)) {
		return _dispatch_mach_send_drain(dm);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}
static void
_dispatch_mach_disconnect(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (MACH_PORT_VALID(dr->dm_send)) {
		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
	}
	dr->dm_send = MACH_PORT_NULL;
	if (dr->dm_checkin) {
		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
		dr->dm_checkin = NULL;
	}
	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		dispatch_mach_reply_refs_t dmr, tmp;
		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dm_list, tmp){
			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
		}
	}
}

static bool
_dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
		return false;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_disconnect(dm);
	if (dm->ds_dkev) {
		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	return true;
}

static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
{
	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
			// send/reply kevents must be uninstalled on the manager queue
			return false;
		}
	}
	_dispatch_mach_disconnect(dm);
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dr->dm_checkin = dou._dc->dc_data;
	dr->dm_send = (mach_port_t)dou._dc->dc_other;
	_dispatch_continuation_free(dou._dc);
	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
	_dispatch_object_debug(dm, "%s", __func__);
	return true;
}

void
dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
		dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
	if (MACH_PORT_VALID(send) && checkin) {
		dispatch_retain(checkin);
		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
	} else {
		checkin = NULL;
		dr->dm_checkin_port = MACH_PORT_NULL;
	}
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = checkin;
	dc->dc_other = (void*)(uintptr_t)send;
	return _dispatch_mach_send_push(dm, dc);
}
3630 _dispatch_mach_send_sync_slow(dispatch_mach_t dm
)
3632 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
3633 struct dispatch_object_s dc
= {
3634 .do_vtable
= (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT
),
3635 .do_ctxt
= (void*)sema
,
3637 _dispatch_mach_send_push(dm
, &dc
);
3638 _dispatch_thread_semaphore_wait(sema
);
3639 _dispatch_put_thread_semaphore(sema
);
3641 #endif // DISPATCH_MACH_SEND_SYNC
3645 dispatch_mach_get_checkin_port(dispatch_mach_t dm
)
3647 dispatch_mach_send_refs_t dr
= dm
->dm_refs
;
3648 if (slowpath(dm
->ds_atomic_flags
& DSF_CANCELED
)) {
3649 return MACH_PORT_DEAD
;
3651 return dr
->dm_checkin_port
;
3656 _dispatch_mach_connect_invoke(dispatch_mach_t dm
)
3658 dispatch_mach_refs_t dr
= dm
->ds_refs
;
3659 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
3660 DISPATCH_MACH_CONNECTED
, NULL
, 0, dr
->dm_handler_func
);
3661 dm
->dm_connect_handler_called
= 1;
3666 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg
)
3668 dispatch_mach_t dm
= (dispatch_mach_t
)_dispatch_queue_get_current();
3669 dispatch_mach_refs_t dr
= dm
->ds_refs
;
3671 unsigned long reason
= _dispatch_mach_msg_get_reason(dmsg
, &err
);
3673 dmsg
->do_next
= DISPATCH_OBJECT_LISTLESS
;
3674 _dispatch_thread_setspecific(dispatch_queue_key
, dm
->do_targetq
);
3675 if (slowpath(!dm
->dm_connect_handler_called
)) {
3676 _dispatch_mach_connect_invoke(dm
);
3678 _dispatch_client_callout4(dr
->dm_handler_ctxt
, reason
, dmsg
, err
,
3679 dr
->dm_handler_func
);
3680 _dispatch_thread_setspecific(dispatch_queue_key
, (dispatch_queue_t
)dm
);
3681 _dispatch_introspection_queue_item_complete(dmsg
);
3682 dispatch_release(dmsg
);
3687 _dispatch_mach_barrier_invoke(void *ctxt
)
3689 dispatch_mach_t dm
= (dispatch_mach_t
)_dispatch_queue_get_current();
3690 dispatch_mach_refs_t dr
= dm
->ds_refs
;
3691 struct dispatch_continuation_s
*dc
= ctxt
;
3692 void *context
= dc
->dc_data
;
3693 dispatch_function_t barrier
= dc
->dc_other
;
3694 bool send_barrier
= ((long)dc
->do_vtable
& DISPATCH_OBJ_BARRIER_BIT
);
3696 _dispatch_thread_setspecific(dispatch_queue_key
, dm
->do_targetq
);
3697 if (slowpath(!dm
->dm_connect_handler_called
)) {
3698 _dispatch_mach_connect_invoke(dm
);
3700 _dispatch_client_callout(context
, barrier
);
3701 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
3702 DISPATCH_MACH_BARRIER_COMPLETED
, NULL
, 0, dr
->dm_handler_func
);
3703 _dispatch_thread_setspecific(dispatch_queue_key
, (dispatch_queue_t
)dm
);
3705 (void)dispatch_atomic_dec2o(dm
->dm_refs
, dm_sending
, release
);
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return _dispatch_mach_send_push(dm, dc);
	}
	// leave send queue locked until barrier has completed
	return _dispatch_mach_push(dm, dc);
}

void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	return _dispatch_mach_push(dm, dc);
}

void
dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

void
dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}
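/*
 * Illustrative usage sketch (not part of this file): a send barrier is pushed
 * onto the channel's send queue, so it runs after messages already handed to
 * dispatch_mach_send() have been processed by the send machinery (the queue
 * stays locked until the barrier completes); a receive barrier is pushed onto
 * the channel itself and so is ordered with message deliveries to the
 * handler. Both report DISPATCH_MACH_BARRIER_COMPLETED to the channel
 * handler.
 *
 *	dispatch_mach_send(dm, dmsg, 0);
 *	dispatch_mach_send_barrier(dm, ^{
 *		// runs once the messages enqueued above have gone through the
 *		// send path
 *	});
 */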
3762 _dispatch_mach_cancel_invoke(dispatch_mach_t dm
)
3764 dispatch_mach_refs_t dr
= dm
->ds_refs
;
3765 if (slowpath(!dm
->dm_connect_handler_called
)) {
3766 _dispatch_mach_connect_invoke(dm
);
3768 _dispatch_client_callout4(dr
->dm_handler_ctxt
,
3769 DISPATCH_MACH_CANCELED
, NULL
, 0, dr
->dm_handler_func
);
3770 dm
->dm_cancel_handler_called
= 1;
3771 _dispatch_release(dm
); // the retain is done at creation time
3776 dispatch_mach_cancel(dispatch_mach_t dm
)
3778 dispatch_source_cancel((dispatch_source_t
)dm
);
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_mach_t dm = dou._dm;

	// This function performs all mach channel actions. Each action is
	// responsible for verifying that it takes place on the appropriate queue.
	// If the current queue is not the correct queue for this action, the
	// correct queue will be returned and the invoke will be re-driven on that
	// queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_source_kevent_register((dispatch_source_t)dm);
		dm->ds_is_installed = true;
		_dispatch_mach_send(dm);
		// Apply initial target queue change
		_dispatch_queue_drain(dou);
		if (dm->dq_items_tail) {
			return dm->do_targetq;
		}
	} else if (dm->dq_items_tail) {
		// The channel has pending messages to deliver to the target queue.
		if (dq != dm->do_targetq) {
			return dm->do_targetq;
		}
		dispatch_queue_t tq = dm->do_targetq;
		if (slowpath(_dispatch_queue_drain(dou))) {
			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
		}
		if (slowpath(tq != dm->do_targetq)) {
			// An item on the channel changed the target queue
			return dm->do_targetq;
		}
	} else if (dr->dm_tail) {
		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
			// Send/reply kevents need to be installed or uninstalled
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
		}
		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			// The channel has pending messages to send.
			_dispatch_mach_send(dm);
		}
	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
		// The channel has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			if (!_dispatch_mach_cancel(dm)) {
				return NULL;
			}
		}
		if (!dm->dm_cancel_handler_called) {
			if (dq != dm->do_targetq) {
				return dm->do_targetq;
			}
			_dispatch_mach_cancel_invoke(dm);
		}
	}
	return NULL;
}
void
_dispatch_mach_invoke(dispatch_mach_t dm)
{
	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
}
unsigned long
_dispatch_mach_probe(dispatch_mach_t dm)
{
	// This function determines whether the mach channel needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		return true;
	} else if (dm->dq_items_tail) {
		// The source has pending messages to deliver to the target queue.
		return true;
	} else if (dr->dm_tail &&
			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
		// The channel has pending messages to send.
		return true;
	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
				!dm->dm_cancel_handler_called) {
			// The channel needs to be uninstalled from the manager queue, or
			// the cancellation handler needs to be delivered to the target
			// queue.
			return true;
		}
	}
	return false;
}
#pragma mark dispatch_mach_msg_t
dispatch_mach_msg_t
dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
{
	if (slowpath(size < sizeof(mach_msg_header_t)) ||
			slowpath(destructor && !msg)) {
		DISPATCH_CLIENT_CRASH("Empty message");
	}
	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
			sizeof(struct dispatch_mach_msg_s) +
			(destructor ? 0 : size - sizeof(dmsg->msg)));
	if (destructor) {
		dmsg->msg = msg;
	} else if (msg) {
		memcpy(dmsg->buf, msg, size);
	}
	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	dmsg->do_targetq = _dispatch_get_root_queue(0, false);
	dmsg->destructor = destructor;
	dmsg->size = size;
	if (msg_ptr) {
		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
	}
	return dmsg;
}
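
/*
 * Illustrative usage sketch (editorial addition): wrapping a caller-owned
 * message buffer in a dispatch_mach_msg_t. The message layout and id are
 * hypothetical; with DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT the header is
 * copied into the object's inline buffer, so the caller's storage need not
 * outlive the call.
 */
#if 0
static dispatch_mach_msg_t
example_make_msg(mach_port_t remote)
{
	struct {
		mach_msg_header_t header;
	} msg = {
		.header = {
			.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
			.msgh_size = sizeof(msg),
			.msgh_remote_port = remote,
			.msgh_id = 0x1000, // hypothetical message id
		},
	};
	return dispatch_mach_msg_create(&msg.header, sizeof(msg),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, NULL);
}
#endif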
void
_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
{
	switch (dmsg->destructor) {
	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
		free(dmsg->msg);
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
		mach_vm_size_t vm_size = dmsg->size;
		mach_vm_address_t vm_addr = (uintptr_t)dmsg->msg;
		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
				vm_addr, vm_size));
		break;
	}}
}
static inline mach_msg_header_t*
_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
{
	return dmsg->destructor ? dmsg->msg : (mach_msg_header_t*)dmsg->buf;
}
mach_msg_header_t*
dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
{
	if (size_ptr) {
		*size_ptr = dmsg->size;
	}
	return _dispatch_mach_msg_get_msg(dmsg);
}
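
/*
 * Illustrative usage sketch (editorial addition): reading a received message
 * back out of a dispatch_mach_msg_t, e.g. inside a channel handler. The
 * function name is hypothetical.
 */
#if 0
static void
example_inspect_msg(dispatch_mach_msg_t dmsg)
{
	size_t size;
	mach_msg_header_t *hdr = dispatch_mach_msg_get_msg(dmsg, &size);
	// hdr points at the message header; size is the size recorded when the
	// dispatch_mach_msg_t was created.
	(void)hdr; (void)size;
}
#endif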
size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dmsg), dmsg);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->buf);
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
	if (hdr->msgh_id) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
				hdr->msgh_id);
	}
	if (hdr->msgh_size) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
				hdr->msgh_size);
	}
	if (hdr->msgh_bits) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
		}
		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
	}
	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
	} else if (hdr->msgh_local_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
				hdr->msgh_local_port);
	} else if (hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
				hdr->msgh_remote_port);
	} else {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
	return offset;
}
#pragma mark dispatch_mig_server
mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
		dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
	mach_msg_options_t tmp_options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	uint64_t assertion_token = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	boolean_t demux_success;
	bool received = false;
	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(rcv_size);
	bufReply = alloca(rcv_size);
	bufReply->Head.msgh_size = 0;
	bufRequest->RetCode = 0;

	options |= MACH_RCV_LARGE; // rdar://problem/8422992

	tmp_options = options;
	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				goto out;
			}
		}
		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
				// Don't return an error if a message was sent this time or
				// a message was successfully received previously
				// rdar://problems/7363620&7791738
				if (bufReply->Head.msgh_remote_port || received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			case MACH_RCV_INVALID_NAME:
				break;
			case MACH_RCV_TOO_LARGE:
				// receive messages that are too large and log their id and size
				// rdar://problem/8422992
				tmp_options &= ~MACH_RCV_LARGE;
				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
				void *large_buf = malloc(large_size);
				if (large_buf) {
					rcv_size = large_size;
					bufReply = large_buf;
				}
				if (!mach_msg(&bufReply->Head, tmp_options, 0,
						(mach_msg_size_t)rcv_size,
						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
					_dispatch_log("BUG in libdispatch client: "
							"dispatch_mig_server received message larger than "
							"requested size %zd: id = 0x%x, size = %d",
							maxmsgsz, bufReply->Head.msgh_id,
							bufReply->Head.msgh_size);
				}
				if (large_buf) {
					free(large_buf);
				}
				// fall through
			default:
				_dispatch_bug_mach_client(
						"dispatch_mig_server: mach_msg() failed", kr);
				break;
			}
			goto out;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			goto out;
		}
		received = true;

		if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
			int r = proc_importance_assertion_complete(assertion_token);
			(void)dispatch_assume_zero(r);
#endif
			assertion_token = 0;
		}

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
				NULL, &assertion_token);
		if (r && slowpath(r != EIO)) {
			(void)dispatch_assume_zero(r);
		}
#endif

		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
			// is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}
				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

out:
	if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_complete(assertion_token);
		(void)dispatch_assume_zero(r);
#endif
	}

	return kr;
}

#endif /* HAVE_MACH */
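
/*
 * Illustrative usage sketch (editorial addition): driving a MIG subsystem from
 * a dispatch source with dispatch_mig_server(). The demux routine, port and
 * maximum message size are hypothetical; a generated MIG subsystem supplies
 * its own <subsystem>_server() routine and maximum size constant.
 */
#if 0
extern boolean_t example_demux(mach_msg_header_t *request,
		mach_msg_header_t *reply); // hypothetical MIG demux routine

static dispatch_source_t
example_mig_source(mach_port_t rp)
{
	dispatch_source_t ds = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_MACH_RECV, rp, 0,
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
	dispatch_source_set_event_handler(ds, ^{
		// drain pending requests; replies are sent inline
		(void)dispatch_mig_server(ds, 2048, example_demux);
	});
	dispatch_resume(ds);
	return ds;
}
#endif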
#pragma mark dispatch_source_debug
static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
#if DISPATCH_USE_VM_PRESSURE
	_evfilt2(EVFILT_VM);
#endif
#ifdef EVFILT_MEMORYSTATUS
	_evfilt2(EVFILT_MEMORYSTATUS);
#endif
#if HAVE_MACH
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
#endif
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	}
	return "EVFILT_missing";
}
static size_t
_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
			"pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
			target && target->dq_label ? target->dq_label : "", target,
			ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
}
static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
			" last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
			ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
			ds_timer(dr).interval, ds_timer(dr).flags);
}
size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(ds), ds);
	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	if (ds->ds_is_timer) {
		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
	}
	offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
	return offset;
}
static size_t
_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = dm->do_targetq;
	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
			"sending = %d, disconnected = %d, canceled = %d ",
			target && target->dq_label ? target->dq_label : "", target,
			dm->ds_dkev ? (mach_port_t)dm->ds_dkev->dk_kevent.ident : 0,
			dm->dm_refs->dm_send,
			dm->dm_dkev ? (mach_port_t)dm->dm_dkev->dk_kevent.ident : 0,
			dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
			" (armed)" : "", dm->dm_refs->dm_checkin_port,
			dm->dm_refs->dm_checkin ? " (pending)" : "",
			dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
			(bool)(dm->ds_atomic_flags & DSF_CANCELED));
}
size_t
_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dm->dq_label ? dm->dq_label : dx_kind(dm), dm);
	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
#if DISPATCH_DEBUG
static void
_dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
{
	_dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
			"fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
			"ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
			kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
			kev->ext[1], str);
}
static void
_dispatch_kevent_debugger2(void *context)
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	dispatch_source_refs_t dr;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			(void)dispatch_assume_zero(errno);
		}
		return;
	}
	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
	}

	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n");
	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
	fprintf(debug_stream, "<body>\n<ul>\n");

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
	//		"<td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, (unsigned long)dk->dk_kevent.ident,
					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
					(void*)dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
				ds = _dispatch_source_from_refs(dr);
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
						"0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
						ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
							"0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
							dq->do_suspend_cnt, dq->dq_label ? dq->dq_label : "");
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}
static void
_dispatch_kevent_debugger2_cancel(void *context)
{
	int ret, fd = (int)(long)context;

	ret = close(fd);
	if (ret == -1) {
		(void)dispatch_assume_zero(errno);
	}
}
static void
_dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		// listen on all interfaces instead of loopback only
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		(void)dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
			(socklen_t) sizeof sock_opt);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		(void)dispatch_assume_zero(errno);
		goto out_bad;
	}

	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
			&_dispatch_mgr_q);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu",
				(in_port_t)ntohs(sa_u.sa_in.sin_port));

		/* ownership of fd transfers to ds */
		dispatch_set_context(ds, (void *)(long)fd);
		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
		dispatch_source_set_cancel_handler_f(ds,
				_dispatch_kevent_debugger2_cancel);
		dispatch_resume(ds);
		return;
	}

out_bad:
	close(fd);
}
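
/*
 * Usage note (editorial addition): the kevent debugger above is only compiled
 * into DISPATCH_DEBUG builds. To exercise it, launch the target process with
 * LIBDISPATCH_DEBUGGER=1 in its environment (or 2 to listen on all
 * interfaces), note the "LIBDISPATCH: debug port" line in the log, and fetch
 * the HTML dump of registered kevents and sources, e.g.:
 *
 *     curl http://localhost:<port>/
 */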
#ifndef MACH_PORT_TYPE_SPREQUEST
#define MACH_PORT_TYPE_SPREQUEST 0x40000000
#endif
void
dispatch_debug_machport(mach_port_t name, const char* str)
{
	mach_port_type_t type;
	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
	unsigned int dnreqs = 0, dnrsiz;
	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
	if (kr) {
		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
				kr, mach_error_string(kr), str);
		return;
	}
	if (type & MACH_PORT_TYPE_SEND) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND, &ns));
	}
	if (type & MACH_PORT_TYPE_SEND_ONCE) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_SEND_ONCE, &nso));
	}
	if (type & MACH_PORT_TYPE_DEAD_NAME) {
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_DEAD_NAME, &nd));
	}
	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
		(void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
				name, &dnrsiz, &dnreqs));
	}
	if (type & MACH_PORT_TYPE_RECEIVE) {
		mach_port_status_t status = { .mps_pset = 0, };
		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
				MACH_PORT_RIGHT_RECEIVE, &nr));
		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
				status.mps_srights ? "Y":"N", status.mps_sorights,
				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
				status.mps_seqno, str);
	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
			MACH_PORT_TYPE_DEAD_NAME)) {
		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
	} else {
		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
				str);
	}
}

#endif // DISPATCH_DEBUG
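
/*
 * Illustrative usage sketch (editorial addition): dispatch_debug_machport()
 * is a debug-build helper for logging the state of a port's rights, e.g.
 * while chasing a port-right leak. The port variable and function name are
 * hypothetical.
 */
#if 0
static void
example_debug_port(mach_port_t reply_port)
{
	dispatch_debug_machport(reply_port, __func__);
}
#endif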