]> git.saurik.com Git - apple/libdispatch.git/blob - src/source.c
libdispatch-442.1.4.tar.gz
[apple/libdispatch.git] / src / source.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26 #include <sys/mount.h>
27
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29 const struct kevent64_s *ke);
30 static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
31 static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33 uint32_t del_flags);
34 static void _dispatch_kevent_drain(struct kevent64_s *ke);
35 static void _dispatch_kevent_merge(struct kevent64_s *ke);
36 static void _dispatch_timers_kevent(struct kevent64_s *ke);
37 static void _dispatch_timers_unregister(dispatch_source_t ds,
38 dispatch_kevent_t dk);
39 static void _dispatch_timers_update(dispatch_source_t ds);
40 static void _dispatch_timer_aggregates_check(void);
41 static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
42 static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
43 unsigned int tidx);
44 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
45 unsigned int tidx);
46 static inline unsigned long _dispatch_source_timer_data(
47 dispatch_source_refs_t dr, unsigned long prev);
48 static long _dispatch_kq_update(const struct kevent64_s *);
49 static void _dispatch_memorystatus_init(void);
50 #if HAVE_MACH
51 static void _dispatch_mach_host_calendar_change_register(void);
52 static void _dispatch_mach_recv_msg_buf_init(void);
53 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
54 uint32_t new_flags, uint32_t del_flags);
55 static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
56 uint32_t new_flags, uint32_t del_flags);
57 static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
58 #else
59 static inline void _dispatch_mach_host_calendar_change_register(void) {}
60 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
61 #endif
62 static const char * _evfiltstr(short filt);
63 #if DISPATCH_DEBUG
64 static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
65 static void _dispatch_kevent_debugger(void *context);
66 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
68 #else
69 static inline void
70 _dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
71 const char* str DISPATCH_UNUSED) {}
72 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
73 #endif
74
75 #pragma mark -
76 #pragma mark dispatch_source_t
77
78 dispatch_source_t
79 dispatch_source_create(dispatch_source_type_t type,
80 uintptr_t handle,
81 unsigned long mask,
82 dispatch_queue_t q)
83 {
84 const struct kevent64_s *proto_kev = &type->ke;
85 dispatch_source_t ds;
86 dispatch_kevent_t dk;
87
88 // input validation
89 if (type == NULL || (mask & ~type->mask)) {
90 return NULL;
91 }
92
93 switch (type->ke.filter) {
94 case EVFILT_SIGNAL:
95 if (handle >= NSIG) {
96 return NULL;
97 }
98 break;
99 case EVFILT_FS:
100 #if DISPATCH_USE_VM_PRESSURE
101 case EVFILT_VM:
102 #endif
103 #if DISPATCH_USE_MEMORYSTATUS
104 case EVFILT_MEMORYSTATUS:
105 #endif
106 case DISPATCH_EVFILT_CUSTOM_ADD:
107 case DISPATCH_EVFILT_CUSTOM_OR:
108 if (handle) {
109 return NULL;
110 }
111 break;
112 case DISPATCH_EVFILT_TIMER:
113 if (!!handle ^ !!type->ke.ident) {
114 return NULL;
115 }
116 break;
117 default:
118 break;
119 }
120
121 ds = _dispatch_alloc(DISPATCH_VTABLE(source),
122 sizeof(struct dispatch_source_s));
123 // Initialize as a queue first, then override some settings below.
124 _dispatch_queue_init((dispatch_queue_t)ds);
125 ds->dq_label = "source";
126
127 ds->do_ref_cnt++; // the reference the manager queue holds
128 ds->do_ref_cnt++; // since source is created suspended
129 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
130 // The initial target queue is the manager queue, in order to get
131 // the source installed. <rdar://problem/8928171>
132 ds->do_targetq = &_dispatch_mgr_q;
133
134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
135 dk->dk_kevent = *proto_kev;
136 dk->dk_kevent.ident = handle;
137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
138 dk->dk_kevent.fflags |= (uint32_t)mask;
139 dk->dk_kevent.udata = (uintptr_t)dk;
140 TAILQ_INIT(&dk->dk_sources);
141
142 ds->ds_dkev = dk;
143 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
144 ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
145 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
146 ds->ds_is_level = true;
147 ds->ds_needs_rearm = true;
148 } else if (!(EV_CLEAR & proto_kev->flags)) {
149 // we cheat and use EV_CLEAR to mean a "flag thingy"
150 ds->ds_is_adder = true;
151 }
152 // Some sources require special processing
153 if (type->init != NULL) {
154 type->init(ds, type, handle, mask, q);
155 }
156 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
157
158 if (fastpath(!ds->ds_refs)) {
159 ds->ds_refs = _dispatch_calloc(1ul,
160 sizeof(struct dispatch_source_refs_s));
161 }
162 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
163
164 // First item on the queue sets the user-specified target queue
165 dispatch_set_target_queue(ds, q);
166 _dispatch_object_debug(ds, "%s", __func__);
167 return ds;
168 }
169
170 void
171 _dispatch_source_dispose(dispatch_source_t ds)
172 {
173 _dispatch_object_debug(ds, "%s", __func__);
174 free(ds->ds_refs);
175 _dispatch_queue_destroy(ds);
176 }
177
178 void
179 _dispatch_source_xref_dispose(dispatch_source_t ds)
180 {
181 _dispatch_wakeup(ds);
182 }
183
184 void
185 dispatch_source_cancel(dispatch_source_t ds)
186 {
187 _dispatch_object_debug(ds, "%s", __func__);
188 // Right after we set the cancel flag, someone else
189 // could potentially invoke the source, do the cancelation,
190 // unregister the source, and deallocate it. We would
191 // need to therefore retain/release before setting the bit
192
193 _dispatch_retain(ds);
194 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
195 _dispatch_wakeup(ds);
196 _dispatch_release(ds);
197 }
198
199 long
200 dispatch_source_testcancel(dispatch_source_t ds)
201 {
202 return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
203 }
204
205 unsigned long
206 dispatch_source_get_mask(dispatch_source_t ds)
207 {
208 unsigned long mask = ds->ds_pending_data_mask;
209 if (ds->ds_vmpressure_override) {
210 mask = NOTE_VM_PRESSURE;
211 }
212 #if TARGET_IPHONE_SIMULATOR
213 else if (ds->ds_memorystatus_override) {
214 mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
215 }
216 #endif
217 return mask;
218 }
219
220 uintptr_t
221 dispatch_source_get_handle(dispatch_source_t ds)
222 {
223 unsigned int handle = (unsigned int)ds->ds_ident_hack;
224 #if TARGET_IPHONE_SIMULATOR
225 if (ds->ds_memorystatus_override) {
226 handle = 0;
227 }
228 #endif
229 return handle;
230 }
231
232 unsigned long
233 dispatch_source_get_data(dispatch_source_t ds)
234 {
235 unsigned long data = ds->ds_data;
236 if (ds->ds_vmpressure_override) {
237 data = NOTE_VM_PRESSURE;
238 }
239 #if TARGET_IPHONE_SIMULATOR
240 else if (ds->ds_memorystatus_override) {
241 data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
242 }
243 #endif
244 return data;
245 }
246
247 void
248 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
249 {
250 struct kevent64_s kev = {
251 .fflags = (typeof(kev.fflags))val,
252 .data = (typeof(kev.data))val,
253 };
254
255 dispatch_assert(
256 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
257 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
258
259 _dispatch_source_merge_kevent(ds, &kev);
260 }
261
262 #pragma mark -
263 #pragma mark dispatch_source_handler
264
265 DISPATCH_ALWAYS_INLINE
266 static inline dispatch_continuation_t
267 _dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
268 bool block)
269 {
270 dispatch_continuation_t dc = _dispatch_continuation_alloc();
271 if (handler) {
272 dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
273 DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
274 DISPATCH_OBJ_ASYNC_BIT : 0l));
275 dc->dc_priority = 0;
276 dc->dc_voucher = NULL;
277 if (block) {
278 #ifdef __BLOCKS__
279 if (slowpath(_dispatch_block_has_private_data(handler))) {
280 // sources don't propagate priority by default
281 dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
282 flags |= _dispatch_block_get_flags(handler);
283 _dispatch_continuation_priority_set(dc,
284 _dispatch_block_get_priority(handler), flags);
285 }
286 if (kind != DS_EVENT_HANDLER) {
287 dc->dc_func = _dispatch_call_block_and_release;
288 } else {
289 dc->dc_func = _dispatch_Block_invoke(handler);
290 }
291 dc->dc_ctxt = _dispatch_Block_copy(handler);
292 #endif /* __BLOCKS__ */
293 } else {
294 dc->dc_func = handler;
295 dc->dc_ctxt = ds->do_ctxt;
296 }
297 _dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
298 } else {
299 dc->dc_func = NULL;
300 }
301 dc->dc_data = (void*)kind;
302 return dc;
303 }
304
305 static inline void
306 _dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
307 dispatch_continuation_t dc_new)
308 {
309 dispatch_continuation_t dc = dr->ds_handler[kind];
310 if (dc) {
311 #ifdef __BLOCKS__
312 if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
313 Block_release(dc->dc_ctxt);
314 }
315 #endif /* __BLOCKS__ */
316 if (dc->dc_voucher) {
317 _voucher_release(dc->dc_voucher);
318 dc->dc_voucher = NULL;
319 }
320 _dispatch_continuation_free(dc);
321 }
322 dr->ds_handler[kind] = dc_new;
323 }
324
325 static inline void
326 _dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
327 {
328 _dispatch_source_handler_replace(dr, kind, NULL);
329 }
330
331 static void
332 _dispatch_source_set_handler(void *context)
333 {
334 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
335 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
336 dispatch_continuation_t dc = context;
337 long kind = (long)dc->dc_data;
338 dc->dc_data = 0;
339 if (!dc->dc_func) {
340 _dispatch_continuation_free(dc);
341 dc = NULL;
342 } else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
343 dc->dc_ctxt = ds->do_ctxt;
344 }
345 _dispatch_source_handler_replace(ds->ds_refs, kind, dc);
346 if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
347 #if HAVE_PTHREAD_WORKQUEUE_QOS
348 ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
349 _dispatch_queue_set_override_priority((dispatch_queue_t)ds);
350 #endif
351 }
352 }
353
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the source's event handler block.
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_EVENT_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
365
366 void
367 dispatch_source_set_event_handler_f(dispatch_source_t ds,
368 dispatch_function_t handler)
369 {
370 dispatch_continuation_t dc;
371 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
372 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
373 _dispatch_source_set_handler);
374 }
375
376 void
377 _dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
378 void *ctxt, dispatch_function_t handler)
379 {
380 dispatch_continuation_t dc;
381 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
382 dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT);
383 dc->dc_other = dc->dc_ctxt;
384 dc->dc_ctxt = ctxt;
385 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
386 _dispatch_source_set_handler);
387 }
388
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the source's cancel handler block.
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_CANCEL_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
400
401 void
402 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
403 dispatch_function_t handler)
404 {
405 dispatch_continuation_t dc;
406 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
407 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
408 _dispatch_source_set_handler);
409 }
410
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the source's registration handler block.
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_REGISTN_HANDLER, true);
	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
