]> git.saurik.com Git - apple/libdispatch.git/blob - src/queue.c
libdispatch-339.92.1.tar.gz
[apple/libdispatch.git] / src / queue.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
// Enable the internal thread pool when pthread workqueues are not available
// or in debug builds, unless the build configuration already set the macro.
#ifndef DISPATCH_ENABLE_THREAD_POOL
#if !HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#endif
// The pthread pool code paths are compiled in for either consumer.
#if DISPATCH_ENABLE_THREAD_POOL || DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
// Without pthread_workqueue_setdispatch_np, fall back to the legacy
// workqueue interface unless the build configuration already decided.
#ifndef DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUES && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#endif
// In this configuration pthread_workqueue_t is only used as an opaque
// pointer-sized value.
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL && !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
#endif
41
42 static void _dispatch_cache_cleanup(void *value);
43 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
44 dispatch_continuation_t dc);
45 static void _dispatch_queue_cleanup(void *ctxt);
46 static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
47 unsigned int n);
48 static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
49 static inline _dispatch_thread_semaphore_t
50 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
51 #if HAVE_PTHREAD_WORKQUEUES
52 static void _dispatch_worker_thread3(void *context);
53 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
54 static void _dispatch_worker_thread2(int priority, int options, void *context);
55 #endif
56 #endif
57 #if DISPATCH_USE_PTHREAD_POOL
58 static void *_dispatch_worker_thread(void *context);
59 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
60 #endif
61
62 #if DISPATCH_COCOA_COMPAT
63 static dispatch_once_t _dispatch_main_q_port_pred;
64 static dispatch_queue_t _dispatch_main_queue_wakeup(void);
65 unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
66 static void _dispatch_runloop_queue_port_init(void *ctxt);
67 static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
68 #endif
69
70 #pragma mark -
71 #pragma mark dispatch_root_queue
72
#if DISPATCH_ENABLE_THREAD_POOL
// One statically allocated semaphore per root queue, indexed by root queue
// index. Every entry gets the same initializer: the semaphore vtable and the
// global reference counts.
#define DISPATCH_THREAD_MEDIATOR_ENTRY(idx) \
	[idx] = { \
		.do_vtable = DISPATCH_VTABLE(semaphore), \
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, \
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT, \
	}
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	DISPATCH_THREAD_MEDIATOR_ENTRY(DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY),
	DISPATCH_THREAD_MEDIATOR_ENTRY(
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY),
};
#undef DISPATCH_THREAD_MEDIATOR_ENTRY
#endif
117
118 #define MAX_PTHREAD_COUNT 255
119
120 struct dispatch_root_queue_context_s {
121 union {
122 struct {
123 unsigned int volatile dgq_pending;
124 #if HAVE_PTHREAD_WORKQUEUES
125 int dgq_wq_priority, dgq_wq_options;
126 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
127 pthread_workqueue_t dgq_kworkqueue;
128 #endif
129 #endif // HAVE_PTHREAD_WORKQUEUES
130 #if DISPATCH_USE_PTHREAD_POOL
131 void *dgq_ctxt;
132 dispatch_semaphore_t dgq_thread_mediator;
133 uint32_t volatile dgq_thread_pool_size;
134 #endif
135 };
136 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
137 };
138 };
139 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
140
141 DISPATCH_CACHELINE_ALIGN
142 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
143 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
144 #if HAVE_PTHREAD_WORKQUEUES
145 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
146 .dgq_wq_options = 0,
147 #endif
148 #if DISPATCH_ENABLE_THREAD_POOL
149 .dgq_thread_mediator = &_dispatch_thread_mediator[
150 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
151 #endif
152 }}},
153 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
154 #if HAVE_PTHREAD_WORKQUEUES
155 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
156 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
157 #endif
158 #if DISPATCH_ENABLE_THREAD_POOL
159 .dgq_thread_mediator = &_dispatch_thread_mediator[
160 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
161 #endif
162 }}},
163 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
164 #if HAVE_PTHREAD_WORKQUEUES
165 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
166 .dgq_wq_options = 0,
167 #endif
168 #if DISPATCH_ENABLE_THREAD_POOL
169 .dgq_thread_mediator = &_dispatch_thread_mediator[
170 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
171 #endif
172 }}},
173 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
174 #if HAVE_PTHREAD_WORKQUEUES
175 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
176 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
177 #endif
178 #if DISPATCH_ENABLE_THREAD_POOL
179 .dgq_thread_mediator = &_dispatch_thread_mediator[
180 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
181 #endif
182 }}},
183 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
184 #if HAVE_PTHREAD_WORKQUEUES
185 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
186 .dgq_wq_options = 0,
187 #endif
188 #if DISPATCH_ENABLE_THREAD_POOL
189 .dgq_thread_mediator = &_dispatch_thread_mediator[
190 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
191 #endif
192 }}},
193 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
194 #if HAVE_PTHREAD_WORKQUEUES
195 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
196 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
197 #endif
198 #if DISPATCH_ENABLE_THREAD_POOL
199 .dgq_thread_mediator = &_dispatch_thread_mediator[
200 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
201 #endif
202 }}},
203 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
204 #if HAVE_PTHREAD_WORKQUEUES
205 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
206 .dgq_wq_options = 0,
207 #endif
208 #if DISPATCH_ENABLE_THREAD_POOL
209 .dgq_thread_mediator = &_dispatch_thread_mediator[
210 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
211 #endif
212 }}},
213 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
214 #if HAVE_PTHREAD_WORKQUEUES
215 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
216 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
217 #endif
218 #if DISPATCH_ENABLE_THREAD_POOL
219 .dgq_thread_mediator = &_dispatch_thread_mediator[
220 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
221 #endif
222 }}},
223 };
224
225 // 6618342 Contact the team that owns the Instrument DTrace probe before
226 // renaming this symbol
227 // dq_running is set to 2 so that barrier operations go through the slow path
228 DISPATCH_CACHELINE_ALIGN
229 struct dispatch_queue_s _dispatch_root_queues[] = {
230 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
231 .do_vtable = DISPATCH_VTABLE(queue_root),
232 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
233 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
234 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
235 .do_ctxt = &_dispatch_root_queue_contexts[
236 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
237 .dq_label = "com.apple.root.low-priority",
238 .dq_running = 2,
239 .dq_width = UINT32_MAX,
240 .dq_serialnum = 4,
241 },
242 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
243 .do_vtable = DISPATCH_VTABLE(queue_root),
244 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
245 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
246 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
247 .do_ctxt = &_dispatch_root_queue_contexts[
248 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
249 .dq_label = "com.apple.root.low-overcommit-priority",
250 .dq_running = 2,
251 .dq_width = UINT32_MAX,
252 .dq_serialnum = 5,
253 },
254 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
255 .do_vtable = DISPATCH_VTABLE(queue_root),
256 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
257 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
258 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
259 .do_ctxt = &_dispatch_root_queue_contexts[
260 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
261 .dq_label = "com.apple.root.default-priority",
262 .dq_running = 2,
263 .dq_width = UINT32_MAX,
264 .dq_serialnum = 6,
265 },
266 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
267 .do_vtable = DISPATCH_VTABLE(queue_root),
268 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
269 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
270 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
271 .do_ctxt = &_dispatch_root_queue_contexts[
272 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
273 .dq_label = "com.apple.root.default-overcommit-priority",
274 .dq_running = 2,
275 .dq_width = UINT32_MAX,
276 .dq_serialnum = 7,
277 },
278 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
279 .do_vtable = DISPATCH_VTABLE(queue_root),
280 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
281 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
282 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
283 .do_ctxt = &_dispatch_root_queue_contexts[
284 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
285 .dq_label = "com.apple.root.high-priority",
286 .dq_running = 2,
287 .dq_width = UINT32_MAX,
288 .dq_serialnum = 8,
289 },
290 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
291 .do_vtable = DISPATCH_VTABLE(queue_root),
292 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
293 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
294 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
295 .do_ctxt = &_dispatch_root_queue_contexts[
296 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
297 .dq_label = "com.apple.root.high-overcommit-priority",
298 .dq_running = 2,
299 .dq_width = UINT32_MAX,
300 .dq_serialnum = 9,
301 },
302 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
303 .do_vtable = DISPATCH_VTABLE(queue_root),
304 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
305 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
306 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
307 .do_ctxt = &_dispatch_root_queue_contexts[
308 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
309 .dq_label = "com.apple.root.background-priority",
310 .dq_running = 2,
311 .dq_width = UINT32_MAX,
312 .dq_serialnum = 10,
313 },
314 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
315 .do_vtable = DISPATCH_VTABLE(queue_root),
316 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
317 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
318 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
319 .do_ctxt = &_dispatch_root_queue_contexts[
320 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
321 .dq_label = "com.apple.root.background-overcommit-priority",
322 .dq_running = 2,
323 .dq_width = UINT32_MAX,
324 .dq_serialnum = 11,
325 },
326 };
327
#if HAVE_PTHREAD_WORKQUEUES
// Maps (workqueue priority, overcommit option) to the matching root queue.
// The second index is 0 for the regular queue and
// WORKQ_ADDTHREADS_OPTION_OVERCOMMIT for its overcommit sibling.
#define DISPATCH_WQ2ROOT_PAIR(prio, idx, overcommit_idx) \
	[prio][0] = &_dispatch_root_queues[idx], \
	[prio][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] = \
			&_dispatch_root_queues[overcommit_idx]
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	DISPATCH_WQ2ROOT_PAIR(WORKQ_LOW_PRIOQUEUE,
			DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY,
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY),
	DISPATCH_WQ2ROOT_PAIR(WORKQ_DEFAULT_PRIOQUEUE,
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY,
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY),
	DISPATCH_WQ2ROOT_PAIR(WORKQ_HIGH_PRIOQUEUE,
			DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY,
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY),
	DISPATCH_WQ2ROOT_PAIR(WORKQ_BG_PRIOQUEUE,
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY,
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY),
};
#undef DISPATCH_WQ2ROOT_PAIR
#endif // HAVE_PTHREAD_WORKQUEUES
352
353 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
354 static struct dispatch_queue_s _dispatch_mgr_root_queue;
355 #else
356 #define _dispatch_mgr_root_queue \
357 _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
358 #endif
359
360 // 6618342 Contact the team that owns the Instrument DTrace probe before
361 // renaming this symbol
362 DISPATCH_CACHELINE_ALIGN
363 struct dispatch_queue_s _dispatch_mgr_q = {
364 .do_vtable = DISPATCH_VTABLE(queue_mgr),
365 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
366 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
367 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
368 .do_targetq = &_dispatch_mgr_root_queue,
369 .dq_label = "com.apple.libdispatch-manager",
370 .dq_width = 1,
371 .dq_is_thread_bound = 1,
372 .dq_serialnum = 2,
373 };
374
375 dispatch_queue_t
376 dispatch_get_global_queue(long priority, unsigned long flags)
377 {
378 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
379 return NULL;
380 }
381 return _dispatch_get_root_queue(priority,
382 flags & DISPATCH_QUEUE_OVERCOMMIT);
383 }
384
385 DISPATCH_ALWAYS_INLINE
386 static inline dispatch_queue_t
387 _dispatch_get_current_queue(void)
388 {
389 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
390 }
391
392 dispatch_queue_t
393 dispatch_get_current_queue(void)
394 {
395 return _dispatch_get_current_queue();
396 }
397
398 DISPATCH_ALWAYS_INLINE
399 static inline bool
400 _dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
401 {
402 while (dq1) {
403 if (dq1 == dq2) {
404 return true;
405 }
406 dq1 = dq1->do_targetq;
407 }
408 return false;
409 }
410
411 #define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
412 "Assertion failed: Block was run on an unexpected queue"
413
414 DISPATCH_NOINLINE
415 static void
416 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
417 {
418 char *msg;
419 asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
420 expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
421 dq->dq_label : "");
422 _dispatch_log("%s", msg);
423 _dispatch_set_crash_log_message(msg);
424 _dispatch_hardware_crash();
425 free(msg);
426 }
427
428 void
429 dispatch_assert_queue(dispatch_queue_t dq)
430 {
431 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
432 DISPATCH_CLIENT_CRASH("invalid queue passed to "
433 "dispatch_assert_queue()");
434 }
435 dispatch_queue_t cq = _dispatch_queue_get_current();
436 if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
437 return;
438 }
439 _dispatch_assert_queue_fail(dq, true);
440 }
441
442 void
443 dispatch_assert_queue_not(dispatch_queue_t dq)
444 {
445 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
446 DISPATCH_CLIENT_CRASH("invalid queue passed to "
447 "dispatch_assert_queue_not()");
448 }
449 dispatch_queue_t cq = _dispatch_queue_get_current();
450 if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
451 _dispatch_assert_queue_fail(dq, false);
452 }
453 }
454
// Root queue debug logging: compiled out unless both DISPATCH_DEBUG and
// DISPATCH_ROOT_QUEUE_DEBUG are enabled.
#if !(DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG)
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#else
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#endif
462
463 #pragma mark -
464 #pragma mark dispatch_init
465
466 static void
467 _dispatch_hw_config_init(void)
468 {
469 _dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
470 _dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
471 _dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
472 }
473
/*
 * Sets up pthread workqueue support for the root queues.
 *
 * First tries pthread_workqueue_setdispatch_np() when it is available. If
 * that did not succeed (or is not compiled in), every root queue context
 * gets a dgq_kworkqueue value: a legacy workqueue created with the queue's
 * priority and overcommit option when the legacy fallback is compiled in,
 * otherwise the (void*)(~0ul) placeholder.
 *
 * Returns true when workqueues are usable, false when the caller has to
 * fall back to another mechanism.
 */
