1 /*
2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #include "protocol.h"
23
24 void
25 dummy_function(void)
26 {
27 }
28
29 long
30 dummy_function_r0(void)
31 {
32 return 0;
33 }
34
35 static bool _dispatch_select_workaround;
36 static fd_set _dispatch_rfds;
37 static fd_set _dispatch_wfds;
38 static void *_dispatch_rfd_ptrs[FD_SETSIZE];
39 static void *_dispatch_wfd_ptrs[FD_SETSIZE];
40
41
42 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
43 {
44 .do_vtable = &_dispatch_semaphore_vtable,
45 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
46 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
47 },
48 {
49 .do_vtable = &_dispatch_semaphore_vtable,
50 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
51 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
52 },
53 {
54 .do_vtable = &_dispatch_semaphore_vtable,
55 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
56 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
57 },
58 {
59 .do_vtable = &_dispatch_semaphore_vtable,
60 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
61 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
62 },
63 {
64 .do_vtable = &_dispatch_semaphore_vtable,
65 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
66 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
67 },
68 {
69 .do_vtable = &_dispatch_semaphore_vtable,
70 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
71 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
72 },
73 };
74
75 static struct dispatch_queue_s _dispatch_root_queues[];
76
77 static inline dispatch_queue_t
78 _dispatch_get_root_queue(long priority, bool overcommit)
79 {
80 if (overcommit) switch (priority) {
81 case DISPATCH_QUEUE_PRIORITY_LOW:
82 return &_dispatch_root_queues[1];
83 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
84 return &_dispatch_root_queues[3];
85 case DISPATCH_QUEUE_PRIORITY_HIGH:
86 return &_dispatch_root_queues[5];
87 }
88 switch (priority) {
89 case DISPATCH_QUEUE_PRIORITY_LOW:
90 return &_dispatch_root_queues[0];
91 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
92 return &_dispatch_root_queues[2];
93 case DISPATCH_QUEUE_PRIORITY_HIGH:
94 return &_dispatch_root_queues[4];
95 default:
96 return NULL;
97 }
98 }
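/*
 * Illustrative mapping, derived from the switch above (the array is defined
 * later in this file); not additional API:
 *
 *	_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, false)
 *		== &_dispatch_root_queues[2]   // "com.apple.root.default-priority"
 *	_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH, true)
 *		== &_dispatch_root_queues[5]   // "com.apple.root.high-overcommit-priority"
 */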
99
100 #ifdef __BLOCKS__
101 dispatch_block_t
102 _dispatch_Block_copy(dispatch_block_t db)
103 {
104 dispatch_block_t rval;
105
106 while (!(rval = Block_copy(db))) {
107 sleep(1);
108 }
109
110 return rval;
111 }
112 #define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))
113
114 void
115 _dispatch_call_block_and_release(void *block)
116 {
117 void (^b)(void) = block;
118 b();
119 Block_release(b);
120 }
121
122 void
123 _dispatch_call_block_and_release2(void *block, void *ctxt)
124 {
125 void (^b)(void*) = block;
126 b(ctxt);
127 Block_release(b);
128 }
129
130 #endif /* __BLOCKS__ */
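/*
 * Illustrative sketch of how the Block helpers above are used (the block body
 * is hypothetical): dispatch_async() later in this file reduces to copying the
 * block and submitting _dispatch_call_block_and_release as the function.
 *
 *	dispatch_block_t work = ^{ do_something(); };	// do_something() is hypothetical
 *	dispatch_async_f(q, _dispatch_Block_copy(work),
 *			_dispatch_call_block_and_release);
 */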
131
132 struct dispatch_queue_attr_vtable_s {
133 DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
134 };
135
136 struct dispatch_queue_attr_s {
137 DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);
138
139 // Public:
140 int qa_priority;
141 void* finalizer_ctxt;
142 dispatch_queue_finalizer_function_t finalizer_func;
143
144 // Private:
145 unsigned long qa_flags;
146 };
147
148 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
149
150 #define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
151 static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
152 static void _dispatch_queue_invoke(dispatch_queue_t dq);
153 static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
154 static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
155 static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);
156
157 static bool _dispatch_program_is_probably_callback_driven;
158
159 #if DISPATCH_COCOA_COMPAT
160 // dispatch_begin_thread_4GC having a non-default value triggers GC-only slow paths and
161 // is checked frequently; testing against NULL is faster than comparing for equality
162 // with "dummy_function"
163 void (*dispatch_begin_thread_4GC)(void) = NULL;
164 void (*dispatch_end_thread_4GC)(void) = dummy_function;
165 void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
166 void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
167 static void _dispatch_queue_wakeup_main(void);
168
169 static dispatch_once_t _dispatch_main_q_port_pred;
170 static bool main_q_is_draining;
171 static mach_port_t main_q_port;
172 #endif
173
174 static void _dispatch_cache_cleanup2(void *value);
175 static void _dispatch_force_cache_cleanup(void);
176
177 static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
178 .do_type = DISPATCH_QUEUE_TYPE,
179 .do_kind = "queue",
180 .do_dispose = _dispatch_queue_dispose,
181 .do_invoke = (void *)dummy_function_r0,
182 .do_probe = (void *)dummy_function_r0,
183 .do_debug = dispatch_queue_debug,
184 };
185
186 static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
187 .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
188 .do_kind = "global-queue",
189 .do_debug = dispatch_queue_debug,
190 .do_probe = _dispatch_queue_wakeup_global,
191 };
192
193 #define MAX_THREAD_COUNT 255
194
195 struct dispatch_root_queue_context_s {
196 pthread_workqueue_t dgq_kworkqueue;
197 uint32_t dgq_pending;
198 uint32_t dgq_thread_pool_size;
199 dispatch_semaphore_t dgq_thread_mediator;
200 };
201
202 #define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
203 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
204 {
205 .dgq_thread_mediator = &_dispatch_thread_mediator[0],
206 .dgq_thread_pool_size = MAX_THREAD_COUNT,
207 },
208 {
209 .dgq_thread_mediator = &_dispatch_thread_mediator[1],
210 .dgq_thread_pool_size = MAX_THREAD_COUNT,
211 },
212 {
213 .dgq_thread_mediator = &_dispatch_thread_mediator[2],
214 .dgq_thread_pool_size = MAX_THREAD_COUNT,
215 },
216 {
217 .dgq_thread_mediator = &_dispatch_thread_mediator[3],
218 .dgq_thread_pool_size = MAX_THREAD_COUNT,
219 },
220 {
221 .dgq_thread_mediator = &_dispatch_thread_mediator[4],
222 .dgq_thread_pool_size = MAX_THREAD_COUNT,
223 },
224 {
225 .dgq_thread_mediator = &_dispatch_thread_mediator[5],
226 .dgq_thread_pool_size = MAX_THREAD_COUNT,
227 },
228 };
229
230 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
231 // dq_running is set to 2 so that barrier operations go through the slow path
232 static struct dispatch_queue_s _dispatch_root_queues[] = {
233 {
234 .do_vtable = &_dispatch_queue_root_vtable,
235 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
236 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
237 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
238 .do_ctxt = &_dispatch_root_queue_contexts[0],
239
240 .dq_label = "com.apple.root.low-priority",
241 .dq_running = 2,
242 .dq_width = UINT32_MAX,
243 .dq_serialnum = 4,
244 },
245 {
246 .do_vtable = &_dispatch_queue_root_vtable,
247 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
248 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
249 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
250 .do_ctxt = &_dispatch_root_queue_contexts[1],
251
252 .dq_label = "com.apple.root.low-overcommit-priority",
253 .dq_running = 2,
254 .dq_width = UINT32_MAX,
255 .dq_serialnum = 5,
256 },
257 {
258 .do_vtable = &_dispatch_queue_root_vtable,
259 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
260 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
261 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
262 .do_ctxt = &_dispatch_root_queue_contexts[2],
263
264 .dq_label = "com.apple.root.default-priority",
265 .dq_running = 2,
266 .dq_width = UINT32_MAX,
267 .dq_serialnum = 6,
268 },
269 {
270 .do_vtable = &_dispatch_queue_root_vtable,
271 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
272 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
273 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
274 .do_ctxt = &_dispatch_root_queue_contexts[3],
275
276 .dq_label = "com.apple.root.default-overcommit-priority",
277 .dq_running = 2,
278 .dq_width = UINT32_MAX,
279 .dq_serialnum = 7,
280 },
281 {
282 .do_vtable = &_dispatch_queue_root_vtable,
283 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
284 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
285 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
286 .do_ctxt = &_dispatch_root_queue_contexts[4],
287
288 .dq_label = "com.apple.root.high-priority",
289 .dq_running = 2,
290 .dq_width = UINT32_MAX,
291 .dq_serialnum = 8,
292 },
293 {
294 .do_vtable = &_dispatch_queue_root_vtable,
295 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
296 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
297 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
298 .do_ctxt = &_dispatch_root_queue_contexts[5],
299
300 .dq_label = "com.apple.root.high-overcommit-priority",
301 .dq_running = 2,
302 .dq_width = UINT32_MAX,
303 .dq_serialnum = 9,
304 },
305 };
306
307 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
308 struct dispatch_queue_s _dispatch_main_q = {
309 .do_vtable = &_dispatch_queue_vtable,
310 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
311 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
312 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
313 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],
314
315 .dq_label = "com.apple.main-thread",
316 .dq_running = 1,
317 .dq_width = 1,
318 .dq_serialnum = 1,
319 };
320
321 #if DISPATCH_PERF_MON
322 static OSSpinLock _dispatch_stats_lock;
323 static size_t _dispatch_bad_ratio;
324 static struct {
325 uint64_t time_total;
326 uint64_t count_total;
327 uint64_t thread_total;
328 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
329 static void _dispatch_queue_merge_stats(uint64_t start);
330 #endif
331
332 static void *_dispatch_worker_thread(void *context);
333 static void _dispatch_worker_thread2(void *context);
334
335 malloc_zone_t *_dispatch_ccache_zone;
336
337 static inline void
338 _dispatch_continuation_free(dispatch_continuation_t dc)
339 {
340 dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
341 dc->do_next = prev_dc;
342 _dispatch_thread_setspecific(dispatch_cache_key, dc);
343 }
344
345 static inline void
346 _dispatch_continuation_pop(dispatch_object_t dou)
347 {
348 dispatch_continuation_t dc = dou._dc;
349 dispatch_group_t dg;
350
351 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
352 return _dispatch_queue_invoke(dou._dq);
353 }
354
355 // Add the item back to the cache before calling the function. This
356 // allows the 'hot' continuation to be used for a quick callback.
357 //
358 // The ccache version is per-thread.
359 // Therefore, the object has not been reused yet.
360 // This generates better assembly.
361 if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
362 _dispatch_continuation_free(dc);
363 }
364 if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
365 dg = dc->dc_group;
366 } else {
367 dg = NULL;
368 }
369 dc->dc_func(dc->dc_ctxt);
370 if (dg) {
371 dispatch_group_leave(dg);
372 _dispatch_release(dg);
373 }
374 }
375
376 struct dispatch_object_s *
377 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
378 {
379 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
380
381 // The mediator value acts both as a "lock" and a signal
382 head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);
383
384 if (slowpath(head == NULL)) {
385 // The first xchg on the tail will tell the enqueueing thread that it
386 // is safe to blindly write out to the head pointer. A cmpxchg honors
387 // the algorithm.
388 dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
389 _dispatch_debug("no work on global work queue");
390 return NULL;
391 }
392
393 if (slowpath(head == mediator)) {
394 // This thread lost the race for ownership of the queue.
395 //
396 // The ratio of work to libdispatch overhead must be bad. This
397 // scenario implies that there are too many threads in the pool.
398 // Create a new pending thread and then exit this thread.
399 // The kernel will grant a new thread when the load subsides.
400 _dispatch_debug("Contention on queue: %p", dq);
401 _dispatch_queue_wakeup_global(dq);
402 #if DISPATCH_PERF_MON
403 dispatch_atomic_inc(&_dispatch_bad_ratio);
404 #endif
405 return NULL;
406 }
407
408 // Restore the head pointer to a sane value before returning.
409 // If 'next' is NULL, then this item _might_ be the last item.
410 next = fastpath(head->do_next);
411
412 if (slowpath(!next)) {
413 dq->dq_items_head = NULL;
414
415 if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
416 // both head and tail are NULL now
417 goto out;
418 }
419
420 // There must be a next item now. This thread won't wait long.
421 while (!(next = head->do_next)) {
422 _dispatch_hardware_pause();
423 }
424 }
425
426 dq->dq_items_head = next;
427 _dispatch_queue_wakeup_global(dq);
428 out:
429 return head;
430 }
431
432 dispatch_queue_t
433 dispatch_get_current_queue(void)
434 {
435 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
436 }
437
438 #undef dispatch_get_main_queue
439 __OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
440 dispatch_queue_t dispatch_get_main_queue(void);
441
442 dispatch_queue_t
443 dispatch_get_main_queue(void)
444 {
445 return &_dispatch_main_q;
446 }
447 #define dispatch_get_main_queue() (&_dispatch_main_q)
448
449 struct _dispatch_hw_config_s _dispatch_hw_config;
450
451 static void
452 _dispatch_queue_set_width_init(void)
453 {
454 size_t valsz = sizeof(uint32_t);
455
456 errno = 0;
457 sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
458 dispatch_assume_zero(errno);
459 dispatch_assume(valsz == sizeof(uint32_t));
460
461 errno = 0;
462 sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
463 dispatch_assume_zero(errno);
464 dispatch_assume(valsz == sizeof(uint32_t));
465
466 errno = 0;
467 sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
468 dispatch_assume_zero(errno);
469 dispatch_assume(valsz == sizeof(uint32_t));
470 }
471
472 void
473 dispatch_queue_set_width(dispatch_queue_t dq, long width)
474 {
475 int w = (int)width; // intentional truncation
476 uint32_t tmp;
477
478 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
479 return;
480 }
481 if (w == 1 || w == 0) {
482 dq->dq_width = 1;
483 return;
484 }
485 if (w > 0) {
486 tmp = w;
487 } else switch (w) {
488 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
489 tmp = _dispatch_hw_config.cc_max_physical;
490 break;
491 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
492 tmp = _dispatch_hw_config.cc_max_active;
493 break;
494 default:
495 // fall through
496 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
497 tmp = _dispatch_hw_config.cc_max_logical;
498 break;
499 }
500 // multiply by two since the running count is inc/dec by two (the low bit == barrier)
501 dq->dq_width = tmp * 2;
502
503 // XXX if the queue has items and the width is increased, we should try to wake the queue
504 }
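/*
 * Illustrative use (queue label is hypothetical): widen a private queue to the
 * number of active CPUs; note that the stored dq_width is doubled, per the
 * comment above.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.pool", NULL);
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 */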
505
506 // skip zero
507 // 1 - main_q
508 // 2 - mgr_q
509 // 3 - _unused_
510 // 4,5,6,7,8,9 - global queues
511 // we use 'xadd' on Intel, so the initial value == next assigned
512 static unsigned long _dispatch_queue_serial_numbers = 10;
513
514 // Note to later developers: ensure that any initialization changes are
515 // made for statically allocated queues (i.e. _dispatch_main_q).
516 inline void
517 _dispatch_queue_init(dispatch_queue_t dq)
518 {
519 dq->do_vtable = &_dispatch_queue_vtable;
520 dq->do_next = DISPATCH_OBJECT_LISTLESS;
521 dq->do_ref_cnt = 1;
522 dq->do_xref_cnt = 1;
523 dq->do_targetq = _dispatch_get_root_queue(0, true);
524 dq->dq_running = 0;
525 dq->dq_width = 1;
526 dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
527 }
528
529 dispatch_queue_t
530 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
531 {
532 dispatch_queue_t dq;
533 size_t label_len;
534
535 if (!label) {
536 label = "";
537 }
538
539 label_len = strlen(label);
540 if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
541 label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
542 }
543
544 // XXX switch to malloc()
545 dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
546 if (slowpath(!dq)) {
547 return dq;
548 }
549
550 _dispatch_queue_init(dq);
551 strcpy(dq->dq_label, label);
552
553 #ifndef DISPATCH_NO_LEGACY
554 if (slowpath(attr)) {
555 dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
556 dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
557 dq->dq_finalizer_func = attr->finalizer_func;
558 #ifdef __BLOCKS__
559 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
560 // if finalizer_ctxt is a Block, retain it.
561 dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
562 if (!(dq->dq_finalizer_ctxt)) {
563 goto out_bad;
564 }
565 }
566 #endif
567 }
568 #endif
569
570 return dq;
571
572 out_bad:
573 free(dq);
574 return NULL;
575 }
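/*
 * Illustrative caller (label and block body are hypothetical):
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.serial", NULL);
 *	dispatch_async(q, ^{ process_next_item(); });
 *	dispatch_release(q);
 */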
576
577 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
578 void
579 _dispatch_queue_dispose(dispatch_queue_t dq)
580 {
581 if (slowpath(dq == _dispatch_queue_get_current())) {
582 DISPATCH_CRASH("Release of a queue by itself");
583 }
584 if (slowpath(dq->dq_items_tail)) {
585 DISPATCH_CRASH("Release of a queue while items are enqueued");
586 }
587
588 #ifndef DISPATCH_NO_LEGACY
589 if (dq->dq_finalizer_func) {
590 dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
591 }
592 #endif
593
594 // trash the tail queue so that use after free will crash
595 dq->dq_items_tail = (void *)0x200;
596
597 _dispatch_dispose(dq);
598 }
599
600 DISPATCH_NOINLINE
601 void
602 _dispatch_queue_push_list_slow(dispatch_queue_t dq, struct dispatch_object_s *obj)
603 {
604 // The queue must be retained before dq_items_head is written in order
605 // to ensure that the reference is still valid when _dispatch_wakeup is
606 // called. Otherwise, if preempted between the assignment to
607 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
608 // queue may release the last reference to the queue when invoked by
609 // _dispatch_queue_drain. <rdar://problem/6932776>
610 _dispatch_retain(dq);
611 dq->dq_items_head = obj;
612 _dispatch_wakeup(dq);
613 _dispatch_release(dq);
614 }
615
616 DISPATCH_NOINLINE
617 static void
618 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
619 {
620 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
621
622 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
623 dc->dc_func = func;
624 dc->dc_ctxt = context;
625
626 _dispatch_queue_push(dq, dc);
627 }
628
629 #ifdef __BLOCKS__
630 void
631 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
632 {
633 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
634 }
635 #endif
636
637 DISPATCH_NOINLINE
638 void
639 dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
640 {
641 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
642
643 if (!dc) {
644 return _dispatch_barrier_async_f_slow(dq, context, func);
645 }
646
647 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
648 dc->dc_func = func;
649 dc->dc_ctxt = context;
650
651 _dispatch_queue_push(dq, dc);
652 }
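/*
 * Illustrative pattern (function and context names are hypothetical): on a
 * queue widened with dispatch_queue_set_width(), plain async work may run
 * concurrently while a barrier submission drains alone, per the
 * DISPATCH_OBJ_BARRIER_BIT handling in _dispatch_queue_drain().
 *
 *	dispatch_async_f(q, r1, read_fn);
 *	dispatch_async_f(q, r2, read_fn);
 *	dispatch_barrier_async_f(q, w, write_fn);	// runs after r1/r2, exclusively
 */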
653
654 DISPATCH_NOINLINE
655 static void
656 _dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
657 {
658 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
659
660 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
661 dc->dc_func = func;
662 dc->dc_ctxt = context;
663
664 _dispatch_queue_push(dq, dc);
665 }
666
667 #ifdef __BLOCKS__
668 void
669 dispatch_async(dispatch_queue_t dq, void (^work)(void))
670 {
671 dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
672 }
673 #endif
674
675 DISPATCH_NOINLINE
676 void
677 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
678 {
679 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
680
681 // unlike dispatch_sync_f(), we do NOT need to check the queue width;
682 // the "drain" function will do this test
683
684 if (!dc) {
685 return _dispatch_async_f_slow(dq, ctxt, func);
686 }
687
688 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
689 dc->dc_func = func;
690 dc->dc_ctxt = ctxt;
691
692 _dispatch_queue_push(dq, dc);
693 }
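/*
 * Illustrative plain-C caller (function and context are hypothetical); this is
 * the same path dispatch_async() takes after copying its block.
 *
 *	static void work(void *ctxt) { handle_request(ctxt); }
 *	...
 *	dispatch_async_f(dispatch_get_global_queue(0, 0), req, work);
 */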
694
695 struct dispatch_barrier_sync_slow2_s {
696 dispatch_queue_t dbss2_dq;
697 #if DISPATCH_COCOA_COMPAT
698 dispatch_function_t dbss2_func;
699 void *dbss2_ctxt;
700 #endif
701 dispatch_semaphore_t dbss2_sema;
702 };
703
704 static void
705 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
706 {
707 struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;
708
709 dispatch_assert(dbss2->dbss2_dq == dispatch_get_current_queue());
710 #if DISPATCH_COCOA_COMPAT
711 // When the main queue is bound to the main thread
712 if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
713 dbss2->dbss2_func(dbss2->dbss2_ctxt);
714 dbss2->dbss2_func = NULL;
715 dispatch_semaphore_signal(dbss2->dbss2_sema);
716 return;
717 }
718 #endif
719 dispatch_suspend(dbss2->dbss2_dq);
720 dispatch_semaphore_signal(dbss2->dbss2_sema);
721 }
722
723 DISPATCH_NOINLINE
724 static void
725 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
726 {
727
728 // It's preferred to execute synchronous blocks on the current thread
729 // due to thread-local side effects, garbage collection, etc. However,
730 // blocks submitted to the main thread MUST be run on the main thread
731
732 struct dispatch_barrier_sync_slow2_s dbss2 = {
733 .dbss2_dq = dq,
734 #if DISPATCH_COCOA_COMPAT
735 .dbss2_func = func,
736 .dbss2_ctxt = ctxt,
737 #endif
738 .dbss2_sema = _dispatch_get_thread_semaphore(),
739 };
740 struct dispatch_barrier_sync_slow_s {
741 DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
742 } dbss = {
743 .do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
744 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
745 .dc_ctxt = &dbss2,
746 };
747
748 _dispatch_queue_push(dq, (void *)&dbss);
749 dispatch_semaphore_wait(dbss2.dbss2_sema, DISPATCH_TIME_FOREVER);
750 _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
751
752 #if DISPATCH_COCOA_COMPAT
753 // Main queue bound to main thread
754 if (dbss2.dbss2_func == NULL) {
755 return;
756 }
757 #endif
758 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
759 _dispatch_thread_setspecific(dispatch_queue_key, dq);
760 func(ctxt);
761 _dispatch_workitem_inc();
762 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
763 dispatch_resume(dq);
764 }
765
766 #ifdef __BLOCKS__
767 #if DISPATCH_COCOA_COMPAT
768 DISPATCH_NOINLINE
769 static void
770 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
771 {
772 // Blocks submitted to the main queue MUST be run on the main thread,
773 // therefore under GC we must Block_copy in order to notify the thread-local
774 // garbage collector that the objects are transferring to the main thread
775 // rdar://problem/7176237&7181849&7458685
776 if (dispatch_begin_thread_4GC) {
777 dispatch_block_t block = _dispatch_Block_copy(work);
778 return dispatch_barrier_sync_f(dq, block, _dispatch_call_block_and_release);
779 }
780 struct Block_basic *bb = (void *)work;
781 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
782 }
783 #endif
784
785 void
786 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
787 {
788 #if DISPATCH_COCOA_COMPAT
789 if (slowpath(dq == &_dispatch_main_q)) {
790 return _dispatch_barrier_sync_slow(dq, work);
791 }
792 #endif
793 struct Block_basic *bb = (void *)work;
794 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
795 }
796 #endif
797
798 DISPATCH_NOINLINE
799 void
800 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
801 {
802 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
803
804 // 1) ensure that this thread hasn't enqueued anything ahead of this call
805 // 2) the queue is not suspended
806 // 3) the queue is not weird
807 if (slowpath(dq->dq_items_tail)
808 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
809 || slowpath(!_dispatch_queue_trylock(dq))) {
810 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
811 }
812
813 _dispatch_thread_setspecific(dispatch_queue_key, dq);
814 func(ctxt);
815 _dispatch_workitem_inc();
816 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
817 _dispatch_queue_unlock(dq);
818 }
819
820 static void
821 _dispatch_sync_f_slow2(void *ctxt)
822 {
823 dispatch_queue_t dq = _dispatch_queue_get_current();
824 dispatch_atomic_add(&dq->dq_running, 2);
825 dispatch_semaphore_signal(ctxt);
826 }
827
828 DISPATCH_NOINLINE
829 static void
830 _dispatch_sync_f_slow(dispatch_queue_t dq)
831 {
832 // the global root queues do not need strict ordering
833 if (dq->do_targetq == NULL) {
834 dispatch_atomic_add(&dq->dq_running, 2);
835 return;
836 }
837
838 struct dispatch_sync_slow_s {
839 DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
840 } dss = {
841 .do_vtable = NULL,
842 .dc_func = _dispatch_sync_f_slow2,
843 .dc_ctxt = _dispatch_get_thread_semaphore(),
844 };
845
846 // XXX FIXME -- concurrent queues can become serial again
847 _dispatch_queue_push(dq, (void *)&dss);
848
849 dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
850 _dispatch_put_thread_semaphore(dss.dc_ctxt);
851 }
852
853 #ifdef __BLOCKS__
854 #if DISPATCH_COCOA_COMPAT
855 DISPATCH_NOINLINE
856 static void
857 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
858 {
859 // Blocks submitted to the main queue MUST be run on the main thread,
860 // therefore under GC we must Block_copy in order to notify the thread-local
861 // garbage collector that the objects are transferring to the main thread
862 // rdar://problem/7176237&7181849&7458685
863 if (dispatch_begin_thread_4GC) {
864 dispatch_block_t block = _dispatch_Block_copy(work);
865 return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
866 }
867 struct Block_basic *bb = (void *)work;
868 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
869 }
870 #endif
871
872 void
873 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
874 {
875 #if DISPATCH_COCOA_COMPAT
876 if (slowpath(dq == &_dispatch_main_q)) {
877 return _dispatch_sync_slow(dq, work);
878 }
879 #endif
880 struct Block_basic *bb = (void *)work;
881 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
882 }
883 #endif
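/*
 * Illustrative caller (names are hypothetical): synchronously read state
 * guarded by a serial queue. Calling dispatch_sync() on the current queue
 * would deadlock, since the sync paths above wait for the queue to drain.
 *
 *	__block int value;
 *	dispatch_sync(state_q, ^{ value = shared_state; });
 */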
884
885 DISPATCH_NOINLINE
886 void
887 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
888 {
889 typeof(dq->dq_running) prev_cnt;
890 dispatch_queue_t old_dq;
891
892 if (dq->dq_width == 1) {
893 return dispatch_barrier_sync_f(dq, ctxt, func);
894 }
895
896 // 1) ensure that this thread hasn't enqueued anything ahead of this call
897 // 2) the queue is not suspended
898 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
899 _dispatch_sync_f_slow(dq);
900 } else {
901 prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;
902
903 if (slowpath(prev_cnt & 1)) {
904 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
905 _dispatch_wakeup(dq);
906 }
907 _dispatch_sync_f_slow(dq);
908 }
909 }
910
911 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
912 _dispatch_thread_setspecific(dispatch_queue_key, dq);
913 func(ctxt);
914 _dispatch_workitem_inc();
915 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
916
917 if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
918 _dispatch_wakeup(dq);
919 }
920 }
921
922 const char *
923 dispatch_queue_get_label(dispatch_queue_t dq)
924 {
925 return dq->dq_label;
926 }
927
928 #if DISPATCH_COCOA_COMPAT
929 static void
930 _dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
931 {
932 kern_return_t kr;
933
934 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
935 DISPATCH_VERIFY_MIG(kr);
936 dispatch_assume_zero(kr);
937 kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
938 DISPATCH_VERIFY_MIG(kr);
939 dispatch_assume_zero(kr);
940
941 _dispatch_program_is_probably_callback_driven = true;
942 _dispatch_safe_fork = false;
943 }
944
945 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
946 DISPATCH_NOINLINE
947 static void
948 _dispatch_queue_set_mainq_drain_state(bool arg)
949 {
950 main_q_is_draining = arg;
951 }
952 #endif
953
954 void
955 dispatch_main(void)
956 {
957 if (pthread_main_np()) {
958 _dispatch_program_is_probably_callback_driven = true;
959 pthread_exit(NULL);
960 DISPATCH_CRASH("pthread_exit() returned");
961 }
962 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
963 }
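/*
 * Illustrative pattern (setup function is hypothetical): install sources that
 * target the main queue, then park the main thread; dispatch_main() does not
 * return.
 *
 *	int main(void)
 *	{
 *		install_signal_sources(dispatch_get_main_queue());
 *		dispatch_main();
 *	}
 */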
964
965 static void
966 _dispatch_sigsuspend(void *ctxt __attribute__((unused)))
967 {
968 static const sigset_t mask;
969
970 for (;;) {
971 sigsuspend(&mask);
972 }
973 }
974
975 DISPATCH_NOINLINE
976 static void
977 _dispatch_queue_cleanup2(void)
978 {
979 dispatch_atomic_dec(&_dispatch_main_q.dq_running);
980
981 if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
982 _dispatch_wakeup(&_dispatch_main_q);
983 }
984
985 // overload the "probably" variable to mean that dispatch_main() or
986 // similar non-POSIX API was called
987 // this has to run before the DISPATCH_COCOA_COMPAT below
988 if (_dispatch_program_is_probably_callback_driven) {
989 dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
990 sleep(1); // workaround 6778970
991 }
992
993 #if DISPATCH_COCOA_COMPAT
994 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
995
996 mach_port_t mp = main_q_port;
997 kern_return_t kr;
998
999 main_q_port = 0;
1000
1001 if (mp) {
1002 kr = mach_port_deallocate(mach_task_self(), mp);
1003 DISPATCH_VERIFY_MIG(kr);
1004 dispatch_assume_zero(kr);
1005 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
1006 DISPATCH_VERIFY_MIG(kr);
1007 dispatch_assume_zero(kr);
1008 }
1009 #endif
1010 }
1011
1012 dispatch_queue_t
1013 dispatch_get_concurrent_queue(long pri)
1014 {
1015 if (pri > 0) {
1016 pri = DISPATCH_QUEUE_PRIORITY_HIGH;
1017 } else if (pri < 0) {
1018 pri = DISPATCH_QUEUE_PRIORITY_LOW;
1019 }
1020 return _dispatch_get_root_queue(pri, false);
1021 }
1022
1023 static void
1024 _dispatch_queue_cleanup(void *ctxt)
1025 {
1026 if (ctxt == &_dispatch_main_q) {
1027 return _dispatch_queue_cleanup2();
1028 }
1029 // POSIX defines that destructors are only called if 'ctxt' is non-null
1030 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
1031 }
1032
1033 dispatch_queue_t
1034 dispatch_get_global_queue(long priority, unsigned long flags)
1035 {
1036 if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
1037 return NULL;
1038 }
1039 return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
1040 }
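/*
 * Illustrative lookups, following the flag check above:
 *
 *	dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
 *			DISPATCH_QUEUE_OVERCOMMIT);
 *	// any other flag bit yields NULL
 */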
1041
1042 #define countof(x) (sizeof(x) / sizeof(x[0]))
1043 void
1044 libdispatch_init(void)
1045 {
1046 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
1047 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);
1048
1049 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
1050 dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
1051 dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
1052 dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);
1053
1054 _dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
1055 _dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release); // use the extern release
1056 _dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
1057 #if DISPATCH_PERF_MON
1058 _dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
1059 #endif
1060
1061 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
1062
1063 _dispatch_queue_set_width_init();
1064 }
1065
1066 void
1067 _dispatch_queue_unlock(dispatch_queue_t dq)
1068 {
1069 if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
1070 return;
1071 }
1072
1073 _dispatch_wakeup(dq);
1074 }
1075
1076 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1077 dispatch_queue_t
1078 _dispatch_wakeup(dispatch_object_t dou)
1079 {
1080 dispatch_queue_t tq;
1081
1082 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
1083 return NULL;
1084 }
1085 if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
1086 return NULL;
1087 }
1088
1089 if (!_dispatch_trylock(dou._do)) {
1090 #if DISPATCH_COCOA_COMPAT
1091 if (dou._dq == &_dispatch_main_q) {
1092 _dispatch_queue_wakeup_main();
1093 }
1094 #endif
1095 return NULL;
1096 }
1097 _dispatch_retain(dou._do);
1098 tq = dou._do->do_targetq;
1099 _dispatch_queue_push(tq, dou._do);
1100 return tq; // libdispatch doesn't need this, but the Instrument DTrace probe does
1101 }
1102
1103 #if DISPATCH_COCOA_COMPAT
1104 DISPATCH_NOINLINE
1105 void
1106 _dispatch_queue_wakeup_main(void)
1107 {
1108 kern_return_t kr;
1109
1110 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1111
1112 kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
1113
1114 switch (kr) {
1115 case MACH_SEND_TIMEOUT:
1116 case MACH_SEND_TIMED_OUT:
1117 case MACH_SEND_INVALID_DEST:
1118 break;
1119 default:
1120 dispatch_assume_zero(kr);
1121 break;
1122 }
1123
1124 _dispatch_safe_fork = false;
1125 }
1126 #endif
1127
1128 static inline int
1129 _dispatch_rootq2wq_pri(long idx)
1130 {
1131 #ifdef WORKQ_DEFAULT_PRIOQUEUE
1132 switch (idx) {
1133 case 0:
1134 case 1:
1135 return WORKQ_LOW_PRIOQUEUE;
1136 case 2:
1137 case 3:
1138 default:
1139 return WORKQ_DEFAULT_PRIOQUEUE;
1140 case 4:
1141 case 5:
1142 return WORKQ_HIGH_PRIOQUEUE;
1143 }
1144 #else
1145 return (int)idx;
1146 #endif
1147 }
1148
1149 static void
1150 _dispatch_root_queues_init(void *context __attribute__((unused)))
1151 {
1152 bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
1153 pthread_workqueue_attr_t pwq_attr;
1154 kern_return_t kr;
1155 int i, r;
1156
1157 r = pthread_workqueue_attr_init_np(&pwq_attr);
1158 dispatch_assume_zero(r);
1159
1160 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1161 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
1162 dispatch_assume_zero(r);
1163 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
1164 dispatch_assume_zero(r);
1165 // some software hangs if the non-overcommitting queues do not overcommit when threads block
1166 #if 0
1167 if (!(i & 1)) {
1168 _dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
1169 }
1170 #endif
1171
1172 r = 0;
1173 if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
1174 if (r != ENOTSUP) {
1175 dispatch_assume_zero(r);
1176 }
1177 // override the default FIFO behavior for the pool semaphores
1178 kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
1179 DISPATCH_VERIFY_MIG(kr);
1180 dispatch_assume_zero(kr);
1181 dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
1182 } else {
1183 dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
1184 }
1185 }
1186
1187 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
1188 dispatch_assume_zero(r);
1189 }
1190
1191 bool
1192 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
1193 {
1194 static dispatch_once_t pred;
1195 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1196 pthread_workitem_handle_t wh;
1197 unsigned int gen_cnt;
1198 pthread_t pthr;
1199 int r, t_count;
1200
1201 if (!dq->dq_items_tail) {
1202 return false;
1203 }
1204
1205 _dispatch_safe_fork = false;
1206
1207 dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
1208
1209 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
1210
1211 if (qc->dgq_kworkqueue) {
1212 if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
1213 _dispatch_debug("requesting new worker thread");
1214
1215 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
1216 dispatch_assume_zero(r);
1217 } else {
1218 _dispatch_debug("work thread request still pending on global queue: %p", dq);
1219 }
1220 goto out;
1221 }
1222
1223 if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
1224 goto out;
1225 }
1226
1227 do {
1228 t_count = qc->dgq_thread_pool_size;
1229 if (!t_count) {
1230 _dispatch_debug("The thread pool is full: %p", dq);
1231 goto out;
1232 }
1233 } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));
1234
1235 while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
1236 if (r != EAGAIN) {
1237 dispatch_assume_zero(r);
1238 }
1239 sleep(1);
1240 }
1241 r = pthread_detach(pthr);
1242 dispatch_assume_zero(r);
1243
1244 out:
1245 return false;
1246 }
1247
1248 void
1249 _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
1250 {
1251 #if DISPATCH_PERF_MON
1252 uint64_t start = mach_absolute_time();
1253 #endif
1254 _dispatch_queue_drain(dq);
1255 #if DISPATCH_PERF_MON
1256 _dispatch_queue_merge_stats(start);
1257 #endif
1258 _dispatch_force_cache_cleanup();
1259 }
1260
1261 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1262 DISPATCH_NOINLINE
1263 void
1264 _dispatch_queue_invoke(dispatch_queue_t dq)
1265 {
1266 dispatch_queue_t tq = dq->do_targetq;
1267
1268 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
1269 _dispatch_queue_drain(dq);
1270 if (tq == dq->do_targetq) {
1271 tq = dx_invoke(dq);
1272 } else {
1273 tq = dq->do_targetq;
1274 }
1275 // We do not need to check the result.
1276 // When the suspend-count lock is dropped, then the check will happen.
1277 dispatch_atomic_dec(&dq->dq_running);
1278 if (tq) {
1279 return _dispatch_queue_push(tq, dq);
1280 }
1281 }
1282
1283 dq->do_next = DISPATCH_OBJECT_LISTLESS;
1284 if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
1285 if (dq->dq_running == 0) {
1286 _dispatch_wakeup(dq); // verify that the queue is idle
1287 }
1288 }
1289 _dispatch_release(dq); // added when the queue is put on the list
1290 }
1291
1292 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1293 static void
1294 _dispatch_set_target_queue2(void *ctxt)
1295 {
1296 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
1297
1298 prev_dq = dq->do_targetq;
1299 dq->do_targetq = ctxt;
1300 _dispatch_release(prev_dq);
1301 }
1302
1303 void
1304 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
1305 {
1306 if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1307 return;
1308 }
1309 // NOTE: we test for NULL target queues internally to detect root queues
1310 // therefore, if the retain crashes due to a bad input, that is OK
1311 _dispatch_retain(dq);
1312 dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
1313 }
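/*
 * Illustrative use (log_q is hypothetical): route a private queue's work to
 * the low-priority root queue; the retarget is applied asynchronously via the
 * barrier continuation pushed above.
 *
 *	dispatch_set_target_queue(log_q,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
 */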
1314
1315 static void
1316 _dispatch_async_f_redirect2(void *_ctxt)
1317 {
1318 struct dispatch_continuation_s *dc = _ctxt;
1319 struct dispatch_continuation_s *other_dc = dc->dc_data[1];
1320 dispatch_queue_t old_dq, dq = dc->dc_data[0];
1321
1322 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1323 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1324 _dispatch_continuation_pop(other_dc);
1325 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1326
1327 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
1328 _dispatch_wakeup(dq);
1329 }
1330 _dispatch_release(dq);
1331 }
1332
1333 static void
1334 _dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
1335 {
1336 dispatch_continuation_t dc = (void *)other_dc;
1337 dispatch_queue_t root_dq = dq;
1338
1339 if (dc->dc_func == _dispatch_sync_f_slow2) {
1340 return dc->dc_func(dc->dc_ctxt);
1341 }
1342
1343 dispatch_atomic_add(&dq->dq_running, 2);
1344 _dispatch_retain(dq);
1345
1346 dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();
1347
1348 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1349 dc->dc_func = _dispatch_async_f_redirect2;
1350 dc->dc_ctxt = dc;
1351 dc->dc_data[0] = dq;
1352 dc->dc_data[1] = other_dc;
1353
1354 do {
1355 root_dq = root_dq->do_targetq;
1356 } while (root_dq->do_targetq);
1357
1358 _dispatch_queue_push(root_dq, dc);
1359 }
1360
1361
1362 void
1363 _dispatch_queue_drain(dispatch_queue_t dq)
1364 {
1365 dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1366 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
1367
1368 orig_tq = dq->do_targetq;
1369
1370 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1371
1372 while (dq->dq_items_tail) {
1373 while (!fastpath(dq->dq_items_head)) {
1374 _dispatch_hardware_pause();
1375 }
1376
1377 dc = dq->dq_items_head;
1378 dq->dq_items_head = NULL;
1379
1380 do {
1381 // Enqueue is TIGHTLY controlled, we won't wait long.
1382 do {
1383 next_dc = fastpath(dc->do_next);
1384 } while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));
1385 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
1386 goto out;
1387 }
1388 if (dq->dq_running > dq->dq_width) {
1389 goto out;
1390 }
1391 if (orig_tq != dq->do_targetq) {
1392 goto out;
1393 }
1394 if (fastpath(dq->dq_width == 1)) {
1395 _dispatch_continuation_pop(dc);
1396 _dispatch_workitem_inc();
1397 } else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
1398 if (dq->dq_running > 1) {
1399 goto out;
1400 }
1401 _dispatch_continuation_pop(dc);
1402 _dispatch_workitem_inc();
1403 } else {
1404 _dispatch_async_f_redirect(dq, dc);
1405 }
1406 } while ((dc = next_dc));
1407 }
1408
1409 out:
1410 // if this is not a complete drain, we must undo some things
1411 if (slowpath(dc)) {
1412 // 'dc' must NOT be "popped"
1413 // 'dc' might be the last item
1414 if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
1415 dq->dq_items_head = dc;
1416 } else {
1417 while (!(next_dc = dq->dq_items_head)) {
1418 _dispatch_hardware_pause();
1419 }
1420 dq->dq_items_head = dc;
1421 dc->do_next = next_dc;
1422 }
1423 }
1424
1425 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1426 }
1427
1428 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1429 void *
1430 _dispatch_worker_thread(void *context)
1431 {
1432 dispatch_queue_t dq = context;
1433 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1434 sigset_t mask;
1435 int r;
1436
1437 // workaround tweaks the kernel workqueue does for us
1438 r = sigfillset(&mask);
1439 dispatch_assume_zero(r);
1440 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
1441 dispatch_assume_zero(r);
1442
1443 do {
1444 _dispatch_worker_thread2(context);
1445 // we use 65 seconds in case there are any timers that run once a minute
1446 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);
1447
1448 dispatch_atomic_inc(&qc->dgq_thread_pool_size);
1449 if (dq->dq_items_tail) {
1450 _dispatch_queue_wakeup_global(dq);
1451 }
1452
1453 return NULL;
1454 }
1455
1456 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1457 void
1458 _dispatch_worker_thread2(void *context)
1459 {
1460 struct dispatch_object_s *item;
1461 dispatch_queue_t dq = context;
1462 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1463
1464 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
1465 DISPATCH_CRASH("Premature thread recycling");
1466 }
1467
1468 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1469 qc->dgq_pending = 0;
1470
1471 #if DISPATCH_COCOA_COMPAT
1472 // ensure that high-level memory management techniques do not leak/crash
1473 if (dispatch_begin_thread_4GC) {
1474 dispatch_begin_thread_4GC();
1475 }
1476 void *pool = _dispatch_begin_NSAutoReleasePool();
1477 #endif
1478
1479 #if DISPATCH_PERF_MON
1480 uint64_t start = mach_absolute_time();
1481 #endif
1482 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
1483 _dispatch_continuation_pop(item);
1484 }
1485 #if DISPATCH_PERF_MON
1486 _dispatch_queue_merge_stats(start);
1487 #endif
1488
1489 #if DISPATCH_COCOA_COMPAT
1490 _dispatch_end_NSAutoReleasePool(pool);
1491 dispatch_end_thread_4GC();
1492 #endif
1493
1494 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
1495
1496 _dispatch_force_cache_cleanup();
1497 }
1498
1499 #if DISPATCH_PERF_MON
1500 void
1501 _dispatch_queue_merge_stats(uint64_t start)
1502 {
1503 uint64_t avg, delta = mach_absolute_time() - start;
1504 unsigned long count, bucket;
1505
1506 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
1507 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1508
1509 if (count) {
1510 avg = delta / count;
1511 bucket = flsll(avg);
1512 } else {
1513 bucket = 0;
1514 }
1515
1516 // 64-bit counters on 32-bit require a lock or a queue
1517 OSSpinLockLock(&_dispatch_stats_lock);
1518
1519 _dispatch_stats[bucket].time_total += delta;
1520 _dispatch_stats[bucket].count_total += count;
1521 _dispatch_stats[bucket].thread_total++;
1522
1523 OSSpinLockUnlock(&_dispatch_stats_lock);
1524 }
1525 #endif
1526
1527 size_t
1528 dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1529 {
1530 return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
1531 }
1532
1533 size_t
1534 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1535 {
1536 size_t offset = 0;
1537 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
1538 offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1539 offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1540 offset += snprintf(&buf[offset], bufsiz - offset, "}");
1541 return offset;
1542 }
1543
1544 #if DISPATCH_DEBUG
1545 void
1546 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1547 if (fastpath(dq)) {
1548 dispatch_debug(dq, "%s", str);
1549 } else {
1550 _dispatch_log("queue[NULL]: %s", str);
1551 }
1552 }
1553 #endif
1554
1555 #if DISPATCH_COCOA_COMPAT
1556 void
1557 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
1558 {
1559 if (main_q_is_draining) {
1560 return;
1561 }
1562 _dispatch_queue_set_mainq_drain_state(true);
1563 _dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
1564 _dispatch_queue_set_mainq_drain_state(false);
1565 }
1566
1567 mach_port_t
1568 _dispatch_get_main_queue_port_4CF(void)
1569 {
1570 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1571 return main_q_port;
1572 }
1573 #endif
1574
1575 static void
1576 dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
1577 {
1578 dispatch_queue_attr_set_finalizer(attr, NULL);
1579 _dispatch_dispose(attr);
1580 }
1581
1582 static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
1583 .do_type = DISPATCH_QUEUE_ATTR_TYPE,
1584 .do_kind = "queue-attr",
1585 .do_dispose = dispatch_queue_attr_dispose,
1586 };
1587
1588 dispatch_queue_attr_t
1589 dispatch_queue_attr_create(void)
1590 {
1591 dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));
1592
1593 if (a) {
1594 a->do_vtable = &dispatch_queue_attr_vtable;
1595 a->do_next = DISPATCH_OBJECT_LISTLESS;
1596 a->do_ref_cnt = 1;
1597 a->do_xref_cnt = 1;
1598 a->do_targetq = _dispatch_get_root_queue(0, 0);
1599 a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
1600 }
1601 return a;
1602 }
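/*
 * Illustrative legacy-attr use (label is hypothetical), matching the attr
 * handling in dispatch_queue_create() earlier in this file:
 *
 *	dispatch_queue_attr_t a = dispatch_queue_attr_create();
 *	dispatch_queue_attr_set_priority(a, DISPATCH_QUEUE_PRIORITY_LOW);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.bg", a);
 */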
1603
1604 void
1605 dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
1606 {
1607 dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
1608 attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
1609 }
1610
1611 void
1612 dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
1613 {
1614 dispatch_debug_assert(attr, "NULL pointer");
1615 dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");
1616
1617 if (priority > 0) {
1618 priority = DISPATCH_QUEUE_PRIORITY_HIGH;
1619 } else if (priority < 0) {
1620 priority = DISPATCH_QUEUE_PRIORITY_LOW;
1621 }
1622
1623 attr->qa_priority = priority;
1624 }
1625
1626 void
1627 dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
1628 void *context, dispatch_queue_finalizer_function_t finalizer)
1629 {
1630 #ifdef __BLOCKS__
1631 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
1632 Block_release(attr->finalizer_ctxt);
1633 }
1634 #endif
1635 attr->finalizer_ctxt = context;
1636 attr->finalizer_func = finalizer;
1637 }
1638
1639 #ifdef __BLOCKS__
1640 long
1641 dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
1642 dispatch_queue_finalizer_t finalizer)
1643 {
1644 void *ctxt;
1645 dispatch_queue_finalizer_function_t func;
1646
1647 if (finalizer) {
1648 if (!(ctxt = Block_copy(finalizer))) {
1649 return 1;
1650 }
1651 func = (void *)_dispatch_call_block_and_release2;
1652 } else {
1653 ctxt = NULL;
1654 func = NULL;
1655 }
1656
1657 dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);
1658
1659 return 0;
1660 }
1661 #endif
1662
1663 static void
1664 _dispatch_ccache_init(void *context __attribute__((unused)))
1665 {
1666 _dispatch_ccache_zone = malloc_create_zone(0, 0);
1667 dispatch_assert(_dispatch_ccache_zone);
1668 malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
1669 }
1670
1671 dispatch_continuation_t
1672 _dispatch_continuation_alloc_from_heap(void)
1673 {
1674 static dispatch_once_t pred;
1675 dispatch_continuation_t dc;
1676
1677 dispatch_once_f(&pred, NULL, _dispatch_ccache_init);
1678
1679 while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
1680 sleep(1);
1681 }
1682
1683 return dc;
1684 }
1685
1686 void
1687 _dispatch_force_cache_cleanup(void)
1688 {
1689 dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);
1690 if (dc) {
1691 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1692 _dispatch_cache_cleanup2(dc);
1693 }
1694 }
1695
1696 DISPATCH_NOINLINE
1697 void
1698 _dispatch_cache_cleanup2(void *value)
1699 {
1700 dispatch_continuation_t dc, next_dc = value;
1701
1702 while ((dc = next_dc)) {
1703 next_dc = dc->do_next;
1704 malloc_zone_free(_dispatch_ccache_zone, dc);
1705 }
1706 }
1707
1708 static char _dispatch_build[16];
1709
1710 static void
1711 _dispatch_bug_init(void *context __attribute__((unused)))
1712 {
1713 int mib[] = { CTL_KERN, KERN_OSVERSION };
1714 size_t bufsz = sizeof(_dispatch_build);
1715
1716 sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
1717 }
1718
1719 void
1720 _dispatch_bug(size_t line, long val)
1721 {
1722 static dispatch_once_t pred;
1723 static void *last_seen;
1724 void *ra = __builtin_return_address(0);
1725
1726 dispatch_once_f(&pred, NULL, _dispatch_bug_init);
1727 if (last_seen != ra) {
1728 last_seen = ra;
1729 _dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
1730 }
1731 }
1732
1733 void
1734 _dispatch_abort(size_t line, long val)
1735 {
1736 _dispatch_bug(line, val);
1737 abort();
1738 }
1739
1740 void
1741 _dispatch_log(const char *msg, ...)
1742 {
1743 va_list ap;
1744
1745 va_start(ap, msg);
1746
1747 _dispatch_logv(msg, ap);
1748
1749 va_end(ap);
1750 }
1751
1752 void
1753 _dispatch_logv(const char *msg, va_list ap)
1754 {
1755 #if DISPATCH_DEBUG
1756 static FILE *logfile, *tmp;
1757 char newbuf[strlen(msg) + 2];
1758 char path[PATH_MAX];
1759
1760 sprintf(newbuf, "%s\n", msg);
1761
1762 if (!logfile) {
1763 snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
1764 tmp = fopen(path, "a");
1765 assert(tmp);
1766 if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
1767 fclose(tmp);
1768 } else {
1769 struct timeval tv;
1770 gettimeofday(&tv, NULL);
1771 fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
1772 getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
1773 }
1774 }
1775 vfprintf(logfile, newbuf, ap);
1776 fflush(logfile);
1777 #else
1778 vsyslog(LOG_NOTICE, msg, ap);
1779 #endif
1780 }
1781
1782 int
1783 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
1784 {
1785 int r;
1786
1787 /* Workaround: 6269619 Not all signals can be delivered on any thread */
1788
1789 r = sigdelset(set, SIGILL);
1790 dispatch_assume_zero(r);
1791 r = sigdelset(set, SIGTRAP);
1792 dispatch_assume_zero(r);
1793 r = sigdelset(set, SIGEMT);
1794 dispatch_assume_zero(r);
1795 r = sigdelset(set, SIGFPE);
1796 dispatch_assume_zero(r);
1797 r = sigdelset(set, SIGBUS);
1798 dispatch_assume_zero(r);
1799 r = sigdelset(set, SIGSEGV);
1800 dispatch_assume_zero(r);
1801 r = sigdelset(set, SIGSYS);
1802 dispatch_assume_zero(r);
1803 r = sigdelset(set, SIGPIPE);
1804 dispatch_assume_zero(r);
1805
1806 return pthread_sigmask(how, set, oset);
1807 }
1808
1809 bool _dispatch_safe_fork = true;
1810
1811 void
1812 dispatch_atfork_prepare(void)
1813 {
1814 }
1815
1816 void
1817 dispatch_atfork_parent(void)
1818 {
1819 }
1820
1821 void
1822 dispatch_atfork_child(void)
1823 {
1824 void *crash = (void *)0x100;
1825 size_t i;
1826
1827 if (_dispatch_safe_fork) {
1828 return;
1829 }
1830
1831 _dispatch_main_q.dq_items_head = crash;
1832 _dispatch_main_q.dq_items_tail = crash;
1833
1834 _dispatch_mgr_q.dq_items_head = crash;
1835 _dispatch_mgr_q.dq_items_tail = crash;
1836
1837 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1838 _dispatch_root_queues[i].dq_items_head = crash;
1839 _dispatch_root_queues[i].dq_items_tail = crash;
1840 }
1841 }
1842
1843 void
1844 dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
1845 {
1846 }
1847
1848 static int _dispatch_kq;
1849
1850 static void
1851 _dispatch_get_kq_init(void *context __attribute__((unused)))
1852 {
1853 static const struct kevent kev = {
1854 .ident = 1,
1855 .filter = EVFILT_USER,
1856 .flags = EV_ADD|EV_CLEAR,
1857 };
1858
1859 _dispatch_kq = kqueue();
1860 _dispatch_safe_fork = false;
1861 // in case we fall back to select()
1862 FD_SET(_dispatch_kq, &_dispatch_rfds);
1863
1864 if (_dispatch_kq == -1) {
1865 dispatch_assert_zero(errno);
1866 }
1867
1868 dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));
1869
1870 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
1871 }
1872
1873 static int
1874 _dispatch_get_kq(void)
1875 {
1876 static dispatch_once_t pred;
1877
1878 dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);
1879
1880 return _dispatch_kq;
1881 }
1882
1883 static void
1884 _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
1885 {
1886 size_t i;
1887
1888 for (i = 0; i < cnt; i++) {
1889 // EVFILT_USER isn't used by sources
1890 if (kev[i].filter == EVFILT_USER) {
1891 // If _dispatch_mgr_thread2() ever is changed to return to the
1892 // caller, then this should become _dispatch_queue_drain()
1893 _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
1894 } else {
1895 _dispatch_source_drain_kevent(&kev[i]);
1896 }
1897 }
1898 }
1899
1900 static dispatch_queue_t
1901 _dispatch_mgr_invoke(dispatch_queue_t dq)
1902 {
1903 static const struct timespec timeout_immediately = { 0, 0 };
1904 struct timespec timeout;
1905 const struct timespec *timeoutp;
1906 struct timeval sel_timeout, *sel_timeoutp;
1907 fd_set tmp_rfds, tmp_wfds;
1908 struct kevent kev[1];
1909 int k_cnt, k_err, i, r;
1910
1911 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1912
1913 for (;;) {
1914 _dispatch_run_timers();
1915
1916 timeoutp = _dispatch_get_next_timer_fire(&timeout);
1917
1918 if (_dispatch_select_workaround) {
1919 FD_COPY(&_dispatch_rfds, &tmp_rfds);
1920 FD_COPY(&_dispatch_wfds, &tmp_wfds);
1921 if (timeoutp) {
1922 sel_timeout.tv_sec = timeoutp->tv_sec;
1923 sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
1924 sel_timeoutp = &sel_timeout;
1925 } else {
1926 sel_timeoutp = NULL;
1927 }
1928
1929 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
1930 if (r == -1) {
1931 if (errno != EBADF) {
1932 dispatch_assume_zero(errno);
1933 continue;
1934 }
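// EBADF: one of the emulated descriptors has gone bad. Probe each tracked
// descriptor with dup() and evict any that fail so the next select() call
// can succeed.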
1935 for (i = 0; i < FD_SETSIZE; i++) {
1936 if (i == _dispatch_kq) {
1937 continue;
1938 }
1939 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
1940 continue;
1941 }
1942 r = dup(i);
1943 if (r != -1) {
1944 close(r);
1945 } else {
1946 FD_CLR(i, &_dispatch_rfds);
1947 FD_CLR(i, &_dispatch_wfds);
1948 _dispatch_rfd_ptrs[i] = 0;
1949 _dispatch_wfd_ptrs[i] = 0;
1950 }
1951 }
1952 continue;
1953 }
1954
1955 if (r > 0) {
1956 for (i = 0; i < FD_SETSIZE; i++) {
1957 if (i == _dispatch_kq) {
1958 continue;
1959 }
1960 if (FD_ISSET(i, &tmp_rfds)) {
1961 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
1962 EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
1963 _dispatch_rfd_ptrs[i] = 0;
1964 _dispatch_mgr_thread2(kev, 1);
1965 }
1966 if (FD_ISSET(i, &tmp_wfds)) {
1967 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
1968 EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
1969 _dispatch_wfd_ptrs[i] = 0;
1970 _dispatch_mgr_thread2(kev, 1);
1971 }
1972 }
1973 }
1974
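// select() (which also watches the kqueue's own descriptor) has already
// done the blocking wait, so only poll kevent() below and loop back.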
1975 timeoutp = &timeout_immediately;
1976 }
1977
1978 k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
1979 k_err = errno;
1980
1981 switch (k_cnt) {
1982 case -1:
1983 if (k_err == EBADF) {
1984 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
1985 }
1986 dispatch_assume_zero(k_err);
1987 continue;
1988 default:
1989 _dispatch_mgr_thread2(kev, (size_t)k_cnt);
1990 // fall through
1991 case 0:
1992 _dispatch_force_cache_cleanup();
1993 continue;
1994 }
1995 }
1996
1997 return NULL;
1998 }
1999
2000 static bool
2001 _dispatch_mgr_wakeup(dispatch_queue_t dq)
2002 {
2003 static const struct kevent kev = {
2004 .ident = 1,
2005 .filter = EVFILT_USER,
2006 #ifdef EV_TRIGGER
2007 .flags = EV_TRIGGER,
2008 #endif
2009 #ifdef NOTE_TRIGGER
2010 .fflags = NOTE_TRIGGER,
2011 #endif
2012 };
2013
2014 _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);
2015
2016 _dispatch_update_kq(&kev);
2017
2018 return false;
2019 }
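/*
 * Illustrative sketch (comment only, not compiled as part of this file): the
 * wakeup above relies on the two-step EVFILT_USER idiom also used by
 * _dispatch_get_kq_init() earlier in this file: register the user event once
 * with EV_ADD|EV_CLEAR, then post it with NOTE_TRIGGER (or EV_TRIGGER on
 * systems that spell it that way, hence the #ifdefs above). The names
 * example_kq and EXAMPLE_WAKEUP_IDENT are made up for this sketch.
 *
 *	#define EXAMPLE_WAKEUP_IDENT 1
 *
 *	static void
 *	example_register_wakeup(int example_kq)
 *	{
 *		struct kevent kev;
 *
 *		EV_SET(&kev, EXAMPLE_WAKEUP_IDENT, EVFILT_USER,
 *				EV_ADD|EV_CLEAR, 0, 0, NULL);
 *		dispatch_assume_zero(kevent(example_kq, &kev, 1, NULL, 0, NULL));
 *	}
 *
 *	static void
 *	example_post_wakeup(int example_kq)
 *	{
 *		struct kevent kev;
 *
 *		EV_SET(&kev, EXAMPLE_WAKEUP_IDENT, EVFILT_USER,
 *				0, NOTE_TRIGGER, 0, NULL);
 *		dispatch_assume_zero(kevent(example_kq, &kev, 1, NULL, 0, NULL));
 *	}
 */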
2020
2021 void
2022 _dispatch_update_kq(const struct kevent *kev)
2023 {
2024 struct kevent kev_copy = *kev;
2025 kev_copy.flags |= EV_RECEIPT;
2026
2027 if (kev_copy.flags & EV_DELETE) {
2028 switch (kev_copy.filter) {
2029 case EVFILT_READ:
2030 if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
2031 FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
2032 _dispatch_rfd_ptrs[kev_copy.ident] = 0;
2033 return;
2034 } break; // do not fall through into the EVFILT_WRITE case
2035 case EVFILT_WRITE:
2036 if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
2037 FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
2038 _dispatch_wfd_ptrs[kev_copy.ident] = 0;
2039 return;
2040 } break;
2041 default:
2042 break;
2043 }
2044 }
2045
2046 int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
2047 if (rval == -1) {
2048 // The kevent() call itself failed (e.g. EINTR), as opposed to a
2049 // per-changelist-element error, which EV_RECEIPT reports via kev_copy.data.
2050 dispatch_assume_zero(errno);
2051 //kev_copy.flags |= EV_ERROR;
2052 //kev_copy.data = error;
2053 return;
2054 }
2055
2056 // The select() workaround below applies only when adding kevents (EV_ADD)
2057 if (!(kev->flags & EV_ADD)) {
2058 return;
2059 }
2060
2061 switch (kev_copy.data) {
2062 case 0:
2063 return;
2064 case EBADF:
2065 break;
2066 default:
2067 // Registering the kevent failed while its changelist element was being
2068 // processed. For read and write filters this usually means the descriptor
2069 // refers to a /dev/* style device that kqueue cannot monitor, so fall
2070 // back to emulating the event source with select().
2071 switch (kev_copy.filter) {
2072 case EVFILT_READ:
2073 _dispatch_select_workaround = true;
2074 FD_SET((int)kev_copy.ident, &_dispatch_rfds);
2075 _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
2076 break;
2077 case EVFILT_WRITE:
2078 _dispatch_select_workaround = true;
2079 FD_SET((int)kev_copy.ident, &_dispatch_wfds);
2080 _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
2081 break;
2082 default:
2083 _dispatch_source_drain_kevent(&kev_copy);
2084 break;
2085 }
2086 break;
2087 }
2088 }
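/*
 * Illustrative sketch (comment only, not compiled as part of this file): the
 * EV_RECEIPT handshake used above. With EV_RECEIPT set, kevent() does not
 * deliver the event; it echoes each changelist element back with EV_ERROR set
 * and the per-element status in the data field (0 on success), which is why
 * the function above switches on kev_copy.data after the call. The names
 * example_kq and example_add_with_receipt are made up for this sketch.
 *
 *	static int
 *	example_add_with_receipt(int example_kq, struct kevent kev)
 *	{
 *		kev.flags |= EV_ADD | EV_RECEIPT;
 *		if (kevent(example_kq, &kev, 1, &kev, 1, NULL) == -1) {
 *			return -1;		// the call itself failed
 *		}
 *		return (int)kev.data;	// 0, or an errno for this element
 *	}
 */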
2089
2090 static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
2091 .do_type = DISPATCH_QUEUE_MGR_TYPE,
2092 .do_kind = "mgr-queue",
2093 .do_invoke = _dispatch_mgr_invoke,
2094 .do_debug = dispatch_queue_debug,
2095 .do_probe = _dispatch_mgr_wakeup,
2096 };
2097
2098 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
2099 struct dispatch_queue_s _dispatch_mgr_q = {
2100 .do_vtable = &_dispatch_queue_mgr_vtable,
2101 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2102 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2103 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
2104 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],
2105
2106 .dq_label = "com.apple.libdispatch-manager",
2107 .dq_width = 1,
2108 .dq_serialnum = 2,
2109 };
2110
2111 const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
2112 .dqo_version = 3,
2113 .dqo_label = offsetof(struct dispatch_queue_s, dq_label),
2114 .dqo_label_size = sizeof(_dispatch_main_q.dq_label),
2115 .dqo_flags = 0,
2116 .dqo_flags_size = 0,
2117 .dqo_width = offsetof(struct dispatch_queue_s, dq_width),
2118 .dqo_width_size = sizeof(_dispatch_main_q.dq_width),
2119 .dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
2120 .dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
2121 .dqo_running = offsetof(struct dispatch_queue_s, dq_running),
2122 .dqo_running_size = sizeof(_dispatch_main_q.dq_running),
2123 };
2124
2125 #ifdef __BLOCKS__
2126 void
2127 dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
2128 {
2129 // test 'when' before copying the block so the early return does not leak the copy
2130 if (when == DISPATCH_TIME_FOREVER) {
2131 #if DISPATCH_DEBUG
2132 DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
2133 #endif
2134 return;
2135 }
2136 dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
2137 }
2138 #endif
2139
2140 DISPATCH_NOINLINE
2141 void
2142 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
2143 {
2144 uint64_t delta;
2145 if (when == DISPATCH_TIME_FOREVER) {
2146 #if DISPATCH_DEBUG
2147 DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
2148 #endif
2149 return;
2150 }
2151
2152 // this function can and should be optimized to not use a dispatch source
2153 again:
2154 delta = _dispatch_timeout(when);
2155 if (delta == 0) {
2156 return dispatch_async_f(queue, ctxt, func);
2157 }
2158 if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
2159 long err_dom, err_val;
2160 if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
2161 dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
2162 dispatch_assert(err_val == ECANCELED);
2163 func(ctxt);
2164 dispatch_release(ds); // MUST NOT be _dispatch_release()
2165 } else {
2166 dispatch_source_cancel(ds);
2167 }
2168 })) {
2169 goto again;
2170 }
2171 }
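/*
 * Illustrative sketch (comment only, not compiled as part of this file):
 * typical client-side use of the two entry points above. The queue, context
 * and callback names are made up for this sketch.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static void
 *	example_timed_callback(void *ctxt)
 *	{
 *		// runs on the target queue roughly two seconds from now
 *	}
 *
 *	static void
 *	example_schedule(void)
 *	{
 *		dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
 *				2 * NSEC_PER_SEC);
 *
 *		// function/context flavor
 *		dispatch_after_f(when, dispatch_get_main_queue(), NULL,
 *				example_timed_callback);
 *	#ifdef __BLOCKS__
 *		// Blocks flavor
 *		dispatch_after(when, dispatch_get_main_queue(), ^{
 *			// also runs roughly two seconds from now
 *		});
 *	#endif
 *	}
 */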