422
423 void
424 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
425 dispatch_function_t handler)
426 {
427 dispatch_continuation_t dc;
428 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
429 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
430 _dispatch_source_set_handler);
431 }
432
433 #pragma mark -
434 #pragma mark dispatch_source_invoke
435
436 static void
437 _dispatch_source_registration_callout(dispatch_source_t ds)
438 {
439 dispatch_source_refs_t dr = ds->ds_refs;
440 dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
441 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
442 // no registration callout if source is canceled rdar://problem/8955246
443 return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
444 }
445 pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
446 if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
447 dc->dc_ctxt = ds->do_ctxt;
448 }
449 _dispatch_continuation_pop(dc);
450 dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
451 _dispatch_reset_defaultpriority(old_dp);
452 }
453
454 static void
455 _dispatch_source_cancel_callout(dispatch_source_t ds)
456 {
457 dispatch_source_refs_t dr = ds->ds_refs;
458 dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
459 ds->ds_pending_data_mask = 0;
460 ds->ds_pending_data = 0;
461 ds->ds_data = 0;
462 _dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
463 _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
464 if (!dc) {
465 return;
466 }
467 if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
468 return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
469 }
470 pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
471 if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
472 dc->dc_ctxt = ds->do_ctxt;
473 }
474 _dispatch_continuation_pop(dc);
475 dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
476 _dispatch_reset_defaultpriority(old_dp);
477 }
478
479 static void
480 _dispatch_source_latch_and_call(dispatch_source_t ds)
481 {
482 unsigned long prev;
483
484 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
485 return;
486 }
487 dispatch_source_refs_t dr = ds->ds_refs;
488 dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
489 prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
490 if (ds->ds_is_level) {
491 ds->ds_data = ~prev;
492 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
493 ds->ds_data = _dispatch_source_timer_data(dr, prev);
494 } else {
495 ds->ds_data = prev;
496 }
497 if (!dispatch_assume(prev) || !dc) {
498 return;
499 }
500 pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
501 _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dc);
502 voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
503 _dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
504 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
505 _dispatch_introspection_queue_item_complete(dc);
506 if (voucher) dc->dc_voucher = voucher;
507 _dispatch_reset_defaultpriority(old_dp);
508 }
509
510 static void
511 _dispatch_source_kevent_unregister(dispatch_source_t ds)
512 {
513 _dispatch_object_debug(ds, "%s", __func__);
514 dispatch_kevent_t dk = ds->ds_dkev;
515 ds->ds_dkev = NULL;
516 switch (dk->dk_kevent.filter) {
517 case DISPATCH_EVFILT_TIMER:
518 _dispatch_timers_unregister(ds, dk);
519 break;
520 default:
521 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
522 _dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
523 break;
524 }
525
526 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
527 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
528 _dispatch_release(ds); // the retain is done at creation time
529 }
530
531 static void
532 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
533 {
534 switch (ds->ds_dkev->dk_kevent.filter) {
535 case DISPATCH_EVFILT_TIMER:
536 return _dispatch_timers_update(ds);
537 case EVFILT_MACHPORT:
538 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
539 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
540 }
541 break;
542 }
543 if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
544 _dispatch_source_kevent_unregister(ds);
545 }
546 }
547
548 static void
549 _dispatch_source_kevent_register(dispatch_source_t ds)
550 {
551 dispatch_assert_zero(ds->ds_is_installed);
552 switch (ds->ds_dkev->dk_kevent.filter) {
553 case DISPATCH_EVFILT_TIMER:
554 return _dispatch_timers_update(ds);
555 }
556 uint32_t flags;
557 bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
558 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
559 if (do_resume || ds->ds_needs_rearm) {
560 _dispatch_source_kevent_resume(ds, flags);
561 }
562 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
563 _dispatch_object_debug(ds, "%s", __func__);
564 }
565
566 DISPATCH_ALWAYS_INLINE
567 static inline dispatch_queue_t
568 _dispatch_source_invoke2(dispatch_object_t dou,
569 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
570 {
571 dispatch_source_t ds = dou._ds;
572 if (slowpath(_dispatch_queue_drain(ds))) {
573 DISPATCH_CLIENT_CRASH("Sync onto source");
574 }
575
576 // This function performs all source actions. Each action is responsible
577 // for verifying that it takes place on the appropriate queue. If the
578 // current queue is not the correct queue for this action, the correct queue
579 // will be returned and the invoke will be re-driven on that queue.
580
581 // The order of tests here in invoke and in probe should be consistent.
582
583 dispatch_queue_t dq = _dispatch_queue_get_current();
584 dispatch_source_refs_t dr = ds->ds_refs;
585
586 if (!ds->ds_is_installed) {
587 // The source needs to be installed on the manager queue.
588 if (dq != &_dispatch_mgr_q) {
589 return &_dispatch_mgr_q;
590 }
591 _dispatch_source_kevent_register(ds);
592 ds->ds_is_installed = true;
593 if (dr->ds_handler[DS_REGISTN_HANDLER]) {
594 return ds->do_targetq;
595 }
596 if (slowpath(ds->do_xref_cnt == -1)) {
597 return &_dispatch_mgr_q; // rdar://problem/9558246
598 }
599 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
600 // Source suspended by an item drained from the source queue.
601 return NULL;
602 } else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
603 // The source has been registered and the registration handler needs
604 // to be delivered on the target queue.
605 if (dq != ds->do_targetq) {
606 return ds->do_targetq;
607 }
608 // clears ds_registration_handler
609 _dispatch_source_registration_callout(ds);
610 if (slowpath(ds->do_xref_cnt == -1)) {
611 return &_dispatch_mgr_q; // rdar://problem/9558246
612 }
613 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
614 // The source has been cancelled and needs to be uninstalled from the
615 // manager queue. After uninstallation, the cancellation handler needs
616 // to be delivered to the target queue.
617 if (ds->ds_dkev) {
618 if (dq != &_dispatch_mgr_q) {
619 return &_dispatch_mgr_q;
620 }
621 _dispatch_source_kevent_unregister(ds);
622 }
623 if (dr->ds_handler[DS_EVENT_HANDLER] ||
624 dr->ds_handler[DS_CANCEL_HANDLER] ||
625 dr->ds_handler[DS_REGISTN_HANDLER]) {
626 if (dq != ds->do_targetq) {
627 return ds->do_targetq;
628 }
629 }
630 _dispatch_source_cancel_callout(ds);
631 } else if (ds->ds_pending_data) {
632 // The source has pending data to deliver via the event handler callback
633 // on the target queue. Some sources need to be rearmed on the manager
634 // queue after event delivery.
635 if (dq != ds->do_targetq) {
636 return ds->do_targetq;
637 }
638 _dispatch_source_latch_and_call(ds);
639 if (ds->ds_needs_rearm) {
640 return &_dispatch_mgr_q;
641 }
642 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
643 // The source needs to be rearmed on the manager queue.
644 if (dq != &_dispatch_mgr_q) {
645 return &_dispatch_mgr_q;
646 }
647 _dispatch_source_kevent_resume(ds, 0);
648 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
649 }
650
651 return NULL;
652 }
653
654 DISPATCH_NOINLINE
655 void
656 _dispatch_source_invoke(dispatch_source_t ds)
657 {
658 _dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
659 }
660
661 unsigned long
662 _dispatch_source_probe(dispatch_source_t ds)
663 {
664 // This function determines whether the source needs to be invoked.
665 // The order of tests here in probe and in invoke should be consistent.
666
667 dispatch_source_refs_t dr = ds->ds_refs;
668 if (!ds->ds_is_installed) {
669 // The source needs to be installed on the manager queue.
670 return true;
671 } else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
672 // The registration handler needs to be delivered to the target queue.
673 return true;
674 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
675 // The source needs to be uninstalled from the manager queue, or the
676 // cancellation handler needs to be delivered to the target queue.
677 // Note: cancellation assumes installation.
678 if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
679 dr->ds_handler[DS_CANCEL_HANDLER] ||
680 dr->ds_handler[DS_REGISTN_HANDLER]) {
681 return true;
682 }
683 } else if (ds->ds_pending_data) {
684 // The source has pending data to deliver to the target queue.
685 return true;
686 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
687 // The source needs to be rearmed on the manager queue.
688 return true;
689 }
690 return _dispatch_queue_class_probe(ds);
691 }
692
693 static void
694 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
695 {
696 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
697 return;
698 }
699 if (ds->ds_is_level) {
700 // ke->data is signed and "negative available data" makes no sense
701 // zero bytes happens when EV_EOF is set
702 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
703 dispatch_assert(ke->data >= 0l);
704 dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
705 relaxed);
706 } else if (ds->ds_is_adder) {
707 (void)dispatch_atomic_add2o(ds, ds_pending_data,
708 (unsigned long)ke->data, relaxed);
709 } else if (ke->fflags & ds->ds_pending_data_mask) {
710 (void)dispatch_atomic_or2o(ds, ds_pending_data,
711 ke->fflags & ds->ds_pending_data_mask, relaxed);
712 }
713 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
714 if (ds->ds_needs_rearm) {
715 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
716 }
717
718 _dispatch_wakeup(ds);
719 }
720
721 #pragma mark -
722 #pragma mark dispatch_kevent_t
723
724 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
725 static void _dispatch_kevent_guard(dispatch_kevent_t dk);
726 static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
727 #else
728 static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
729 static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
730 #endif
731
732 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
733 .dk_kevent = {
734 .filter = DISPATCH_EVFILT_CUSTOM_OR,
735 .flags = EV_CLEAR,
736 },
737 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
738 };
739 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
740 .dk_kevent = {
741 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
742 },
743 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
744 };
745
746 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
747
748 DISPATCH_CACHELINE_ALIGN
749 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
750
751 static void
752 _dispatch_kevent_init()
753 {
754 unsigned int i;
755 for (i = 0; i < DSL_HASH_SIZE; i++) {
756 TAILQ_INIT(&_dispatch_sources[i]);
757 }
758
759 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
760 &_dispatch_kevent_data_or, dk_list);
761 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
762 &_dispatch_kevent_data_add, dk_list);
763 _dispatch_kevent_data_or.dk_kevent.udata =
764 (uintptr_t)&_dispatch_kevent_data_or;
765 _dispatch_kevent_data_add.dk_kevent.udata =
766 (uintptr_t)&_dispatch_kevent_data_add;
767 }
768
// Map a kevent (ident, filter) pair to a bucket index in _dispatch_sources.
// With Mach support, port-based filters hash on MACH_PORT_INDEX(ident)
// instead of the raw ident.
static inline uintptr_t
_dispatch_kevent_hash(uint64_t ident, short filter)
{
	uint64_t value = ident;
#if HAVE_MACH
	if (filter == EVFILT_MACHPORT ||
			filter == DISPATCH_EVFILT_MACH_NOTIFICATION) {
		value = MACH_PORT_INDEX(ident);
	}
#endif
	return DSL_HASH((uintptr_t)value);
}
782
783 static dispatch_kevent_t
784 _dispatch_kevent_find(uint64_t ident, short filter)
785 {
786 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
787 dispatch_kevent_t dki;
788
789 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
790 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
791 break;
792 }
793 }
794 return dki;
795 }
796
797 static void
798 _dispatch_kevent_insert(dispatch_kevent_t dk)
799 {
800 _dispatch_kevent_guard(dk);
801 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
802 dk->dk_kevent.filter);
803 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
804 }
805
806 // Find existing kevents, and merge any new flags if necessary
807 static bool
808 _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
809 {
810 dispatch_kevent_t dk, ds_dkev = *dkp;
811 uint32_t new_flags;
812 bool do_resume = false;
813
814 dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
815 ds_dkev->dk_kevent.filter);
816 if (dk) {
817 // If an existing dispatch kevent is found, check to see if new flags
818 // need to be added to the existing kevent
819 new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
820 dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
821 free(ds_dkev);
822 *dkp = dk;
823 do_resume = new_flags;
824 } else {
825 dk = ds_dkev;
826 _dispatch_kevent_insert(dk);
827 new_flags = dk->dk_kevent.fflags;
828 do_resume = true;
829 }
830 // Re-register the kevent with the kernel if new flags were added
831 // by the dispatch kevent
832 if (do_resume) {
833 dk->dk_kevent.flags |= EV_ADD;
834 }
835 *flgp = new_flags;
836 return do_resume;
837 }
838
839 static bool
840 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
841 uint32_t del_flags)
842 {
843 long r;
844 switch (dk->dk_kevent.filter) {
845 case DISPATCH_EVFILT_TIMER:
846 case DISPATCH_EVFILT_CUSTOM_ADD:
847 case DISPATCH_EVFILT_CUSTOM_OR:
848 // these types not registered with kevent
849 return 0;
850 #if HAVE_MACH
851 case EVFILT_MACHPORT:
852 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
853 case DISPATCH_EVFILT_MACH_NOTIFICATION:
854 return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
855 #endif
856 case EVFILT_PROC:
857 if (dk->dk_kevent.flags & EV_ONESHOT) {
858 return 0;
859 }
860 // fall through
861 default:
862 r = _dispatch_kq_update(&dk->dk_kevent);
863 if (dk->dk_kevent.flags & EV_DISPATCH) {
864 dk->dk_kevent.flags &= ~EV_ADD;
865 }
866 return r;
867 }
868 }
869
870 static void
871 _dispatch_kevent_dispose(dispatch_kevent_t dk)
872 {
873 uintptr_t hash;
874
875 switch (dk->dk_kevent.filter) {
876 case DISPATCH_EVFILT_TIMER:
877 case DISPATCH_EVFILT_CUSTOM_ADD:
878 case DISPATCH_EVFILT_CUSTOM_OR:
879 // these sources live on statically allocated lists
880 return;
881 #if HAVE_MACH
882 case EVFILT_MACHPORT:
883 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
884 break;
885 case DISPATCH_EVFILT_MACH_NOTIFICATION:
886 _dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
887 break;
888 #endif
889 case EVFILT_PROC:
890 if (dk->dk_kevent.flags & EV_ONESHOT) {
891 break; // implicitly deleted
892 }
893 // fall through
894 default:
895 if (~dk->dk_kevent.flags & EV_DELETE) {
896 dk->dk_kevent.flags |= EV_DELETE;
897 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
898 _dispatch_kq_update(&dk->dk_kevent);
899 }
900 break;
901 }
902
903 hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
904 dk->dk_kevent.filter);
905 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
906 _dispatch_kevent_unguard(dk);
907 free(dk);
908 }
909
910 static void
911 _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
912 {
913 dispatch_source_refs_t dri;
914 uint32_t del_flags, fflags = 0;
915
916 if (TAILQ_EMPTY(&dk->dk_sources)) {
917 _dispatch_kevent_dispose(dk);
918 } else {
919 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
920 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
921 uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
922 fflags |= mask;
923 }
924 del_flags = flg & ~fflags;
925 if (del_flags) {
926 dk->dk_kevent.flags |= EV_ADD;
927 dk->dk_kevent.fflags = fflags;
928 _dispatch_kevent_resume(dk, 0, del_flags);
929 }
930 }
931 }
932
933 DISPATCH_NOINLINE
934 static void
935 _dispatch_kevent_proc_exit(struct kevent64_s *ke)
936 {
937 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
938 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
939 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
940 struct kevent64_s fake;
941 fake = *ke;
942 fake.flags &= ~EV_ERROR;
943 fake.fflags = NOTE_EXIT;
944 fake.data = 0;
945 _dispatch_kevent_drain(&fake);
946 }
947
948 DISPATCH_NOINLINE
949 static void
950 _dispatch_kevent_error(struct kevent64_s *ke)
951 {
952 _dispatch_kevent_debug(ke, __func__);
953 if (ke->data) {
954 // log the unexpected error
955 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
956 ke->flags & EV_DELETE ? "delete" :
957 ke->flags & EV_ADD ? "add" :
958 ke->flags & EV_ENABLE ? "enable" : "monitor",
959 (int)ke->data);
960 }
961 }
962
963 static void
964 _dispatch_kevent_drain(struct kevent64_s *ke)
965 {
966 #if DISPATCH_DEBUG
967 static dispatch_once_t pred;
968 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
969 #endif
970 if (ke->filter == EVFILT_USER) {
971 return;
972 }
973 if (slowpath(ke->flags & EV_ERROR)) {
974 if (ke->filter == EVFILT_PROC) {
975 if (ke->flags & EV_DELETE) {
976 // Process exited while monitored
977 return;
978 } else if (ke->data == ESRCH) {
979 return _dispatch_kevent_proc_exit(ke);
980 }
981 }
982 return _dispatch_kevent_error(ke);
983 }
984 _dispatch_kevent_debug(ke, __func__);
985 if (ke->filter == EVFILT_TIMER) {
986 return _dispatch_timers_kevent(ke);
987 }
988 #if HAVE_MACH
989 if (ke->filter == EVFILT_MACHPORT) {
990 return _dispatch_kevent_mach_portset(ke);
991 }
992 #endif
993 return _dispatch_kevent_merge(ke);
994 }
995
996 DISPATCH_NOINLINE
997 static void
998 _dispatch_kevent_merge(struct kevent64_s *ke)
999 {
1000 dispatch_kevent_t dk;
1001 dispatch_source_refs_t dri;
1002
1003 dk = (void*)ke->udata;
1004 dispatch_assert(dk);
1005
1006 if (ke->flags & EV_ONESHOT) {
1007 dk->dk_kevent.flags |= EV_ONESHOT;
1008 }
1009 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1010 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1011 }
1012 }
1013
1014 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1015 static void
1016 _dispatch_kevent_guard(dispatch_kevent_t dk)
1017 {
1018 guardid_t guard;
1019 const unsigned int guard_flags = GUARD_CLOSE;
1020 int r, fd_flags = 0;
1021 switch (dk->dk_kevent.filter) {
1022 case EVFILT_READ:
1023 case EVFILT_WRITE:
1024 case EVFILT_VNODE:
1025 guard = &dk->dk_kevent;
1026 r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
1027 &guard, guard_flags, &fd_flags);
1028 if (slowpath(r == -1)) {
1029 int err = errno;
1030 if (err != EPERM) {
1031 (void)dispatch_assume_zero(err);
1032 }
1033 return;
1034 }
1035 dk->dk_kevent.ext[0] = guard_flags;
1036 dk->dk_kevent.ext[1] = fd_flags;
1037 break;
1038 }
1039 }
1040
1041 static void
1042 _dispatch_kevent_unguard(dispatch_kevent_t dk)
1043 {
1044 guardid_t guard;
1045 unsigned int guard_flags;
1046 int r, fd_flags;
1047 switch (dk->dk_kevent.filter) {
1048 case EVFILT_READ:
1049 case EVFILT_WRITE:
1050 case EVFILT_VNODE:
1051 guard_flags = (unsigned int)dk->dk_kevent.ext[0];
1052 if (!guard_flags) {
1053 return;
1054 }
1055 guard = &dk->dk_kevent;
1056 fd_flags = (int)dk->dk_kevent.ext[1];
1057 r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
1058 guard_flags, NULL, 0, &fd_flags);
1059 if (slowpath(r == -1)) {
1060 (void)dispatch_assume_zero(errno);
1061 return;
1062 }
1063 dk->dk_kevent.ext[0] = 0;
1064 break;
1065 }
1066 }
1067 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1068
1069 #pragma mark -
1070 #pragma mark dispatch_source_timer
1071
// Trace bookkeeping for the "next timer" of each timer QoS.
// With DISPATCH_USE_DTRACE, one refs pointer per QoS is remembered so that
// the program/wake trace hooks can be handed that timer; otherwise all three
// helpers expand to nothing.
#if DISPATCH_USE_DTRACE
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
// Remember timer refs (x) as the next timer for QoS (q)
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
// Trace programming of the remembered timer for QoS (q) with delay (d)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
// Trace the wakeup of the remembered timer for QoS (q)
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

// Timer telemetry is compiled out: this predicate is constant false.
#define _dispatch_source_timer_telemetry_enabled() false
1088
1089 DISPATCH_NOINLINE
1090 static void
1091 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1092 uintptr_t ident, struct dispatch_timer_source_s *values)
1093 {
1094 if (_dispatch_trace_timer_configure_enabled()) {
1095 _dispatch_trace_timer_configure(ds, ident, values);
1096 }
1097 }
1098
1099 DISPATCH_ALWAYS_INLINE
1100 static inline void
1101 _dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
1102 struct dispatch_timer_source_s *values)
1103 {
1104 if (_dispatch_trace_timer_configure_enabled() ||
1105 _dispatch_source_timer_telemetry_enabled()) {
1106 _dispatch_source_timer_telemetry_slow(ds, ident, values);
1107 asm(""); // prevent tailcall
1108 }
1109 }
1110
// Nanosecond value used as the cap for interval timers:
// approx 1 year (60s * 60m * 24h * 365d = 31,536,000s)
#define FOREVER_NSEC 31536000000000000ull
1113
1114 DISPATCH_ALWAYS_INLINE
1115 static inline uint64_t
1116 _dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
1117 {
1118 unsigned int tk = DISPATCH_TIMER_KIND(tidx);
1119 if (nows && fastpath(nows[tk])) {
1120 return nows[tk];
1121 }
1122 uint64_t now;
1123 switch (tk) {
1124 case DISPATCH_TIMER_KIND_MACH:
1125 now = _dispatch_absolute_time();
1126 break;
1127 case DISPATCH_TIMER_KIND_WALL:
1128 now = _dispatch_get_nanoseconds();
1129 break;
1130 }
1131 if (nows) {
1132 nows[tk] = now;
1133 }
1134 return now;
1135 }
1136
1137 static inline unsigned long
1138 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1139 {
1140 // calculate the number of intervals since last fire
1141 unsigned long data, missed;
1142 uint64_t now;
1143 now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
1144 missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1145 ds_timer(dr).interval);
1146 // correct for missed intervals already delivered last time
1147 data = prev - ds_timer(dr).missed + missed;
1148 ds_timer(dr).missed = missed;
1149 return data;
1150 }
1151
1152 struct dispatch_set_timer_params {
1153 dispatch_source_t ds;
1154 uintptr_t ident;
1155 struct dispatch_timer_source_s values;
1156 };
1157
1158 static void
1159 _dispatch_source_set_timer3(void *context)
1160 {
1161 // Called on the _dispatch_mgr_q
1162 struct dispatch_set_timer_params *params = context;
1163 dispatch_source_t ds = params->ds;
1164 ds->ds_ident_hack = params->ident;
1165 ds_timer(ds->ds_refs) = params->values;
1166 // Clear any pending data that might have accumulated on
1167 // older timer params <rdar://problem/8574886>
1168 ds->ds_pending_data = 0;
1169 // Re-arm in case we got disarmed because of pending set_timer suspension
1170 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
1171 dispatch_resume(ds);
1172 // Must happen after resume to avoid getting disarmed due to suspension
1173 _dispatch_timers_update(ds);
1174 dispatch_release(ds);
1175 if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
1176 _dispatch_mach_host_calendar_change_register();
1177 }
1178 free(params);
1179 }
1180
1181 static void
1182 _dispatch_source_set_timer2(void *context)
1183 {
1184 // Called on the source queue
1185 struct dispatch_set_timer_params *params = context;
1186 dispatch_suspend(params->ds);
1187 _dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
1188 _dispatch_source_set_timer3);
1189 }
1190
1191 DISPATCH_NOINLINE
1192 static struct dispatch_set_timer_params *
1193 _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1194 uint64_t interval, uint64_t leeway)
1195 {
1196 struct dispatch_set_timer_params *params;
1197 params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1198 params->ds = ds;
1199 params->values.flags = ds_timer(ds->ds_refs).flags;
1200
1201 if (interval == 0) {
1202 // we use zero internally to mean disabled
1203 interval = 1;
1204 } else if ((int64_t)interval < 0) {
1205 // 6866347 - make sure nanoseconds won't overflow
1206 interval = INT64_MAX;
1207 }
1208 if ((int64_t)leeway < 0) {
1209 leeway = INT64_MAX;
1210 }
1211 if (start == DISPATCH_TIME_NOW) {
1212 start = _dispatch_absolute_time();
1213 } else if (start == DISPATCH_TIME_FOREVER) {
1214 start = INT64_MAX;
1215 }
1216
1217 if ((int64_t)start < 0) {
1218 // wall clock
1219 start = (dispatch_time_t)-((int64_t)start);
1220 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1221 } else {
1222 // absolute clock
1223 interval = _dispatch_time_nano2mach(interval);
1224 if (interval < 1) {
1225 // rdar://problem/7287561 interval must be at least one in
1226 // in order to avoid later division by zero when calculating
1227 // the missed interval count. (NOTE: the wall clock's
1228 // interval is already "fixed" to be 1 or more)
1229 interval = 1;
1230 }
1231 leeway = _dispatch_time_nano2mach(leeway);
1232 params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1233 }
1234 params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
1235 params->values.target = start;
1236 params->values.deadline = (start < UINT64_MAX - leeway) ?
1237 start + leeway : UINT64_MAX;
1238 params->values.interval = interval;
1239 params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1240 leeway : interval / 2;
1241 return params;
1242 }
1243
1244 DISPATCH_ALWAYS_INLINE
1245 static inline void
1246 _dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1247 uint64_t interval, uint64_t leeway, bool source_sync)
1248 {
1249 if (slowpath(!ds->ds_is_timer) ||
1250 slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1251 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1252 }
1253
1254 struct dispatch_set_timer_params *params;
1255 params = _dispatch_source_timer_params(ds, start, interval, leeway);
1256
1257 _dispatch_source_timer_telemetry(ds, params->ident, &params->values);
1258 // Suspend the source so that it doesn't fire with pending changes
1259 // The use of suspend/resume requires the external retain/release
1260 dispatch_retain(ds);
1261 if (source_sync) {
1262 return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
1263 _dispatch_source_set_timer2);
1264 } else {
1265 return _dispatch_source_set_timer2(params);
1266 }
1267 }
1268
1269 void
1270 dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1271 uint64_t interval, uint64_t leeway)
1272 {
1273 _dispatch_source_set_timer(ds, start, interval, leeway, true);
1274 }
1275
1276 void
1277 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1278 dispatch_time_t start, uint64_t interval, uint64_t leeway)
1279 {
1280 // Don't serialize through the source queue for CF timers <rdar://13833190>
1281 _dispatch_source_set_timer(ds, start, interval, leeway, false);
1282 }
1283
1284 void
1285 _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1286 {
1287 dispatch_source_refs_t dr = ds->ds_refs;
1288 #define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1289 const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1290 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1291 FOREVER_NSEC/NSEC_PER_MSEC))) {
1292 interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1293 } else {
1294 interval = FOREVER_NSEC;
1295 }
1296 interval = _dispatch_time_nano2mach(interval);
1297 uint64_t target = _dispatch_absolute_time() + interval;
1298 target = (target / interval) * interval;
1299 const uint64_t leeway = animation ?
1300 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1301 ds_timer(dr).target = target;
1302 ds_timer(dr).deadline = target + leeway;
1303 ds_timer(dr).interval = interval;
1304 ds_timer(dr).leeway = leeway;
1305 _dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
1306 }
1307
1308 #pragma mark -
1309 #pragma mark dispatch_timers
1310
1311 #define DISPATCH_TIMER_STRUCT(refs) \
1312 uint64_t target, deadline; \
1313 TAILQ_HEAD(, refs) dt_sources
1314
1315 typedef struct dispatch_timer_s {
1316 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1317 } *dispatch_timer_t;
1318
1319 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1320 [tidx] = { \
1321 .target = UINT64_MAX, \
1322 .deadline = UINT64_MAX, \
1323 .dt_sources = TAILQ_HEAD_INITIALIZER( \
1324 _dispatch_timer[tidx].dt_sources), \
1325 }
1326 #define DISPATCH_TIMER_INIT(kind, qos) \
1327 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1328 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1329
1330 struct dispatch_timer_s _dispatch_timer[] = {
1331 DISPATCH_TIMER_INIT(WALL, NORMAL),
1332 DISPATCH_TIMER_INIT(WALL, CRITICAL),
1333 DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1334 DISPATCH_TIMER_INIT(MACH, NORMAL),
1335 DISPATCH_TIMER_INIT(MACH, CRITICAL),
1336 DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1337 };
1338 #define DISPATCH_TIMER_COUNT \
1339 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1340
1341 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1342 (uintptr_t)&_dispatch_kevent_timer[tidx]
1343 #ifdef __LP64__
1344 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1345 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1346 #else // __LP64__
1347 // dynamic initialization in _dispatch_timers_init()
1348 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1349 .udata = 0
1350 #endif // __LP64__
1351 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1352 [tidx] = { \
1353 .dk_kevent = { \
1354 .ident = tidx, \
1355 .filter = DISPATCH_EVFILT_TIMER, \
1356 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1357 }, \
1358 .dk_sources = TAILQ_HEAD_INITIALIZER( \
1359 _dispatch_kevent_timer[tidx].dk_sources), \
1360 }
1361 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1362 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1363 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1364
1365 struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1366 DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1367 DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1368 DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1369 DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1370 DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1371 DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1372 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1373 };
1374 #define DISPATCH_KEVENT_TIMER_COUNT \
1375 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1376
1377 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1378 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1379 [qos] = { \
1380 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1381 .filter = EVFILT_TIMER, \
1382 .flags = EV_ONESHOT, \
1383 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1384 }
1385 #define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1386 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1387
1388 struct kevent64_s _dispatch_kevent_timeout[] = {
1389 DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
1390 DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
1391 DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
1392 };
1393
1394 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1395 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1396
1397 static const uint64_t _dispatch_kevent_coalescing_window[] = {
1398 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1399 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1400 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1401 };
1402
// Inserts a timer into the lists of index `tidx`.
//
//  dra/dr/dr_list: array of kevent-like records, element and list entry for
//                  the target-ordered `dk_sources` list
//  dta/dt/dt_list: array of timer records, element and list entry for the
//                  deadline-ordered `dt_sources` list
//
// For a regular index, `dr` goes in front of the first element with a later
// target and `dt` in front of the first element with a later deadline (or
// at the tail when there is none). For DISPATCH_TIMER_INDEX_DISARM only the
// `dk_sources` list is touched and `dr` is appended to its tail.
#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
		typeof(dr) dri = NULL; typeof(dt) dti; \
		if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
			TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
				if (ds_timer(dr).target < ds_timer(dri).target) { \
					break; \
				} \
			} \
			TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
				if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
					break; \
				} \
			} \
			if (dti) { \
				TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
			} else { \
				TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
			} \
		} \
		if (dri) { \
			TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
		} else { \
			TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
		} \
	})