static inline bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	// debug builds can force the thread pool through the environment
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int err;
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		err = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		// a failure is only reported when there is no legacy fallback
		(void)dispatch_assume_zero(err);
#endif
		result = !err;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t wq_attr;
		if (!disable_wq) {
			err = pthread_workqueue_attr_init_np(&wq_attr);
			(void)dispatch_assume_zero(err);
		}
#endif
		int idx;
		for (idx = 0; idx < DISPATCH_ROOT_QUEUE_COUNT; idx++) {
			dispatch_root_queue_context_t ctx =
					&_dispatch_root_queue_contexts[idx];
			pthread_workqueue_t wq = NULL;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq
#if DISPATCH_NO_BG_PRIORITY
					&& (ctx->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
#endif
			) {
				err = pthread_workqueue_attr_setqueuepriority_np(&wq_attr,
						ctx->dgq_wq_priority);
				(void)dispatch_assume_zero(err);
				err = pthread_workqueue_attr_setovercommit_np(&wq_attr,
						ctx->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(err);
				err = pthread_workqueue_create_np(&wq, &wq_attr);
				(void)dispatch_assume_zero(err);
				// short-circuits: once one workqueue exists, later
				// handles are no longer passed to dispatch_assume()
				result = result || dispatch_assume(wq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (wq) {
				ctx->dgq_kworkqueue = wq;
			} else {
				// placeholder recorded when no workqueue was created
				ctx->dgq_kworkqueue = (void*)(~0ul);
			}
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			err = pthread_workqueue_attr_destroy_np(&wq_attr);
			(void)dispatch_assume_zero(err);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}
542
#if DISPATCH_USE_PTHREAD_POOL
/*
 * Prepares the pthread pool state of one root queue context.
 *
 * qc:         context to initialize.
 * overcommit: when true the pool size is MAX_PTHREAD_COUNT, otherwise it is
 *             the active CPU count from _dispatch_hw_config.
 *
 * Also creates the OS semaphore behind the context's thread mediator.
 */
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		bool overcommit)
{
	if (overcommit) {
		qc->dgq_thread_pool_size = MAX_PTHREAD_COUNT;
	} else {
		qc->dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
	}
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr;
	kr = semaphore_create(mach_task_self(),
			&qc->dgq_thread_mediator->dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(qc->dgq_thread_mediator->dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret;
	ret = sem_init(&qc->dgq_thread_mediator->dsema_sem, 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL
564
565 static void
566 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
567 {
568 _dispatch_safe_fork = false;
569 if (!_dispatch_root_queues_init_workq()) {
570 #if DISPATCH_ENABLE_THREAD_POOL
571 int i;
572 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
573 bool overcommit = true;
574 #if TARGET_OS_EMBEDDED
575 // some software hangs if the non-overcommitting queues do not
576 // overcommit when threads block. Someday, this behavior should
577 // apply to all platforms
578 if (!(i & 1)) {
579 overcommit = false;
580 }
581 #endif
582 _dispatch_root_queue_init_pthread_pool(
583 &_dispatch_root_queue_contexts[i], overcommit);
584 }
585 #else
586 DISPATCH_CRASH("Root queue initialization failed");
587 #endif // DISPATCH_ENABLE_THREAD_POOL
588 }
589
590 }
591
// Number of elements in a true array (not a pointer).
#define countof(x) (sizeof(x) / sizeof((x)[0]))
593
594 DISPATCH_EXPORT DISPATCH_NOTHROW
595 void
596 libdispatch_init(void)
597 {
598 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
599 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);
600
601 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
602 -DISPATCH_QUEUE_PRIORITY_HIGH);
603 dispatch_assert(countof(_dispatch_root_queues) ==
604 DISPATCH_ROOT_QUEUE_COUNT);
605 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
606 DISPATCH_ROOT_QUEUE_COUNT);
607 #if HAVE_PTHREAD_WORKQUEUES
608 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
609 sizeof(_dispatch_wq2root_queues[0][0]) ==
610 DISPATCH_ROOT_QUEUE_COUNT);
611 #endif
612 #if DISPATCH_ENABLE_THREAD_POOL
613 dispatch_assert(countof(_dispatch_thread_mediator) ==
614 DISPATCH_ROOT_QUEUE_COUNT);
615 #endif
616
617 dispatch_assert(sizeof(struct dispatch_apply_s) <=
618 DISPATCH_CONTINUATION_SIZE);
619 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
620 == 0);
621 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
622 DISPATCH_CACHELINE_SIZE == 0);
623
624 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
625 #if !DISPATCH_USE_OS_SEMAPHORE_CACHE
626 _dispatch_thread_key_create(&dispatch_sema4_key,
627 (void (*)(void *))_dispatch_thread_semaphore_dispose);
628 #endif
629 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
630 _dispatch_thread_key_create(&dispatch_io_key, NULL);
631 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
632 #if DISPATCH_PERF_MON
633 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
634 #endif
635
636 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
637 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
638 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
639 #endif
640
641 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
642 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
643
644 #if DISPATCH_USE_PTHREAD_ATFORK
645 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
646 dispatch_atfork_parent, dispatch_atfork_child));
647 #endif
648
649 _dispatch_hw_config_init();
650 _dispatch_vtable_init();
651 _os_object_init();
652 _dispatch_introspection_init();
653 }
654
655 DISPATCH_EXPORT DISPATCH_NOTHROW
656 void
657 dispatch_atfork_child(void)
658 {
659 void *crash = (void *)0x100;
660 size_t i;
661
662 if (_dispatch_safe_fork) {
663 return;
664 }
665 _dispatch_child_of_unsafe_fork = true;
666
667 _dispatch_main_q.dq_items_head = crash;
668 _dispatch_main_q.dq_items_tail = crash;
669
670 _dispatch_mgr_q.dq_items_head = crash;
671 _dispatch_mgr_q.dq_items_tail = crash;
672
673 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
674 _dispatch_root_queues[i].dq_items_head = crash;
675 _dispatch_root_queues[i].dq_items_tail = crash;
676 }
677 }
678
679 #pragma mark -
680 #pragma mark dispatch_queue_t
681
// Queue serial number allocation (zero is skipped):
//   1       - main_q
//   2       - mgr_q
//   3       - mgr_root_q
//   4..11   - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
volatile unsigned long _dispatch_queue_serial_numbers = 12;
689
690 dispatch_queue_t
691 dispatch_queue_create_with_target(const char *label,
692 dispatch_queue_attr_t attr, dispatch_queue_t tq)
693 {
694 dispatch_queue_t dq;
695
696 dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
697 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
698
699 _dispatch_queue_init(dq);
700 if (label) {
701 dq->dq_label = strdup(label);
702 }
703
704 if (attr == DISPATCH_QUEUE_CONCURRENT) {
705 dq->dq_width = UINT32_MAX;
706 if (!tq) {
707 tq = _dispatch_get_root_queue(0, false);
708 }
709 } else {
710 if (!tq) {
711 // Default target queue is overcommit!
712 tq = _dispatch_get_root_queue(0, true);
713 }
714 if (slowpath(attr)) {
715 dispatch_debug_assert(!attr, "Invalid attribute");
716 }
717 }
718 dq->do_targetq = tq;
719 _dispatch_object_debug(dq, "%s", __func__);
720 return _dispatch_introspection_queue_create(dq);
721 }
722
723 dispatch_queue_t
724 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
725 {
726 return dispatch_queue_create_with_target(label, attr,
727 DISPATCH_TARGET_QUEUE_DEFAULT);
728 }
729
730 void
731 _dispatch_queue_destroy(dispatch_object_t dou)
732 {
733 dispatch_queue_t dq = dou._dq;
734 if (slowpath(dq == _dispatch_queue_get_current())) {
735 DISPATCH_CRASH("Release of a queue by itself");
736 }
737 if (slowpath(dq->dq_items_tail)) {
738 DISPATCH_CRASH("Release of a queue while items are enqueued");
739 }
740
741 // trash the tail queue so that use after free will crash
742 dq->dq_items_tail = (void *)0x200;
743
744 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
745 (void *)0x200, relaxed);
746 if (dqsq) {
747 _dispatch_release(dqsq);
748 }
749 }
750
751 // 6618342 Contact the team that owns the Instrument DTrace probe before
752 // renaming this symbol
753 void
754 _dispatch_queue_dispose(dispatch_queue_t dq)
755 {
756 _dispatch_object_debug(dq, "%s", __func__);
757 _dispatch_introspection_queue_dispose(dq);
758 if (dq->dq_label) {
759 free((void*)dq->dq_label);
760 }
761 _dispatch_queue_destroy(dq);
762 }
763
764 const char *
765 dispatch_queue_get_label(dispatch_queue_t dq)
766 {
767 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
768 dq = _dispatch_get_current_queue();
769 }
770 return dq->dq_label ? dq->dq_label : "";
771 }
772
773 static void
774 _dispatch_queue_set_width2(void *ctxt)
775 {
776 int w = (int)(intptr_t)ctxt; // intentional truncation
777 uint32_t tmp;
778 dispatch_queue_t dq = _dispatch_queue_get_current();
779
780 if (w == 1 || w == 0) {
781 dq->dq_width = 1;
782 _dispatch_object_debug(dq, "%s", __func__);
783 return;
784 }
785 if (w > 0) {
786 tmp = (unsigned int)w;
787 } else switch (w) {
788 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
789 tmp = _dispatch_hw_config.cc_max_physical;
790 break;
791 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
792 tmp = _dispatch_hw_config.cc_max_active;
793 break;
794 default:
795 // fall through
796 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
797 tmp = _dispatch_hw_config.cc_max_logical;
798 break;
799 }
800 // multiply by two since the running count is inc/dec by two
801 // (the low bit == barrier)
802 dq->dq_width = tmp * 2;
803 _dispatch_object_debug(dq, "%s", __func__);
804 }
805
806 void
807 dispatch_queue_set_width(dispatch_queue_t dq, long width)
808 {
809 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
810 slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
811 return;
812 }
813 _dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
814 _dispatch_queue_set_width2);
815 }
816
817 // 6618342 Contact the team that owns the Instrument DTrace probe before
818 // renaming this symbol
819 static void
820 _dispatch_set_target_queue2(void *ctxt)
821 {
822 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
823
824 prev_dq = dq->do_targetq;
825 dq->do_targetq = ctxt;
826 _dispatch_release(prev_dq);
827 _dispatch_object_debug(dq, "%s", __func__);
828 }
829
830 void
831 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
832 {
833 DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
834 if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
835 slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
836 return;
837 }
838 unsigned long type = dx_metatype(dou._do);
839 if (slowpath(!dq)) {
840 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
841 slowpath(dou._dq->dq_width > 1));
842 dq = _dispatch_get_root_queue(0, !is_concurrent_q);
843 }
844 // TODO: put into the vtable
845 switch(type) {
846 case _DISPATCH_QUEUE_TYPE:
847 case _DISPATCH_SOURCE_TYPE:
848 _dispatch_retain(dq);
849 return _dispatch_barrier_trysync_f(dou._dq, dq,
850 _dispatch_set_target_queue2);
851 case _DISPATCH_IO_TYPE:
852 return _dispatch_io_set_target_queue(dou._dchannel, dq);
853 default: {
854 dispatch_queue_t prev_dq;
855 _dispatch_retain(dq);
856 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
857 if (prev_dq) _dispatch_release(prev_dq);
858 _dispatch_object_debug(dou._do, "%s", __func__);
859 return;
860 }
861 }
862 }
863
864 #pragma mark -
865 #pragma mark dispatch_pthread_root_queue
866
867 struct dispatch_pthread_root_queue_context_s {
868 pthread_attr_t dpq_thread_attr;
869 dispatch_block_t dpq_thread_configure;
870 struct dispatch_semaphore_s dpq_thread_mediator;
871 };
872 typedef struct dispatch_pthread_root_queue_context_s *
873 dispatch_pthread_root_queue_context_t;
874
875 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
876 static struct dispatch_pthread_root_queue_context_s
877 _dispatch_mgr_root_queue_pthread_context;
878 static struct dispatch_root_queue_context_s
879 _dispatch_mgr_root_queue_context = {{{
880 #if HAVE_PTHREAD_WORKQUEUES
881 .dgq_kworkqueue = (void*)(~0ul),
882 #endif
883 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
884 .dgq_thread_pool_size = 1,
885 }}};
886 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
887 .do_vtable = DISPATCH_VTABLE(queue_root),
888 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
889 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
890 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
891 .do_ctxt = &_dispatch_mgr_root_queue_context,
892 .dq_label = "com.apple.root.libdispatch-manager",
893 .dq_running = 2,
894 .dq_width = UINT32_MAX,
895 .dq_serialnum = 3,
896 };
897 static struct {
898 volatile int prio;
899 int policy;
900 pthread_t tid;
901 } _dispatch_mgr_sched;
902 static dispatch_once_t _dispatch_mgr_sched_pred;
903
904 static void
905 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
906 {
907 struct sched_param param;
908 pthread_attr_t *attr;
909 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
910 (void)dispatch_assume_zero(pthread_attr_init(attr));
911 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
912 &_dispatch_mgr_sched.policy));
913 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
914 // high-priority workq threads are at priority 2 above default
915 _dispatch_mgr_sched.prio = param.sched_priority + 2;
916 }
917
918 DISPATCH_NOINLINE
919 static pthread_t *
920 _dispatch_mgr_root_queue_init(void)
921 {
922 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
923 struct sched_param param;
924 pthread_attr_t *attr;
925 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
926 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
927 PTHREAD_CREATE_DETACHED));
928 #if !DISPATCH_DEBUG
929 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
930 #endif
931 param.sched_priority = _dispatch_mgr_sched.prio;
932 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
933 return &_dispatch_mgr_sched.tid;
934 }
935
936 static inline void
937 _dispatch_mgr_priority_apply(void)
938 {
939 struct sched_param param;
940 do {
941 param.sched_priority = _dispatch_mgr_sched.prio;
942 (void)dispatch_assume_zero(pthread_setschedparam(
943 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy, &param));
944 } while (_dispatch_mgr_sched.prio > param.sched_priority);
945 }
946
947 DISPATCH_NOINLINE
948 void
949 _dispatch_mgr_priority_init(void)
950 {
951 struct sched_param param;
952 pthread_attr_t *attr;
953 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
954 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
955 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
956 return _dispatch_mgr_priority_apply();
957 }
958 }
959
960 DISPATCH_NOINLINE
961 static void
962 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
963 {
964 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
965 struct sched_param param;
966 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
967 int p = _dispatch_mgr_sched.prio;
968 do if (p >= param.sched_priority) {
969 return;
970 } while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
971 p, param.sched_priority, &p, relaxed)));
972 if (_dispatch_mgr_sched.tid) {
973 return _dispatch_mgr_priority_apply();
974 }
975 }
976
977 dispatch_queue_t
978 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
979 const pthread_attr_t *attr, dispatch_block_t configure)
980 {
981 dispatch_queue_t dq;
982 dispatch_root_queue_context_t qc;
983 dispatch_pthread_root_queue_context_t pqc;
984 size_t dqs;
985
986 if (slowpath(flags)) {
987 return NULL;
988 }
989 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
990 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
991 sizeof(struct dispatch_root_queue_context_s) +
992 sizeof(struct dispatch_pthread_root_queue_context_s));
993 qc = (void*)dq + dqs;
994 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
995
996 _dispatch_queue_init(dq);
997 if (label) {
998 dq->dq_label = strdup(label);
999 }
1000
1001 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
1002 dq->do_ctxt = qc;
1003 dq->do_targetq = NULL;
1004 dq->dq_running = 2;
1005 dq->dq_width = UINT32_MAX;
1006
1007 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
1008 qc->dgq_thread_mediator = &pqc->dpq_thread_mediator;
1009 qc->dgq_ctxt = pqc;
1010 #if HAVE_PTHREAD_WORKQUEUES
1011 qc->dgq_kworkqueue = (void*)(~0ul);
1012 #endif
1013 _dispatch_root_queue_init_pthread_pool(qc, true); // rdar://11352331
1014
1015 if (attr) {
1016 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
1017 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
1018 } else {
1019 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
1020 }
1021 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
1022 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
1023 if (configure) {
1024 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
1025 }
1026 _dispatch_object_debug(dq, "%s", __func__);
1027 return _dispatch_introspection_queue_create(dq);
1028 }
1029 #endif
1030
1031 void
1032 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
1033 {
1034 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1035 DISPATCH_CRASH("Global root queue disposed");
1036 }
1037 _dispatch_object_debug(dq, "%s", __func__);
1038 _dispatch_introspection_queue_dispose(dq);
1039 #if DISPATCH_USE_PTHREAD_POOL
1040 dispatch_root_queue_context_t qc = dq->do_ctxt;
1041 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
1042
1043 _dispatch_semaphore_dispose(qc->dgq_thread_mediator);
1044 if (pqc->dpq_thread_configure) {
1045 Block_release(pqc->dpq_thread_configure);
1046 }
1047 dq->do_targetq = _dispatch_get_root_queue(0, false);
1048 #endif
1049 if (dq->dq_label) {
1050 free((void*)dq->dq_label);
1051 }
1052 _dispatch_queue_destroy(dq);
1053 }
1054
1055 #pragma mark -
1056 #pragma mark dispatch_queue_specific
1057
// Queue object that owns the key/context list of another queue. It is reached
// through that queue's dq_specific_q field and all list accesses in this file
// run on it (see _dispatch_queue_set_specific/_dispatch_queue_get_specific).
struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	// List of dispatch_queue_specific_s entries, one per key
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

