/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#include "protocol.h"
#include "protocolServer.h"
#include <sys/mount.h>

#define DISPATCH_EVFILT_TIMER (-EVFILT_SYSCOUNT - 1)
#define DISPATCH_EVFILT_CUSTOM_ADD (-EVFILT_SYSCOUNT - 2)
#define DISPATCH_EVFILT_CUSTOM_OR (-EVFILT_SYSCOUNT - 3)
#define DISPATCH_EVFILT_SYSCOUNT (EVFILT_SYSCOUNT + 3)

#define DISPATCH_TIMER_INDEX_WALL 0
#define DISPATCH_TIMER_INDEX_MACH 1
static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_WALL,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[0],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[0].dk_sources),
	},
	{
		.dk_kevent = {
			.ident = DISPATCH_TIMER_INDEX_MACH,
			.filter = DISPATCH_EVFILT_TIMER,
			.udata = &_dispatch_kevent_timer[1],
		},
		.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_timer[1].dk_sources),
	},
};
#define DISPATCH_TIMER_COUNT (sizeof _dispatch_kevent_timer / sizeof _dispatch_kevent_timer[0])
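
// Note: there are two statically allocated timer lists, one per clock type
// (wall-clock timers and mach-absolute-time timers, per the indices above).
// _dispatch_timer_list_update() below keeps each list sorted by next fire
// date, so the scheduler only ever has to inspect the head of each list.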

static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.udata = &_dispatch_kevent_data_or,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
		.udata = &_dispatch_kevent_data_add,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
};

#ifndef DISPATCH_NO_LEGACY
struct dispatch_source_attr_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_source_attr_s);
};

struct dispatch_source_attr_s {
	DISPATCH_STRUCT_HEADER(dispatch_source_attr_s, dispatch_source_attr_vtable_s);
	void* finalizer_ctxt;
	dispatch_source_finalizer_function_t finalizer_func;
	void* context;
};
#endif /* DISPATCH_NO_LEGACY */

#define _dispatch_source_call_block ((void *)-1)
static void _dispatch_source_latch_and_call(dispatch_source_t ds);
static void _dispatch_source_cancel_callout(dispatch_source_t ds);
static bool _dispatch_source_probe(dispatch_source_t ds);
static void _dispatch_source_dispose(dispatch_source_t ds);
static void _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke);
static size_t _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz);
static size_t dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz);
static dispatch_queue_t _dispatch_source_invoke(dispatch_source_t ds);

static void _dispatch_kevent_merge(dispatch_source_t ds);
static void _dispatch_kevent_release(dispatch_source_t ds);
static void _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags);
static void _dispatch_kevent_machport_enable(dispatch_kevent_t dk);
static void _dispatch_kevent_machport_disable(dispatch_kevent_t dk);

static void _dispatch_drain_mach_messages(struct kevent *ke);
static void _dispatch_timer_list_update(dispatch_source_t ds);

static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)));

static const char *
_evfiltstr(short filt)
{
	switch (filt) {
#define _evfilt2(f) case (f): return #f
	_evfilt2(EVFILT_READ);
	_evfilt2(EVFILT_WRITE);
	_evfilt2(EVFILT_AIO);
	_evfilt2(EVFILT_VNODE);
	_evfilt2(EVFILT_PROC);
	_evfilt2(EVFILT_SIGNAL);
	_evfilt2(EVFILT_TIMER);
	_evfilt2(EVFILT_MACHPORT);
	_evfilt2(EVFILT_FS);
	_evfilt2(EVFILT_USER);
	_evfilt2(EVFILT_SESSION);

	_evfilt2(DISPATCH_EVFILT_TIMER);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
	default:
		return "EVFILT_missing";
	}
}

#define DSL_HASH_SIZE 256u // must be a power of two
#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
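
// Note: because DSL_HASH_SIZE is a power of two, masking with
// (DSL_HASH_SIZE - 1) is equivalent to (x) % DSL_HASH_SIZE but avoids a
// division. For example, DSL_HASH(260) == (260 & 255) == 4.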

static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];

static dispatch_kevent_t
_dispatch_kevent_find(uintptr_t ident, short filter)
{
	uintptr_t hash = DSL_HASH(filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);
	dispatch_kevent_t dki;

	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
			break;
		}
	}
	return dki;
}

static void
_dispatch_kevent_insert(dispatch_kevent_t dk)
{
	uintptr_t ident = dk->dk_kevent.ident;
	uintptr_t hash = DSL_HASH(dk->dk_kevent.filter == EVFILT_MACHPORT ? MACH_PORT_INDEX(ident) : ident);

	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
}

void
dispatch_source_cancel(dispatch_source_t ds)
{
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif
	dispatch_atomic_or(&ds->ds_atomic_flags, DSF_CANCELED);
	_dispatch_wakeup(ds);
}
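
// Illustrative client-side usage (a sketch using only public API, not part
// of this file): the cancellation handler is the safe point to tear down
// resources, because the DSF_CANCELED flag set above only takes effect once
// the manager queue has unregistered the kevent.
//
//	dispatch_source_set_cancel_handler(ds, ^{
//		close(fd); // safe: the kevent is no longer registered
//	});
//	dispatch_source_cancel(ds);
//	dispatch_release(ds);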

#ifndef DISPATCH_NO_LEGACY
void
_dispatch_source_legacy_xref_release(dispatch_source_t ds)
{
	if (ds->ds_is_legacy) {
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_ONESHOT)) {
			dispatch_source_cancel(ds);
		}

		// Clients often leave sources suspended at the last release
		dispatch_atomic_and(&ds->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK);
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Arguments for and against this assert are within 6705399
		DISPATCH_CLIENT_CRASH("Release of a suspended object");
	}
	_dispatch_wakeup(ds);
	_dispatch_release(ds);
}
#endif /* DISPATCH_NO_LEGACY */

long
dispatch_source_testcancel(dispatch_source_t ds)
{
	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
}


unsigned long
dispatch_source_get_mask(dispatch_source_t ds)
{
	return ds->ds_pending_data_mask;
}

uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)
{
	return (int)ds->ds_ident_hack;
}

unsigned long
dispatch_source_get_data(dispatch_source_t ds)
{
	return ds->ds_data;
}

#if DISPATCH_DEBUG
void
dispatch_debug_kevents(struct kevent* kev, size_t count, const char* str)
{
	size_t i;
	for (i = 0; i < count; ++i) {
		_dispatch_log("kevent[%lu] = { ident = %p, filter = %s, flags = 0x%x, fflags = 0x%x, data = %p, udata = %p }: %s",
				i, (void*)kev[i].ident, _evfiltstr(kev[i].filter), kev[i].flags, kev[i].fflags, (void*)kev[i].data, (void*)kev[i].udata, str);
	}
}
#endif

static size_t
_dispatch_source_kevent_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = _dispatch_source_debug(ds, buf, bufsiz);
	offset += snprintf(&buf[offset], bufsiz - offset, "filter = %s }",
			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
	return offset;
}

static void
_dispatch_source_init_tail_queue_array(void *context __attribute__((unused)))
{
	unsigned int i;
	for (i = 0; i < DSL_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_sources[i]);
	}

	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_WALL)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_WALL], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[DSL_HASH(DISPATCH_TIMER_INDEX_MACH)], &_dispatch_kevent_timer[DISPATCH_TIMER_INDEX_MACH], dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_or, dk_list);
	TAILQ_INSERT_TAIL(&_dispatch_sources[0], &_dispatch_kevent_data_add, dk_list);
}

// Find existing kevents, and merge any new flags if necessary
void
_dispatch_kevent_merge(dispatch_source_t ds)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk;
	typeof(dk->dk_kevent.fflags) new_flags;
	bool do_resume = false;

	if (ds->ds_is_installed) {
		return;
	}
	ds->ds_is_installed = true;

	dispatch_once_f(&pred, NULL, _dispatch_source_init_tail_queue_array);

	dk = _dispatch_kevent_find(ds->ds_dkev->dk_kevent.ident, ds->ds_dkev->dk_kevent.filter);

	if (dk) {
		// If an existing dispatch kevent is found, check to see if new flags
		// need to be added to the existing kevent
		new_flags = ~dk->dk_kevent.fflags & ds->ds_dkev->dk_kevent.fflags;
		dk->dk_kevent.fflags |= ds->ds_dkev->dk_kevent.fflags;
		free(ds->ds_dkev);
		ds->ds_dkev = dk;
		do_resume = new_flags;
	} else {
		dk = ds->ds_dkev;
		_dispatch_kevent_insert(dk);
		new_flags = dk->dk_kevent.fflags;
		do_resume = true;
	}

	TAILQ_INSERT_TAIL(&dk->dk_sources, ds, ds_list);

	// Re-register the kevent with the kernel if new flags were added
	// by the dispatch kevent
	if (do_resume) {
		dk->dk_kevent.flags |= EV_ADD;
		_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0);
		ds->ds_is_armed = true;
	}
}


void
_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these types are not registered with kevent
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, new_flags, del_flags);
		break;
	case EVFILT_PROC:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			return;
		}
		// fall through
	default:
		_dispatch_update_kq(&dk->dk_kevent);
		if (dk->dk_kevent.flags & EV_DISPATCH) {
			dk->dk_kevent.flags &= ~EV_ADD;
		}
		break;
	}
}

dispatch_queue_t
_dispatch_source_invoke(dispatch_source_t ds)
{
	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_merge(ds);
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			_dispatch_kevent_release(ds);
			return ds->do_targetq;
		} else if (ds->ds_cancel_handler) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the manager
		// queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return &_dispatch_mgr_q;
		}
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_kevent_resume(ds->ds_dkev, 0, 0);
		ds->ds_is_armed = true;
	}

	return NULL;
}
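
// Taken together with _dispatch_source_probe() below, this yields a small
// queue-hopping protocol (a descriptive sketch, not extra machinery): for an
// EV_DISPATCH source, a single event typically travels
//	manager queue (install kevent) -> target queue (latch + call handler)
//	-> manager queue (rearm) -> ...
// with each _dispatch_source_invoke() returning the queue on which the next
// step must run, and NULL once no step is pending.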

bool
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		// The source needs to be uninstalled from the manager queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || ds->ds_cancel_handler) {
			return true;
		}
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !ds->ds_is_armed) {
		// The source needs to be rearmed on the manager queue.
		return true;
	}
	// Nothing to do.
	return false;
}

void
_dispatch_source_dispose(dispatch_source_t ds)
{
	_dispatch_queue_dispose((dispatch_queue_t)ds);
}

static void
_dispatch_kevent_debugger2(void *context, dispatch_source_t unused __attribute__((unused)))
{
	struct sockaddr sa;
	socklen_t sa_len = sizeof(sa);
	int c, fd = (int)(long)context;
	unsigned int i;
	dispatch_kevent_t dk;
	dispatch_source_t ds;
	FILE *debug_stream;

	c = accept(fd, &sa, &sa_len);
	if (c == -1) {
		if (errno != EAGAIN) {
			dispatch_assume_zero(errno);
		}
		return;
	}
#if 0
	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
	if (r == -1) {
		dispatch_assume_zero(errno);
	}
#endif
	debug_stream = fdopen(c, "a");
	if (!dispatch_assume(debug_stream)) {
		close(c);
		return;
	}

	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
	fprintf(debug_stream, "Content-type: text/html\r\n");
	fprintf(debug_stream, "Pragma: no-cache\r\n");
	fprintf(debug_stream, "\r\n");
	fprintf(debug_stream, "<html>\n<head><title>PID %u</title></head>\n<body>\n<ul>\n", getpid());

	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td><td>DK</td></tr>\n");

	for (i = 0; i < DSL_HASH_SIZE; i++) {
		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
			continue;
		}
		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags 0x%hx fflags 0x%x data 0x%lx udata %p\n",
					dk, dk->dk_kevent.ident, _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
					dk->dk_kevent.fflags, dk->dk_kevent.data, dk->dk_kevent.udata);
			fprintf(debug_stream, "\t\t<ul>\n");
			TAILQ_FOREACH(ds, &dk->dk_sources, ds_list) {
				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend 0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
						ds, ds->do_ref_cnt, ds->do_suspend_cnt, ds->ds_pending_data, ds->ds_pending_data_mask,
						ds->ds_atomic_flags);
				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
					dispatch_queue_t dq = ds->do_targetq;
					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend 0x%x label: %s\n", dq, dq->do_ref_cnt, dq->do_suspend_cnt, dq->dq_label);
				}
			}
			fprintf(debug_stream, "\t\t</ul>\n");
			fprintf(debug_stream, "\t</li>\n");
		}
	}
	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
	fflush(debug_stream);
	fclose(debug_stream);
}

static void
_dispatch_kevent_debugger(void *context __attribute__((unused)))
{
	union {
		struct sockaddr_in sa_in;
		struct sockaddr sa;
	} sa_u = {
		.sa_in = {
			.sin_family = AF_INET,
			.sin_addr = { htonl(INADDR_LOOPBACK), },
		},
	};
	dispatch_source_t ds;
	const char *valstr;
	int val, r, fd, sock_opt = 1;
	socklen_t slen = sizeof(sa_u);

	if (issetugid()) {
		return;
	}
	valstr = getenv("LIBDISPATCH_DEBUGGER");
	if (!valstr) {
		return;
	}
	val = atoi(valstr);
	if (val == 2) {
		sa_u.sa_in.sin_addr.s_addr = 0;
	}
	fd = socket(PF_INET, SOCK_STREAM, 0);
	if (fd == -1) {
		dispatch_assume_zero(errno);
		return;
	}
	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, (socklen_t) sizeof sock_opt);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
#if 0
	r = fcntl(fd, F_SETFL, O_NONBLOCK);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
#endif
	r = bind(fd, &sa_u.sa, sizeof(sa_u));
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = listen(fd, SOMAXCONN);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	r = getsockname(fd, &sa_u.sa, &slen);
	if (r == -1) {
		dispatch_assume_zero(errno);
		goto out_bad;
	}
	ds = dispatch_source_read_create_f(fd, NULL, &_dispatch_mgr_q, (void *)(long)fd, _dispatch_kevent_debugger2);
	if (dispatch_assume(ds)) {
		_dispatch_log("LIBDISPATCH: debug port: %hu", ntohs(sa_u.sa_in.sin_port));
		return;
	}
out_bad:
	close(fd);
}
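
// Illustrative usage (a sketch based on the code above, not a supported
// interface): in a non-setuid process, export LIBDISPATCH_DEBUGGER=1 before
// launch (or 2 to bind to all interfaces instead of loopback), note the
// "debug port" logged at startup, then dump the kevent/source tables with:
//
//	$ curl http://127.0.0.1:<port>/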

void
_dispatch_source_drain_kevent(struct kevent *ke)
{
	static dispatch_once_t pred;
	dispatch_kevent_t dk = ke->udata;
	dispatch_source_t dsi;

	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);

	dispatch_debug_kevents(ke, 1, __func__);

	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_drain_mach_messages(ke);
	}
	dispatch_assert(dk);

	if (ke->flags & EV_ONESHOT) {
		dk->dk_kevent.flags |= EV_ONESHOT;
	}

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, ke);
	}
}

static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)
{
	uintptr_t key;

	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
		// these sources live on statically allocated lists
		return;
	case EVFILT_MACHPORT:
		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
		break;
	case EVFILT_PROC:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break; // implicitly deleted
		}
		// fall through
	default:
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			_dispatch_update_kq(&dk->dk_kevent);
		}
		break;
	}

	if (dk->dk_kevent.filter == EVFILT_MACHPORT) {
		key = MACH_PORT_INDEX(dk->dk_kevent.ident);
	} else {
		key = dk->dk_kevent.ident;
	}

	TAILQ_REMOVE(&_dispatch_sources[DSL_HASH(key)], dk, dk_list);
	free(dk);
}

void
_dispatch_kevent_release(dispatch_source_t ds)
{
	dispatch_kevent_t dk = ds->ds_dkev;
	dispatch_source_t dsi;
	uint32_t del_flags, fflags = 0;

	ds->ds_dkev = NULL;

	TAILQ_REMOVE(&dk->dk_sources, ds, ds_list);

	if (TAILQ_EMPTY(&dk->dk_sources)) {
		_dispatch_kevent_dispose(dk);
	} else {
		TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
			fflags |= (uint32_t)dsi->ds_pending_data_mask;
		}
		del_flags = (uint32_t)ds->ds_pending_data_mask & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags = fflags;
			_dispatch_kevent_resume(dk, 0, del_flags);
		}
	}

	ds->ds_is_armed = false;
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_release(ds); // the retain is done at creation time
}

void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent *ke)
{
	struct kevent fake;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}

	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie.
	// We simulate an exit event in this case. <rdar://problem/5067725>
	if (ke->flags & EV_ERROR) {
		if (ke->filter == EVFILT_PROC && ke->data == ESRCH) {
			fake = *ke;
			fake.flags &= ~EV_ERROR;
			fake.fflags = NOTE_EXIT;
			fake.data = 0;
			ke = &fake;
		} else {
			// log the unexpected error
			dispatch_assume_zero(ke->data);
			return;
		}
	}

	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		ds->ds_pending_data = ~ke->data;
	} else if (ds->ds_is_adder) {
		dispatch_atomic_add(&ds->ds_pending_data, ke->data);
	} else {
		dispatch_atomic_or(&ds->ds_pending_data, ke->fflags & ds->ds_pending_data_mask);
	}

	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		ds->ds_is_armed = false;
	}

	_dispatch_wakeup(ds);
}
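
// Note on the ~ke->data above: level-triggered sources store the bitwise
// complement of the kevent data so that a level of zero (e.g. EV_EOF with no
// bytes readable) still leaves ds_pending_data nonzero and therefore
// observable; _dispatch_source_latch_and_call() below undoes the complement
// before handing the value to the event handler.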

void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == 0)) {
		return;
	}
	prev = dispatch_atomic_xchg(&ds->ds_pending_data, 0);
	if (ds->ds_is_level) {
		ds->ds_data = ~prev;
	} else {
		ds->ds_data = prev;
	}
	if (dispatch_assume(prev)) {
		if (ds->ds_handler_func) {
			ds->ds_handler_func(ds->ds_handler_ctxt, ds);
		}
	}
}

void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;

#ifdef __BLOCKS__
	if (ds->ds_handler_is_block) {
		Block_release(ds->ds_handler_ctxt);
		ds->ds_handler_is_block = false;
		ds->ds_handler_func = NULL;
		ds->ds_handler_ctxt = NULL;
	}
#endif

	if (!ds->ds_cancel_handler) {
		return;
	}
	if (ds->ds_cancel_is_block) {
#ifdef __BLOCKS__
		dispatch_block_t b = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			b();
		}
		Block_release(ds->ds_cancel_handler);
		ds->ds_cancel_is_block = false;
#endif
	} else {
		dispatch_function_t f = ds->ds_cancel_handler;
		if (ds->ds_atomic_flags & DSF_CANCELED) {
			f(ds->do_ctxt);
		}
	}
	ds->ds_cancel_handler = NULL;
}

const struct dispatch_source_vtable_s _dispatch_source_kevent_vtable = {
	.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
	.do_kind = "kevent-source",
	.do_invoke = _dispatch_source_invoke,
	.do_dispose = _dispatch_source_dispose,
	.do_probe = _dispatch_source_probe,
	.do_debug = _dispatch_source_kevent_debug,
};

void
dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
{
	struct kevent kev = {
		.fflags = (typeof(kev.fflags))val,
		.data = val,
	};

	dispatch_assert(ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);

	_dispatch_source_merge_kevent(ds, &kev);
}
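
// Illustrative client-side usage of the custom data filters (a sketch using
// only public API; `counter` and `q` are hypothetical):
//
//	__block unsigned long counter = 0;
//	dispatch_source_t s = dispatch_source_create(
//			DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, q);
//	dispatch_source_set_event_handler(s, ^{
//		// deliveries are coalesced: get_data() returns the sum of all
//		// values merged since the handler last ran
//		counter += dispatch_source_get_data(s);
//	});
//	dispatch_resume(s);
//	dispatch_source_merge_data(s, 1); // may coalesce with later merges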

size_t
dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = ds->do_targetq;
	return snprintf(buf, bufsiz,
			"target = %s[%p], pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
			target ? target->dq_label : "", target,
			ds->ds_pending_data, ds->ds_pending_data_mask);
}

size_t
_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dx_kind(ds), ds);
	offset += dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
	offset += dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
	return offset;
}

#ifndef DISPATCH_NO_LEGACY
static void
dispatch_source_attr_dispose(dispatch_source_attr_t attr)
{
	// release the finalizer block if necessary
	dispatch_source_attr_set_finalizer(attr, NULL);
	_dispatch_dispose(attr);
}

static const struct dispatch_source_attr_vtable_s dispatch_source_attr_vtable = {
	.do_type = DISPATCH_SOURCE_ATTR_TYPE,
	.do_kind = "source-attr",
	.do_dispose = dispatch_source_attr_dispose,
};

dispatch_source_attr_t
dispatch_source_attr_create(void)
{
	dispatch_source_attr_t rval = calloc(1, sizeof(struct dispatch_source_attr_s));

	if (rval) {
		rval->do_vtable = &dispatch_source_attr_vtable;
		rval->do_next = DISPATCH_OBJECT_LISTLESS;
		rval->do_targetq = dispatch_get_global_queue(0, 0);
		rval->do_ref_cnt = 1;
		rval->do_xref_cnt = 1;
	}

	return rval;
}

void
dispatch_source_attr_set_finalizer_f(dispatch_source_attr_t attr,
	void *context, dispatch_source_finalizer_function_t finalizer)
{
#ifdef __BLOCKS__
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		Block_release(attr->finalizer_ctxt);
	}
#endif

	attr->finalizer_ctxt = context;
	attr->finalizer_func = finalizer;
}

#ifdef __BLOCKS__
long
dispatch_source_attr_set_finalizer(dispatch_source_attr_t attr,
	dispatch_source_finalizer_t finalizer)
{
	void *ctxt;
	dispatch_source_finalizer_function_t func;

	if (finalizer) {
		if (!(ctxt = Block_copy(finalizer))) {
			return 1;
		}
		func = (void *)_dispatch_call_block_and_release2;
	} else {
		ctxt = NULL;
		func = NULL;
	}

	dispatch_source_attr_set_finalizer_f(attr, ctxt, func);

	return 0;
}

dispatch_source_finalizer_t
dispatch_source_attr_get_finalizer(dispatch_source_attr_t attr)
{
	if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
		return (dispatch_source_finalizer_t)attr->finalizer_ctxt;
	} else if (attr->finalizer_func == NULL) {
		return NULL;
	} else {
		abort(); // finalizer is not a block...
	}
}
#endif

void
dispatch_source_attr_set_context(dispatch_source_attr_t attr, void *context)
{
	attr->context = context;
}

dispatch_source_attr_t
dispatch_source_attr_copy(dispatch_source_attr_t proto)
{
	dispatch_source_attr_t rval = NULL;

	if (proto && (rval = malloc(sizeof(struct dispatch_source_attr_s)))) {
		memcpy(rval, proto, sizeof(struct dispatch_source_attr_s));
#ifdef __BLOCKS__
		if (rval->finalizer_func == (void*)_dispatch_call_block_and_release2) {
			rval->finalizer_ctxt = Block_copy(rval->finalizer_ctxt);
		}
#endif
	} else if (!proto) {
		rval = dispatch_source_attr_create();
	}
	return rval;
}
#endif /* DISPATCH_NO_LEGACY */


struct dispatch_source_type_s {
	struct kevent ke;
	uint64_t mask;
};

const struct dispatch_source_type_s _dispatch_source_type_timer = {
	.ke = {
		.filter = DISPATCH_EVFILT_TIMER,
	},
	.mask = DISPATCH_TIMER_INTERVAL|DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK,
};

const struct dispatch_source_type_s _dispatch_source_type_read = {
	.ke = {
		.filter = EVFILT_READ,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_write = {
	.ke = {
		.filter = EVFILT_WRITE,
		.flags = EV_DISPATCH,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_proc = {
	.ke = {
		.filter = EVFILT_PROC,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_EXIT|NOTE_FORK|NOTE_EXEC|NOTE_SIGNAL|NOTE_REAP,
};

const struct dispatch_source_type_s _dispatch_source_type_signal = {
	.ke = {
		.filter = EVFILT_SIGNAL,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_vnode = {
	.ke = {
		.filter = EVFILT_VNODE,
		.flags = EV_CLEAR,
	},
	.mask = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND|NOTE_ATTRIB|NOTE_LINK|NOTE_RENAME|NOTE_REVOKE|NOTE_NONE,
};

const struct dispatch_source_type_s _dispatch_source_type_vfs = {
	.ke = {
		.filter = EVFILT_FS,
		.flags = EV_CLEAR,
	},
	.mask = VQ_NOTRESP|VQ_NEEDAUTH|VQ_LOWDISK|VQ_MOUNT|VQ_UNMOUNT|VQ_DEAD|VQ_ASSIST|VQ_NOTRESPLOCK|VQ_UPDATE|VQ_VERYLOWDISK,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_send = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_DEAD,
	},
	.mask = DISPATCH_MACH_SEND_DEAD,
};

const struct dispatch_source_type_s _dispatch_source_type_mach_recv = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_DISPATCH,
		.fflags = DISPATCH_MACHPORT_RECV,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_add = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
};

const struct dispatch_source_type_s _dispatch_source_type_data_or = {
	.ke = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
		.fflags = ~0,
	},
};
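
// Each public DISPATCH_SOURCE_TYPE_* constant resolves to one of the
// structures above: `ke` is the prototype kevent that dispatch_source_create()
// copies, and `mask` is the set of fflags a caller may request (a mask of
// zero means the type accepts no flags; see the input validation below).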

dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
	uintptr_t handle,
	unsigned long mask,
	dispatch_queue_t q)
{
	const struct kevent *proto_kev = &type->ke;
	dispatch_source_t ds = NULL;
	dispatch_kevent_t dk = NULL;

	// input validation
	if (type == NULL || (mask & ~type->mask)) {
		goto out_bad;
	}

	switch (type->ke.filter) {
	case EVFILT_SIGNAL:
		if (handle >= NSIG) {
			goto out_bad;
		}
		break;
	case EVFILT_FS:
	case DISPATCH_EVFILT_CUSTOM_ADD:
	case DISPATCH_EVFILT_CUSTOM_OR:
	case DISPATCH_EVFILT_TIMER:
		if (handle) {
			goto out_bad;
		}
		break;
	default:
		break;
	}

	ds = calloc(1ul, sizeof(struct dispatch_source_s));
	if (slowpath(!ds)) {
		goto out_bad;
	}
	dk = calloc(1ul, sizeof(struct dispatch_kevent_s));
	if (slowpath(!dk)) {
		goto out_bad;
	}

	dk->dk_kevent = *proto_kev;
	dk->dk_kevent.ident = handle;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags |= (uint32_t)mask;
	dk->dk_kevent.udata = dk;
	TAILQ_INIT(&dk->dk_sources);

	// Initialize as a queue first, then override some settings below.
	_dispatch_queue_init((dispatch_queue_t)ds);
	strlcpy(ds->dq_label, "source", sizeof(ds->dq_label));

	// Dispatch Object
	ds->do_vtable = &_dispatch_source_kevent_vtable;
	ds->do_ref_cnt++; // the reference the manager queue holds
	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	// do_targetq will be retained below, past point of no-return
	ds->do_targetq = q;

	// Dispatch Source
	ds->ds_ident_hack = dk->dk_kevent.ident;
	ds->ds_dkev = dk;
	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
		if (proto_kev->filter != EVFILT_MACHPORT) {
			ds->ds_is_level = true;
		}
		ds->ds_needs_rearm = true;
	} else if (!(EV_CLEAR & proto_kev->flags)) {
		// we cheat and use EV_CLEAR to mean a "flag thingy"
		ds->ds_is_adder = true;
	}

	// If it's a timer source, it needs to be re-armed
	if (type->ke.filter == DISPATCH_EVFILT_TIMER) {
		ds->ds_needs_rearm = true;
	}

	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
#if DISPATCH_DEBUG
	dispatch_debug(ds, __FUNCTION__);
#endif

	// Some sources require special processing
	if (type == DISPATCH_SOURCE_TYPE_MACH_SEND) {
		static dispatch_once_t pred;
		dispatch_once_f(&pred, NULL, _dispatch_mach_notify_source_init);
	} else if (type == DISPATCH_SOURCE_TYPE_TIMER) {
		ds->ds_timer.flags = mask;
	}

	_dispatch_retain(ds->do_targetq);
	return ds;

out_bad:
	free(ds);
	free(dk);
	return NULL;
}
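
// Illustrative client-side usage (a sketch using only public API; `fd` and
// `q` are hypothetical). Note that sources are created suspended (see the
// do_suspend_cnt initialization above) and must be resumed before they
// deliver events:
//
//	dispatch_source_t s = dispatch_source_create(
//			DISPATCH_SOURCE_TYPE_READ, fd, 0, q);
//	dispatch_source_set_event_handler(s, ^{
//		size_t estimated = dispatch_source_get_data(s);
//		// read up to `estimated` bytes from fd ...
//	});
//	dispatch_source_set_cancel_handler(s, ^{
//		close(fd);
//	});
//	dispatch_resume(s);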

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_event_handler2(void *context)
{
	struct Block_layout *bl = context;

	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = bl ? (void *)bl->invoke : NULL;
	ds->ds_handler_ctxt = bl;
	ds->ds_handler_is_block = true;
}

void
dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler2);
}

static void
_dispatch_source_set_event_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_handler_is_block && ds->ds_handler_ctxt) {
		Block_release(ds->ds_handler_ctxt);
	}
	ds->ds_handler_func = context;
	ds->ds_handler_ctxt = ds->do_ctxt;
	ds->ds_handler_is_block = false;
}

void
dispatch_source_set_event_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_event_handler_f);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
static void
_dispatch_source_set_cancel_handler2(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = true;
}

void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
	dispatch_block_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	handler = _dispatch_Block_copy(handler);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler2);
}

static void
_dispatch_source_set_cancel_handler_f(void *context)
{
	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
	dispatch_assert(ds->do_vtable == &_dispatch_source_kevent_vtable);

	if (ds->ds_cancel_is_block && ds->ds_cancel_handler) {
		Block_release(ds->ds_cancel_handler);
	}
	ds->ds_cancel_handler = context;
	ds->ds_cancel_is_block = false;
}

void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
	dispatch_function_t handler)
{
	dispatch_assert(!ds->ds_is_legacy);
	dispatch_barrier_async_f((dispatch_queue_t)ds,
			handler, _dispatch_source_set_cancel_handler_f);
}

#ifndef DISPATCH_NO_LEGACY
// 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
dispatch_source_t
_dispatch_source_create2(dispatch_source_t ds,
	dispatch_source_attr_t attr,
	void *context,
	dispatch_source_handler_function_t handler)
{
	if (ds == NULL || handler == NULL) {
		return NULL;
	}

	ds->ds_is_legacy = true;

	ds->ds_handler_func = handler;
	ds->ds_handler_ctxt = context;

	if (attr && attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		ds->dq_finalizer_ctxt = attr->finalizer_ctxt;
		ds->dq_finalizer_func = (typeof(ds->dq_finalizer_func))attr->finalizer_func;
		ds->do_ctxt = attr->context;
	}
#ifdef __BLOCKS__
	if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
		ds->dq_finalizer_ctxt = Block_copy(ds->dq_finalizer_ctxt);
		if (!ds->dq_finalizer_ctxt) {
			goto out_bad;
		}
	}
	if (handler == _dispatch_source_call_block) {
		struct Block_layout *bl = ds->ds_handler_ctxt = Block_copy(context);
		if (!ds->ds_handler_ctxt) {
			if (ds->dq_finalizer_func == (void*)_dispatch_call_block_and_release2) {
				Block_release(ds->dq_finalizer_ctxt);
			}
			goto out_bad;
		}
		ds->ds_handler_func = (void *)bl->invoke;
		ds->ds_handler_is_block = true;
	}

	// all legacy sources get a cancellation event on the normal event handler.
	dispatch_source_handler_function_t func = ds->ds_handler_func;
	dispatch_source_handler_t block = ds->ds_handler_ctxt;
	void *ctxt = ds->ds_handler_ctxt;
	bool handler_is_block = ds->ds_handler_is_block;

	ds->ds_cancel_is_block = true;
	if (handler_is_block) {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			block(ds);
		});
	} else {
		ds->ds_cancel_handler = _dispatch_Block_copy(^{
			func(ctxt, ds);
		});
	}
#endif
	if (attr != DISPATCH_SOURCE_CREATE_SUSPENDED) {
		dispatch_resume(ds);
	}

	return ds;

out_bad:
	free(ds);
	return NULL;
}

long
dispatch_source_get_error(dispatch_source_t ds, long *err_out)
{
	// 6863892 don't report ECANCELED until kevent is unregistered
	if ((ds->ds_atomic_flags & DSF_CANCELED) && !ds->ds_dkev) {
		if (err_out) {
			*err_out = ECANCELED;
		}
		return DISPATCH_ERROR_DOMAIN_POSIX;
	} else {
		return DISPATCH_ERROR_DOMAIN_NO_ERROR;
	}
}
#endif /* DISPATCH_NO_LEGACY */

// Updates the ordered list of timers based on next fire date for changes to ds.
// Should only be called from the context of _dispatch_mgr_q.
void
_dispatch_timer_list_update(dispatch_source_t ds)
{
	dispatch_source_t dsi = NULL;
	int idx;

	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q);

	// do not reschedule timers unregistered with _dispatch_kevent_release()
	if (!ds->ds_dkev) {
		return;
	}

	// Ensure the source is on the global kevent lists before it is removed and
	// readded below.
	_dispatch_kevent_merge(ds);

	TAILQ_REMOVE(&ds->ds_dkev->dk_sources, ds, ds_list);

	// change the list if the clock type has changed
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		idx = DISPATCH_TIMER_INDEX_WALL;
	} else {
		idx = DISPATCH_TIMER_INDEX_MACH;
	}
	ds->ds_dkev = &_dispatch_kevent_timer[idx];

	if (ds->ds_timer.target) {
		TAILQ_FOREACH(dsi, &ds->ds_dkev->dk_sources, ds_list) {
			if (dsi->ds_timer.target == 0 || ds->ds_timer.target < dsi->ds_timer.target) {
				break;
			}
		}
	}

	if (dsi) {
		TAILQ_INSERT_BEFORE(dsi, ds, ds_list);
	} else {
		TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds, ds_list);
	}
}

static void
_dispatch_run_timers2(unsigned int timer)
{
	dispatch_source_t ds;
	uint64_t now, missed;

	if (timer == DISPATCH_TIMER_INDEX_MACH) {
		now = mach_absolute_time();
	} else {
		now = _dispatch_get_nanoseconds();
	}

	while ((ds = TAILQ_FIRST(&_dispatch_kevent_timer[timer].dk_sources))) {
		// We may find timers on the wrong list due to a pending update from
		// dispatch_source_set_timer. Force an update of the list in that case.
		if (timer != ds->ds_ident_hack) {
			_dispatch_timer_list_update(ds);
			continue;
		}
		if (!ds->ds_timer.target) {
			// no configured timers on the list
			break;
		}
		if (ds->ds_timer.target > now) {
			// Done running timers for now.
			break;
		}

		if (ds->ds_timer.flags & (DISPATCH_TIMER_ONESHOT|DISPATCH_TIMER_ABSOLUTE)) {
			dispatch_atomic_inc(&ds->ds_pending_data);
			ds->ds_timer.target = 0;
		} else {
			// Calculate number of missed intervals.
			missed = (now - ds->ds_timer.target) / ds->ds_timer.interval;
			dispatch_atomic_add(&ds->ds_pending_data, missed + 1);
			ds->ds_timer.target += (missed + 1) * ds->ds_timer.interval;
		}

		_dispatch_timer_list_update(ds);
		_dispatch_wakeup(ds);
	}
}

void
_dispatch_run_timers(void)
{
	unsigned int i;
	for (i = 0; i < DISPATCH_TIMER_COUNT; i++) {
		_dispatch_run_timers2(i);
	}
}

#if defined(__i386__) || defined(__x86_64__)
// these architectures always return mach_absolute_time() in nanoseconds
#define _dispatch_convert_mach2nano(x) (x)
#define _dispatch_convert_nano2mach(x) (x)
#else
static mach_timebase_info_data_t tbi;
static dispatch_once_t tbi_pred;

static void
_dispatch_convert_init(void *context __attribute__((unused)))
{
	dispatch_assume_zero(mach_timebase_info(&tbi));
}

static uint64_t
_dispatch_convert_mach2nano(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.numer;
	tmp /= tbi.denom;

	return tmp;
}

static uint64_t
_dispatch_convert_nano2mach(uint64_t val)
{
#ifdef __LP64__
	__uint128_t tmp;
#else
	long double tmp;
#endif

	dispatch_once_f(&tbi_pred, NULL, _dispatch_convert_init);

	tmp = val;
	tmp *= tbi.denom;
	tmp /= tbi.numer;

	return tmp;
}
#endif
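
// For example (an illustrative timebase, not a guaranteed value): with a
// mach_timebase_info of numer = 125, denom = 3, a mach-time delta of 24
// converts to 24 * 125 / 3 = 1000 nanoseconds; the 128-bit (or long double)
// intermediate keeps the multiplication from overflowing 64 bits.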

// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_SEC 31536000l
#define FOREVER_NSEC 31536000000000000ull

struct timespec *
_dispatch_get_next_timer_fire(struct timespec *howsoon)
{
	// <rdar://problem/6459649>
	// kevent(2) does not allow large timeouts, so we use a long timeout
	// instead (approximately 1 year).
	dispatch_source_t ds = NULL;
	unsigned int timer;
	uint64_t now, delta_tmp, delta = UINT64_MAX;

	// We are looking for the first unsuspended timer which has its target
	// time set. Given timers are kept in order, if we hit a timer that's
	// unset there's no point in continuing down the list.
	for (timer = 0; timer < DISPATCH_TIMER_COUNT; timer++) {
		TAILQ_FOREACH(ds, &_dispatch_kevent_timer[timer].dk_sources, ds_list) {
			if (!ds->ds_timer.target) {
				break;
			}
			if (DISPATCH_OBJECT_SUSPENDED(ds)) {
				ds->ds_is_armed = false;
			} else {
				break;
			}
		}

		if (!ds || !ds->ds_timer.target) {
			continue;
		}

		if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
			now = _dispatch_get_nanoseconds();
		} else {
			now = mach_absolute_time();
		}
		if (ds->ds_timer.target <= now) {
			howsoon->tv_sec = 0;
			howsoon->tv_nsec = 0;
			return howsoon;
		}

		// the subtraction cannot go negative because the previous "if"
		// verified that the target is greater than now.
		delta_tmp = ds->ds_timer.target - now;
		if (!(ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK)) {
			delta_tmp = _dispatch_convert_mach2nano(delta_tmp);
		}
		if (delta_tmp < delta) {
			delta = delta_tmp;
		}
	}
	if (slowpath(delta > FOREVER_NSEC)) {
		return NULL;
	} else {
		howsoon->tv_sec = (time_t)(delta / NSEC_PER_SEC);
		howsoon->tv_nsec = (long)(delta % NSEC_PER_SEC);
	}
	return howsoon;
}

struct dispatch_set_timer_params {
	dispatch_source_t ds;
	uintptr_t ident;
	struct dispatch_timer_source_s values;
};

// To be called from the context of the _dispatch_mgr_q
static void
_dispatch_source_set_timer2(void *context)
{
	struct dispatch_set_timer_params *params = context;
	dispatch_source_t ds = params->ds;
	ds->ds_ident_hack = params->ident;
	ds->ds_timer = params->values;
	_dispatch_timer_list_update(ds);
	dispatch_resume(ds);
	dispatch_release(ds);
	free(params);
}

void
dispatch_source_set_timer(dispatch_source_t ds,
	dispatch_time_t start,
	uint64_t interval,
	uint64_t leeway)
{
	struct dispatch_set_timer_params *params;

	// we use zero internally to mean disabled
	if (interval == 0) {
		interval = 1;
	} else if ((int64_t)interval < 0) {
		// 6866347 - make sure nanoseconds won't overflow
		interval = INT64_MAX;
	}

	// Suspend the source so that it doesn't fire with pending changes
	// The use of suspend/resume requires the external retain/release
	dispatch_retain(ds);
	dispatch_suspend(ds);

	if (start == DISPATCH_TIME_NOW) {
		start = mach_absolute_time();
	} else if (start == DISPATCH_TIME_FOREVER) {
		start = INT64_MAX;
	}

	while (!(params = malloc(sizeof(struct dispatch_set_timer_params)))) {
		sleep(1);
	}

	params->ds = ds;
	params->values.flags = ds->ds_timer.flags;

	if ((int64_t)start < 0) {
		// wall clock
		params->ident = DISPATCH_TIMER_INDEX_WALL;
		params->values.start = -((int64_t)start);
		params->values.target = -((int64_t)start);
		params->values.interval = interval;
		params->values.leeway = leeway;
		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
	} else {
		// mach clock
		params->ident = DISPATCH_TIMER_INDEX_MACH;
		params->values.start = start;
		params->values.target = start;
		params->values.interval = _dispatch_convert_nano2mach(interval);
		params->values.leeway = _dispatch_convert_nano2mach(leeway);
		params->values.flags &= ~DISPATCH_TIMER_WALL_CLOCK;
	}

	dispatch_barrier_async_f(&_dispatch_mgr_q, params, _dispatch_source_set_timer2);
}
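
// Illustrative client-side usage (a sketch using only public API; `q` is
// hypothetical): a repeating one-second timer with 100 ms of leeway. A
// wall-clock start (dispatch_walltime) is encoded as a negative
// dispatch_time_t, which is why the sign test above selects the timer list:
//
//	dispatch_source_t t = dispatch_source_create(
//			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, q);
//	dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW, 0),
//			NSEC_PER_SEC, NSEC_PER_SEC / 10);
//	dispatch_source_set_event_handler(t, ^{ /* fires about once a second */ });
//	dispatch_resume(t);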

#ifndef DISPATCH_NO_LEGACY
// LEGACY
long
dispatch_source_timer_set_time(dispatch_source_t ds, uint64_t nanoseconds, uint64_t leeway)
{
	dispatch_time_t start;
	if (nanoseconds == 0) {
		nanoseconds = 1;
	}
	if (ds->ds_timer.flags == (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_WALL_CLOCK)) {
		static const struct timespec t0;
		start = dispatch_walltime(&t0, nanoseconds);
	} else if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		start = dispatch_walltime(DISPATCH_TIME_NOW, nanoseconds);
	} else {
		start = dispatch_time(DISPATCH_TIME_NOW, nanoseconds);
	}
	if (ds->ds_timer.flags & (DISPATCH_TIMER_ABSOLUTE|DISPATCH_TIMER_ONESHOT)) {
		// 6866347 - make sure nanoseconds won't overflow
		nanoseconds = INT64_MAX; // non-repeating (~292 years)
	}
	dispatch_source_set_timer(ds, start, nanoseconds, leeway);
	return 0;
}

// LEGACY
uint64_t
dispatch_event_get_nanoseconds(dispatch_source_t ds)
{
	if (ds->ds_timer.flags & DISPATCH_TIMER_WALL_CLOCK) {
		return ds->ds_timer.interval;
	} else {
		return _dispatch_convert_mach2nano(ds->ds_timer.interval);
	}
}
#endif /* DISPATCH_NO_LEGACY */

static dispatch_source_t _dispatch_mach_notify_source;
static mach_port_t _dispatch_port_set;
static mach_port_t _dispatch_event_port;

#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)

static void _dispatch_port_set_init(void *);
static mach_port_t _dispatch_get_port_set(void);

void
_dispatch_drain_mach_messages(struct kevent *ke)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent ke2;

	if (!dispatch_assume(ke->data)) {
		return;
	}
	dk = _dispatch_kevent_find(ke->data, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_kevent_machport_disable(dk); // emulate EV_DISPATCH

	EV_SET(&ke2, ke->data, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, DISPATCH_MACHPORT_RECV, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &ke2);
	}
}

void
_dispatch_port_set_init(void *context __attribute__((unused)))
{
	struct kevent kev = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_ADD,
	};
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &_dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &_dispatch_event_port);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	kr = mach_port_move_member(mach_task_self(), _dispatch_event_port, _dispatch_port_set);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);

	kev.ident = _dispatch_port_set;

	_dispatch_update_kq(&kev);
}

mach_port_t
_dispatch_get_port_set(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_port_set_init);

	return _dispatch_port_set;
}

void
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, uint32_t del_flags)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	if ((new_flags & DISPATCH_MACHPORT_RECV) || (!new_flags && !del_flags && dk->dk_kevent.fflags & DISPATCH_MACHPORT_RECV)) {
		_dispatch_kevent_machport_enable(dk);
	}
	if (new_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), port, MACH_NOTIFY_DEAD_NAME, 1,
				_dispatch_event_port, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// Suppress errors
			break;
		default:
			// Otherwise, we don't expect any errors from Mach; log them if we do
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the Mach
				// dead-name notification on this port. We should technically cache the
				// previous port and message it when the kernel messages our port. Or
				// we can just say screw those subsystems and drop the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				dispatch_assume_zero(kr);
			}
		}
	}

	if (del_flags & DISPATCH_MACHPORT_RECV) {
		_dispatch_kevent_machport_disable(dk);
	}
	if (del_flags & DISPATCH_MACHPORT_DEAD) {
		kr = mach_port_request_notification(mach_task_self(), (mach_port_t)dk->dk_kevent.ident,
				MACH_NOTIFY_DEAD_NAME, 1, MACH_PORT_NULL, MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// log the error
			} else if (previous) {
				// the kernel has not consumed the right yet
				dispatch_assume_zero(_dispatch_send_consume_send_once_right(previous));
			}
		}
	}
}

void
_dispatch_kevent_machport_enable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, _dispatch_get_port_set());
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	default:
		dispatch_assume_zero(kr);
	}
}

void
_dispatch_kevent_machport_disable(dispatch_kevent_t dk)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	kr = mach_port_move_member(mach_task_self(), mp, 0);
	DISPATCH_VERIFY_MIG(kr);
	switch (kr) {
	case KERN_INVALID_RIGHT:
	case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
		_dispatch_log("Corruption: Mach receive right 0x%x destroyed prematurely", mp);
#endif
		break;
	case 0:
		break;
	default:
		dispatch_assume_zero(kr);
		break;
	}
}

#define _DISPATCH_MIN_MSG_SZ (8ul * 1024ul - MAX_TRAILER_SIZE)
#ifndef DISPATCH_NO_LEGACY
dispatch_source_t
dispatch_source_mig_create(mach_port_t mport, size_t max_msg_size, dispatch_source_attr_t attr,
	dispatch_queue_t dq, dispatch_mig_callback_t mig_callback)
{
	if (max_msg_size < _DISPATCH_MIN_MSG_SZ) {
		max_msg_size = _DISPATCH_MIN_MSG_SZ;
	}
	return dispatch_source_machport_create(mport, DISPATCH_MACHPORT_RECV, attr, dq,
			^(dispatch_source_t ds) {
		if (!dispatch_source_get_error(ds, NULL)) {
			if (dq->dq_width != 1) {
				dispatch_retain(ds); // this is a shim -- use the external retain
				dispatch_async(dq, ^{
					dispatch_mig_server(ds, max_msg_size, mig_callback);
					dispatch_release(ds); // this is a shim -- use the external release
				});
			} else {
				dispatch_mig_server(ds, max_msg_size, mig_callback);
			}
		}
	});
}
#endif /* DISPATCH_NO_LEGACY */

static void
_dispatch_mach_notify_source_init(void *context __attribute__((unused)))
{
	size_t maxsz = sizeof(union __RequestUnion___dispatch_send_libdispatch_internal_protocol_subsystem);

	if (sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem) > maxsz) {
		maxsz = sizeof(union __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem);
	}

	_dispatch_get_port_set();

	_dispatch_mach_notify_source = dispatch_source_mig_create(_dispatch_event_port,
			maxsz, NULL, &_dispatch_mgr_q, libdispatch_internal_protocol_server);

	dispatch_assert(_dispatch_mach_notify_source);
}

kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;

#if DISPATCH_DEBUG
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x deleted prematurely", name);
#endif

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		goto out;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DELETED, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DELETED;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DELETED;

out:
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_port_destroyed(mach_port_t notify __attribute__((unused)), mach_port_t name)
{
	kern_return_t kr;
	// this function should never be called
	dispatch_assume_zero(name);
	kr = mach_port_mod_refs(mach_task_self(), name, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	dispatch_assume_zero(kr);
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_no_senders(mach_port_t notify, mach_port_mscount_t mscnt __attribute__((unused)))
{
	// this function should never be called
	dispatch_assume_zero(notify);
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_send_once(mach_port_t notify __attribute__((unused)))
{
	// we only register for dead-name notifications
	// some code deallocated our send-once right without consuming it
#if DISPATCH_DEBUG
	_dispatch_log("Corruption: An app/library deleted a libdispatch dead-name notification");
#endif
	return KERN_SUCCESS;
}

kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify __attribute__((unused)), mach_port_name_t name)
{
	dispatch_source_t dsi;
	dispatch_kevent_t dk;
	struct kevent kev;
	kern_return_t kr;

	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dk) {
		goto out;
	}

	EV_SET(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH|EV_EOF, DISPATCH_MACHPORT_DEAD, 0, dk);

	TAILQ_FOREACH(dsi, &dk->dk_sources, ds_list) {
		_dispatch_source_merge_kevent(dsi, &kev);
		// this can never happen again
		// this must happen after the merge
		// this may be racy in the future, but we don't provide a 'setter' API for the mask yet
		dsi->ds_pending_data_mask &= ~DISPATCH_MACHPORT_DEAD;
	}

	// no more sources have this flag
	dk->dk_kevent.fflags &= ~DISPATCH_MACHPORT_DEAD;

out:
	// the act of receiving a dead name notification allocates a dead-name right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}

kern_return_t
_dispatch_wakeup_main_thread(mach_port_t mp __attribute__((unused)))
{
	// dummy function just to pop the main thread out of mach_msg()
	return 0;
}

kern_return_t
_dispatch_consume_send_once_right(mach_port_t mp __attribute__((unused)))
{
	// dummy function to consume a send-once right
	return 0;
}

mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, dispatch_mig_callback_t callback)
{
	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
	mach_msg_options_t tmp_options = options;
	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
	mach_msg_return_t kr = 0;
	unsigned int cnt = 1000; // do not stall out serial queues
	int demux_success;

	maxmsgsz += MAX_TRAILER_SIZE;

	// XXX FIXME -- allocate these elsewhere
	bufRequest = alloca(maxmsgsz);
	bufReply = alloca(maxmsgsz);
	bufReply->Head.msgh_size = 0; // make CLANG happy

	// XXX FIXME -- change this to not starve out the target queue
	for (;;) {
		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
			options &= ~MACH_RCV_MSG;
			tmp_options &= ~MACH_RCV_MSG;

			if (!(tmp_options & MACH_SEND_MSG)) {
				break;
			}
		}

		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
				(mach_msg_size_t)maxmsgsz, (mach_port_t)ds->ds_ident_hack, 0, 0);

		tmp_options = options;

		if (slowpath(kr)) {
			switch (kr) {
			case MACH_SEND_INVALID_DEST:
			case MACH_SEND_TIMED_OUT:
				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
					mach_msg_destroy(&bufReply->Head);
				}
				break;
			case MACH_RCV_TIMED_OUT:
			case MACH_RCV_INVALID_NAME:
				break;
			default:
				dispatch_assume_zero(kr);
				break;
			}
			break;
		}

		if (!(tmp_options & MACH_RCV_MSG)) {
			break;
		}

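		// The combined send/receive above overwrites bufReply in place, so
		// ping-pong the two buffers: last iteration's reply buffer (now
		// holding the freshly received request) becomes bufRequest, and the
		// old request buffer is reused for the next reply.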
		bufTemp = bufRequest;
		bufRequest = bufReply;
		bufReply = bufTemp;

		demux_success = callback(&bufRequest->Head, &bufReply->Head);

		if (!demux_success) {
			// destroy the request - but not the reply port
			bufRequest->Head.msgh_remote_port = 0;
			mach_msg_destroy(&bufRequest->Head);
		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode is present
			if (slowpath(bufReply->RetCode)) {
				if (bufReply->RetCode == MIG_NO_REPLY) {
					continue;
				}

				// destroy the request - but not the reply port
				bufRequest->Head.msgh_remote_port = 0;
				mach_msg_destroy(&bufRequest->Head);
			}
		}

		if (bufReply->Head.msgh_remote_port) {
			tmp_options |= MACH_SEND_MSG;
			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != MACH_MSG_TYPE_MOVE_SEND_ONCE) {
				tmp_options |= MACH_SEND_TIMEOUT;
			}
		}
	}

	return kr;
}