]> git.saurik.com Git - apple/libdispatch.git/blob - src/source.c
libdispatch-339.92.1.tar.gz
[apple/libdispatch.git] / src / source.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26 #include <sys/mount.h>
27
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29 const struct kevent64_s *ke);
30 static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
31 static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33 uint32_t del_flags);
34 static void _dispatch_kevent_drain(struct kevent64_s *ke);
35 static void _dispatch_kevent_merge(struct kevent64_s *ke);
36 static void _dispatch_timers_kevent(struct kevent64_s *ke);
37 static void _dispatch_timers_unregister(dispatch_source_t ds,
38 dispatch_kevent_t dk);
39 static void _dispatch_timers_update(dispatch_source_t ds);
40 static void _dispatch_timer_aggregates_check(void);
41 static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
42 static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
43 unsigned int tidx);
44 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
45 unsigned int tidx);
46 static inline unsigned long _dispatch_source_timer_data(
47 dispatch_source_refs_t dr, unsigned long prev);
48 static long _dispatch_kq_update(const struct kevent64_s *);
49 static void _dispatch_memorystatus_init(void);
50 #if HAVE_MACH
51 static void _dispatch_mach_host_calendar_change_register(void);
52 static void _dispatch_mach_recv_msg_buf_init(void);
53 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
54 uint32_t new_flags, uint32_t del_flags);
55 static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
56 uint32_t new_flags, uint32_t del_flags);
57 static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
58 #else
59 static inline void _dispatch_mach_host_calendar_change_register(void) {}
60 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
61 #endif
62 static const char * _evfiltstr(short filt);
63 #if DISPATCH_DEBUG
64 static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
65 static void _dispatch_kevent_debugger(void *context);
66 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
68 #else
69 static inline void
70 _dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
71 const char* str DISPATCH_UNUSED) {}
72 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
73 #endif
74
75 #pragma mark -
76 #pragma mark dispatch_source_t
77
78 dispatch_source_t
79 dispatch_source_create(dispatch_source_type_t type,
80 uintptr_t handle,
81 unsigned long mask,
82 dispatch_queue_t q)
83 {
84 const struct kevent64_s *proto_kev = &type->ke;
85 dispatch_source_t ds;
86 dispatch_kevent_t dk;
87
88 // input validation
89 if (type == NULL || (mask & ~type->mask)) {
90 return NULL;
91 }
92
93 switch (type->ke.filter) {
94 case EVFILT_SIGNAL:
95 if (handle >= NSIG) {
96 return NULL;
97 }
98 break;
99 case EVFILT_FS:
100 #if DISPATCH_USE_VM_PRESSURE
101 case EVFILT_VM:
102 #endif
103 #if DISPATCH_USE_MEMORYSTATUS
104 case EVFILT_MEMORYSTATUS:
105 #endif
106 case DISPATCH_EVFILT_CUSTOM_ADD:
107 case DISPATCH_EVFILT_CUSTOM_OR:
108 if (handle) {
109 return NULL;
110 }
111 break;
112 case DISPATCH_EVFILT_TIMER:
113 if (!!handle ^ !!type->ke.ident) {
114 return NULL;
115 }
116 break;
117 default:
118 break;
119 }
120
121 ds = _dispatch_alloc(DISPATCH_VTABLE(source),
122 sizeof(struct dispatch_source_s));
123 // Initialize as a queue first, then override some settings below.
124 _dispatch_queue_init((dispatch_queue_t)ds);
125 ds->dq_label = "source";
126
127 ds->do_ref_cnt++; // the reference the manager queue holds
128 ds->do_ref_cnt++; // since source is created suspended
129 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
130 // The initial target queue is the manager queue, in order to get
131 // the source installed. <rdar://problem/8928171>
132 ds->do_targetq = &_dispatch_mgr_q;
133
134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
135 dk->dk_kevent = *proto_kev;
136 dk->dk_kevent.ident = handle;
137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
138 dk->dk_kevent.fflags |= (uint32_t)mask;
139 dk->dk_kevent.udata = (uintptr_t)dk;
140 TAILQ_INIT(&dk->dk_sources);
141
142 ds->ds_dkev = dk;
143 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
144 ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
145 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
146 ds->ds_is_level = true;
147 ds->ds_needs_rearm = true;
148 } else if (!(EV_CLEAR & proto_kev->flags)) {
149 // we cheat and use EV_CLEAR to mean a "flag thingy"
150 ds->ds_is_adder = true;
151 }
152 // Some sources require special processing
153 if (type->init != NULL) {
154 type->init(ds, type, handle, mask, q);
155 }
156 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
157
158 if (fastpath(!ds->ds_refs)) {
159 ds->ds_refs = _dispatch_calloc(1ul,
160 sizeof(struct dispatch_source_refs_s));
161 }
162 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
163
164 // First item on the queue sets the user-specified target queue
165 dispatch_set_target_queue(ds, q);
166 _dispatch_object_debug(ds, "%s", __func__);
167 return ds;
168 }
169
170 void
171 _dispatch_source_dispose(dispatch_source_t ds)
172 {
173 _dispatch_object_debug(ds, "%s", __func__);
174 free(ds->ds_refs);
175 _dispatch_queue_destroy(ds);
176 }
177
178 void
179 _dispatch_source_xref_dispose(dispatch_source_t ds)
180 {
181 _dispatch_wakeup(ds);
182 }
183
184 void
185 dispatch_source_cancel(dispatch_source_t ds)
186 {
187 _dispatch_object_debug(ds, "%s", __func__);
188 // Right after we set the cancel flag, someone else
189 // could potentially invoke the source, do the cancelation,
190 // unregister the source, and deallocate it. We would
191 // need to therefore retain/release before setting the bit
192
193 _dispatch_retain(ds);
194 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
195 _dispatch_wakeup(ds);
196 _dispatch_release(ds);
197 }
198
199 long
200 dispatch_source_testcancel(dispatch_source_t ds)
201 {
202 return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
203 }
204
205
206 unsigned long
207 dispatch_source_get_mask(dispatch_source_t ds)
208 {
209 return ds->ds_pending_data_mask;
210 }
211
212 uintptr_t
213 dispatch_source_get_handle(dispatch_source_t ds)
214 {
215 return (unsigned int)ds->ds_ident_hack;
216 }
217
218 unsigned long
219 dispatch_source_get_data(dispatch_source_t ds)
220 {
221 return ds->ds_data;
222 }
223
224 void
225 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
226 {
227 struct kevent64_s kev = {
228 .fflags = (typeof(kev.fflags))val,
229 .data = (typeof(kev.data))val,
230 };
231
232 dispatch_assert(
233 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
234 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
235
236 _dispatch_source_merge_kevent(ds, &kev);
237 }
238
239 #pragma mark -
240 #pragma mark dispatch_source_handler
241
242 #ifdef __BLOCKS__
243 // 6618342 Contact the team that owns the Instrument DTrace probe before
244 // renaming this symbol
245 static void
246 _dispatch_source_set_event_handler2(void *context)
247 {
248 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
249 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
250 dispatch_source_refs_t dr = ds->ds_refs;
251
252 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
253 Block_release(dr->ds_handler_ctxt);
254 }
255 dr->ds_handler_func = context ? _dispatch_Block_invoke(context) : NULL;
256 dr->ds_handler_ctxt = context;
257 ds->ds_handler_is_block = true;
258 }
259
260 void
261 dispatch_source_set_event_handler(dispatch_source_t ds,
262 dispatch_block_t handler)
263 {
264 handler = _dispatch_Block_copy(handler);
265 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
266 _dispatch_source_set_event_handler2);
267 }
268 #endif /* __BLOCKS__ */
269
270 static void
271 _dispatch_source_set_event_handler_f(void *context)
272 {
273 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
274 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
275 dispatch_source_refs_t dr = ds->ds_refs;
276
277 #ifdef __BLOCKS__
278 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
279 Block_release(dr->ds_handler_ctxt);
280 }
281 #endif
282 dr->ds_handler_func = context;
283 dr->ds_handler_ctxt = ds->do_ctxt;
284 ds->ds_handler_is_block = false;
285 }
286
287 void
288 dispatch_source_set_event_handler_f(dispatch_source_t ds,
289 dispatch_function_t handler)
290 {
291 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
292 _dispatch_source_set_event_handler_f);
293 }
294
295 #ifdef __BLOCKS__
296 // 6618342 Contact the team that owns the Instrument DTrace probe before
297 // renaming this symbol
298 static void
299 _dispatch_source_set_cancel_handler2(void *context)
300 {
301 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
302 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
303 dispatch_source_refs_t dr = ds->ds_refs;
304
305 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
306 Block_release(dr->ds_cancel_handler);
307 }
308 dr->ds_cancel_handler = context;
309 ds->ds_cancel_is_block = true;
310 }
311
312 void
313 dispatch_source_set_cancel_handler(dispatch_source_t ds,
314 dispatch_block_t handler)
315 {
316 handler = _dispatch_Block_copy(handler);
317 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
318 _dispatch_source_set_cancel_handler2);
319 }
320 #endif /* __BLOCKS__ */
321
322 static void
323 _dispatch_source_set_cancel_handler_f(void *context)
324 {
325 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
326 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
327 dispatch_source_refs_t dr = ds->ds_refs;
328
329 #ifdef __BLOCKS__
330 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
331 Block_release(dr->ds_cancel_handler);
332 }
333 #endif
334 dr->ds_cancel_handler = context;
335 ds->ds_cancel_is_block = false;
336 }
337
338 void
339 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
340 dispatch_function_t handler)
341 {
342 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
343 _dispatch_source_set_cancel_handler_f);
344 }
345
346 #ifdef __BLOCKS__
347 static void
348 _dispatch_source_set_registration_handler2(void *context)
349 {
350 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
351 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
352 dispatch_source_refs_t dr = ds->ds_refs;
353
354 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
355 Block_release(dr->ds_registration_handler);
356 }
357 dr->ds_registration_handler = context;
358 ds->ds_registration_is_block = true;
359 }
360
361 void
362 dispatch_source_set_registration_handler(dispatch_source_t ds,
363 dispatch_block_t handler)
364 {
365 handler = _dispatch_Block_copy(handler);
366 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
367 _dispatch_source_set_registration_handler2);
368 }
369 #endif /* __BLOCKS__ */
370
371 static void
372 _dispatch_source_set_registration_handler_f(void *context)
373 {
374 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
375 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
376 dispatch_source_refs_t dr = ds->ds_refs;
377
378 #ifdef __BLOCKS__
379 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
380 Block_release(dr->ds_registration_handler);
381 }
382 #endif
383 dr->ds_registration_handler = context;
384 ds->ds_registration_is_block = false;
385 }
386
387 void
388 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
389 dispatch_function_t handler)
390 {
391 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
392 _dispatch_source_set_registration_handler_f);
393 }
394
395 #pragma mark -
396 #pragma mark dispatch_source_invoke
397
398 static void
399 _dispatch_source_registration_callout(dispatch_source_t ds)
400 {
401 dispatch_source_refs_t dr = ds->ds_refs;
402
403 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
404 // no registration callout if source is canceled rdar://problem/8955246
405 #ifdef __BLOCKS__
406 if (ds->ds_registration_is_block) {
407 Block_release(dr->ds_registration_handler);
408 }
409 } else if (ds->ds_registration_is_block) {
410 dispatch_block_t b = dr->ds_registration_handler;
411 _dispatch_client_callout_block(b);
412 Block_release(dr->ds_registration_handler);
413 #endif
414 } else {
415 dispatch_function_t f = dr->ds_registration_handler;
416 _dispatch_client_callout(ds->do_ctxt, f);
417 }
418 ds->ds_registration_is_block = false;
419 dr->ds_registration_handler = NULL;
420 }
421
422 static void
423 _dispatch_source_cancel_callout(dispatch_source_t ds)
424 {
425 dispatch_source_refs_t dr = ds->ds_refs;
426
427 ds->ds_pending_data_mask = 0;
428 ds->ds_pending_data = 0;
429 ds->ds_data = 0;
430
431 #ifdef __BLOCKS__
432 if (ds->ds_handler_is_block) {
433 Block_release(dr->ds_handler_ctxt);
434 ds->ds_handler_is_block = false;
435 dr->ds_handler_func = NULL;
436 dr->ds_handler_ctxt = NULL;
437 }
438 if (ds->ds_registration_is_block) {
439 Block_release(dr->ds_registration_handler);
440 ds->ds_registration_is_block = false;
441 dr->ds_registration_handler = NULL;
442 }
443 #endif
444
445 if (!dr->ds_cancel_handler) {
446 return;
447 }
448 if (ds->ds_cancel_is_block) {
449 #ifdef __BLOCKS__
450 dispatch_block_t b = dr->ds_cancel_handler;
451 if (ds->ds_atomic_flags & DSF_CANCELED) {
452 _dispatch_client_callout_block(b);
453 }
454 Block_release(dr->ds_cancel_handler);
455 ds->ds_cancel_is_block = false;
456 #endif
457 } else {
458 dispatch_function_t f = dr->ds_cancel_handler;
459 if (ds->ds_atomic_flags & DSF_CANCELED) {
460 _dispatch_client_callout(ds->do_ctxt, f);
461 }
462 }
463 dr->ds_cancel_handler = NULL;
464 }
465
466 static void
467 _dispatch_source_latch_and_call(dispatch_source_t ds)
468 {
469 unsigned long prev;
470
471 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
472 return;
473 }
474 dispatch_source_refs_t dr = ds->ds_refs;
475 prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
476 if (ds->ds_is_level) {
477 ds->ds_data = ~prev;
478 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
479 ds->ds_data = _dispatch_source_timer_data(dr, prev);
480 } else {
481 ds->ds_data = prev;
482 }
483 if (dispatch_assume(prev) && dr->ds_handler_func) {
484 _dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func);
485 }
486 }
487
488 static void
489 _dispatch_source_kevent_unregister(dispatch_source_t ds)
490 {
491 _dispatch_object_debug(ds, "%s", __func__);
492 dispatch_kevent_t dk = ds->ds_dkev;
493 ds->ds_dkev = NULL;
494 switch (dk->dk_kevent.filter) {
495 case DISPATCH_EVFILT_TIMER:
496 _dispatch_timers_unregister(ds, dk);
497 break;
498 default:
499 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
500 _dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
501 break;
502 }
503
504 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
505 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
506 _dispatch_release(ds); // the retain is done at creation time
507 }
508
509 static void
510 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
511 {
512 switch (ds->ds_dkev->dk_kevent.filter) {
513 case DISPATCH_EVFILT_TIMER:
514 return _dispatch_timers_update(ds);
515 case EVFILT_MACHPORT:
516 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
517 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
518 }
519 break;
520 }
521 if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
522 _dispatch_source_kevent_unregister(ds);
523 }
524 }
525
526 static void
527 _dispatch_source_kevent_register(dispatch_source_t ds)
528 {
529 dispatch_assert_zero(ds->ds_is_installed);
530 switch (ds->ds_dkev->dk_kevent.filter) {
531 case DISPATCH_EVFILT_TIMER:
532 return _dispatch_timers_update(ds);
533 }
534 uint32_t flags;
535 bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
536 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
537 if (do_resume || ds->ds_needs_rearm) {
538 _dispatch_source_kevent_resume(ds, flags);
539 }
540 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
541 _dispatch_object_debug(ds, "%s", __func__);
542 }
543
544 DISPATCH_ALWAYS_INLINE
545 static inline dispatch_queue_t
546 _dispatch_source_invoke2(dispatch_object_t dou,
547 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
548 {
549 dispatch_source_t ds = dou._ds;
550 if (slowpath(_dispatch_queue_drain(ds))) {
551 DISPATCH_CLIENT_CRASH("Sync onto source");
552 }
553
554 // This function performs all source actions. Each action is responsible
555 // for verifying that it takes place on the appropriate queue. If the
556 // current queue is not the correct queue for this action, the correct queue
557 // will be returned and the invoke will be re-driven on that queue.
558
559 // The order of tests here in invoke and in probe should be consistent.
560
561 dispatch_queue_t dq = _dispatch_queue_get_current();
562 dispatch_source_refs_t dr = ds->ds_refs;
563
564 if (!ds->ds_is_installed) {
565 // The source needs to be installed on the manager queue.
566 if (dq != &_dispatch_mgr_q) {
567 return &_dispatch_mgr_q;
568 }
569 _dispatch_source_kevent_register(ds);
570 ds->ds_is_installed = true;
571 if (dr->ds_registration_handler) {
572 return ds->do_targetq;
573 }
574 if (slowpath(ds->do_xref_cnt == -1)) {
575 return &_dispatch_mgr_q; // rdar://problem/9558246
576 }
577 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
578 // Source suspended by an item drained from the source queue.
579 return NULL;
580 } else if (dr->ds_registration_handler) {
581 // The source has been registered and the registration handler needs
582 // to be delivered on the target queue.
583 if (dq != ds->do_targetq) {
584 return ds->do_targetq;
585 }
586 // clears ds_registration_handler
587 _dispatch_source_registration_callout(ds);
588 if (slowpath(ds->do_xref_cnt == -1)) {
589 return &_dispatch_mgr_q; // rdar://problem/9558246
590 }
591 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
592 // The source has been cancelled and needs to be uninstalled from the
593 // manager queue. After uninstallation, the cancellation handler needs
594 // to be delivered to the target queue.
595 if (ds->ds_dkev) {
596 if (dq != &_dispatch_mgr_q) {
597 return &_dispatch_mgr_q;
598 }
599 _dispatch_source_kevent_unregister(ds);
600 }
601 if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
602 ds->ds_registration_is_block) {
603 if (dq != ds->do_targetq) {
604 return ds->do_targetq;
605 }
606 }
607 _dispatch_source_cancel_callout(ds);
608 } else if (ds->ds_pending_data) {
609 // The source has pending data to deliver via the event handler callback
610 // on the target queue. Some sources need to be rearmed on the manager
611 // queue after event delivery.
612 if (dq != ds->do_targetq) {
613 return ds->do_targetq;
614 }
615 _dispatch_source_latch_and_call(ds);
616 if (ds->ds_needs_rearm) {
617 return &_dispatch_mgr_q;
618 }
619 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
620 // The source needs to be rearmed on the manager queue.
621 if (dq != &_dispatch_mgr_q) {
622 return &_dispatch_mgr_q;
623 }
624 _dispatch_source_kevent_resume(ds, 0);
625 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
626 }
627
628 return NULL;
629 }
630
631 DISPATCH_NOINLINE
632 void
633 _dispatch_source_invoke(dispatch_source_t ds)
634 {
635 _dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
636 }
637
638 unsigned long
639 _dispatch_source_probe(dispatch_source_t ds)
640 {
641 // This function determines whether the source needs to be invoked.
642 // The order of tests here in probe and in invoke should be consistent.
643
644 dispatch_source_refs_t dr = ds->ds_refs;
645 if (!ds->ds_is_installed) {
646 // The source needs to be installed on the manager queue.
647 return true;
648 } else if (dr->ds_registration_handler) {
649 // The registration handler needs to be delivered to the target queue.
650 return true;
651 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
652 // The source needs to be uninstalled from the manager queue, or the
653 // cancellation handler needs to be delivered to the target queue.
654 // Note: cancellation assumes installation.
655 if (ds->ds_dkev || dr->ds_cancel_handler
656 #ifdef __BLOCKS__
657 || ds->ds_handler_is_block || ds->ds_registration_is_block
658 #endif
659 ) {
660 return true;
661 }
662 } else if (ds->ds_pending_data) {
663 // The source has pending data to deliver to the target queue.
664 return true;
665 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
666 // The source needs to be rearmed on the manager queue.
667 return true;
668 }
669 return (ds->dq_items_tail != NULL);
670 }
671
672 static void
673 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
674 {
675 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
676 return;
677 }
678 if (ds->ds_is_level) {
679 // ke->data is signed and "negative available data" makes no sense
680 // zero bytes happens when EV_EOF is set
681 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
682 dispatch_assert(ke->data >= 0l);
683 dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
684 relaxed);
685 } else if (ds->ds_is_adder) {
686 (void)dispatch_atomic_add2o(ds, ds_pending_data,
687 (unsigned long)ke->data, relaxed);
688 } else if (ke->fflags & ds->ds_pending_data_mask) {
689 (void)dispatch_atomic_or2o(ds, ds_pending_data,
690 ke->fflags & ds->ds_pending_data_mask, relaxed);
691 }
692 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
693 if (ds->ds_needs_rearm) {
694 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
695 }
696
697 _dispatch_wakeup(ds);
698 }
699
700 #pragma mark -
701 #pragma mark dispatch_kevent_t
702
703 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
704 static void _dispatch_kevent_guard(dispatch_kevent_t dk);
705 static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
706 #else
707 static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
708 static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
709 #endif
710
711 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
712 .dk_kevent = {
713 .filter = DISPATCH_EVFILT_CUSTOM_OR,
714 .flags = EV_CLEAR,
715 },
716 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
717 };
718 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
719 .dk_kevent = {
720 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
721 },
722 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
723 };
724
725 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
726
727 DISPATCH_CACHELINE_ALIGN
728 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
729
730 static void
731 _dispatch_kevent_init()
732 {
733 unsigned int i;
734 for (i = 0; i < DSL_HASH_SIZE; i++) {
735 TAILQ_INIT(&_dispatch_sources[i]);
736 }
737
738 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
739 &_dispatch_kevent_data_or, dk_list);
740 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
741 &_dispatch_kevent_data_add, dk_list);
742 _dispatch_kevent_data_or.dk_kevent.udata =
743 (uintptr_t)&_dispatch_kevent_data_or;
744 _dispatch_kevent_data_add.dk_kevent.udata =
745 (uintptr_t)&_dispatch_kevent_data_add;
746 }
747
748 static inline uintptr_t
749 _dispatch_kevent_hash(uint64_t ident, short filter)
750 {
751 uint64_t value;
752 #if HAVE_MACH
753 value = (filter == EVFILT_MACHPORT ||
754 filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
755 MACH_PORT_INDEX(ident) : ident);
756 #else
757 value = ident;
758 #endif
759 return DSL_HASH((uintptr_t)value);
760 }
761
762 static dispatch_kevent_t
763 _dispatch_kevent_find(uint64_t ident, short filter)
764 {
765 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
766 dispatch_kevent_t dki;
767
768 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
769 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
770 break;
771 }
772 }
773 return dki;
774 }
775
776 static void
777 _dispatch_kevent_insert(dispatch_kevent_t dk)
778 {
779 _dispatch_kevent_guard(dk);
780 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
781 dk->dk_kevent.filter);
782 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
783 }
784
785 // Find existing kevents, and merge any new flags if necessary
786 static bool
787 _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
788 {
789 dispatch_kevent_t dk, ds_dkev = *dkp;
790 uint32_t new_flags;
791 bool do_resume = false;
792
793 dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
794 ds_dkev->dk_kevent.filter);
795 if (dk) {
796 // If an existing dispatch kevent is found, check to see if new flags
797 // need to be added to the existing kevent
798 new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
799 dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
800 free(ds_dkev);
801 *dkp = dk;
802 do_resume = new_flags;
803 } else {
804 dk = ds_dkev;
805 _dispatch_kevent_insert(dk);
806 new_flags = dk->dk_kevent.fflags;
807 do_resume = true;
808 }
809 // Re-register the kevent with the kernel if new flags were added
810 // by the dispatch kevent
811 if (do_resume) {
812 dk->dk_kevent.flags |= EV_ADD;
813 }
814 *flgp = new_flags;
815 return do_resume;
816 }
817
818 static bool
819 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
820 uint32_t del_flags)
821 {
822 long r;
823 switch (dk->dk_kevent.filter) {
824 case DISPATCH_EVFILT_TIMER:
825 case DISPATCH_EVFILT_CUSTOM_ADD:
826 case DISPATCH_EVFILT_CUSTOM_OR:
827 // these types not registered with kevent
828 return 0;
829 #if HAVE_MACH
830 case EVFILT_MACHPORT:
831 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
832 case DISPATCH_EVFILT_MACH_NOTIFICATION:
833 return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
834 #endif
835 case EVFILT_PROC:
836 if (dk->dk_kevent.flags & EV_ONESHOT) {
837 return 0;
838 }
839 // fall through
840 default:
841 r = _dispatch_kq_update(&dk->dk_kevent);
842 if (dk->dk_kevent.flags & EV_DISPATCH) {
843 dk->dk_kevent.flags &= ~EV_ADD;
844 }
845 return r;
846 }
847 }
848
849 static void
850 _dispatch_kevent_dispose(dispatch_kevent_t dk)
851 {
852 uintptr_t hash;
853
854 switch (dk->dk_kevent.filter) {
855 case DISPATCH_EVFILT_TIMER:
856 case DISPATCH_EVFILT_CUSTOM_ADD:
857 case DISPATCH_EVFILT_CUSTOM_OR:
858 // these sources live on statically allocated lists
859 return;
860 #if HAVE_MACH
861 case EVFILT_MACHPORT:
862 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
863 break;
864 case DISPATCH_EVFILT_MACH_NOTIFICATION:
865 _dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
866 break;
867 #endif
868 case EVFILT_PROC:
869 if (dk->dk_kevent.flags & EV_ONESHOT) {
870 break; // implicitly deleted
871 }
872 // fall through
873 default:
874 if (~dk->dk_kevent.flags & EV_DELETE) {
875 dk->dk_kevent.flags |= EV_DELETE;
876 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
877 _dispatch_kq_update(&dk->dk_kevent);
878 }
879 break;
880 }
881
882 hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
883 dk->dk_kevent.filter);
884 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
885 _dispatch_kevent_unguard(dk);
886 free(dk);
887 }
888
889 static void
890 _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
891 {
892 dispatch_source_refs_t dri;
893 uint32_t del_flags, fflags = 0;
894
895 if (TAILQ_EMPTY(&dk->dk_sources)) {
896 _dispatch_kevent_dispose(dk);
897 } else {
898 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
899 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
900 uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
901 fflags |= mask;
902 }
903 del_flags = flg & ~fflags;
904 if (del_flags) {
905 dk->dk_kevent.flags |= EV_ADD;
906 dk->dk_kevent.fflags = fflags;
907 _dispatch_kevent_resume(dk, 0, del_flags);
908 }
909 }
910 }
911
912 DISPATCH_NOINLINE
913 static void
914 _dispatch_kevent_proc_exit(struct kevent64_s *ke)
915 {
916 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
917 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
918 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
919 struct kevent64_s fake;
920 fake = *ke;
921 fake.flags &= ~EV_ERROR;
922 fake.fflags = NOTE_EXIT;
923 fake.data = 0;
924 _dispatch_kevent_drain(&fake);
925 }
926
927 DISPATCH_NOINLINE
928 static void
929 _dispatch_kevent_error(struct kevent64_s *ke)
930 {
931 _dispatch_kevent_debug(ke, __func__);
932 if (ke->data) {
933 // log the unexpected error
934 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
935 ke->flags & EV_DELETE ? "delete" :
936 ke->flags & EV_ADD ? "add" :
937 ke->flags & EV_ENABLE ? "enable" : "monitor",
938 (int)ke->data);
939 }
940 }
941
942 static void
943 _dispatch_kevent_drain(struct kevent64_s *ke)
944 {
945 #if DISPATCH_DEBUG
946 static dispatch_once_t pred;
947 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
948 #endif
949 if (ke->filter == EVFILT_USER) {
950 return;
951 }
952 if (slowpath(ke->flags & EV_ERROR)) {
953 if (ke->filter == EVFILT_PROC) {
954 if (ke->flags & EV_DELETE) {
955 // Process exited while monitored
956 return;
957 } else if (ke->data == ESRCH) {
958 return _dispatch_kevent_proc_exit(ke);
959 }
960 #if DISPATCH_USE_VM_PRESSURE
961 } else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) {
962 // Memory pressure kevent is not supported on all platforms
963 // <rdar://problem/8636227>
964 return;
965 #endif
966 #if DISPATCH_USE_MEMORYSTATUS
967 } else if (ke->filter == EVFILT_MEMORYSTATUS &&
968 (ke->data == EINVAL || ke->data == ENOTSUP)) {
969 // Memory status kevent is not supported on all platforms
970 return;
971 #endif
972 }
973 return _dispatch_kevent_error(ke);
974 }
975 _dispatch_kevent_debug(ke, __func__);
976 if (ke->filter == EVFILT_TIMER) {
977 return _dispatch_timers_kevent(ke);
978 }
979 #if HAVE_MACH
980 if (ke->filter == EVFILT_MACHPORT) {
981 return _dispatch_kevent_mach_portset(ke);
982 }
983 #endif
984 return _dispatch_kevent_merge(ke);
985 }
986
987 DISPATCH_NOINLINE
988 static void
989 _dispatch_kevent_merge(struct kevent64_s *ke)
990 {
991 dispatch_kevent_t dk;
992 dispatch_source_refs_t dri;
993
994 dk = (void*)ke->udata;
995 dispatch_assert(dk);
996
997 if (ke->flags & EV_ONESHOT) {
998 dk->dk_kevent.flags |= EV_ONESHOT;
999 }
1000 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1001 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1002 }
1003 }
1004
1005 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1006 static void
1007 _dispatch_kevent_guard(dispatch_kevent_t dk)
1008 {
1009 guardid_t guard;
1010 const unsigned int guard_flags = GUARD_CLOSE;
1011 int r, fd_flags = 0;
1012 switch (dk->dk_kevent.filter) {
1013 case EVFILT_READ:
1014 case EVFILT_WRITE:
1015 case EVFILT_VNODE:
1016 guard = &dk->dk_kevent;
1017 r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
1018 &guard, guard_flags, &fd_flags);
1019 if (slowpath(r == -1)) {
1020 int err = errno;
1021 if (err != EPERM) {
1022 (void)dispatch_assume_zero(err);
1023 }
1024 return;
1025 }
1026 dk->dk_kevent.ext[0] = guard_flags;
1027 dk->dk_kevent.ext[1] = fd_flags;
1028 break;
1029 }
1030 }
1031
1032 static void
1033 _dispatch_kevent_unguard(dispatch_kevent_t dk)
1034 {
1035 guardid_t guard;
1036 unsigned int guard_flags;
1037 int r, fd_flags;
1038 switch (dk->dk_kevent.filter) {
1039 case EVFILT_READ:
1040 case EVFILT_WRITE:
1041 case EVFILT_VNODE:
1042 guard_flags = (unsigned int)dk->dk_kevent.ext[0];
1043 if (!guard_flags) {
1044 return;
1045 }
1046 guard = &dk->dk_kevent;
1047 fd_flags = (int)dk->dk_kevent.ext[1];
1048 r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
1049 guard_flags, NULL, 0, &fd_flags);
1050 if (slowpath(r == -1)) {
1051 (void)dispatch_assume_zero(errno);
1052 return;
1053 }
1054 dk->dk_kevent.ext[0] = 0;
1055 break;
1056 }
1057 }
1058 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1059
1060 #pragma mark -
1061 #pragma mark dispatch_source_timer
1062
1063 #if DISPATCH_USE_DTRACE && DISPATCH_USE_DTRACE_INTROSPECTION
1064 static dispatch_source_refs_t
1065 _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
1066 #define _dispatch_trace_next_timer_set(x, q) \
1067 _dispatch_trace_next_timer[(q)] = (x)
1068 #define _dispatch_trace_next_timer_program(d, q) \
1069 _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1070 #define _dispatch_trace_next_timer_wake(q) \
1071 _dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
1072 #else
1073 #define _dispatch_trace_next_timer_set(x, q)
1074 #define _dispatch_trace_next_timer_program(d, q)
1075 #define _dispatch_trace_next_timer_wake(q)
1076 #endif
1077
1078 #define _dispatch_source_timer_telemetry_enabled() false
1079
1080 DISPATCH_NOINLINE
1081 static void
1082 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1083 uintptr_t ident, struct dispatch_timer_source_s *values)
1084 {
1085 if (_dispatch_trace_timer_configure_enabled()) {
1086 _dispatch_trace_timer_configure(ds, ident, values);
1087 }
1088 }
1089
1090 DISPATCH_ALWAYS_INLINE
1091 static inline void
1092 _dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
1093 struct dispatch_timer_source_s *values)
1094 {
1095 if (_dispatch_trace_timer_configure_enabled() ||
1096 _dispatch_source_timer_telemetry_enabled()) {
1097 _dispatch_source_timer_telemetry_slow(ds, ident, values);
1098 asm(""); // prevent tailcall
1099 }
1100 }
1101
1102 // approx 1 year (60s * 60m * 24h * 365d)
1103 #define FOREVER_NSEC 31536000000000000ull
1104
1105 DISPATCH_ALWAYS_INLINE
1106 static inline uint64_t
1107 _dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
1108 {
1109 unsigned int tk = DISPATCH_TIMER_KIND(tidx);
1110 if (nows && fastpath(nows[tk])) {
1111 return nows[tk];
1112 }
1113 uint64_t now;
1114 switch (tk) {
1115 case DISPATCH_TIMER_KIND_MACH:
1116 now = _dispatch_absolute_time();
1117 break;
1118 case DISPATCH_TIMER_KIND_WALL:
1119 now = _dispatch_get_nanoseconds();
1120 break;
1121 }
1122 if (nows) {
1123 nows[tk] = now;
1124 }
1125 return now;
1126 }
1127
1128 static inline unsigned long
1129 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1130 {
1131 // calculate the number of intervals since last fire
1132 unsigned long data, missed;
1133 uint64_t now;
1134 now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
1135 missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1136 ds_timer(dr).interval);
1137 // correct for missed intervals already delivered last time
1138 data = prev - ds_timer(dr).missed + missed;
1139 ds_timer(dr).missed = missed;
1140 return data;
1141 }
1142
1143 struct dispatch_set_timer_params {
1144 dispatch_source_t ds;
1145 uintptr_t ident;
1146 struct dispatch_timer_source_s values;
1147 };
1148
1149 static void
1150 _dispatch_source_set_timer3(void *context)
1151 {
1152 // Called on the _dispatch_mgr_q
1153 struct dispatch_set_timer_params *params = context;
1154 dispatch_source_t ds = params->ds;
1155 ds->ds_ident_hack = params->ident;
1156 ds_timer(ds->ds_refs) = params->values;
1157 // Clear any pending data that might have accumulated on
1158 // older timer params <rdar://problem/8574886>
1159 ds->ds_pending_data = 0;
1160 // Re-arm in case we got disarmed because of pending set_timer suspension
1161 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
1162 dispatch_resume(ds);
1163 // Must happen after resume to avoid getting disarmed due to suspension
1164 _dispatch_timers_update(ds);
1165 dispatch_release(ds);
1166 if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
1167 _dispatch_mach_host_calendar_change_register();
1168 }
1169 free(params);
1170 }
1171
1172 static void
1173 _dispatch_source_set_timer2(void *context)
1174 {
1175 // Called on the source queue
1176 struct dispatch_set_timer_params *params = context;
1177 dispatch_suspend(params->ds);
1178 dispatch_barrier_async_f(&_dispatch_mgr_q, params,
1179 _dispatch_source_set_timer3);
1180 }
1181
1182 DISPATCH_NOINLINE
1183 static struct dispatch_set_timer_params *
1184 _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1185 uint64_t interval, uint64_t leeway)
1186 {
1187 struct dispatch_set_timer_params *params;
1188 params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1189 params->ds = ds;
1190 params->values.flags = ds_timer(ds->ds_refs).flags;
1191
1192 if (interval == 0) {
1193 // we use zero internally to mean disabled
1194 interval = 1;
1195 } else if ((int64_t)interval < 0) {
1196 // 6866347 - make sure nanoseconds won't overflow
1197 interval = INT64_MAX;
1198 }
1199 if ((int64_t)leeway < 0) {
1200 leeway = INT64_MAX;
1201 }
1202 if (start == DISPATCH_TIME_NOW) {
1203 start = _dispatch_absolute_time();
1204 } else if (start == DISPATCH_TIME_FOREVER) {
1205 start = INT64_MAX;
1206 }
1207
1208 if ((int64_t)start < 0) {
1209 // wall clock
1210 start = (dispatch_time_t)-((int64_t)start);
1211 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1212 } else {
1213 // absolute clock
1214 interval = _dispatch_time_nano2mach(interval);
1215 if (interval < 1) {
1216 // rdar://problem/7287561 interval must be at least one in
1217 // in order to avoid later division by zero when calculating
1218 // the missed interval count. (NOTE: the wall clock's
1219 // interval is already "fixed" to be 1 or more)
1220 interval = 1;
1221 }
1222 leeway = _dispatch_time_nano2mach(leeway);
1223 params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1224 }
1225 params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
1226 params->values.target = start;
1227 params->values.deadline = (start < UINT64_MAX - leeway) ?
1228 start + leeway : UINT64_MAX;
1229 params->values.interval = interval;
1230 params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1231 leeway : interval / 2;
1232 return params;
1233 }
1234
1235 DISPATCH_ALWAYS_INLINE
1236 static inline void
1237 _dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1238 uint64_t interval, uint64_t leeway, bool source_sync)
1239 {
1240 if (slowpath(!ds->ds_is_timer) ||
1241 slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1242 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1243 }
1244
1245 struct dispatch_set_timer_params *params;
1246 params = _dispatch_source_timer_params(ds, start, interval, leeway);
1247
1248 _dispatch_source_timer_telemetry(ds, params->ident, &params->values);
1249 // Suspend the source so that it doesn't fire with pending changes
1250 // The use of suspend/resume requires the external retain/release
1251 dispatch_retain(ds);
1252 if (source_sync) {
1253 return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
1254 _dispatch_source_set_timer2);
1255 } else {
1256 return _dispatch_source_set_timer2(params);
1257 }
1258 }
1259
1260 void
1261 dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1262 uint64_t interval, uint64_t leeway)
1263 {
1264 _dispatch_source_set_timer(ds, start, interval, leeway, true);
1265 }
1266
1267 void
1268 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1269 dispatch_time_t start, uint64_t interval, uint64_t leeway)
1270 {
1271 // Don't serialize through the source queue for CF timers <rdar://13833190>
1272 _dispatch_source_set_timer(ds, start, interval, leeway, false);
1273 }
1274
1275 void
1276 _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1277 {
1278 dispatch_source_refs_t dr = ds->ds_refs;
1279 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1280 const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1281 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1282 FOREVER_NSEC/NSEC_PER_MSEC))) {
1283 interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1284 } else {
1285 interval = FOREVER_NSEC;
1286 }
1287 interval = _dispatch_time_nano2mach(interval);
1288 uint64_t target = _dispatch_absolute_time() + interval;
1289 target = (target / interval) * interval;
1290 const uint64_t leeway = animation ?
1291 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1292 ds_timer(dr).target = target;
1293 ds_timer(dr).deadline = target + leeway;
1294 ds_timer(dr).interval = interval;
1295 ds_timer(dr).leeway = leeway;
1296 _dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
1297 }
1298
1299 #pragma mark -
1300 #pragma mark dispatch_timers
1301
1302 #define DISPATCH_TIMER_STRUCT(refs) \
1303 uint64_t target, deadline; \
1304 TAILQ_HEAD(, refs) dt_sources
1305
1306 typedef struct dispatch_timer_s {
1307 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1308 } *dispatch_timer_t;
1309
1310 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1311 [tidx] = { \
1312 .target = UINT64_MAX, \
1313 .deadline = UINT64_MAX, \
1314 .dt_sources = TAILQ_HEAD_INITIALIZER( \
1315 _dispatch_timer[tidx].dt_sources), \
1316 }
1317 #define DISPATCH_TIMER_INIT(kind, qos) \
1318 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1319 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1320
1321 struct dispatch_timer_s _dispatch_timer[] = {
1322 DISPATCH_TIMER_INIT(WALL, NORMAL),
1323 DISPATCH_TIMER_INIT(WALL, CRITICAL),
1324 DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1325 DISPATCH_TIMER_INIT(MACH, NORMAL),
1326 DISPATCH_TIMER_INIT(MACH, CRITICAL),
1327 DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1328 };
1329 #define DISPATCH_TIMER_COUNT \
1330 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1331
1332 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1333 (uintptr_t)&_dispatch_kevent_timer[tidx]
1334 #ifdef __LP64__
1335 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1336 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1337 #else // __LP64__
1338 // dynamic initialization in _dispatch_timers_init()
1339 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1340 .udata = 0
1341 #endif // __LP64__
1342 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1343 [tidx] = { \
1344 .dk_kevent = { \
1345 .ident = tidx, \
1346 .filter = DISPATCH_EVFILT_TIMER, \
1347 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1348 }, \
1349 .dk_sources = TAILQ_HEAD_INITIALIZER( \
1350 _dispatch_kevent_timer[tidx].dk_sources), \
1351 }
1352 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1353 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1354 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1355
1356 struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1357 DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1358 DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1359 DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1360 DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1361 DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1362 DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1363 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1364 };
1365 #define DISPATCH_KEVENT_TIMER_COUNT \
1366 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1367
1368 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1369 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1370 [qos] = { \
1371 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1372 .filter = EVFILT_TIMER, \
1373 .flags = EV_ONESHOT, \
1374 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1375 }
1376 #define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1377 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1378
1379 struct kevent64_s _dispatch_kevent_timeout[] = {
1380 DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
1381 DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
1382 DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
1383 };
1384
1385 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1386 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1387
1388 static const uint64_t _dispatch_kevent_coalescing_window[] = {
1389 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1390 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1391 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1392 };
1393
1394 #define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1395 typeof(dr) dri = NULL; typeof(dt) dti; \
1396 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1397 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1398 if (ds_timer(dr).target < ds_timer(dri).target) { \
1399 break; \
1400 } \
1401 } \
1402 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1403 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1404 break; \
1405 } \
1406 } \
1407 if (dti) { \
1408 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1409 } else { \
1410 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1411 } \
1412 } \
1413 if (dri) { \
1414 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1415 } else { \
1416 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1417 } \
1418 })
1419
1420 #define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1421 ({ \
1422 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1423 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1424 } \
1425 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1426 dr_list); })
1427
1428 #define _dispatch_timers_check(dra, dta) ({ \
1429 unsigned int qosm = _dispatch_timers_qos_mask; \
1430 bool update = false; \
1431 unsigned int tidx; \
1432 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1433 if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
1434 continue; \
1435 } \
1436 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1437 TAILQ_FIRST(&dra[tidx].dk_sources); \
1438 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1439 TAILQ_FIRST(&dta[tidx].dt_sources); \
1440 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1441 uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
1442 if (target != dta[tidx].target) { \
1443 dta[tidx].target = target; \
1444 update = true; \
1445 } \
1446 if (deadline != dta[tidx].deadline) { \
1447 dta[tidx].deadline = deadline; \
1448 update = true; \
1449 } \
1450 } \
1451 update; })
1452
1453 static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
1454 static unsigned int _dispatch_timers_qos_mask;
1455 static bool _dispatch_timers_force_max_leeway;
1456
1457 static void
1458 _dispatch_timers_init(void)
1459 {
1460 #ifndef __LP64__
1461 unsigned int tidx;
1462 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1463 _dispatch_kevent_timer[tidx].dk_kevent.udata = \
1464 DISPATCH_KEVENT_TIMER_UDATA(tidx);
1465 }
1466 #endif // __LP64__
1467 _dispatch_timers_force_max_leeway =
1468 getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY");
1469 }
1470
1471 static inline void
1472 _dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
1473 {
1474 dispatch_source_refs_t dr = ds->ds_refs;
1475 unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
1476
1477 if (slowpath(ds_timer_aggregate(ds))) {
1478 _dispatch_timer_aggregates_unregister(ds, tidx);
1479 }
1480 _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
1481 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1482 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1483 _dispatch_timers_reconfigure = true;
1484 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1485 }
1486 }
1487
1488 // Updates the ordered list of timers based on next fire date for changes to ds.
1489 // Should only be called from the context of _dispatch_mgr_q.
1490 static void
1491 _dispatch_timers_update(dispatch_source_t ds)
1492 {
1493 dispatch_kevent_t dk = ds->ds_dkev;
1494 dispatch_source_refs_t dr = ds->ds_refs;
1495 unsigned int tidx;
1496
1497 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1498
1499 // Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1500 if (slowpath(!dk)) {
1501 return;
1502 }
1503 // Move timers that are disabled, suspended or have missed intervals to the
1504 // disarmed list, rearm after resume resp. source invoke will reenable them
1505 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1506 ds->ds_pending_data) {
1507 tidx = DISPATCH_TIMER_INDEX_DISARM;
1508 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1509 } else {
1510 tidx = _dispatch_source_timer_idx(dr);
1511 }
1512 if (slowpath(ds_timer_aggregate(ds))) {
1513 _dispatch_timer_aggregates_register(ds);
1514 }
1515 if (slowpath(!ds->ds_is_installed)) {
1516 ds->ds_is_installed = true;
1517 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1518 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1519 }
1520 free(dk);
1521 _dispatch_object_debug(ds, "%s", __func__);
1522 } else {
1523 _dispatch_timers_unregister(ds, dk);
1524 }
1525 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1526 _dispatch_timers_reconfigure = true;
1527 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1528 }
1529 if (dk != &_dispatch_kevent_timer[tidx]){
1530 ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1531 }
1532 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1533 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1534 if (slowpath(ds_timer_aggregate(ds))) {
1535 _dispatch_timer_aggregates_update(ds, tidx);
1536 }
1537 }
1538
1539 static inline void
1540 _dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1541 {
1542 dispatch_source_refs_t dr;
1543 dispatch_source_t ds;
1544 uint64_t now, missed;
1545
1546 now = _dispatch_source_timer_now(nows, tidx);
1547 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1548 ds = _dispatch_source_from_refs(dr);
1549 // We may find timers on the wrong list due to a pending update from
1550 // dispatch_source_set_timer. Force an update of the list in that case.
1551 if (tidx != ds->ds_ident_hack) {
1552 _dispatch_timers_update(ds);
1553 continue;
1554 }
1555 if (!ds_timer(dr).target) {
1556 // No configured timers on the list
1557 break;
1558 }
1559 if (ds_timer(dr).target > now) {
1560 // Done running timers for now.
1561 break;
1562 }
1563 // Remove timers that are suspended or have missed intervals from the
1564 // list, rearm after resume resp. source invoke will reenable them
1565 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1566 _dispatch_timers_update(ds);
1567 continue;
1568 }
1569 // Calculate number of missed intervals.
1570 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1571 if (++missed > INT_MAX) {
1572 missed = INT_MAX;
1573 }
1574 if (ds_timer(dr).interval < INT64_MAX) {
1575 ds_timer(dr).target += missed * ds_timer(dr).interval;
1576 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1577 } else {
1578 ds_timer(dr).target = UINT64_MAX;
1579 ds_timer(dr).deadline = UINT64_MAX;
1580 }
1581 _dispatch_timers_update(ds);
1582 ds_timer(dr).last_fire = now;
1583
1584 unsigned long data;
1585 data = dispatch_atomic_add2o(ds, ds_pending_data,
1586 (unsigned long)missed, relaxed);
1587 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1588 _dispatch_wakeup(ds);
1589 }
1590 }
1591
1592 DISPATCH_NOINLINE
1593 static void
1594 _dispatch_timers_run(uint64_t nows[])
1595 {
1596 unsigned int tidx;
1597 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1598 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1599 _dispatch_timers_run2(nows, tidx);
1600 }
1601 }
1602 }
1603
1604 static inline unsigned int
1605 _dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1606 uint64_t *delay, uint64_t *leeway, int qos)
1607 {
1608 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1609 uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1610
1611 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1612 if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1613 continue;
1614 }
1615 uint64_t target = timer[tidx].target;
1616 if (target == UINT64_MAX) {
1617 continue;
1618 }
1619 uint64_t deadline = timer[tidx].deadline;
1620 if (qos >= 0) {
1621 // Timer pre-coalescing <rdar://problem/13222034>
1622 uint64_t window = _dispatch_kevent_coalescing_window[qos];
1623 uint64_t latest = deadline > window ? deadline - window : 0;
1624 dispatch_source_refs_t dri;
1625 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1626 dr_list) {
1627 tmp = ds_timer(dri).target;
1628 if (tmp > latest) break;
1629 target = tmp;
1630 }
1631 }
1632 uint64_t now = _dispatch_source_timer_now(nows, tidx);
1633 if (target <= now) {
1634 delta = 0;
1635 break;
1636 }
1637 tmp = target - now;
1638 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1639 tmp = _dispatch_time_mach2nano(tmp);
1640 }
1641 if (tmp < INT64_MAX && tmp < delta) {
1642 ridx = tidx;
1643 delta = tmp;
1644 }
1645 dispatch_assert(target <= deadline);
1646 tmp = deadline - now;
1647 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1648 tmp = _dispatch_time_mach2nano(tmp);
1649 }
1650 if (tmp < INT64_MAX && tmp < dldelta) {
1651 dldelta = tmp;
1652 }
1653 }
1654 *delay = delta;
1655 *leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1656 return ridx;
1657 }
1658
1659 static bool
1660 _dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke,
1661 unsigned int qos)
1662 {
1663 unsigned int tidx;
1664 bool poll;
1665 uint64_t delay, leeway;
1666
1667 tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1668 (int)qos);
1669 poll = (delay == 0);
1670 if (poll || delay == UINT64_MAX) {
1671 _dispatch_trace_next_timer_set(NULL, qos);
1672 if (!ke->data) {
1673 return poll;
1674 }
1675 ke->data = 0;
1676 ke->flags |= EV_DELETE;
1677 ke->flags &= ~(EV_ADD|EV_ENABLE);
1678 } else {
1679 _dispatch_trace_next_timer_set(
1680 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1681 _dispatch_trace_next_timer_program(delay, qos);
1682 delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1683 if (slowpath(_dispatch_timers_force_max_leeway)) {
1684 ke->data = (int64_t)(delay + leeway);
1685 ke->ext[1] = 0;
1686 } else {
1687 ke->data = (int64_t)delay;
1688 ke->ext[1] = leeway;
1689 }
1690 ke->flags |= EV_ADD|EV_ENABLE;
1691 ke->flags &= ~EV_DELETE;
1692 }
1693 _dispatch_kq_update(ke);
1694 return poll;
1695 }
1696
1697 DISPATCH_NOINLINE
1698 static bool
1699 _dispatch_timers_program(uint64_t nows[])
1700 {
1701 bool poll = false;
1702 unsigned int qos, qosm = _dispatch_timers_qos_mask;
1703 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1704 if (!(qosm & 1 << qos)){
1705 continue;
1706 }
1707 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1708 qos);
1709 }
1710 return poll;
1711 }
1712
1713 DISPATCH_NOINLINE
1714 static bool
1715 _dispatch_timers_configure(void)
1716 {
1717 _dispatch_timer_aggregates_check();
1718 // Find out if there is a new target/deadline on the timer lists
1719 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1720 }
1721
1722 static void
1723 _dispatch_timers_calendar_change(void)
1724 {
1725 // calendar change may have gone past the wallclock deadline
1726 _dispatch_timer_expired = true;
1727 _dispatch_timers_qos_mask = ~0u;
1728 }
1729
1730 static void
1731 _dispatch_timers_kevent(struct kevent64_s *ke)
1732 {
1733 dispatch_assert(ke->data > 0);
1734 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1735 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1736 unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1737 dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1738 dispatch_assert(_dispatch_kevent_timeout[qos].data);
1739 _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1740 _dispatch_timer_expired = true;
1741 _dispatch_timers_qos_mask |= 1 << qos;
1742 _dispatch_trace_next_timer_wake(qos);
1743 }
1744
1745 static inline bool
1746 _dispatch_mgr_timers(void)
1747 {
1748 uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1749 bool expired = slowpath(_dispatch_timer_expired);
1750 if (expired) {
1751 _dispatch_timers_run(nows);
1752 }
1753 bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1754 if (reconfigure || expired) {
1755 if (reconfigure) {
1756 reconfigure = _dispatch_timers_configure();
1757 _dispatch_timers_reconfigure = false;
1758 }
1759 if (reconfigure || expired) {
1760 expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1761 expired = expired || _dispatch_mgr_q.dq_items_tail;
1762 }
1763 _dispatch_timers_qos_mask = 0;
1764 }
1765 return expired;
1766 }
1767
1768 #pragma mark -
1769 #pragma mark dispatch_timer_aggregate
1770
1771 typedef struct {
1772 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
1773 } dispatch_timer_aggregate_refs_s;
1774
1775 typedef struct dispatch_timer_aggregate_s {
1776 DISPATCH_STRUCT_HEADER(queue);
1777 DISPATCH_QUEUE_HEADER;
1778 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
1779 dispatch_timer_aggregate_refs_s
1780 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
1781 struct {
1782 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
1783 } dta_timer[DISPATCH_TIMER_COUNT];
1784 struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
1785 unsigned int dta_refcount;
1786 } dispatch_timer_aggregate_s;
1787
1788 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
1789 static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
1790 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1791
1792 dispatch_timer_aggregate_t
1793 dispatch_timer_aggregate_create(void)
1794 {
1795 unsigned int tidx;
1796 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
1797 sizeof(struct dispatch_timer_aggregate_s));
1798 _dispatch_queue_init((dispatch_queue_t)dta);
1799 dta->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
1800 true);
1801 dta->dq_width = UINT32_MAX;
1802 //FIXME: aggregates need custom vtable
1803 //dta->dq_label = "timer-aggregate";
1804 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
1805 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
1806 }
1807 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1808 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
1809 dta->dta_timer[tidx].target = UINT64_MAX;
1810 dta->dta_timer[tidx].deadline = UINT64_MAX;
1811 dta->dta_timer_data[tidx].target = UINT64_MAX;
1812 dta->dta_timer_data[tidx].deadline = UINT64_MAX;
1813 }
1814 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
1815 (dispatch_queue_t)dta);
1816 }
1817
1818 typedef struct dispatch_timer_delay_s {
1819 dispatch_timer_t timer;
1820 uint64_t delay, leeway;
1821 } *dispatch_timer_delay_t;
1822
1823 static void
1824 _dispatch_timer_aggregate_get_delay(void *ctxt)
1825 {
1826 dispatch_timer_delay_t dtd = ctxt;
1827 struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
1828 _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
1829 -1);
1830 }
1831
1832 uint64_t
1833 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
1834 uint64_t *leeway_ptr)
1835 {
1836 struct dispatch_timer_delay_s dtd = {
1837 .timer = dta->dta_timer_data,
1838 };
1839 dispatch_sync_f((dispatch_queue_t)dta, &dtd,
1840 _dispatch_timer_aggregate_get_delay);
1841 if (leeway_ptr) {
1842 *leeway_ptr = dtd.leeway;
1843 }
1844 return dtd.delay;
1845 }
1846
1847 static void
1848 _dispatch_timer_aggregate_update(void *ctxt)
1849 {
1850 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
1851 dispatch_timer_t dtau = ctxt;
1852 unsigned int tidx;
1853 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1854 dta->dta_timer_data[tidx].target = dtau[tidx].target;
1855 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
1856 }
1857 free(dtau);
1858 }
1859
1860 DISPATCH_NOINLINE
1861 static void
1862 _dispatch_timer_aggregates_configure(void)
1863 {
1864 dispatch_timer_aggregate_t dta;
1865 dispatch_timer_t dtau;
1866 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
1867 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
1868 continue;
1869 }
1870 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
1871 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
1872 dispatch_barrier_async_f((dispatch_queue_t)dta, dtau,
1873 _dispatch_timer_aggregate_update);
1874 }
1875 }
1876
1877 static inline void
1878 _dispatch_timer_aggregates_check(void)
1879 {
1880 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
1881 return;
1882 }
1883 _dispatch_timer_aggregates_configure();
1884 }
1885
1886 static void
1887 _dispatch_timer_aggregates_register(dispatch_source_t ds)
1888 {
1889 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1890 if (!dta->dta_refcount++) {
1891 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
1892 }
1893 }
1894
1895 DISPATCH_NOINLINE
1896 static void
1897 _dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
1898 {
1899 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1900 dispatch_timer_source_aggregate_refs_t dr;
1901 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1902 _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
1903 dta->dta_timer, dr, dta_list);
1904 }
1905
1906 DISPATCH_NOINLINE
1907 static void
1908 _dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
1909 {
1910 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1911 dispatch_timer_source_aggregate_refs_t dr;
1912 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1913 _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
1914 dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
1915 if (!--dta->dta_refcount) {
1916 TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
1917 }
1918 }
1919
1920 #pragma mark -
1921 #pragma mark dispatch_select
1922
1923 static int _dispatch_kq;
1924
1925 static unsigned int _dispatch_select_workaround;
1926 static fd_set _dispatch_rfds;
1927 static fd_set _dispatch_wfds;
1928 static uint64_t*_dispatch_rfd_ptrs;
1929 static uint64_t*_dispatch_wfd_ptrs;
1930
1931 DISPATCH_NOINLINE
1932 static bool
1933 _dispatch_select_register(struct kevent64_s *kev)
1934 {
1935
1936 // Must execute on manager queue
1937 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1938
1939 // If an EINVAL or ENOENT error occurred while adding/enabling a read or
1940 // write kevent, assume it was due to a type of filedescriptor not
1941 // supported by kqueue and fall back to select
1942 switch (kev->filter) {
1943 case EVFILT_READ:
1944 if ((kev->data == EINVAL || kev->data == ENOENT) &&
1945 dispatch_assume(kev->ident < FD_SETSIZE)) {
1946 FD_SET((int)kev->ident, &_dispatch_rfds);
1947 if (slowpath(!_dispatch_rfd_ptrs)) {
1948 _dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1949 sizeof(*_dispatch_rfd_ptrs));
1950 }
1951 if (!_dispatch_rfd_ptrs[kev->ident]) {
1952 _dispatch_rfd_ptrs[kev->ident] = kev->udata;
1953 _dispatch_select_workaround++;
1954 _dispatch_debug("select workaround used to read fd %d: 0x%lx",
1955 (int)kev->ident, (long)kev->data);
1956 }
1957 }
1958 return true;
1959 case EVFILT_WRITE:
1960 if ((kev->data == EINVAL || kev->data == ENOENT) &&
1961 dispatch_assume(kev->ident < FD_SETSIZE)) {
1962 FD_SET((int)kev->ident, &_dispatch_wfds);
1963 if (slowpath(!_dispatch_wfd_ptrs)) {
1964 _dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1965 sizeof(*_dispatch_wfd_ptrs));
1966 }
1967 if (!_dispatch_wfd_ptrs[kev->ident]) {
1968 _dispatch_wfd_ptrs[kev->ident] = kev->udata;
1969 _dispatch_select_workaround++;
1970 _dispatch_debug("select workaround used to write fd %d: 0x%lx",
1971 (int)kev->ident, (long)kev->data);
1972 }
1973 }
1974 return true;
1975 }
1976 return false;
1977 }
1978
1979 DISPATCH_NOINLINE
1980 static bool
1981 _dispatch_select_unregister(const struct kevent64_s *kev)
1982 {
1983 // Must execute on manager queue
1984 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1985
1986 switch (kev->filter) {
1987 case EVFILT_READ:
1988 if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
1989 _dispatch_rfd_ptrs[kev->ident]) {
1990 FD_CLR((int)kev->ident, &_dispatch_rfds);
1991 _dispatch_rfd_ptrs[kev->ident] = 0;
1992 _dispatch_select_workaround--;
1993 return true;
1994 }
1995 break;
1996 case EVFILT_WRITE:
1997 if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
1998 _dispatch_wfd_ptrs[kev->ident]) {
1999 FD_CLR((int)kev->ident, &_dispatch_wfds);
2000 _dispatch_wfd_ptrs[kev->ident] = 0;
2001 _dispatch_select_workaround--;
2002 return true;
2003 }
2004 break;
2005 }
2006 return false;
2007 }
2008
2009 DISPATCH_NOINLINE
2010 static bool
2011 _dispatch_mgr_select(bool poll)
2012 {
2013 static const struct timeval timeout_immediately = { 0, 0 };
2014 fd_set tmp_rfds, tmp_wfds;
2015 struct kevent64_s kev;
2016 int err, i, r;
2017 bool kevent_avail = false;
2018
2019 FD_COPY(&_dispatch_rfds, &tmp_rfds);
2020 FD_COPY(&_dispatch_wfds, &tmp_wfds);
2021
2022 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2023 poll ? (struct timeval*)&timeout_immediately : NULL);
2024 if (slowpath(r == -1)) {
2025 err = errno;
2026 if (err != EBADF) {
2027 if (err != EINTR) {
2028 (void)dispatch_assume_zero(err);
2029 }
2030 return false;
2031 }
2032 for (i = 0; i < FD_SETSIZE; i++) {
2033 if (i == _dispatch_kq) {
2034 continue;
2035 }
2036 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2037 continue;
2038 }
2039 r = dup(i);
2040 if (dispatch_assume(r != -1)) {
2041 close(r);
2042 } else {
2043 if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2044 FD_CLR(i, &_dispatch_rfds);
2045 _dispatch_rfd_ptrs[i] = 0;
2046 _dispatch_select_workaround--;
2047 }
2048 if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2049 FD_CLR(i, &_dispatch_wfds);
2050 _dispatch_wfd_ptrs[i] = 0;
2051 _dispatch_select_workaround--;
2052 }
2053 }
2054 }
2055 return false;
2056 }
2057 if (r > 0) {
2058 for (i = 0; i < FD_SETSIZE; i++) {
2059 if (FD_ISSET(i, &tmp_rfds)) {
2060 if (i == _dispatch_kq) {
2061 kevent_avail = true;
2062 continue;
2063 }
2064 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2065 EV_SET64(&kev, i, EVFILT_READ,
2066 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2067 _dispatch_rfd_ptrs[i], 0, 0);
2068 _dispatch_kevent_drain(&kev);
2069 }
2070 if (FD_ISSET(i, &tmp_wfds)) {
2071 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2072 EV_SET64(&kev, i, EVFILT_WRITE,
2073 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2074 _dispatch_wfd_ptrs[i], 0, 0);
2075 _dispatch_kevent_drain(&kev);
2076 }
2077 }
2078 }
2079 return kevent_avail;
2080 }
2081
2082 #pragma mark -
2083 #pragma mark dispatch_kqueue
2084
2085 static void
2086 _dispatch_kq_init(void *context DISPATCH_UNUSED)
2087 {
2088 static const struct kevent64_s kev = {
2089 .ident = 1,
2090 .filter = EVFILT_USER,
2091 .flags = EV_ADD|EV_CLEAR,
2092 };
2093
2094 _dispatch_safe_fork = false;
2095 #if DISPATCH_USE_GUARDED_FD
2096 guardid_t guard = (uintptr_t)&kev;
2097 _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2098 #else
2099 _dispatch_kq = kqueue();
2100 #endif
2101 if (_dispatch_kq == -1) {
2102 DISPATCH_CLIENT_CRASH("kqueue() create failed: "
2103 "probably out of file descriptors");
2104 } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2105 // in case we fall back to select()
2106 FD_SET(_dispatch_kq, &_dispatch_rfds);
2107 }
2108
2109 (void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
2110 NULL));
2111 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
2112 }
2113
2114 static int
2115 _dispatch_get_kq(void)
2116 {
2117 static dispatch_once_t pred;
2118
2119 dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2120
2121 return _dispatch_kq;
2122 }
2123
2124 DISPATCH_NOINLINE
2125 static long
2126 _dispatch_kq_update(const struct kevent64_s *kev)
2127 {
2128 int r;
2129 struct kevent64_s kev_copy;
2130
2131 if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2132 if (_dispatch_select_unregister(kev)) {
2133 return 0;
2134 }
2135 }
2136 kev_copy = *kev;
2137 // This ensures we don't get a pending kevent back while registering
2138 // a new kevent
2139 kev_copy.flags |= EV_RECEIPT;
2140 retry:
2141 r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
2142 &kev_copy, 1, 0, NULL));
2143 if (slowpath(r == -1)) {
2144 int err = errno;
2145 switch (err) {
2146 case EINTR:
2147 goto retry;
2148 case EBADF:
2149 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2150 break;
2151 default:
2152 (void)dispatch_assume_zero(err);
2153 break;
2154 }
2155 return err;
2156 }
2157 switch (kev_copy.data) {
2158 case 0:
2159 return 0;
2160 case EBADF:
2161 case EPERM:
2162 case EINVAL:
2163 case ENOENT:
2164 if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2165 if (_dispatch_select_register(&kev_copy)) {
2166 return 0;
2167 }
2168 }
2169 // fall through
2170 default:
2171 kev_copy.flags |= kev->flags;
2172 _dispatch_kevent_drain(&kev_copy);
2173 break;
2174 }
2175 return (long)kev_copy.data;
2176 }
2177
2178 #pragma mark -
2179 #pragma mark dispatch_mgr
2180
2181 static struct kevent64_s *_dispatch_kevent_enable;
2182
2183 static void inline
2184 _dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
2185 {
2186 dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
2187 _dispatch_kevent_enable = ke;
2188 }
2189
2190 unsigned long
2191 _dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2192 {
2193 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2194 return false;
2195 }
2196
2197 static const struct kevent64_s kev = {
2198 .ident = 1,
2199 .filter = EVFILT_USER,
2200 .fflags = NOTE_TRIGGER,
2201 };
2202
2203 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2204 _dispatch_debug("waking up the dispatch manager queue: %p", dq);
2205 #endif
2206
2207 _dispatch_kq_update(&kev);
2208
2209 return false;
2210 }
2211
2212 DISPATCH_NOINLINE
2213 static void
2214 _dispatch_mgr_init(void)
2215 {
2216 (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
2217 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2218 _dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
2219 _dispatch_mgr_priority_init();
2220 _dispatch_kevent_init();
2221 _dispatch_timers_init();
2222 _dispatch_mach_recv_msg_buf_init();
2223 _dispatch_memorystatus_init();
2224 }
2225
2226 DISPATCH_NOINLINE DISPATCH_NORETURN
2227 static void
2228 _dispatch_mgr_invoke(void)
2229 {
2230 static const struct timespec timeout_immediately = { 0, 0 };
2231 struct kevent64_s kev;
2232 bool poll;
2233 int r;
2234
2235 for (;;) {
2236 _dispatch_mgr_queue_drain();
2237 poll = _dispatch_mgr_timers();
2238 if (slowpath(_dispatch_select_workaround)) {
2239 poll = _dispatch_mgr_select(poll);
2240 if (!poll) continue;
2241 }
2242 r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
2243 _dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
2244 poll ? &timeout_immediately : NULL);
2245 _dispatch_kevent_enable = NULL;
2246 if (slowpath(r == -1)) {
2247 int err = errno;
2248 switch (err) {
2249 case EINTR:
2250 break;
2251 case EBADF:
2252 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2253 break;
2254 default:
2255 (void)dispatch_assume_zero(err);
2256 break;
2257 }
2258 } else if (r) {
2259 _dispatch_kevent_drain(&kev);
2260 }
2261 }
2262 }
2263
2264 DISPATCH_NORETURN
2265 void
2266 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2267 {
2268 _dispatch_mgr_init();
2269 // never returns, so burn bridges behind us & clear stack 2k ahead
2270 _dispatch_clear_stack(2048);
2271 _dispatch_mgr_invoke();
2272 }
2273
2274 #pragma mark -
2275 #pragma mark dispatch_memorystatus
2276
2277 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
2278 #define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
2279 #define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
2280 DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
2281 DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
2282 #elif DISPATCH_USE_VM_PRESSURE_SOURCE
2283 #define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
2284 #define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
2285 #endif
2286
2287 #if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2288 static dispatch_source_t _dispatch_memorystatus_source;
2289
2290 static void
2291 _dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
2292 {
2293 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
2294 unsigned long memorystatus;
2295 memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
2296 if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
2297 _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
2298 return;
2299 }
2300 _dispatch_continuation_cache_limit =
2301 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
2302 #endif
2303 malloc_zone_pressure_relief(0,0);
2304 }
2305
2306 static void
2307 _dispatch_memorystatus_init(void)
2308 {
2309 _dispatch_memorystatus_source = dispatch_source_create(
2310 DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
2311 DISPATCH_MEMORYSTATUS_SOURCE_MASK,
2312 _dispatch_get_root_queue(0, true));
2313 dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
2314 _dispatch_memorystatus_handler);
2315 dispatch_resume(_dispatch_memorystatus_source);
2316 }
2317 #else
2318 static inline void _dispatch_memorystatus_init(void) {}
2319 #endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2320
2321 #pragma mark -
2322 #pragma mark dispatch_mach
2323
2324 #if HAVE_MACH
2325
2326 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
2327 #define _dispatch_debug_machport(name) \
2328 dispatch_debug_machport((name), __func__)
2329 #else
2330 #define _dispatch_debug_machport(name) ((void)(name))
2331 #endif
2332
2333 // Flags for all notifications that are registered/unregistered when a
2334 // send-possible notification is requested/delivered
2335 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
2336 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
2337 #define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
2338 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2339 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2340 #define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
2341 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2342 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2343
2344 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
2345 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
2346 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
2347
2348 #define _DISPATCH_MACHPORT_HASH_SIZE 32
2349 #define _DISPATCH_MACHPORT_HASH(x) \
2350 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
2351
2352 #ifndef MACH_RCV_LARGE_IDENTITY
2353 #define MACH_RCV_LARGE_IDENTITY 0x00000008
2354 #endif
2355 #define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
2356 #define DISPATCH_MACH_RCV_OPTIONS ( \
2357 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
2358 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
2359 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0))
2360
2361 #define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])
2362
2363 static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
2364 static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
2365 static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
2366 static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
2367 static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
2368 dispatch_source_refs_t dr, dispatch_kevent_t dk,
2369 mach_msg_header_t *hdr, mach_msg_size_t siz);
2370 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
2371 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
2372 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
2373 static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
2374 static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
2375 dispatch_mach_reply_refs_t dmr, bool disconnected);
2376 static void _dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
2377 mach_msg_size_t siz);
2378 static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
2379 const struct kevent64_s *ke);
2380 static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
2381
2382 static const size_t _dispatch_mach_recv_msg_size =
2383 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
2384 static const size_t dispatch_mach_trailer_size =
2385 sizeof(dispatch_mach_trailer_t);
2386 static const size_t _dispatch_mach_recv_msg_buf_size = mach_vm_round_page(
2387 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2388 static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
2389 static mach_port_t _dispatch_mach_notify_port;
2390 static struct kevent64_s _dispatch_mach_recv_kevent = {
2391 .filter = EVFILT_MACHPORT,
2392 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2393 .fflags = DISPATCH_MACH_RCV_OPTIONS,
2394 };
2395 static dispatch_source_t _dispatch_mach_notify_source;
2396 static const
2397 struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
2398 .ke = {
2399 .filter = EVFILT_MACHPORT,
2400 .flags = EV_CLEAR,
2401 .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
2402 },
2403 };
2404
2405 static void
2406 _dispatch_mach_recv_msg_buf_init(void)
2407 {
2408 mach_vm_size_t vm_size = _dispatch_mach_recv_msg_buf_size;
2409 mach_vm_address_t vm_addr = vm_page_size;
2410 kern_return_t kr;
2411
2412 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2413 VM_FLAGS_ANYWHERE))) {
2414 if (kr != KERN_NO_SPACE) {
2415 (void)dispatch_assume_zero(kr);
2416 DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2417 }
2418 _dispatch_temporary_resource_shortage();
2419 vm_addr = vm_page_size;
2420 }
2421 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2422 _dispatch_mach_recv_kevent.ext[1] = _dispatch_mach_recv_msg_buf_size;
2423 }
2424
2425 static inline void*
2426 _dispatch_get_mach_recv_msg_buf(void)
2427 {
2428 return (void*)_dispatch_mach_recv_kevent.ext[0];
2429 }
2430
2431 static void
2432 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2433 {
2434 kern_return_t kr;
2435
2436 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2437 &_dispatch_mach_recv_portset);
2438 DISPATCH_VERIFY_MIG(kr);
2439 if (dispatch_assume_zero(kr)) {
2440 DISPATCH_CLIENT_CRASH(
2441 "mach_port_allocate() failed: cannot create port set");
2442 }
2443 dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2444 dispatch_assert(dispatch_mach_trailer_size ==
2445 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2446 DISPATCH_MACH_RCV_TRAILER)));
2447 _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2448 _dispatch_kq_update(&_dispatch_mach_recv_kevent);
2449
2450 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2451 &_dispatch_mach_notify_port);
2452 DISPATCH_VERIFY_MIG(kr);
2453 if (dispatch_assume_zero(kr)) {
2454 DISPATCH_CLIENT_CRASH(
2455 "mach_port_allocate() failed: cannot create receive right");
2456 }
2457 _dispatch_mach_notify_source = dispatch_source_create(
2458 &_dispatch_source_type_mach_recv_direct,
2459 _dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2460 _dispatch_mach_notify_source->ds_refs->ds_handler_func =
2461 (void*)_dispatch_mach_notify_source_invoke;
2462 dispatch_assert(_dispatch_mach_notify_source);
2463 dispatch_resume(_dispatch_mach_notify_source);
2464 }
2465
2466 static mach_port_t
2467 _dispatch_get_mach_recv_portset(void)
2468 {
2469 static dispatch_once_t pred;
2470 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2471 return _dispatch_mach_recv_portset;
2472 }
2473
2474 static void
2475 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2476 {
2477 struct kevent64_s kev = {
2478 .filter = EVFILT_MACHPORT,
2479 .flags = EV_ADD,
2480 };
2481 kern_return_t kr;
2482
2483 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2484 &_dispatch_mach_portset);
2485 DISPATCH_VERIFY_MIG(kr);
2486 if (dispatch_assume_zero(kr)) {
2487 DISPATCH_CLIENT_CRASH(
2488 "mach_port_allocate() failed: cannot create port set");
2489 }
2490 kev.ident = _dispatch_mach_portset;
2491 _dispatch_kq_update(&kev);
2492 }
2493
2494 static mach_port_t
2495 _dispatch_get_mach_portset(void)
2496 {
2497 static dispatch_once_t pred;
2498 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2499 return _dispatch_mach_portset;
2500 }
2501
2502 static kern_return_t
2503 _dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2504 {
2505 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2506 kern_return_t kr;
2507
2508 _dispatch_debug_machport(mp);
2509 kr = mach_port_move_member(mach_task_self(), mp, mps);
2510 if (slowpath(kr)) {
2511 DISPATCH_VERIFY_MIG(kr);
2512 switch (kr) {
2513 case KERN_INVALID_RIGHT:
2514 if (mps) {
2515 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2516 "mach_port_move_member() failed ", kr);
2517 break;
2518 }
2519 //fall through
2520 case KERN_INVALID_NAME:
2521 #if DISPATCH_DEBUG
2522 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2523 "prematurely", mp);
2524 #endif
2525 break;
2526 default:
2527 (void)dispatch_assume_zero(kr);
2528 break;
2529 }
2530 }
2531 return mps ? kr : 0;
2532 }
2533
2534 static void
2535 _dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
2536 {
2537 #if (TARGET_IPHONE_SIMULATOR && \
2538 IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2539 (!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2540 // delete and re-add kevent to workaround <rdar://problem/13924256>
2541 if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2542 struct kevent64_s kev = _dispatch_mach_recv_kevent;
2543 kev.flags = EV_DELETE;
2544 _dispatch_kq_update(&kev);
2545 }
2546 #endif
2547 _dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2548 }
2549
2550 static kern_return_t
2551 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2552 uint32_t del_flags)
2553 {
2554 kern_return_t kr = 0;
2555 dispatch_assert_zero(new_flags & del_flags);
2556 if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2557 (del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2558 mach_port_t mps;
2559 if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2560 mps = _dispatch_get_mach_recv_portset();
2561 } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2562 ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2563 (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2564 mps = _dispatch_get_mach_portset();
2565 } else {
2566 mps = MACH_PORT_NULL;
2567 }
2568 kr = _dispatch_mach_portset_update(dk, mps);
2569 }
2570 return kr;
2571 }
2572
2573 static kern_return_t
2574 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2575 uint32_t del_flags)
2576 {
2577 kern_return_t kr = 0;
2578 dispatch_assert_zero(new_flags & del_flags);
2579 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2580 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2581 // Requesting a (delayed) non-sync send-possible notification
2582 // registers for both immediate dead-name notification and delayed-arm
2583 // send-possible notification for the port.
2584 // The send-possible notification is armed when a mach_msg() with the
2585 // the MACH_SEND_NOTIFY to the port times out.
2586 // If send-possible is unavailable, fall back to immediate dead-name
2587 // registration rdar://problem/2527840&9008724
2588 kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2589 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2590 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2591 }
2592 return kr;
2593 }
2594
2595 static inline void
2596 _dispatch_kevent_mach_portset(struct kevent64_s *ke)
2597 {
2598 if (ke->ident == _dispatch_mach_recv_portset) {
2599 return _dispatch_kevent_mach_msg_drain(ke);
2600 } else if (ke->ident == _dispatch_mach_portset) {
2601 return _dispatch_kevent_machport_drain(ke);
2602 } else {
2603 return _dispatch_kevent_error(ke);
2604 }
2605 }
2606
2607 DISPATCH_NOINLINE
2608 static void
2609 _dispatch_kevent_machport_drain(struct kevent64_s *ke)
2610 {
2611 mach_port_t name = (mach_port_name_t)ke->data;
2612 dispatch_kevent_t dk;
2613 struct kevent64_s kev;
2614
2615 _dispatch_debug_machport(name);
2616 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2617 if (!dispatch_assume(dk)) {
2618 return;
2619 }
2620 _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2621
2622 EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
2623 DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
2624 _dispatch_kevent_debug(&kev, __func__);
2625 _dispatch_kevent_merge(&kev);
2626 }
2627
2628 DISPATCH_NOINLINE
2629 static void
2630 _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
2631 {
2632 mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2633 mach_msg_size_t siz, msgsiz;
2634 mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2635
2636 _dispatch_kevent_mach_recv_reenable(ke);
2637 if (!dispatch_assume(hdr)) {
2638 DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2639 }
2640 if (fastpath(!kr)) {
2641 return _dispatch_kevent_mach_msg_recv(hdr);
2642 } else if (kr != MACH_RCV_TOO_LARGE) {
2643 goto out;
2644 }
2645 if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2646 dispatch_mach_trailer_size)) {
2647 DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2648 }
2649 siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2650 hdr = malloc(siz);
2651 if (ke->data) {
2652 if (!dispatch_assume(hdr)) {
2653 // Kernel will discard message too large to fit
2654 hdr = _dispatch_get_mach_recv_msg_buf();
2655 siz = _dispatch_mach_recv_msg_buf_size;
2656 }
2657 mach_port_t name = (mach_port_name_t)ke->data;
2658 const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2659 MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2660 kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2661 MACH_PORT_NULL);
2662 if (fastpath(!kr)) {
2663 return _dispatch_kevent_mach_msg_recv(hdr);
2664 } else if (kr == MACH_RCV_TOO_LARGE) {
2665 _dispatch_log("BUG in libdispatch client: "
2666 "_dispatch_kevent_mach_msg_drain: dropped message too "
2667 "large to fit in memory: id = 0x%x, size = %lld",
2668 hdr->msgh_id, ke->ext[1]);
2669 kr = MACH_MSG_SUCCESS;
2670 }
2671 } else {
2672 // We don't know which port in the portset contains the large message,
2673 // so need to receive all messages pending on the portset to ensure the
2674 // large message is drained. <rdar://problem/13950432>
2675 bool received = false;
2676 for (;;) {
2677 if (!dispatch_assume(hdr)) {
2678 DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2679 }
2680 const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2681 MACH_RCV_TIMEOUT);
2682 kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2683 MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2684 if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2685 hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2686 DISPATCH_CRASH("Overlarge message");
2687 }
2688 if (fastpath(!kr)) {
2689 msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2690 if (msgsiz < siz) {
2691 void *shrink = realloc(hdr, msgsiz);
2692 if (shrink) hdr = shrink;
2693 }
2694 _dispatch_kevent_mach_msg_recv(hdr);
2695 hdr = NULL;
2696 received = true;
2697 } else if (kr == MACH_RCV_TOO_LARGE) {
2698 siz = hdr->msgh_size + dispatch_mach_trailer_size;
2699 } else {
2700 if (kr == MACH_RCV_TIMED_OUT && received) {
2701 kr = MACH_MSG_SUCCESS;
2702 }
2703 break;
2704 }
2705 hdr = reallocf(hdr, siz);
2706 }
2707 }
2708 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2709 free(hdr);
2710 }
2711 out:
2712 if (slowpath(kr)) {
2713 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2714 "message reception failed", kr);
2715 }
2716 }
2717
2718 static void
2719 _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
2720 {
2721 dispatch_source_refs_t dri;
2722 dispatch_kevent_t dk;
2723 mach_port_t name = hdr->msgh_local_port;
2724 mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
2725
2726 if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
2727 dispatch_mach_trailer_size)) {
2728 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2729 "received overlarge message");
2730 return _dispatch_kevent_mach_msg_destroy(hdr);
2731 }
2732 if (!dispatch_assume(name)) {
2733 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2734 "received message with MACH_PORT_NULL port");
2735 return _dispatch_kevent_mach_msg_destroy(hdr);
2736 }
2737 _dispatch_debug_machport(name);
2738 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2739 if (!dispatch_assume(dk)) {
2740 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2741 "received message with unknown kevent");
2742 return _dispatch_kevent_mach_msg_destroy(hdr);
2743 }
2744 _dispatch_kevent_debug(&dk->dk_kevent, __func__);
2745 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
2746 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2747 if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2748 return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
2749 }
2750 }
2751 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2752 "received message with no listeners");
2753 return _dispatch_kevent_mach_msg_destroy(hdr);
2754 }
2755
2756 static void
2757 _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
2758 {
2759 if (hdr) {
2760 mach_msg_destroy(hdr);
2761 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2762 free(hdr);
2763 }
2764 }
2765 }
2766
2767 static void
2768 _dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
2769 dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
2770 {
2771 if (ds == _dispatch_mach_notify_source) {
2772 _dispatch_mach_notify_source_invoke(hdr);
2773 return _dispatch_kevent_mach_msg_destroy(hdr);
2774 }
2775 if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
2776 _dispatch_mach_reply_kevent_unregister((dispatch_mach_t)ds,
2777 (dispatch_mach_reply_refs_t)dr, false);
2778 }
2779 return _dispatch_mach_msg_recv((dispatch_mach_t)ds, hdr, siz);
2780 }
2781
2782 DISPATCH_ALWAYS_INLINE
2783 static inline void
2784 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
2785 {
2786 dispatch_source_refs_t dri, dr_next;
2787 dispatch_kevent_t dk;
2788 struct kevent64_s kev;
2789 bool unreg;
2790
2791 dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
2792 if (!dk) {
2793 return;
2794 }
2795
2796 // Update notification registration state.
2797 dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
2798 EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
2799 flag, 0, (uintptr_t)dk, 0, 0);
2800 if (final) {
2801 // This can never happen again
2802 unreg = true;
2803 } else {
2804 // Re-register for notification before delivery
2805 unreg = _dispatch_kevent_resume(dk, flag, 0);
2806 }
2807 DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
2808 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
2809 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2810 if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
2811 dispatch_mach_t dm = (dispatch_mach_t)dsi;
2812 _dispatch_mach_merge_kevent(dm, &kev);
2813 if (unreg && dm->dm_dkev) {
2814 _dispatch_mach_kevent_unregister(dm);
2815 }
2816 } else {
2817 _dispatch_source_merge_kevent(dsi, &kev);
2818 if (unreg) {
2819 _dispatch_source_kevent_unregister(dsi);
2820 }
2821 }
2822 if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
2823 // current merge is last in list (dk might have been freed)
2824 // or it re-armed the notification
2825 return;
2826 }
2827 }
2828 }
2829
2830 static kern_return_t
2831 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
2832 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
2833 mach_port_mscount_t notify_sync)
2834 {
2835 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
2836 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
2837 kern_return_t kr, krr = 0;
2838
2839 // Update notification registration state.
2840 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
2841 dk->dk_kevent.data &= ~(del_flags & mask);
2842
2843 _dispatch_debug_machport(port);
2844 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
2845 // initialize _dispatch_mach_notify_port:
2846 (void)_dispatch_get_mach_recv_portset();
2847 _dispatch_debug("machport[0x%08x]: registering for send-possible "
2848 "notification", port);
2849 previous = MACH_PORT_NULL;
2850 krr = mach_port_request_notification(mach_task_self(), port,
2851 notify_msgid, notify_sync, _dispatch_mach_notify_port,
2852 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
2853 DISPATCH_VERIFY_MIG(krr);
2854
2855 switch(krr) {
2856 case KERN_INVALID_NAME:
2857 case KERN_INVALID_RIGHT:
2858 // Supress errors & clear registration state
2859 dk->dk_kevent.data &= ~mask;
2860 break;
2861 default:
2862 // Else, we dont expect any errors from mach. Log any errors
2863 if (dispatch_assume_zero(krr)) {
2864 // log the error & clear registration state
2865 dk->dk_kevent.data &= ~mask;
2866 } else if (dispatch_assume_zero(previous)) {
2867 // Another subsystem has beat libdispatch to requesting the
2868 // specified Mach notification on this port. We should
2869 // technically cache the previous port and message it when the
2870 // kernel messages our port. Or we can just say screw those
2871 // subsystems and deallocate the previous port.
2872 // They should adopt libdispatch :-P
2873 kr = mach_port_deallocate(mach_task_self(), previous);
2874 DISPATCH_VERIFY_MIG(kr);
2875 (void)dispatch_assume_zero(kr);
2876 previous = MACH_PORT_NULL;
2877 }
2878 }
2879 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
2880 _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
2881 "notification", port);
2882 previous = MACH_PORT_NULL;
2883 kr = mach_port_request_notification(mach_task_self(), port,
2884 notify_msgid, notify_sync, MACH_PORT_NULL,
2885 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
2886 DISPATCH_VERIFY_MIG(kr);
2887
2888 switch (kr) {
2889 case KERN_INVALID_NAME:
2890 case KERN_INVALID_RIGHT:
2891 case KERN_INVALID_ARGUMENT:
2892 break;
2893 default:
2894 if (dispatch_assume_zero(kr)) {
2895 // log the error
2896 }
2897 }
2898 } else {
2899 return 0;
2900 }
2901 if (slowpath(previous)) {
2902 // the kernel has not consumed the send-once right yet
2903 (void)dispatch_assume_zero(
2904 _dispatch_send_consume_send_once_right(previous));
2905 }
2906 return krr;
2907 }
2908
2909 static void
2910 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
2911 {
2912 (void)_dispatch_get_mach_recv_portset();
2913 _dispatch_debug("registering for calendar-change notification");
2914 kern_return_t kr = host_request_notification(mach_host_self(),
2915 HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
2916 DISPATCH_VERIFY_MIG(kr);
2917 (void)dispatch_assume_zero(kr);
2918 }
2919
2920 static void
2921 _dispatch_mach_host_calendar_change_register(void)
2922 {
2923 static dispatch_once_t pred;
2924 dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
2925 }
2926
2927 static void
2928 _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
2929 {
2930 mig_reply_error_t reply;
2931 dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
2932 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
2933 dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
2934 boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
2935 if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
2936 // host_notify_reply.defs: host_calendar_changed
2937 _dispatch_debug("calendar-change notification");
2938 _dispatch_timers_calendar_change();
2939 _dispatch_mach_host_notify_update(NULL);
2940 success = TRUE;
2941 reply.RetCode = KERN_SUCCESS;
2942 }
2943 if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
2944 (void)dispatch_assume_zero(reply.RetCode);
2945 }
2946 }
2947
2948 kern_return_t
2949 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
2950 mach_port_name_t name)
2951 {
2952 #if DISPATCH_DEBUG
2953 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
2954 "deleted prematurely", name);
2955 #endif
2956
2957 _dispatch_debug_machport(name);
2958 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
2959
2960 return KERN_SUCCESS;
2961 }
2962
2963 kern_return_t
2964 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
2965 mach_port_name_t name)
2966 {
2967 kern_return_t kr;
2968
2969 _dispatch_debug("machport[0x%08x]: dead-name notification", name);
2970 _dispatch_debug_machport(name);
2971 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
2972
2973 // the act of receiving a dead name notification allocates a dead-name
2974 // right that must be deallocated
2975 kr = mach_port_deallocate(mach_task_self(), name);
2976 DISPATCH_VERIFY_MIG(kr);
2977 //(void)dispatch_assume_zero(kr);
2978
2979 return KERN_SUCCESS;
2980 }
2981
2982 kern_return_t
2983 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
2984 mach_port_name_t name)
2985 {
2986 _dispatch_debug("machport[0x%08x]: send-possible notification", name);
2987 _dispatch_debug_machport(name);
2988 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
2989
2990 return KERN_SUCCESS;
2991 }
2992
2993 #pragma mark -
2994 #pragma mark dispatch_mach_t
2995
2996 #define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
2997 #define DISPATCH_MACH_PSEUDO_RECEIVED 0x1
2998 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
2999 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3000
3001 static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3002 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3003 mach_port_t local_port, mach_port_t remote_port);
3004 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3005 dispatch_object_t dou);
3006 static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3007 dispatch_mach_msg_t dmsg);
3008
3009 static dispatch_mach_t
3010 _dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3011 dispatch_mach_handler_function_t handler, bool handler_is_block)
3012 {
3013 dispatch_mach_t dm;
3014 dispatch_mach_refs_t dr;
3015
3016 dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3017 sizeof(struct dispatch_mach_s));
3018 _dispatch_queue_init((dispatch_queue_t)dm);
3019 dm->dq_label = label;
3020
3021 dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3022 dm->do_ref_cnt++; // since channel is created suspended
3023 dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3024 dm->do_targetq = &_dispatch_mgr_q;
3025
3026 dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3027 dr->dr_source_wref = _dispatch_ptr2wref(dm);
3028 dr->dm_handler_func = handler;
3029 dr->dm_handler_ctxt = context;
3030 dm->ds_refs = dr;
3031 dm->ds_handler_is_block = handler_is_block;
3032
3033 dm->dm_refs = _dispatch_calloc(1ul,
3034 sizeof(struct dispatch_mach_send_refs_s));
3035 dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3036 dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3037 TAILQ_INIT(&dm->dm_refs->dm_replies);
3038
3039 // First item on the channel sets the user-specified target queue
3040 dispatch_set_target_queue(dm, q);
3041 _dispatch_object_debug(dm, "%s", __func__);
3042 return dm;
3043 }
3044
3045 dispatch_mach_t
3046 dispatch_mach_create(const char *label, dispatch_queue_t q,
3047 dispatch_mach_handler_t handler)
3048 {
3049 dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3050 return _dispatch_mach_create(label, q, bb,
3051 (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3052 }
3053
3054 dispatch_mach_t
3055 dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3056 dispatch_mach_handler_function_t handler)
3057 {
3058 return _dispatch_mach_create(label, q, context, handler, false);
3059 }
3060
3061 void
3062 _dispatch_mach_dispose(dispatch_mach_t dm)
3063 {
3064 _dispatch_object_debug(dm, "%s", __func__);
3065 dispatch_mach_refs_t dr = dm->ds_refs;
3066 if (dm->ds_handler_is_block && dr->dm_handler_ctxt) {
3067 Block_release(dr->dm_handler_ctxt);
3068 }
3069 free(dr);
3070 free(dm->dm_refs);
3071 _dispatch_queue_destroy(dm);
3072 }
3073
3074 void
3075 dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3076 mach_port_t send, dispatch_mach_msg_t checkin)
3077 {
3078 dispatch_mach_send_refs_t dr = dm->dm_refs;
3079 dispatch_kevent_t dk;
3080
3081 if (MACH_PORT_VALID(receive)) {
3082 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3083 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3084 dk->dk_kevent.ident = receive;
3085 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3086 dk->dk_kevent.udata = (uintptr_t)dk;
3087 TAILQ_INIT(&dk->dk_sources);
3088 dm->ds_dkev = dk;
3089 dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3090 _dispatch_retain(dm); // the reference the manager queue holds
3091 }
3092 dr->dm_send = send;
3093 if (MACH_PORT_VALID(send)) {
3094 if (checkin) {
3095 dispatch_retain(checkin);
3096 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3097 }
3098 dr->dm_checkin = checkin;
3099 }
3100 // monitor message reply ports
3101 dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3102 if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3103 DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3104 DISPATCH_CLIENT_CRASH("Channel already connected");
3105 }
3106 _dispatch_object_debug(dm, "%s", __func__);
3107 return dispatch_resume(dm);
3108 }
3109
3110 DISPATCH_NOINLINE
3111 static void
3112 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3113 dispatch_mach_reply_refs_t dmr, bool disconnected)
3114 {
3115 dispatch_kevent_t dk = dmr->dm_dkev;
3116 mach_port_t local_port = (mach_port_t)dk->dk_kevent.ident;
3117 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3118 _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
3119 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dm_list);
3120 free(dmr);
3121 if (disconnected) {
3122 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3123 }
3124 }
3125
3126 DISPATCH_NOINLINE
3127 static void
3128 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3129 void *ctxt)
3130 {
3131 dispatch_kevent_t dk;
3132 dispatch_mach_reply_refs_t dmr;
3133
3134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3135 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3136 dk->dk_kevent.ident = reply;
3137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3138 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3139 dk->dk_kevent.udata = (uintptr_t)dk;
3140 // make reply context visible to leaks rdar://11777199
3141 dk->dk_kevent.ext[1] = (uintptr_t)ctxt;
3142 TAILQ_INIT(&dk->dk_sources);
3143
3144 dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3145 dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3146 dmr->dm_dkev = dk;
3147
3148 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3149 ctxt);
3150 uint32_t flags;
3151 bool do_resume = _dispatch_kevent_register(&dmr->dm_dkev, &flags);
3152 TAILQ_INSERT_TAIL(&dmr->dm_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3153 dr_list);
3154 TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dm_list);
3155 if (do_resume && _dispatch_kevent_resume(dmr->dm_dkev, flags, 0)) {
3156 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3157 }
3158 }
3159
3160 DISPATCH_NOINLINE
3161 static void
3162 _dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3163 {
3164 dispatch_kevent_t dk = dm->dm_dkev;
3165 dm->dm_dkev = NULL;
3166 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3167 dr_list);
3168 dm->ds_pending_data_mask &= ~(unsigned long)
3169 (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3170 _dispatch_kevent_unregister(dk,
3171 DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3172 }
3173
3174 DISPATCH_NOINLINE
3175 static void
3176 _dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3177 {
3178 dispatch_kevent_t dk;
3179
3180 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3181 dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3182 dk->dk_kevent.ident = send;
3183 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3184 dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3185 dk->dk_kevent.udata = (uintptr_t)dk;
3186 TAILQ_INIT(&dk->dk_sources);
3187
3188 dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3189
3190 uint32_t flags;
3191 bool do_resume = _dispatch_kevent_register(&dk, &flags);
3192 TAILQ_INSERT_TAIL(&dk->dk_sources,
3193 (dispatch_source_refs_t)dm->dm_refs, dr_list);
3194 dm->dm_dkev = dk;
3195 if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3196 _dispatch_mach_kevent_unregister(dm);
3197 }
3198 }
3199
3200 static inline void
3201 _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou)
3202 {
3203 return _dispatch_queue_push(dm._dq, dou);
3204 }
3205
3206 static inline void
3207 _dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3208 {
3209 dou._do->do_suspend_cnt = (unsigned int)options;
3210 }
3211
3212 static inline mach_msg_option_t
3213 _dispatch_mach_msg_get_options(dispatch_object_t dou)
3214 {
3215 mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3216 return options;
3217 }
3218
3219 static inline void
3220 _dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3221 unsigned long reason)
3222 {
3223 dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3224 dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
3225 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3226 }
3227
3228 static inline unsigned long
3229 _dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3230 {
3231 mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3232 dou._do->do_suspend_cnt = 0;
3233 if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3234 *err_ptr = 0;
3235 return err_get_code(err);
3236 }
3237 *err_ptr = err;
3238 return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3239 }
3240
3241 static void
3242 _dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
3243 mach_msg_size_t siz)
3244 {
3245 _dispatch_debug_machport(hdr->msgh_remote_port);
3246 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3247 hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3248 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3249 return _dispatch_kevent_mach_msg_destroy(hdr);
3250 }
3251 dispatch_mach_msg_t dmsg;
3252 dispatch_mach_msg_destructor_t destructor;
3253 destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3254 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3255 DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3256 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3257 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3258 return _dispatch_mach_push(dm, dmsg);
3259 }
3260
3261 static inline mach_port_t
3262 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3263 {
3264 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3265 mach_port_t remote = hdr->msgh_remote_port;
3266 return remote;
3267 }
3268
3269 static inline mach_port_t
3270 _dispatch_mach_msg_get_reply_port(dispatch_mach_t dm, dispatch_object_t dou)
3271 {
3272 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3273 mach_port_t reply = MACH_PORT_NULL;
3274 mach_msg_option_t msg_opts = _dispatch_mach_msg_get_options(dou);
3275 if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
3276 reply = hdr->msgh_reserved;
3277 hdr->msgh_reserved = 0;
3278 } else if (MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
3279 MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3280 MACH_PORT_VALID(hdr->msgh_local_port) && (!dm->ds_dkev ||
3281 dm->ds_dkev->dk_kevent.ident != hdr->msgh_local_port)) {
3282 reply = hdr->msgh_local_port;
3283 }
3284 return reply;
3285 }
3286
3287 static inline void
3288 _dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3289 mach_port_t remote_port)
3290 {
3291 mach_msg_header_t *hdr;
3292 dispatch_mach_msg_t dmsg;
3293 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3294 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3295 if (local_port) hdr->msgh_local_port = local_port;
3296 if (remote_port) hdr->msgh_remote_port = remote_port;
3297 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3298 return _dispatch_mach_push(dm, dmsg);
3299 }
3300
3301 DISPATCH_NOINLINE
3302 static void
3303 _dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3304 {
3305 mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dou);
3306 _dispatch_mach_msg_set_reason(dou, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3307 _dispatch_mach_push(dm, dou);
3308 if (reply) {
3309 _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
3310 }
3311 }
3312
3313 DISPATCH_NOINLINE
3314 static dispatch_object_t
3315 _dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3316 {
3317 dispatch_mach_send_refs_t dr = dm->dm_refs;
3318 dispatch_mach_msg_t dmsg = dou._dmsg;
3319 dr->dm_needs_mgr = 0;
3320 if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3321 // send initial checkin message
3322 if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3323 &_dispatch_mgr_q)) {
3324 // send kevent must be uninstalled on the manager queue
3325 dr->dm_needs_mgr = 1;
3326 goto out;
3327 }
3328 dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3329 if (slowpath(dr->dm_checkin)) {
3330 goto out;
3331 }
3332 }
3333 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3334 mach_msg_return_t kr = 0;
3335 mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dmsg);
3336 mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
3337 if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
3338 opts = MACH_SEND_MSG | (msg_opts & DISPATCH_MACH_OPTIONS_MASK);
3339 if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3340 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
3341 if (dmsg != dr->dm_checkin) {
3342 msg->msgh_remote_port = dr->dm_send;
3343 }
3344 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3345 if (slowpath(!dm->dm_dkev)) {
3346 _dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3347 }
3348 if (fastpath(dm->dm_dkev)) {
3349 if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3350 goto out;
3351 }
3352 opts |= MACH_SEND_NOTIFY;
3353 }
3354 }
3355 opts |= MACH_SEND_TIMEOUT;
3356 }
3357 _dispatch_debug_machport(msg->msgh_remote_port);
3358 if (reply) _dispatch_debug_machport(reply);
3359 kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3360 MACH_PORT_NULL);
3361 }
3362 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, opts 0x%x, "
3363 "msg_opts 0x%x, reply on 0x%08x: %s - 0x%x", msg->msgh_remote_port,
3364 msg->msgh_id, dmsg->do_ctxt, opts, msg_opts, reply,
3365 mach_error_string(kr), kr);
3366 if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3367 if (opts & MACH_SEND_NOTIFY) {
3368 _dispatch_debug("machport[0x%08x]: send-possible notification "
3369 "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3370 DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3371 } else {
3372 // send kevent must be installed on the manager queue
3373 dr->dm_needs_mgr = 1;
3374 }
3375 if (reply) {
3376 _dispatch_mach_msg_set_options(dmsg, msg_opts |
3377 DISPATCH_MACH_PSEUDO_RECEIVED);
3378 msg->msgh_reserved = reply; // Remember the original reply port
3379 }
3380 goto out;
3381 }
3382 if (fastpath(!kr) && reply) {
3383 if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3384 // reply receive kevent must be installed on the manager queue
3385 dr->dm_needs_mgr = 1;
3386 _dispatch_mach_msg_set_options(dmsg, msg_opts |
3387 DISPATCH_MACH_REGISTER_FOR_REPLY);
3388 if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
3389 msg->msgh_reserved = reply; // Remember the original reply port
3390 }
3391 goto out;
3392 }
3393 _dispatch_mach_reply_kevent_register(dm, reply, dmsg->do_ctxt);
3394 }
3395 if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3396 _dispatch_mach_kevent_unregister(dm);
3397 }
3398 _dispatch_mach_msg_set_reason(dmsg, kr, 0);
3399 _dispatch_mach_push(dm, dmsg);
3400 dmsg = NULL;
3401 if (slowpath(kr) && reply) {
3402 // Send failed, so reply was never connected <rdar://problem/14309159>
3403 _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
3404 }
3405 out:
3406 return (dispatch_object_t)dmsg;
3407 }
3408
3409 static void
3410 _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3411 {
3412 dispatch_mach_send_refs_t dr = dm->dm_refs;
3413 struct dispatch_object_s *prev, *dc = dou._do;
3414 dc->do_next = NULL;
3415
3416 prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3417 if (fastpath(prev)) {
3418 prev->do_next = dc;
3419 return;
3420 }
3421 dr->dm_head = dc;
3422 _dispatch_wakeup(dm);
3423 }
3424
3425 DISPATCH_NOINLINE
3426 static void
3427 _dispatch_mach_send_drain(dispatch_mach_t dm)
3428 {
3429 dispatch_mach_send_refs_t dr = dm->dm_refs;
3430 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3431 while (dr->dm_tail) {
3432 while (!(dc = fastpath(dr->dm_head))) {
3433 dispatch_hardware_pause();
3434 }
3435 do {
3436 next_dc = fastpath(dc->do_next);
3437 dr->dm_head = next_dc;
3438 if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3439 relaxed)) {
3440 // Enqueue is TIGHTLY controlled, we won't wait long.
3441 while (!(next_dc = fastpath(dc->do_next))) {
3442 dispatch_hardware_pause();
3443 }
3444 dr->dm_head = next_dc;
3445 }
3446 if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3447 if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3448 // send barrier
3449 // leave send queue locked until barrier has completed
3450 return _dispatch_mach_push(dm, dc);
3451 }
3452 #if DISPATCH_MACH_SEND_SYNC
3453 if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
3454 _dispatch_thread_semaphore_signal(
3455 (_dispatch_thread_semaphore_t)dc->do_ctxt);
3456 continue;
3457 }
3458 #endif // DISPATCH_MACH_SEND_SYNC
3459 if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3460 goto out;
3461 }
3462 continue;
3463 }
3464 if (slowpath(dr->dm_disconnect_cnt) ||
3465 slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3466 _dispatch_mach_msg_not_sent(dm, dc);
3467 continue;
3468 }
3469 if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3470 goto out;
3471 }
3472 } while ((dc = next_dc));
3473 }
3474 out:
3475 // if this is not a complete drain, we must undo some things
3476 if (slowpath(dc)) {
3477 if (!next_dc &&
3478 !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3479 // wait for enqueue slow path to finish
3480 while (!(next_dc = fastpath(dr->dm_head))) {
3481 dispatch_hardware_pause();
3482 }
3483 dc->do_next = next_dc;
3484 }
3485 dr->dm_head = dc;
3486 }
3487 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3488 _dispatch_wakeup(dm);
3489 }
3490
3491 static inline void
3492 _dispatch_mach_send(dispatch_mach_t dm)
3493 {
3494 dispatch_mach_send_refs_t dr = dm->dm_refs;
3495 if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3496 dm_sending, 0, 1, acquire))) {
3497 return;
3498 }
3499 _dispatch_object_debug(dm, "%s", __func__);
3500 _dispatch_mach_send_drain(dm);
3501 }
3502
3503 DISPATCH_NOINLINE
3504 static void
3505 _dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
3506 {
3507 if (!(ke->fflags & dm->ds_pending_data_mask)) {
3508 return;
3509 }
3510 _dispatch_mach_send(dm);
3511 }
3512
3513 DISPATCH_NOINLINE
3514 void
3515 dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3516 mach_msg_option_t options)
3517 {
3518 dispatch_mach_send_refs_t dr = dm->dm_refs;
3519 if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3520 DISPATCH_CLIENT_CRASH("Message already enqueued");
3521 }
3522 dispatch_retain(dmsg);
3523 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3524 _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3525 if (slowpath(dr->dm_tail) || slowpath(dr->dm_disconnect_cnt) ||
3526 slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3527 slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3528 acquire))) {
3529 return _dispatch_mach_send_push(dm, dmsg);
3530 }
3531 if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3532 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3533 return _dispatch_mach_send_push(dm, dmsg);
3534 }
3535 if (slowpath(dr->dm_tail)) {
3536 return _dispatch_mach_send_drain(dm);
3537 }
3538 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3539 _dispatch_wakeup(dm);
3540 }
3541
3542 static void
3543 _dispatch_mach_disconnect(dispatch_mach_t dm)
3544 {
3545 dispatch_mach_send_refs_t dr = dm->dm_refs;
3546 if (dm->dm_dkev) {
3547 _dispatch_mach_kevent_unregister(dm);
3548 }
3549 if (MACH_PORT_VALID(dr->dm_send)) {
3550 _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3551 }
3552 dr->dm_send = MACH_PORT_NULL;
3553 if (dr->dm_checkin) {
3554 _dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3555 dr->dm_checkin = NULL;
3556 }
3557 if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3558 dispatch_mach_reply_refs_t dmr, tmp;
3559 TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dm_list, tmp){
3560 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3561 }
3562 }
3563 }
3564
3565 DISPATCH_NOINLINE
3566 static bool
3567 _dispatch_mach_cancel(dispatch_mach_t dm)
3568 {
3569 dispatch_mach_send_refs_t dr = dm->dm_refs;
3570 if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3571 return false;
3572 }
3573 _dispatch_object_debug(dm, "%s", __func__);
3574 _dispatch_mach_disconnect(dm);
3575 if (dm->ds_dkev) {
3576 mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3577 _dispatch_source_kevent_unregister((dispatch_source_t)dm);
3578 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3579 }
3580 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3581 return true;
3582 }
3583
3584 DISPATCH_NOINLINE
3585 static bool
3586 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3587 {
3588 if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3589 if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
3590 // send/reply kevents must be uninstalled on the manager queue
3591 return false;
3592 }
3593 }
3594 _dispatch_mach_disconnect(dm);
3595 dispatch_mach_send_refs_t dr = dm->dm_refs;
3596 dr->dm_checkin = dou._dc->dc_data;
3597 dr->dm_send = (mach_port_t)dou._dc->dc_other;
3598 _dispatch_continuation_free(dou._dc);
3599 (void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
3600 _dispatch_object_debug(dm, "%s", __func__);
3601 return true;
3602 }
3603
3604 DISPATCH_NOINLINE
3605 void
3606 dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
3607 dispatch_mach_msg_t checkin)
3608 {
3609 dispatch_mach_send_refs_t dr = dm->dm_refs;
3610 (void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
3611 if (MACH_PORT_VALID(send) && checkin) {
3612 dispatch_retain(checkin);
3613 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3614 } else {
3615 checkin = NULL;
3616 dr->dm_checkin_port = MACH_PORT_NULL;
3617 }
3618 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3619 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3620 dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
3621 dc->dc_ctxt = dc;
3622 dc->dc_data = checkin;
3623 dc->dc_other = (void*)(uintptr_t)send;
3624 return _dispatch_mach_send_push(dm, dc);
3625 }
3626
3627 #if DISPATCH_MACH_SEND_SYNC
3628 DISPATCH_NOINLINE
3629 static void
3630 _dispatch_mach_send_sync_slow(dispatch_mach_t dm)
3631 {
3632 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
3633 struct dispatch_object_s dc = {
3634 .do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
3635 .do_ctxt = (void*)sema,
3636 };
3637 _dispatch_mach_send_push(dm, &dc);
3638 _dispatch_thread_semaphore_wait(sema);
3639 _dispatch_put_thread_semaphore(sema);
3640 }
3641 #endif // DISPATCH_MACH_SEND_SYNC
3642
3643 DISPATCH_NOINLINE
3644 mach_port_t
3645 dispatch_mach_get_checkin_port(dispatch_mach_t dm)
3646 {
3647 dispatch_mach_send_refs_t dr = dm->dm_refs;
3648 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3649 return MACH_PORT_DEAD;
3650 }
3651 return dr->dm_checkin_port;
3652 }
3653
3654 DISPATCH_NOINLINE
3655 static void
3656 _dispatch_mach_connect_invoke(dispatch_mach_t dm)
3657 {
3658 dispatch_mach_refs_t dr = dm->ds_refs;
3659 _dispatch_client_callout4(dr->dm_handler_ctxt,
3660 DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
3661 dm->dm_connect_handler_called = 1;
3662 }
3663
3664 DISPATCH_NOINLINE
3665 void
3666 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
3667 {
3668 dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3669 dispatch_mach_refs_t dr = dm->ds_refs;
3670 mach_error_t err;
3671 unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
3672
3673 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3674 _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3675 if (slowpath(!dm->dm_connect_handler_called)) {
3676 _dispatch_mach_connect_invoke(dm);
3677 }
3678 _dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
3679 dr->dm_handler_func);
3680 _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3681 _dispatch_introspection_queue_item_complete(dmsg);
3682 dispatch_release(dmsg);
3683 }
3684
3685 DISPATCH_NOINLINE
3686 void
3687 _dispatch_mach_barrier_invoke(void *ctxt)
3688 {
3689 dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3690 dispatch_mach_refs_t dr = dm->ds_refs;
3691 struct dispatch_continuation_s *dc = ctxt;
3692 void *context = dc->dc_data;
3693 dispatch_function_t barrier = dc->dc_other;
3694 bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);
3695
3696 _dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3697 if (slowpath(!dm->dm_connect_handler_called)) {
3698 _dispatch_mach_connect_invoke(dm);
3699 }
3700 _dispatch_client_callout(context, barrier);
3701 _dispatch_client_callout4(dr->dm_handler_ctxt,
3702 DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
3703 _dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3704 if (send_barrier) {
3705 (void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
3706 }
3707 }
3708
3709 DISPATCH_NOINLINE
3710 void
3711 dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
3712 dispatch_function_t barrier)
3713 {
3714 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3715 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
3716 dc->dc_func = _dispatch_mach_barrier_invoke;
3717 dc->dc_ctxt = dc;
3718 dc->dc_data = context;
3719 dc->dc_other = barrier;
3720
3721 dispatch_mach_send_refs_t dr = dm->dm_refs;
3722 if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
3723 dm_sending, 0, 1, acquire))) {
3724 return _dispatch_mach_send_push(dm, dc);
3725 }
3726 // leave send queue locked until barrier has completed
3727 return _dispatch_mach_push(dm, dc);
3728 }
3729
3730 DISPATCH_NOINLINE
3731 void
3732 dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
3733 dispatch_function_t barrier)
3734 {
3735 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3736 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3737 dc->dc_func = _dispatch_mach_barrier_invoke;
3738 dc->dc_ctxt = dc;
3739 dc->dc_data = context;
3740 dc->dc_other = barrier;
3741 return _dispatch_mach_push(dm, dc);
3742 }
3743
3744 DISPATCH_NOINLINE
3745 void
3746 dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3747 {
3748 dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
3749 _dispatch_call_block_and_release);
3750 }
3751
3752 DISPATCH_NOINLINE
3753 void
3754 dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3755 {
3756 dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
3757 _dispatch_call_block_and_release);
3758 }
3759
3760 DISPATCH_NOINLINE
3761 static void
3762 _dispatch_mach_cancel_invoke(dispatch_mach_t dm)
3763 {
3764 dispatch_mach_refs_t dr = dm->ds_refs;
3765 if (slowpath(!dm->dm_connect_handler_called)) {
3766 _dispatch_mach_connect_invoke(dm);
3767 }
3768 _dispatch_client_callout4(dr->dm_handler_ctxt,
3769 DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
3770 dm->dm_cancel_handler_called = 1;
3771 _dispatch_release(dm); // the retain is done at creation time
3772 }
3773
3774 DISPATCH_NOINLINE
3775 void
3776 dispatch_mach_cancel(dispatch_mach_t dm)
3777 {
3778 dispatch_source_cancel((dispatch_source_t)dm);
3779 }
3780
3781 DISPATCH_ALWAYS_INLINE
3782 static inline dispatch_queue_t
3783 _dispatch_mach_invoke2(dispatch_object_t dou,
3784 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
3785 {
3786 dispatch_mach_t dm = dou._dm;
3787
3788 // This function performs all mach channel actions. Each action is
3789 // responsible for verifying that it takes place on the appropriate queue.
3790 // If the current queue is not the correct queue for this action, the
3791 // correct queue will be returned and the invoke will be re-driven on that
3792 // queue.
3793
3794 // The order of tests here in invoke and in probe should be consistent.
3795
3796 dispatch_queue_t dq = _dispatch_queue_get_current();
3797 dispatch_mach_send_refs_t dr = dm->dm_refs;
3798
3799 if (slowpath(!dm->ds_is_installed)) {
3800 // The channel needs to be installed on the manager queue.
3801 if (dq != &_dispatch_mgr_q) {
3802 return &_dispatch_mgr_q;
3803 }
3804 if (dm->ds_dkev) {
3805 _dispatch_source_kevent_register((dispatch_source_t)dm);
3806 }
3807 dm->ds_is_installed = true;
3808 _dispatch_mach_send(dm);
3809 // Apply initial target queue change
3810 _dispatch_queue_drain(dou);
3811 if (dm->dq_items_tail) {
3812 return dm->do_targetq;
3813 }
3814 } else if (dm->dq_items_tail) {
3815 // The channel has pending messages to deliver to the target queue.
3816 if (dq != dm->do_targetq) {
3817 return dm->do_targetq;
3818 }
3819 dispatch_queue_t tq = dm->do_targetq;
3820 if (slowpath(_dispatch_queue_drain(dou))) {
3821 DISPATCH_CLIENT_CRASH("Sync onto mach channel");
3822 }
3823 if (slowpath(tq != dm->do_targetq)) {
3824 // An item on the channel changed the target queue
3825 return dm->do_targetq;
3826 }
3827 } else if (dr->dm_tail) {
3828 if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
3829 (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
3830 // Send/reply kevents need to be installed or uninstalled
3831 if (dq != &_dispatch_mgr_q) {
3832 return &_dispatch_mgr_q;
3833 }
3834 }
3835 if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
3836 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
3837 // The channel has pending messages to send.
3838 _dispatch_mach_send(dm);
3839 }
3840 } else if (dm->ds_atomic_flags & DSF_CANCELED){
3841 // The channel has been cancelled and needs to be uninstalled from the
3842 // manager queue. After uninstallation, the cancellation handler needs
3843 // to be delivered to the target queue.
3844 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
3845 !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3846 if (dq != &_dispatch_mgr_q) {
3847 return &_dispatch_mgr_q;
3848 }
3849 if (!_dispatch_mach_cancel(dm)) {
3850 return NULL;
3851 }
3852 }
3853 if (!dm->dm_cancel_handler_called) {
3854 if (dq != dm->do_targetq) {
3855 return dm->do_targetq;
3856 }
3857 _dispatch_mach_cancel_invoke(dm);
3858 }
3859 }
3860 return NULL;
3861 }
3862
3863 DISPATCH_NOINLINE
3864 void
3865 _dispatch_mach_invoke(dispatch_mach_t dm)
3866 {
3867 _dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
3868 }
3869
3870 unsigned long
3871 _dispatch_mach_probe(dispatch_mach_t dm)
3872 {
3873 // This function determines whether the mach channel needs to be invoked.
3874 // The order of tests here in probe and in invoke should be consistent.
3875
3876 dispatch_mach_send_refs_t dr = dm->dm_refs;
3877
3878 if (slowpath(!dm->ds_is_installed)) {
3879 // The channel needs to be installed on the manager queue.
3880 return true;
3881 } else if (dm->dq_items_tail) {
3882 // The source has pending messages to deliver to the target queue.
3883 return true;
3884 } else if (dr->dm_tail &&
3885 (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
3886 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
3887 // The channel has pending messages to send.
3888 return true;
3889 } else if (dm->ds_atomic_flags & DSF_CANCELED) {
3890 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
3891 !TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
3892 !dm->dm_cancel_handler_called) {
3893 // The channel needs to be uninstalled from the manager queue, or
3894 // the cancellation handler needs to be delivered to the target
3895 // queue.
3896 return true;
3897 }
3898 }
3899 // Nothing to do.
3900 return false;
3901 }
3902
3903 #pragma mark -
3904 #pragma mark dispatch_mach_msg_t
3905
3906 dispatch_mach_msg_t
3907 dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
3908 dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
3909 {
3910 if (slowpath(size < sizeof(mach_msg_header_t)) ||
3911 slowpath(destructor && !msg)) {
3912 DISPATCH_CLIENT_CRASH("Empty message");
3913 }
3914 dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
3915 sizeof(struct dispatch_mach_msg_s) +
3916 (destructor ? 0 : size - sizeof(dmsg->msg)));
3917 if (destructor) {
3918 dmsg->msg = msg;
3919 } else if (msg) {
3920 memcpy(dmsg->buf, msg, size);
3921 }
3922 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3923 dmsg->do_targetq = _dispatch_get_root_queue(0, false);
3924 dmsg->destructor = destructor;
3925 dmsg->size = size;
3926 if (msg_ptr) {
3927 *msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
3928 }
3929 return dmsg;
3930 }
3931
3932 void
3933 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
3934 {
3935 switch (dmsg->destructor) {
3936 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
3937 break;
3938 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
3939 free(dmsg->msg);
3940 break;
3941 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
3942 mach_vm_size_t vm_size = dmsg->size;
3943 mach_vm_address_t vm_addr = (uintptr_t)dmsg->msg;
3944 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
3945 vm_addr, vm_size));
3946 break;
3947 }}
3948 }
3949
3950 static inline mach_msg_header_t*
3951 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
3952 {
3953 return dmsg->destructor ? dmsg->msg : (mach_msg_header_t*)dmsg->buf;
3954 }
3955
3956 mach_msg_header_t*
3957 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
3958 {
3959 if (size_ptr) {
3960 *size_ptr = dmsg->size;
3961 }
3962 return _dispatch_mach_msg_get_msg(dmsg);
3963 }
3964
3965 size_t
3966 _dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
3967 {
3968 size_t offset = 0;
3969 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
3970 dx_kind(dmsg), dmsg);
3971 offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
3972 "refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
3973 offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
3974 "msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->buf);
3975 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
3976 if (hdr->msgh_id) {
3977 offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
3978 hdr->msgh_id);
3979 }
3980 if (hdr->msgh_size) {
3981 offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
3982 hdr->msgh_size);
3983 }
3984 if (hdr->msgh_bits) {
3985 offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
3986 MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
3987 MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
3988 if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
3989 offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
3990 MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
3991 }
3992 offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
3993 }
3994 if (hdr->msgh_local_port && hdr->msgh_remote_port) {
3995 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
3996 "remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
3997 } else if (hdr->msgh_local_port) {
3998 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
3999 hdr->msgh_local_port);
4000 } else if (hdr->msgh_remote_port) {
4001 offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4002 hdr->msgh_remote_port);
4003 } else {
4004 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4005 }
4006 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4007 return offset;
4008 }
4009
4010 #pragma mark -
4011 #pragma mark dispatch_mig_server
4012
4013 mach_msg_return_t
4014 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
4015 dispatch_mig_callback_t callback)
4016 {
4017 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
4018 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
4019 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
4020 mach_msg_options_t tmp_options;
4021 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
4022 mach_msg_return_t kr = 0;
4023 uint64_t assertion_token = 0;
4024 unsigned int cnt = 1000; // do not stall out serial queues
4025 boolean_t demux_success;
4026 bool received = false;
4027 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
4028
4029 // XXX FIXME -- allocate these elsewhere
4030 bufRequest = alloca(rcv_size);
4031 bufReply = alloca(rcv_size);
4032 bufReply->Head.msgh_size = 0;
4033 bufRequest->RetCode = 0;
4034
4035 #if DISPATCH_DEBUG
4036 options |= MACH_RCV_LARGE; // rdar://problem/8422992
4037 #endif
4038 tmp_options = options;
4039 // XXX FIXME -- change this to not starve out the target queue
4040 for (;;) {
4041 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
4042 options &= ~MACH_RCV_MSG;
4043 tmp_options &= ~MACH_RCV_MSG;
4044
4045 if (!(tmp_options & MACH_SEND_MSG)) {
4046 goto out;
4047 }
4048 }
4049 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
4050 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
4051
4052 tmp_options = options;
4053
4054 if (slowpath(kr)) {
4055 switch (kr) {
4056 case MACH_SEND_INVALID_DEST:
4057 case MACH_SEND_TIMED_OUT:
4058 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
4059 mach_msg_destroy(&bufReply->Head);
4060 }
4061 break;
4062 case MACH_RCV_TIMED_OUT:
4063 // Don't return an error if a message was sent this time or
4064 // a message was successfully received previously
4065 // rdar://problems/7363620&7791738
4066 if(bufReply->Head.msgh_remote_port || received) {
4067 kr = MACH_MSG_SUCCESS;
4068 }
4069 break;
4070 case MACH_RCV_INVALID_NAME:
4071 break;
4072 #if DISPATCH_DEBUG
4073 case MACH_RCV_TOO_LARGE:
4074 // receive messages that are too large and log their id and size
4075 // rdar://problem/8422992
4076 tmp_options &= ~MACH_RCV_LARGE;
4077 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
4078 void *large_buf = malloc(large_size);
4079 if (large_buf) {
4080 rcv_size = large_size;
4081 bufReply = large_buf;
4082 }
4083 if (!mach_msg(&bufReply->Head, tmp_options, 0,
4084 (mach_msg_size_t)rcv_size,
4085 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
4086 _dispatch_log("BUG in libdispatch client: "
4087 "dispatch_mig_server received message larger than "
4088 "requested size %zd: id = 0x%x, size = %d",
4089 maxmsgsz, bufReply->Head.msgh_id,
4090 bufReply->Head.msgh_size);
4091 }
4092 if (large_buf) {
4093 free(large_buf);
4094 }
4095 // fall through
4096 #endif
4097 default:
4098 _dispatch_bug_mach_client(
4099 "dispatch_mig_server: mach_msg() failed", kr);
4100 break;
4101 }
4102 goto out;
4103 }
4104
4105 if (!(tmp_options & MACH_RCV_MSG)) {
4106 goto out;
4107 }
4108
4109 if (assertion_token) {
4110 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4111 int r = proc_importance_assertion_complete(assertion_token);
4112 (void)dispatch_assume_zero(r);
4113 #endif
4114 assertion_token = 0;
4115 }
4116 received = true;
4117
4118 bufTemp = bufRequest;
4119 bufRequest = bufReply;
4120 bufReply = bufTemp;
4121
4122 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4123 int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
4124 NULL, &assertion_token);
4125 if (r && slowpath(r != EIO)) {
4126 (void)dispatch_assume_zero(r);
4127 }
4128 #endif
4129
4130 demux_success = callback(&bufRequest->Head, &bufReply->Head);
4131
4132 if (!demux_success) {
4133 // destroy the request - but not the reply port
4134 bufRequest->Head.msgh_remote_port = 0;
4135 mach_msg_destroy(&bufRequest->Head);
4136 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
4137 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
4138 // is present
4139 if (slowpath(bufReply->RetCode)) {
4140 if (bufReply->RetCode == MIG_NO_REPLY) {
4141 continue;
4142 }
4143
4144 // destroy the request - but not the reply port
4145 bufRequest->Head.msgh_remote_port = 0;
4146 mach_msg_destroy(&bufRequest->Head);
4147 }
4148 }
4149
4150 if (bufReply->Head.msgh_remote_port) {
4151 tmp_options |= MACH_SEND_MSG;
4152 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
4153 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
4154 tmp_options |= MACH_SEND_TIMEOUT;
4155 }
4156 }
4157 }
4158
4159 out:
4160 if (assertion_token) {
4161 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4162 int r = proc_importance_assertion_complete(assertion_token);
4163 (void)dispatch_assume_zero(r);
4164 #endif
4165 }
4166
4167 return kr;
4168 }
4169
4170 #endif /* HAVE_MACH */
4171
4172 #pragma mark -
4173 #pragma mark dispatch_source_debug
4174
4175 DISPATCH_NOINLINE
4176 static const char *
4177 _evfiltstr(short filt)
4178 {
4179 switch (filt) {
4180 #define _evfilt2(f) case (f): return #f
4181 _evfilt2(EVFILT_READ);
4182 _evfilt2(EVFILT_WRITE);
4183 _evfilt2(EVFILT_AIO);
4184 _evfilt2(EVFILT_VNODE);
4185 _evfilt2(EVFILT_PROC);
4186 _evfilt2(EVFILT_SIGNAL);
4187 _evfilt2(EVFILT_TIMER);
4188 #ifdef EVFILT_VM
4189 _evfilt2(EVFILT_VM);
4190 #endif
4191 #ifdef EVFILT_MEMORYSTATUS
4192 _evfilt2(EVFILT_MEMORYSTATUS);
4193 #endif
4194 #if HAVE_MACH
4195 _evfilt2(EVFILT_MACHPORT);
4196 _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
4197 #endif
4198 _evfilt2(EVFILT_FS);
4199 _evfilt2(EVFILT_USER);
4200
4201 _evfilt2(DISPATCH_EVFILT_TIMER);
4202 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
4203 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
4204 default:
4205 return "EVFILT_missing";
4206 }
4207 }
4208
4209 static size_t
4210 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4211 {
4212 dispatch_queue_t target = ds->do_targetq;
4213 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4214 "pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
4215 target && target->dq_label ? target->dq_label : "", target,
4216 ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
4217 }
4218
4219 static size_t
4220 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4221 {
4222 dispatch_source_refs_t dr = ds->ds_refs;
4223 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
4224 " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
4225 ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4226 ds_timer(dr).interval, ds_timer(dr).flags);
4227 }
4228
4229 size_t
4230 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4231 {
4232 size_t offset = 0;
4233 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4234 dx_kind(ds), ds);
4235 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4236 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4237 if (ds->ds_is_timer) {
4238 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4239 }
4240 offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
4241 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
4242 return offset;
4243 }
4244
4245 static size_t
4246 _dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4247 {
4248 dispatch_queue_t target = dm->do_targetq;
4249 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4250 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4251 "sending = %d, disconnected = %d, canceled = %d ",
4252 target && target->dq_label ? target->dq_label : "", target,
4253 dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
4254 dm->dm_refs->dm_send,
4255 dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
4256 dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4257 " (armed)" : "", dm->dm_refs->dm_checkin_port,
4258 dm->dm_refs->dm_checkin ? " (pending)" : "",
4259 dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4260 (bool)(dm->ds_atomic_flags & DSF_CANCELED));
4261 }
4262 size_t
4263 _dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4264 {
4265 size_t offset = 0;
4266 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4267 dm->dq_label ? dm->dq_label : dx_kind(dm), dm);
4268 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4269 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4270 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4271 return offset;
4272 }
4273
4274 #if DISPATCH_DEBUG
4275 static void
4276 _dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
4277 {
4278 _dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
4279 "fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
4280 "ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
4281 kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
4282 kev->ext[1], str);
4283 }
4284
4285 static void
4286 _dispatch_kevent_debugger2(void *context)
4287 {
4288 struct sockaddr sa;
4289 socklen_t sa_len = sizeof(sa);
4290 int c, fd = (int)(long)context;
4291 unsigned int i;
4292 dispatch_kevent_t dk;
4293 dispatch_source_t ds;
4294 dispatch_source_refs_t dr;
4295 FILE *debug_stream;
4296
4297 c = accept(fd, &sa, &sa_len);
4298 if (c == -1) {
4299 if (errno != EAGAIN) {
4300 (void)dispatch_assume_zero(errno);
4301 }
4302 return;
4303 }
4304 #if 0
4305 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4306 if (r == -1) {
4307 (void)dispatch_assume_zero(errno);
4308 }
4309 #endif
4310 debug_stream = fdopen(c, "a");
4311 if (!dispatch_assume(debug_stream)) {
4312 close(c);
4313 return;
4314 }
4315
4316 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4317 fprintf(debug_stream, "Content-type: text/html\r\n");
4318 fprintf(debug_stream, "Pragma: nocache\r\n");
4319 fprintf(debug_stream, "\r\n");
4320 fprintf(debug_stream, "<html>\n");
4321 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4322 fprintf(debug_stream, "<body>\n<ul>\n");
4323
4324 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4325 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4326
4327 for (i = 0; i < DSL_HASH_SIZE; i++) {
4328 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4329 continue;
4330 }
4331 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4332 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4333 "0x%hx fflags 0x%x data 0x%lx udata %p\n",
4334 dk, (unsigned long)dk->dk_kevent.ident,
4335 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4336 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4337 (void*)dk->dk_kevent.udata);
4338 fprintf(debug_stream, "\t\t<ul>\n");
4339 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4340 ds = _dispatch_source_from_refs(dr);
4341 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4342 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4343 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4344 ds->ds_pending_data, ds->ds_pending_data_mask,
4345 ds->ds_atomic_flags);
4346 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4347 dispatch_queue_t dq = ds->do_targetq;
4348 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4349 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4350 dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4351 }
4352 }
4353 fprintf(debug_stream, "\t\t</ul>\n");
4354 fprintf(debug_stream, "\t</li>\n");
4355 }
4356 }
4357 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4358 fflush(debug_stream);
4359 fclose(debug_stream);
4360 }
4361
4362 static void
4363 _dispatch_kevent_debugger2_cancel(void *context)
4364 {
4365 int ret, fd = (int)(long)context;
4366
4367 ret = close(fd);
4368 if (ret != -1) {
4369 (void)dispatch_assume_zero(errno);
4370 }
4371 }
4372
4373 static void
4374 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4375 {
4376 union {
4377 struct sockaddr_in sa_in;
4378 struct sockaddr sa;
4379 } sa_u = {
4380 .sa_in = {
4381 .sin_family = AF_INET,
4382 .sin_addr = { htonl(INADDR_LOOPBACK), },
4383 },
4384 };
4385 dispatch_source_t ds;
4386 const char *valstr;
4387 int val, r, fd, sock_opt = 1;
4388 socklen_t slen = sizeof(sa_u);
4389
4390 if (issetugid()) {
4391 return;
4392 }
4393 valstr = getenv("LIBDISPATCH_DEBUGGER");
4394 if (!valstr) {
4395 return;
4396 }
4397 val = atoi(valstr);
4398 if (val == 2) {
4399 sa_u.sa_in.sin_addr.s_addr = 0;
4400 }
4401 fd = socket(PF_INET, SOCK_STREAM, 0);
4402 if (fd == -1) {
4403 (void)dispatch_assume_zero(errno);
4404 return;
4405 }
4406 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4407 (socklen_t) sizeof sock_opt);
4408 if (r == -1) {
4409 (void)dispatch_assume_zero(errno);
4410 goto out_bad;
4411 }
4412 #if 0
4413 r = fcntl(fd, F_SETFL, O_NONBLOCK);
4414 if (r == -1) {
4415 (void)dispatch_assume_zero(errno);
4416 goto out_bad;
4417 }
4418 #endif
4419 r = bind(fd, &sa_u.sa, sizeof(sa_u));
4420 if (r == -1) {
4421 (void)dispatch_assume_zero(errno);
4422 goto out_bad;
4423 }
4424 r = listen(fd, SOMAXCONN);
4425 if (r == -1) {
4426 (void)dispatch_assume_zero(errno);
4427 goto out_bad;
4428 }
4429 r = getsockname(fd, &sa_u.sa, &slen);
4430 if (r == -1) {
4431 (void)dispatch_assume_zero(errno);
4432 goto out_bad;
4433 }
4434
4435 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4436 &_dispatch_mgr_q);
4437 if (dispatch_assume(ds)) {
4438 _dispatch_log("LIBDISPATCH: debug port: %hu",
4439 (in_port_t)ntohs(sa_u.sa_in.sin_port));
4440
4441 /* ownership of fd transfers to ds */
4442 dispatch_set_context(ds, (void *)(long)fd);
4443 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4444 dispatch_source_set_cancel_handler_f(ds,
4445 _dispatch_kevent_debugger2_cancel);
4446 dispatch_resume(ds);
4447
4448 return;
4449 }
4450 out_bad:
4451 close(fd);
4452 }
4453
4454 #if HAVE_MACH
4455
4456 #ifndef MACH_PORT_TYPE_SPREQUEST
4457 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
4458 #endif
4459
4460 DISPATCH_NOINLINE
4461 void
4462 dispatch_debug_machport(mach_port_t name, const char* str)
4463 {
4464 mach_port_type_t type;
4465 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4466 unsigned int dnreqs = 0, dnrsiz;
4467 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4468 if (kr) {
4469 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4470 kr, mach_error_string(kr), str);
4471 return;
4472 }
4473 if (type & MACH_PORT_TYPE_SEND) {
4474 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4475 MACH_PORT_RIGHT_SEND, &ns));
4476 }
4477 if (type & MACH_PORT_TYPE_SEND_ONCE) {
4478 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4479 MACH_PORT_RIGHT_SEND_ONCE, &nso));
4480 }
4481 if (type & MACH_PORT_TYPE_DEAD_NAME) {
4482 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4483 MACH_PORT_RIGHT_DEAD_NAME, &nd));
4484 }
4485 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4486 (void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
4487 name, &dnrsiz, &dnreqs));
4488 }
4489 if (type & MACH_PORT_TYPE_RECEIVE) {
4490 mach_port_status_t status = { .mps_pset = 0, };
4491 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4492 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4493 MACH_PORT_RIGHT_RECEIVE, &nr));
4494 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4495 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4496 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4497 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4498 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4499 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4500 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4501 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4502 status.mps_srights ? "Y":"N", status.mps_sorights,
4503 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
4504 status.mps_seqno, str);
4505 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
4506 MACH_PORT_TYPE_DEAD_NAME)) {
4507 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4508 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
4509 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
4510 } else {
4511 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
4512 str);
4513 }
4514 }
4515
4516 #endif // HAVE_MACH
4517
4518 #endif // DISPATCH_DEBUG