src/queue.c (apple/libdispatch, libdispatch-84.5)
1 /*
2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #include "protocol.h"
23
24 void
25 dummy_function(void)
26 {
27 }
28
29 long
30 dummy_function_r0(void)
31 {
32 return 0;
33 }
34
35 static bool _dispatch_select_workaround;
36 static fd_set _dispatch_rfds;
37 static fd_set _dispatch_wfds;
38 static void *_dispatch_rfd_ptrs[FD_SETSIZE];
39 static void *_dispatch_wfd_ptrs[FD_SETSIZE];
40
41
42 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
43 {
44 .do_vtable = &_dispatch_semaphore_vtable,
45 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
46 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
47 },
48 {
49 .do_vtable = &_dispatch_semaphore_vtable,
50 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
51 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
52 },
53 {
54 .do_vtable = &_dispatch_semaphore_vtable,
55 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
56 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
57 },
58 {
59 .do_vtable = &_dispatch_semaphore_vtable,
60 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
61 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
62 },
63 {
64 .do_vtable = &_dispatch_semaphore_vtable,
65 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
66 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
67 },
68 {
69 .do_vtable = &_dispatch_semaphore_vtable,
70 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
71 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
72 },
73 };
74
75 static struct dispatch_queue_s _dispatch_root_queues[];
76
77 static inline dispatch_queue_t
78 _dispatch_get_root_queue(long priority, bool overcommit)
79 {
80 if (overcommit) switch (priority) {
81 case DISPATCH_QUEUE_PRIORITY_LOW:
82 return &_dispatch_root_queues[1];
83 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
84 return &_dispatch_root_queues[3];
85 case DISPATCH_QUEUE_PRIORITY_HIGH:
86 return &_dispatch_root_queues[5];
87 }
88 switch (priority) {
89 case DISPATCH_QUEUE_PRIORITY_LOW:
90 return &_dispatch_root_queues[0];
91 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
92 return &_dispatch_root_queues[2];
93 case DISPATCH_QUEUE_PRIORITY_HIGH:
94 return &_dispatch_root_queues[4];
95 default:
96 return NULL;
97 }
98 }
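// Illustrative mapping for the helper above (comment only, not built): the
// six root queues are ordered {low, low-overcommit, default,
// default-overcommit, high, high-overcommit}, so for example
//
//   _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, false)
//       == &_dispatch_root_queues[2]  // "com.apple.root.default-priority"
//   _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH, true)
//       == &_dispatch_root_queues[5]  // "com.apple.root.high-overcommit-priority"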
99
100 #ifdef __BLOCKS__
101 dispatch_block_t
102 _dispatch_Block_copy(dispatch_block_t db)
103 {
104 dispatch_block_t rval;
105
106 while (!(rval = Block_copy(db))) {
107 sleep(1);
108 }
109
110 return rval;
111 }
112 #define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))
113
114 void
115 _dispatch_call_block_and_release(void *block)
116 {
117 void (^b)(void) = block;
118 b();
119 Block_release(b);
120 }
121
122 void
123 _dispatch_call_block_and_release2(void *block, void *ctxt)
124 {
125 void (^b)(void*) = block;
126 b(ctxt);
127 Block_release(b);
128 }
129
130 #endif /* __BLOCKS__ */
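// Illustrative note (comment only): these helpers are how the Blocks API is
// funneled onto the plain function/context entry points. For example,
// dispatch_async() below reduces to
//
//   dispatch_async_f(dq, _dispatch_Block_copy(work),
//                    _dispatch_call_block_and_release);
//
// where the heap copy of the block is the context and the helper invokes it
// and then releases it.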
131
132 struct dispatch_queue_attr_vtable_s {
133 DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
134 };
135
136 struct dispatch_queue_attr_s {
137 DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);
138
139 // Public:
140 int qa_priority;
141 void* finalizer_ctxt;
142 dispatch_queue_finalizer_function_t finalizer_func;
143
144 // Private:
145 unsigned long qa_flags;
146 };
147
148 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
149
150 #define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
151 static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
152 static void _dispatch_queue_invoke(dispatch_queue_t dq);
153 static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
154 static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
155 static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);
156
157 static bool _dispatch_program_is_probably_callback_driven;
158
159 #if DISPATCH_COCOA_COMPAT
160 void (*dispatch_begin_thread_4GC)(void) = dummy_function;
161 void (*dispatch_end_thread_4GC)(void) = dummy_function;
162 void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
163 void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
164 static void _dispatch_queue_wakeup_main(void);
165
166 static dispatch_once_t _dispatch_main_q_port_pred;
167 static bool main_q_is_draining;
168 static mach_port_t main_q_port;
169 #endif
170
171 static void _dispatch_cache_cleanup2(void *value);
172 static void _dispatch_force_cache_cleanup(void);
173
174 static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
175 .do_type = DISPATCH_QUEUE_TYPE,
176 .do_kind = "queue",
177 .do_dispose = _dispatch_queue_dispose,
178 .do_invoke = (void *)dummy_function_r0,
179 .do_probe = (void *)dummy_function_r0,
180 .do_debug = dispatch_queue_debug,
181 };
182
183 static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
184 .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
185 .do_kind = "global-queue",
186 .do_debug = dispatch_queue_debug,
187 .do_probe = _dispatch_queue_wakeup_global,
188 };
189
190 #define MAX_THREAD_COUNT 255
191
192 struct dispatch_root_queue_context_s {
193 pthread_workqueue_t dgq_kworkqueue;
194 uint32_t dgq_pending;
195 uint32_t dgq_thread_pool_size;
196 dispatch_semaphore_t dgq_thread_mediator;
197 };
198
199 #define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
200 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
201 {
202 .dgq_thread_mediator = &_dispatch_thread_mediator[0],
203 .dgq_thread_pool_size = MAX_THREAD_COUNT,
204 },
205 {
206 .dgq_thread_mediator = &_dispatch_thread_mediator[1],
207 .dgq_thread_pool_size = MAX_THREAD_COUNT,
208 },
209 {
210 .dgq_thread_mediator = &_dispatch_thread_mediator[2],
211 .dgq_thread_pool_size = MAX_THREAD_COUNT,
212 },
213 {
214 .dgq_thread_mediator = &_dispatch_thread_mediator[3],
215 .dgq_thread_pool_size = MAX_THREAD_COUNT,
216 },
217 {
218 .dgq_thread_mediator = &_dispatch_thread_mediator[4],
219 .dgq_thread_pool_size = MAX_THREAD_COUNT,
220 },
221 {
222 .dgq_thread_mediator = &_dispatch_thread_mediator[5],
223 .dgq_thread_pool_size = MAX_THREAD_COUNT,
224 },
225 };
226
227 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
228 // dq_running is set to 2 so that barrier operations go through the slow path
229 static struct dispatch_queue_s _dispatch_root_queues[] = {
230 {
231 .do_vtable = &_dispatch_queue_root_vtable,
232 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
233 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
234 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
235 .do_ctxt = &_dispatch_root_queue_contexts[0],
236
237 .dq_label = "com.apple.root.low-priority",
238 .dq_running = 2,
239 .dq_width = UINT32_MAX,
240 .dq_serialnum = 4,
241 },
242 {
243 .do_vtable = &_dispatch_queue_root_vtable,
244 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
245 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
246 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
247 .do_ctxt = &_dispatch_root_queue_contexts[1],
248
249 .dq_label = "com.apple.root.low-overcommit-priority",
250 .dq_running = 2,
251 .dq_width = UINT32_MAX,
252 .dq_serialnum = 5,
253 },
254 {
255 .do_vtable = &_dispatch_queue_root_vtable,
256 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
257 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
258 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
259 .do_ctxt = &_dispatch_root_queue_contexts[2],
260
261 .dq_label = "com.apple.root.default-priority",
262 .dq_running = 2,
263 .dq_width = UINT32_MAX,
264 .dq_serialnum = 6,
265 },
266 {
267 .do_vtable = &_dispatch_queue_root_vtable,
268 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
269 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
270 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
271 .do_ctxt = &_dispatch_root_queue_contexts[3],
272
273 .dq_label = "com.apple.root.default-overcommit-priority",
274 .dq_running = 2,
275 .dq_width = UINT32_MAX,
276 .dq_serialnum = 7,
277 },
278 {
279 .do_vtable = &_dispatch_queue_root_vtable,
280 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
281 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
282 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
283 .do_ctxt = &_dispatch_root_queue_contexts[4],
284
285 .dq_label = "com.apple.root.high-priority",
286 .dq_running = 2,
287 .dq_width = UINT32_MAX,
288 .dq_serialnum = 8,
289 },
290 {
291 .do_vtable = &_dispatch_queue_root_vtable,
292 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
293 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
294 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
295 .do_ctxt = &_dispatch_root_queue_contexts[5],
296
297 .dq_label = "com.apple.root.high-overcommit-priority",
298 .dq_running = 2,
299 .dq_width = UINT32_MAX,
300 .dq_serialnum = 9,
301 },
302 };
303
304 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
305 struct dispatch_queue_s _dispatch_main_q = {
306 .do_vtable = &_dispatch_queue_vtable,
307 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
308 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
309 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
310 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],
311
312 .dq_label = "com.apple.main-thread",
313 .dq_running = 1,
314 .dq_width = 1,
315 .dq_serialnum = 1,
316 };
317
318 #if DISPATCH_PERF_MON
319 static OSSpinLock _dispatch_stats_lock;
320 static size_t _dispatch_bad_ratio;
321 static struct {
322 uint64_t time_total;
323 uint64_t count_total;
324 uint64_t thread_total;
325 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
326 static void _dispatch_queue_merge_stats(uint64_t start);
327 #endif
328
329 static void *_dispatch_worker_thread(void *context);
330 static void _dispatch_worker_thread2(void *context);
331
332 malloc_zone_t *_dispatch_ccache_zone;
333
334 static inline void
335 _dispatch_continuation_free(dispatch_continuation_t dc)
336 {
337 dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
338 dc->do_next = prev_dc;
339 _dispatch_thread_setspecific(dispatch_cache_key, dc);
340 }
341
342 static inline void
343 _dispatch_continuation_pop(dispatch_object_t dou)
344 {
345 dispatch_continuation_t dc = dou._dc;
346 dispatch_group_t dg;
347
348 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
349 return _dispatch_queue_invoke(dou._dq);
350 }
351
352 // Add the item back to the cache before calling the function. This
353 // allows the 'hot' continuation to be used for a quick callback.
354 //
355 	// The continuation cache is per-thread, so the object cannot have been
356 	// reused by the time dc_func() is invoked below; freeing it early simply
357 	// generates better assembly.
358 if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
359 _dispatch_continuation_free(dc);
360 }
361 if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
362 dg = dc->dc_group;
363 } else {
364 dg = NULL;
365 }
366 dc->dc_func(dc->dc_ctxt);
367 if (dg) {
368 dispatch_group_leave(dg);
369 _dispatch_release(dg);
370 }
371 }
372
373 struct dispatch_object_s *
374 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
375 {
376 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
377
378 // The mediator value acts both as a "lock" and a signal
379 head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);
380
381 if (slowpath(head == NULL)) {
382 // The first xchg on the tail will tell the enqueueing thread that it
383 		// is safe to blindly write out to the head pointer. Restoring it here
384 		// with a cmpxchg (rather than a plain store) keeps that hand-off intact.
385 dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
386 _dispatch_debug("no work on global work queue");
387 return NULL;
388 }
389
390 if (slowpath(head == mediator)) {
391 // This thread lost the race for ownership of the queue.
392 //
393 // The ratio of work to libdispatch overhead must be bad. This
394 // scenario implies that there are too many threads in the pool.
395 // Create a new pending thread and then exit this thread.
396 // The kernel will grant a new thread when the load subsides.
397 _dispatch_debug("Contention on queue: %p", dq);
398 _dispatch_queue_wakeup_global(dq);
399 #if DISPATCH_PERF_MON
400 dispatch_atomic_inc(&_dispatch_bad_ratio);
401 #endif
402 return NULL;
403 }
404
405 // Restore the head pointer to a sane value before returning.
406 // If 'next' is NULL, then this item _might_ be the last item.
407 next = fastpath(head->do_next);
408
409 if (slowpath(!next)) {
410 dq->dq_items_head = NULL;
411
412 if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
413 // both head and tail are NULL now
414 goto out;
415 }
416
417 // There must be a next item now. This thread won't wait long.
418 while (!(next = head->do_next)) {
419 _dispatch_hardware_pause();
420 }
421 }
422
423 dq->dq_items_head = next;
424 _dispatch_queue_wakeup_global(dq);
425 out:
426 return head;
427 }
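// Worked interleaving for the mediator protocol above (illustrative only):
// with one item X enqueued and two drainers A and B racing,
//
//   A: xchg(head, mediator) -> X          A owns the drain and returns X
//   B: xchg(head, mediator) -> mediator   B lost the race, requests another
//                                         worker and returns NULL
//
// and when the queue is empty the winner's cmpxchg quietly restores the head
// to NULL so a later enqueue still sees a consistent head/tail pair.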
428
429 dispatch_queue_t
430 dispatch_get_current_queue(void)
431 {
432 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
433 }
434
435 #undef dispatch_get_main_queue
436 __OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
437 dispatch_queue_t dispatch_get_main_queue(void);
438
439 dispatch_queue_t
440 dispatch_get_main_queue(void)
441 {
442 return &_dispatch_main_q;
443 }
444 #define dispatch_get_main_queue() (&_dispatch_main_q)
445
446 struct _dispatch_hw_config_s _dispatch_hw_config;
447
448 static void
449 _dispatch_queue_set_width_init(void)
450 {
451 size_t valsz = sizeof(uint32_t);
452
453 errno = 0;
454 sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
455 dispatch_assume_zero(errno);
456 dispatch_assume(valsz == sizeof(uint32_t));
457
458 errno = 0;
459 sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
460 dispatch_assume_zero(errno);
461 dispatch_assume(valsz == sizeof(uint32_t));
462
463 errno = 0;
464 sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
465 dispatch_assume_zero(errno);
466 dispatch_assume(valsz == sizeof(uint32_t));
467 }
468
469 void
470 dispatch_queue_set_width(dispatch_queue_t dq, long width)
471 {
472 int w = (int)width; // intentional truncation
473 uint32_t tmp;
474
475 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
476 return;
477 }
478 if (w == 1 || w == 0) {
479 dq->dq_width = 1;
480 return;
481 }
482 if (w > 0) {
483 tmp = w;
484 } else switch (w) {
485 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
486 tmp = _dispatch_hw_config.cc_max_physical;
487 break;
488 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
489 tmp = _dispatch_hw_config.cc_max_active;
490 break;
491 default:
492 // fall through
493 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
494 tmp = _dispatch_hw_config.cc_max_logical;
495 break;
496 }
497 // multiply by two since the running count is inc/dec by two (the low bit == barrier)
498 dq->dq_width = tmp * 2;
499
500 // XXX if the queue has items and the width is increased, we should try to wake the queue
501 }
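// Example of the width encoding used above (illustrative only):
// dispatch_queue_set_width(dq, 4) stores dq_width == 8, because dq_running is
// incremented and decremented in steps of two per running item and the low
// bit is reserved to mark a barrier in progress. The drain loop then compares
// dq_running against this doubled value (see _dispatch_queue_drain()).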
502
503 // skip zero
504 // 1 - main_q
505 // 2 - mgr_q
506 // 3 - _unused_
507 // 4,5,6,7,8,9 - global queues
508 // we use 'xadd' on Intel, so the initial value == next assigned
509 static unsigned long _dispatch_queue_serial_numbers = 10;
510
511 // Note to later developers: ensure that any initialization changes are
512 // made for statically allocated queues (i.e. _dispatch_main_q).
513 inline void
514 _dispatch_queue_init(dispatch_queue_t dq)
515 {
516 dq->do_vtable = &_dispatch_queue_vtable;
517 dq->do_next = DISPATCH_OBJECT_LISTLESS;
518 dq->do_ref_cnt = 1;
519 dq->do_xref_cnt = 1;
520 dq->do_targetq = _dispatch_get_root_queue(0, true);
521 dq->dq_running = 0;
522 dq->dq_width = 1;
523 dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
524 }
525
526 dispatch_queue_t
527 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
528 {
529 dispatch_queue_t dq;
530 size_t label_len;
531
532 if (!label) {
533 label = "";
534 }
535
536 label_len = strlen(label);
537 if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
538 label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
539 }
540
541 // XXX switch to malloc()
542 dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
543 if (slowpath(!dq)) {
544 return dq;
545 }
546
547 _dispatch_queue_init(dq);
548 strcpy(dq->dq_label, label);
549
550 #ifndef DISPATCH_NO_LEGACY
551 if (slowpath(attr)) {
552 dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
553 dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
554 dq->dq_finalizer_func = attr->finalizer_func;
555 #ifdef __BLOCKS__
556 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
557 // if finalizer_ctxt is a Block, retain it.
558 dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
559 if (!(dq->dq_finalizer_ctxt)) {
560 goto out_bad;
561 }
562 }
563 #endif
564 }
565 #endif
566
567 return dq;
568
569 out_bad:
570 free(dq);
571 return NULL;
572 }
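// Typical (non-legacy) usage, shown for illustration only; the label is a
// made-up example:
//
//   dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
//   dispatch_async(q, ^{ /* serial work */ });
//   dispatch_release(q);
//
// With a NULL attribute the new queue is left targeted at the default
// priority, overcommitting root queue chosen by _dispatch_queue_init().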
573
574 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
575 void
576 _dispatch_queue_dispose(dispatch_queue_t dq)
577 {
578 if (slowpath(dq == _dispatch_queue_get_current())) {
579 DISPATCH_CRASH("Release of a queue by itself");
580 }
581 if (slowpath(dq->dq_items_tail)) {
582 DISPATCH_CRASH("Release of a queue while items are enqueued");
583 }
584
585 #ifndef DISPATCH_NO_LEGACY
586 if (dq->dq_finalizer_func) {
587 dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
588 }
589 #endif
590
591 // trash the tail queue so that use after free will crash
592 dq->dq_items_tail = (void *)0x200;
593
594 _dispatch_dispose(dq);
595 }
596
597 DISPATCH_NOINLINE
598 static void
599 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
600 {
601 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
602
603 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
604 dc->dc_func = func;
605 dc->dc_ctxt = context;
606
607 _dispatch_queue_push(dq, dc);
608 }
609
610 #ifdef __BLOCKS__
611 void
612 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
613 {
614 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
615 }
616 #endif
617
618 DISPATCH_NOINLINE
619 void
620 dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
621 {
622 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
623
624 if (!dc) {
625 return _dispatch_barrier_async_f_slow(dq, context, func);
626 }
627
628 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
629 dc->dc_func = func;
630 dc->dc_ctxt = context;
631
632 _dispatch_queue_push(dq, dc);
633 }
634
635 DISPATCH_NOINLINE
636 static void
637 _dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
638 {
639 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
640
641 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
642 dc->dc_func = func;
643 dc->dc_ctxt = context;
644
645 _dispatch_queue_push(dq, dc);
646 }
647
648 #ifdef __BLOCKS__
649 void
650 dispatch_async(dispatch_queue_t dq, void (^work)(void))
651 {
652 dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
653 }
654 #endif
655
656 DISPATCH_NOINLINE
657 void
658 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
659 {
660 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
661
662 	// Unlike dispatch_sync_f(), we do NOT need to check the queue width here;
663 	// the "drain" function performs that test
664
665 if (!dc) {
666 return _dispatch_async_f_slow(dq, ctxt, func);
667 }
668
669 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
670 dc->dc_func = func;
671 dc->dc_ctxt = ctxt;
672
673 _dispatch_queue_push(dq, dc);
674 }
675
676 struct dispatch_barrier_sync_slow2_s {
677 dispatch_function_t dbss2_func;
678 	void *dbss2_ctxt; // context passed to dbss2_func
679 dispatch_semaphore_t dbss2_sema;
680 };
681
682 static void
683 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
684 {
685 struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;
686
687 dbss2->dbss2_func(dbss2->dbss2_ctxt);
688 dispatch_semaphore_signal(dbss2->dbss2_sema);
689 }
690
691 DISPATCH_NOINLINE
692 static void
693 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
694 {
695 struct dispatch_barrier_sync_slow2_s dbss2 = {
696 .dbss2_func = func,
697 .dbss2_ctxt = ctxt,
698 .dbss2_sema = _dispatch_get_thread_semaphore(),
699 };
700 struct dispatch_barrier_sync_slow_s {
701 DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
702 } dbss = {
703 .do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
704 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
705 .dc_ctxt = &dbss2,
706 };
707
708 _dispatch_queue_push(dq, (void *)&dbss);
709
710 while (dispatch_semaphore_wait(dbss2.dbss2_sema, dispatch_time(0, 3ull * NSEC_PER_SEC))) {
711 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
712 continue;
713 }
714 if (_dispatch_queue_trylock(dq)) {
715 _dispatch_queue_drain(dq);
716 _dispatch_queue_unlock(dq);
717 }
718 }
719 _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
720 }
721
722 #ifdef __BLOCKS__
723 void
724 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
725 {
726 struct Block_basic *bb = (void *)work;
727
728 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
729 }
730 #endif
731
732 DISPATCH_NOINLINE
733 void
734 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
735 {
736 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
737
738 // 1) ensure that this thread hasn't enqueued anything ahead of this call
739 // 2) the queue is not suspended
740 // 3) the queue is not weird
741 if (slowpath(dq->dq_items_tail)
742 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
743 || slowpath(!_dispatch_queue_trylock(dq))) {
744 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
745 }
746
747 _dispatch_thread_setspecific(dispatch_queue_key, dq);
748 func(ctxt);
749 _dispatch_workitem_inc();
750 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
751 _dispatch_queue_unlock(dq);
752 }
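// Illustrative intent of the fast path above (comment only; the names are
// made up):
//
//   __block int snapshot;
//   dispatch_barrier_sync(q, ^{ snapshot = shared_counter; });
//
// On a width > 1 queue, taking the low "barrier" bit of dq_running via
// _dispatch_queue_trylock() diverts concurrent dispatch_sync_f() callers to
// their slow path until the barrier block returns; on a serial queue the
// behavior matches ordinary dispatch_sync().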
753
754 static void
755 _dispatch_sync_f_slow2(void *ctxt)
756 {
757 dispatch_queue_t dq = _dispatch_queue_get_current();
758 dispatch_atomic_add(&dq->dq_running, 2);
759 dispatch_semaphore_signal(ctxt);
760 }
761
762 DISPATCH_NOINLINE
763 static void
764 _dispatch_sync_f_slow(dispatch_queue_t dq)
765 {
766 // the global root queues do not need strict ordering
767 if (dq->do_targetq == NULL) {
768 dispatch_atomic_add(&dq->dq_running, 2);
769 return;
770 }
771
772 struct dispatch_sync_slow_s {
773 DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
774 } dss = {
775 .do_vtable = NULL,
776 .dc_func = _dispatch_sync_f_slow2,
777 .dc_ctxt = _dispatch_get_thread_semaphore(),
778 };
779
780 	// XXX FIXME -- concurrent queues can become serial again
781 _dispatch_queue_push(dq, (void *)&dss);
782
783 dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
784 _dispatch_put_thread_semaphore(dss.dc_ctxt);
785 }
786
787 #ifdef __BLOCKS__
788 void
789 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
790 {
791 struct Block_basic *bb = (void *)work;
792 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
793 }
794 #endif
795
796 DISPATCH_NOINLINE
797 void
798 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
799 {
800 typeof(dq->dq_running) prev_cnt;
801 dispatch_queue_t old_dq;
802
803 if (dq->dq_width == 1) {
804 return dispatch_barrier_sync_f(dq, ctxt, func);
805 }
806
807 // 1) ensure that this thread hasn't enqueued anything ahead of this call
808 // 2) the queue is not suspended
809 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
810 _dispatch_sync_f_slow(dq);
811 } else {
812 prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;
813
814 if (slowpath(prev_cnt & 1)) {
815 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
816 _dispatch_wakeup(dq);
817 }
818 _dispatch_sync_f_slow(dq);
819 }
820 }
821
822 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
823 _dispatch_thread_setspecific(dispatch_queue_key, dq);
824 func(ctxt);
825 _dispatch_workitem_inc();
826 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
827
828 if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
829 _dispatch_wakeup(dq);
830 }
831 }
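// Worked example of the dq_running accounting above (illustrative only):
// every non-barrier sync adds 2, so three concurrent dispatch_sync_f() calls
// move dq_running through 2, 4, 6 and back down as they retire. An odd value
// means a barrier holds the low "lock" bit, which is what the prev_cnt & 1
// test detects before diverting callers to the slow path.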
832
833 const char *
834 dispatch_queue_get_label(dispatch_queue_t dq)
835 {
836 return dq->dq_label;
837 }
838
839 #if DISPATCH_COCOA_COMPAT
840 static void
841 _dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
842 {
843 kern_return_t kr;
844
845 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
846 DISPATCH_VERIFY_MIG(kr);
847 dispatch_assume_zero(kr);
848 kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
849 DISPATCH_VERIFY_MIG(kr);
850 dispatch_assume_zero(kr);
851
852 _dispatch_program_is_probably_callback_driven = true;
853 _dispatch_safe_fork = false;
854 }
855
856 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
857 DISPATCH_NOINLINE
858 static void
859 _dispatch_queue_set_mainq_drain_state(bool arg)
860 {
861 main_q_is_draining = arg;
862 }
863 #endif
864
865 void
866 dispatch_main(void)
867 {
868 if (pthread_main_np()) {
869 _dispatch_program_is_probably_callback_driven = true;
870 pthread_exit(NULL);
871 DISPATCH_CRASH("pthread_exit() returned");
872 }
873 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
874 }
875
876 static void
877 _dispatch_sigsuspend(void *ctxt __attribute__((unused)))
878 {
879 static const sigset_t mask;
880
881 for (;;) {
882 sigsuspend(&mask);
883 }
884 }
885
886 DISPATCH_NOINLINE
887 static void
888 _dispatch_queue_cleanup2(void)
889 {
890 dispatch_atomic_dec(&_dispatch_main_q.dq_running);
891
892 if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
893 _dispatch_wakeup(&_dispatch_main_q);
894 }
895
896 // overload the "probably" variable to mean that dispatch_main() or
897 // similar non-POSIX API was called
898 // this has to run before the DISPATCH_COCOA_COMPAT below
899 if (_dispatch_program_is_probably_callback_driven) {
900 dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
901 sleep(1); // workaround 6778970
902 }
903
904 #if DISPATCH_COCOA_COMPAT
905 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
906
907 mach_port_t mp = main_q_port;
908 kern_return_t kr;
909
910 main_q_port = 0;
911
912 if (mp) {
913 kr = mach_port_deallocate(mach_task_self(), mp);
914 DISPATCH_VERIFY_MIG(kr);
915 dispatch_assume_zero(kr);
916 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
917 DISPATCH_VERIFY_MIG(kr);
918 dispatch_assume_zero(kr);
919 }
920 #endif
921 }
922
923 dispatch_queue_t
924 dispatch_get_concurrent_queue(long pri)
925 {
926 if (pri > 0) {
927 pri = DISPATCH_QUEUE_PRIORITY_HIGH;
928 } else if (pri < 0) {
929 pri = DISPATCH_QUEUE_PRIORITY_LOW;
930 }
931 return _dispatch_get_root_queue(pri, false);
932 }
933
934 static void
935 _dispatch_queue_cleanup(void *ctxt)
936 {
937 if (ctxt == &_dispatch_main_q) {
938 return _dispatch_queue_cleanup2();
939 }
940 // POSIX defines that destructors are only called if 'ctxt' is non-null
941 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
942 }
943
944 dispatch_queue_t
945 dispatch_get_global_queue(long priority, unsigned long flags)
946 {
947 if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
948 return NULL;
949 }
950 return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
951 }
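// Public usage, shown for illustration only:
//
//   dispatch_queue_t gq =
//           dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//   dispatch_async(gq, ^{ /* concurrent work */ });
//
// Any flag other than DISPATCH_QUEUE_OVERCOMMIT yields NULL, and global
// queues are never released by callers; their reference counts are pinned at
// DISPATCH_OBJECT_GLOBAL_REFCNT.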
952
953 #define countof(x) (sizeof(x) / sizeof(x[0]))
954 void
955 libdispatch_init(void)
956 {
957 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
958 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);
959
960 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
961 dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
962 dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
963 dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);
964
965 _dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
966 _dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release); // use the extern release
967 _dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
968 #if DISPATCH_PERF_MON
969 _dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
970 #endif
971
972 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
973
974 _dispatch_queue_set_width_init();
975 }
976
977 void
978 _dispatch_queue_unlock(dispatch_queue_t dq)
979 {
980 if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
981 return;
982 }
983
984 _dispatch_wakeup(dq);
985 }
986
987 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
988 dispatch_queue_t
989 _dispatch_wakeup(dispatch_object_t dou)
990 {
991 dispatch_queue_t tq;
992
993 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
994 return NULL;
995 }
996 if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
997 return NULL;
998 }
999
1000 if (!_dispatch_trylock(dou._do)) {
1001 #if DISPATCH_COCOA_COMPAT
1002 if (dou._dq == &_dispatch_main_q) {
1003 _dispatch_queue_wakeup_main();
1004 }
1005 #endif
1006 return NULL;
1007 }
1008 _dispatch_retain(dou._do);
1009 tq = dou._do->do_targetq;
1010 _dispatch_queue_push(tq, dou._do);
1011 return tq; // libdispatch doesn't need this, but the Instrument DTrace probe does
1012 }
1013
1014 #if DISPATCH_COCOA_COMPAT
1015 DISPATCH_NOINLINE
1016 void
1017 _dispatch_queue_wakeup_main(void)
1018 {
1019 kern_return_t kr;
1020
1021 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1022
1023 kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
1024
1025 switch (kr) {
1026 case MACH_SEND_TIMEOUT:
1027 case MACH_SEND_TIMED_OUT:
1028 case MACH_SEND_INVALID_DEST:
1029 break;
1030 default:
1031 dispatch_assume_zero(kr);
1032 break;
1033 }
1034
1035 _dispatch_safe_fork = false;
1036 }
1037 #endif
1038
1039 static inline int
1040 _dispatch_rootq2wq_pri(long idx)
1041 {
1042 #ifdef WORKQ_DEFAULT_PRIOQUEUE
1043 switch (idx) {
1044 case 0:
1045 case 1:
1046 return WORKQ_LOW_PRIOQUEUE;
1047 case 2:
1048 case 3:
1049 default:
1050 return WORKQ_DEFAULT_PRIOQUEUE;
1051 case 4:
1052 case 5:
1053 return WORKQ_HIGH_PRIOQUEUE;
1054 }
1055 #else
1056 	return (int)idx; // no workqueue priority levels to map to here
1057 #endif
1058 }
1059
1060 static void
1061 _dispatch_root_queues_init(void *context __attribute__((unused)))
1062 {
1063 bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
1064 pthread_workqueue_attr_t pwq_attr;
1065 kern_return_t kr;
1066 int i, r;
1067
1068 r = pthread_workqueue_attr_init_np(&pwq_attr);
1069 dispatch_assume_zero(r);
1070
1071 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1072 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
1073 dispatch_assume_zero(r);
1074 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
1075 dispatch_assume_zero(r);
1076 // some software hangs if the non-overcommitting queues do not overcommit when threads block
1077 #if 0
1078 if (!(i & 1)) {
1079 dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
1080 }
1081 #endif
1082
1083 r = 0;
1084 if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
1085 if (r != ENOTSUP) {
1086 dispatch_assume_zero(r);
1087 }
1088 // override the default FIFO behavior for the pool semaphores
1089 kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
1090 DISPATCH_VERIFY_MIG(kr);
1091 dispatch_assume_zero(kr);
1092 dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
1093 } else {
1094 dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
1095 }
1096 }
1097
1098 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
1099 dispatch_assume_zero(r);
1100 }
1101
1102 bool
1103 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
1104 {
1105 static dispatch_once_t pred;
1106 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1107 pthread_workitem_handle_t wh;
1108 unsigned int gen_cnt;
1109 pthread_t pthr;
1110 int r, t_count;
1111
1112 if (!dq->dq_items_tail) {
1113 return false;
1114 }
1115
1116 _dispatch_safe_fork = false;
1117
1118 dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
1119
1120 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
1121
1122 if (qc->dgq_kworkqueue) {
1123 if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
1124 _dispatch_debug("requesting new worker thread");
1125
1126 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
1127 dispatch_assume_zero(r);
1128 } else {
1129 _dispatch_debug("work thread request still pending on global queue: %p", dq);
1130 }
1131 goto out;
1132 }
1133
1134 if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
1135 goto out;
1136 }
1137
1138 do {
1139 t_count = qc->dgq_thread_pool_size;
1140 if (!t_count) {
1141 _dispatch_debug("The thread pool is full: %p", dq);
1142 goto out;
1143 }
1144 } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));
1145
1146 while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
1147 if (r != EAGAIN) {
1148 dispatch_assume_zero(r);
1149 }
1150 sleep(1);
1151 }
1152 r = pthread_detach(pthr);
1153 dispatch_assume_zero(r);
1154
1155 out:
1156 return false;
1157 }
1158
1159 void
1160 _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
1161 {
1162 #if DISPATCH_PERF_MON
1163 uint64_t start = mach_absolute_time();
1164 #endif
1165 _dispatch_queue_drain(dq);
1166 #if DISPATCH_PERF_MON
1167 _dispatch_queue_merge_stats(start);
1168 #endif
1169 _dispatch_force_cache_cleanup();
1170 }
1171
1172 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1173 DISPATCH_NOINLINE
1174 void
1175 _dispatch_queue_invoke(dispatch_queue_t dq)
1176 {
1177 dispatch_queue_t tq = dq->do_targetq;
1178
1179 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
1180 _dispatch_queue_drain(dq);
1181 if (tq == dq->do_targetq) {
1182 tq = dx_invoke(dq);
1183 } else {
1184 tq = dq->do_targetq;
1185 }
1186 		// We do not need to check the result here; the check happens below,
1187 		// once the suspend-count lock is dropped.
1188 dispatch_atomic_dec(&dq->dq_running);
1189 if (tq) {
1190 return _dispatch_queue_push(tq, dq);
1191 }
1192 }
1193
1194 dq->do_next = DISPATCH_OBJECT_LISTLESS;
1195 if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
1196 if (dq->dq_running == 0) {
1197 _dispatch_wakeup(dq); // verify that the queue is idle
1198 }
1199 }
1200 _dispatch_release(dq); // added when the queue is put on the list
1201 }
1202
1203 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1204 static void
1205 _dispatch_set_target_queue2(void *ctxt)
1206 {
1207 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
1208
1209 prev_dq = dq->do_targetq;
1210 dq->do_targetq = ctxt;
1211 _dispatch_release(prev_dq);
1212 }
1213
1214 void
1215 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
1216 {
1217 if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1218 return;
1219 }
1220 // NOTE: we test for NULL target queues internally to detect root queues
1221 // therefore, if the retain crashes due to a bad input, that is OK
1222 _dispatch_retain(dq);
1223 dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
1224 }
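// Illustrative usage (comment only; 'q' is a hypothetical serial queue):
//
//   dispatch_queue_t high =
//           dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
//   dispatch_set_target_queue(q, high);  // q's blocks now drain at high priority
//
// The retarget is applied via the barrier continuation above, so it only
// takes effect after work already enqueued on 'q' has drained.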
1225
1226 static void
1227 _dispatch_async_f_redirect2(void *_ctxt)
1228 {
1229 struct dispatch_continuation_s *dc = _ctxt;
1230 struct dispatch_continuation_s *other_dc = dc->dc_data[1];
1231 dispatch_queue_t old_dq, dq = dc->dc_data[0];
1232
1233 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1234 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1235 _dispatch_continuation_pop(other_dc);
1236 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1237
1238 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
1239 _dispatch_wakeup(dq);
1240 }
1241 _dispatch_release(dq);
1242 }
1243
1244 static void
1245 _dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
1246 {
1247 dispatch_continuation_t dc = (void *)other_dc;
1248 dispatch_queue_t root_dq = dq;
1249
1250 if (dc->dc_func == _dispatch_sync_f_slow2) {
1251 return dc->dc_func(dc->dc_ctxt);
1252 }
1253
1254 dispatch_atomic_add(&dq->dq_running, 2);
1255 _dispatch_retain(dq);
1256
1257 dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();
1258
1259 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1260 dc->dc_func = _dispatch_async_f_redirect2;
1261 dc->dc_ctxt = dc;
1262 dc->dc_data[0] = dq;
1263 dc->dc_data[1] = other_dc;
1264
1265 do {
1266 root_dq = root_dq->do_targetq;
1267 } while (root_dq->do_targetq);
1268
1269 _dispatch_queue_push(root_dq, dc);
1270 }
1271
1272
1273 void
1274 _dispatch_queue_drain(dispatch_queue_t dq)
1275 {
1276 dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1277 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
1278
1279 orig_tq = dq->do_targetq;
1280
1281 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1282
1283 while (dq->dq_items_tail) {
1284 while (!fastpath(dq->dq_items_head)) {
1285 _dispatch_hardware_pause();
1286 }
1287
1288 dc = dq->dq_items_head;
1289 dq->dq_items_head = NULL;
1290
1291 do {
1292 // Enqueue is TIGHTLY controlled, we won't wait long.
1293 do {
1294 next_dc = fastpath(dc->do_next);
1295 } while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));
1296 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
1297 goto out;
1298 }
1299 if (dq->dq_running > dq->dq_width) {
1300 goto out;
1301 }
1302 if (orig_tq != dq->do_targetq) {
1303 goto out;
1304 }
1305 if (fastpath(dq->dq_width == 1)) {
1306 _dispatch_continuation_pop(dc);
1307 _dispatch_workitem_inc();
1308 } else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
1309 if (dq->dq_running > 1) {
1310 goto out;
1311 }
1312 _dispatch_continuation_pop(dc);
1313 _dispatch_workitem_inc();
1314 } else {
1315 _dispatch_async_f_redirect(dq, dc);
1316 }
1317 } while ((dc = next_dc));
1318 }
1319
1320 out:
1321 // if this is not a complete drain, we must undo some things
1322 if (slowpath(dc)) {
1323 // 'dc' must NOT be "popped"
1324 // 'dc' might be the last item
1325 if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
1326 dq->dq_items_head = dc;
1327 } else {
1328 while (!(next_dc = dq->dq_items_head)) {
1329 _dispatch_hardware_pause();
1330 }
1331 dq->dq_items_head = dc;
1332 dc->do_next = next_dc;
1333 }
1334 }
1335
1336 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1337 }
1338
1339 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1340 void *
1341 _dispatch_worker_thread(void *context)
1342 {
1343 dispatch_queue_t dq = context;
1344 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1345 sigset_t mask;
1346 int r;
1347
1348 	// replicate the signal-masking tweaks that the kernel workqueue would do for us
1349 r = sigfillset(&mask);
1350 dispatch_assume_zero(r);
1351 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
1352 dispatch_assume_zero(r);
1353
1354 do {
1355 _dispatch_worker_thread2(context);
1356 // we use 65 seconds in case there are any timers that run once a minute
1357 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);
1358
1359 dispatch_atomic_inc(&qc->dgq_thread_pool_size);
1360 if (dq->dq_items_tail) {
1361 _dispatch_queue_wakeup_global(dq);
1362 }
1363
1364 return NULL;
1365 }
1366
1367 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1368 void
1369 _dispatch_worker_thread2(void *context)
1370 {
1371 struct dispatch_object_s *item;
1372 dispatch_queue_t dq = context;
1373 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1374
1375 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
1376 DISPATCH_CRASH("Premature thread recycling");
1377 }
1378
1379 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1380 qc->dgq_pending = 0;
1381
1382 #if DISPATCH_COCOA_COMPAT
1383 // ensure that high-level memory management techniques do not leak/crash
1384 dispatch_begin_thread_4GC();
1385 void *pool = _dispatch_begin_NSAutoReleasePool();
1386 #endif
1387
1388 #if DISPATCH_PERF_MON
1389 uint64_t start = mach_absolute_time();
1390 #endif
1391 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
1392 _dispatch_continuation_pop(item);
1393 }
1394 #if DISPATCH_PERF_MON
1395 _dispatch_queue_merge_stats(start);
1396 #endif
1397
1398 #if DISPATCH_COCOA_COMPAT
1399 _dispatch_end_NSAutoReleasePool(pool);
1400 dispatch_end_thread_4GC();
1401 #endif
1402
1403 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
1404
1405 _dispatch_force_cache_cleanup();
1406 }
1407
1408 #if DISPATCH_PERF_MON
1409 void
1410 _dispatch_queue_merge_stats(uint64_t start)
1411 {
1412 uint64_t avg, delta = mach_absolute_time() - start;
1413 unsigned long count, bucket;
1414
1415 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
1416 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1417
1418 if (count) {
1419 avg = delta / count;
1420 bucket = flsll(avg);
1421 } else {
1422 bucket = 0;
1423 }
1424
1425 // 64-bit counters on 32-bit require a lock or a queue
1426 OSSpinLockLock(&_dispatch_stats_lock);
1427
1428 _dispatch_stats[bucket].time_total += delta;
1429 _dispatch_stats[bucket].count_total += count;
1430 _dispatch_stats[bucket].thread_total++;
1431
1432 OSSpinLockUnlock(&_dispatch_stats_lock);
1433 }
1434 #endif
1435
1436 size_t
1437 dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1438 {
1439 return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
1440 }
1441
1442 size_t
1443 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1444 {
1445 size_t offset = 0;
1446 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
1447 offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1448 offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1449 offset += snprintf(&buf[offset], bufsiz - offset, "}");
1450 return offset;
1451 }
1452
1453 #if DISPATCH_DEBUG
1454 void
1455 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1456 if (fastpath(dq)) {
1457 dispatch_debug(dq, "%s", str);
1458 } else {
1459 _dispatch_log("queue[NULL]: %s", str);
1460 }
1461 }
1462 #endif
1463
1464 #if DISPATCH_COCOA_COMPAT
1465 void
1466 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
1467 {
1468 if (main_q_is_draining) {
1469 return;
1470 }
1471 _dispatch_queue_set_mainq_drain_state(true);
1472 _dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
1473 _dispatch_queue_set_mainq_drain_state(false);
1474 }
1475
1476 mach_port_t
1477 _dispatch_get_main_queue_port_4CF(void)
1478 {
1479 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1480 return main_q_port;
1481 }
1482 #endif
1483
1484 static void
1485 dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
1486 {
1487 dispatch_queue_attr_set_finalizer(attr, NULL);
1488 _dispatch_dispose(attr);
1489 }
1490
1491 static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
1492 .do_type = DISPATCH_QUEUE_ATTR_TYPE,
1493 .do_kind = "queue-attr",
1494 .do_dispose = dispatch_queue_attr_dispose,
1495 };
1496
1497 dispatch_queue_attr_t
1498 dispatch_queue_attr_create(void)
1499 {
1500 dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));
1501
1502 if (a) {
1503 a->do_vtable = &dispatch_queue_attr_vtable;
1504 a->do_next = DISPATCH_OBJECT_LISTLESS;
1505 a->do_ref_cnt = 1;
1506 a->do_xref_cnt = 1;
1507 a->do_targetq = _dispatch_get_root_queue(0, 0);
1508 a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
1509 }
1510 return a;
1511 }
1512
1513 void
1514 dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
1515 {
1516 dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
1517 attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
1518 }
1519
1520 void
1521 dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
1522 {
1523 dispatch_debug_assert(attr, "NULL pointer");
1524 dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");
1525
1526 if (priority > 0) {
1527 priority = DISPATCH_QUEUE_PRIORITY_HIGH;
1528 } else if (priority < 0) {
1529 priority = DISPATCH_QUEUE_PRIORITY_LOW;
1530 }
1531
1532 attr->qa_priority = priority;
1533 }
1534
1535 void
1536 dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
1537 void *context, dispatch_queue_finalizer_function_t finalizer)
1538 {
1539 #ifdef __BLOCKS__
1540 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
1541 Block_release(attr->finalizer_ctxt);
1542 }
1543 #endif
1544 attr->finalizer_ctxt = context;
1545 attr->finalizer_func = finalizer;
1546 }
1547
1548 #ifdef __BLOCKS__
1549 long
1550 dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
1551 dispatch_queue_finalizer_t finalizer)
1552 {
1553 void *ctxt;
1554 dispatch_queue_finalizer_function_t func;
1555
1556 if (finalizer) {
1557 if (!(ctxt = Block_copy(finalizer))) {
1558 return 1;
1559 }
1560 func = (void *)_dispatch_call_block_and_release2;
1561 } else {
1562 ctxt = NULL;
1563 func = NULL;
1564 }
1565
1566 dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);
1567
1568 return 0;
1569 }
1570 #endif
1571
1572 static void
1573 _dispatch_ccache_init(void *context __attribute__((unused)))
1574 {
1575 _dispatch_ccache_zone = malloc_create_zone(0, 0);
1576 dispatch_assert(_dispatch_ccache_zone);
1577 malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
1578 }
1579
1580 dispatch_continuation_t
1581 _dispatch_continuation_alloc_from_heap(void)
1582 {
1583 static dispatch_once_t pred;
1584 dispatch_continuation_t dc;
1585
1586 dispatch_once_f(&pred, NULL, _dispatch_ccache_init);
1587
1588 while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
1589 sleep(1);
1590 }
1591
1592 return dc;
1593 }
1594
1595 void
1596 _dispatch_force_cache_cleanup(void)
1597 {
1598 dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);
1599 if (dc) {
1600 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1601 _dispatch_cache_cleanup2(dc);
1602 }
1603 }
1604
1605 DISPATCH_NOINLINE
1606 void
1607 _dispatch_cache_cleanup2(void *value)
1608 {
1609 dispatch_continuation_t dc, next_dc = value;
1610
1611 while ((dc = next_dc)) {
1612 next_dc = dc->do_next;
1613 malloc_zone_free(_dispatch_ccache_zone, dc);
1614 }
1615 }
1616
1617 static char _dispatch_build[16];
1618
1619 static void
1620 _dispatch_bug_init(void *context __attribute__((unused)))
1621 {
1622 int mib[] = { CTL_KERN, KERN_OSVERSION };
1623 size_t bufsz = sizeof(_dispatch_build);
1624
1625 sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
1626 }
1627
1628 void
1629 _dispatch_bug(size_t line, long val)
1630 {
1631 static dispatch_once_t pred;
1632 static void *last_seen;
1633 void *ra = __builtin_return_address(0);
1634
1635 dispatch_once_f(&pred, NULL, _dispatch_bug_init);
1636 if (last_seen != ra) {
1637 last_seen = ra;
1638 _dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
1639 }
1640 }
1641
1642 void
1643 _dispatch_abort(size_t line, long val)
1644 {
1645 _dispatch_bug(line, val);
1646 abort();
1647 }
1648
1649 void
1650 _dispatch_log(const char *msg, ...)
1651 {
1652 va_list ap;
1653
1654 va_start(ap, msg);
1655
1656 _dispatch_logv(msg, ap);
1657
1658 va_end(ap);
1659 }
1660
1661 void
1662 _dispatch_logv(const char *msg, va_list ap)
1663 {
1664 #if DISPATCH_DEBUG
1665 static FILE *logfile, *tmp;
1666 char newbuf[strlen(msg) + 2];
1667 char path[PATH_MAX];
1668
1669 sprintf(newbuf, "%s\n", msg);
1670
1671 if (!logfile) {
1672 snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
1673 tmp = fopen(path, "a");
1674 assert(tmp);
1675 if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
1676 fclose(tmp);
1677 } else {
1678 struct timeval tv;
1679 gettimeofday(&tv, NULL);
1680 fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
1681 getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
1682 }
1683 }
1684 vfprintf(logfile, newbuf, ap);
1685 fflush(logfile);
1686 #else
1687 vsyslog(LOG_NOTICE, msg, ap);
1688 #endif
1689 }
1690
1691 int
1692 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
1693 {
1694 int r;
1695
1696 /* Workaround: 6269619 Not all signals can be delivered on any thread */
1697
1698 r = sigdelset(set, SIGILL);
1699 dispatch_assume_zero(r);
1700 r = sigdelset(set, SIGTRAP);
1701 dispatch_assume_zero(r);
1702 r = sigdelset(set, SIGEMT);
1703 dispatch_assume_zero(r);
1704 r = sigdelset(set, SIGFPE);
1705 dispatch_assume_zero(r);
1706 r = sigdelset(set, SIGBUS);
1707 dispatch_assume_zero(r);
1708 r = sigdelset(set, SIGSEGV);
1709 dispatch_assume_zero(r);
1710 r = sigdelset(set, SIGSYS);
1711 dispatch_assume_zero(r);
1712 r = sigdelset(set, SIGPIPE);
1713 dispatch_assume_zero(r);
1714
1715 return pthread_sigmask(how, set, oset);
1716 }
1717
1718 bool _dispatch_safe_fork = true;
1719
1720 void
1721 dispatch_atfork_prepare(void)
1722 {
1723 }
1724
1725 void
1726 dispatch_atfork_parent(void)
1727 {
1728 }
1729
1730 void
1731 dispatch_atfork_child(void)
1732 {
1733 void *crash = (void *)0x100;
1734 size_t i;
1735
1736 if (_dispatch_safe_fork) {
1737 return;
1738 }
1739
1740 _dispatch_main_q.dq_items_head = crash;
1741 _dispatch_main_q.dq_items_tail = crash;
1742
1743 _dispatch_mgr_q.dq_items_head = crash;
1744 _dispatch_mgr_q.dq_items_tail = crash;
1745
1746 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1747 _dispatch_root_queues[i].dq_items_head = crash;
1748 _dispatch_root_queues[i].dq_items_tail = crash;
1749 }
1750 }
1751
1752 void
1753 dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
1754 {
1755 }
1756
1757 static int _dispatch_kq;
1758
1759 static void
1760 _dispatch_get_kq_init(void *context __attribute__((unused)))
1761 {
1762 static const struct kevent kev = {
1763 .ident = 1,
1764 .filter = EVFILT_USER,
1765 .flags = EV_ADD|EV_CLEAR,
1766 };
1767
1768 _dispatch_kq = kqueue();
1769 _dispatch_safe_fork = false;
1770 // in case we fall back to select()
1771 FD_SET(_dispatch_kq, &_dispatch_rfds);
1772
1773 if (_dispatch_kq == -1) {
1774 dispatch_assert_zero(errno);
1775 }
1776
1777 dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));
1778
1779 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
1780 }
1781
1782 static int
1783 _dispatch_get_kq(void)
1784 {
1785 static dispatch_once_t pred;
1786
1787 dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);
1788
1789 return _dispatch_kq;
1790 }
1791
1792 static void
1793 _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
1794 {
1795 size_t i;
1796
1797 for (i = 0; i < cnt; i++) {
1798 // EVFILT_USER isn't used by sources
1799 if (kev[i].filter == EVFILT_USER) {
1800 // If _dispatch_mgr_thread2() ever is changed to return to the
1801 // caller, then this should become _dispatch_queue_drain()
1802 _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
1803 } else {
1804 _dispatch_source_drain_kevent(&kev[i]);
1805 }
1806 }
1807 }
1808
1809 static dispatch_queue_t
1810 _dispatch_mgr_invoke(dispatch_queue_t dq)
1811 {
1812 static const struct timespec timeout_immediately = { 0, 0 };
1813 struct timespec timeout;
1814 const struct timespec *timeoutp;
1815 struct timeval sel_timeout, *sel_timeoutp;
1816 fd_set tmp_rfds, tmp_wfds;
1817 struct kevent kev[1];
1818 int k_cnt, k_err, i, r;
1819
1820 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1821
1822 for (;;) {
1823 _dispatch_run_timers();
1824
1825 timeoutp = _dispatch_get_next_timer_fire(&timeout);
1826
1827 if (_dispatch_select_workaround) {
1828 FD_COPY(&_dispatch_rfds, &tmp_rfds);
1829 FD_COPY(&_dispatch_wfds, &tmp_wfds);
1830 if (timeoutp) {
1831 sel_timeout.tv_sec = timeoutp->tv_sec;
1832 sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
1833 sel_timeoutp = &sel_timeout;
1834 } else {
1835 sel_timeoutp = NULL;
1836 }
1837
1838 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
1839 if (r == -1) {
1840 if (errno != EBADF) {
1841 dispatch_assume_zero(errno);
1842 continue;
1843 }
1844 for (i = 0; i < FD_SETSIZE; i++) {
1845 if (i == _dispatch_kq) {
1846 continue;
1847 }
1848 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
1849 continue;
1850 }
1851 r = dup(i);
1852 if (r != -1) {
1853 close(r);
1854 } else {
1855 FD_CLR(i, &_dispatch_rfds);
1856 FD_CLR(i, &_dispatch_wfds);
1857 _dispatch_rfd_ptrs[i] = 0;
1858 _dispatch_wfd_ptrs[i] = 0;
1859 }
1860 }
1861 continue;
1862 }
1863
1864 if (r > 0) {
1865 for (i = 0; i < FD_SETSIZE; i++) {
1866 if (i == _dispatch_kq) {
1867 continue;
1868 }
1869 if (FD_ISSET(i, &tmp_rfds)) {
1870 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
1871 EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
1872 _dispatch_rfd_ptrs[i] = 0;
1873 _dispatch_mgr_thread2(kev, 1);
1874 }
1875 if (FD_ISSET(i, &tmp_wfds)) {
1876 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
1877 EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
1878 _dispatch_wfd_ptrs[i] = 0;
1879 _dispatch_mgr_thread2(kev, 1);
1880 }
1881 }
1882 }
1883
1884 timeoutp = &timeout_immediately;
1885 }
1886
1887 k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
1888 k_err = errno;
1889
1890 switch (k_cnt) {
1891 case -1:
1892 if (k_err == EBADF) {
1893 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
1894 }
1895 dispatch_assume_zero(k_err);
1896 continue;
1897 default:
1898 _dispatch_mgr_thread2(kev, (size_t)k_cnt);
1899 // fall through
1900 case 0:
1901 _dispatch_force_cache_cleanup();
1902 continue;
1903 }
1904 }
1905
1906 return NULL;
1907 }
1908
1909 static bool
1910 _dispatch_mgr_wakeup(dispatch_queue_t dq)
1911 {
1912 static const struct kevent kev = {
1913 .ident = 1,
1914 .filter = EVFILT_USER,
1915 #ifdef EV_TRIGGER
1916 .flags = EV_TRIGGER,
1917 #endif
1918 #ifdef NOTE_TRIGGER
1919 .fflags = NOTE_TRIGGER,
1920 #endif
1921 };
1922
1923 _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);
1924
1925 _dispatch_update_kq(&kev);
1926
1927 return false;
1928 }
1929
1930 void
1931 _dispatch_update_kq(const struct kevent *kev)
1932 {
1933 struct kevent kev_copy = *kev;
1934 kev_copy.flags |= EV_RECEIPT;
1935
1936 if (kev_copy.flags & EV_DELETE) {
1937 switch (kev_copy.filter) {
1938 case EVFILT_READ:
1939 if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
1940 FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
1941 _dispatch_rfd_ptrs[kev_copy.ident] = 0;
1942 return;
1943 }
1944 case EVFILT_WRITE:
1945 if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
1946 FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
1947 _dispatch_wfd_ptrs[kev_copy.ident] = 0;
1948 return;
1949 }
1950 default:
1951 break;
1952 }
1953 }
1954
1955 int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
1956 if (rval == -1) {
1957 		// The kevent() call itself failed, i.e. for some reason other than an
1958 		// error on an individual changelist element (those are handled below).
1959 dispatch_assume_zero(errno);
1960 //kev_copy.flags |= EV_ERROR;
1961 //kev_copy.data = error;
1962 return;
1963 }
1964
1965 // The following select workaround only applies to adding kevents
1966 if (!(kev->flags & EV_ADD)) {
1967 return;
1968 }
1969
1970 switch (kev_copy.data) {
1971 case 0:
1972 return;
1973 case EBADF:
1974 break;
1975 default:
1976 		// An error came back for an individual changelist element. For read and
1977 		// write filters this typically means the descriptor cannot be watched by
1978 		// kqueue (e.g. a /dev/* node), so fall back to emulating the kevent with
1979 		// select().
1980 switch (kev_copy.filter) {
1981 case EVFILT_READ:
1982 _dispatch_select_workaround = true;
1983 FD_SET((int)kev_copy.ident, &_dispatch_rfds);
1984 _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
1985 break;
1986 case EVFILT_WRITE:
1987 _dispatch_select_workaround = true;
1988 FD_SET((int)kev_copy.ident, &_dispatch_wfds);
1989 _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
1990 break;
1991 default:
1992 _dispatch_source_drain_kevent(&kev_copy);
1993 break;
1994 }
1995 break;
1996 }
1997 }
1998
1999 static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
2000 .do_type = DISPATCH_QUEUE_MGR_TYPE,
2001 .do_kind = "mgr-queue",
2002 .do_invoke = _dispatch_mgr_invoke,
2003 .do_debug = dispatch_queue_debug,
2004 .do_probe = _dispatch_mgr_wakeup,
2005 };
2006
2007 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
2008 struct dispatch_queue_s _dispatch_mgr_q = {
2009 .do_vtable = &_dispatch_queue_mgr_vtable,
2010 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2011 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2012 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
2013 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],
2014
2015 .dq_label = "com.apple.libdispatch-manager",
2016 .dq_width = 1,
2017 .dq_serialnum = 2,
2018 };
2019
2020 const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
2021 .dqo_version = 3,
2022 .dqo_label = offsetof(struct dispatch_queue_s, dq_label),
2023 .dqo_label_size = sizeof(_dispatch_main_q.dq_label),
2024 .dqo_flags = 0,
2025 .dqo_flags_size = 0,
2026 .dqo_width = offsetof(struct dispatch_queue_s, dq_width),
2027 .dqo_width_size = sizeof(_dispatch_main_q.dq_width),
2028 .dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
2029 .dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
2030 .dqo_running = offsetof(struct dispatch_queue_s, dq_running),
2031 .dqo_running_size = sizeof(_dispatch_main_q.dq_running),
2032 };
2033
2034 #ifdef __BLOCKS__
2035 void
2036 dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
2037 {
2038 // test before the copy of the block
2039 if (when == DISPATCH_TIME_FOREVER) {
2040 #if DISPATCH_DEBUG
2041 DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
2042 #endif
2043 return;
2044 }
2045 dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
2046 }
2047 #endif
2048
2049 DISPATCH_NOINLINE
2050 void
2051 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
2052 {
2053 uint64_t delta;
2054 if (when == DISPATCH_TIME_FOREVER) {
2055 #if DISPATCH_DEBUG
2056 DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
2057 #endif
2058 return;
2059 }
2060
2061 // this function can and should be optimized to not use a dispatch source
2062 again:
2063 delta = _dispatch_timeout(when);
2064 if (delta == 0) {
2065 return dispatch_async_f(queue, ctxt, func);
2066 }
2067 if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
2068 long err_dom, err_val;
2069 if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
2070 dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
2071 dispatch_assert(err_val == ECANCELED);
2072 func(ctxt);
2073 dispatch_release(ds); // MUST NOT be _dispatch_release()
2074 } else {
2075 dispatch_source_cancel(ds);
2076 }
2077 })) {
2078 goto again;
2079 }
2080 }
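// Illustrative usage of the entry points above:
//
//   dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 5ull * NSEC_PER_SEC),
//                  dispatch_get_main_queue(),
//                  ^{ /* runs roughly five seconds later */ });
//
// A deadline that has already passed degenerates to dispatch_async_f(), and
// DISPATCH_TIME_FOREVER is rejected up front (trapping in debug builds)
// because such a block could never run.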