1428
// Removes a timer from the lists of index `tidx` (inverse of
// _dispatch_timers_insert()). `dt` is unlinked from dta[tidx].dt_sources
// unless the index is DISPATCH_TIMER_INDEX_DISARM; `dr` is unlinked from the
// dk_sources list of `dk` when `dk` is non-NULL, else from dra[tidx].
#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
	({ \
		if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
			TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
		} \
		TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
				dr_list); \
	})
1436
// Refreshes the recorded target/deadline of every timer index whose QoS bit
// is set in _dispatch_timers_qos_mask.
//
//  dra: array whose dk_sources lists are ordered by target
//  dta: array whose dt_sources lists are ordered by deadline and which
//       holds the recorded target/deadline
//
// The head of each list provides the new value (UINT64_MAX for an empty
// list). Each head is NULL-checked on its own before being dereferenced.
// Evaluates to true when any recorded value changed.
#define _dispatch_timers_check(dra, dta) ({ \
		unsigned int qosm = _dispatch_timers_qos_mask; \
		bool update = false; \
		unsigned int tidx; \
		for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
			if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
				continue; \
			} \
			dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
					TAILQ_FIRST(&dra[tidx].dk_sources); \
			dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
					TAILQ_FIRST(&dta[tidx].dt_sources); \
			uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
			uint64_t deadline = dt ? ds_timer(dt).deadline : UINT64_MAX; \
			if (target != dta[tidx].target) { \
				dta[tidx].target = target; \
				update = true; \
			} \
			if (deadline != dta[tidx].deadline) { \
				dta[tidx].deadline = deadline; \
				update = true; \
			} \
		} \
		update; })
1461
// Set when the timer lists changed and targets/deadlines must be rechecked
static bool _dispatch_timers_reconfigure;
// Set when a timeout kevent fired (or a calendar change happened) and
// timers must be run
static bool _dispatch_timer_expired;
// Bitmask (1 << qos) of the timer QoS values touched since the last pass
static unsigned int _dispatch_timers_qos_mask;
// Set from the LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY environment variable
static bool _dispatch_timers_force_max_leeway;
1465
1466 static void
1467 _dispatch_timers_init(void)
1468 {
1469 #ifndef __LP64__
1470 unsigned int tidx;
1471 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1472 _dispatch_kevent_timer[tidx].dk_kevent.udata = \
1473 DISPATCH_KEVENT_TIMER_UDATA(tidx);
1474 }
1475 #endif // __LP64__
1476 if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
1477 _dispatch_timers_force_max_leeway = true;
1478 }
1479 }
1480
1481 static inline void
1482 _dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
1483 {
1484 dispatch_source_refs_t dr = ds->ds_refs;
1485 unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
1486
1487 if (slowpath(ds_timer_aggregate(ds))) {
1488 _dispatch_timer_aggregates_unregister(ds, tidx);
1489 }
1490 _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
1491 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1492 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1493 _dispatch_timers_reconfigure = true;
1494 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1495 }
1496 }
1497
1498 // Updates the ordered list of timers based on next fire date for changes to ds.
1499 // Should only be called from the context of _dispatch_mgr_q.
1500 static void
1501 _dispatch_timers_update(dispatch_source_t ds)
1502 {
1503 dispatch_kevent_t dk = ds->ds_dkev;
1504 dispatch_source_refs_t dr = ds->ds_refs;
1505 unsigned int tidx;
1506
1507 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1508
1509 // Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1510 if (slowpath(!dk)) {
1511 return;
1512 }
1513 // Move timers that are disabled, suspended or have missed intervals to the
1514 // disarmed list, rearm after resume resp. source invoke will reenable them
1515 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1516 ds->ds_pending_data) {
1517 tidx = DISPATCH_TIMER_INDEX_DISARM;
1518 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1519 } else {
1520 tidx = _dispatch_source_timer_idx(dr);
1521 }
1522 if (slowpath(ds_timer_aggregate(ds))) {
1523 _dispatch_timer_aggregates_register(ds);
1524 }
1525 if (slowpath(!ds->ds_is_installed)) {
1526 ds->ds_is_installed = true;
1527 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1528 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1529 }
1530 _dispatch_object_debug(ds, "%s", __func__);
1531 ds->ds_dkev = NULL;
1532 free(dk);
1533 } else {
1534 _dispatch_timers_unregister(ds, dk);
1535 }
1536 if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1537 _dispatch_timers_reconfigure = true;
1538 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1539 }
1540 if (dk != &_dispatch_kevent_timer[tidx]){
1541 ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1542 }
1543 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1544 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1545 if (slowpath(ds_timer_aggregate(ds))) {
1546 _dispatch_timer_aggregates_update(ds, tidx);
1547 }
1548 }
1549
1550 static inline void
1551 _dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1552 {
1553 dispatch_source_refs_t dr;
1554 dispatch_source_t ds;
1555 uint64_t now, missed;
1556
1557 now = _dispatch_source_timer_now(nows, tidx);
1558 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1559 ds = _dispatch_source_from_refs(dr);
1560 // We may find timers on the wrong list due to a pending update from
1561 // dispatch_source_set_timer. Force an update of the list in that case.
1562 if (tidx != ds->ds_ident_hack) {
1563 _dispatch_timers_update(ds);
1564 continue;
1565 }
1566 if (!ds_timer(dr).target) {
1567 // No configured timers on the list
1568 break;
1569 }
1570 if (ds_timer(dr).target > now) {
1571 // Done running timers for now.
1572 break;
1573 }
1574 // Remove timers that are suspended or have missed intervals from the
1575 // list, rearm after resume resp. source invoke will reenable them
1576 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1577 _dispatch_timers_update(ds);
1578 continue;
1579 }
1580 // Calculate number of missed intervals.
1581 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1582 if (++missed > INT_MAX) {
1583 missed = INT_MAX;
1584 }
1585 if (ds_timer(dr).interval < INT64_MAX) {
1586 ds_timer(dr).target += missed * ds_timer(dr).interval;
1587 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1588 } else {
1589 ds_timer(dr).target = UINT64_MAX;
1590 ds_timer(dr).deadline = UINT64_MAX;
1591 }
1592 _dispatch_timers_update(ds);
1593 ds_timer(dr).last_fire = now;
1594
1595 unsigned long data;
1596 data = dispatch_atomic_add2o(ds, ds_pending_data,
1597 (unsigned long)missed, relaxed);
1598 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1599 _dispatch_wakeup(ds);
1600 }
1601 }
1602
1603 DISPATCH_NOINLINE
1604 static void
1605 _dispatch_timers_run(uint64_t nows[])
1606 {
1607 unsigned int tidx;
1608 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1609 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1610 _dispatch_timers_run2(nows, tidx);
1611 }
1612 }
1613 }
1614
1615 static inline unsigned int
1616 _dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1617 uint64_t *delay, uint64_t *leeway, int qos)
1618 {
1619 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1620 uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1621
1622 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1623 if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1624 continue;
1625 }
1626 uint64_t target = timer[tidx].target;
1627 if (target == UINT64_MAX) {
1628 continue;
1629 }
1630 uint64_t deadline = timer[tidx].deadline;
1631 if (qos >= 0) {
1632 // Timer pre-coalescing <rdar://problem/13222034>
1633 uint64_t window = _dispatch_kevent_coalescing_window[qos];
1634 uint64_t latest = deadline > window ? deadline - window : 0;
1635 dispatch_source_refs_t dri;
1636 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1637 dr_list) {
1638 tmp = ds_timer(dri).target;
1639 if (tmp > latest) break;
1640 target = tmp;
1641 }
1642 }
1643 uint64_t now = _dispatch_source_timer_now(nows, tidx);
1644 if (target <= now) {
1645 delta = 0;
1646 break;
1647 }
1648 tmp = target - now;
1649 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1650 tmp = _dispatch_time_mach2nano(tmp);
1651 }
1652 if (tmp < INT64_MAX && tmp < delta) {
1653 ridx = tidx;
1654 delta = tmp;
1655 }
1656 dispatch_assert(target <= deadline);
1657 tmp = deadline - now;
1658 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1659 tmp = _dispatch_time_mach2nano(tmp);
1660 }
1661 if (tmp < INT64_MAX && tmp < dldelta) {
1662 dldelta = tmp;
1663 }
1664 }
1665 *delay = delta;
1666 *leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1667 return ridx;
1668 }
1669
1670 static bool
1671 _dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke,
1672 unsigned int qos)
1673 {
1674 unsigned int tidx;
1675 bool poll;
1676 uint64_t delay, leeway;
1677
1678 tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1679 (int)qos);
1680 poll = (delay == 0);
1681 if (poll || delay == UINT64_MAX) {
1682 _dispatch_trace_next_timer_set(NULL, qos);
1683 if (!ke->data) {
1684 return poll;
1685 }
1686 ke->data = 0;
1687 ke->flags |= EV_DELETE;
1688 ke->flags &= ~(EV_ADD|EV_ENABLE);
1689 } else {
1690 _dispatch_trace_next_timer_set(
1691 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1692 _dispatch_trace_next_timer_program(delay, qos);
1693 delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1694 if (slowpath(_dispatch_timers_force_max_leeway)) {
1695 ke->data = (int64_t)(delay + leeway);
1696 ke->ext[1] = 0;
1697 } else {
1698 ke->data = (int64_t)delay;
1699 ke->ext[1] = leeway;
1700 }
1701 ke->flags |= EV_ADD|EV_ENABLE;
1702 ke->flags &= ~EV_DELETE;
1703 }
1704 _dispatch_kq_update(ke);
1705 return poll;
1706 }
1707
1708 DISPATCH_NOINLINE
1709 static bool
1710 _dispatch_timers_program(uint64_t nows[])
1711 {
1712 bool poll = false;
1713 unsigned int qos, qosm = _dispatch_timers_qos_mask;
1714 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1715 if (!(qosm & 1 << qos)){
1716 continue;
1717 }
1718 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1719 qos);
1720 }
1721 return poll;
1722 }
1723
1724 DISPATCH_NOINLINE
1725 static bool
1726 _dispatch_timers_configure(void)
1727 {
1728 _dispatch_timer_aggregates_check();
1729 // Find out if there is a new target/deadline on the timer lists
1730 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1731 }
1732
1733 static void
1734 _dispatch_timers_calendar_change(void)
1735 {
1736 // calendar change may have gone past the wallclock deadline
1737 _dispatch_timer_expired = true;
1738 _dispatch_timers_qos_mask = ~0u;
1739 }
1740
1741 static void
1742 _dispatch_timers_kevent(struct kevent64_s *ke)
1743 {
1744 dispatch_assert(ke->data > 0);
1745 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1746 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1747 unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1748 dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1749 dispatch_assert(_dispatch_kevent_timeout[qos].data);
1750 _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1751 _dispatch_timer_expired = true;
1752 _dispatch_timers_qos_mask |= 1 << qos;
1753 _dispatch_trace_next_timer_wake(qos);
1754 }
1755
1756 static inline bool
1757 _dispatch_mgr_timers(void)
1758 {
1759 uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1760 bool expired = slowpath(_dispatch_timer_expired);
1761 if (expired) {
1762 _dispatch_timers_run(nows);
1763 }
1764 bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1765 if (reconfigure || expired) {
1766 if (reconfigure) {
1767 reconfigure = _dispatch_timers_configure();
1768 _dispatch_timers_reconfigure = false;
1769 }
1770 if (reconfigure || expired) {
1771 expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1772 expired = expired || _dispatch_mgr_q.dq_items_tail;
1773 }
1774 _dispatch_timers_qos_mask = 0;
1775 }
1776 return expired;
1777 }
1778
1779 #pragma mark -
1780 #pragma mark dispatch_timer_aggregate
1781
1782 typedef struct {
1783 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
1784 } dispatch_timer_aggregate_refs_s;
1785
1786 typedef struct dispatch_timer_aggregate_s {
1787 DISPATCH_STRUCT_HEADER(queue);
1788 DISPATCH_QUEUE_HEADER;
1789 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
1790 dispatch_timer_aggregate_refs_s
1791 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
1792 struct {
1793 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
1794 } dta_timer[DISPATCH_TIMER_COUNT];
1795 struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
1796 unsigned int dta_refcount;
1797 } dispatch_timer_aggregate_s;
1798
1799 typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
1800 static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
1801 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1802
1803 dispatch_timer_aggregate_t
1804 dispatch_timer_aggregate_create(void)
1805 {
1806 unsigned int tidx;
1807 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
1808 sizeof(struct dispatch_timer_aggregate_s));
1809 _dispatch_queue_init((dispatch_queue_t)dta);
1810 dta->do_targetq = _dispatch_get_root_queue(
1811 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
1812 dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1813 //FIXME: aggregates need custom vtable
1814 //dta->dq_label = "timer-aggregate";
1815 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
1816 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
1817 }
1818 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1819 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
1820 dta->dta_timer[tidx].target = UINT64_MAX;
1821 dta->dta_timer[tidx].deadline = UINT64_MAX;
1822 dta->dta_timer_data[tidx].target = UINT64_MAX;
1823 dta->dta_timer_data[tidx].deadline = UINT64_MAX;
1824 }
1825 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
1826 (dispatch_queue_t)dta);
1827 }
1828
1829 typedef struct dispatch_timer_delay_s {
1830 dispatch_timer_t timer;
1831 uint64_t delay, leeway;
1832 } *dispatch_timer_delay_t;
1833
1834 static void
1835 _dispatch_timer_aggregate_get_delay(void *ctxt)
1836 {
1837 dispatch_timer_delay_t dtd = ctxt;
1838 struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
1839 _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
1840 -1);
1841 }
1842
1843 uint64_t
1844 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
1845 uint64_t *leeway_ptr)
1846 {
1847 struct dispatch_timer_delay_s dtd = {
1848 .timer = dta->dta_timer_data,
1849 };
1850 dispatch_sync_f((dispatch_queue_t)dta, &dtd,
1851 _dispatch_timer_aggregate_get_delay);
1852 if (leeway_ptr) {
1853 *leeway_ptr = dtd.leeway;
1854 }
1855 return dtd.delay;
1856 }
1857
1858 static void
1859 _dispatch_timer_aggregate_update(void *ctxt)
1860 {
1861 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
1862 dispatch_timer_t dtau = ctxt;
1863 unsigned int tidx;
1864 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1865 dta->dta_timer_data[tidx].target = dtau[tidx].target;
1866 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
1867 }
1868 free(dtau);
1869 }
1870
1871 DISPATCH_NOINLINE
1872 static void
1873 _dispatch_timer_aggregates_configure(void)
1874 {
1875 dispatch_timer_aggregate_t dta;
1876 dispatch_timer_t dtau;
1877 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
1878 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
1879 continue;
1880 }
1881 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
1882 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
1883 _dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau,
1884 _dispatch_timer_aggregate_update);
1885 }
1886 }
1887
1888 static inline void
1889 _dispatch_timer_aggregates_check(void)
1890 {
1891 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
1892 return;
1893 }
1894 _dispatch_timer_aggregates_configure();
1895 }
1896
1897 static void
1898 _dispatch_timer_aggregates_register(dispatch_source_t ds)
1899 {
1900 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1901 if (!dta->dta_refcount++) {
1902 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
1903 }
1904 }
1905
1906 DISPATCH_NOINLINE
1907 static void
1908 _dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
1909 {
1910 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1911 dispatch_timer_source_aggregate_refs_t dr;
1912 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1913 _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
1914 dta->dta_timer, dr, dta_list);
1915 }
1916
1917 DISPATCH_NOINLINE
1918 static void
1919 _dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
1920 {
1921 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1922 dispatch_timer_source_aggregate_refs_t dr;
1923 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1924 _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
1925 dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
1926 if (!--dta->dta_refcount) {
1927 TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
1928 }
1929 }
1930
1931 #pragma mark -
1932 #pragma mark dispatch_select
1933
// Descriptor of the manager kqueue; assigned in _dispatch_kq_init().
static int _dispatch_kq;

