1 /*
2 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_THREAD_POOL && !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
31 #define pthread_workqueue_t void*
32 #endif
33
34 static void _dispatch_cache_cleanup(void *value);
35 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
36 dispatch_continuation_t dc);
37 static void _dispatch_queue_cleanup(void *ctxt);
38 static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
39 unsigned int n);
40 static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
41 static _dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_queue_t dq);
42 static inline _dispatch_thread_semaphore_t
43 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
44 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
45 static void _dispatch_worker_thread3(void *context);
46 #endif
47 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
48 static void _dispatch_worker_thread2(int priority, int options, void *context);
49 #endif
50 #if DISPATCH_ENABLE_THREAD_POOL
51 static void *_dispatch_worker_thread(void *context);
52 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
53 #endif
54
55 #if DISPATCH_COCOA_COMPAT
56 static unsigned int _dispatch_worker_threads;
57 static dispatch_once_t _dispatch_main_q_port_pred;
58 static mach_port_t main_q_port;
59
60 static void _dispatch_main_q_port_init(void *ctxt);
61 static dispatch_queue_t _dispatch_queue_wakeup_main(void);
62 static void _dispatch_main_queue_drain(void);
63 #endif
64
65 #pragma mark -
66 #pragma mark dispatch_root_queue
67
68 #if DISPATCH_ENABLE_THREAD_POOL
69 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
70 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
71 .do_vtable = DISPATCH_VTABLE(semaphore),
72 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
73 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
74 },
75 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
76 .do_vtable = DISPATCH_VTABLE(semaphore),
77 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
78 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
79 },
80 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
81 .do_vtable = DISPATCH_VTABLE(semaphore),
82 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
83 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
84 },
85 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
86 .do_vtable = DISPATCH_VTABLE(semaphore),
87 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
88 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
89 },
90 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
91 .do_vtable = DISPATCH_VTABLE(semaphore),
92 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
93 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
94 },
95 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
96 .do_vtable = DISPATCH_VTABLE(semaphore),
97 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
98 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
99 },
100 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
101 .do_vtable = DISPATCH_VTABLE(semaphore),
102 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
103 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
104 },
105 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
106 .do_vtable = DISPATCH_VTABLE(semaphore),
107 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
108 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
109 },
110 };
111 #endif
112
113 #define MAX_THREAD_COUNT 255
114
115 struct dispatch_root_queue_context_s {
116 union {
117 struct {
118 unsigned int volatile dgq_pending;
119 #if HAVE_PTHREAD_WORKQUEUES
120 int dgq_wq_priority, dgq_wq_options;
121 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
122 pthread_workqueue_t dgq_kworkqueue;
123 #endif
124 #endif // HAVE_PTHREAD_WORKQUEUES
125 #if DISPATCH_ENABLE_THREAD_POOL
126 dispatch_semaphore_t dgq_thread_mediator;
127 uint32_t dgq_thread_pool_size;
128 #endif
129 };
130 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
131 };
132 };
133
134 DISPATCH_CACHELINE_ALIGN
135 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
136 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
137 #if HAVE_PTHREAD_WORKQUEUES
138 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
139 .dgq_wq_options = 0,
140 #endif
141 #if DISPATCH_ENABLE_THREAD_POOL
142 .dgq_thread_mediator = &_dispatch_thread_mediator[
143 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
144 .dgq_thread_pool_size = MAX_THREAD_COUNT,
145 #endif
146 }}},
147 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
148 #if HAVE_PTHREAD_WORKQUEUES
149 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
150 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
151 #endif
152 #if DISPATCH_ENABLE_THREAD_POOL
153 .dgq_thread_mediator = &_dispatch_thread_mediator[
154 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
155 .dgq_thread_pool_size = MAX_THREAD_COUNT,
156 #endif
157 }}},
158 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
159 #if HAVE_PTHREAD_WORKQUEUES
160 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
161 .dgq_wq_options = 0,
162 #endif
163 #if DISPATCH_ENABLE_THREAD_POOL
164 .dgq_thread_mediator = &_dispatch_thread_mediator[
165 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
166 .dgq_thread_pool_size = MAX_THREAD_COUNT,
167 #endif
168 }}},
169 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
170 #if HAVE_PTHREAD_WORKQUEUES
171 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
172 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
173 #endif
174 #if DISPATCH_ENABLE_THREAD_POOL
175 .dgq_thread_mediator = &_dispatch_thread_mediator[
176 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
177 .dgq_thread_pool_size = MAX_THREAD_COUNT,
178 #endif
179 }}},
180 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
181 #if HAVE_PTHREAD_WORKQUEUES
182 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
183 .dgq_wq_options = 0,
184 #endif
185 #if DISPATCH_ENABLE_THREAD_POOL
186 .dgq_thread_mediator = &_dispatch_thread_mediator[
187 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
188 .dgq_thread_pool_size = MAX_THREAD_COUNT,
189 #endif
190 }}},
191 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
192 #if HAVE_PTHREAD_WORKQUEUES
193 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
194 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
195 #endif
196 #if DISPATCH_ENABLE_THREAD_POOL
197 .dgq_thread_mediator = &_dispatch_thread_mediator[
198 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
199 .dgq_thread_pool_size = MAX_THREAD_COUNT,
200 #endif
201 }}},
202 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
203 #if HAVE_PTHREAD_WORKQUEUES
204 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
205 .dgq_wq_options = 0,
206 #endif
207 #if DISPATCH_ENABLE_THREAD_POOL
208 .dgq_thread_mediator = &_dispatch_thread_mediator[
209 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
210 .dgq_thread_pool_size = MAX_THREAD_COUNT,
211 #endif
212 }}},
213 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
214 #if HAVE_PTHREAD_WORKQUEUES
215 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
216 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
217 #endif
218 #if DISPATCH_ENABLE_THREAD_POOL
219 .dgq_thread_mediator = &_dispatch_thread_mediator[
220 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
221 .dgq_thread_pool_size = MAX_THREAD_COUNT,
222 #endif
223 }}},
224 };
225
226 // 6618342 Contact the team that owns the Instrument DTrace probe before
227 // renaming this symbol
228 // dq_running is set to 2 so that barrier operations go through the slow path
229 DISPATCH_CACHELINE_ALIGN
230 struct dispatch_queue_s _dispatch_root_queues[] = {
231 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
232 .do_vtable = DISPATCH_VTABLE(queue_root),
233 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
234 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
235 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
236 .do_ctxt = &_dispatch_root_queue_contexts[
237 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
238 .dq_label = "com.apple.root.low-priority",
239 .dq_running = 2,
240 .dq_width = UINT32_MAX,
241 .dq_serialnum = 4,
242 },
243 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
244 .do_vtable = DISPATCH_VTABLE(queue_root),
245 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
246 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
247 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
248 .do_ctxt = &_dispatch_root_queue_contexts[
249 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
250 .dq_label = "com.apple.root.low-overcommit-priority",
251 .dq_running = 2,
252 .dq_width = UINT32_MAX,
253 .dq_serialnum = 5,
254 },
255 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
256 .do_vtable = DISPATCH_VTABLE(queue_root),
257 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
258 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
259 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
260 .do_ctxt = &_dispatch_root_queue_contexts[
261 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
262 .dq_label = "com.apple.root.default-priority",
263 .dq_running = 2,
264 .dq_width = UINT32_MAX,
265 .dq_serialnum = 6,
266 },
267 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
268 .do_vtable = DISPATCH_VTABLE(queue_root),
269 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
270 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
271 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
272 .do_ctxt = &_dispatch_root_queue_contexts[
273 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
274 .dq_label = "com.apple.root.default-overcommit-priority",
275 .dq_running = 2,
276 .dq_width = UINT32_MAX,
277 .dq_serialnum = 7,
278 },
279 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
280 .do_vtable = DISPATCH_VTABLE(queue_root),
281 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
282 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
283 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
284 .do_ctxt = &_dispatch_root_queue_contexts[
285 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
286 .dq_label = "com.apple.root.high-priority",
287 .dq_running = 2,
288 .dq_width = UINT32_MAX,
289 .dq_serialnum = 8,
290 },
291 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
292 .do_vtable = DISPATCH_VTABLE(queue_root),
293 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
294 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
295 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
296 .do_ctxt = &_dispatch_root_queue_contexts[
297 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
298 .dq_label = "com.apple.root.high-overcommit-priority",
299 .dq_running = 2,
300 .dq_width = UINT32_MAX,
301 .dq_serialnum = 9,
302 },
303 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
304 .do_vtable = DISPATCH_VTABLE(queue_root),
305 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
306 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
307 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
308 .do_ctxt = &_dispatch_root_queue_contexts[
309 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
310 .dq_label = "com.apple.root.background-priority",
311 .dq_running = 2,
312 .dq_width = UINT32_MAX,
313 .dq_serialnum = 10,
314 },
315 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
316 .do_vtable = DISPATCH_VTABLE(queue_root),
317 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
318 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
319 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
320 .do_ctxt = &_dispatch_root_queue_contexts[
321 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
322 .dq_label = "com.apple.root.background-overcommit-priority",
323 .dq_running = 2,
324 .dq_width = UINT32_MAX,
325 .dq_serialnum = 11,
326 },
327 };
328
329 #if HAVE_PTHREAD_WORKQUEUES
330 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
331 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
332 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
333 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
334 &_dispatch_root_queues[
335 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
336 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
337 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
338 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
339 &_dispatch_root_queues[
340 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
341 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
342 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
343 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
344 &_dispatch_root_queues[
345 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
346 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
347 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
348 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
349 &_dispatch_root_queues[
350 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
351 };
352 #endif // HAVE_PTHREAD_WORKQUEUES
353
354 // 6618342 Contact the team that owns the Instrument DTrace probe before
355 // renaming this symbol
356 DISPATCH_CACHELINE_ALIGN
357 struct dispatch_queue_s _dispatch_mgr_q = {
358 .do_vtable = DISPATCH_VTABLE(queue_mgr),
359 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
360 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
361 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
362 .do_targetq = &_dispatch_root_queues[
363 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
364 .dq_label = "com.apple.libdispatch-manager",
365 .dq_width = 1,
366 .dq_serialnum = 2,
367 };
368
369 dispatch_queue_t
370 dispatch_get_global_queue(long priority, unsigned long flags)
371 {
372 if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
373 return NULL;
374 }
375 return _dispatch_get_root_queue(priority,
376 flags & DISPATCH_QUEUE_OVERCOMMIT);
377 }
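
// Example (illustrative): clients typically obtain a root queue with
//
//	dispatch_queue_t q =
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//
// Per the check above, any flags bit other than DISPATCH_QUEUE_OVERCOMMIT
// makes this function return NULL.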
378
379 dispatch_queue_t
380 dispatch_get_current_queue(void)
381 {
382 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
383 }
384
385 #pragma mark -
386 #pragma mark dispatch_init
387
388 static void
389 _dispatch_hw_config_init(void)
390 {
391 _dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
392 _dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
393 _dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
394 }
395
396 static inline bool
397 _dispatch_root_queues_init_workq(void)
398 {
399 bool result = false;
400 #if HAVE_PTHREAD_WORKQUEUES
401 bool disable_wq = false;
402 #if DISPATCH_ENABLE_THREAD_POOL
403 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
404 #endif
405 int r;
406 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
407 if (!disable_wq) {
408 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
409 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
410 (void)dispatch_assume_zero(r);
411 #endif
412 result = !r;
413 }
414 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
415 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
416 if (!result) {
417 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
418 pthread_workqueue_attr_t pwq_attr;
419 if (!disable_wq) {
420 r = pthread_workqueue_attr_init_np(&pwq_attr);
421 (void)dispatch_assume_zero(r);
422 }
423 #endif
424 int i;
425 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
426 pthread_workqueue_t pwq = NULL;
427 struct dispatch_root_queue_context_s *qc =
428 &_dispatch_root_queue_contexts[i];
429 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
430 if (!disable_wq
431 #if DISPATCH_NO_BG_PRIORITY
432 && (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
433 #endif
434 ) {
435 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
436 qc->dgq_wq_priority);
437 (void)dispatch_assume_zero(r);
438 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
439 qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
440 (void)dispatch_assume_zero(r);
441 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
442 (void)dispatch_assume_zero(r);
443 result = result || dispatch_assume(pwq);
444 }
445 #endif
446 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
447 }
448 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
449 if (!disable_wq) {
450 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
451 (void)dispatch_assume_zero(r);
452 }
453 #endif
454 }
455 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
456 #endif // HAVE_PTHREAD_WORKQUEUES
457 return result;
458 }
459
460 static inline void
461 _dispatch_root_queues_init_thread_pool(void)
462 {
463 #if DISPATCH_ENABLE_THREAD_POOL
464 int i;
465 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
466 #if TARGET_OS_EMBEDDED
467 		// Some software hangs if the non-overcommitting queues do not
468 		// overcommit when threads block. Someday, this behavior should apply
469 		// to all platforms.
470 if (!(i & 1)) {
471 _dispatch_root_queue_contexts[i].dgq_thread_pool_size =
472 _dispatch_hw_config.cc_max_active;
473 }
474 #endif
475 #if USE_MACH_SEM
476 // override the default FIFO behavior for the pool semaphores
477 kern_return_t kr = semaphore_create(mach_task_self(),
478 &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
479 DISPATCH_VERIFY_MIG(kr);
480 (void)dispatch_assume_zero(kr);
481 (void)dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
482 #elif USE_POSIX_SEM
483 /* XXXRW: POSIX semaphores don't support LIFO? */
484 int ret = sem_init(&_dispatch_thread_mediator[i].dsema_sem, 0, 0);
485 (void)dispatch_assume_zero(ret);
486 #endif
487 }
488 #else
489 DISPATCH_CRASH("Thread pool creation failed");
490 #endif // DISPATCH_ENABLE_THREAD_POOL
491 }
492
493 static void
494 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
495 {
496 _dispatch_safe_fork = false;
497 if (!_dispatch_root_queues_init_workq()) {
498 _dispatch_root_queues_init_thread_pool();
499 }
500
501 }
502
503 #define countof(x) (sizeof(x) / sizeof(x[0]))
504
505 DISPATCH_EXPORT DISPATCH_NOTHROW
506 void
507 libdispatch_init(void)
508 {
509 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
510 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);
511
512 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
513 -DISPATCH_QUEUE_PRIORITY_HIGH);
514 dispatch_assert(countof(_dispatch_root_queues) ==
515 DISPATCH_ROOT_QUEUE_COUNT);
516 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
517 DISPATCH_ROOT_QUEUE_COUNT);
518 #if HAVE_PTHREAD_WORKQUEUES
519 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
520 sizeof(_dispatch_wq2root_queues[0][0]) ==
521 DISPATCH_ROOT_QUEUE_COUNT);
522 #endif
523 #if DISPATCH_ENABLE_THREAD_POOL
524 dispatch_assert(countof(_dispatch_thread_mediator) ==
525 DISPATCH_ROOT_QUEUE_COUNT);
526 #endif
527
528 dispatch_assert(sizeof(struct dispatch_apply_s) <=
529 ROUND_UP_TO_CACHELINE_SIZE(sizeof(
530 struct dispatch_continuation_s)));
531 dispatch_assert(sizeof(struct dispatch_source_s) ==
532 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
533 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
534 == 0);
535 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
536 DISPATCH_CACHELINE_SIZE == 0);
537
538 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
539 _dispatch_thread_key_create(&dispatch_sema4_key,
540 (void (*)(void *))_dispatch_thread_semaphore_dispose);
541 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
542 _dispatch_thread_key_create(&dispatch_io_key, NULL);
543 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
544 #if DISPATCH_PERF_MON
545 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
546 #endif
547
548 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
549 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
550 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
551 #endif
552
553 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
554
555 #if DISPATCH_USE_PTHREAD_ATFORK
556 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
557 dispatch_atfork_parent, dispatch_atfork_child));
558 #endif
559
560 _dispatch_hw_config_init();
561 _dispatch_vtable_init();
562 _os_object_init();
563 }
564
565 DISPATCH_EXPORT DISPATCH_NOTHROW
566 void
567 dispatch_atfork_child(void)
568 {
569 void *crash = (void *)0x100;
570 size_t i;
571
572 if (_dispatch_safe_fork) {
573 return;
574 }
575
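	// Poison the head/tail pointers of the statically allocated queues with a
	// bogus address so that any use of dispatch in the child of a fork() that
	// is not followed by exec() crashes deterministically rather than
	// corrupting state.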
576 _dispatch_main_q.dq_items_head = crash;
577 _dispatch_main_q.dq_items_tail = crash;
578
579 _dispatch_mgr_q.dq_items_head = crash;
580 _dispatch_mgr_q.dq_items_tail = crash;
581
582 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
583 _dispatch_root_queues[i].dq_items_head = crash;
584 _dispatch_root_queues[i].dq_items_tail = crash;
585 }
586 }
587
588 #pragma mark -
589 #pragma mark dispatch_queue_t
590
591 // skip zero
592 // 1 - main_q
593 // 2 - mgr_q
594 // 3 - _unused_
595 // 4,5,6,7,8,9,10,11 - global queues
596 // we use 'xadd' on Intel, so the initial value == next assigned
597 unsigned long _dispatch_queue_serial_numbers = 12;
598
599 dispatch_queue_t
600 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
601 {
602 dispatch_queue_t dq;
603 size_t label_len;
604
605 if (!label) {
606 label = "";
607 }
608
609 label_len = strlen(label);
610 if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
611 label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
612 }
613
614 // XXX switch to malloc()
615 dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
616 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE -
617 DISPATCH_QUEUE_CACHELINE_PAD + label_len + 1);
618
619 _dispatch_queue_init(dq);
620 strcpy(dq->dq_label, label);
621
622 if (fastpath(!attr)) {
623 return dq;
624 }
625 if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
626 dq->dq_width = UINT32_MAX;
627 dq->do_targetq = _dispatch_get_root_queue(0, false);
628 } else {
629 dispatch_debug_assert(!attr, "Invalid attribute");
630 }
631 return dq;
632 }
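
// Example (illustrative): passing NULL for the attribute yields a serial
// queue (dq_width == 1); DISPATCH_QUEUE_CONCURRENT yields a concurrent one.
//
//	dispatch_queue_t sq = dispatch_queue_create("com.example.serial", NULL);
//	dispatch_queue_t cq = dispatch_queue_create("com.example.concurrent",
//			DISPATCH_QUEUE_CONCURRENT);
//	...
//	dispatch_release(sq);
//	dispatch_release(cq);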
633
634 // 6618342 Contact the team that owns the Instrument DTrace probe before
635 // renaming this symbol
636 void
637 _dispatch_queue_dispose(dispatch_queue_t dq)
638 {
639 if (slowpath(dq == _dispatch_queue_get_current())) {
640 DISPATCH_CRASH("Release of a queue by itself");
641 }
642 if (slowpath(dq->dq_items_tail)) {
643 DISPATCH_CRASH("Release of a queue while items are enqueued");
644 }
645
646 // trash the tail queue so that use after free will crash
647 dq->dq_items_tail = (void *)0x200;
648
649 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
650 (void *)0x200);
651 if (dqsq) {
652 _dispatch_release(dqsq);
653 }
654 }
655
656 const char *
657 dispatch_queue_get_label(dispatch_queue_t dq)
658 {
659 return dq->dq_label;
660 }
661
662 static void
663 _dispatch_queue_set_width2(void *ctxt)
664 {
665 int w = (int)(intptr_t)ctxt; // intentional truncation
666 uint32_t tmp;
667 dispatch_queue_t dq = _dispatch_queue_get_current();
668
669 if (w == 1 || w == 0) {
670 dq->dq_width = 1;
671 return;
672 }
673 if (w > 0) {
674 tmp = w;
675 } else switch (w) {
676 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
677 tmp = _dispatch_hw_config.cc_max_physical;
678 break;
679 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
680 tmp = _dispatch_hw_config.cc_max_active;
681 break;
682 default:
683 // fall through
684 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
685 tmp = _dispatch_hw_config.cc_max_logical;
686 break;
687 }
688 // multiply by two since the running count is inc/dec by two
689 // (the low bit == barrier)
690 dq->dq_width = tmp * 2;
691 }
692
693 void
694 dispatch_queue_set_width(dispatch_queue_t dq, long width)
695 {
696 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
697 return;
698 }
699 dispatch_barrier_async_f(dq, (void*)(intptr_t)width,
700 _dispatch_queue_set_width2);
701 }
702
703 // 6618342 Contact the team that owns the Instrument DTrace probe before
704 // renaming this symbol
705 static void
706 _dispatch_set_target_queue2(void *ctxt)
707 {
708 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
709
710 prev_dq = dq->do_targetq;
711 dq->do_targetq = ctxt;
712 _dispatch_release(prev_dq);
713 }
714
715 void
716 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
717 {
718 dispatch_queue_t prev_dq;
719 unsigned long type;
720
721 if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
722 return;
723 }
724 type = dx_type(dou._do) & _DISPATCH_META_TYPE_MASK;
725 if (slowpath(!dq)) {
726 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
727 slowpath(dou._dq->dq_width > 1));
728 dq = _dispatch_get_root_queue(0, !is_concurrent_q);
729 }
730 // TODO: put into the vtable
731 switch(type) {
732 case _DISPATCH_QUEUE_TYPE:
733 case _DISPATCH_SOURCE_TYPE:
734 _dispatch_retain(dq);
735 return dispatch_barrier_async_f(dou._dq, dq,
736 _dispatch_set_target_queue2);
737 case _DISPATCH_IO_TYPE:
738 return _dispatch_io_set_target_queue(dou._dchannel, dq);
739 default:
740 _dispatch_retain(dq);
741 dispatch_atomic_store_barrier();
742 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq);
743 if (prev_dq) _dispatch_release(prev_dq);
744 return;
745 }
746 }
747
748 void
749 dispatch_set_current_target_queue(dispatch_queue_t dq)
750 {
751 dispatch_queue_t queue = _dispatch_queue_get_current();
752
753 if (slowpath(!queue)) {
754 DISPATCH_CLIENT_CRASH("SPI not called from a queue");
755 }
756 if (slowpath(queue->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
757 DISPATCH_CLIENT_CRASH("SPI not supported on this queue");
758 }
759 if (slowpath(queue->dq_width != 1)) {
760 DISPATCH_CLIENT_CRASH("SPI not called from a serial queue");
761 }
762 if (slowpath(!dq)) {
763 dq = _dispatch_get_root_queue(0, true);
764 }
765 _dispatch_retain(dq);
766 _dispatch_set_target_queue2(dq);
767 }
768
769 #pragma mark -
770 #pragma mark dispatch_queue_specific
771
772 struct dispatch_queue_specific_queue_s {
773 DISPATCH_STRUCT_HEADER(queue_specific_queue);
774 DISPATCH_QUEUE_HEADER;
775 union {
776 char _dqsq_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
777 struct {
778 char dq_label[16];
779 TAILQ_HEAD(dispatch_queue_specific_head_s,
780 dispatch_queue_specific_s) dqsq_contexts;
781 };
782 };
783 };
784
785 struct dispatch_queue_specific_s {
786 const void *dqs_key;
787 void *dqs_ctxt;
788 dispatch_function_t dqs_destructor;
789 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
790 };
791 DISPATCH_DECL(dispatch_queue_specific);
792
793 void
794 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
795 {
796 dispatch_queue_specific_t dqs, tmp;
797
798 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
799 if (dqs->dqs_destructor) {
800 dispatch_async_f(_dispatch_get_root_queue(
801 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
802 dqs->dqs_destructor);
803 }
804 free(dqs);
805 }
806 _dispatch_queue_dispose((dispatch_queue_t)dqsq);
807 }
808
809 static void
810 _dispatch_queue_init_specific(dispatch_queue_t dq)
811 {
812 dispatch_queue_specific_queue_t dqsq;
813
814 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
815 sizeof(struct dispatch_queue_specific_queue_s));
816 _dispatch_queue_init((dispatch_queue_t)dqsq);
817 dqsq->do_xref_cnt = -1;
818 dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
819 true);
820 dqsq->dq_width = UINT32_MAX;
821 strlcpy(dqsq->dq_label, "queue-specific", sizeof(dqsq->dq_label));
822 TAILQ_INIT(&dqsq->dqsq_contexts);
823 dispatch_atomic_store_barrier();
824 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
825 (dispatch_queue_t)dqsq))) {
826 _dispatch_release((dispatch_queue_t)dqsq);
827 }
828 }
829
830 static void
831 _dispatch_queue_set_specific(void *ctxt)
832 {
833 dispatch_queue_specific_t dqs, dqsn = ctxt;
834 dispatch_queue_specific_queue_t dqsq =
835 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
836
837 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
838 if (dqs->dqs_key == dqsn->dqs_key) {
839 // Destroy previous context for existing key
840 if (dqs->dqs_destructor) {
841 dispatch_async_f(_dispatch_get_root_queue(
842 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
843 dqs->dqs_destructor);
844 }
845 if (dqsn->dqs_ctxt) {
846 // Copy new context for existing key
847 dqs->dqs_ctxt = dqsn->dqs_ctxt;
848 dqs->dqs_destructor = dqsn->dqs_destructor;
849 } else {
850 // Remove context storage for existing key
851 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
852 free(dqs);
853 }
854 return free(dqsn);
855 }
856 }
857 // Insert context storage for new key
858 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
859 }
860
861 DISPATCH_NOINLINE
862 void
863 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
864 void *ctxt, dispatch_function_t destructor)
865 {
866 if (slowpath(!key)) {
867 return;
868 }
869 dispatch_queue_specific_t dqs;
870
871 dqs = calloc(1, sizeof(struct dispatch_queue_specific_s));
872 dqs->dqs_key = key;
873 dqs->dqs_ctxt = ctxt;
874 dqs->dqs_destructor = destructor;
875 if (slowpath(!dq->dq_specific_q)) {
876 _dispatch_queue_init_specific(dq);
877 }
878 dispatch_barrier_async_f(dq->dq_specific_q, dqs,
879 _dispatch_queue_set_specific);
880 }
881
882 static void
883 _dispatch_queue_get_specific(void *ctxt)
884 {
885 void **ctxtp = ctxt;
886 void *key = *ctxtp;
887 dispatch_queue_specific_queue_t dqsq =
888 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
889 dispatch_queue_specific_t dqs;
890
891 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
892 if (dqs->dqs_key == key) {
893 *ctxtp = dqs->dqs_ctxt;
894 return;
895 }
896 }
897 *ctxtp = NULL;
898 }
899
900 DISPATCH_NOINLINE
901 void *
902 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
903 {
904 if (slowpath(!key)) {
905 return NULL;
906 }
907 void *ctxt = NULL;
908
909 if (fastpath(dq->dq_specific_q)) {
910 ctxt = (void *)key;
911 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
912 }
913 return ctxt;
914 }
915
916 DISPATCH_NOINLINE
917 void *
918 dispatch_get_specific(const void *key)
919 {
920 if (slowpath(!key)) {
921 return NULL;
922 }
923 void *ctxt = NULL;
924 dispatch_queue_t dq = _dispatch_queue_get_current();
925
926 while (slowpath(dq)) {
927 if (slowpath(dq->dq_specific_q)) {
928 ctxt = (void *)key;
929 dispatch_sync_f(dq->dq_specific_q, &ctxt,
930 _dispatch_queue_get_specific);
931 if (ctxt) break;
932 }
933 dq = dq->do_targetq;
934 }
935 return ctxt;
936 }
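
// Example (illustrative) for the queue-specific API above; the address of a
// static variable is a common choice of unique key:
//
//	static char my_key;	// address used as the key
//
//	dispatch_queue_set_specific(q, &my_key, ctxt, free);
//	void *v = dispatch_queue_get_specific(q, &my_key);
//
// dispatch_get_specific(&my_key) additionally walks the target-queue chain of
// the queue the caller is currently executing on, as the loop above shows.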
937
938 #pragma mark -
939 #pragma mark dispatch_queue_debug
940
941 size_t
942 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
943 {
944 dispatch_queue_t target = dq->do_targetq;
945 return snprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
946 "running = 0x%x, barrier = %d ", target ? target->dq_label : "",
947 target, dq->dq_width / 2, dq->dq_running / 2, dq->dq_running & 1);
948 }
949
950 size_t
951 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
952 {
953 size_t offset = 0;
954 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
955 dq->dq_label, dq);
956 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
957 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
958 offset += snprintf(&buf[offset], bufsiz - offset, "}");
959 return offset;
960 }
961
962 #if DISPATCH_DEBUG
963 void
964 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
965 if (fastpath(dq)) {
966 dispatch_debug(dq, "%s", str);
967 } else {
968 _dispatch_log("queue[NULL]: %s", str);
969 }
970 }
971 #endif
972
973 #if DISPATCH_PERF_MON
974 static OSSpinLock _dispatch_stats_lock;
975 static size_t _dispatch_bad_ratio;
976 static struct {
977 uint64_t time_total;
978 uint64_t count_total;
979 uint64_t thread_total;
980 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
981
982 static void
983 _dispatch_queue_merge_stats(uint64_t start)
984 {
985 uint64_t avg, delta = _dispatch_absolute_time() - start;
986 unsigned long count, bucket;
987
988 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
989 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
990
991 if (count) {
992 avg = delta / count;
993 bucket = flsll(avg);
994 } else {
995 bucket = 0;
996 }
997
998 // 64-bit counters on 32-bit require a lock or a queue
999 OSSpinLockLock(&_dispatch_stats_lock);
1000
1001 _dispatch_stats[bucket].time_total += delta;
1002 _dispatch_stats[bucket].count_total += count;
1003 _dispatch_stats[bucket].thread_total++;
1004
1005 OSSpinLockUnlock(&_dispatch_stats_lock);
1006 }
1007 #endif
1008
1009 #pragma mark -
1010 #pragma mark dispatch_continuation_t
1011
1012 static malloc_zone_t *_dispatch_ccache_zone;
1013
1014 static void
1015 _dispatch_ccache_init(void *context DISPATCH_UNUSED)
1016 {
1017 _dispatch_ccache_zone = malloc_create_zone(0, 0);
1018 dispatch_assert(_dispatch_ccache_zone);
1019 malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
1020 }
1021
1022 dispatch_continuation_t
1023 _dispatch_continuation_alloc_from_heap(void)
1024 {
1025 static dispatch_once_t pred;
1026 dispatch_continuation_t dc;
1027
1028 dispatch_once_f(&pred, NULL, _dispatch_ccache_init);
1029
1030 // This is also used for allocating struct dispatch_apply_s. If the
1031 // ROUND_UP behavior is changed, adjust the assert in libdispatch_init
1032 while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1,
1033 ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
1034 sleep(1);
1035 }
1036
1037 return dc;
1038 }
1039
1040 static void
1041 _dispatch_force_cache_cleanup(void)
1042 {
1043 dispatch_continuation_t dc;
1044 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1045 if (dc) {
1046 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1047 _dispatch_cache_cleanup(dc);
1048 }
1049 }
1050
1051 // rdar://problem/11500155
1052 void
1053 dispatch_flush_continuation_cache(void)
1054 {
1055 _dispatch_force_cache_cleanup();
1056 }
1057
1058 DISPATCH_NOINLINE
1059 static void
1060 _dispatch_cache_cleanup(void *value)
1061 {
1062 dispatch_continuation_t dc, next_dc = value;
1063
1064 while ((dc = next_dc)) {
1065 next_dc = dc->do_next;
1066 malloc_zone_free(_dispatch_ccache_zone, dc);
1067 }
1068 }
1069
1070 DISPATCH_ALWAYS_INLINE_NDEBUG
1071 static inline void
1072 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1073 {
1074 dispatch_continuation_t dc = dou._dc;
1075
1076 _dispatch_trace_continuation_pop(dq, dou);
1077 (void)dispatch_atomic_add2o(dq, dq_running, 2);
1078 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1079 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1080 dispatch_atomic_barrier();
1081 _dispatch_thread_semaphore_signal(
1082 (_dispatch_thread_semaphore_t)dc->dc_ctxt);
1083 } else {
1084 _dispatch_async_f_redirect(dq, dc);
1085 }
1086 }
1087
1088 DISPATCH_ALWAYS_INLINE_NDEBUG
1089 static inline void
1090 _dispatch_continuation_pop(dispatch_object_t dou)
1091 {
1092 dispatch_continuation_t dc = dou._dc;
1093 dispatch_group_t dg;
1094
1095 _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
1096 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
1097 return _dispatch_queue_invoke(dou._dq);
1098 }
1099
1100 // Add the item back to the cache before calling the function. This
1101 // allows the 'hot' continuation to be used for a quick callback.
1102 //
1103 // The ccache version is per-thread.
1104 // Therefore, the object has not been reused yet.
1105 // This generates better assembly.
1106 if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
1107 _dispatch_continuation_free(dc);
1108 }
1109 if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
1110 dg = dc->dc_data;
1111 } else {
1112 dg = NULL;
1113 }
1114 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
1115 if (dg) {
1116 dispatch_group_leave(dg);
1117 _dispatch_release(dg);
1118 }
1119 }
1120
1121 #pragma mark -
1122 #pragma mark dispatch_barrier_async
1123
1124 DISPATCH_NOINLINE
1125 static void
1126 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
1127 dispatch_function_t func)
1128 {
1129 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1130
1131 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1132 dc->dc_func = func;
1133 dc->dc_ctxt = ctxt;
1134
1135 _dispatch_queue_push(dq, dc);
1136 }
1137
1138 DISPATCH_NOINLINE
1139 void
1140 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
1141 dispatch_function_t func)
1142 {
1143 dispatch_continuation_t dc;
1144
1145 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1146 if (!dc) {
1147 return _dispatch_barrier_async_f_slow(dq, ctxt, func);
1148 }
1149
1150 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1151 dc->dc_func = func;
1152 dc->dc_ctxt = ctxt;
1153
1154 _dispatch_queue_push(dq, dc);
1155 }
1156
1157 #ifdef __BLOCKS__
1158 void
1159 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
1160 {
1161 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
1162 _dispatch_call_block_and_release);
1163 }
1164 #endif
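
// Example (illustrative): on a concurrent queue a barrier block runs
// exclusively: it starts only after previously submitted blocks complete,
// and blocks submitted later wait for it to finish.
//
//	dispatch_barrier_async(cq, ^{
//		// sole writer; concurrent readers are excluded
//	});
//
// On a serial queue (dq_width == 1) this is equivalent to dispatch_async().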
1165
1166 #pragma mark -
1167 #pragma mark dispatch_async
1168
1169 static void
1170 _dispatch_async_f_redirect_invoke(void *_ctxt)
1171 {
1172 struct dispatch_continuation_s *dc = _ctxt;
1173 struct dispatch_continuation_s *other_dc = dc->dc_other;
1174 dispatch_queue_t old_dq, dq = dc->dc_data, rq;
1175
1176 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1177 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1178 _dispatch_continuation_pop(other_dc);
1179 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1180
1181 rq = dq->do_targetq;
1182 while (slowpath(rq->do_targetq) && rq != old_dq) {
1183 if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
1184 _dispatch_wakeup(rq);
1185 }
1186 rq = rq->do_targetq;
1187 }
1188
1189 if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
1190 _dispatch_wakeup(dq);
1191 }
1192 _dispatch_release(dq);
1193 }
1194
1195 DISPATCH_NOINLINE
1196 static void
1197 _dispatch_async_f2_slow(dispatch_queue_t dq, dispatch_continuation_t dc)
1198 {
1199 _dispatch_wakeup(dq);
1200 _dispatch_queue_push(dq, dc);
1201 }
1202
1203 DISPATCH_NOINLINE
1204 static void
1205 _dispatch_async_f_redirect(dispatch_queue_t dq,
1206 dispatch_continuation_t other_dc)
1207 {
1208 dispatch_continuation_t dc;
1209 dispatch_queue_t rq;
1210
1211 _dispatch_retain(dq);
1212
1213 dc = _dispatch_continuation_alloc();
1214
1215 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1216 dc->dc_func = _dispatch_async_f_redirect_invoke;
1217 dc->dc_ctxt = dc;
1218 dc->dc_data = dq;
1219 dc->dc_other = other_dc;
1220
1221 // Find the queue to redirect to
1222 rq = dq->do_targetq;
1223 while (slowpath(rq->do_targetq)) {
1224 uint32_t running;
1225
1226 if (slowpath(rq->dq_items_tail) ||
1227 slowpath(DISPATCH_OBJECT_SUSPENDED(rq)) ||
1228 slowpath(rq->dq_width == 1)) {
1229 break;
1230 }
1231 running = dispatch_atomic_add2o(rq, dq_running, 2) - 2;
1232 if (slowpath(running & 1) || slowpath(running + 2 > rq->dq_width)) {
1233 if (slowpath(dispatch_atomic_sub2o(rq, dq_running, 2) == 0)) {
1234 return _dispatch_async_f2_slow(rq, dc);
1235 }
1236 break;
1237 }
1238 rq = rq->do_targetq;
1239 }
1240 _dispatch_queue_push(rq, dc);
1241 }
1242
1243 DISPATCH_NOINLINE
1244 static void
1245 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
1246 {
1247 uint32_t running;
1248 bool locked;
1249
1250 do {
1251 if (slowpath(dq->dq_items_tail)
1252 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1253 break;
1254 }
1255 running = dispatch_atomic_add2o(dq, dq_running, 2);
1256 if (slowpath(running > dq->dq_width)) {
1257 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1258 return _dispatch_async_f2_slow(dq, dc);
1259 }
1260 break;
1261 }
1262 locked = running & 1;
1263 if (fastpath(!locked)) {
1264 return _dispatch_async_f_redirect(dq, dc);
1265 }
1266 locked = dispatch_atomic_sub2o(dq, dq_running, 2) & 1;
1267 // We might get lucky and find that the barrier has ended by now
1268 } while (!locked);
1269
1270 _dispatch_queue_push(dq, dc);
1271 }
1272
1273 DISPATCH_NOINLINE
1274 static void
1275 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
1276 dispatch_function_t func)
1277 {
1278 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1279
1280 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1281 dc->dc_func = func;
1282 dc->dc_ctxt = ctxt;
1283
1284 // No fastpath/slowpath hint because we simply don't know
1285 if (dq->do_targetq) {
1286 return _dispatch_async_f2(dq, dc);
1287 }
1288
1289 _dispatch_queue_push(dq, dc);
1290 }
1291
1292 DISPATCH_NOINLINE
1293 void
1294 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1295 {
1296 dispatch_continuation_t dc;
1297
1298 // No fastpath/slowpath hint because we simply don't know
1299 if (dq->dq_width == 1) {
1300 return dispatch_barrier_async_f(dq, ctxt, func);
1301 }
1302
1303 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1304 if (!dc) {
1305 return _dispatch_async_f_slow(dq, ctxt, func);
1306 }
1307
1308 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1309 dc->dc_func = func;
1310 dc->dc_ctxt = ctxt;
1311
1312 // No fastpath/slowpath hint because we simply don't know
1313 if (dq->do_targetq) {
1314 return _dispatch_async_f2(dq, dc);
1315 }
1316
1317 _dispatch_queue_push(dq, dc);
1318 }
1319
1320 #ifdef __BLOCKS__
1321 void
1322 dispatch_async(dispatch_queue_t dq, void (^work)(void))
1323 {
1324 dispatch_async_f(dq, _dispatch_Block_copy(work),
1325 _dispatch_call_block_and_release);
1326 }
1327 #endif
1328
1329 #pragma mark -
1330 #pragma mark dispatch_group_async
1331
1332 DISPATCH_NOINLINE
1333 void
1334 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
1335 dispatch_function_t func)
1336 {
1337 dispatch_continuation_t dc;
1338
1339 _dispatch_retain(dg);
1340 dispatch_group_enter(dg);
1341
1342 dc = _dispatch_continuation_alloc();
1343
1344 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
1345 dc->dc_func = func;
1346 dc->dc_ctxt = ctxt;
1347 dc->dc_data = dg;
1348
1349 // No fastpath/slowpath hint because we simply don't know
1350 if (dq->dq_width != 1 && dq->do_targetq) {
1351 return _dispatch_async_f2(dq, dc);
1352 }
1353
1354 _dispatch_queue_push(dq, dc);
1355 }
1356
1357 #ifdef __BLOCKS__
1358 void
1359 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
1360 dispatch_block_t db)
1361 {
1362 dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
1363 _dispatch_call_block_and_release);
1364 }
1365 #endif
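
// Example (illustrative): the dispatch_group_enter() above is balanced by the
// dispatch_group_leave() in _dispatch_continuation_pop() once the block has
// run, which is what dispatch_group_wait()/dispatch_group_notify() observe:
//
//	dispatch_group_t g = dispatch_group_create();
//	dispatch_group_async(g, q, ^{ /* work */ });
//	dispatch_group_notify(g, q, ^{ /* all work finished */ });
//	dispatch_release(g);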
1366
1367 #pragma mark -
1368 #pragma mark dispatch_function_invoke
1369
1370 DISPATCH_ALWAYS_INLINE
1371 static inline void
1372 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
1373 dispatch_function_t func)
1374 {
1375 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1376 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1377 _dispatch_client_callout(ctxt, func);
1378 _dispatch_workitem_inc();
1379 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1380 }
1381
1382 struct dispatch_function_recurse_s {
1383 dispatch_queue_t dfr_dq;
1384 void* dfr_ctxt;
1385 dispatch_function_t dfr_func;
1386 };
1387
1388 static void
1389 _dispatch_function_recurse_invoke(void *ctxt)
1390 {
1391 struct dispatch_function_recurse_s *dfr = ctxt;
1392 _dispatch_function_invoke(dfr->dfr_dq, dfr->dfr_ctxt, dfr->dfr_func);
1393 }
1394
1395 DISPATCH_ALWAYS_INLINE
1396 static inline void
1397 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
1398 dispatch_function_t func)
1399 {
1400 struct dispatch_function_recurse_s dfr = {
1401 .dfr_dq = dq,
1402 .dfr_func = func,
1403 .dfr_ctxt = ctxt,
1404 };
1405 dispatch_sync_f(dq->do_targetq, &dfr, _dispatch_function_recurse_invoke);
1406 }
1407
1408 #pragma mark -
1409 #pragma mark dispatch_barrier_sync
1410
1411 struct dispatch_barrier_sync_slow_s {
1412 DISPATCH_CONTINUATION_HEADER(barrier_sync_slow);
1413 };
1414
1415 struct dispatch_barrier_sync_slow2_s {
1416 dispatch_queue_t dbss2_dq;
1417 #if DISPATCH_COCOA_COMPAT
1418 dispatch_function_t dbss2_func;
1419 void *dbss2_ctxt;
1420 #endif
1421 _dispatch_thread_semaphore_t dbss2_sema;
1422 };
1423
1424 DISPATCH_ALWAYS_INLINE_NDEBUG
1425 static inline _dispatch_thread_semaphore_t
1426 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
1427 bool lock)
1428 {
1429 dispatch_continuation_t dc = dou._dc;
1430
1431 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
1432 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
1433 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
1434 return 0;
1435 }
1436 _dispatch_trace_continuation_pop(dq, dc);
1437 _dispatch_workitem_inc();
1438
1439 struct dispatch_barrier_sync_slow_s *dbssp = (void *)dc;
1440 struct dispatch_barrier_sync_slow2_s *dbss2 = dbssp->dc_ctxt;
1441 if (lock) {
1442 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
1443 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1444 // rdar://problem/9032024 running lock must be held until sync_f_slow
1445 // returns
1446 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
1447 }
1448 return dbss2->dbss2_sema ? dbss2->dbss2_sema : MACH_PORT_DEAD;
1449 }
1450
1451 static void
1452 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
1453 {
1454 struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;
1455
1456 dispatch_assert(dbss2->dbss2_dq == _dispatch_queue_get_current());
1457 #if DISPATCH_COCOA_COMPAT
1458 // When the main queue is bound to the main thread
1459 if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
1460 dbss2->dbss2_func(dbss2->dbss2_ctxt);
1461 dbss2->dbss2_func = NULL;
1462 dispatch_atomic_barrier();
1463 _dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
1464 return;
1465 }
1466 #endif
1467 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
1468 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1469 // rdar://9032024 running lock must be held until sync_f_slow returns
1470 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
1471 dispatch_atomic_barrier();
1472 _dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
1473 }
1474
1475 DISPATCH_NOINLINE
1476 static void
1477 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
1478 dispatch_function_t func)
1479 {
1480 // It's preferred to execute synchronous blocks on the current thread
1481 // due to thread-local side effects, garbage collection, etc. However,
1482 // blocks submitted to the main thread MUST be run on the main thread
1483
1484 struct dispatch_barrier_sync_slow2_s dbss2 = {
1485 .dbss2_dq = dq,
1486 #if DISPATCH_COCOA_COMPAT
1487 .dbss2_func = func,
1488 .dbss2_ctxt = ctxt,
1489 #endif
1490 .dbss2_sema = _dispatch_get_thread_semaphore(),
1491 };
1492 struct dispatch_barrier_sync_slow_s dbss = {
1493 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
1494 DISPATCH_OBJ_SYNC_SLOW_BIT),
1495 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
1496 .dc_ctxt = &dbss2,
1497 };
1498 _dispatch_queue_push(dq, (void *)&dbss);
1499
1500 _dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
1501 _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
1502
1503 #if DISPATCH_COCOA_COMPAT
1504 // Main queue bound to main thread
1505 if (dbss2.dbss2_func == NULL) {
1506 return;
1507 }
1508 #endif
1509 dispatch_atomic_acquire_barrier();
1510 if (slowpath(dq->do_targetq) && slowpath(dq->do_targetq->do_targetq)) {
1511 _dispatch_function_recurse(dq, ctxt, func);
1512 } else {
1513 _dispatch_function_invoke(dq, ctxt, func);
1514 }
1515 dispatch_atomic_release_barrier();
1516 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
1517 dq->dq_running == 2) {
1518 // rdar://problem/8290662 "lock transfer"
1519 _dispatch_thread_semaphore_t sema;
1520 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1521 if (sema) {
1522 _dispatch_thread_semaphore_signal(sema);
1523 return;
1524 }
1525 }
1526 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
1527 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1528 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1529 _dispatch_wakeup(dq);
1530 }
1531 }
1532
1533 DISPATCH_NOINLINE
1534 static void
1535 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
1536 {
1537 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1538 // rdar://problem/8290662 "lock transfer"
1539 _dispatch_thread_semaphore_t sema;
1540 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1541 if (sema) {
1542 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1543 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1544 // rdar://9032024 running lock must be held until sync_f_slow
1545 // returns: increment by 2 and decrement by 1
1546 (void)dispatch_atomic_inc2o(dq, dq_running);
1547 _dispatch_thread_semaphore_signal(sema);
1548 return;
1549 }
1550 }
1551 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1552 _dispatch_wakeup(dq);
1553 }
1554 }
1555
1556 DISPATCH_NOINLINE
1557 static void
1558 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1559 dispatch_function_t func)
1560 {
1561 dispatch_atomic_acquire_barrier();
1562 _dispatch_function_invoke(dq, ctxt, func);
1563 dispatch_atomic_release_barrier();
1564 if (slowpath(dq->dq_items_tail)) {
1565 return _dispatch_barrier_sync_f2(dq);
1566 }
1567 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1568 _dispatch_wakeup(dq);
1569 }
1570 }
1571
1572 DISPATCH_NOINLINE
1573 static void
1574 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1575 dispatch_function_t func)
1576 {
1577 dispatch_atomic_acquire_barrier();
1578 _dispatch_function_recurse(dq, ctxt, func);
1579 dispatch_atomic_release_barrier();
1580 if (slowpath(dq->dq_items_tail)) {
1581 return _dispatch_barrier_sync_f2(dq);
1582 }
1583 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1584 _dispatch_wakeup(dq);
1585 }
1586 }
1587
1588 DISPATCH_NOINLINE
1589 void
1590 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
1591 dispatch_function_t func)
1592 {
1593 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1594 // 2) the queue is not suspended
1595 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1596 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1597 }
1598 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
1599 		// global queues and the main queue bound to the main thread always
1600 		// fall into the slow case
1601 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1602 }
1603 if (slowpath(dq->do_targetq->do_targetq)) {
1604 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
1605 }
1606 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
1607 }
1608
1609 #ifdef __BLOCKS__
1610 #if DISPATCH_COCOA_COMPAT
1611 DISPATCH_NOINLINE
1612 static void
1613 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
1614 {
1615 // Blocks submitted to the main queue MUST be run on the main thread,
1616 // therefore under GC we must Block_copy in order to notify the thread-local
1617 // garbage collector that the objects are transferring to the main thread
1618 // rdar://problem/7176237&7181849&7458685
1619 if (dispatch_begin_thread_4GC) {
1620 dispatch_block_t block = _dispatch_Block_copy(work);
1621 return dispatch_barrier_sync_f(dq, block,
1622 _dispatch_call_block_and_release);
1623 }
1624 struct Block_basic *bb = (void *)work;
1625 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1626 }
1627 #endif
1628
1629 void
1630 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
1631 {
1632 #if DISPATCH_COCOA_COMPAT
1633 if (slowpath(dq == &_dispatch_main_q)) {
1634 return _dispatch_barrier_sync_slow(dq, work);
1635 }
1636 #endif
1637 struct Block_basic *bb = (void *)work;
1638 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1639 }
1640 #endif
1641
1642 #pragma mark -
1643 #pragma mark dispatch_sync
1644
1645 DISPATCH_NOINLINE
1646 static void
1647 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1648 {
1649 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
1650 struct dispatch_sync_slow_s {
1651 DISPATCH_CONTINUATION_HEADER(sync_slow);
1652 } dss = {
1653 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
1654 .dc_ctxt = (void*)sema,
1655 };
1656 _dispatch_queue_push(dq, (void *)&dss);
1657
1658 _dispatch_thread_semaphore_wait(sema);
1659 _dispatch_put_thread_semaphore(sema);
1660
1661 if (slowpath(dq->do_targetq->do_targetq)) {
1662 _dispatch_function_recurse(dq, ctxt, func);
1663 } else {
1664 _dispatch_function_invoke(dq, ctxt, func);
1665 }
1666 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1667 _dispatch_wakeup(dq);
1668 }
1669 }
1670
1671 DISPATCH_NOINLINE
1672 static void
1673 _dispatch_sync_f_slow2(dispatch_queue_t dq, void *ctxt,
1674 dispatch_function_t func)
1675 {
1676 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1677 _dispatch_wakeup(dq);
1678 }
1679 _dispatch_sync_f_slow(dq, ctxt, func);
1680 }
1681
1682 DISPATCH_NOINLINE
1683 static void
1684 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1685 dispatch_function_t func)
1686 {
1687 _dispatch_function_invoke(dq, ctxt, func);
1688 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1689 _dispatch_wakeup(dq);
1690 }
1691 }
1692
1693 DISPATCH_NOINLINE
1694 static void
1695 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1696 dispatch_function_t func)
1697 {
1698 _dispatch_function_recurse(dq, ctxt, func);
1699 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1700 _dispatch_wakeup(dq);
1701 }
1702 }
1703
1704 DISPATCH_NOINLINE
1705 static void
1706 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1707 {
1708 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1709 // 2) the queue is not suspended
1710 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1711 return _dispatch_sync_f_slow(dq, ctxt, func);
1712 }
1713 if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
1714 return _dispatch_sync_f_slow2(dq, ctxt, func);
1715 }
1716 if (slowpath(dq->do_targetq->do_targetq)) {
1717 return _dispatch_sync_f_recurse(dq, ctxt, func);
1718 }
1719 _dispatch_sync_f_invoke(dq, ctxt, func);
1720 }
1721
1722 DISPATCH_NOINLINE
1723 void
1724 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1725 {
1726 if (fastpath(dq->dq_width == 1)) {
1727 return dispatch_barrier_sync_f(dq, ctxt, func);
1728 }
1729 if (slowpath(!dq->do_targetq)) {
1730 // the global root queues do not need strict ordering
1731 (void)dispatch_atomic_add2o(dq, dq_running, 2);
1732 return _dispatch_sync_f_invoke(dq, ctxt, func);
1733 }
1734 _dispatch_sync_f2(dq, ctxt, func);
1735 }
1736
1737 #ifdef __BLOCKS__
1738 #if DISPATCH_COCOA_COMPAT
1739 DISPATCH_NOINLINE
1740 static void
1741 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
1742 {
1743 // Blocks submitted to the main queue MUST be run on the main thread,
1744 // therefore under GC we must Block_copy in order to notify the thread-local
1745 // garbage collector that the objects are transferring to the main thread
1746 // rdar://problem/7176237&7181849&7458685
1747 if (dispatch_begin_thread_4GC) {
1748 dispatch_block_t block = _dispatch_Block_copy(work);
1749 return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
1750 }
1751 struct Block_basic *bb = (void *)work;
1752 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1753 }
1754 #endif
1755
1756 void
1757 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
1758 {
1759 #if DISPATCH_COCOA_COMPAT
1760 if (slowpath(dq == &_dispatch_main_q)) {
1761 return _dispatch_sync_slow(dq, work);
1762 }
1763 #endif
1764 struct Block_basic *bb = (void *)work;
1765 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1766 }
1767 #endif
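
// Note (illustrative): dispatch_sync() waits for the work to finish on the
// target queue, so synchronously dispatching to the queue the caller is
// already running on (or to any serial queue in its target chain) deadlocks:
//
//	dispatch_sync(q, ^{
//		dispatch_sync(q, ^{ });	// deadlocks if q is serial
//	});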
1768
1769 #pragma mark -
1770 #pragma mark dispatch_after
1771
1772 struct _dispatch_after_time_s {
1773 void *datc_ctxt;
1774 void (*datc_func)(void *);
1775 dispatch_source_t ds;
1776 };
1777
1778 static void
1779 _dispatch_after_timer_callback(void *ctxt)
1780 {
1781 struct _dispatch_after_time_s *datc = ctxt;
1782
1783 dispatch_assert(datc->datc_func);
1784 _dispatch_client_callout(datc->datc_ctxt, datc->datc_func);
1785
1786 dispatch_source_t ds = datc->ds;
1787 free(datc);
1788
1789 dispatch_source_cancel(ds); // Needed until 7287561 gets integrated
1790 dispatch_release(ds);
1791 }
1792
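// Implementation note: dispatch_after_f is currently layered on a one-shot
// DISPATCH_SOURCE_TYPE_TIMER source; the event handler above runs the
// client callout, frees the heap-allocated context and then cancels and
// releases the source. A 'when' that has already passed degrades to
// dispatch_async_f.
//
// Illustrative usage (my_ctxt and my_callback are placeholder names):
//	dispatch_after_f(dispatch_time(DISPATCH_TIME_NOW, 3 * NSEC_PER_SEC),
//			dispatch_get_main_queue(), my_ctxt, my_callback);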
1793 DISPATCH_NOINLINE
1794 void
1795 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
1796 dispatch_function_t func)
1797 {
1798 uint64_t delta;
1799 struct _dispatch_after_time_s *datc = NULL;
1800 dispatch_source_t ds;
1801
1802 if (when == DISPATCH_TIME_FOREVER) {
1803 #if DISPATCH_DEBUG
1804 DISPATCH_CLIENT_CRASH(
1805 "dispatch_after_f() called with 'when' == infinity");
1806 #endif
1807 return;
1808 }
1809
1810 // this function can and should be optimized to not use a dispatch source
1811 delta = _dispatch_timeout(when);
1812 if (delta == 0) {
1813 return dispatch_async_f(queue, ctxt, func);
1814 }
1815 // on successful creation, source owns malloc-ed context (which it frees in
1816 // the event handler)
1817 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
1818 dispatch_assert(ds);
1819
1820 datc = malloc(sizeof(*datc));
1821 dispatch_assert(datc);
1822 datc->datc_ctxt = ctxt;
1823 datc->datc_func = func;
1824 datc->ds = ds;
1825
1826 dispatch_set_context(ds, datc);
1827 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
1828 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0);
1829 dispatch_resume(ds);
1830 }
1831
1832 #ifdef __BLOCKS__
1833 void
1834 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
1835 dispatch_block_t work)
1836 {
1837 // test 'when' before copying the block
1838 if (when == DISPATCH_TIME_FOREVER) {
1839 #if DISPATCH_DEBUG
1840 DISPATCH_CLIENT_CRASH(
1841 "dispatch_after() called with 'when' == infinity");
1842 #endif
1843 return;
1844 }
1845 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
1846 _dispatch_call_block_and_release);
1847 }
1848 #endif
1849
1850 #pragma mark -
1851 #pragma mark dispatch_wakeup
1852
1853 DISPATCH_NOINLINE
1854 static void
1855 _dispatch_queue_push_list_slow2(dispatch_queue_t dq,
1856 struct dispatch_object_s *obj)
1857 {
1858 // The queue must be retained before dq_items_head is written in order
1859 // to ensure that the reference is still valid when _dispatch_wakeup is
1860 // called. Otherwise, if preempted between the assignment to
1861 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
1862 // queue may release the last reference to the queue when invoked by
1863 // _dispatch_queue_drain. <rdar://problem/6932776>
1864 _dispatch_retain(dq);
1865 dq->dq_items_head = obj;
1866 _dispatch_wakeup(dq);
1867 _dispatch_release(dq);
1868 }
1869
1870 DISPATCH_NOINLINE
1871 void
1872 _dispatch_queue_push_list_slow(dispatch_queue_t dq,
1873 struct dispatch_object_s *obj, unsigned int n)
1874 {
1875 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_TYPE) {
1876 dq->dq_items_head = obj;
1877 return _dispatch_queue_wakeup_global2(dq, n);
1878 }
1879 _dispatch_queue_push_list_slow2(dq, obj);
1880 }
1881
1882 DISPATCH_NOINLINE
1883 void
1884 _dispatch_queue_push_slow(dispatch_queue_t dq,
1885 struct dispatch_object_s *obj)
1886 {
1887 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_TYPE) {
1888 dq->dq_items_head = obj;
1889 return _dispatch_queue_wakeup_global(dq);
1890 }
1891 _dispatch_queue_push_list_slow2(dq, obj);
1892 }
1893
1894 // 6618342 Contact the team that owns the Instrument DTrace probe before
1895 // renaming this symbol
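// Attempts to take the suspend-count lock bit (DISPATCH_OBJECT_SUSPEND_LOCK)
// with a cmpxchg; on success the object is retained and pushed onto its
// target queue to be drained. If the bit cannot be taken, the object is
// either suspended or already scheduled; for the main queue under
// DISPATCH_COCOA_COMPAT the wakeup is delivered through the main queue's
// mach port instead.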
1896 dispatch_queue_t
1897 _dispatch_wakeup(dispatch_object_t dou)
1898 {
1899 dispatch_queue_t tq;
1900
1901 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
1902 return NULL;
1903 }
1904 if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
1905 return NULL;
1906 }
1907
1908 // _dispatch_source_invoke() relies on this testing the whole suspend count
1909 // word, not just the lock bit. In other words, no point taking the lock
1910 // if the source is suspended or canceled.
1911 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
1912 DISPATCH_OBJECT_SUSPEND_LOCK)) {
1913 #if DISPATCH_COCOA_COMPAT
1914 if (dou._dq == &_dispatch_main_q) {
1915 return _dispatch_queue_wakeup_main();
1916 }
1917 #endif
1918 return NULL;
1919 }
1920 dispatch_atomic_acquire_barrier();
1921 _dispatch_retain(dou._do);
1922 tq = dou._do->do_targetq;
1923 _dispatch_queue_push(tq, dou._do);
1924 return tq; // libdispatch does not need this, but the Instrument DTrace
1925 // probe does
1926 }
1927
1928 #if DISPATCH_COCOA_COMPAT
1929 DISPATCH_NOINLINE
1930 dispatch_queue_t
1931 _dispatch_queue_wakeup_main(void)
1932 {
1933 kern_return_t kr;
1934
1935 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
1936 _dispatch_main_q_port_init);
1937 if (!main_q_port) {
1938 return NULL;
1939 }
1940 kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
1941
1942 switch (kr) {
1943 case MACH_SEND_TIMEOUT:
1944 case MACH_SEND_TIMED_OUT:
1945 case MACH_SEND_INVALID_DEST:
1946 break;
1947 default:
1948 (void)dispatch_assume_zero(kr);
1949 break;
1950 }
1951 return NULL;
1952 }
1953 #endif
1954
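// Slow path for waking a global (root) queue: when pthread workqueues are
// available, ask the kernel for up to 'n' additional worker threads;
// otherwise fall back to the internal thread pool by signalling the pool's
// mediator semaphore or, failing that, spawning a new detached pthread
// (bounded by dgq_thread_pool_size).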
1955 DISPATCH_NOINLINE
1956 static void
1957 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
1958 {
1959 static dispatch_once_t pred;
1960 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1961 int r;
1962
1963 dispatch_debug_queue(dq, __func__);
1964 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
1965
1966 #if HAVE_PTHREAD_WORKQUEUES
1967 #if DISPATCH_ENABLE_THREAD_POOL
1968 if (qc->dgq_kworkqueue != (void*)(~0ul))
1969 #endif
1970 {
1971 _dispatch_debug("requesting new worker thread");
1972 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
1973 if (qc->dgq_kworkqueue) {
1974 pthread_workitem_handle_t wh;
1975 unsigned int gen_cnt, i = n;
1976 do {
1977 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
1978 _dispatch_worker_thread3, dq, &wh, &gen_cnt);
1979 (void)dispatch_assume_zero(r);
1980 } while (--i);
1981 return;
1982 }
1983 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
1984 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
1985 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
1986 qc->dgq_wq_options, n);
1987 (void)dispatch_assume_zero(r);
1988 #endif
1989 return;
1990 }
1991 #endif // HAVE_PTHREAD_WORKQUEUES
1992 #if DISPATCH_ENABLE_THREAD_POOL
1993 if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
1994 return;
1995 }
1996
1997 pthread_t pthr;
1998 int t_count;
1999 do {
2000 t_count = qc->dgq_thread_pool_size;
2001 if (!t_count) {
2002 _dispatch_debug("The thread pool is full: %p", dq);
2003 return;
2004 }
2005 } while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
2006 t_count - 1));
2007
2008 while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
2009 if (r != EAGAIN) {
2010 (void)dispatch_assume_zero(r);
2011 }
2012 sleep(1);
2013 }
2014 r = pthread_detach(pthr);
2015 (void)dispatch_assume_zero(r);
2016 #endif // DISPATCH_ENABLE_THREAD_POOL
2017 }
2018
2019 static inline void
2020 _dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
2021 {
2022 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2023
2024 if (!dq->dq_items_tail) {
2025 return;
2026 }
2027 #if HAVE_PTHREAD_WORKQUEUES
2028 if (
2029 #if DISPATCH_ENABLE_THREAD_POOL
2030 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
2031 #endif
2032 !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n)) {
2033 _dispatch_debug("work thread request still pending on global queue: "
2034 "%p", dq);
2035 return;
2036 }
2037 #endif // HAVE_PTHREAD_WORKQUEUES
2038 return _dispatch_queue_wakeup_global_slow(dq, n);
2039 }
2040
2041 static inline void
2042 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
2043 {
2044 return _dispatch_queue_wakeup_global2(dq, 1);
2045 }
2046
2047 bool
2048 _dispatch_queue_probe_root(dispatch_queue_t dq)
2049 {
2050 _dispatch_queue_wakeup_global2(dq, 1);
2051 return false;
2052 }
2053
2054 #pragma mark -
2055 #pragma mark dispatch_queue_drain
2056
2057 // 6618342 Contact the team that owns the Instrument DTrace probe before
2058 // renaming this symbol
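// Called when a (non-root) queue is drained as an item of its target queue.
// The cmpxchg of dq_running from 0 to 1 is the drain "lock". After the
// drain the queue either hands off to a waiting barrier sync (sema), is
// re-pushed onto its (possibly changed) target queue, or drops the
// suspend-count lock bit taken at wakeup time and re-checks for work that
// raced in.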
2059 DISPATCH_NOINLINE
2060 void
2061 _dispatch_queue_invoke(dispatch_queue_t dq)
2062 {
2063 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
2064 fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
2065 dispatch_atomic_acquire_barrier();
2066 dispatch_queue_t otq = dq->do_targetq, tq = NULL;
2067 _dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq);
2068 if (dq->do_vtable->do_invoke) {
2069 // Assume the object's invoke checks that it runs on the correct queue
2070 tq = dx_invoke(dq);
2071 } else if (slowpath(otq != dq->do_targetq)) {
2072 // An item on the queue changed the target queue
2073 tq = dq->do_targetq;
2074 }
2075 // We do not need to check the result here;
2076 // the check happens when the suspend-count lock is dropped.
2077 dispatch_atomic_release_barrier();
2078 (void)dispatch_atomic_dec2o(dq, dq_running);
2079 if (sema) {
2080 _dispatch_thread_semaphore_signal(sema);
2081 } else if (tq) {
2082 return _dispatch_queue_push(tq, dq);
2083 }
2084 }
2085
2086 dq->do_next = DISPATCH_OBJECT_LISTLESS;
2087 dispatch_atomic_release_barrier();
2088 if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
2089 DISPATCH_OBJECT_SUSPEND_LOCK)) {
2090 if (dq->dq_running == 0) {
2091 _dispatch_wakeup(dq); // verify that the queue is idle
2092 }
2093 }
2094 _dispatch_release(dq); // added when the queue is put on the list
2095 }
2096
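// Drains items from 'dq' on the current thread. On a concurrent queue
// (dq_width > 1) non-barrier items are redirected to the target queue;
// suspension, a target queue change, a pending barrier or an over-width
// dq_running stops the drain early, and the not-yet-popped item is put back
// at dq_items_head. Returns a thread semaphore when a barrier sync was
// popped so the caller can hand the queue over to the waiting thread
// ("lock transfer", rdar://problem/8290662).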
2097 static _dispatch_thread_semaphore_t
2098 _dispatch_queue_drain(dispatch_queue_t dq)
2099 {
2100 dispatch_queue_t orig_tq, old_dq;
2101 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2102 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
2103 _dispatch_thread_semaphore_t sema = 0;
2104
2105 // Continue draining sources after target queue change rdar://8928171
2106 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
2107
2108 orig_tq = dq->do_targetq;
2109
2110 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2111 //dispatch_debug_queue(dq, __func__);
2112
2113 while (dq->dq_items_tail) {
2114 while (!(dc = fastpath(dq->dq_items_head))) {
2115 _dispatch_hardware_pause();
2116 }
2117 dq->dq_items_head = NULL;
2118 do {
2119 next_dc = fastpath(dc->do_next);
2120 if (!next_dc &&
2121 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2122 // Enqueue is TIGHTLY controlled; we won't wait long.
2123 while (!(next_dc = fastpath(dc->do_next))) {
2124 _dispatch_hardware_pause();
2125 }
2126 }
2127 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
2128 goto out;
2129 }
2130 if (dq->dq_running > dq->dq_width) {
2131 goto out;
2132 }
2133 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
2134 goto out;
2135 }
2136 if (!fastpath(dq->dq_width == 1)) {
2137 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
2138 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
2139 if (dq->dq_running > 1) {
2140 goto out;
2141 }
2142 } else {
2143 _dispatch_continuation_redirect(dq, dc);
2144 continue;
2145 }
2146 }
2147 if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
2148 dc = next_dc;
2149 goto out;
2150 }
2151 _dispatch_continuation_pop(dc);
2152 _dispatch_workitem_inc();
2153 } while ((dc = next_dc));
2154 }
2155
2156 out:
2157 // if this is not a complete drain, we must undo some things
2158 if (slowpath(dc)) {
2159 // 'dc' must NOT be "popped"
2160 // 'dc' might be the last item
2161 if (!next_dc &&
2162 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, NULL, dc)) {
2163 // wait for enqueue slow path to finish
2164 while (!(next_dc = fastpath(dq->dq_items_head))) {
2165 _dispatch_hardware_pause();
2166 }
2167 dc->do_next = next_dc;
2168 }
2169 dq->dq_items_head = dc;
2170 }
2171
2172 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2173 return sema;
2174 }
2175
2176 static void
2177 _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
2178 {
2179 #if DISPATCH_PERF_MON
2180 uint64_t start = _dispatch_absolute_time();
2181 #endif
2182 _dispatch_thread_semaphore_t sema = _dispatch_queue_drain(dq);
2183 if (sema) {
2184 dispatch_atomic_barrier();
2185 _dispatch_thread_semaphore_signal(sema);
2186 }
2187 #if DISPATCH_PERF_MON
2188 _dispatch_queue_merge_stats(start);
2189 #endif
2190 _dispatch_force_cache_cleanup();
2191 }
2192
2193 #if DISPATCH_COCOA_COMPAT
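// Drains the main queue from the main thread. A stack-allocated marker
// continuation is pushed first so that only items enqueued before this
// pass are executed; if the marker is reached with items still behind it,
// the main queue port is signalled again and the remainder is left for the
// next pass.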
2194 void
2195 _dispatch_main_queue_drain(void)
2196 {
2197 dispatch_queue_t dq = &_dispatch_main_q;
2198 if (!dq->dq_items_tail) {
2199 return;
2200 }
2201 struct dispatch_main_queue_drain_marker_s {
2202 DISPATCH_CONTINUATION_HEADER(main_queue_drain_marker);
2203 } marker = {
2204 .do_vtable = NULL,
2205 };
2206 struct dispatch_object_s *dmarker = (void*)&marker;
2207 _dispatch_queue_push_notrace(dq, dmarker);
2208
2209 #if DISPATCH_PERF_MON
2210 uint64_t start = _dispatch_absolute_time();
2211 #endif
2212 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2213 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2214
2215 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
2216 while (dq->dq_items_tail) {
2217 while (!(dc = fastpath(dq->dq_items_head))) {
2218 _dispatch_hardware_pause();
2219 }
2220 dq->dq_items_head = NULL;
2221 do {
2222 next_dc = fastpath(dc->do_next);
2223 if (!next_dc &&
2224 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2225 // Enqueue is TIGHTLY controlled; we won't wait long.
2226 while (!(next_dc = fastpath(dc->do_next))) {
2227 _dispatch_hardware_pause();
2228 }
2229 }
2230 if (dc == dmarker) {
2231 if (next_dc) {
2232 dq->dq_items_head = next_dc;
2233 _dispatch_queue_wakeup_main();
2234 }
2235 goto out;
2236 }
2237 _dispatch_continuation_pop(dc);
2238 _dispatch_workitem_inc();
2239 } while ((dc = next_dc));
2240 }
2241 dispatch_assert(dc); // did not encounter marker
2242
2243 out:
2244 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2245 #if DISPATCH_PERF_MON
2246 _dispatch_queue_merge_stats(start);
2247 #endif
2248 _dispatch_force_cache_cleanup();
2249 }
2250 #endif
2251
2252 DISPATCH_ALWAYS_INLINE_NDEBUG
2253 static inline _dispatch_thread_semaphore_t
2254 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
2255 {
2256 // rdar://problem/8290662 "lock transfer"
2257 struct dispatch_object_s *dc, *next_dc;
2258 _dispatch_thread_semaphore_t sema;
2259
2260 // queue is locked, or suspended and not being drained
2261 dc = dq->dq_items_head;
2262 if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
2263 return 0;
2264 }
2265 // dequeue dc, it is a barrier sync
2266 next_dc = fastpath(dc->do_next);
2267 dq->dq_items_head = next_dc;
2268 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2269 // Enqueue is TIGHTLY controlled; we won't wait long.
2270 while (!(next_dc = fastpath(dc->do_next))) {
2271 _dispatch_hardware_pause();
2272 }
2273 dq->dq_items_head = next_dc;
2274 }
2275 return sema;
2276 }
2277
2278 #ifndef DISPATCH_HEAD_CONTENTION_SPINS
2279 #define DISPATCH_HEAD_CONTENTION_SPINS 10000
2280 #endif
2281
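// Pops a single item from a global (root) queue. The all-ones 'mediator'
// value swapped into dq_items_head acts both as ownership of the head and
// as a signal to concurrent poppers; a thread that loses the race spins
// briefly (DISPATCH_HEAD_CONTENTION_SPINS) and then bails out, requesting
// a replacement worker rather than blocking.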
2282 static struct dispatch_object_s *
2283 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
2284 {
2285 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
2286
2287 start:
2288 // The mediator value acts both as a "lock" and a signal
2289 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);
2290
2291 if (slowpath(head == NULL)) {
2292 // The first xchg on the tail will tell the enqueueing thread that it
2293 // is safe to blindly write out to the head pointer. A cmpxchg honors
2294 // the algorithm: only clear the mediator if it is still in place.
2295 (void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
2296 _dispatch_debug("no work on global work queue");
2297 return NULL;
2298 }
2299
2300 if (slowpath(head == mediator)) {
2301 // This thread lost the race for ownership of the queue.
2302 // Spin for a short while in case many threads have started draining at
2303 // once as part of a dispatch_apply
2304 unsigned int i = DISPATCH_HEAD_CONTENTION_SPINS;
2305 do {
2306 _dispatch_hardware_pause();
2307 if (dq->dq_items_head != mediator) goto start;
2308 } while (--i);
2309 // The ratio of work to libdispatch overhead must be bad. This
2310 // scenario implies that there are too many threads in the pool.
2311 // Create a new pending thread and then exit this thread.
2312 // The kernel will grant a new thread when the load subsides.
2313 _dispatch_debug("Contention on queue: %p", dq);
2314 _dispatch_queue_wakeup_global(dq);
2315 #if DISPATCH_PERF_MON
2316 dispatch_atomic_inc(&_dispatch_bad_ratio);
2317 #endif
2318 return NULL;
2319 }
2320
2321 // Restore the head pointer to a sane value before returning.
2322 // If 'next' is NULL, then this item _might_ be the last item.
2323 next = fastpath(head->do_next);
2324
2325 if (slowpath(!next)) {
2326 dq->dq_items_head = NULL;
2327
2328 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
2329 // both head and tail are NULL now
2330 goto out;
2331 }
2332
2333 // There must be a next item now. This thread won't wait long.
2334 while (!(next = head->do_next)) {
2335 _dispatch_hardware_pause();
2336 }
2337 }
2338
2339 dq->dq_items_head = next;
2340 _dispatch_queue_wakeup_global(dq);
2341 out:
2342 return head;
2343 }
2344
2345 #pragma mark -
2346 #pragma mark dispatch_worker_thread
2347
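// Common body for all worker thread variants below: binds the root queue
// into thread-specific data, performs the Cocoa GC/autorelease-pool
// bookkeeping when built with DISPATCH_COCOA_COMPAT, then pops and runs
// continuations until the global queue appears empty.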
2348 static void
2349 _dispatch_worker_thread4(dispatch_queue_t dq)
2350 {
2351 struct dispatch_object_s *item;
2352
2353
2354 #if DISPATCH_DEBUG
2355 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
2356 DISPATCH_CRASH("Premature thread recycling");
2357 }
2358 #endif
2359 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2360
2361 #if DISPATCH_COCOA_COMPAT
2362 (void)dispatch_atomic_inc(&_dispatch_worker_threads);
2363 // ensure that high-level memory management techniques do not leak/crash
2364 if (dispatch_begin_thread_4GC) {
2365 dispatch_begin_thread_4GC();
2366 }
2367 void *pool = _dispatch_autorelease_pool_push();
2368 #endif // DISPATCH_COCOA_COMPAT
2369
2370 #if DISPATCH_PERF_MON
2371 uint64_t start = _dispatch_absolute_time();
2372 #endif
2373 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
2374 _dispatch_continuation_pop(item);
2375 }
2376 #if DISPATCH_PERF_MON
2377 _dispatch_queue_merge_stats(start);
2378 #endif
2379
2380 #if DISPATCH_COCOA_COMPAT
2381 _dispatch_autorelease_pool_pop(pool);
2382 if (dispatch_end_thread_4GC) {
2383 dispatch_end_thread_4GC();
2384 }
2385 if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
2386 dispatch_no_worker_threads_4GC) {
2387 dispatch_no_worker_threads_4GC();
2388 }
2389 #endif // DISPATCH_COCOA_COMPAT
2390
2391 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
2392
2393 _dispatch_force_cache_cleanup();
2394
2395 }
2396
2397 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
2398 static void
2399 _dispatch_worker_thread3(void *context)
2400 {
2401 dispatch_queue_t dq = context;
2402 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2403
2404 (void)dispatch_atomic_dec2o(qc, dgq_pending);
2405 _dispatch_worker_thread4(dq);
2406 }
2407 #endif
2408
2409 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2410 // 6618342 Contact the team that owns the Instrument DTrace probe before
2411 // renaming this symbol
2412 static void
2413 _dispatch_worker_thread2(int priority, int options,
2414 void *context DISPATCH_UNUSED)
2415 {
2416 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
2417 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
2418 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
2419 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2420
2421 (void)dispatch_atomic_dec2o(qc, dgq_pending);
2422 _dispatch_worker_thread4(dq);
2423 }
2424 #endif
2425
2426 #if DISPATCH_ENABLE_THREAD_POOL
2427 // 6618342 Contact the team that owns the Instrument DTrace probe before
2428 // renaming this symbol
2429 static void *
2430 _dispatch_worker_thread(void *context)
2431 {
2432 dispatch_queue_t dq = context;
2433 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2434 sigset_t mask;
2435 int r;
2436
2437 // work around the tweaks that the kernel workqueue does for us
2438 r = sigfillset(&mask);
2439 (void)dispatch_assume_zero(r);
2440 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
2441 (void)dispatch_assume_zero(r);
2442
2443 do {
2444 _dispatch_worker_thread4(dq);
2445 // we use 65 seconds in case there are any timers that run once a minute
2446 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
2447 dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);
2448
2449 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size);
2450 if (dq->dq_items_tail) {
2451 _dispatch_queue_wakeup_global(dq);
2452 }
2453
2454 return NULL;
2455 }
2456
2457 int
2458 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
2459 {
2460 int r;
2461
2462 /* Workaround: 6269619 Not all signals can be delivered on any thread */
2463
2464 r = sigdelset(set, SIGILL);
2465 (void)dispatch_assume_zero(r);
2466 r = sigdelset(set, SIGTRAP);
2467 (void)dispatch_assume_zero(r);
2468 #if HAVE_DECL_SIGEMT
2469 r = sigdelset(set, SIGEMT);
2470 (void)dispatch_assume_zero(r);
2471 #endif
2472 r = sigdelset(set, SIGFPE);
2473 (void)dispatch_assume_zero(r);
2474 r = sigdelset(set, SIGBUS);
2475 (void)dispatch_assume_zero(r);
2476 r = sigdelset(set, SIGSEGV);
2477 (void)dispatch_assume_zero(r);
2478 r = sigdelset(set, SIGSYS);
2479 (void)dispatch_assume_zero(r);
2480 r = sigdelset(set, SIGPIPE);
2481 (void)dispatch_assume_zero(r);
2482
2483 return pthread_sigmask(how, set, oset);
2484 }
2485 #endif
2486
2487 #pragma mark -
2488 #pragma mark dispatch_main_queue
2489
2490 static bool _dispatch_program_is_probably_callback_driven;
2491
2492 #if DISPATCH_COCOA_COMPAT
2493 static void
2494 _dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED)
2495 {
2496 kern_return_t kr;
2497
2498 _dispatch_safe_fork = false;
2499 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2500 &main_q_port);
2501 DISPATCH_VERIFY_MIG(kr);
2502 (void)dispatch_assume_zero(kr);
2503 kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
2504 MACH_MSG_TYPE_MAKE_SEND);
2505 DISPATCH_VERIFY_MIG(kr);
2506 (void)dispatch_assume_zero(kr);
2507
2508 _dispatch_program_is_probably_callback_driven = true;
2509 }
2510
2511 mach_port_t
2512 _dispatch_get_main_queue_port_4CF(void)
2513 {
2514 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
2515 _dispatch_main_q_port_init);
2516 return main_q_port;
2517 }
2518
2519 static bool main_q_is_draining;
2520
2521 // 6618342 Contact the team that owns the Instrument DTrace probe before
2522 // renaming this symbol
2523 DISPATCH_NOINLINE
2524 static void
2525 _dispatch_queue_set_mainq_drain_state(bool arg)
2526 {
2527 main_q_is_draining = arg;
2528 }
2529
2530 void
2531 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
2532 {
2533 if (main_q_is_draining) {
2534 return;
2535 }
2536 _dispatch_queue_set_mainq_drain_state(true);
2537 _dispatch_main_queue_drain();
2538 _dispatch_queue_set_mainq_drain_state(false);
2539 }
2540
2541 #endif
2542
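// dispatch_main() parks the calling (main) thread via pthread_exit(); the
// thread-specific-data destructor for dispatch_queue_key then runs
// _dispatch_queue_cleanup()/_dispatch_queue_cleanup2() below, which
// unlocks the main queue and, for callback-driven programs, dedicates a
// thread to sigsuspend() so that signals can still be delivered.
//
// Illustrative usage (setup() is a placeholder):
//	dispatch_async(dispatch_get_main_queue(), ^{ setup(); });
//	dispatch_main(); // never returns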
2543 void
2544 dispatch_main(void)
2545 {
2546 #if HAVE_PTHREAD_MAIN_NP
2547 if (pthread_main_np()) {
2548 #endif
2549 _dispatch_program_is_probably_callback_driven = true;
2550 pthread_exit(NULL);
2551 DISPATCH_CRASH("pthread_exit() returned");
2552 #if HAVE_PTHREAD_MAIN_NP
2553 }
2554 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
2555 #endif
2556 }
2557
2558 DISPATCH_NOINLINE DISPATCH_NORETURN
2559 static void
2560 _dispatch_sigsuspend(void)
2561 {
2562 static const sigset_t mask;
2563
2564 #if DISPATCH_COCOA_COMPAT
2565 // Do not count the signal handling thread as a worker thread
2566 (void)dispatch_atomic_dec(&_dispatch_worker_threads);
2567 #endif
2568 for (;;) {
2569 sigsuspend(&mask);
2570 }
2571 }
2572
2573 DISPATCH_NORETURN
2574 static void
2575 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
2576 {
2577 // never returns, so burn bridges behind us
2578 _dispatch_clear_stack(0);
2579 _dispatch_sigsuspend();
2580 }
2581
2582 DISPATCH_NOINLINE
2583 static void
2584 _dispatch_queue_cleanup2(void)
2585 {
2586 (void)dispatch_atomic_dec(&_dispatch_main_q.dq_running);
2587
2588 dispatch_atomic_release_barrier();
2589 if (dispatch_atomic_sub2o(&_dispatch_main_q, do_suspend_cnt,
2590 DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
2591 _dispatch_wakeup(&_dispatch_main_q);
2592 }
2593
2594 // overload the "probably" variable to mean that dispatch_main() or
2595 // similar non-POSIX API was called
2596 // this has to run before the DISPATCH_COCOA_COMPAT below
2597 if (_dispatch_program_is_probably_callback_driven) {
2598 dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
2599 _dispatch_sig_thread);
2600 sleep(1); // workaround 6778970
2601 }
2602
2603 #if DISPATCH_COCOA_COMPAT
2604 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
2605 _dispatch_main_q_port_init);
2606
2607 mach_port_t mp = main_q_port;
2608 kern_return_t kr;
2609
2610 main_q_port = 0;
2611
2612 if (mp) {
2613 kr = mach_port_deallocate(mach_task_self(), mp);
2614 DISPATCH_VERIFY_MIG(kr);
2615 (void)dispatch_assume_zero(kr);
2616 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE,
2617 -1);
2618 DISPATCH_VERIFY_MIG(kr);
2619 (void)dispatch_assume_zero(kr);
2620 }
2621 #endif
2622 }
2623
2624 static void
2625 _dispatch_queue_cleanup(void *ctxt)
2626 {
2627 if (ctxt == &_dispatch_main_q) {
2628 return _dispatch_queue_cleanup2();
2629 }
2630 // POSIX defines that destructors are only called if 'ctxt' is non-null
2631 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
2632 }
2633
2634 #pragma mark -
2635 #pragma mark dispatch_manager_queue
2636
2637 static unsigned int _dispatch_select_workaround;
2638 static fd_set _dispatch_rfds;
2639 static fd_set _dispatch_wfds;
2640 static void **_dispatch_rfd_ptrs;
2641 static void **_dispatch_wfd_ptrs;
2642
2643 static int _dispatch_kq;
2644
2645 static void
2646 _dispatch_get_kq_init(void *context DISPATCH_UNUSED)
2647 {
2648 static const struct kevent kev = {
2649 .ident = 1,
2650 .filter = EVFILT_USER,
2651 .flags = EV_ADD|EV_CLEAR,
2652 };
2653
2654 _dispatch_safe_fork = false;
2655 _dispatch_kq = kqueue();
2656 if (_dispatch_kq == -1) {
2657 DISPATCH_CLIENT_CRASH("kqueue() create failed: "
2658 "probably out of file descriptors");
2659 } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2660 // in case we fall back to select()
2661 FD_SET(_dispatch_kq, &_dispatch_rfds);
2662 }
2663
2664 (void)dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));
2665
2666 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
2667 }
2668
2669 static int
2670 _dispatch_get_kq(void)
2671 {
2672 static dispatch_once_t pred;
2673
2674 dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);
2675
2676 return _dispatch_kq;
2677 }
2678
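// Registers a single kevent with the manager kqueue. EV_RECEIPT is forced
// so that per-element errors come back in the returned kevent instead of
// as spurious events. When the kernel rejects a read/write filter (e.g.
// for a /dev/* descriptor), the fd is tracked in fd_sets and the manager
// loop emulates the kevent with select().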
2679 long
2680 _dispatch_update_kq(const struct kevent *kev)
2681 {
2682 int rval;
2683 struct kevent kev_copy = *kev;
2684 // This ensures we don't get a pending kevent back while registering
2685 // a new kevent
2686 kev_copy.flags |= EV_RECEIPT;
2687
2688 if (_dispatch_select_workaround && (kev_copy.flags & EV_DELETE)) {
2689 // Only executed on manager queue
2690 switch (kev_copy.filter) {
2691 case EVFILT_READ:
2692 if (kev_copy.ident < FD_SETSIZE &&
2693 FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
2694 FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
2695 _dispatch_rfd_ptrs[kev_copy.ident] = 0;
2696 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2697 return 0;
2698 }
2699 break;
2700 case EVFILT_WRITE:
2701 if (kev_copy.ident < FD_SETSIZE &&
2702 FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
2703 FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
2704 _dispatch_wfd_ptrs[kev_copy.ident] = 0;
2705 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2706 return 0;
2707 }
2708 break;
2709 default:
2710 break;
2711 }
2712 }
2713
2714 retry:
2715 rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
2716 if (rval == -1) {
2717 // kevent() itself failed, i.e. for reasons other than errors on
2718 // individual changelist elements (those are reported via EV_RECEIPT)
2719 int err = errno;
2720 switch (err) {
2721 case EINTR:
2722 goto retry;
2723 case EBADF:
2724 _dispatch_bug_client("Do not close random Unix descriptors");
2725 break;
2726 default:
2727 (void)dispatch_assume_zero(err);
2728 break;
2729 }
2730 //kev_copy.flags |= EV_ERROR;
2731 //kev_copy.data = err;
2732 return err;
2733 }
2734
2735 // The following select workaround only applies to adding kevents
2736 if ((kev->flags & (EV_DISABLE|EV_DELETE)) ||
2737 !(kev->flags & (EV_ADD|EV_ENABLE))) {
2738 return 0;
2739 }
2740
2741 // Only executed on manager queue
2742 switch (kev_copy.data) {
2743 case 0:
2744 return 0;
2745 case EBADF:
2746 break;
2747 default:
2748 // If an error occurred while processing the kevent changelist and
2749 // the kevent involved a read or a write filter, it indicates that we
2750 // were trying to register a /dev/* port; fall back to select()
2752 switch (kev_copy.filter) {
2753 case EVFILT_READ:
2754 if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
2755 if (!_dispatch_rfd_ptrs) {
2756 _dispatch_rfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
2757 }
2758 _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
2759 FD_SET((int)kev_copy.ident, &_dispatch_rfds);
2760 (void)dispatch_atomic_inc(&_dispatch_select_workaround);
2761 _dispatch_debug("select workaround used to read fd %d: 0x%lx",
2762 (int)kev_copy.ident, (long)kev_copy.data);
2763 return 0;
2764 }
2765 break;
2766 case EVFILT_WRITE:
2767 if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
2768 if (!_dispatch_wfd_ptrs) {
2769 _dispatch_wfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
2770 }
2771 _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
2772 FD_SET((int)kev_copy.ident, &_dispatch_wfds);
2773 (void)dispatch_atomic_inc(&_dispatch_select_workaround);
2774 _dispatch_debug("select workaround used to write fd %d: 0x%lx",
2775 (int)kev_copy.ident, (long)kev_copy.data);
2776 return 0;
2777 }
2778 break;
2779 default:
2780 // kevent error, _dispatch_source_merge_kevent() will handle it
2781 _dispatch_source_drain_kevent(&kev_copy);
2782 break;
2783 }
2784 break;
2785 }
2786 return kev_copy.data;
2787 }
2788
2789 bool
2790 _dispatch_mgr_wakeup(dispatch_queue_t dq)
2791 {
2792 static const struct kevent kev = {
2793 .ident = 1,
2794 .filter = EVFILT_USER,
2795 .fflags = NOTE_TRIGGER,
2796 };
2797
2798 _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);
2799
2800 _dispatch_update_kq(&kev);
2801
2802 return false;
2803 }
2804
2805 static void
2806 _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
2807 {
2808 size_t i;
2809
2810 for (i = 0; i < cnt; i++) {
2811 // EVFILT_USER isn't used by sources
2812 if (kev[i].filter == EVFILT_USER) {
2813 // If _dispatch_mgr_thread2() ever is changed to return to the
2814 // caller, then this should become _dispatch_queue_drain()
2815 _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
2816 } else {
2817 _dispatch_source_drain_kevent(&kev[i]);
2818 }
2819 }
2820 }
2821
2822 #if DISPATCH_USE_VM_PRESSURE && DISPATCH_USE_MALLOC_VM_PRESSURE_SOURCE
2823 // VM Pressure source for malloc <rdar://problem/7805121>
2824 static dispatch_source_t _dispatch_malloc_vm_pressure_source;
2825
2826 static void
2827 _dispatch_malloc_vm_pressure_handler(void *context DISPATCH_UNUSED)
2828 {
2829 malloc_zone_pressure_relief(0,0);
2830 }
2831
2832 static void
2833 _dispatch_malloc_vm_pressure_setup(void)
2834 {
2835 _dispatch_malloc_vm_pressure_source = dispatch_source_create(
2836 DISPATCH_SOURCE_TYPE_VM, 0, DISPATCH_VM_PRESSURE,
2837 _dispatch_get_root_queue(0, true));
2838 dispatch_source_set_event_handler_f(_dispatch_malloc_vm_pressure_source,
2839 _dispatch_malloc_vm_pressure_handler);
2840 dispatch_resume(_dispatch_malloc_vm_pressure_source);
2841 }
2842 #else
2843 #define _dispatch_malloc_vm_pressure_setup()
2844 #endif
2845
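// The manager thread's event loop: runs timers, then blocks in kevent() on
// the manager kqueue. While the select() workaround is armed, it first
// polls the tracked fd_sets with select(), synthesizes kevents for ready
// descriptors, and drops the kevent() timeout to zero so that neither
// mechanism starves the other.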
2846 DISPATCH_NOINLINE DISPATCH_NORETURN
2847 static void
2848 _dispatch_mgr_invoke(void)
2849 {
2850 static const struct timespec timeout_immediately = { 0, 0 };
2851 struct timespec timeout;
2852 const struct timespec *timeoutp;
2853 struct timeval sel_timeout, *sel_timeoutp;
2854 fd_set tmp_rfds, tmp_wfds;
2855 struct kevent kev[1];
2856 int k_cnt, err, i, r;
2857
2858 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2859 #if DISPATCH_COCOA_COMPAT
2860 // Do not count the manager thread as a worker thread
2861 (void)dispatch_atomic_dec(&_dispatch_worker_threads);
2862 #endif
2863 _dispatch_malloc_vm_pressure_setup();
2864
2865 for (;;) {
2866 _dispatch_run_timers();
2867
2868 timeoutp = _dispatch_get_next_timer_fire(&timeout);
2869
2870 if (_dispatch_select_workaround) {
2871 FD_COPY(&_dispatch_rfds, &tmp_rfds);
2872 FD_COPY(&_dispatch_wfds, &tmp_wfds);
2873 if (timeoutp) {
2874 sel_timeout.tv_sec = timeoutp->tv_sec;
2875 sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))
2876 (timeoutp->tv_nsec / 1000u);
2877 sel_timeoutp = &sel_timeout;
2878 } else {
2879 sel_timeoutp = NULL;
2880 }
2881
2882 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
2883 if (r == -1) {
2884 err = errno;
2885 if (err != EBADF) {
2886 if (err != EINTR) {
2887 (void)dispatch_assume_zero(err);
2888 }
2889 continue;
2890 }
2891 for (i = 0; i < FD_SETSIZE; i++) {
2892 if (i == _dispatch_kq) {
2893 continue;
2894 }
2895 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i,
2896 &_dispatch_wfds)) {
2897 continue;
2898 }
2899 r = dup(i);
2900 if (r != -1) {
2901 close(r);
2902 } else {
2903 if (FD_ISSET(i, &_dispatch_rfds)) {
2904 FD_CLR(i, &_dispatch_rfds);
2905 _dispatch_rfd_ptrs[i] = 0;
2906 (void)dispatch_atomic_dec(
2907 &_dispatch_select_workaround);
2908 }
2909 if (FD_ISSET(i, &_dispatch_wfds)) {
2910 FD_CLR(i, &_dispatch_wfds);
2911 _dispatch_wfd_ptrs[i] = 0;
2912 (void)dispatch_atomic_dec(
2913 &_dispatch_select_workaround);
2914 }
2915 }
2916 }
2917 continue;
2918 }
2919
2920 if (r > 0) {
2921 for (i = 0; i < FD_SETSIZE; i++) {
2922 if (i == _dispatch_kq) {
2923 continue;
2924 }
2925 if (FD_ISSET(i, &tmp_rfds)) {
2926 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
2927 EV_SET(&kev[0], i, EVFILT_READ,
2928 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2929 _dispatch_rfd_ptrs[i]);
2930 _dispatch_rfd_ptrs[i] = 0;
2931 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2932 _dispatch_mgr_thread2(kev, 1);
2933 }
2934 if (FD_ISSET(i, &tmp_wfds)) {
2935 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
2936 EV_SET(&kev[0], i, EVFILT_WRITE,
2937 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2938 _dispatch_wfd_ptrs[i]);
2939 _dispatch_wfd_ptrs[i] = 0;
2940 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2941 _dispatch_mgr_thread2(kev, 1);
2942 }
2943 }
2944 }
2945
2946 timeoutp = &timeout_immediately;
2947 }
2948
2949 k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]),
2950 timeoutp);
2951 err = errno;
2952
2953 switch (k_cnt) {
2954 case -1:
2955 if (err == EBADF) {
2956 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2957 }
2958 if (err != EINTR) {
2959 (void)dispatch_assume_zero(err);
2960 }
2961 continue;
2962 default:
2963 _dispatch_mgr_thread2(kev, (size_t)k_cnt);
2964 // fall through
2965 case 0:
2966 _dispatch_force_cache_cleanup();
2967 continue;
2968 }
2969 }
2970 }
2971
2972 DISPATCH_NORETURN
2973 dispatch_queue_t
2974 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2975 {
2976 // never returns, so burn bridges behind us & clear stack 2k ahead
2977 _dispatch_clear_stack(2048);
2978 _dispatch_mgr_invoke();
2979 }