/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "protocolServer.h"
#include <sys/mount.h>
#define DISPATCH_EVFILT_TIMER		(-EVFILT_SYSCOUNT - 1)
#define DISPATCH_EVFILT_CUSTOM_ADD	(-EVFILT_SYSCOUNT - 2)
#define DISPATCH_EVFILT_CUSTOM_OR	(-EVFILT_SYSCOUNT - 3)
#define DISPATCH_EVFILT_SYSCOUNT	(EVFILT_SYSCOUNT + 3)
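// Kernel kqueue filters are negative values in the range -1 .. -EVFILT_SYSCOUNT,
// so stacking the custom filters at -EVFILT_SYSCOUNT - 1 and below guarantees
// they can never collide with a real EVFILT_* constant.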
#define DISPATCH_TIMER_INDEX_WALL	0
#define DISPATCH_TIMER_INDEX_MACH	1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_WALL,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[0],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
	},
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_MACH,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[1],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
	},
};
#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.udata = &_dispatch_kevent_data_or,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
		.udata = &_dispatch_kevent_data_add,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};
#ifndef DISPATCH_NO_LEGACY
struct dispatch_source_attr_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
};

struct dispatch_source_attr_s {
	DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
	void *finalizer_ctxt;
	dispatch_source_finalizer_function_t finalizer_func;
	void *context;
};
#endif /* DISPATCH_NO_LEGACY */
#define _dispatch_source_call_block ((void *)-1)
static void _dispatch_source_latch_and_call(dispatch_source_t ds);
static void _dispatch_source_cancel_callout(dispatch_source_t ds);
static bool _dispatch_source_probe(dispatch_source_t ds);
static void _dispatch_source_dispose(dispatch_source_t ds);
static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);

static void _dispatch_kevent_merge(dispatch_source_t ds);
static void _dispatch_kevent_release(dispatch_source_t ds);
static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);

static void _dispatch_drain_mach_messages(struct kevent *ke);
static void _dispatch_timer_list_update(dispatch_source_t ds);

static void _dispatch_mach_notify_source_init(void *context __attribute__((unused)));
static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(EVFILT_USER);
	_evfilt2(EVFILT_SESSION);

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}
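// _evfilt2() leans on the preprocessor's stringize operator: _evfilt2(EVFILT_READ)
// expands to `case (EVFILT_READ): return "EVFILT_READ";`, so every filter maps
// to its own name for logging without a hand-maintained string table.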
#define DSL_HASH_SIZE 256u	// must be a power of two
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
static dispatch_kevent_t
_dispatch_kevent_find(uintptr_t ident, short filter)
{
	uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}
static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	uintptr_t ident = dk->dk_kevent.ident;
	uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);

	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}
void
dispatch_source_cancel(dispatch_source_t ds)
{
	dispatch_debug(ds, __FUNCTION__);

	// Right after we set the cancel flag, someone else could potentially
	// invoke the source, do the cancellation, unregister the source, and
	// deallocate it. We therefore need to retain/release around setting
	// the bit.

	_dispatch_retain(ds);
	dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}
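/*
 * Illustrative client-side sketch (not part of this file; "q" and "fd" are
 * placeholders): the retain/cancel/wakeup sequence above is what makes this
 * common pattern safe.
 *
 *	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
 *			fd, 0, q);
 *	dispatch_source_set_event_handler(ds, ^{ ...read from fd... });
 *	dispatch_source_set_cancel_handler(ds, ^{ close(fd); });
 *	dispatch_resume(ds);
 *	...
 *	dispatch_source_cancel(ds);	// asynchronous; cancel handler runs on q
 *	dispatch_release(ds);
 */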
#ifndef DISPATCH_NO_LEGACY
void
_dispatch_source_legacy_xref_release(dispatch_source_t ds)
{
	if (ds->ds_is_legacy) {
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
			dispatch_source_cancel(ds);
		}

		// Clients often leave sources suspended at the last release
		dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Arguments for and against this assert are within 6705399
		DISPATCH_CLIENT_CRASH("Release of a suspended object");
	}
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}
#endif /* DISPATCH_NO_LEGACY */
long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}
unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	return ds->ds_pending_data_mask;
}
uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	return (int)ds->ds_ident_hack;
}
unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	return ds->ds_data;
}
static void
dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
{
	size_t i;

	for (i = 0; i < count; ++i) {
		_dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
				i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags,
				kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
	}
}
static size_t
_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
	offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
	return offset;
}
static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
	unsigned int i;

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}
// Find existing kevents, and merge any new flags if necessary
static void
_dispatch_kevent_merge(dispatch_source_t ds)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk;
	typeof(dk->dk_kevent.fflags) new_flags;
	bool do_resume = false;

	if (ds->ds_is_installed) {
		return;
	}
	ds->ds_is_installed = true;

	dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);

	dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);
	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
		free(ds->ds_dkev);
		ds->ds_dkev = dk;
		do_resume = new_flags;
	} else {
		dk = ds->ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}

	TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume || ds->ds_needs_rearm) {
		dk->dk_kevent.flags |= EV_ADD;
		_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
		ds->ds_is_armed = true;
	}
}
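// One struct dispatch_kevent_s can back many dispatch sources: the hash table
// above deduplicates by (ident, filter), and the kernel registration carries
// the union of every attached source's fflags. That is why a resume is only
// needed when a source contributes flags the kevent did not already have.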
static void
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		break;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
		break;
	default:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			// EV_ONESHOT kevents are deregistered by the kernel on delivery
			break;
		}
		_dispatch_update_kq(&dk->dk_kevent);
		if (dk->dk_kevent.flags & EV_DISPATCH) {
			dk->dk_kevent.flags &= ~EV_ADD;
		}
		break;
	}
}
static dispatch_queue_t
_dispatch_source_invoke(dispatch_source_t ds)
{
	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_merge(ds);
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			_dispatch_kevent_release(ds);
			return ds->do_targetq;
		} else if (ds->ds_cancel_handler) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the manager
		// queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return &_dispatch_mgr_q;
		}
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
		ds->ds_is_armed = true;
	}

	return NULL;
}
static bool
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source needs to be uninstalled from the manager queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || ds->ds_cancel_handler) {
			return true;
		}
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		return true;
	}
	return false;
}
static void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_queue_dispose((dispatch_queue_t)ds);
}
static void
_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			dispatch_assume_zero(errno);
		}
		return;
	}

	int r = fcntl(c, F_SETFL, 0);	// disable non-blocking IO
	if (r == -1) {
		dispatch_assume_zero(errno);
	}

	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n",
							dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}
static void
_dispatch_kevent_debugger(void *context __attribute__((unused)))
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	if (issetugid()) {
		return;
	}
	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;	// bind to all interfaces, not just loopback
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q,
			(void *)(long)fd, _dispatch_kevent_debugger2);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
		return;
	}
out_bad:
	close(fd);
}
void
_dispatch_source_drain_kevent(struct kevent *ke)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk = ke->udata;
	dispatch_source_t dsi;

	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);

	dispatch_debug_kevents(ke, 1, __func__);

	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_drain_mach_messages(ke);
	}

	if (ke->flags & EV_ONESHOT) {
		dk->dk_kevent.flags |= EV_ONESHOT;
	}

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, ke);
	}
}
static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)
{
	uintptr_t key;

	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these sources live on statically allocated lists
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	default:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break;	// implicitly deleted
		}
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			_dispatch_update_kq(&dk->dk_kevent);
		}
		break;
	}

	if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
		key = MACH_PORT_INDEX(dk->dk_kevent.ident);
	} else {
		key = dk->dk_kevent.ident;
	}

	TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
	free(dk);
}
static void
_dispatch_kevent_release(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_t dsi;
	uint32_t del_flags, fflags = 0;

	ds->ds_dkev = NULL;

	TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);

	if (TAILQ_EMPTY(&dk->dk_sources)) {
		_dispatch_kevent_dispose(dk);
	} else {
		TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
			fflags |= (uint32_t)dsi->ds_pending_data_mask;
		}
		del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags = fflags;
			_dispatch_kevent_resume(dk, 0, del_flags);
		}
	}

	ds->ds_is_armed = false;
	ds->ds_needs_rearm = false;	// re-arm is pointless and bad now
	_dispatch_release(ds);	// the retain is done at creation time
}
static void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
	struct kevent fake;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}

	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
	// We simulate an exit event in this case. <rdar://problem/5067725>
	if (ke->flags & EV_ERROR) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			fake = *ke;
			fake.flags &= ~EV_ERROR;
			fake.fflags = NOTE_EXIT;
			fake.data = 0;
			ke = &fake;
		} else {
			// log the unexpected error
			dispatch_assume_zero(ke->data);
		}
	}

	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		ds->ds_pending_data = ~ke->data;
	} else if (ds->ds_is_adder) {
		dispatch_atomic_add(&ds->ds_pending_data, ke->data);
	} else {
		dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
	}

	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		ds->ds_is_armed = false;
	}

	_dispatch_wakeup(ds);
}
static void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}
	prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (dispatch_assume(prev)) {
		if (ds->ds_handler_func) {
			ds->ds_handler_func(ds->ds_handler_ctxt, ds);
		}
	}
}
static void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;

	if (ds->ds_handler_is_block) {
		Block_release(ds->ds_handler_ctxt);
		ds->ds_handler_is_block = false;
		ds->ds_handler_func = NULL;
		ds->ds_handler_ctxt = NULL;
	}

	if (!ds->ds_cancel_handler) {
		return;
	}
	if (ds->ds_cancel_is_block) {
		dispatch_block_t b = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			b();
		}
		Block_release(ds->ds_cancel_handler);
		ds->ds_cancel_is_block = false;
	} else {
		dispatch_function_t f = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			f(ds->do_ctxt);
		}
	}
	ds->ds_cancel_handler = NULL;
}
const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
	.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
	.do_kind = "kevent-source",
	.do_invoke = _dispatch_source_invoke,
	.do_dispose = _dispatch_source_dispose,
	.do_probe = _dispatch_source_probe,
	.do_debug = _dispatch_source_kevent_debug,
};
void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	struct kevent kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = val,
	};

	dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
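/*
 * Illustrative sketch (not part of this file; "q" is a placeholder): a
 * coalescing counter built on a DATA_ADD source, the intended client of
 * dispatch_source_merge_data().
 *
 *	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD,
 *			0, 0, q);
 *	dispatch_source_set_event_handler(ds, ^{
 *		// deliveries coalesce; get_data() returns the summed pending value
 *		unsigned long total = dispatch_source_get_data(ds);
 *	});
 *	dispatch_resume(ds);
 *	dispatch_source_merge_data(ds, 1);	// safe from any thread
 */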
static size_t
dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return snprintf(buf, bufsiz,
			"target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
			target ? target->dq_label : "", target,
			ds->ds_pending_data, ds->ds_pending_data_mask);
}
static size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
	offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	return offset;
}
#ifndef DISPATCH_NO_LEGACY
static void
dispatch_source_attr_dispose(dispatch_source_attr_t attr)
{
	// release the finalizer block if necessary
	dispatch_source_attr_set_finalizer(attr, NULL);
	_dispatch_dispose(attr);
}
static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
	.do_type = DISPATCH_SOURCE_ATTR_TYPE,
	.do_kind = "source-attr",
	.do_dispose = dispatch_source_attr_dispose,
};
dispatch_source_attr_t
dispatch_source_attr_create(void)
{
	dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));

	if (rval) {
		rval->do_vtable = &dispatch_source_attr_vtable;
		rval->do_next = DISPATCH_OBJECT_LISTLESS;
		rval->do_targetq = dispatch_get_global_queue(0, 0);
		rval->do_ref_cnt = 1;
		rval->do_xref_cnt = 1;
	}

	return rval;
}
void
dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
		void *context, dispatch_source_finalizer_function_t finalizer)
{
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		Block_release(attr->finalizer_ctxt);
	}

	attr->finalizer_ctxt = context;
	attr->finalizer_func = finalizer;
}
long
dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
		dispatch_source_finalizer_t finalizer)
{
	void *ctxt;
	dispatch_source_finalizer_function_t func;

	if (finalizer) {
		if (!(ctxt = Block_copy(finalizer))) {
			return 1;
		}
		func = (void *)_dispatch_call_block_and_release2;
	} else {
		ctxt = NULL;
		func = NULL;
	}

	dispatch_source_attr_set_finalizer_f(attr, ctxt, func);

	return 0;
}
dispatch_source_finalizer_t
dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
{
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
	} else if (attr->finalizer_func == NULL) {
		return NULL;
	} else {
		abort();	// finalizer is not a block...
	}
}
void
dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
{
	attr->context = context;
}
dispatch_source_attr_t
dispatch_source_attr_copy(dispatch_source_attr_t proto)
{
	dispatch_source_attr_t rval = NULL;

	if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
		memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
		if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
			rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
		}
	} else if (!proto) {
		rval = dispatch_source_attr_create();
	}
	return rval;
}
#endif /* DISPATCH_NO_LEGACY */
struct dispatch_source_type_s {
	struct kevent ke;
	uint64_t mask;
};
const struct dispatch_source_type_s _dispatch_source_type_timer = {
	.ke = {
		.filter = DISPATCH_EVFILT_TIMER,
	},
	.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};
const struct dispatch_source_type_s _dispatch_source_type_read = {
	.ke = {
		.filter = EVFILT_READ,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_write = {
	.ke = {
		.filter = EVFILT_WRITE,
		.flags = EV_DISPATCH,
	},
};
const struct dispatch_source_type_s _dispatch_source_type_proc = {
	.ke = {
		.filter = EVFILT_PROC,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
};
const struct dispatch_source_type_s _dispatch_source_type_signal = {
	.ke = {
		.filter = EVFILT_SIGNAL,
	},
};
const struct dispatch_source_type_s _dispatch_source_type_vnode = {
	.ke = {
		.filter = EVFILT_VNODE,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
};
const struct dispatch_source_type_s _dispatch_source_type_vfs = {
	.ke = {
		.filter = EVFILT_FS,
		.flags = EV_CLEAR,
	},
	.mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
};
const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_DEAD,
	},
	.mask = DISPATCH_MACH_SEND_DEAD,
};
const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_RECV,
	},
};
const struct dispatch_source_type_s _dispatch_source_type_data_add = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_or = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
	},
};
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
		uintptr_t handle,
		unsigned long mask,
		dispatch_queue_t q)
{
	const struct kevent *proto_kev = &type->ke;
	dispatch_source_t ds = NULL;
	dispatch_kevent_t dk = NULL;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		goto out_bad;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			goto out_bad;
		}
		break;
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_TIMER:
		if (handle) {
			goto out_bad;
		}
		break;
	default:
		break;
	}

	ds = calloc(1ul, sizeof(struct dispatch_source_s));
	if (slowpath(!ds)) {
		goto out_bad;
	}
	dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
	if (slowpath(!dk)) {
		goto out_bad;
	}

	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = dk;
	TAILQ_INIT(&dk->dk_sources);

	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

	// Dispatch Object
	ds->do_vtable = &_dispatch_source_kevent_vtable;
	ds->do_ref_cnt++;	// the reference the manager queue holds
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	// do_targetq will be retained below, past point of no-return
	ds->do_targetq = q;

	// Dispatch Source
	ds->ds_ident_hack = dk->dk_kevent.ident;
	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		if (proto_kev->filter != EVFILT_MACHPORT) {
			ds->ds_is_level = true;
		}
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}

	// If it's a timer source, it needs to be re-armed
	if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_needs_rearm = true;
	}

	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));

	dispatch_debug(ds, __FUNCTION__);

	// Some sources require special processing
	if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
		static dispatch_once_t pred;
		dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
	} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
		ds->ds_timer.flags = mask;
	}

	_dispatch_retain(ds->do_targetq);
	return ds;

out_bad:
	free(ds);
	free(dk);
	return NULL;
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_event_handler2(void *context)
{
	struct Block_layout *bl = context;

	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
	ds->ds_handler_ctxt = bl;
	ds->ds_handler_is_block = true;
}
void
dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler2);
}
static void
_dispatch_source_set_event_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = context;
	ds->ds_handler_ctxt = ds->do_ctxt;
	ds->ds_handler_is_block = false;
}
void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler_f);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_cancel_handler2(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = true;
}
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler2);
}
static void
_dispatch_source_set_cancel_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = false;
}
void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
		dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler_f);
}
#ifndef DISPATCH_NO_LEGACY
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_source_t
_dispatch_source_create2(dispatch_source_t ds,
		dispatch_source_attr_t attr,
		void *context,
		dispatch_source_handler_function_t handler)
{
	if (ds == NULL || handler == NULL) {
		return NULL;
	}

	ds->ds_is_legacy = true;

	ds->ds_handler_func = handler;
	ds->ds_handler_ctxt = context;

	if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
		ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
		ds->do_ctxt = attr->context;
	}

	if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
		ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
		if (!ds->dq_finalizer_ctxt) {
			goto out_bad;
		}
	}
	if (handler == _dispatch_source_call_block) {
		struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
		if (!ds->ds_handler_ctxt) {
			if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
				Block_release(ds->dq_finalizer_ctxt);
			}
			goto out_bad;
		}
		ds->ds_handler_func = (void *)bl->invoke;
		ds->ds_handler_is_block = true;
	}

	// all legacy sources get a cancellation event on the normal event handler.
	dispatch_source_handler_function_t func = ds->ds_handler_func;
	dispatch_source_handler_t block = ds->ds_handler_ctxt;
	void *ctxt = ds->ds_handler_ctxt;
	bool handler_is_block = ds->ds_handler_is_block;

	ds->ds_cancel_is_block = true;
	if (handler_is_block) {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			block(ds);
		});
	} else {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			func(ctxt, ds);
		});
	}

	if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		dispatch_resume(ds);
	}

	return ds;

out_bad:
	dispatch_release(ds);
	return NULL;
}
long
dispatch_source_get_error(dispatch_source_t ds, long *err_out)
{
	// 6863892 don't report ECANCELED until kevent is unregistered
	if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
		if (err_out) {
			*err_out = ECANCELED;
		}
		return DISPATCH_ERROR_DOMAIN_POSIX;
	}
	return DISPATCH_ERROR_DOMAIN_NO_ERROR;
}
#endif /* DISPATCH_NO_LEGACY */
// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
static void
_dispatch_timer_list_update(dispatch_source_t ds)
{
	dispatch_source_t dsi = NULL;
	int idx;

	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);

	// do not reschedule timers unregistered with _dispatch_kevent_release()
	if (!ds->ds_dkev) {
		return;
	}

	// Ensure the source is on the global kevent lists before it is removed and
	// readded below.
	_dispatch_kevent_merge(ds);

	TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);

	// change the list if the clock type has changed
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		idx = DISPATCH_TIMER_INDEX_WALL;
	} else {
		idx = DISPATCH_TIMER_INDEX_MACH;
	}
	ds->ds_dkev = &_dispatch_kevent_timer[idx];

	if (ds->ds_timer.target) {
		TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
			if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
				break;
			}
		}
	}

	if (dsi) {
		TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
	} else {
		TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
	}
}
static void
_dispatch_run_timers2(unsigned int timer)
{
	dispatch_source_t ds;
	uint64_t now, missed;

	if (timer == DISPATCH_TIMER_INDEX_MACH) {
		now = mach_absolute_time();
	} else {
		now = _dispatch_get_nanoseconds();
	}

	while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (timer != ds->ds_ident_hack) {
			_dispatch_timer_list_update(ds);
			continue;
		}
		if (!ds->ds_timer.target) {
			// no configured timers on the list
			break;
		}
		if (ds->ds_timer.target > now) {
			// Done running timers for now.
			break;
		}

		if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
			dispatch_atomic_inc(&ds->ds_pending_data);
			ds->ds_timer.target = 0;
		} else {
			// Calculate number of missed intervals.
			missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
			dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
			ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
		}

		_dispatch_timer_list_update(ds);
		_dispatch_wakeup(ds);
	}
}
void
_dispatch_run_timers(void)
{
	unsigned int i;

	for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
		_dispatch_run_timers2(i);
	}
}
#if defined(__i386__) || defined(__x86_64__)
// these architectures always return mach_absolute_time() in nanoseconds
#define _dispatch_convert_mach2nano(x) (x)
#define _dispatch_convert_nano2mach(x) (x)
#else
static mach_timebase_info_data_t tbi;
static dispatch_once_t tbi_pred;

static void
_dispatch_convert_init(void *context __attribute__((unused)))
{
	dispatch_assume_zero(mach_timebase_info(&tbi));
}

static uint64_t
_dispatch_convert_mach2nano(uint64_t val)
{
	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
	// note: this simple form can overflow for very large tick counts
	return val * tbi.numer / tbi.denom;
}

static uint64_t
_dispatch_convert_nano2mach(uint64_t val)
{
	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);
	return val * tbi.denom / tbi.numer;
}
#endif
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_SEC 31536000l
#define FOREVER_NSEC 31536000000000000ull
void
_dispatch_get_next_timer_fire(struct timespec *howsoon)
{
	// <rdar://problem/6459649>
	// kevent(2) does not allow large timeouts, so we use a long timeout
	// instead (approximately 1 year).
	dispatch_source_t ds = NULL;
	unsigned int timer;
	uint64_t now, delta_tmp, delta = UINT64_MAX;

	// We are looking for the first unsuspended timer which has its target
	// time set. Given timers are kept in order, if we hit a timer that's
	// unset there's no point in continuing down the list.
	for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
		TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
			if (!ds->ds_timer.target) {
				break;
			}
			if (DISPATCH_OBJECT_SUSPENDED(ds)) {
				ds->ds_is_armed = false;
			} else {
				break;
			}
		}

		if (!ds || !ds->ds_timer.target) {
			continue;
		}

		if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
			now = _dispatch_get_nanoseconds();
		} else {
			now = mach_absolute_time();
		}
		if (ds->ds_timer.target <= now) {
			howsoon->tv_sec = 0;
			howsoon->tv_nsec = 0;
			return;
		}
		// the subtraction cannot go negative because the previous "if"
		// verified that the target is greater than now.
		delta_tmp = ds->ds_timer.target - now;
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
			delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
		}
		if (delta_tmp < delta) {
			delta = delta_tmp;
		}
	}

	if (slowpath(delta > FOREVER_NSEC)) {
		howsoon->tv_sec = FOREVER_SEC;
		howsoon->tv_nsec = 0;
	} else {
		howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
		howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
	}
}
struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};
// To be called from the context of the _dispatch_mgr_q
static void
_dispatch_source_set_timer2(void *context)
{
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds->ds_timer = params->values;
	_dispatch_timer_list_update(ds);
	dispatch_resume(ds);
	dispatch_release(ds);
	free(params);
}
void
dispatch_source_set_timer(dispatch_source_t ds,
		dispatch_time_t start,
		uint64_t interval,
		uint64_t leeway)
{
	struct dispatch_set_timer_params *params;

	// we use zero internally to mean disabled
	if (interval == 0) {
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}

	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	dispatch_suspend(ds);

	if (start == DISPATCH_TIME_NOW) {
		start = mach_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
		sleep(1);
	}

	params->ds = ds;
	params->values.flags = ds->ds_timer.flags;

	if ((int64_t)start < 0) {
		// wall clock
		params->ident = DISPATCH_TIMER_INDEX_WALL;
		params->values.start = -((int64_t)start);
		params->values.target = -((int64_t)start);
		params->values.interval = interval;
		params->values.leeway = leeway;
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// mach clock
		params->ident = DISPATCH_TIMER_INDEX_MACH;
		params->values.start = start;
		params->values.target = start;
		params->values.interval = _dispatch_convert_nano2mach(interval);
		params->values.leeway = _dispatch_convert_nano2mach(leeway);
		params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
	}

	dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
}
#ifndef DISPATCH_NO_LEGACY
long
dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
{
	dispatch_time_t start;
	if (nanoseconds == 0) {
		nanoseconds = 1;
	}
	if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
		static const struct timespec t0;
		start = dispatch_walltime(&t0, nanoseconds);
	} else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
	} else {
		start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
	}
	if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
		// 6866347 - make sure nanoseconds won't overflow
		nanoseconds = INT64_MAX;	// non-repeating (~292 years)
	}
	dispatch_source_set_timer(ds, start, nanoseconds, leeway);
	return 0;
}
uint64_t
dispatch_event_get_nanoseconds(dispatch_source_t ds)
{
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		return ds->ds_timer.interval;
	}
	return _dispatch_convert_mach2nano(ds->ds_timer.interval);
}
#endif /* DISPATCH_NO_LEGACY */
static dispatch_source_t _dispatch_mach_notify_source;
static mach_port_t _dispatch_port_set;
static mach_port_t _dispatch_event_port;
#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
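// For example, with the size of 32 defined above (a power of two) the hash
// reduces to MACH_PORT_INDEX(x) & 31; the modulo fallback only matters if the
// size is ever changed to a non-power-of-two.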
static void _dispatch_port_set_init(void *);
static mach_port_t _dispatch_get_port_set(void);
static void
_dispatch_drain_mach_messages(struct kevent *ke)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent ke2;

	if (!dispatch_assume(ke->data)) {
		return;
	}
	dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_kevent_machport_disable(dk);	// emulate EV_DISPATCH

	EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &ke2);
	}
}
static void
_dispatch_port_set_init(void *context __attribute__((unused)))
{
	struct kevent kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);

	kev.ident = _dispatch_port_set;

	_dispatch_update_kq(&kev);
}
static mach_port_t
_dispatch_get_port_set(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_port_set_init);

	return _dispatch_port_set;
}
static void
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	if ((new_flags & DISPATCH_MACHPORT_RECV)
			|| (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
		_dispatch_kevent_machport_enable(dk);
	}
	if (new_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
				_dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// the port is already dead or gone; nothing to do
			break;
		default:
			// Else, we don't expect any errors from mach. Log any errors if we do
			if (dispatch_assume_zero(kr)) {
				// dispatch_assume_zero() already logged the failure
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the Mach
				// dead-name notification on this port. We should technically cache the
				// previous port and message it when the kernel messages our port. Or
				// we can just say screw those subsystems and drop the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				dispatch_assume_zero(kr);
			}
			break;
		}
	}

	if (del_flags & DISPATCH_MACHPORT_RECV) {
		_dispatch_kevent_machport_disable(dk);
	}
	if (del_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
				MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// dispatch_assume_zero() already logged the failure
			} else if (previous) {
				// the kernel has not consumed the right yet
				dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
			}
			break;
		}
	}
}
static void
_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_NAME:
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}
}
static void
_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, 0);
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_RIGHT:
	case KERN_INVALID_NAME:
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}
}
#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
		dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
{
	if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
		max_msg_size = _DISPATCH_MIN_MSG_SZ;
	}
	return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
			^(dispatch_source_t ds) {
				if (!dispatch_source_get_error(ds, NULL)) {
					if (dq->dq_width != 1) {
						dispatch_retain(ds);	// this is a shim -- use the external retain
						dispatch_async(dq, ^{
							dispatch_mig_server(ds, max_msg_size, mig_callback);
							dispatch_release(ds);	// this is a shim -- use the external release
						});
					} else {
						dispatch_mig_server(ds, max_msg_size, mig_callback);
					}
				}
			});
}
#endif /* DISPATCH_NO_LEGACY */
static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
{
	size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);

	if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
		maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
	}

	_dispatch_get_port_set();

	_dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
			maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);

	dispatch_assert(_dispatch_mach_notify_source);
}
kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;

	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		return KERN_SUCCESS;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;

	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
{
	kern_return_t kr;
	// this function should never be called
	dispatch_assume_zero(name);
	kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
{
	// this function should never be called
	dispatch_assume_zero(notify);
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
{
	// we only register for dead-name notifications
	// some code deallocated our send-once right without consuming it
	_dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
	return KERN_SUCCESS;
}
kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;
	kern_return_t kr;

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		goto out;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;

out:
	// the act of receiving a dead name notification allocates a dead-name right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}
kern_return_t
_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
{
	// dummy function just to pop the main thread out of mach_msg()
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
{
	// dummy function to consume a send-once right
	return KERN_SUCCESS;
}
boolean_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
			| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
			| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
	mach_msg_options_t tmp_options = options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	boolean_t demux_success;
	unsigned int cnt = 1000;	// do not stall out serial queues

	maxmsgsz += MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(maxmsgsz);
	bufReply = alloca(maxmsgsz);
	bufReply->Head.msgh_size = 0;	// make CLANG happy

	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				break;
			}
		}

		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
			case MACH_RCV_INVALID_NAME:
				break;
			default:
				dispatch_assume_zero(kr);
				break;
			}
			break;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			break;
		}

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;