// State of the select(2) fallback used for descriptors kqueue rejects:
// - _dispatch_select_workaround counts the read/write registrations that are
//   currently emulated through select()
// - _dispatch_rfds/_dispatch_wfds are the descriptor sets handed to select()
// - _dispatch_rfd_ptrs/_dispatch_wfd_ptrs are lazily allocated tables of
//   FD_SETSIZE entries holding the kevent udata registered for a descriptor
//   (0 when the descriptor is not registered)
static unsigned int _dispatch_select_workaround;
static fd_set _dispatch_rfds, _dispatch_wfds;
static uint64_t *_dispatch_rfd_ptrs, *_dispatch_wfd_ptrs;
1941
1942 DISPATCH_NOINLINE
1943 static bool
1944 _dispatch_select_register(struct kevent64_s *kev)
1945 {
1946
1947 // Must execute on manager queue
1948 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1949
1950 // If an EINVAL or ENOENT error occurred while adding/enabling a read or
1951 // write kevent, assume it was due to a type of filedescriptor not
1952 // supported by kqueue and fall back to select
1953 switch (kev->filter) {
1954 case EVFILT_READ:
1955 if ((kev->data == EINVAL || kev->data == ENOENT) &&
1956 dispatch_assume(kev->ident < FD_SETSIZE)) {
1957 FD_SET((int)kev->ident, &_dispatch_rfds);
1958 if (slowpath(!_dispatch_rfd_ptrs)) {
1959 _dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1960 sizeof(*_dispatch_rfd_ptrs));
1961 }
1962 if (!_dispatch_rfd_ptrs[kev->ident]) {
1963 _dispatch_rfd_ptrs[kev->ident] = kev->udata;
1964 _dispatch_select_workaround++;
1965 _dispatch_debug("select workaround used to read fd %d: 0x%lx",
1966 (int)kev->ident, (long)kev->data);
1967 }
1968 }
1969 return true;
1970 case EVFILT_WRITE:
1971 if ((kev->data == EINVAL || kev->data == ENOENT) &&
1972 dispatch_assume(kev->ident < FD_SETSIZE)) {
1973 FD_SET((int)kev->ident, &_dispatch_wfds);
1974 if (slowpath(!_dispatch_wfd_ptrs)) {
1975 _dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1976 sizeof(*_dispatch_wfd_ptrs));
1977 }
1978 if (!_dispatch_wfd_ptrs[kev->ident]) {
1979 _dispatch_wfd_ptrs[kev->ident] = kev->udata;
1980 _dispatch_select_workaround++;
1981 _dispatch_debug("select workaround used to write fd %d: 0x%lx",
1982 (int)kev->ident, (long)kev->data);
1983 }
1984 }
1985 return true;
1986 }
1987 return false;
1988 }
1989
1990 DISPATCH_NOINLINE
1991 static bool
1992 _dispatch_select_unregister(const struct kevent64_s *kev)
1993 {
1994 // Must execute on manager queue
1995 DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1996
1997 switch (kev->filter) {
1998 case EVFILT_READ:
1999 if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
2000 _dispatch_rfd_ptrs[kev->ident]) {
2001 FD_CLR((int)kev->ident, &_dispatch_rfds);
2002 _dispatch_rfd_ptrs[kev->ident] = 0;
2003 _dispatch_select_workaround--;
2004 return true;
2005 }
2006 break;
2007 case EVFILT_WRITE:
2008 if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
2009 _dispatch_wfd_ptrs[kev->ident]) {
2010 FD_CLR((int)kev->ident, &_dispatch_wfds);
2011 _dispatch_wfd_ptrs[kev->ident] = 0;
2012 _dispatch_select_workaround--;
2013 return true;
2014 }
2015 break;
2016 }
2017 return false;
2018 }
2019
2020 DISPATCH_NOINLINE
2021 static bool
2022 _dispatch_mgr_select(bool poll)
2023 {
2024 static const struct timeval timeout_immediately = { 0, 0 };
2025 fd_set tmp_rfds, tmp_wfds;
2026 struct kevent64_s kev;
2027 int err, i, r;
2028 bool kevent_avail = false;
2029
2030 FD_COPY(&_dispatch_rfds, &tmp_rfds);
2031 FD_COPY(&_dispatch_wfds, &tmp_wfds);
2032
2033 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2034 poll ? (struct timeval*)&timeout_immediately : NULL);
2035 if (slowpath(r == -1)) {
2036 err = errno;
2037 if (err != EBADF) {
2038 if (err != EINTR) {
2039 (void)dispatch_assume_zero(err);
2040 }
2041 return false;
2042 }
2043 for (i = 0; i < FD_SETSIZE; i++) {
2044 if (i == _dispatch_kq) {
2045 continue;
2046 }
2047 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2048 continue;
2049 }
2050 r = dup(i);
2051 if (dispatch_assume(r != -1)) {
2052 close(r);
2053 } else {
2054 if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2055 FD_CLR(i, &_dispatch_rfds);
2056 _dispatch_rfd_ptrs[i] = 0;
2057 _dispatch_select_workaround--;
2058 }
2059 if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2060 FD_CLR(i, &_dispatch_wfds);
2061 _dispatch_wfd_ptrs[i] = 0;
2062 _dispatch_select_workaround--;
2063 }
2064 }
2065 }
2066 return false;
2067 }
2068 if (r > 0) {
2069 for (i = 0; i < FD_SETSIZE; i++) {
2070 if (FD_ISSET(i, &tmp_rfds)) {
2071 if (i == _dispatch_kq) {
2072 kevent_avail = true;
2073 continue;
2074 }
2075 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2076 EV_SET64(&kev, i, EVFILT_READ,
2077 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2078 _dispatch_rfd_ptrs[i], 0, 0);
2079 _dispatch_kevent_drain(&kev);
2080 }
2081 if (FD_ISSET(i, &tmp_wfds)) {
2082 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2083 EV_SET64(&kev, i, EVFILT_WRITE,
2084 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2085 _dispatch_wfd_ptrs[i], 0, 0);
2086 _dispatch_kevent_drain(&kev);
2087 }
2088 }
2089 }
2090 return kevent_avail;
2091 }
2092
2093 #pragma mark -
2094 #pragma mark dispatch_kqueue
2095
2096 static void
2097 _dispatch_kq_init(void *context DISPATCH_UNUSED)
2098 {
2099 static const struct kevent64_s kev = {
2100 .ident = 1,
2101 .filter = EVFILT_USER,
2102 .flags = EV_ADD|EV_CLEAR,
2103 };
2104
2105 _dispatch_safe_fork = false;
2106 #if DISPATCH_USE_GUARDED_FD
2107 guardid_t guard = (uintptr_t)&kev;
2108 _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2109 #else
2110 _dispatch_kq = kqueue();
2111 #endif
2112 if (_dispatch_kq == -1) {
2113 int err = errno;
2114 switch (err) {
2115 case EMFILE:
2116 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2117 "process is out of file descriptors");
2118 break;
2119 case ENFILE:
2120 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2121 "system is out of file descriptors");
2122 break;
2123 case ENOMEM:
2124 DISPATCH_CLIENT_CRASH("kqueue() failure: "
2125 "kernel is out of memory");
2126 break;
2127 default:
2128 (void)dispatch_assume_zero(err);
2129 DISPATCH_CRASH("kqueue() failure");
2130 break;
2131 }
2132 } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2133 // in case we fall back to select()
2134 FD_SET(_dispatch_kq, &_dispatch_rfds);
2135 }
2136
2137 (void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
2138 NULL));
2139 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
2140 }
2141
2142 static int
2143 _dispatch_get_kq(void)
2144 {
2145 static dispatch_once_t pred;
2146
2147 dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2148
2149 return _dispatch_kq;
2150 }
2151
2152 DISPATCH_NOINLINE
2153 static long
2154 _dispatch_kq_update(const struct kevent64_s *kev)
2155 {
2156 int r;
2157 struct kevent64_s kev_copy;
2158
2159 if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2160 if (_dispatch_select_unregister(kev)) {
2161 return 0;
2162 }
2163 }
2164 kev_copy = *kev;
2165 // This ensures we don't get a pending kevent back while registering
2166 // a new kevent
2167 kev_copy.flags |= EV_RECEIPT;
2168 retry:
2169 r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
2170 &kev_copy, 1, 0, NULL));
2171 if (slowpath(r == -1)) {
2172 int err = errno;
2173 switch (err) {
2174 case EINTR:
2175 goto retry;
2176 case EBADF:
2177 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2178 break;
2179 default:
2180 (void)dispatch_assume_zero(err);
2181 break;
2182 }
2183 return err;
2184 }
2185 switch (kev_copy.data) {
2186 case 0:
2187 return 0;
2188 case EBADF:
2189 case EPERM:
2190 case EINVAL:
2191 case ENOENT:
2192 if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2193 if (_dispatch_select_register(&kev_copy)) {
2194 return 0;
2195 }
2196 }
2197 // fall through
2198 default:
2199 kev_copy.flags |= kev->flags;
2200 _dispatch_kevent_drain(&kev_copy);
2201 break;
2202 }
2203 return (long)kev_copy.data;
2204 }
2205
2206 #pragma mark -
2207 #pragma mark dispatch_mgr
2208
// kevent submitted as a change by the next kevent64() call of the manager
// loop; NULL when there is none.
static struct kevent64_s *_dispatch_kevent_enable;

// Records `ke` as the kevent to submit on the manager's next kevent64() call.
// Only one distinct kevent may be pending at any time.
static inline void
_dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
{
	dispatch_assert(_dispatch_kevent_enable == NULL ||
			_dispatch_kevent_enable == ke);
	_dispatch_kevent_enable = ke;
}
2217
2218 unsigned long
2219 _dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2220 {
2221 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2222 return false;
2223 }
2224
2225 static const struct kevent64_s kev = {
2226 .ident = 1,
2227 .filter = EVFILT_USER,
2228 .fflags = NOTE_TRIGGER,
2229 };
2230
2231 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2232 _dispatch_debug("waking up the dispatch manager queue: %p", dq);
2233 #endif
2234
2235 _dispatch_kq_update(&kev);
2236
2237 return false;
2238 }
2239
2240 DISPATCH_NOINLINE
2241 static void
2242 _dispatch_mgr_init(void)
2243 {
2244 (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
2245 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2246 _dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
2247 _dispatch_mgr_priority_init();
2248 _dispatch_kevent_init();
2249 _dispatch_timers_init();
2250 _dispatch_mach_recv_msg_buf_init();
2251 _dispatch_memorystatus_init();
2252 }
2253
2254 DISPATCH_NOINLINE DISPATCH_NORETURN
2255 static void
2256 _dispatch_mgr_invoke(void)
2257 {
2258 static const struct timespec timeout_immediately = { 0, 0 };
2259 struct kevent64_s kev;
2260 bool poll;
2261 int r;
2262
2263 for (;;) {
2264 _dispatch_mgr_queue_drain();
2265 poll = _dispatch_mgr_timers();
2266 if (slowpath(_dispatch_select_workaround)) {
2267 poll = _dispatch_mgr_select(poll);
2268 if (!poll) continue;
2269 }
2270 poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2271 r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
2272 _dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
2273 poll ? &timeout_immediately : NULL);
2274 _dispatch_kevent_enable = NULL;
2275 if (slowpath(r == -1)) {
2276 int err = errno;
2277 switch (err) {
2278 case EINTR:
2279 break;
2280 case EBADF:
2281 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2282 break;
2283 default:
2284 (void)dispatch_assume_zero(err);
2285 break;
2286 }
2287 } else if (r) {
2288 _dispatch_kevent_drain(&kev);
2289 }
2290 }
2291 }
2292
2293 DISPATCH_NORETURN
2294 void
2295 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2296 {
2297 _dispatch_mgr_init();
2298 // never returns, so burn bridges behind us & clear stack 2k ahead
2299 _dispatch_clear_stack(2048);
2300 _dispatch_mgr_invoke();
2301 }
2302
2303 #pragma mark -
2304 #pragma mark dispatch_memorystatus
2305
// Source type and event mask of the internal memory pressure source: the
// memorystatus source when available, the VM pressure source otherwise.
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
#endif

#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
static dispatch_source_t _dispatch_memorystatus_source;

// Event handler of the memory pressure source.
// With the memorystatus source: a return to normal pressure restores the
// default continuation cache limit and notifies the voucher activity heap;
// any other event lowers the cache limit, notifies the voucher activity heap
// of the warning and asks malloc for pressure relief. With the VM pressure
// source only the malloc pressure relief is performed.
static void
_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
{
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
	const unsigned long pressure =
			dispatch_source_get_data(_dispatch_memorystatus_source);

	if (pressure & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
		_voucher_activity_heap_pressure_normal();
		return;
	}
	_dispatch_continuation_cache_limit =
			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
	_voucher_activity_heap_pressure_warn();
#endif
	malloc_zone_pressure_relief(0,0);
}

// Creates and resumes the internal memory pressure source; its handler is
// targeted at the default QoS root queue.
static void
_dispatch_memorystatus_init(void)
{
	dispatch_queue_t target =
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);

	_dispatch_memorystatus_source = dispatch_source_create(
			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
			DISPATCH_MEMORYSTATUS_SOURCE_MASK, target);
	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
			_dispatch_memorystatus_handler);
	dispatch_resume(_dispatch_memorystatus_source);
}
#else
// No memory pressure source on this configuration: nothing to set up.
static inline void _dispatch_memorystatus_init(void) {}
#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2351
2352 #pragma mark -
2353 #pragma mark dispatch_mach
2354
2355 #if HAVE_MACH
2356
// Dumps the state of a mach port name in debug builds with mach port
// debugging enabled; otherwise only evaluates its argument.
#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
#define _dispatch_debug_machport(name) \
		dispatch_debug_machport((name), __func__)
#else
#define _dispatch_debug_machport(name) ((void)(name))
#endif

// Flags for all notifications that are registered/unregistered when a
// send-possible notification is requested/delivered
#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
// All receive flavors, and the subset delivering messages directly
#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)

// Non-zero iff v is a power of two. The argument is fully parenthesized so
// that compound expressions (e.g. a conditional expression) expand safely.
#define _DISPATCH_IS_POWER_OF_TWO(v) (!((v) & ((v) - 1)) && (v))
// Hashes mach port name x into y buckets, using a mask instead of a modulo
// when y is a power of two.
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) \
		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

#ifndef MACH_RCV_LARGE_IDENTITY
#define MACH_RCV_LARGE_IDENTITY 0x00000008
#endif
#ifndef MACH_RCV_VOUCHER
#define MACH_RCV_VOUCHER 0x00000800
#endif
#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
// mach_msg() receive options used for the direct receive portset. The whole
// replacement list is a single parenthesized expression so that the macro
// can be combined with any operator (e.g. `& ~flag`) at the use site.
#define DISPATCH_MACH_RCV_OPTIONS ( \
		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | \
		MACH_RCV_VOUCHER)

