1 /*
2 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26 #include <sys/mount.h>
27
28 static void _dispatch_source_dispose(dispatch_source_t ds);
29 static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);
30 static bool _dispatch_source_probe(dispatch_source_t ds);
31 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
32 const struct kevent *ke);
33 static void _dispatch_kevent_register(dispatch_source_t ds);
34 static void _dispatch_kevent_unregister(dispatch_source_t ds);
35 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
36 uint32_t del_flags);
37 static inline void _dispatch_source_timer_init(void);
38 static void _dispatch_timer_list_update(dispatch_source_t ds);
39 static inline unsigned long _dispatch_source_timer_data(
40 dispatch_source_refs_t dr, unsigned long prev);
41 #if HAVE_MACH
42 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
43 uint32_t new_flags, uint32_t del_flags);
44 static void _dispatch_drain_mach_messages(struct kevent *ke);
45 #endif
46 static size_t _dispatch_source_kevent_debug(dispatch_source_t ds,
47 char* buf, size_t bufsiz);
48 #if DISPATCH_DEBUG
49 static void _dispatch_kevent_debugger(void *context);
50 #endif
51
52 #pragma mark -
53 #pragma mark dispatch_source_t
54
55 const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
56 .do_type = DISPATCH_SOURCE_KEVENT_TYPE,
57 .do_kind = "kevent-source",
58 .do_invoke = _dispatch_source_invoke,
59 .do_dispose = _dispatch_source_dispose,
60 .do_probe = _dispatch_source_probe,
61 .do_debug = _dispatch_source_kevent_debug,
62 };
63
64 dispatch_source_t
65 dispatch_source_create(dispatch_source_type_t type,
66 uintptr_t handle,
67 unsigned long mask,
68 dispatch_queue_t q)
69 {
70 const struct kevent *proto_kev = &type->ke;
71 dispatch_source_t ds = NULL;
72 dispatch_kevent_t dk = NULL;
73
74 // input validation
75 if (type == NULL || (mask & ~type->mask)) {
76 goto out_bad;
77 }
78
79 switch (type->ke.filter) {
80 case EVFILT_SIGNAL:
81 if (handle >= NSIG) {
82 goto out_bad;
83 }
84 break;
85 case EVFILT_FS:
86 #if DISPATCH_USE_VM_PRESSURE
87 case EVFILT_VM:
88 #endif
89 case DISPATCH_EVFILT_CUSTOM_ADD:
90 case DISPATCH_EVFILT_CUSTOM_OR:
91 case DISPATCH_EVFILT_TIMER:
92 if (handle) {
93 goto out_bad;
94 }
95 break;
96 default:
97 break;
98 }
99
100 ds = calloc(1ul, sizeof(struct dispatch_source_s));
101 if (slowpath(!ds)) {
102 goto out_bad;
103 }
104 dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
105 if (slowpath(!dk)) {
106 goto out_bad;
107 }
108
109 dk->dk_kevent = *proto_kev;
110 dk->dk_kevent.ident = handle;
111 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
112 dk->dk_kevent.fflags |= (uint32_t)mask;
113 dk->dk_kevent.udata = dk;
114 TAILQ_INIT(&dk->dk_sources);
115
116 // Initialize as a queue first, then override some settings below.
117 _dispatch_queue_init((dispatch_queue_t)ds);
118 strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));
119
120 // Dispatch Object
121 ds->do_vtable = &_dispatch_source_kevent_vtable;
122 	ds->do_ref_cnt++; // the reference the manager queue holds
123 ds->do_ref_cnt++; // since source is created suspended
124 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
125 // The initial target queue is the manager queue, in order to get
126 // the source installed. <rdar://problem/8928171>
127 ds->do_targetq = &_dispatch_mgr_q;
128
129 // Dispatch Source
130 ds->ds_ident_hack = dk->dk_kevent.ident;
131 ds->ds_dkev = dk;
132 ds->ds_pending_data_mask = dk->dk_kevent.fflags;
133 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
134 ds->ds_is_level = true;
135 ds->ds_needs_rearm = true;
136 } else if (!(EV_CLEAR & proto_kev->flags)) {
137 		// we cheat and use EV_CLEAR to mean a flag-style (OR) source
138 ds->ds_is_adder = true;
139 }
140
141 // Some sources require special processing
142 if (type->init != NULL) {
143 type->init(ds, type, handle, mask, q);
144 }
145 if (fastpath(!ds->ds_refs)) {
146 ds->ds_refs = calloc(1ul, sizeof(struct dispatch_source_refs_s));
147 if (slowpath(!ds->ds_refs)) {
148 goto out_bad;
149 }
150 }
151 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
152 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
153
154 // First item on the queue sets the user-specified target queue
155 dispatch_set_target_queue(ds, q);
156 #if DISPATCH_DEBUG
157 dispatch_debug(ds, "%s", __FUNCTION__);
158 #endif
159 return ds;
160
161 out_bad:
162 free(ds);
163 free(dk);
164 return NULL;
165 }
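
/*
 * Usage sketch (illustrative only, excluded from the build): the typical
 * client-side sequence for the plain-function API. The "example_" names
 * below are hypothetical and not part of libdispatch.
 */
#if 0
static void
example_read_event(void *ctxt)
{
	dispatch_source_t ds = (dispatch_source_t)ctxt;
	// for a read source, the data estimates the bytes currently readable
	unsigned long estimated = dispatch_source_get_data(ds);
	char buf[4096];
	ssize_t n = read((int)dispatch_source_get_handle(ds), buf,
			estimated < sizeof(buf) ? estimated : sizeof(buf));
	(void)n;
}

static dispatch_source_t
example_create_read_source(int fd, dispatch_queue_t q)
{
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
			(uintptr_t)fd, 0, q);
	if (!ds) return NULL;
	dispatch_set_context(ds, ds);
	dispatch_source_set_event_handler_f(ds, example_read_event);
	dispatch_resume(ds); // sources are created suspended (see above)
	return ds;
}
#endif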
166
167 static void
168 _dispatch_source_dispose(dispatch_source_t ds)
169 {
170 free(ds->ds_refs);
171 _dispatch_queue_dispose((dispatch_queue_t)ds);
172 }
173
174 void
175 _dispatch_source_xref_release(dispatch_source_t ds)
176 {
177 if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
178 // Arguments for and against this assert are within 6705399
179 DISPATCH_CLIENT_CRASH("Release of a suspended object");
180 }
181 _dispatch_wakeup(ds);
182 _dispatch_release(ds);
183 }
184
185 void
186 dispatch_source_cancel(dispatch_source_t ds)
187 {
188 #if DISPATCH_DEBUG
189 dispatch_debug(ds, "%s", __FUNCTION__);
190 #endif
191 // Right after we set the cancel flag, someone else
192 	// could potentially invoke the source, do the cancellation,
193 	// unregister the source, and deallocate it. We therefore
194 	// need to retain/release around setting the bit
195
196 _dispatch_retain(ds);
197 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED);
198 _dispatch_wakeup(ds);
199 _dispatch_release(ds);
200 }
201
202 long
203 dispatch_source_testcancel(dispatch_source_t ds)
204 {
205 return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
206 }
207
208
209 unsigned long
210 dispatch_source_get_mask(dispatch_source_t ds)
211 {
212 return ds->ds_pending_data_mask;
213 }
214
215 uintptr_t
216 dispatch_source_get_handle(dispatch_source_t ds)
217 {
218 	return ds->ds_ident_hack;
219 }
220
221 unsigned long
222 dispatch_source_get_data(dispatch_source_t ds)
223 {
224 return ds->ds_data;
225 }
226
227 void
228 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
229 {
230 struct kevent kev = {
231 .fflags = (typeof(kev.fflags))val,
232 .data = val,
233 };
234
235 dispatch_assert(
236 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
237 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
238
239 _dispatch_source_merge_kevent(ds, &kev);
240 }
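
/*
 * Usage sketch (illustrative only, excluded from the build): merging
 * application-generated counts into a DATA_ADD source. Deliveries coalesce,
 * so both merges below may be observed as one callout with data == 5.
 */
#if 0
static void
example_count_event(void *ctxt)
{
	dispatch_source_t ds = (dispatch_source_t)ctxt;
	// the sum of all values merged since the previous callout
	unsigned long total = dispatch_source_get_data(ds);
	(void)total;
}

static void
example_data_add(dispatch_queue_t q)
{
	dispatch_source_t ds = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
	dispatch_set_context(ds, ds);
	dispatch_source_set_event_handler_f(ds, example_count_event);
	dispatch_resume(ds);
	dispatch_source_merge_data(ds, 2);
	dispatch_source_merge_data(ds, 3);
}
#endif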
241
242 #pragma mark -
243 #pragma mark dispatch_source_handler
244
245 #ifdef __BLOCKS__
246 // 6618342 Contact the team that owns the Instrument DTrace probe before
247 // renaming this symbol
248 static void
249 _dispatch_source_set_event_handler2(void *context)
250 {
251 struct Block_layout *bl = context;
252
253 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
254 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
255 dispatch_source_refs_t dr = ds->ds_refs;
256
257 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
258 Block_release(dr->ds_handler_ctxt);
259 }
260 dr->ds_handler_func = bl ? (void *)bl->invoke : NULL;
261 dr->ds_handler_ctxt = bl;
262 ds->ds_handler_is_block = true;
263 }
264
265 void
266 dispatch_source_set_event_handler(dispatch_source_t ds,
267 dispatch_block_t handler)
268 {
269 handler = _dispatch_Block_copy(handler);
270 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
271 _dispatch_source_set_event_handler2);
272 }
273 #endif /* __BLOCKS__ */
274
275 static void
276 _dispatch_source_set_event_handler_f(void *context)
277 {
278 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
279 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
280 dispatch_source_refs_t dr = ds->ds_refs;
281
282 #ifdef __BLOCKS__
283 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
284 Block_release(dr->ds_handler_ctxt);
285 }
286 #endif
287 dr->ds_handler_func = context;
288 dr->ds_handler_ctxt = ds->do_ctxt;
289 ds->ds_handler_is_block = false;
290 }
291
292 void
293 dispatch_source_set_event_handler_f(dispatch_source_t ds,
294 dispatch_function_t handler)
295 {
296 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
297 _dispatch_source_set_event_handler_f);
298 }
299
300 #ifdef __BLOCKS__
301 // 6618342 Contact the team that owns the Instrument DTrace probe before
302 // renaming this symbol
303 static void
304 _dispatch_source_set_cancel_handler2(void *context)
305 {
306 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
307 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
308 dispatch_source_refs_t dr = ds->ds_refs;
309
310 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
311 Block_release(dr->ds_cancel_handler);
312 }
313 dr->ds_cancel_handler = context;
314 ds->ds_cancel_is_block = true;
315 }
316
317 void
318 dispatch_source_set_cancel_handler(dispatch_source_t ds,
319 dispatch_block_t handler)
320 {
321 handler = _dispatch_Block_copy(handler);
322 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
323 _dispatch_source_set_cancel_handler2);
324 }
325 #endif /* __BLOCKS__ */
326
327 static void
328 _dispatch_source_set_cancel_handler_f(void *context)
329 {
330 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
331 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
332 dispatch_source_refs_t dr = ds->ds_refs;
333
334 #ifdef __BLOCKS__
335 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
336 Block_release(dr->ds_cancel_handler);
337 }
338 #endif
339 dr->ds_cancel_handler = context;
340 ds->ds_cancel_is_block = false;
341 }
342
343 void
344 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
345 dispatch_function_t handler)
346 {
347 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
348 _dispatch_source_set_cancel_handler_f);
349 }
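
/*
 * Usage sketch (illustrative only, excluded from the build): the canonical
 * teardown order for a descriptor-based source. The descriptor must remain
 * valid until the cancellation handler runs, because the kevent is only
 * unregistered on the manager queue beforehand (see _dispatch_source_invoke).
 */
#if 0
static void
example_cancel(void *ctxt)
{
	// receives do_ctxt; safe to close now, the kevent has been unregistered
	close((int)(intptr_t)ctxt);
}

static void
example_teardown(dispatch_source_t ds, int fd)
{
	dispatch_set_context(ds, (void *)(intptr_t)fd);
	dispatch_source_set_cancel_handler_f(ds, example_cancel);
	dispatch_source_cancel(ds);
	dispatch_release(ds); // already-queued handlers still run
}
#endif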
350
351 #ifdef __BLOCKS__
352 static void
353 _dispatch_source_set_registration_handler2(void *context)
354 {
355 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
356 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
357 dispatch_source_refs_t dr = ds->ds_refs;
358
359 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
360 Block_release(dr->ds_registration_handler);
361 }
362 dr->ds_registration_handler = context;
363 ds->ds_registration_is_block = true;
364 }
365
366 void
367 dispatch_source_set_registration_handler(dispatch_source_t ds,
368 dispatch_block_t handler)
369 {
370 handler = _dispatch_Block_copy(handler);
371 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
372 _dispatch_source_set_registration_handler2);
373 }
374 #endif /* __BLOCKS__ */
375
376 static void
377 _dispatch_source_set_registration_handler_f(void *context)
378 {
379 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
380 dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);
381 dispatch_source_refs_t dr = ds->ds_refs;
382
383 #ifdef __BLOCKS__
384 if (ds->ds_registration_is_block && dr->ds_registration_handler) {
385 Block_release(dr->ds_registration_handler);
386 }
387 #endif
388 dr->ds_registration_handler = context;
389 ds->ds_registration_is_block = false;
390 }
391
392 void
393 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
394 dispatch_function_t handler)
395 {
396 dispatch_barrier_async_f((dispatch_queue_t)ds, handler,
397 _dispatch_source_set_registration_handler_f);
398 }
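
/*
 * Usage sketch (illustrative only, excluded from the build): using the
 * registration handler to learn when the source has actually been installed
 * on the manager queue. Note that the _f handlers all receive do_ctxt, so a
 * real client would have to multiplex the context.
 */
#if 0
static void
example_registered(void *ctxt)
{
	// runs once on the target queue, after installation
	dispatch_semaphore_signal((dispatch_semaphore_t)ctxt);
}

static void
example_wait_until_installed(dispatch_source_t ds)
{
	dispatch_semaphore_t sema = dispatch_semaphore_create(0);
	dispatch_set_context(ds, sema);
	dispatch_source_set_registration_handler_f(ds, example_registered);
	dispatch_resume(ds);
	dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
	dispatch_release(sema);
}
#endif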
399
400 #pragma mark -
401 #pragma mark dispatch_source_invoke
402
403 static void
404 _dispatch_source_registration_callout(dispatch_source_t ds)
405 {
406 dispatch_source_refs_t dr = ds->ds_refs;
407
408 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
409 // no registration callout if source is canceled rdar://problem/8955246
410 #ifdef __BLOCKS__
411 if (ds->ds_registration_is_block) {
412 Block_release(dr->ds_registration_handler);
413 }
414 } else if (ds->ds_registration_is_block) {
415 dispatch_block_t b = dr->ds_registration_handler;
416 _dispatch_client_callout_block(b);
417 Block_release(dr->ds_registration_handler);
418 #endif
419 } else {
420 dispatch_function_t f = dr->ds_registration_handler;
421 _dispatch_client_callout(ds->do_ctxt, f);
422 }
423 ds->ds_registration_is_block = false;
424 dr->ds_registration_handler = NULL;
425 }
426
427 static void
428 _dispatch_source_cancel_callout(dispatch_source_t ds)
429 {
430 dispatch_source_refs_t dr = ds->ds_refs;
431
432 ds->ds_pending_data_mask = 0;
433 ds->ds_pending_data = 0;
434 ds->ds_data = 0;
435
436 #ifdef __BLOCKS__
437 if (ds->ds_handler_is_block) {
438 Block_release(dr->ds_handler_ctxt);
439 ds->ds_handler_is_block = false;
440 dr->ds_handler_func = NULL;
441 dr->ds_handler_ctxt = NULL;
442 }
443 if (ds->ds_registration_is_block) {
444 Block_release(dr->ds_registration_handler);
445 ds->ds_registration_is_block = false;
446 dr->ds_registration_handler = NULL;
447 }
448 #endif
449
450 if (!dr->ds_cancel_handler) {
451 return;
452 }
453 if (ds->ds_cancel_is_block) {
454 #ifdef __BLOCKS__
455 dispatch_block_t b = dr->ds_cancel_handler;
456 if (ds->ds_atomic_flags & DSF_CANCELED) {
457 _dispatch_client_callout_block(b);
458 }
459 Block_release(dr->ds_cancel_handler);
460 ds->ds_cancel_is_block = false;
461 #endif
462 } else {
463 dispatch_function_t f = dr->ds_cancel_handler;
464 if (ds->ds_atomic_flags & DSF_CANCELED) {
465 _dispatch_client_callout(ds->do_ctxt, f);
466 }
467 }
468 dr->ds_cancel_handler = NULL;
469 }
470
471 static void
472 _dispatch_source_latch_and_call(dispatch_source_t ds)
473 {
474 unsigned long prev;
475
476 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
477 return;
478 }
479 dispatch_source_refs_t dr = ds->ds_refs;
480 prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0);
481 if (ds->ds_is_level) {
482 ds->ds_data = ~prev;
483 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
484 ds->ds_data = _dispatch_source_timer_data(dr, prev);
485 } else {
486 ds->ds_data = prev;
487 }
488 if (dispatch_assume(prev) && dr->ds_handler_func) {
489 _dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func);
490 }
491 }
492
493 static void
494 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
495 {
496 switch (ds->ds_dkev->dk_kevent.filter) {
497 case DISPATCH_EVFILT_TIMER:
498 // called on manager queue only
499 return _dispatch_timer_list_update(ds);
500 case EVFILT_MACHPORT:
501 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
502 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
503 }
504 break;
505 }
506 if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
507 _dispatch_kevent_unregister(ds);
508 }
509 }
510
511 static dispatch_queue_t
512 _dispatch_source_invoke(dispatch_source_t ds)
513 {
514 // This function performs all source actions. Each action is responsible
515 // for verifying that it takes place on the appropriate queue. If the
516 // current queue is not the correct queue for this action, the correct queue
517 // will be returned and the invoke will be re-driven on that queue.
518
519 // The order of tests here in invoke and in probe should be consistent.
520
521 dispatch_queue_t dq = _dispatch_queue_get_current();
522 dispatch_source_refs_t dr = ds->ds_refs;
523
524 if (!ds->ds_is_installed) {
525 // The source needs to be installed on the manager queue.
526 if (dq != &_dispatch_mgr_q) {
527 return &_dispatch_mgr_q;
528 }
529 _dispatch_kevent_register(ds);
530 if (dr->ds_registration_handler) {
531 return ds->do_targetq;
532 }
533 if (slowpath(ds->do_xref_cnt == 0)) {
534 return &_dispatch_mgr_q; // rdar://problem/9558246
535 }
536 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
537 // Source suspended by an item drained from the source queue.
538 return NULL;
539 } else if (dr->ds_registration_handler) {
540 // The source has been registered and the registration handler needs
541 // to be delivered on the target queue.
542 if (dq != ds->do_targetq) {
543 return ds->do_targetq;
544 }
545 // clears ds_registration_handler
546 _dispatch_source_registration_callout(ds);
547 if (slowpath(ds->do_xref_cnt == 0)) {
548 return &_dispatch_mgr_q; // rdar://problem/9558246
549 }
550 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
551 		// The source has been canceled and needs to be uninstalled from the
552 // manager queue. After uninstallation, the cancellation handler needs
553 // to be delivered to the target queue.
554 if (ds->ds_dkev) {
555 if (dq != &_dispatch_mgr_q) {
556 return &_dispatch_mgr_q;
557 }
558 _dispatch_kevent_unregister(ds);
559 }
560 if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
561 ds->ds_registration_is_block) {
562 if (dq != ds->do_targetq) {
563 return ds->do_targetq;
564 }
565 }
566 _dispatch_source_cancel_callout(ds);
567 } else if (ds->ds_pending_data) {
568 // The source has pending data to deliver via the event handler callback
569 // on the target queue. Some sources need to be rearmed on the manager
570 // queue after event delivery.
571 if (dq != ds->do_targetq) {
572 return ds->do_targetq;
573 }
574 _dispatch_source_latch_and_call(ds);
575 if (ds->ds_needs_rearm) {
576 return &_dispatch_mgr_q;
577 }
578 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
579 // The source needs to be rearmed on the manager queue.
580 if (dq != &_dispatch_mgr_q) {
581 return &_dispatch_mgr_q;
582 }
583 _dispatch_source_kevent_resume(ds, 0);
584 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
585 }
586
587 return NULL;
588 }
589
590 static bool
591 _dispatch_source_probe(dispatch_source_t ds)
592 {
593 // This function determines whether the source needs to be invoked.
594 // The order of tests here in probe and in invoke should be consistent.
595
596 dispatch_source_refs_t dr = ds->ds_refs;
597 if (!ds->ds_is_installed) {
598 // The source needs to be installed on the manager queue.
599 return true;
600 } else if (dr->ds_registration_handler) {
601 // The registration handler needs to be delivered to the target queue.
602 return true;
603 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
604 // The source needs to be uninstalled from the manager queue, or the
605 // cancellation handler needs to be delivered to the target queue.
606 // Note: cancellation assumes installation.
607 if (ds->ds_dkev || dr->ds_cancel_handler
608 #ifdef __BLOCKS__
609 || ds->ds_handler_is_block || ds->ds_registration_is_block
610 #endif
611 ) {
612 return true;
613 }
614 } else if (ds->ds_pending_data) {
615 // The source has pending data to deliver to the target queue.
616 return true;
617 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
618 // The source needs to be rearmed on the manager queue.
619 return true;
620 }
621 // Nothing to do.
622 return false;
623 }
624
625 #pragma mark -
626 #pragma mark dispatch_source_kevent
627
628 static void
629 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
630 {
631 struct kevent fake;
632
633 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
634 return;
635 }
636
637 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
638 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for
639 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
640 if (ke->flags & EV_ERROR) {
641 if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
642 fake = *ke;
643 fake.flags &= ~EV_ERROR;
644 fake.fflags = NOTE_EXIT;
645 fake.data = 0;
646 ke = &fake;
647 #if DISPATCH_USE_VM_PRESSURE
648 } else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) {
649 // Memory pressure kevent is not supported on all platforms
650 // <rdar://problem/8636227>
651 return;
652 #endif
653 } else {
654 // log the unexpected error
655 (void)dispatch_assume_zero(ke->data);
656 return;
657 }
658 }
659
660 if (ds->ds_is_level) {
661 // ke->data is signed and "negative available data" makes no sense
662 // zero bytes happens when EV_EOF is set
663 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
664 dispatch_assert(ke->data >= 0l);
665 ds->ds_pending_data = ~ke->data;
666 } else if (ds->ds_is_adder) {
667 (void)dispatch_atomic_add2o(ds, ds_pending_data, ke->data);
668 } else if (ke->fflags & ds->ds_pending_data_mask) {
669 (void)dispatch_atomic_or2o(ds, ds_pending_data,
670 ke->fflags & ds->ds_pending_data_mask);
671 }
672
673 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
674 if (ds->ds_needs_rearm) {
675 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
676 }
677
678 _dispatch_wakeup(ds);
679 }
680
681 void
682 _dispatch_source_drain_kevent(struct kevent *ke)
683 {
684 dispatch_kevent_t dk = ke->udata;
685 dispatch_source_refs_t dri;
686
687 #if DISPATCH_DEBUG
688 static dispatch_once_t pred;
689 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
690 #endif
691
692 dispatch_debug_kevents(ke, 1, __func__);
693
694 #if HAVE_MACH
695 if (ke->filter == EVFILT_MACHPORT) {
696 return _dispatch_drain_mach_messages(ke);
697 }
698 #endif
699 dispatch_assert(dk);
700
701 if (ke->flags & EV_ONESHOT) {
702 dk->dk_kevent.flags |= EV_ONESHOT;
703 }
704
705 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
706 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
707 }
708 }
709
710 #pragma mark -
711 #pragma mark dispatch_kevent_t
712
713 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
714 .dk_kevent = {
715 .filter = DISPATCH_EVFILT_CUSTOM_OR,
716 .flags = EV_CLEAR,
717 .udata = &_dispatch_kevent_data_or,
718 },
719 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
720 };
721 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
722 .dk_kevent = {
723 .filter = DISPATCH_EVFILT_CUSTOM_ADD,
724 .udata = &_dispatch_kevent_data_add,
725 },
726 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
727 };
728
729 #if TARGET_OS_EMBEDDED
730 #define DSL_HASH_SIZE 64u // must be a power of two
731 #else
732 #define DSL_HASH_SIZE 256u // must be a power of two
733 #endif
734 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
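// e.g. with DSL_HASH_SIZE 256: DSL_HASH(0x12345) == (0x12345 & 0xff) == 0x45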
735
736 DISPATCH_CACHELINE_ALIGN
737 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
738
739 static dispatch_once_t __dispatch_kevent_init_pred;
740
741 static void
742 _dispatch_kevent_init(void *context DISPATCH_UNUSED)
743 {
744 unsigned int i;
745 for (i = 0; i < DSL_HASH_SIZE; i++) {
746 TAILQ_INIT(&_dispatch_sources[i]);
747 }
748
749 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
750 &_dispatch_kevent_data_or, dk_list);
751 TAILQ_INSERT_TAIL(&_dispatch_sources[0],
752 &_dispatch_kevent_data_add, dk_list);
753
754 _dispatch_source_timer_init();
755 }
756
757 static inline uintptr_t
758 _dispatch_kevent_hash(uintptr_t ident, short filter)
759 {
760 uintptr_t value;
761 #if HAVE_MACH
762 value = (filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
763 #else
764 value = ident;
765 #endif
766 return DSL_HASH(value);
767 }
768
769 static dispatch_kevent_t
770 _dispatch_kevent_find(uintptr_t ident, short filter)
771 {
772 uintptr_t hash = _dispatch_kevent_hash(ident, filter);
773 dispatch_kevent_t dki;
774
775 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
776 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
777 break;
778 }
779 }
780 return dki;
781 }
782
783 static void
784 _dispatch_kevent_insert(dispatch_kevent_t dk)
785 {
786 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
787 dk->dk_kevent.filter);
788
789 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
790 }
791
792 // Find existing kevents, and merge any new flags if necessary
793 static void
794 _dispatch_kevent_register(dispatch_source_t ds)
795 {
796 dispatch_kevent_t dk;
797 typeof(dk->dk_kevent.fflags) new_flags;
798 bool do_resume = false;
799
800 if (ds->ds_is_installed) {
801 return;
802 }
803 ds->ds_is_installed = true;
804
805 dispatch_once_f(&__dispatch_kevent_init_pred,
806 NULL, _dispatch_kevent_init);
807
808 dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident,
809 ds->ds_dkev->dk_kevent.filter);
810
811 if (dk) {
812 // If an existing dispatch kevent is found, check to see if new flags
813 // need to be added to the existing kevent
814 new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
815 dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
816 free(ds->ds_dkev);
817 ds->ds_dkev = dk;
818 do_resume = new_flags;
819 } else {
820 dk = ds->ds_dkev;
821 _dispatch_kevent_insert(dk);
822 new_flags = dk->dk_kevent.fflags;
823 do_resume = true;
824 }
825
826 TAILQ_INSERT_TAIL(&dk->dk_sources, ds->ds_refs, dr_list);
827
828 // Re-register the kevent with the kernel if new flags were added
829 // by the dispatch kevent
830 if (do_resume) {
831 dk->dk_kevent.flags |= EV_ADD;
832 }
833 if (do_resume || ds->ds_needs_rearm) {
834 _dispatch_source_kevent_resume(ds, new_flags);
835 }
836 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED);
837 }
838
839 static bool
840 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
841 uint32_t del_flags)
842 {
843 long r;
844 switch (dk->dk_kevent.filter) {
845 case DISPATCH_EVFILT_TIMER:
846 case DISPATCH_EVFILT_CUSTOM_ADD:
847 case DISPATCH_EVFILT_CUSTOM_OR:
848 		// these types are not registered with kevent
849 return 0;
850 #if HAVE_MACH
851 case EVFILT_MACHPORT:
852 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
853 #endif
854 case EVFILT_PROC:
855 if (dk->dk_kevent.flags & EV_ONESHOT) {
856 return 0;
857 }
858 // fall through
859 default:
860 r = _dispatch_update_kq(&dk->dk_kevent);
861 if (dk->dk_kevent.flags & EV_DISPATCH) {
862 dk->dk_kevent.flags &= ~EV_ADD;
863 }
864 return r;
865 }
866 }
867
868 static void
869 _dispatch_kevent_dispose(dispatch_kevent_t dk)
870 {
871 uintptr_t hash;
872
873 switch (dk->dk_kevent.filter) {
874 case DISPATCH_EVFILT_TIMER:
875 case DISPATCH_EVFILT_CUSTOM_ADD:
876 case DISPATCH_EVFILT_CUSTOM_OR:
877 // these sources live on statically allocated lists
878 return;
879 #if HAVE_MACH
880 case EVFILT_MACHPORT:
881 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
882 break;
883 #endif
884 case EVFILT_PROC:
885 if (dk->dk_kevent.flags & EV_ONESHOT) {
886 break; // implicitly deleted
887 }
888 // fall through
889 default:
890 if (~dk->dk_kevent.flags & EV_DELETE) {
891 dk->dk_kevent.flags |= EV_DELETE;
892 _dispatch_update_kq(&dk->dk_kevent);
893 }
894 break;
895 }
896
897 hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
898 dk->dk_kevent.filter);
899 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
900 free(dk);
901 }
902
903 static void
904 _dispatch_kevent_unregister(dispatch_source_t ds)
905 {
906 dispatch_kevent_t dk = ds->ds_dkev;
907 dispatch_source_refs_t dri;
908 uint32_t del_flags, fflags = 0;
909
910 ds->ds_dkev = NULL;
911
912 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
913
914 if (TAILQ_EMPTY(&dk->dk_sources)) {
915 _dispatch_kevent_dispose(dk);
916 } else {
917 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
918 dispatch_source_t dsi = _dispatch_source_from_refs(dri);
919 fflags |= (uint32_t)dsi->ds_pending_data_mask;
920 }
921 del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
922 if (del_flags) {
923 dk->dk_kevent.flags |= EV_ADD;
924 dk->dk_kevent.fflags = fflags;
925 _dispatch_kevent_resume(dk, 0, del_flags);
926 }
927 }
928
929 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
930 ds->ds_needs_rearm = false; // re-arm is pointless and bad now
931 _dispatch_release(ds); // the retain is done at creation time
932 }
933
934 #pragma mark -
935 #pragma mark dispatch_timer
936
937 DISPATCH_CACHELINE_ALIGN
938 static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
939 [DISPATCH_TIMER_INDEX_WALL] = {
940 .dk_kevent = {
941 .ident = DISPATCH_TIMER_INDEX_WALL,
942 .filter = DISPATCH_EVFILT_TIMER,
943 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL],
944 },
945 .dk_sources = TAILQ_HEAD_INITIALIZER(
946 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL].dk_sources),
947 },
948 [DISPATCH_TIMER_INDEX_MACH] = {
949 .dk_kevent = {
950 .ident = DISPATCH_TIMER_INDEX_MACH,
951 .filter = DISPATCH_EVFILT_TIMER,
952 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH],
953 },
954 .dk_sources = TAILQ_HEAD_INITIALIZER(
955 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH].dk_sources),
956 },
957 [DISPATCH_TIMER_INDEX_DISARM] = {
958 .dk_kevent = {
959 .ident = DISPATCH_TIMER_INDEX_DISARM,
960 .filter = DISPATCH_EVFILT_TIMER,
961 .udata = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM],
962 },
963 .dk_sources = TAILQ_HEAD_INITIALIZER(
964 _dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM].dk_sources),
965 },
966 };
967 // Don't count disarmed timer list
968 #define DISPATCH_TIMER_COUNT ((sizeof(_dispatch_kevent_timer) \
969 / sizeof(_dispatch_kevent_timer[0])) - 1)
970
971 static inline void
972 _dispatch_source_timer_init(void)
973 {
974 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)],
975 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
976 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)],
977 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
978 TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_DISARM)],
979 &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM], dk_list);
980 }
981
982 DISPATCH_ALWAYS_INLINE
983 static inline unsigned int
984 _dispatch_source_timer_idx(dispatch_source_refs_t dr)
985 {
986 return ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK ?
987 DISPATCH_TIMER_INDEX_WALL : DISPATCH_TIMER_INDEX_MACH;
988 }
989
990 DISPATCH_ALWAYS_INLINE
991 static inline uint64_t
992 _dispatch_source_timer_now2(unsigned int timer)
993 {
994 switch (timer) {
995 case DISPATCH_TIMER_INDEX_MACH:
996 return _dispatch_absolute_time();
997 case DISPATCH_TIMER_INDEX_WALL:
998 return _dispatch_get_nanoseconds();
999 default:
1000 DISPATCH_CRASH("Invalid timer");
1001 }
1002 }
1003
1004 DISPATCH_ALWAYS_INLINE
1005 static inline uint64_t
1006 _dispatch_source_timer_now(dispatch_source_refs_t dr)
1007 {
1008 return _dispatch_source_timer_now2(_dispatch_source_timer_idx(dr));
1009 }
1010
1011 // Updates the ordered list of timers based on next fire date for changes to ds.
1012 // Should only be called from the context of _dispatch_mgr_q.
1013 static void
1014 _dispatch_timer_list_update(dispatch_source_t ds)
1015 {
1016 dispatch_source_refs_t dr = ds->ds_refs, dri = NULL;
1017
1018 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);
1019
1020 // do not reschedule timers unregistered with _dispatch_kevent_unregister()
1021 if (!ds->ds_dkev) {
1022 return;
1023 }
1024
1025 // Ensure the source is on the global kevent lists before it is removed and
1026 // readded below.
1027 _dispatch_kevent_register(ds);
1028
1029 TAILQ_REMOVE(&ds->ds_dkev->dk_sources, dr, dr_list);
1030
1031 	// Move timers that are disabled, suspended, or have missed intervals to
1032 	// the disarmed list; a later resume or source invoke, respectively,
	// re-enables them
1033 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1034 ds->ds_pending_data) {
1035 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED);
1036 ds->ds_dkev = &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_DISARM];
1037 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, (dispatch_source_refs_t)dr,
1038 dr_list);
1039 return;
1040 }
1041
1042 // change the list if the clock type has changed
1043 ds->ds_dkev = &_dispatch_kevent_timer[_dispatch_source_timer_idx(dr)];
1044
1045 TAILQ_FOREACH(dri, &ds->ds_dkev->dk_sources, dr_list) {
1046 if (ds_timer(dri).target == 0 ||
1047 ds_timer(dr).target < ds_timer(dri).target) {
1048 break;
1049 }
1050 }
1051
1052 if (dri) {
1053 TAILQ_INSERT_BEFORE(dri, dr, dr_list);
1054 } else {
1055 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, dr, dr_list);
1056 }
1057 }
1058
1059 static inline void
1060 _dispatch_run_timers2(unsigned int timer)
1061 {
1062 dispatch_source_refs_t dr;
1063 dispatch_source_t ds;
1064 uint64_t now, missed;
1065
1066 now = _dispatch_source_timer_now2(timer);
1067 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
1068 ds = _dispatch_source_from_refs(dr);
1069 // We may find timers on the wrong list due to a pending update from
1070 // dispatch_source_set_timer. Force an update of the list in that case.
1071 if (timer != ds->ds_ident_hack) {
1072 _dispatch_timer_list_update(ds);
1073 continue;
1074 }
1075 if (!ds_timer(dr).target) {
1076 // no configured timers on the list
1077 break;
1078 }
1079 if (ds_timer(dr).target > now) {
1080 // Done running timers for now.
1081 break;
1082 }
1083 // Remove timers that are suspended or have missed intervals from the
1084 		// list; a later resume or source invoke, respectively, re-enables them
1085 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1086 _dispatch_timer_list_update(ds);
1087 continue;
1088 }
1089 // Calculate number of missed intervals.
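		// e.g. interval = 2s, target = 10s, now = 15s: missed = (15-10)/2 = 2,
		// then ++missed = 3 pending fires and the target advances to
		// 10 + 3*2 = 16s (> now)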
1090 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1091 if (++missed > INT_MAX) {
1092 missed = INT_MAX;
1093 }
1094 ds_timer(dr).target += missed * ds_timer(dr).interval;
1095 _dispatch_timer_list_update(ds);
1096 ds_timer(dr).last_fire = now;
1097 (void)dispatch_atomic_add2o(ds, ds_pending_data, (int)missed);
1098 _dispatch_wakeup(ds);
1099 }
1100 }
1101
1102 void
1103 _dispatch_run_timers(void)
1104 {
1105 dispatch_once_f(&__dispatch_kevent_init_pred,
1106 NULL, _dispatch_kevent_init);
1107
1108 unsigned int i;
1109 for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
1110 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[i].dk_sources)) {
1111 _dispatch_run_timers2(i);
1112 }
1113 }
1114 }
1115
1116 static inline unsigned long
1117 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1118 {
1119 // calculate the number of intervals since last fire
1120 unsigned long data, missed;
1121 uint64_t now = _dispatch_source_timer_now(dr);
1122 missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1123 ds_timer(dr).interval);
1124 // correct for missed intervals already delivered last time
1125 data = prev - ds_timer(dr).missed + missed;
1126 ds_timer(dr).missed = missed;
1127 return data;
1128 }
1129
1130 // approx 1 year (60s * 60m * 24h * 365d)
1131 #define FOREVER_NSEC 31536000000000000ull
1132
1133 struct timespec *
1134 _dispatch_get_next_timer_fire(struct timespec *howsoon)
1135 {
1136 // <rdar://problem/6459649>
1137 	// kevent(2) does not allow arbitrarily large timeouts, so we cap the
1138 	// timeout at approximately 1 year.
1139 dispatch_source_refs_t dr = NULL;
1140 unsigned int timer;
1141 uint64_t now, delta_tmp, delta = UINT64_MAX;
1142
1143 for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
1144 // Timers are kept in order, first one will fire next
1145 dr = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources);
1146 if (!dr || !ds_timer(dr).target) {
1147 // Empty list or disabled timer
1148 continue;
1149 }
1150 now = _dispatch_source_timer_now(dr);
1151 if (ds_timer(dr).target <= now) {
1152 howsoon->tv_sec = 0;
1153 howsoon->tv_nsec = 0;
1154 return howsoon;
1155 }
1156 // the subtraction cannot go negative because the previous "if"
1157 // verified that the target is greater than now.
1158 delta_tmp = ds_timer(dr).target - now;
1159 if (!(ds_timer(dr).flags & DISPATCH_TIMER_WALL_CLOCK)) {
1160 delta_tmp = _dispatch_time_mach2nano(delta_tmp);
1161 }
1162 if (delta_tmp < delta) {
1163 delta = delta_tmp;
1164 }
1165 }
1166 if (slowpath(delta > FOREVER_NSEC)) {
1167 return NULL;
1168 } else {
1169 howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
1170 howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
1171 }
1172 return howsoon;
1173 }
1174
1175 struct dispatch_set_timer_params {
1176 dispatch_source_t ds;
1177 uintptr_t ident;
1178 struct dispatch_timer_source_s values;
1179 };
1180
1181 static void
1182 _dispatch_source_set_timer3(void *context)
1183 {
1184 // Called on the _dispatch_mgr_q
1185 struct dispatch_set_timer_params *params = context;
1186 dispatch_source_t ds = params->ds;
1187 ds->ds_ident_hack = params->ident;
1188 ds_timer(ds->ds_refs) = params->values;
1189 // Clear any pending data that might have accumulated on
1190 // older timer params <rdar://problem/8574886>
1191 ds->ds_pending_data = 0;
1192 _dispatch_timer_list_update(ds);
1193 dispatch_resume(ds);
1194 dispatch_release(ds);
1195 free(params);
1196 }
1197
1198 static void
1199 _dispatch_source_set_timer2(void *context)
1200 {
1201 // Called on the source queue
1202 struct dispatch_set_timer_params *params = context;
1203 dispatch_suspend(params->ds);
1204 dispatch_barrier_async_f(&_dispatch_mgr_q, params,
1205 _dispatch_source_set_timer3);
1206 }
1207
1208 void
1209 dispatch_source_set_timer(dispatch_source_t ds,
1210 dispatch_time_t start,
1211 uint64_t interval,
1212 uint64_t leeway)
1213 {
1214 if (slowpath(!ds->ds_is_timer)) {
1215 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1216 }
1217
1218 struct dispatch_set_timer_params *params;
1219
1220 // we use zero internally to mean disabled
1221 if (interval == 0) {
1222 interval = 1;
1223 } else if ((int64_t)interval < 0) {
1224 // 6866347 - make sure nanoseconds won't overflow
1225 interval = INT64_MAX;
1226 }
1227 if ((int64_t)leeway < 0) {
1228 leeway = INT64_MAX;
1229 }
1230
1231 if (start == DISPATCH_TIME_NOW) {
1232 start = _dispatch_absolute_time();
1233 } else if (start == DISPATCH_TIME_FOREVER) {
1234 start = INT64_MAX;
1235 }
1236
1237 while (!(params = calloc(1ul, sizeof(struct dispatch_set_timer_params)))) {
1238 sleep(1);
1239 }
1240
1241 params->ds = ds;
1242 params->values.flags = ds_timer(ds->ds_refs).flags;
1243
1244 if ((int64_t)start < 0) {
1245 // wall clock
1246 params->ident = DISPATCH_TIMER_INDEX_WALL;
1247 params->values.target = -((int64_t)start);
1248 params->values.interval = interval;
1249 params->values.leeway = leeway;
1250 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1251 } else {
1252 // absolute clock
1253 params->ident = DISPATCH_TIMER_INDEX_MACH;
1254 params->values.target = start;
1255 params->values.interval = _dispatch_time_nano2mach(interval);
1256
1257 		// rdar://problem/7287561 interval must be at least one
1258 // in order to avoid later division by zero when calculating
1259 // the missed interval count. (NOTE: the wall clock's
1260 // interval is already "fixed" to be 1 or more)
1261 if (params->values.interval < 1) {
1262 params->values.interval = 1;
1263 }
1264
1265 params->values.leeway = _dispatch_time_nano2mach(leeway);
1266 params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
1267 }
1268 // Suspend the source so that it doesn't fire with pending changes
1269 // The use of suspend/resume requires the external retain/release
1270 dispatch_retain(ds);
1271 dispatch_barrier_async_f((dispatch_queue_t)ds, params,
1272 _dispatch_source_set_timer2);
1273 }
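
/*
 * Usage sketch (illustrative only, excluded from the build): a repeating
 * 5-second timer with 100ms leeway. A DISPATCH_TIME_NOW/dispatch_time()
 * start selects the mach absolute clock list; a dispatch_walltime() start
 * would select the wall clock list (see the start < 0 test above).
 */
#if 0
static void
example_timer_fire(void *ctxt DISPATCH_UNUSED)
{
	// dispatch_source_get_data() reports intervals elapsed since last fire
}

static dispatch_source_t
example_repeating_timer(dispatch_queue_t q)
{
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, q);
	dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
			5ull * NSEC_PER_SEC, 100ull * NSEC_PER_MSEC);
	dispatch_source_set_event_handler_f(ds, example_timer_fire);
	dispatch_resume(ds);
	return ds;
}
#endif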
1274
1275 #pragma mark -
1276 #pragma mark dispatch_mach
1277
1278 #if HAVE_MACH
1279
1280 #if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
1281 #define _dispatch_debug_machport(name) \
1282 dispatch_debug_machport((name), __func__)
1283 #else
1284 #define _dispatch_debug_machport(name)
1285 #endif
1286
1287 // Flags for all notifications that are registered/unregistered when a
1288 // send-possible notification is requested/delivered
1289 #define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
1290 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
1291
1292 #define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
1293 #define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
1294 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
1295
1296 #define _DISPATCH_MACHPORT_HASH_SIZE 32
1297 #define _DISPATCH_MACHPORT_HASH(x) \
1298 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
1299
1300 static dispatch_source_t _dispatch_mach_notify_source;
1301 static mach_port_t _dispatch_port_set;
1302 static mach_port_t _dispatch_event_port;
1303
1304 static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
1305 uint32_t new_flags, uint32_t del_flags, uint32_t mask,
1306 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
1307
1308 static void
1309 _dispatch_port_set_init(void *context DISPATCH_UNUSED)
1310 {
1311 struct kevent kev = {
1312 .filter = EVFILT_MACHPORT,
1313 .flags = EV_ADD,
1314 };
1315 kern_return_t kr;
1316
1317 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
1318 &_dispatch_port_set);
1319 DISPATCH_VERIFY_MIG(kr);
1320 if (kr) {
1321 _dispatch_bug_mach_client(
1322 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1323 DISPATCH_CLIENT_CRASH(
1324 "mach_port_allocate() failed: cannot create port set");
1325 }
1326 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
1327 &_dispatch_event_port);
1328 DISPATCH_VERIFY_MIG(kr);
1329 if (kr) {
1330 _dispatch_bug_mach_client(
1331 "_dispatch_port_set_init: mach_port_allocate() failed", kr);
1332 DISPATCH_CLIENT_CRASH(
1333 "mach_port_allocate() failed: cannot create receive right");
1334 }
1335 kr = mach_port_move_member(mach_task_self(), _dispatch_event_port,
1336 _dispatch_port_set);
1337 DISPATCH_VERIFY_MIG(kr);
1338 if (kr) {
1339 _dispatch_bug_mach_client(
1340 "_dispatch_port_set_init: mach_port_move_member() failed", kr);
1341 DISPATCH_CLIENT_CRASH("mach_port_move_member() failed");
1342 }
1343
1344 kev.ident = _dispatch_port_set;
1345
1346 _dispatch_update_kq(&kev);
1347 }
1348
1349 static mach_port_t
1350 _dispatch_get_port_set(void)
1351 {
1352 static dispatch_once_t pred;
1353
1354 dispatch_once_f(&pred, NULL, _dispatch_port_set_init);
1355
1356 return _dispatch_port_set;
1357 }
1358
1359 static kern_return_t
1360 _dispatch_kevent_machport_enable(dispatch_kevent_t dk)
1361 {
1362 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1363 kern_return_t kr;
1364
1365 _dispatch_debug_machport(mp);
1366 kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
1367 if (slowpath(kr)) {
1368 DISPATCH_VERIFY_MIG(kr);
1369 switch (kr) {
1370 case KERN_INVALID_NAME:
1371 #if DISPATCH_DEBUG
1372 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
1373 "prematurely", mp);
1374 #endif
1375 break;
1376 case KERN_INVALID_RIGHT:
1377 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
1378 "mach_port_move_member() failed ", kr);
1379 break;
1380 default:
1381 (void)dispatch_assume_zero(kr);
1382 break;
1383 }
1384 }
1385 return kr;
1386 }
1387
1388 static void
1389 _dispatch_kevent_machport_disable(dispatch_kevent_t dk)
1390 {
1391 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
1392 kern_return_t kr;
1393
1394 _dispatch_debug_machport(mp);
1395 kr = mach_port_move_member(mach_task_self(), mp, 0);
1396 if (slowpath(kr)) {
1397 DISPATCH_VERIFY_MIG(kr);
1398 switch (kr) {
1399 case KERN_INVALID_RIGHT:
1400 case KERN_INVALID_NAME:
1401 #if DISPATCH_DEBUG
1402 _dispatch_log("Corruption: Mach receive right 0x%x destroyed "
1403 "prematurely", mp);
1404 #endif
1405 break;
1406 default:
1407 (void)dispatch_assume_zero(kr);
1408 break;
1409 }
1410 }
1411 }
1412
1413 kern_return_t
1414 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
1415 uint32_t del_flags)
1416 {
1417 kern_return_t kr_recv = 0, kr_sp = 0;
1418
1419 dispatch_assert_zero(new_flags & del_flags);
1420 if (new_flags & DISPATCH_MACH_RECV_MESSAGE) {
1421 kr_recv = _dispatch_kevent_machport_enable(dk);
1422 } else if (del_flags & DISPATCH_MACH_RECV_MESSAGE) {
1423 _dispatch_kevent_machport_disable(dk);
1424 }
1425 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
1426 (del_flags & _DISPATCH_MACH_SP_FLAGS)) {
1427 // Requesting a (delayed) non-sync send-possible notification
1428 // registers for both immediate dead-name notification and delayed-arm
1429 // send-possible notification for the port.
1430 	// The send-possible notification is armed when a mach_msg() to the
1431 	// port with the MACH_SEND_NOTIFY option times out.
1432 // If send-possible is unavailable, fall back to immediate dead-name
1433 // registration rdar://problem/2527840&9008724
1434 kr_sp = _dispatch_mach_notify_update(dk, new_flags, del_flags,
1435 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
1436 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
1437 }
1438
1439 return (kr_recv ? kr_recv : kr_sp);
1440 }
1441
1442 void
1443 _dispatch_drain_mach_messages(struct kevent *ke)
1444 {
1445 mach_port_t name = (mach_port_name_t)ke->data;
1446 dispatch_source_refs_t dri;
1447 dispatch_kevent_t dk;
1448 struct kevent kev;
1449
1450 if (!dispatch_assume(name)) {
1451 return;
1452 }
1453 _dispatch_debug_machport(name);
1454 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1455 if (!dispatch_assume(dk)) {
1456 return;
1457 }
1458 _dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH
1459
1460 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
1461 DISPATCH_MACH_RECV_MESSAGE, 0, dk);
1462
1463 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1464 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
1465 }
1466 }
1467
1468 static inline void
1469 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, uint32_t unreg,
1470 bool final)
1471 {
1472 dispatch_source_refs_t dri;
1473 dispatch_kevent_t dk;
1474 struct kevent kev;
1475
1476 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
1477 if (!dk) {
1478 return;
1479 }
1480
1481 // Update notification registration state.
1482 dk->dk_kevent.data &= ~unreg;
1483 if (!final) {
1484 // Re-register for notification before delivery
1485 _dispatch_kevent_resume(dk, flag, 0);
1486 }
1487
1488 EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE, flag, 0, dk);
1489
1490 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1491 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), &kev);
1492 if (final) {
1493 // this can never happen again
1494 // this must happen after the merge
1495 // this may be racy in the future, but we don't provide a 'setter'
1496 // API for the mask yet
1497 _dispatch_source_from_refs(dri)->ds_pending_data_mask &= ~unreg;
1498 }
1499 }
1500
1501 if (final) {
1502 // no more sources have these flags
1503 dk->dk_kevent.fflags &= ~unreg;
1504 }
1505 }
1506
1507 static kern_return_t
1508 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
1509 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
1510 mach_port_mscount_t notify_sync)
1511 {
1512 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
1513 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
1514 kern_return_t kr, krr = 0;
1515
1516 // Update notification registration state.
1517 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
1518 dk->dk_kevent.data &= ~(del_flags & mask);
1519
1520 _dispatch_debug_machport(port);
1521 if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
1522 previous = MACH_PORT_NULL;
1523 krr = mach_port_request_notification(mach_task_self(), port,
1524 notify_msgid, notify_sync, _dispatch_event_port,
1525 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
1526 DISPATCH_VERIFY_MIG(krr);
1527
1528 switch(krr) {
1529 case KERN_INVALID_NAME:
1530 case KERN_INVALID_RIGHT:
1531 		// Suppress errors & clear registration state
1532 dk->dk_kevent.data &= ~mask;
1533 break;
1534 default:
1535 		// Otherwise, we don't expect errors from Mach; log any that occur
1536 if (dispatch_assume_zero(krr)) {
1537 // log the error & clear registration state
1538 dk->dk_kevent.data &= ~mask;
1539 } else if (dispatch_assume_zero(previous)) {
1540 // Another subsystem has beat libdispatch to requesting the
1541 // specified Mach notification on this port. We should
1542 // technically cache the previous port and message it when the
1543 // kernel messages our port. Or we can just say screw those
1544 // subsystems and deallocate the previous port.
1545 // They should adopt libdispatch :-P
1546 kr = mach_port_deallocate(mach_task_self(), previous);
1547 DISPATCH_VERIFY_MIG(kr);
1548 (void)dispatch_assume_zero(kr);
1549 previous = MACH_PORT_NULL;
1550 }
1551 }
1552 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
1553 previous = MACH_PORT_NULL;
1554 kr = mach_port_request_notification(mach_task_self(), port,
1555 notify_msgid, notify_sync, MACH_PORT_NULL,
1556 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
1557 DISPATCH_VERIFY_MIG(kr);
1558
1559 switch (kr) {
1560 case KERN_INVALID_NAME:
1561 case KERN_INVALID_RIGHT:
1562 case KERN_INVALID_ARGUMENT:
1563 break;
1564 default:
1565 if (dispatch_assume_zero(kr)) {
1566 // log the error
1567 }
1568 }
1569 } else {
1570 return 0;
1571 }
1572 if (slowpath(previous)) {
1573 // the kernel has not consumed the send-once right yet
1574 (void)dispatch_assume_zero(
1575 _dispatch_send_consume_send_once_right(previous));
1576 }
1577 return krr;
1578 }
1579
1580 static void
1581 _dispatch_mach_notify_source2(void *context)
1582 {
1583 dispatch_source_t ds = context;
1584 size_t maxsz = MAX(sizeof(union
1585 __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem),
1586 sizeof(union
1587 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
1588
1589 dispatch_mig_server(ds, maxsz, libdispatch_internal_protocol_server);
1590 }
1591
1592 void
1593 _dispatch_mach_notify_source_init(void *context DISPATCH_UNUSED)
1594 {
1595 _dispatch_get_port_set();
1596
1597 _dispatch_mach_notify_source = dispatch_source_create(
1598 DISPATCH_SOURCE_TYPE_MACH_RECV, _dispatch_event_port, 0,
1599 &_dispatch_mgr_q);
1600 dispatch_assert(_dispatch_mach_notify_source);
1601 dispatch_set_context(_dispatch_mach_notify_source,
1602 _dispatch_mach_notify_source);
1603 dispatch_source_set_event_handler_f(_dispatch_mach_notify_source,
1604 _dispatch_mach_notify_source2);
1605 dispatch_resume(_dispatch_mach_notify_source);
1606 }
1607
1608 kern_return_t
1609 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
1610 mach_port_name_t name)
1611 {
1612 #if DISPATCH_DEBUG
1613 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
1614 "deleted prematurely", name);
1615 #endif
1616
1617 _dispatch_debug_machport(name);
1618 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED,
1619 _DISPATCH_MACH_SP_FLAGS, true);
1620
1621 return KERN_SUCCESS;
1622 }
1623
1624 kern_return_t
1625 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
1626 mach_port_name_t name)
1627 {
1628 kern_return_t kr;
1629
1630 #if DISPATCH_DEBUG
1631 _dispatch_log("machport[0x%08x]: dead-name notification: %s",
1632 name, __func__);
1633 #endif
1634 _dispatch_debug_machport(name);
1635 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD,
1636 _DISPATCH_MACH_SP_FLAGS, true);
1637
1638 // the act of receiving a dead name notification allocates a dead-name
1639 // right that must be deallocated
1640 kr = mach_port_deallocate(mach_task_self(), name);
1641 DISPATCH_VERIFY_MIG(kr);
1642 //(void)dispatch_assume_zero(kr);
1643
1644 return KERN_SUCCESS;
1645 }
1646
1647 kern_return_t
1648 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
1649 mach_port_name_t name)
1650 {
1651 #if DISPATCH_DEBUG
1652 _dispatch_log("machport[0x%08x]: send-possible notification: %s",
1653 name, __func__);
1654 #endif
1655 _dispatch_debug_machport(name);
1656 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE,
1657 _DISPATCH_MACH_SP_FLAGS, false);
1658
1659 return KERN_SUCCESS;
1660 }
1661
1662 mach_msg_return_t
1663 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
1664 dispatch_mig_callback_t callback)
1665 {
1666 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
1667 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
1668 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
1669 mach_msg_options_t tmp_options;
1670 mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
1671 mach_msg_return_t kr = 0;
1672 unsigned int cnt = 1000; // do not stall out serial queues
1673 int demux_success;
1674 bool received = false;
1675 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
1676
1677 // XXX FIXME -- allocate these elsewhere
1678 bufRequest = alloca(rcv_size);
1679 bufReply = alloca(rcv_size);
1680 bufReply->Head.msgh_size = 0; // make CLANG happy
1681 bufRequest->RetCode = 0;
1682
1683 #if DISPATCH_DEBUG
1684 options |= MACH_RCV_LARGE; // rdar://problem/8422992
1685 #endif
1686 tmp_options = options;
1687 // XXX FIXME -- change this to not starve out the target queue
1688 for (;;) {
1689 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
1690 options &= ~MACH_RCV_MSG;
1691 tmp_options &= ~MACH_RCV_MSG;
1692
1693 if (!(tmp_options & MACH_SEND_MSG)) {
1694 break;
1695 }
1696 }
1697 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
1698 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
1699
1700 tmp_options = options;
1701
1702 if (slowpath(kr)) {
1703 switch (kr) {
1704 case MACH_SEND_INVALID_DEST:
1705 case MACH_SEND_TIMED_OUT:
1706 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
1707 mach_msg_destroy(&bufReply->Head);
1708 }
1709 break;
1710 case MACH_RCV_TIMED_OUT:
1711 // Don't return an error if a message was sent this time or
1712 // a message was successfully received previously
1713 // rdar://problems/7363620&7791738
1714 			if (bufReply->Head.msgh_remote_port || received) {
1715 kr = MACH_MSG_SUCCESS;
1716 }
1717 break;
1718 case MACH_RCV_INVALID_NAME:
1719 break;
1720 #if DISPATCH_DEBUG
1721 case MACH_RCV_TOO_LARGE:
1722 // receive messages that are too large and log their id and size
1723 // rdar://problem/8422992
1724 tmp_options &= ~MACH_RCV_LARGE;
1725 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
1726 void *large_buf = malloc(large_size);
1727 if (large_buf) {
1728 rcv_size = large_size;
1729 bufReply = large_buf;
1730 }
1731 if (!mach_msg(&bufReply->Head, tmp_options, 0,
1732 (mach_msg_size_t)rcv_size,
1733 (mach_port_t)ds->ds_ident_hack, 0, 0)) {
1734 _dispatch_log("BUG in libdispatch client: "
1735 "dispatch_mig_server received message larger than "
1736 "requested size %zd: id = 0x%x, size = %d",
1737 maxmsgsz, bufReply->Head.msgh_id,
1738 bufReply->Head.msgh_size);
1739 }
1740 if (large_buf) {
1741 free(large_buf);
1742 }
1743 // fall through
1744 #endif
1745 default:
1746 _dispatch_bug_mach_client(
1747 "dispatch_mig_server: mach_msg() failed", kr);
1748 break;
1749 }
1750 break;
1751 }
1752
1753 if (!(tmp_options & MACH_RCV_MSG)) {
1754 break;
1755 }
1756 received = true;
1757
1758 bufTemp = bufRequest;
1759 bufRequest = bufReply;
1760 bufReply = bufTemp;
1761
1762 demux_success = callback(&bufRequest->Head, &bufReply->Head);
1763
1764 if (!demux_success) {
1765 // destroy the request - but not the reply port
1766 bufRequest->Head.msgh_remote_port = 0;
1767 mach_msg_destroy(&bufRequest->Head);
1768 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
1769 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
1770 // is present
1771 if (slowpath(bufReply->RetCode)) {
1772 if (bufReply->RetCode == MIG_NO_REPLY) {
1773 continue;
1774 }
1775
1776 // destroy the request - but not the reply port
1777 bufRequest->Head.msgh_remote_port = 0;
1778 mach_msg_destroy(&bufRequest->Head);
1779 }
1780 }
1781
1782 if (bufReply->Head.msgh_remote_port) {
1783 tmp_options |= MACH_SEND_MSG;
1784 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
1785 MACH_MSG_TYPE_MOVE_SEND_ONCE) {
1786 tmp_options |= MACH_SEND_TIMEOUT;
1787 }
1788 }
1789 }
1790
1791 return kr;
1792 }
1793
1794 #endif /* HAVE_MACH */
1795
1796 #pragma mark -
1797 #pragma mark dispatch_source_debug
1798
1799 DISPATCH_NOINLINE
1800 static const char *
1801 _evfiltstr(short filt)
1802 {
1803 switch (filt) {
1804 #define _evfilt2(f) case (f): return #f
1805 _evfilt2(EVFILT_READ);
1806 _evfilt2(EVFILT_WRITE);
1807 _evfilt2(EVFILT_AIO);
1808 _evfilt2(EVFILT_VNODE);
1809 _evfilt2(EVFILT_PROC);
1810 _evfilt2(EVFILT_SIGNAL);
1811 _evfilt2(EVFILT_TIMER);
1812 #ifdef EVFILT_VM
1813 _evfilt2(EVFILT_VM);
1814 #endif
1815 #if HAVE_MACH
1816 _evfilt2(EVFILT_MACHPORT);
1817 #endif
1818 _evfilt2(EVFILT_FS);
1819 _evfilt2(EVFILT_USER);
1820
1821 _evfilt2(DISPATCH_EVFILT_TIMER);
1822 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
1823 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
1824 default:
1825 return "EVFILT_missing";
1826 }
1827 }
1828
1829 static size_t
1830 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1831 {
1832 dispatch_queue_t target = ds->do_targetq;
1833 return snprintf(buf, bufsiz, "target = %s[%p], pending_data = 0x%lx, "
1834 "pending_data_mask = 0x%lx, ",
1835 target ? target->dq_label : "", target,
1836 ds->ds_pending_data, ds->ds_pending_data_mask);
1837 }
1838
1839 static size_t
1840 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
1841 {
1842 dispatch_source_refs_t dr = ds->ds_refs;
1843 return snprintf(buf, bufsiz, "timer = { target = 0x%llx, "
1844 "last_fire = 0x%llx, interval = 0x%llx, flags = 0x%llx }, ",
1845 ds_timer(dr).target, ds_timer(dr).last_fire, ds_timer(dr).interval,
1846 ds_timer(dr).flags);
1847 }
1848
1849 static size_t
1850 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
1851 {
1852 size_t offset = 0;
1853 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1854 dx_kind(ds), ds);
1855 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
1856 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
1857 if (ds->ds_is_timer) {
1858 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
1859 }
1860 return offset;
1861 }
1862
1863 static size_t
1864 _dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
1865 {
1866 size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
1867 offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
1868 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
1869 return offset;
1870 }
1871
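/*
 * Editor's note: a hedged usage sketch. The public dispatch_debug() call
 * reaches _dispatch_source_kevent_debug() through the vtable's do_debug
 * entry, yielding a log line shaped roughly like:
 *   kevent-source[0x...] = { ... target = com.apple.main-thread[0x...],
 *   pending_data = 0x0, pending_data_mask = 0x1, filter = EVFILT_READ }
 */
#if 0
static void
debug_example(void)
{
dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
STDIN_FILENO, 0, dispatch_get_main_queue());
dispatch_debug(ds, "after create"); // routes to do_debug above
}
#endif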
1872 #if DISPATCH_DEBUG
1873 void
1874 dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
1875 {
1876 size_t i;
1877 for (i = 0; i < count; ++i) {
1878 _dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, "
1879 "fflags = 0x%x, data = %p, udata = %p }: %s",
1880 i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags,
1881 kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
1882 }
1883 }
1884
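/*
 * Editor's note: illustrative only. This helper is meant for instrumenting
 * the kevent() call sites in this file, e.g. dumping a change list before it
 * is submitted; the read filter below is just a stand-in:
 */
#if 0
static void
dump_read_kev(int fd)
{
struct kevent kev;
EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, 0);
dispatch_debug_kevents(&kev, 1, __func__);
}
#endif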
1885 static void
1886 _dispatch_kevent_debugger2(void *context)
1887 {
1888 struct sockaddr sa;
1889 socklen_t sa_len = sizeof(sa);
1890 int c, fd = (int)(long)context;
1891 unsigned int i;
1892 dispatch_kevent_t dk;
1893 dispatch_source_t ds;
1894 dispatch_source_refs_t dr;
1895 FILE *debug_stream;
1896
1897 c = accept(fd, &sa, &sa_len);
1898 if (c == -1) {
1899 if (errno != EAGAIN) {
1900 (void)dispatch_assume_zero(errno);
1901 }
1902 return;
1903 }
1904 #if 0
1905 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
1906 if (r == -1) {
1907 (void)dispatch_assume_zero(errno);
1908 }
1909 #endif
1910 debug_stream = fdopen(c, "a");
1911 if (!dispatch_assume(debug_stream)) {
1912 close(c);
1913 return;
1914 }
1915
1916 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
1917 fprintf(debug_stream, "Content-type: text/html\r\n");
1918 fprintf(debug_stream, "Pragma: nocache\r\n");
1919 fprintf(debug_stream, "\r\n");
1920 fprintf(debug_stream, "<html>\n");
1921 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
1922 fprintf(debug_stream, "<body>\n<ul>\n");
1923
1924 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
1925 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n");
1926
1927 for (i = 0; i < DSL_HASH_SIZE; i++) {
1928 if (TAILQ_EMPTY(&_dispatch_sources[i])) {
1929 continue;
1930 }
1931 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
1932 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
1933 "0x%hx fflags 0x%x data 0x%lx udata %p\n",
1934 dk, (unsigned long)dk->dk_kevent.ident,
1935 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
1936 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
1937 dk->dk_kevent.udata);
1938 fprintf(debug_stream, "\t\t<ul>\n");
1939 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
1940 ds = _dispatch_source_from_refs(dr);
1941 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
1942 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
1943 ds, ds->do_ref_cnt, ds->do_suspend_cnt,
1944 ds->ds_pending_data, ds->ds_pending_data_mask,
1945 ds->ds_atomic_flags);
1946 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
1947 dispatch_queue_t dq = ds->do_targetq;
1948 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
1949 "0x%x label: %s\n", dq, dq->do_ref_cnt,
1950 dq->do_suspend_cnt, dq->dq_label);
1951 }
1952 }
1953 fprintf(debug_stream, "\t\t</ul>\n");
1954 fprintf(debug_stream, "\t</li>\n");
1955 }
1956 }
1957 fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
1958 fflush(debug_stream);
1959 fclose(debug_stream);
1960 }
1961
1962 static void
1963 _dispatch_kevent_debugger2_cancel(void *context)
1964 {
1965 int ret, fd = (int)(long)context;
1966
1967 ret = close(fd);
1968 if (ret == -1) {
1969 (void)dispatch_assume_zero(errno);
1970 }
1971 }
1972
1973 static void
1974 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
1975 {
1976 union {
1977 struct sockaddr_in sa_in;
1978 struct sockaddr sa;
1979 } sa_u = {
1980 .sa_in = {
1981 .sin_family = AF_INET,
1982 .sin_addr = { htonl(INADDR_LOOPBACK), },
1983 },
1984 };
1985 dispatch_source_t ds;
1986 const char *valstr;
1987 int val, r, fd, sock_opt = 1;
1988 socklen_t slen = sizeof(sa_u);
1989
1990 if (issetugid()) {
1991 return;
1992 }
1993 valstr = getenv("LIBDISPATCH_DEBUGGER");
1994 if (!valstr) {
1995 return;
1996 }
1997 val = atoi(valstr);
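// LIBDISPATCH_DEBUGGER=2 binds the debug socket to all interfaces
// instead of loopback only.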
1998 if (val == 2) {
1999 sa_u.sa_in.sin_addr.s_addr = INADDR_ANY;
2000 }
2001 fd = socket(PF_INET, SOCK_STREAM, 0);
2002 if (fd == -1) {
2003 (void)dispatch_assume_zero(errno);
2004 return;
2005 }
2006 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
2007 (socklen_t) sizeof sock_opt);
2008 if (r == -1) {
2009 (void)dispatch_assume_zero(errno);
2010 goto out_bad;
2011 }
2012 #if 0
2013 r = fcntl(fd, F_SETFL, O_NONBLOCK);
2014 if (r == -1) {
2015 (void)dispatch_assume_zero(errno);
2016 goto out_bad;
2017 }
2018 #endif
2019 r = bind(fd, &sa_u.sa, sizeof(sa_u));
2020 if (r == -1) {
2021 (void)dispatch_assume_zero(errno);
2022 goto out_bad;
2023 }
2024 r = listen(fd, SOMAXCONN);
2025 if (r == -1) {
2026 (void)dispatch_assume_zero(errno);
2027 goto out_bad;
2028 }
2029 r = getsockname(fd, &sa_u.sa, &slen);
2030 if (r == -1) {
2031 (void)dispatch_assume_zero(errno);
2032 goto out_bad;
2033 }
2034
2035 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
2036 &_dispatch_mgr_q);
2037 if (dispatch_assume(ds)) {
2038 _dispatch_log("LIBDISPATCH: debug port: %hu",
2039 (in_port_t)ntohs(sa_u.sa_in.sin_port));
2040
2041 /* ownership of fd transfers to ds */
2042 dispatch_set_context(ds, (void *)(long)fd);
2043 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
2044 dispatch_source_set_cancel_handler_f(ds,
2045 _dispatch_kevent_debugger2_cancel);
2046 dispatch_resume(ds);
2047
2048 return;
2049 }
2050 out_bad:
2051 close(fd);
2052 }
2053
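/*
 * Editor's note: a hedged usage sketch for the debug server above. In a
 * DISPATCH_DEBUG build of the library, a non-setuid/setgid process started
 * with LIBDISPATCH_DEBUGGER=1 binds an ephemeral loopback port and logs
 * "LIBDISPATCH: debug port: NNNNN" (LIBDISPATCH_DEBUGGER=2 listens on all
 * interfaces). The kevent table can then be browsed with any HTTP client,
 * e.g. curl http://127.0.0.1:NNNNN/ where NNNNN is the logged port number.
 */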
2054 #if HAVE_MACH
2055
2056 #ifndef MACH_PORT_TYPE_SPREQUEST
2057 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
2058 #endif
2059
2060 void
2061 dispatch_debug_machport(mach_port_t name, const char* str)
2062 {
2063 mach_port_type_t type;
2064 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
2065 unsigned int dnreqs = 0, dnrsiz;
2066 kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
2067 if (kr) {
2068 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
2069 kr, mach_error_string(kr), str);
2070 return;
2071 }
2072 if (type & MACH_PORT_TYPE_SEND) {
2073 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2074 MACH_PORT_RIGHT_SEND, &ns));
2075 }
2076 if (type & MACH_PORT_TYPE_SEND_ONCE) {
2077 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2078 MACH_PORT_RIGHT_SEND_ONCE, &nso));
2079 }
2080 if (type & MACH_PORT_TYPE_DEAD_NAME) {
2081 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2082 MACH_PORT_RIGHT_DEAD_NAME, &nd));
2083 }
2084 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND|
2085 MACH_PORT_TYPE_SEND_ONCE)) {
2086 (void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
2087 name, &dnrsiz, &dnreqs));
2088 }
2089 if (type & MACH_PORT_TYPE_RECEIVE) {
2090 mach_port_status_t status = { .mps_pset = 0, };
2091 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
2092 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
2093 MACH_PORT_RIGHT_RECEIVE, &nr));
2094 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
2095 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
2096 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2097 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
2098 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
2099 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
2100 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
2101 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
2102 status.mps_srights ? "Y":"N", status.mps_sorights,
2103 status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
2104 status.mps_seqno, str);
2105 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
2106 MACH_PORT_TYPE_DEAD_NAME)) {
2107 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
2108 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
2109 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
2110 } else {
2111 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
2112 str);
2113 }
2114 }
2115
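/*
 * Editor's note: illustrative usage. dispatch_debug_machport() is intended
 * to be sprinkled around send/receive paths while chasing port-right leaks;
 * the str argument tags the call site that produced the log line:
 */
#if 0
dispatch_debug_machport(mach_task_self(), __func__);
#endif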
2116 #endif // HAVE_MACH
2117
2118 #endif // DISPATCH_DEBUG