/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"
#include "protocolServer.h"

#include <sys/mount.h>
#define DISPATCH_EVFILT_TIMER		(-EVFILT_SYSCOUNT - 1)
#define DISPATCH_EVFILT_CUSTOM_ADD	(-EVFILT_SYSCOUNT - 2)
#define DISPATCH_EVFILT_CUSTOM_OR	(-EVFILT_SYSCOUNT - 3)
#define DISPATCH_EVFILT_SYSCOUNT	(EVFILT_SYSCOUNT + 3)
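/*
 * A quick sanity check of the arithmetic above (illustrative, not from the
 * original source): kqueue filters are small negative integers, so placing
 * the private filters below -EVFILT_SYSCOUNT guarantees they can never
 * collide with a kernel-defined filter. For example, if EVFILT_SYSCOUNT
 * were 32, then DISPATCH_EVFILT_TIMER == -33, DISPATCH_EVFILT_CUSTOM_ADD
 * == -34 and DISPATCH_EVFILT_CUSTOM_OR == -35.
 */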
#define DISPATCH_TIMER_INDEX_WALL	0
#define DISPATCH_TIMER_INDEX_MACH	1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_WALL,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[0],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
	},
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_MACH,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[1],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
	},
};
#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.udata = &_dispatch_kevent_data_or,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
		.udata = &_dispatch_kevent_data_add,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#ifndef DISPATCH_NO_LEGACY
struct dispatch_source_attr_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
};

struct dispatch_source_attr_s {
	DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
	void *finalizer_ctxt;
	dispatch_source_finalizer_function_t finalizer_func;
	void *context;
};
#endif /* DISPATCH_NO_LEGACY */
#define _dispatch_source_call_block ((void *)-1)
static void _dispatch_source_latch_and_call(dispatch_source_t ds);
static void _dispatch_source_cancel_callout(dispatch_source_t ds);
static bool _dispatch_source_probe(dispatch_source_t ds);
static void _dispatch_source_dispose(dispatch_source_t ds);
static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);

static void _dispatch_kevent_merge(dispatch_source_t ds);
static void _dispatch_kevent_release(dispatch_source_t ds);
static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);

static void _dispatch_drain_mach_messages(struct kevent *ke);
static void _dispatch_timer_list_update(dispatch_source_t ds);

static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)));
static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
	_evfilt2(EVFILT_SESSION);

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}
#define DSL_HASH_SIZE 256u	// must be a power of two
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))

static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
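/*
 * Illustrative sketch (not from the original source): lookups hash the
 * kevent ident into one of the DSL_HASH_SIZE buckets. Because
 * DSL_HASH_SIZE is a power of two, DSL_HASH() reduces to a mask, e.g.
 * DSL_HASH(260) == 4. Mach port names are first passed through
 * MACH_PORT_INDEX() so that the generation bits of a port name do not
 * skew the distribution.
 */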
static dispatch_kevent_t
_dispatch_kevent_find(uintptr_t ident, short filter)
{
	uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}
static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	uintptr_t ident = dk->dk_kevent.ident;
	uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);

	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif
	dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
	_dispatch_wakeup(ds);
}
#ifndef DISPATCH_NO_LEGACY
void
_dispatch_source_legacy_xref_release(dispatch_source_t ds)
{
	if (ds->ds_is_legacy) {
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
			dispatch_source_cancel(ds);
		}

		// Clients often leave sources suspended at the last release
		dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Arguments for and against this assert are within 6705399
		DISPATCH_CLIENT_CRASH("Release of a suspended object");
	}
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}
#endif /* DISPATCH_NO_LEGACY */
bool
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}

unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	return ds->ds_pending_data_mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	return (int)ds->ds_ident_hack;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	return ds->ds_data;
}
static void
dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
{
	size_t i;

	for (i = 0; i < count; ++i) {
		_dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
				i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags,
				kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
	}
}
static size_t
_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
	offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
	return offset;
}
static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
	unsigned int i;

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}
// Find existing kevents, and merge any new flags if necessary
static void
_dispatch_kevent_merge(dispatch_source_t ds)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk;
	typeof(dk->dk_kevent.fflags) new_flags;
	bool do_resume = false;

	if (ds->ds_is_installed) {
		return;
	}
	ds->ds_is_installed = true;

	dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);

	dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);
	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
		free(ds->ds_dkev);
		ds->ds_dkev = dk;
		do_resume = new_flags;
	} else {
		dk = ds->ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}

	TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
		_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
		ds->ds_is_armed = true;
	}
}
static void
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		break;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
		break;
	default:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break;	// one-shot kevents are not reregistered
		}
		_dispatch_update_kq(&dk->dk_kevent);
		if (dk->dk_kevent.flags & EV_DISPATCH) {
			dk->dk_kevent.flags &= ~EV_ADD;
		}
		break;
	}
}
static dispatch_queue_t
_dispatch_source_invoke(dispatch_source_t ds)
{
	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_merge(ds);
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			_dispatch_kevent_release(ds);
			return ds->do_targetq;
		} else if (ds->ds_cancel_handler) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the manager
		// queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return &_dispatch_mgr_q;
		}
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
		ds->ds_is_armed = true;
	}

	return NULL;
}
static bool
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source needs to be uninstalled from the manager queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || ds->ds_cancel_handler) {
			return true;
		}
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		return true;
	}
	return false;
}
static void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_queue_dispose((dispatch_queue_t)ds);
}
static void
_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			dispatch_assume_zero(errno);
		}
		return;
	}

	int r = fcntl(c, F_SETFL, 0);	// disable non-blocking IO
	if (r == -1) {
		dispatch_assume_zero(errno);
	}

	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}
static void
_dispatch_kevent_debugger(void *context __attribute__((unused)))
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	if (issetugid()) {
		return;
	}
	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = strtoul(valstr, NULL, 0);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
		return;
	}
out_bad:
	close(fd);
}
void
_dispatch_source_drain_kevent(struct kevent *ke)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk = ke->udata;
	dispatch_source_t dsi;

	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
#if DISPATCH_DEBUG
	dispatch_debug_kevents(ke, 1, __func__);
#endif
	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_drain_mach_messages(ke);
	}
	dispatch_assert(dk);

	if (ke->flags & EV_ONESHOT) {
		dk->dk_kevent.flags |= EV_ONESHOT;
	}

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, ke);
	}
}
static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)
{
	uintptr_t key;

	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these sources live on statically allocated lists
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	default:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break;	// implicitly deleted
		}
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			_dispatch_update_kq(&dk->dk_kevent);
		}
		break;
	}

	if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
		key = MACH_PORT_INDEX(dk->dk_kevent.ident);
	} else {
		key = dk->dk_kevent.ident;
	}

	TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
	free(dk);
}
static void
_dispatch_kevent_release(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_t dsi;
	uint32_t del_flags, fflags = 0;

	ds->ds_dkev = NULL;

	TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);

	if (TAILQ_EMPTY(&dk->dk_sources)) {
		_dispatch_kevent_dispose(dk);
	} else {
		TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
			fflags |= (uint32_t)dsi->ds_pending_data_mask;
		}
		del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags = fflags;
			_dispatch_kevent_resume(dk, 0, del_flags);
		}
	}

	ds->ds_is_armed = false;
	ds->ds_needs_rearm = false;	// re-arm is pointless and bad now
	_dispatch_release(ds);	// the retain is done at creation time
}
static void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
	struct kevent fake;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}

	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
	// We simulate an exit event in this case. <rdar://problem/5067725>
	if (ke->flags & EV_ERROR) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			fake = *ke;
			fake.flags &= ~EV_ERROR;
			fake.fflags = NOTE_EXIT;
			fake.data = 0;
			ke = &fake;
		} else {
			// log the unexpected error
			dispatch_assume_zero(ke->data);
		}
	}

	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		ds->ds_pending_data = ~ke->data;
	} else if (ds->ds_is_adder) {
		dispatch_atomic_add(&ds->ds_pending_data, ke->data);
	} else {
		dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
	}

	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		ds->ds_is_armed = false;
	}

	_dispatch_wakeup(ds);
}
static void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}
	prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (dispatch_assume(prev)) {
		if (ds->ds_handler_func) {
			ds->ds_handler_func(ds->ds_handler_ctxt, ds);
		}
	}
}
static void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;

#ifdef __BLOCKS__
	if (ds->ds_handler_is_block) {
		Block_release(ds->ds_handler_ctxt);
		ds->ds_handler_is_block = false;
		ds->ds_handler_func = NULL;
		ds->ds_handler_ctxt = NULL;
	}
#endif

	if (!ds->ds_cancel_handler) {
		return;
	}
	if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
		dispatch_block_t b = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			b();
		}
		Block_release(ds->ds_cancel_handler);
		ds->ds_cancel_is_block = false;
#endif
	} else {
		dispatch_function_t f = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			f(ds->do_ctxt);
		}
	}
	ds->ds_cancel_handler = NULL;
}
const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
	.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
	.do_kind = "kevent-source",
	.do_invoke = _dispatch_source_invoke,
	.do_dispose = _dispatch_source_dispose,
	.do_probe = _dispatch_source_probe,
	.do_debug = _dispatch_source_kevent_debug,
};
void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	struct kevent kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = val,
	};

	dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
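/*
 * Usage sketch for the custom data sources (illustrative only; "q" is a
 * caller-owned queue and process_pending() is hypothetical): a DATA_ADD
 * source coalesces counts merged from any thread and delivers the sum on
 * the target queue via the adder path in _dispatch_source_merge_kevent().
 *
 *	dispatch_source_t ds = dispatch_source_create(
 *			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
 *	dispatch_source_set_event_handler(ds, ^{
 *		// the pending adds, latched and zeroed by
 *		// _dispatch_source_latch_and_call()
 *		process_pending(dispatch_source_get_data(ds));
 *	});
 *	dispatch_resume(ds);
 *
 *	dispatch_source_merge_data(ds, 1);	// safe from any thread
 */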
static size_t
dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return snprintf(buf, bufsiz,
			"target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
			target ? target->dq_label : "", target,
			ds->ds_pending_data, ds->ds_pending_data_mask);
}
static size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
	offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	return offset;
}
#ifndef DISPATCH_NO_LEGACY
static void
dispatch_source_attr_dispose(dispatch_source_attr_t attr)
{
	// release the finalizer block if necessary
	dispatch_source_attr_set_finalizer(attr, NULL);
	_dispatch_dispose(attr);
}
static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
	.do_type = DISPATCH_SOURCE_ATTR_TYPE,
	.do_kind = "source-attr",
	.do_dispose = dispatch_source_attr_dispose,
};
dispatch_source_attr_t
dispatch_source_attr_create(void)
{
	dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));

	if (rval) {
		rval->do_vtable = &dispatch_source_attr_vtable;
		rval->do_next = DISPATCH_OBJECT_LISTLESS;
		rval->do_targetq = dispatch_get_global_queue(0, 0);
		rval->do_ref_cnt = 1;
		rval->do_xref_cnt = 1;
	}

	return rval;
}
long
dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
	void *context, dispatch_source_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		Block_release(attr->finalizer_ctxt);
	}
#endif

	attr->finalizer_ctxt = context;
	attr->finalizer_func = finalizer;

	return 0;
}
#ifdef __BLOCKS__
long
dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
	dispatch_source_finalizer_t finalizer)
{
	void *ctxt;
	dispatch_source_finalizer_function_t func;

	if (finalizer) {
		if (!(ctxt = Block_copy(finalizer))) {
			return 1;
		}
		func = (void *)_dispatch_call_block_and_release2;
	} else {
		ctxt = NULL;
		func = NULL;
	}

	dispatch_source_attr_set_finalizer_f(attr, ctxt, func);

	return 0;
}
#endif
dispatch_source_finalizer_t
dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
{
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
	} else if (attr->finalizer_func == NULL) {
		return NULL;
	} else {
		abort();	// finalizer is not a block...
	}
}
void
dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
{
	attr->context = context;
}
dispatch_source_attr_t
dispatch_source_attr_copy(dispatch_source_attr_t proto)
{
	dispatch_source_attr_t rval = NULL;

	if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
		memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
#ifdef __BLOCKS__
		if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
			rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
		}
#endif
	} else if (!proto) {
		rval = dispatch_source_attr_create();
	}
	return rval;
}
#endif /* DISPATCH_NO_LEGACY */
struct dispatch_source_type_s {
	struct kevent ke;
	uint64_t mask;
};
const struct dispatch_source_type_s _dispatch_source_type_timer = {
	.ke = {
		.filter = DISPATCH_EVFILT_TIMER,
	},
	.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};

const struct dispatch_source_type_s _dispatch_source_type_read = {
	.ke = {
		.filter = EVFILT_READ,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_write = {
	.ke = {
		.filter = EVFILT_WRITE,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_proc = {
	.ke = {
		.filter = EVFILT_PROC,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
};

const struct dispatch_source_type_s _dispatch_source_type_signal = {
	.ke = {
		.filter = EVFILT_SIGNAL,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_vnode = {
	.ke = {
		.filter = EVFILT_VNODE,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
};

const struct dispatch_source_type_s _dispatch_source_type_vfs = {
	.ke = {
		.filter = EVFILT_FS,
		.flags = EV_CLEAR,
	},
	.mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_DEAD,
	},
	.mask = DISPATCH_MACH_SEND_DEAD,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_RECV,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_add = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_or = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
	},
};
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
	uintptr_t handle,
	unsigned long mask,
	dispatch_queue_t q)
{
	const struct kevent *proto_kev = &type->ke;
	dispatch_source_t ds = NULL;
	dispatch_kevent_t dk = NULL;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		goto out_bad;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			goto out_bad;
		}
		break;
	case EVFILT_FS:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_TIMER:
		if (handle) {
			goto out_bad;
		}
		break;
	default:
		break;
	}

	ds = calloc(1ul, sizeof(struct dispatch_source_s));
	if (slowpath(!ds)) {
		goto out_bad;
	}
	dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
	if (slowpath(!dk)) {
		goto out_bad;
	}

	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = dk;
	TAILQ_INIT(&dk->dk_sources);

	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

	// Dispatch Object
	ds->do_vtable = &_dispatch_source_kevent_vtable;
	ds->do_ref_cnt++;	// the reference the manager queue holds
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	// do_targetq will be retained below, past point of no-return

	// Dispatch Source
	ds->ds_ident_hack = dk->dk_kevent.ident;
	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		if (proto_kev->filter != EVFILT_MACHPORT) {
			ds->ds_is_level = true;
		}
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}

	// If it's a timer source, it needs to be re-armed
	if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_needs_rearm = true;
	}

	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif

	// Some sources require special processing
	if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
		static dispatch_once_t pred;
		dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
	} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
		ds->ds_timer.flags = mask;
	}

	ds->do_targetq = q;
	_dispatch_retain(ds->do_targetq);
	return ds;

out_bad:
	free(ds);
	free(dk);
	return NULL;
}
#ifdef __BLOCKS__
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_event_handler2(void *context)
{
	struct Block_layout *bl = context;

	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
	ds->ds_handler_ctxt = bl;
	ds->ds_handler_is_block = true;
}

void
dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
		handler, _dispatch_source_set_event_handler2);
}
#endif /* __BLOCKS__ */
static void
_dispatch_source_set_event_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

#ifdef __BLOCKS__
	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
#endif
	ds->ds_handler_func = context;
	ds->ds_handler_ctxt = ds->do_ctxt;
	ds->ds_handler_is_block = false;
}

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
		handler, _dispatch_source_set_event_handler_f);
}
#ifdef __BLOCKS__
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_cancel_handler2(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = true;
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
	dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
		handler, _dispatch_source_set_cancel_handler2);
}
#endif /* __BLOCKS__ */
static void
_dispatch_source_set_cancel_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

#ifdef __BLOCKS__
	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
#endif
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = false;
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
		handler, _dispatch_source_set_cancel_handler_f);
}
#ifndef DISPATCH_NO_LEGACY
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_source_t
_dispatch_source_create2(dispatch_source_t ds,
	dispatch_source_attr_t attr,
	void *context,
	dispatch_source_handler_function_t handler)
{
	if (ds == NULL || handler == NULL) {
		return NULL;
	}

	ds->ds_is_legacy = true;

	ds->ds_handler_func = handler;
	ds->ds_handler_ctxt = context;

	if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
		ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
		ds->do_ctxt = attr->context;
	}
#ifdef __BLOCKS__
	if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
		ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
		if (!ds->dq_finalizer_ctxt) {
			goto out_bad;
		}
	}
	if (handler == _dispatch_source_call_block) {
		struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
		if (!ds->ds_handler_ctxt) {
			if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
				Block_release(ds->dq_finalizer_ctxt);
			}
			goto out_bad;
		}
		ds->ds_handler_func = (void *)bl->invoke;
		ds->ds_handler_is_block = true;
	}

	// all legacy sources get a cancellation event on the normal event handler.
	dispatch_source_handler_function_t func = ds->ds_handler_func;
	dispatch_source_handler_t block = ds->ds_handler_ctxt;
	void *ctxt = ds->ds_handler_ctxt;
	bool handler_is_block = ds->ds_handler_is_block;

	ds->ds_cancel_is_block = true;
	if (handler_is_block) {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			block(ds);
		});
	} else {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			func(ctxt, ds);
		});
	}
#endif
	if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		dispatch_resume(ds);
	}

	return ds;

out_bad:
	_dispatch_release(ds);
	return NULL;
}
long
dispatch_source_get_error(dispatch_source_t ds, long *err_out)
{
	// 6863892 don't report ECANCELED until kevent is unregistered
	if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
		if (err_out) {
			*err_out = ECANCELED;
		}
		return DISPATCH_ERROR_DOMAIN_POSIX;
	}
	return DISPATCH_ERROR_DOMAIN_NO_ERROR;
}
#endif /* DISPATCH_NO_LEGACY */
// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timer_list_update(dispatch_source_t ds)
{
	dispatch_source_t dsi = NULL;
	int idx;

	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);

	// do not reschedule timers unregistered with _dispatch_kevent_release()
	if (!ds->ds_dkev) {
		return;
	}

	// Ensure the source is on the global kevent lists before it is removed and
	// readded below.
	_dispatch_kevent_merge(ds);

	TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);

	// change the list if the clock type has changed
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		idx = DISPATCH_TIMER_INDEX_WALL;
	} else {
		idx = DISPATCH_TIMER_INDEX_MACH;
	}
	ds->ds_dkev = &_dispatch_kevent_timer[idx];

	if (ds->ds_timer.target) {
		TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
			if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
				break;
			}
		}
	}

	if (dsi) {
		TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
	} else {
		TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
	}
}
static void
_dispatch_run_timers2(unsigned int timer)
{
	dispatch_source_t ds;
	uint64_t now, missed;

	if (timer == DISPATCH_TIMER_INDEX_MACH) {
		now = mach_absolute_time();
	} else {
		now = _dispatch_get_nanoseconds();
	}

	while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (timer != ds->ds_ident_hack) {
			_dispatch_timer_list_update(ds);
			continue;
		}
		if (!ds->ds_timer.target) {
			// no configured timers on the list
			break;
		}
		if (ds->ds_timer.target > now) {
			// Done running timers for now.
			break;
		}

		if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
			dispatch_atomic_inc(&ds->ds_pending_data);
			ds->ds_timer.target = 0;
		} else {
			// Calculate number of missed intervals.
			missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
			dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
			ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
		}

		_dispatch_timer_list_update(ds);
		_dispatch_wakeup(ds);
	}
}

void
_dispatch_run_timers(void)
{
	unsigned int i;

	for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
		_dispatch_run_timers2(i);
	}
}
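/*
 * Worked example of the missed-interval math above (illustrative): with
 * target = 100, interval = 30 and now = 175 (in the timer's own time
 * base), the timer should have fired at 100, 130 and 160. Then
 * missed = (175 - 100) / 30 == 2, so missed + 1 == 3 firings are merged
 * into ds_pending_data and the next target becomes 100 + 3 * 30 == 190,
 * the first tick strictly after "now".
 */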
#if defined(__i386__) || defined(__x86_64__)
// these architectures always return mach_absolute_time() in nanoseconds
#define _dispatch_convert_mach2nano(x) (x)
#define _dispatch_convert_nano2mach(x) (x)
#else
static mach_timebase_info_data_t tbi;
static dispatch_once_t tbi_pred;

static void
_dispatch_convert_init(void *context __attribute__((unused)))
{
	dispatch_assume_zero(mach_timebase_info(&tbi));
}

static uint64_t
_dispatch_convert_mach2nano(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.numer;
	tmp /= tbi.denom;

	return (uint64_t)tmp;
}

static uint64_t
_dispatch_convert_nano2mach(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.denom;
	tmp /= tbi.numer;

	return (uint64_t)tmp;
}
#endif
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_SEC 31536000l
#define FOREVER_NSEC 31536000000000000ull
void
_dispatch_get_next_timer_fire(struct timespec *howsoon)
{
	// <rdar://problem/6459649>
	// kevent(2) does not allow large timeouts, so we use a long timeout
	// instead (approximately 1 year).
	dispatch_source_t ds = NULL;
	unsigned int timer;
	uint64_t now, delta_tmp, delta = UINT64_MAX;

	// We are looking for the first unsuspended timer which has its target
	// time set. Given timers are kept in order, if we hit a timer that's
	// unset there's no point in continuing down the list.
	for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
		TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
			if (!ds->ds_timer.target) {
				break;
			}
			if (DISPATCH_OBJECT_SUSPENDED(ds)) {
				ds->ds_is_armed = false;
			} else {
				break;
			}
		}

		if (!ds || !ds->ds_timer.target) {
			continue;
		}

		if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
			now = _dispatch_get_nanoseconds();
		} else {
			now = mach_absolute_time();
		}

		if (ds->ds_timer.target <= now) {
			howsoon->tv_sec = 0;
			howsoon->tv_nsec = 0;
			return;
		}

		// the subtraction cannot go negative because the previous "if"
		// verified that the target is greater than now.
		delta_tmp = ds->ds_timer.target - now;
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
			delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
		}
		if (delta_tmp < delta) {
			delta = delta_tmp;
		}
	}
	if (slowpath(delta > FOREVER_NSEC)) {
		howsoon->tv_sec = FOREVER_SEC;
		howsoon->tv_nsec = 0;
	} else {
		howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
		howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
	}
}
struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};
// To be called from the context of the _dispatch_mgr_q
static void
_dispatch_source_set_timer2(void *context)
{
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds->ds_timer = params->values;
	_dispatch_timer_list_update(ds);
	dispatch_resume(ds);
	dispatch_release(ds);
	free(params);
}
void
dispatch_source_set_timer(dispatch_source_t ds,
	dispatch_time_t start,
	uint64_t interval,
	uint64_t leeway)
{
	struct dispatch_set_timer_params *params;

	// we use zero internally to mean disabled
	if (interval == 0) {
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}

	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	dispatch_suspend(ds);

	if (start == DISPATCH_TIME_NOW) {
		start = mach_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
		sleep(1);
	}

	params->ds = ds;
	params->values.flags = ds->ds_timer.flags;

	if ((int64_t)start < 0) {
		// wall clock
		params->ident = DISPATCH_TIMER_INDEX_WALL;
		params->values.start = -((int64_t)start);
		params->values.target = -((int64_t)start);
		params->values.interval = interval;
		params->values.leeway = leeway;
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// absolute clock
		params->ident = DISPATCH_TIMER_INDEX_MACH;
		params->values.start = start;
		params->values.target = start;
		params->values.interval = _dispatch_convert_nano2mach(interval);
		params->values.leeway = _dispatch_convert_nano2mach(leeway);
		params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
	}

	dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
}
#ifndef DISPATCH_NO_LEGACY
// LEGACY
long
dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
{
	dispatch_time_t start;
	if (nanoseconds == 0) {
		nanoseconds = 1;
	}
	if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
		static const struct timespec t0;
		start = dispatch_walltime(&t0, nanoseconds);
	} else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
	} else {
		start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
	}
	if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
		// 6866347 - make sure nanoseconds won't overflow
		nanoseconds = INT64_MAX;	// non-repeating (~292 years)
	}
	dispatch_source_set_timer(ds, start, nanoseconds, leeway);
	return 0;
}

// LEGACY
uint64_t
dispatch_event_get_nanoseconds(dispatch_source_t ds)
{
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		return ds->ds_timer.interval;
	} else {
		return _dispatch_convert_mach2nano(ds->ds_timer.interval);
	}
}
#endif /* DISPATCH_NO_LEGACY */
static dispatch_source_t _dispatch_mach_notify_source;
static mach_port_t _dispatch_port_set;
static mach_port_t _dispatch_event_port;

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

static void _dispatch_port_set_init(void *);
static mach_port_t _dispatch_get_port_set(void);
static void
_dispatch_drain_mach_messages(struct kevent *ke)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent ke2;

	if (!dispatch_assume(ke->data)) {
		return;
	}
	dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_kevent_machport_disable(dk);	// emulate EV_DISPATCH

	EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &ke2);
	}
}
static void
_dispatch_port_set_init(void *context __attribute__((unused)))
{
	struct kevent kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);

	kev.ident = _dispatch_port_set;

	_dispatch_update_kq(&kev);
}
static mach_port_t
_dispatch_get_port_set(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_port_set_init);

	return _dispatch_port_set;
}
static void
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
		_dispatch_kevent_machport_enable(dk);
	}
	if (new_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
				_dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			break;
		default:
			// Else, we don't expect any errors from mach. Log any errors if we do
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the Mach
				// dead-name notification on this port. We should technically cache the
				// previous port and message it when the kernel messages our port. Or
				// we can just say screw those subsystems and drop the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				dispatch_assume_zero(kr);
			}
			break;
		}
	}

	if (del_flags & DISPATCH_MACHPORT_RECV) {
		_dispatch_kevent_machport_disable(dk);
	}
	if (del_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
				MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (previous) {
				// the kernel has not consumed the right yet
				dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
			}
			break;
		}
	}
}
static void
_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	default:
		dispatch_assume_zero(kr);
	}
}
static void
_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, 0);
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_RIGHT:
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	case 0:
		break;
	default:
		dispatch_assume_zero(kr);
	}
}
#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
	dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
{
	if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
		max_msg_size = _DISPATCH_MIN_MSG_SZ;
	}
	return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
		^(dispatch_source_t ds) {
			if (!dispatch_source_get_error(ds, NULL)) {
				if (dq->dq_width != 1) {
					dispatch_retain(ds);	// this is a shim -- use the external retain
					dispatch_async(dq, ^{
						dispatch_mig_server(ds, max_msg_size, mig_callback);
						dispatch_release(ds);	// this is a shim -- use the external release
					});
				} else {
					dispatch_mig_server(ds, max_msg_size, mig_callback);
				}
			}
		});
}
#endif /* DISPATCH_NO_LEGACY */
static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
{
	size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);

	if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
		maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
	}

	_dispatch_get_port_set();

	_dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
		maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);

	dispatch_assert(_dispatch_mach_notify_source);
}
kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;

#if DISPATCH_DEBUG
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
#endif

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		return KERN_SUCCESS;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;

	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
{
	kern_return_t kr;
	// this function should never be called
	dispatch_assume_zero(name);
	kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
{
	// this function should never be called
	dispatch_assume_zero(notify);
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
{
	// we only register for dead-name notifications
	// some code deallocated our send-once right without consuming it
#if DISPATCH_DEBUG
	_dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
#endif
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;
	kern_return_t kr;

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		return KERN_SUCCESS;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;

	// the act of receiving a dead name notification allocates a dead-name right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}
kern_return_t
_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
{
	// dummy function just to pop out the main thread out of mach_msg()
	return 0;
}

kern_return_t
_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
{
	// dummy function to consume a send-once right
	return 0;
}
mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
	mach_msg_options_t tmp_options = options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	unsigned int cnt = 1000;	// do not stall out serial queues
	bool demux_success;

	maxmsgsz += MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(maxmsgsz);
	bufReply = alloca(maxmsgsz);
	bufReply->Head.msgh_size = 0;	// make CLANG happy

	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				break;
			}
		}

		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
			case MACH_RCV_INVALID_NAME:
				break;
			default:
				dispatch_assume_zero(kr);
				break;
			}
			break;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			break;
		}

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

	return kr;
}