// ext[0] of a mach notification kevent is used as its "armed" marker
#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])
2397
2398 static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
2399 static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
2400 static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
2401 static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
2402 static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
2403 dispatch_source_refs_t dr, dispatch_kevent_t dk,
2404 mach_msg_header_t *hdr, mach_msg_size_t siz);
2405 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
2406 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
2407 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
2408 static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
2409 static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
2410 dispatch_mach_reply_refs_t dmr, bool disconnected);
2411 static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
2412 static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou,
2413 mach_msg_option_t options);
2414 static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
2415 dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr,
2416 mach_msg_size_t siz);
2417 static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
2418 const struct kevent64_s *ke);
2419 static inline mach_msg_option_t _dispatch_mach_checkin_options(void);
2420
2421 static const size_t _dispatch_mach_recv_msg_size =
2422 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
2423 static const size_t dispatch_mach_trailer_size =
2424 sizeof(dispatch_mach_trailer_t);
2425 static mach_msg_size_t _dispatch_mach_recv_msg_buf_size;
2426 static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
2427 static mach_port_t _dispatch_mach_notify_port;
2428 static struct kevent64_s _dispatch_mach_recv_kevent = {
2429 .filter = EVFILT_MACHPORT,
2430 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2431 .fflags = DISPATCH_MACH_RCV_OPTIONS,
2432 };
2433 static dispatch_source_t _dispatch_mach_notify_source;
2434 static const
2435 struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
2436 .ke = {
2437 .filter = EVFILT_MACHPORT,
2438 .flags = EV_CLEAR,
2439 .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
2440 },
2441 };
2442
2443 static void
2444 _dispatch_mach_recv_msg_buf_init(void)
2445 {
2446 mach_vm_size_t vm_size = mach_vm_round_page(
2447 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2448 _dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size;
2449 mach_vm_address_t vm_addr = vm_page_size;
2450 kern_return_t kr;
2451
2452 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2453 VM_FLAGS_ANYWHERE))) {
2454 if (kr != KERN_NO_SPACE) {
2455 (void)dispatch_assume_zero(kr);
2456 DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2457 }
2458 _dispatch_temporary_resource_shortage();
2459 vm_addr = vm_page_size;
2460 }
2461 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2462 _dispatch_mach_recv_kevent.ext[1] = vm_size;
2463 }
2464
2465 static inline void*
2466 _dispatch_get_mach_recv_msg_buf(void)
2467 {
2468 return (void*)_dispatch_mach_recv_kevent.ext[0];
2469 }
2470
2471 static void
2472 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2473 {
2474 kern_return_t kr;
2475
2476 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2477 &_dispatch_mach_recv_portset);
2478 DISPATCH_VERIFY_MIG(kr);
2479 if (dispatch_assume_zero(kr)) {
2480 DISPATCH_CLIENT_CRASH(
2481 "mach_port_allocate() failed: cannot create port set");
2482 }
2483 dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2484 dispatch_assert(dispatch_mach_trailer_size ==
2485 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2486 DISPATCH_MACH_RCV_TRAILER)));
2487 _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2488 _dispatch_kq_update(&_dispatch_mach_recv_kevent);
2489
2490 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2491 &_dispatch_mach_notify_port);
2492 DISPATCH_VERIFY_MIG(kr);
2493 if (dispatch_assume_zero(kr)) {
2494 DISPATCH_CLIENT_CRASH(
2495 "mach_port_allocate() failed: cannot create receive right");
2496 }
2497 _dispatch_mach_notify_source = dispatch_source_create(
2498 &_dispatch_source_type_mach_recv_direct,
2499 _dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2500 static const struct dispatch_continuation_s dc = {
2501 .dc_func = (void*)_dispatch_mach_notify_source_invoke,
2502 };
2503 _dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] =
2504 (dispatch_continuation_t)&dc;
2505 dispatch_assert(_dispatch_mach_notify_source);
2506 dispatch_resume(_dispatch_mach_notify_source);
2507 }
2508
2509 static mach_port_t
2510 _dispatch_get_mach_recv_portset(void)
2511 {
2512 static dispatch_once_t pred;
2513 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2514 return _dispatch_mach_recv_portset;
2515 }
2516
2517 static void
2518 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2519 {
2520 struct kevent64_s kev = {
2521 .filter = EVFILT_MACHPORT,
2522 .flags = EV_ADD,
2523 };
2524 kern_return_t kr;
2525
2526 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2527 &_dispatch_mach_portset);
2528 DISPATCH_VERIFY_MIG(kr);
2529 if (dispatch_assume_zero(kr)) {
2530 DISPATCH_CLIENT_CRASH(
2531 "mach_port_allocate() failed: cannot create port set");
2532 }
2533 kev.ident = _dispatch_mach_portset;
2534 _dispatch_kq_update(&kev);
2535 }
2536
2537 static mach_port_t
2538 _dispatch_get_mach_portset(void)
2539 {
2540 static dispatch_once_t pred;
2541 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2542 return _dispatch_mach_portset;
2543 }
2544
2545 static kern_return_t
2546 _dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2547 {
2548 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2549 kern_return_t kr;
2550
2551 _dispatch_debug_machport(mp);
2552 kr = mach_port_move_member(mach_task_self(), mp, mps);
2553 if (slowpath(kr)) {
2554 DISPATCH_VERIFY_MIG(kr);
2555 switch (kr) {
2556 case KERN_INVALID_RIGHT:
2557 if (mps) {
2558 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2559 "mach_port_move_member() failed ", kr);
2560 break;
2561 }
2562 //fall through
2563 case KERN_INVALID_NAME:
2564 #if DISPATCH_DEBUG
2565 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2566 "prematurely", mp);
2567 #endif
2568 break;
2569 default:
2570 (void)dispatch_assume_zero(kr);
2571 break;
2572 }
2573 }
2574 return mps ? kr : 0;
2575 }
2576
2577 static void
2578 _dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
2579 {
2580 #if (TARGET_IPHONE_SIMULATOR && \
2581 IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2582 (!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2583 // delete and re-add kevent to workaround <rdar://problem/13924256>
2584 if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2585 struct kevent64_s kev = _dispatch_mach_recv_kevent;
2586 kev.flags = EV_DELETE;
2587 _dispatch_kq_update(&kev);
2588 }
2589 #endif
2590 _dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2591 }
2592
2593 static kern_return_t
2594 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2595 uint32_t del_flags)
2596 {
2597 kern_return_t kr = 0;
2598 dispatch_assert_zero(new_flags & del_flags);
2599 if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2600 (del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2601 mach_port_t mps;
2602 if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2603 mps = _dispatch_get_mach_recv_portset();
2604 } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2605 ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2606 (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2607 mps = _dispatch_get_mach_portset();
2608 } else {
2609 mps = MACH_PORT_NULL;
2610 }
2611 kr = _dispatch_mach_portset_update(dk, mps);
2612 }
2613 return kr;
2614 }
2615
2616 static kern_return_t
2617 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2618 uint32_t del_flags)
2619 {
2620 kern_return_t kr = 0;
2621 dispatch_assert_zero(new_flags & del_flags);
2622 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2623 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2624 // Requesting a (delayed) non-sync send-possible notification
2625 // registers for both immediate dead-name notification and delayed-arm
2626 // send-possible notification for the port.
2627 // The send-possible notification is armed when a mach_msg() with the
2628 // the MACH_SEND_NOTIFY to the port times out.
2629 // If send-possible is unavailable, fall back to immediate dead-name
2630 // registration rdar://problem/2527840&9008724
2631 kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2632 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2633 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2634 }
2635 return kr;
2636 }
2637
2638 static inline void
2639 _dispatch_kevent_mach_portset(struct kevent64_s *ke)
2640 {
2641 if (ke->ident == _dispatch_mach_recv_portset) {
2642 return _dispatch_kevent_mach_msg_drain(ke);
2643 } else if (ke->ident == _dispatch_mach_portset) {
2644 return _dispatch_kevent_machport_drain(ke);
2645 } else {
2646 return _dispatch_kevent_error(ke);
2647 }
2648 }
2649
2650 DISPATCH_NOINLINE
2651 static void
2652 _dispatch_kevent_machport_drain(struct kevent64_s *ke)
2653 {
2654 mach_port_t name = (mach_port_name_t)ke->data;
2655 dispatch_kevent_t dk;
2656 struct kevent64_s kev;
2657
2658 _dispatch_debug_machport(name);
2659 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2660 if (!dispatch_assume(dk)) {
2661 return;
2662 }
2663 _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2664
2665 EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
2666 DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
2667 _dispatch_kevent_debug(&kev, __func__);
2668 _dispatch_kevent_merge(&kev);
2669 }
2670
2671 DISPATCH_NOINLINE
2672 static void
2673 _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
2674 {
2675 mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2676 mach_msg_size_t siz, msgsiz;
2677 mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2678
2679 _dispatch_kevent_mach_recv_reenable(ke);
2680 if (!dispatch_assume(hdr)) {
2681 DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2682 }
2683 if (fastpath(!kr)) {
2684 return _dispatch_kevent_mach_msg_recv(hdr);
2685 } else if (kr != MACH_RCV_TOO_LARGE) {
2686 goto out;
2687 }
2688 if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2689 dispatch_mach_trailer_size)) {
2690 DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2691 }
2692 siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2693 hdr = malloc(siz);
2694 if (ke->data) {
2695 if (!dispatch_assume(hdr)) {
2696 // Kernel will discard message too large to fit
2697 hdr = _dispatch_get_mach_recv_msg_buf();
2698 siz = _dispatch_mach_recv_msg_buf_size;
2699 }
2700 mach_port_t name = (mach_port_name_t)ke->data;
2701 const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2702 MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2703 kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2704 MACH_PORT_NULL);
2705 if (fastpath(!kr)) {
2706 return _dispatch_kevent_mach_msg_recv(hdr);
2707 } else if (kr == MACH_RCV_TOO_LARGE) {
2708 _dispatch_log("BUG in libdispatch client: "
2709 "_dispatch_kevent_mach_msg_drain: dropped message too "
2710 "large to fit in memory: id = 0x%x, size = %lld",
2711 hdr->msgh_id, ke->ext[1]);
2712 kr = MACH_MSG_SUCCESS;
2713 }
2714 } else {
2715 // We don't know which port in the portset contains the large message,
2716 // so need to receive all messages pending on the portset to ensure the
2717 // large message is drained. <rdar://problem/13950432>
2718 bool received = false;
2719 for (;;) {
2720 if (!dispatch_assume(hdr)) {
2721 DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2722 }
2723 const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2724 MACH_RCV_TIMEOUT);
2725 kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2726 MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2727 if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2728 hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2729 DISPATCH_CRASH("Overlarge message");
2730 }
2731 if (fastpath(!kr)) {
2732 msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2733 if (msgsiz < siz) {
2734 void *shrink = realloc(hdr, msgsiz);
2735 if (shrink) hdr = shrink;
2736 }
2737 _dispatch_kevent_mach_msg_recv(hdr);
2738 hdr = NULL;
2739 received = true;
2740 } else if (kr == MACH_RCV_TOO_LARGE) {
2741 siz = hdr->msgh_size + dispatch_mach_trailer_size;
2742 } else {
2743 if (kr == MACH_RCV_TIMED_OUT && received) {
2744 kr = MACH_MSG_SUCCESS;
2745 }
2746 break;
2747 }
2748 hdr = reallocf(hdr, siz);
2749 }
2750 }
2751 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2752 free(hdr);
2753 }
2754 out:
2755 if (slowpath(kr)) {
2756 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2757 "message reception failed", kr);
2758 }
2759 }
2760
2761 static void
2762 _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
2763 {
2764 dispatch_source_refs_t dri;
2765 dispatch_kevent_t dk;
2766 mach_port_t name = hdr->msgh_local_port;
2767 mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
2768
2769 if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
2770 dispatch_mach_trailer_size)) {
2771 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2772 "received overlarge message");
2773 return _dispatch_kevent_mach_msg_destroy(hdr);
2774 }
2775 if (!dispatch_assume(name)) {
2776 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2777 "received message with MACH_PORT_NULL port");
2778 return _dispatch_kevent_mach_msg_destroy(hdr);
2779 }
2780 _dispatch_debug_machport(name);
2781 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2782 if (!dispatch_assume(dk)) {
2783 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2784 "received message with unknown kevent");
2785 return _dispatch_kevent_mach_msg_destroy(hdr);
2786 }
2787 _dispatch_kevent_debug(&dk->dk_kevent, __func__);
2788 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
2789 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2790 if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2791 return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
2792 }
2793 }
2794 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2795 "received message with no listeners");
2796 return _dispatch_kevent_mach_msg_destroy(hdr);
2797 }
2798
2799 static void
2800 _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
2801 {
2802 if (hdr) {
2803 mach_msg_destroy(hdr);
2804 if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2805 free(hdr);
2806 }
2807 }
2808 }
2809
2810 static void
2811 _dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
2812 dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
2813 {
2814 if (ds == _dispatch_mach_notify_source) {
2815 _dispatch_mach_notify_source_invoke(hdr);
2816 return _dispatch_kevent_mach_msg_destroy(hdr);
2817 }
2818 dispatch_mach_reply_refs_t dmr = NULL;
2819 if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
2820 dmr = (dispatch_mach_reply_refs_t)dr;
2821 }
2822 return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz);
2823 }
2824
2825 DISPATCH_ALWAYS_INLINE
2826 static inline void
2827 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
2828 {
2829 dispatch_source_refs_t dri, dr_next;
2830 dispatch_kevent_t dk;
2831 struct kevent64_s kev;
2832 bool unreg;
2833
2834 dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
2835 if (!dk) {
2836 return;
2837 }
2838
2839 // Update notification registration state.
2840 dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
2841 EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
2842 flag, 0, (uintptr_t)dk, 0, 0);
2843 if (final) {
2844 // This can never happen again
2845 unreg = true;
2846 } else {
2847 // Re-register for notification before delivery
2848 unreg = _dispatch_kevent_resume(dk, flag, 0);
2849 }
2850 DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
2851 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
2852 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2853 if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
2854 dispatch_mach_t dm = (dispatch_mach_t)dsi;
2855 _dispatch_mach_merge_kevent(dm, &kev);
2856 if (unreg && dm->dm_dkev) {
2857 _dispatch_mach_kevent_unregister(dm);
2858 }
2859 } else {
2860 _dispatch_source_merge_kevent(dsi, &kev);
2861 if (unreg) {
2862 _dispatch_source_kevent_unregister(dsi);
2863 }
2864 }
2865 if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
2866 // current merge is last in list (dk might have been freed)
2867 // or it re-armed the notification
2868 return;
2869 }
2870 }
2871 }
2872
// Updates the notification registration bits selected by `mask` in
// dk->dk_kevent.data and, when those bits go from all-clear to set (or from
// set to all-clear), requests (or cancels) the Mach notification
// `notify_msgid` on the port named by dk->dk_kevent.ident.
//
// new_flags / del_flags: bits to add to / remove from the registration state
//   (only the part covered by `mask` is applied).
// notify_sync: passed through to mach_port_request_notification().
//
// Returns 0 when no (un)registration was needed; otherwise returns krr, which
// holds the result of the registration request and stays 0 on the
// unregistration path.
2873 static kern_return_t
2874 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
2875 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
2876 mach_port_mscount_t notify_sync)
2877 {
2878 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
// snapshot of the registration bits before this update, used to detect
// the clear->set and set->clear transitions below
2879 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
// kr is scratch for calls whose result is only logged; krr is what the
// caller gets back
2880 kern_return_t kr, krr = 0;
2881
2882 // Update notification registration state.
2883 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
2884 dk->dk_kevent.data &= ~(del_flags & mask);
2885
2886 _dispatch_debug_machport(port);
// transition: nothing registered under `mask` before, something is now
2887 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
2888 // initialize _dispatch_mach_notify_port:
2889 (void)_dispatch_get_mach_recv_portset();
// NOTE(review): the debug text says "send-possible" although the
// notification kind is whatever the caller passed in notify_msgid
2890 _dispatch_debug("machport[0x%08x]: registering for send-possible "
2891 "notification", port);
2892 previous = MACH_PORT_NULL;
2893 krr = mach_port_request_notification(mach_task_self(), port,
2894 notify_msgid, notify_sync, _dispatch_mach_notify_port,
2895 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
2896 DISPATCH_VERIFY_MIG(krr);
2897
2898 switch(krr) {
2899 case KERN_INVALID_NAME:
2900 case KERN_INVALID_RIGHT:
2901 // Suppress errors & clear registration state
2902 dk->dk_kevent.data &= ~mask;
2903 break;
2904 default:
2905 // Else, we don't expect any errors from mach. Log any errors
2906 if (dispatch_assume_zero(krr)) {
2907 // log the error & clear registration state
2908 dk->dk_kevent.data &= ~mask;
2909 } else if (dispatch_assume_zero(previous)) {
2910 // Another subsystem has beat libdispatch to requesting the
2911 // specified Mach notification on this port. We should
2912 // technically cache the previous port and message it when the
2913 // kernel messages our port. Or we can just say screw those
2914 // subsystems and deallocate the previous port.
2915 // They should adopt libdispatch :-P
2916 kr = mach_port_deallocate(mach_task_self(), previous);
2917 DISPATCH_VERIFY_MIG(kr);
2918 (void)dispatch_assume_zero(kr);
// already released above; keeps the send-once consumption at the end
// of the function from touching it again
2919 previous = MACH_PORT_NULL;
2920 }
2921 }
// transition: something was registered under `mask` before, nothing is now
2922 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
2923 _dispatch_debug("machport[0x%08x]: unregistering for send-possible "
2924 "notification", port);
2925 previous = MACH_PORT_NULL;
// requesting the notification with MACH_PORT_NULL as the notify port
// cancels it; the previously registered right comes back in `previous`
2926 kr = mach_port_request_notification(mach_task_self(), port,
2927 notify_msgid, notify_sync, MACH_PORT_NULL,
2928 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
2929 DISPATCH_VERIFY_MIG(kr);
2930
2931 switch (kr) {
// these results are tolerated silently on the unregistration path
2932 case KERN_INVALID_NAME:
2933 case KERN_INVALID_RIGHT:
2934 case KERN_INVALID_ARGUMENT:
2935 break;
2936 default:
2937 if (dispatch_assume_zero(kr)) {
2938 // log the error
2939 }
2940 }
2941 } else {
// no transition for the bits in `mask`: nothing to do with the kernel
2942 return 0;
2943 }
2944 if (slowpath(previous)) {
2945 // the kernel has not consumed the send-once right yet
2946 (void)dispatch_assume_zero(
2947 _dispatch_send_consume_send_once_right(previous));
2948 }
2949 return krr;
2950 }
2951
2952 static void
2953 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
2954 {
2955 (void)_dispatch_get_mach_recv_portset();
2956 _dispatch_debug("registering for calendar-change notification");
2957 kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(),
2958 HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
2959 DISPATCH_VERIFY_MIG(kr);
2960 (void)dispatch_assume_zero(kr);
2961 }
2962
2963 static void
2964 _dispatch_mach_host_calendar_change_register(void)
2965 {
2966 static dispatch_once_t pred;
2967 dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
2968 }
2969
2970 static void
2971 _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
2972 {
2973 mig_reply_error_t reply;
2974 dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
2975 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
2976 dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
2977 boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
2978 if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
2979 // host_notify_reply.defs: host_calendar_changed
2980 _dispatch_debug("calendar-change notification");
2981 _dispatch_timers_calendar_change();
2982 _dispatch_mach_host_notify_update(NULL);
2983 success = TRUE;
2984 reply.RetCode = KERN_SUCCESS;
2985 }
2986 if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
2987 (void)dispatch_assume_zero(reply.RetCode);
2988 }
2989 }
2990
2991 kern_return_t
2992 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
2993 mach_port_name_t name)
2994 {
2995 #if DISPATCH_DEBUG
2996 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
2997 "deleted prematurely", name);
2998 #endif
2999
3000 _dispatch_debug_machport(name);
3001 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
3002
3003 return KERN_SUCCESS;
3004 }
3005
3006 kern_return_t
3007 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
3008 mach_port_name_t name)
3009 {
3010 kern_return_t kr;
3011
3012 _dispatch_debug("machport[0x%08x]: dead-name notification", name);
3013 _dispatch_debug_machport(name);
3014 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
3015
3016 // the act of receiving a dead name notification allocates a dead-name
3017 // right that must be deallocated
3018 kr = mach_port_deallocate(mach_task_self(), name);
3019 DISPATCH_VERIFY_MIG(kr);
3020 //(void)dispatch_assume_zero(kr);
3021
3022 return KERN_SUCCESS;
3023 }
3024
3025 kern_return_t
3026 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
3027 mach_port_name_t name)
3028 {
3029 _dispatch_debug("machport[0x%08x]: send-possible notification", name);
3030 _dispatch_debug_machport(name);
3031 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
3032
3033 return KERN_SUCCESS;
3034 }
3035
3036 #pragma mark -
3037 #pragma mark dispatch_mach_t
3038
// Initial value of dm_disconnect_cnt for a freshly created channel;
// dispatch_mach_connect() swaps it for 0 and crashes if it finds any
// other value ("Channel already connected").
3039 #define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
// Internal message option: set by _dispatch_mach_msg_send() on a message
// whose reply kevent still has to be registered on the manager queue;
// a message carrying it is not handed to mach_msg() again.
3040 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
// Option bits reserved for internal flags such as the one above:
// dispatch_mach_send() asserts that callers do not set them and they are
// stripped before the options reach mach_msg().
3041 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3042
// Forward declarations for helpers used before their definitions below.
3043 static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3044 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3045 mach_port_t local_port, mach_port_t remote_port);
3046 static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
3047 dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
3048 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3049 dispatch_object_t dou);
3050 static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3051 dispatch_mach_msg_t dmsg);
3052 static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3053 pthread_priority_t pp);
3054
3055 static dispatch_mach_t
3056 _dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3057 dispatch_mach_handler_function_t handler, bool handler_is_block)
3058 {
3059 dispatch_mach_t dm;
3060 dispatch_mach_refs_t dr;
3061
3062 dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3063 sizeof(struct dispatch_mach_s));
3064 _dispatch_queue_init((dispatch_queue_t)dm);
3065 dm->dq_label = label;
3066
3067 dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3068 dm->do_ref_cnt++; // since channel is created suspended
3069 dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3070 dm->do_targetq = &_dispatch_mgr_q;
3071
3072 dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3073 dr->dr_source_wref = _dispatch_ptr2wref(dm);
3074 dr->dm_handler_func = handler;
3075 dr->dm_handler_ctxt = context;
3076 dm->ds_refs = dr;
3077 dm->dm_handler_is_block = handler_is_block;
3078
3079 dm->dm_refs = _dispatch_calloc(1ul,
3080 sizeof(struct dispatch_mach_send_refs_s));
3081 dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3082 dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3083 TAILQ_INIT(&dm->dm_refs->dm_replies);
3084
3085 // First item on the channel sets the user-specified target queue
3086 dispatch_set_target_queue(dm, q);
3087 _dispatch_object_debug(dm, "%s", __func__);
3088 return dm;
3089 }
3090
3091 dispatch_mach_t
3092 dispatch_mach_create(const char *label, dispatch_queue_t q,
3093 dispatch_mach_handler_t handler)
3094 {
3095 dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3096 return _dispatch_mach_create(label, q, bb,
3097 (dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3098 }
3099
3100 dispatch_mach_t
3101 dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3102 dispatch_mach_handler_function_t handler)
3103 {
3104 return _dispatch_mach_create(label, q, context, handler, false);
3105 }
3106
3107 void
3108 _dispatch_mach_dispose(dispatch_mach_t dm)
3109 {
3110 _dispatch_object_debug(dm, "%s", __func__);
3111 dispatch_mach_refs_t dr = dm->ds_refs;
3112 if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
3113 Block_release(dr->dm_handler_ctxt);
3114 }
3115 free(dr);
3116 free(dm->dm_refs);
3117 _dispatch_queue_destroy(dm);
3118 }
3119
3120 void
3121 dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3122 mach_port_t send, dispatch_mach_msg_t checkin)
3123 {
3124 dispatch_mach_send_refs_t dr = dm->dm_refs;
3125 dispatch_kevent_t dk;
3126
3127 if (MACH_PORT_VALID(receive)) {
3128 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3129 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3130 dk->dk_kevent.ident = receive;
3131 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3132 dk->dk_kevent.udata = (uintptr_t)dk;
3133 TAILQ_INIT(&dk->dk_sources);
3134 dm->ds_dkev = dk;
3135 dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3136 _dispatch_retain(dm); // the reference the manager queue holds
3137 }
3138 dr->dm_send = send;
3139 if (MACH_PORT_VALID(send)) {
3140 if (checkin) {
3141 dispatch_retain(checkin);
3142 mach_msg_option_t options = _dispatch_mach_checkin_options();
3143 _dispatch_mach_msg_set_options(checkin, options);
3144 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3145 }
3146 dr->dm_checkin = checkin;
3147 }
3148 // monitor message reply ports
3149 dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3150 if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3151 DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3152 DISPATCH_CLIENT_CRASH("Channel already connected");
3153 }
3154 _dispatch_object_debug(dm, "%s", __func__);
3155 return dispatch_resume(dm);
3156 }
3157
3158 DISPATCH_NOINLINE
3159 static void
3160 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3161 dispatch_mach_reply_refs_t dmr, bool disconnected)
3162 {
3163 dispatch_mach_msg_t dmsgr = NULL;
3164 if (disconnected) {
3165 dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
3166 }
3167 dispatch_kevent_t dk = dmr->dmr_dkev;
3168 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3169 _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
3170 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
3171 if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
3172 free(dmr);
3173 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3174 }
3175
3176 DISPATCH_NOINLINE
3177 static void
3178 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3179 dispatch_mach_msg_t dmsg)
3180 {
3181 dispatch_kevent_t dk;
3182 dispatch_mach_reply_refs_t dmr;
3183
3184 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3185 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3186 dk->dk_kevent.ident = reply;
3187 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3188 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3189 dk->dk_kevent.udata = (uintptr_t)dk;
3190 TAILQ_INIT(&dk->dk_sources);
3191
3192 dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3193 dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3194 dmr->dmr_dkev = dk;
3195 if (dmsg->dmsg_voucher) {
3196 dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher);
3197 }
3198 dmr->dmr_priority = dmsg->dmsg_priority;
3199 // make reply context visible to leaks rdar://11777199
3200 dmr->dmr_ctxt = dmsg->do_ctxt;
3201
3202 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3203 dmsg->do_ctxt);
3204 uint32_t flags;
3205 bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
3206 TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3207 dr_list);
3208 TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
3209 if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
3210 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3211 }
3212 }
3213
3214 DISPATCH_NOINLINE
3215 static void
3216 _dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3217 {
3218 dispatch_kevent_t dk = dm->dm_dkev;
3219 dm->dm_dkev = NULL;
3220 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3221 dr_list);
3222 dm->ds_pending_data_mask &= ~(unsigned long)
3223 (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3224 _dispatch_kevent_unregister(dk,
3225 DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3226 }
3227
3228 DISPATCH_NOINLINE
3229 static void
3230 _dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3231 {
3232 dispatch_kevent_t dk;
3233
3234 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3235 dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3236 dk->dk_kevent.ident = send;
3237 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3238 dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3239 dk->dk_kevent.udata = (uintptr_t)dk;
3240 TAILQ_INIT(&dk->dk_sources);
3241
3242 dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3243
3244 uint32_t flags;
3245 bool do_resume = _dispatch_kevent_register(&dk, &flags);
3246 TAILQ_INSERT_TAIL(&dk->dk_sources,
3247 (dispatch_source_refs_t)dm->dm_refs, dr_list);
3248 dm->dm_dkev = dk;
3249 if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3250 _dispatch_mach_kevent_unregister(dm);
3251 }
3252 }
3253
3254 static inline void
3255 _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3256 pthread_priority_t pp)
3257 {
3258 return _dispatch_queue_push(dm._dq, dou, pp);
3259 }
3260
3261 static inline void
3262 _dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3263 {
3264 dou._do->do_suspend_cnt = (unsigned int)options;
3265 }
3266
3267 static inline mach_msg_option_t
3268 _dispatch_mach_msg_get_options(dispatch_object_t dou)
3269 {
3270 mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3271 return options;
3272 }
3273
3274 static inline void
3275 _dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3276 unsigned long reason)
3277 {
3278 dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3279 dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
3280 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3281 }
3282
3283 static inline unsigned long
3284 _dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3285 {
3286 mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3287 dou._do->do_suspend_cnt = 0;
3288 if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3289 *err_ptr = 0;
3290 return err_get_code(err);
3291 }
3292 *err_ptr = err;
3293 return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3294 }
3295
3296 static void
3297 _dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
3298 mach_msg_header_t *hdr, mach_msg_size_t siz)
3299 {
3300 _dispatch_debug_machport(hdr->msgh_remote_port);
3301 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3302 hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3303 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3304 return _dispatch_kevent_mach_msg_destroy(hdr);
3305 }
3306 dispatch_mach_msg_t dmsg;
3307 voucher_t voucher;
3308 pthread_priority_t priority;
3309 void *ctxt = NULL;
3310 if (dmr) {
3311 _voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
3312 voucher = dmr->dmr_voucher;
3313 dmr->dmr_voucher = NULL; // transfer reference
3314 priority = dmr->dmr_priority;
3315 ctxt = dmr->dmr_ctxt;
3316 _dispatch_mach_reply_kevent_unregister(dm, dmr, false);
3317 } else {
3318 voucher = voucher_create_with_mach_msg(hdr);
3319 priority = _voucher_get_priority(voucher);
3320 }
3321 dispatch_mach_msg_destructor_t destructor;
3322 destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3323 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3324 DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3325 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3326 dmsg->dmsg_voucher = voucher;
3327 dmsg->dmsg_priority = priority;
3328 dmsg->do_ctxt = ctxt;
3329 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3330 _dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
3331 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3332 return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3333 }
3334
3335 static inline mach_port_t
3336 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3337 {
3338 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3339 mach_port_t remote = hdr->msgh_remote_port;
3340 return remote;
3341 }
3342
3343 static inline void
3344 _dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3345 mach_port_t remote_port)
3346 {
3347 mach_msg_header_t *hdr;
3348 dispatch_mach_msg_t dmsg;
3349 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3350 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3351 if (local_port) hdr->msgh_local_port = local_port;
3352 if (remote_port) hdr->msgh_remote_port = remote_port;
3353 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3354 return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3355 }
3356
3357 static inline dispatch_mach_msg_t
3358 _dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
3359 dispatch_mach_reply_refs_t dmr)
3360 {
3361 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3362 if (dmsg && !dmsg->dmsg_reply) return NULL;
3363 mach_msg_header_t *hdr;
3364 dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3365 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3366 if (dmsg) {
3367 hdr->msgh_local_port = dmsg->dmsg_reply;
3368 if (dmsg->dmsg_voucher) {
3369 dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
3370 }
3371 dmsgr->dmsg_priority = dmsg->dmsg_priority;
3372 dmsgr->do_ctxt = dmsg->do_ctxt;
3373 } else {
3374 hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
3375 dmsgr->dmsg_voucher = dmr->dmr_voucher;
3376 dmr->dmr_voucher = NULL; // transfer reference
3377 dmsgr->dmsg_priority = dmr->dmr_priority;
3378 dmsgr->do_ctxt = dmr->dmr_ctxt;
3379 }
3380 _dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
3381 return dmsgr;
3382 }
3383
3384 DISPATCH_NOINLINE
3385 static void
3386 _dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3387 {
3388 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3389 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3390 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3391 _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3392 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3393 }
3394
// Attempts to send message `dou` on channel `dm`.
//
// Returns NULL once the message has been fully handled: it has been pushed
// onto the channel with the mach_msg() result as its reason, plus a
// "disconnected" message for its reply port if the send failed.
// Returns the message itself when it has to be presented again later:
//   - a pending checkin message could not be sent first,
//   - the send-possible notification is already armed,
//   - the send timed out,
//   - or the next step (kevent (un)installation, reply registration) has to
//     run on the manager queue, in which case dm_needs_mgr is set to 1.
// In the reply-registration case the message HAS been sent; it is flagged
// with DISPATCH_MACH_REGISTER_FOR_REPLY so that the retry skips mach_msg().
3395 DISPATCH_NOINLINE
3396 static dispatch_object_t
3397 _dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3398 {
3399 dispatch_mach_send_refs_t dr = dm->dm_refs;
3400 dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
3401 voucher_t voucher = dmsg->dmsg_voucher;
3402 mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
3403 bool clear_voucher = false, kvoucher_move_send = false;
// reset; set again below whenever the remaining work must move to the
// manager queue
3404 dr->dm_needs_mgr = 0;
// a checkin message is pending and this is not it: it has to go out first
3405 if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3406 // send initial checkin message
3407 if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3408 &_dispatch_mgr_q)) {
3409 // send kevent must be uninstalled on the manager queue
3410 dr->dm_needs_mgr = 1;
3411 goto out;
3412 }
// recursive send of the checkin message; a non-NULL result means it is
// still pending, so the current message cannot be sent yet either
3413 dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3414 if (slowpath(dr->dm_checkin)) {
3415 goto out;
3416 }
3417 }
3418 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3419 mach_msg_return_t kr = 0;
3420 mach_port_t reply = dmsg->dmsg_reply;
3421 mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
// a message flagged DISPATCH_MACH_REGISTER_FOR_REPLY was already sent on an
// earlier call (see below); skip the send and fall through with kr == 0
3422 if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
// internal flag bits are stripped from the options given to mach_msg()
3423 opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
// messages whose remote disposition is MOVE_SEND_ONCE skip this whole
// setup (channel send port, send kevent, timeout, voucher)
3424 if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3425 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
// the checkin message keeps its own remote port
3426 if (dmsg != dr->dm_checkin) {
3427 msg->msgh_remote_port = dr->dm_send;
3428 }
// the send kevent is only installed and consulted on the manager queue
3429 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3430 if (slowpath(!dm->dm_dkev)) {
3431 _dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3432 }
3433 if (fastpath(dm->dm_dkev)) {
// a send-possible notification is already armed: do not attempt
// to send until it has been delivered
3434 if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3435 goto out;
3436 }
3437 opts |= MACH_SEND_NOTIFY;
3438 }
3439 }
// the timeout argument passed to mach_msg() below is 0
3440 opts |= MACH_SEND_TIMEOUT;
// the message priority differs from the voucher's: send a kernel
// voucher created with the message priority instead
3441 if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
3442 ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
3443 voucher, dmsg->dmsg_priority);
3444 }
3445 _dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
3446 if (ipc_kvoucher) {
3447 kvoucher_move_send = true;
3448 clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
3449 ipc_kvoucher, kvoucher_move_send);
3450 } else {
3451 clear_voucher = _voucher_mach_msg_set(msg, voucher);
3452 }
3453 }
3454 _voucher_activity_trace_msg(voucher, msg, send);
3455 _dispatch_debug_machport(msg->msgh_remote_port);
3456 if (reply) _dispatch_debug_machport(reply);
3457 kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3458 MACH_PORT_NULL);
3459 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
3460 "opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
3461 "%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
3462 opts, msg_opts, msg->msgh_voucher_port, reply,
3463 mach_error_string(kr), kr);
// undo the voucher setup done on the header before the send
3464 if (clear_voucher) {
3465 if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
3466 DISPATCH_CRASH("Voucher port corruption");
3467 }
3468 mach_voucher_t kv;
3469 kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
3470 if (kvoucher_move_send) ipc_kvoucher = kv;
3471 }
3472 }
// the send did not complete within the (zero) timeout: keep the message
// for a later retry
3473 if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3474 if (opts & MACH_SEND_NOTIFY) {
3475 _dispatch_debug("machport[0x%08x]: send-possible notification "
3476 "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3477 DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3478 } else {
3479 // send kevent must be installed on the manager queue
3480 dr->dm_needs_mgr = 1;
3481 }
// keep the kernel voucher for the re-send: wrap it in a voucher that
// replaces the one currently attached to the message
3482 if (ipc_kvoucher) {
3483 _dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
3484 voucher_t ipc_voucher;
3485 ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
3486 voucher, dmsg->dmsg_priority, ipc_kvoucher);
3487 _dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
3488 ipc_voucher, dmsg, voucher);
3489 if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
3490 dmsg->dmsg_voucher = ipc_voucher;
3491 }
3492 goto out;
// no retry: drop the kernel voucher if the send failed or it was not
// passed with move-send semantics
3493 } else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
3494 _voucher_dealloc_mach_voucher(ipc_kvoucher);
3495 }
// successful send expecting a reply on a port other than the channel's own
// receive port: a reply kevent has to be registered
3496 if (fastpath(!kr) && reply &&
3497 !(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
3498 if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3499 // reply receive kevent must be installed on the manager queue
3500 dr->dm_needs_mgr = 1;
// remember that the message itself has already gone out
3501 _dispatch_mach_msg_set_options(dmsg, msg_opts |
3502 DISPATCH_MACH_REGISTER_FOR_REPLY);
3503 goto out;
3504 }
3505 _dispatch_mach_reply_kevent_register(dm, reply, dmsg);
3506 }
// the checkin message has been dealt with: remove the send kevent
3507 if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3508 _dispatch_mach_kevent_unregister(dm);
3509 }
3510 if (slowpath(kr)) {
3511 // Send failed, so reply was never connected <rdar://problem/14309159>
3512 dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3513 }
// record the mach_msg() result on the message and push it (followed by the
// reply-disconnected message, if any) onto the channel
3514 _dispatch_mach_msg_set_reason(dmsg, kr, 0);
3515 _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3516 if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
// fully handled: nothing for the caller to retry
3517 dmsg = NULL;
3518 out:
3519 return (dispatch_object_t)dmsg;
3520 }
3521
3522 DISPATCH_ALWAYS_INLINE
3523 static inline void
3524 _dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
3525 bool wakeup)
3526 {
3527 dispatch_mach_send_refs_t dr = dm->dm_refs;
3528 struct dispatch_object_s *prev, *dc = dou._do;
3529 dc->do_next = NULL;
3530
3531 prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3532 if (fastpath(prev)) {
3533 prev->do_next = dc;
3534 } else {
3535 dr->dm_head = dc;
3536 }
3537 if (wakeup || !prev) {
3538 _dispatch_wakeup(dm);
3539 }
3540 }
3541
3542 DISPATCH_ALWAYS_INLINE
3543 static inline void
3544 _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3545 {
3546 return _dispatch_mach_send_push_wakeup(dm, dou, false);
3547 }
3548
// Drains the channel's send queue (dm_head/dm_tail in dm->dm_refs), handling
// send barriers, sync waiters (when DISPATCH_MACH_SEND_SYNC), reconnect
// requests and messages in queue order.
// The callers in this file enter with dm_sending already set to 1. Unless a
// send barrier is handed off (see below), dm_sending is decremented and the
// channel is woken up before returning.
// If an item cannot be processed right now, draining stops and that item is
// put back at the head of the queue.
3549 DISPATCH_NOINLINE
3550 static void
3551 _dispatch_mach_send_drain(dispatch_mach_t dm)
3552 {
3553 dispatch_mach_send_refs_t dr = dm->dm_refs;
3554 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3555 while (dr->dm_tail) {
// an enqueuer publishes the tail before it links the item in, so wait for
// the head to become visible
3556 _dispatch_wait_until(dc = fastpath(dr->dm_head));
3557 do {
3558 next_dc = fastpath(dc->do_next);
3559 dr->dm_head = next_dc;
// dc looks like the last item: try to reset the tail to NULL; if that
// fails an enqueue is in flight, so wait for it to link do_next
3560 if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3561 relaxed)) {
3562 _dispatch_wait_until(next_dc = fastpath(dc->do_next));
3563 dr->dm_head = next_dc;
3564 }
// not a vtable object: the item is distinguished by the flag bits stored
// in do_vtable (send barrier, sync waiter, or otherwise a reconnect
// request)
3565 if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3566 if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3567 // send barrier
3568 // leave send queue locked until barrier has completed
3569 return _dispatch_mach_push(dm, dc,
3570 ((dispatch_continuation_t)dc)->dc_priority);
3571 }
3572 #if DISPATCH_MACH_SEND_SYNC
3573 if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
// wake the thread waiting on the semaphore stored in do_ctxt
3574 _dispatch_thread_semaphore_signal(
3575 (_dispatch_thread_semaphore_t)dc->do_ctxt);
3576 continue;
3577 }
3578 #endif // DISPATCH_MACH_SEND_SYNC
// reconnect request; false means it cannot be performed right now, so
// stop draining and requeue it
3579 if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3580 goto out;
3581 }
3582 continue;
3583 }
3584 _dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
// while a reconnect is pending (dm_disconnect_cnt != 0) or after
// cancellation, messages are reported as not sent
3585 if (slowpath(dr->dm_disconnect_cnt) ||
3586 slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3587 _dispatch_mach_msg_not_sent(dm, dc);
3588 continue;
3589 }
// a non-NULL result is the message that has to be retried later
3590 if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3591 goto out;
3592 }
3593 } while ((dc = next_dc));
3594 }
3595 out:
3596 // if this is not a complete drain, we must undo some things
3597 if (slowpath(dc)) {
// dc had no successor: try to make it the tail again; if the tail is no
// longer NULL, items were enqueued meanwhile and dc is linked in front of
// them
3598 if (!next_dc &&
3599 !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3600 // wait for enqueue slow path to finish
3601 _dispatch_wait_until(next_dc = fastpath(dr->dm_head));
3602 dc->do_next = next_dc;
3603 }
3604 dr->dm_head = dc;
3605 }
// release the send queue and let the channel re-evaluate its state
3606 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3607 _dispatch_wakeup(dm);
3608 }
3609
3610 static inline void
3611 _dispatch_mach_send(dispatch_mach_t dm)
3612 {
3613 dispatch_mach_send_refs_t dr = dm->dm_refs;
3614 if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3615 dm_sending, 0, 1, acquire))) {
3616 return;
3617 }
3618 _dispatch_object_debug(dm, "%s", __func__);
3619 _dispatch_mach_send_drain(dm);
3620 }
3621
3622 DISPATCH_NOINLINE
3623 static void
3624 _dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
3625 {
3626 if (!(ke->fflags & dm->ds_pending_data_mask)) {
3627 return;
3628 }
3629 _dispatch_mach_send(dm);
3630 }
3631
3632 static inline mach_msg_option_t
3633 _dispatch_mach_checkin_options(void)
3634 {
3635 mach_msg_option_t options = 0;
3636 #if DISPATCH_USE_CHECKIN_NOIMPORTANCE
3637 options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
3638 #endif
3639 return options;
3640 }
3641
3642
3643 static inline mach_msg_option_t
3644 _dispatch_mach_send_options(void)
3645 {
3646 mach_msg_option_t options = 0;
3647 return options;
3648 }
3649
// Public entry point: sends message `dmsg` on channel `dm`.
//
// options: caller-supplied mach_msg options; must not use the bits in
//          DISPATCH_MACH_OPTIONS_MASK (asserted).
//
// The message is retained and stamped with the propagated priority and a
// copy of the current voucher. It is sent inline when the send queue can be
// taken (dm_sending 0 -> 1) and nothing forces queuing; otherwise it is
// appended to the channel's send queue.
// Crashes the client if the message is already enqueued.
3650 DISPATCH_NOINLINE
3651 void
3652 dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3653 mach_msg_option_t options)
3654 {
3655 dispatch_mach_send_refs_t dr = dm->dm_refs;
// do_next is DISPATCH_OBJECT_LISTLESS while a message is not on any queue
3656 if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3657 DISPATCH_CLIENT_CRASH("Message already enqueued");
3658 }
3659 dispatch_retain(dmsg);
3660 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3661 options |= _dispatch_mach_send_options();
3662 _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3663 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
// a reply is expected only when the local port is valid and carried with
// MAKE_SEND_ONCE disposition
3664 dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
3665 MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3666 MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
3667 MACH_PORT_NULL);
// a message sent with MOVE_SEND_ONCE remote disposition is itself a reply;
// such messages do not wait behind already queued messages
3668 bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
3669 MACH_MSG_TYPE_MOVE_SEND_ONCE);
3670 dmsg->dmsg_priority = _dispatch_priority_propagate();
3671 dmsg->dmsg_voucher = _voucher_copy();
3672 _dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
// enqueue instead of sending inline when: messages are already queued (and
// this is not a reply), a reconnect is pending, the channel is canceled,
// or somebody else currently owns the send queue
3673 if ((!is_reply && slowpath(dr->dm_tail)) ||
3674 slowpath(dr->dm_disconnect_cnt) ||
3675 slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3676 slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3677 acquire))) {
3678 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3679 return _dispatch_mach_send_push(dm, dmsg);
3680 }
// inline send attempt; a non-NULL result has to be retried later, so
// release the send queue and enqueue it with a forced wakeup
3681 if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3682 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3683 _dispatch_voucher_ktrace_dmsg_push(dmsg);
3684 return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
3685 }
// messages are queued while we still own the send queue: drain them
// (the drain releases dm_sending itself)
3686 if (!is_reply && slowpath(dr->dm_tail)) {
3687 return _dispatch_mach_send_drain(dm);
3688 }
3689 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3690 _dispatch_wakeup(dm);
3691 }
3692
3693 static void
3694 _dispatch_mach_disconnect(dispatch_mach_t dm)
3695 {
3696 dispatch_mach_send_refs_t dr = dm->dm_refs;
3697 if (dm->dm_dkev) {
3698 _dispatch_mach_kevent_unregister(dm);
3699 }
3700 if (MACH_PORT_VALID(dr->dm_send)) {
3701 _dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3702 }
3703 dr->dm_send = MACH_PORT_NULL;
3704 if (dr->dm_checkin) {
3705 _dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3706 dr->dm_checkin = NULL;
3707 }
3708 if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3709 dispatch_mach_reply_refs_t dmr, tmp;
3710 TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){
3711 _dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3712 }
3713 }
3714 }
3715
3716 DISPATCH_NOINLINE
3717 static bool
3718 _dispatch_mach_cancel(dispatch_mach_t dm)
3719 {
3720 dispatch_mach_send_refs_t dr = dm->dm_refs;
3721 if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3722 return false;
3723 }
3724 _dispatch_object_debug(dm, "%s", __func__);
3725 _dispatch_mach_disconnect(dm);
3726 if (dm->ds_dkev) {
3727 mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3728 _dispatch_source_kevent_unregister((dispatch_source_t)dm);
3729 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3730 }
3731 (void)dispatch_atomic_dec2o(dr, dm_sending, release);
3732 return true;
3733 }
3734
3735 DISPATCH_NOINLINE
3736 static bool
3737 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3738 {
3739 if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3740 if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
3741 // send/reply kevents must be uninstalled on the manager queue
3742 return false;
3743 }
3744 }
3745 _dispatch_mach_disconnect(dm);
3746 dispatch_mach_send_refs_t dr = dm->dm_refs;
3747 dr->dm_checkin = dou._dc->dc_data;
3748 dr->dm_send = (mach_port_t)dou._dc->dc_other;
3749 _dispatch_continuation_free(dou._dc);
3750 (void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
3751 _dispatch_object_debug(dm, "%s", __func__);
3752 return true;
3753 }
3754
3755 DISPATCH_NOINLINE
3756 void
3757 dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
3758 dispatch_mach_msg_t checkin)
3759 {
3760 dispatch_mach_send_refs_t dr = dm->dm_refs;
3761 (void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
3762 if (MACH_PORT_VALID(send) && checkin) {
3763 dispatch_retain(checkin);
3764 mach_msg_option_t options = _dispatch_mach_checkin_options();
3765 _dispatch_mach_msg_set_options(checkin, options);
3766 dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3767 } else {
3768 checkin = NULL;
3769 dr->dm_checkin_port = MACH_PORT_NULL;
3770 }
3771 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3772 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3773 dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
3774 dc->dc_ctxt = dc;
3775 dc->dc_data = checkin;
3776 dc->dc_other = (void*)(uintptr_t)send;
3777 return _dispatch_mach_send_push(dm, dc);
3778 }
3779
#if DISPATCH_MACH_SEND_SYNC
// Blocks the calling thread until the send queue has been drained up to this
// point: a stack-allocated marker flagged DISPATCH_OBJ_SYNC_SLOW_BIT and
// carrying the thread's semaphore is queued, and the thread waits on that
// semaphore until the drain signals it.
DISPATCH_NOINLINE
static void
_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
{
	_dispatch_thread_semaphore_t waiter_sema = _dispatch_get_thread_semaphore();
	// lives on this stack frame; valid because we do not return before the
	// semaphore has been signaled
	struct dispatch_object_s marker = {
		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
		.do_ctxt = (void*)waiter_sema,
	};

	_dispatch_mach_send_push(dm, &marker);
	_dispatch_thread_semaphore_wait(waiter_sema);
	_dispatch_put_thread_semaphore(waiter_sema);
}
#endif // DISPATCH_MACH_SEND_SYNC
3795
3796 DISPATCH_NOINLINE
3797 mach_port_t
3798 dispatch_mach_get_checkin_port(dispatch_mach_t dm)
3799 {
3800 dispatch_mach_send_refs_t dr = dm->dm_refs;
3801 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3802 return MACH_PORT_DEAD;
3803 }
3804 return dr->dm_checkin_port;
3805 }
3806
3807 DISPATCH_NOINLINE
3808 static void
3809 _dispatch_mach_connect_invoke(dispatch_mach_t dm)
3810 {
3811 dispatch_mach_refs_t dr = dm->ds_refs;
3812 _dispatch_client_callout4(dr->dm_handler_ctxt,
3813 DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
3814 dm->dm_connect_handler_called = 1;
3815 }
3816
// Delivers one mach message object to the channel's handler.
// The channel is taken from the current queue, so this must run while the
// channel is the current queue. The message's voucher and priority are
// adopted for the duration of the callout, and one reference on dmsg is
// released before returning.
DISPATCH_NOINLINE
void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	mach_error_t err;
	// reason/err describe why this message is being delivered to the handler
	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);

	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	// While client code runs, the current queue is the channel's target queue
	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
	_dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
			dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
	// The voucher has been handed off above; clear the field on the message
	dmsg->dmsg_voucher = NULL;
	// The handler always sees DISPATCH_MACH_CONNECTED before any other event
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
			dr->dm_handler_func);
	// Restore the channel as the current queue
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	_dispatch_introspection_queue_item_complete(dmsg);
	dispatch_release(dmsg);
}
3842
// Continuation function shared by send and receive barriers.
// ctxt is the barrier continuation itself: dc_data holds the client context,
// dc_other the client barrier function. After the barrier function runs, the
// channel handler is called with DISPATCH_MACH_BARRIER_COMPLETED. For send
// barriers (DISPATCH_OBJ_BARRIER_BIT set in do_vtable) the send lock count
// (dm_sending) is decremented at the end.
DISPATCH_NOINLINE
void
_dispatch_mach_barrier_invoke(void *ctxt)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	struct dispatch_continuation_s *dc = ctxt;
	// Everything needed from the continuation is read before any callout
	void *context = dc->dc_data;
	dispatch_function_t barrier = dc->dc_other;
	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);

	// While client code runs, the current queue is the channel's target queue
	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	// The handler always sees DISPATCH_MACH_CONNECTED before any other event
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout(context, barrier);
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
	// Restore the channel as the current queue
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	if (send_barrier) {
		// Drop the send lock that was held across the barrier
		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
	}
}
3866
// Submits a send barrier: `barrier(context)` is run via
// _dispatch_mach_barrier_invoke.
// If the send queue is non-empty (dm_tail set) or the send lock (dm_sending)
// cannot be taken, the barrier is pushed onto the send queue. Otherwise the
// send lock is taken here and the barrier is pushed directly onto the
// channel; the lock is released by _dispatch_mach_barrier_invoke.
DISPATCH_NOINLINE
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	// BARRIER_BIT marks this as a send barrier for the invoke function
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	_dispatch_continuation_voucher_set(dc, 0);
	_dispatch_continuation_priority_set(dc, 0, 0);

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	// The cmpxchg 0 -> 1 on dm_sending acquires the send lock on success
	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return _dispatch_mach_send_push(dm, dc);
	}
	// leave send queue locked until barrier has completed
	return _dispatch_mach_push(dm, dc, dc->dc_priority);
}
3889
3890 DISPATCH_NOINLINE
3891 void
3892 dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
3893 dispatch_function_t barrier)
3894 {
3895 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3896 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3897 dc->dc_func = _dispatch_mach_barrier_invoke;
3898 dc->dc_ctxt = dc;
3899 dc->dc_data = context;
3900 dc->dc_other = barrier;
3901 _dispatch_continuation_voucher_set(dc, 0);
3902 _dispatch_continuation_priority_set(dc, 0, 0);
3903
3904 return _dispatch_mach_push(dm, dc, dc->dc_priority);
3905 }
3906
3907 DISPATCH_NOINLINE
3908 void
3909 dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3910 {
3911 dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
3912 _dispatch_call_block_and_release);
3913 }
3914
3915 DISPATCH_NOINLINE
3916 void
3917 dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3918 {
3919 dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
3920 _dispatch_call_block_and_release);
3921 }
3922
3923 DISPATCH_NOINLINE
3924 static void
3925 _dispatch_mach_cancel_invoke(dispatch_mach_t dm)
3926 {
3927 dispatch_mach_refs_t dr = dm->ds_refs;
3928 if (slowpath(!dm->dm_connect_handler_called)) {
3929 _dispatch_mach_connect_invoke(dm);
3930 }
3931 _dispatch_client_callout4(dr->dm_handler_ctxt,
3932 DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
3933 dm->dm_cancel_handler_called = 1;
3934 _dispatch_release(dm); // the retain is done at creation time
3935 }
3936
3937 DISPATCH_NOINLINE
3938 void
3939 dispatch_mach_cancel(dispatch_mach_t dm)
3940 {
3941 dispatch_source_cancel((dispatch_source_t)dm);
3942 }
3943
// Per-invocation state machine for a mach channel.
// Performs at most one category of work (install, deliver, send, cancel)
// depending on channel state and the queue it is currently running on.
// Returns the queue on which the invoke must be re-driven, or NULL when no
// re-drive is requested. sema_ptr is unused.
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_mach_t dm = dou._dm;

	// This function performs all mach channel actions. Each action is
	// responsible for verifying that it takes place on the appropriate queue.
	// If the current queue is not the correct queue for this action, the
	// correct queue will be returned and the invoke will be re-driven on that
	// queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		// Register the receive kevent, if the channel has one
		if (dm->ds_dkev) {
			_dispatch_source_kevent_register((dispatch_source_t)dm);
		}
		dm->ds_is_installed = true;
		_dispatch_mach_send(dm);
		// Apply initial target queue change
		_dispatch_queue_drain(dou);
		// Items still queued after the drain are handled on the target queue
		if (dm->dq_items_tail) {
			return dm->do_targetq;
		}
	} else if (dm->dq_items_tail) {
		// The channel has pending messages to deliver to the target queue.
		if (dq != dm->do_targetq) {
			return dm->do_targetq;
		}
		// Remember the target queue so a change made by a drained item is seen
		dispatch_queue_t tq = dm->do_targetq;
		if (slowpath(_dispatch_queue_drain(dou))) {
			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
		}
		if (slowpath(tq != dm->do_targetq)) {
			// An item on the channel changed the target queue
			return dm->do_targetq;
		}
	} else if (dr->dm_sending) {
		// Sending and uninstallation below require the send lock, the channel
		// will be woken up when the lock is dropped <rdar://15132939&15203957>
		return NULL;
	} else if (dr->dm_tail) {
		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
			// Send/reply kevents need to be installed or uninstalled
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
		}
		// Same condition as the "pending messages to send" test in probe
		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			// The channel has pending messages to send.
			_dispatch_mach_send(dm);
		}
	} else if (dm->ds_atomic_flags & DSF_CANCELED){
		// The channel has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			// Give up for now if the cancel could not complete
			if (!_dispatch_mach_cancel(dm)) {
				return NULL;
			}
		}
		if (!dm->dm_cancel_handler_called) {
			if (dq != dm->do_targetq) {
				return dm->do_targetq;
			}
			_dispatch_mach_cancel_invoke(dm);
		}
	}
	return NULL;
}
4029
// Invoke entry point for mach channels: hands the channel to
// _dispatch_queue_class_invoke with _dispatch_mach_invoke2 as the function
// that performs the channel-specific work.
DISPATCH_NOINLINE
void
_dispatch_mach_invoke(dispatch_mach_t dm)
{
	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
}
4036
// Reports whether the mach channel currently has work that requires an
// invoke. Returns true (nonzero) when it does, false otherwise. The tests
// mirror, in the same order, the branches of _dispatch_mach_invoke2.
unsigned long
_dispatch_mach_probe(dispatch_mach_t dm)
{
	// This function determines whether the mach channel needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		return true;
	} else if (_dispatch_queue_class_probe(dm)) {
		// The source has pending messages to deliver to the target queue.
		return true;
	} else if (dr->dm_sending) {
		// Sending and uninstallation below require the send lock, the channel
		// will be woken up when the lock is dropped <rdar://15132939&15203957>
		return false;
	} else if (dr->dm_tail &&
			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
		// The channel has pending messages to send.
		return true;
	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
				!dm->dm_cancel_handler_called) {
			// The channel needs to be uninstalled from the manager queue, or
			// the cancellation handler needs to be delivered to the target
			// queue.
			return true;
		}
	}
	// Nothing to do.
	return false;
}
4073
4074 #pragma mark -
4075 #pragma mark dispatch_mach_msg_t
4076
4077 dispatch_mach_msg_t
4078 dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
4079 dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
4080 {
4081 if (slowpath(size < sizeof(mach_msg_header_t)) ||
4082 slowpath(destructor && !msg)) {
4083 DISPATCH_CLIENT_CRASH("Empty message");
4084 }
4085 dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
4086 sizeof(struct dispatch_mach_msg_s) +
4087 (destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
4088 if (destructor) {
4089 dmsg->dmsg_msg = msg;
4090 } else if (msg) {
4091 memcpy(dmsg->dmsg_buf, msg, size);
4092 }
4093 dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
4094 dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
4095 false);
4096 dmsg->dmsg_destructor = destructor;
4097 dmsg->dmsg_size = size;
4098 if (msg_ptr) {
4099 *msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
4100 }
4101 return dmsg;
4102 }
4103
4104 void
4105 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
4106 {
4107 if (dmsg->dmsg_voucher) {
4108 _voucher_release(dmsg->dmsg_voucher);
4109 dmsg->dmsg_voucher = NULL;
4110 }
4111 switch (dmsg->dmsg_destructor) {
4112 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
4113 break;
4114 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
4115 free(dmsg->dmsg_msg);
4116 break;
4117 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
4118 mach_vm_size_t vm_size = dmsg->dmsg_size;
4119 mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
4120 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
4121 vm_addr, vm_size));
4122 break;
4123 }}
4124 }
4125
4126 static inline mach_msg_header_t*
4127 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
4128 {
4129 return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
4130 (mach_msg_header_t*)dmsg->dmsg_buf;
4131 }
4132
4133 mach_msg_header_t*
4134 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
4135 {
4136 if (size_ptr) {
4137 *size_ptr = dmsg->dmsg_size;
4138 }
4139 return _dispatch_mach_msg_get_msg(dmsg);
4140 }
4141
4142 size_t
4143 _dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
4144 {
4145 size_t offset = 0;
4146 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4147 dx_kind(dmsg), dmsg);
4148 offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
4149 "refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
4150 offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
4151 "msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
4152 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
4153 if (hdr->msgh_id) {
4154 offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
4155 hdr->msgh_id);
4156 }
4157 if (hdr->msgh_size) {
4158 offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
4159 hdr->msgh_size);
4160 }
4161 if (hdr->msgh_bits) {
4162 offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
4163 MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
4164 MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
4165 if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
4166 offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
4167 MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
4168 }
4169 offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
4170 }
4171 if (hdr->msgh_local_port && hdr->msgh_remote_port) {
4172 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
4173 "remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
4174 } else if (hdr->msgh_local_port) {
4175 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
4176 hdr->msgh_local_port);
4177 } else if (hdr->msgh_remote_port) {
4178 offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4179 hdr->msgh_remote_port);
4180 } else {
4181 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4182 }
4183 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4184 return offset;
4185 }
4186
4187 #pragma mark -
4188 #pragma mark dispatch_mig_server
4189
// Receives MIG requests on the mach port of source `ds` and dispatches each
// one through `callback`, sending the reply produced by the callback.
//
// ds:       source whose ident (ds_ident_hack) is the receive port.
// maxmsgsz: largest request/reply size; each of the two stack buffers is
//           maxmsgsz + MAX_TRAILER_SIZE bytes.
// callback: MIG demux routine taking the request header and a reply header;
//           its boolean result says whether the request was handled.
//
// Returns the last mach_msg() result. MACH_RCV_TIMED_OUT is mapped to
// MACH_MSG_SUCCESS when a reply was sent on that call or at least one
// request had been received earlier.
//
// Each loop iteration issues one mach_msg() that may both send the pending
// reply (MACH_SEND_MSG in tmp_options) and receive the next request into the
// same buffer. The loop stops receiving after 1000 iterations or once the
// source is suspended.
mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
		dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
			| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
			| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
	// options: baseline flags; tmp_options: flags for the next mach_msg() call
	mach_msg_options_t tmp_options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	uint64_t assertion_token = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	boolean_t demux_success;
	bool received = false;
	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(rcv_size);
	bufReply = alloca(rcv_size);
	bufReply->Head.msgh_size = 0;
	bufRequest->RetCode = 0;

