/* apple/libdispatch.git (libdispatch-501.40.12): src/source.c */
/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#if HAVE_MACH
#include "protocol.h"
#include "protocolServer.h"
#endif
#include <sys/mount.h>

#define DKEV_DISPOSE_IMMEDIATE_DELETE 0x1
#define DKEV_DISPOSE_IGNORE_ENOENT 0x2

static void _dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke);
static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
static long _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg,
		int options);
static long _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags);
static void _dispatch_kevent_drain(_dispatch_kevent_qos_s *ke);
static void _dispatch_kevent_merge(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke);
static void _dispatch_timers_unregister(dispatch_source_t ds,
		dispatch_kevent_t dk);
static void _dispatch_timers_update(dispatch_source_t ds);
static void _dispatch_timer_aggregates_check(void);
static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
		unsigned int tidx);
static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
		unsigned int tidx);
static inline unsigned long _dispatch_source_timer_data(
		dispatch_source_refs_t dr, unsigned long prev);
static long _dispatch_kq_update(const _dispatch_kevent_qos_s *);
static void _dispatch_memorystatus_init(void);
#if HAVE_MACH
static void _dispatch_mach_host_calendar_change_register(void);
static void _dispatch_mach_recv_msg_buf_init(void);
static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags);
static inline void _dispatch_kevent_mach_portset(_dispatch_kevent_qos_s *ke);
#else
static inline void _dispatch_mach_host_calendar_change_register(void) {}
static inline void _dispatch_mach_recv_msg_buf_init(void) {}
#endif
static const char * _evfiltstr(short filt);
#if DISPATCH_DEBUG
static void _dispatch_kevent_debug(const _dispatch_kevent_qos_s* kev,
		const char* str);
static void _dispatch_kevent_debugger(void *context);
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
		dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
#else
static inline void
_dispatch_kevent_debug(const _dispatch_kevent_qos_s* kev DISPATCH_UNUSED,
		const char* str DISPATCH_UNUSED) {}
#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
#endif
#ifndef DISPATCH_MGR_QUEUE_DEBUG
#define DISPATCH_MGR_QUEUE_DEBUG 0
#endif
#if DISPATCH_MGR_QUEUE_DEBUG
#define _dispatch_kevent_mgr_debug _dispatch_kevent_debug
#else
static inline void
_dispatch_kevent_mgr_debug(_dispatch_kevent_qos_s* kev DISPATCH_UNUSED,
		const char* str DISPATCH_UNUSED) {}
#endif

#pragma mark -
#pragma mark dispatch_source_t

dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
	uintptr_t handle,
	unsigned long mask,
	dispatch_queue_t dq)
{
	const _dispatch_kevent_qos_s *proto_kev = &type->ke;
	dispatch_source_t ds;
	dispatch_kevent_t dk;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		return NULL;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			return NULL;
		}
		break;
	case EVFILT_FS:
#if DISPATCH_USE_VM_PRESSURE
	case EVFILT_VM:
#endif
#if DISPATCH_USE_MEMORYSTATUS
	case EVFILT_MEMORYSTATUS:
#endif
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (handle) {
			return NULL;
		}
		break;
	case DISPATCH_EVFILT_TIMER:
		if (!!handle ^ !!type->ke.ident) {
			return NULL;
		}
		break;
	default:
		break;
	}

	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
			sizeof(struct dispatch_source_s));
	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	ds->dq_label = "source";

	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->do_ref_cnt++; // since source is created suspended
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		ds->ds_is_level = true;
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}
	if (EV_UDATA_SPECIFIC & proto_kev->flags) {
		dispatch_assert(!(EV_ONESHOT & proto_kev->flags));
		dk->dk_kevent.flags |= EV_DISPATCH;
		ds->ds_is_direct_kevent = true;
		ds->ds_needs_rearm = true;
	}
	// Some sources require special processing
	if (type->init != NULL) {
		type->init(ds, type, handle, mask, dq);
	}
	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));

	if (fastpath(!ds->ds_refs)) {
		ds->ds_refs = _dispatch_calloc(1ul,
				sizeof(struct dispatch_source_refs_s));
	}
	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);

	if (!ds->ds_is_direct_kevent) {
		// The initial target queue is the manager queue, in order to get
		// the source installed. <rdar://problem/8928171>
		ds->do_targetq = &_dispatch_mgr_q;
		// First item on the queue sets the user-specified target queue
		dispatch_set_target_queue(ds, dq);
	} else {
		if (slowpath(!dq)) {
			dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
		} else {
			_dispatch_retain(dq);
		}
		ds->do_targetq = dq;
		_dispatch_queue_priority_inherit_from_target((dispatch_queue_t)ds, dq);
		_dispatch_queue_set_override_priority(dq);
	}
	_dispatch_object_debug(ds, "%s", __func__);
	return ds;
}
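
/*
 * Usage sketch: a minimal example of driving the API above from client
 * code, assuming a SIGHUP signal source on a global queue; illustrative
 * only, not part of this file.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_source_t sig = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0, q);
 *	if (sig) {
 *		signal(SIGHUP, SIG_IGN); // let the source observe the signal
 *		dispatch_source_set_event_handler(sig, ^{
 *			printf("SIGHUP x%lu\n", dispatch_source_get_data(sig));
 *		});
 *		dispatch_resume(sig); // sources are created suspended (see above)
 *	}
 */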

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_get_kevent_queue(dispatch_source_t ds)
{
	if (ds->ds_is_direct_kevent) {
		return ds->do_targetq;
	}
	return &_dispatch_mgr_q;
}

void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	free(ds->ds_refs);
	_dispatch_queue_destroy(ds);
}

void
_dispatch_source_xref_dispose(dispatch_source_t ds)
{
	_dispatch_wakeup(ds);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	// Right after we set the cancel flag, someone else
	// could potentially invoke the source, do the cancelation,
	// unregister the source, and deallocate it. We would
	// need to therefore retain/release before setting the bit

	_dispatch_retain(ds);
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}

unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	unsigned long mask = ds->ds_pending_data_mask;
	if (ds->ds_vmpressure_override) {
		mask = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorystatus_override) {
		mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	unsigned int handle = (unsigned int)ds->ds_ident_hack;
#if TARGET_IPHONE_SIMULATOR
	if (ds->ds_memorystatus_override) {
		handle = 0;
	}
#endif
	return handle;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	unsigned long data = ds->ds_data;
	if (ds->ds_vmpressure_override) {
		data = NOTE_VM_PRESSURE;
	}
#if TARGET_IPHONE_SIMULATOR
	else if (ds->ds_memorystatus_override) {
		data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
	}
#endif
	return data;
}

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	_dispatch_kevent_qos_s kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = (typeof(kev.data))val,
	};

	dispatch_assert(
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
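
/*
 * Usage sketch: dispatch_source_merge_data() is only valid for the custom
 * DATA_ADD/DATA_OR source types asserted above. A minimal ADD example
 * (illustrative; the name 'tally' is arbitrary):
 *
 *	dispatch_source_t tally = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0,
 *			dispatch_get_main_queue());
 *	dispatch_source_set_event_handler(tally, ^{
 *		// coalesced sum of all values merged since the last callout
 *		printf("processed %lu events\n", dispatch_source_get_data(tally));
 *	});
 *	dispatch_resume(tally);
 *	// from any thread:
 *	dispatch_source_merge_data(tally, 1);
 */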

#pragma mark -
#pragma mark dispatch_source_handler

DISPATCH_ALWAYS_INLINE
static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
		bool block)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	if (handler) {
		dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
				DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
				DISPATCH_OBJ_ASYNC_BIT : 0l));
		dc->dc_priority = 0;
		dc->dc_voucher = NULL;
		if (block) {
#ifdef __BLOCKS__
			if (slowpath(_dispatch_block_has_private_data(handler))) {
				// sources don't propagate priority by default
				dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
				flags |= _dispatch_block_get_flags(handler);
				_dispatch_continuation_priority_set(dc,
						_dispatch_block_get_priority(handler), flags);
			}
			if (kind != DS_EVENT_HANDLER) {
				dc->dc_func = _dispatch_call_block_and_release;
			} else {
				dc->dc_func = _dispatch_Block_invoke(handler);
			}
			dc->dc_ctxt = _dispatch_Block_copy(handler);
#endif /* __BLOCKS__ */
		} else {
			dc->dc_func = handler;
			dc->dc_ctxt = ds->do_ctxt;
		}
		_dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
	} else {
		dc->dc_func = NULL;
	}
	dc->dc_data = (void*)kind;
	return dc;
}

static inline void
_dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
		dispatch_continuation_t dc_new)
{
	dispatch_continuation_t dc = dr->ds_handler[kind];
	if (dc) {
#ifdef __BLOCKS__
		if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
			Block_release(dc->dc_ctxt);
		}
#endif /* __BLOCKS__ */
		if (dc->dc_voucher) {
			_voucher_release(dc->dc_voucher);
			dc->dc_voucher = NULL;
		}
		_dispatch_continuation_free(dc);
	}
	dr->ds_handler[kind] = dc_new;
}

static inline void
_dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
{
	_dispatch_source_handler_replace(dr, kind, NULL);
}

static void
_dispatch_source_set_handler(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
	dispatch_continuation_t dc = context;
	long kind = (long)dc->dc_data;
	dc->dc_data = 0;
	if (!dc->dc_func) {
		_dispatch_continuation_free(dc);
		dc = NULL;
	} else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_source_handler_replace(ds->ds_refs, kind, dc);
	if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
#if HAVE_PTHREAD_WORKQUEUE_QOS
		ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
		_dispatch_queue_set_override_priority((dispatch_queue_t)ds);
#endif
	}
}

#ifdef __BLOCKS__
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

void
_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
		void *ctxt, dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
	dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT);
	dc->dc_other = dc->dc_ctxt;
	dc->dc_ctxt = ctxt;
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

#ifdef __BLOCKS__
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
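
/*
 * Usage sketch: the cancel handler is where kevent-backed resources are
 * safe to tear down, since it runs only after the source is unregistered.
 * A minimal descriptor-read example (illustrative; 'q' and 'path' are
 * placeholders):
 *
 *	int fd = open(path, O_RDONLY | O_NONBLOCK);
 *	dispatch_source_t rd = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0, q);
 *	dispatch_source_set_event_handler(rd, ^{
 *		char buf[4096];
 *		// up to dispatch_source_get_data(rd) bytes are estimated available
 *		ssize_t n = read(fd, buf, sizeof(buf));
 *		(void)n;
 *	});
 *	dispatch_source_set_cancel_handler(rd, ^{
 *		close(fd); // only safe here, not right after dispatch_source_cancel
 *	});
 *	dispatch_resume(rd);
 *	// later: dispatch_source_cancel(rd); dispatch_release(rd);
 */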

#ifdef __BLOCKS__
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */

void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_continuation_t dc;
	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}

#pragma mark -
#pragma mark dispatch_source_invoke

static void
_dispatch_source_registration_callout(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}

static void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	if (!dc) {
		return;
	}
	if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
		return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}

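// Latches the pending data for the event handler callout: ds_pending_data
// is atomically exchanged with zero and interpreted per source kind (level
// sources store the complement, timers convert it to a missed-interval
// count, everything else is taken verbatim) before the handler runs with
// its saved voucher and priority.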
static void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		return;
	}
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
		ds->ds_data = _dispatch_source_timer_data(dr, prev);
	} else {
		ds->ds_data = prev;
	}
	if (!dispatch_assume(prev) || !dc) {
		return;
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
	_dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
	_dispatch_continuation_pop(dc);
	if (voucher) dc->dc_voucher = voucher;
	_dispatch_reset_defaultpriority(old_dp);
}

static void
_dispatch_source_kevent_unregister(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	uint32_t flags = (uint32_t)ds->ds_pending_data_mask;
	dispatch_kevent_t dk = ds->ds_dkev;
	if (ds->ds_atomic_flags & DSF_DELETED) {
		dk->dk_kevent.flags |= EV_DELETE; // already deleted
		dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
	}
	if (dk->dk_kevent.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_dkev = NULL;
		_dispatch_timers_unregister(ds, dk);
	} else if (!ds->ds_is_direct_kevent) {
		ds->ds_dkev = NULL;
		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
		_dispatch_kevent_unregister(dk, flags, 0);
	} else {
		int dkev_dispose_options = 0;
		if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
			dkev_dispose_options |= DKEV_DISPOSE_IMMEDIATE_DELETE;
		}
		if (ds->ds_needs_mgr) {
			dkev_dispose_options |= DKEV_DISPOSE_IGNORE_ENOENT;
			ds->ds_needs_mgr = false;
		}
		long r = _dispatch_kevent_unregister(dk, flags, dkev_dispose_options);
		if (r == EINPROGRESS) {
			_dispatch_debug("kevent-source[%p]: deferred delete kevent[%p]",
					ds, dk);
			ds->ds_pending_delete = true;
			return; // deferred unregistration
		} else if (r == ENOENT) {
			_dispatch_debug("kevent-source[%p]: ENOENT delete kevent[%p]",
					ds, dk);
			ds->ds_needs_mgr = true;
			return; // potential concurrent EV_DELETE delivery rdar://22047283
		}
		ds->ds_dkev = NULL;
		_TAILQ_TRASH_ENTRY(ds->ds_refs, dr_list);
	}
	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
	_dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds, ds->ds_dkev);
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_release(ds); // the retain is done at creation time
}

static void
_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
{
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
				ds->ds_dkev);
		return;
	case EVFILT_MACHPORT:
		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
		}
		break;
	}
	if ((ds->ds_atomic_flags & DSF_DELETED) ||
			_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
		_dispatch_source_kevent_unregister(ds);
	}
}

static void
_dispatch_source_kevent_register(dispatch_source_t ds)
{
	dispatch_assert_zero(ds->ds_is_installed);
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_update(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
		return;
	}
	uint32_t flags;
	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
	_dispatch_debug("kevent-source[%p]: armed kevent[%p]", ds, ds->ds_dkev);
	if (do_resume || ds->ds_needs_rearm) {
		_dispatch_source_kevent_resume(ds, flags);
	}
	_dispatch_object_debug(ds, "%s", __func__);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_source_t ds = dou._ds;
	if (_dispatch_queue_class_probe(ds)) {
		if (slowpath(_dispatch_queue_drain(ds))) {
			DISPATCH_CLIENT_CRASH("Sync onto source");
		}
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct
	// queue will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_queue_t dkq = _dispatch_source_get_kevent_queue(ds);
	dispatch_source_refs_t dr = ds->ds_refs;

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		_dispatch_source_kevent_register(ds);
		ds->ds_is_installed = true;
		if (dr->ds_handler[DS_REGISTN_HANDLER]) {
			return ds->do_targetq;
		}
		if (slowpath(ds->do_xref_cnt == -1)) {
			return dkq; // rdar://problem/9558246
		}
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return NULL;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds);
		if (slowpath(ds->do_xref_cnt == -1)) {
			return dkq; // rdar://problem/9558246
		}
	} else if ((ds->ds_atomic_flags & DSF_DELETED) && (ds->ds_pending_delete ||
			(ds->ds_atomic_flags & DSF_ONESHOT))) {
		// Pending source kevent unregistration has been completed
		if (ds->ds_needs_mgr) {
			dkq = &_dispatch_mgr_q;
		}
		if (dq != dkq) {
			return dkq;
		}
		ds->ds_pending_delete = false;
		if (ds->ds_atomic_flags & DSF_ONESHOT) {
			(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ONESHOT,
					relaxed);
		}
		if (ds->ds_dkev) {
			_dispatch_source_kevent_unregister(ds);
			if (ds->ds_needs_mgr) {
				return &_dispatch_mgr_q;
			}
		}
		if (dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			return ds->do_targetq;
		}
	} else if (((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1))
			&& !ds->ds_pending_delete) {
		// The source has been cancelled and needs to be uninstalled from the
		// kevent queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (ds->ds_needs_mgr) {
				dkq = &_dispatch_mgr_q;
			}
			if (dq != dkq) {
				return dkq;
			}
			_dispatch_source_kevent_unregister(ds);
			if (ds->ds_needs_mgr) {
				return &_dispatch_mgr_q;
			}
			if (ds->ds_pending_delete) {
				// deferred unregistration
				if (ds->ds_needs_rearm) {
					return dkq;
				}
				return NULL;
			}
		}
		if (dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data && !ds->ds_pending_delete) {
		// The source has pending data to deliver via the event handler
		// callback on the target queue. Some sources need to be rearmed on
		// the kevent queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return dkq;
		}
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		if (dq != dkq) {
			return dkq;
		}
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
				ds->ds_dkev);
		_dispatch_source_kevent_resume(ds, 0);
	}

	return NULL;
}

DISPATCH_NOINLINE
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_object_t dou,
		dispatch_invoke_flags_t flags)
{
	_dispatch_queue_class_invoke(ds, dou._dc, flags, _dispatch_source_invoke2);
}

unsigned long
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	if (!ds->ds_is_installed) {
		// The source needs to be installed on the kevent queue.
		return true;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The registration handler needs to be delivered to the target queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_DELETED) && (ds->ds_pending_delete ||
			(ds->ds_atomic_flags & DSF_ONESHOT))) {
		// Pending source kevent unregistration has been completed
		return true;
	} else if (((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1))
			&& !ds->ds_pending_delete) {
		// The source needs to be uninstalled from the kevent queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			return true;
		}
	} else if (ds->ds_pending_data && !ds->ds_pending_delete) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the kevent queue.
		return true;
	}
	return _dispatch_queue_class_probe(ds);
}

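// Merges one kernel event into the source's pending state. EV_ONESHOT
// deliveries from EV_UDATA_SPECIFIC knotes mark a deferred delete
// (DSF_ONESHOT); EV_DELETE or plain EV_ONESHOT marks the knote gone
// (DSF_DELETED). The payload is then folded into ds_pending_data by source
// kind: level sources store the complement of ke->data, adders accumulate
// it, and flag sources OR ke->fflags under the pending-data mask. Sources
// that need rearming are disarmed until after the handler callout.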
static void
_dispatch_source_merge_kevent(dispatch_source_t ds,
		const _dispatch_kevent_qos_s *ke)
{
	_dispatch_object_debug(ds, "%s", __func__);
	bool retained = false;
	if ((ke->flags & EV_UDATA_SPECIFIC) && (ke->flags & EV_ONESHOT) &&
			!(ke->flags & EV_DELETE)) {
		_dispatch_debug("kevent-source[%p]: deferred delete oneshot kevent[%p]",
				ds, (void*)ke->udata);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ONESHOT, relaxed);
	} else if ((ke->flags & EV_DELETE) || (ke->flags & EV_ONESHOT)) {
		_dispatch_debug("kevent-source[%p]: delete kevent[%p]",
				ds, (void*)ke->udata);
		retained = true;
		_dispatch_retain(ds);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_DELETED, relaxed);
		if (ke->flags & EV_DELETE) goto done;
	}
	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		goto done; // rdar://20204025
	}
	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
				relaxed);
	} else if (ds->ds_is_adder) {
		(void)dispatch_atomic_add2o(ds, ds_pending_data,
				(unsigned long)ke->data, relaxed);
	} else if (ke->fflags & ds->ds_pending_data_mask) {
		(void)dispatch_atomic_or2o(ds, ds_pending_data,
				ke->fflags & ds->ds_pending_data_mask, relaxed);
	}
done:
	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		if (!retained) {
			retained = true;
			_dispatch_retain(ds); // rdar://20382435
		}
		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
		_dispatch_debug("kevent-source[%p]: disarmed kevent[%p] ",
				ds, (void*)ke->udata);
	}
	if (retained) {
		_dispatch_queue_wakeup_and_release((dispatch_queue_t)ds);
	} else {
		_dispatch_queue_wakeup((dispatch_queue_t)ds);
	}
}

#pragma mark -
#pragma mark dispatch_kevent_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void _dispatch_kevent_guard(dispatch_kevent_t dk);
static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
#else
static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
#endif

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC

#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))

DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

static void
_dispatch_kevent_init()
{
	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

#if !DISPATCH_USE_EV_UDATA_SPECIFIC
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
			&_dispatch_kevent_data_add, dk_list);
	_dispatch_kevent_data_or.dk_kevent.udata =
			(uintptr_t)&_dispatch_kevent_data_or;
	_dispatch_kevent_data_add.dk_kevent.udata =
			(uintptr_t)&_dispatch_kevent_data_add;
#endif // !DISPATCH_USE_EV_UDATA_SPECIFIC
}

static inline uintptr_t
_dispatch_kevent_hash(uint64_t ident, short filter)
{
	uint64_t value;
#if HAVE_MACH
	value = (filter == EVFILT_MACHPORT ||
			filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
			MACH_PORT_INDEX(ident) : ident);
#else
	value = ident;
#endif
	return DSL_HASH((uintptr_t)value);
}

static dispatch_kevent_t
_dispatch_kevent_find(uint64_t ident, short filter)
{
	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}

static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) return;
	_dispatch_kevent_guard(dk);
	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
			dk->dk_kevent.filter);
	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}

// Find existing kevents, and merge any new flags if necessary
static bool
_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
{
	dispatch_kevent_t dk = NULL, ds_dkev = *dkp;
	uint32_t new_flags;
	bool do_resume = false;

	if (!(ds_dkev->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
				ds_dkev->dk_kevent.filter);
	}
	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
		free(ds_dkev);
		*dkp = dk;
		do_resume = new_flags;
	} else {
		dk = ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}
	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
	}
	*flgp = new_flags;
	return do_resume;
}

static long
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	long r;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		return 0;
#if HAVE_MACH
	case EVFILT_MACHPORT:
		return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
#endif
	default:
		if (dk->dk_kevent.flags & EV_DELETE) {
			return 0;
		}
		r = _dispatch_kq_update(&dk->dk_kevent);
		if (r && (dk->dk_kevent.flags & EV_ADD) &&
				(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
			dk->dk_kevent.flags |= EV_DELETE;
			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
		} else if (dk->dk_kevent.flags & EV_DISPATCH) {
			dk->dk_kevent.flags &= ~EV_ADD;
		}
		return r;
	}
}

static long
_dispatch_kevent_dispose(dispatch_kevent_t dk, int options)
{
	long r = 0;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		if (dk->dk_kevent.flags & EV_UDATA_SPECIFIC) {
			free(dk);
		} else {
			// these sources live on statically allocated lists
		}
		return r;
#if HAVE_MACH
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
		break;
#endif
	default:
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
			if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
				dk->dk_kevent.flags |= EV_ENABLE;
			}
			r = _dispatch_kq_update(&dk->dk_kevent);
			if (r == ENOENT && (options & DKEV_DISPOSE_IGNORE_ENOENT)) {
				r = 0;
			}
			if (options & DKEV_DISPOSE_IMMEDIATE_DELETE) {
				dk->dk_kevent.flags &= ~EV_ENABLE;
			}
		}
		break;
	}
	if ((r == EINPROGRESS || r == ENOENT) &&
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		// deferred EV_DELETE or concurrent EV_DELETE delivery
		dk->dk_kevent.flags &= ~EV_DELETE;
		dk->dk_kevent.flags |= EV_ENABLE;
	} else {
		if ((dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
#if DISPATCH_DEBUG
			// zero/trash dr linkage
			dispatch_source_refs_t dr = TAILQ_FIRST(&dk->dk_sources);
			TAILQ_REMOVE(&dk->dk_sources, dr, dr_list);
#endif
		} else {
			uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
					dk->dk_kevent.filter);
			TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
		}
		_dispatch_kevent_unguard(dk);
		free(dk);
	}
	return r;
}

static long
_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg, int options)
{
	dispatch_source_refs_t dri;
	uint32_t del_flags, fflags = 0;
	long r = 0;

	if (TAILQ_EMPTY(&dk->dk_sources) ||
			(dk->dk_kevent.flags & EV_UDATA_SPECIFIC)) {
		r = _dispatch_kevent_dispose(dk, options);
	} else {
		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
			fflags |= mask;
		}
		del_flags = flg & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags &= ~del_flags;
			r = _dispatch_kevent_resume(dk, 0, del_flags);
		}
	}
	return r;
}

DISPATCH_NOINLINE
static void
_dispatch_kevent_proc_exit(_dispatch_kevent_qos_s *ke)
{
	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
	_dispatch_kevent_qos_s fake;
	fake = *ke;
	fake.flags &= ~EV_ERROR;
	fake.fflags = NOTE_EXIT;
	fake.data = 0;
	_dispatch_kevent_drain(&fake);
}

DISPATCH_NOINLINE
static void
_dispatch_kevent_error(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	if (ke->data) {
		// log the unexpected error
		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
				ke->flags & EV_DELETE ? "delete" :
				ke->flags & EV_ADD ? "add" :
				ke->flags & EV_ENABLE ? "enable" : "monitor",
				(int)ke->data);
	}
}

static void
_dispatch_kevent_drain(_dispatch_kevent_qos_s *ke)
{
#if DISPATCH_DEBUG
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
#endif
	if (ke->filter == EVFILT_USER) {
		_dispatch_kevent_mgr_debug(ke, __func__);
		return;
	}
	if (slowpath(ke->flags & EV_ERROR)) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			ke->data = 0; // don't return error from caller
			if (ke->flags & EV_DELETE) {
				_dispatch_debug("kevent[0x%llx]: ignoring ESRCH from "
						"EVFILT_PROC EV_DELETE", ke->udata);
				return;
			}
			_dispatch_debug("kevent[0x%llx]: ESRCH from EVFILT_PROC: "
					"generating fake NOTE_EXIT", ke->udata);
			return _dispatch_kevent_proc_exit(ke);
		}
		return _dispatch_kevent_error(ke);
	}
	if (ke->filter == EVFILT_TIMER) {
		return _dispatch_timers_kevent(ke);
	}
#if HAVE_MACH
	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_kevent_mach_portset(ke);
	}
#endif
	return _dispatch_kevent_merge(ke);
}

DISPATCH_NOINLINE
static void
_dispatch_kevent_merge(_dispatch_kevent_qos_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	dispatch_kevent_t dk;
	dispatch_source_refs_t dri, dr_next;

	dk = (void*)ke->udata;
	dispatch_assert(dk);

	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
	}
}

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_kevent_guard(dispatch_kevent_t dk)
{
	guardid_t guard;
	const unsigned int guard_flags = GUARD_CLOSE;
	int r, fd_flags = 0;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard = &dk->dk_kevent;
		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
				&guard, guard_flags, &fd_flags);
		if (slowpath(r == -1)) {
			int err = errno;
			if (err != EPERM) {
				(void)dispatch_assume_zero(err);
			}
			return;
		}
		dk->dk_kevent.ext[0] = guard_flags;
		dk->dk_kevent.ext[1] = fd_flags;
		break;
	}
}

static void
_dispatch_kevent_unguard(dispatch_kevent_t dk)
{
	guardid_t guard;
	unsigned int guard_flags;
	int r, fd_flags;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
		if (!guard_flags) {
			return;
		}
		guard = &dk->dk_kevent;
		fd_flags = (int)dk->dk_kevent.ext[1];
		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
				guard_flags, NULL, 0, &fd_flags);
		if (slowpath(r == -1)) {
			(void)dispatch_assume_zero(errno);
			return;
		}
		dk->dk_kevent.ext[0] = 0;
		break;
	}
}
#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD

#pragma mark -
#pragma mark dispatch_source_timer

#if DISPATCH_USE_DTRACE
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

#define _dispatch_source_timer_telemetry_enabled() false

DISPATCH_NOINLINE
static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
		uintptr_t ident, struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled()) {
		_dispatch_trace_timer_configure(ds, ident, values);
	}
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
		struct dispatch_timer_source_s *values)
{
	if (_dispatch_trace_timer_configure_enabled() ||
			_dispatch_source_timer_telemetry_enabled()) {
		_dispatch_source_timer_telemetry_slow(ds, ident, values);
		asm(""); // prevent tailcall
	}
}

// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull

DISPATCH_ALWAYS_INLINE
static inline uint64_t
_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
{
	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
	if (nows && fastpath(nows[tk])) {
		return nows[tk];
	}
	uint64_t now;
	switch (tk) {
	case DISPATCH_TIMER_KIND_MACH:
		now = _dispatch_absolute_time();
		break;
	case DISPATCH_TIMER_KIND_WALL:
		now = _dispatch_get_nanoseconds();
		break;
	}
	if (nows) {
		nows[tk] = now;
	}
	return now;
}

static inline unsigned long
_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
{
	// calculate the number of intervals since last fire
	unsigned long data, missed;
	uint64_t now;
	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
			ds_timer(dr).interval);
	// correct for missed intervals already delivered last time
	data = prev - ds_timer(dr).missed + missed;
	ds_timer(dr).missed = missed;
	return data;
}
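
// Worked example: with interval = 1s, last_fire 4.2s in the past and one
// missed interval already accounted for in prev, missed computes to 4 and
// the callout sees prev - 1 + 4 pending intervals; missed is then saved so
// the next latch does not double-count.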

struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};

static void
_dispatch_source_set_timer3(void *context)
{
	// Called on the _dispatch_mgr_q
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds_timer(ds->ds_refs) = params->values;
	// Clear any pending data that might have accumulated on
	// older timer params <rdar://problem/8574886>
	ds->ds_pending_data = 0;
	// Re-arm in case we got disarmed because of pending set_timer suspension
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
	_dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds, ds->ds_dkev);
	dispatch_resume(ds);
	// Must happen after resume to avoid getting disarmed due to suspension
	_dispatch_timers_update(ds);
	dispatch_release(ds);
	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
		_dispatch_mach_host_calendar_change_register();
	}
	free(params);
}

static void
_dispatch_source_set_timer2(void *context)
{
	// Called on the source queue
	struct dispatch_set_timer_params *params = context;
	dispatch_suspend(params->ds);
	_dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
			_dispatch_source_set_timer3);
}

DISPATCH_NOINLINE
static struct dispatch_set_timer_params *
_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	struct dispatch_set_timer_params *params;
	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
	params->ds = ds;
	params->values.flags = ds_timer(ds->ds_refs).flags;

	if (interval == 0) {
		// we use zero internally to mean disabled
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}
	if ((int64_t)leeway < 0) {
		leeway = INT64_MAX;
	}
	if (start == DISPATCH_TIME_NOW) {
		start = _dispatch_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

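	// dispatch_time() produces positive mach absolute times, while
	// dispatch_walltime() encodes wall-clock times as negative values;
	// the sign of 'start' selects between the two branches below.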
	if ((int64_t)start < 0) {
		// wall clock
		start = (dispatch_time_t)-((int64_t)start);
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// absolute clock
		interval = _dispatch_time_nano2mach(interval);
		if (interval < 1) {
			// rdar://problem/7287561 interval must be at least one
			// in order to avoid later division by zero when calculating
			// the missed interval count. (NOTE: the wall clock's
			// interval is already "fixed" to be 1 or more)
			interval = 1;
		}
		leeway = _dispatch_time_nano2mach(leeway);
		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
	}
	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
	params->values.target = start;
	params->values.deadline = (start < UINT64_MAX - leeway) ?
			start + leeway : UINT64_MAX;
	params->values.interval = interval;
	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
			leeway : interval / 2;
	return params;
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway, bool source_sync)
{
	if (slowpath(!ds->ds_is_timer) ||
			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
		DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
	}

	struct dispatch_set_timer_params *params;
	params = _dispatch_source_timer_params(ds, start, interval, leeway);

	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	if (source_sync) {
		return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
				_dispatch_source_set_timer2);
	} else {
		return _dispatch_source_set_timer2(params);
	}
}

void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
		uint64_t interval, uint64_t leeway)
{
	_dispatch_source_set_timer(ds, start, interval, leeway, true);
}
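
/*
 * Usage sketch: a repeating timer via the public API above (illustrative
 * values, a 100ms period with 10ms leeway; tick() is a placeholder):
 *
 *	dispatch_source_t t = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, q);
 *	dispatch_source_set_timer(t,
 *			dispatch_time(DISPATCH_TIME_NOW, 100 * NSEC_PER_MSEC),
 *			100 * NSEC_PER_MSEC, 10 * NSEC_PER_MSEC);
 *	dispatch_source_set_event_handler(t, ^{ tick(); });
 *	dispatch_resume(t);
 */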

void
_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
		dispatch_time_t start, uint64_t interval, uint64_t leeway)
{
	// Don't serialize through the source queue for CF timers <rdar://13833190>
	_dispatch_source_set_timer(ds, start, interval, leeway, false);
}

void
_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
{
	dispatch_source_refs_t dr = ds->ds_refs;
#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
			FOREVER_NSEC/NSEC_PER_MSEC))) {
		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
	} else {
		interval = FOREVER_NSEC;
	}
	interval = _dispatch_time_nano2mach(interval);
	uint64_t target = _dispatch_absolute_time() + interval;
	target = (target / interval) * interval;
	const uint64_t leeway = animation ?
			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
	ds_timer(dr).target = target;
	ds_timer(dr).deadline = target + leeway;
	ds_timer(dr).interval = interval;
	ds_timer(dr).leeway = leeway;
	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
}
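
// Worked example: interval = 33 without DISPATCH_INTERVAL_UI_ANIMATION means
// 33ms (33 * NSEC_PER_MSEC); with the flag it means 33 frames at 60Hz
// (33 * NSEC_PER_SEC/60). The first target is then rounded down to a multiple
// of the interval, which lines up sources sharing the same interval.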

#pragma mark -
#pragma mark dispatch_timers

#define DISPATCH_TIMER_STRUCT(refs) \
	uint64_t target, deadline; \
	TAILQ_HEAD(, refs) dt_sources

typedef struct dispatch_timer_s {
	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
} *dispatch_timer_t;

#define DISPATCH_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.target = UINT64_MAX, \
		.deadline = UINT64_MAX, \
		.dt_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_timer[tidx].dt_sources), \
	}
#define DISPATCH_TIMER_INIT(kind, qos) \
		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_timer_s _dispatch_timer[] = {
	DISPATCH_TIMER_INIT(WALL, NORMAL),
	DISPATCH_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_TIMER_INIT(MACH, NORMAL),
	DISPATCH_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
};
#define DISPATCH_TIMER_COUNT \
		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))

#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
		(uintptr_t)&_dispatch_kevent_timer[tidx]
#ifdef __LP64__
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
#else // __LP64__
// dynamic initialization in _dispatch_timers_init()
#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
		.udata = 0
#endif // __LP64__
#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
	[tidx] = { \
		.dk_kevent = { \
			.ident = tidx, \
			.filter = DISPATCH_EVFILT_TIMER, \
			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
		}, \
		.dk_sources = TAILQ_HEAD_INITIALIZER( \
				_dispatch_kevent_timer[tidx].dk_sources), \
	}
#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))

struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
};
#define DISPATCH_KEVENT_TIMER_COUNT \
		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))

#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
	[qos] = { \
		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
		.filter = EVFILT_TIMER, \
		.flags = EV_ONESHOT, \
		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
	}
#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)

_dispatch_kevent_qos_s _dispatch_kevent_timeout[] = {
	DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
	DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
	DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
};

#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC

static const uint64_t _dispatch_kevent_coalescing_window[] = {
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
};

#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
	typeof(dr) dri = NULL; typeof(dt) dti; \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
			if (ds_timer(dr).target < ds_timer(dri).target) { \
				break; \
			} \
		} \
		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
				break; \
			} \
		} \
		if (dti) { \
			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
		} else { \
			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
		} \
	} \
	if (dri) { \
		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
	} else { \
		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
	} \
	})

#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
	({ \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
	} \
	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
			dr_list); })

#define _dispatch_timers_check(dra, dta) ({ \
	unsigned int qosm = _dispatch_timers_qos_mask; \
	bool update = false; \
	unsigned int tidx; \
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
		if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
			continue; \
		} \
		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dra[tidx].dk_sources); \
		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dta[tidx].dt_sources); \
		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
		uint64_t deadline = dt ? ds_timer(dt).deadline : UINT64_MAX; \
		if (target != dta[tidx].target) { \
			dta[tidx].target = target; \
			update = true; \
		} \
		if (deadline != dta[tidx].deadline) { \
			dta[tidx].deadline = deadline; \
			update = true; \
		} \
	} \
	update; })

static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
static unsigned int _dispatch_timers_qos_mask;
static bool _dispatch_timers_force_max_leeway;

static void
_dispatch_timers_init(void)
{
#ifndef __LP64__
	unsigned int tidx;
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
		_dispatch_kevent_timer[tidx].dk_kevent.udata = \
				DISPATCH_KEVENT_TIMER_UDATA(tidx);
	}
#endif // __LP64__
	if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
		_dispatch_timers_force_max_leeway = true;
	}
}

static inline void
_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;

	if (slowpath(ds_timer_aggregate(ds))) {
		_dispatch_timer_aggregates_unregister(ds, tidx);
	}
	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
		_dispatch_timers_reconfigure = true;
		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
	}
}

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timers_update(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_refs_t dr = ds->ds_refs;
	unsigned int tidx;

	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
	if (slowpath(!dk)) {
		return;
	}
1700 // Move timers that are disabled, suspended or have missed intervals to the
1701 // disarmed list; rearming after resume or source invoke will re-enable them
1702 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1703 ds->ds_pending_data) {
1704 tidx = DISPATCH_TIMER_INDEX_DISARM;
1705 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1706 _dispatch_debug("kevent-source[%p]: disarmed kevent[%p]", ds,
1707 ds->ds_dkev);
1708 } else {
1709 tidx = _dispatch_source_timer_idx(dr);
1710 }
1711 if (slowpath(ds_timer_aggregate(ds))) {
1712 _dispatch_timer_aggregates_register(ds);
1713 }
1714 if (slowpath(!ds->ds_is_installed)) {
1715 ds->ds_is_installed = true;
1716 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1717 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1718 _dispatch_debug("kevent-source[%p]: rearmed kevent[%p]", ds,
1719 ds->ds_dkev);
1720 }
1721 _dispatch_object_debug(ds, "%s", __func__);
1722 ds->ds_dkev = NULL;
1723 free(dk);
1724 } else {
1725 _dispatch_timers_unregister(ds, dk);
1726 }
1727 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1728 _dispatch_timers_reconfigure = true;
1729 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1730 }
1731 if (dk != &_dispatch_kevent_timer[tidx]){
1732 ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1733 }
1734 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1735 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1736 if (slowpath(ds_timer_aggregate(ds))) {
1737 _dispatch_timer_aggregates_update(ds, tidx);
1738 }
1739 }
1740
1741 static inline void
1742 _dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1743 {
1744 dispatch_source_refs_t dr;
1745 dispatch_source_t ds;
1746 uint64_t now, missed;
1747
1748 now = _dispatch_source_timer_now(nows, tidx);
1749 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1750 ds = _dispatch_source_from_refs(dr);
1751 // We may find timers on the wrong list due to a pending update from
1752 // dispatch_source_set_timer. Force an update of the list in that case.
1753 if (tidx != ds->ds_ident_hack) {
1754 _dispatch_timers_update(ds);
1755 continue;
1756 }
1757 if (!ds_timer(dr).target) {
1758 // No configured timers on the list
1759 break;
1760 }
1761 if (ds_timer(dr).target > now) {
1762 // Done running timers for now.
1763 break;
1764 }
1765 // Remove timers that are suspended or have missed intervals from the
1766 // list; rearming after resume or source invoke will re-enable them
1767 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1768 _dispatch_timers_update(ds);
1769 continue;
1770 }
1771 // Calculate number of missed intervals.
1772 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1773 if (++missed > INT_MAX) {
1774 missed = INT_MAX;
1775 }
1776 if (ds_timer(dr).interval < INT64_MAX) {
1777 ds_timer(dr).target += missed * ds_timer(dr).interval;
1778 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1779 } else {
1780 ds_timer(dr).target = UINT64_MAX;
1781 ds_timer(dr).deadline = UINT64_MAX;
1782 }
1783 _dispatch_timers_update(ds);
1784 ds_timer(dr).last_fire = now;
1785
1786 unsigned long data;
1787 data = dispatch_atomic_add2o(ds, ds_pending_data,
1788 (unsigned long)missed, relaxed);
1789 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1790 _dispatch_wakeup(ds);
1791 }
1792 }
1793
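/*
 * Worked example for the missed-interval arithmetic above (illustrative):
 * with target = 100, interval = 50 and now = 260, the timer missed
 * (260 - 100) / 50 = 3 whole intervals; ++missed makes that 4, so
 * ds_pending_data is credited with 4 fires and the next target becomes
 * 100 + 4 * 50 = 300, with deadline = 300 + leeway.
 */
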
1794 DISPATCH_NOINLINE
1795 static void
1796 _dispatch_timers_run(uint64_t nows[])
1797 {
1798 unsigned int tidx;
1799 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1800 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1801 _dispatch_timers_run2(nows, tidx);
1802 }
1803 }
1804 }
1805
1806 static inline unsigned int
1807 _dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1808 uint64_t *delay, uint64_t *leeway, int qos)
1809 {
1810 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1811 uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1812
1813 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1814 if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1815 continue;
1816 }
1817 uint64_t target = timer[tidx].target;
1818 if (target == UINT64_MAX) {
1819 continue;
1820 }
1821 uint64_t deadline = timer[tidx].deadline;
1822 if (qos >= 0) {
1823 // Timer pre-coalescing <rdar://problem/13222034>
1824 uint64_t window = _dispatch_kevent_coalescing_window[qos];
1825 uint64_t latest = deadline > window ? deadline - window : 0;
1826 dispatch_source_refs_t dri;
1827 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1828 dr_list) {
1829 tmp = ds_timer(dri).target;
1830 if (tmp > latest) break;
1831 target = tmp;
1832 }
1833 }
1834 uint64_t now = _dispatch_source_timer_now(nows, tidx);
1835 if (target <= now) {
1836 delta = 0;
1837 break;
1838 }
1839 tmp = target - now;
1840 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1841 tmp = _dispatch_time_mach2nano(tmp);
1842 }
1843 if (tmp < INT64_MAX && tmp < delta) {
1844 ridx = tidx;
1845 delta = tmp;
1846 }
1847 dispatch_assert(target <= deadline);
1848 tmp = deadline - now;
1849 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1850 tmp = _dispatch_time_mach2nano(tmp);
1851 }
1852 if (tmp < INT64_MAX && tmp < dldelta) {
1853 dldelta = tmp;
1854 }
1855 }
1856 *delay = delta;
1857 *leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1858 return ridx;
1859 }
1860
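/*
 * Worked example (single pending timer, illustrative): if the earliest
 * target is 5ms away and its deadline 9ms away, the loop above yields
 * *delay = 5ms and *leeway = 4ms, and returns the index of the timer list
 * that supplied the earliest target.
 */
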
1861 static bool
1862 _dispatch_timers_program2(uint64_t nows[], _dispatch_kevent_qos_s *ke,
1863 unsigned int qos)
1864 {
1865 unsigned int tidx;
1866 bool poll;
1867 uint64_t delay, leeway;
1868
1869 tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1870 (int)qos);
1871 poll = (delay == 0);
1872 if (poll || delay == UINT64_MAX) {
1873 _dispatch_trace_next_timer_set(NULL, qos);
1874 if (!ke->data) {
1875 return poll;
1876 }
1877 ke->data = 0;
1878 ke->flags |= EV_DELETE;
1879 ke->flags &= ~(EV_ADD|EV_ENABLE);
1880 } else {
1881 _dispatch_trace_next_timer_set(
1882 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1883 _dispatch_trace_next_timer_program(delay, qos);
1884 delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1885 if (slowpath(_dispatch_timers_force_max_leeway)) {
1886 ke->data = (int64_t)(delay + leeway);
1887 ke->ext[1] = 0;
1888 } else {
1889 ke->data = (int64_t)delay;
1890 ke->ext[1] = leeway;
1891 }
1892 ke->flags |= EV_ADD|EV_ENABLE;
1893 ke->flags &= ~EV_DELETE;
1894 }
1895 _dispatch_kq_update(ke);
1896 return poll;
1897 }
1898
1899 DISPATCH_NOINLINE
1900 static bool
1901 _dispatch_timers_program(uint64_t nows[])
1902 {
1903 bool poll = false;
1904 unsigned int qos, qosm = _dispatch_timers_qos_mask;
1905 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1906 if (!(qosm & 1 << qos)){
1907 continue;
1908 }
1909 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1910 qos);
1911 }
1912 return poll;
1913 }
1914
1915 DISPATCH_NOINLINE
1916 static bool
1917 _dispatch_timers_configure(void)
1918 {
1919 _dispatch_timer_aggregates_check();
1920 // Find out if there is a new target/deadline on the timer lists
1921 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1922 }
1923
1924 static void
1925 _dispatch_timers_calendar_change(void)
1926 {
1927 // a calendar change may have pushed the wall clock past a wallclock deadline
1928 _dispatch_timer_expired = true;
1929 _dispatch_timers_qos_mask = ~0u;
1930 }
1931
1932 static void
1933 _dispatch_timers_kevent(_dispatch_kevent_qos_s *ke)
1934 {
1935 _dispatch_kevent_debug(ke, __func__);
1936 dispatch_assert(ke->data > 0);
1937 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1938 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1939 unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1940 dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1941 dispatch_assert(_dispatch_kevent_timeout[qos].data);
1942 _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1943 _dispatch_timer_expired = true;
1944 _dispatch_timers_qos_mask |= 1 << qos;
1945 _dispatch_trace_next_timer_wake(qos);
1946 }
1947
1948 static inline bool
1949 _dispatch_mgr_timers(void)
1950 {
1951 uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1952 bool expired = slowpath(_dispatch_timer_expired);
1953 if (expired) {
1954 _dispatch_timers_run(nows);
1955 }
1956 bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1957 if (reconfigure || expired) {
1958 if (reconfigure) {
1959 reconfigure = _dispatch_timers_configure();
1960 _dispatch_timers_reconfigure = false;
1961 }
1962 if (reconfigure || expired) {
1963 expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1964 expired = expired || _dispatch_mgr_q.dq_items_tail;
1965 }
1966 _dispatch_timers_qos_mask = 0;
1967 }
1968 return expired;
1969 }
1970
1971 #pragma mark -
1972 #pragma mark dispatch_timer_aggregate
1973
1974 typedef struct {
1975 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
1976 } dispatch_timer_aggregate_refs_s;
1977
1978 typedef struct dispatch_timer_aggregate_s {
1979 DISPATCH_STRUCT_HEADER(queue);
1980 DISPATCH_QUEUE_HEADER;
1981 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
1982 dispatch_timer_aggregate_refs_s
1983 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
1984 struct {
1985 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
1986 } dta_timer[DISPATCH_TIMER_COUNT];
1987 struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
1988 unsigned int dta_refcount;
1989 } dispatch_timer_aggregate_s;
1990
1991 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
1992 static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
1993 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1994
1995 dispatch_timer_aggregate_t
1996 dispatch_timer_aggregate_create(void)
1997 {
1998 unsigned int tidx;
1999 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
2000 sizeof(struct dispatch_timer_aggregate_s));
2001 _dispatch_queue_init((dispatch_queue_t)dta);
2002 dta->do_targetq = _dispatch_get_root_queue(
2003 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
2004 dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
2005 //FIXME: aggregates need custom vtable
2006 //dta->dq_label = "timer-aggregate";
2007 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
2008 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
2009 }
2010 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2011 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
2012 dta->dta_timer[tidx].target = UINT64_MAX;
2013 dta->dta_timer[tidx].deadline = UINT64_MAX;
2014 dta->dta_timer_data[tidx].target = UINT64_MAX;
2015 dta->dta_timer_data[tidx].deadline = UINT64_MAX;
2016 }
2017 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
2018 (dispatch_queue_t)dta);
2019 }
2020
2021 typedef struct dispatch_timer_delay_s {
2022 dispatch_timer_t timer;
2023 uint64_t delay, leeway;
2024 } *dispatch_timer_delay_t;
2025
2026 static void
2027 _dispatch_timer_aggregate_get_delay(void *ctxt)
2028 {
2029 dispatch_timer_delay_t dtd = ctxt;
2030 struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
2031 _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
2032 -1);
2033 }
2034
2035 uint64_t
2036 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
2037 uint64_t *leeway_ptr)
2038 {
2039 struct dispatch_timer_delay_s dtd = {
2040 .timer = dta->dta_timer_data,
2041 };
2042 dispatch_sync_f((dispatch_queue_t)dta, &dtd,
2043 _dispatch_timer_aggregate_get_delay);
2044 if (leeway_ptr) {
2045 *leeway_ptr = dtd.leeway;
2046 }
2047 return dtd.delay;
2048 }
2049
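/*
 * Usage sketch for the aggregate SPI above (illustrative only; timer
 * sources join an aggregate when they are created against it, via the
 * timer-with-aggregate source type in the private SPI):
 */
#if 0
dispatch_timer_aggregate_t dta = dispatch_timer_aggregate_create();
// ... create timer sources against dta and arm them ...
uint64_t leeway, delay;
delay = dispatch_timer_aggregate_get_delay(dta, &leeway);
// delay/leeway now reflect the earliest target/deadline in the aggregate
#endif
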
2050 static void
2051 _dispatch_timer_aggregate_update(void *ctxt)
2052 {
2053 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
2054 dispatch_timer_t dtau = ctxt;
2055 unsigned int tidx;
2056 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
2057 dta->dta_timer_data[tidx].target = dtau[tidx].target;
2058 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
2059 }
2060 free(dtau);
2061 }
2062
2063 DISPATCH_NOINLINE
2064 static void
2065 _dispatch_timer_aggregates_configure(void)
2066 {
2067 dispatch_timer_aggregate_t dta;
2068 dispatch_timer_t dtau;
2069 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
2070 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
2071 continue;
2072 }
2073 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
2074 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
2075 _dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau,
2076 _dispatch_timer_aggregate_update);
2077 }
2078 }
2079
2080 static inline void
2081 _dispatch_timer_aggregates_check(void)
2082 {
2083 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
2084 return;
2085 }
2086 _dispatch_timer_aggregates_configure();
2087 }
2088
2089 static void
2090 _dispatch_timer_aggregates_register(dispatch_source_t ds)
2091 {
2092 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2093 if (!dta->dta_refcount++) {
2094 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
2095 }
2096 }
2097
2098 DISPATCH_NOINLINE
2099 static void
2100 _dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
2101 {
2102 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2103 dispatch_timer_source_aggregate_refs_t dr;
2104 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
2105 _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
2106 dta->dta_timer, dr, dta_list);
2107 }
2108
2109 DISPATCH_NOINLINE
2110 static void
2111 _dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
2112 {
2113 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
2114 dispatch_timer_source_aggregate_refs_t dr;
2115 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
2116 _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
2117 dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
2118 if (!--dta->dta_refcount) {
2119 TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
2120 }
2121 }
2122
2123 #pragma mark -
2124 #pragma mark dispatch_select
2125
2126 static int _dispatch_kq;
2127
2128 #if DISPATCH_USE_SELECT_FALLBACK
2129
2130 static unsigned int _dispatch_select_workaround;
2131 static fd_set _dispatch_rfds;
2132 static fd_set _dispatch_wfds;
2133 static uint64_t *_dispatch_rfd_ptrs;
2134 static uint64_t *_dispatch_wfd_ptrs;
2135
2136 DISPATCH_NOINLINE
2137 static bool
2138 _dispatch_select_register(const _dispatch_kevent_qos_s *kev)
2139 {
2140 // Must execute on manager queue
2141 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
2142
2143 // If an EINVAL or ENOENT error occurred while adding/enabling a read or
2144 // write kevent, assume it was due to a type of file descriptor not
2145 // supported by kqueue and fall back to select()
2146 switch (kev->filter) {
2147 case EVFILT_READ:
2148 if ((kev->data == EINVAL || kev->data == ENOENT) &&
2149 dispatch_assume(kev->ident < FD_SETSIZE)) {
2150 FD_SET((int)kev->ident, &_dispatch_rfds);
2151 if (slowpath(!_dispatch_rfd_ptrs)) {
2152 _dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
2153 sizeof(*_dispatch_rfd_ptrs));
2154 }
2155 if (!_dispatch_rfd_ptrs[kev->ident]) {
2156 _dispatch_rfd_ptrs[kev->ident] = kev->udata;
2157 _dispatch_select_workaround++;
2158 _dispatch_debug("select workaround used to read fd %d: 0x%lx",
2159 (int)kev->ident, (long)kev->data);
2160 }
2161 return true;
2162 }
2163 break;
2164 case EVFILT_WRITE:
2165 if ((kev->data == EINVAL || kev->data == ENOENT) &&
2166 dispatch_assume(kev->ident < FD_SETSIZE)) {
2167 FD_SET((int)kev->ident, &_dispatch_wfds);
2168 if (slowpath(!_dispatch_wfd_ptrs)) {
2169 _dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
2170 sizeof(*_dispatch_wfd_ptrs));
2171 }
2172 if (!_dispatch_wfd_ptrs[kev->ident]) {
2173 _dispatch_wfd_ptrs[kev->ident] = kev->udata;
2174 _dispatch_select_workaround++;
2175 _dispatch_debug("select workaround used to write fd %d: 0x%lx",
2176 (int)kev->ident, (long)kev->data);
2177 }
2178 return true;
2179 }
2180 break;
2181 }
2182 return false;
2183 }
2184
2185 DISPATCH_NOINLINE
2186 static bool
2187 _dispatch_select_unregister(const _dispatch_kevent_qos_s *kev)
2188 {
2189 // Must execute on manager queue
2190 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
2191
2192 switch (kev->filter) {
2193 case EVFILT_READ:
2194 if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
2195 _dispatch_rfd_ptrs[kev->ident]) {
2196 FD_CLR((int)kev->ident, &_dispatch_rfds);
2197 _dispatch_rfd_ptrs[kev->ident] = 0;
2198 _dispatch_select_workaround--;
2199 return true;
2200 }
2201 break;
2202 case EVFILT_WRITE:
2203 if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
2204 _dispatch_wfd_ptrs[kev->ident]) {
2205 FD_CLR((int)kev->ident, &_dispatch_wfds);
2206 _dispatch_wfd_ptrs[kev->ident] = 0;
2207 _dispatch_select_workaround--;
2208 return true;
2209 }
2210 break;
2211 }
2212 return false;
2213 }
2214
2215 DISPATCH_NOINLINE
2216 static bool
2217 _dispatch_mgr_select(bool poll)
2218 {
2219 static const struct timeval timeout_immediately = { 0, 0 };
2220 fd_set tmp_rfds, tmp_wfds;
2221 int err, i, r;
2222 bool kevent_avail = false;
2223
2224 FD_COPY(&_dispatch_rfds, &tmp_rfds);
2225 FD_COPY(&_dispatch_wfds, &tmp_wfds);
2226
2227 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2228 poll ? (struct timeval*)&timeout_immediately : NULL);
2229 if (slowpath(r == -1)) {
2230 err = errno;
2231 if (err != EBADF) {
2232 if (err != EINTR) {
2233 (void)dispatch_assume_zero(err);
2234 }
2235 return false;
2236 }
2237 for (i = 0; i < FD_SETSIZE; i++) {
2238 if (i == _dispatch_kq) {
2239 continue;
2240 }
2241 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2242 continue;
2243 }
2244 r = dup(i);
2245 if (dispatch_assume(r != -1)) {
2246 close(r);
2247 } else {
2248 if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2249 FD_CLR(i, &_dispatch_rfds);
2250 _dispatch_rfd_ptrs[i] = 0;
2251 _dispatch_select_workaround--;
2252 }
2253 if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2254 FD_CLR(i, &_dispatch_wfds);
2255 _dispatch_wfd_ptrs[i] = 0;
2256 _dispatch_select_workaround--;
2257 }
2258 }
2259 }
2260 return false;
2261 }
2262 if (r > 0) {
2263 for (i = 0; i < FD_SETSIZE; i++) {
2264 if (FD_ISSET(i, &tmp_rfds)) {
2265 if (i == _dispatch_kq) {
2266 kevent_avail = true;
2267 continue;
2268 }
2269 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2270 _dispatch_kevent_qos_s kev = {
2271 .ident = (uint64_t)i,
2272 .filter = EVFILT_READ,
2273 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2274 .data = 1,
2275 .udata = _dispatch_rfd_ptrs[i],
2276 };
2277 _dispatch_kevent_drain(&kev);
2278 }
2279 if (FD_ISSET(i, &tmp_wfds)) {
2280 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2281 _dispatch_kevent_qos_s kev = {
2282 .ident = (uint64_t)i,
2283 .filter = EVFILT_WRITE,
2284 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2285 .data = 1,
2286 .udata = _dispatch_wfd_ptrs[i],
2287 };
2288 _dispatch_kevent_drain(&kev);
2289 }
2290 }
2291 }
2292 return kevent_avail;
2293 }
2294
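/*
 * Note: on EBADF the recovery loop above probes each tracked fd with dup()
 * to locate the stale descriptor. A minimal standalone form of that probe
 * (sketch; fd_is_valid is a hypothetical helper):
 */
#if 0
#include <stdbool.h>
#include <unistd.h>

static bool
fd_is_valid(int fd)
{
	int r = dup(fd);
	if (r == -1) return false; // EBADF: fd is no longer valid
	close(r);
	return true;
}
#endif
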
2295 #endif // DISPATCH_USE_SELECT_FALLBACK
2296
2297 #pragma mark -
2298 #pragma mark dispatch_kqueue
2299
2300 static void
2301 _dispatch_kq_init(void *context DISPATCH_UNUSED)
2302 {
2303 static const _dispatch_kevent_qos_s kev = {
2304 .ident = 1,
2305 .filter = EVFILT_USER,
2306 .flags = EV_ADD|EV_CLEAR,
2307 };
2308
2309 _dispatch_safe_fork = false;
2310 #if DISPATCH_USE_GUARDED_FD
2311 guardid_t guard = (uintptr_t)&kev;
2312 _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2313 #else
2314 _dispatch_kq = kqueue();
2315 #endif
2316 if (_dispatch_kq == -1) {
2317 int err = errno;
2318 switch (err) {
2319 case EMFILE:
2320 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2321 "process is out of file descriptors");
2322 break;
2323 case ENFILE:
2324 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2325 "system is out of file descriptors");
2326 break;
2327 case ENOMEM:
2328 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2329 "kernel is out of memory");
2330 break;
2331 default:
2332 (void)dispatch_assume_zero(err);
2333 DISPATCH_CRASH("kqueue() failure");
2334 break;
2335 }
2336 }
2337 #if DISPATCH_USE_SELECT_FALLBACK
2338 else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2339 // in case we fall back to select()
2340 FD_SET(_dispatch_kq, &_dispatch_rfds);
2341 }
2342 #endif // DISPATCH_USE_SELECT_FALLBACK
2343
2344 (void)dispatch_assume_zero(kevent_qos(_dispatch_kq, &kev, 1, NULL, 0, NULL,
2345 NULL, 0));
2346 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
2347 }
2348
2349 static int
2350 _dispatch_get_kq(void)
2351 {
2352 static dispatch_once_t pred;
2353
2354 dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2355
2356 return _dispatch_kq;
2357 }
2358
2359 DISPATCH_NOINLINE
2360 static long
2361 _dispatch_kq_update(const _dispatch_kevent_qos_s *kev)
2362 {
2363 int r;
2364 _dispatch_kevent_qos_s kev_error;
2365
2366 #if DISPATCH_USE_SELECT_FALLBACK
2367 if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2368 if (_dispatch_select_unregister(kev)) {
2369 return 0;
2370 }
2371 }
2372 #endif // DISPATCH_USE_SELECT_FALLBACK
2373 if (kev->filter != EVFILT_USER || DISPATCH_MGR_QUEUE_DEBUG) {
2374 _dispatch_kevent_debug(kev, __func__);
2375 }
2376 retry:
2377 r = kevent_qos(_dispatch_get_kq(), kev, 1, &kev_error,
2378 1, NULL, NULL, KEVENT_FLAG_ERROR_EVENTS);
2379 if (slowpath(r == -1)) {
2380 int err = errno;
2381 switch (err) {
2382 case EINTR:
2383 goto retry;
2384 case EBADF:
2385 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2386 break;
2387 default:
2388 (void)dispatch_assume_zero(err);
2389 break;
2390 }
2391 return err;
2392 }
2393 if (r == 0) {
2394 return 0;
2395 }
2396 if ((kev_error.flags & EV_ERROR) && kev_error.data) {
2397 _dispatch_kevent_debug(&kev_error, __func__);
2398 }
2399 r = (int)kev_error.data;
2400 switch (r) {
2401 case 0:
2402 _dispatch_kevent_mgr_debug(&kev_error, __func__);
2403 break;
2404 case EINPROGRESS:
2405 // deferred EV_DELETE
2406 break;
2407 case ENOENT:
2408 if ((kev->flags & EV_DELETE) && (kev->flags & EV_UDATA_SPECIFIC)) {
2409 // potential concurrent EV_DELETE delivery
2410 break;
2411 }
2412 // fall through
2413 case EINVAL:
2414 if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2415 #if DISPATCH_USE_SELECT_FALLBACK
2416 if (_dispatch_select_register(&kev_error)) {
2417 r = 0;
2418 break;
2419 }
2420 #elif DISPATCH_DEBUG
2421 if (kev->filter == EVFILT_READ || kev->filter == EVFILT_WRITE) {
2422 DISPATCH_CRASH("Unsupported fd for EVFILT_READ or EVFILT_WRITE "
2423 "kevent");
2424 }
2425 #endif // DISPATCH_USE_SELECT_FALLBACK
2426 }
2427 // fall through
2428 case EBADF:
2429 case EPERM:
2430 default:
2431 kev_error.flags |= kev->flags;
2432 _dispatch_kevent_drain(&kev_error);
2433 r = (int)kev_error.data;
2434 break;
2435 }
2436 return r;
2437 }
2438
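/*
 * The error handling above relies on KEVENT_FLAG_ERROR_EVENTS, which hands
 * back each failed change as an EV_ERROR event whose data field carries the
 * errno. The equivalent idiom with the portable kevent() API uses
 * EV_RECEIPT (sketch; kq and fd are assumed to be valid):
 */
#if 0
struct kevent change, receipt;
EV_SET(&change, (uintptr_t)fd, EVFILT_READ, EV_ADD|EV_RECEIPT, 0, 0, NULL);
int n = kevent(kq, &change, 1, &receipt, 1, NULL);
if (n == 1 && (receipt.flags & EV_ERROR) && receipt.data) {
	// receipt.data holds the per-change errno, e.g. EINVAL for fd types
	// that kqueue cannot monitor (hence the select() fallback above)
}
#endif
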
2439 #pragma mark -
2440 #pragma mark dispatch_mgr
2441
2442 static _dispatch_kevent_qos_s *_dispatch_kevent_enable;
2443
2444 static inline void
2445 _dispatch_mgr_kevent_reenable(_dispatch_kevent_qos_s *ke)
2446 {
2447 dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
2448 _dispatch_kevent_enable = ke;
2449 }
2450
2451 unsigned long
2452 _dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2453 {
2454 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2455 return false;
2456 }
2457
2458 static const _dispatch_kevent_qos_s kev = {
2459 .ident = 1,
2460 .filter = EVFILT_USER,
2461 .fflags = NOTE_TRIGGER,
2462 };
2463
2464 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2465 _dispatch_debug("waking up the dispatch manager queue: %p", dq);
2466 #endif
2467
2468 _dispatch_kq_update(&kev);
2469
2470 return false;
2471 }
2472
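/*
 * Standalone sketch of the EVFILT_USER wakeup idiom used above (the event
 * itself is registered in _dispatch_kq_init()): one thread parks in
 * kevent(); any other thread posts NOTE_TRIGGER to wake it.
 */
#if 0
#include <sys/event.h>
#include <unistd.h>

static void
user_event_demo(void)
{
	int kq = kqueue();
	struct kevent ke;
	EV_SET(&ke, 1, EVFILT_USER, EV_ADD|EV_CLEAR, 0, 0, NULL);
	(void)kevent(kq, &ke, 1, NULL, 0, NULL); // register

	EV_SET(&ke, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
	(void)kevent(kq, &ke, 1, NULL, 0, NULL); // trigger (any thread)

	struct kevent out;
	(void)kevent(kq, NULL, 0, &out, 1, NULL); // a waiter wakes up here
	close(kq);
}
#endif
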
2473 DISPATCH_NOINLINE
2474 static void
2475 _dispatch_mgr_init(void)
2476 {
2477 (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
2478 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2479 _dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
2480 _dispatch_mgr_priority_init();
2481 _dispatch_kevent_init();
2482 _dispatch_timers_init();
2483 _dispatch_mach_recv_msg_buf_init();
2484 _dispatch_memorystatus_init();
2485 }
2486
2487 DISPATCH_NOINLINE DISPATCH_NORETURN
2488 static void
2489 _dispatch_mgr_invoke(void)
2490 {
2491 _dispatch_kevent_qos_s kev;
2492 bool poll;
2493 int r;
2494
2495 for (;;) {
2496 _dispatch_mgr_queue_drain();
2497 poll = _dispatch_mgr_timers();
2498 #if DISPATCH_USE_SELECT_FALLBACK
2499 if (slowpath(_dispatch_select_workaround)) {
2500 poll = _dispatch_mgr_select(poll);
2501 if (!poll) continue;
2502 }
2503 #endif // DISPATCH_USE_SELECT_FALLBACK
2504 poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2505 r = kevent_qos(_dispatch_kq, _dispatch_kevent_enable,
2506 _dispatch_kevent_enable ? 1 : 0, &kev, 1, NULL, NULL,
2507 poll ? KEVENT_FLAG_IMMEDIATE : KEVENT_FLAG_NONE);
2508 _dispatch_kevent_enable = NULL;
2509 if (slowpath(r == -1)) {
2510 int err = errno;
2511 switch (err) {
2512 case EINTR:
2513 break;
2514 case EBADF:
2515 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2516 break;
2517 default:
2518 (void)dispatch_assume_zero(err);
2519 break;
2520 }
2521 } else if (r) {
2522 _dispatch_kevent_drain(&kev);
2523 }
2524 }
2525 }
2526
2527 DISPATCH_NORETURN
2528 void
2529 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED,
2530 dispatch_object_t dou DISPATCH_UNUSED,
2531 dispatch_invoke_flags_t flags DISPATCH_UNUSED)
2532 {
2533 _dispatch_mgr_init();
2534 // never returns, so burn bridges behind us & clear stack 2k ahead
2535 _dispatch_clear_stack(2048);
2536 _dispatch_mgr_invoke();
2537 }
2538
2539 #pragma mark -
2540 #pragma mark dispatch_memorystatus
2541
2542 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
2543 #define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
2544 #define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
2545 DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
2546 DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
2547 #elif DISPATCH_USE_VM_PRESSURE_SOURCE
2548 #define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
2549 #define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
2550 #endif
2551
2552 #if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2553 static dispatch_source_t _dispatch_memorystatus_source;
2554
2555 static void
2556 _dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
2557 {
2558 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
2559 unsigned long memorystatus;
2560 memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
2561 if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
2562 _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
2563 _voucher_activity_heap_pressure_normal();
2564 return;
2565 }
2566 _dispatch_continuation_cache_limit =
2567 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
2568 _voucher_activity_heap_pressure_warn();
2569 #endif
2570 malloc_zone_pressure_relief(0,0);
2571 }
2572
2573 static void
2574 _dispatch_memorystatus_init(void)
2575 {
2576 _dispatch_memorystatus_source = dispatch_source_create(
2577 DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
2578 DISPATCH_MEMORYSTATUS_SOURCE_MASK,
2579 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
2580 dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
2581 _dispatch_memorystatus_handler);
2582 dispatch_resume(_dispatch_memorystatus_source);
2583 }
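
/*
 * For comparison, the public client-facing counterpart of this handler is a
 * DISPATCH_SOURCE_TYPE_MEMORYPRESSURE source (OS X 10.9+); a minimal sketch:
 */
#if 0
dispatch_source_t s = dispatch_source_create(
		DISPATCH_SOURCE_TYPE_MEMORYPRESSURE, 0,
		DISPATCH_MEMORYPRESSURE_WARN | DISPATCH_MEMORYPRESSURE_CRITICAL,
		dispatch_get_main_queue());
dispatch_source_set_event_handler(s, ^{
	unsigned long level = dispatch_source_get_data(s);
	if (level & DISPATCH_MEMORYPRESSURE_WARN) {
		// trim caches, release reusable buffers, etc.
	}
});
dispatch_resume(s);
#endif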
2584 #else
2585 static inline void _dispatch_memorystatus_init(void) {}
2586 #endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2587
2588 #pragma mark -
2589 #pragma mark dispatch_mach
2590
2591 #if HAVE_MACH
2592
2593 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
2594 #define _dispatch_debug_machport(name) \
2595 dispatch_debug_machport((name), __func__)
2596 #else
2597 #define _dispatch_debug_machport(name) ((void)(name))
2598 #endif
2599
2600 // Flags for all notifications that are registered/unregistered when a
2601 // send-possible notification is requested/delivered
2602 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
2603 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
2604 #define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
2605 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2606 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2607 #define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
2608 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2609 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2610
2611 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
2612 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
2613 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
2614
2615 #define _DISPATCH_MACHPORT_HASH_SIZE 32
2616 #define _DISPATCH_MACHPORT_HASH(x) \
2617 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
2618
2619 #ifndef MACH_RCV_LARGE_IDENTITY
2620 #define MACH_RCV_LARGE_IDENTITY 0x00000008
2621 #endif
2622 #ifndef MACH_RCV_VOUCHER
2623 #define MACH_RCV_VOUCHER 0x00000800
2624 #endif
2625 #define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
2626 #define DISPATCH_MACH_RCV_OPTIONS ( \
2627 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
2628 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
2629 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
2630 MACH_RCV_VOUCHER
2631
2632 #define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])
2633
2634 static void _dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke);
2635 static void _dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke);
2636 static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
2637 static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
2638 static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
2639 dispatch_source_refs_t dr, dispatch_kevent_t dk,
2640 mach_msg_header_t *hdr, mach_msg_size_t siz);
2641 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
2642 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
2643 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
2644 static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
2645 static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
2646 dispatch_mach_reply_refs_t dmr, bool disconnected);
2647 static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
2648 static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou,
2649 mach_msg_option_t options);
2650 static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
2651 dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr,
2652 mach_msg_size_t siz);
2653 static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
2654 const _dispatch_kevent_qos_s *ke);
2655 static inline mach_msg_option_t _dispatch_mach_checkin_options(void);
2656
2657 static const size_t _dispatch_mach_recv_msg_size =
2658 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
2659 static const size_t dispatch_mach_trailer_size =
2660 sizeof(dispatch_mach_trailer_t);
2661 static mach_msg_size_t _dispatch_mach_recv_msg_buf_size;
2662 static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
2663 static mach_port_t _dispatch_mach_notify_port;
2664 static _dispatch_kevent_qos_s _dispatch_mach_recv_kevent = {
2665 .filter = EVFILT_MACHPORT,
2666 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2667 .fflags = DISPATCH_MACH_RCV_OPTIONS,
2668 };
2669 static dispatch_source_t _dispatch_mach_notify_source;
2670 static const
2671 struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
2672 .ke = {
2673 .filter = EVFILT_MACHPORT,
2674 .flags = EV_CLEAR,
2675 .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
2676 },
2677 };
2678
2679 static void
2680 _dispatch_mach_recv_msg_buf_init(void)
2681 {
2682 mach_vm_size_t vm_size = mach_vm_round_page(
2683 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2684 _dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size;
2685 mach_vm_address_t vm_addr = vm_page_size;
2686 kern_return_t kr;
2687
2688 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2689 VM_FLAGS_ANYWHERE))) {
2690 if (kr != KERN_NO_SPACE) {
2691 (void)dispatch_assume_zero(kr);
2692 DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2693 }
2694 _dispatch_temporary_resource_shortage();
2695 vm_addr = vm_page_size;
2696 }
2697 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2698 _dispatch_mach_recv_kevent.ext[1] = vm_size;
2699 }
2700
2701 static inline void*
2702 _dispatch_get_mach_recv_msg_buf(void)
2703 {
2704 return (void*)_dispatch_mach_recv_kevent.ext[0];
2705 }
2706
2707 static void
2708 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2709 {
2710 kern_return_t kr;
2711
2712 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2713 &_dispatch_mach_recv_portset);
2714 DISPATCH_VERIFY_MIG(kr);
2715 if (dispatch_assume_zero(kr)) {
2716 DISPATCH_CLIENT_CRASH(
2717 "mach_port_allocate() failed: cannot create port set");
2718 }
2719 dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2720 dispatch_assert(dispatch_mach_trailer_size ==
2721 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2722 DISPATCH_MACH_RCV_TRAILER)));
2723 _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2724 _dispatch_kq_update(&_dispatch_mach_recv_kevent);
2725
2726 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2727 &_dispatch_mach_notify_port);
2728 DISPATCH_VERIFY_MIG(kr);
2729 if (dispatch_assume_zero(kr)) {
2730 DISPATCH_CLIENT_CRASH(
2731 "mach_port_allocate() failed: cannot create receive right");
2732 }
2733 _dispatch_mach_notify_source = dispatch_source_create(
2734 &_dispatch_source_type_mach_recv_direct,
2735 _dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2736 static const struct dispatch_continuation_s dc = {
2737 .dc_func = (void*)_dispatch_mach_notify_source_invoke,
2738 };
2739 _dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] =
2740 (dispatch_continuation_t)&dc;
2741 dispatch_assert(_dispatch_mach_notify_source);
2742 dispatch_resume(_dispatch_mach_notify_source);
2743 }
2744
2745 static mach_port_t
2746 _dispatch_get_mach_recv_portset(void)
2747 {
2748 static dispatch_once_t pred;
2749 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2750 return _dispatch_mach_recv_portset;
2751 }
2752
2753 static void
2754 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2755 {
2756 _dispatch_kevent_qos_s kev = {
2757 .filter = EVFILT_MACHPORT,
2758 .flags = EV_ADD,
2759 };
2760 kern_return_t kr;
2761
2762 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2763 &_dispatch_mach_portset);
2764 DISPATCH_VERIFY_MIG(kr);
2765 if (dispatch_assume_zero(kr)) {
2766 DISPATCH_CLIENT_CRASH(
2767 "mach_port_allocate() failed: cannot create port set");
2768 }
2769 kev.ident = _dispatch_mach_portset;
2770 _dispatch_kq_update(&kev);
2771 }
2772
2773 static mach_port_t
2774 _dispatch_get_mach_portset(void)
2775 {
2776 static dispatch_once_t pred;
2777 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2778 return _dispatch_mach_portset;
2779 }
2780
2781 static kern_return_t
2782 _dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2783 {
2784 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2785 kern_return_t kr;
2786
2787 _dispatch_debug_machport(mp);
2788 kr = mach_port_move_member(mach_task_self(), mp, mps);
2789 if (slowpath(kr)) {
2790 DISPATCH_VERIFY_MIG(kr);
2791 switch (kr) {
2792 case KERN_INVALID_RIGHT:
2793 if (mps) {
2794 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2795 "mach_port_move_member() failed ", kr);
2796 break;
2797 }
2798 //fall through
2799 case KERN_INVALID_NAME:
2800 #if DISPATCH_DEBUG
2801 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2802 "prematurely", mp);
2803 #endif
2804 break;
2805 default:
2806 (void)dispatch_assume_zero(kr);
2807 break;
2808 }
2809 }
2810 return mps ? kr : 0;
2811 }
2812
2813 static void
2814 _dispatch_kevent_mach_recv_reenable(_dispatch_kevent_qos_s *ke DISPATCH_UNUSED)
2815 {
2816 #if (TARGET_IPHONE_SIMULATOR && \
2817 IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2818 (!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2819 // delete and re-add kevent to workaround <rdar://problem/13924256>
2820 if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2821 _dispatch_kevent_qos_s kev = _dispatch_mach_recv_kevent;
2822 kev.flags = EV_DELETE;
2823 _dispatch_kq_update(&kev);
2824 }
2825 #endif
2826 _dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2827 }
2828
2829 static kern_return_t
2830 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2831 uint32_t del_flags)
2832 {
2833 kern_return_t kr = 0;
2834 dispatch_assert_zero(new_flags & del_flags);
2835 if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2836 (del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2837 mach_port_t mps;
2838 if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2839 mps = _dispatch_get_mach_recv_portset();
2840 } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2841 ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2842 (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2843 mps = _dispatch_get_mach_portset();
2844 } else {
2845 mps = MACH_PORT_NULL;
2846 }
2847 kr = _dispatch_mach_portset_update(dk, mps);
2848 }
2849 return kr;
2850 }
2851
2852 static kern_return_t
2853 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2854 uint32_t del_flags)
2855 {
2856 kern_return_t kr = 0;
2857 dispatch_assert_zero(new_flags & del_flags);
2858 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2859 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2860 // Requesting a (delayed) non-sync send-possible notification
2861 // registers for both immediate dead-name notification and delayed-arm
2862 // send-possible notification for the port.
2863 // The send-possible notification is armed when a mach_msg() with
2864 // the MACH_SEND_NOTIFY option to the port times out.
2865 // If send-possible is unavailable, fall back to immediate dead-name
2866 // registration rdar://problem/2527840&9008724
2867 kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2868 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2869 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2870 }
2871 return kr;
2872 }
2873
2874 static inline void
2875 _dispatch_kevent_mach_portset(_dispatch_kevent_qos_s *ke)
2876 {
2877 if (ke->ident == _dispatch_mach_recv_portset) {
2878 return _dispatch_kevent_mach_msg_drain(ke);
2879 } else if (ke->ident == _dispatch_mach_portset) {
2880 return _dispatch_kevent_machport_drain(ke);
2881 } else {
2882 return _dispatch_kevent_error(ke);
2883 }
2884 }
2885
2886 DISPATCH_NOINLINE
2887 static void
2888 _dispatch_kevent_machport_drain(_dispatch_kevent_qos_s *ke)
2889 {
2890 _dispatch_kevent_debug(ke, __func__);
2891 mach_port_t name = (mach_port_name_t)ke->data;
2892 dispatch_kevent_t dk;
2893
2894 _dispatch_debug_machport(name);
2895 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2896 if (!dispatch_assume(dk)) {
2897 return;
2898 }
2899 _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2900
2901 _dispatch_kevent_qos_s kev = {
2902 .ident = name,
2903 .filter = EVFILT_MACHPORT,
2904 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2905 .fflags = DISPATCH_MACH_RECV_MESSAGE,
2906 .udata = (uintptr_t)dk,
2907 };
2908 _dispatch_kevent_debug(&kev, __func__);
2909 _dispatch_kevent_merge(&kev);
2910 }
2911
2912 DISPATCH_NOINLINE
2913 static void
2914 _dispatch_kevent_mach_msg_drain(_dispatch_kevent_qos_s *ke)
2915 {
2916 _dispatch_kevent_debug(ke, __func__);
2917 mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2918 mach_msg_size_t siz, msgsiz;
2919 mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2920
2921 _dispatch_kevent_mach_recv_reenable(ke);
2922 if (!dispatch_assume(hdr)) {
2923 DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2924 }
2925 if (fastpath(!kr)) {
2926 return _dispatch_kevent_mach_msg_recv(hdr);
2927 } else if (kr != MACH_RCV_TOO_LARGE) {
2928 goto out;
2929 }
2930 if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2931 dispatch_mach_trailer_size)) {
2932 DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2933 }
2934 siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2935 hdr = malloc(siz);
2936 if (ke->data) {
2937 if (!dispatch_assume(hdr)) {
2938 // Kernel will discard message too large to fit
2939 hdr = _dispatch_get_mach_recv_msg_buf();
2940 siz = _dispatch_mach_recv_msg_buf_size;
2941 }
2942 mach_port_t name = (mach_port_name_t)ke->data;
2943 const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2944 MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2945 kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2946 MACH_PORT_NULL);
2947 if (fastpath(!kr)) {
2948 return _dispatch_kevent_mach_msg_recv(hdr);
2949 } else if (kr == MACH_RCV_TOO_LARGE) {
2950 _dispatch_log("BUG in libdispatch client: "
2951 "_dispatch_kevent_mach_msg_drain: dropped message too "
2952 "large to fit in memory: id = 0x%x, size = %lld",
2953 hdr->msgh_id, ke->ext[1]);
2954 kr = MACH_MSG_SUCCESS;
2955 }
2956 } else {
2957 // We don't know which port in the portset contains the large message,
2958 // so we need to receive all messages pending on the portset to ensure the
2959 // large message is drained. <rdar://problem/13950432>
2960 bool received = false;
2961 for (;;) {
2962 if (!dispatch_assume(hdr)) {
2963 DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2964 }
2965 const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2966 MACH_RCV_TIMEOUT);
2967 kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2968 MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2969 if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2970 hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2971 DISPATCH_CRASH("Overlarge message");
2972 }
2973 if (fastpath(!kr)) {
2974 msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2975 if (msgsiz < siz) {
2976 void *shrink = realloc(hdr, msgsiz);
2977 if (shrink) hdr = shrink;
2978 }
2979 _dispatch_kevent_mach_msg_recv(hdr);
2980 hdr = NULL;
2981 received = true;
2982 } else if (kr == MACH_RCV_TOO_LARGE) {
2983 siz = hdr->msgh_size + dispatch_mach_trailer_size;
2984 } else {
2985 if (kr == MACH_RCV_TIMED_OUT && received) {
2986 kr = MACH_MSG_SUCCESS;
2987 }
2988 break;
2989 }
2990 hdr = reallocf(hdr, siz);
2991 }
2992 }
2993 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2994 free(hdr);
2995 }
2996 out:
2997 if (slowpath(kr)) {
2998 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2999 "message reception failed", kr);
3000 }
3001 }
3002
3003 static void
3004 _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
3005 {
3006 dispatch_source_refs_t dri;
3007 dispatch_kevent_t dk;
3008 mach_port_t name = hdr->msgh_local_port;
3009 mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
3010
3011 if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
3012 dispatch_mach_trailer_size)) {
3013 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3014 "received overlarge message");
3015 return _dispatch_kevent_mach_msg_destroy(hdr);
3016 }
3017 if (!dispatch_assume(name)) {
3018 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3019 "received message with MACH_PORT_NULL port");
3020 return _dispatch_kevent_mach_msg_destroy(hdr);
3021 }
3022 _dispatch_debug_machport(name);
3023 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
3024 if (!dispatch_assume(dk)) {
3025 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3026 "received message with unknown kevent");
3027 return _dispatch_kevent_mach_msg_destroy(hdr);
3028 }
3029 _dispatch_kevent_debug(&dk->dk_kevent, __func__);
3030 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
3031 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
3032 if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
3033 return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
3034 }
3035 }
3036 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
3037 "received message with no listeners");
3038 return _dispatch_kevent_mach_msg_destroy(hdr);
3039 }
3040
3041 static void
3042 _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
3043 {
3044 if (hdr) {
3045 mach_msg_destroy(hdr);
3046 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
3047 free(hdr);
3048 }
3049 }
3050 }
3051
3052 static void
3053 _dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
3054 dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
3055 {
3056 if (ds == _dispatch_mach_notify_source) {
3057 _dispatch_mach_notify_source_invoke(hdr);
3058 return _dispatch_kevent_mach_msg_destroy(hdr);
3059 }
3060 dispatch_mach_reply_refs_t dmr = NULL;
3061 if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
3062 dmr = (dispatch_mach_reply_refs_t)dr;
3063 }
3064 return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz);
3065 }
3066
3067 DISPATCH_NOINLINE
3068 static void
3069 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
3070 {
3071 dispatch_source_refs_t dri, dr_next;
3072 dispatch_kevent_t dk;
3073 bool unreg;
3074
3075 dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
3076 if (!dk) {
3077 return;
3078 }
3079
3080 // Update notification registration state.
3081 dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
3082 _dispatch_kevent_qos_s kev = {
3083 .ident = name,
3084 .filter = DISPATCH_EVFILT_MACH_NOTIFICATION,
3085 .flags = EV_ADD|EV_ENABLE,
3086 .fflags = flag,
3087 .udata = (uintptr_t)dk,
3088 };
3089 if (final) {
3090 // This can never happen again
3091 unreg = true;
3092 } else {
3093 // Re-register for notification before delivery
3094 unreg = _dispatch_kevent_resume(dk, flag, 0);
3095 }
3096 DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
3097 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
3098 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
3099 if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
3100 dispatch_mach_t dm = (dispatch_mach_t)dsi;
3101 _dispatch_mach_merge_kevent(dm, &kev);
3102 if (unreg && dm->dm_dkev) {
3103 _dispatch_mach_kevent_unregister(dm);
3104 }
3105 } else {
3106 _dispatch_source_merge_kevent(dsi, &kev);
3107 if (unreg) {
3108 _dispatch_source_kevent_unregister(dsi);
3109 }
3110 }
3111 if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
3112 // current merge is last in list (dk might have been freed)
3113 // or it re-armed the notification
3114 return;
3115 }
3116 }
3117 }
3118
3119 static kern_return_t
3120 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
3121 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
3122 mach_port_mscount_t notify_sync)
3123 {
3124 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
3125 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
3126 kern_return_t kr, krr = 0;
3127
3128 // Update notification registration state.
3129 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
3130 dk->dk_kevent.data &= ~(del_flags & mask);
3131
3132 _dispatch_debug_machport(port);
3133 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
3134 // initialize _dispatch_mach_notify_port:
3135 (void)_dispatch_get_mach_recv_portset();
3136 _dispatch_debug("machport[0x%08x]: registering for send-possible "
3137 "notification", port);
3138 previous = MACH_PORT_NULL;
3139 krr = mach_port_request_notification(mach_task_self(), port,
3140 notify_msgid, notify_sync, _dispatch_mach_notify_port,
3141 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
3142 DISPATCH_VERIFY_MIG(krr);
3143
3144 switch (krr) {
3145 case KERN_INVALID_NAME:
3146 case KERN_INVALID_RIGHT:
3147 // Suppress errors & clear registration state
3148 dk->dk_kevent.data &= ~mask;
3149 break;
3150 default:
3151 // Otherwise, we don't expect any errors from Mach. Log any errors
3152 if (dispatch_assume_zero(krr)) {
3153 // log the error & clear registration state
3154 dk->dk_kevent.data &= ~mask;
3155 } else if (dispatch_assume_zero(previous)) {
3156 // Another subsystem has beaten libdispatch to requesting the
3157 // specified Mach notification on this port. We should
3158 // technically cache the previous port and message it when the
3159 // kernel messages our port. Or we can just say screw those
3160 // subsystems and deallocate the previous port.
3161 // They should adopt libdispatch :-P
3162 kr = mach_port_deallocate(mach_task_self(), previous);
3163 DISPATCH_VERIFY_MIG(kr);
3164 (void)dispatch_assume_zero(kr);
3165 previous = MACH_PORT_NULL;
3166 }
3167 }
3168 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
3169 _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
3170 "notification", port);
3171 previous = MACH_PORT_NULL;
3172 kr = mach_port_request_notification(mach_task_self(), port,
3173 notify_msgid, notify_sync, MACH_PORT_NULL,
3174 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
3175 DISPATCH_VERIFY_MIG(kr);
3176
3177 switch (kr) {
3178 case KERN_INVALID_NAME:
3179 case KERN_INVALID_RIGHT:
3180 case KERN_INVALID_ARGUMENT:
3181 break;
3182 default:
3183 if (dispatch_assume_zero(kr)) {
3184 // log the error
3185 }
3186 }
3187 } else {
3188 return 0;
3189 }
3190 if (slowpath(previous)) {
3191 // the kernel has not consumed the send-once right yet
3192 (void)dispatch_assume_zero(
3193 _dispatch_send_consume_send_once_right(previous));
3194 }
3195 return krr;
3196 }
3197
3198 static void
3199 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
3200 {
3201 (void)_dispatch_get_mach_recv_portset();
3202 _dispatch_debug("registering for calendar-change notification");
3203 kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(),
3204 HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
3205 DISPATCH_VERIFY_MIG(kr);
3206 (void)dispatch_assume_zero(kr);
3207 }
3208
3209 static void
3210 _dispatch_mach_host_calendar_change_register(void)
3211 {
3212 static dispatch_once_t pred;
3213 dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
3214 }
3215
3216 static void
3217 _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
3218 {
3219 mig_reply_error_t reply;
3220 dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
3221 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
3222 dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
3223 boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
3224 if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
3225 // host_notify_reply.defs: host_calendar_changed
3226 _dispatch_debug("calendar-change notification");
3227 _dispatch_timers_calendar_change();
3228 _dispatch_mach_host_notify_update(NULL);
3229 success = TRUE;
3230 reply.RetCode = KERN_SUCCESS;
3231 }
3232 if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
3233 (void)dispatch_assume_zero(reply.RetCode);
3234 }
3235 }
3236
3237 kern_return_t
3238 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
3239 mach_port_name_t name)
3240 {
3241 #if DISPATCH_DEBUG
3242 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
3243 "deleted prematurely", name);
3244 #endif
3245
3246 _dispatch_debug_machport(name);
3247 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
3248
3249 return KERN_SUCCESS;
3250 }
3251
3252 kern_return_t
3253 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
3254 mach_port_name_t name)
3255 {
3256 kern_return_t kr;
3257
3258 _dispatch_debug("machport[0x%08x]: dead-name notification", name);
3259 _dispatch_debug_machport(name);
3260 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
3261
3262 // the act of receiving a dead name notification allocates a dead-name
3263 // right that must be deallocated
3264 kr = mach_port_deallocate(mach_task_self(), name);
3265 DISPATCH_VERIFY_MIG(kr);
3266 //(void)dispatch_assume_zero(kr);
3267
3268 return KERN_SUCCESS;
3269 }
3270
3271 kern_return_t
3272 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
3273 mach_port_name_t name)
3274 {
3275 _dispatch_debug("machport[0x%08x]: send-possible notification", name);
3276 _dispatch_debug_machport(name);
3277 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
3278
3279 return KERN_SUCCESS;
3280 }
3281
3282 #pragma mark -
3283 #pragma mark dispatch_mach_t
3284
3285 #define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
3286 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
3287 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3288
3289 static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3290 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3291 mach_port_t local_port, mach_port_t remote_port);
3292 static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
3293 dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
3294 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3295 dispatch_object_t dou);
3296 static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3297 dispatch_mach_msg_t dmsg);
3298 static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3299 pthread_priority_t pp);
3300
3301 static dispatch_mach_t
3302 _dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3303 dispatch_mach_handler_function_t handler, bool handler_is_block)
3304 {
3305 dispatch_mach_t dm;
3306 dispatch_mach_refs_t dr;
3307
3308 dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3309 sizeof(struct dispatch_mach_s));
3310 _dispatch_queue_init((dispatch_queue_t)dm);
3311 dm->dq_label = label;
3312
3313 dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3314 dm->do_ref_cnt++; // since channel is created suspended
3315 dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3316 dm->do_targetq = &_dispatch_mgr_q;
3317
3318 dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3319 dr->dr_source_wref = _dispatch_ptr2wref(dm);
3320 dr->dm_handler_func = handler;
3321 dr->dm_handler_ctxt = context;
3322 dm->ds_refs = dr;
3323 dm->dm_handler_is_block = handler_is_block;
3324
3325 dm->dm_refs = _dispatch_calloc(1ul,
3326 sizeof(struct dispatch_mach_send_refs_s));
3327 dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3328 dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3329 TAILQ_INIT(&dm->dm_refs->dm_replies);
3330
3331 // First item on the channel sets the user-specified target queue
3332 dispatch_set_target_queue(dm, q);
3333 _dispatch_object_debug(dm, "%s", __func__);
3334 return dm;
3335 }
3336
3337 dispatch_mach_t
3338 dispatch_mach_create(const char *label, dispatch_queue_t q,
3339 dispatch_mach_handler_t handler)
3340 {
3341 dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3342 return _dispatch_mach_create(label, q, bb,
3343 (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3344 }
3345
3346 dispatch_mach_t
3347 dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3348 dispatch_mach_handler_function_t handler)
3349 {
3350 return _dispatch_mach_create(label, q, context, handler, false);
3351 }
3352
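/*
 * Usage sketch for the channel API above (illustrative; the authoritative
 * declarations live in the library's private Mach SPI header). `rp` and
 * `sp` are assumed to be a valid receive and send right respectively:
 */
#if 0
dispatch_mach_t dm = dispatch_mach_create("com.example.channel",
		dispatch_get_main_queue(),
		^(dispatch_mach_reason_t reason, dispatch_mach_msg_t msg,
		mach_error_t error) {
	if (reason == DISPATCH_MACH_MESSAGE_RECEIVED) {
		// consume msg
	}
});
dispatch_mach_connect(dm, rp, sp, NULL); // also resumes the channel
#endif
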
3353 void
3354 _dispatch_mach_dispose(dispatch_mach_t dm)
3355 {
3356 _dispatch_object_debug(dm, "%s", __func__);
3357 dispatch_mach_refs_t dr = dm->ds_refs;
3358 if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
3359 Block_release(dr->dm_handler_ctxt);
3360 }
3361 free(dr);
3362 free(dm->dm_refs);
3363 _dispatch_queue_destroy(dm);
3364 }
3365
3366 void
3367 dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3368 mach_port_t send, dispatch_mach_msg_t checkin)
3369 {
3370 dispatch_mach_send_refs_t dr = dm->dm_refs;
3371 dispatch_kevent_t dk;
3372
3373 if (MACH_PORT_VALID(receive)) {
3374 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3375 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3376 dk->dk_kevent.ident = receive;
3377 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3378 dk->dk_kevent.udata = (uintptr_t)dk;
3379 TAILQ_INIT(&dk->dk_sources);
3380 dm->ds_dkev = dk;
3381 dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3382 _dispatch_retain(dm); // the reference the manager queue holds
3383 }
3384 dr->dm_send = send;
3385 if (MACH_PORT_VALID(send)) {
3386 if (checkin) {
3387 dispatch_retain(checkin);
3388 mach_msg_option_t options = _dispatch_mach_checkin_options();
3389 _dispatch_mach_msg_set_options(checkin, options);
3390 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3391 }
3392 dr->dm_checkin = checkin;
3393 }
3394 // monitor message reply ports
3395 dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3396 if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3397 DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3398 DISPATCH_CLIENT_CRASH("Channel already connected");
3399 }
3400 _dispatch_object_debug(dm, "%s", __func__);
3401 return dispatch_resume(dm);
3402 }
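
/*
 * Usage sketch (hypothetical ports): connecting the channel created above.
 * dispatch_mach_connect() sets up the receive kevent (installed on the
 * manager queue on first invoke), records the send right and optional
 * checkin message, and resumes the channel. It may be called only once;
 * a second call crashes with "Channel already connected".
 *
 *	mach_port_t rp = MACH_PORT_NULL, sp = MACH_PORT_NULL;
 *	(void)mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
 *			&rp);
 *	// sp: a send right to the peer, obtained out of band (hypothetical)
 *	dispatch_mach_connect(dm, rp, sp, NULL); // NULL: no checkin message
 */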
3403
3404 DISPATCH_NOINLINE
3405 static void
3406 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3407 dispatch_mach_reply_refs_t dmr, bool disconnected)
3408 {
3409 dispatch_mach_msg_t dmsgr = NULL;
3410 if (disconnected) {
3411 dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
3412 }
3413 dispatch_kevent_t dk = dmr->dmr_dkev;
3414 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3415 _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE, 0);
3416 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
3417 if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
3418 free(dmr);
3419 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3420 }
3421
3422 DISPATCH_NOINLINE
3423 static void
3424 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3425 dispatch_mach_msg_t dmsg)
3426 {
3427 dispatch_kevent_t dk;
3428 dispatch_mach_reply_refs_t dmr;
3429
3430 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3431 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3432 dk->dk_kevent.ident = reply;
3433 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3434 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3435 dk->dk_kevent.udata = (uintptr_t)dk;
3436 TAILQ_INIT(&dk->dk_sources);
3437
3438 dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3439 dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3440 dmr->dmr_dkev = dk;
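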
3441 if (dmsg->dmsg_voucher) {
3442 		dmr->dmr_voucher = _voucher_retain(dmsg->dmsg_voucher);
3443 }
3444 dmr->dmr_priority = dmsg->dmsg_priority;
3445 // make reply context visible to leaks rdar://11777199
3446 dmr->dmr_ctxt = dmsg->do_ctxt;
3447
3448 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3449 dmsg->do_ctxt);
3450 uint32_t flags;
3451 bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
3452 TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3453 dr_list);
3454 TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
3455 if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
3456 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3457 }
3458 }
3459
3460 DISPATCH_NOINLINE
3461 static void
3462 _dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3463 {
3464 dispatch_kevent_t dk = dm->dm_dkev;
3465 dm->dm_dkev = NULL;
3466 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3467 dr_list);
3468 dm->ds_pending_data_mask &= ~(unsigned long)
3469 (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3470 _dispatch_kevent_unregister(dk,
3471 DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD, 0);
3472 }
3473
3474 DISPATCH_NOINLINE
3475 static void
3476 _dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3477 {
3478 dispatch_kevent_t dk;
3479
3480 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3481 dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3482 dk->dk_kevent.ident = send;
3483 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3484 dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3485 dk->dk_kevent.udata = (uintptr_t)dk;
3486 TAILQ_INIT(&dk->dk_sources);
3487
3488 dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3489
3490 uint32_t flags;
3491 bool do_resume = _dispatch_kevent_register(&dk, &flags);
3492 TAILQ_INSERT_TAIL(&dk->dk_sources,
3493 (dispatch_source_refs_t)dm->dm_refs, dr_list);
3494 dm->dm_dkev = dk;
3495 if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3496 _dispatch_mach_kevent_unregister(dm);
3497 }
3498 }
3499
3500 static inline void
3501 _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3502 pthread_priority_t pp)
3503 {
3504 return _dispatch_queue_push(dm._dq, dou, pp);
3505 }
3506
3507 static inline void
3508 _dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3509 {
3510 dou._do->do_suspend_cnt = (unsigned int)options;
3511 }
3512
3513 static inline mach_msg_option_t
3514 _dispatch_mach_msg_get_options(dispatch_object_t dou)
3515 {
3516 mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3517 return options;
3518 }
3519
3520 static inline void
3521 _dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3522 unsigned long reason)
3523 {
3524 dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3525 dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
3526 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3527 }
3528
3529 static inline unsigned long
3530 _dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3531 {
3532 mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3533 dou._do->do_suspend_cnt = 0;
3534 if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3535 *err_ptr = 0;
3536 return err_get_code(err);
3537 }
3538 *err_ptr = err;
3539 return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3540 }
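
/*
 * Note on the encoding above: the reason/error pair travels in the
 * otherwise unused do_suspend_cnt field of the message object. A pure
 * reason r is stored as the synthetic Mach error
 * err_local|err_sub(0x3e0)|r (user-defined error system, private
 * subsystem 0x3e0), which is distinguishable from any real mach_msg()
 * return code. The decoder maps anything in that system/subsystem back to
 * (err = 0, reason = err_get_code(...)), and treats any other nonzero
 * value as a genuine send error, reported as
 * DISPATCH_MACH_MESSAGE_SEND_FAILED.
 */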
3541
3542 static void
3543 _dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
3544 mach_msg_header_t *hdr, mach_msg_size_t siz)
3545 {
3546 _dispatch_debug_machport(hdr->msgh_remote_port);
3547 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3548 hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3549 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3550 return _dispatch_kevent_mach_msg_destroy(hdr);
3551 }
3552 dispatch_mach_msg_t dmsg;
3553 voucher_t voucher;
3554 pthread_priority_t priority;
3555 void *ctxt = NULL;
3556 if (dmr) {
3557 _voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
3558 voucher = dmr->dmr_voucher;
3559 dmr->dmr_voucher = NULL; // transfer reference
3560 priority = dmr->dmr_priority;
3561 ctxt = dmr->dmr_ctxt;
3562 _dispatch_mach_reply_kevent_unregister(dm, dmr, false);
3563 } else {
3564 voucher = voucher_create_with_mach_msg(hdr);
3565 priority = _voucher_get_priority(voucher);
3566 }
3567 dispatch_mach_msg_destructor_t destructor;
3568 destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3569 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3570 DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3571 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3572 dmsg->dmsg_voucher = voucher;
3573 dmsg->dmsg_priority = priority;
3574 dmsg->do_ctxt = ctxt;
3575 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3576 _dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
3577 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3578 return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3579 }
3580
3581 static inline mach_port_t
3582 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3583 {
3584 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3585 mach_port_t remote = hdr->msgh_remote_port;
3586 return remote;
3587 }
3588
3589 static inline void
3590 _dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3591 mach_port_t remote_port)
3592 {
3593 mach_msg_header_t *hdr;
3594 dispatch_mach_msg_t dmsg;
3595 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3596 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3597 if (local_port) hdr->msgh_local_port = local_port;
3598 if (remote_port) hdr->msgh_remote_port = remote_port;
3599 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3600 return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3601 }
3602
3603 static inline dispatch_mach_msg_t
3604 _dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
3605 dispatch_mach_reply_refs_t dmr)
3606 {
3607 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3608 if (dmsg && !dmsg->dmsg_reply) return NULL;
3609 mach_msg_header_t *hdr;
3610 dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3611 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3612 if (dmsg) {
3613 hdr->msgh_local_port = dmsg->dmsg_reply;
3614 if (dmsg->dmsg_voucher) {
3615 dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
3616 }
3617 dmsgr->dmsg_priority = dmsg->dmsg_priority;
3618 dmsgr->do_ctxt = dmsg->do_ctxt;
3619 } else {
3620 hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
3621 dmsgr->dmsg_voucher = dmr->dmr_voucher;
3622 dmr->dmr_voucher = NULL; // transfer reference
3623 dmsgr->dmsg_priority = dmr->dmr_priority;
3624 dmsgr->do_ctxt = dmr->dmr_ctxt;
3625 }
3626 _dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
3627 return dmsgr;
3628 }
3629
3630 DISPATCH_NOINLINE
3631 static void
3632 _dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3633 {
3634 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3635 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3636 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3637 _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3638 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3639 }
3640
3641 DISPATCH_NOINLINE
3642 static dispatch_object_t
3643 _dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3644 {
3645 dispatch_mach_send_refs_t dr = dm->dm_refs;
3646 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
3647 voucher_t voucher = dmsg->dmsg_voucher;
3648 mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
3649 bool clear_voucher = false, kvoucher_move_send = false;
3650 dr->dm_needs_mgr = 0;
3651 if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3652 // send initial checkin message
3653 if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3654 &_dispatch_mgr_q)) {
3655 // send kevent must be uninstalled on the manager queue
3656 dr->dm_needs_mgr = 1;
3657 goto out;
3658 }
3659 dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3660 if (slowpath(dr->dm_checkin)) {
3661 goto out;
3662 }
3663 }
3664 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3665 mach_msg_return_t kr = 0;
3666 mach_port_t reply = dmsg->dmsg_reply;
3667 mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
3668 if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
3669 opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
3670 if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3671 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
3672 if (dmsg != dr->dm_checkin) {
3673 msg->msgh_remote_port = dr->dm_send;
3674 }
3675 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3676 if (slowpath(!dm->dm_dkev)) {
3677 _dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3678 }
3679 if (fastpath(dm->dm_dkev)) {
3680 if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3681 goto out;
3682 }
3683 opts |= MACH_SEND_NOTIFY;
3684 }
3685 }
3686 opts |= MACH_SEND_TIMEOUT;
3687 if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
3688 ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
3689 voucher, dmsg->dmsg_priority);
3690 }
3691 _dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
3692 if (ipc_kvoucher) {
3693 kvoucher_move_send = true;
3694 clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
3695 ipc_kvoucher, kvoucher_move_send);
3696 } else {
3697 clear_voucher = _voucher_mach_msg_set(msg, voucher);
3698 }
3699 }
3700 _voucher_activity_trace_msg(voucher, msg, send);
3701 _dispatch_debug_machport(msg->msgh_remote_port);
3702 if (reply) _dispatch_debug_machport(reply);
3703 kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3704 MACH_PORT_NULL);
3705 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
3706 "opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
3707 "%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
3708 opts, msg_opts, msg->msgh_voucher_port, reply,
3709 mach_error_string(kr), kr);
3710 if (clear_voucher) {
3711 if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
3712 DISPATCH_CRASH("Voucher port corruption");
3713 }
3714 mach_voucher_t kv;
3715 kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
3716 if (kvoucher_move_send) ipc_kvoucher = kv;
3717 }
3718 }
3719 if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3720 if (opts & MACH_SEND_NOTIFY) {
3721 _dispatch_debug("machport[0x%08x]: send-possible notification "
3722 "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3723 DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3724 } else {
3725 // send kevent must be installed on the manager queue
3726 dr->dm_needs_mgr = 1;
3727 }
3728 if (ipc_kvoucher) {
3729 _dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
3730 voucher_t ipc_voucher;
3731 ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
3732 voucher, dmsg->dmsg_priority, ipc_kvoucher);
3733 _dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
3734 ipc_voucher, dmsg, voucher);
3735 if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
3736 dmsg->dmsg_voucher = ipc_voucher;
3737 }
3738 goto out;
3739 } else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
3740 _voucher_dealloc_mach_voucher(ipc_kvoucher);
3741 }
3742 if (fastpath(!kr) && reply &&
3743 !(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
3744 if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3745 // reply receive kevent must be installed on the manager queue
3746 dr->dm_needs_mgr = 1;
3747 _dispatch_mach_msg_set_options(dmsg, msg_opts |
3748 DISPATCH_MACH_REGISTER_FOR_REPLY);
3749 goto out;
3750 }
3751 _dispatch_mach_reply_kevent_register(dm, reply, dmsg);
3752 }
3753 if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3754 _dispatch_mach_kevent_unregister(dm);
3755 }
3756 if (slowpath(kr)) {
3757 // Send failed, so reply was never connected <rdar://problem/14309159>
3758 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3759 }
3760 _dispatch_mach_msg_set_reason(dmsg, kr, 0);
3761 _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3762 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3763 dmsg = NULL;
3764 out:
3765 return (dispatch_object_t)dmsg;
3766 }
3767
3768 DISPATCH_ALWAYS_INLINE
3769 static inline void
3770 _dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
3771 bool wakeup)
3772 {
3773 dispatch_mach_send_refs_t dr = dm->dm_refs;
3774 struct dispatch_object_s *prev, *dc = dou._do;
3775 dc->do_next = NULL;
3776
3777 prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3778 if (fastpath(prev)) {
3779 prev->do_next = dc;
3780 } else {
3781 dr->dm_head = dc;
3782 }
3783 if (wakeup || !prev) {
3784 _dispatch_wakeup(dm);
3785 }
3786 }
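
/*
 * Note: this is the standard libdispatch lock-free MPSC enqueue: atomically
 * exchange the tail, then either link the previous tail's do_next or, for
 * a previously empty queue, publish the new head. A drainer that observes
 * a non-NULL tail but a not-yet-published head or do_next spins in
 * _dispatch_wait_until() for that window to close (see
 * _dispatch_mach_send_drain() below).
 */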
3787
3788 DISPATCH_ALWAYS_INLINE
3789 static inline void
3790 _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3791 {
3792 return _dispatch_mach_send_push_wakeup(dm, dou, false);
3793 }
3794
3795 DISPATCH_NOINLINE
3796 static void
3797 _dispatch_mach_send_drain(dispatch_mach_t dm)
3798 {
3799 dispatch_mach_send_refs_t dr = dm->dm_refs;
3800 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3801 while (dr->dm_tail) {
3802 _dispatch_wait_until(dc = fastpath(dr->dm_head));
3803 do {
3804 next_dc = fastpath(dc->do_next);
3805 dr->dm_head = next_dc;
3806 if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3807 relaxed)) {
3808 _dispatch_wait_until(next_dc = fastpath(dc->do_next));
3809 dr->dm_head = next_dc;
3810 }
3811 if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3812 if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3813 // send barrier
3814 // leave send queue locked until barrier has completed
3815 return _dispatch_mach_push(dm, dc,
3816 ((dispatch_continuation_t)dc)->dc_priority);
3817 }
3818 #if DISPATCH_MACH_SEND_SYNC
3819 			if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)) {
3820 _dispatch_thread_semaphore_signal(
3821 (_dispatch_thread_semaphore_t)dc->do_ctxt);
3822 continue;
3823 }
3824 #endif // DISPATCH_MACH_SEND_SYNC
3825 if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3826 goto out;
3827 }
3828 continue;
3829 }
3830 _dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
3831 if (slowpath(dr->dm_disconnect_cnt) ||
3832 slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3833 _dispatch_mach_msg_not_sent(dm, dc);
3834 continue;
3835 }
3836 if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3837 goto out;
3838 }
3839 } while ((dc = next_dc));
3840 }
3841 out:
3842 // if this is not a complete drain, we must undo some things
3843 if (slowpath(dc)) {
3844 if (!next_dc &&
3845 !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3846 // wait for enqueue slow path to finish
3847 _dispatch_wait_until(next_dc = fastpath(dr->dm_head));
3848 dc->do_next = next_dc;
3849 }
3850 dr->dm_head = dc;
3851 }
3852 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3853 _dispatch_wakeup(dm);
3854 }
3855
3856 static inline void
3857 _dispatch_mach_send(dispatch_mach_t dm)
3858 {
3859 dispatch_mach_send_refs_t dr = dm->dm_refs;
3860 if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3861 dm_sending, 0, 1, acquire))) {
3862 return;
3863 }
3864 _dispatch_object_debug(dm, "%s", __func__);
3865 _dispatch_mach_send_drain(dm);
3866 }
3867
3868 static void
3869 _dispatch_mach_merge_kevent(dispatch_mach_t dm,
3870 const _dispatch_kevent_qos_s *ke)
3871 {
3872 if (!(ke->fflags & dm->ds_pending_data_mask)) {
3873 return;
3874 }
3875 _dispatch_mach_send(dm);
3876 }
3877
3878 static inline mach_msg_option_t
3879 _dispatch_mach_checkin_options(void)
3880 {
3881 mach_msg_option_t options = 0;
3882 #if DISPATCH_USE_CHECKIN_NOIMPORTANCE
3883 options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
3884 #endif
3885 return options;
3886 }
3887
3888
3889 static inline mach_msg_option_t
3890 _dispatch_mach_send_options(void)
3891 {
3892 mach_msg_option_t options = 0;
3893 return options;
3894 }
3895
3896 DISPATCH_ALWAYS_INLINE
3897 static inline pthread_priority_t
3898 _dispatch_mach_priority_propagate(mach_msg_option_t options)
3899 {
3900 #if DISPATCH_USE_NOIMPORTANCE_QOS
3901 if (options & MACH_SEND_NOIMPORTANCE) return 0;
3902 #else
3903 (void)options;
3904 #endif
3905 return _dispatch_priority_propagate();
3906 }
3907
3908 DISPATCH_NOINLINE
3909 void
3910 dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3911 mach_msg_option_t options)
3912 {
3913 dispatch_mach_send_refs_t dr = dm->dm_refs;
3914 if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3915 DISPATCH_CLIENT_CRASH("Message already enqueued");
3916 }
3917 dispatch_retain(dmsg);
3918 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3919 pthread_priority_t priority = _dispatch_mach_priority_propagate(options);
3920 options |= _dispatch_mach_send_options();
3921 _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3922 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3923 dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
3924 MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3925 MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
3926 MACH_PORT_NULL);
3927 bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
3928 MACH_MSG_TYPE_MOVE_SEND_ONCE);
3929 dmsg->dmsg_priority = priority;
3930 dmsg->dmsg_voucher = _voucher_copy();
3931 _dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
3932 if ((!is_reply && slowpath(dr->dm_tail)) ||
3933 slowpath(dr->dm_disconnect_cnt) ||
3934 slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3935 slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3936 acquire))) {
3937 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3938 return _dispatch_mach_send_push(dm, dmsg);
3939 }
3940 if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3941 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3942 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3943 return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
3944 }
3945 if (!is_reply && slowpath(dr->dm_tail)) {
3946 return _dispatch_mach_send_drain(dm);
3947 }
3948 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3949 _dispatch_wakeup(dm);
3950 }
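
/*
 * Usage sketch (hypothetical message id): building a minimal inline
 * message and sending it on a connected channel. The channel fills in the
 * remote port from the connected send right; supplying a local receive
 * right with MACH_MSG_TYPE_MAKE_SEND_ONCE in the local bits would
 * additionally make the channel register a reply kevent for the send-once
 * reply.
 *
 *	mach_msg_header_t *hdr;
 *	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(NULL,
 *			sizeof(mach_msg_header_t),
 *			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
 *	hdr->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
 *	hdr->msgh_size = sizeof(mach_msg_header_t);
 *	hdr->msgh_id = 0x10000; // hypothetical
 *	dispatch_mach_send(dm, dmsg, 0);
 *	dispatch_release(dmsg); // the channel retains it while in flight
 */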
3951
3952 static void
3953 _dispatch_mach_disconnect(dispatch_mach_t dm)
3954 {
3955 dispatch_mach_send_refs_t dr = dm->dm_refs;
3956 if (dm->dm_dkev) {
3957 _dispatch_mach_kevent_unregister(dm);
3958 }
3959 if (MACH_PORT_VALID(dr->dm_send)) {
3960 _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3961 }
3962 dr->dm_send = MACH_PORT_NULL;
3963 if (dr->dm_checkin) {
3964 _dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3965 dr->dm_checkin = NULL;
3966 }
3967 if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3968 dispatch_mach_reply_refs_t dmr, tmp;
3969 		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp) {
3970 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3971 }
3972 }
3973 }
3974
3975 DISPATCH_NOINLINE
3976 static bool
3977 _dispatch_mach_cancel(dispatch_mach_t dm)
3978 {
3979 dispatch_mach_send_refs_t dr = dm->dm_refs;
3980 if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3981 return false;
3982 }
3983 _dispatch_object_debug(dm, "%s", __func__);
3984 _dispatch_mach_disconnect(dm);
3985 if (dm->ds_dkev) {
3986 mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3987 _dispatch_source_kevent_unregister((dispatch_source_t)dm);
3988 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3989 }
3990 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3991 return true;
3992 }
3993
3994 DISPATCH_NOINLINE
3995 static bool
3996 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3997 {
3998 if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3999 if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
4000 // send/reply kevents must be uninstalled on the manager queue
4001 return false;
4002 }
4003 }
4004 _dispatch_mach_disconnect(dm);
4005 dispatch_mach_send_refs_t dr = dm->dm_refs;
4006 dr->dm_checkin = dou._dc->dc_data;
4007 dr->dm_send = (mach_port_t)dou._dc->dc_other;
4008 _dispatch_continuation_free(dou._dc);
4009 (void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
4010 _dispatch_object_debug(dm, "%s", __func__);
4011 return true;
4012 }
4013
4014 DISPATCH_NOINLINE
4015 void
4016 dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
4017 dispatch_mach_msg_t checkin)
4018 {
4019 dispatch_mach_send_refs_t dr = dm->dm_refs;
4020 (void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
4021 if (MACH_PORT_VALID(send) && checkin) {
4022 dispatch_retain(checkin);
4023 mach_msg_option_t options = _dispatch_mach_checkin_options();
4024 _dispatch_mach_msg_set_options(checkin, options);
4025 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
4026 } else {
4027 checkin = NULL;
4028 dr->dm_checkin_port = MACH_PORT_NULL;
4029 }
4030 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4031 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
4032 dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
4033 dc->dc_ctxt = dc;
4034 dc->dc_data = checkin;
4035 dc->dc_other = (void*)(uintptr_t)send;
4036 return _dispatch_mach_send_push(dm, dc);
4037 }
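
/*
 * Usage sketch: swapping in a fresh send right after the peer died. While
 * the disconnect count is nonzero, sends are forced onto the channel's
 * send queue behind the reconnect continuation instead of being sent
 * directly.
 *
 *	dispatch_mach_reconnect(dm, new_send, NULL); // new_send: hypothetical
 */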
4038
4039 #if DISPATCH_MACH_SEND_SYNC
4040 DISPATCH_NOINLINE
4041 static void
4042 _dispatch_mach_send_sync_slow(dispatch_mach_t dm)
4043 {
4044 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
4045 struct dispatch_object_s dc = {
4046 .do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
4047 .do_ctxt = (void*)sema,
4048 };
4049 _dispatch_mach_send_push(dm, &dc);
4050 _dispatch_thread_semaphore_wait(sema);
4051 _dispatch_put_thread_semaphore(sema);
4052 }
4053 #endif // DISPATCH_MACH_SEND_SYNC
4054
4055 DISPATCH_NOINLINE
4056 mach_port_t
4057 dispatch_mach_get_checkin_port(dispatch_mach_t dm)
4058 {
4059 dispatch_mach_send_refs_t dr = dm->dm_refs;
4060 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
4061 return MACH_PORT_DEAD;
4062 }
4063 return dr->dm_checkin_port;
4064 }
4065
4066 DISPATCH_NOINLINE
4067 static void
4068 _dispatch_mach_connect_invoke(dispatch_mach_t dm)
4069 {
4070 dispatch_mach_refs_t dr = dm->ds_refs;
4071 _dispatch_client_callout4(dr->dm_handler_ctxt,
4072 DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
4073 dm->dm_connect_handler_called = 1;
4074 }
4075
4076 DISPATCH_NOINLINE
4077 void
4078 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg,
4079 dispatch_object_t dou DISPATCH_UNUSED,
4080 dispatch_invoke_flags_t flags DISPATCH_UNUSED)
4081 {
4082 dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
4083 dispatch_mach_refs_t dr = dm->ds_refs;
4084 mach_error_t err;
4085 unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
4086
4087 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
4088 _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
4089 _dispatch_voucher_ktrace_dmsg_pop(dmsg);
4090 _dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
4091 _dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
4092 dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
4093 dmsg->dmsg_voucher = NULL;
4094 if (slowpath(!dm->dm_connect_handler_called)) {
4095 _dispatch_mach_connect_invoke(dm);
4096 }
4097 _dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
4098 dr->dm_handler_func);
4099 _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
4100 _dispatch_introspection_queue_item_complete(dmsg);
4101 dispatch_release(dmsg);
4102 }
4103
4104 DISPATCH_NOINLINE
4105 void
4106 _dispatch_mach_barrier_invoke(void *ctxt)
4107 {
4108 dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
4109 dispatch_mach_refs_t dr = dm->ds_refs;
4110 struct dispatch_continuation_s *dc = ctxt;
4111 void *context = dc->dc_data;
4112 dispatch_function_t barrier = dc->dc_other;
4113 bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);
4114
4115 _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
4116 if (slowpath(!dm->dm_connect_handler_called)) {
4117 _dispatch_mach_connect_invoke(dm);
4118 }
4119 _dispatch_client_callout(context, barrier);
4120 _dispatch_client_callout4(dr->dm_handler_ctxt,
4121 DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
4122 _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
4123 if (send_barrier) {
4124 (void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
4125 }
4126 }
4127
4128 DISPATCH_NOINLINE
4129 void
4130 dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
4131 dispatch_function_t barrier)
4132 {
4133 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4134 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
4135 dc->dc_func = _dispatch_mach_barrier_invoke;
4136 dc->dc_ctxt = dc;
4137 dc->dc_data = context;
4138 dc->dc_other = barrier;
4139 _dispatch_continuation_voucher_set(dc, 0);
4140 _dispatch_continuation_priority_set(dc, 0, 0);
4141
4142 dispatch_mach_send_refs_t dr = dm->dm_refs;
4143 if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
4144 dm_sending, 0, 1, acquire))) {
4145 return _dispatch_mach_send_push(dm, dc);
4146 }
4147 // leave send queue locked until barrier has completed
4148 return _dispatch_mach_push(dm, dc, dc->dc_priority);
4149 }
4150
4151 DISPATCH_NOINLINE
4152 void
4153 dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
4154 dispatch_function_t barrier)
4155 {
4156 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4157 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
4158 dc->dc_func = _dispatch_mach_barrier_invoke;
4159 dc->dc_ctxt = dc;
4160 dc->dc_data = context;
4161 dc->dc_other = barrier;
4162 _dispatch_continuation_voucher_set(dc, 0);
4163 _dispatch_continuation_priority_set(dc, 0, 0);
4164
4165 return _dispatch_mach_push(dm, dc, dc->dc_priority);
4166 }
4167
4168 DISPATCH_NOINLINE
4169 void
4170 dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
4171 {
4172 dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
4173 _dispatch_call_block_and_release);
4174 }
4175
4176 DISPATCH_NOINLINE
4177 void
4178 dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
4179 {
4180 dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
4181 _dispatch_call_block_and_release);
4182 }
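
/*
 * Usage sketch: a send barrier runs once every message enqueued before it
 * has been sent (the send queue stays locked until the barrier completes);
 * a receive barrier runs after previously received messages have been
 * delivered. Either kind is followed by a DISPATCH_MACH_BARRIER_COMPLETED
 * callout to the channel handler.
 *
 *	dispatch_mach_send_barrier(dm, ^{
 *		// all prior sends have reached the kernel (or failed)
 *	});
 */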
4183
4184 DISPATCH_NOINLINE
4185 static void
4186 _dispatch_mach_cancel_invoke(dispatch_mach_t dm)
4187 {
4188 dispatch_mach_refs_t dr = dm->ds_refs;
4189 if (slowpath(!dm->dm_connect_handler_called)) {
4190 _dispatch_mach_connect_invoke(dm);
4191 }
4192 _dispatch_client_callout4(dr->dm_handler_ctxt,
4193 DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
4194 dm->dm_cancel_handler_called = 1;
4195 _dispatch_release(dm); // the retain is done at creation time
4196 }
4197
4198 DISPATCH_NOINLINE
4199 void
4200 dispatch_mach_cancel(dispatch_mach_t dm)
4201 {
4202 dispatch_source_cancel((dispatch_source_t)dm);
4203 }
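
/*
 * Usage sketch: tearing down a channel. Cancellation is asynchronous; the
 * DISPATCH_MACH_CANCELED callout on the target queue is the final event,
 * and the channel holds its own reference until that handler has run, so
 * releasing immediately after cancelling is safe.
 *
 *	dispatch_mach_cancel(dm);
 *	dispatch_release(dm);
 */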
4204
4205 DISPATCH_ALWAYS_INLINE
4206 static inline dispatch_queue_t
4207 _dispatch_mach_invoke2(dispatch_object_t dou,
4208 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
4209 {
4210 dispatch_mach_t dm = dou._dm;
4211
4212 // This function performs all mach channel actions. Each action is
4213 // responsible for verifying that it takes place on the appropriate queue.
4214 // If the current queue is not the correct queue for this action, the
4215 // correct queue will be returned and the invoke will be re-driven on that
4216 // queue.
4217
4218 // The order of tests here in invoke and in probe should be consistent.
4219
4220 dispatch_queue_t dq = _dispatch_queue_get_current();
4221 dispatch_mach_send_refs_t dr = dm->dm_refs;
4222
4223 if (slowpath(!dm->ds_is_installed)) {
4224 // The channel needs to be installed on the manager queue.
4225 if (dq != &_dispatch_mgr_q) {
4226 return &_dispatch_mgr_q;
4227 }
4228 if (dm->ds_dkev) {
4229 _dispatch_source_kevent_register((dispatch_source_t)dm);
4230 }
4231 dm->ds_is_installed = true;
4232 _dispatch_mach_send(dm);
4233 // Apply initial target queue change
4234 _dispatch_queue_drain(dou);
4235 if (dm->dq_items_tail) {
4236 return dm->do_targetq;
4237 }
4238 } else if (dm->dq_items_tail) {
4239 // The channel has pending messages to deliver to the target queue.
4240 if (dq != dm->do_targetq) {
4241 return dm->do_targetq;
4242 }
4243 dispatch_queue_t tq = dm->do_targetq;
4244 if (slowpath(_dispatch_queue_drain(dou))) {
4245 DISPATCH_CLIENT_CRASH("Sync onto mach channel");
4246 }
4247 if (slowpath(tq != dm->do_targetq)) {
4248 // An item on the channel changed the target queue
4249 return dm->do_targetq;
4250 }
4251 } else if (dr->dm_sending) {
4252 // Sending and uninstallation below require the send lock, the channel
4253 // will be woken up when the lock is dropped <rdar://15132939&15203957>
4254 return NULL;
4255 } else if (dr->dm_tail) {
4256 if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
4257 (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
4258 // Send/reply kevents need to be installed or uninstalled
4259 if (dq != &_dispatch_mgr_q) {
4260 return &_dispatch_mgr_q;
4261 }
4262 }
4263 if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4264 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
4265 // The channel has pending messages to send.
4266 _dispatch_mach_send(dm);
4267 }
4268 	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
4269 // The channel has been cancelled and needs to be uninstalled from the
4270 // manager queue. After uninstallation, the cancellation handler needs
4271 // to be delivered to the target queue.
4272 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4273 !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
4274 if (dq != &_dispatch_mgr_q) {
4275 return &_dispatch_mgr_q;
4276 }
4277 if (!_dispatch_mach_cancel(dm)) {
4278 return NULL;
4279 }
4280 }
4281 if (!dm->dm_cancel_handler_called) {
4282 if (dq != dm->do_targetq) {
4283 return dm->do_targetq;
4284 }
4285 _dispatch_mach_cancel_invoke(dm);
4286 }
4287 }
4288 return NULL;
4289 }
4290
4291 DISPATCH_NOINLINE
4292 void
4293 _dispatch_mach_invoke(dispatch_mach_t dm, dispatch_object_t dou,
4294 dispatch_invoke_flags_t flags)
4295 {
4296 _dispatch_queue_class_invoke(dm, dou._dc, flags, _dispatch_mach_invoke2);
4297 }
4298
4299 unsigned long
4300 _dispatch_mach_probe(dispatch_mach_t dm)
4301 {
4302 // This function determines whether the mach channel needs to be invoked.
4303 // The order of tests here in probe and in invoke should be consistent.
4304
4305 dispatch_mach_send_refs_t dr = dm->dm_refs;
4306
4307 if (slowpath(!dm->ds_is_installed)) {
4308 // The channel needs to be installed on the manager queue.
4309 return true;
4310 } else if (_dispatch_queue_class_probe(dm)) {
4311 // The source has pending messages to deliver to the target queue.
4312 return true;
4313 } else if (dr->dm_sending) {
4314 // Sending and uninstallation below require the send lock, the channel
4315 // will be woken up when the lock is dropped <rdar://15132939&15203957>
4316 return false;
4317 } else if (dr->dm_tail &&
4318 (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4319 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
4320 // The channel has pending messages to send.
4321 return true;
4322 } else if (dm->ds_atomic_flags & DSF_CANCELED) {
4323 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4324 !TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
4325 !dm->dm_cancel_handler_called) {
4326 // The channel needs to be uninstalled from the manager queue, or
4327 // the cancellation handler needs to be delivered to the target
4328 // queue.
4329 return true;
4330 }
4331 }
4332 // Nothing to do.
4333 return false;
4334 }
4335
4336 #pragma mark -
4337 #pragma mark dispatch_mach_msg_t
4338
4339 dispatch_mach_msg_t
4340 dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
4341 dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
4342 {
4343 if (slowpath(size < sizeof(mach_msg_header_t)) ||
4344 slowpath(destructor && !msg)) {
4345 DISPATCH_CLIENT_CRASH("Empty message");
4346 }
4347 dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
4348 sizeof(struct dispatch_mach_msg_s) +
4349 (destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
4350 if (destructor) {
4351 dmsg->dmsg_msg = msg;
4352 } else if (msg) {
4353 memcpy(dmsg->dmsg_buf, msg, size);
4354 }
4355 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
4356 dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
4357 false);
4358 dmsg->dmsg_destructor = destructor;
4359 dmsg->dmsg_size = size;
4360 if (msg_ptr) {
4361 *msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
4362 }
4363 return dmsg;
4364 }
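
/*
 * Usage sketch (msg_size is hypothetical): wrapping a heap buffer whose
 * ownership transfers to the message object; the buffer is freed when the
 * object is disposed. Passing a zero destructor instead copies the bytes
 * into the object's inline buffer, as the receive path does for messages
 * landing in the shared receive buffer.
 *
 *	mach_msg_header_t *hdr = calloc(1, msg_size); // msg_size >= sizeof(*hdr)
 *	dispatch_mach_msg_t dmsg = dispatch_mach_msg_create(hdr, msg_size,
 *			DISPATCH_MACH_MSG_DESTRUCTOR_FREE, NULL);
 */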
4365
4366 void
4367 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
4368 {
4369 if (dmsg->dmsg_voucher) {
4370 _voucher_release(dmsg->dmsg_voucher);
4371 dmsg->dmsg_voucher = NULL;
4372 }
4373 switch (dmsg->dmsg_destructor) {
4374 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
4375 break;
4376 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
4377 free(dmsg->dmsg_msg);
4378 break;
4379 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
4380 mach_vm_size_t vm_size = dmsg->dmsg_size;
4381 mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
4382 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
4383 vm_addr, vm_size));
4384 break;
4385 }}
4386 }
4387
4388 static inline mach_msg_header_t*
4389 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
4390 {
4391 return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
4392 (mach_msg_header_t*)dmsg->dmsg_buf;
4393 }
4394
4395 mach_msg_header_t*
4396 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
4397 {
4398 if (size_ptr) {
4399 *size_ptr = dmsg->dmsg_size;
4400 }
4401 return _dispatch_mach_msg_get_msg(dmsg);
4402 }
4403
4404 size_t
4405 _dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
4406 {
4407 size_t offset = 0;
4408 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4409 dx_kind(dmsg), dmsg);
4410 offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
4411 "refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
4412 offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
4413 "msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
4414 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
4415 if (hdr->msgh_id) {
4416 offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
4417 hdr->msgh_id);
4418 }
4419 if (hdr->msgh_size) {
4420 offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
4421 hdr->msgh_size);
4422 }
4423 if (hdr->msgh_bits) {
4424 offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
4425 MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
4426 MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
4427 if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
4428 offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
4429 MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
4430 }
4431 offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
4432 }
4433 if (hdr->msgh_local_port && hdr->msgh_remote_port) {
4434 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
4435 "remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
4436 } else if (hdr->msgh_local_port) {
4437 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
4438 hdr->msgh_local_port);
4439 } else if (hdr->msgh_remote_port) {
4440 offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4441 hdr->msgh_remote_port);
4442 } else {
4443 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4444 }
4445 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4446 return offset;
4447 }
4448
4449 #pragma mark -
4450 #pragma mark dispatch_mig_server
4451
4452 mach_msg_return_t
4453 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
4454 dispatch_mig_callback_t callback)
4455 {
4456 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
4457 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
4458 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
4459 mach_msg_options_t tmp_options;
4460 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
4461 mach_msg_return_t kr = 0;
4462 uint64_t assertion_token = 0;
4463 unsigned int cnt = 1000; // do not stall out serial queues
4464 boolean_t demux_success;
4465 bool received = false;
4466 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
4467
4468 // XXX FIXME -- allocate these elsewhere
4469 bufRequest = alloca(rcv_size);
4470 bufReply = alloca(rcv_size);
4471 bufReply->Head.msgh_size = 0;
4472 bufRequest->RetCode = 0;
4473
4474 #if DISPATCH_DEBUG
4475 options |= MACH_RCV_LARGE; // rdar://problem/8422992
4476 #endif
4477 tmp_options = options;
4478 // XXX FIXME -- change this to not starve out the target queue
4479 for (;;) {
4480 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
4481 options &= ~MACH_RCV_MSG;
4482 tmp_options &= ~MACH_RCV_MSG;
4483
4484 if (!(tmp_options & MACH_SEND_MSG)) {
4485 goto out;
4486 }
4487 }
4488 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
4489 				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0, 0);
4490
4491 tmp_options = options;
4492
4493 if (slowpath(kr)) {
4494 switch (kr) {
4495 case MACH_SEND_INVALID_DEST:
4496 case MACH_SEND_TIMED_OUT:
4497 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
4498 mach_msg_destroy(&bufReply->Head);
4499 }
4500 break;
4501 case MACH_RCV_TIMED_OUT:
4502 // Don't return an error if a message was sent this time or
4503 // a message was successfully received previously
4504 // rdar://problems/7363620&7791738
4505 				if (bufReply->Head.msgh_remote_port || received) {
4506 kr = MACH_MSG_SUCCESS;
4507 }
4508 break;
4509 case MACH_RCV_INVALID_NAME:
4510 break;
4511 #if DISPATCH_DEBUG
4512 case MACH_RCV_TOO_LARGE:
4513 // receive messages that are too large and log their id and size
4514 // rdar://problem/8422992
4515 tmp_options &= ~MACH_RCV_LARGE;
4516 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
4517 void *large_buf = malloc(large_size);
4518 if (large_buf) {
4519 rcv_size = large_size;
4520 bufReply = large_buf;
4521 }
4522 if (!mach_msg(&bufReply->Head, tmp_options, 0,
4523 (mach_msg_size_t)rcv_size,
4524 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
4525 _dispatch_log("BUG in libdispatch client: "
4526 "dispatch_mig_server received message larger than "
4527 "requested size %zd: id = 0x%x, size = %d",
4528 maxmsgsz, bufReply->Head.msgh_id,
4529 bufReply->Head.msgh_size);
4530 }
4531 if (large_buf) {
4532 free(large_buf);
4533 }
4534 // fall through
4535 #endif
4536 default:
4537 _dispatch_bug_mach_client(
4538 "dispatch_mig_server: mach_msg() failed", kr);
4539 break;
4540 }
4541 goto out;
4542 }
4543
4544 if (!(tmp_options & MACH_RCV_MSG)) {
4545 goto out;
4546 }
4547
4548 if (assertion_token) {
4549 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4550 int r = proc_importance_assertion_complete(assertion_token);
4551 (void)dispatch_assume_zero(r);
4552 #endif
4553 assertion_token = 0;
4554 }
4555 received = true;
4556
4557 bufTemp = bufRequest;
4558 bufRequest = bufReply;
4559 bufReply = bufTemp;
4560
4561 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4562 #pragma clang diagnostic push
4563 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
4564 int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
4565 NULL, &assertion_token);
4566 if (r && slowpath(r != EIO)) {
4567 (void)dispatch_assume_zero(r);
4568 }
4569 #pragma clang diagnostic pop
4570 #endif
4571 _voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
4572 demux_success = callback(&bufRequest->Head, &bufReply->Head);
4573
4574 if (!demux_success) {
4575 // destroy the request - but not the reply port
4576 bufRequest->Head.msgh_remote_port = 0;
4577 mach_msg_destroy(&bufRequest->Head);
4578 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
4579 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
4580 // is present
4581 if (slowpath(bufReply->RetCode)) {
4582 if (bufReply->RetCode == MIG_NO_REPLY) {
4583 continue;
4584 }
4585
4586 // destroy the request - but not the reply port
4587 bufRequest->Head.msgh_remote_port = 0;
4588 mach_msg_destroy(&bufRequest->Head);
4589 }
4590 }
4591
4592 if (bufReply->Head.msgh_remote_port) {
4593 tmp_options |= MACH_SEND_MSG;
4594 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
4595 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
4596 tmp_options |= MACH_SEND_TIMEOUT;
4597 }
4598 }
4599 }
4600
4601 out:
4602 if (assertion_token) {
4603 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4604 int r = proc_importance_assertion_complete(assertion_token);
4605 (void)dispatch_assume_zero(r);
4606 #endif
4607 }
4608
4609 return kr;
4610 }
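
/*
 * Usage sketch (hypothetical MIG subsystem): draining a receive-right
 * source with a MIG-generated demux function. maxsize comes from the
 * generated subsystem structure and excludes the trailer, which
 * dispatch_mig_server() adds to its receive allocation itself.
 *
 *	// event handler of a DISPATCH_SOURCE_TYPE_MACH_RECV source ds:
 *	dispatch_source_set_event_handler(ds, ^{
 *		(void)dispatch_mig_server(ds, example_subsystem.maxsize,
 *				example_server); // both MIG names are hypothetical
 *	});
 */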
4611
4612 #endif /* HAVE_MACH */
4613
4614 #pragma mark -
4615 #pragma mark dispatch_source_debug
4616
4617 DISPATCH_NOINLINE
4618 static const char *
4619 _evfiltstr(short filt)
4620 {
4621 switch (filt) {
4622 #define _evfilt2(f) case (f): return #f
4623 _evfilt2(EVFILT_READ);
4624 _evfilt2(EVFILT_WRITE);
4625 _evfilt2(EVFILT_AIO);
4626 _evfilt2(EVFILT_VNODE);
4627 _evfilt2(EVFILT_PROC);
4628 _evfilt2(EVFILT_SIGNAL);
4629 _evfilt2(EVFILT_TIMER);
4630 #if HAVE_MACH
4631 _evfilt2(EVFILT_MACHPORT);
4632 _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
4633 #endif
4634 _evfilt2(EVFILT_FS);
4635 _evfilt2(EVFILT_USER);
4636 #ifdef EVFILT_VM
4637 _evfilt2(EVFILT_VM);
4638 #endif
4639 #ifdef EVFILT_SOCK
4640 _evfilt2(EVFILT_SOCK);
4641 #endif
4642 #ifdef EVFILT_MEMORYSTATUS
4643 _evfilt2(EVFILT_MEMORYSTATUS);
4644 #endif
4645
4646 _evfilt2(DISPATCH_EVFILT_TIMER);
4647 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
4648 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
4649 default:
4650 return "EVFILT_missing";
4651 }
4652 }
4653
4654 #if DISPATCH_DEBUG
4655 static const char *
4656 _evflagstr2(uint16_t *flagsp)
4657 {
4658 #define _evflag2(f) \
4659 if ((*flagsp & (f)) == (f) && (f)) { \
4660 *flagsp &= ~(f); \
4661 return #f "|"; \
4662 }
4663 _evflag2(EV_ADD);
4664 _evflag2(EV_DELETE);
4665 _evflag2(EV_ENABLE);
4666 _evflag2(EV_DISABLE);
4667 _evflag2(EV_ONESHOT);
4668 _evflag2(EV_CLEAR);
4669 _evflag2(EV_RECEIPT);
4670 _evflag2(EV_DISPATCH);
4671 _evflag2(EV_UDATA_SPECIFIC);
4672 _evflag2(EV_POLL);
4673 _evflag2(EV_OOBAND);
4674 _evflag2(EV_ERROR);
4675 _evflag2(EV_EOF);
4676 *flagsp = 0;
4677 return "EV_UNKNOWN ";
4678 }
4679
4680 DISPATCH_NOINLINE
4681 static const char *
4682 _evflagstr(uint16_t flags, char *str, size_t strsize)
4683 {
4684 str[0] = 0;
4685 while (flags) {
4686 strlcat(str, _evflagstr2(&flags), strsize);
4687 }
4688 size_t sz = strlen(str);
4689 if (sz) str[sz-1] = 0;
4690 return str;
4691 }
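
/*
 * Example: _evflagstr(EV_ADD|EV_ENABLE, buf, sizeof(buf)) yields
 * "EV_ADD|EV_ENABLE"; unrecognized leftover bits render as "EV_UNKNOWN"
 * (the trailing separator is stripped in both cases).
 */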
4692 #endif
4693
4694 static size_t
4695 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4696 {
4697 dispatch_queue_t target = ds->do_targetq;
4698 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4699 "mask = 0x%lx, pending_data = 0x%lx, registered = %d, "
4700 "armed = %d, deleted = %d%s%s, canceled = %d, needs_mgr = %d, ",
4701 target && target->dq_label ? target->dq_label : "", target,
4702 ds->ds_ident_hack, ds->ds_pending_data_mask, ds->ds_pending_data,
4703 ds->ds_is_installed, (bool)(ds->ds_atomic_flags & DSF_ARMED),
4704 (bool)(ds->ds_atomic_flags & DSF_DELETED), ds->ds_pending_delete ?
4705 " (pending)" : "", (ds->ds_atomic_flags & DSF_ONESHOT) ?
4706 " (oneshot)" : "", (bool)(ds->ds_atomic_flags & DSF_CANCELED),
4707 ds->ds_needs_mgr);
4708 }
4709
4710 static size_t
4711 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4712 {
4713 dispatch_source_refs_t dr = ds->ds_refs;
4714 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
4715 " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
4716 ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4717 ds_timer(dr).interval, ds_timer(dr).flags);
4718 }
4719
4720 size_t
4721 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4722 {
4723 size_t offset = 0;
4724 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4725 dx_kind(ds), ds);
4726 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4727 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4728 if (ds->ds_is_timer) {
4729 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4730 }
4731 offset += dsnprintf(&buf[offset], bufsiz - offset, "kevent = %p%s, "
4732 "filter = %s }", ds->ds_dkev, ds->ds_is_direct_kevent ? " (direct)"
4733 : "", ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) :
4734 "????");
4735 return offset;
4736 }
4737
4738 static size_t
4739 _dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4740 {
4741 dispatch_queue_t target = dm->do_targetq;
4742 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4743 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4744 "sending = %d, disconnected = %d, canceled = %d ",
4745 target && target->dq_label ? target->dq_label : "", target,
4746 		dm->ds_dkev ? (mach_port_t)dm->ds_dkev->dk_kevent.ident : 0,
4747 		dm->dm_refs->dm_send,
4748 		dm->dm_dkev ? (mach_port_t)dm->dm_dkev->dk_kevent.ident : 0,
4749 dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4750 " (armed)" : "", dm->dm_refs->dm_checkin_port,
4751 dm->dm_refs->dm_checkin ? " (pending)" : "",
4752 dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4753 (bool)(dm->ds_atomic_flags & DSF_CANCELED));
4754 }

4755 size_t
4756 _dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4757 {
4758 size_t offset = 0;
4759 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4760 dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
4761 dx_kind(dm), dm);
4762 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4763 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4764 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4765 return offset;
4766 }
4767
4768 #if DISPATCH_DEBUG
4769 DISPATCH_NOINLINE
4770 static void
4771 _dispatch_kevent_debug(const _dispatch_kevent_qos_s* kev, const char* str)
4772 {
4773 char flagstr[256];
4774 _dispatch_debug("kevent[%p] = { ident = 0x%llx, filter = %s, "
4775 "flags = %s (0x%x), fflags = 0x%x, data = 0x%llx, udata = 0x%llx, "
4776 "ext[0] = 0x%llx, ext[1] = 0x%llx }: %s", kev, kev->ident,
4777 _evfiltstr(kev->filter), _evflagstr(kev->flags, flagstr,
4778 sizeof(flagstr)), kev->flags, kev->fflags, kev->data, kev->udata,
4779 kev->ext[0], kev->ext[1], str);
4780 }
4781
4782 static void
4783 _dispatch_kevent_debugger2(void *context)
4784 {
4785 struct sockaddr sa;
4786 socklen_t sa_len = sizeof(sa);
4787 int c, fd = (int)(long)context;
4788 unsigned int i;
4789 dispatch_kevent_t dk;
4790 dispatch_source_t ds;
4791 dispatch_source_refs_t dr;
4792 FILE *debug_stream;
4793
4794 c = accept(fd, &sa, &sa_len);
4795 if (c == -1) {
4796 if (errno != EAGAIN) {
4797 (void)dispatch_assume_zero(errno);
4798 }
4799 return;
4800 }
4801 #if 0
4802 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4803 if (r == -1) {
4804 (void)dispatch_assume_zero(errno);
4805 }
4806 #endif
4807 debug_stream = fdopen(c, "a");
4808 if (!dispatch_assume(debug_stream)) {
4809 close(c);
4810 return;
4811 }
4812
4813 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4814 fprintf(debug_stream, "Content-type: text/html\r\n");
4815 fprintf(debug_stream, "Pragma: nocache\r\n");
4816 fprintf(debug_stream, "\r\n");
4817 fprintf(debug_stream, "<html>\n");
4818 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4819 fprintf(debug_stream, "<body>\n<ul>\n");
4820
4821 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4822 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4823
4824 for (i = 0; i < DSL_HASH_SIZE; i++) {
4825 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4826 continue;
4827 }
4828 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4829 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4830 "0x%hx fflags 0x%x data 0x%lx udata %p\n",
4831 dk, (unsigned long)dk->dk_kevent.ident,
4832 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4833 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4834 (void*)dk->dk_kevent.udata);
4835 fprintf(debug_stream, "\t\t<ul>\n");
4836 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4837 ds = _dispatch_source_from_refs(dr);
4838 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4839 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4840 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4841 ds->ds_pending_data, ds->ds_pending_data_mask,
4842 ds->ds_atomic_flags);
4843 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4844 dispatch_queue_t dq = ds->do_targetq;
4845 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4846 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4847 dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4848 }
4849 }
4850 fprintf(debug_stream, "\t\t</ul>\n");
4851 fprintf(debug_stream, "\t</li>\n");
4852 }
4853 }
4854 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4855 fflush(debug_stream);
4856 fclose(debug_stream);
4857 }
4858
4859 static void
4860 _dispatch_kevent_debugger2_cancel(void *context)
4861 {
4862 int ret, fd = (int)(long)context;
4863
4864 ret = close(fd);
4865 	if (ret == -1) { // report errno only when close(2) actually failed
4866 (void)dispatch_assume_zero(errno);
4867 }
4868 }
4869
4870 static void
4871 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4872 {
4873 union {
4874 struct sockaddr_in sa_in;
4875 struct sockaddr sa;
4876 } sa_u = {
4877 .sa_in = {
4878 .sin_family = AF_INET,
4879 .sin_addr = { htonl(INADDR_LOOPBACK), },
4880 },
4881 };
4882 dispatch_source_t ds;
4883 const char *valstr;
4884 int val, r, fd, sock_opt = 1;
4885 socklen_t slen = sizeof(sa_u);
4886
4887 if (issetugid()) {
4888 return;
4889 }
4890 valstr = getenv("LIBDISPATCH_DEBUGGER");
4891 if (!valstr) {
4892 return;
4893 }
4894 val = atoi(valstr);
4895 if (val == 2) {
4896 sa_u.sa_in.sin_addr.s_addr = 0;
4897 }
4898 fd = socket(PF_INET, SOCK_STREAM, 0);
4899 if (fd == -1) {
4900 (void)dispatch_assume_zero(errno);
4901 return;
4902 }
4903 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4904 (socklen_t) sizeof sock_opt);
4905 if (r == -1) {
4906 (void)dispatch_assume_zero(errno);
4907 goto out_bad;
4908 }
4909 #if 0
4910 r = fcntl(fd, F_SETFL, O_NONBLOCK);
4911 if (r == -1) {
4912 (void)dispatch_assume_zero(errno);
4913 goto out_bad;
4914 }
4915 #endif
4916 r = bind(fd, &sa_u.sa, sizeof(sa_u));
4917 if (r == -1) {
4918 (void)dispatch_assume_zero(errno);
4919 goto out_bad;
4920 }
4921 r = listen(fd, SOMAXCONN);
4922 if (r == -1) {
4923 (void)dispatch_assume_zero(errno);
4924 goto out_bad;
4925 }
4926 r = getsockname(fd, &sa_u.sa, &slen);
4927 if (r == -1) {
4928 (void)dispatch_assume_zero(errno);
4929 goto out_bad;
4930 }
4931
4932 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4933 &_dispatch_mgr_q);
4934 if (dispatch_assume(ds)) {
4935 _dispatch_log("LIBDISPATCH: debug port: %hu",
4936 (in_port_t)ntohs(sa_u.sa_in.sin_port));
4937
4938 /* ownership of fd transfers to ds */
4939 dispatch_set_context(ds, (void *)(long)fd);
4940 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4941 dispatch_source_set_cancel_handler_f(ds,
4942 _dispatch_kevent_debugger2_cancel);
4943 dispatch_resume(ds);
4944
4945 return;
4946 }
4947 out_bad:
4948 close(fd);
4949 }
4950
4951 #if HAVE_MACH
4952
4953 #ifndef MACH_PORT_TYPE_SPREQUEST
4954 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
4955 #endif
4956
4957 DISPATCH_NOINLINE
4958 void
4959 dispatch_debug_machport(mach_port_t name, const char* str)
4960 {
4961 mach_port_type_t type;
4962 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4963 unsigned int dnreqs = 0, dnrsiz;
4964 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4965 if (kr) {
4966 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4967 kr, mach_error_string(kr), str);
4968 return;
4969 }
4970 if (type & MACH_PORT_TYPE_SEND) {
4971 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4972 MACH_PORT_RIGHT_SEND, &ns));
4973 }
4974 if (type & MACH_PORT_TYPE_SEND_ONCE) {
4975 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4976 MACH_PORT_RIGHT_SEND_ONCE, &nso));
4977 }
4978 if (type & MACH_PORT_TYPE_DEAD_NAME) {
4979 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4980 MACH_PORT_RIGHT_DEAD_NAME, &nd));
4981 }
4982 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4983 kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
4984 if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
4985 }
4986 if (type & MACH_PORT_TYPE_RECEIVE) {
4987 mach_port_status_t status = { .mps_pset = 0, };
4988 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4989 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4990 MACH_PORT_RIGHT_RECEIVE, &nr));
4991 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4992 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4993 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4994 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4995 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4996 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4997 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4998 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4999 status.mps_srights ? "Y":"N", status.mps_sorights,
5000 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
5001 status.mps_seqno, str);
5002 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
5003 MACH_PORT_TYPE_DEAD_NAME)) {
5004 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
5005 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
5006 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
5007 } else {
5008 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
5009 str);
5010 }
5011 }
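
/*
 * Usage sketch: logging the full right/request state of a suspect port,
 * e.g. right before handing it to mach_msg().
 *
 *	dispatch_debug_machport(port, __func__); // port: hypothetical
 */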
5012
5013 #endif // HAVE_MACH
5014
5015 #endif // DISPATCH_DEBUG