1 /*
2 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30
31 static void _dispatch_cache_cleanup(void *value);
32 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
33 dispatch_continuation_t dc);
34 static void _dispatch_queue_cleanup(void *ctxt);
35 static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
36 static void _dispatch_queue_drain(dispatch_queue_t dq);
37 static inline _dispatch_thread_semaphore_t
38 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
39 static void _dispatch_worker_thread2(void *context);
40 #if DISPATCH_ENABLE_THREAD_POOL
41 static void *_dispatch_worker_thread(void *context);
42 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
43 #endif
44 static bool _dispatch_mgr_wakeup(dispatch_queue_t dq);
45 static dispatch_queue_t _dispatch_mgr_thread(dispatch_queue_t dq);
46
47 #if DISPATCH_COCOA_COMPAT
48 static unsigned int _dispatch_worker_threads;
49 static dispatch_once_t _dispatch_main_q_port_pred;
50 static mach_port_t main_q_port;
51
52 static void _dispatch_main_q_port_init(void *ctxt);
53 static void _dispatch_queue_wakeup_main(void);
54 static void _dispatch_main_queue_drain(void);
55 #endif
56
57 #pragma mark -
58 #pragma mark dispatch_queue_vtable
59
60 const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
61 .do_type = DISPATCH_QUEUE_TYPE,
62 .do_kind = "queue",
63 .do_dispose = _dispatch_queue_dispose,
64 .do_invoke = NULL,
65 .do_probe = (void *)dummy_function_r0,
66 .do_debug = dispatch_queue_debug,
67 };
68
69 static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
70 .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
71 .do_kind = "global-queue",
72 .do_debug = dispatch_queue_debug,
73 .do_probe = _dispatch_queue_wakeup_global,
74 };
75
76 static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
77 .do_type = DISPATCH_QUEUE_MGR_TYPE,
78 .do_kind = "mgr-queue",
79 .do_invoke = _dispatch_mgr_thread,
80 .do_debug = dispatch_queue_debug,
81 .do_probe = _dispatch_mgr_wakeup,
82 };
83
84 #pragma mark -
85 #pragma mark dispatch_root_queue
86
87 #if HAVE_PTHREAD_WORKQUEUES
88 static const int _dispatch_root_queue_wq_priorities[] = {
89 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = WORKQ_LOW_PRIOQUEUE,
90 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = WORKQ_LOW_PRIOQUEUE,
91 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = WORKQ_DEFAULT_PRIOQUEUE,
92 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] =
93 WORKQ_DEFAULT_PRIOQUEUE,
94 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = WORKQ_HIGH_PRIOQUEUE,
95 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = WORKQ_HIGH_PRIOQUEUE,
96 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = WORKQ_BG_PRIOQUEUE,
97 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] =
98 WORKQ_BG_PRIOQUEUE,
99 };
100 #endif
101
102 #if DISPATCH_ENABLE_THREAD_POOL
103 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
104 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
105 .do_vtable = &_dispatch_semaphore_vtable,
106 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
107 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
108 },
109 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
110 .do_vtable = &_dispatch_semaphore_vtable,
111 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
112 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
113 },
114 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
115 .do_vtable = &_dispatch_semaphore_vtable,
116 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
117 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
118 },
119 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
120 .do_vtable = &_dispatch_semaphore_vtable,
121 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
122 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
123 },
124 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
125 .do_vtable = &_dispatch_semaphore_vtable,
126 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
127 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
128 },
129 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
130 .do_vtable = &_dispatch_semaphore_vtable,
131 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
132 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
133 },
134 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
135 .do_vtable = &_dispatch_semaphore_vtable,
136 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
137 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
138 },
139 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
140 .do_vtable = &_dispatch_semaphore_vtable,
141 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
142 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
143 },
144 };
145 #endif
146
147 #define MAX_THREAD_COUNT 255
148
149 struct dispatch_root_queue_context_s {
150 #if HAVE_PTHREAD_WORKQUEUES
151 pthread_workqueue_t dgq_kworkqueue;
152 #endif
153 uint32_t dgq_pending;
154 #if DISPATCH_ENABLE_THREAD_POOL
155 uint32_t dgq_thread_pool_size;
156 dispatch_semaphore_t dgq_thread_mediator;
157 #endif
158 };
159
160 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
161 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
162 #if DISPATCH_ENABLE_THREAD_POOL
163 .dgq_thread_mediator = &_dispatch_thread_mediator[
164 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
165 .dgq_thread_pool_size = MAX_THREAD_COUNT,
166 #endif
167 },
168 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
169 #if DISPATCH_ENABLE_THREAD_POOL
170 .dgq_thread_mediator = &_dispatch_thread_mediator[
171 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
172 .dgq_thread_pool_size = MAX_THREAD_COUNT,
173 #endif
174 },
175 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
176 #if DISPATCH_ENABLE_THREAD_POOL
177 .dgq_thread_mediator = &_dispatch_thread_mediator[
178 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
179 .dgq_thread_pool_size = MAX_THREAD_COUNT,
180 #endif
181 },
182 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
183 #if DISPATCH_ENABLE_THREAD_POOL
184 .dgq_thread_mediator = &_dispatch_thread_mediator[
185 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
186 .dgq_thread_pool_size = MAX_THREAD_COUNT,
187 #endif
188 },
189 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
190 #if DISPATCH_ENABLE_THREAD_POOL
191 .dgq_thread_mediator = &_dispatch_thread_mediator[
192 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
193 .dgq_thread_pool_size = MAX_THREAD_COUNT,
194 #endif
195 },
196 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
197 #if DISPATCH_ENABLE_THREAD_POOL
198 .dgq_thread_mediator = &_dispatch_thread_mediator[
199 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
200 .dgq_thread_pool_size = MAX_THREAD_COUNT,
201 #endif
202 },
203 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
204 #if DISPATCH_ENABLE_THREAD_POOL
205 .dgq_thread_mediator = &_dispatch_thread_mediator[
206 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
207 .dgq_thread_pool_size = MAX_THREAD_COUNT,
208 #endif
209 },
210 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
211 #if DISPATCH_ENABLE_THREAD_POOL
212 .dgq_thread_mediator = &_dispatch_thread_mediator[
213 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
214 .dgq_thread_pool_size = MAX_THREAD_COUNT,
215 #endif
216 },
217 };
218
219 // 6618342 Contact the team that owns the Instrument DTrace probe before
220 // renaming this symbol
221 // dq_running is set to 2 so that barrier operations go through the slow path
222 DISPATCH_CACHELINE_ALIGN
223 struct dispatch_queue_s _dispatch_root_queues[] = {
224 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
225 .do_vtable = &_dispatch_queue_root_vtable,
226 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
227 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
228 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
229 .do_ctxt = &_dispatch_root_queue_contexts[
230 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
231
232 .dq_label = "com.apple.root.low-priority",
233 .dq_running = 2,
234 .dq_width = UINT32_MAX,
235 .dq_serialnum = 4,
236 },
237 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
238 .do_vtable = &_dispatch_queue_root_vtable,
239 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
240 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
241 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
242 .do_ctxt = &_dispatch_root_queue_contexts[
243 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
244
245 .dq_label = "com.apple.root.low-overcommit-priority",
246 .dq_running = 2,
247 .dq_width = UINT32_MAX,
248 .dq_serialnum = 5,
249 },
250 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
251 .do_vtable = &_dispatch_queue_root_vtable,
252 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
253 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
254 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
255 .do_ctxt = &_dispatch_root_queue_contexts[
256 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
257
258 .dq_label = "com.apple.root.default-priority",
259 .dq_running = 2,
260 .dq_width = UINT32_MAX,
261 .dq_serialnum = 6,
262 },
263 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
264 .do_vtable = &_dispatch_queue_root_vtable,
265 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
266 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
267 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
268 .do_ctxt = &_dispatch_root_queue_contexts[
269 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
270
271 .dq_label = "com.apple.root.default-overcommit-priority",
272 .dq_running = 2,
273 .dq_width = UINT32_MAX,
274 .dq_serialnum = 7,
275 },
276 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
277 .do_vtable = &_dispatch_queue_root_vtable,
278 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
279 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
280 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
281 .do_ctxt = &_dispatch_root_queue_contexts[
282 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
283
284 .dq_label = "com.apple.root.high-priority",
285 .dq_running = 2,
286 .dq_width = UINT32_MAX,
287 .dq_serialnum = 8,
288 },
289 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
290 .do_vtable = &_dispatch_queue_root_vtable,
291 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
292 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
293 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
294 .do_ctxt = &_dispatch_root_queue_contexts[
295 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
296
297 .dq_label = "com.apple.root.high-overcommit-priority",
298 .dq_running = 2,
299 .dq_width = UINT32_MAX,
300 .dq_serialnum = 9,
301 },
302 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
303 .do_vtable = &_dispatch_queue_root_vtable,
304 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
305 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
306 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
307 .do_ctxt = &_dispatch_root_queue_contexts[
308 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
309
310 .dq_label = "com.apple.root.background-priority",
311 .dq_running = 2,
312 .dq_width = UINT32_MAX,
313 .dq_serialnum = 10,
314 },
315 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
316 .do_vtable = &_dispatch_queue_root_vtable,
317 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
318 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
319 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
320 .do_ctxt = &_dispatch_root_queue_contexts[
321 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
322
323 .dq_label = "com.apple.root.background-overcommit-priority",
324 .dq_running = 2,
325 .dq_width = UINT32_MAX,
326 .dq_serialnum = 11,
327 },
328 };
329
330 // 6618342 Contact the team that owns the Instrument DTrace probe before
331 // renaming this symbol
332 DISPATCH_CACHELINE_ALIGN
333 struct dispatch_queue_s _dispatch_mgr_q = {
334 .do_vtable = &_dispatch_queue_mgr_vtable,
335 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
336 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
337 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
338 .do_targetq = &_dispatch_root_queues[
339 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
340
341 .dq_label = "com.apple.libdispatch-manager",
342 .dq_width = 1,
343 .dq_serialnum = 2,
344 };
345
346 dispatch_queue_t
347 dispatch_get_global_queue(long priority, unsigned long flags)
348 {
349 if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
350 return NULL;
351 }
352 return _dispatch_get_root_queue(priority,
353 flags & DISPATCH_QUEUE_OVERCOMMIT);
354 }
355
356 dispatch_queue_t
357 dispatch_get_current_queue(void)
358 {
359 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
360 }
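
/*
 * A minimal usage sketch for dispatch_get_global_queue(). The result is
 * one of the eight root queues defined above (global refcount, so callers
 * never release it); any flag bit other than DISPATCH_QUEUE_OVERCOMMIT
 * makes the call return NULL. The block body is illustrative.
 *
 *     dispatch_queue_t q =
 *             dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *     dispatch_async(q, ^{
 *         // runs on a worker thread from the default-priority pool
 *     });
 */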
361
362 #pragma mark -
363 #pragma mark dispatch_init
364
365 static void
366 _dispatch_hw_config_init(void)
367 {
368 _dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
369 _dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
370 _dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
371 }
372
373 static inline bool
374 _dispatch_root_queues_init_workq(void)
375 {
376 bool result = false;
377 #if HAVE_PTHREAD_WORKQUEUES
378 #if DISPATCH_ENABLE_THREAD_POOL
379 if (slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"))) return result;
380 #endif
381 int i, r;
382 pthread_workqueue_attr_t pwq_attr;
383 r = pthread_workqueue_attr_init_np(&pwq_attr);
384 (void)dispatch_assume_zero(r);
385 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
386 pthread_workqueue_t pwq = NULL;
387 const int prio = _dispatch_root_queue_wq_priorities[i];
388
389 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, prio);
390 (void)dispatch_assume_zero(r);
391 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
392 (void)dispatch_assume_zero(r);
393 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
394 (void)dispatch_assume_zero(r);
395 result = result || dispatch_assume(pwq);
396 _dispatch_root_queue_contexts[i].dgq_kworkqueue = pwq;
397 }
398 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
399 (void)dispatch_assume_zero(r);
400 #endif // HAVE_PTHREAD_WORKQUEUES
401 return result;
402 }
403
404 static inline void
405 _dispatch_root_queues_init_thread_pool(void)
406 {
407 #if DISPATCH_ENABLE_THREAD_POOL
408 int i;
409 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
410 #if TARGET_OS_EMBEDDED
 411 // Some software hangs if the non-overcommitting queues do not
 412 // overcommit when threads block. Someday, this behavior should apply
 413 // to all platforms.
414 if (!(i & 1)) {
415 _dispatch_root_queue_contexts[i].dgq_thread_pool_size =
416 _dispatch_hw_config.cc_max_active;
417 }
418 #endif
419 #if USE_MACH_SEM
420 // override the default FIFO behavior for the pool semaphores
421 kern_return_t kr = semaphore_create(mach_task_self(),
422 &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
423 DISPATCH_VERIFY_MIG(kr);
424 (void)dispatch_assume_zero(kr);
425 (void)dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
426 #elif USE_POSIX_SEM
427 /* XXXRW: POSIX semaphores don't support LIFO? */
428 int ret = sem_init(&_dispatch_thread_mediator[i].dsema_sem, 0, 0);
429 (void)dispatch_assume_zero(ret);
430 #endif
431 }
432 #else
433 DISPATCH_CRASH("Thread pool creation failed");
434 #endif // DISPATCH_ENABLE_THREAD_POOL
435 }
436
437 static void
438 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
439 {
440 if (!_dispatch_root_queues_init_workq()) {
441 _dispatch_root_queues_init_thread_pool();
442 }
443
444 }
445
446 #define countof(x) (sizeof(x) / sizeof(x[0]))
447
448 DISPATCH_EXPORT DISPATCH_NOTHROW
449 void
450 libdispatch_init(void)
451 {
452 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
453 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);
454
455 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
456 -DISPATCH_QUEUE_PRIORITY_HIGH);
457 dispatch_assert(countof(_dispatch_root_queues) ==
458 DISPATCH_ROOT_QUEUE_COUNT);
459 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
460 DISPATCH_ROOT_QUEUE_COUNT);
461 #if HAVE_PTHREAD_WORKQUEUES
462 dispatch_assert(countof(_dispatch_root_queue_wq_priorities) ==
463 DISPATCH_ROOT_QUEUE_COUNT);
464 #endif
465 #if DISPATCH_ENABLE_THREAD_POOL
466 dispatch_assert(countof(_dispatch_thread_mediator) ==
467 DISPATCH_ROOT_QUEUE_COUNT);
468 #endif
469 dispatch_assert(sizeof(struct dispatch_source_s) ==
470 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
471 #if DISPATCH_DEBUG
472 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
473 == 0);
474 #endif
475
476 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
477 _dispatch_thread_key_create(&dispatch_sema4_key,
478 (void (*)(void *))_dispatch_thread_semaphore_dispose);
479 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
480 _dispatch_thread_key_create(&dispatch_io_key, NULL);
481 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
482 #if DISPATCH_PERF_MON
483 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
484 #endif
485
486 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
487 _dispatch_main_q.do_vtable = &_dispatch_queue_vtable;
488 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
489 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
490 _dispatch_data_empty.do_vtable = &_dispatch_data_vtable;
491 #endif
492
493 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
494
495 #if DISPATCH_USE_PTHREAD_ATFORK
496 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
497 dispatch_atfork_parent, dispatch_atfork_child));
498 #endif
499
500 _dispatch_hw_config_init();
501 }
502
503 DISPATCH_EXPORT DISPATCH_NOTHROW
504 void
505 dispatch_atfork_child(void)
506 {
507 void *crash = (void *)0x100;
508 size_t i;
509
510 if (_dispatch_safe_fork) {
511 return;
512 }
513
514 _dispatch_main_q.dq_items_head = crash;
515 _dispatch_main_q.dq_items_tail = crash;
516
517 _dispatch_mgr_q.dq_items_head = crash;
518 _dispatch_mgr_q.dq_items_tail = crash;
519
520 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
521 _dispatch_root_queues[i].dq_items_head = crash;
522 _dispatch_root_queues[i].dq_items_tail = crash;
523 }
524 }
525
526 #pragma mark -
527 #pragma mark dispatch_queue_t
528
529 // skip zero
530 // 1 - main_q
531 // 2 - mgr_q
532 // 3 - _unused_
533 // 4,5,6,7,8,9,10,11 - global queues
534 // we use 'xadd' on Intel, so the initial value == next assigned
535 unsigned long _dispatch_queue_serial_numbers = 12;
536
537 dispatch_queue_t
538 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
539 {
540 dispatch_queue_t dq;
541 size_t label_len;
542
543 if (!label) {
544 label = "";
545 }
546
547 label_len = strlen(label);
548 if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
549 label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
550 }
551
552 // XXX switch to malloc()
553 dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
554 DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
555 label_len + 1);
556 if (slowpath(!dq)) {
557 return dq;
558 }
559
560 _dispatch_queue_init(dq);
561 strcpy(dq->dq_label, label);
562
563 if (fastpath(!attr)) {
564 return dq;
565 }
566 if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
567 dq->dq_width = UINT32_MAX;
568 dq->do_targetq = _dispatch_get_root_queue(0, false);
569 } else {
570 dispatch_debug_assert(!attr, "Invalid attribute");
571 }
572 return dq;
573 }
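
/*
 * A minimal sketch of the two attribute values this implementation
 * accepts: NULL produces a serial queue, and DISPATCH_QUEUE_CONCURRENT
 * widens the queue (dq_width == UINT32_MAX) and targets it at the
 * non-overcommit default-priority root queue, as the code above shows.
 * Labels and block bodies are illustrative.
 *
 *     dispatch_queue_t serial_q =
 *             dispatch_queue_create("com.example.serial", NULL);
 *     dispatch_queue_t concurrent_q =
 *             dispatch_queue_create("com.example.concurrent",
 *             DISPATCH_QUEUE_CONCURRENT);
 *
 *     dispatch_async(concurrent_q, ^{ });  // may run alongside other blocks
 *     dispatch_async(serial_q, ^{ });      // runs one block at a time, FIFO
 *
 *     dispatch_release(serial_q);
 *     dispatch_release(concurrent_q);
 */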
574
575 // 6618342 Contact the team that owns the Instrument DTrace probe before
576 // renaming this symbol
577 void
578 _dispatch_queue_dispose(dispatch_queue_t dq)
579 {
580 if (slowpath(dq == _dispatch_queue_get_current())) {
581 DISPATCH_CRASH("Release of a queue by itself");
582 }
583 if (slowpath(dq->dq_items_tail)) {
584 DISPATCH_CRASH("Release of a queue while items are enqueued");
585 }
586
587 // trash the tail queue so that use after free will crash
588 dq->dq_items_tail = (void *)0x200;
589
590 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
591 (void *)0x200);
592 if (dqsq) {
593 _dispatch_release(dqsq);
594 }
595
596 _dispatch_dispose(dq);
597 }
598
599 const char *
600 dispatch_queue_get_label(dispatch_queue_t dq)
601 {
602 return dq->dq_label;
603 }
604
605 static void
606 _dispatch_queue_set_width2(void *ctxt)
607 {
608 int w = (int)(intptr_t)ctxt; // intentional truncation
609 uint32_t tmp;
610 dispatch_queue_t dq = _dispatch_queue_get_current();
611
612 if (w == 1 || w == 0) {
613 dq->dq_width = 1;
614 return;
615 }
616 if (w > 0) {
617 tmp = w;
618 } else switch (w) {
619 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
620 tmp = _dispatch_hw_config.cc_max_physical;
621 break;
622 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
623 tmp = _dispatch_hw_config.cc_max_active;
624 break;
625 default:
626 // fall through
627 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
628 tmp = _dispatch_hw_config.cc_max_logical;
629 break;
630 }
631 // multiply by two since the running count is inc/dec by two
632 // (the low bit == barrier)
633 dq->dq_width = tmp * 2;
634 }
635
636 void
637 dispatch_queue_set_width(dispatch_queue_t dq, long width)
638 {
639 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
640 return;
641 }
642 dispatch_barrier_async_f(dq, (void*)(intptr_t)width,
643 _dispatch_queue_set_width2);
644 }
645
646 // 6618342 Contact the team that owns the Instrument DTrace probe before
647 // renaming this symbol
648 static void
649 _dispatch_set_target_queue2(void *ctxt)
650 {
651 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
652
653 prev_dq = dq->do_targetq;
654 dq->do_targetq = ctxt;
655 _dispatch_release(prev_dq);
656 }
657
658 void
659 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
660 {
661 dispatch_queue_t prev_dq;
662 unsigned long type;
663
664 if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
665 return;
666 }
667 type = dx_type(dou._do) & _DISPATCH_META_TYPE_MASK;
668 if (slowpath(!dq)) {
669 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
670 slowpath(dou._dq->dq_width > 1));
671 dq = _dispatch_get_root_queue(0, !is_concurrent_q);
672 }
673 // TODO: put into the vtable
674 switch(type) {
675 case _DISPATCH_QUEUE_TYPE:
676 case _DISPATCH_SOURCE_TYPE:
677 _dispatch_retain(dq);
678 return dispatch_barrier_async_f(dou._dq, dq,
679 _dispatch_set_target_queue2);
680 case _DISPATCH_IO_TYPE:
681 return _dispatch_io_set_target_queue(dou._dchannel, dq);
682 default:
683 _dispatch_retain(dq);
684 dispatch_atomic_store_barrier();
685 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq);
686 if (prev_dq) _dispatch_release(prev_dq);
687 return;
688 }
689 }
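
/*
 * A short sketch of the common use of dispatch_set_target_queue():
 * pointing several queues at one serial queue so that their blocks never
 * run concurrently with each other, while each queue keeps its own FIFO
 * ordering. Passing a NULL target resets the object to a default root
 * queue, as the code above shows. Names are illustrative.
 *
 *     dispatch_queue_t target =
 *             dispatch_queue_create("com.example.target", NULL);
 *     dispatch_queue_t q1 = dispatch_queue_create("com.example.q1", NULL);
 *     dispatch_queue_t q2 = dispatch_queue_create("com.example.q2", NULL);
 *
 *     dispatch_set_target_queue(q1, target);
 *     dispatch_set_target_queue(q2, target);
 *     // blocks submitted to q1 and q2 are now interleaved one at a time
 *     // on 'target'
 */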
690
691 void
692 dispatch_set_current_target_queue(dispatch_queue_t dq)
693 {
694 dispatch_queue_t queue = _dispatch_queue_get_current();
695
696 if (slowpath(!queue)) {
697 DISPATCH_CLIENT_CRASH("SPI not called from a queue");
698 }
699 if (slowpath(queue->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
700 DISPATCH_CLIENT_CRASH("SPI not supported on this queue");
701 }
702 if (slowpath(queue->dq_width != 1)) {
703 DISPATCH_CLIENT_CRASH("SPI not called from a serial queue");
704 }
705 if (slowpath(!dq)) {
706 dq = _dispatch_get_root_queue(0, true);
707 }
708 _dispatch_retain(dq);
709 _dispatch_set_target_queue2(dq);
710 }
711
712 #pragma mark -
713 #pragma mark dispatch_queue_specific
714
715 struct dispatch_queue_specific_queue_s {
716 DISPATCH_STRUCT_HEADER(dispatch_queue_specific_queue_s,
717 dispatch_queue_specific_queue_vtable_s);
718 DISPATCH_QUEUE_HEADER;
719 union {
720 char _dqsq_pad[DISPATCH_QUEUE_MIN_LABEL_SIZE];
721 struct {
722 char dq_label[16];
723 TAILQ_HEAD(dispatch_queue_specific_head_s,
724 dispatch_queue_specific_s) dqsq_contexts;
725 };
726 };
727 };
728 DISPATCH_DECL(dispatch_queue_specific_queue);
729
730 static void
731 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq);
732
733 struct dispatch_queue_specific_queue_vtable_s {
734 DISPATCH_VTABLE_HEADER(dispatch_queue_specific_queue_s);
735 };
736
737 static const struct dispatch_queue_specific_queue_vtable_s
738 _dispatch_queue_specific_queue_vtable = {
739 .do_type = DISPATCH_QUEUE_SPECIFIC_TYPE,
740 .do_kind = "queue-context",
741 .do_dispose = _dispatch_queue_specific_queue_dispose,
742 .do_invoke = NULL,
743 .do_probe = (void *)dummy_function_r0,
744 .do_debug = (void *)dispatch_queue_debug,
745 };
746
747 struct dispatch_queue_specific_s {
748 const void *dqs_key;
749 void *dqs_ctxt;
750 dispatch_function_t dqs_destructor;
751 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
752 };
753 DISPATCH_DECL(dispatch_queue_specific);
754
755 static void
756 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
757 {
758 dispatch_queue_specific_t dqs, tmp;
759
760 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
761 if (dqs->dqs_destructor) {
762 dispatch_async_f(_dispatch_get_root_queue(
763 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
764 dqs->dqs_destructor);
765 }
766 free(dqs);
767 }
768 _dispatch_queue_dispose((dispatch_queue_t)dqsq);
769 }
770
771 static void
772 _dispatch_queue_init_specific(dispatch_queue_t dq)
773 {
774 dispatch_queue_specific_queue_t dqsq;
775
776 dqsq = calloc(1ul, sizeof(struct dispatch_queue_specific_queue_s));
777 _dispatch_queue_init((dispatch_queue_t)dqsq);
778 dqsq->do_vtable = &_dispatch_queue_specific_queue_vtable;
779 dqsq->do_xref_cnt = 0;
780 dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
781 true);
782 dqsq->dq_width = UINT32_MAX;
783 strlcpy(dqsq->dq_label, "queue-specific", sizeof(dqsq->dq_label));
784 TAILQ_INIT(&dqsq->dqsq_contexts);
785 dispatch_atomic_store_barrier();
786 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL, dqsq))) {
787 _dispatch_release((dispatch_queue_t)dqsq);
788 }
789 }
790
791 static void
792 _dispatch_queue_set_specific(void *ctxt)
793 {
794 dispatch_queue_specific_t dqs, dqsn = ctxt;
795 dispatch_queue_specific_queue_t dqsq =
796 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
797
798 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
799 if (dqs->dqs_key == dqsn->dqs_key) {
800 // Destroy previous context for existing key
801 if (dqs->dqs_destructor) {
802 dispatch_async_f(_dispatch_get_root_queue(
803 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
804 dqs->dqs_destructor);
805 }
806 if (dqsn->dqs_ctxt) {
807 // Copy new context for existing key
808 dqs->dqs_ctxt = dqsn->dqs_ctxt;
809 dqs->dqs_destructor = dqsn->dqs_destructor;
810 } else {
811 // Remove context storage for existing key
812 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
813 free(dqs);
814 }
815 return free(dqsn);
816 }
817 }
818 // Insert context storage for new key
819 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
820 }
821
822 DISPATCH_NOINLINE
823 void
824 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
825 void *ctxt, dispatch_function_t destructor)
826 {
827 if (slowpath(!key)) {
828 return;
829 }
830 dispatch_queue_specific_t dqs;
831
832 dqs = calloc(1, sizeof(struct dispatch_queue_specific_s));
833 dqs->dqs_key = key;
834 dqs->dqs_ctxt = ctxt;
835 dqs->dqs_destructor = destructor;
836 if (slowpath(!dq->dq_specific_q)) {
837 _dispatch_queue_init_specific(dq);
838 }
839 dispatch_barrier_async_f(dq->dq_specific_q, dqs,
840 _dispatch_queue_set_specific);
841 }
842
843 static void
844 _dispatch_queue_get_specific(void *ctxt)
845 {
846 void **ctxtp = ctxt;
847 void *key = *ctxtp;
848 dispatch_queue_specific_queue_t dqsq =
849 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
850 dispatch_queue_specific_t dqs;
851
852 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
853 if (dqs->dqs_key == key) {
854 *ctxtp = dqs->dqs_ctxt;
855 return;
856 }
857 }
858 *ctxtp = NULL;
859 }
860
861 DISPATCH_NOINLINE
862 void *
863 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
864 {
865 if (slowpath(!key)) {
866 return NULL;
867 }
868 void *ctxt = NULL;
869
870 if (fastpath(dq->dq_specific_q)) {
871 ctxt = (void *)key;
872 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
873 }
874 return ctxt;
875 }
876
877 DISPATCH_NOINLINE
878 void *
879 dispatch_get_specific(const void *key)
880 {
881 if (slowpath(!key)) {
882 return NULL;
883 }
884 void *ctxt = NULL;
885 dispatch_queue_t dq = _dispatch_queue_get_current();
886
887 while (slowpath(dq)) {
888 if (slowpath(dq->dq_specific_q)) {
889 ctxt = (void *)key;
890 dispatch_sync_f(dq->dq_specific_q, &ctxt,
891 _dispatch_queue_get_specific);
892 if (ctxt) break;
893 }
894 dq = dq->do_targetq;
895 }
896 return ctxt;
897 }
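
/*
 * A minimal sketch of the queue-specific API implemented above. Keys are
 * compared by pointer value, so the address of a static variable is the
 * usual choice; the context value and destructor here are illustrative.
 *
 *     static char example_key;  // its address serves as the key
 *
 *     dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *     dispatch_queue_set_specific(q, &example_key, strdup("ctxt"), free);
 *
 *     void *ctxt = dispatch_queue_get_specific(q, &example_key);
 *
 *     // from a block running on 'q' (or on a queue that targets it),
 *     // dispatch_get_specific(&example_key) walks the target-queue
 *     // chain, exactly as the loop above does.
 */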
898
899 #pragma mark -
900 #pragma mark dispatch_queue_debug
901
902 size_t
903 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
904 {
905 dispatch_queue_t target = dq->do_targetq;
906 return snprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
907 "running = 0x%x, barrier = %d ", target ? target->dq_label : "",
908 target, dq->dq_width / 2, dq->dq_running / 2, dq->dq_running & 1);
909 }
910
911 size_t
912 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
913 {
914 size_t offset = 0;
915 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
916 dq->dq_label, dq);
917 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
918 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
919 offset += snprintf(&buf[offset], bufsiz - offset, "}");
920 return offset;
921 }
922
923 #if DISPATCH_DEBUG
924 void
925 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
926 if (fastpath(dq)) {
927 dispatch_debug(dq, "%s", str);
928 } else {
929 _dispatch_log("queue[NULL]: %s", str);
930 }
931 }
932 #endif
933
934 #if DISPATCH_PERF_MON
935 static OSSpinLock _dispatch_stats_lock;
936 static size_t _dispatch_bad_ratio;
937 static struct {
938 uint64_t time_total;
939 uint64_t count_total;
940 uint64_t thread_total;
941 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
942
943 static void
944 _dispatch_queue_merge_stats(uint64_t start)
945 {
946 uint64_t avg, delta = _dispatch_absolute_time() - start;
947 unsigned long count, bucket;
948
949 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
950 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
951
952 if (count) {
953 avg = delta / count;
954 bucket = flsll(avg);
955 } else {
956 bucket = 0;
957 }
958
959 // 64-bit counters on 32-bit require a lock or a queue
960 OSSpinLockLock(&_dispatch_stats_lock);
961
962 _dispatch_stats[bucket].time_total += delta;
963 _dispatch_stats[bucket].count_total += count;
964 _dispatch_stats[bucket].thread_total++;
965
966 OSSpinLockUnlock(&_dispatch_stats_lock);
967 }
968 #endif
969
970 #pragma mark -
971 #pragma mark dispatch_continuation_t
972
973 static malloc_zone_t *_dispatch_ccache_zone;
974
975 static void
976 _dispatch_ccache_init(void *context DISPATCH_UNUSED)
977 {
978 _dispatch_ccache_zone = malloc_create_zone(0, 0);
979 dispatch_assert(_dispatch_ccache_zone);
980 malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
981 }
982
983 static dispatch_continuation_t
984 _dispatch_continuation_alloc_from_heap(void)
985 {
986 static dispatch_once_t pred;
987 dispatch_continuation_t dc;
988
989 dispatch_once_f(&pred, NULL, _dispatch_ccache_init);
990
991 while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1,
992 ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
993 sleep(1);
994 }
995
996 return dc;
997 }
998
999 DISPATCH_ALWAYS_INLINE
1000 static inline dispatch_continuation_t
1001 _dispatch_continuation_alloc_cacheonly(void)
1002 {
1003 dispatch_continuation_t dc;
1004 dc = fastpath(_dispatch_thread_getspecific(dispatch_cache_key));
1005 if (dc) {
1006 _dispatch_thread_setspecific(dispatch_cache_key, dc->do_next);
1007 }
1008 return dc;
1009 }
1010
1011 static void
1012 _dispatch_force_cache_cleanup(void)
1013 {
1014 dispatch_continuation_t dc;
1015 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1016 if (dc) {
1017 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1018 _dispatch_cache_cleanup(dc);
1019 }
1020 }
1021
1022 DISPATCH_NOINLINE
1023 static void
1024 _dispatch_cache_cleanup(void *value)
1025 {
1026 dispatch_continuation_t dc, next_dc = value;
1027
1028 while ((dc = next_dc)) {
1029 next_dc = dc->do_next;
1030 malloc_zone_free(_dispatch_ccache_zone, dc);
1031 }
1032 }
1033
1034 DISPATCH_ALWAYS_INLINE
1035 static inline void
1036 _dispatch_continuation_free(dispatch_continuation_t dc)
1037 {
1038 dispatch_continuation_t prev_dc;
1039 prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
1040 dc->do_next = prev_dc;
1041 _dispatch_thread_setspecific(dispatch_cache_key, dc);
1042 }
1043
1044 DISPATCH_ALWAYS_INLINE_NDEBUG
1045 static inline void
1046 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1047 {
1048 dispatch_continuation_t dc = dou._dc;
1049
1050 _dispatch_trace_continuation_pop(dq, dou);
1051 (void)dispatch_atomic_add2o(dq, dq_running, 2);
1052 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1053 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1054 dispatch_atomic_barrier();
1055 _dispatch_thread_semaphore_signal(
1056 (_dispatch_thread_semaphore_t)dc->dc_ctxt);
1057 } else {
1058 _dispatch_async_f_redirect(dq, dc);
1059 }
1060 }
1061
1062 DISPATCH_ALWAYS_INLINE_NDEBUG
1063 static inline void
1064 _dispatch_continuation_pop(dispatch_object_t dou)
1065 {
1066 dispatch_continuation_t dc = dou._dc;
1067 dispatch_group_t dg;
1068
1069 _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
1070 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
1071 return _dispatch_queue_invoke(dou._dq);
1072 }
1073
1074 // Add the item back to the cache before calling the function. This
1075 // allows the 'hot' continuation to be used for a quick callback.
1076 //
1077 // The ccache version is per-thread.
1078 // Therefore, the object has not been reused yet.
1079 // This generates better assembly.
1080 if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
1081 _dispatch_continuation_free(dc);
1082 }
1083 if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
1084 dg = dc->dc_group;
1085 } else {
1086 dg = NULL;
1087 }
1088 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
1089 if (dg) {
1090 dispatch_group_leave(dg);
1091 _dispatch_release(dg);
1092 }
1093 }
1094
1095 #pragma mark -
1096 #pragma mark dispatch_barrier_async
1097
1098 DISPATCH_NOINLINE
1099 static void
1100 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
1101 dispatch_function_t func)
1102 {
1103 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1104
1105 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1106 dc->dc_func = func;
1107 dc->dc_ctxt = ctxt;
1108
1109 _dispatch_queue_push(dq, dc);
1110 }
1111
1112 DISPATCH_NOINLINE
1113 void
1114 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
1115 dispatch_function_t func)
1116 {
1117 dispatch_continuation_t dc;
1118
1119 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1120 if (!dc) {
1121 return _dispatch_barrier_async_f_slow(dq, ctxt, func);
1122 }
1123
1124 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1125 dc->dc_func = func;
1126 dc->dc_ctxt = ctxt;
1127
1128 _dispatch_queue_push(dq, dc);
1129 }
1130
1131 #ifdef __BLOCKS__
1132 void
1133 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
1134 {
1135 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
1136 _dispatch_call_block_and_release);
1137 }
1138 #endif
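
/*
 * A sketch of the reader/writer pattern built on the barrier bit set
 * above: readers run concurrently on a DISPATCH_QUEUE_CONCURRENT queue,
 * while a barrier block waits for them and then runs alone. The
 * protected 'table' and its accessors are hypothetical.
 *
 *     dispatch_queue_t rw_q = dispatch_queue_create("com.example.rw",
 *             DISPATCH_QUEUE_CONCURRENT);
 *
 *     // writer: excludes every other block on rw_q while it runs
 *     dispatch_barrier_async(rw_q, ^{
 *         table_insert(table, key, value);
 *     });
 *
 *     // readers: may run in parallel with each other, never with a writer
 *     dispatch_async(rw_q, ^{
 *         (void)table_lookup(table, key);
 *     });
 */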
1139
1140 #pragma mark -
1141 #pragma mark dispatch_async
1142
1143 static void
1144 _dispatch_async_f_redirect_invoke(void *_ctxt)
1145 {
1146 struct dispatch_continuation_s *dc = _ctxt;
1147 struct dispatch_continuation_s *other_dc = dc->dc_data[1];
1148 dispatch_queue_t old_dq, dq = dc->dc_data[0], rq;
1149
1150 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1151 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1152 _dispatch_continuation_pop(other_dc);
1153 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1154
1155 rq = dq->do_targetq;
1156 while (slowpath(rq->do_targetq) && rq != old_dq) {
1157 if (dispatch_atomic_sub2o(rq, dq_running, 2) == 0) {
1158 _dispatch_wakeup(rq);
1159 }
1160 rq = rq->do_targetq;
1161 }
1162
1163 if (dispatch_atomic_sub2o(dq, dq_running, 2) == 0) {
1164 _dispatch_wakeup(dq);
1165 }
1166 _dispatch_release(dq);
1167 }
1168
1169 DISPATCH_NOINLINE
1170 static void
1171 _dispatch_async_f2_slow(dispatch_queue_t dq, dispatch_continuation_t dc)
1172 {
1173 _dispatch_wakeup(dq);
1174 _dispatch_queue_push(dq, dc);
1175 }
1176
1177 DISPATCH_NOINLINE
1178 static void
1179 _dispatch_async_f_redirect(dispatch_queue_t dq,
1180 dispatch_continuation_t other_dc)
1181 {
1182 dispatch_continuation_t dc;
1183 dispatch_queue_t rq;
1184
1185 _dispatch_retain(dq);
1186
1187 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1188 if (!dc) {
1189 dc = _dispatch_continuation_alloc_from_heap();
1190 }
1191
1192 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1193 dc->dc_func = _dispatch_async_f_redirect_invoke;
1194 dc->dc_ctxt = dc;
1195 dc->dc_data[0] = dq;
1196 dc->dc_data[1] = other_dc;
1197
1198 // Find the queue to redirect to
1199 rq = dq->do_targetq;
1200 while (slowpath(rq->do_targetq)) {
1201 uint32_t running;
1202
1203 if (slowpath(rq->dq_items_tail) ||
1204 slowpath(DISPATCH_OBJECT_SUSPENDED(rq)) ||
1205 slowpath(rq->dq_width == 1)) {
1206 break;
1207 }
1208 running = dispatch_atomic_add2o(rq, dq_running, 2) - 2;
1209 if (slowpath(running & 1) || slowpath(running + 2 > rq->dq_width)) {
1210 if (slowpath(dispatch_atomic_sub2o(rq, dq_running, 2) == 0)) {
1211 return _dispatch_async_f2_slow(rq, dc);
1212 }
1213 break;
1214 }
1215 rq = rq->do_targetq;
1216 }
1217 _dispatch_queue_push(rq, dc);
1218 }
1219
1220 DISPATCH_NOINLINE
1221 static void
1222 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
1223 {
1224 uint32_t running;
1225 bool locked;
1226
1227 do {
1228 if (slowpath(dq->dq_items_tail)
1229 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1230 break;
1231 }
1232 running = dispatch_atomic_add2o(dq, dq_running, 2);
1233 if (slowpath(running > dq->dq_width)) {
1234 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1235 return _dispatch_async_f2_slow(dq, dc);
1236 }
1237 break;
1238 }
1239 locked = running & 1;
1240 if (fastpath(!locked)) {
1241 return _dispatch_async_f_redirect(dq, dc);
1242 }
1243 locked = dispatch_atomic_sub2o(dq, dq_running, 2) & 1;
1244 // We might get lucky and find that the barrier has ended by now
1245 } while (!locked);
1246
1247 _dispatch_queue_push(dq, dc);
1248 }
1249
1250 DISPATCH_NOINLINE
1251 static void
1252 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
1253 dispatch_function_t func)
1254 {
1255 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1256
1257 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1258 dc->dc_func = func;
1259 dc->dc_ctxt = ctxt;
1260
1261 // No fastpath/slowpath hint because we simply don't know
1262 if (dq->do_targetq) {
1263 return _dispatch_async_f2(dq, dc);
1264 }
1265
1266 _dispatch_queue_push(dq, dc);
1267 }
1268
1269 DISPATCH_NOINLINE
1270 void
1271 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1272 {
1273 dispatch_continuation_t dc;
1274
1275 // No fastpath/slowpath hint because we simply don't know
1276 if (dq->dq_width == 1) {
1277 return dispatch_barrier_async_f(dq, ctxt, func);
1278 }
1279
1280 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1281 if (!dc) {
1282 return _dispatch_async_f_slow(dq, ctxt, func);
1283 }
1284
1285 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1286 dc->dc_func = func;
1287 dc->dc_ctxt = ctxt;
1288
1289 // No fastpath/slowpath hint because we simply don't know
1290 if (dq->do_targetq) {
1291 return _dispatch_async_f2(dq, dc);
1292 }
1293
1294 _dispatch_queue_push(dq, dc);
1295 }
1296
1297 #ifdef __BLOCKS__
1298 void
1299 dispatch_async(dispatch_queue_t dq, void (^work)(void))
1300 {
1301 dispatch_async_f(dq, _dispatch_Block_copy(work),
1302 _dispatch_call_block_and_release);
1303 }
1304 #endif
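
/*
 * A small sketch of the function-based variant that the block wrapper
 * above reduces to: the caller hands the queue a context pointer that
 * must remain valid until the callback runs, and the callback owns its
 * cleanup. 'struct request' and 'handle_request' are hypothetical.
 *
 *     struct request { int id; };
 *
 *     static void
 *     handle_request(void *ctxt)
 *     {
 *         struct request *req = ctxt;
 *         // ... process req->id ...
 *         free(req);
 *     }
 *
 *     struct request *req = malloc(sizeof(*req));
 *     req->id = 42;
 *     dispatch_async_f(dispatch_get_global_queue(
 *             DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), req, handle_request);
 */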
1305
1306 #pragma mark -
1307 #pragma mark dispatch_group_async
1308
1309 DISPATCH_NOINLINE
1310 void
1311 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
1312 dispatch_function_t func)
1313 {
1314 dispatch_continuation_t dc;
1315
1316 _dispatch_retain(dg);
1317 dispatch_group_enter(dg);
1318
1319 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1320 if (!dc) {
1321 dc = _dispatch_continuation_alloc_from_heap();
1322 }
1323
1324 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
1325 dc->dc_func = func;
1326 dc->dc_ctxt = ctxt;
1327 dc->dc_group = dg;
1328
1329 // No fastpath/slowpath hint because we simply don't know
1330 if (dq->dq_width != 1 && dq->do_targetq) {
1331 return _dispatch_async_f2(dq, dc);
1332 }
1333
1334 _dispatch_queue_push(dq, dc);
1335 }
1336
1337 #ifdef __BLOCKS__
1338 void
1339 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
1340 dispatch_block_t db)
1341 {
1342 dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
1343 _dispatch_call_block_and_release);
1344 }
1345 #endif
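
/*
 * A minimal sketch of tying work to a group: dispatch_group_async_f()
 * retains the group and calls dispatch_group_enter() before enqueueing,
 * and the matching leave happens in _dispatch_continuation_pop() once
 * the callback returns. Block bodies are illustrative.
 *
 *     dispatch_group_t g = dispatch_group_create();
 *     dispatch_queue_t q =
 *             dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *
 *     dispatch_group_async(g, q, ^{ });
 *     dispatch_group_async(g, q, ^{ });
 *
 *     // either block until both complete ...
 *     dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
 *     // ... or request a callback when they do
 *     dispatch_group_notify(g, q, ^{ });
 *
 *     dispatch_release(g);
 */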
1346
1347 #pragma mark -
1348 #pragma mark dispatch_function_invoke
1349
1350 DISPATCH_ALWAYS_INLINE
1351 static inline void
1352 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
1353 dispatch_function_t func)
1354 {
1355 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1356 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1357 _dispatch_client_callout(ctxt, func);
1358 _dispatch_workitem_inc();
1359 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1360 }
1361
1362 struct dispatch_function_recurse_s {
1363 dispatch_queue_t dfr_dq;
1364 void* dfr_ctxt;
1365 dispatch_function_t dfr_func;
1366 };
1367
1368 static void
1369 _dispatch_function_recurse_invoke(void *ctxt)
1370 {
1371 struct dispatch_function_recurse_s *dfr = ctxt;
1372 _dispatch_function_invoke(dfr->dfr_dq, dfr->dfr_ctxt, dfr->dfr_func);
1373 }
1374
1375 DISPATCH_ALWAYS_INLINE
1376 static inline void
1377 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
1378 dispatch_function_t func)
1379 {
1380 struct dispatch_function_recurse_s dfr = {
1381 .dfr_dq = dq,
1382 .dfr_func = func,
1383 .dfr_ctxt = ctxt,
1384 };
1385 dispatch_sync_f(dq->do_targetq, &dfr, _dispatch_function_recurse_invoke);
1386 }
1387
1388 #pragma mark -
1389 #pragma mark dispatch_barrier_sync
1390
1391 struct dispatch_barrier_sync_slow_s {
1392 DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
1393 };
1394
1395 struct dispatch_barrier_sync_slow2_s {
1396 dispatch_queue_t dbss2_dq;
1397 #if DISPATCH_COCOA_COMPAT
1398 dispatch_function_t dbss2_func;
1399 void *dbss2_ctxt;
1400 #endif
1401 _dispatch_thread_semaphore_t dbss2_sema;
1402 };
1403
1404 static void
1405 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
1406 {
1407 struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;
1408
1409 dispatch_assert(dbss2->dbss2_dq == _dispatch_queue_get_current());
1410 #if DISPATCH_COCOA_COMPAT
1411 // When the main queue is bound to the main thread
1412 if (dbss2->dbss2_dq == &_dispatch_main_q && pthread_main_np()) {
1413 dbss2->dbss2_func(dbss2->dbss2_ctxt);
1414 dbss2->dbss2_func = NULL;
1415 dispatch_atomic_barrier();
1416 _dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
1417 return;
1418 }
1419 #endif
1420 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, do_suspend_cnt,
1421 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1422 // rdar://9032024 running lock must be held until sync_f_slow returns
1423 (void)dispatch_atomic_add2o(dbss2->dbss2_dq, dq_running, 2);
1424 dispatch_atomic_barrier();
1425 _dispatch_thread_semaphore_signal(dbss2->dbss2_sema);
1426 }
1427
1428 DISPATCH_NOINLINE
1429 static void
1430 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
1431 dispatch_function_t func)
1432 {
1433 // It's preferred to execute synchronous blocks on the current thread
1434 // due to thread-local side effects, garbage collection, etc. However,
1435 // blocks submitted to the main queue MUST be run on the main thread.
1436
1437 struct dispatch_barrier_sync_slow2_s dbss2 = {
1438 .dbss2_dq = dq,
1439 #if DISPATCH_COCOA_COMPAT
1440 .dbss2_func = func,
1441 .dbss2_ctxt = ctxt,
1442 #endif
1443 .dbss2_sema = _dispatch_get_thread_semaphore(),
1444 };
1445 struct dispatch_barrier_sync_slow_s dbss = {
1446 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
1447 DISPATCH_OBJ_SYNC_SLOW_BIT),
1448 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
1449 .dc_ctxt = &dbss2,
1450 };
1451 _dispatch_queue_push(dq, (void *)&dbss);
1452
1453 _dispatch_thread_semaphore_wait(dbss2.dbss2_sema);
1454 _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
1455
1456 #if DISPATCH_COCOA_COMPAT
1457 // Main queue bound to main thread
1458 if (dbss2.dbss2_func == NULL) {
1459 return;
1460 }
1461 #endif
1462 dispatch_atomic_acquire_barrier();
1463 if (slowpath(dq->do_targetq) && slowpath(dq->do_targetq->do_targetq)) {
1464 _dispatch_function_recurse(dq, ctxt, func);
1465 } else {
1466 _dispatch_function_invoke(dq, ctxt, func);
1467 }
1468 dispatch_atomic_release_barrier();
1469 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL)) {
1470 // rdar://problem/8290662 "lock transfer"
1471 // ensure drain of current barrier sync has finished
1472 while (slowpath(dq->dq_running > 2)) {
1473 _dispatch_hardware_pause();
1474 }
1475 _dispatch_thread_semaphore_t sema;
1476 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1477 if (sema) {
1478 _dispatch_thread_semaphore_signal(sema);
1479 return;
1480 }
1481 }
1482 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
1483 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1484 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1485 _dispatch_wakeup(dq);
1486 }
1487 }
1488
1489 DISPATCH_NOINLINE
1490 static void
1491 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
1492 {
1493 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1494 // rdar://problem/8290662 "lock transfer"
1495 _dispatch_thread_semaphore_t sema;
1496 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1497 if (sema) {
1498 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1499 DISPATCH_OBJECT_SUSPEND_INTERVAL);
1500 // rdar://9032024 running lock must be held until sync_f_slow
1501 // returns: increment by 2 and decrement by 1
1502 (void)dispatch_atomic_inc2o(dq, dq_running);
1503 _dispatch_thread_semaphore_signal(sema);
1504 return;
1505 }
1506 }
1507 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1508 _dispatch_wakeup(dq);
1509 }
1510 }
1511
1512 DISPATCH_NOINLINE
1513 static void
1514 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1515 dispatch_function_t func)
1516 {
1517 dispatch_atomic_acquire_barrier();
1518 _dispatch_function_invoke(dq, ctxt, func);
1519 dispatch_atomic_release_barrier();
1520 if (slowpath(dq->dq_items_tail)) {
1521 return _dispatch_barrier_sync_f2(dq);
1522 }
1523 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1524 _dispatch_wakeup(dq);
1525 }
1526 }
1527
1528 DISPATCH_NOINLINE
1529 static void
1530 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1531 dispatch_function_t func)
1532 {
1533 dispatch_atomic_acquire_barrier();
1534 _dispatch_function_recurse(dq, ctxt, func);
1535 dispatch_atomic_release_barrier();
1536 if (slowpath(dq->dq_items_tail)) {
1537 return _dispatch_barrier_sync_f2(dq);
1538 }
1539 if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
1540 _dispatch_wakeup(dq);
1541 }
1542 }
1543
1544 DISPATCH_NOINLINE
1545 void
1546 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
1547 dispatch_function_t func)
1548 {
1549 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1550 // 2) ensure that the queue is not suspended
1551 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1552 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1553 }
1554 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
1555 // global queues and the main queue bound to the main thread always
1556 // fall into the slow case
1557 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1558 }
1559 if (slowpath(dq->do_targetq->do_targetq)) {
1560 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
1561 }
1562 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
1563 }
1564
1565 #ifdef __BLOCKS__
1566 #if DISPATCH_COCOA_COMPAT
1567 DISPATCH_NOINLINE
1568 static void
1569 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
1570 {
1571 // Blocks submitted to the main queue MUST be run on the main thread,
1572 // therefore under GC we must Block_copy in order to notify the thread-local
1573 // garbage collector that the objects are transferring to the main thread
1574 // rdar://problem/7176237&7181849&7458685
1575 if (dispatch_begin_thread_4GC) {
1576 dispatch_block_t block = _dispatch_Block_copy(work);
1577 return dispatch_barrier_sync_f(dq, block,
1578 _dispatch_call_block_and_release);
1579 }
1580 struct Block_basic *bb = (void *)work;
1581 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1582 }
1583 #endif
1584
1585 void
1586 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
1587 {
1588 #if DISPATCH_COCOA_COMPAT
1589 if (slowpath(dq == &_dispatch_main_q)) {
1590 return _dispatch_barrier_sync_slow(dq, work);
1591 }
1592 #endif
1593 struct Block_basic *bb = (void *)work;
1594 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1595 }
1596 #endif
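
/*
 * A brief sketch pairing dispatch_barrier_sync() with the asynchronous
 * barrier above: on a DISPATCH_QUEUE_CONCURRENT queue the synchronous
 * barrier waits for everything already submitted, runs alone, and
 * returns its result to the caller. 'rw_q' and 'table_count' are
 * hypothetical.
 *
 *     __block size_t n;
 *     dispatch_barrier_sync(rw_q, ^{
 *         n = table_count();
 *     });
 *     // 'n' now reflects a state with no readers or writers in flight
 */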
1597
1598 #pragma mark -
1599 #pragma mark dispatch_sync
1600
1601 DISPATCH_NOINLINE
1602 static void
1603 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1604 {
1605 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
1606 struct dispatch_sync_slow_s {
1607 DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
1608 } dss = {
1609 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
1610 .dc_ctxt = (void*)sema,
1611 };
1612 _dispatch_queue_push(dq, (void *)&dss);
1613
1614 _dispatch_thread_semaphore_wait(sema);
1615 _dispatch_put_thread_semaphore(sema);
1616
1617 if (slowpath(dq->do_targetq->do_targetq)) {
1618 _dispatch_function_recurse(dq, ctxt, func);
1619 } else {
1620 _dispatch_function_invoke(dq, ctxt, func);
1621 }
1622 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1623 _dispatch_wakeup(dq);
1624 }
1625 }
1626
1627 DISPATCH_NOINLINE
1628 static void
1629 _dispatch_sync_f_slow2(dispatch_queue_t dq, void *ctxt,
1630 dispatch_function_t func)
1631 {
1632 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1633 _dispatch_wakeup(dq);
1634 }
1635 _dispatch_sync_f_slow(dq, ctxt, func);
1636 }
1637
1638 DISPATCH_NOINLINE
1639 static void
1640 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1641 dispatch_function_t func)
1642 {
1643 _dispatch_function_invoke(dq, ctxt, func);
1644 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1645 _dispatch_wakeup(dq);
1646 }
1647 }
1648
1649 DISPATCH_NOINLINE
1650 static void
1651 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1652 dispatch_function_t func)
1653 {
1654 _dispatch_function_recurse(dq, ctxt, func);
1655 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
1656 _dispatch_wakeup(dq);
1657 }
1658 }
1659
1660 DISPATCH_NOINLINE
1661 static void
1662 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1663 {
1664 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1665 // 2) ensure that the queue is not suspended
1666 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1667 return _dispatch_sync_f_slow(dq, ctxt, func);
1668 }
1669 if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
1670 return _dispatch_sync_f_slow2(dq, ctxt, func);
1671 }
1672 if (slowpath(dq->do_targetq->do_targetq)) {
1673 return _dispatch_sync_f_recurse(dq, ctxt, func);
1674 }
1675 _dispatch_sync_f_invoke(dq, ctxt, func);
1676 }
1677
1678 DISPATCH_NOINLINE
1679 void
1680 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1681 {
1682 if (fastpath(dq->dq_width == 1)) {
1683 return dispatch_barrier_sync_f(dq, ctxt, func);
1684 }
1685 if (slowpath(!dq->do_targetq)) {
1686 // the global root queues do not need strict ordering
1687 (void)dispatch_atomic_add2o(dq, dq_running, 2);
1688 return _dispatch_sync_f_invoke(dq, ctxt, func);
1689 }
1690 _dispatch_sync_f2(dq, ctxt, func);
1691 }
1692
1693 #ifdef __BLOCKS__
1694 #if DISPATCH_COCOA_COMPAT
1695 DISPATCH_NOINLINE
1696 static void
1697 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
1698 {
1699 // Blocks submitted to the main queue MUST be run on the main thread,
1700 // therefore under GC we must Block_copy in order to notify the thread-local
1701 // garbage collector that the objects are transferring to the main thread
1702 // rdar://problem/7176237&7181849&7458685
1703 if (dispatch_begin_thread_4GC) {
1704 dispatch_block_t block = _dispatch_Block_copy(work);
1705 return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
1706 }
1707 struct Block_basic *bb = (void *)work;
1708 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1709 }
1710 #endif
1711
1712 void
1713 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
1714 {
1715 #if DISPATCH_COCOA_COMPAT
1716 if (slowpath(dq == &_dispatch_main_q)) {
1717 return _dispatch_sync_slow(dq, work);
1718 }
1719 #endif
1720 struct Block_basic *bb = (void *)work;
1721 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
1722 }
1723 #endif
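
/*
 * A short sketch of dispatch_sync() as a serial-queue accessor; note that
 * the width-1 case above forwards to dispatch_barrier_sync_f(). Calling
 * it from a block already running on the same serial queue would wait on
 * itself, so this pattern is only safe from other queues or threads.
 * Names are illustrative.
 *
 *     static dispatch_queue_t state_q;  // serial queue protecting 'counter'
 *     static long counter;
 *
 *     long
 *     read_counter(void)
 *     {
 *         __block long value;
 *         dispatch_sync(state_q, ^{ value = counter; });
 *         return value;
 *     }
 */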
1724
1725 #pragma mark -
1726 #pragma mark dispatch_after
1727
1728 struct _dispatch_after_time_s {
1729 void *datc_ctxt;
1730 void (*datc_func)(void *);
1731 dispatch_source_t ds;
1732 };
1733
1734 static void
1735 _dispatch_after_timer_callback(void *ctxt)
1736 {
1737 struct _dispatch_after_time_s *datc = ctxt;
1738
1739 dispatch_assert(datc->datc_func);
1740 _dispatch_client_callout(datc->datc_ctxt, datc->datc_func);
1741
1742 dispatch_source_t ds = datc->ds;
1743 free(datc);
1744
1745 dispatch_source_cancel(ds); // Needed until 7287561 gets integrated
1746 dispatch_release(ds);
1747 }
1748
1749 DISPATCH_NOINLINE
1750 void
1751 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
1752 dispatch_function_t func)
1753 {
1754 uint64_t delta;
1755 struct _dispatch_after_time_s *datc = NULL;
1756 dispatch_source_t ds;
1757
1758 if (when == DISPATCH_TIME_FOREVER) {
1759 #if DISPATCH_DEBUG
1760 DISPATCH_CLIENT_CRASH(
1761 "dispatch_after_f() called with 'when' == infinity");
1762 #endif
1763 return;
1764 }
1765
1766 // this function can and should be optimized to not use a dispatch source
1767 delta = _dispatch_timeout(when);
1768 if (delta == 0) {
1769 return dispatch_async_f(queue, ctxt, func);
1770 }
1771 // on successful creation, source owns malloc-ed context (which it frees in
1772 // the event handler)
1773 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
1774 dispatch_assert(ds);
1775
1776 datc = malloc(sizeof(*datc));
1777 dispatch_assert(datc);
1778 datc->datc_ctxt = ctxt;
1779 datc->datc_func = func;
1780 datc->ds = ds;
1781
1782 dispatch_set_context(ds, datc);
1783 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
1784 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, 0);
1785 dispatch_resume(ds);
1786 }
1787
1788 #ifdef __BLOCKS__
1789 void
1790 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
1791 dispatch_block_t work)
1792 {
1793 // test before the copy of the block
1794 if (when == DISPATCH_TIME_FOREVER) {
1795 #if DISPATCH_DEBUG
1796 DISPATCH_CLIENT_CRASH(
1797 "dispatch_after() called with 'when' == infinity");
1798 #endif
1799 return;
1800 }
1801 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
1802 _dispatch_call_block_and_release);
1803 }
1804 #endif
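
/*
 * A minimal sketch of dispatch_after(): the first argument is an absolute
 * dispatch_time_t, typically built from DISPATCH_TIME_NOW plus an offset
 * in nanoseconds. As the implementation above shows, a deadline that has
 * already passed degenerates into a plain dispatch_async().
 *
 *     dispatch_time_t when =
 *             dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
 *     dispatch_after(when, dispatch_get_main_queue(), ^{
 *         // runs on the main queue roughly two seconds from now
 *     });
 */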
1805
1806 #pragma mark -
1807 #pragma mark dispatch_wakeup
1808
1809 DISPATCH_NOINLINE
1810 void
1811 _dispatch_queue_push_list_slow(dispatch_queue_t dq,
1812 struct dispatch_object_s *obj)
1813 {
1814 // The queue must be retained before dq_items_head is written in order
1815 // to ensure that the reference is still valid when _dispatch_wakeup is
1816 // called. Otherwise, if preempted between the assignment to
1817 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
1818 // queue may release the last reference to the queue when invoked by
1819 // _dispatch_queue_drain. <rdar://problem/6932776>
1820 _dispatch_retain(dq);
1821 dq->dq_items_head = obj;
1822 _dispatch_wakeup(dq);
1823 _dispatch_release(dq);
1824 }
1825
1826 // 6618342 Contact the team that owns the Instrument DTrace probe before
1827 // renaming this symbol
1828 dispatch_queue_t
1829 _dispatch_wakeup(dispatch_object_t dou)
1830 {
1831 dispatch_queue_t tq;
1832
1833 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
1834 return NULL;
1835 }
1836 if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
1837 return NULL;
1838 }
1839
1840 // _dispatch_source_invoke() relies on this testing the whole suspend count
1841 // word, not just the lock bit. In other words, no point taking the lock
1842 // if the source is suspended or canceled.
1843 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
1844 DISPATCH_OBJECT_SUSPEND_LOCK)) {
1845 #if DISPATCH_COCOA_COMPAT
1846 if (dou._dq == &_dispatch_main_q) {
1847 _dispatch_queue_wakeup_main();
1848 }
1849 #endif
1850 return NULL;
1851 }
1852 _dispatch_retain(dou._do);
1853 tq = dou._do->do_targetq;
1854 _dispatch_queue_push(tq, dou._do);
1855 return tq; // libdispatch does not need this, but the Instrument DTrace
1856 // probe does
1857 }
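/*
 * Editor's note: the cmpxchg above folds a drain "lock bit" into the suspend
 * count, so the lock is only taken when the object is neither suspended nor
 * already locked. A minimal standalone sketch of the same idea using C11
 * atomics (all names below are hypothetical, not libdispatch API):
 *
 *     #include <stdatomic.h>
 *     #include <stdbool.h>
 *
 *     #define SUSPEND_LOCK 1u  // low bit of the suspend count is the lock
 *
 *     static bool try_lock_if_not_suspended(_Atomic unsigned *suspend_cnt)
 *     {
 *         unsigned expected = 0;  // succeed only if count is exactly zero
 *         return atomic_compare_exchange_strong(suspend_cnt, &expected,
 *                 SUSPEND_LOCK);
 *     }
 */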
1858
1859 #if DISPATCH_COCOA_COMPAT
1860 DISPATCH_NOINLINE
1861 void
1862 _dispatch_queue_wakeup_main(void)
1863 {
1864 kern_return_t kr;
1865
1866 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
1867 _dispatch_main_q_port_init);
1868
1869 kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
1870
1871 switch (kr) {
1872 case MACH_SEND_TIMEOUT:
1873 case MACH_SEND_TIMED_OUT:
1874 case MACH_SEND_INVALID_DEST:
1875 break;
1876 default:
1877 (void)dispatch_assume_zero(kr);
1878 break;
1879 }
1880
1881 _dispatch_safe_fork = false;
1882 }
1883 #endif
1884
1885 static bool
1886 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
1887 {
1888 static dispatch_once_t pred;
1889 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1890 int r;
1891
1892 if (!dq->dq_items_tail) {
1893 return false;
1894 }
1895
1896 _dispatch_safe_fork = false;
1897
1898 dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
1899
1900 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
1901
1902 #if HAVE_PTHREAD_WORKQUEUES
1903 #if DISPATCH_ENABLE_THREAD_POOL
1904 if (qc->dgq_kworkqueue)
1905 #endif
1906 {
1907 if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
1908 pthread_workitem_handle_t wh;
1909 unsigned int gen_cnt;
1910 _dispatch_debug("requesting new worker thread");
1911
1912 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
1913 _dispatch_worker_thread2, dq, &wh, &gen_cnt);
1914 (void)dispatch_assume_zero(r);
1915 } else {
1916 _dispatch_debug("work thread request still pending on global "
1917 "queue: %p", dq);
1918 }
1919 goto out;
1920 }
1921 #endif // HAVE_PTHREAD_WORKQUEUES
1922 #if DISPATCH_ENABLE_THREAD_POOL
1923 if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
1924 goto out;
1925 }
1926
1927 pthread_t pthr;
1928 int t_count;
1929 do {
1930 t_count = qc->dgq_thread_pool_size;
1931 if (!t_count) {
1932 _dispatch_debug("The thread pool is full: %p", dq);
1933 goto out;
1934 }
1935 } while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count,
1936 t_count - 1));
1937
1938 while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
1939 if (r != EAGAIN) {
1940 (void)dispatch_assume_zero(r);
1941 }
1942 sleep(1);
1943 }
1944 r = pthread_detach(pthr);
1945 (void)dispatch_assume_zero(r);
1946 #endif // DISPATCH_ENABLE_THREAD_POOL
1947
1948 out:
1949 return false;
1950 }
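/*
 * Editor's note: the thread-pool fallback above reserves a pool slot with a
 * CAS-decremented counter and retries pthread_create() on EAGAIN. A reduced
 * standalone sketch of that shape (names are hypothetical, not libdispatch
 * API):
 *
 *     #include <errno.h>
 *     #include <pthread.h>
 *     #include <stdatomic.h>
 *     #include <stdbool.h>
 *     #include <unistd.h>
 *
 *     static bool spawn_worker(_Atomic int *pool_slots,
 *             void *(*fn)(void *), void *arg)
 *     {
 *         int n = atomic_load(pool_slots);
 *         do {
 *             if (n == 0) return false;  // pool is already full
 *         } while (!atomic_compare_exchange_weak(pool_slots, &n, n - 1));
 *
 *         pthread_t t;
 *         int r;
 *         while ((r = pthread_create(&t, NULL, fn, arg)) == EAGAIN) {
 *             sleep(1);  // transient resource shortage; try again
 *         }
 *         if (r != 0) {
 *             atomic_fetch_add(pool_slots, 1);  // give the slot back
 *             return false;
 *         }
 *         pthread_detach(t);
 *         return true;
 *     }
 */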
1951
1952 #pragma mark -
1953 #pragma mark dispatch_queue_drain
1954
1955 // 6618342 Contact the team that owns the Instrument DTrace probe before
1956 // renaming this symbol
1957 DISPATCH_NOINLINE
1958 void
1959 _dispatch_queue_invoke(dispatch_queue_t dq)
1960 {
1961 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
1962 fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
1963 dispatch_atomic_acquire_barrier();
1964 dispatch_queue_t otq = dq->do_targetq, tq = NULL;
1965 _dispatch_queue_drain(dq);
1966 if (dq->do_vtable->do_invoke) {
1967 // Assume that the object's invoke checks that it is executing on the correct queue
1968 tq = dx_invoke(dq);
1969 } else if (slowpath(otq != dq->do_targetq)) {
1970 // An item on the queue changed the target queue
1971 tq = dq->do_targetq;
1972 }
1973 // We do not need to check the result here;
1974 // the check happens when the suspend-count lock is dropped.
1975 dispatch_atomic_release_barrier();
1976 (void)dispatch_atomic_dec2o(dq, dq_running);
1977 if (tq) {
1978 return _dispatch_queue_push(tq, dq);
1979 }
1980 }
1981
1982 dq->do_next = DISPATCH_OBJECT_LISTLESS;
1983 if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
1984 DISPATCH_OBJECT_SUSPEND_LOCK)) {
1985 if (dq->dq_running == 0) {
1986 _dispatch_wakeup(dq); // verify that the queue is idle
1987 }
1988 }
1989 _dispatch_release(dq); // added when the queue is put on the list
1990 }
1991
1992 static void
1993 _dispatch_queue_drain(dispatch_queue_t dq)
1994 {
1995 dispatch_queue_t orig_tq, old_dq;
1996 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1997 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
1998
1999 // Continue draining sources after target queue change rdar://8928171
2000 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
2001
2002 orig_tq = dq->do_targetq;
2003
2004 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2005 //dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
2006
2007 while (dq->dq_items_tail) {
2008 while (!(dc = fastpath(dq->dq_items_head))) {
2009 _dispatch_hardware_pause();
2010 }
2011 dq->dq_items_head = NULL;
2012 do {
2013 next_dc = fastpath(dc->do_next);
2014 if (!next_dc &&
2015 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2016 // Enqueue is TIGHTLY controlled; we won't wait long.
2017 while (!(next_dc = fastpath(dc->do_next))) {
2018 _dispatch_hardware_pause();
2019 }
2020 }
2021 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
2022 goto out;
2023 }
2024 if (dq->dq_running > dq->dq_width) {
2025 goto out;
2026 }
2027 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
2028 goto out;
2029 }
2030 if (fastpath(dq->dq_width == 1)) {
2031 _dispatch_continuation_pop(dc);
2032 _dispatch_workitem_inc();
2033 } else if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
2034 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
2035 if (dq->dq_running > 1) {
2036 goto out;
2037 }
2038 _dispatch_continuation_pop(dc);
2039 _dispatch_workitem_inc();
2040 } else {
2041 _dispatch_continuation_redirect(dq, dc);
2042 }
2043 } while ((dc = next_dc));
2044 }
2045
2046 out:
2047 // if this is not a complete drain, we must undo some things
2048 if (slowpath(dc)) {
2049 // 'dc' must NOT be "popped"
2050 // 'dc' might be the last item
2051 if (!next_dc &&
2052 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, NULL, dc)) {
2053 // wait for enqueue slow path to finish
2054 while (!(next_dc = fastpath(dq->dq_items_head))) {
2055 _dispatch_hardware_pause();
2056 }
2057 dc->do_next = next_dc;
2058 }
2059 dq->dq_items_head = dc;
2060 }
2061
2062 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2063 }
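/*
 * Editor's note: the drain loop above briefly busy-waits when an item has
 * been claimed from the tail but the enqueuing thread has not yet published
 * its 'do_next' link. The idiom in isolation, as a sketch (names are
 * hypothetical):
 *
 *     #include <stdatomic.h>
 *     #include <stddef.h>
 *
 *     struct node { _Atomic(struct node *) next; };
 *
 *     static struct node *wait_for_next(struct node *claimed)
 *     {
 *         struct node *next;
 *         // the enqueuer is between its tail swap and the next-pointer
 *         // store, so this wait is bounded to a handful of instructions
 *         while ((next = atomic_load(&claimed->next)) == NULL) {
 *             // spin (libdispatch uses _dispatch_hardware_pause() here)
 *         }
 *         return next;
 *     }
 */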
2064
2065 static void
2066 _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
2067 {
2068 #if DISPATCH_PERF_MON
2069 uint64_t start = _dispatch_absolute_time();
2070 #endif
2071 _dispatch_queue_drain(dq);
2072 #if DISPATCH_PERF_MON
2073 _dispatch_queue_merge_stats(start);
2074 #endif
2075 _dispatch_force_cache_cleanup();
2076 }
2077
2078 #if DISPATCH_COCOA_COMPAT
2079 void
2080 _dispatch_main_queue_drain(void)
2081 {
2082 dispatch_queue_t dq = &_dispatch_main_q;
2083 if (!dq->dq_items_tail) {
2084 return;
2085 }
2086 struct dispatch_main_queue_drain_marker_s {
2087 DISPATCH_CONTINUATION_HEADER(dispatch_main_queue_drain_marker_s);
2088 } marker = {
2089 .do_vtable = NULL,
2090 };
2091 struct dispatch_object_s *dmarker = (void*)&marker;
2092 _dispatch_queue_push_notrace(dq, dmarker);
2093
2094 #if DISPATCH_PERF_MON
2095 uint64_t start = _dispatch_absolute_time();
2096 #endif
2097 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2098 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2099
2100 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
2101 while (dq->dq_items_tail) {
2102 while (!(dc = fastpath(dq->dq_items_head))) {
2103 _dispatch_hardware_pause();
2104 }
2105 dq->dq_items_head = NULL;
2106 do {
2107 next_dc = fastpath(dc->do_next);
2108 if (!next_dc &&
2109 !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2110 // Enqueue is TIGHTLY controlled; we won't wait long.
2111 while (!(next_dc = fastpath(dc->do_next))) {
2112 _dispatch_hardware_pause();
2113 }
2114 }
2115 if (dc == dmarker) {
2116 if (next_dc) {
2117 dq->dq_items_head = next_dc;
2118 _dispatch_queue_wakeup_main();
2119 }
2120 goto out;
2121 }
2122 _dispatch_continuation_pop(dc);
2123 _dispatch_workitem_inc();
2124 } while ((dc = next_dc));
2125 }
2126 dispatch_assert(dc); // did not encounter marker
2127
2128 out:
2129 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2130 #if DISPATCH_PERF_MON
2131 _dispatch_queue_merge_stats(start);
2132 #endif
2133 _dispatch_force_cache_cleanup();
2134 }
2135 #endif
2136
2137 DISPATCH_ALWAYS_INLINE_NDEBUG
2138 static inline _dispatch_thread_semaphore_t
2139 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
2140 {
2141 // rdar://problem/8290662 "lock transfer"
2142 struct dispatch_object_s *dc, *next_dc;
2143
2144 // queue is locked, or suspended and not being drained
2145 dc = dq->dq_items_head;
2146 if (slowpath(!dc) || DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
2147 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
2148 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
2149 return 0;
2150 }
2151 // dequeue dc, it is a barrier sync
2152 next_dc = fastpath(dc->do_next);
2153 dq->dq_items_head = next_dc;
2154 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL)) {
2155 // Enqueue is TIGHTLY controlled; we won't wait long.
2156 while (!(next_dc = fastpath(dc->do_next))) {
2157 _dispatch_hardware_pause();
2158 }
2159 dq->dq_items_head = next_dc;
2160 }
2161 _dispatch_trace_continuation_pop(dq, dc);
2162 _dispatch_workitem_inc();
2163
2164 struct dispatch_barrier_sync_slow_s *dbssp = (void *)dc;
2165 struct dispatch_barrier_sync_slow2_s *dbss2p = dbssp->dc_ctxt;
2166 return dbss2p->dbss2_sema;
2167 }
2168
2169 static struct dispatch_object_s *
2170 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
2171 {
2172 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
2173
2174 // The mediator value acts both as a "lock" and a signal
2175 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);
2176
2177 if (slowpath(head == NULL)) {
2178 // The first xchg on the tail will tell the enqueueing thread that it
2179 // is safe to blindly write out to the head pointer. A cmpxchg (rather than
2180 // a blind store) avoids clobbering a head an enqueuer may have just written.
2181 (void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
2182 _dispatch_debug("no work on global work queue");
2183 return NULL;
2184 }
2185
2186 if (slowpath(head == mediator)) {
2187 // This thread lost the race for ownership of the queue.
2188 //
2189 // The ratio of work to libdispatch overhead must be bad. This
2190 // scenario implies that there are too many threads in the pool.
2191 // Create a new pending thread and then exit this thread.
2192 // The kernel will grant a new thread when the load subsides.
2193 _dispatch_debug("Contention on queue: %p", dq);
2194 _dispatch_queue_wakeup_global(dq);
2195 #if DISPATCH_PERF_MON
2196 dispatch_atomic_inc(&_dispatch_bad_ratio);
2197 #endif
2198 return NULL;
2199 }
2200
2201 // Restore the head pointer to a sane value before returning.
2202 // If 'next' is NULL, then this item _might_ be the last item.
2203 next = fastpath(head->do_next);
2204
2205 if (slowpath(!next)) {
2206 dq->dq_items_head = NULL;
2207
2208 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
2209 // both head and tail are NULL now
2210 goto out;
2211 }
2212
2213 // There must be a next item now. This thread won't wait long.
2214 while (!(next = head->do_next)) {
2215 _dispatch_hardware_pause();
2216 }
2217 }
2218
2219 dq->dq_items_head = next;
2220 _dispatch_queue_wakeup_global(dq);
2221 out:
2222 return head;
2223 }
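/*
 * Editor's note: the "mediator" above is a sentinel pointer (~0ul) swapped
 * into dq_items_head so concurrent drainers can detect each other without a
 * conventional lock. A reduced sketch of just that race check (names are
 * hypothetical, not libdispatch API):
 *
 *     #include <stdatomic.h>
 *     #include <stddef.h>
 *     #include <stdint.h>
 *
 *     #define MEDIATOR ((void *)~(uintptr_t)0)
 *
 *     static void *take_head(_Atomic(void *) *head)
 *     {
 *         void *h = atomic_exchange(head, MEDIATOR);
 *         if (h == NULL) {
 *             // queue looked empty: restore NULL unless an enqueuer has
 *             // already replaced the sentinel with a real item
 *             void *expected = MEDIATOR;
 *             atomic_compare_exchange_strong(head, &expected, (void *)0);
 *             return NULL;
 *         }
 *         if (h == MEDIATOR) {
 *             return NULL;  // another drainer currently owns the queue
 *         }
 *         return h;  // caller now owns the detached list starting at h
 *     }
 */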
2224
2225 #pragma mark -
2226 #pragma mark dispatch_worker_thread
2227
2228 // 6618342 Contact the team that owns the Instrument DTrace probe before
2229 // renaming this symbol
2230 static void
2231 _dispatch_worker_thread2(void *context)
2232 {
2233 struct dispatch_object_s *item;
2234 dispatch_queue_t dq = context;
2235 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2236
2238 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
2239 DISPATCH_CRASH("Premature thread recycling");
2240 }
2241
2242 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2243 qc->dgq_pending = 0;
2244
2245 #if DISPATCH_COCOA_COMPAT
2246 (void)dispatch_atomic_inc(&_dispatch_worker_threads);
2247 // ensure that high-level memory management techniques do not leak/crash
2248 if (dispatch_begin_thread_4GC) {
2249 dispatch_begin_thread_4GC();
2250 }
2251 void *pool = _dispatch_begin_NSAutoReleasePool();
2252 #endif
2253
2254 #if DISPATCH_PERF_MON
2255 uint64_t start = _dispatch_absolute_time();
2256 #endif
2257 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
2258 _dispatch_continuation_pop(item);
2259 }
2260 #if DISPATCH_PERF_MON
2261 _dispatch_queue_merge_stats(start);
2262 #endif
2263
2264 #if DISPATCH_COCOA_COMPAT
2265 _dispatch_end_NSAutoReleasePool(pool);
2266 dispatch_end_thread_4GC();
2267 if (!dispatch_atomic_dec(&_dispatch_worker_threads) &&
2268 dispatch_no_worker_threads_4GC) {
2269 dispatch_no_worker_threads_4GC();
2270 }
2271 #endif
2272
2273 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
2274
2275 _dispatch_force_cache_cleanup();
2277 }
2278
2279 #if DISPATCH_ENABLE_THREAD_POOL
2280 // 6618342 Contact the team that owns the Instrument DTrace probe before
2281 // renaming this symbol
2282 static void *
2283 _dispatch_worker_thread(void *context)
2284 {
2285 dispatch_queue_t dq = context;
2286 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
2287 sigset_t mask;
2288 int r;
2289
2290 // workaround: apply the tweaks that the kernel workqueue would do for us
2291 r = sigfillset(&mask);
2292 (void)dispatch_assume_zero(r);
2293 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
2294 (void)dispatch_assume_zero(r);
2295
2296 do {
2297 _dispatch_worker_thread2(context);
2298 // we use 65 seconds in case there are any timers that run once a minute
2299 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
2300 dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);
2301
2302 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size);
2303 if (dq->dq_items_tail) {
2304 _dispatch_queue_wakeup_global(dq);
2305 }
2306
2307 return NULL;
2308 }
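/*
 * Editor's note: the loop above parks an idle pool thread on a counting
 * semaphore with a timeout so it can be reused cheaply, then lets it exit if
 * no work arrives for 65 seconds. The same idiom in client code, as an
 * illustrative sketch (the semaphore and the drain step are hypothetical):
 *
 *     dispatch_semaphore_t work_ready = dispatch_semaphore_create(0);
 *
 *     // consumer: keep servicing work while any arrives within 65 seconds
 *     while (dispatch_semaphore_wait(work_ready,
 *             dispatch_time(DISPATCH_TIME_NOW, 65ull * NSEC_PER_SEC)) == 0) {
 *         // ... drain whatever the producer signalled ...
 *     }
 *
 *     // producer: call dispatch_semaphore_signal(work_ready) per work item
 */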
2309
2310 int
2311 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
2312 {
2313 int r;
2314
2315 /* Workaround: 6269619 Not all signals can be delivered on any thread */
2316
2317 r = sigdelset(set, SIGILL);
2318 (void)dispatch_assume_zero(r);
2319 r = sigdelset(set, SIGTRAP);
2320 (void)dispatch_assume_zero(r);
2321 #if HAVE_DECL_SIGEMT
2322 r = sigdelset(set, SIGEMT);
2323 (void)dispatch_assume_zero(r);
2324 #endif
2325 r = sigdelset(set, SIGFPE);
2326 (void)dispatch_assume_zero(r);
2327 r = sigdelset(set, SIGBUS);
2328 (void)dispatch_assume_zero(r);
2329 r = sigdelset(set, SIGSEGV);
2330 (void)dispatch_assume_zero(r);
2331 r = sigdelset(set, SIGSYS);
2332 (void)dispatch_assume_zero(r);
2333 r = sigdelset(set, SIGPIPE);
2334 (void)dispatch_assume_zero(r);
2335
2336 return pthread_sigmask(how, set, oset);
2337 }
2338 #endif
2339
2340 #pragma mark -
2341 #pragma mark dispatch_main_queue
2342
2343 static bool _dispatch_program_is_probably_callback_driven;
2344
2345 #if DISPATCH_COCOA_COMPAT
2346 static void
2347 _dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED)
2348 {
2349 kern_return_t kr;
2350
2351 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2352 &main_q_port);
2353 DISPATCH_VERIFY_MIG(kr);
2354 (void)dispatch_assume_zero(kr);
2355 kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
2356 MACH_MSG_TYPE_MAKE_SEND);
2357 DISPATCH_VERIFY_MIG(kr);
2358 (void)dispatch_assume_zero(kr);
2359
2360 _dispatch_program_is_probably_callback_driven = true;
2361 _dispatch_safe_fork = false;
2362 }
2363
2364 mach_port_t
2365 _dispatch_get_main_queue_port_4CF(void)
2366 {
2367 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
2368 _dispatch_main_q_port_init);
2369 return main_q_port;
2370 }
2371
2372 static bool main_q_is_draining;
2373
2374 // 6618342 Contact the team that owns the Instrument DTrace probe before
2375 // renaming this symbol
2376 DISPATCH_NOINLINE
2377 static void
2378 _dispatch_queue_set_mainq_drain_state(bool arg)
2379 {
2380 main_q_is_draining = arg;
2381 }
2382
2383 void
2384 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
2385 {
2386 if (main_q_is_draining) {
2387 return;
2388 }
2389 _dispatch_queue_set_mainq_drain_state(true);
2390 _dispatch_main_queue_drain();
2391 _dispatch_queue_set_mainq_drain_state(false);
2392 }
2393
2394 #endif
2395
2396 void
2397 dispatch_main(void)
2398 {
2399 #if HAVE_PTHREAD_MAIN_NP
2400 if (pthread_main_np()) {
2401 #endif
2402 _dispatch_program_is_probably_callback_driven = true;
2403 pthread_exit(NULL);
2404 DISPATCH_CRASH("pthread_exit() returned");
2405 #if HAVE_PTHREAD_MAIN_NP
2406 }
2407 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
2408 #endif
2409 }
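/*
 * Editor's note (illustrative usage, not part of libdispatch):
 * dispatch_main() never returns; it is intended for purely callback-driven
 * programs that have already scheduled their work before parking the main
 * thread. The block body below is hypothetical.
 *
 *     int main(void)
 *     {
 *         dispatch_async(dispatch_get_global_queue(
 *                 DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
 *             // kick off the program's real work here
 *         });
 *         dispatch_main();  // parks the main thread; never returns
 *     }
 */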
2410
2411 DISPATCH_NOINLINE DISPATCH_NORETURN
2412 static void
2413 _dispatch_sigsuspend(void)
2414 {
2415 static const sigset_t mask;
2416
2417 #if DISPATCH_COCOA_COMPAT
2418 // Do not count the signal handling thread as a worker thread
2419 (void)dispatch_atomic_dec(&_dispatch_worker_threads);
2420 #endif
2421 for (;;) {
2422 sigsuspend(&mask);
2423 }
2424 }
2425
2426 DISPATCH_NORETURN
2427 static void
2428 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
2429 {
2430 // never returns, so burn bridges behind us
2431 _dispatch_clear_stack(0);
2432 _dispatch_sigsuspend();
2433 }
2434
2435 DISPATCH_NOINLINE
2436 static void
2437 _dispatch_queue_cleanup2(void)
2438 {
2439 (void)dispatch_atomic_dec(&_dispatch_main_q.dq_running);
2440
2441 if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt,
2442 DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
2443 _dispatch_wakeup(&_dispatch_main_q);
2444 }
2445
2446 // overload the "probably" variable to mean that dispatch_main() or
2447 // similar non-POSIX API was called
2448 // this has to run before the DISPATCH_COCOA_COMPAT below
2449 if (_dispatch_program_is_probably_callback_driven) {
2450 dispatch_async_f(_dispatch_get_root_queue(0, false), NULL,
2451 _dispatch_sig_thread);
2452 sleep(1); // workaround 6778970
2453 }
2454
2455 #if DISPATCH_COCOA_COMPAT
2456 dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
2457 _dispatch_main_q_port_init);
2458
2459 mach_port_t mp = main_q_port;
2460 kern_return_t kr;
2461
2462 main_q_port = 0;
2463
2464 if (mp) {
2465 kr = mach_port_deallocate(mach_task_self(), mp);
2466 DISPATCH_VERIFY_MIG(kr);
2467 (void)dispatch_assume_zero(kr);
2468 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE,
2469 -1);
2470 DISPATCH_VERIFY_MIG(kr);
2471 (void)dispatch_assume_zero(kr);
2472 }
2473 #endif
2474 }
2475
2476 static void
2477 _dispatch_queue_cleanup(void *ctxt)
2478 {
2479 if (ctxt == &_dispatch_main_q) {
2480 return _dispatch_queue_cleanup2();
2481 }
2482 // POSIX defines that destructors are only called if 'ctxt' is non-null
2483 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
2484 }
2485
2486 #pragma mark -
2487 #pragma mark dispatch_manager_queue
2488
2489 static unsigned int _dispatch_select_workaround;
2490 static fd_set _dispatch_rfds;
2491 static fd_set _dispatch_wfds;
2492 static void **_dispatch_rfd_ptrs;
2493 static void **_dispatch_wfd_ptrs;
2494
2495 static int _dispatch_kq;
2496
2497 static void
2498 _dispatch_get_kq_init(void *context DISPATCH_UNUSED)
2499 {
2500 static const struct kevent kev = {
2501 .ident = 1,
2502 .filter = EVFILT_USER,
2503 .flags = EV_ADD|EV_CLEAR,
2504 };
2505
2506 _dispatch_kq = kqueue();
2507
2508 _dispatch_safe_fork = false;
2509
2510 if (_dispatch_kq == -1) {
2511 DISPATCH_CLIENT_CRASH("kqueue() create failed: "
2512 "probably out of file descriptors");
2513 } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2514 // in case we fall back to select()
2515 FD_SET(_dispatch_kq, &_dispatch_rfds);
2516 }
2517
2518 (void)dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));
2519
2520 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
2521 }
2522
2523 static int
2524 _dispatch_get_kq(void)
2525 {
2526 static dispatch_once_t pred;
2527
2528 dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);
2529
2530 return _dispatch_kq;
2531 }
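/*
 * Editor's note: the EVFILT_USER registration above gives the manager kqueue
 * a self-wakeup channel. The general kqueue pattern, reduced to a standalone
 * sketch (the 'kq' descriptor and ident value are hypothetical):
 *
 *     #include <sys/event.h>
 *
 *     // once, at setup: register a user event on the kqueue
 *     struct kevent reg;
 *     EV_SET(&reg, 1, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
 *     (void)kevent(kq, &reg, 1, NULL, 0, NULL);
 *
 *     // later, from any thread: trigger it to wake a sleeping kevent() call
 *     struct kevent wake;
 *     EV_SET(&wake, 1, EVFILT_USER, 0, NOTE_TRIGGER, 0, NULL);
 *     (void)kevent(kq, &wake, 1, NULL, 0, NULL);
 */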
2532
2533 long
2534 _dispatch_update_kq(const struct kevent *kev)
2535 {
2536 struct kevent kev_copy = *kev;
2537 // This ensures we don't get a pending kevent back while registering
2538 // a new kevent
2539 kev_copy.flags |= EV_RECEIPT;
2540
2541 if (_dispatch_select_workaround && (kev_copy.flags & EV_DELETE)) {
2542 // Only executed on manager queue
2543 switch (kev_copy.filter) {
2544 case EVFILT_READ:
2545 if (kev_copy.ident < FD_SETSIZE &&
2546 FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
2547 FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
2548 _dispatch_rfd_ptrs[kev_copy.ident] = 0;
2549 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2550 return 0;
2551 }
2552 break;
2553 case EVFILT_WRITE:
2554 if (kev_copy.ident < FD_SETSIZE &&
2555 FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
2556 FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
2557 _dispatch_wfd_ptrs[kev_copy.ident] = 0;
2558 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2559 return 0;
2560 }
2561 break;
2562 default:
2563 break;
2564 }
2565 }
2566
2567 int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
2568 if (rval == -1) {
2569 // The kevent() call itself failed, i.e. for a reason other than an
2570 // error on one of the changelist elements.
2571 (void)dispatch_assume_zero(errno);
2572 //kev_copy.flags |= EV_ERROR;
2573 //kev_copy.data = error;
2574 return errno;
2575 }
2576
2577 // The following select workaround only applies to adding kevents
2578 if ((kev->flags & (EV_DISABLE|EV_DELETE)) ||
2579 !(kev->flags & (EV_ADD|EV_ENABLE))) {
2580 return 0;
2581 }
2582
2583 // Only executed on manager queue
2584 switch (kev_copy.data) {
2585 case 0:
2586 return 0;
2587 case EBADF:
2588 break;
2589 default:
2590 // An error was returned while processing a changelist element. If the
2591 // kevent in question was a read or write filter, it most likely means
2592 // we were trying to register a /dev/* node; fall back to select()
2593 // for that descriptor
2594 switch (kev_copy.filter) {
2595 case EVFILT_READ:
2596 if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
2597 if (!_dispatch_rfd_ptrs) {
2598 _dispatch_rfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
2599 }
2600 _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
2601 FD_SET((int)kev_copy.ident, &_dispatch_rfds);
2602 (void)dispatch_atomic_inc(&_dispatch_select_workaround);
2603 _dispatch_debug("select workaround used to read fd %d: 0x%lx",
2604 (int)kev_copy.ident, (long)kev_copy.data);
2605 return 0;
2606 }
2607 break;
2608 case EVFILT_WRITE:
2609 if (dispatch_assume(kev_copy.ident < FD_SETSIZE)) {
2610 if (!_dispatch_wfd_ptrs) {
2611 _dispatch_wfd_ptrs = calloc(FD_SETSIZE, sizeof(void*));
2612 }
2613 _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
2614 FD_SET((int)kev_copy.ident, &_dispatch_wfds);
2615 (void)dispatch_atomic_inc(&_dispatch_select_workaround);
2616 _dispatch_debug("select workaround used to write fd %d: 0x%lx",
2617 (int)kev_copy.ident, (long)kev_copy.data);
2618 return 0;
2619 }
2620 break;
2621 default:
2622 // kevent error; _dispatch_source_merge_kevent() will handle it
2623 _dispatch_source_drain_kevent(&kev_copy);
2624 break;
2625 }
2626 break;
2627 }
2628 return kev_copy.data;
2629 }
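/*
 * Editor's note: EV_RECEIPT (set above) makes kevent() report the result of
 * each changelist entry back in the event list instead of leaving it to be
 * delivered later. A minimal standalone check, as a sketch (the function
 * name is hypothetical):
 *
 *     #include <errno.h>
 *     #include <sys/event.h>
 *
 *     static int register_kevent(int kq, const struct kevent *kev)
 *     {
 *         struct kevent result = *kev;
 *         result.flags |= EV_RECEIPT;
 *         if (kevent(kq, &result, 1, &result, 1, NULL) == -1) {
 *             return errno;  // the kevent() call itself failed
 *         }
 *         // with EV_RECEIPT, EV_ERROR is set and 'data' is 0 or an errno
 *         return (result.flags & EV_ERROR) ? (int)result.data : 0;
 *     }
 */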
2630
2631 static bool
2632 _dispatch_mgr_wakeup(dispatch_queue_t dq)
2633 {
2634 static const struct kevent kev = {
2635 .ident = 1,
2636 .filter = EVFILT_USER,
2637 .fflags = NOTE_TRIGGER,
2638 };
2639
2640 _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);
2641
2642 _dispatch_update_kq(&kev);
2643
2644 return false;
2645 }
2646
2647 static void
2648 _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
2649 {
2650 size_t i;
2651
2652 for (i = 0; i < cnt; i++) {
2653 // EVFILT_USER isn't used by sources
2654 if (kev[i].filter == EVFILT_USER) {
2655 // If _dispatch_mgr_thread2() ever is changed to return to the
2656 // caller, then this should become _dispatch_queue_drain()
2657 _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
2658 } else {
2659 _dispatch_source_drain_kevent(&kev[i]);
2660 }
2661 }
2662 }
2663
2664 #if DISPATCH_USE_VM_PRESSURE && DISPATCH_USE_MALLOC_VM_PRESSURE_SOURCE
2665 // VM Pressure source for malloc <rdar://problem/7805121>
2666 static dispatch_source_t _dispatch_malloc_vm_pressure_source;
2667
2668 static void
2669 _dispatch_malloc_vm_pressure_handler(void *context DISPATCH_UNUSED)
2670 {
2671 malloc_zone_pressure_relief(0,0);
2672 }
2673
2674 static void
2675 _dispatch_malloc_vm_pressure_setup(void)
2676 {
2677 _dispatch_malloc_vm_pressure_source = dispatch_source_create(
2678 DISPATCH_SOURCE_TYPE_VM, 0, DISPATCH_VM_PRESSURE,
2679 _dispatch_get_root_queue(0, true));
2680 dispatch_source_set_event_handler_f(_dispatch_malloc_vm_pressure_source,
2681 _dispatch_malloc_vm_pressure_handler);
2682 dispatch_resume(_dispatch_malloc_vm_pressure_source);
2683 }
2684 #else
2685 #define _dispatch_malloc_vm_pressure_setup()
2686 #endif
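/*
 * Editor's note: the setup above follows the standard dispatch source
 * lifecycle: create, attach a handler, resume (sources are created
 * suspended). The same shape with a periodic timer source, as an
 * illustrative sketch (the handler name is hypothetical):
 *
 *     static void on_tick(void *ctxt) { (void)ctxt; }
 *
 *     dispatch_source_t timer = dispatch_source_create(
 *             DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
 *             dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
 *     dispatch_source_set_event_handler_f(timer, on_tick);
 *     dispatch_source_set_timer(timer,
 *             dispatch_time(DISPATCH_TIME_NOW, NSEC_PER_SEC),
 *             NSEC_PER_SEC, NSEC_PER_SEC / 10);
 *     dispatch_resume(timer);
 */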
2687
2688 DISPATCH_NOINLINE DISPATCH_NORETURN
2689 static void
2690 _dispatch_mgr_invoke(void)
2691 {
2692 static const struct timespec timeout_immediately = { 0, 0 };
2693 struct timespec timeout;
2694 const struct timespec *timeoutp;
2695 struct timeval sel_timeout, *sel_timeoutp;
2696 fd_set tmp_rfds, tmp_wfds;
2697 struct kevent kev[1];
2698 int k_cnt, err, i, r;
2699
2700 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2701 #if DISPATCH_COCOA_COMPAT
2702 // Do not count the manager thread as a worker thread
2703 (void)dispatch_atomic_dec(&_dispatch_worker_threads);
2704 #endif
2705 _dispatch_malloc_vm_pressure_setup();
2706
2707 for (;;) {
2708 _dispatch_run_timers();
2709
2710 timeoutp = _dispatch_get_next_timer_fire(&timeout);
2711
2712 if (_dispatch_select_workaround) {
2713 FD_COPY(&_dispatch_rfds, &tmp_rfds);
2714 FD_COPY(&_dispatch_wfds, &tmp_wfds);
2715 if (timeoutp) {
2716 sel_timeout.tv_sec = timeoutp->tv_sec;
2717 sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))
2718 (timeoutp->tv_nsec / 1000u);
2719 sel_timeoutp = &sel_timeout;
2720 } else {
2721 sel_timeoutp = NULL;
2722 }
2723
2724 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
2725 if (r == -1) {
2726 err = errno;
2727 if (err != EBADF) {
2728 if (err != EINTR) {
2729 (void)dispatch_assume_zero(err);
2730 }
2731 continue;
2732 }
2733 for (i = 0; i < FD_SETSIZE; i++) {
2734 if (i == _dispatch_kq) {
2735 continue;
2736 }
2737 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i,
2738 &_dispatch_wfds)) {
2739 continue;
2740 }
2741 r = dup(i);
2742 if (r != -1) {
2743 close(r);
2744 } else {
2745 if (FD_ISSET(i, &_dispatch_rfds)) {
2746 FD_CLR(i, &_dispatch_rfds);
2747 _dispatch_rfd_ptrs[i] = 0;
2748 (void)dispatch_atomic_dec(
2749 &_dispatch_select_workaround);
2750 }
2751 if (FD_ISSET(i, &_dispatch_wfds)) {
2752 FD_CLR(i, &_dispatch_wfds);
2753 _dispatch_wfd_ptrs[i] = 0;
2754 (void)dispatch_atomic_dec(
2755 &_dispatch_select_workaround);
2756 }
2757 }
2758 }
2759 continue;
2760 }
2761
2762 if (r > 0) {
2763 for (i = 0; i < FD_SETSIZE; i++) {
2764 if (i == _dispatch_kq) {
2765 continue;
2766 }
2767 if (FD_ISSET(i, &tmp_rfds)) {
2768 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
2769 EV_SET(&kev[0], i, EVFILT_READ,
2770 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2771 _dispatch_rfd_ptrs[i]);
2772 _dispatch_rfd_ptrs[i] = 0;
2773 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2774 _dispatch_mgr_thread2(kev, 1);
2775 }
2776 if (FD_ISSET(i, &tmp_wfds)) {
2777 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
2778 EV_SET(&kev[0], i, EVFILT_WRITE,
2779 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2780 _dispatch_wfd_ptrs[i]);
2781 _dispatch_wfd_ptrs[i] = 0;
2782 (void)dispatch_atomic_dec(&_dispatch_select_workaround);
2783 _dispatch_mgr_thread2(kev, 1);
2784 }
2785 }
2786 }
2787
2788 timeoutp = &timeout_immediately;
2789 }
2790
2791 k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]),
2792 timeoutp);
2793 err = errno;
2794
2795 switch (k_cnt) {
2796 case -1:
2797 if (err == EBADF) {
2798 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2799 }
2800 if (err != EINTR) {
2801 (void)dispatch_assume_zero(err);
2802 }
2803 continue;
2804 default:
2805 _dispatch_mgr_thread2(kev, (size_t)k_cnt);
2806 // fall through
2807 case 0:
2808 _dispatch_force_cache_cleanup();
2809 continue;
2810 }
2811 }
2812 }
2813
2814 DISPATCH_NORETURN
2815 static dispatch_queue_t
2816 _dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2817 {
2818 // never returns, so burn bridges behind us & clear stack 2k ahead
2819 _dispatch_clear_stack(2048);
2820 _dispatch_mgr_invoke();
2821 }