#if DISPATCH_DEBUG
	options |= MACH_RCV_LARGE; // rdar://problem/8422992
#endif
	tmp_options = options;
	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			// Stop receiving; only a pending reply (if any) is still sent
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				goto out;
			}
		}
		// Send the reply held in bufReply (when MACH_SEND_MSG is set) and
		// receive the next request into the same buffer
		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);

		// Drop the per-call send flags; they are re-added below when needed
		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				// The reply was not sent; destroy it if it is a complex message
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
				// Don't return an error if a message was sent this time or
				// a message was successfully received previously
				// rdar://problems/7363620&7791738
				if(bufReply->Head.msgh_remote_port || received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			case MACH_RCV_INVALID_NAME:
				break;
#if DISPATCH_DEBUG
			case MACH_RCV_TOO_LARGE:
				// receive messages that are too large and log their id and size
				// rdar://problem/8422992
				tmp_options &= ~MACH_RCV_LARGE;
				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
				void *large_buf = malloc(large_size);
				// If the allocation fails, the receive is retried into the
				// original buffer with the original size
				if (large_buf) {
					rcv_size = large_size;
					bufReply = large_buf;
				}
				if (!mach_msg(&bufReply->Head, tmp_options, 0,
						(mach_msg_size_t)rcv_size,
						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
					_dispatch_log("BUG in libdispatch client: "
							"dispatch_mig_server received message larger than "
							"requested size %zd: id = 0x%x, size = %d",
							maxmsgsz, bufReply->Head.msgh_id,
							bufReply->Head.msgh_size);
				}
				// bufReply may dangle after this free; every path from here
				// reaches `out`, which does not touch it
				if (large_buf) {
					free(large_buf);
				}
				// fall through
#endif
			default:
				_dispatch_bug_mach_client(
						"dispatch_mig_server: mach_msg() failed", kr);
				break;
			}
			goto out;
		}

		// Send-only call (receiving was disabled above): done
		if (!(tmp_options & MACH_RCV_MSG)) {
			goto out;
		}

		// Complete the importance assertion taken for the previous request
		if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
			int r = proc_importance_assertion_complete(assertion_token);
			(void)dispatch_assume_zero(r);