// One key/context association stored in dqsq_contexts.
struct dispatch_queue_specific_s {
	const void *dqs_key; // compared by pointer identity
	void *dqs_ctxt; // value associated with the key
	// Optional; invoked asynchronously with dqs_ctxt when the value is
	// replaced or removed, or when the owning queue is disposed
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);
1072
1073 void
1074 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
1075 {
1076 dispatch_queue_specific_t dqs, tmp;
1077
1078 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
1079 if (dqs->dqs_destructor) {
1080 dispatch_async_f(_dispatch_get_root_queue(
1081 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
1082 dqs->dqs_destructor);
1083 }
1084 free(dqs);
1085 }
1086 _dispatch_queue_destroy((dispatch_queue_t)dqsq);
1087 }
1088
1089 static void
1090 _dispatch_queue_init_specific(dispatch_queue_t dq)
1091 {
1092 dispatch_queue_specific_queue_t dqsq;
1093
1094 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
1095 sizeof(struct dispatch_queue_specific_queue_s));
1096 _dispatch_queue_init((dispatch_queue_t)dqsq);
1097 dqsq->do_xref_cnt = -1;
1098 dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
1099 true);
1100 dqsq->dq_width = UINT32_MAX;
1101 dqsq->dq_label = "queue-specific";
1102 TAILQ_INIT(&dqsq->dqsq_contexts);
1103 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
1104 (dispatch_queue_t)dqsq, release))) {
1105 _dispatch_release((dispatch_queue_t)dqsq);
1106 }
1107 }
1108
1109 static void
1110 _dispatch_queue_set_specific(void *ctxt)
1111 {
1112 dispatch_queue_specific_t dqs, dqsn = ctxt;
1113 dispatch_queue_specific_queue_t dqsq =
1114 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1115
1116 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1117 if (dqs->dqs_key == dqsn->dqs_key) {
1118 // Destroy previous context for existing key
1119 if (dqs->dqs_destructor) {
1120 dispatch_async_f(_dispatch_get_root_queue(
1121 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
1122 dqs->dqs_destructor);
1123 }
1124 if (dqsn->dqs_ctxt) {
1125 // Copy new context for existing key
1126 dqs->dqs_ctxt = dqsn->dqs_ctxt;
1127 dqs->dqs_destructor = dqsn->dqs_destructor;
1128 } else {
1129 // Remove context storage for existing key
1130 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
1131 free(dqs);
1132 }
1133 return free(dqsn);
1134 }
1135 }
1136 // Insert context storage for new key
1137 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
1138 }
1139
1140 DISPATCH_NOINLINE
1141 void
1142 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
1143 void *ctxt, dispatch_function_t destructor)
1144 {
1145 if (slowpath(!key)) {
1146 return;
1147 }
1148 dispatch_queue_specific_t dqs;
1149
1150 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
1151 dqs->dqs_key = key;
1152 dqs->dqs_ctxt = ctxt;
1153 dqs->dqs_destructor = destructor;
1154 if (slowpath(!dq->dq_specific_q)) {
1155 _dispatch_queue_init_specific(dq);
1156 }
1157 _dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
1158 _dispatch_queue_set_specific);
1159 }
1160
1161 static void
1162 _dispatch_queue_get_specific(void *ctxt)
1163 {
1164 void **ctxtp = ctxt;
1165 void *key = *ctxtp;
1166 dispatch_queue_specific_queue_t dqsq =
1167 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1168 dispatch_queue_specific_t dqs;
1169
1170 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1171 if (dqs->dqs_key == key) {
1172 *ctxtp = dqs->dqs_ctxt;
1173 return;
1174 }
1175 }
1176 *ctxtp = NULL;
1177 }
1178
1179 DISPATCH_NOINLINE
1180 void *
1181 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
1182 {
1183 if (slowpath(!key)) {
1184 return NULL;
1185 }
1186 void *ctxt = NULL;
1187
1188 if (fastpath(dq->dq_specific_q)) {
1189 ctxt = (void *)key;
1190 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
1191 }
1192 return ctxt;
1193 }
1194
1195 DISPATCH_NOINLINE
1196 void *
1197 dispatch_get_specific(const void *key)
1198 {
1199 if (slowpath(!key)) {
1200 return NULL;
1201 }
1202 void *ctxt = NULL;
1203 dispatch_queue_t dq = _dispatch_queue_get_current();
1204
1205 while (slowpath(dq)) {
1206 if (slowpath(dq->dq_specific_q)) {
1207 ctxt = (void *)key;
1208 dispatch_sync_f(dq->dq_specific_q, &ctxt,
1209 _dispatch_queue_get_specific);
1210 if (ctxt) break;
1211 }
1212 dq = dq->do_targetq;
1213 }
1214 return ctxt;
1215 }
1216
1217 #pragma mark -
1218 #pragma mark dispatch_queue_debug
1219
1220 size_t
1221 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1222 {
1223 size_t offset = 0;
1224 dispatch_queue_t target = dq->do_targetq;
1225 offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
1226 "running = 0x%x, barrier = %d ", target && target->dq_label ?
1227 target->dq_label : "", target, dq->dq_width / 2,
1228 dq->dq_running / 2, dq->dq_running & 1);
1229 if (dq->dq_is_thread_bound) {
1230 offset += dsnprintf(buf, bufsiz, ", thread = %p ",
1231 _dispatch_queue_get_bound_thread(dq));
1232 }
1233 return offset;
1234 }
1235
1236 size_t
1237 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1238 {
1239 size_t offset = 0;
1240 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1241 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
1242 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1243 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1244 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
1245 return offset;
1246 }
1247
#if DISPATCH_DEBUG
// Debug-build helper: logs `str` together with the description of `dq`, or
// a "queue[NULL]" line when no queue is given.
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (!fastpath(dq)) {
		_dispatch_log("queue[NULL]: %s", str);
		return;
	}
	_dispatch_object_debug(dq, "%s", str);
}
#endif
1258
#if DISPATCH_PERF_MON
// Lock protecting _dispatch_stats
static OSSpinLock _dispatch_stats_lock;
// Per-bucket totals; the bucket index is flsll() of the average time per
// work item, bucket 0 collects threads that ran no work item
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

