/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#include "protocol.h"
#include "protocolServer.h"
#include <sys/mount.h>

#define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1)
#define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2)
#define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)
#define DISPATCH_EVFILT_SYSCOUNT (EVFILT_SYSCOUNT + 3)

#define DISPATCH_TIMER_INDEX_WALL 0
#define DISPATCH_TIMER_INDEX_MACH 1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_WALL,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[0],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
	},
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_MACH,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[1],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
	},
};
#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
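
// Note: all timer sources are multiplexed onto the two static lists above --
// one ordered by wall-clock deadlines, one by mach_absolute_time() deadlines --
// rather than being registered with the kernel as EVFILT_TIMER kevents (see
// _dispatch_kevent_resume() below, which skips DISPATCH_EVFILT_TIMER). The
// manager queue walks these lists in _dispatch_run_timers().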

static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.udata = &_dispatch_kevent_data_or,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
		.udata = &_dispatch_kevent_data_add,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};

#ifndef DISPATCH_NO_LEGACY
struct dispatch_source_attr_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
};

struct dispatch_source_attr_s {
	DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
	void* finalizer_ctxt;
	dispatch_source_finalizer_function_t finalizer_func;
	void* context;
};
#endif /* DISPATCH_NO_LEGACY */

#define _dispatch_source_call_block ((void *)-1)
static void _dispatch_source_latch_and_call(dispatch_source_t ds);
static void _dispatch_source_cancel_callout(dispatch_source_t ds);
static bool _dispatch_source_probe(dispatch_source_t ds);
static void _dispatch_source_dispose(dispatch_source_t ds);
static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);

static void _dispatch_kevent_merge(dispatch_source_t ds);
static void _dispatch_kevent_release(dispatch_source_t ds);
static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);

static void _dispatch_drain_mach_messages(struct kevent *ke);
static void _dispatch_timer_list_update(dispatch_source_t ds);

static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)));

static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
	_evfilt2(EVFILT_SESSION);

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}

#define DSL_HASH_SIZE 256u // must be a power of two
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
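
// Note: because DSL_HASH_SIZE is a power of two, DSL_HASH() reduces the
// modulo to a bit-mask; e.g. DSL_HASH(0x1234) == 0x1234 & 0xFF == 0x34.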

static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

static dispatch_kevent_t
_dispatch_kevent_find(uintptr_t ident, short filter)
{
	uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}

static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	uintptr_t ident = dk->dk_kevent.ident;
	uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);

	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif
	// Right after we set the cancel flag, another thread could invoke the
	// source, perform the cancellation, unregister the source, and
	// deallocate it. We therefore need to retain/release around setting
	// the bit.

	_dispatch_retain(ds);
	dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}

#ifndef DISPATCH_NO_LEGACY
void
_dispatch_source_legacy_xref_release(dispatch_source_t ds)
{
	if (ds->ds_is_legacy) {
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
			dispatch_source_cancel(ds);
		}

		// Clients often leave sources suspended at the last release
		dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Arguments for and against this assert are within 6705399
		DISPATCH_CLIENT_CRASH("Release of a suspended object");
	}
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}
#endif /* DISPATCH_NO_LEGACY */
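
// Note: cancellation is asynchronous. An illustrative client-side pattern
// (not part of this file; error handling omitted):
//
//	dispatch_source_cancel(ds);	// no further events will be delivered
//	// the cancel handler, if one was set, later runs once on the target queue
//	dispatch_release(ds);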

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}


unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	return ds->ds_pending_data_mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	return (int)ds->ds_ident_hack;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	return ds->ds_data;
}

#if DISPATCH_DEBUG
void
dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
{
	size_t i;
	for (i = 0; i < count; ++i) {
		_dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
				i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags, kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
	}
}
#endif

static size_t
_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
	offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
	return offset;
}

static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}

// Find existing kevents, and merge any new flags if necessary
void
_dispatch_kevent_merge(dispatch_source_t ds)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk;
	typeof(dk->dk_kevent.fflags) new_flags;
	bool do_resume = false;

	if (ds->ds_is_installed) {
		return;
	}
	ds->ds_is_installed = true;

	dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);

	dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);

	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
		free(ds->ds_dkev);
		ds->ds_dkev = dk;
		do_resume = new_flags;
	} else {
		dk = ds->ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}

	TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
		_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
		ds->ds_is_armed = true;
	}
}
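
// Note: a worked example of the merge above. If a cached kevent for
// (ident, filter) already has fflags 0x1 and a new source asks for 0x3, then
// new_flags = ~0x1 & 0x3 = 0x2, the cached kevent's fflags become 0x3, and
// do_resume is true, so the kernel registration is refreshed with the
// additional bit. If the flags were already a subset, no syscall is made.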

void
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types not registered with kevent
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
		break;
	case EVFILT_PROC:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			return;
		}
		// fall through
	default:
		_dispatch_update_kq(&dk->dk_kevent);
		if (dk->dk_kevent.flags & EV_DISPATCH) {
			dk->dk_kevent.flags &= ~EV_ADD;
		}
		break;
	}
}

dispatch_queue_t
_dispatch_source_invoke(dispatch_source_t ds)
{
	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct
	// queue will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_merge(ds);
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			_dispatch_kevent_release(ds);
			return ds->do_targetq;
		} else if (ds->ds_cancel_handler) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler
		// callback on the target queue. Some sources need to be rearmed on
		// the manager queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return &_dispatch_mgr_q;
		}
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
		ds->ds_is_armed = true;
	}

	return NULL;
}

bool
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source needs to be uninstalled from the manager queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || ds->ds_cancel_handler) {
			return true;
		}
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		return true;
	}
	// Nothing to do.
	return false;
}
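
// Note: probe and invoke form a pair. _dispatch_source_probe() answers
// "is there work?" without side effects; _dispatch_source_invoke() then
// performs that work, hopping between the manager queue and the target queue
// as each step requires, and returns NULL only once nothing is left to do.
// Keeping the test order identical in both prevents missed wakeups.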

void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_queue_dispose((dispatch_queue_t)ds);
}

static void
_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			dispatch_assume_zero(errno);
		}
		return;
	}
#if 0
	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
	if (r == -1) {
		dispatch_assume_zero(errno);
	}
#endif
	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: nocache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}

static void
_dispatch_kevent_debugger(void *context __attribute__((unused)))
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	if (issetugid()) {
		return;
	}
	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
#if 0
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
#endif
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
		return;
	}
out_bad:
	close(fd);
}
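
// Note: the debug server above is opt-in and disabled for setuid processes.
// An illustrative way to exercise it (any HTTP client works; curl is just an
// example, and the port number comes from the log line above):
//
//	$ LIBDISPATCH_DEBUGGER=1 ./app     # "2" binds to INADDR_ANY, not loopback
//	$ curl http://127.0.0.1:<port>/    # dumps the kevent/source tables as HTML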

void
_dispatch_source_drain_kevent(struct kevent *ke)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk = ke->udata;
	dispatch_source_t dsi;

	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);

	dispatch_debug_kevents(ke, 1, __func__);

	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_drain_mach_messages(ke);
	}
	dispatch_assert(dk);

	if (ke->flags & EV_ONESHOT) {
		dk->dk_kevent.flags |= EV_ONESHOT;
	}

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, ke);
	}
}

static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)
{
	uintptr_t key;

	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these sources live on statically allocated lists
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	case EVFILT_PROC:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break; // implicitly deleted
		}
		// fall through
	default:
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			_dispatch_update_kq(&dk->dk_kevent);
		}
		break;
	}

	if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
		key = MACH_PORT_INDEX(dk->dk_kevent.ident);
	} else {
		key = dk->dk_kevent.ident;
	}

	TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
	free(dk);
}

void
_dispatch_kevent_release(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_t dsi;
	uint32_t del_flags, fflags = 0;

	ds->ds_dkev = NULL;

	TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);

	if (TAILQ_EMPTY(&dk->dk_sources)) {
		_dispatch_kevent_dispose(dk);
	} else {
		TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
			fflags |= (uint32_t)dsi->ds_pending_data_mask;
		}
		del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags = fflags;
			_dispatch_kevent_resume(dk, 0, del_flags);
		}
	}

	ds->ds_is_armed = false;
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_release(ds); // the retain is done at creation time
}
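
// Note: a worked example of the recomputation above. If sources A (mask 0x5)
// and B (mask 0x3) share one kevent and A detaches, the union of the
// remaining masks is 0x3, so del_flags = 0x5 & ~0x3 = 0x4 and the kevent is
// re-registered without that bit. Only when the last source detaches is the
// kevent disposed of entirely.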

void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
	struct kevent fake;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}

	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
	// We simulate an exit event in this case. <rdar://problem/5067725>
	if (ke->flags & EV_ERROR) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			fake = *ke;
			fake.flags &= ~EV_ERROR;
			fake.fflags = NOTE_EXIT;
			fake.data = 0;
			ke = &fake;
		} else {
			// log the unexpected error
			dispatch_assume_zero(ke->data);
			return;
		}
	}

	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		ds->ds_pending_data = ~ke->data;
	} else if (ds->ds_is_adder) {
		dispatch_atomic_add(&ds->ds_pending_data, ke->data);
	} else {
		dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
	}

	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		ds->ds_is_armed = false;
	}

	_dispatch_wakeup(ds);
}
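
// Note: for level-triggered ("is_level") sources the pending data is stored
// bitwise-inverted (~ke->data) so that zero available bytes -- which
// legitimately happens when EV_EOF is set -- still leaves ds_pending_data
// nonzero and wakes the source. _dispatch_source_latch_and_call() below
// re-inverts it, so the handler sees ds_data == 0 in that case.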

void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}
	prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (dispatch_assume(prev)) {
		if (ds->ds_handler_func) {
			ds->ds_handler_func(ds->ds_handler_ctxt, ds);
		}
	}
}

void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;

#ifdef __BLOCKS__
	if (ds->ds_handler_is_block) {
		Block_release(ds->ds_handler_ctxt);
		ds->ds_handler_is_block = false;
		ds->ds_handler_func = NULL;
		ds->ds_handler_ctxt = NULL;
	}
#endif

	if (!ds->ds_cancel_handler) {
		return;
	}
	if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
		dispatch_block_t b = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			b();
		}
		Block_release(ds->ds_cancel_handler);
		ds->ds_cancel_is_block = false;
#endif
	} else {
		dispatch_function_t f = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			f(ds->do_ctxt);
		}
	}
	ds->ds_cancel_handler = NULL;
}

const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
	.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
	.do_kind = "kevent-source",
	.do_invoke = _dispatch_source_invoke,
	.do_dispose = _dispatch_source_dispose,
	.do_probe = _dispatch_source_probe,
	.do_debug = _dispatch_source_kevent_debug,
};

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	struct kevent kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = val,
	};

	dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
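
// Note: an illustrative client-side sketch of the custom data sources this
// feeds (not part of this file; process() is a placeholder):
//
//	dispatch_source_t ds = dispatch_source_create(
//			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
//	dispatch_source_set_event_handler(ds, ^{
//		unsigned long total = dispatch_source_get_data(ds); // coalesced sum
//		process(total);
//	});
//	dispatch_resume(ds);
//	dispatch_source_merge_data(ds, 1);	// safe from any thread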

size_t
dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return snprintf(buf, bufsiz,
			"target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
			target ? target->dq_label : "", target,
			ds->ds_pending_data, ds->ds_pending_data_mask);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
	offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	return offset;
}

#ifndef DISPATCH_NO_LEGACY
static void
dispatch_source_attr_dispose(dispatch_source_attr_t attr)
{
	// release the finalizer block if necessary
	dispatch_source_attr_set_finalizer(attr, NULL);
	_dispatch_dispose(attr);
}

static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
	.do_type = DISPATCH_SOURCE_ATTR_TYPE,
	.do_kind = "source-attr",
	.do_dispose = dispatch_source_attr_dispose,
};

dispatch_source_attr_t
dispatch_source_attr_create(void)
{
	dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));

	if (rval) {
		rval->do_vtable = &dispatch_source_attr_vtable;
		rval->do_next = DISPATCH_OBJECT_LISTLESS;
		rval->do_targetq = dispatch_get_global_queue(0, 0);
		rval->do_ref_cnt = 1;
		rval->do_xref_cnt = 1;
	}

	return rval;
}

void
dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
	void *context, dispatch_source_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		Block_release(attr->finalizer_ctxt);
	}
#endif

	attr->finalizer_ctxt = context;
	attr->finalizer_func = finalizer;
}

#ifdef __BLOCKS__
long
dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
	dispatch_source_finalizer_t finalizer)
{
	void *ctxt;
	dispatch_source_finalizer_function_t func;

	if (finalizer) {
		if (!(ctxt = Block_copy(finalizer))) {
			return 1;
		}
		func = (void *)_dispatch_call_block_and_release2;
	} else {
		ctxt = NULL;
		func = NULL;
	}

	dispatch_source_attr_set_finalizer_f(attr, ctxt, func);

	return 0;
}

dispatch_source_finalizer_t
dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
{
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
	} else if (attr->finalizer_func == NULL) {
		return NULL;
	} else {
		abort(); // finalizer is not a block...
	}
}
#endif

void
dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
{
	attr->context = context;
}

dispatch_source_attr_t
dispatch_source_attr_copy(dispatch_source_attr_t proto)
{
	dispatch_source_attr_t rval = NULL;

	if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
		memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
#ifdef __BLOCKS__
		if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
			rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
		}
#endif
	} else if (!proto) {
		rval = dispatch_source_attr_create();
	}
	return rval;
}
#endif /* DISPATCH_NO_LEGACY */


struct dispatch_source_type_s {
	struct kevent ke;
	uint64_t mask;
};

const struct dispatch_source_type_s _dispatch_source_type_timer = {
	.ke = {
		.filter = DISPATCH_EVFILT_TIMER,
	},
	.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};

const struct dispatch_source_type_s _dispatch_source_type_read = {
	.ke = {
		.filter = EVFILT_READ,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_write = {
	.ke = {
		.filter = EVFILT_WRITE,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_proc = {
	.ke = {
		.filter = EVFILT_PROC,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
};

const struct dispatch_source_type_s _dispatch_source_type_signal = {
	.ke = {
		.filter = EVFILT_SIGNAL,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_vnode = {
	.ke = {
		.filter = EVFILT_VNODE,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
};

const struct dispatch_source_type_s _dispatch_source_type_vfs = {
	.ke = {
		.filter = EVFILT_FS,
		.flags = EV_CLEAR,
	},
	.mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_DEAD,
	},
	.mask = DISPATCH_MACH_SEND_DEAD,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_RECV,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_add = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_or = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.fflags = ~0,
	},
};

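// Note: the proto-kevent flags above double as behavior selectors for
// dispatch_source_create() below: EV_DISPATCH or EV_ONESHOT marks a source as
// needing rearm (and, except for Mach ports, as level-triggered), EV_CLEAR
// marks it as bit-OR coalescing, and the absence of all three marks it as an
// adder (ADD coalescing).
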
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
	uintptr_t handle,
	unsigned long mask,
	dispatch_queue_t q)
{
	const struct kevent *proto_kev = &type->ke;
	dispatch_source_t ds = NULL;
	dispatch_kevent_t dk = NULL;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		goto out_bad;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			goto out_bad;
		}
		break;
	case EVFILT_FS:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_TIMER:
		if (handle) {
			goto out_bad;
		}
		break;
	default:
		break;
	}

	ds = calloc(1ul, sizeof(struct dispatch_source_s));
	if (slowpath(!ds)) {
		goto out_bad;
	}
	dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
	if (slowpath(!dk)) {
		goto out_bad;
	}

	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = dk;
	TAILQ_INIT(&dk->dk_sources);

	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

	// Dispatch Object
	ds->do_vtable = &_dispatch_source_kevent_vtable;
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	// do_targetq will be retained below, past the point of no return
	ds->do_targetq = q;

	// Dispatch Source
	ds->ds_ident_hack = dk->dk_kevent.ident;
	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		if (proto_kev->filter != EVFILT_MACHPORT) {
			ds->ds_is_level = true;
		}
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}

	// If it's a timer source, it needs to be re-armed
	if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_needs_rearm = true;
	}

	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif

	// Some sources require special processing
	if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
		static dispatch_once_t pred;
		dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
	} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
		ds->ds_timer.flags = mask;
	}

	_dispatch_retain(ds->do_targetq);
	return ds;

out_bad:
	free(ds);
	free(dk);
	return NULL;
}
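
// Note: an illustrative sketch of typical client use of the constructor
// above (not part of this file; reload_config() is a placeholder):
//
//	dispatch_source_t sig = dispatch_source_create(
//			DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0, q);
//	dispatch_source_set_event_handler(sig, ^{
//		// for signal sources, dispatch_source_get_data() is the number of
//		// deliveries coalesced since the last handler invocation
//		reload_config();
//	});
//	dispatch_resume(sig);	// sources are created suspended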

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_event_handler2(void *context)
{
	struct Block_layout *bl = context;

	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
	ds->ds_handler_ctxt = bl;
	ds->ds_handler_is_block = true;
}

void
dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler2);
}

static void
_dispatch_source_set_event_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = context;
	ds->ds_handler_ctxt = ds->do_ctxt;
	ds->ds_handler_is_block = false;
}

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler_f);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_cancel_handler2(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = true;
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
	dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler2);
}

static void
_dispatch_source_set_cancel_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = false;
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler_f);
}

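// Note: each setter above copies the handler as needed and then applies the
// change with dispatch_barrier_async_f() on the source itself (a dispatch
// source is also a queue), so handler updates are serialized against event
// delivery without any explicit locking.
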
#ifndef DISPATCH_NO_LEGACY
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_source_t
_dispatch_source_create2(dispatch_source_t ds,
	dispatch_source_attr_t attr,
	void *context,
	dispatch_source_handler_function_t handler)
{
	if (ds == NULL || handler == NULL) {
		return NULL;
	}

	ds->ds_is_legacy = true;

	ds->ds_handler_func = handler;
	ds->ds_handler_ctxt = context;

	if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
		ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
		ds->do_ctxt = attr->context;
	}
#ifdef __BLOCKS__
	if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
		ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
		if (!ds->dq_finalizer_ctxt) {
			goto out_bad;
		}
	}
	if (handler == _dispatch_source_call_block) {
		struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
		if (!ds->ds_handler_ctxt) {
			if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
				Block_release(ds->dq_finalizer_ctxt);
			}
			goto out_bad;
		}
		ds->ds_handler_func = (void *)bl->invoke;
		ds->ds_handler_is_block = true;
	}

	// all legacy sources get a cancellation event on the normal event handler.
	dispatch_source_handler_function_t func = ds->ds_handler_func;
	dispatch_source_handler_t block = ds->ds_handler_ctxt;
	void *ctxt = ds->ds_handler_ctxt;
	bool handler_is_block = ds->ds_handler_is_block;

	ds->ds_cancel_is_block = true;
	if (handler_is_block) {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			block(ds);
		});
	} else {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			func(ctxt, ds);
		});
	}
#endif
	if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		dispatch_resume(ds);
	}

	return ds;

out_bad:
	free(ds);
	return NULL;
}

long
dispatch_source_get_error(dispatch_source_t ds, long *err_out)
{
	// 6863892 don't report ECANCELED until kevent is unregistered
	if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
		if (err_out) {
			*err_out = ECANCELED;
		}
		return DISPATCH_ERROR_DOMAIN_POSIX;
	} else {
		return DISPATCH_ERROR_DOMAIN_NO_ERROR;
	}
}
#endif /* DISPATCH_NO_LEGACY */

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
void
_dispatch_timer_list_update(dispatch_source_t ds)
{
	dispatch_source_t dsi = NULL;
	int idx;

	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);

	// do not reschedule timers unregistered with _dispatch_kevent_release()
	if (!ds->ds_dkev) {
		return;
	}

	// Ensure the source is on the global kevent lists before it is removed and
	// readded below.
	_dispatch_kevent_merge(ds);

	TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);

	// change the list if the clock type has changed
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		idx = DISPATCH_TIMER_INDEX_WALL;
	} else {
		idx = DISPATCH_TIMER_INDEX_MACH;
	}
	ds->ds_dkev = &_dispatch_kevent_timer[idx];

	if (ds->ds_timer.target) {
		TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
			if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
				break;
			}
		}
	}

	if (dsi) {
		TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
	} else {
		TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
	}
}

static void
_dispatch_run_timers2(unsigned int timer)
{
	dispatch_source_t ds;
	uint64_t now, missed;

	if (timer == DISPATCH_TIMER_INDEX_MACH) {
		now = mach_absolute_time();
	} else {
		now = _dispatch_get_nanoseconds();
	}

	while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (timer != ds->ds_ident_hack) {
			_dispatch_timer_list_update(ds);
			continue;
		}
		if (!ds->ds_timer.target) {
			// no configured timers on the list
			break;
		}
		if (ds->ds_timer.target > now) {
			// Done running timers for now.
			break;
		}

		if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
			dispatch_atomic_inc(&ds->ds_pending_data);
			ds->ds_timer.target = 0;
		} else {
			// Calculate number of missed intervals.
			missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
			dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
			ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
		}

		_dispatch_timer_list_update(ds);
		_dispatch_wakeup(ds);
	}
}

void
_dispatch_run_timers(void)
{
	unsigned int i;
	for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
		_dispatch_run_timers2(i);
	}
}
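
// Note: a worked example of the missed-interval math above. With target = 10,
// interval = 4 and now = 19: missed = (19 - 10) / 4 = 2, so ds_pending_data
// is incremented by 3 (the fire at 10 plus two missed intervals) and the next
// target becomes 10 + 3 * 4 = 22, which is safely in the future.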

#if defined(__i386__) || defined(__x86_64__)
// these architectures always return mach_absolute_time() in nanoseconds
#define _dispatch_convert_mach2nano(x) (x)
#define _dispatch_convert_nano2mach(x) (x)
#else
static mach_timebase_info_data_t tbi;
static dispatch_once_t tbi_pred;

static void
_dispatch_convert_init(void *context __attribute__((unused)))
{
	dispatch_assume_zero(mach_timebase_info(&tbi));
}

static uint64_t
_dispatch_convert_mach2nano(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.numer;
	tmp /= tbi.denom;

	return tmp;
}

static uint64_t
_dispatch_convert_nano2mach(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.denom;
	tmp /= tbi.numer;

	return tmp;
}
#endif
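
// Note: the conversions above widen to __uint128_t (or long double on 32-bit)
// because val * tbi.numer can overflow 64 bits for large timestamps; the
// multiply-then-divide order preserves precision.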

// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_SEC 31536000l
#define FOREVER_NSEC 31536000000000000ull

struct timespec *
_dispatch_get_next_timer_fire(struct timespec *howsoon)
{
	// <rdar://problem/6459649>
	// kevent(2) does not allow large timeouts, so we use a long timeout
	// instead (approximately 1 year).
	dispatch_source_t ds = NULL;
	unsigned int timer;
	uint64_t now, delta_tmp, delta = UINT64_MAX;

	// We are looking for the first unsuspended timer which has its target
	// time set. Given timers are kept in order, if we hit a timer that's
	// unset there's no point in continuing down the list.
	for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
		TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
			if (!ds->ds_timer.target) {
				break;
			}
			if (DISPATCH_OBJECT_SUSPENDED(ds)) {
				ds->ds_is_armed = false;
			} else {
				break;
			}
		}

		if (!ds || !ds->ds_timer.target) {
			continue;
		}

		if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
			now = _dispatch_get_nanoseconds();
		} else {
			now = mach_absolute_time();
		}
		if (ds->ds_timer.target <= now) {
			howsoon->tv_sec = 0;
			howsoon->tv_nsec = 0;
			return howsoon;
		}

		// the subtraction cannot go negative because the previous "if"
		// verified that the target is greater than now.
		delta_tmp = ds->ds_timer.target - now;
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
			delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
		}
		if (delta_tmp < delta) {
			delta = delta_tmp;
		}
	}
	if (slowpath(delta > FOREVER_NSEC)) {
		return NULL;
	} else {
		howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
		howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
	}
	return howsoon;
}

struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};

// To be called from the context of the _dispatch_mgr_q
static void
_dispatch_source_set_timer2(void *context)
{
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds->ds_timer = params->values;
	_dispatch_timer_list_update(ds);
	dispatch_resume(ds);
	dispatch_release(ds);
	free(params);
}

void
dispatch_source_set_timer(dispatch_source_t ds,
	dispatch_time_t start,
	uint64_t interval,
	uint64_t leeway)
{
	struct dispatch_set_timer_params *params;

	// we use zero internally to mean disabled
	if (interval == 0) {
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}

	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	dispatch_suspend(ds);

	if (start == DISPATCH_TIME_NOW) {
		start = mach_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
		sleep(1);
	}

	params->ds = ds;
	params->values.flags = ds->ds_timer.flags;

	if ((int64_t)start < 0) {
		// wall clock
		params->ident = DISPATCH_TIMER_INDEX_WALL;
		params->values.start = -((int64_t)start);
		params->values.target = -((int64_t)start);
		params->values.interval = interval;
		params->values.leeway = leeway;
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// mach clock
		params->ident = DISPATCH_TIMER_INDEX_MACH;
		params->values.start = start;
		params->values.target = start;
		params->values.interval = _dispatch_convert_nano2mach(interval);
		params->values.leeway = _dispatch_convert_nano2mach(leeway);
		params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
	}

	dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
}
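
// Note: an illustrative sketch of client use of dispatch_source_set_timer()
// (not part of this file). A dispatch_time() start selects the Mach clock; a
// dispatch_walltime() start (encoded as a negative value) selects the wall
// clock, per the branch above:
//
//	dispatch_source_set_timer(ds,
//			dispatch_time(DISPATCH_TIME_NOW, NSEC_PER_SEC),	// first fire in 1s
//			NSEC_PER_SEC,	// then every second
//			0);		// no leeway requested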

#ifndef DISPATCH_NO_LEGACY
// LEGACY
long
dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
{
	dispatch_time_t start;
	if (nanoseconds == 0) {
		nanoseconds = 1;
	}
	if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
		static const struct timespec t0;
		start = dispatch_walltime(&t0, nanoseconds);
	} else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
	} else {
		start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
	}
	if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
		// 6866347 - make sure nanoseconds won't overflow
		nanoseconds = INT64_MAX; // non-repeating (~292 years)
	}
	dispatch_source_set_timer(ds, start, nanoseconds, leeway);
	return 0;
}

// LEGACY
uint64_t
dispatch_event_get_nanoseconds(dispatch_source_t ds)
{
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		return ds->ds_timer.interval;
	} else {
		return _dispatch_convert_mach2nano(ds->ds_timer.interval);
	}
}
#endif /* DISPATCH_NO_LEGACY */

static dispatch_source_t _dispatch_mach_notify_source;
static mach_port_t _dispatch_port_set;
static mach_port_t _dispatch_event_port;

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

static void _dispatch_port_set_init(void *);
static mach_port_t _dispatch_get_port_set(void);

void
_dispatch_drain_mach_messages(struct kevent *ke)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent ke2;

	if (!dispatch_assume(ke->data)) {
		return;
	}
	dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH

	EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &ke2);
	}
}

void
_dispatch_port_set_init(void *context __attribute__((unused)))
{
	struct kevent kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);

	kev.ident = _dispatch_port_set;

	_dispatch_update_kq(&kev);
}

mach_port_t
_dispatch_get_port_set(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_port_set_init);

	return _dispatch_port_set;
}
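
// Note: rather than registering each receive right with kqueue individually,
// every watched port is moved into the single port set above, and that one
// port set is what EVFILT_MACHPORT monitors. EV_DISPATCH semantics are then
// emulated by moving a port out of the set on delivery
// (_dispatch_kevent_machport_disable) and back in on rearm.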

void
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
		_dispatch_kevent_machport_enable(dk);
	}
	if (new_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
				_dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// suppress these errors
			break;
		default:
			// Otherwise, we don't expect any errors from Mach; log them if
			// they occur.
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the Mach
				// dead-name notification on this port. We should technically cache the
				// previous port and message it when the kernel messages our port. Or
				// we can just say screw those subsystems and drop the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				dispatch_assume_zero(kr);
			}
		}
	}

	if (del_flags & DISPATCH_MACHPORT_RECV) {
		_dispatch_kevent_machport_disable(dk);
	}
	if (del_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
				MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (previous) {
				// the kernel has not consumed the send-once right yet
				dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
			}
		}
	}
}

void
_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	default:
		dispatch_assume_zero(kr);
	}
}

void
_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, 0);
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_RIGHT:
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	case 0:
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}
}

#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
	dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
{
	if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
		max_msg_size = _DISPATCH_MIN_MSG_SZ;
	}
	return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
			^(dispatch_source_t ds) {
		if (!dispatch_source_get_error(ds, NULL)) {
			if (dq->dq_width != 1) {
				dispatch_retain(ds); // this is a shim -- use the external retain
				dispatch_async(dq, ^{
					dispatch_mig_server(ds, max_msg_size, mig_callback);
					dispatch_release(ds); // this is a shim -- use the external release
				});
			} else {
				dispatch_mig_server(ds, max_msg_size, mig_callback);
			}
		}
	});
}
#endif /* DISPATCH_NO_LEGACY */

static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
{
	size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);

	if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
		maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
	}

	_dispatch_get_port_set();

	_dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
			maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);

	dispatch_assert(_dispatch_mach_notify_source);
}

kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;

#if DISPATCH_DEBUG
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
#endif

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		goto out;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;

out:
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
{
	kern_return_t kr;
	// this function should never be called
	dispatch_assume_zero(name);
	kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
{
	// this function should never be called
	dispatch_assume_zero(notify);
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
{
	// we only register for dead-name notifications
	// some code deallocated our send-once right without consuming it
#if DISPATCH_DEBUG
	_dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
#endif
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;
	kern_return_t kr;

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		goto out;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;

out:
	// the act of receiving a dead name notification allocates a dead-name right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
{
	// dummy function just to pop the main thread out of mach_msg()
	return 0;
}

kern_return_t
_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
{
	// dummy function to consume a send-once right
	return 0;
}

mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
			| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
			| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
	mach_msg_options_t tmp_options = options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	int demux_success;

	maxmsgsz += MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(maxmsgsz);
	bufReply = alloca(maxmsgsz);
	bufReply->Head.msgh_size = 0; // make CLANG happy

	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				break;
			}
		}

		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
			case MACH_RCV_INVALID_NAME:
				break;
			default:
				dispatch_assume_zero(kr);
				break;
			}
			break;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			break;
		}

		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

	return kr;
}
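
// Note: the loop above follows the classic MIG server pattern. After the
// first receive, each mach_msg() call combines MACH_SEND_MSG (the pending
// reply) with MACH_RCV_MSG (the next request) in a single trap, and the
// request/reply buffers are swapped each iteration so the previous reply
// storage becomes the next receive buffer. The cnt limit keeps one busy port
// from starving the rest of a serial queue.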