#endif
			assertion_token = 0;
		}
		received = true;

		// The message just received becomes the request; the other buffer
		// is reused for the reply
		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
				NULL, &assertion_token);
		// EIO is tolerated silently; other failures are reported
		if (r && slowpath(r != EIO)) {
			(void)dispatch_assume_zero(r);
		}
#endif
		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
			// is present
			if (slowpath(bufReply->RetCode)) {
				// MIG_NO_REPLY: nothing to send, go receive the next request
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		// A reply destination is set: send it on the next mach_msg() call
		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			// Use a send timeout unless the reply right is a send-once right
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

out:
	// Complete any importance assertion still outstanding
	if (assertion_token) {
#if DISPATCH_USE_IMPORTANCE_ASSERTION
		int r = proc_importance_assertion_complete(assertion_token);
		(void)dispatch_assume_zero(r);
#endif
	}

	return kr;
}
4346
4347 #endif /* HAVE_MACH */
4348
4349 #pragma mark -
4350 #pragma mark dispatch_source_debug
4351
// Maps a kevent filter value to the name of its constant as a string
// literal, for debug output. Returns "EVFILT_missing" for values not listed.
DISPATCH_NOINLINE
static const char *
_evfiltstr(short filt)
{
	switch (filt) {
// Expands to a case label that returns the stringified constant name
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
#ifdef EVFILT_VM
	_evfilt2(EVFILT_VM);
#endif
#ifdef EVFILT_MEMORYSTATUS
	_evfilt2(EVFILT_MEMORYSTATUS);
#endif
#if HAVE_MACH
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
#endif
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);

	// libdispatch-internal pseudo filters
	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}