// Folds the calling thread's work item counter and the time elapsed since
// `start` into _dispatch_stats, and resets the thread's counter.
static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket = 0;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		uint64_t avg = delta / count;
		bucket = flsll(avg);
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif
1293
1294 #pragma mark -
1295 #pragma mark dispatch_continuation_t
1296
1297 static void
1298 _dispatch_force_cache_cleanup(void)
1299 {
1300 dispatch_continuation_t dc;
1301 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1302 if (dc) {
1303 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1304 _dispatch_cache_cleanup(dc);
1305 }
1306 }
1307
1308 DISPATCH_NOINLINE
1309 static void
1310 _dispatch_cache_cleanup(void *value)
1311 {
1312 dispatch_continuation_t dc, next_dc = value;
1313
1314 while ((dc = next_dc)) {
1315 next_dc = dc->do_next;
1316 _dispatch_continuation_free_to_heap(dc);
1317 }
1318 }
1319
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
// Limit the per-thread continuation cache is trimmed down to
int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;

// Frees `dc` to the heap, then trims the calling thread's continuation cache:
// when the head's do_ref_cnt exceeds _dispatch_continuation_cache_limit, that
// many excess continuations are freed from the front of the cache list.
// NOTE(review): do_ref_cnt of a cached continuation presumably holds the
// cache depth -- confirm against the continuation cache helpers.
DISPATCH_NOINLINE
void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);

	dispatch_continuation_t head =
			_dispatch_thread_getspecific(dispatch_cache_key);
	if (!head) {
		return;
	}
	int excess = head->do_ref_cnt - _dispatch_continuation_cache_limit;
	if (excess <= 0) {
		return;
	}

	dispatch_continuation_t rest;
	for (;;) {
		rest = head->do_next;
		_dispatch_continuation_free_to_heap(head);
		if (--excess == 0) {
			break;
		}
		head = rest;
		if (!head) {
			break;
		}
	}
	// Whatever is left becomes the new cache
	_dispatch_thread_setspecific(dispatch_cache_key, rest);
}
#endif
1341
1342 DISPATCH_ALWAYS_INLINE_NDEBUG
1343 static inline void
1344 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1345 {
1346 dispatch_continuation_t dc = dou._dc;
1347
1348 (void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
1349 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1350 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1351 _dispatch_trace_continuation_pop(dq, dou);
1352 _dispatch_thread_semaphore_signal(
1353 (_dispatch_thread_semaphore_t)dc->dc_other);
1354 _dispatch_introspection_queue_item_complete(dou);
1355 } else {
1356 _dispatch_async_f_redirect(dq, dc);
1357 }
1358 _dispatch_perfmon_workitem_inc();
1359 }
1360
1361 DISPATCH_ALWAYS_INLINE_NDEBUG
1362 static inline void
1363 _dispatch_continuation_pop(dispatch_object_t dou)
1364 {
1365 dispatch_continuation_t dc = dou._dc, dc1;
1366 dispatch_group_t dg;
1367
1368 _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
1369 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
1370 return dx_invoke(dou._do);
1371 }
1372
1373 // Add the item back to the cache before calling the function. This
1374 // allows the 'hot' continuation to be used for a quick callback.
1375 //
1376 // The ccache version is per-thread.
1377 // Therefore, the object has not been reused yet.
1378 // This generates better assembly.
1379 if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
1380 dc1 = _dispatch_continuation_free_cacheonly(dc);
1381 } else {
1382 dc1 = NULL;
1383 }
1384 if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
1385 dg = dc->dc_data;
1386 } else {
1387 dg = NULL;
1388 }
1389 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
1390 if (dg) {
1391 dispatch_group_leave(dg);
1392 _dispatch_release(dg);
1393 }
1394 _dispatch_introspection_queue_item_complete(dou);
1395 if (slowpath(dc1)) {
1396 _dispatch_continuation_free_to_cache_limit(dc1);
1397 }
1398 }
1399
1400 #pragma mark -
1401 #pragma mark dispatch_barrier_async
1402
1403 DISPATCH_NOINLINE
1404 static void
1405 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
1406 dispatch_function_t func)
1407 {
1408 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1409
1410 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1411 dc->dc_func = func;
1412 dc->dc_ctxt = ctxt;
1413
1414 _dispatch_queue_push(dq, dc);
1415 }
1416
1417 DISPATCH_NOINLINE
1418 void
1419 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
1420 dispatch_function_t func)
1421 {
1422 dispatch_continuation_t dc;
1423
1424 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1425 if (!dc) {
1426 return _dispatch_barrier_async_f_slow(dq, ctxt, func);
1427 }
1428
1429 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1430 dc->dc_func = func;
1431 dc->dc_ctxt = ctxt;
1432
1433 _dispatch_queue_push(dq, dc);
1434 }
1435
#ifdef __BLOCKS__
// Block flavor of dispatch_barrier_async_f(): the block is copied and the
// copy is invoked and released by _dispatch_call_block_and_release.
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_block_t copied = _dispatch_Block_copy(work);

	dispatch_barrier_async_f(dq, copied, _dispatch_call_block_and_release);
}
#endif
1444
1445 #pragma mark -
1446 #pragma mark dispatch_async
1447
1448 void
1449 _dispatch_async_redirect_invoke(void *ctxt)
1450 {
1451 struct dispatch_continuation_s *dc = ctxt;
1452 struct dispatch_continuation_s *other_dc = dc->dc_other;
1453 dispatch_queue_t old_dq, dq = dc->dc_data, rq;
1454
1455 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1456 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1457 _dispatch_continuation_pop(other_dc);
1458 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1459
1460 rq = dq->do_targetq;
1461 while (slowpath(rq->do_targetq) && rq != old_dq) {
1462 if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
1463 _dispatch_wakeup(rq);
1464 }
1465 rq = rq->do_targetq;
1466 }
1467
1468 if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
1469 _dispatch_wakeup(dq);
1470 }
1471 _dispatch_release(dq);
1472 }
1473
// Walks up the target queue chain starting at `dq` to find the queue the
// continuation `dc` gets pushed on. A running reference (+2 on dq_running)
// is taken on every queue that is passed through; the walk stops at the first
// queue that has pending items, is suspended, is serial (width 1), is in a
// barrier (low bit of dq_running set) or is over its width, or when a queue
// without a target queue is reached.
static inline void
_dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	// Stays non-zero unless an undo below drops dq_running to 0, so that no
	// wakeup is requested by default
	uint32_t running = 2;

	// Find the queue to redirect to
	do {
		if (slowpath(dq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
				slowpath(dq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
			// Cannot pass through this queue: undo the increment
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));

	// Request a wakeup only if the undo left the queue with nothing running
	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}
1496
1497 DISPATCH_NOINLINE
1498 static void
1499 _dispatch_async_f_redirect(dispatch_queue_t dq,
1500 dispatch_continuation_t other_dc)
1501 {
1502 dispatch_continuation_t dc = _dispatch_continuation_alloc();
1503
1504 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1505 dc->dc_func = _dispatch_async_redirect_invoke;
1506 dc->dc_ctxt = dc;
1507 dc->dc_data = dq;
1508 dc->dc_other = other_dc;
1509
1510 _dispatch_retain(dq);
1511 dq = dq->do_targetq;
1512 if (slowpath(dq->do_targetq)) {
1513 return _dispatch_async_f_redirect2(dq, dc);
1514 }
1515
1516 _dispatch_queue_push(dq, dc);
1517 }
1518
// Async submission of `dc` to a queue that has a target queue. Tries to take
// a running reference (+2 on dq_running) and redirect the item straight to
// the target queue; falls back to pushing the item on `dq` itself when the
// queue has pending items, is suspended, is over its width or is in a
// barrier (low bit of dq_running set).
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	// Stays non-zero unless an undo below drops dq_running to 0, so that no
	// wakeup is requested by default
	uint32_t running = 2;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running > dq->dq_width)) {
			// Queue is at its width: undo and enqueue
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		if (!slowpath(running & 1)) {
			// No barrier in progress: the running reference is handed over
			// to the redirect
			return _dispatch_async_f_redirect(dq, dc);
		}
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		// We might get lucky and find that the barrier has ended by now
	} while (!(running & 1));

	// Request a wakeup only if the undo left the queue with nothing running
	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}
1544
1545 DISPATCH_NOINLINE
1546 static void
1547 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
1548 dispatch_function_t func)
1549 {
1550 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1551
1552 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1553 dc->dc_func = func;
1554 dc->dc_ctxt = ctxt;
1555
1556 // No fastpath/slowpath hint because we simply don't know
1557 if (dq->do_targetq) {
1558 return _dispatch_async_f2(dq, dc);
1559 }
1560
1561 _dispatch_queue_push(dq, dc);
1562 }
1563
1564 DISPATCH_NOINLINE
1565 void
1566 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1567 {
1568 dispatch_continuation_t dc;
1569
1570 // No fastpath/slowpath hint because we simply don't know
1571 if (dq->dq_width == 1) {
1572 return dispatch_barrier_async_f(dq, ctxt, func);
1573 }
1574
1575 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1576 if (!dc) {
1577 return _dispatch_async_f_slow(dq, ctxt, func);
1578 }
1579
1580 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1581 dc->dc_func = func;
1582 dc->dc_ctxt = ctxt;
1583
1584 // No fastpath/slowpath hint because we simply don't know
1585 if (dq->do_targetq) {
1586 return _dispatch_async_f2(dq, dc);
1587 }
1588
1589 _dispatch_queue_push(dq, dc);
1590 }
1591
#ifdef __BLOCKS__
// Block flavor of dispatch_async_f(): the block is copied and the copy is
// invoked and released by _dispatch_call_block_and_release.
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_block_t copied = _dispatch_Block_copy(work);

	dispatch_async_f(dq, copied, _dispatch_call_block_and_release);
}
#endif
1600
1601 #pragma mark -
1602 #pragma mark dispatch_group_async
1603
1604 DISPATCH_NOINLINE
1605 void
1606 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
1607 dispatch_function_t func)
1608 {
1609 dispatch_continuation_t dc;
1610
1611 _dispatch_retain(dg);
1612 dispatch_group_enter(dg);
1613
1614 dc = _dispatch_continuation_alloc();
1615
1616 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
1617 dc->dc_func = func;
1618 dc->dc_ctxt = ctxt;
1619 dc->dc_data = dg;
1620
1621 // No fastpath/slowpath hint because we simply don't know
1622 if (dq->dq_width != 1 && dq->do_targetq) {
1623 return _dispatch_async_f2(dq, dc);
1624 }
1625
1626 _dispatch_queue_push(dq, dc);
1627 }
1628
#ifdef __BLOCKS__
// Block flavor of dispatch_group_async_f(): the block is copied and the copy
// is invoked and released by _dispatch_call_block_and_release.
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_block_t copied = _dispatch_Block_copy(db);

	dispatch_group_async_f(dg, dq, copied, _dispatch_call_block_and_release);
}
#endif
1638
1639 #pragma mark -
1640 #pragma mark dispatch_function_invoke
1641
1642 DISPATCH_ALWAYS_INLINE
1643 static inline void
1644 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
1645 dispatch_function_t func)
1646 {
1647 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1648 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1649 _dispatch_client_callout(ctxt, func);
1650 _dispatch_perfmon_workitem_inc();
1651 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1652 }
1653
1654 void
1655 _dispatch_sync_recurse_invoke(void *ctxt)
1656 {
1657 dispatch_continuation_t dc = ctxt;
1658 _dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
1659 }
1660
1661 DISPATCH_ALWAYS_INLINE
1662 static inline void
1663 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
1664 dispatch_function_t func)
1665 {
1666 struct dispatch_continuation_s dc = {
1667 .dc_data = dq,
1668 .dc_func = func,
1669 .dc_ctxt = ctxt,
1670 };
1671 dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke);
1672 }
1673
1674 #pragma mark -
1675 #pragma mark dispatch_barrier_sync
1676
// Forward declaration: defined in the dispatch_sync section further down,
// needed here by _dispatch_barrier_sync_f_slow()
static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func);
1679
1680 DISPATCH_ALWAYS_INLINE_NDEBUG
1681 static inline _dispatch_thread_semaphore_t
1682 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
1683 bool lock)
1684 {
1685 _dispatch_thread_semaphore_t sema;
1686 dispatch_continuation_t dc = dou._dc;
1687
1688 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
1689 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
1690 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
1691 return 0;
1692 }
1693 _dispatch_trace_continuation_pop(dq, dc);
1694 _dispatch_perfmon_workitem_inc();
1695
1696 dc = dc->dc_ctxt;
1697 dq = dc->dc_data;
1698 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
1699 if (lock) {
1700 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1701 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1702 // rdar://problem/9032024 running lock must be held until sync_f_slow
1703 // returns
1704 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1705 }
1706 _dispatch_introspection_queue_item_complete(dou);
1707 return sema ? sema : MACH_PORT_DEAD;
1708 }
1709
1710 static void
1711 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
1712 {
1713 dispatch_continuation_t dc = ctxt;
1714 dispatch_queue_t dq = dc->dc_data;
1715 _dispatch_thread_semaphore_t sema;
1716 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
1717
1718 dispatch_assert(dq == _dispatch_queue_get_current());
1719 #if DISPATCH_COCOA_COMPAT
1720 if (slowpath(dq->dq_is_thread_bound)) {
1721 // The queue is bound to a non-dispatch thread (e.g. main thread)
1722 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
1723 dispatch_atomic_store2o(dc, dc_func, NULL, release);
1724 _dispatch_thread_semaphore_signal(sema); // release
1725 return;
1726 }
1727 #endif
1728 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1729 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1730 // rdar://9032024 running lock must be held until sync_f_slow returns
1731 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1732 _dispatch_thread_semaphore_signal(sema); // release
1733 }
1734
// Slow path of dispatch_barrier_sync_f(), taken when the queue has pending
// items, is suspended or could not be locked right away.
//
// A marker continuation (`dbss`) is pushed on the queue and the calling
// thread blocks on its thread semaphore until the marker is reached. The
// side that reaches the marker takes the suspend and running locks on the
// queue for us (see _dispatch_barrier_sync_f_slow_invoke() and
// _dispatch_barrier_sync_f_pop()); both are given back at the end of this
// function, unless the lock can be transferred to the next barrier sync
// waiter.
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	// Waiter state, read by the side that pops the marker
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_other = (void*)sema,
	};
	// Marker enqueued on dq; both continuations live on this stack frame,
	// which stays valid because we block below until the marker was handled
	struct dispatch_continuation_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dc,
