apple/libdispatch (libdispatch-703.50.37): src/source.c
1 /*
2 * Copyright (c) 2008-2016 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26
27 #define DKEV_DISPOSE_IMMEDIATE_DELETE 0x1
28 #define DKEV_UNREGISTER_DISCONNECTED 0x2
29 #define DKEV_UNREGISTER_REPLY_REMOVE 0x4
30 #define DKEV_UNREGISTER_WAKEUP 0x8
31
32 static pthread_priority_t
33 _dispatch_source_compute_kevent_priority(dispatch_source_t ds);
34 static void _dispatch_source_handler_free(dispatch_source_t ds, long kind);
35 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
36 const _dispatch_kevent_qos_s *ke);
37 static bool _dispatch_kevent_register(dispatch_kevent_t *dkp,
38 pthread_priority_t pp, uint32_t *flgp);
39 static long _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
40 unsigned int options);
41 static long _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
42 uint32_t del_flags);
43 static void _dispatch_kevent_drain(_dispatch_kevent_qos_s *ke);
44 static void _dispatch_kevent_merge(_dispatch_kevent_qos_s *ke);
45 static void _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke);
46 static void _dispatch_timers_unregister(dispatch_source_t ds,
47 dispatch_kevent_t dk);
48 static void _dispatch_timers_update(dispatch_source_t ds);
49 static void _dispatch_timer_aggregates_check(void);
50 static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
51 static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
52 unsigned int tidx);
53 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
54 unsigned int tidx);
55 static inline unsigned long _dispatch_source_timer_data(
56 dispatch_source_refs_t dr, unsigned long prev);
57 static void _dispatch_kq_deferred_update(const _dispatch_kevent_qos_s *ke);
58 static long _dispatch_kq_immediate_update(_dispatch_kevent_qos_s *ke);
59 static void _dispatch_memorypressure_init(void);
60 #if HAVE_MACH
61 static void _dispatch_mach_host_calendar_change_register(void);
62 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
63 static void _dispatch_mach_recv_msg_buf_init(void);
64 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
65 uint32_t new_flags, uint32_t del_flags);
66 #endif
67 static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
68 uint32_t new_flags, uint32_t del_flags);
69 static void _dispatch_mach_kevent_merge(_dispatch_kevent_qos_s *ke);
70 static mach_msg_size_t _dispatch_kevent_mach_msg_size(
71 _dispatch_kevent_qos_s *ke);
72 #else
73 static inline void _dispatch_mach_host_calendar_change_register(void) {}
74 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
75 #endif
76 static const char * _evfiltstr(short filt);
77 #if DISPATCH_DEBUG
78 static void dispatch_kevent_debug(const char *verb,
79 const _dispatch_kevent_qos_s *kev, int i, int n,
80 const char *function, unsigned int line);
81 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
82 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
83 #else
84 static inline void
85 dispatch_kevent_debug(const char *verb, const _dispatch_kevent_qos_s *kev,
86 int i, int n, const char *function, unsigned int line)
87 {
88 (void)verb; (void)kev; (void)i; (void)n; (void)function; (void)line;
89 }
90 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
91 #endif
92 #define _dispatch_kevent_debug(verb, _kev) \
93 dispatch_kevent_debug(verb, _kev, 0, 1, __FUNCTION__, __LINE__)
94 #define _dispatch_kevent_debug_n(verb, _kev, i, n) \
95 dispatch_kevent_debug(verb, _kev, i, n, __FUNCTION__, __LINE__)
96 #ifndef DISPATCH_MGR_QUEUE_DEBUG
97 #define DISPATCH_MGR_QUEUE_DEBUG 0
98 #endif
99 #if DISPATCH_MGR_QUEUE_DEBUG
100 #define _dispatch_kevent_mgr_debug _dispatch_kevent_debug
101 #else
102 static inline void
103 _dispatch_kevent_mgr_debug(_dispatch_kevent_qos_s* kev DISPATCH_UNUSED) {}
104 #endif
105
106 #pragma mark -
107 #pragma mark dispatch_source_t
108
109 dispatch_source_t
110 dispatch_source_create(dispatch_source_type_t type, uintptr_t handle,
111 unsigned long mask, dispatch_queue_t dq)
112 {
113 // ensure _dispatch_evfilt_machport_direct_enabled is initialized
114 _dispatch_root_queues_init();
115 const _dispatch_kevent_qos_s *proto_kev = &type->ke;
116 dispatch_source_t ds;
117 dispatch_kevent_t dk;
118
119 // input validation
120 if (type == NULL || (mask & ~type->mask)) {
121 return DISPATCH_BAD_INPUT;
122 }
123 if (type->mask && !mask) {
124 // expect a non-zero mask when the type declares one ... except
125 switch (type->ke.filter) {
126 case DISPATCH_EVFILT_TIMER:
127 break; // timers don't need masks
128 case DISPATCH_EVFILT_MACH_NOTIFICATION:
129 break; // type->init handles zero mask as a legacy case
130 default:
131 // otherwise reject as invalid input
132 return DISPATCH_BAD_INPUT;
133 }
134 }
135
136 switch (type->ke.filter) {
137 case EVFILT_SIGNAL:
138 if (handle >= NSIG) {
139 return DISPATCH_BAD_INPUT;
140 }
141 break;
142 case EVFILT_FS:
143 #if DISPATCH_USE_MEMORYSTATUS
144 case EVFILT_MEMORYSTATUS:
145 #endif
146 case DISPATCH_EVFILT_CUSTOM_ADD:
147 case DISPATCH_EVFILT_CUSTOM_OR:
148 if (handle) {
149 return DISPATCH_BAD_INPUT;
150 }
151 break;
152 case DISPATCH_EVFILT_TIMER:
153 if ((handle == 0) != (type->ke.ident == 0)) {
154 return DISPATCH_BAD_INPUT;
155 }
156 break;
157 default:
158 break;
159 }
160
161 ds = _dispatch_alloc(DISPATCH_VTABLE(source),
162 sizeof(struct dispatch_source_s));
163 // Initialize as a queue first, then override some settings below.
164 _dispatch_queue_init(ds->_as_dq, DQF_NONE, 1, true);
165 ds->dq_label = "source";
166 ds->do_ref_cnt++; // the reference the manager queue holds
167
168 switch (type->ke.filter) {
169 case DISPATCH_EVFILT_CUSTOM_OR:
170 dk = DISPATCH_KEV_CUSTOM_OR;
171 break;
172 case DISPATCH_EVFILT_CUSTOM_ADD:
173 dk = DISPATCH_KEV_CUSTOM_ADD;
174 break;
175 default:
176 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
177 dk->dk_kevent = *proto_kev;
178 dk->dk_kevent.ident = handle;
179 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
180 dk->dk_kevent.fflags |= (uint32_t)mask;
181 dk->dk_kevent.udata = (_dispatch_kevent_qos_udata_t)dk;
182 TAILQ_INIT(&dk->dk_sources);
183
184 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
185 ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
186 if (EV_UDATA_SPECIFIC & proto_kev->flags) {
187 dk->dk_kevent.flags |= EV_DISPATCH;
188 ds->ds_is_direct_kevent = true;
189 ds->ds_needs_rearm = true;
190 }
191 break;
192 }
193 ds->ds_dkev = dk;
194
195 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
196 ds->ds_needs_rearm = true;
197 } else if (!(EV_CLEAR & proto_kev->flags)) {
198 // we cheat and use EV_CLEAR to mean the pending data is flag-like (not additive)
199 ds->ds_is_adder = true;
200 }
201 // Some sources require special processing
202 if (type->init != NULL) {
203 type->init(ds, type, handle, mask);
204 }
205 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
206 if (!ds->ds_is_custom_source && (dk->dk_kevent.flags & EV_VANISHED)) {
207 // see _dispatch_source_merge_kevent
208 dispatch_assert(!(dk->dk_kevent.flags & EV_ONESHOT));
209 dispatch_assert(dk->dk_kevent.flags & EV_DISPATCH);
210 dispatch_assert(dk->dk_kevent.flags & EV_UDATA_SPECIFIC);
211 }
212
213 if (fastpath(!ds->ds_refs)) {
214 ds->ds_refs = _dispatch_calloc(1ul,
215 sizeof(struct dispatch_source_refs_s));
216 }
217 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
218
219 if (slowpath(!dq)) {
220 dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
221 } else {
222 _dispatch_retain(dq);
223 }
224 ds->do_targetq = dq;
225 _dispatch_object_debug(ds, "%s", __func__);
226 return ds;
227 }
228
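/*
 * Usage sketch (illustration only, not part of libdispatch): creating a
 * signal source on a caller-owned queue `q`. Sources are created inactive
 * and must be resumed before events are delivered; creation of `q` and
 * error handling are assumed to happen elsewhere.
 *
 *	signal(SIGHUP, SIG_IGN); // let the source observe the signal instead
 *	dispatch_source_t sig = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0, q);
 *	dispatch_source_set_event_handler(sig, ^{
 *		// coalesced count of signals since the previous callout
 *		printf("SIGHUP x%lu\n", dispatch_source_get_data(sig));
 *	});
 *	dispatch_resume(sig);
 */
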
229 void
230 _dispatch_source_dispose(dispatch_source_t ds)
231 {
232 _dispatch_object_debug(ds, "%s", __func__);
233 _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
234 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
235 _dispatch_source_handler_free(ds, DS_CANCEL_HANDLER);
236 free(ds->ds_refs);
237 _dispatch_queue_destroy(ds->_as_dq);
238 }
239
240 void
241 _dispatch_source_xref_dispose(dispatch_source_t ds)
242 {
243 dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH);
244 }
245
246 long
247 dispatch_source_testcancel(dispatch_source_t ds)
248 {
249 return (bool)(ds->dq_atomic_flags & DSF_CANCELED);
250 }
251
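/*
 * Usage sketch (illustration only): a long-running event handler can poll
 * dispatch_source_testcancel() to bail out early once another thread has
 * called dispatch_source_cancel(). `src` and `process_chunk()` are
 * hypothetical and assumed to be provided by the caller.
 *
 *	dispatch_source_set_event_handler(src, ^{
 *		while (!dispatch_source_testcancel(src)) {
 *			if (!process_chunk()) break;
 *		}
 *	});
 */
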
252 unsigned long
253 dispatch_source_get_mask(dispatch_source_t ds)
254 {
255 unsigned long mask = ds->ds_pending_data_mask;
256 if (ds->ds_vmpressure_override) {
257 mask = NOTE_VM_PRESSURE;
258 }
259 #if TARGET_IPHONE_SIMULATOR
260 else if (ds->ds_memorypressure_override) {
261 mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
262 }
263 #endif
264 return mask;
265 }
266
267 uintptr_t
268 dispatch_source_get_handle(dispatch_source_t ds)
269 {
270 unsigned int handle = (unsigned int)ds->ds_ident_hack;
271 #if TARGET_IPHONE_SIMULATOR
272 if (ds->ds_memorypressure_override) {
273 handle = 0;
274 }
275 #endif
276 return handle;
277 }
278
279 unsigned long
280 dispatch_source_get_data(dispatch_source_t ds)
281 {
282 unsigned long data = ds->ds_data;
283 if (ds->ds_vmpressure_override) {
284 data = NOTE_VM_PRESSURE;
285 }
286 #if TARGET_IPHONE_SIMULATOR
287 else if (ds->ds_memorypressure_override) {
288 data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
289 }
290 #endif
291 return data;
292 }
293
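/*
 * Usage sketch (illustration only): inside an event handler these accessors
 * describe what fired. For a DISPATCH_SOURCE_TYPE_PROC source, get_handle()
 * is the monitored pid, get_mask() the registered events, and get_data()
 * the events actually delivered. `proc` is assumed to have been created and
 * resumed elsewhere.
 *
 *	dispatch_source_set_event_handler(proc, ^{
 *		if (dispatch_source_get_data(proc) & DISPATCH_PROC_EXIT) {
 *			printf("pid %lu exited\n",
 *					(unsigned long)dispatch_source_get_handle(proc));
 *		}
 *	});
 */
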
294 DISPATCH_ALWAYS_INLINE
295 static inline void
296 _dispatch_source_merge_data2(dispatch_source_t ds,
297 pthread_priority_t pp, unsigned long val)
298 {
299 _dispatch_kevent_qos_s kev = {
300 .fflags = (typeof(kev.fflags))val,
301 .data = (typeof(kev.data))val,
302 #if DISPATCH_USE_KEVENT_QOS
303 .qos = (_dispatch_kevent_priority_t)pp,
304 #endif
305 };
306 #if !DISPATCH_USE_KEVENT_QOS
307 (void)pp;
308 #endif
309
310 dispatch_assert(ds->ds_dkev == DISPATCH_KEV_CUSTOM_OR ||
311 ds->ds_dkev == DISPATCH_KEV_CUSTOM_ADD);
312 _dispatch_kevent_debug("synthetic data", &kev);
313 _dispatch_source_merge_kevent(ds, &kev);
314 }
315
316 void
317 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
318 {
319 _dispatch_source_merge_data2(ds, 0, val);
320 }
321
322 void
323 _dispatch_source_merge_data(dispatch_source_t ds, pthread_priority_t pp,
324 unsigned long val)
325 {
326 _dispatch_source_merge_data2(ds, pp, val);
327 }
328
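/*
 * Usage sketch (illustration only): DISPATCH_SOURCE_TYPE_DATA_ADD and
 * DISPATCH_SOURCE_TYPE_DATA_OR sources are driven purely from user space
 * through dispatch_source_merge_data(); values merged before the handler
 * gets to run are coalesced (added or ORed) into a single callout. `q` is
 * assumed to exist.
 *
 *	dispatch_source_t counter = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
 *	dispatch_source_set_event_handler(counter, ^{
 *		printf("accumulated %lu\n", dispatch_source_get_data(counter));
 *	});
 *	dispatch_resume(counter);
 *	dispatch_source_merge_data(counter, 1); // may coalesce with later merges
 */
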
329 #pragma mark -
330 #pragma mark dispatch_source_handler
331
332 DISPATCH_ALWAYS_INLINE
333 static inline dispatch_continuation_t
334 _dispatch_source_get_handler(dispatch_source_refs_t dr, long kind)
335 {
336 return os_atomic_load(&dr->ds_handler[kind], relaxed);
337 }
338 #define _dispatch_source_get_event_handler(dr) \
339 _dispatch_source_get_handler(dr, DS_EVENT_HANDLER)
340 #define _dispatch_source_get_cancel_handler(dr) \
341 _dispatch_source_get_handler(dr, DS_CANCEL_HANDLER)
342 #define _dispatch_source_get_registration_handler(dr) \
343 _dispatch_source_get_handler(dr, DS_REGISTN_HANDLER)
344
345 DISPATCH_ALWAYS_INLINE
346 static inline dispatch_continuation_t
347 _dispatch_source_handler_alloc(dispatch_source_t ds, void *func, long kind,
348 bool block)
349 {
350 // sources don't propagate priority by default
351 const dispatch_block_flags_t flags =
352 DISPATCH_BLOCK_HAS_PRIORITY | DISPATCH_BLOCK_NO_VOUCHER;
353 dispatch_continuation_t dc = _dispatch_continuation_alloc();
354 if (func) {
355 uintptr_t dc_flags = 0;
356
357 if (kind != DS_EVENT_HANDLER) {
358 dc_flags |= DISPATCH_OBJ_CONSUME_BIT;
359 }
360 if (block) {
361 #ifdef __BLOCKS__
362 _dispatch_continuation_init(dc, ds, func, 0, flags, dc_flags);
363 #endif /* __BLOCKS__ */
364 } else {
365 dc_flags |= DISPATCH_OBJ_CTXT_FETCH_BIT;
366 _dispatch_continuation_init_f(dc, ds, ds->do_ctxt, func,
367 0, flags, dc_flags);
368 }
369 _dispatch_trace_continuation_push(ds->_as_dq, dc);
370 } else {
371 dc->dc_flags = 0;
372 dc->dc_func = NULL;
373 }
374 return dc;
375 }
376
377 DISPATCH_NOINLINE
378 static void
379 _dispatch_source_handler_dispose(dispatch_continuation_t dc)
380 {
381 #ifdef __BLOCKS__
382 if (dc->dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
383 Block_release(dc->dc_ctxt);
384 }
385 #endif /* __BLOCKS__ */
386 if (dc->dc_voucher) {
387 _voucher_release(dc->dc_voucher);
388 dc->dc_voucher = VOUCHER_INVALID;
389 }
390 _dispatch_continuation_free(dc);
391 }
392
393 DISPATCH_ALWAYS_INLINE
394 static inline dispatch_continuation_t
395 _dispatch_source_handler_take(dispatch_source_t ds, long kind)
396 {
397 return os_atomic_xchg(&ds->ds_refs->ds_handler[kind], NULL, relaxed);
398 }
399
400 DISPATCH_ALWAYS_INLINE
401 static inline void
402 _dispatch_source_handler_free(dispatch_source_t ds, long kind)
403 {
404 dispatch_continuation_t dc = _dispatch_source_handler_take(ds, kind);
405 if (dc) _dispatch_source_handler_dispose(dc);
406 }
407
408 DISPATCH_ALWAYS_INLINE
409 static inline void
410 _dispatch_source_handler_replace(dispatch_source_t ds, long kind,
411 dispatch_continuation_t dc)
412 {
413 if (!dc->dc_func) {
414 _dispatch_continuation_free(dc);
415 dc = NULL;
416 } else if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
417 dc->dc_ctxt = ds->do_ctxt;
418 }
419 dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
420 if (dc) _dispatch_source_handler_dispose(dc);
421 }
422
423 DISPATCH_NOINLINE
424 static void
425 _dispatch_source_set_handler_slow(void *context)
426 {
427 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
428 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
429
430 dispatch_continuation_t dc = context;
431 long kind = (long)dc->dc_data;
432 dc->dc_data = NULL;
433 _dispatch_source_handler_replace(ds, kind, dc);
434 }
435
436 DISPATCH_NOINLINE
437 static void
438 _dispatch_source_set_handler(dispatch_source_t ds, long kind,
439 dispatch_continuation_t dc)
440 {
441 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
442 if (_dispatch_queue_try_inactive_suspend(ds->_as_dq)) {
443 _dispatch_source_handler_replace(ds, kind, dc);
444 return dx_vtable(ds)->do_resume(ds, false);
445 }
446 _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds);
447 if (kind == DS_REGISTN_HANDLER) {
448 _dispatch_bug_deprecated("Setting registration handler after "
449 "the source has been activated");
450 }
451 dc->dc_data = (void *)kind;
452 _dispatch_barrier_trysync_or_async_f(ds->_as_dq, dc,
453 _dispatch_source_set_handler_slow);
454 }
455
456 #ifdef __BLOCKS__
457 void
458 dispatch_source_set_event_handler(dispatch_source_t ds,
459 dispatch_block_t handler)
460 {
461 dispatch_continuation_t dc;
462 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
463 _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
464 }
465 #endif /* __BLOCKS__ */
466
467 void
468 dispatch_source_set_event_handler_f(dispatch_source_t ds,
469 dispatch_function_t handler)
470 {
471 dispatch_continuation_t dc;
472 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
473 _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
474 }
475
476 void
477 _dispatch_source_set_event_handler_continuation(dispatch_source_t ds,
478 dispatch_continuation_t dc)
479 {
480 _dispatch_trace_continuation_push(ds->_as_dq, dc);
481 _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
482 }
483
484 #ifdef __BLOCKS__
485 void
486 dispatch_source_set_cancel_handler(dispatch_source_t ds,
487 dispatch_block_t handler)
488 {
489 dispatch_continuation_t dc;
490 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
491 _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
492 }
493 #endif /* __BLOCKS__ */
494
495 void
496 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
497 dispatch_function_t handler)
498 {
499 dispatch_continuation_t dc;
500 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
501 _dispatch_source_set_handler(ds, DS_CANCEL_HANDLER, dc);
502 }
503
504 #ifdef __BLOCKS__
505 void
506 dispatch_source_set_registration_handler(dispatch_source_t ds,
507 dispatch_block_t handler)
508 {
509 dispatch_continuation_t dc;
510 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
511 _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
512 }
513 #endif /* __BLOCKS__ */
514
515 void
516 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
517 dispatch_function_t handler)
518 {
519 dispatch_continuation_t dc;
520 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
521 _dispatch_source_set_handler(ds, DS_REGISTN_HANDLER, dc);
522 }
523
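/*
 * Usage sketch (illustration only): for descriptor-based sources the
 * cancellation handler set above is the only safe place to close the fd,
 * because it runs after the kevent has been torn down. `fd` and `q` are
 * assumed to be set up by the caller.
 *
 *	dispatch_source_t rd = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0, q);
 *	dispatch_source_set_event_handler(rd, ^{
 *		char buf[4096];
 *		(void)read(fd, buf, sizeof(buf));
 *	});
 *	dispatch_source_set_cancel_handler(rd, ^{
 *		close(fd); // unregistration is complete by the time this runs
 *	});
 *	dispatch_resume(rd);
 */
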
524 #pragma mark -
525 #pragma mark dispatch_source_invoke
526
527 static void
528 _dispatch_source_registration_callout(dispatch_source_t ds, dispatch_queue_t cq,
529 dispatch_invoke_flags_t flags)
530 {
531 dispatch_continuation_t dc;
532
533 dc = _dispatch_source_handler_take(ds, DS_REGISTN_HANDLER);
534 if (ds->dq_atomic_flags & (DSF_CANCELED | DQF_RELEASED)) {
535 // no registration callout if source is canceled rdar://problem/8955246
536 return _dispatch_source_handler_dispose(dc);
537 }
538 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
539 dc->dc_ctxt = ds->do_ctxt;
540 }
541 _dispatch_continuation_pop(dc, cq, flags);
542 }
543
544 static void
545 _dispatch_source_cancel_callout(dispatch_source_t ds, dispatch_queue_t cq,
546 dispatch_invoke_flags_t flags)
547 {
548 dispatch_continuation_t dc;
549
550 dc = _dispatch_source_handler_take(ds, DS_CANCEL_HANDLER);
551 ds->ds_pending_data_mask = 0;
552 ds->ds_pending_data = 0;
553 ds->ds_data = 0;
554 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
555 _dispatch_source_handler_free(ds, DS_REGISTN_HANDLER);
556 if (!dc) {
557 return;
558 }
559 if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
560 return _dispatch_source_handler_dispose(dc);
561 }
562 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
563 dc->dc_ctxt = ds->do_ctxt;
564 }
565 _dispatch_continuation_pop(dc, cq, flags);
566 }
567
568 static void
569 _dispatch_source_latch_and_call(dispatch_source_t ds, dispatch_queue_t cq,
570 dispatch_invoke_flags_t flags)
571 {
572 unsigned long prev;
573
574 dispatch_source_refs_t dr = ds->ds_refs;
575 dispatch_continuation_t dc = _dispatch_source_get_handler(dr, DS_EVENT_HANDLER);
576 prev = os_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
577 if (ds->ds_is_level) {
578 ds->ds_data = ~prev;
579 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
580 ds->ds_data = _dispatch_source_timer_data(dr, prev);
581 } else {
582 ds->ds_data = prev;
583 }
584 if (!dispatch_assume(prev) || !dc) {
585 return;
586 }
587 _dispatch_continuation_pop(dc, cq, flags);
588 if (ds->ds_is_timer && (ds_timer(dr).flags & DISPATCH_TIMER_AFTER)) {
589 _dispatch_source_handler_free(ds, DS_EVENT_HANDLER);
590 dispatch_release(ds); // dispatch_after sources are one-shot
591 }
592 }
593
594 static void
595 _dispatch_source_kevent_unregister(dispatch_source_t ds)
596 {
597 _dispatch_object_debug(ds, "%s", __func__);
598 uint32_t flags = (uint32_t)ds->ds_pending_data_mask;
599 dispatch_kevent_t dk = ds->ds_dkev;
600 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
601 if (ds->ds_is_custom_source) {
602 ds->ds_dkev = NULL;
603 goto done;
604 }
605
606 if (ds->ds_is_direct_kevent &&
607 ((dqf & DSF_DELETED) || !(ds->ds_is_installed))) {
608 dk->dk_kevent.flags |= EV_DELETE; // already deleted
609 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
610 }
611 if (dk->dk_kevent.filter == DISPATCH_EVFILT_TIMER) {
612 ds->ds_dkev = NULL;
613 if (ds->ds_is_installed) {
614 _dispatch_timers_unregister(ds, dk);
615 }
616 } else if (!ds->ds_is_direct_kevent) {
617 ds->ds_dkev = NULL;
618 dispatch_assert((bool)ds->ds_is_installed);
619 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
620 _dispatch_kevent_unregister(dk, flags, 0);
621 } else {
622 unsigned int dkev_dispose_options = 0;
623 if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
624 dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
625 } else if (dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE) {
626 if (!ds->ds_is_direct_kevent) {
627 dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
628 }
629 }
630 long r = _dispatch_kevent_unregister(dk, flags, dkev_dispose_options);
631 if (r == EINPROGRESS) {
632 _dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
633 ds, dk);
634 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
635 return; // deferred unregistration
636 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
637 } else if (r == ENOENT) {
638 _dispatch_debug("kevent-source[%p]: ENOENT delete kevent[%p]",
639 ds, dk);
640 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_DEFERRED_DELETE);
641 return; // potential concurrent EV_DELETE delivery rdar://22047283
642 #endif
643 } else {
644 dispatch_assume_zero(r);
645 }
646 ds->ds_dkev = NULL;
647 _TAILQ_TRASH_ENTRY(ds->ds_refs, dr_list);
648 }
649 done:
650 dqf = _dispatch_queue_atomic_flags_set_and_clear_orig(ds->_as_dq,
651 DSF_DELETED, DSF_ARMED | DSF_DEFERRED_DELETE | DSF_CANCEL_WAITER);
652 if (dqf & DSF_CANCEL_WAITER) {
653 _dispatch_wake_by_address(&ds->dq_atomic_flags);
654 }
655 ds->ds_is_installed = true;
656 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
657 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, dk);
658 _dispatch_release(ds); // the retain is done at creation time
659 }
660
661 DISPATCH_ALWAYS_INLINE
662 static bool
663 _dispatch_source_tryarm(dispatch_source_t ds)
664 {
665 dispatch_queue_flags_t oqf, nqf;
666 return os_atomic_rmw_loop2o(ds, dq_atomic_flags, oqf, nqf, relaxed, {
667 if (oqf & (DSF_DEFERRED_DELETE | DSF_DELETED)) {
668 // the test is inside the loop because it's convenient but the
669 // result should not change for the duration of the rmw_loop
670 os_atomic_rmw_loop_give_up(break);
671 }
672 nqf = oqf | DSF_ARMED;
673 });
674 }
675
676 static bool
677 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
678 {
679 switch (ds->ds_dkev->dk_kevent.filter) {
680 case DISPATCH_EVFILT_TIMER:
681 _dispatch_timers_update(ds);
682 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
683 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
684 ds->ds_dkev);
685 return true;
686 #if HAVE_MACH
687 case EVFILT_MACHPORT:
688 if ((ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) &&
689 !ds->ds_is_direct_kevent) {
690 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
691 }
692 break;
693 #endif
694 }
695 if (unlikely(!_dispatch_source_tryarm(ds))) {
696 return false;
697 }
698 if (unlikely(_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0))) {
699 _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq, DSF_DELETED,
700 DSF_ARMED);
701 return false;
702 }
703 _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
704 return true;
705 }
706
707 static void
708 _dispatch_source_kevent_register(dispatch_source_t ds, pthread_priority_t pp)
709 {
710 dispatch_assert_zero((bool)ds->ds_is_installed);
711 switch (ds->ds_dkev->dk_kevent.filter) {
712 case DISPATCH_EVFILT_TIMER:
713 // aggressively coalesce background/maintenance QoS timers
714 // <rdar://problem/12200216&27342536>
715 pp = _dispatch_source_compute_kevent_priority(ds);
716 if (_dispatch_is_background_priority(pp)) {
717 ds_timer(ds->ds_refs).flags |= DISPATCH_TIMER_BACKGROUND;
718 }
719 _dispatch_timers_update(ds);
720 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
721 _dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
722 return;
723 }
724 uint32_t flags;
725 bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, pp, &flags);
726 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
727 ds->ds_is_installed = true;
728 if (do_resume || ds->ds_needs_rearm) {
729 if (unlikely(!_dispatch_source_kevent_resume(ds, flags))) {
730 _dispatch_source_kevent_unregister(ds);
731 }
732 } else {
733 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
734 }
735 _dispatch_object_debug(ds, "%s", __func__);
736 }
737
738 static void
739 _dispatch_source_set_event_handler_context(void *ctxt)
740 {
741 dispatch_source_t ds = ctxt;
742 dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
743
744 if (dc && (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT)) {
745 dc->dc_ctxt = ds->do_ctxt;
746 }
747 }
748
749 static pthread_priority_t
750 _dispatch_source_compute_kevent_priority(dispatch_source_t ds)
751 {
752 pthread_priority_t p = ds->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
753 dispatch_queue_t tq = ds->do_targetq;
754 pthread_priority_t tqp = tq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
755
756 while (unlikely(tq->do_targetq)) {
757 if (unlikely(tq == &_dispatch_mgr_q)) {
758 return _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
759 }
760 if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
761 // thread bound hierarchies are weird, we need to install
762 // from the context of the thread this hierarchy is bound to
763 return 0;
764 }
765 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
766 // this queue may not be activated yet, so the queue graph may not
767 // have stabilized yet
768 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration, ds);
769 return 0;
770 }
771 if (unlikely(!_dispatch_queue_has_immutable_target(tq))) {
772 if (!_dispatch_is_in_root_queues_array(tq->do_targetq)) {
773 // we're not allowed to dereference tq->do_targetq
774 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration, ds);
775 return 0;
776 }
777 }
778 if (!(tq->dq_priority & _PTHREAD_PRIORITY_INHERIT_FLAG)) {
779 if (p < tqp) p = tqp;
780 }
781 tq = tq->do_targetq;
782 tqp = tq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
783 }
784
785 if (unlikely(!tqp)) {
786 // pthread root queues opt out of QoS
787 return 0;
788 }
789 return _dispatch_priority_inherit_from_root_queue(p, tq);
790 }
791
792 void
793 _dispatch_source_finalize_activation(dispatch_source_t ds)
794 {
795 dispatch_continuation_t dc;
796
797 if (unlikely(ds->ds_is_direct_kevent &&
798 (_dispatch_queue_atomic_flags(ds->_as_dq) & DSF_CANCELED))) {
799 return _dispatch_source_kevent_unregister(ds);
800 }
801
802 dc = _dispatch_source_get_event_handler(ds->ds_refs);
803 if (dc) {
804 if (_dispatch_object_is_barrier(dc)) {
805 _dispatch_queue_atomic_flags_set(ds->_as_dq, DQF_BARRIER_BIT);
806 }
807 ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
808 if (dc->dc_flags & DISPATCH_OBJ_CTXT_FETCH_BIT) {
809 _dispatch_barrier_async_detached_f(ds->_as_dq, ds,
810 _dispatch_source_set_event_handler_context);
811 }
812 }
813
814 // call "super"
815 _dispatch_queue_finalize_activation(ds->_as_dq);
816
817 if (ds->ds_is_direct_kevent && !ds->ds_is_installed) {
818 pthread_priority_t pp = _dispatch_source_compute_kevent_priority(ds);
819 if (pp) _dispatch_source_kevent_register(ds, pp);
820 }
821 }
822
823 DISPATCH_ALWAYS_INLINE
824 static inline dispatch_queue_t
825 _dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_flags_t flags,
826 uint64_t *owned, struct dispatch_object_s **dc_ptr DISPATCH_UNUSED)
827 {
828 dispatch_source_t ds = dou._ds;
829 dispatch_queue_t retq = NULL;
830 dispatch_queue_t dq = _dispatch_queue_get_current();
831
832 if (_dispatch_queue_class_probe(ds)) {
833 // Intentionally always drain even when on the manager queue
834 // and not the source's regular target queue: we need to be able
835 // to drain timer setting and the like there.
836 retq = _dispatch_queue_serial_drain(ds->_as_dq, flags, owned, NULL);
837 }
838
839 // This function performs all source actions. Each action is responsible
840 // for verifying that it takes place on the appropriate queue. If the
841 // current queue is not the correct queue for this action, the correct queue
842 // will be returned and the invoke will be re-driven on that queue.
843
844 // The order of tests here in invoke and in wakeup should be consistent.
845
846 dispatch_source_refs_t dr = ds->ds_refs;
847 dispatch_queue_t dkq = &_dispatch_mgr_q;
848
849 if (ds->ds_is_direct_kevent) {
850 dkq = ds->do_targetq;
851 }
852
853 if (!ds->ds_is_installed) {
854 // The source needs to be installed on the kevent queue.
855 if (dq != dkq) {
856 return dkq;
857 }
858 _dispatch_source_kevent_register(ds, _dispatch_get_defaultpriority());
859 }
860
861 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
862 // Source suspended by an item drained from the source queue.
863 return ds->do_targetq;
864 }
865
866 if (_dispatch_source_get_registration_handler(dr)) {
867 // The source has been registered and the registration handler needs
868 // to be delivered on the target queue.
869 if (dq != ds->do_targetq) {
870 return ds->do_targetq;
871 }
872 // clears ds_registration_handler
873 _dispatch_source_registration_callout(ds, dq, flags);
874 }
875
876 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
877 bool prevent_starvation = false;
878
879 if ((dqf & DSF_DEFERRED_DELETE) &&
880 ((dqf & DSF_DELETED) || !(dqf & DSF_ARMED))) {
881 unregister_event:
882 // DSF_DELETE: Pending source kevent unregistration has been completed
883 // !DSF_ARMED: event was delivered and can safely be unregistered
884 if (dq != dkq) {
885 return dkq;
886 }
887 _dispatch_source_kevent_unregister(ds);
888 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
889 }
890
891 if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && ds->ds_pending_data) {
892 // The source has pending data to deliver via the event handler callback
893 // on the target queue. Some sources need to be rearmed on the kevent
894 // queue after event delivery.
895 if (dq == ds->do_targetq) {
896 _dispatch_source_latch_and_call(ds, dq, flags);
897 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
898
899 // starvation avoidance: if the source triggers itself then force a
900 // re-queue to give other things already queued on the target queue
901 // a chance to run.
902 //
903 // however, if the source is directly targeting an overcommit root
904 // queue, this would requeue the source and ask for a new overcommit
905 // thread right away.
906 prevent_starvation = dq->do_targetq ||
907 !(dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
908 if (prevent_starvation && ds->ds_pending_data) {
909 retq = ds->do_targetq;
910 }
911 } else {
912 // there is no point trying to be eager, the next thing to do is
913 // to deliver the event
914 return ds->do_targetq;
915 }
916 }
917
918 if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
919 // The source has been cancelled and needs to be uninstalled from the
920 // kevent queue. After uninstallation, the cancellation handler needs
921 // to be delivered to the target queue.
922 if (!(dqf & DSF_DELETED)) {
923 if (dq != dkq) {
924 return dkq;
925 }
926 _dispatch_source_kevent_unregister(ds);
927 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
928 if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
929 if (!(dqf & DSF_ARMED)) {
930 goto unregister_event;
931 }
932 // we need to wait for the EV_DELETE
933 return retq;
934 }
935 }
936 if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
937 _dispatch_source_get_cancel_handler(dr) ||
938 _dispatch_source_get_registration_handler(dr))) {
939 retq = ds->do_targetq;
940 } else {
941 _dispatch_source_cancel_callout(ds, dq, flags);
942 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
943 }
944 prevent_starvation = false;
945 }
946
947 if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
948 // The source needs to be rearmed on the kevent queue.
949 if (dq != dkq) {
950 return dkq;
951 }
952 if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
953 // no need for resume when we can directly unregister the kevent
954 goto unregister_event;
955 }
956 if (prevent_starvation) {
957 // keep the old behavior to force re-enqueue to our target queue
958 // for the rearm. It is inefficient though and we should
959 // improve this <rdar://problem/24635615>.
960 //
961 // if the handler didn't run, or this is a pending delete
962 // or our target queue is a global queue, then starvation is
963 // not a concern and we can rearm right away.
964 return ds->do_targetq;
965 }
966 if (unlikely(!_dispatch_source_kevent_resume(ds, 0))) {
967 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
968 goto unregister_event;
969 }
970 }
971
972 return retq;
973 }
974
975 DISPATCH_NOINLINE
976 void
977 _dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_flags_t flags)
978 {
979 _dispatch_queue_class_invoke(ds->_as_dq, flags, _dispatch_source_invoke2);
980 }
981
982 void
983 _dispatch_source_wakeup(dispatch_source_t ds, pthread_priority_t pp,
984 dispatch_wakeup_flags_t flags)
985 {
986 // This function determines whether the source needs to be invoked.
987 // The order of tests here in wakeup and in invoke should be consistent.
988
989 dispatch_source_refs_t dr = ds->ds_refs;
990 dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
991 dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
992 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
993 bool deferred_delete = (dqf & DSF_DEFERRED_DELETE);
994
995 if (ds->ds_is_direct_kevent) {
996 dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
997 }
998
999 if (!ds->ds_is_installed) {
1000 // The source needs to be installed on the kevent queue.
1001 tq = dkq;
1002 } else if (_dispatch_source_get_registration_handler(dr)) {
1003 // The registration handler needs to be delivered to the target queue.
1004 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
1005 } else if (deferred_delete && ((dqf & DSF_DELETED) || !(dqf & DSF_ARMED))) {
1006 // Pending source kevent unregistration has been completed
1007 // or EV_ONESHOT event can be acknowledged
1008 tq = dkq;
1009 } else if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) && ds->ds_pending_data) {
1010 // The source has pending data to deliver to the target queue.
1011 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
1012 } else if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !deferred_delete) {
1013 // The source needs to be uninstalled from the kevent queue, or the
1014 // cancellation handler needs to be delivered to the target queue.
1015 // Note: cancellation assumes installation.
1016 if (!(dqf & DSF_DELETED)) {
1017 tq = dkq;
1018 } else if (_dispatch_source_get_event_handler(dr) ||
1019 _dispatch_source_get_cancel_handler(dr) ||
1020 _dispatch_source_get_registration_handler(dr)) {
1021 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
1022 }
1023 } else if (ds->ds_needs_rearm && !(dqf & DSF_ARMED)) {
1024 // The source needs to be rearmed on the kevent queue.
1025 tq = dkq;
1026 }
1027 if (!tq && _dispatch_queue_class_probe(ds)) {
1028 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
1029 }
1030
1031 if (tq) {
1032 return _dispatch_queue_class_wakeup(ds->_as_dq, pp, flags, tq);
1033 } else if (pp) {
1034 return _dispatch_queue_class_override_drainer(ds->_as_dq, pp, flags);
1035 } else if (flags & DISPATCH_WAKEUP_CONSUME) {
1036 return _dispatch_release_tailcall(ds);
1037 }
1038 }
1039
1040 void
1041 dispatch_source_cancel(dispatch_source_t ds)
1042 {
1043 _dispatch_object_debug(ds, "%s", __func__);
1044 // Right after we set the cancel flag, someone else
1045 // could potentially invoke the source, do the cancellation,
1046 // unregister the source, and deallocate it. We would
1047 // need to therefore retain/release before setting the bit
1048 _dispatch_retain(ds);
1049
1050 dispatch_queue_t q = ds->_as_dq;
1051 if (_dispatch_queue_atomic_flags_set_orig(q, DSF_CANCELED) & DSF_CANCELED) {
1052 _dispatch_release_tailcall(ds);
1053 } else {
1054 dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH | DISPATCH_WAKEUP_CONSUME);
1055 }
1056 }
1057
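/*
 * Usage sketch (illustration only): cancellation is asynchronous; the
 * cancellation handler, if any, runs on the target queue once the kevent
 * has been unregistered. The source stays live until that handler has run,
 * so it may be released immediately after canceling. (`rd` as in the
 * READ-source sketch above.)
 *
 *	dispatch_source_cancel(rd);
 *	dispatch_release(rd);
 */
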
1058 void
1059 dispatch_source_cancel_and_wait(dispatch_source_t ds)
1060 {
1061 dispatch_queue_flags_t old_dqf, dqf, new_dqf;
1062 pthread_priority_t pp;
1063
1064 if (unlikely(_dispatch_source_get_cancel_handler(ds->ds_refs))) {
1065 DISPATCH_CLIENT_CRASH(ds, "Source has a cancel handler");
1066 }
1067
1068 _dispatch_object_debug(ds, "%s", __func__);
1069 os_atomic_rmw_loop2o(ds, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
1070 new_dqf = old_dqf | DSF_CANCELED;
1071 if (old_dqf & DSF_CANCEL_WAITER) {
1072 os_atomic_rmw_loop_give_up(break);
1073 }
1074 if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
1075 // just add DSF_CANCELED
1076 } else if ((old_dqf & DSF_DEFERRED_DELETE) || !ds->ds_is_direct_kevent){
1077 new_dqf |= DSF_CANCEL_WAITER;
1078 }
1079 });
1080 dqf = new_dqf;
1081
1082 if (old_dqf & DQF_RELEASED) {
1083 DISPATCH_CLIENT_CRASH(ds, "Dispatch source used after last release");
1084 }
1085 if ((old_dqf & DSF_STATE_MASK) == DSF_DELETED) {
1086 return;
1087 }
1088 if (dqf & DSF_CANCEL_WAITER) {
1089 goto override;
1090 }
1091
1092 // simplified version of _dispatch_queue_drain_try_lock
1093 // that also sets the DIRTY bit on failure to lock
1094 dispatch_lock_owner tid_self = _dispatch_tid_self();
1095 uint64_t xor_owner_and_set_full_width = tid_self |
1096 DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
1097 uint64_t old_state, new_state;
1098
1099 os_atomic_rmw_loop2o(ds, dq_state, old_state, new_state, seq_cst, {
1100 new_state = old_state;
1101 if (likely(_dq_state_is_runnable(old_state) &&
1102 !_dq_state_drain_locked(old_state))) {
1103 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
1104 new_state ^= xor_owner_and_set_full_width;
1105 } else if (old_dqf & DSF_CANCELED) {
1106 os_atomic_rmw_loop_give_up(break);
1107 } else {
1108 // this case needs a release barrier, hence the seq_cst above
1109 new_state |= DISPATCH_QUEUE_DIRTY;
1110 }
1111 });
1112
1113 if (unlikely(_dq_state_is_suspended(old_state))) {
1114 if (unlikely(_dq_state_suspend_cnt(old_state))) {
1115 DISPATCH_CLIENT_CRASH(ds, "Source is suspended");
1116 }
1117 // inactive sources have never been registered and there is no need
1118 // to wait here because activation will notice and mark the source
1119 // as deleted without ever trying to use the fd or mach port.
1120 return dispatch_activate(ds);
1121 }
1122
1123 if (likely(_dq_state_is_runnable(old_state) &&
1124 !_dq_state_drain_locked(old_state))) {
1125 // same thing _dispatch_source_invoke2() does when handling cancellation
1126 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1127 if (!(dqf & (DSF_DEFERRED_DELETE | DSF_DELETED))) {
1128 _dispatch_source_kevent_unregister(ds);
1129 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1130 if (likely((dqf & DSF_STATE_MASK) == DSF_DELETED)) {
1131 _dispatch_source_cancel_callout(ds, NULL, DISPATCH_INVOKE_NONE);
1132 }
1133 }
1134 _dispatch_try_lock_transfer_or_wakeup(ds->_as_dq);
1135 } else if (unlikely(_dq_state_drain_locked_by(old_state, tid_self))) {
1136 DISPATCH_CLIENT_CRASH(ds, "dispatch_source_cancel_and_wait "
1137 "called from a source handler");
1138 } else {
1139 override:
1140 pp = _dispatch_get_priority() & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
1141 if (pp) dx_wakeup(ds, pp, DISPATCH_WAKEUP_OVERRIDING);
1142 dispatch_activate(ds);
1143 }
1144
1145 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1146 while (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
1147 if (unlikely(!(dqf & DSF_CANCEL_WAITER))) {
1148 if (!os_atomic_cmpxchgvw2o(ds, dq_atomic_flags,
1149 dqf, dqf | DSF_CANCEL_WAITER, &dqf, relaxed)) {
1150 continue;
1151 }
1152 dqf |= DSF_CANCEL_WAITER;
1153 }
1154 _dispatch_wait_on_address(&ds->dq_atomic_flags, dqf, DLOCK_LOCK_NONE);
1155 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1156 }
1157 }
1158
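/*
 * Usage sketch (illustration only): the synchronous variant above requires
 * that no cancellation handler be set and must not be called from one of
 * the source's own handlers. Once it returns, the kevent has been deleted
 * and the underlying descriptor can be closed or reused right away. `src`
 * and `fd` are hypothetical caller-owned values.
 *
 *	dispatch_source_cancel_and_wait(src);
 *	close(fd);
 *	dispatch_release(src);
 */
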
1159 static void
1160 _dispatch_source_merge_kevent(dispatch_source_t ds,
1161 const _dispatch_kevent_qos_s *ke)
1162 {
1163 _dispatch_object_debug(ds, "%s", __func__);
1164 dispatch_wakeup_flags_t flags = 0;
1165 dispatch_queue_flags_t dqf;
1166 pthread_priority_t pp = 0;
1167
1168 if (ds->ds_needs_rearm || (ke->flags & (EV_DELETE | EV_ONESHOT))) {
1169 // once we modify the queue atomic flags below, it will allow concurrent
1170 // threads running _dispatch_source_invoke2 to dispose of the source,
1171 // so we can't safely borrow the reference we get from the knote udata
1172 // anymore, and need our own
1173 flags = DISPATCH_WAKEUP_CONSUME;
1174 _dispatch_retain(ds); // rdar://20382435
1175 }
1176
1177 if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
1178 !(ke->flags & EV_DELETE)) {
1179 dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
1180 DSF_DEFERRED_DELETE, DSF_ARMED);
1181 if (ke->flags & EV_VANISHED) {
1182 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
1183 "monitored resource vanished before the source "
1184 "cancel handler was invoked", 0);
1185 }
1186 _dispatch_debug("kevent-source[%p]: %s kevent[%p]", ds,
1187 (ke->flags & EV_VANISHED) ? "vanished" :
1188 "deferred delete oneshot", (void*)ke->udata);
1189 } else if ((ke->flags & EV_DELETE) || (ke->flags & EV_ONESHOT)) {
1190 dqf = _dispatch_queue_atomic_flags_set_and_clear(ds->_as_dq,
1191 DSF_DELETED, DSF_ARMED);
1192 _dispatch_debug("kevent-source[%p]: delete kevent[%p]",
1193 ds, (void*)ke->udata);
1194 if (ke->flags & EV_DELETE) goto done;
1195 } else if (ds->ds_needs_rearm) {
1196 dqf = _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
1197 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p] ",
1198 ds, (void*)ke->udata);
1199 } else {
1200 dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
1201 }
1202
1203 if (dqf & (DSF_CANCELED | DQF_RELEASED)) {
1204 goto done; // rdar://20204025
1205 }
1206 #if HAVE_MACH
1207 if (ke->filter == EVFILT_MACHPORT &&
1208 dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE) {
1209 DISPATCH_INTERNAL_CRASH(ke->flags,"Unexpected kevent for mach channel");
1210 }
1211 #endif
1212
1213 unsigned long data;
1214 if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
1215 (ke->flags & EV_VANISHED)) {
1216 // if the resource behind the ident vanished, the event handler can't
1217 // do anything useful anymore, so do not try to call it at all
1218 //
1219 // Note: if the kernel doesn't support EV_VANISHED we always get it
1220 // back unchanged from the flags passed at EV_ADD (registration) time.
1221 // Since we never ask for both EV_ONESHOT and EV_VANISHED for sources,
1222 // if we get both bits it was a real EV_VANISHED delivery
1223 os_atomic_store2o(ds, ds_pending_data, 0, relaxed);
1224 #if HAVE_MACH
1225 } else if (ke->filter == EVFILT_MACHPORT) {
1226 data = DISPATCH_MACH_RECV_MESSAGE;
1227 os_atomic_store2o(ds, ds_pending_data, data, relaxed);
1228 #endif
1229 } else if (ds->ds_is_level) {
1230 // ke->data is signed and "negative available data" makes no sense
1231 // zero bytes happens when EV_EOF is set
1232 dispatch_assert(ke->data >= 0l);
1233 data = ~(unsigned long)ke->data;
1234 os_atomic_store2o(ds, ds_pending_data, data, relaxed);
1235 } else if (ds->ds_is_adder) {
1236 data = (unsigned long)ke->data;
1237 os_atomic_add2o(ds, ds_pending_data, data, relaxed);
1238 } else if (ke->fflags & ds->ds_pending_data_mask) {
1239 data = ke->fflags & ds->ds_pending_data_mask;
1240 os_atomic_or2o(ds, ds_pending_data, data, relaxed);
1241 }
1242
1243 done:
1244 #if DISPATCH_USE_KEVENT_QOS
1245 pp = ((pthread_priority_t)ke->qos) & ~_PTHREAD_PRIORITY_FLAGS_MASK;
1246 #endif
1247 dx_wakeup(ds, pp, flags | DISPATCH_WAKEUP_FLUSH);
1248 }
1249
1250 #pragma mark -
1251 #pragma mark dispatch_kevent_t
1252
1253 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
1254
1255 DISPATCH_CACHELINE_ALIGN
1256 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
1257
1258 static void
1259 _dispatch_kevent_init()
1260 {
1261 unsigned int i;
1262 for (i = 0; i < DSL_HASH_SIZE; i++) {
1263 TAILQ_INIT(&_dispatch_sources[i]);
1264 }
1265 }
1266
1267 static inline uintptr_t
1268 _dispatch_kevent_hash(uint64_t ident, short filter)
1269 {
1270 uint64_t value;
1271 #if HAVE_MACH
1272 value = (filter == EVFILT_MACHPORT ||
1273 filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
1274 MACH_PORT_INDEX(ident) : ident);
1275 #else
1276 value = ident;
1277 (void)filter;
1278 #endif
1279 return DSL_HASH((uintptr_t)value);
1280 }
1281
1282 static dispatch_kevent_t
1283 _dispatch_kevent_find(uint64_t ident, short filter)
1284 {
1285 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
1286 dispatch_kevent_t dki;
1287
1288 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
1289 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
1290 break;
1291 }
1292 }
1293 return dki;
1294 }
1295
1296 static void
1297 _dispatch_kevent_insert(dispatch_kevent_t dk)
1298 {
1299 if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) return;
1300 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
1301 dk->dk_kevent.filter);
1302 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
1303 }
1304
1305 // Find existing kevents, and merge any new flags if necessary
1306 static bool
1307 _dispatch_kevent_register(dispatch_kevent_t *dkp, pthread_priority_t pp,
1308 uint32_t *flgp)
1309 {
1310 dispatch_kevent_t dk = NULL, ds_dkev = *dkp;
1311 uint32_t new_flags;
1312 bool do_resume = false;
1313
1314 if (!(ds_dkev->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1315 dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
1316 ds_dkev->dk_kevent.filter);
1317 }
1318 if (dk) {
1319 // If an existing dispatch kevent is found, check to see if new flags
1320 // need to be added to the existing kevent
1321 new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
1322 dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
1323 free(ds_dkev);
1324 *dkp = dk;
1325 do_resume = new_flags;
1326 } else {
1327 dk = ds_dkev;
1328 #if DISPATCH_USE_KEVENT_WORKQUEUE
1329 if (!_dispatch_kevent_workqueue_enabled) {
1330 // do nothing
1331 } else if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1332 dk->dk_kevent.qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
1333 } else {
1334 pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK |
1335 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
1336 if (!pp) pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
1337 _dispatch_assert_is_valid_qos_class(pp);
1338 dk->dk_kevent.qos = (_dispatch_kevent_priority_t)pp;
1339 }
1340 #else
1341 (void)pp;
1342 #endif
1343 _dispatch_kevent_insert(dk);
1344 new_flags = dk->dk_kevent.fflags;
1345 do_resume = true;
1346 }
1347 // Re-register the kevent with the kernel if new flags were added
1348 // by the dispatch kevent
1349 if (do_resume) {
1350 dk->dk_kevent.flags |= EV_ADD;
1351 }
1352 *flgp = new_flags;
1353 return do_resume;
1354 }
1355
1356 static long
1357 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
1358 uint32_t del_flags)
1359 {
1360 long r;
1361 bool oneshot;
1362 if (dk->dk_kevent.flags & EV_DELETE) {
1363 return 0;
1364 }
1365 switch (dk->dk_kevent.filter) {
1366 case DISPATCH_EVFILT_TIMER:
1367 case DISPATCH_EVFILT_CUSTOM_ADD:
1368 case DISPATCH_EVFILT_CUSTOM_OR:
1369 // these types not registered with kevent
1370 return 0;
1371 #if HAVE_MACH
1372 case DISPATCH_EVFILT_MACH_NOTIFICATION:
1373 return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
1374 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
1375 case EVFILT_MACHPORT:
1376 if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1377 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
1378 }
1379 // fall through
1380 #endif
1381 #endif // HAVE_MACH
1382 default:
1383 // oneshot dk may be freed by the time we return from
1384 // _dispatch_kq_immediate_update if the event was delivered (and then
1385 // unregistered) concurrently.
1386 oneshot = (dk->dk_kevent.flags & EV_ONESHOT);
1387 r = _dispatch_kq_immediate_update(&dk->dk_kevent);
1388 if (r && (dk->dk_kevent.flags & EV_ADD) &&
1389 (dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1390 dk->dk_kevent.flags |= EV_DELETE;
1391 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
1392 } else if (!oneshot && (dk->dk_kevent.flags & EV_DISPATCH)) {
1393 // we can safely skip doing this for ONESHOT events because
1394 // the next kq update we will do is _dispatch_kevent_dispose()
1395 // which also clears EV_ADD.
1396 dk->dk_kevent.flags &= ~(EV_ADD|EV_VANISHED);
1397 }
1398 return r;
1399 }
1400 (void)new_flags; (void)del_flags;
1401 }
1402
1403 static long
1404 _dispatch_kevent_dispose(dispatch_kevent_t dk, unsigned int options)
1405 {
1406 long r = 0;
1407 switch (dk->dk_kevent.filter) {
1408 case DISPATCH_EVFILT_TIMER:
1409 case DISPATCH_EVFILT_CUSTOM_ADD:
1410 case DISPATCH_EVFILT_CUSTOM_OR:
1411 if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
1412 free(dk);
1413 } else {
1414 // these sources live on statically allocated lists
1415 }
1416 return r;
1417 }
1418 if (!(dk->dk_kevent.flags & EV_DELETE)) {
1419 dk->dk_kevent.flags |= EV_DELETE;
1420 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE|EV_VANISHED);
1421 if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
1422 dk->dk_kevent.flags |= EV_ENABLE;
1423 }
1424 switch (dk->dk_kevent.filter) {
1425 #if HAVE_MACH
1426 case DISPATCH_EVFILT_MACH_NOTIFICATION:
1427 r = _dispatch_kevent_mach_notify_resume(dk, 0,dk->dk_kevent.fflags);
1428 break;
1429 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
1430 case EVFILT_MACHPORT:
1431 if (!(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1432 r = _dispatch_kevent_machport_resume(dk,0,dk->dk_kevent.fflags);
1433 break;
1434 }
1435 // fall through
1436 #endif
1437 #endif
1438 default:
1439 if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
1440 _dispatch_kq_deferred_update(&dk->dk_kevent);
1441 } else {
1442 r = _dispatch_kq_immediate_update(&dk->dk_kevent);
1443 }
1444 break;
1445 }
1446 if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
1447 dk->dk_kevent.flags &= ~EV_ENABLE;
1448 }
1449 }
1450 if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
1451 bool deferred_delete = (r == EINPROGRESS);
1452 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
1453 if (r == ENOENT) deferred_delete = true;
1454 #endif
1455 if (deferred_delete) {
1456 // deferred EV_DELETE or concurrent EV_DELETE delivery
1457 dk->dk_kevent.flags &= ~EV_DELETE;
1458 dk->dk_kevent.flags |= EV_ENABLE;
1459 return r;
1460 }
1461 } else {
1462 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
1463 dk->dk_kevent.filter);
1464 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
1465 }
1466 free(dk);
1467 return r;
1468 }
1469
1470 static long
1471 _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
1472 unsigned int options)
1473 {
1474 dispatch_source_refs_t dri;
1475 uint32_t del_flags, fflags = 0;
1476 long r = 0;
1477
1478 if (TAILQ_EMPTY(&dk->dk_sources) ||
1479 (dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
1480 r = _dispatch_kevent_dispose(dk, options);
1481 } else {
1482 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1483 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
1484 uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
1485 fflags |= mask;
1486 }
1487 del_flags = flg & ~fflags;
1488 if (del_flags) {
1489 dk->dk_kevent.flags |= EV_ADD;
1490 dk->dk_kevent.fflags &= ~del_flags;
1491 r = _dispatch_kevent_resume(dk, 0, del_flags);
1492 }
1493 }
1494 return r;
1495 }
1496
1497 DISPATCH_NOINLINE
1498 static void
1499 _dispatch_kevent_proc_exit(_dispatch_kevent_qos_s *ke)
1500 {
1501 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
1502 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
1503 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
1504 _dispatch_kevent_qos_s fake;
1505 fake = *ke;
1506 fake.flags &= ~EV_ERROR;
1507 fake.flags |= EV_ONESHOT;
1508 fake.fflags = NOTE_EXIT;
1509 fake.data = 0;
1510 _dispatch_kevent_debug("synthetic NOTE_EXIT", ke);
1511 _dispatch_kevent_merge(&fake);
1512 }
1513
1514 DISPATCH_NOINLINE
1515 static void
1516 _dispatch_kevent_error(_dispatch_kevent_qos_s *ke)
1517 {
1518 _dispatch_kevent_qos_s *kev = NULL;
1519
1520 if (ke->flags & EV_DELETE) {
1521 if (ke->flags & EV_UDATA_SPECIFIC) {
1522 if (ke->data == EINPROGRESS) {
1523 // deferred EV_DELETE
1524 return;
1525 }
1526 #if DISPATCH_KEVENT_TREAT_ENOENT_AS_EINPROGRESS
1527 if (ke->data == ENOENT) {
1528 // deferred EV_DELETE
1529 return;
1530 }
1531 #endif
1532 }
1533 // for EV_DELETE if the update was deferred we may have reclaimed
1534 // our dispatch_kevent_t, and it is unsafe to dereference it now.
1535 } else if (ke->udata) {
1536 kev = &((dispatch_kevent_t)ke->udata)->dk_kevent;
1537 ke->flags |= kev->flags;
1538 }
1539
1540 #if HAVE_MACH
1541 if (ke->filter == EVFILT_MACHPORT && ke->data == ENOTSUP &&
1542 (ke->flags & EV_ADD) && _dispatch_evfilt_machport_direct_enabled &&
1543 kev && (kev->fflags & MACH_RCV_MSG)) {
1544 DISPATCH_INTERNAL_CRASH(ke->ident,
1545 "Missing EVFILT_MACHPORT support for ports");
1546 }
1547 #endif
1548
1549 if (ke->data) {
1550 // log the unexpected error
1551 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
1552 !ke->udata ? NULL :
1553 ke->flags & EV_DELETE ? "delete" :
1554 ke->flags & EV_ADD ? "add" :
1555 ke->flags & EV_ENABLE ? "enable" : "monitor",
1556 (int)ke->data);
1557 }
1558 }
1559
1560 static void
1561 _dispatch_kevent_drain(_dispatch_kevent_qos_s *ke)
1562 {
1563 if (ke->filter == EVFILT_USER) {
1564 _dispatch_kevent_mgr_debug(ke);
1565 return;
1566 }
1567 if (slowpath(ke->flags & EV_ERROR)) {
1568 if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
1569 _dispatch_debug("kevent[0x%llx]: ESRCH from EVFILT_PROC: "
1570 "generating fake NOTE_EXIT", (unsigned long long)ke->udata);
1571 return _dispatch_kevent_proc_exit(ke);
1572 }
1573 _dispatch_debug("kevent[0x%llx]: handling error",
1574 (unsigned long long)ke->udata);
1575 return _dispatch_kevent_error(ke);
1576 }
1577 if (ke->filter == EVFILT_TIMER) {
1578 _dispatch_debug("kevent[0x%llx]: handling timer",
1579 (unsigned long long)ke->udata);
1580 return _dispatch_timers_kevent(ke);
1581 }
1582 #if HAVE_MACH
1583 if (ke->filter == EVFILT_MACHPORT) {
1584 _dispatch_debug("kevent[0x%llx]: handling mach port",
1585 (unsigned long long)ke->udata);
1586 return _dispatch_mach_kevent_merge(ke);
1587 }
1588 #endif
1589 return _dispatch_kevent_merge(ke);
1590 }
1591
1592 DISPATCH_NOINLINE
1593 static void
1594 _dispatch_kevent_merge(_dispatch_kevent_qos_s *ke)
1595 {
1596 dispatch_kevent_t dk = (void*)ke->udata;
1597 dispatch_source_refs_t dri, dr_next;
1598
1599 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
1600 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1601 }
1602 }
1603
1604 #pragma mark -
1605 #pragma mark dispatch_source_timer
1606
1607 #if DISPATCH_USE_DTRACE
1608 static dispatch_source_refs_t
1609 _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
1610 #define _dispatch_trace_next_timer_set(x, q) \
1611 _dispatch_trace_next_timer[(q)] = (x)
1612 #define _dispatch_trace_next_timer_program(d, q) \
1613 _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1614 #define _dispatch_trace_next_timer_wake(q) \
1615 _dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
1616 #else
1617 #define _dispatch_trace_next_timer_set(x, q)
1618 #define _dispatch_trace_next_timer_program(d, q)
1619 #define _dispatch_trace_next_timer_wake(q)
1620 #endif
1621
1622 #define _dispatch_source_timer_telemetry_enabled() false
1623
1624 DISPATCH_NOINLINE
1625 static void
1626 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1627 dispatch_clock_t clock, struct dispatch_timer_source_s *values)
1628 {
1629 if (_dispatch_trace_timer_configure_enabled()) {
1630 _dispatch_trace_timer_configure(ds, clock, values);
1631 }
1632 }
1633
1634 DISPATCH_ALWAYS_INLINE
1635 static inline void
1636 _dispatch_source_timer_telemetry(dispatch_source_t ds, dispatch_clock_t clock,
1637 struct dispatch_timer_source_s *values)
1638 {
1639 if (_dispatch_trace_timer_configure_enabled() ||
1640 _dispatch_source_timer_telemetry_enabled()) {
1641 _dispatch_source_timer_telemetry_slow(ds, clock, values);
1642 asm(""); // prevent tailcall
1643 }
1644 }
1645
1646 static inline unsigned long
1647 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1648 {
1649 // calculate the number of intervals since last fire
1650 unsigned long data, missed;
1651 uint64_t now;
1652 now = _dispatch_time_now(DISPATCH_TIMER_CLOCK(_dispatch_source_timer_idx(dr)));
1653 missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1654 ds_timer(dr).interval);
1655 // correct for missed intervals already delivered last time
1656 data = prev - ds_timer(dr).missed + missed;
1657 ds_timer(dr).missed = missed;
1658 return data;
1659 }
1660
1661 struct dispatch_set_timer_params {
1662 dispatch_source_t ds;
1663 struct dispatch_timer_source_s values;
1664 dispatch_clock_t clock;
1665 };
1666
1667 static void
1668 _dispatch_source_set_timer3(void *context)
1669 {
1670 // Called on the _dispatch_mgr_q
1671 struct dispatch_set_timer_params *params = context;
1672 dispatch_source_t ds = params->ds;
1673 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t)ds->ds_refs;
1674
1675 params->values.flags = ds_timer(dt).flags;
1676 if (params->clock == DISPATCH_CLOCK_WALL) {
1677 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1678 #if HAVE_MACH
1679 _dispatch_mach_host_calendar_change_register();
1680 #endif
1681 } else {
1682 params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1683 }
1684 ds_timer(dt) = params->values;
1685 ds->ds_ident_hack = _dispatch_source_timer_idx(ds->ds_refs);
1686 // Clear any pending data that might have accumulated on
1687 // older timer params <rdar://problem/8574886>
1688 ds->ds_pending_data = 0;
1689
1690 dispatch_resume(ds);
1691 if (_dispatch_source_tryarm(ds)) {
1692 // Re-arm in case we got disarmed because of pending set_timer suspension
1693 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, dt);
1694 // Must happen after resume to avoid getting disarmed due to suspension
1695 _dispatch_timers_update(ds);
1696 }
1697 dispatch_release(ds);
1698 free(params);
1699 }
1700
1701 static void
1702 _dispatch_source_set_timer2(void *context)
1703 {
1704 // Called on the source queue
1705 struct dispatch_set_timer_params *params = context;
1706 dispatch_suspend(params->ds);
1707 _dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
1708 _dispatch_source_set_timer3);
1709 }
1710
1711 DISPATCH_NOINLINE
1712 static struct dispatch_set_timer_params *
1713 _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1714 uint64_t interval, uint64_t leeway)
1715 {
1716 struct dispatch_set_timer_params *params;
1717 params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1718 params->ds = ds;
1719
1720 if (interval == 0) {
1721 // we use zero internally to mean disabled
1722 interval = 1;
1723 } else if ((int64_t)interval < 0) {
1724 // 6866347 - make sure nanoseconds won't overflow
1725 interval = INT64_MAX;
1726 }
1727 if ((int64_t)leeway < 0) {
1728 leeway = INT64_MAX;
1729 }
1730 if (start == DISPATCH_TIME_NOW) {
1731 start = _dispatch_absolute_time();
1732 } else if (start == DISPATCH_TIME_FOREVER) {
1733 start = INT64_MAX;
1734 }
1735
1736 if ((int64_t)start < 0) {
1737 // wall clock
1738 start = (dispatch_time_t)-((int64_t)start);
1739 params->clock = DISPATCH_CLOCK_WALL;
1740 } else {
1741 // absolute clock
1742 interval = _dispatch_time_nano2mach(interval);
1743 if (interval < 1) {
1744 // rdar://problem/7287561 interval must be at least one
1745 // in order to avoid later division by zero when calculating
1746 // the missed interval count. (NOTE: the wall clock's
1747 // interval is already "fixed" to be 1 or more)
1748 interval = 1;
1749 }
1750 leeway = _dispatch_time_nano2mach(leeway);
1751 params->clock = DISPATCH_CLOCK_MACH;
1752 }
1753 params->values.target = start;
1754 params->values.deadline = (start < UINT64_MAX - leeway) ?
1755 start + leeway : UINT64_MAX;
1756 params->values.interval = interval;
1757 params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1758 leeway : interval / 2;
1759 return params;
1760 }
1761
1762 DISPATCH_ALWAYS_INLINE
1763 static inline void
1764 _dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1765 uint64_t interval, uint64_t leeway, bool source_sync)
1766 {
1767 if (slowpath(!ds->ds_is_timer) ||
1768 slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1769 DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
1770 }
1771
1772 struct dispatch_set_timer_params *params;
1773 params = _dispatch_source_timer_params(ds, start, interval, leeway);
1774
1775 _dispatch_source_timer_telemetry(ds, params->clock, &params->values);
1776 // Suspend the source so that it doesn't fire with pending changes
1777 // The use of suspend/resume requires the external retain/release
1778 dispatch_retain(ds);
1779 if (source_sync) {
1780 return _dispatch_barrier_trysync_or_async_f(ds->_as_dq, params,
1781 _dispatch_source_set_timer2);
1782 } else {
1783 return _dispatch_source_set_timer2(params);
1784 }
1785 }
1786
1787 void
1788 dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1789 uint64_t interval, uint64_t leeway)
1790 {
1791 _dispatch_source_set_timer(ds, start, interval, leeway, true);
1792 }
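
/*
 * Usage sketch (illustrative only, not part of this file): a client arming a
 * repeating 100ms timer with 10ms leeway through the public API. "handler" is
 * a hypothetical dispatch_function_t.
 *
 *	dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
 *			0, 0, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
 *	dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW, 0),
 *			100 * NSEC_PER_MSEC, 10 * NSEC_PER_MSEC);
 *	dispatch_source_set_event_handler_f(t, handler);
 *	dispatch_activate(t);
 */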
1793
1794 void
1795 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1796 dispatch_time_t start, uint64_t interval, uint64_t leeway)
1797 {
1798 // Don't serialize through the source queue for CF timers <rdar://13833190>
1799 _dispatch_source_set_timer(ds, start, interval, leeway, false);
1800 }
1801
1802 void
1803 _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1804 {
1805 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1806 // approx 1 year (60s * 60m * 24h * 365d)
1807 #define FOREVER_NSEC 31536000000000000ull
1808
1809 dispatch_source_refs_t dr = ds->ds_refs;
1810 const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1811 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1812 FOREVER_NSEC/NSEC_PER_MSEC))) {
1813 interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1814 } else {
1815 interval = FOREVER_NSEC;
1816 }
1817 interval = _dispatch_time_nano2mach(interval);
1818 uint64_t target = _dispatch_absolute_time() + interval;
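// round the first fire time down to a multiple of the interval so that
// interval sources with the same period can fire together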
1819 target = (target / interval) * interval;
1820 const uint64_t leeway = animation ?
1821 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1822 ds_timer(dr).target = target;
1823 ds_timer(dr).deadline = target + leeway;
1824 ds_timer(dr).interval = interval;
1825 ds_timer(dr).leeway = leeway;
1826 dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(ds->ds_ident_hack);
1827 _dispatch_source_timer_telemetry(ds, clock, &ds_timer(dr));
1828 }
1829
1830 #pragma mark -
1831 #pragma mark dispatch_timers
1832
1833 #define DISPATCH_TIMER_STRUCT(refs) \
1834 uint64_t target, deadline; \
1835 TAILQ_HEAD(, refs) dt_sources
1836
1837 typedef struct dispatch_timer_s {
1838 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1839 } *dispatch_timer_t;
1840
1841 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1842 [tidx] = { \
1843 .target = UINT64_MAX, \
1844 .deadline = UINT64_MAX, \
1845 .dt_sources = TAILQ_HEAD_INITIALIZER( \
1846 _dispatch_timer[tidx].dt_sources), \
1847 }
1848 #define DISPATCH_TIMER_INIT(kind, qos) \
1849 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1850 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos))
1851
1852 struct dispatch_timer_s _dispatch_timer[] = {
1853 DISPATCH_TIMER_INIT(WALL, NORMAL),
1854 DISPATCH_TIMER_INIT(WALL, CRITICAL),
1855 DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1856 DISPATCH_TIMER_INIT(MACH, NORMAL),
1857 DISPATCH_TIMER_INIT(MACH, CRITICAL),
1858 DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1859 };
1860 #define DISPATCH_TIMER_COUNT \
1861 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1862
1863 #if __linux__
1864 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1865 (void*)&_dispatch_kevent_timer[tidx]
1866 #else
1867 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1868 (uintptr_t)&_dispatch_kevent_timer[tidx]
1869 #endif
1870 #ifdef __LP64__
1871 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1872 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1873 #else // __LP64__
1874 // dynamic initialization in _dispatch_timers_init()
1875 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1876 .udata = 0
1877 #endif // __LP64__
1878 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1879 [tidx] = { \
1880 .dk_kevent = { \
1881 .ident = tidx, \
1882 .filter = DISPATCH_EVFILT_TIMER, \
1883 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1884 }, \
1885 .dk_sources = TAILQ_HEAD_INITIALIZER( \
1886 _dispatch_kevent_timer[tidx].dk_sources), \
1887 }
1888 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1889 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1890 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos))
1891
1892 struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1893 DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1894 DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1895 DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1896 DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1897 DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1898 DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1899 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1900 };
1901 #define DISPATCH_KEVENT_TIMER_COUNT \
1902 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1903
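// The timeout kevents use idents of the form (MASK | timer index): the high
// bits mark the ident as a dispatch timeout event and the low byte carries the
// timer index, which _dispatch_timers_kevent() extracts on delivery.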
1904 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1905 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(tidx, note) \
1906 [tidx] = { \
1907 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(tidx), \
1908 .filter = EVFILT_TIMER, \
1909 .flags = EV_ONESHOT, \
1910 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1911 }
1912 #define DISPATCH_KEVENT_TIMEOUT_INIT(kind, qos, note) \
1913 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_INDEX( \
1914 DISPATCH_CLOCK_##kind, DISPATCH_TIMER_QOS_##qos), note)
1915
1916 _dispatch_kevent_qos_s _dispatch_kevent_timeout[] = {
1917 DISPATCH_KEVENT_TIMEOUT_INIT(WALL, NORMAL, NOTE_MACH_CONTINUOUS_TIME),
1918 DISPATCH_KEVENT_TIMEOUT_INIT(WALL, CRITICAL, NOTE_MACH_CONTINUOUS_TIME | NOTE_CRITICAL),
1919 DISPATCH_KEVENT_TIMEOUT_INIT(WALL, BACKGROUND, NOTE_MACH_CONTINUOUS_TIME | NOTE_BACKGROUND),
1920 DISPATCH_KEVENT_TIMEOUT_INIT(MACH, NORMAL, 0),
1921 DISPATCH_KEVENT_TIMEOUT_INIT(MACH, CRITICAL, NOTE_CRITICAL),
1922 DISPATCH_KEVENT_TIMEOUT_INIT(MACH, BACKGROUND, NOTE_BACKGROUND),
1923 };
1924 #define DISPATCH_KEVENT_TIMEOUT_COUNT \
1925 ((sizeof(_dispatch_kevent_timeout) / sizeof(_dispatch_kevent_timeout[0])))
1926 static_assert(DISPATCH_KEVENT_TIMEOUT_COUNT == DISPATCH_TIMER_INDEX_COUNT - 1,
1927 "should have a kevent for everything but disarm (ddt assumes this)");
1928
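// Per-QoS timer pre-coalescing windows (twice the listed milliseconds); the
// pre-coalescing logic in _dispatch_timers_get_delay() may slide the
// programmed fire time toward (deadline - window) so that several timers can
// be delivered with a single wakeup.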
1929 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1930 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1931
1932 static const uint64_t _dispatch_kevent_coalescing_window[] = {
1933 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1934 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1935 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1936 };
1937
1938 #define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1939 typeof(dr) dri = NULL; typeof(dt) dti; \
1940 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1941 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1942 if (ds_timer(dr).target < ds_timer(dri).target) { \
1943 break; \
1944 } \
1945 } \
1946 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1947 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1948 break; \
1949 } \
1950 } \
1951 if (dti) { \
1952 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1953 } else { \
1954 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1955 } \
1956 } \
1957 if (dri) { \
1958 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1959 } else { \
1960 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1961 } \
1962 })
1963
1964 #define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1965 ({ \
1966 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1967 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1968 } \
1969 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1970 dr_list); })
1971
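// Checks whether the earliest target or deadline changed for any of the timer
// lists flagged in _dispatch_timers_mask, updating dta[] and reporting whether
// the kernel timer kevents need to be reprogrammed.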
1972 #define _dispatch_timers_check(dra, dta) ({ \
1973 unsigned int timerm = _dispatch_timers_mask; \
1974 bool update = false; \
1975 unsigned int tidx; \
1976 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1977 if (!(timerm & (1 << tidx))){ \
1978 continue; \
1979 } \
1980 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1981 TAILQ_FIRST(&dra[tidx].dk_sources); \
1982 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1983 TAILQ_FIRST(&dta[tidx].dt_sources); \
1984 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1985 uint64_t deadline = dt ? ds_timer(dt).deadline : UINT64_MAX; \
1986 if (target != dta[tidx].target) { \
1987 dta[tidx].target = target; \
1988 update = true; \
1989 } \
1990 if (deadline != dta[tidx].deadline) { \
1991 dta[tidx].deadline = deadline; \
1992 update = true; \
1993 } \
1994 } \
1995 update; })
1996
1997 static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
1998 static unsigned int _dispatch_timers_mask;
1999 static bool _dispatch_timers_force_max_leeway;
2000
2001 static void
2002 _dispatch_timers_init(void)
2003 {
2004 #ifndef __LP64__
2005 unsigned int tidx;
2006 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2007 _dispatch_kevent_timer[tidx].dk_kevent.udata =
2008 DISPATCH_KEVENT_TIMER_UDATA(tidx);
2009 }
2010 #endif // __LP64__
2011 if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
2012 _dispatch_timers_force_max_leeway = true;
2013 }
2014 }
2015
2016 static inline void
2017 _dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
2018 {
2019 dispatch_source_refs_t dr = ds->ds_refs;
2020 unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
2021
2022 if (slowpath(ds_timer_aggregate(ds))) {
2023 _dispatch_timer_aggregates_unregister(ds, tidx);
2024 }
2025 _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
2026 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
2027 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
2028 _dispatch_timers_reconfigure = true;
2029 _dispatch_timers_mask |= 1 << tidx;
2030 }
2031 }
2032
2033 // Updates the ordered list of timers based on next fire date for changes to ds.
2034 // Should only be called from the context of _dispatch_mgr_q.
2035 static void
2036 _dispatch_timers_update(dispatch_source_t ds)
2037 {
2038 dispatch_kevent_t dk = ds->ds_dkev;
2039 dispatch_source_refs_t dr = ds->ds_refs;
2040 unsigned int tidx;
2041
2042 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
2043
2044 // Do not reschedule timers unregistered with _dispatch_kevent_unregister()
2045 if (slowpath(!dk)) {
2046 return;
2047 }
2048 // Move timers that are disabled, suspended or have missed intervals to the
2049 // disarmed list; rearming after resume (resp. source invoke) re-enables them
2050 if (!ds_timer(dr).target || DISPATCH_QUEUE_IS_SUSPENDED(ds) ||
2051 ds->ds_pending_data) {
2052 tidx = DISPATCH_TIMER_INDEX_DISARM;
2053 _dispatch_queue_atomic_flags_clear(ds->_as_dq, DSF_ARMED);
2054 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds,
2055 ds->ds_dkev);
2056 } else {
2057 tidx = _dispatch_source_timer_idx(dr);
2058 }
2059 if (slowpath(ds_timer_aggregate(ds))) {
2060 _dispatch_timer_aggregates_register(ds);
2061 }
2062 if (slowpath(!ds->ds_is_installed)) {
2063 ds->ds_is_installed = true;
2064 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
2065 _dispatch_queue_atomic_flags_set(ds->_as_dq, DSF_ARMED);
2066 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
2067 ds->ds_dkev);
2068 }
2069 _dispatch_object_debug(ds, "%s", __func__);
2070 ds->ds_dkev = NULL;
2071 free(dk);
2072 } else {
2073 _dispatch_timers_unregister(ds, dk);
2074 }
2075 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
2076 _dispatch_timers_reconfigure = true;
2077 _dispatch_timers_mask |= 1 << tidx;
2078 }
2079 if (dk != &_dispatch_kevent_timer[tidx]){
2080 ds->ds_dkev = &_dispatch_kevent_timer[tidx];
2081 }
2082 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
2083 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
2084 if (slowpath(ds_timer_aggregate(ds))) {
2085 _dispatch_timer_aggregates_update(ds, tidx);
2086 }
2087 }
2088
2089 static inline void
2090 _dispatch_timers_run2(dispatch_clock_now_cache_t nows, unsigned int tidx)
2091 {
2092 dispatch_source_refs_t dr;
2093 dispatch_source_t ds;
2094 uint64_t now, missed;
2095
2096 now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
2097 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
2098 ds = _dispatch_source_from_refs(dr);
2099 // We may find timers on the wrong list due to a pending update from
2100 // dispatch_source_set_timer. Force an update of the list in that case.
2101 if (tidx != ds->ds_ident_hack) {
2102 _dispatch_timers_update(ds);
2103 continue;
2104 }
2105 if (!ds_timer(dr).target) {
2106 // No configured timers on the list
2107 break;
2108 }
2109 if (ds_timer(dr).target > now) {
2110 // Done running timers for now.
2111 break;
2112 }
2113 // Remove timers that are suspended or have missed intervals from the
2114 // list; rearming after resume (resp. source invoke) re-enables them
2115 if (DISPATCH_QUEUE_IS_SUSPENDED(ds) || ds->ds_pending_data) {
2116 _dispatch_timers_update(ds);
2117 continue;
2118 }
2119 // Calculate number of missed intervals.
2120 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
2121 if (++missed > INT_MAX) {
2122 missed = INT_MAX;
2123 }
2124 if (ds_timer(dr).interval < INT64_MAX) {
2125 ds_timer(dr).target += missed * ds_timer(dr).interval;
2126 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
2127 } else {
2128 ds_timer(dr).target = UINT64_MAX;
2129 ds_timer(dr).deadline = UINT64_MAX;
2130 }
2131 _dispatch_timers_update(ds);
2132 ds_timer(dr).last_fire = now;
2133
2134 unsigned long data;
2135 data = os_atomic_add2o(ds, ds_pending_data,
2136 (unsigned long)missed, relaxed);
2137 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
2138 dx_wakeup(ds, 0, DISPATCH_WAKEUP_FLUSH);
2139 if (ds_timer(dr).flags & DISPATCH_TIMER_AFTER) {
2140 _dispatch_source_kevent_unregister(ds);
2141 }
2142 }
2143 }
2144
2145 DISPATCH_NOINLINE
2146 static void
2147 _dispatch_timers_run(dispatch_clock_now_cache_t nows)
2148 {
2149 unsigned int tidx;
2150 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2151 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
2152 _dispatch_timers_run2(nows, tidx);
2153 }
2154 }
2155 }
2156
2157 #define DISPATCH_TIMERS_GET_DELAY_ALL (~0u)
2158
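// Computes the delay (in nanoseconds) until the earliest pending timer and the
// corresponding leeway, either across all timer lists or for a single index;
// returns the index of the list that owns the earliest target.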
2159 static inline unsigned int
2160 _dispatch_timers_get_delay(dispatch_clock_now_cache_t nows,
2161 struct dispatch_timer_s timer[],
2162 uint64_t *delay, uint64_t *leeway, unsigned int query)
2163 {
2164 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT, minidx, maxidx;
2165 uint64_t tmp, delta = INT64_MAX, dldelta = INT64_MAX;
2166
2167 if (query == DISPATCH_TIMERS_GET_DELAY_ALL) {
2168 minidx = 0;
2169 maxidx = DISPATCH_TIMER_COUNT - 1;
2170 } else {
2171 minidx = maxidx = query;
2172 }
2173
2174 for (tidx = minidx; tidx <= maxidx; tidx++) {
2175 dispatch_clock_t clock = DISPATCH_TIMER_CLOCK(tidx);
2176 uint64_t target = timer[tidx].target;
2177 if (target >= INT64_MAX) {
2178 continue;
2179 }
2180 uint64_t deadline = timer[tidx].deadline;
2181 if (query != DISPATCH_TIMERS_GET_DELAY_ALL) {
2182 // Timer pre-coalescing <rdar://problem/13222034>
2183 unsigned int qos = DISPATCH_TIMER_QOS(tidx);
2184 uint64_t window = _dispatch_kevent_coalescing_window[qos];
2185 uint64_t latest = deadline > window ? deadline - window : 0;
2186 dispatch_source_refs_t dri;
2187 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
2188 dr_list) {
2189 tmp = ds_timer(dri).target;
2190 if (tmp > latest) break;
2191 target = tmp;
2192 }
2193 }
2194 uint64_t now = _dispatch_time_now_cached(clock, nows);
2195 if (target <= now) {
2196 delta = 0;
2197 break;
2198 }
2199 tmp = target - now;
2200 if (clock != DISPATCH_CLOCK_WALL) {
2201 tmp = _dispatch_time_mach2nano(tmp);
2202 }
2203 if (tmp < INT64_MAX && tmp < delta) {
2204 ridx = tidx;
2205 delta = tmp;
2206 }
2207 dispatch_assert(target <= deadline);
2208 tmp = deadline - now;
2209 if (clock != DISPATCH_CLOCK_WALL) {
2210 tmp = _dispatch_time_mach2nano(tmp);
2211 }
2212 if (tmp < INT64_MAX && tmp < dldelta) {
2213 dldelta = tmp;
2214 }
2215 }
2216 *delay = delta;
2217 *leeway = delta && delta < INT64_MAX ? dldelta - delta : INT64_MAX;
2218 return ridx;
2219 }
2220
2221
2222 #ifdef __linux__
2223 // On Linux we map the _dispatch_kevent_qos_s to struct kevent instead
2224 // of struct kevent64. We lose the kevent.ext[] members, and the timeout
2225 // is based on relative msec time vs. absolute nsec time.
2226 // For now we make the adjustments right here, until we either extend
2227 // libkqueue with a proper kevent64 API or remove kevent altogether and
2228 // move to a lower-level API (e.g. epoll or a kernel module).
2229 // Also, leeway is ignored.
2230
2231 static void
2232 _dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s *ke, uint64_t delay,
2233 uint64_t leeway, dispatch_clock_now_cache_t nows)
2234 {
2235 // call to update nows[]
2236 _dispatch_time_now_cached(DISPATCH_CLOCK_WALL, nows);
2237 #ifdef KEVENT_NSEC_NOT_SUPPORTED
2238 // adjust nsec based delay to msec based and ignore leeway
2239 delay /= 1000000L;
2240 if ((int64_t)(delay) <= 0) {
2241 delay = 1; // a value <= 0 would stop the dispatch
2242 }
2243 #else
2244 ke->fflags |= NOTE_NSECONDS;
2245 #endif
2246 ke->data = (int64_t)delay;
2247 }
2248
2249 #else
2250 static void
2251 _dispatch_kevent_timer_set_delay(_dispatch_kevent_qos_s *ke, uint64_t delay,
2252 uint64_t leeway, dispatch_clock_now_cache_t nows)
2253 {
2254 delay += _dispatch_time_now_cached(DISPATCH_CLOCK_WALL, nows);
2255 if (slowpath(_dispatch_timers_force_max_leeway)) {
2256 ke->data = (int64_t)(delay + leeway);
2257 ke->ext[1] = 0;
2258 } else {
2259 ke->data = (int64_t)delay;
2260 ke->ext[1] = leeway;
2261 }
2262 }
2263 #endif // __linux__
2264
2265 static bool
2266 _dispatch_timers_program2(dispatch_clock_now_cache_t nows,
2267 _dispatch_kevent_qos_s *ke, unsigned int tidx)
2268 {
2269 bool poll;
2270 uint64_t delay, leeway;
2271
2272 _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway, tidx);
2273 poll = (delay == 0);
2274 if (poll || delay == UINT64_MAX) {
2275 _dispatch_trace_next_timer_set(NULL, DISPATCH_TIMER_QOS(tidx));
2276 if (!ke->data) {
2277 return poll;
2278 }
2279 ke->data = 0;
2280 ke->flags |= EV_DELETE;
2281 ke->flags &= ~(EV_ADD|EV_ENABLE);
2282 } else {
2283 _dispatch_trace_next_timer_set(
2284 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), DISPATCH_TIMER_QOS(tidx));
2285 _dispatch_trace_next_timer_program(delay, DISPATCH_TIMER_QOS(tidx));
2286 _dispatch_kevent_timer_set_delay(ke, delay, leeway, nows);
2287 ke->flags |= EV_ADD|EV_ENABLE;
2288 ke->flags &= ~EV_DELETE;
2289 #if DISPATCH_USE_KEVENT_WORKQUEUE
2290 if (_dispatch_kevent_workqueue_enabled) {
2291 ke->qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2292 }
2293 #endif
2294 }
2295 _dispatch_kq_deferred_update(ke);
2296 return poll;
2297 }
2298
2299 DISPATCH_NOINLINE
2300 static bool
2301 _dispatch_timers_program(dispatch_clock_now_cache_t nows)
2302 {
2303 bool poll = false;
2304 unsigned int tidx, timerm = _dispatch_timers_mask;
2305 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMEOUT_COUNT; tidx++) {
2306 if (!(timerm & 1 << tidx)){
2307 continue;
2308 }
2309 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[tidx],
2310 tidx);
2311 }
2312 return poll;
2313 }
2314
2315 DISPATCH_NOINLINE
2316 static bool
2317 _dispatch_timers_configure(void)
2318 {
2319 _dispatch_timer_aggregates_check();
2320 // Find out if there is a new target/deadline on the timer lists
2321 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
2322 }
2323
2324 #if HAVE_MACH
2325 static void
2326 _dispatch_timers_calendar_change(void)
2327 {
2328 unsigned int qos;
2329
2330 // calendar change may have gone past the wallclock deadline
2331 _dispatch_timer_expired = true;
2332 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
2333 _dispatch_timers_mask |=
2334 1 << DISPATCH_TIMER_INDEX(DISPATCH_CLOCK_WALL, qos);
2335 }
2336 }
2337 #endif
2338
2339 static void
2340 _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke)
2341 {
2342 dispatch_assert(ke->data > 0);
2343 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
2344 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
2345 unsigned int tidx = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
2346 dispatch_assert(tidx < DISPATCH_KEVENT_TIMEOUT_COUNT);
2347 dispatch_assert(_dispatch_kevent_timeout[tidx].data != 0);
2348 _dispatch_kevent_timeout[tidx].data = 0; // kevent deleted via EV_ONESHOT
2349 _dispatch_timer_expired = true;
2350 _dispatch_timers_mask |= 1 << tidx;
2351 _dispatch_trace_next_timer_wake(DISPATCH_TIMER_QOS(tidx));
2352 }
2353
2354 static inline bool
2355 _dispatch_mgr_timers(void)
2356 {
2357 dispatch_clock_now_cache_s nows = { };
2358 bool expired = slowpath(_dispatch_timer_expired);
2359 if (expired) {
2360 _dispatch_timers_run(&nows);
2361 }
2362 bool reconfigure = slowpath(_dispatch_timers_reconfigure);
2363 if (reconfigure || expired) {
2364 if (reconfigure) {
2365 reconfigure = _dispatch_timers_configure();
2366 _dispatch_timers_reconfigure = false;
2367 }
2368 if (reconfigure || expired) {
2369 expired = _dispatch_timer_expired = _dispatch_timers_program(&nows);
2370 expired = expired || _dispatch_mgr_q.dq_items_tail;
2371 }
2372 _dispatch_timers_mask = 0;
2373 }
2374 return expired;
2375 }
2376
2377 #pragma mark -
2378 #pragma mark dispatch_timer_aggregate
2379
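// A timer aggregate maintains its own copy of the timer lists for a group of
// timer sources so that dispatch_timer_aggregate_get_delay() can report the
// combined delay until the next fire of any source in the group.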
2380 typedef struct {
2381 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
2382 } dispatch_timer_aggregate_refs_s;
2383
2384 typedef struct dispatch_timer_aggregate_s {
2385 DISPATCH_QUEUE_HEADER(queue);
2386 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
2387 dispatch_timer_aggregate_refs_s
2388 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
2389 struct {
2390 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
2391 } dta_timer[DISPATCH_TIMER_COUNT];
2392 struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
2393 unsigned int dta_refcount;
2394 } DISPATCH_QUEUE_ALIGN dispatch_timer_aggregate_s;
2395
2396 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
2397 static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
2398 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
2399
2400 dispatch_timer_aggregate_t
2401 dispatch_timer_aggregate_create(void)
2402 {
2403 unsigned int tidx;
2404 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
2405 sizeof(struct dispatch_timer_aggregate_s));
2406 _dispatch_queue_init(dta->_as_dq, DQF_NONE,
2407 DISPATCH_QUEUE_WIDTH_MAX, false);
2408 dta->do_targetq = _dispatch_get_root_queue(
2409 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
2410 //FIXME: aggregates need custom vtable
2411 //dta->dq_label = "timer-aggregate";
2412 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
2413 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
2414 }
2415 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2416 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
2417 dta->dta_timer[tidx].target = UINT64_MAX;
2418 dta->dta_timer[tidx].deadline = UINT64_MAX;
2419 dta->dta_timer_data[tidx].target = UINT64_MAX;
2420 dta->dta_timer_data[tidx].deadline = UINT64_MAX;
2421 }
2422 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
2423 dta->_as_dq);
2424 }
2425
2426 typedef struct dispatch_timer_delay_s {
2427 dispatch_timer_t timer;
2428 uint64_t delay, leeway;
2429 } *dispatch_timer_delay_t;
2430
2431 static void
2432 _dispatch_timer_aggregate_get_delay(void *ctxt)
2433 {
2434 dispatch_timer_delay_t dtd = ctxt;
2435 dispatch_clock_now_cache_s nows = { };
2436 _dispatch_timers_get_delay(&nows, dtd->timer, &dtd->delay, &dtd->leeway,
2437 DISPATCH_TIMERS_GET_DELAY_ALL);
2438 }
2439
2440 uint64_t
2441 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
2442 uint64_t *leeway_ptr)
2443 {
2444 struct dispatch_timer_delay_s dtd = {
2445 .timer = dta->dta_timer_data,
2446 };
2447 dispatch_sync_f(dta->_as_dq, &dtd, _dispatch_timer_aggregate_get_delay);
2448 if (leeway_ptr) {
2449 *leeway_ptr = dtd.leeway;
2450 }
2451 return dtd.delay;
2452 }
2453
2454 static void
2455 _dispatch_timer_aggregate_update(void *ctxt)
2456 {
2457 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
2458 dispatch_timer_t dtau = ctxt;
2459 unsigned int tidx;
2460 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2461 dta->dta_timer_data[tidx].target = dtau[tidx].target;
2462 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
2463 }
2464 free(dtau);
2465 }
2466
2467 DISPATCH_NOINLINE
2468 static void
2469 _dispatch_timer_aggregates_configure(void)
2470 {
2471 dispatch_timer_aggregate_t dta;
2472 dispatch_timer_t dtau;
2473 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
2474 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
2475 continue;
2476 }
2477 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
2478 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
2479 _dispatch_barrier_async_detached_f(dta->_as_dq, dtau,
2480 _dispatch_timer_aggregate_update);
2481 }
2482 }
2483
2484 static inline void
2485 _dispatch_timer_aggregates_check(void)
2486 {
2487 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
2488 return;
2489 }
2490 _dispatch_timer_aggregates_configure();
2491 }
2492
2493 static void
2494 _dispatch_timer_aggregates_register(dispatch_source_t ds)
2495 {
2496 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2497 if (!dta->dta_refcount++) {
2498 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
2499 }
2500 }
2501
2502 DISPATCH_NOINLINE
2503 static void
2504 _dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
2505 {
2506 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2507 dispatch_timer_source_aggregate_refs_t dr;
2508 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
2509 _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
2510 dta->dta_timer, dr, dta_list);
2511 }
2512
2513 DISPATCH_NOINLINE
2514 static void
2515 _dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
2516 {
2517 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2518 dispatch_timer_source_aggregate_refs_t dr;
2519 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
2520 _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
2521 dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
2522 if (!--dta->dta_refcount) {
2523 TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
2524 }
2525 }
2526
2527 #pragma mark -
2528 #pragma mark dispatch_kqueue
2529
2530 static int _dispatch_kq;
2531
2532 #if DISPATCH_DEBUG_QOS && DISPATCH_USE_KEVENT_WORKQUEUE
2533 #define _dispatch_kevent_assert_valid_qos(ke) ({ \
2534 if (_dispatch_kevent_workqueue_enabled) { \
2535 const _dispatch_kevent_qos_s *_ke = (ke); \
2536 if (_ke->flags & (EV_ADD|EV_ENABLE)) { \
2537 _dispatch_assert_is_valid_qos_class(\
2538 (pthread_priority_t)_ke->qos); \
2539 dispatch_assert(_ke->qos); \
2540 } \
2541 } \
2542 })
2543 #else
2544 #define _dispatch_kevent_assert_valid_qos(ke) ((void)ke)
2545 #endif
2546
2547
2548 static void
2549 _dispatch_kq_init(void *context DISPATCH_UNUSED)
2550 {
2551 _dispatch_fork_becomes_unsafe();
2552 #if DISPATCH_USE_KEVENT_WORKQUEUE
2553 _dispatch_kevent_workqueue_init();
2554 if (_dispatch_kevent_workqueue_enabled) {
2555 int r;
2556 const _dispatch_kevent_qos_s kev[] = {
2557 [0] = {
2558 .ident = 1,
2559 .filter = EVFILT_USER,
2560 .flags = EV_ADD|EV_CLEAR,
2561 .qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG,
2562 },
2563 [1] = {
2564 .ident = 1,
2565 .filter = EVFILT_USER,
2566 .fflags = NOTE_TRIGGER,
2567 },
2568 };
2569 _dispatch_kq = -1;
2570 retry:
2571 r = kevent_qos(-1, kev, 2, NULL, 0, NULL, NULL,
2572 KEVENT_FLAG_WORKQ|KEVENT_FLAG_IMMEDIATE);
2573 if (slowpath(r == -1)) {
2574 int err = errno;
2575 switch (err) {
2576 case EINTR:
2577 goto retry;
2578 default:
2579 DISPATCH_CLIENT_CRASH(err,
2580 "Failed to initalize workqueue kevent");
2581 break;
2582 }
2583 }
2584 return;
2585 }
2586 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
2587 #if DISPATCH_USE_MGR_THREAD
2588 static const _dispatch_kevent_qos_s kev = {
2589 .ident = 1,
2590 .filter = EVFILT_USER,
2591 .flags = EV_ADD|EV_CLEAR,
2592 };
2593
2594 _dispatch_fork_becomes_unsafe();
2595 #if DISPATCH_USE_GUARDED_FD
2596 guardid_t guard = (uintptr_t)&kev;
2597 _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2598 #else
2599 _dispatch_kq = kqueue();
2600 #endif
2601 if (_dispatch_kq == -1) {
2602 int err = errno;
2603 switch (err) {
2604 case EMFILE:
2605 DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
2606 "process is out of file descriptors");
2607 break;
2608 case ENFILE:
2609 DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
2610 "system is out of file descriptors");
2611 break;
2612 case ENOMEM:
2613 DISPATCH_CLIENT_CRASH(err, "kqueue() failure: "
2614 "kernel is out of memory");
2615 break;
2616 default:
2617 DISPATCH_INTERNAL_CRASH(err, "kqueue() failure");
2618 break;
2619 }
2620 }
2621 (void)dispatch_assume_zero(kevent_qos(_dispatch_kq, &kev, 1, NULL, 0, NULL,
2622 NULL, 0));
2623 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
2624 #endif // DISPATCH_USE_MGR_THREAD
2625 }
2626
2627 DISPATCH_NOINLINE
2628 static long
2629 _dispatch_kq_update(const _dispatch_kevent_qos_s *ke, int n)
2630 {
2631 int i, r;
2632 _dispatch_kevent_qos_s kev_error[n];
2633 static dispatch_once_t pred;
2634 dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2635
2636 for (i = 0; i < n; i++) {
2637 if (ke[i].filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
2638 _dispatch_kevent_debug_n("updating", ke + i, i, n);
2639 }
2640 }
2641
2642 unsigned int flags = KEVENT_FLAG_ERROR_EVENTS;
2643 #if DISPATCH_USE_KEVENT_WORKQUEUE
2644 if (_dispatch_kevent_workqueue_enabled) {
2645 flags |= KEVENT_FLAG_WORKQ;
2646 }
2647 #endif
2648
2649 retry:
2650 r = kevent_qos(_dispatch_kq, ke, n, kev_error, n, NULL, NULL, flags);
2651 if (slowpath(r == -1)) {
2652 int err = errno;
2653 switch (err) {
2654 case EINTR:
2655 goto retry;
2656 case EBADF:
2657 DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors");
2658 break;
2659 default:
2660 (void)dispatch_assume_zero(err);
2661 break;
2662 }
2663 return err;
2664 }
2665 for (i = 0, n = r; i < n; i++) {
2666 if (kev_error[i].flags & EV_ERROR) {
2667 _dispatch_kevent_debug("returned error", &kev_error[i]);
2668 _dispatch_kevent_drain(&kev_error[i]);
2669 r = (int)kev_error[i].data;
2670 } else {
2671 _dispatch_kevent_mgr_debug(&kev_error[i]);
2672 r = 0;
2673 }
2674 }
2675 return r;
2676 }
2677
2678 DISPATCH_ALWAYS_INLINE
2679 static void
2680 _dispatch_kq_update_all(const _dispatch_kevent_qos_s *kev, int n)
2681 {
2682 (void)_dispatch_kq_update(kev, n);
2683 }
2684
2685 DISPATCH_ALWAYS_INLINE
2686 static long
2687 _dispatch_kq_update_one(const _dispatch_kevent_qos_s *kev)
2688 {
2689 return _dispatch_kq_update(kev, 1);
2690 }
2691
2692 static inline bool
2693 _dispatch_kevent_maps_to_same_knote(const _dispatch_kevent_qos_s *e1,
2694 const _dispatch_kevent_qos_s *e2)
2695 {
2696 return e1->filter == e2->filter &&
2697 e1->ident == e2->ident &&
2698 e1->udata == e2->udata;
2699 }
2700
2701 static inline int
2702 _dispatch_deferred_event_find_slot(dispatch_deferred_items_t ddi,
2703 const _dispatch_kevent_qos_s *ke)
2704 {
2705 _dispatch_kevent_qos_s *events = ddi->ddi_eventlist;
2706 int i;
2707
2708 for (i = 0; i < ddi->ddi_nevents; i++) {
2709 if (_dispatch_kevent_maps_to_same_knote(&events[i], ke)) {
2710 break;
2711 }
2712 }
2713 return i;
2714 }
2715
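// Queues a kevent change on the current thread's deferred-items list when one
// is installed (manager or kevent worker thread), coalescing changes that
// target the same knote; flushes when the list is full and falls back to an
// immediate kevent_qos() call when there is no deferred list.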
2716 static void
2717 _dispatch_kq_deferred_update(const _dispatch_kevent_qos_s *ke)
2718 {
2719 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
2720 int slot;
2721
2722 _dispatch_kevent_assert_valid_qos(ke);
2723 if (ddi) {
2724 if (unlikely(ddi->ddi_nevents == ddi->ddi_maxevents)) {
2725 _dispatch_deferred_items_set(NULL);
2726 _dispatch_kq_update_all(ddi->ddi_eventlist, ddi->ddi_nevents);
2727 ddi->ddi_nevents = 0;
2728 _dispatch_deferred_items_set(ddi);
2729 }
2730 if (ke->filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
2731 _dispatch_kevent_debug("deferred", ke);
2732 }
2733 bool needs_enable = false;
2734 slot = _dispatch_deferred_event_find_slot(ddi, ke);
2735 if (slot == ddi->ddi_nevents) {
2736 ddi->ddi_nevents++;
2737 } else if (ke->flags & EV_DELETE) {
2738 // <rdar://problem/26202376> when deleting and an enable is pending,
2739 // we must merge EV_ENABLE to do an immediate deletion
2740 needs_enable = (ddi->ddi_eventlist[slot].flags & EV_ENABLE);
2741 }
2742 ddi->ddi_eventlist[slot] = *ke;
2743 if (needs_enable) {
2744 ddi->ddi_eventlist[slot].flags |= EV_ENABLE;
2745 }
2746 } else {
2747 _dispatch_kq_update_one(ke);
2748 }
2749 }
2750
2751 static long
2752 _dispatch_kq_immediate_update(_dispatch_kevent_qos_s *ke)
2753 {
2754 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
2755 int slot, last;
2756
2757 _dispatch_kevent_assert_valid_qos(ke);
2758 if (ddi) {
2759 _dispatch_kevent_qos_s *events = ddi->ddi_eventlist;
2760 slot = _dispatch_deferred_event_find_slot(ddi, ke);
2761 if (slot < ddi->ddi_nevents) {
2762 // <rdar://problem/26202376> when deleting and an enable is pending,
2763 // we must merge EV_ENABLE to do an immediate deletion
2764 if ((ke->flags & EV_DELETE) && (events[slot].flags & EV_ENABLE)) {
2765 ke->flags |= EV_ENABLE;
2766 }
2767 last = --ddi->ddi_nevents;
2768 if (slot != last) {
2769 events[slot] = events[last];
2770 }
2771 }
2772 }
2773 return _dispatch_kq_update_one(ke);
2774 }
2775
2776 #pragma mark -
2777 #pragma mark dispatch_mgr
2778
2779 DISPATCH_NOINLINE
2780 static void
2781 _dispatch_mgr_queue_poke(dispatch_queue_t dq DISPATCH_UNUSED,
2782 pthread_priority_t pp DISPATCH_UNUSED)
2783 {
2784 static const _dispatch_kevent_qos_s kev = {
2785 .ident = 1,
2786 .filter = EVFILT_USER,
2787 .fflags = NOTE_TRIGGER,
2788 };
2789
2790 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2791 _dispatch_debug("waking up the dispatch manager queue: %p", dq);
2792 #endif
2793 _dispatch_kq_deferred_update(&kev);
2794 }
2795
2796 void
2797 _dispatch_mgr_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
2798 dispatch_wakeup_flags_t flags)
2799 {
2800 if (flags & DISPATCH_WAKEUP_FLUSH) {
2801 os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
2802 }
2803
2804 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2805 return;
2806 }
2807
2808 if (!_dispatch_queue_class_probe(&_dispatch_mgr_q)) {
2809 return;
2810 }
2811
2812 _dispatch_mgr_queue_poke(dq, pp);
2813 }
2814
2815 DISPATCH_NOINLINE
2816 static void
2817 _dispatch_event_init(void)
2818 {
2819 _dispatch_kevent_init();
2820 _dispatch_timers_init();
2821 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
2822 _dispatch_mach_recv_msg_buf_init();
2823 #endif
2824 _dispatch_memorypressure_init();
2825 _voucher_activity_debug_channel_init();
2826 }
2827
2828 #if DISPATCH_USE_MGR_THREAD
2829 DISPATCH_NOINLINE
2830 static void
2831 _dispatch_mgr_init(void)
2832 {
2833 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
2834 _dispatch_queue_set_current(&_dispatch_mgr_q);
2835 if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q,
2836 DISPATCH_INVOKE_STEALING, NULL) != owned) {
2837 DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
2838 }
2839 _dispatch_mgr_priority_init();
2840 _dispatch_event_init();
2841 }
2842
2843 DISPATCH_NOINLINE
2844 static bool
2845 _dispatch_mgr_wait_for_event(dispatch_deferred_items_t ddi, bool poll)
2846 {
2847 int r;
2848 dispatch_assert((size_t)ddi->ddi_maxevents < countof(ddi->ddi_eventlist));
2849
2850 retry:
2851 r = kevent_qos(_dispatch_kq, ddi->ddi_eventlist, ddi->ddi_nevents,
2852 ddi->ddi_eventlist + ddi->ddi_maxevents, 1, NULL, NULL,
2853 poll ? KEVENT_FLAG_IMMEDIATE : KEVENT_FLAG_NONE);
2854 if (slowpath(r == -1)) {
2855 int err = errno;
2856 switch (err) {
2857 case EINTR:
2858 goto retry;
2859 case EBADF:
2860 DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors");
2861 break;
2862 default:
2863 (void)dispatch_assume_zero(err);
2864 break;
2865 }
2866 }
2867 ddi->ddi_nevents = 0;
2868 return r > 0;
2869 }
2870
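// Manager thread main loop: drain the manager queue, run and reprogram
// timers, then block in kevent_qos() (or poll, if more work is known to be
// pending) and hand each received event to _dispatch_kevent_drain().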
2871 DISPATCH_NOINLINE DISPATCH_NORETURN
2872 static void
2873 _dispatch_mgr_invoke(void)
2874 {
2875 dispatch_deferred_items_s ddi;
2876 bool poll;
2877
2878 ddi.ddi_magic = DISPATCH_DEFERRED_ITEMS_MAGIC;
2879 ddi.ddi_stashed_pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2880 ddi.ddi_nevents = 0;
2881 ddi.ddi_maxevents = 1;
2882
2883 _dispatch_deferred_items_set(&ddi);
2884
2885 for (;;) {
2886 _dispatch_mgr_queue_drain();
2887 poll = _dispatch_mgr_timers();
2888 poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2889 if (_dispatch_mgr_wait_for_event(&ddi, poll)) {
2890 _dispatch_kevent_qos_s *ke = ddi.ddi_eventlist + ddi.ddi_maxevents;
2891 _dispatch_kevent_debug("received", ke);
2892 _dispatch_kevent_drain(ke);
2893 }
2894 }
2895 }
2896 #endif // DISPATCH_USE_MGR_THREAD
2897
2898 DISPATCH_NORETURN
2899 void
2900 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
2901 dispatch_invoke_flags_t flags DISPATCH_UNUSED)
2902 {
2903 #if DISPATCH_USE_KEVENT_WORKQUEUE
2904 if (_dispatch_kevent_workqueue_enabled) {
2905 DISPATCH_INTERNAL_CRASH(0, "Manager queue invoked with "
2906 "kevent workqueue enabled");
2907 }
2908 #endif
2909 #if DISPATCH_USE_MGR_THREAD
2910 _dispatch_mgr_init();
2911 // never returns, so burn bridges behind us & clear stack 2k ahead
2912 _dispatch_clear_stack(2048);
2913 _dispatch_mgr_invoke();
2914 #endif
2915 }
2916
2917 #if DISPATCH_USE_KEVENT_WORKQUEUE
2918
2919 #define DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER ((pthread_priority_t)(~0ul))
2920
2921 DISPATCH_ALWAYS_INLINE
2922 static inline pthread_priority_t
2923 _dispatch_kevent_worker_thread_init(dispatch_deferred_items_t ddi)
2924 {
2925 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
2926
2927 ddi->ddi_magic = DISPATCH_DEFERRED_ITEMS_MAGIC;
2928 ddi->ddi_nevents = 0;
2929 ddi->ddi_maxevents = countof(ddi->ddi_eventlist);
2930 ddi->ddi_stashed_pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2931
2932 pthread_priority_t pp = _dispatch_get_priority();
2933 if (!(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2934 // If this thread does not have the event manager flag set, don't set up
2935 // as the dispatch manager and let the caller know to only process
2936 // the delivered events.
2937 //
2938 // Also add the NEEDS_UNBIND flag so that
2939 // _dispatch_priority_compute_update knows it has to unbind
2940 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
2941 pp |= _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
2942 _dispatch_thread_setspecific(dispatch_priority_key,
2943 (void *)(uintptr_t)pp);
2944 ddi->ddi_stashed_pp = 0;
2945 return DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER;
2946 }
2947
2948 if ((pp & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) ||
2949 !(pp & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2950 // When the pthread kext is delivering kevents to us, and pthread
2951 // root queues are in use, then the pthread priority TSD is set
2952 // to a sched pri with the _PTHREAD_PRIORITY_SCHED_PRI_FLAG bit set.
2953 //
2954 // Given that this isn't a valid QoS we need to fixup the TSD,
2955 // and the best option is to clear the qos/priority bits which tells
2956 // us to not do any QoS related calls on this thread.
2957 //
2958 // However, in that case the manager thread is opted out of QoS,
2959 // as far as pthread is concerned, and can't be turned into
2960 // something else, so we can't stash.
2961 pp &= (pthread_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK;
2962 }
2963 // Managers always park without mutating to a regular worker thread, and
2964 // hence never need to unbind from userland, and when draining a manager,
2965 // the NEEDS_UNBIND flag would cause the mutation to happen.
2966 // So we need to strip this flag
2967 pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
2968 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
2969
2970 // ensure kevents registered from this thread are registered at manager QoS
2971 pthread_priority_t old_dp = _dispatch_set_defaultpriority(
2972 (pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG, NULL);
2973 _dispatch_queue_set_current(&_dispatch_mgr_q);
2974 if (_dispatch_queue_drain_try_lock(&_dispatch_mgr_q,
2975 DISPATCH_INVOKE_STEALING, NULL) != owned) {
2976 DISPATCH_INTERNAL_CRASH(0, "Locking the manager should not fail");
2977 }
2978 static int event_thread_init;
2979 if (!event_thread_init) {
2980 event_thread_init = 1;
2981 _dispatch_event_init();
2982 }
2983 return old_dp;
2984 }
2985
2986 DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
2987 static inline bool
2988 _dispatch_kevent_worker_thread_reset(pthread_priority_t old_dp)
2989 {
2990 dispatch_queue_t dq = &_dispatch_mgr_q;
2991 uint64_t orig_dq_state;
2992
2993 _dispatch_queue_drain_unlock(dq, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED,
2994 &orig_dq_state);
2995 _dispatch_reset_defaultpriority(old_dp);
2996 _dispatch_queue_set_current(NULL);
2997 return _dq_state_is_dirty(orig_dq_state);
2998 }
2999
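// Entry point invoked by the pthread workqueue kext with a batch of delivered
// kevents. Non-manager workers only drain the events; the event-manager
// thread additionally drains the manager queue and timers. Deferred kevent
// changes are either flushed or handed back to the kernel via *events/*nevents.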
3000 DISPATCH_NOINLINE
3001 void
3002 _dispatch_kevent_worker_thread(_dispatch_kevent_qos_s **events, int *nevents)
3003 {
3004 _dispatch_introspection_thread_add();
3005
3006 if (!events && !nevents) {
3007 // events for the worker thread request have already been delivered earlier
3008 return;
3009 }
3010
3011 _dispatch_kevent_qos_s *ke = *events;
3012 int n = *nevents;
3013 if (!dispatch_assume(n) || !dispatch_assume(*events)) return;
3014
3015 dispatch_deferred_items_s ddi;
3016 pthread_priority_t old_dp = _dispatch_kevent_worker_thread_init(&ddi);
3017
3018 _dispatch_deferred_items_set(&ddi);
3019 for (int i = 0; i < n; i++) {
3020 _dispatch_kevent_debug("received", ke);
3021 _dispatch_kevent_drain(ke++);
3022 }
3023
3024 if (old_dp != DISPATCH_KEVENT_WORKER_IS_NOT_MANAGER) {
3025 _dispatch_mgr_queue_drain();
3026 bool poll = _dispatch_mgr_timers();
3027 if (_dispatch_kevent_worker_thread_reset(old_dp)) {
3028 poll = true;
3029 }
3030 if (poll) _dispatch_mgr_queue_poke(&_dispatch_mgr_q, 0);
3031 }
3032 _dispatch_deferred_items_set(NULL);
3033
3034 if (ddi.ddi_stashed_pp & _PTHREAD_PRIORITY_PRIORITY_MASK) {
3035 *nevents = 0;
3036 if (ddi.ddi_nevents) {
3037 _dispatch_kq_update_all(ddi.ddi_eventlist, ddi.ddi_nevents);
3038 }
3039 ddi.ddi_stashed_pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
3040 return _dispatch_root_queue_drain_deferred_item(ddi.ddi_stashed_dq,
3041 ddi.ddi_stashed_dou, ddi.ddi_stashed_pp);
3042 #ifndef WORKQ_KEVENT_EVENT_BUFFER_LEN
3043 } else if (ddi.ddi_nevents > *nevents) {
3044 *nevents = 0;
3045 _dispatch_kq_update_all(ddi.ddi_eventlist, ddi.ddi_nevents);
3046 #endif
3047 } else {
3048 *nevents = ddi.ddi_nevents;
3049 dispatch_static_assert(__builtin_types_compatible_p(typeof(**events),
3050 typeof(*ddi.ddi_eventlist)));
3051 memcpy(*events, ddi.ddi_eventlist,
3052 (size_t)ddi.ddi_nevents * sizeof(*ddi.ddi_eventlist));
3053 }
3054 }
3055 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
3056
3057 #pragma mark -
3058 #pragma mark dispatch_memorypressure
3059
3060 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3061 #define DISPATCH_MEMORYPRESSURE_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
3062 #define DISPATCH_MEMORYPRESSURE_SOURCE_MASK ( \
3063 DISPATCH_MEMORYPRESSURE_NORMAL | \
3064 DISPATCH_MEMORYPRESSURE_WARN | \
3065 DISPATCH_MEMORYPRESSURE_CRITICAL | \
3066 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
3067 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
3068 #define DISPATCH_MEMORYPRESSURE_MALLOC_MASK ( \
3069 DISPATCH_MEMORYPRESSURE_WARN | \
3070 DISPATCH_MEMORYPRESSURE_CRITICAL | \
3071 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_WARN | \
3072 DISPATCH_MEMORYPRESSURE_PROC_LIMIT_CRITICAL)
3073 #endif
3074
3075 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3076 static dispatch_source_t _dispatch_memorypressure_source;
3077
3078 static void
3079 _dispatch_memorypressure_handler(void *context DISPATCH_UNUSED)
3080 {
3081 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
3082 unsigned long memorypressure;
3083 memorypressure = dispatch_source_get_data(_dispatch_memorypressure_source);
3084
3085 if (memorypressure & DISPATCH_MEMORYPRESSURE_NORMAL) {
3086 _dispatch_memory_warn = false;
3087 _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
3088 #if VOUCHER_USE_MACH_VOUCHER
3089 if (_firehose_task_buffer) {
3090 firehose_buffer_clear_bank_flags(_firehose_task_buffer,
3091 FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY);
3092 }
3093 #endif
3094 }
3095 if (memorypressure & DISPATCH_MEMORYPRESSURE_WARN) {
3096 _dispatch_memory_warn = true;
3097 _dispatch_continuation_cache_limit =
3098 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYPRESSURE_PRESSURE_WARN;
3099 #if VOUCHER_USE_MACH_VOUCHER
3100 if (_firehose_task_buffer) {
3101 firehose_buffer_set_bank_flags(_firehose_task_buffer,
3102 FIREHOSE_BUFFER_BANK_FLAG_LOW_MEMORY);
3103 }
3104 #endif
3105 }
3106 if (memorypressure & DISPATCH_MEMORYPRESSURE_MALLOC_MASK) {
3107 malloc_memory_event_handler(memorypressure & DISPATCH_MEMORYPRESSURE_MALLOC_MASK);
3108 }
3109 #endif
3110 }
3111
3112 static void
3113 _dispatch_memorypressure_init(void)
3114 {
3115 _dispatch_memorypressure_source = dispatch_source_create(
3116 DISPATCH_MEMORYPRESSURE_SOURCE_TYPE, 0,
3117 DISPATCH_MEMORYPRESSURE_SOURCE_MASK,
3118 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
3119 dispatch_source_set_event_handler_f(_dispatch_memorypressure_source,
3120 _dispatch_memorypressure_handler);
3121 dispatch_activate(_dispatch_memorypressure_source);
3122 }
3123 #else
3124 static inline void _dispatch_memorypressure_init(void) {}
3125 #endif // DISPATCH_USE_MEMORYPRESSURE_SOURCE
3126
3127 #pragma mark -
3128 #pragma mark dispatch_mach
3129
3130 #if HAVE_MACH
3131
3132 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
3133 #define _dispatch_debug_machport(name) \
3134 dispatch_debug_machport((name), __func__)
3135 #else
3136 #define _dispatch_debug_machport(name) ((void)(name))
3137 #endif
3138
3139 // Flags for all notifications that are registered/unregistered when a
3140 // send-possible notification is requested/delivered
3141 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
3142 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
3143 #define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
3144 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
3145 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
3146 #define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
3147 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
3148 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
3149
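// Hash a mach port name into a fixed-size bucket table: mask when the table
// size is a power of two, modulo otherwise.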
3150 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
3151 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
3152 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
3153
3154 #define _DISPATCH_MACHPORT_HASH_SIZE 32
3155 #define _DISPATCH_MACHPORT_HASH(x) \
3156 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
3157
3158 #ifndef MACH_RCV_VOUCHER
3159 #define MACH_RCV_VOUCHER 0x00000800
3160 #endif
3161 #define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
3162 #define DISPATCH_MACH_RCV_OPTIONS ( \
3163 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
3164 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
3165 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
3166 MACH_RCV_VOUCHER
3167
3168 #define DISPATCH_MACH_NOTIFICATION_ARMED(dk) ((dk)->dk_kevent.ext[0])
3169
3170 static void _dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s *ke,
3171 mach_msg_header_t *hdr);
3172 static void _dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s *ke,
3173 mach_msg_header_t *hdr);
3174 static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
3175 dispatch_source_refs_t dr, dispatch_kevent_t dk,
3176 _dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr,
3177 mach_msg_size_t siz);
3178 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
3179 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
3180 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
3181 static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
3182 static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3183 dispatch_mach_reply_refs_t dmr, unsigned int options);
3184 static void _dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm);
3185 static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
3186 dispatch_mach_reply_refs_t dmr, _dispatch_kevent_qos_s *ke,
3187 mach_msg_header_t *hdr, mach_msg_size_t siz);
3188 static void _dispatch_mach_merge_notification_kevent(dispatch_mach_t dm,
3189 const _dispatch_kevent_qos_s *ke);
3190 static inline mach_msg_option_t _dispatch_mach_checkin_options(void);
3191
3192 static const size_t _dispatch_mach_recv_msg_size =
3193 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
3194 static const size_t dispatch_mach_trailer_size =
3195 sizeof(dispatch_mach_trailer_t);
3196 static mach_port_t _dispatch_mach_notify_port;
3197 static dispatch_source_t _dispatch_mach_notify_source;
3198
3199 static inline void*
3200 _dispatch_kevent_mach_msg_buf(_dispatch_kevent_qos_s *ke)
3201 {
3202 return (void*)ke->ext[0];
3203 }
3204
3205 static inline mach_msg_size_t
3206 _dispatch_kevent_mach_msg_size(_dispatch_kevent_qos_s *ke)
3207 {
3208 // buffer size in the successful receive case, but message size (like
3209 // msgh_size) in the MACH_RCV_TOO_LARGE case, i.e. add trailer size.
3210 return (mach_msg_size_t)ke->ext[1];
3211 }
3212
3213 static void
3214 _dispatch_source_type_mach_recv_direct_init(dispatch_source_t ds,
3215 dispatch_source_type_t type DISPATCH_UNUSED,
3216 uintptr_t handle DISPATCH_UNUSED,
3217 unsigned long mask DISPATCH_UNUSED)
3218 {
3219 ds->ds_pending_data_mask = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
3220 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3221 if (_dispatch_evfilt_machport_direct_enabled) return;
3222 ds->ds_dkev->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
3223 ds->ds_dkev->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
3224 ds->ds_is_direct_kevent = false;
3225 #endif
3226 }
3227
3228 static const
3229 struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
3230 .ke = {
3231 .filter = EVFILT_MACHPORT,
3232 .flags = EV_VANISHED|EV_DISPATCH|EV_UDATA_SPECIFIC,
3233 .fflags = DISPATCH_MACH_RCV_OPTIONS,
3234 },
3235 .init = _dispatch_source_type_mach_recv_direct_init,
3236 };
3237
3238 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3239 static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
3240 static _dispatch_kevent_qos_s _dispatch_mach_recv_kevent = {
3241 .filter = EVFILT_MACHPORT,
3242 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
3243 .fflags = DISPATCH_MACH_RCV_OPTIONS,
3244 };
3245
3246 static void
3247 _dispatch_mach_recv_msg_buf_init(void)
3248 {
3249 if (_dispatch_evfilt_machport_direct_enabled) return;
3250 mach_vm_size_t vm_size = mach_vm_round_page(
3251 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
3252 mach_vm_address_t vm_addr = vm_page_size;
3253 kern_return_t kr;
3254
3255 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
3256 VM_FLAGS_ANYWHERE))) {
3257 if (kr != KERN_NO_SPACE) {
3258 DISPATCH_CLIENT_CRASH(kr,
3259 "Could not allocate mach msg receive buffer");
3260 }
3261 _dispatch_temporary_resource_shortage();
3262 vm_addr = vm_page_size;
3263 }
3264 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
3265 _dispatch_mach_recv_kevent.ext[1] = vm_size;
3266 }
3267 #endif
3268
3269 DISPATCH_NOINLINE
3270 static void
3271 _dispatch_source_merge_mach_msg_direct(dispatch_source_t ds,
3272 _dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr)
3273 {
3274 dispatch_continuation_t dc = _dispatch_source_get_event_handler(ds->ds_refs);
3275 dispatch_queue_t cq = _dispatch_queue_get_current();
3276
3277 // see firehose_client_push_notify_async
3278 _dispatch_queue_set_current(ds->_as_dq);
3279 dc->dc_func(hdr);
3280 _dispatch_queue_set_current(cq);
3281 if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
3282 free(hdr);
3283 }
3284 }
3285
3286 dispatch_source_t
3287 _dispatch_source_create_mach_msg_direct_recv(mach_port_t recvp,
3288 const struct dispatch_continuation_s *dc)
3289 {
3290 dispatch_source_t ds;
3291 ds = dispatch_source_create(&_dispatch_source_type_mach_recv_direct,
3292 recvp, 0, &_dispatch_mgr_q);
3293 os_atomic_store(&ds->ds_refs->ds_handler[DS_EVENT_HANDLER],
3294 (dispatch_continuation_t)dc, relaxed);
3295 return ds;
3296 }
3297
3298 static void
3299 _dispatch_mach_notify_port_init(void *context DISPATCH_UNUSED)
3300 {
3301 kern_return_t kr;
3302 #if HAVE_MACH_PORT_CONSTRUCT
3303 mach_port_options_t opts = { .flags = MPO_CONTEXT_AS_GUARD | MPO_STRICT };
3304 #ifdef __LP64__
3305 const mach_port_context_t guard = 0xfeed09071f1ca7edull;
3306 #else
3307 const mach_port_context_t guard = 0xff1ca7edull;
3308 #endif
3309 kr = mach_port_construct(mach_task_self(), &opts, guard,
3310 &_dispatch_mach_notify_port);
3311 #else
3312 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
3313 &_dispatch_mach_notify_port);
3314 #endif
3315 DISPATCH_VERIFY_MIG(kr);
3316 if (slowpath(kr)) {
3317 DISPATCH_CLIENT_CRASH(kr,
3318 "mach_port_construct() failed: cannot create receive right");
3319 }
3320
3321 static const struct dispatch_continuation_s dc = {
3322 .dc_func = (void*)_dispatch_mach_notify_source_invoke,
3323 };
3324 _dispatch_mach_notify_source = _dispatch_source_create_mach_msg_direct_recv(
3325 _dispatch_mach_notify_port, &dc);
3326 dispatch_assert(_dispatch_mach_notify_source);
3327 dispatch_activate(_dispatch_mach_notify_source);
3328 }
3329
3330 static mach_port_t
3331 _dispatch_get_mach_notify_port(void)
3332 {
3333 static dispatch_once_t pred;
3334 dispatch_once_f(&pred, NULL, _dispatch_mach_notify_port_init);
3335 return _dispatch_mach_notify_port;
3336 }
3337
3338 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3339 static void
3340 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
3341 {
3342 kern_return_t kr;
3343
3344 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
3345 &_dispatch_mach_recv_portset);
3346 DISPATCH_VERIFY_MIG(kr);
3347 if (slowpath(kr)) {
3348 DISPATCH_CLIENT_CRASH(kr,
3349 "mach_port_allocate() failed: cannot create port set");
3350 }
3351 _dispatch_kevent_qos_s *ke = &_dispatch_mach_recv_kevent;
3352 dispatch_assert(_dispatch_kevent_mach_msg_buf(ke));
3353 dispatch_assert(dispatch_mach_trailer_size ==
3354 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
3355 DISPATCH_MACH_RCV_TRAILER)));
3356 ke->ident = _dispatch_mach_recv_portset;
3357 #if DISPATCH_USE_KEVENT_WORKQUEUE
3358 if (_dispatch_kevent_workqueue_enabled) {
3359 ke->qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
3360 }
3361 #endif
3362 _dispatch_kq_immediate_update(&_dispatch_mach_recv_kevent);
3363 }
3364
3365 static mach_port_t
3366 _dispatch_get_mach_recv_portset(void)
3367 {
3368 static dispatch_once_t pred;
3369 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
3370 return _dispatch_mach_recv_portset;
3371 }
3372
3373 static void
3374 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
3375 {
3376 _dispatch_kevent_qos_s kev = {
3377 .filter = EVFILT_MACHPORT,
3378 .flags = EV_ADD,
3379 };
3380 #if DISPATCH_USE_KEVENT_WORKQUEUE
3381 if (_dispatch_kevent_workqueue_enabled) {
3382 kev.qos = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
3383 }
3384 #endif
3385
3386 kern_return_t kr;
3387
3388 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
3389 &_dispatch_mach_portset);
3390 DISPATCH_VERIFY_MIG(kr);
3391 if (slowpath(kr)) {
3392 DISPATCH_CLIENT_CRASH(kr,
3393 "mach_port_allocate() failed: cannot create port set");
3394 }
3395 kev.ident = _dispatch_mach_portset;
3396 _dispatch_kq_immediate_update(&kev);
3397 }
3398
3399 static mach_port_t
3400 _dispatch_get_mach_portset(void)
3401 {
3402 static dispatch_once_t pred;
3403 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
3404 return _dispatch_mach_portset;
3405 }
3406
3407 static kern_return_t
3408 _dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
3409 {
3410 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
3411 kern_return_t kr;
3412
3413 _dispatch_debug_machport(mp);
3414 kr = mach_port_move_member(mach_task_self(), mp, mps);
3415 if (slowpath(kr)) {
3416 DISPATCH_VERIFY_MIG(kr);
3417 switch (kr) {
3418 case KERN_INVALID_RIGHT:
3419 if (mps) {
3420 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
3421 "mach_port_move_member() failed ", kr);
3422 break;
3423 }
3424 //fall through
3425 case KERN_INVALID_NAME:
3426 #if DISPATCH_DEBUG
3427 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
3428 "prematurely", mp);
3429 #endif
3430 break;
3431 default:
3432 (void)dispatch_assume_zero(kr);
3433 break;
3434 }
3435 }
3436 return mps ? kr : 0;
3437 }
3438
3439 static kern_return_t
3440 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
3441 uint32_t del_flags)
3442 {
3443 kern_return_t kr = 0;
3444 dispatch_assert_zero(new_flags & del_flags);
3445 if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
3446 (del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
3447 mach_port_t mps;
3448 if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
3449 mps = _dispatch_get_mach_recv_portset();
3450 } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
3451 ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
3452 (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
3453 mps = _dispatch_get_mach_portset();
3454 } else {
3455 mps = MACH_PORT_NULL;
3456 }
3457 kr = _dispatch_mach_portset_update(dk, mps);
3458 }
3459 return kr;
3460 }
3461 #endif // DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3462
3463 static kern_return_t
3464 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
3465 uint32_t del_flags)
3466 {
3467 kern_return_t kr = 0;
3468 dispatch_assert_zero(new_flags & del_flags);
3469 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
3470 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
3471 // Requesting a (delayed) non-sync send-possible notification
3472 // registers for both immediate dead-name notification and delayed-arm
3473 // send-possible notification for the port.
3474 // The send-possible notification is armed when a mach_msg() with
3475 // the MACH_SEND_NOTIFY option to the port times out.
3476 // If send-possible is unavailable, fall back to immediate dead-name
3477 // registration rdar://problem/2527840&9008724
3478 kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
3479 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
3480 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
3481 }
3482 return kr;
3483 }
3484
3485 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3486 DISPATCH_NOINLINE
3487 static void
3488 _dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke)
3489 {
3490 mach_port_t name = (mach_port_name_t)ke->data;
3491 dispatch_kevent_t dk;
3492
3493 _dispatch_debug_machport(name);
3494 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
3495 if (!dispatch_assume(dk)) {
3496 return;
3497 }
3498 _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
3499
3500 _dispatch_kevent_qos_s kev = {
3501 .ident = name,
3502 .filter = EVFILT_MACHPORT,
3503 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
3504 .fflags = DISPATCH_MACH_RECV_MESSAGE,
3505 .udata = (uintptr_t)dk,
3506 };
3507 _dispatch_kevent_debug("synthetic", &kev);
3508 _dispatch_kevent_merge(&kev);
3509 }
3510 #endif
3511
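// Deliver a received message to its source; on MACH_RCV_TOO_LARGE the message
// is re-received into a malloc'd buffer sized from the kevent before delivery.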
3512 DISPATCH_NOINLINE
3513 static void
3514 _dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke)
3515 {
3516 mach_msg_header_t *hdr = _dispatch_kevent_mach_msg_buf(ke);
3517 mach_msg_size_t siz;
3518 mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
3519
3520 if (!fastpath(hdr)) {
3521 DISPATCH_INTERNAL_CRASH(kr, "EVFILT_MACHPORT with no message");
3522 }
3523 if (fastpath(!kr)) {
3524 _dispatch_kevent_mach_msg_recv(ke, hdr);
3525 goto out;
3526 } else if (kr != MACH_RCV_TOO_LARGE) {
3527 goto out;
3528 } else if (!ke->data) {
3529 DISPATCH_INTERNAL_CRASH(0, "MACH_RCV_LARGE_IDENTITY with no identity");
3530 }
3531 if (slowpath(ke->ext[1] > (UINT_MAX - dispatch_mach_trailer_size))) {
3532 DISPATCH_INTERNAL_CRASH(ke->ext[1],
3533 "EVFILT_MACHPORT with overlarge message");
3534 }
3535 siz = _dispatch_kevent_mach_msg_size(ke) + dispatch_mach_trailer_size;
3536 hdr = malloc(siz);
3537 if (!dispatch_assume(hdr)) {
3538 // Kernel will discard message too large to fit
3539 hdr = NULL;
3540 siz = 0;
3541 }
3542 mach_port_t name = (mach_port_name_t)ke->data;
3543 const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
3544 MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
3545 kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
3546 MACH_PORT_NULL);
3547 if (fastpath(!kr)) {
3548 _dispatch_kevent_mach_msg_recv(ke, hdr);
3549 goto out;
3550 } else if (kr == MACH_RCV_TOO_LARGE) {
3551 _dispatch_log("BUG in libdispatch client: "
3552 "_dispatch_kevent_mach_msg_drain: dropped message too "
3553 "large to fit in memory: id = 0x%x, size = %u",
3554 hdr ? hdr->msgh_id : 0, _dispatch_kevent_mach_msg_size(ke));
3555 kr = MACH_MSG_SUCCESS;
3556 }
3557 if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
3558 free(hdr);
3559 }
3560 out:
3561 if (slowpath(kr)) {
3562 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
3563 "message reception failed", kr);
3564 }
3565 }
3566
3567 DISPATCH_NOINLINE
3568 static void
3569 _dispatch_mach_kevent_merge(_dispatch_kevent_qos_s *ke)
3570 {
3571 if (unlikely(!(ke->flags & EV_UDATA_SPECIFIC))) {
3572 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
3573 if (ke->ident == _dispatch_mach_recv_portset) {
3574 _dispatch_kevent_mach_msg_drain(ke);
3575 return _dispatch_kq_deferred_update(&_dispatch_mach_recv_kevent);
3576 } else if (ke->ident == _dispatch_mach_portset) {
3577 return _dispatch_kevent_machport_drain(ke);
3578 }
3579 #endif
3580 return _dispatch_kevent_error(ke);
3581 }
3582
3583 dispatch_kevent_t dk = (dispatch_kevent_t)ke->udata;
3584 dispatch_source_refs_t dr = TAILQ_FIRST(&dk->dk_sources);
3585 bool is_reply = (dk->dk_kevent.flags & EV_ONESHOT);
3586 dispatch_source_t ds = _dispatch_source_from_refs(dr);
3587
3588 if (_dispatch_kevent_mach_msg_size(ke)) {
3589 _dispatch_kevent_mach_msg_drain(ke);
3590 if (is_reply) {
3591 // _dispatch_kevent_mach_msg_drain() should have deleted this event
3592 dispatch_assert(ke->flags & EV_DELETE);
3593 return;
3594 }
3595
3596 if (!(ds->dq_atomic_flags & DSF_CANCELED)) {
3597 // re-arm the mach channel
3598 ke->fflags = DISPATCH_MACH_RCV_OPTIONS;
3599 ke->data = 0;
3600 ke->ext[0] = 0;
3601 ke->ext[1] = 0;
3602 return _dispatch_kq_deferred_update(ke);
3603 }
3604 } else if (is_reply) {
3605 DISPATCH_INTERNAL_CRASH(ke->flags, "Unexpected EVFILT_MACHPORT event");
3606 }
3607 if (unlikely((ke->flags & EV_VANISHED) &&
3608 (dx_type(ds) == DISPATCH_MACH_CHANNEL_TYPE))) {
3609 DISPATCH_CLIENT_CRASH(ke->flags,
3610 "Unexpected EV_VANISHED (do not destroy random mach ports)");
3611 }
3612 return _dispatch_kevent_merge(ke);
3613 }
3614
3615 static void
3616 _dispatch_kevent_mach_msg_recv(_dispatch_kevent_qos_s *ke,
3617 mach_msg_header_t *hdr)
3618 {
3619 dispatch_source_refs_t dri;
3620 dispatch_kevent_t dk;
3621 mach_port_t name = hdr->msgh_local_port;
3622 mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
3623
3624 if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
3625 dispatch_mach_trailer_size)) {
3626 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3627 "received overlarge message");
3628 return _dispatch_kevent_mach_msg_destroy(ke, hdr);
3629 }
3630 if (!dispatch_assume(name)) {
3631 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3632 "received message with MACH_PORT_NULL port");
3633 return _dispatch_kevent_mach_msg_destroy(ke, hdr);
3634 }
3635 _dispatch_debug_machport(name);
3636 if (ke->flags & EV_UDATA_SPECIFIC) {
3637 dk = (void*)ke->udata;
3638 } else {
3639 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
3640 }
3641 if (!dispatch_assume(dk)) {
3642 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3643 "received message with unknown kevent");
3644 return _dispatch_kevent_mach_msg_destroy(ke, hdr);
3645 }
3646 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
3647 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
3648 if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
3649 return _dispatch_source_merge_mach_msg(dsi, dri, dk, ke, hdr, siz);
3650 }
3651 }
3652 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3653 "received message with no listeners");
3654 return _dispatch_kevent_mach_msg_destroy(ke, hdr);
3655 }
3656
3657 static void
3658 _dispatch_kevent_mach_msg_destroy(_dispatch_kevent_qos_s *ke,
3659 mach_msg_header_t *hdr)
3660 {
3661 if (hdr) {
3662 mach_msg_destroy(hdr);
3663 if (hdr != _dispatch_kevent_mach_msg_buf(ke)) {
3664 free(hdr);
3665 }
3666 }
3667 }
3668
3669 static void
3670 _dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
3671 dispatch_kevent_t dk, _dispatch_kevent_qos_s *ke,
3672 mach_msg_header_t *hdr, mach_msg_size_t siz)
3673 {
3674 if (dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE) {
3675 return _dispatch_source_merge_mach_msg_direct(ds, ke, hdr);
3676 }
3677 dispatch_mach_reply_refs_t dmr = NULL;
3678 if (dk->dk_kevent.flags & EV_ONESHOT) {
3679 dmr = (dispatch_mach_reply_refs_t)dr;
3680 }
3681 return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, ke, hdr, siz);
3682 }
3683
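// Propagate a Mach notification (send-possible, dead-name, ...) to every
// source and mach channel registered on the port, unregistering them when the
// notification can never fire again.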
3684 DISPATCH_NOINLINE
3685 static void
3686 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
3687 {
3688 dispatch_source_refs_t dri, dr_next;
3689 dispatch_kevent_t dk;
3690 bool unreg;
3691
3692 dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
3693 if (!dk) {
3694 return;
3695 }
3696
3697 // Update notification registration state.
3698 dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
3699 _dispatch_kevent_qos_s kev = {
3700 .ident = name,
3701 .filter = DISPATCH_EVFILT_MACH_NOTIFICATION,
3702 .flags = EV_ADD|EV_ENABLE,
3703 .fflags = flag,
3704 .udata = (uintptr_t)dk,
3705 };
3706 if (final) {
3707 // This can never happen again
3708 unreg = true;
3709 } else {
3710 // Re-register for notification before delivery
3711 unreg = _dispatch_kevent_resume(dk, flag, 0);
3712 }
3713 DISPATCH_MACH_NOTIFICATION_ARMED(dk) = 0;
3714 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
3715 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
3716 if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
3717 dispatch_mach_t dm = (dispatch_mach_t)dsi;
3718 _dispatch_mach_merge_notification_kevent(dm, &kev);
3719 if (unreg && dm->dm_dkev) {
3720 _dispatch_mach_notification_kevent_unregister(dm);
3721 }
3722 } else {
3723 _dispatch_source_merge_kevent(dsi, &kev);
3724 if (unreg) {
3725 _dispatch_source_kevent_unregister(dsi);
3726 }
3727 }
3728 if (!dr_next || DISPATCH_MACH_NOTIFICATION_ARMED(dk)) {
3729 // current merge is last in list (dk might have been freed)
3730 // or it re-armed the notification
3731 return;
3732 }
3733 }
3734 }
3735
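// Request or cancel the kernel notification for the port based on the flag
// transition recorded in dk_kevent.data.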
3736 static kern_return_t
3737 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
3738 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
3739 mach_port_mscount_t notify_sync)
3740 {
3741 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
3742 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
3743 kern_return_t kr, krr = 0;
3744
3745 // Update notification registration state.
3746 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
3747 dk->dk_kevent.data &= ~(del_flags & mask);
3748
3749 _dispatch_debug_machport(port);
3750 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
3751 _dispatch_debug("machport[0x%08x]: registering for send-possible "
3752 "notification", port);
3753 previous = MACH_PORT_NULL;
3754 krr = mach_port_request_notification(mach_task_self(), port,
3755 notify_msgid, notify_sync, _dispatch_get_mach_notify_port(),
3756 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
3757 DISPATCH_VERIFY_MIG(krr);
3758
3759 switch(krr) {
3760 case KERN_INVALID_NAME:
3761 case KERN_INVALID_RIGHT:
3762 // Suppress errors & clear registration state
3763 dk->dk_kevent.data &= ~mask;
3764 break;
3765 default:
3766 // Else, we don't expect any errors from mach. Log any errors
3767 if (dispatch_assume_zero(krr)) {
3768 // log the error & clear registration state
3769 dk->dk_kevent.data &= ~mask;
3770 } else if (dispatch_assume_zero(previous)) {
3771 // Another subsystem has beat libdispatch to requesting the
3772 // specified Mach notification on this port. We should
3773 // technically cache the previous port and message it when the
3774 // kernel messages our port. Or we can just say screw those
3775 // subsystems and deallocate the previous port.
3776 // They should adopt libdispatch :-P
3777 kr = mach_port_deallocate(mach_task_self(), previous);
3778 DISPATCH_VERIFY_MIG(kr);
3779 (void)dispatch_assume_zero(kr);
3780 previous = MACH_PORT_NULL;
3781 }
3782 }
3783 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
3784 _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
3785 "notification", port);
3786 previous = MACH_PORT_NULL;
3787 kr = mach_port_request_notification(mach_task_self(), port,
3788 notify_msgid, notify_sync, MACH_PORT_NULL,
3789 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
3790 DISPATCH_VERIFY_MIG(kr);
3791
3792 switch (kr) {
3793 case KERN_INVALID_NAME:
3794 case KERN_INVALID_RIGHT:
3795 case KERN_INVALID_ARGUMENT:
3796 break;
3797 default:
3798 if (dispatch_assume_zero(kr)) {
3799 // log the error
3800 }
3801 }
3802 } else {
3803 return 0;
3804 }
3805 if (slowpath(previous)) {
3806 // the kernel has not consumed the send-once right yet
3807 (void)dispatch_assume_zero(
3808 _dispatch_send_consume_send_once_right(previous));
3809 }
3810 return krr;
3811 }
3812
3813 static void
3814 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
3815 {
3816 static int notify_type = HOST_NOTIFY_CALENDAR_SET;
3817 kern_return_t kr;
3818 _dispatch_debug("registering for calendar-change notification");
3819 retry:
3820 kr = host_request_notification(_dispatch_get_mach_host_port(),
3821 notify_type, _dispatch_get_mach_notify_port());
3822 // Fall back when the newer _SET variant is unsupported; _CHANGE fires strictly more often.
3823 if (kr == KERN_INVALID_ARGUMENT &&
3824 notify_type != HOST_NOTIFY_CALENDAR_CHANGE) {
3825 notify_type = HOST_NOTIFY_CALENDAR_CHANGE;
3826 goto retry;
3827 }
3828 DISPATCH_VERIFY_MIG(kr);
3829 (void)dispatch_assume_zero(kr);
3830 }
3831
3832 static void
3833 _dispatch_mach_host_calendar_change_register(void)
3834 {
3835 static dispatch_once_t pred;
3836 dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
3837 }
3838
3839 static void
3840 _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
3841 {
3842 mig_reply_error_t reply;
3843 dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
3844 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
3845 dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
3846 boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
3847 if (!success && reply.RetCode == MIG_BAD_ID &&
3848 (hdr->msgh_id == HOST_CALENDAR_SET_REPLYID ||
3849 hdr->msgh_id == HOST_CALENDAR_CHANGED_REPLYID)) {
3850 _dispatch_debug("calendar-change notification");
3851 _dispatch_timers_calendar_change();
3852 _dispatch_mach_host_notify_update(NULL);
3853 success = TRUE;
3854 reply.RetCode = KERN_SUCCESS;
3855 }
3856 if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
3857 (void)dispatch_assume_zero(reply.RetCode);
3858 }
3859 if (!success || (reply.RetCode && reply.RetCode != MIG_NO_REPLY)) {
3860 mach_msg_destroy(hdr);
3861 }
3862 }
3863
3864 kern_return_t
3865 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
3866 mach_port_name_t name)
3867 {
3868 #if DISPATCH_DEBUG
3869 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
3870 "deleted prematurely", name);
3871 #endif
3872
3873 _dispatch_debug_machport(name);
3874 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
3875
3876 return KERN_SUCCESS;
3877 }
3878
3879 kern_return_t
3880 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
3881 mach_port_name_t name)
3882 {
3883 kern_return_t kr;
3884
3885 _dispatch_debug("machport[0x%08x]: dead-name notification", name);
3886 _dispatch_debug_machport(name);
3887 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
3888
3889 // the act of receiving a dead name notification allocates a dead-name
3890 // right that must be deallocated
3891 kr = mach_port_deallocate(mach_task_self(), name);
3892 DISPATCH_VERIFY_MIG(kr);
3893 //(void)dispatch_assume_zero(kr);
3894
3895 return KERN_SUCCESS;
3896 }
3897
3898 kern_return_t
3899 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
3900 mach_port_name_t name)
3901 {
3902 _dispatch_debug("machport[0x%08x]: send-possible notification", name);
3903 _dispatch_debug_machport(name);
3904 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
3905
3906 return KERN_SUCCESS;
3907 }
3908
3909 #pragma mark -
3910 #pragma mark dispatch_mach_t
3911
3912 #define DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT 0x1
3913 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
3914 #define DISPATCH_MACH_WAIT_FOR_REPLY 0x4
3915 #define DISPATCH_MACH_OWNED_REPLY_PORT 0x8
3916 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3917
3918 #define DM_SEND_STATUS_SUCCESS 0x1
3919 #define DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT 0x2
3920
3921 DISPATCH_ENUM(dispatch_mach_send_invoke_flags, uint32_t,
3922 DM_SEND_INVOKE_NONE = 0x0,
3923 DM_SEND_INVOKE_FLUSH = 0x1,
3924 DM_SEND_INVOKE_NEEDS_BARRIER = 0x2,
3925 DM_SEND_INVOKE_CANCEL = 0x4,
3926 DM_SEND_INVOKE_CAN_RUN_BARRIER = 0x8,
3927 DM_SEND_INVOKE_IMMEDIATE_SEND = 0x10,
3928 );
3929 #define DM_SEND_INVOKE_IMMEDIATE_SEND_MASK \
3930 ((dispatch_mach_send_invoke_flags_t)DM_SEND_INVOKE_IMMEDIATE_SEND)
3931
3932 static inline pthread_priority_t _dispatch_mach_priority_propagate(
3933 mach_msg_option_t options);
3934 static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3935 static mach_port_t _dispatch_mach_msg_get_reply_port(dispatch_object_t dou);
3936 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3937 mach_port_t local_port, mach_port_t remote_port);
3938 static inline void _dispatch_mach_msg_reply_received(dispatch_mach_t dm,
3939 dispatch_mach_reply_refs_t dmr, mach_port_t local_port);
3940 static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
3941 dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
3942 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3943 dispatch_object_t dou);
3944 static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3945 dispatch_mach_msg_t dmsg);
3946 static void _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou,
3947 pthread_priority_t pp);
3948
3949 static dispatch_mach_t
3950 _dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3951 dispatch_mach_handler_function_t handler, bool handler_is_block)
3952 {
3953 dispatch_mach_t dm;
3954 dispatch_mach_refs_t dr;
3955
3956 dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3957 sizeof(struct dispatch_mach_s));
3958 _dispatch_queue_init(dm->_as_dq, DQF_NONE, 1, true);
3959
3960 dm->dq_label = label;
3961 dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3962
3963 dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3964 dr->dr_source_wref = _dispatch_ptr2wref(dm);
3965 dr->dm_handler_func = handler;
3966 dr->dm_handler_ctxt = context;
3967 dm->ds_refs = dr;
3968 dm->dm_handler_is_block = handler_is_block;
3969
3970 dm->dm_refs = _dispatch_calloc(1ul,
3971 sizeof(struct dispatch_mach_send_refs_s));
3972 dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3973 dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3974 TAILQ_INIT(&dm->dm_refs->dm_replies);
3975
3976 if (slowpath(!q)) {
3977 q = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
3978 } else {
3979 _dispatch_retain(q);
3980 }
3981 dm->do_targetq = q;
3982 _dispatch_object_debug(dm, "%s", __func__);
3983 return dm;
3984 }
3985
3986 dispatch_mach_t
3987 dispatch_mach_create(const char *label, dispatch_queue_t q,
3988 dispatch_mach_handler_t handler)
3989 {
3990 dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3991 return _dispatch_mach_create(label, q, bb,
3992 (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3993 }
3994
3995 dispatch_mach_t
3996 dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3997 dispatch_mach_handler_function_t handler)
3998 {
3999 return _dispatch_mach_create(label, q, context, handler, false);
4000 }
4001
4002 void
4003 _dispatch_mach_dispose(dispatch_mach_t dm)
4004 {
4005 _dispatch_object_debug(dm, "%s", __func__);
4006 dispatch_mach_refs_t dr = dm->ds_refs;
4007 if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
4008 Block_release(dr->dm_handler_ctxt);
4009 }
4010 free(dr);
4011 free(dm->dm_refs);
4012 _dispatch_queue_destroy(dm->_as_dq);
4013 }
4014
4015 void
4016 dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
4017 mach_port_t send, dispatch_mach_msg_t checkin)
4018 {
4019 dispatch_mach_send_refs_t dr = dm->dm_refs;
4020 dispatch_kevent_t dk;
4021 uint32_t disconnect_cnt;
4022 dispatch_source_type_t type = &_dispatch_source_type_mach_recv_direct;
4023
4024 dm->ds_is_direct_kevent = (bool)_dispatch_evfilt_machport_direct_enabled;
4025 if (MACH_PORT_VALID(receive)) {
4026 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
4027 dk->dk_kevent = type->ke;
4028 dk->dk_kevent.ident = receive;
4029 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE|EV_VANISHED;
4030 dk->dk_kevent.udata = (uintptr_t)dk;
4031 TAILQ_INIT(&dk->dk_sources);
4032 dm->ds_dkev = dk;
4033 dm->ds_pending_data_mask = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
4034 dm->ds_needs_rearm = dm->ds_is_direct_kevent;
4035 if (!dm->ds_is_direct_kevent) {
4036 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT;
4037 dk->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
4038 }
4039 _dispatch_retain(dm); // the reference the manager queue holds
4040 }
4041 dr->dm_send = send;
4042 if (MACH_PORT_VALID(send)) {
4043 if (checkin) {
4044 dispatch_retain(checkin);
4045 checkin->dmsg_options = _dispatch_mach_checkin_options();
4046 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
4047 }
4048 dr->dm_checkin = checkin;
4049 }
4050 // monitor message reply ports
4051 dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
4052 dispatch_assert(DISPATCH_MACH_NEVER_CONNECTED - 1 ==
4053 DISPATCH_MACH_NEVER_INSTALLED);
4054 disconnect_cnt = os_atomic_dec2o(dr, dm_disconnect_cnt, release);
4055 if (unlikely(disconnect_cnt != DISPATCH_MACH_NEVER_INSTALLED)) {
4056 DISPATCH_CLIENT_CRASH(disconnect_cnt, "Channel already connected");
4057 }
4058 _dispatch_object_debug(dm, "%s", __func__);
4059 return dispatch_activate(dm);
4060 }
4061
4062 // assumes low bit of mach port names is always set
4063 #define DISPATCH_MACH_REPLY_PORT_UNOWNED 0x1u
4064
4065 static inline void
4066 _dispatch_mach_reply_mark_reply_port_owned(dispatch_mach_reply_refs_t dmr)
4067 {
4068 dmr->dmr_reply &= ~DISPATCH_MACH_REPLY_PORT_UNOWNED;
4069 }
4070
4071 static inline bool
4072 _dispatch_mach_reply_is_reply_port_owned(dispatch_mach_reply_refs_t dmr)
4073 {
4074 mach_port_t reply_port = dmr->dmr_reply;
4075 return reply_port ? !(reply_port & DISPATCH_MACH_REPLY_PORT_UNOWNED) :false;
4076 }
4077
4078 static inline mach_port_t
4079 _dispatch_mach_reply_get_reply_port(dispatch_mach_reply_refs_t dmr)
4080 {
4081 mach_port_t reply_port = dmr->dmr_reply;
4082 return reply_port ? (reply_port | DISPATCH_MACH_REPLY_PORT_UNOWNED) : 0;
4083 }
4084
4085 static inline bool
4086 _dispatch_mach_reply_tryremove(dispatch_mach_t dm,
4087 dispatch_mach_reply_refs_t dmr)
4088 {
4089 bool removed;
4090 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
4091 if ((removed = _TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
4092 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
4093 _TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
4094 }
4095 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
4096 return removed;
4097 }
4098
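// Two flavors of reply registration: "waiter" replies are awaited
// synchronously by the sending thread, while "kevent" replies are received
// asynchronously via a oneshot EVFILT_MACHPORT registration.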
4099 DISPATCH_NOINLINE
4100 static void
4101 _dispatch_mach_reply_waiter_unregister(dispatch_mach_t dm,
4102 dispatch_mach_reply_refs_t dmr, unsigned int options)
4103 {
4104 dispatch_mach_msg_t dmsgr = NULL;
4105 bool disconnected = (options & DKEV_UNREGISTER_DISCONNECTED);
4106 if (options & DKEV_UNREGISTER_REPLY_REMOVE) {
4107 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
4108 if (unlikely(!_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
4109 DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
4110 }
4111 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
4112 _TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
4113 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
4114 }
4115 if (disconnected) {
4116 dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
4117 } else if (dmr->dmr_voucher) {
4118 _voucher_release(dmr->dmr_voucher);
4119 dmr->dmr_voucher = NULL;
4120 }
4121 _dispatch_debug("machport[0x%08x]: unregistering for sync reply%s, ctxt %p",
4122 _dispatch_mach_reply_get_reply_port(dmr),
4123 disconnected ? " (disconnected)" : "", dmr->dmr_ctxt);
4124 if (dmsgr) {
4125 return _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
4126 }
4127 dispatch_assert(!(options & DKEV_UNREGISTER_WAKEUP));
4128 }
4129
4130 DISPATCH_NOINLINE
4131 static void
4132 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
4133 dispatch_mach_reply_refs_t dmr, unsigned int options)
4134 {
4135 dispatch_mach_msg_t dmsgr = NULL;
4136 bool replies_empty = false;
4137 bool disconnected = (options & DKEV_UNREGISTER_DISCONNECTED);
4138 if (options & DKEV_UNREGISTER_REPLY_REMOVE) {
4139 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
4140 if (unlikely(!_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
4141 DISPATCH_INTERNAL_CRASH(0, "Could not find reply registration");
4142 }
4143 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
4144 _TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
4145 replies_empty = TAILQ_EMPTY(&dm->dm_refs->dm_replies);
4146 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
4147 }
4148 if (disconnected) {
4149 dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
4150 } else if (dmr->dmr_voucher) {
4151 _voucher_release(dmr->dmr_voucher);
4152 dmr->dmr_voucher = NULL;
4153 }
4154 uint32_t flags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
4155 dispatch_kevent_t dk = dmr->dmr_dkev;
4156 _dispatch_debug("machport[0x%08x]: unregistering for reply%s, ctxt %p",
4157 (mach_port_t)dk->dk_kevent.ident,
4158 disconnected ? " (disconnected)" : "", dmr->dmr_ctxt);
4159 if (!dm->ds_is_direct_kevent) {
4160 dmr->dmr_dkev = NULL;
4161 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
4162 _dispatch_kevent_unregister(dk, flags, 0);
4163 } else {
4164 long r = _dispatch_kevent_unregister(dk, flags, options);
4165 if (r == EINPROGRESS) {
4166 _dispatch_debug("machport[0x%08x]: deferred delete kevent[%p]",
4167 (mach_port_t)dk->dk_kevent.ident, dk);
4168 dispatch_assert(options == DKEV_UNREGISTER_DISCONNECTED);
4169 // dmr must be put back so that the event delivery finds it, the
4170 // replies lock is held by the caller.
4171 TAILQ_INSERT_HEAD(&dm->dm_refs->dm_replies, dmr, dmr_list);
4172 if (dmsgr) {
4173 dmr->dmr_voucher = dmsgr->dmsg_voucher;
4174 dmsgr->dmsg_voucher = NULL;
4175 dispatch_release(dmsgr);
4176 }
4177 return; // deferred unregistration
4178 }
4179 dispatch_assume_zero(r);
4180 dmr->dmr_dkev = NULL;
4181 _TAILQ_TRASH_ENTRY(dmr, dr_list);
4182 }
4183 free(dmr);
4184 if (dmsgr) {
4185 return _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
4186 }
4187 if ((options & DKEV_UNREGISTER_WAKEUP) && replies_empty &&
4188 (dm->dm_refs->dm_disconnect_cnt ||
4189 (dm->dq_atomic_flags & DSF_CANCELED))) {
4190 dx_wakeup(dm, 0, DISPATCH_WAKEUP_FLUSH);
4191 }
4192 }
4193
4194 DISPATCH_NOINLINE
4195 static void
4196 _dispatch_mach_reply_waiter_register(dispatch_mach_t dm,
4197 dispatch_mach_reply_refs_t dmr, mach_port_t reply_port,
4198 dispatch_mach_msg_t dmsg, mach_msg_option_t msg_opts)
4199 {
4200 dmr->dr_source_wref = _dispatch_ptr2wref(dm);
4201 dmr->dmr_dkev = NULL;
4202 dmr->dmr_reply = reply_port;
4203 if (msg_opts & DISPATCH_MACH_OWNED_REPLY_PORT) {
4204 _dispatch_mach_reply_mark_reply_port_owned(dmr);
4205 } else {
4206 if (dmsg->dmsg_voucher) {
4207 dmr->dmr_voucher = _voucher_retain(dmsg->dmsg_voucher);
4208 }
4209 dmr->dmr_priority = (dispatch_priority_t)dmsg->dmsg_priority;
4210 // make reply context visible to leaks rdar://11777199
4211 dmr->dmr_ctxt = dmsg->do_ctxt;
4212 }
4213
4214 _dispatch_debug("machport[0x%08x]: registering for sync reply, ctxt %p",
4215 reply_port, dmsg->do_ctxt);
4216 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
4217 if (unlikely(_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
4218 DISPATCH_INTERNAL_CRASH(dmr->dmr_list.tqe_prev, "Reply already registered");
4219 }
4220 TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
4221 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
4222 }
4223
4224 DISPATCH_NOINLINE
4225 static void
4226 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply_port,
4227 dispatch_mach_msg_t dmsg)
4228 {
4229 dispatch_kevent_t dk;
4230 dispatch_mach_reply_refs_t dmr;
4231 dispatch_source_type_t type = &_dispatch_source_type_mach_recv_direct;
4232 pthread_priority_t mp, pp;
4233
4234 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
4235 dk->dk_kevent = type->ke;
4236 dk->dk_kevent.ident = reply_port;
4237 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE|EV_ONESHOT;
4238 dk->dk_kevent.udata = (uintptr_t)dk;
4239 TAILQ_INIT(&dk->dk_sources);
4240 if (!dm->ds_is_direct_kevent) {
4241 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
4242 dk->dk_kevent.flags &= ~(EV_UDATA_SPECIFIC|EV_VANISHED);
4243 }
4244
4245 dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
4246 dmr->dr_source_wref = _dispatch_ptr2wref(dm);
4247 dmr->dmr_dkev = dk;
4248 dmr->dmr_reply = reply_port;
4249 if (dmsg->dmsg_voucher) {
4250 dmr->dmr_voucher = _voucher_retain(dmsg->dmsg_voucher);
4251 }
4252 dmr->dmr_priority = (dispatch_priority_t)dmsg->dmsg_priority;
4253 // make reply context visible to leaks rdar://11777199
4254 dmr->dmr_ctxt = dmsg->do_ctxt;
4255
4256 pp = dm->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
4257 if (pp && dm->ds_is_direct_kevent) {
4258 mp = dmsg->dmsg_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
4259 if (pp < mp) pp = mp;
4260 pp |= dm->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4261 } else {
4262 pp = _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
4263 }
4264
4265 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p",
4266 reply_port, dmsg->do_ctxt);
4267 uint32_t flags;
4268 bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, pp, &flags);
4269 TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
4270 dr_list);
4271 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
4272 if (unlikely(_TAILQ_IS_ENQUEUED(dmr, dmr_list))) {
4273 DISPATCH_INTERNAL_CRASH(dmr->dmr_list.tqe_prev, "Reply already registered");
4274 }
4275 TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
4276 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
4277 if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
4278 return _dispatch_mach_reply_kevent_unregister(dm, dmr,
4279 DKEV_UNREGISTER_DISCONNECTED|DKEV_UNREGISTER_REPLY_REMOVE);
4280 }
4281 }
4282
4283 DISPATCH_NOINLINE
4284 static void
4285 _dispatch_mach_notification_kevent_unregister(dispatch_mach_t dm)
4286 {
4287 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
4288 dispatch_kevent_t dk = dm->dm_dkev;
4289 dm->dm_dkev = NULL;
4290 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
4291 dr_list);
4292 dm->ds_pending_data_mask &= ~(unsigned long)
4293 (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
4294 _dispatch_kevent_unregister(dk,
4295 DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD, 0);
4296 }
4297
4298 DISPATCH_NOINLINE
4299 static void
4300 _dispatch_mach_notification_kevent_register(dispatch_mach_t dm,mach_port_t send)
4301 {
4302 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
4303 dispatch_kevent_t dk;
4304
4305 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
4306 dk->dk_kevent = _dispatch_source_type_mach_send.ke;
4307 dk->dk_kevent.ident = send;
4308 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
4309 dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
4310 dk->dk_kevent.udata = (uintptr_t)dk;
4311 TAILQ_INIT(&dk->dk_sources);
4312
4313 dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
4314
4315 uint32_t flags;
4316 bool do_resume = _dispatch_kevent_register(&dk,
4317 _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG, &flags);
4318 TAILQ_INSERT_TAIL(&dk->dk_sources,
4319 (dispatch_source_refs_t)dm->dm_refs, dr_list);
4320 dm->dm_dkev = dk;
4321 if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
4322 _dispatch_mach_notification_kevent_unregister(dm);
4323 }
4324 }
4325
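// The helpers below manage the per-thread MIG reply port that is borrowed as
// the reply port for synchronous sends.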
4326 static mach_port_t
4327 _dispatch_get_thread_reply_port(void)
4328 {
4329 mach_port_t reply_port, mrp = _dispatch_get_thread_mig_reply_port();
4330 if (mrp) {
4331 reply_port = mrp;
4332 _dispatch_debug("machport[0x%08x]: borrowed thread sync reply port",
4333 reply_port);
4334 } else {
4335 reply_port = mach_reply_port();
4336 _dispatch_set_thread_mig_reply_port(reply_port);
4337 _dispatch_debug("machport[0x%08x]: allocated thread sync reply port",
4338 reply_port);
4339 }
4340 _dispatch_debug_machport(reply_port);
4341 return reply_port;
4342 }
4343
4344 static void
4345 _dispatch_clear_thread_reply_port(mach_port_t reply_port)
4346 {
4347 mach_port_t mrp = _dispatch_get_thread_mig_reply_port();
4348 if (reply_port != mrp) {
4349 if (mrp) {
4350 _dispatch_debug("machport[0x%08x]: did not clear thread sync reply "
4351 "port (found 0x%08x)", reply_port, mrp);
4352 }
4353 return;
4354 }
4355 _dispatch_set_thread_mig_reply_port(MACH_PORT_NULL);
4356 _dispatch_debug_machport(reply_port);
4357 _dispatch_debug("machport[0x%08x]: cleared thread sync reply port",
4358 reply_port);
4359 }
4360
4361 static void
4362 _dispatch_set_thread_reply_port(mach_port_t reply_port)
4363 {
4364 _dispatch_debug_machport(reply_port);
4365 mach_port_t mrp = _dispatch_get_thread_mig_reply_port();
4366 if (mrp) {
4367 kern_return_t kr = mach_port_mod_refs(mach_task_self(), reply_port,
4368 MACH_PORT_RIGHT_RECEIVE, -1);
4369 DISPATCH_VERIFY_MIG(kr);
4370 dispatch_assume_zero(kr);
4371 _dispatch_debug("machport[0x%08x]: deallocated sync reply port "
4372 "(found 0x%08x)", reply_port, mrp);
4373 } else {
4374 _dispatch_set_thread_mig_reply_port(reply_port);
4375 _dispatch_debug("machport[0x%08x]: restored thread sync reply port",
4376 reply_port);
4377 }
4378 }
4379
4380 static inline mach_port_t
4381 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
4382 {
4383 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
4384 mach_port_t remote = hdr->msgh_remote_port;
4385 return remote;
4386 }
4387
4388 static inline mach_port_t
4389 _dispatch_mach_msg_get_reply_port(dispatch_object_t dou)
4390 {
4391 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
4392 mach_port_t local = hdr->msgh_local_port;
4393 if (!MACH_PORT_VALID(local) || MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) !=
4394 MACH_MSG_TYPE_MAKE_SEND_ONCE) return MACH_PORT_NULL;
4395 return local;
4396 }
4397
4398 static inline void
4399 _dispatch_mach_msg_set_reason(dispatch_mach_msg_t dmsg, mach_error_t err,
4400 unsigned long reason)
4401 {
4402 dispatch_assert_zero(reason & ~(unsigned long)code_emask);
4403 dmsg->dmsg_error = ((err || !reason) ? err :
4404 err_local|err_sub(0x3e0)|(mach_error_t)reason);
4405 }
4406
4407 static inline unsigned long
4408 _dispatch_mach_msg_get_reason(dispatch_mach_msg_t dmsg, mach_error_t *err_ptr)
4409 {
4410 mach_error_t err = dmsg->dmsg_error;
4411
4412 dmsg->dmsg_error = 0;
4413 if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
4414 *err_ptr = 0;
4415 return err_get_code(err);
4416 }
4417 *err_ptr = err;
4418 return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
4419 }
4420
4421 static void
4422 _dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
4423 _dispatch_kevent_qos_s *ke, mach_msg_header_t *hdr, mach_msg_size_t siz)
4424 {
4425 _dispatch_debug_machport(hdr->msgh_remote_port);
4426 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
4427 hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
4428 bool canceled = (dm->dq_atomic_flags & DSF_CANCELED);
4429 if (!dmr && canceled) {
4430 // message received after cancellation, _dispatch_mach_kevent_merge is
4431 // responsible for mach channel source state (e.g. deferred deletion)
4432 return _dispatch_kevent_mach_msg_destroy(ke, hdr);
4433 }
4434 dispatch_mach_msg_t dmsg;
4435 voucher_t voucher;
4436 pthread_priority_t priority;
4437 void *ctxt = NULL;
4438 if (dmr) {
4439 _voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
4440 voucher = dmr->dmr_voucher;
4441 dmr->dmr_voucher = NULL; // transfer reference
4442 priority = dmr->dmr_priority;
4443 ctxt = dmr->dmr_ctxt;
4444 unsigned int options = DKEV_DISPOSE_IMMEDIATE_DELETE;
4445 options |= DKEV_UNREGISTER_REPLY_REMOVE;
4446 options |= DKEV_UNREGISTER_WAKEUP;
4447 if (canceled) options |= DKEV_UNREGISTER_DISCONNECTED;
4448 _dispatch_mach_reply_kevent_unregister(dm, dmr, options);
4449 ke->flags |= EV_DELETE; // remember that unregister deleted the event
4450 if (canceled) return;
4451 } else {
4452 voucher = voucher_create_with_mach_msg(hdr);
4453 priority = _voucher_get_priority(voucher);
4454 }
4455 dispatch_mach_msg_destructor_t destructor;
4456 destructor = (hdr == _dispatch_kevent_mach_msg_buf(ke)) ?
4457 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
4458 DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
4459 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
4460 if (hdr == _dispatch_kevent_mach_msg_buf(ke)) {
4461 _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr, (uint64_t)dmsg->dmsg_buf);
4462 }
4463 dmsg->dmsg_voucher = voucher;
4464 dmsg->dmsg_priority = priority;
4465 dmsg->do_ctxt = ctxt;
4466 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
4467 _dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
4468 _dispatch_voucher_ktrace_dmsg_push(dmsg);
4469 return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
4470 }
4471
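// Receive the reply to a synchronous send directly on the calling thread,
// retrying with a heap-allocated buffer when the reply exceeds the on-stack
// buffer.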
4472 DISPATCH_ALWAYS_INLINE
4473 static inline dispatch_mach_msg_t
4474 _dispatch_mach_msg_reply_recv(dispatch_mach_t dm,
4475 dispatch_mach_reply_refs_t dmr, mach_port_t reply_port)
4476 {
4477 if (slowpath(!MACH_PORT_VALID(reply_port))) {
4478 DISPATCH_CLIENT_CRASH(reply_port, "Invalid reply port");
4479 }
4480 void *ctxt = dmr->dmr_ctxt;
4481 mach_msg_header_t *hdr, *hdr2 = NULL;
4482 void *hdr_copyout_addr;
4483 mach_msg_size_t siz, msgsiz = 0;
4484 mach_msg_return_t kr;
4485 mach_msg_option_t options;
4486 siz = mach_vm_round_page(_dispatch_mach_recv_msg_size +
4487 dispatch_mach_trailer_size);
4488 hdr = alloca(siz);
4489 for (mach_vm_address_t p = mach_vm_trunc_page((mach_vm_address_t)hdr + vm_page_size);
4490 p < (mach_vm_address_t)hdr + siz; p += vm_page_size) {
4491 *(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
4492 }
4493 options = DISPATCH_MACH_RCV_OPTIONS & (~MACH_RCV_VOUCHER);
4494 retry:
4495 _dispatch_debug_machport(reply_port);
4496 _dispatch_debug("machport[0x%08x]: MACH_RCV_MSG %s", reply_port,
4497 (options & MACH_RCV_TIMEOUT) ? "poll" : "wait");
4498 kr = mach_msg(hdr, options, 0, siz, reply_port, MACH_MSG_TIMEOUT_NONE,
4499 MACH_PORT_NULL);
4500 hdr_copyout_addr = hdr;
4501 _dispatch_debug_machport(reply_port);
4502 _dispatch_debug("machport[0x%08x]: MACH_RCV_MSG (size %u, opts 0x%x) "
4503 "returned: %s - 0x%x", reply_port, siz, options,
4504 mach_error_string(kr), kr);
4505 switch (kr) {
4506 case MACH_RCV_TOO_LARGE:
4507 if (!fastpath(hdr->msgh_size <= UINT_MAX -
4508 dispatch_mach_trailer_size)) {
4509 DISPATCH_CLIENT_CRASH(hdr->msgh_size, "Overlarge message");
4510 }
4511 if (options & MACH_RCV_LARGE) {
4512 msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
4513 hdr2 = malloc(msgsiz);
4514 if (dispatch_assume(hdr2)) {
4515 hdr = hdr2;
4516 siz = msgsiz;
4517 }
4518 options |= MACH_RCV_TIMEOUT;
4519 options &= ~MACH_RCV_LARGE;
4520 goto retry;
4521 }
4522 _dispatch_log("BUG in libdispatch client: "
4523 "dispatch_mach_send_and_wait_for_reply: dropped message too "
4524 "large to fit in memory: id = 0x%x, size = %u", hdr->msgh_id,
4525 hdr->msgh_size);
4526 break;
4527 case MACH_RCV_INVALID_NAME: // rdar://problem/21963848
4528 case MACH_RCV_PORT_CHANGED: // rdar://problem/21885327
4529 case MACH_RCV_PORT_DIED:
4530 // channel was disconnected/canceled and reply port destroyed
4531 _dispatch_debug("machport[0x%08x]: sync reply port destroyed, ctxt %p: "
4532 "%s - 0x%x", reply_port, ctxt, mach_error_string(kr), kr);
4533 goto out;
4534 case MACH_MSG_SUCCESS:
4535 if (hdr->msgh_remote_port) {
4536 _dispatch_debug_machport(hdr->msgh_remote_port);
4537 }
4538 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, size = %u, "
4539 "reply on 0x%08x", hdr->msgh_local_port, hdr->msgh_id,
4540 hdr->msgh_size, hdr->msgh_remote_port);
4541 siz = hdr->msgh_size + dispatch_mach_trailer_size;
4542 if (hdr2 && siz < msgsiz) {
4543 void *shrink = realloc(hdr2, msgsiz);
4544 if (shrink) hdr = hdr2 = shrink;
4545 }
4546 break;
4547 default:
4548 dispatch_assume_zero(kr);
4549 break;
4550 }
4551 _dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port);
4552 hdr->msgh_local_port = MACH_PORT_NULL;
4553 if (slowpath((dm->dq_atomic_flags & DSF_CANCELED) || kr)) {
4554 if (!kr) mach_msg_destroy(hdr);
4555 goto out;
4556 }
4557 dispatch_mach_msg_t dmsg;
4558 dispatch_mach_msg_destructor_t destructor = (!hdr2) ?
4559 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
4560 DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
4561 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
4562 if (!hdr2 || hdr != hdr_copyout_addr) {
4563 _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr_copyout_addr, (uint64_t)_dispatch_mach_msg_get_msg(dmsg));
4564 }
4565 dmsg->do_ctxt = ctxt;
4566 return dmsg;
4567 out:
4568 free(hdr2);
4569 return NULL;
4570 }
4571
4572 static inline void
4573 _dispatch_mach_msg_reply_received(dispatch_mach_t dm,
4574 dispatch_mach_reply_refs_t dmr, mach_port_t local_port)
4575 {
4576 bool removed = _dispatch_mach_reply_tryremove(dm, dmr);
4577 if (!MACH_PORT_VALID(local_port) || !removed) {
4578 // port moved/destroyed during receive, or reply waiter was never
4579 // registered or already removed (disconnected)
4580 return;
4581 }
4582 mach_port_t reply_port = _dispatch_mach_reply_get_reply_port(dmr);
4583 _dispatch_debug("machport[0x%08x]: unregistered for sync reply, ctxt %p",
4584 reply_port, dmr->dmr_ctxt);
4585 if (_dispatch_mach_reply_is_reply_port_owned(dmr)) {
4586 _dispatch_set_thread_reply_port(reply_port);
4587 if (local_port != reply_port) {
4588 DISPATCH_CLIENT_CRASH(local_port,
4589 "Reply received on unexpected port");
4590 }
4591 return;
4592 }
4593 mach_msg_header_t *hdr;
4594 dispatch_mach_msg_t dmsg;
4595 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
4596 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
4597 hdr->msgh_local_port = local_port;
4598 dmsg->dmsg_voucher = dmr->dmr_voucher;
4599 dmr->dmr_voucher = NULL; // transfer reference
4600 dmsg->dmsg_priority = dmr->dmr_priority;
4601 dmsg->do_ctxt = dmr->dmr_ctxt;
4602 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_REPLY_RECEIVED);
4603 return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
4604 }
4605
4606 static inline void
4607 _dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
4608 mach_port_t remote_port)
4609 {
4610 mach_msg_header_t *hdr;
4611 dispatch_mach_msg_t dmsg;
4612 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
4613 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
4614 if (local_port) hdr->msgh_local_port = local_port;
4615 if (remote_port) hdr->msgh_remote_port = remote_port;
4616 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
4617 _dispatch_debug("machport[0x%08x]: %s right disconnected", local_port ?
4618 local_port : remote_port, local_port ? "receive" : "send");
4619 return _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
4620 }
4621
4622 static inline dispatch_mach_msg_t
4623 _dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
4624 dispatch_mach_reply_refs_t dmr)
4625 {
4626 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
4627 mach_port_t reply_port = dmsg ? dmsg->dmsg_reply :
4628 _dispatch_mach_reply_get_reply_port(dmr);
4629 voucher_t v;
4630
4631 if (!reply_port) {
4632 if (!dmsg) {
4633 v = dmr->dmr_voucher;
4634 dmr->dmr_voucher = NULL; // transfer reference
4635 if (v) _voucher_release(v);
4636 }
4637 return NULL;
4638 }
4639
4640 if (dmsg) {
4641 v = dmsg->dmsg_voucher;
4642 if (v) _voucher_retain(v);
4643 } else {
4644 v = dmr->dmr_voucher;
4645 dmr->dmr_voucher = NULL; // transfer reference
4646 }
4647
4648 if ((dmsg && (dmsg->dmsg_options & DISPATCH_MACH_WAIT_FOR_REPLY) &&
4649 (dmsg->dmsg_options & DISPATCH_MACH_OWNED_REPLY_PORT)) ||
4650 (dmr && !dmr->dmr_dkev &&
4651 _dispatch_mach_reply_is_reply_port_owned(dmr))) {
4652 if (v) _voucher_release(v);
4653 // deallocate owned reply port to break _dispatch_mach_msg_reply_recv
4654 // out of waiting in mach_msg(MACH_RCV_MSG)
4655 kern_return_t kr = mach_port_mod_refs(mach_task_self(), reply_port,
4656 MACH_PORT_RIGHT_RECEIVE, -1);
4657 DISPATCH_VERIFY_MIG(kr);
4658 dispatch_assume_zero(kr);
4659 return NULL;
4660 }
4661
4662 mach_msg_header_t *hdr;
4663 dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
4664 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
4665 dmsgr->dmsg_voucher = v;
4666 hdr->msgh_local_port = reply_port;
4667 if (dmsg) {
4668 dmsgr->dmsg_priority = dmsg->dmsg_priority;
4669 dmsgr->do_ctxt = dmsg->do_ctxt;
4670 } else {
4671 dmsgr->dmsg_priority = dmr->dmr_priority;
4672 dmsgr->do_ctxt = dmr->dmr_ctxt;
4673 }
4674 _dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
4675 _dispatch_debug("machport[0x%08x]: reply disconnected, ctxt %p",
4676 hdr->msgh_local_port, dmsgr->do_ctxt);
4677 return dmsgr;
4678 }
4679
4680 DISPATCH_NOINLINE
4681 static void
4682 _dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
4683 {
4684 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
4685 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
4686 mach_msg_option_t msg_opts = dmsg->dmsg_options;
4687 _dispatch_debug("machport[0x%08x]: not sent msg id 0x%x, ctxt %p, "
4688 "msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x",
4689 msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
4690 msg_opts, msg->msgh_voucher_port, dmsg->dmsg_reply);
4691 unsigned long reason = (msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY) ?
4692 0 : DISPATCH_MACH_MESSAGE_NOT_SENT;
4693 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
4694 _dispatch_mach_msg_set_reason(dmsg, 0, reason);
4695 _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
4696 if (dmsgr) _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
4697 }
4698
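// Send a single message: handles the initial checkin message, voucher
// transfer, reply registration, and arming the send-possible notification
// when the send times out.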
4699 DISPATCH_NOINLINE
4700 static uint32_t
4701 _dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou,
4702 dispatch_mach_reply_refs_t dmr, pthread_priority_t pp,
4703 dispatch_mach_send_invoke_flags_t send_flags)
4704 {
4705 dispatch_mach_send_refs_t dr = dm->dm_refs;
4706 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
4707 voucher_t voucher = dmsg->dmsg_voucher;
4708 mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
4709 uint32_t send_status = 0;
4710 bool clear_voucher = false, kvoucher_move_send = false;
4711 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
4712 bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
4713 MACH_MSG_TYPE_MOVE_SEND_ONCE);
4714 mach_port_t reply_port = dmsg->dmsg_reply;
4715 if (!is_reply) {
4716 dr->dm_needs_mgr = 0;
4717 if (unlikely(dr->dm_checkin && dmsg != dr->dm_checkin)) {
4718 // send initial checkin message
4719 if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
4720 &_dispatch_mgr_q)) {
4721 // send kevent must be uninstalled on the manager queue
4722 dr->dm_needs_mgr = 1;
4723 goto out;
4724 }
4725 if (unlikely(!_dispatch_mach_msg_send(dm,
4726 dr->dm_checkin, NULL, pp, DM_SEND_INVOKE_NONE))) {
4727 goto out;
4728 }
4729 dr->dm_checkin = NULL;
4730 }
4731 }
4732 mach_msg_return_t kr = 0;
4733 mach_msg_option_t opts = 0, msg_opts = dmsg->dmsg_options;
4734 if (!(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
4735 mach_msg_priority_t msg_priority = MACH_MSG_PRIORITY_UNSPECIFIED;
4736 opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
4737 if (!is_reply) {
4738 if (dmsg != dr->dm_checkin) {
4739 msg->msgh_remote_port = dr->dm_send;
4740 }
4741 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
4742 if (slowpath(!dm->dm_dkev)) {
4743 _dispatch_mach_notification_kevent_register(dm,
4744 msg->msgh_remote_port);
4745 }
4746 if (fastpath(dm->dm_dkev)) {
4747 if (DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) {
4748 goto out;
4749 }
4750 opts |= MACH_SEND_NOTIFY;
4751 }
4752 }
4753 opts |= MACH_SEND_TIMEOUT;
4754 if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
4755 ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
4756 voucher, dmsg->dmsg_priority);
4757 }
4758 _dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
4759 if (ipc_kvoucher) {
4760 kvoucher_move_send = true;
4761 clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
4762 ipc_kvoucher, kvoucher_move_send);
4763 } else {
4764 clear_voucher = _voucher_mach_msg_set(msg, voucher);
4765 }
4766 if (pp && _dispatch_evfilt_machport_direct_enabled) {
4767 opts |= MACH_SEND_OVERRIDE;
4768 msg_priority = (mach_msg_priority_t)pp;
4769 }
4770 }
4771 _dispatch_debug_machport(msg->msgh_remote_port);
4772 if (reply_port) _dispatch_debug_machport(reply_port);
4773 if (msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY) {
4774 if (msg_opts & DISPATCH_MACH_OWNED_REPLY_PORT) {
4775 _dispatch_clear_thread_reply_port(reply_port);
4776 }
4777 _dispatch_mach_reply_waiter_register(dm, dmr, reply_port, dmsg,
4778 msg_opts);
4779 }
4780 kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
4781 msg_priority);
4782 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
4783 "opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
4784 "%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
4785 opts, msg_opts, msg->msgh_voucher_port, reply_port,
4786 mach_error_string(kr), kr);
4787 if (unlikely(kr && (msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY))) {
4788 _dispatch_mach_reply_waiter_unregister(dm, dmr,
4789 DKEV_UNREGISTER_REPLY_REMOVE);
4790 }
4791 if (clear_voucher) {
4792 if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
4793 DISPATCH_CLIENT_CRASH(kr, "Voucher port corruption");
4794 }
4795 mach_voucher_t kv;
4796 kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
4797 if (kvoucher_move_send) ipc_kvoucher = kv;
4798 }
4799 }
4800 if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
4801 if (opts & MACH_SEND_NOTIFY) {
4802 _dispatch_debug("machport[0x%08x]: send-possible notification "
4803 "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
4804 DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev) = 1;
4805 } else {
4806 // send kevent must be installed on the manager queue
4807 dr->dm_needs_mgr = 1;
4808 }
4809 if (ipc_kvoucher) {
4810 _dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
4811 voucher_t ipc_voucher;
4812 ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
4813 voucher, dmsg->dmsg_priority, ipc_kvoucher);
4814 _dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
4815 ipc_voucher, dmsg, voucher);
4816 if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
4817 dmsg->dmsg_voucher = ipc_voucher;
4818 }
4819 goto out;
4820 } else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
4821 _voucher_dealloc_mach_voucher(ipc_kvoucher);
4822 }
4823 if (!(msg_opts & DISPATCH_MACH_WAIT_FOR_REPLY) && !kr && reply_port &&
4824 !(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply_port)) {
4825 if (!dm->ds_is_direct_kevent &&
4826 _dispatch_queue_get_current() != &_dispatch_mgr_q) {
4827 // reply receive kevent must be installed on the manager queue
4828 dr->dm_needs_mgr = 1;
4829 dmsg->dmsg_options = msg_opts | DISPATCH_MACH_REGISTER_FOR_REPLY;
4830 goto out;
4831 }
4832 _dispatch_mach_reply_kevent_register(dm, reply_port, dmsg);
4833 }
4834 if (unlikely(!is_reply && dmsg == dr->dm_checkin && dm->dm_dkev)) {
4835 _dispatch_mach_notification_kevent_unregister(dm);
4836 }
4837 if (slowpath(kr)) {
4838 // Send failed, so reply was never registered <rdar://problem/14309159>
4839 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
4840 }
4841 _dispatch_mach_msg_set_reason(dmsg, kr, 0);
4842 if ((send_flags & DM_SEND_INVOKE_IMMEDIATE_SEND) &&
4843 (msg_opts & DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT)) {
4844 // Return sent message synchronously <rdar://problem/25947334>
4845 send_status |= DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT;
4846 } else {
4847 _dispatch_queue_push(dm->_as_dq, dmsg, dmsg->dmsg_priority);
4848 }
4849 if (dmsgr) _dispatch_queue_push(dm->_as_dq, dmsgr, dmsgr->dmsg_priority);
4850 send_status |= DM_SEND_STATUS_SUCCESS;
4851 out:
4852 return send_status;
4853 }
4854
4855 #pragma mark -
4856 #pragma mark dispatch_mach_send_refs_t
4857
4858 static void _dispatch_mach_cancel(dispatch_mach_t dm);
4859 static void _dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm,
4860 pthread_priority_t pp);
4861
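/*
 * Layout sketch of the 64-bit dm_state word manipulated by the helpers
 * below (the DISPATCH_MACH_STATE_* masks themselves live in the internal
 * headers): the low 32 bits hold the send-queue drain lock (the owner tid,
 * tested via DISPATCH_MACH_STATE_UNLOCK_MASK), and the upper bits carry
 * the cached override priority (the QoS-class bits of a pthread_priority_t
 * shifted up by 32) together with the DIRTY, RECEIVED_OVERRIDE and
 * PENDING_BARRIER flags. _dm_state_merge_override() only ever raises the
 * stored override, and marks the state dirty so that the current drainer
 * re-evaluates it before unlocking.
 */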
4862 DISPATCH_ALWAYS_INLINE
4863 static inline pthread_priority_t
4864 _dm_state_get_override(uint64_t dm_state)
4865 {
4866 dm_state &= DISPATCH_MACH_STATE_OVERRIDE_MASK;
4867 return (pthread_priority_t)(dm_state >> 32);
4868 }
4869
4870 DISPATCH_ALWAYS_INLINE
4871 static inline uint64_t
4872 _dm_state_override_from_priority(pthread_priority_t pp)
4873 {
4874 uint64_t pp_state = pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4875 return pp_state << 32;
4876 }
4877
4878 DISPATCH_ALWAYS_INLINE
4879 static inline bool
4880 _dm_state_needs_override(uint64_t dm_state, uint64_t pp_state)
4881 {
4882 return (pp_state > (dm_state & DISPATCH_MACH_STATE_OVERRIDE_MASK));
4883 }
4884
4885 DISPATCH_ALWAYS_INLINE
4886 static inline uint64_t
4887 _dm_state_merge_override(uint64_t dm_state, uint64_t pp_state)
4888 {
4889 if (_dm_state_needs_override(dm_state, pp_state)) {
4890 dm_state &= ~DISPATCH_MACH_STATE_OVERRIDE_MASK;
4891 dm_state |= pp_state;
4892 dm_state |= DISPATCH_MACH_STATE_DIRTY;
4893 dm_state |= DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
4894 }
4895 return dm_state;
4896 }
4897
4898 #define _dispatch_mach_send_push_update_tail(dr, tail) \
4899 os_mpsc_push_update_tail(dr, dm, tail, do_next)
4900 #define _dispatch_mach_send_push_update_head(dr, head) \
4901 os_mpsc_push_update_head(dr, dm, head)
4902 #define _dispatch_mach_send_get_head(dr) \
4903 os_mpsc_get_head(dr, dm)
4904 #define _dispatch_mach_send_unpop_head(dr, dc, dc_next) \
4905 os_mpsc_undo_pop_head(dr, dm, dc, dc_next, do_next)
4906 #define _dispatch_mach_send_pop_head(dr, head) \
4907 os_mpsc_pop_head(dr, dm, head, do_next)
4908
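/*
 * The macros above map the channel's send queue onto the generic os_mpsc
 * helpers (a multi-producer, single-consumer list threaded through the
 * do_next field). As used below, the tail update reports whether the list
 * was previously empty, which is how callers learn that the drainer needs
 * to be woken up.
 */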
4909 DISPATCH_ALWAYS_INLINE
4910 static inline bool
4911 _dispatch_mach_send_push_inline(dispatch_mach_send_refs_t dr,
4912 dispatch_object_t dou)
4913 {
4914 if (_dispatch_mach_send_push_update_tail(dr, dou._do)) {
4915 _dispatch_mach_send_push_update_head(dr, dou._do);
4916 return true;
4917 }
4918 return false;
4919 }
4920
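/*
 * Drains the channel's send queue while the caller holds the dm_state send
 * lock: messages are popped and handed to _dispatch_mach_msg_send(), send
 * barriers run inline only when DM_SEND_INVOKE_CAN_RUN_BARRIER is set, and
 * anything that cannot make progress here (a barrier that may not run, a
 * reconnect that must happen on the manager queue, a send that returned no
 * status) takes the partial_drain path, which puts the item back and
 * records in dm_state why the drain stopped.
 */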
4921 DISPATCH_NOINLINE
4922 static bool
4923 _dispatch_mach_send_drain(dispatch_mach_t dm, dispatch_invoke_flags_t flags,
4924 dispatch_mach_send_invoke_flags_t send_flags)
4925 {
4926 dispatch_mach_send_refs_t dr = dm->dm_refs;
4927 dispatch_mach_reply_refs_t dmr;
4928 dispatch_mach_msg_t dmsg;
4929 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
4930 pthread_priority_t pp = _dm_state_get_override(dr->dm_state);
4931 uint64_t old_state, new_state;
4932 uint32_t send_status;
4933 bool needs_mgr, disconnecting, returning_send_result = false;
4934
4935 again:
4936 needs_mgr = false; disconnecting = false;
4937 while (dr->dm_tail) {
4938 dc = _dispatch_mach_send_get_head(dr);
4939 do {
4940 dispatch_mach_send_invoke_flags_t sf = send_flags;
4941 // Only request immediate send result for the first message
4942 send_flags &= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK;
4943 next_dc = _dispatch_mach_send_pop_head(dr, dc);
4944 if (_dispatch_object_has_type(dc,
4945 DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER))) {
4946 if (!(send_flags & DM_SEND_INVOKE_CAN_RUN_BARRIER)) {
4947 goto partial_drain;
4948 }
4949 _dispatch_continuation_pop(dc, dm->_as_dq, flags);
4950 continue;
4951 }
4952 if (_dispatch_object_is_slow_item(dc)) {
4953 dmsg = ((dispatch_continuation_t)dc)->dc_data;
4954 dmr = ((dispatch_continuation_t)dc)->dc_other;
4955 } else if (_dispatch_object_has_vtable(dc)) {
4956 dmsg = (dispatch_mach_msg_t)dc;
4957 dmr = NULL;
4958 } else {
4959 if ((dm->dm_dkev || !dm->ds_is_direct_kevent) &&
4960 (_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
4961 // send kevent must be uninstalled on the manager queue
4962 needs_mgr = true;
4963 goto partial_drain;
4964 }
4965 if (unlikely(!_dispatch_mach_reconnect_invoke(dm, dc))) {
4966 disconnecting = true;
4967 goto partial_drain;
4968 }
4969 continue;
4970 }
4971 _dispatch_voucher_ktrace_dmsg_pop(dmsg);
4972 if (unlikely(dr->dm_disconnect_cnt ||
4973 (dm->dq_atomic_flags & DSF_CANCELED))) {
4974 _dispatch_mach_msg_not_sent(dm, dmsg);
4975 continue;
4976 }
4977 send_status = _dispatch_mach_msg_send(dm, dmsg, dmr, pp, sf);
4978 if (unlikely(!send_status)) {
4979 goto partial_drain;
4980 }
4981 if (send_status & DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT) {
4982 returning_send_result = true;
4983 }
4984 } while ((dc = next_dc));
4985 }
4986
4987 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
4988 if (old_state & DISPATCH_MACH_STATE_DIRTY) {
4989 new_state = old_state;
4990 new_state &= ~DISPATCH_MACH_STATE_DIRTY;
4991 new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
4992 new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
4993 } else {
4994 // unlock
4995 new_state = 0;
4996 }
4997 });
4998 goto out;
4999
5000 partial_drain:
5001 // if this is not a complete drain, we must undo some things
5002 _dispatch_mach_send_unpop_head(dr, dc, next_dc);
5003
5004 if (_dispatch_object_has_type(dc,
5005 DISPATCH_CONTINUATION_TYPE(MACH_SEND_BARRIER))) {
5006 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
5007 new_state = old_state;
5008 new_state |= DISPATCH_MACH_STATE_DIRTY;
5009 new_state |= DISPATCH_MACH_STATE_PENDING_BARRIER;
5010 new_state &= ~DISPATCH_MACH_STATE_UNLOCK_MASK;
5011 });
5012 } else {
5013 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
5014 new_state = old_state;
5015 if (old_state & (DISPATCH_MACH_STATE_DIRTY |
5016 DISPATCH_MACH_STATE_RECEIVED_OVERRIDE)) {
5017 new_state &= ~DISPATCH_MACH_STATE_DIRTY;
5018 new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
5019 new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
5020 } else {
5021 new_state |= DISPATCH_MACH_STATE_DIRTY;
5022 new_state &= ~DISPATCH_MACH_STATE_UNLOCK_MASK;
5023 }
5024 });
5025 }
5026
5027 out:
5028 if (old_state & DISPATCH_MACH_STATE_RECEIVED_OVERRIDE) {
5029 // Ensure that the root queue sees that this thread was overridden.
5030 _dispatch_set_defaultpriority_override();
5031 }
5032
5033 if (unlikely(new_state & DISPATCH_MACH_STATE_UNLOCK_MASK)) {
5034 os_atomic_thread_fence(acquire);
5035 pp = _dm_state_get_override(new_state);
5036 goto again;
5037 }
5038
5039 if (new_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
5040 pp = _dm_state_get_override(new_state);
5041 _dispatch_mach_send_barrier_drain_push(dm, pp);
5042 } else {
5043 if (needs_mgr || dr->dm_needs_mgr) {
5044 pp = _dm_state_get_override(new_state);
5045 } else {
5046 pp = 0;
5047 }
5048 if (!disconnecting) dx_wakeup(dm, pp, DISPATCH_WAKEUP_FLUSH);
5049 }
5050 return returning_send_result;
5051 }
5052
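/*
 * Attempts to take the dm_state send lock and, on success, drains the send
 * queue (running the cancelation path first when requested). If the lock is
 * already held, the DIRTY bit is set when DM_SEND_INVOKE_FLUSH is passed so
 * that the current owner notices the new work. Before acquiring the lock,
 * the thread also raises its own priority to the override recorded in
 * dm_state whenever that override exceeds the floor it is known to run at.
 */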
5053 DISPATCH_NOINLINE
5054 static void
5055 _dispatch_mach_send_invoke(dispatch_mach_t dm,
5056 dispatch_invoke_flags_t flags,
5057 dispatch_mach_send_invoke_flags_t send_flags)
5058 {
5059 dispatch_lock_owner tid_self = _dispatch_tid_self();
5060 uint64_t old_state, new_state;
5061 pthread_priority_t pp_floor;
5062
5063 uint64_t canlock_mask = DISPATCH_MACH_STATE_UNLOCK_MASK;
5064 uint64_t canlock_state = 0;
5065
5066 if (send_flags & DM_SEND_INVOKE_NEEDS_BARRIER) {
5067 canlock_mask |= DISPATCH_MACH_STATE_PENDING_BARRIER;
5068 canlock_state = DISPATCH_MACH_STATE_PENDING_BARRIER;
5069 } else if (!(send_flags & DM_SEND_INVOKE_CAN_RUN_BARRIER)) {
5070 canlock_mask |= DISPATCH_MACH_STATE_PENDING_BARRIER;
5071 }
5072
5073 if (flags & DISPATCH_INVOKE_MANAGER_DRAIN) {
5074 pp_floor = 0;
5075 } else {
5076 // _dispatch_queue_class_invoke will have applied the queue override
5077 // (if any) before we get here. Otherwise, use the default base priority
5078 // as an estimate of the priority we already asked for.
5079 pp_floor = dm->_as_dq->dq_override;
5080 if (!pp_floor) {
5081 pp_floor = _dispatch_get_defaultpriority();
5082 pp_floor &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
5083 }
5084 }
5085
5086 retry:
5087 os_atomic_rmw_loop2o(dm->dm_refs, dm_state, old_state, new_state, acquire, {
5088 new_state = old_state;
5089 if (unlikely((old_state & canlock_mask) != canlock_state)) {
5090 if (!(send_flags & DM_SEND_INVOKE_FLUSH)) {
5091 os_atomic_rmw_loop_give_up(break);
5092 }
5093 new_state |= DISPATCH_MACH_STATE_DIRTY;
5094 } else {
5095 if (likely(pp_floor)) {
5096 pthread_priority_t pp = _dm_state_get_override(old_state);
5097 if (unlikely(pp > pp_floor)) {
5098 os_atomic_rmw_loop_give_up({
5099 _dispatch_wqthread_override_start(tid_self, pp);
5100 // Ensure that the root queue sees
5101 // that this thread was overridden.
5102 _dispatch_set_defaultpriority_override();
5103 pp_floor = pp;
5104 goto retry;
5105 });
5106 }
5107 }
5108 new_state |= tid_self;
5109 new_state &= ~DISPATCH_MACH_STATE_DIRTY;
5110 new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
5111 new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
5112 }
5113 });
5114
5115 if (unlikely((old_state & canlock_mask) != canlock_state)) {
5116 return;
5117 }
5118 if (send_flags & DM_SEND_INVOKE_CANCEL) {
5119 _dispatch_mach_cancel(dm);
5120 }
5121 _dispatch_mach_send_drain(dm, flags, send_flags);
5122 }
5123
5124 DISPATCH_NOINLINE
5125 void
5126 _dispatch_mach_send_barrier_drain_invoke(dispatch_continuation_t dc,
5127 dispatch_invoke_flags_t flags)
5128 {
5129 dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
5130 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5131 dispatch_thread_frame_s dtf;
5132
5133 DISPATCH_COMPILER_CAN_ASSUME(dc->dc_priority == DISPATCH_NO_PRIORITY);
5134 DISPATCH_COMPILER_CAN_ASSUME(dc->dc_voucher == DISPATCH_NO_VOUCHER);
5135 // hide the mach channel (see _dispatch_mach_barrier_invoke comment)
5136 _dispatch_thread_frame_stash(&dtf);
5137 _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER, dc_flags,{
5138 _dispatch_mach_send_invoke(dm, flags,
5139 DM_SEND_INVOKE_NEEDS_BARRIER | DM_SEND_INVOKE_CAN_RUN_BARRIER);
5140 });
5141 _dispatch_thread_frame_unstash(&dtf);
5142 }
5143
5144 DISPATCH_NOINLINE
5145 static void
5146 _dispatch_mach_send_barrier_drain_push(dispatch_mach_t dm,
5147 pthread_priority_t pp)
5148 {
5149 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5150
5151 dc->do_vtable = DC_VTABLE(MACH_SEND_BARRRIER_DRAIN);
5152 dc->dc_func = NULL;
5153 dc->dc_ctxt = NULL;
5154 dc->dc_voucher = DISPATCH_NO_VOUCHER;
5155 dc->dc_priority = DISPATCH_NO_PRIORITY;
5156 return _dispatch_queue_push(dm->_as_dq, dc, pp);
5157 }
5158
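/*
 * Enqueues a continuation or message onto the send queue from an arbitrary
 * thread. The override carried by the push is merged into dm_state; when a
 * drainer currently owns the send lock it is overridden in place, otherwise
 * the channel is woken up (or a barrier-drain continuation is pushed when
 * this push made a send barrier pending).
 */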
5159 DISPATCH_NOINLINE
5160 static void
5161 _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_continuation_t dc,
5162 pthread_priority_t pp)
5163 {
5164 dispatch_mach_send_refs_t dr = dm->dm_refs;
5165 uint64_t pp_state, old_state, new_state, state_flags = 0;
5166 dispatch_lock_owner owner;
5167 bool wakeup;
5168
5169 // <rdar://problem/25896179> when pushing a send barrier that destroys
5170 // the last reference to this channel, and the send queue is already
5171 // draining on another thread, the send barrier may run as soon as
5172 // _dispatch_mach_send_push_inline() returns.
5173 _dispatch_retain(dm);
5174 pp_state = _dm_state_override_from_priority(pp);
5175
5176 wakeup = _dispatch_mach_send_push_inline(dr, dc);
5177 if (wakeup) {
5178 state_flags = DISPATCH_MACH_STATE_DIRTY;
5179 if (dc->do_vtable == DC_VTABLE(MACH_SEND_BARRIER)) {
5180 state_flags |= DISPATCH_MACH_STATE_PENDING_BARRIER;
5181 }
5182 }
5183
5184 if (state_flags) {
5185 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
5186 new_state = _dm_state_merge_override(old_state, pp_state);
5187 new_state |= state_flags;
5188 });
5189 } else {
5190 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, relaxed, {
5191 new_state = _dm_state_merge_override(old_state, pp_state);
5192 if (old_state == new_state) {
5193 os_atomic_rmw_loop_give_up(break);
5194 }
5195 });
5196 }
5197
5198 pp = _dm_state_get_override(new_state);
5199 owner = _dispatch_lock_owner((dispatch_lock)old_state);
5200 if (owner) {
5201 if (_dm_state_needs_override(old_state, pp_state)) {
5202 _dispatch_wqthread_override_start_check_owner(owner, pp,
5203 &dr->dm_state_lock.dul_lock);
5204 }
5205 return _dispatch_release_tailcall(dm);
5206 }
5207
5208 dispatch_wakeup_flags_t wflags = 0;
5209 if (state_flags & DISPATCH_MACH_STATE_PENDING_BARRIER) {
5210 _dispatch_mach_send_barrier_drain_push(dm, pp);
5211 } else if (wakeup || dr->dm_disconnect_cnt ||
5212 (dm->dq_atomic_flags & DSF_CANCELED)) {
5213 wflags = DISPATCH_WAKEUP_FLUSH | DISPATCH_WAKEUP_CONSUME;
5214 } else if (old_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
5215 wflags = DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_CONSUME;
5216 }
5217 if (wflags) {
5218 return dx_wakeup(dm, pp, wflags);
5219 }
5220 return _dispatch_release_tailcall(dm);
5221 }
5222
5223 DISPATCH_NOINLINE
5224 static bool
5225 _dispatch_mach_send_push_and_trydrain(dispatch_mach_t dm,
5226 dispatch_object_t dou, pthread_priority_t pp,
5227 dispatch_mach_send_invoke_flags_t send_flags)
5228 {
5229 dispatch_mach_send_refs_t dr = dm->dm_refs;
5230 dispatch_lock_owner tid_self = _dispatch_tid_self();
5231 uint64_t pp_state, old_state, new_state, canlock_mask, state_flags = 0;
5232 dispatch_lock_owner owner;
5233
5234 pp_state = _dm_state_override_from_priority(pp);
5235 bool wakeup = _dispatch_mach_send_push_inline(dr, dou);
5236 if (wakeup) {
5237 state_flags = DISPATCH_MACH_STATE_DIRTY;
5238 }
5239
5240 if (unlikely(dr->dm_disconnect_cnt ||
5241 (dm->dq_atomic_flags & DSF_CANCELED))) {
5242 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, release, {
5243 new_state = _dm_state_merge_override(old_state, pp_state);
5244 new_state |= state_flags;
5245 });
5246 dx_wakeup(dm, pp, DISPATCH_WAKEUP_FLUSH);
5247 return false;
5248 }
5249
5250 canlock_mask = DISPATCH_MACH_STATE_UNLOCK_MASK |
5251 DISPATCH_MACH_STATE_PENDING_BARRIER;
5252 if (state_flags) {
5253 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, seq_cst, {
5254 new_state = _dm_state_merge_override(old_state, pp_state);
5255 new_state |= state_flags;
5256 if (likely((old_state & canlock_mask) == 0)) {
5257 new_state |= tid_self;
5258 new_state &= ~DISPATCH_MACH_STATE_DIRTY;
5259 new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
5260 new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
5261 }
5262 });
5263 } else {
5264 os_atomic_rmw_loop2o(dr, dm_state, old_state, new_state, acquire, {
5265 new_state = _dm_state_merge_override(old_state, pp_state);
5266 if (new_state == old_state) {
5267 os_atomic_rmw_loop_give_up(return false);
5268 }
5269 if (likely((old_state & canlock_mask) == 0)) {
5270 new_state |= tid_self;
5271 new_state &= ~DISPATCH_MACH_STATE_DIRTY;
5272 new_state &= ~DISPATCH_MACH_STATE_RECEIVED_OVERRIDE;
5273 new_state &= ~DISPATCH_MACH_STATE_PENDING_BARRIER;
5274 }
5275 });
5276 }
5277
5278 owner = _dispatch_lock_owner((dispatch_lock)old_state);
5279 if (owner) {
5280 if (_dm_state_needs_override(old_state, pp_state)) {
5281 _dispatch_wqthread_override_start_check_owner(owner, pp,
5282 &dr->dm_state_lock.dul_lock);
5283 }
5284 return false;
5285 }
5286
5287 if (old_state & DISPATCH_MACH_STATE_PENDING_BARRIER) {
5288 dx_wakeup(dm, pp, DISPATCH_WAKEUP_OVERRIDING);
5289 return false;
5290 }
5291
5292 // Ensure our message is still at the head of the queue and has not already
5293 // been dequeued by another thread that raced us to the send queue lock.
5294 // A plain load of the head and comparison against our object pointer is
5295 // sufficient.
5296 if (unlikely(!(wakeup && dou._do == dr->dm_head))) {
5297 // Don't request immediate send result for messages we don't own
5298 send_flags &= ~DM_SEND_INVOKE_IMMEDIATE_SEND_MASK;
5299 }
5300 return _dispatch_mach_send_drain(dm, DISPATCH_INVOKE_NONE, send_flags);
5301 }
5302
5303 static void
5304 _dispatch_mach_merge_notification_kevent(dispatch_mach_t dm,
5305 const _dispatch_kevent_qos_s *ke)
5306 {
5307 if (!(ke->fflags & dm->ds_pending_data_mask)) {
5308 return;
5309 }
5310 _dispatch_mach_send_invoke(dm, DISPATCH_INVOKE_MANAGER_DRAIN,
5311 DM_SEND_INVOKE_FLUSH);
5312 }
5313
5314 #pragma mark -
5315 #pragma mark dispatch_mach_t
5316
5317 static inline mach_msg_option_t
5318 _dispatch_mach_checkin_options(void)
5319 {
5320 mach_msg_option_t options = 0;
5321 #if DISPATCH_USE_CHECKIN_NOIMPORTANCE
5322 options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
5323 #endif
5324 return options;
5325 }
5326
5327
5328 static inline mach_msg_option_t
5329 _dispatch_mach_send_options(void)
5330 {
5331 mach_msg_option_t options = 0;
5332 return options;
5333 }
5334
5335 DISPATCH_ALWAYS_INLINE
5336 static inline pthread_priority_t
5337 _dispatch_mach_priority_propagate(mach_msg_option_t options)
5338 {
5339 #if DISPATCH_USE_NOIMPORTANCE_QOS
5340 if (options & MACH_SEND_NOIMPORTANCE) return 0;
5341 #else
5342 (void)options;
5343 #endif
5344 return _dispatch_priority_propagate();
5345 }
5346
5347 DISPATCH_NOINLINE
5348 static bool
5349 _dispatch_mach_send_msg(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
5350 dispatch_continuation_t dc_wait, mach_msg_option_t options)
5351 {
5352 dispatch_mach_send_refs_t dr = dm->dm_refs;
5353 if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
5354 DISPATCH_CLIENT_CRASH(dmsg->do_next, "Message already enqueued");
5355 }
5356 dispatch_retain(dmsg);
5357 pthread_priority_t priority = _dispatch_mach_priority_propagate(options);
5358 options |= _dispatch_mach_send_options();
5359 dmsg->dmsg_options = options;
5360 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
5361 dmsg->dmsg_reply = _dispatch_mach_msg_get_reply_port(dmsg);
5362 bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
5363 MACH_MSG_TYPE_MOVE_SEND_ONCE);
5364 dmsg->dmsg_priority = priority;
5365 dmsg->dmsg_voucher = _voucher_copy();
5366 _dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
5367
5368 uint32_t send_status;
5369 bool returning_send_result = false;
5370 dispatch_mach_send_invoke_flags_t send_flags = DM_SEND_INVOKE_NONE;
5371 if (options & DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT) {
5372 send_flags = DM_SEND_INVOKE_IMMEDIATE_SEND;
5373 }
5374 if (is_reply && !dmsg->dmsg_reply && !dr->dm_disconnect_cnt &&
5375 !(dm->dq_atomic_flags & DSF_CANCELED)) {
5376 // replies are sent to a send-once right and don't need the send queue
5377 dispatch_assert(!dc_wait);
5378 send_status = _dispatch_mach_msg_send(dm, dmsg, NULL, 0, send_flags);
5379 dispatch_assert(send_status);
5380 returning_send_result = !!(send_status &
5381 DM_SEND_STATUS_RETURNING_IMMEDIATE_SEND_RESULT);
5382 } else {
5383 _dispatch_voucher_ktrace_dmsg_push(dmsg);
5384 priority &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
5385 dispatch_object_t dou = { ._dmsg = dmsg };
5386 if (dc_wait) dou._dc = dc_wait;
5387 returning_send_result = _dispatch_mach_send_push_and_trydrain(dm, dou,
5388 priority, send_flags);
5389 }
5390 if (returning_send_result) {
5391 _dispatch_voucher_debug("mach-msg[%p] clear", dmsg->dmsg_voucher, dmsg);
5392 if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
5393 dmsg->dmsg_voucher = NULL;
5394 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
5395 dispatch_release(dmsg);
5396 }
5397 return returning_send_result;
5398 }
5399
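/*
 * Minimal usage sketch for the asynchronous send entry point below,
 * assuming a channel created with dispatch_mach_create_f() and connected
 * via dispatch_mach_connect() elsewhere ("channel", "send_port" and
 * "msg_size" are placeholder names):
 *
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(NULL, msg_size,
 *			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *	hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
 *	hdr->msgh_remote_port = send_port;
 *	hdr->msgh_size = (mach_msg_size_t)msg_size;
 *	hdr->msgh_id = 0x1000; // arbitrary example id
 *	dispatch_mach_send(channel, dmsg, 0);
 *	dispatch_release(dmsg); // the channel holds its own reference
 */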
5400 DISPATCH_NOINLINE
5401 void
5402 dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
5403 mach_msg_option_t options)
5404 {
5405 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
5406 options &= ~DISPATCH_MACH_OPTIONS_MASK;
5407 bool returned_send_result = _dispatch_mach_send_msg(dm, dmsg, NULL,options);
5408 dispatch_assert(!returned_send_result);
5409 }
5410
5411 DISPATCH_NOINLINE
5412 void
5413 dispatch_mach_send_with_result(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
5414 mach_msg_option_t options, dispatch_mach_send_flags_t send_flags,
5415 dispatch_mach_reason_t *send_result, mach_error_t *send_error)
5416 {
5417 if (unlikely(send_flags != DISPATCH_MACH_SEND_DEFAULT)) {
5418 DISPATCH_CLIENT_CRASH(send_flags, "Invalid send flags");
5419 }
5420 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
5421 options &= ~DISPATCH_MACH_OPTIONS_MASK;
5422 options |= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT;
5423 bool returned_send_result = _dispatch_mach_send_msg(dm, dmsg, NULL,options);
5424 unsigned long reason = DISPATCH_MACH_NEEDS_DEFERRED_SEND;
5425 mach_error_t err = 0;
5426 if (returned_send_result) {
5427 reason = _dispatch_mach_msg_get_reason(dmsg, &err);
5428 }
5429 *send_result = reason;
5430 *send_error = err;
5431 }
5432
5433 static inline
5434 dispatch_mach_msg_t
5435 _dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm,
5436 dispatch_mach_msg_t dmsg, mach_msg_option_t options,
5437 bool *returned_send_result)
5438 {
5439 mach_port_t reply_port = _dispatch_mach_msg_get_reply_port(dmsg);
5440 if (!reply_port) {
5441 // use per-thread mach reply port <rdar://24597802>
5442 reply_port = _dispatch_get_thread_reply_port();
5443 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
5444 dispatch_assert(MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
5445 MACH_MSG_TYPE_MAKE_SEND_ONCE);
5446 hdr->msgh_local_port = reply_port;
5447 options |= DISPATCH_MACH_OWNED_REPLY_PORT;
5448 }
5449
5450 dispatch_mach_reply_refs_t dmr;
5451 #if DISPATCH_DEBUG
5452 dmr = _dispatch_calloc(1, sizeof(*dmr));
5453 #else
5454 struct dispatch_mach_reply_refs_s dmr_buf = { };
5455 dmr = &dmr_buf;
5456 #endif
5457 struct dispatch_continuation_s dc_wait = {
5458 .dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
5459 .dc_data = dmsg,
5460 .dc_other = dmr,
5461 .dc_priority = DISPATCH_NO_PRIORITY,
5462 .dc_voucher = DISPATCH_NO_VOUCHER,
5463 };
5464 dmr->dmr_ctxt = dmsg->do_ctxt;
5465 *returned_send_result = _dispatch_mach_send_msg(dm, dmsg, &dc_wait,options);
5466 if (options & DISPATCH_MACH_OWNED_REPLY_PORT) {
5467 _dispatch_clear_thread_reply_port(reply_port);
5468 }
5469 dmsg = _dispatch_mach_msg_reply_recv(dm, dmr, reply_port);
5470 #if DISPATCH_DEBUG
5471 free(dmr);
5472 #endif
5473 return dmsg;
5474 }
5475
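/*
 * The two entry points below send synchronously and block the calling
 * thread until the reply arrives. When the message carries no reply port,
 * the implementation above substitutes the calling thread's per-thread
 * reply port and expects the header's local disposition to be
 * MACH_MSG_TYPE_MAKE_SEND_ONCE. The reply message, if any, is returned to
 * the caller, who is responsible for releasing it.
 */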
5476 DISPATCH_NOINLINE
5477 dispatch_mach_msg_t
5478 dispatch_mach_send_and_wait_for_reply(dispatch_mach_t dm,
5479 dispatch_mach_msg_t dmsg, mach_msg_option_t options)
5480 {
5481 bool returned_send_result;
5482 dispatch_mach_msg_t reply;
5483 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
5484 options &= ~DISPATCH_MACH_OPTIONS_MASK;
5485 options |= DISPATCH_MACH_WAIT_FOR_REPLY;
5486 reply = _dispatch_mach_send_and_wait_for_reply(dm, dmsg, options,
5487 &returned_send_result);
5488 dispatch_assert(!returned_send_result);
5489 return reply;
5490 }
5491
5492 DISPATCH_NOINLINE
5493 dispatch_mach_msg_t
5494 dispatch_mach_send_with_result_and_wait_for_reply(dispatch_mach_t dm,
5495 dispatch_mach_msg_t dmsg, mach_msg_option_t options,
5496 dispatch_mach_send_flags_t send_flags,
5497 dispatch_mach_reason_t *send_result, mach_error_t *send_error)
5498 {
5499 if (unlikely(send_flags != DISPATCH_MACH_SEND_DEFAULT)) {
5500 DISPATCH_CLIENT_CRASH(send_flags, "Invalid send flags");
5501 }
5502 bool returned_send_result;
5503 dispatch_mach_msg_t reply;
5504 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
5505 options &= ~DISPATCH_MACH_OPTIONS_MASK;
5506 options |= DISPATCH_MACH_WAIT_FOR_REPLY;
5507 options |= DISPATCH_MACH_RETURN_IMMEDIATE_SEND_RESULT;
5508 reply = _dispatch_mach_send_and_wait_for_reply(dm, dmsg, options,
5509 &returned_send_result);
5510 unsigned long reason = DISPATCH_MACH_NEEDS_DEFERRED_SEND;
5511 mach_error_t err = 0;
5512 if (returned_send_result) {
5513 reason = _dispatch_mach_msg_get_reason(dmsg, &err);
5514 }
5515 *send_result = reason;
5516 *send_error = err;
5517 return reply;
5518 }
5519
5520 DISPATCH_NOINLINE
5521 static bool
5522 _dispatch_mach_disconnect(dispatch_mach_t dm)
5523 {
5524 dispatch_mach_send_refs_t dr = dm->dm_refs;
5525 bool disconnected;
5526 if (dm->dm_dkev) {
5527 _dispatch_mach_notification_kevent_unregister(dm);
5528 }
5529 if (MACH_PORT_VALID(dr->dm_send)) {
5530 _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
5531 }
5532 dr->dm_send = MACH_PORT_NULL;
5533 if (dr->dm_checkin) {
5534 _dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
5535 dr->dm_checkin = NULL;
5536 }
5537 _dispatch_unfair_lock_lock(&dm->dm_refs->dm_replies_lock);
5538 dispatch_mach_reply_refs_t dmr, tmp;
5539 TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp) {
5540 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
5541 _TAILQ_MARK_NOT_ENQUEUED(dmr, dmr_list);
5542 if (dmr->dmr_dkev) {
5543 _dispatch_mach_reply_kevent_unregister(dm, dmr,
5544 DKEV_UNREGISTER_DISCONNECTED);
5545 } else {
5546 _dispatch_mach_reply_waiter_unregister(dm, dmr,
5547 DKEV_UNREGISTER_DISCONNECTED);
5548 }
5549 }
5550 disconnected = TAILQ_EMPTY(&dm->dm_refs->dm_replies);
5551 _dispatch_unfair_lock_unlock(&dm->dm_refs->dm_replies_lock);
5552 return disconnected;
5553 }
5554
5555 static void
5556 _dispatch_mach_cancel(dispatch_mach_t dm)
5557 {
5558 _dispatch_object_debug(dm, "%s", __func__);
5559 if (!_dispatch_mach_disconnect(dm)) return;
5560 if (dm->ds_dkev) {
5561 mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
5562 _dispatch_source_kevent_unregister(dm->_as_ds);
5563 if ((dm->dq_atomic_flags & DSF_STATE_MASK) == DSF_DELETED) {
5564 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
5565 }
5566 } else {
5567 _dispatch_queue_atomic_flags_set_and_clear(dm->_as_dq, DSF_DELETED,
5568 DSF_ARMED | DSF_DEFERRED_DELETE);
5569 }
5570 }
5571
5572 DISPATCH_NOINLINE
5573 static bool
5574 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
5575 {
5576 if (!_dispatch_mach_disconnect(dm)) return false;
5577 dispatch_mach_send_refs_t dr = dm->dm_refs;
5578 dr->dm_checkin = dou._dc->dc_data;
5579 dr->dm_send = (mach_port_t)dou._dc->dc_other;
5580 _dispatch_continuation_free(dou._dc);
5581 (void)os_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
5582 _dispatch_object_debug(dm, "%s", __func__);
5583 _dispatch_release(dm); // <rdar://problem/26266265>
5584 return true;
5585 }
5586
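/*
 * Reconnection is asynchronous: the new send right and optional check-in
 * message are captured in a continuation pushed onto the send queue, and
 * _dispatch_mach_reconnect_invoke() above performs the actual disconnect
 * once outstanding reply registrations have been torn down. Messages
 * enqueued after this call are sent to the new port.
 */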
5587 DISPATCH_NOINLINE
5588 void
5589 dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
5590 dispatch_mach_msg_t checkin)
5591 {
5592 dispatch_mach_send_refs_t dr = dm->dm_refs;
5593 (void)os_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
5594 if (MACH_PORT_VALID(send) && checkin) {
5595 dispatch_retain(checkin);
5596 checkin->dmsg_options = _dispatch_mach_checkin_options();
5597 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
5598 } else {
5599 checkin = NULL;
5600 dr->dm_checkin_port = MACH_PORT_NULL;
5601 }
5602 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5603 dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5604 // actually called manually in _dispatch_mach_send_drain
5605 dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
5606 dc->dc_ctxt = dc;
5607 dc->dc_data = checkin;
5608 dc->dc_other = (void*)(uintptr_t)send;
5609 dc->dc_voucher = DISPATCH_NO_VOUCHER;
5610 dc->dc_priority = DISPATCH_NO_PRIORITY;
5611 _dispatch_retain(dm); // <rdar://problem/26266265>
5612 return _dispatch_mach_send_push(dm, dc, 0);
5613 }
5614
5615 DISPATCH_NOINLINE
5616 mach_port_t
5617 dispatch_mach_get_checkin_port(dispatch_mach_t dm)
5618 {
5619 dispatch_mach_send_refs_t dr = dm->dm_refs;
5620 if (slowpath(dm->dq_atomic_flags & DSF_CANCELED)) {
5621 return MACH_PORT_DEAD;
5622 }
5623 return dr->dm_checkin_port;
5624 }
5625
5626 DISPATCH_NOINLINE
5627 static void
5628 _dispatch_mach_connect_invoke(dispatch_mach_t dm)
5629 {
5630 dispatch_mach_refs_t dr = dm->ds_refs;
5631 _dispatch_client_callout4(dr->dm_handler_ctxt,
5632 DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
5633 dm->dm_connect_handler_called = 1;
5634 }
5635
5636 DISPATCH_NOINLINE
5637 void
5638 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg,
5639 dispatch_invoke_flags_t flags)
5640 {
5641 dispatch_thread_frame_s dtf;
5642 dispatch_mach_refs_t dr;
5643 dispatch_mach_t dm;
5644 mach_error_t err;
5645 unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
5646 _dispatch_thread_set_self_t adopt_flags = DISPATCH_PRIORITY_ENFORCE|
5647 DISPATCH_VOUCHER_CONSUME|DISPATCH_VOUCHER_REPLACE;
5648
5649 // hide mach channel
5650 dm = (dispatch_mach_t)_dispatch_thread_frame_stash(&dtf);
5651 dr = dm->ds_refs;
5652 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
5653 _dispatch_voucher_ktrace_dmsg_pop(dmsg);
5654 _dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
5655 (void)_dispatch_adopt_priority_and_set_voucher(dmsg->dmsg_priority,
5656 dmsg->dmsg_voucher, adopt_flags);
5657 dmsg->dmsg_voucher = NULL;
5658 dispatch_invoke_with_autoreleasepool(flags, {
5659 if (slowpath(!dm->dm_connect_handler_called)) {
5660 _dispatch_mach_connect_invoke(dm);
5661 }
5662 _dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
5663 dr->dm_handler_func);
5664 });
5665 _dispatch_thread_frame_unstash(&dtf);
5666 _dispatch_introspection_queue_item_complete(dmsg);
5667 dispatch_release(dmsg);
5668 }
5669
5670 DISPATCH_NOINLINE
5671 void
5672 _dispatch_mach_barrier_invoke(dispatch_continuation_t dc,
5673 dispatch_invoke_flags_t flags)
5674 {
5675 dispatch_thread_frame_s dtf;
5676 dispatch_mach_t dm = dc->dc_other;
5677 dispatch_mach_refs_t dr;
5678 uintptr_t dc_flags = (uintptr_t)dc->dc_data;
5679 unsigned long type = dc_type(dc);
5680
5681 // hide mach channel from clients
5682 if (type == DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER)) {
5683 // on the send queue, the mach channel isn't the current queue;
5684 // its target queue is already the current one
5685 _dispatch_thread_frame_stash(&dtf);
5686 }
5687 dr = dm->ds_refs;
5688 DISPATCH_COMPILER_CAN_ASSUME(dc_flags & DISPATCH_OBJ_CONSUME_BIT);
5689 _dispatch_continuation_pop_forwarded(dc, dm->dq_override_voucher, dc_flags,{
5690 dispatch_invoke_with_autoreleasepool(flags, {
5691 if (slowpath(!dm->dm_connect_handler_called)) {
5692 _dispatch_mach_connect_invoke(dm);
5693 }
5694 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
5695 _dispatch_client_callout4(dr->dm_handler_ctxt,
5696 DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0,
5697 dr->dm_handler_func);
5698 });
5699 });
5700 if (type == DISPATCH_CONTINUATION_TYPE(MACH_RECV_BARRIER)) {
5701 _dispatch_thread_frame_unstash(&dtf);
5702 }
5703 }
5704
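/*
 * Barrier submission: send barriers are pushed onto the channel's send
 * queue and run by the send-queue drainer once every message enqueued
 * before them has been sent, while receive barriers go through the regular
 * target-queue path and order against received messages. Both invoke the
 * supplied function and then report DISPATCH_MACH_BARRIER_COMPLETED to the
 * channel handler (see _dispatch_mach_barrier_invoke above).
 */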
5705 DISPATCH_NOINLINE
5706 void
5707 dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
5708 dispatch_function_t func)
5709 {
5710 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5711 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5712 pthread_priority_t pp;
5713
5714 _dispatch_continuation_init_f(dc, dm, context, func, 0, 0, dc_flags);
5715 dc->dc_data = (void *)dc->dc_flags;
5716 dc->dc_other = dm;
5717 dc->do_vtable = DC_VTABLE(MACH_SEND_BARRIER);
5718 _dispatch_trace_continuation_push(dm->_as_dq, dc);
5719 pp = _dispatch_continuation_get_override_priority(dm->_as_dq, dc);
5720 return _dispatch_mach_send_push(dm, dc, pp);
5721 }
5722
5723 DISPATCH_NOINLINE
5724 void
5725 dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
5726 {
5727 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5728 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5729 pthread_priority_t pp;
5730
5731 _dispatch_continuation_init(dc, dm, barrier, 0, 0, dc_flags);
5732 dc->dc_data = (void *)dc->dc_flags;
5733 dc->dc_other = dm;
5734 dc->do_vtable = DC_VTABLE(MACH_SEND_BARRIER);
5735 _dispatch_trace_continuation_push(dm->_as_dq, dc);
5736 pp = _dispatch_continuation_get_override_priority(dm->_as_dq, dc);
5737 return _dispatch_mach_send_push(dm, dc, pp);
5738 }
5739
5740 DISPATCH_NOINLINE
5741 void
5742 dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
5743 dispatch_function_t func)
5744 {
5745 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5746 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5747
5748 _dispatch_continuation_init_f(dc, dm, context, func, 0, 0, dc_flags);
5749 dc->dc_data = (void *)dc->dc_flags;
5750 dc->dc_other = dm;
5751 dc->do_vtable = DC_VTABLE(MACH_RECV_BARRIER);
5752 return _dispatch_continuation_async(dm->_as_dq, dc);
5753 }
5754
5755 DISPATCH_NOINLINE
5756 void
5757 dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
5758 {
5759 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5760 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
5761
5762 _dispatch_continuation_init(dc, dm, barrier, 0, 0, dc_flags);
5763 dc->dc_data = (void *)dc->dc_flags;
5764 dc->dc_other = dm;
5765 dc->do_vtable = DC_VTABLE(MACH_RECV_BARRIER);
5766 return _dispatch_continuation_async(dm->_as_dq, dc);
5767 }
5768
5769 DISPATCH_NOINLINE
5770 static void
5771 _dispatch_mach_cancel_invoke(dispatch_mach_t dm, dispatch_invoke_flags_t flags)
5772 {
5773 dispatch_mach_refs_t dr = dm->ds_refs;
5774
5775 dispatch_invoke_with_autoreleasepool(flags, {
5776 if (slowpath(!dm->dm_connect_handler_called)) {
5777 _dispatch_mach_connect_invoke(dm);
5778 }
5779 _dispatch_client_callout4(dr->dm_handler_ctxt,
5780 DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
5781 });
5782 dm->dm_cancel_handler_called = 1;
5783 _dispatch_release(dm); // the retain is done at creation time
5784 }
5785
5786 DISPATCH_NOINLINE
5787 void
5788 dispatch_mach_cancel(dispatch_mach_t dm)
5789 {
5790 dispatch_source_cancel(dm->_as_ds);
5791 }
5792
5793 static void
5794 _dispatch_mach_install(dispatch_mach_t dm, pthread_priority_t pp)
5795 {
5796 uint32_t disconnect_cnt;
5797
5798 if (dm->ds_dkev) {
5799 _dispatch_source_kevent_register(dm->_as_ds, pp);
5800 }
5801 if (dm->ds_is_direct_kevent) {
5802 pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK |
5803 _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG |
5804 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
5805 // _dispatch_mach_reply_kevent_register assumes this has been done.
5806 // Unlike regular sources or queues, the DEFAULTQUEUE flag is used so
5807 // that the priority of the channel doesn't act as a QoS floor for
5808 // incoming messages (26761457)
5809 dm->dq_priority = (dispatch_priority_t)pp;
5810 }
5811 dm->ds_is_installed = true;
5812 if (unlikely(!os_atomic_cmpxchgv2o(dm->dm_refs, dm_disconnect_cnt,
5813 DISPATCH_MACH_NEVER_INSTALLED, 0, &disconnect_cnt, release))) {
5814 DISPATCH_INTERNAL_CRASH(disconnect_cnt, "Channel already installed");
5815 }
5816 }
5817
5818 void
5819 _dispatch_mach_finalize_activation(dispatch_mach_t dm)
5820 {
5821 if (dm->ds_is_direct_kevent && !dm->ds_is_installed) {
5822 dispatch_source_t ds = dm->_as_ds;
5823 pthread_priority_t pp = _dispatch_source_compute_kevent_priority(ds);
5824 if (pp) _dispatch_mach_install(dm, pp);
5825 }
5826
5827 // call "super"
5828 _dispatch_queue_finalize_activation(dm->_as_dq);
5829 }
5830
5831 DISPATCH_ALWAYS_INLINE
5832 static inline dispatch_queue_t
5833 _dispatch_mach_invoke2(dispatch_object_t dou, dispatch_invoke_flags_t flags,
5834 uint64_t *owned, struct dispatch_object_s **dc_ptr DISPATCH_UNUSED)
5835 {
5836 dispatch_mach_t dm = dou._dm;
5837 dispatch_queue_t retq = NULL;
5838 dispatch_queue_t dq = _dispatch_queue_get_current();
5839
5840 // This function performs all mach channel actions. Each action is
5841 // responsible for verifying that it takes place on the appropriate queue.
5842 // If the current queue is not the correct queue for this action, the
5843 // correct queue will be returned and the invoke will be re-driven on that
5844 // queue.
5845
5846 // The order of tests here in invoke and in wakeup should be consistent.
5847
5848 dispatch_mach_send_refs_t dr = dm->dm_refs;
5849 dispatch_queue_t dkq = &_dispatch_mgr_q;
5850
5851 if (dm->ds_is_direct_kevent) {
5852 dkq = dm->do_targetq;
5853 }
5854
5855 if (slowpath(!dm->ds_is_installed)) {
5856 // The channel needs to be installed on the kevent queue.
5857 if (dq != dkq) {
5858 return dkq;
5859 }
5860 _dispatch_mach_install(dm, _dispatch_get_defaultpriority());
5861 }
5862
5863 if (_dispatch_queue_class_probe(dm)) {
5864 if (dq == dm->do_targetq) {
5865 retq = _dispatch_queue_serial_drain(dm->_as_dq, flags, owned, NULL);
5866 } else {
5867 retq = dm->do_targetq;
5868 }
5869 }
5870
5871 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dm->_as_dq);
5872
5873 if (dr->dm_tail) {
5874 bool requires_mgr = dr->dm_needs_mgr || (dr->dm_disconnect_cnt &&
5875 (dm->dm_dkev || !dm->ds_is_direct_kevent));
5876 if (!(dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) ||
5877 (dqf & DSF_CANCELED) || dr->dm_disconnect_cnt) {
5878 // The channel has pending messages to send.
5879 if (unlikely(requires_mgr && dq != &_dispatch_mgr_q)) {
5880 return retq ? retq : &_dispatch_mgr_q;
5881 }
5882 dispatch_mach_send_invoke_flags_t send_flags = DM_SEND_INVOKE_NONE;
5883 if (dq != &_dispatch_mgr_q) {
5884 send_flags |= DM_SEND_INVOKE_CAN_RUN_BARRIER;
5885 }
5886 _dispatch_mach_send_invoke(dm, flags, send_flags);
5887 }
5888 } else if (dqf & DSF_CANCELED) {
5889 // The channel has been cancelled and needs to be uninstalled from the
5890 // manager queue. After uninstallation, the cancellation handler needs
5891 // to be delivered to the target queue.
5892 if ((dqf & DSF_STATE_MASK) == (DSF_ARMED | DSF_DEFERRED_DELETE)) {
5893 // waiting for the delivery of a deferred delete event
5894 return retq;
5895 }
5896 if ((dqf & DSF_STATE_MASK) != DSF_DELETED) {
5897 if (dq != &_dispatch_mgr_q) {
5898 return retq ? retq : &_dispatch_mgr_q;
5899 }
5900 _dispatch_mach_send_invoke(dm, flags, DM_SEND_INVOKE_CANCEL);
5901 dqf = _dispatch_queue_atomic_flags(dm->_as_dq);
5902 if (unlikely((dqf & DSF_STATE_MASK) != DSF_DELETED)) {
5903 // waiting for the delivery of a deferred delete event
5904 // or deletion didn't happen because send_invoke couldn't
5905 // acquire the send lock
5906 return retq;
5907 }
5908 }
5909 if (!dm->dm_cancel_handler_called) {
5910 if (dq != dm->do_targetq) {
5911 return retq ? retq : dm->do_targetq;
5912 }
5913 _dispatch_mach_cancel_invoke(dm, flags);
5914 }
5915 }
5916
5917 return retq;
5918 }
5919
5920 DISPATCH_NOINLINE
5921 void
5922 _dispatch_mach_invoke(dispatch_mach_t dm, dispatch_invoke_flags_t flags)
5923 {
5924 _dispatch_queue_class_invoke(dm, flags, _dispatch_mach_invoke2);
5925 }
5926
5927 void
5928 _dispatch_mach_wakeup(dispatch_mach_t dm, pthread_priority_t pp,
5929 dispatch_wakeup_flags_t flags)
5930 {
5931 // This function determines whether the mach channel needs to be invoked.
5932 // The order of tests here in probe and in invoke should be consistent.
5933
5934 dispatch_mach_send_refs_t dr = dm->dm_refs;
5935 dispatch_queue_wakeup_target_t dkq = DISPATCH_QUEUE_WAKEUP_MGR;
5936 dispatch_queue_wakeup_target_t tq = DISPATCH_QUEUE_WAKEUP_NONE;
5937 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dm->_as_dq);
5938
5939 if (dm->ds_is_direct_kevent) {
5940 dkq = DISPATCH_QUEUE_WAKEUP_TARGET;
5941 }
5942
5943 if (!dm->ds_is_installed) {
5944 // The channel needs to be installed on the kevent queue.
5945 tq = dkq;
5946 goto done;
5947 }
5948
5949 if (_dispatch_queue_class_probe(dm)) {
5950 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
5951 goto done;
5952 }
5953
5954 if (_dispatch_lock_is_locked(dr->dm_state_lock.dul_lock)) {
5955 // Sending and uninstallation below require the send lock; the channel
5956 // will be woken up when the lock is dropped <rdar://15132939&15203957>
5957 _dispatch_queue_reinstate_override_priority(dm, (dispatch_priority_t)pp);
5958 goto done;
5959 }
5960
5961 if (dr->dm_tail) {
5962 bool requires_mgr = dr->dm_needs_mgr || (dr->dm_disconnect_cnt &&
5963 (dm->dm_dkev || !dm->ds_is_direct_kevent));
5964 if (!(dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev)) ||
5965 (dqf & DSF_CANCELED) || dr->dm_disconnect_cnt) {
5966 if (unlikely(requires_mgr)) {
5967 tq = DISPATCH_QUEUE_WAKEUP_MGR;
5968 } else {
5969 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
5970 }
5971 } else {
5972 // can happen when we can't send because the port is full
5973 // but we should not lose the override
5974 _dispatch_queue_reinstate_override_priority(dm,
5975 (dispatch_priority_t)pp);
5976 }
5977 } else if (dqf & DSF_CANCELED) {
5978 if ((dqf & DSF_STATE_MASK) == (DSF_ARMED | DSF_DEFERRED_DELETE)) {
5979 // waiting for the delivery of a deferred delete event
5980 } else if ((dqf & DSF_STATE_MASK) != DSF_DELETED) {
5981 // The channel needs to be uninstalled from the manager queue
5982 tq = DISPATCH_QUEUE_WAKEUP_MGR;
5983 } else if (!dm->dm_cancel_handler_called) {
5984 // the cancellation handler needs to be delivered to the target
5985 // queue.
5986 tq = DISPATCH_QUEUE_WAKEUP_TARGET;
5987 }
5988 }
5989
5990 done:
5991 if (tq) {
5992 return _dispatch_queue_class_wakeup(dm->_as_dq, pp, flags, tq);
5993 } else if (pp) {
5994 return _dispatch_queue_class_override_drainer(dm->_as_dq, pp, flags);
5995 } else if (flags & DISPATCH_WAKEUP_CONSUME) {
5996 return _dispatch_release_tailcall(dm);
5997 }
5998 }
5999
6000 #pragma mark -
6001 #pragma mark dispatch_mach_msg_t
6002
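/*
 * Usage sketch (names are placeholders): creating a message with inline
 * storage managed by the message object, versus wrapping an out-of-line
 * buffer obtained from mach_vm_allocate() that should be deallocated
 * together with the message:
 *
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t m1 = dispatch_mach_msg_create(NULL, size,
 *			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *	// fill in *hdr ...
 *
 *	dispatch_mach_msg_t m2 = dispatch_mach_msg_create(received_hdr,
 *			received_size, DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE, NULL);
 */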
6003 dispatch_mach_msg_t
6004 dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
6005 dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
6006 {
6007 if (slowpath(size < sizeof(mach_msg_header_t)) ||
6008 slowpath(destructor && !msg)) {
6009 DISPATCH_CLIENT_CRASH(size, "Empty message");
6010 }
6011 dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
6012 sizeof(struct dispatch_mach_msg_s) +
6013 (destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
6014 if (destructor) {
6015 dmsg->dmsg_msg = msg;
6016 } else if (msg) {
6017 memcpy(dmsg->dmsg_buf, msg, size);
6018 }
6019 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
6020 dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
6021 false);
6022 dmsg->dmsg_destructor = destructor;
6023 dmsg->dmsg_size = size;
6024 if (msg_ptr) {
6025 *msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
6026 }
6027 return dmsg;
6028 }
6029
6030 void
6031 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
6032 {
6033 if (dmsg->dmsg_voucher) {
6034 _voucher_release(dmsg->dmsg_voucher);
6035 dmsg->dmsg_voucher = NULL;
6036 }
6037 switch (dmsg->dmsg_destructor) {
6038 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
6039 break;
6040 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
6041 free(dmsg->dmsg_msg);
6042 break;
6043 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
6044 mach_vm_size_t vm_size = dmsg->dmsg_size;
6045 mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
6046 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
6047 vm_addr, vm_size));
6048 break;
6049 }}
6050 }
6051
6052 static inline mach_msg_header_t*
6053 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
6054 {
6055 return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
6056 (mach_msg_header_t*)dmsg->dmsg_buf;
6057 }
6058
6059 mach_msg_header_t*
6060 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
6061 {
6062 if (size_ptr) {
6063 *size_ptr = dmsg->dmsg_size;
6064 }
6065 return _dispatch_mach_msg_get_msg(dmsg);
6066 }
6067
6068 size_t
6069 _dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
6070 {
6071 size_t offset = 0;
6072 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
6073 dx_kind(dmsg), dmsg);
6074 offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
6075 "refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
6076 offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
6077 "msgh[%p] = { ", dmsg->dmsg_options, dmsg->dmsg_buf);
6078 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
6079 if (hdr->msgh_id) {
6080 offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
6081 hdr->msgh_id);
6082 }
6083 if (hdr->msgh_size) {
6084 offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
6085 hdr->msgh_size);
6086 }
6087 if (hdr->msgh_bits) {
6088 offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
6089 MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
6090 MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
6091 if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
6092 offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
6093 MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
6094 }
6095 offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
6096 }
6097 if (hdr->msgh_local_port && hdr->msgh_remote_port) {
6098 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
6099 "remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
6100 } else if (hdr->msgh_local_port) {
6101 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
6102 hdr->msgh_local_port);
6103 } else if (hdr->msgh_remote_port) {
6104 offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
6105 hdr->msgh_remote_port);
6106 } else {
6107 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
6108 }
6109 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
6110 return offset;
6111 }
6112
6113 #pragma mark -
6114 #pragma mark dispatch_mig_server
6115
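/*
 * dispatch_mig_server() is designed to be driven from the event handler of
 * a MACH_RECV dispatch source; it receives, demuxes and replies to as many
 * messages as it can before returning. Illustrative setup, where "demux_fn"
 * and "max_request_size" stand in for the MIG-generated demux routine and
 * request size (placeholder names):
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_MACH_RECV, recv_port, 0, queue);
 *	dispatch_source_set_event_handler(ds, ^{
 *		(void)dispatch_mig_server(ds, max_request_size, demux_fn);
 *	});
 *	dispatch_resume(ds);
 */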
6116 mach_msg_return_t
6117 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
6118 dispatch_mig_callback_t callback)
6119 {
6120 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
6121 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
6122 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
6123 mach_msg_options_t tmp_options;
6124 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
6125 mach_msg_return_t kr = 0;
6126 uint64_t assertion_token = 0;
6127 unsigned int cnt = 1000; // do not stall out serial queues
6128 boolean_t demux_success;
6129 bool received = false;
6130 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
6131
6132 bufRequest = alloca(rcv_size);
6133 bufRequest->RetCode = 0;
6134 for (mach_vm_address_t p = mach_vm_trunc_page((uintptr_t)bufRequest + vm_page_size);
6135 p < (mach_vm_address_t)bufRequest + rcv_size; p += vm_page_size) {
6136 *(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
6137 }
6138
6139 bufReply = alloca(rcv_size);
6140 bufReply->Head.msgh_size = 0;
6141 for (mach_vm_address_t p = mach_vm_trunc_page((uintptr_t)bufReply + vm_page_size);
6142 p < (mach_vm_address_t)bufReply + rcv_size; p += vm_page_size) {
6143 *(char*)p = 0; // ensure alloca buffer doesn't overlap with stack guard
6144 }
6145
6146 #if DISPATCH_DEBUG
6147 options |= MACH_RCV_LARGE; // rdar://problem/8422992
6148 #endif
6149 tmp_options = options;
6150 // XXX FIXME -- change this to not starve out the target queue
6151 for (;;) {
6152 if (DISPATCH_QUEUE_IS_SUSPENDED(ds) || (--cnt == 0)) {
6153 options &= ~MACH_RCV_MSG;
6154 tmp_options &= ~MACH_RCV_MSG;
6155
6156 if (!(tmp_options & MACH_SEND_MSG)) {
6157 goto out;
6158 }
6159 }
6160 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
6161 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
6162
6163 tmp_options = options;
6164
6165 if (slowpath(kr)) {
6166 switch (kr) {
6167 case MACH_SEND_INVALID_DEST:
6168 case MACH_SEND_TIMED_OUT:
6169 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
6170 mach_msg_destroy(&bufReply->Head);
6171 }
6172 break;
6173 case MACH_RCV_TIMED_OUT:
6174 // Don't return an error if a message was sent this time or
6175 // a message was successfully received previously
6176 // rdar://problems/7363620&7791738
6177 if (bufReply->Head.msgh_remote_port || received) {
6178 kr = MACH_MSG_SUCCESS;
6179 }
6180 break;
6181 case MACH_RCV_INVALID_NAME:
6182 break;
6183 #if DISPATCH_DEBUG
6184 case MACH_RCV_TOO_LARGE:
6185 // receive messages that are too large and log their id and size
6186 // rdar://problem/8422992
6187 tmp_options &= ~MACH_RCV_LARGE;
6188 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
6189 void *large_buf = malloc(large_size);
6190 if (large_buf) {
6191 rcv_size = large_size;
6192 bufReply = large_buf;
6193 }
6194 if (!mach_msg(&bufReply->Head, tmp_options, 0,
6195 (mach_msg_size_t)rcv_size,
6196 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
6197 _dispatch_log("BUG in libdispatch client: "
6198 "dispatch_mig_server received message larger than "
6199 "requested size %zd: id = 0x%x, size = %d",
6200 maxmsgsz, bufReply->Head.msgh_id,
6201 bufReply->Head.msgh_size);
6202 }
6203 if (large_buf) {
6204 free(large_buf);
6205 }
6206 // fall through
6207 #endif
6208 default:
6209 _dispatch_bug_mach_client(
6210 "dispatch_mig_server: mach_msg() failed", kr);
6211 break;
6212 }
6213 goto out;
6214 }
6215
6216 if (!(tmp_options & MACH_RCV_MSG)) {
6217 goto out;
6218 }
6219
6220 if (assertion_token) {
6221 #if DISPATCH_USE_IMPORTANCE_ASSERTION
6222 int r = proc_importance_assertion_complete(assertion_token);
6223 (void)dispatch_assume_zero(r);
6224 #endif
6225 assertion_token = 0;
6226 }
6227 received = true;
6228
6229 bufTemp = bufRequest;
6230 bufRequest = bufReply;
6231 bufReply = bufTemp;
6232
6233 #if DISPATCH_USE_IMPORTANCE_ASSERTION
6234 #pragma clang diagnostic push
6235 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
6236 int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
6237 NULL, &assertion_token);
6238 if (r && slowpath(r != EIO)) {
6239 (void)dispatch_assume_zero(r);
6240 }
6241 #pragma clang diagnostic pop
6242 #endif
6243 _voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
6244 demux_success = callback(&bufRequest->Head, &bufReply->Head);
6245
6246 if (!demux_success) {
6247 // destroy the request - but not the reply port
6248 bufRequest->Head.msgh_remote_port = 0;
6249 mach_msg_destroy(&bufRequest->Head);
6250 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
6251 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
6252 // is present
6253 if (slowpath(bufReply->RetCode)) {
6254 if (bufReply->RetCode == MIG_NO_REPLY) {
6255 continue;
6256 }
6257
6258 // destroy the request - but not the reply port
6259 bufRequest->Head.msgh_remote_port = 0;
6260 mach_msg_destroy(&bufRequest->Head);
6261 }
6262 }
6263
6264 if (bufReply->Head.msgh_remote_port) {
6265 tmp_options |= MACH_SEND_MSG;
6266 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
6267 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
6268 tmp_options |= MACH_SEND_TIMEOUT;
6269 }
6270 }
6271 }
6272
6273 out:
6274 if (assertion_token) {
6275 #if DISPATCH_USE_IMPORTANCE_ASSERTION
6276 int r = proc_importance_assertion_complete(assertion_token);
6277 (void)dispatch_assume_zero(r);
6278 #endif
6279 }
6280
6281 return kr;
6282 }
6283
6284 #endif /* HAVE_MACH */
6285
6286 #pragma mark -
6287 #pragma mark dispatch_source_debug
6288
6289 DISPATCH_NOINLINE
6290 static const char *
6291 _evfiltstr(short filt)
6292 {
6293 switch (filt) {
6294 #define _evfilt2(f) case (f): return #f
6295 _evfilt2(EVFILT_READ);
6296 _evfilt2(EVFILT_WRITE);
6297 _evfilt2(EVFILT_AIO);
6298 _evfilt2(EVFILT_VNODE);
6299 _evfilt2(EVFILT_PROC);
6300 _evfilt2(EVFILT_SIGNAL);
6301 _evfilt2(EVFILT_TIMER);
6302 #if HAVE_MACH
6303 _evfilt2(EVFILT_MACHPORT);
6304 _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
6305 #endif
6306 _evfilt2(EVFILT_FS);
6307 _evfilt2(EVFILT_USER);
6308 #ifdef EVFILT_SOCK
6309 _evfilt2(EVFILT_SOCK);
6310 #endif
6311 #ifdef EVFILT_MEMORYSTATUS
6312 _evfilt2(EVFILT_MEMORYSTATUS);
6313 #endif
6314
6315 _evfilt2(DISPATCH_EVFILT_TIMER);
6316 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
6317 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
6318 default:
6319 return "EVFILT_missing";
6320 }
6321 }
6322
6323 #if DISPATCH_DEBUG
6324 static const char *
6325 _evflagstr2(uint16_t *flagsp)
6326 {
6327 #define _evflag2(f) \
6328 if ((*flagsp & (f)) == (f) && (f)) { \
6329 *flagsp &= ~(f); \
6330 return #f "|"; \
6331 }
6332 _evflag2(EV_ADD);
6333 _evflag2(EV_DELETE);
6334 _evflag2(EV_ENABLE);
6335 _evflag2(EV_DISABLE);
6336 _evflag2(EV_ONESHOT);
6337 _evflag2(EV_CLEAR);
6338 _evflag2(EV_RECEIPT);
6339 _evflag2(EV_DISPATCH);
6340 _evflag2(EV_UDATA_SPECIFIC);
6341 #ifdef EV_POLL
6342 _evflag2(EV_POLL);
6343 #endif
6344 #ifdef EV_OOBAND
6345 _evflag2(EV_OOBAND);
6346 #endif
6347 _evflag2(EV_ERROR);
6348 _evflag2(EV_EOF);
6349 _evflag2(EV_VANISHED);
6350 *flagsp = 0;
6351 return "EV_UNKNOWN ";
6352 }
6353
6354 DISPATCH_NOINLINE
6355 static const char *
6356 _evflagstr(uint16_t flags, char *str, size_t strsize)
6357 {
6358 str[0] = 0;
6359 while (flags) {
6360 strlcat(str, _evflagstr2(&flags), strsize);
6361 }
6362 size_t sz = strlen(str);
6363 if (sz) str[sz-1] = 0;
6364 return str;
6365 }
6366 #endif
6367
6368 static size_t
6369 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
6370 {
6371 dispatch_queue_t target = ds->do_targetq;
6372 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
6373 "mask = 0x%lx, pending_data = 0x%lx, registered = %d, "
6374 "armed = %d, deleted = %d%s, canceled = %d, ",
6375 target && target->dq_label ? target->dq_label : "", target,
6376 ds->ds_ident_hack, ds->ds_pending_data_mask, ds->ds_pending_data,
6377 ds->ds_is_installed, (bool)(ds->dq_atomic_flags & DSF_ARMED),
6378 (bool)(ds->dq_atomic_flags & DSF_DELETED),
6379 (ds->dq_atomic_flags & DSF_DEFERRED_DELETE) ? " (pending)" : "",
6380 (bool)(ds->dq_atomic_flags & DSF_CANCELED));
6381 }
6382
6383 static size_t
6384 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
6385 {
6386 dispatch_source_refs_t dr = ds->ds_refs;
6387 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx"
6388 ", last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
6389 (unsigned long long)ds_timer(dr).target,
6390 (unsigned long long)ds_timer(dr).deadline,
6391 (unsigned long long)ds_timer(dr).last_fire,
6392 (unsigned long long)ds_timer(dr).interval, ds_timer(dr).flags);
6393 }
6394
6395 size_t
6396 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
6397 {
6398 size_t offset = 0;
6399 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
6400 dx_kind(ds), ds);
6401 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
6402 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
6403 if (ds->ds_is_timer) {
6404 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
6405 }
6406 const char *filter;
6407 if (!ds->ds_dkev) {
6408 filter = "????";
6409 } else if (ds->ds_is_custom_source) {
6410 filter = _evfiltstr((int16_t)(uintptr_t)ds->ds_dkev);
6411 } else {
6412 filter = _evfiltstr(ds->ds_dkev->dk_kevent.filter);
6413 }
6414 offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
6415 "filter = %s }", ds->ds_dkev, ds->ds_is_direct_kevent ? " (direct)"
6416 : "", filter);
6417 return offset;
6418 }
6419
6420 #if HAVE_MACH
6421 static size_t
6422 _dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
6423 {
6424 dispatch_queue_t target = dm->do_targetq;
6425 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
6426 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
6427 "send state = %016llx, disconnected = %d, canceled = %d ",
6428 target && target->dq_label ? target->dq_label : "", target,
6429 dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
6430 dm->dm_refs->dm_send,
6431 dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
6432 dm->dm_dkev && DISPATCH_MACH_NOTIFICATION_ARMED(dm->dm_dkev) ?
6433 " (armed)" : "", dm->dm_refs->dm_checkin_port,
6434 dm->dm_refs->dm_checkin ? " (pending)" : "",
6435 dm->dm_refs->dm_state, dm->dm_refs->dm_disconnect_cnt,
6436 (bool)(dm->dq_atomic_flags & DSF_CANCELED));
6437 }
6438
6439 size_t
6440 _dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
6441 {
6442 size_t offset = 0;
6443 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
6444 dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
6445 dx_kind(dm), dm);
6446 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
6447 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
6448 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
6449 return offset;
6450 }
6451 #endif // HAVE_MACH
6452
6453 #if DISPATCH_DEBUG
6454 DISPATCH_NOINLINE
6455 static void
6456 dispatch_kevent_debug(const char *verb, const _dispatch_kevent_qos_s *kev,
6457 int i, int n, const char *function, unsigned int line)
6458 {
6459 char flagstr[256];
6460 char i_n[31];
6461
6462 if (n > 1) {
6463 snprintf(i_n, sizeof(i_n), "%d/%d ", i + 1, n);
6464 } else {
6465 i_n[0] = '\0';
6466 }
6467 #if DISPATCH_USE_KEVENT_QOS
6468 _dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
6469 "flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
6470 "qos = 0x%x, ext[0] = 0x%llx, ext[1] = 0x%llx, ext[2] = 0x%llx, "
6471 "ext[3] = 0x%llx }: %s #%u", verb, kev, i_n, kev->ident,
6472 _evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
6473 sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
6474 kev->qos, kev->ext[0], kev->ext[1], kev->ext[2], kev->ext[3],
6475 function, line);
6476 #else
6477 _dispatch_debug("%s kevent[%p] %s= { ident = 0x%llx, filter = %s, "
6478 "flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
6479 "ext[0] = 0x%llx, ext[1] = 0x%llx }: %s #%u", verb, kev, i_n,
6480 kev->ident, _evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
6481 sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
6482 #ifndef IGNORE_KEVENT64_EXT
6483 kev->ext[0], kev->ext[1],
6484 #else
6485 0ull, 0ull,
6486 #endif
6487 function, line);
6488 #endif
6489 }
6490
6491 #if HAVE_MACH
6492
6493 #ifndef MACH_PORT_TYPE_SPREQUEST
6494 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
6495 #endif
6496
6497 DISPATCH_NOINLINE
6498 void
6499 dispatch_debug_machport(mach_port_t name, const char* str)
6500 {
6501 mach_port_type_t type;
6502 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
6503 unsigned int dnreqs = 0, dnrsiz;
6504 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
6505 if (kr) {
6506 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
6507 kr, mach_error_string(kr), str);
6508 return;
6509 }
6510 if (type & MACH_PORT_TYPE_SEND) {
6511 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
6512 MACH_PORT_RIGHT_SEND, &ns));
6513 }
6514 if (type & MACH_PORT_TYPE_SEND_ONCE) {
6515 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
6516 MACH_PORT_RIGHT_SEND_ONCE, &nso));
6517 }
6518 if (type & MACH_PORT_TYPE_DEAD_NAME) {
6519 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
6520 MACH_PORT_RIGHT_DEAD_NAME, &nd));
6521 }
6522 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
6523 kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
6524 if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
6525 }
6526 if (type & MACH_PORT_TYPE_RECEIVE) {
6527 mach_port_status_t status = { .mps_pset = 0, };
6528 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
6529 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
6530 MACH_PORT_RIGHT_RECEIVE, &nr));
6531 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
6532 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
6533 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
6534 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
6535 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
6536 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
6537 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
6538 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
6539 status.mps_srights ? "Y":"N", status.mps_sorights,
6540 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
6541 status.mps_seqno, str);
6542 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
6543 MACH_PORT_TYPE_DEAD_NAME)) {
6544 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
6545 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
6546 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
6547 } else {
6548 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
6549 str);
6550 }
6551 }
6552
6553 #endif // HAVE_MACH
6554
6555 #endif // DISPATCH_DEBUG