4385
4386 static size_t
4387 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4388 {
4389 dispatch_queue_t target = ds->do_targetq;
4390 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4391 "pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
4392 target && target->dq_label ? target->dq_label : "", target,
4393 ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
4394 }
4395
4396 static size_t
4397 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4398 {
4399 dispatch_source_refs_t dr = ds->ds_refs;
4400 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
4401 " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
4402 ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4403 ds_timer(dr).interval, ds_timer(dr).flags);
4404 }
4405
4406 size_t
4407 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4408 {
4409 size_t offset = 0;
4410 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4411 dx_kind(ds), ds);
4412 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4413 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4414 if (ds->ds_is_timer) {
4415 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4416 }
4417 offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
4418 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
4419 return offset;
4420 }
4421
4422 static size_t
4423 _dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4424 {
4425 dispatch_queue_t target = dm->do_targetq;
4426 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4427 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4428 "sending = %d, disconnected = %d, canceled = %d ",
4429 target && target->dq_label ? target->dq_label : "", target,
4430 dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
4431 dm->dm_refs->dm_send,
4432 dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
4433 dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4434 " (armed)" : "", dm->dm_refs->dm_checkin_port,
4435 dm->dm_refs->dm_checkin ? " (pending)" : "",
4436 dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4437 (bool)(dm->ds_atomic_flags & DSF_CANCELED));
4438 }
4439 size_t
4440 _dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4441 {
4442 size_t offset = 0;
4443 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4444 dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
4445 dx_kind(dm), dm);
4446 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4447 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4448 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4449 return offset;
4450 }
4451
4452 #if DISPATCH_DEBUG
// Logs every field of a kevent64_s (with the filter shown by name via
// _evfiltstr), followed by the caller-supplied string `str`.
static void
_dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
{
	_dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
			"fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
			"ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
			kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
			kev->ext[1], str);
}
4462
4463 static void
4464 _dispatch_kevent_debugger2(void *context)
4465 {
4466 struct sockaddr sa;
4467 socklen_t sa_len = sizeof(sa);
4468 int c, fd = (int)(long)context;
4469 unsigned int i;
4470 dispatch_kevent_t dk;
4471 dispatch_source_t ds;
4472 dispatch_source_refs_t dr;
4473 FILE *debug_stream;
4474
4475 c = accept(fd, &sa, &sa_len);
4476 if (c == -1) {
4477 if (errno != EAGAIN) {
4478 (void)dispatch_assume_zero(errno);
4479 }
4480 return;
4481 }
4482 #if 0
4483 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4484 if (r == -1) {
4485 (void)dispatch_assume_zero(errno);
4486 }
4487 #endif
4488 debug_stream = fdopen(c, "a");
4489 if (!dispatch_assume(debug_stream)) {
4490 close(c);
4491 return;
4492 }
4493
4494 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4495 fprintf(debug_stream, "Content-type: text/html\r\n");
4496 fprintf(debug_stream, "Pragma: nocache\r\n");
4497 fprintf(debug_stream, "\r\n");
4498 fprintf(debug_stream, "<html>\n");
4499 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4500 fprintf(debug_stream, "<body>\n<ul>\n");
4501
4502 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4503 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4504
4505 for (i = 0; i < DSL_HASH_SIZE; i++) {
4506 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4507 continue;
4508 }
4509 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4510 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4511 "0x%hx fflags 0x%x data 0x%lx udata %p\n",
4512 dk, (unsigned long)dk->dk_kevent.ident,
4513 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4514 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4515 (void*)dk->dk_kevent.udata);
4516 fprintf(debug_stream, "\t\t<ul>\n");
4517 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4518 ds = _dispatch_source_from_refs(dr);
4519 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4520 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4521 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4522 ds->ds_pending_data, ds->ds_pending_data_mask,
4523 ds->ds_atomic_flags);
4524 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4525 dispatch_queue_t dq = ds->do_targetq;
4526 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4527 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4528 dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4529 }
4530 }
4531 fprintf(debug_stream, "\t\t</ul>\n");
4532 fprintf(debug_stream, "\t</li>\n");
4533 }
4534 }
4535 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4536 fflush(debug_stream);
4537 fclose(debug_stream);
4538 }
4539
// Cancel handler for the debugger listen source: closes the listening socket
// whose descriptor was stored as the source context, and reports a failing
// close() through dispatch_assume_zero().
static void
_dispatch_kevent_debugger2_cancel(void *context)
{
	int ret, fd = (int)(long)context;

	ret = close(fd);
	// errno is only meaningful when close() reports failure (-1); the
	// previous `!= -1` test inspected a stale errno on success and ignored
	// real failures.
	if (ret == -1) {
		(void)dispatch_assume_zero(errno);
	}
}
4550
4551 static void
4552 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4553 {
4554 union {
4555 struct sockaddr_in sa_in;
4556 struct sockaddr sa;
4557 } sa_u = {
4558 .sa_in = {
4559 .sin_family = AF_INET,
4560 .sin_addr = { htonl(INADDR_LOOPBACK), },
4561 },
4562 };
4563 dispatch_source_t ds;
4564 const char *valstr;
4565 int val, r, fd, sock_opt = 1;
4566 socklen_t slen = sizeof(sa_u);
4567
4568 if (issetugid()) {
4569 return;
4570 }
4571 valstr = getenv("LIBDISPATCH_DEBUGGER");
4572 if (!valstr) {
4573 return;
4574 }
4575 val = atoi(valstr);
4576 if (val == 2) {
4577 sa_u.sa_in.sin_addr.s_addr = 0;
4578 }
4579 fd = socket(PF_INET, SOCK_STREAM, 0);
4580 if (fd == -1) {
4581 (void)dispatch_assume_zero(errno);
4582 return;
4583 }
4584 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4585 (socklen_t) sizeof sock_opt);
4586 if (r == -1) {
4587 (void)dispatch_assume_zero(errno);
4588 goto out_bad;
4589 }
4590 #if 0
4591 r = fcntl(fd, F_SETFL, O_NONBLOCK);
4592 if (r == -1) {
4593 (void)dispatch_assume_zero(errno);
4594 goto out_bad;
4595 }
4596 #endif
4597 r = bind(fd, &sa_u.sa, sizeof(sa_u));
4598 if (r == -1) {
4599 (void)dispatch_assume_zero(errno);
4600 goto out_bad;
4601 }
4602 r = listen(fd, SOMAXCONN);
4603 if (r == -1) {
4604 (void)dispatch_assume_zero(errno);
4605 goto out_bad;
4606 }
4607 r = getsockname(fd, &sa_u.sa, &slen);
4608 if (r == -1) {
4609 (void)dispatch_assume_zero(errno);
4610 goto out_bad;
4611 }
4612
4613 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4614 &_dispatch_mgr_q);
4615 if (dispatch_assume(ds)) {
4616 _dispatch_log("LIBDISPATCH: debug port: %hu",
4617 (in_port_t)ntohs(sa_u.sa_in.sin_port));
4618
4619 /* ownership of fd transfers to ds */
4620 dispatch_set_context(ds, (void *)(long)fd);
4621 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4622 dispatch_source_set_cancel_handler_f(ds,
4623 _dispatch_kevent_debugger2_cancel);
4624 dispatch_resume(ds);
4625
4626 return;
4627 }
4628 out_bad:
4629 close(fd);
4630 }
4631
4632 #if HAVE_MACH
4633
4634 #ifndef MACH_PORT_TYPE_SPREQUEST
4635 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
4636 #endif
4637
4638 DISPATCH_NOINLINE
4639 void
4640 dispatch_debug_machport(mach_port_t name, const char* str)
4641 {
4642 mach_port_type_t type;
4643 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4644 unsigned int dnreqs = 0, dnrsiz;
4645 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4646 if (kr) {
4647 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4648 kr, mach_error_string(kr), str);
4649 return;
4650 }
4651 if (type & MACH_PORT_TYPE_SEND) {
4652 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4653 MACH_PORT_RIGHT_SEND, &ns));
4654 }
4655 if (type & MACH_PORT_TYPE_SEND_ONCE) {
4656 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4657 MACH_PORT_RIGHT_SEND_ONCE, &nso));
4658 }
4659 if (type & MACH_PORT_TYPE_DEAD_NAME) {
4660 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4661 MACH_PORT_RIGHT_DEAD_NAME, &nd));
4662 }
4663 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4664 kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
4665 if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
4666 }
4667 if (type & MACH_PORT_TYPE_RECEIVE) {
4668 mach_port_status_t status = { .mps_pset = 0, };
4669 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4670 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4671 MACH_PORT_RIGHT_RECEIVE, &nr));
4672 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4673 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4674 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4675 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4676 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4677 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4678 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4679 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4680 status.mps_srights ? "Y":"N", status.mps_sorights,
4681 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
4682 status.mps_seqno, str);
4683 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
4684 MACH_PORT_TYPE_DEAD_NAME)) {
4685 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4686 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
4687 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
4688 } else {
4689 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
4690 str);
4691 }
4692 }
4693
4694 #endif // HAVE_MACH
4695
4696 #endif // DISPATCH_DEBUG