#if DISPATCH_INTROSPECTION
		.dc_data = (void*)_dispatch_thread_self(),
#endif
	};
	_dispatch_queue_push(dq, &dbss);

	_dispatch_thread_semaphore_wait(sema); // acquire
	_dispatch_put_thread_semaphore(sema);

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	// (dc_func was cleared after the function ran on that thread)
	if (dc.dc_func == NULL) {
		return;
	}
#endif
	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	// Only our own suspend interval and running reference are left
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
			dq->dq_running == 2) {
		// rdar://problem/8290662 "lock transfer"
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			// The next waiter inherits the locks we are holding
			_dispatch_thread_semaphore_signal(sema); // release
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}
1798
1799 DISPATCH_NOINLINE
1800 static void
1801 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
1802 {
1803 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1804 // rdar://problem/8290662 "lock transfer"
1805 _dispatch_thread_semaphore_t sema;
1806 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1807 if (sema) {
1808 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1809 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1810 // rdar://9032024 running lock must be held until sync_f_slow
1811 // returns: increment by 2 and decrement by 1
1812 (void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
1813 _dispatch_thread_semaphore_signal(sema);
1814 return;
1815 }
1816 }
1817 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1818 _dispatch_wakeup(dq);
1819 }
1820 }
1821
1822 DISPATCH_NOINLINE
1823 static void
1824 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1825 dispatch_function_t func)
1826 {
1827 _dispatch_function_invoke(dq, ctxt, func);
1828 if (slowpath(dq->dq_items_tail)) {
1829 return _dispatch_barrier_sync_f2(dq);
1830 }
1831 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1832 _dispatch_wakeup(dq);
1833 }
1834 }
1835
1836 DISPATCH_NOINLINE
1837 static void
1838 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1839 dispatch_function_t func)
1840 {
1841 _dispatch_function_recurse(dq, ctxt, func);
1842 if (slowpath(dq->dq_items_tail)) {
1843 return _dispatch_barrier_sync_f2(dq);
1844 }
1845 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1846 _dispatch_wakeup(dq);
1847 }
1848 }
1849
1850 DISPATCH_NOINLINE
1851 void
1852 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
1853 dispatch_function_t func)
1854 {
1855 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1856 // 2) the queue is not suspended
1857 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1858 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1859 }
1860 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
1861 // global concurrent queues and queues bound to non-dispatch threads
1862 // always fall into the slow case
1863 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1864 }
1865 if (slowpath(dq->do_targetq->do_targetq)) {
1866 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
1867 }
1868 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
1869 }
1870
1871 #ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
// Block barrier sync for thread-bound queues. When the GC hook
// dispatch_begin_thread_4GC is set, the block is copied first and released
// after it ran; otherwise the block is invoked in place.
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (!dispatch_begin_thread_4GC) {
		dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
		return;
	}
	dispatch_block_t copied = _dispatch_Block_copy(work);
	dispatch_barrier_sync_f(dq, copied, _dispatch_call_block_and_release);
}
#endif
1889
1890 void
1891 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
1892 {
1893 #if DISPATCH_COCOA_COMPAT
1894 if (slowpath(dq->dq_is_thread_bound)) {
1895 return _dispatch_barrier_sync_slow(dq, work);
1896 }
1897 #endif
1898 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
1899 }
1900 #endif
1901
1902 DISPATCH_NOINLINE
1903 static void
1904 _dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
1905 dispatch_function_t func)
1906 {
1907 _dispatch_function_invoke(dq, ctxt, func);
1908 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1909 _dispatch_wakeup(dq);
1910 }
1911 }
1912
1913 DISPATCH_NOINLINE
1914 void
1915 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
1916 dispatch_function_t func)
1917 {
1918 // Use for mutation of queue-/source-internal state only, ignores target
1919 // queue hierarchy!
1920 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
1921 || slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
1922 acquire))) {
1923 return dispatch_barrier_async_f(dq, ctxt, func);
1924 }
1925 _dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
1926 }
1927
1928 #pragma mark -
1929 #pragma mark dispatch_sync
1930
1931 DISPATCH_NOINLINE
1932 static void
1933 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
1934 bool wakeup)
1935 {
1936 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
1937 struct dispatch_continuation_s dss = {
1938 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
1939 #if DISPATCH_INTROSPECTION
1940 .dc_func = func,
1941 .dc_ctxt = ctxt,
1942 .dc_data = (void*)_dispatch_thread_self(),
1943 #endif
1944 .dc_other = (void*)sema,
1945 };
1946 _dispatch_queue_push_wakeup(dq, &dss, wakeup);
1947
1948 _dispatch_thread_semaphore_wait(sema);
1949 _dispatch_put_thread_semaphore(sema);
1950
1951 if (slowpath(dq->do_targetq->do_targetq)) {
1952 _dispatch_function_recurse(dq, ctxt, func);
1953 } else {
1954 _dispatch_function_invoke(dq, ctxt, func);
1955 }
1956 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1957 _dispatch_wakeup(dq);
1958 }
1959 }
1960
1961 DISPATCH_NOINLINE
1962 static void
1963 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1964 dispatch_function_t func)
1965 {
1966 _dispatch_function_invoke(dq, ctxt, func);
1967 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1968 _dispatch_wakeup(dq);
1969 }
1970 }
1971
1972 DISPATCH_NOINLINE
1973 static void
1974 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1975 dispatch_function_t func)
1976 {
1977 _dispatch_function_recurse(dq, ctxt, func);
1978 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1979 _dispatch_wakeup(dq);
1980 }
1981 }
1982
1983 static inline void
1984 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1985 {
1986 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1987 // 2) the queue is not suspended
1988 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1989 return _dispatch_sync_f_slow(dq, ctxt, func, false);
1990 }
1991 uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1992 // re-check suspension after barrier check <rdar://problem/15242126>
1993 if (slowpath(running & 1) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1994 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
1995 return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
1996 }
1997 if (slowpath(dq->do_targetq->do_targetq)) {
1998 return _dispatch_sync_f_recurse(dq, ctxt, func);
1999 }
2000 _dispatch_sync_f_invoke(dq, ctxt, func);
2001 }
2002
2003 DISPATCH_NOINLINE
2004 void
2005 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
2006 {
2007 if (fastpath(dq->dq_width == 1)) {
2008 return dispatch_barrier_sync_f(dq, ctxt, func);
2009 }
2010 if (slowpath(!dq->do_targetq)) {
2011 // the global concurrent queues do not need strict ordering
2012 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2013 return _dispatch_sync_f_invoke(dq, ctxt, func);
2014 }
2015 _dispatch_sync_f2(dq, ctxt, func);
2016 }
2017
2018 #ifdef __BLOCKS__
2019 #if DISPATCH_COCOA_COMPAT
2020 DISPATCH_NOINLINE
2021 static void
2022 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
2023 {
2024 // Blocks submitted to the main queue MUST be run on the main thread,
2025 // therefore under GC we must Block_copy in order to notify the thread-local
2026 // garbage collector that the objects are transferring to the main thread
2027 // rdar://problem/7176237&7181849&7458685
2028 if (dispatch_begin_thread_4GC) {
2029 dispatch_block_t block = _dispatch_Block_copy(work);
2030 return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
2031 }
2032 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
2033 }
2034 #endif
2035
2036 void
2037 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
2038 {
2039 #if DISPATCH_COCOA_COMPAT
2040 if (slowpath(dq->dq_is_thread_bound)) {
2041 return _dispatch_sync_slow(dq, work);
2042 }
2043 #endif
2044 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
2045 }
2046 #endif
2047
2048 #pragma mark -
2049 #pragma mark dispatch_after
2050
2051 void
2052 _dispatch_after_timer_callback(void *ctxt)
2053 {
2054 dispatch_continuation_t dc = ctxt, dc1;
2055 dispatch_source_t ds = dc->dc_data;
2056 dc1 = _dispatch_continuation_free_cacheonly(dc);
2057 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
2058 dispatch_source_cancel(ds);
2059 dispatch_release(ds);
2060 if (slowpath(dc1)) {
2061 _dispatch_continuation_free_to_cache_limit(dc1);
2062 }
2063 }
2064
2065 DISPATCH_NOINLINE
2066 void
2067 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
2068 dispatch_function_t func)
2069 {
2070 uint64_t delta, leeway;
2071 dispatch_source_t ds;
2072
2073 if (when == DISPATCH_TIME_FOREVER) {
2074 #if DISPATCH_DEBUG
2075 DISPATCH_CLIENT_CRASH(
2076 "dispatch_after_f() called with 'when' == infinity");
2077 #endif
2078 return;
2079 }
2080
2081 delta = _dispatch_timeout(when);
2082 if (delta == 0) {
2083 return dispatch_async_f(queue, ctxt, func);
2084 }
2085 leeway = delta / 10; // <rdar://problem/13447496>
2086 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
2087 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
2088
2089 // this function can and should be optimized to not use a dispatch source
2090 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
2091 dispatch_assert(ds);
2092
2093 dispatch_continuation_t dc = _dispatch_continuation_alloc();
2094 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2095 dc->dc_func = func;
2096 dc->dc_ctxt = ctxt;
2097 dc->dc_data = ds;
2098
2099 dispatch_set_context(ds, dc);
2100 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
2101 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
2102 dispatch_resume(ds);
2103 }
2104
2105 #ifdef __BLOCKS__
2106 void
2107 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
2108 dispatch_block_t work)
2109 {
2110 // test before the copy of the block
2111 if (when == DISPATCH_TIME_FOREVER) {
2112 #if DISPATCH_DEBUG
2113 DISPATCH_CLIENT_CRASH(
2114 "dispatch_after() called with 'when' == infinity");
2115 #endif
2116 return;
2117 }
2118 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
2119 _dispatch_call_block_and_release);
2120 }
2121 #endif
2122
2123 #pragma mark -
2124 #pragma mark dispatch_queue_push
2125
2126 DISPATCH_NOINLINE
2127 static void
2128 _dispatch_queue_push_list_slow2(dispatch_queue_t dq,
2129 struct dispatch_object_s *obj)
2130 {
2131 // The queue must be retained before dq_items_head is written in order
2132 // to ensure that the reference is still valid when _dispatch_wakeup is
2133 // called. Otherwise, if preempted between the assignment to
2134 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
2135 // queue may release the last reference to the queue when invoked by
2136 // _dispatch_queue_drain. <rdar://problem/6932776>
2137 _dispatch_retain(dq);
2138 dq->dq_items_head = obj;
2139 _dispatch_wakeup(dq);
2140 _dispatch_release(dq);
2141 }
2142
2143 DISPATCH_NOINLINE
2144 void
2145 _dispatch_queue_push_list_slow(dispatch_queue_t dq,
2146 struct dispatch_object_s *obj, unsigned int n)
2147 {
2148 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
2149 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
2150 return _dispatch_queue_wakeup_global2(dq, n);
2151 }
2152 _dispatch_queue_push_list_slow2(dq, obj);
2153 }
2154
2155 DISPATCH_NOINLINE
2156 void
2157 _dispatch_queue_push_slow(dispatch_queue_t dq,
2158 struct dispatch_object_s *obj)
2159 {
2160 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
2161 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
2162 return _dispatch_queue_wakeup_global(dq);
2163 }
2164 _dispatch_queue_push_list_slow2(dq, obj);
2165 }
2166
2167 #pragma mark -
2168 #pragma mark dispatch_queue_probe
2169
2170 unsigned long
2171 _dispatch_queue_probe(dispatch_queue_t dq)
2172 {
2173 return (unsigned long)slowpath(dq->dq_items_tail != NULL);
2174 }
2175
2176 #if DISPATCH_COCOA_COMPAT
2177 unsigned long
2178 _dispatch_runloop_queue_probe(dispatch_queue_t dq)
2179 {
2180 if (_dispatch_queue_probe(dq)) {
2181 if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
2182 return _dispatch_runloop_queue_wakeup(dq);
2183 }
2184 return false;
2185 }
2186 #endif
2187
2188 unsigned long
2189 _dispatch_mgr_queue_probe(dispatch_queue_t dq)
2190 {
2191 if (_dispatch_queue_probe(dq)) {
2192 return _dispatch_mgr_wakeup(dq);
2193 }
2194 return false;
2195 }
2196
2197 unsigned long
2198 _dispatch_root_queue_probe(dispatch_queue_t dq)
2199 {
2200 _dispatch_queue_wakeup_global(dq);
2201 return false;
2202 }
2203
2204 #pragma mark -
2205 #pragma mark dispatch_wakeup
2206
2207 // 6618342 Contact the team that owns the Instrument DTrace probe before
2208 // renaming this symbol
2209 dispatch_queue_t
2210 _dispatch_wakeup(dispatch_object_t dou)
2211 {
2212 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
2213 return NULL;
2214 }
2215 if (!dx_probe(dou._do)) {
2216 return NULL;
2217 }
2218 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
2219 DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
2220 #if DISPATCH_COCOA_COMPAT
2221 if (dou._dq == &_dispatch_main_q) {
2222 return _dispatch_main_queue_wakeup();
2223 }
2224 #endif
2225 return NULL;
2226 }
2227 _dispatch_retain(dou._do);
2228 dispatch_queue_t tq = dou._do->do_targetq;
2229 _dispatch_queue_push(tq, dou._do);
2230 return tq; // libdispatch does not need this, but the Instrument DTrace
2231 // probe does
2232 }
2233
2234 #if DISPATCH_COCOA_COMPAT
2235 static inline void
2236 _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
2237 {
2238 mach_port_t mp = (mach_port_t)dq->do_ctxt;
2239 if (!mp) {
2240 return;
2241 }
2242 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
2243 switch (kr) {
2244 case MACH_SEND_TIMEOUT:
2245 case MACH_SEND_TIMED_OUT:
2246 case MACH_SEND_INVALID_DEST:
2247 break;
2248 default:
2249 (void)dispatch_assume_zero(kr);
2250 break;
2251 }
2252 }
2253
2254 DISPATCH_NOINLINE DISPATCH_WEAK
2255 unsigned long
2256 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
2257 {
2258 _dispatch_runloop_queue_wakeup_thread(dq);
2259 return false;
2260 }
2261
2262 DISPATCH_NOINLINE
2263 static dispatch_queue_t
2264 _dispatch_main_queue_wakeup(void)
2265 {
2266 dispatch_queue_t dq = &_dispatch_main_q;
2267 if (!dq->dq_is_thread_bound) {
2268 return NULL;
2269 }
2270 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
2271 _dispatch_runloop_queue_port_init);
2272 _dispatch_runloop_queue_wakeup_thread(dq);
2273 return NULL;
2274 }
2275 #endif
2276
// Requests up to n worker threads for the root queue dq.
//
// Depending on the build configuration and on the queue context, the request
// is served either
//  - by the pthread workqueue interfaces (pthread_workqueue_additem_np() once
//    per requested thread, or a single pthread_workqueue_addthreads_np()
//    call), or
//  - by the pthread pool: pool threads blocked on dgq_thread_mediator are
//    signalled first, and for the remainder new pthreads running
//    _dispatch_worker_thread() are created while dgq_thread_pool_size
//    still has capacity.
//
// NOTE(review): callers in view always pass n >= 1; both `do { } while (--i)`
// style loops below rely on that -- confirm before adding callers.
2277 DISPATCH_NOINLINE
2278 static void
2279 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
2280 {
2281 static dispatch_once_t pred;
2282 dispatch_root_queue_context_t qc = dq->do_ctxt;
2283 uint32_t i = n;
2284 int r;
2285
2286 _dispatch_debug_root_queue(dq, __func__);
// Root queue initialization happens once, on the first request
2287 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
2288
2289 #if HAVE_PTHREAD_WORKQUEUES
2290 #if DISPATCH_USE_PTHREAD_POOL
// With the pthread pool compiled in, a dgq_kworkqueue of ~0ul skips the
// workqueue path and falls through to the pool code further down
2291 if (qc->dgq_kworkqueue != (void*)(~0ul))
2292 #endif
2293 {
2294 _dispatch_root_queue_debug("requesting new worker thread for global "
2295 "queue: %p", dq);
2296 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
// Legacy interface: one work item running _dispatch_worker_thread3 is added
// per requested thread
2297 if (qc->dgq_kworkqueue) {
2298 pthread_workitem_handle_t wh;
2299 unsigned int gen_cnt;
2300 do {
2301 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
2302 _dispatch_worker_thread3, dq, &wh, &gen_cnt);
2303 (void)dispatch_assume_zero(r);
2304 } while (--i);
2305 return;
2306 }
2307 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
2308 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// A single call requests all i threads for this priority/options pair
2309 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
2310 qc->dgq_wq_options, (int)i);
2311 (void)dispatch_assume_zero(r);
2312 #endif
2313 return;
2314 }
2315 #endif // HAVE_PTHREAD_WORKQUEUES
2316 #if DISPATCH_USE_PTHREAD_POOL
// Signal the mediator semaphore for as long as signalling reports a non-zero
// result; each such signal accounts for one of the i requests
2317 if (fastpath(qc->dgq_thread_mediator)) {
2318 while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
2319 if (!--i) {
2320 return;
2321 }
2322 }
2323 }
// Reserve j = min(i, t_count) slots of the remaining pool capacity with a
// compare-and-swap loop; give up when no capacity is left
2324 uint32_t j, t_count = qc->dgq_thread_pool_size;
2325 do {
2326 if (!t_count) {
2327 _dispatch_root_queue_debug("pthread pool is full for root queue: "
2328 "%p", dq);
2329 return;
2330 }
2331 j = i > t_count ? t_count : i;
2332 } while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
2333 t_count - j, &t_count, relaxed));
2334
// Thread attributes are only supplied when the queue context carries a
// pthread root queue context
2335 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2336 pthread_attr_t *attr = pqc ? &pqc->dpq_thread_attr : NULL;
2337 pthread_t tid, *pthr = &tid;
2338 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
// For the manager root queue, the pthread_t storage comes from
// _dispatch_mgr_root_queue_init() instead of the local `tid`
2339 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
2340 pthr = _dispatch_mgr_root_queue_init();
2341 }
2342 #endif
// Create the j reserved threads. dq is retained once per thread (the worker
// releases it when it exits). pthread_create() is retried after
// _dispatch_temporary_resource_shortage() until it succeeds; errors other
// than EAGAIN are additionally reported through dispatch_assume_zero().
2343 do {
2344 _dispatch_retain(dq);
2345 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
2346 if (r != EAGAIN) {
2347 (void)dispatch_assume_zero(r);
2348 }
2349 _dispatch_temporary_resource_shortage();
2350 }
// Threads created without explicit attributes are detached here
2351 if (!attr) {
2352 r = pthread_detach(*pthr);
2353 (void)dispatch_assume_zero(r);
2354 }
2355 } while (--j);
2356 #endif // DISPATCH_USE_PTHREAD_POOL
2357 }
2358
2359 static inline void
2360 _dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
2361 {
2362 if (!dq->dq_items_tail) {
2363 return;
2364 }
2365 #if HAVE_PTHREAD_WORKQUEUES
2366 dispatch_root_queue_context_t qc = dq->do_ctxt;
2367 if (
2368 #if DISPATCH_USE_PTHREAD_POOL
2369 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
2370 #endif
2371 !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
2372 _dispatch_root_queue_debug("worker thread request still pending for "
2373 "global queue: %p", dq);
2374 return;
2375 }
2376 #endif // HAVE_PTHREAD_WORKQUEUES
2377 return _dispatch_queue_wakeup_global_slow(dq, n);
2378 }
2379
2380 static inline void
2381 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
2382 {
2383 return _dispatch_queue_wakeup_global2(dq, 1);
2384 }
2385
2386 #pragma mark -
2387 #pragma mark dispatch_queue_invoke
2388
2389 DISPATCH_ALWAYS_INLINE
2390 static inline dispatch_queue_t
2391 dispatch_queue_invoke2(dispatch_object_t dou,
2392 _dispatch_thread_semaphore_t *sema_ptr)
2393 {
2394 dispatch_queue_t dq = dou._dq;
2395 dispatch_queue_t otq = dq->do_targetq;
2396 *sema_ptr = _dispatch_queue_drain(dq);
2397
2398 if (slowpath(otq != dq->do_targetq)) {
2399 // An item on the queue changed the target queue
2400 return dq->do_targetq;
2401 }
2402 return NULL;
2403 }
2404
2405 // 6618342 Contact the team that owns the Instrument DTrace probe before
2406 // renaming this symbol
2407 DISPATCH_NOINLINE
2408 void
2409 _dispatch_queue_invoke(dispatch_queue_t dq)
2410 {
2411 _dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
2412 }
2413
2414 #pragma mark -
2415 #pragma mark dispatch_queue_drain
2416
2417 DISPATCH_ALWAYS_INLINE
2418 static inline struct dispatch_object_s*
2419 _dispatch_queue_head(dispatch_queue_t dq)
2420 {
2421 struct dispatch_object_s *dc;
2422 while (!(dc = fastpath(dq->dq_items_head))) {
2423 dispatch_hardware_pause();
2424 }
2425 return dc;
2426 }
2427
2428 DISPATCH_ALWAYS_INLINE
2429 static inline struct dispatch_object_s*
2430 _dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
2431 {
2432 struct dispatch_object_s *next_dc;
2433 next_dc = fastpath(dc->do_next);
2434 dq->dq_items_head = next_dc;
2435 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
2436 relaxed)) {
2437 // Enqueue is TIGHTLY controlled, we won't wait long.
2438 while (!(next_dc = fastpath(dc->do_next))) {
2439 dispatch_hardware_pause();
2440 }
2441 dq->dq_items_head = next_dc;
2442 }
2443 return next_dc;
2444 }
2445
// Drains the items enqueued on the queue `dou` on the calling thread.
//
// While draining, the thread-specific dispatch_queue_key is set to the queue;
// the previous value is restored before returning.
//
// Draining stops early when
//  - the queue is suspended,
//  - dq_running exceeds dq_width,
//  - the target queue changed (ignored for DISPATCH_SOURCE_KEVENT_TYPE),
//  - on a queue of width != 1, a barrier item is at the head while
//    dq_running > 1, or
//  - _dispatch_barrier_sync_f_pop() returns a non-zero semaphore for the
//    dequeued item.
//
// Returns that semaphore in the last case, 0 otherwise.
2446 _dispatch_thread_semaphore_t
2447 _dispatch_queue_drain(dispatch_object_t dou)
2448 {
2449 dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
2450 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2451 struct dispatch_object_s *dc, *next_dc;
2452 _dispatch_thread_semaphore_t sema = 0;
2453
2454 // Continue draining sources after target queue change rdar://8928171
2455 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
2456
2457 orig_tq = dq->do_targetq;
2458
2459 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2460 //dispatch_debug_queue(dq, __func__);
2461
// Outer loop: re-check the tail after a batch; inner loop: walk the batch
2462 while (dq->dq_items_tail) {
2463 dc = _dispatch_queue_head(dq);
2464 do {
// All stop conditions are evaluated BEFORE the item is dequeued, so an item
// that cannot run yet stays at the head of the queue
2465 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
2466 goto out;
2467 }
2468 if (dq->dq_running > dq->dq_width) {
2469 goto out;
2470 }
2471 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
2472 goto out;
2473 }
2474 bool redirect = false;
// Queues of width != 1: barrier continuations (non-vtable objects with
// DISPATCH_OBJ_BARRIER_BIT) are run here once dq_running <= 1; everything
// else is redirected
2475 if (!fastpath(dq->dq_width == 1)) {
2476 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
2477 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
2478 if (dq->dq_running > 1) {
2479 goto out;
2480 }
2481 } else {
2482 redirect = true;
2483 }
2484 }
2485 next_dc = _dispatch_queue_next(dq, dc);
2486 if (redirect) {
2487 _dispatch_continuation_redirect(dq, dc);
// `continue` in a do/while jumps to the loop condition, i.e. advances to
// next_dc
2488 continue;
2489 }
// A non-zero semaphore ends the drain and is handed back to the caller
2490 if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
2491 goto out;
2492 }
2493 _dispatch_continuation_pop(dc);
2494 _dispatch_perfmon_workitem_inc();
2495 } while ((dc = next_dc));
2496 }
2497
2498 out:
2499 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2500 return sema;
2501 }
2502
2503 #if DISPATCH_COCOA_COMPAT
2504 static void
2505 _dispatch_main_queue_drain(void)
2506 {
2507 dispatch_queue_t dq = &_dispatch_main_q;
2508 if (!dq->dq_items_tail) {
2509 return;
2510 }
2511 struct dispatch_continuation_s marker = {
2512 .do_vtable = NULL,
2513 };
2514 struct dispatch_object_s *dmarker = (void*)&marker;
2515 _dispatch_queue_push_notrace(dq, dmarker);
2516
2517 _dispatch_perfmon_start();
2518 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2519 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2520
2521 struct dispatch_object_s *dc, *next_dc;
2522 dc = _dispatch_queue_head(dq);
2523 do {
2524 next_dc = _dispatch_queue_next(dq, dc);
2525 if (dc == dmarker) {
2526 goto out;
2527 }
2528 _dispatch_continuation_pop(dc);
2529 _dispatch_perfmon_workitem_inc();
2530 } while ((dc = next_dc));
2531 DISPATCH_CRASH("Main queue corruption");
2532
2533 out:
2534 if (next_dc) {
2535 _dispatch_main_queue_wakeup();
2536 }
2537 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2538 _dispatch_perfmon_end();
2539 _dispatch_force_cache_cleanup();
2540 }
2541
2542 static bool
2543 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
2544 {
2545 if (!dq->dq_items_tail) {
2546 return false;
2547 }
2548 _dispatch_perfmon_start();
2549 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2550 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2551
2552 struct dispatch_object_s *dc, *next_dc;
2553 dc = _dispatch_queue_head(dq);
2554 next_dc = _dispatch_queue_next(dq, dc);
2555 _dispatch_continuation_pop(dc);
2556 _dispatch_perfmon_workitem_inc();
2557
2558 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2559 _dispatch_perfmon_end();
2560 _dispatch_force_cache_cleanup();
2561 return next_dc;
2562 }
2563 #endif
2564
2565 DISPATCH_ALWAYS_INLINE_NDEBUG
2566 static inline _dispatch_thread_semaphore_t
2567 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
2568 {
2569 // rdar://problem/8290662 "lock transfer"
2570 struct dispatch_object_s *dc;
2571 _dispatch_thread_semaphore_t sema;
2572
2573 // queue is locked, or suspended and not being drained
2574 dc = dq->dq_items_head;
2575 if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
2576 return 0;
2577 }
2578 // dequeue dc, it is a barrier sync
2579 (void)_dispatch_queue_next(dq, dc);
2580 return sema;
2581 }
2582
2583 void
2584 _dispatch_mgr_queue_drain(void)
2585 {
2586 dispatch_queue_t dq = &_dispatch_mgr_q;
2587 if (!dq->dq_items_tail) {
2588 return _dispatch_force_cache_cleanup();
2589 }
2590 _dispatch_perfmon_start();
2591 if (slowpath(_dispatch_queue_drain(dq))) {
2592 DISPATCH_CRASH("Sync onto manager queue");
2593 }
2594 _dispatch_perfmon_end();
2595 _dispatch_force_cache_cleanup();
2596 }
2597
2598 #pragma mark -
2599 #pragma mark dispatch_root_queue_drain
2600
// Tunables for the contention back-off in
// _dispatch_queue_concurrent_drain_one_slow(). Each one may be predefined by
// the build to override the default given here.

// Non-zero: randomize the spin count (default: everywhere but embedded)
#if !defined(DISPATCH_CONTENTION_USE_RAND)
#define DISPATCH_CONTENTION_USE_RAND (!TARGET_OS_EMBEDDED)
#endif

// Upper and lower bound of the spin count; both are of the form 2^k - 1
#if !defined(DISPATCH_CONTENTION_SPINS_MAX)
#define DISPATCH_CONTENTION_SPINS_MAX (128 - 1)
#endif
#if !defined(DISPATCH_CONTENTION_SPINS_MIN)
#define DISPATCH_CONTENTION_SPINS_MIN (32 - 1)
#endif

// First sleep interval, and the interval at which backing off is given up
#if !defined(DISPATCH_CONTENTION_USLEEP_START)
#define DISPATCH_CONTENTION_USLEEP_START 500
#endif
#if !defined(DISPATCH_CONTENTION_USLEEP_MAX)
#define DISPATCH_CONTENTION_USLEEP_MAX 100000
#endif
2616
// Back-off path of _dispatch_queue_concurrent_drain_one(), entered when the
// head of dq holds the mediator value (~0ul), i.e. another thread currently
// owns the head.
//
// Alternates between short spin phases and sleeps of doubling length
// (starting at DISPATCH_CONTENTION_USLEEP_START) until either
//  - dq_items_head no longer holds the mediator: returns true, or
//  - the sleep interval reaches DISPATCH_CONTENTION_USLEEP_MAX: requests a
//    global wakeup for dq and returns false.
//
// From the first sleep on, dgq_pending is held incremented by one until the
// function returns.
2617 DISPATCH_NOINLINE
2618 static bool
2619 _dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
2620 {
2621 dispatch_root_queue_context_t qc = dq->do_ctxt;
2622 struct dispatch_object_s *const mediator = (void *)~0ul;
2623 bool pending = false, available = true;
2624 unsigned int spins, sleep_time = DISPATCH_CONTENTION_USLEEP_START;
2625
2626 do {
2627 // Spin for a short while in case the contention is temporary -- e.g.
2628 // when starting up after dispatch_apply, or when executing a few
2629 // short continuations in a row.
2630 #if DISPATCH_CONTENTION_USE_RAND
2631 // Use randomness to prevent threads from resonating at the same
2632 // frequency and permanently contending. All threads sharing the same
2633 // seed value is safe with the FreeBSD rand_r implementation.
2634 static unsigned int seed;
// Masking with SPINS_MAX and OR-ing in SPINS_MIN keeps the count within
// [SPINS_MIN, SPINS_MAX] for the default (2^k - 1) values of both macros
2635 spins = (rand_r(&seed) & DISPATCH_CONTENTION_SPINS_MAX) |
2636 DISPATCH_CONTENTION_SPINS_MIN;
2637 #else
// Without randomness: the midpoint between SPINS_MIN and SPINS_MAX
2638 spins = DISPATCH_CONTENTION_SPINS_MIN +
2639 (DISPATCH_CONTENTION_SPINS_MAX-DISPATCH_CONTENTION_SPINS_MIN)/2;
2640 #endif
2641 while (spins--) {
2642 dispatch_hardware_pause();
2643 if (fastpath(dq->dq_items_head != mediator)) goto out;
2644 };
2645 // Since we have serious contention, we need to back off.
2646 if (!pending) {
2647 // Mark this queue as pending to avoid requests for further threads
2648 (void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
2649 pending = true;
2650 }
2651 _dispatch_contention_usleep(sleep_time);
2652 if (fastpath(dq->dq_items_head != mediator)) goto out;
2653 sleep_time *= 2;
2654 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
2655
2656 // The ratio of work to libdispatch overhead must be bad. This
2657 // scenario implies that there are too many threads in the pool.
2658 // Create a new pending thread and then exit this thread.
2659 // The kernel will grant a new thread when the load subsides.
2660 _dispatch_debug("contention on global queue: %p", dq);
2661 _dispatch_queue_wakeup_global(dq);
2662 available = false;
2663 out:
// Balance the increment made before the first sleep
2664 if (pending) {
2665 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
2666 }
2667 return available;
2668 }
2669
// Dequeues one item from the root queue dq; several threads may call this
// concurrently.
//
// Ownership of the head is taken by exchanging dq_items_head with the
// mediator value (~0ul):
//  - a NULL result means the queue is empty: the mediator is swapped back to
//    NULL and NULL is returned;
//  - a mediator result means another thread owns the head: the back-off in
//    _dispatch_queue_concurrent_drain_one_slow() decides between retrying
//    and returning NULL;
//  - anything else is the dequeued item. Before it is returned the head is
//    restored to the following item (or head and tail are both reset to
//    NULL), and when items remain another global wakeup is requested.
2670 DISPATCH_ALWAYS_INLINE_NDEBUG
2671 static inline struct dispatch_object_s *
2672 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
2673 {
2674 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
2675
2676 start:
2677 // The mediator value acts both as a "lock" and a signal
2678 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
2679
2680 if (slowpath(head == NULL)) {
2681 // The first xchg on the tail will tell the enqueueing thread that it
2682 // is safe to blindly write out to the head pointer. A cmpxchg honors
2683 // the algorithm.
2684 (void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL,
2685 relaxed);
2686 _dispatch_root_queue_debug("no work on global queue: %p", dq);
2687 return NULL;
2688 }
2689
2690 if (slowpath(head == mediator)) {
2691 // This thread lost the race for ownership of the queue.
2692 if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
2693 goto start;
2694 }
2695 return NULL;
2696 }
2697
2698 // Restore the head pointer to a sane value before returning.
2699 // If 'next' is NULL, then this item _might_ be the last item.
2700 next = fastpath(head->do_next);
2701
2702 if (slowpath(!next)) {
2703 dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);
2704
// The tail is only reset if it still points at the item being dequeued
2705 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
2706 // both head and tail are NULL now
2707 goto out;
2708 }
2709
2710 // There must be a next item now. This thread won't wait long.
2711 while (!(next = head->do_next)) {
2712 dispatch_hardware_pause();
2713 }
2714 }
2715
// Publish the remaining list and ask for another worker to pick it up
2716 dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
2717 _dispatch_queue_wakeup_global(dq);
2718 out:
2719 return head;
2720 }
2721
2722 static void
2723 _dispatch_root_queue_drain(dispatch_queue_t dq)
2724 {
2725 #if DISPATCH_DEBUG
2726 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
2727 DISPATCH_CRASH("Premature thread recycling");
2728 }
2729 #endif
2730 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2731
2732 #if DISPATCH_COCOA_COMPAT
2733 // ensure that high-level memory management techniques do not leak/crash
2734 if (dispatch_begin_thread_4GC) {
2735 dispatch_begin_thread_4GC();
2736 }
2737 void *pool = _dispatch_autorelease_pool_push();
2738 #endif // DISPATCH_COCOA_COMPAT
2739
2740 _dispatch_perfmon_start();
2741 struct dispatch_object_s *item;
2742 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
2743 _dispatch_continuation_pop(item);
2744 }
2745 _dispatch_perfmon_end();
2746
2747 #if DISPATCH_COCOA_COMPAT
2748 _dispatch_autorelease_pool_pop(pool);
2749 if (dispatch_end_thread_4GC) {
2750 dispatch_end_thread_4GC();
2751 }
2752 #endif // DISPATCH_COCOA_COMPAT
2753
2754 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
2755 }
2756
2757 #pragma mark -
2758 #pragma mark dispatch_worker_thread
2759
2760 #if HAVE_PTHREAD_WORKQUEUES
2761 static void
2762 _dispatch_worker_thread3(void *context)
2763 {
2764 dispatch_queue_t dq = context;
2765 dispatch_root_queue_context_t qc = dq->do_ctxt;
2766
2767 _dispatch_introspection_thread_add();
2768
2769 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
2770 _dispatch_root_queue_drain(dq);
2771 __asm__(""); // prevent tailcall (for Instrument DTrace probe)
2772
2773 }
2774
2775 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2776 // 6618342 Contact the team that owns the Instrument DTrace probe before
2777 // renaming this symbol
2778 static void
2779 _dispatch_worker_thread2(int priority, int options,
2780 void *context DISPATCH_UNUSED)
2781 {
2782 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
2783 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
2784 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
2785
2786 return _dispatch_worker_thread3(dq);
2787 }
2788 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2789 #endif // HAVE_PTHREAD_WORKQUEUES
2790
2791 #if DISPATCH_USE_PTHREAD_POOL
2792 // 6618342 Contact the team that owns the Instrument DTrace probe before
2793 // renaming this symbol
2794 static void *
2795 _dispatch_worker_thread(void *context)
2796 {
2797 dispatch_queue_t dq = context;
2798 dispatch_root_queue_context_t qc = dq->do_ctxt;
2799 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2800
2801 if (pqc && pqc->dpq_thread_configure) {
2802 pqc->dpq_thread_configure();
2803 }
2804
2805 sigset_t mask;
2806 int r;
2807 // workaround tweaks the kernel workqueue does for us
2808 r = sigfillset(&mask);
2809 (void)dispatch_assume_zero(r);
2810 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
2811 (void)dispatch_assume_zero(r);
2812 _dispatch_introspection_thread_add();
2813
2814 // Non-pthread-root-queue pthreads use a 65 second timeout in case there
2815 // are any timers that run once a minute <rdar://problem/11744973>
2816 const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;
2817
2818 do {
2819 _dispatch_root_queue_drain(dq);
2820 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
2821 dispatch_time(0, timeout)) == 0);
2822
2823 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
2824 _dispatch_queue_wakeup_global(dq);
2825 _dispatch_release(dq);
2826
2827 return NULL;
2828 }
2829
2830 int
2831 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
2832 {
2833 int r;
2834
2835 /* Workaround: 6269619 Not all signals can be delivered on any thread */
2836
2837 r = sigdelset(set, SIGILL);
2838 (void)dispatch_assume_zero(r);
2839 r = sigdelset(set, SIGTRAP);
2840 (void)dispatch_assume_zero(r);
2841 #if HAVE_DECL_SIGEMT
2842 r = sigdelset(set, SIGEMT);
2843 (void)dispatch_assume_zero(r);
2844 #endif
2845 r = sigdelset(set, SIGFPE);
2846 (void)dispatch_assume_zero(r);
2847 r = sigdelset(set, SIGBUS);
2848 (void)dispatch_assume_zero(r);
2849 r = sigdelset(set, SIGSEGV);
2850 (void)dispatch_assume_zero(r);
2851 r = sigdelset(set, SIGSYS);
2852 (void)dispatch_assume_zero(r);
2853 r = sigdelset(set, SIGPIPE);
2854 (void)dispatch_assume_zero(r);
2855
2856 return pthread_sigmask(how, set, oset);
2857 }
2858 #endif // DISPATCH_USE_PTHREAD_POOL
2859
2860 #pragma mark -
2861 #pragma mark dispatch_runloop_queue
2862
// Starts out false; set to true by _dispatch_runloop_queue_port_init() and by
// dispatch_main().
static bool _dispatch_program_is_probably_callback_driven;
2864
2865 #if DISPATCH_COCOA_COMPAT
2866
2867 dispatch_queue_t
2868 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
2869 {
2870 dispatch_queue_t dq;
2871 size_t dqs;
2872
2873 if (slowpath(flags)) {
2874 return NULL;
2875 }
2876 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2877 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
2878 _dispatch_queue_init(dq);
2879 dq->do_targetq = _dispatch_get_root_queue(0, true);
2880 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
2881 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
2882 dq->dq_running = 1;
2883 dq->dq_is_thread_bound = 1;
2884 _dispatch_runloop_queue_port_init(dq);
2885 _dispatch_queue_set_bound_thread(dq);
2886 _dispatch_object_debug(dq, "%s", __func__);
2887 return _dispatch_introspection_queue_create(dq);
2888 }
2889
2890 void
2891 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
2892 {
2893 _dispatch_object_debug(dq, "%s", __func__);
2894 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
2895 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
2896 DISPATCH_OBJECT_SUSPEND_LOCK, release);
2897 _dispatch_queue_clear_bound_thread(dq);
2898 if (suspend_cnt == 0) {
2899 _dispatch_wakeup(dq);
2900 }
2901 }
2902
2903 void
2904 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
2905 {
2906 _dispatch_object_debug(dq, "%s", __func__);
2907 _dispatch_introspection_queue_dispose(dq);
2908 _dispatch_runloop_queue_port_dispose(dq);
2909 _dispatch_queue_destroy(dq);
2910 }
2911
2912 bool
2913 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
2914 {
2915 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2916 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2917 }
2918 dispatch_retain(dq);
2919 bool r = _dispatch_runloop_queue_drain_one(dq);
2920 dispatch_release(dq);
2921 return r;
2922 }
2923
2924 void
2925 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
2926 {
2927 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2928 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2929 }
2930 _dispatch_runloop_queue_probe(dq);
2931 }
2932
2933 mach_port_t
2934 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
2935 {
2936 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2937 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2938 }
2939 return (mach_port_t)dq->do_ctxt;
2940 }
2941
2942 static void
2943 _dispatch_runloop_queue_port_init(void *ctxt)
2944 {
2945 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
2946 mach_port_t mp;
2947 kern_return_t kr;
2948
2949 _dispatch_safe_fork = false;
2950 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
2951 DISPATCH_VERIFY_MIG(kr);
2952 (void)dispatch_assume_zero(kr);
2953 kr = mach_port_insert_right(mach_task_self(), mp, mp,
2954 MACH_MSG_TYPE_MAKE_SEND);
2955 DISPATCH_VERIFY_MIG(kr);
2956 (void)dispatch_assume_zero(kr);
2957 if (dq != &_dispatch_main_q) {
2958 struct mach_port_limits limits = {
2959 .mpl_qlimit = 1,
2960 };
2961 kr = mach_port_set_attributes(mach_task_self(), mp,
2962 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
2963 sizeof(limits));
2964 DISPATCH_VERIFY_MIG(kr);
2965 (void)dispatch_assume_zero(kr);
2966 }
2967 dq->do_ctxt = (void*)(uintptr_t)mp;
2968
2969 _dispatch_program_is_probably_callback_driven = true;
2970 }
2971
2972 static void
2973 _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
2974 {
2975 mach_port_t mp = (mach_port_t)dq->do_ctxt;
2976 if (!mp) {
2977 return;
2978 }
2979 dq->do_ctxt = NULL;
2980 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
2981 DISPATCH_VERIFY_MIG(kr);
2982 (void)dispatch_assume_zero(kr);
2983 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
2984 DISPATCH_VERIFY_MIG(kr);
2985 (void)dispatch_assume_zero(kr);
2986 }
2987
2988 #pragma mark -
2989 #pragma mark dispatch_main_queue
2990
2991 mach_port_t
2992 _dispatch_get_main_queue_port_4CF(void)
2993 {
2994 dispatch_queue_t dq = &_dispatch_main_q;
2995 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
2996 _dispatch_runloop_queue_port_init);
2997 return (mach_port_t)dq->do_ctxt;
2998 }
2999
3000 static bool main_q_is_draining;
3001
3002 // 6618342 Contact the team that owns the Instrument DTrace probe before
3003 // renaming this symbol
3004 DISPATCH_NOINLINE
3005 static void
3006 _dispatch_queue_set_mainq_drain_state(bool arg)
3007 {
3008 main_q_is_draining = arg;
3009 }
3010
3011 void
3012 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
3013 {
3014 if (main_q_is_draining) {
3015 return;
3016 }
3017 _dispatch_queue_set_mainq_drain_state(true);
3018 _dispatch_main_queue_drain();
3019 _dispatch_queue_set_mainq_drain_state(false);
3020 }
3021
3022 #endif
3023
3024 void
3025 dispatch_main(void)
3026 {
3027 #if HAVE_PTHREAD_MAIN_NP
3028 if (pthread_main_np()) {
3029 #endif
3030 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
3031 _dispatch_program_is_probably_callback_driven = true;
3032 pthread_exit(NULL);
3033 DISPATCH_CRASH("pthread_exit() returned");
3034 #if HAVE_PTHREAD_MAIN_NP
3035 }
3036 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
3037 #endif
3038 }
3039
3040 DISPATCH_NOINLINE DISPATCH_NORETURN
3041 static void
3042 _dispatch_sigsuspend(void)
3043 {
3044 static const sigset_t mask;
3045
3046 for (;;) {
3047 sigsuspend(&mask);
3048 }
3049 }
3050
3051 DISPATCH_NORETURN
3052 static void
3053 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
3054 {
3055 // never returns, so burn bridges behind us
3056 _dispatch_clear_stack(0);
3057 _dispatch_sigsuspend();
3058 }
3059
3060 DISPATCH_NOINLINE
3061 static void
3062 _dispatch_queue_cleanup2(void)
3063 {
3064 dispatch_queue_t dq = &_dispatch_main_q;
3065 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
3066 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
3067 DISPATCH_OBJECT_SUSPEND_LOCK, release);
3068 dq->dq_is_thread_bound = 0;
3069 if (suspend_cnt == 0) {
3070 _dispatch_wakeup(dq);
3071 }
3072
3073 // overload the "probably" variable to mean that dispatch_main() or
3074 // similar non-POSIX API was called
3075 // this has to run before the DISPATCH_COCOA_COMPAT below
3076 if (_dispatch_program_is_probably_callback_driven) {
3077 dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
3078 _dispatch_sig_thread);
3079 sleep(1); // workaround 6778970
3080 }
3081
3082 #if DISPATCH_COCOA_COMPAT
3083 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
3084 _dispatch_runloop_queue_port_init);
3085 _dispatch_runloop_queue_port_dispose(dq);
3086 #endif
3087 }
3088
3089 static void
3090 _dispatch_queue_cleanup(void *ctxt)
3091 {
3092 if (ctxt == &_dispatch_main_q) {
3093 return _dispatch_queue_cleanup2();
3094 }
3095 // POSIX defines that destructors are only called if 'ctxt' is non-null
3096 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
3097 }