[apple/libdispatch.git] src/queue.c (libdispatch-913.1.6)
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h" // _dispatch_send_wakeup_runloop_thread
24 #endif
25
26 #if HAVE_PTHREAD_WORKQUEUES || DISPATCH_USE_INTERNAL_WORKQUEUE
27 #define DISPATCH_USE_WORKQUEUES 1
28 #endif
29 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
30 !defined(DISPATCH_ENABLE_THREAD_POOL)
31 #define DISPATCH_ENABLE_THREAD_POOL 1
32 #endif
33 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
34 #define DISPATCH_USE_PTHREAD_POOL 1
35 #endif
36 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || \
37 DISPATCH_DEBUG) && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
38 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
39 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
40 #endif
41 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && (DISPATCH_DEBUG || \
42 (!DISPATCH_USE_KEVENT_WORKQUEUE && !HAVE_PTHREAD_WORKQUEUE_QOS)) && \
43 !defined(DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP)
44 #define DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 1
45 #endif
46 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || \
47 DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || \
48 DISPATCH_USE_INTERNAL_WORKQUEUE
49 #if !DISPATCH_USE_INTERNAL_WORKQUEUE
50 #define DISPATCH_USE_WORKQ_PRIORITY 1
51 #endif
52 #define DISPATCH_USE_WORKQ_OPTIONS 1
53 #endif
54
55 #if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
56 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
57 #define pthread_workqueue_t void*
58 #endif
59
60 static void _dispatch_sig_thread(void *ctxt);
61 static void _dispatch_cache_cleanup(void *value);
62 static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
63 static void _dispatch_queue_cleanup(void *ctxt);
64 static void _dispatch_wlh_cleanup(void *ctxt);
65 static void _dispatch_deferred_items_cleanup(void *ctxt);
66 static void _dispatch_frame_cleanup(void *ctxt);
67 static void _dispatch_context_cleanup(void *ctxt);
68 static void _dispatch_queue_barrier_complete(dispatch_queue_t dq,
69 dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
70 static void _dispatch_queue_non_barrier_complete(dispatch_queue_t dq);
71 static void _dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
72 dispatch_sync_context_t dsc, dispatch_qos_t qos);
73 #if HAVE_PTHREAD_WORKQUEUE_QOS
74 static void _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
75 dispatch_queue_t dq, dispatch_qos_t qos);
76 static inline void _dispatch_queue_class_wakeup_with_override(dispatch_queue_t,
77 uint64_t dq_state, dispatch_wakeup_flags_t flags);
78 #endif
79 #if HAVE_PTHREAD_WORKQUEUES
80 static void _dispatch_worker_thread4(void *context);
81 #if HAVE_PTHREAD_WORKQUEUE_QOS
82 static void _dispatch_worker_thread3(pthread_priority_t priority);
83 #endif
84 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
85 static void _dispatch_worker_thread2(int priority, int options, void *context);
86 #endif
87 #endif
88 #if DISPATCH_USE_PTHREAD_POOL
89 static void *_dispatch_worker_thread(void *context);
90 #endif
91
92 #if DISPATCH_COCOA_COMPAT
93 static dispatch_once_t _dispatch_main_q_handle_pred;
94 static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
95 dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
96 static void _dispatch_runloop_queue_handle_init(void *ctxt);
97 static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
98 #endif
99
100 #pragma mark -
101 #pragma mark dispatch_root_queue
102
103 struct dispatch_pthread_root_queue_context_s {
104 pthread_attr_t dpq_thread_attr;
105 dispatch_block_t dpq_thread_configure;
106 struct dispatch_semaphore_s dpq_thread_mediator;
107 dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
108 };
109 typedef struct dispatch_pthread_root_queue_context_s *
110 dispatch_pthread_root_queue_context_t;
111
112 #if DISPATCH_ENABLE_THREAD_POOL
113 static struct dispatch_pthread_root_queue_context_s
114 _dispatch_pthread_root_queue_contexts[] = {
115 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
116 .dpq_thread_mediator = {
117 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
118 }},
119 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
120 .dpq_thread_mediator = {
121 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
122 }},
123 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
124 .dpq_thread_mediator = {
125 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
126 }},
127 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
128 .dpq_thread_mediator = {
129 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
130 }},
131 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
132 .dpq_thread_mediator = {
133 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
134 }},
135 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
136 .dpq_thread_mediator = {
137 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
138 }},
139 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
140 .dpq_thread_mediator = {
141 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
142 }},
143 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
144 .dpq_thread_mediator = {
145 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
146 }},
147 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
148 .dpq_thread_mediator = {
149 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
150 }},
151 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
152 .dpq_thread_mediator = {
153 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
154 }},
155 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
156 .dpq_thread_mediator = {
157 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
158 }},
159 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
160 .dpq_thread_mediator = {
161 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
162 }},
163 };
164 #endif
165
166 #ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
167 #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
168 #endif
169
170 struct dispatch_root_queue_context_s {
171 union {
172 struct {
173 int volatile dgq_pending;
174 #if DISPATCH_USE_WORKQUEUES
175 qos_class_t dgq_qos;
176 #if DISPATCH_USE_WORKQ_PRIORITY
177 int dgq_wq_priority;
178 #endif
179 #if DISPATCH_USE_WORKQ_OPTIONS
180 int dgq_wq_options;
181 #endif
182 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
183 pthread_workqueue_t dgq_kworkqueue;
184 #endif
185 #endif // DISPATCH_USE_WORKQUEUES
186 #if DISPATCH_USE_PTHREAD_POOL
187 void *dgq_ctxt;
188 int32_t volatile dgq_thread_pool_size;
189 #endif
190 };
191 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
192 };
193 };
194 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
195
196 #define WORKQ_PRIO_INVALID (-1)
197 #ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
198 #define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
199 #endif
200 #ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
201 #define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
202 #endif
203
204 DISPATCH_CACHELINE_ALIGN
205 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
206 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
207 #if DISPATCH_USE_WORKQUEUES
208 .dgq_qos = QOS_CLASS_MAINTENANCE,
209 #if DISPATCH_USE_WORKQ_PRIORITY
210 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
211 #endif
212 #if DISPATCH_USE_WORKQ_OPTIONS
213 .dgq_wq_options = 0,
214 #endif
215 #endif
216 #if DISPATCH_ENABLE_THREAD_POOL
217 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
218 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
219 #endif
220 }}},
221 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
222 #if DISPATCH_USE_WORKQUEUES
223 .dgq_qos = QOS_CLASS_MAINTENANCE,
224 #if DISPATCH_USE_WORKQ_PRIORITY
225 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
226 #endif
227 #if DISPATCH_USE_WORKQ_OPTIONS
228 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
229 #endif
230 #endif
231 #if DISPATCH_ENABLE_THREAD_POOL
232 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
233 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
234 #endif
235 }}},
236 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
237 #if DISPATCH_USE_WORKQUEUES
238 .dgq_qos = QOS_CLASS_BACKGROUND,
239 #if DISPATCH_USE_WORKQ_PRIORITY
240 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
241 #endif
242 #if DISPATCH_USE_WORKQ_OPTIONS
243 .dgq_wq_options = 0,
244 #endif
245 #endif
246 #if DISPATCH_ENABLE_THREAD_POOL
247 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
248 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
249 #endif
250 }}},
251 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
252 #if DISPATCH_USE_WORKQUEUES
253 .dgq_qos = QOS_CLASS_BACKGROUND,
254 #if DISPATCH_USE_WORKQ_PRIORITY
255 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
256 #endif
257 #if DISPATCH_USE_WORKQ_OPTIONS
258 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
259 #endif
260 #endif
261 #if DISPATCH_ENABLE_THREAD_POOL
262 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
263 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
264 #endif
265 }}},
266 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
267 #if DISPATCH_USE_WORKQUEUES
268 .dgq_qos = QOS_CLASS_UTILITY,
269 #if DISPATCH_USE_WORKQ_PRIORITY
270 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
271 #endif
272 #if DISPATCH_USE_WORKQ_OPTIONS
273 .dgq_wq_options = 0,
274 #endif
275 #endif
276 #if DISPATCH_ENABLE_THREAD_POOL
277 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
278 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
279 #endif
280 }}},
281 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
282 #if DISPATCH_USE_WORKQUEUES
283 .dgq_qos = QOS_CLASS_UTILITY,
284 #if DISPATCH_USE_WORKQ_PRIORITY
285 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
286 #endif
287 #if DISPATCH_USE_WORKQ_OPTIONS
288 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
289 #endif
290 #endif
291 #if DISPATCH_ENABLE_THREAD_POOL
292 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
293 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
294 #endif
295 }}},
296 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
297 #if DISPATCH_USE_WORKQUEUES
298 .dgq_qos = QOS_CLASS_DEFAULT,
299 #if DISPATCH_USE_WORKQ_PRIORITY
300 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
301 #endif
302 #if DISPATCH_USE_WORKQ_OPTIONS
303 .dgq_wq_options = 0,
304 #endif
305 #endif
306 #if DISPATCH_ENABLE_THREAD_POOL
307 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
308 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
309 #endif
310 }}},
311 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
312 #if DISPATCH_USE_WORKQUEUES
313 .dgq_qos = QOS_CLASS_DEFAULT,
314 #if DISPATCH_USE_WORKQ_PRIORITY
315 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
316 #endif
317 #if DISPATCH_USE_WORKQ_OPTIONS
318 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
319 #endif
320 #endif
321 #if DISPATCH_ENABLE_THREAD_POOL
322 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
323 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
324 #endif
325 }}},
326 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
327 #if DISPATCH_USE_WORKQUEUES
328 .dgq_qos = QOS_CLASS_USER_INITIATED,
329 #if DISPATCH_USE_WORKQ_PRIORITY
330 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
331 #endif
332 #if DISPATCH_USE_WORKQ_OPTIONS
333 .dgq_wq_options = 0,
334 #endif
335 #endif
336 #if DISPATCH_ENABLE_THREAD_POOL
337 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
338 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
339 #endif
340 }}},
341 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
342 #if DISPATCH_USE_WORKQUEUES
343 .dgq_qos = QOS_CLASS_USER_INITIATED,
344 #if DISPATCH_USE_WORKQ_PRIORITY
345 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
346 #endif
347 #if DISPATCH_USE_WORKQ_OPTIONS
348 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
349 #endif
350 #endif
351 #if DISPATCH_ENABLE_THREAD_POOL
352 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
353 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
354 #endif
355 }}},
356 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
357 #if DISPATCH_USE_WORKQUEUES
358 .dgq_qos = QOS_CLASS_USER_INTERACTIVE,
359 #if DISPATCH_USE_WORKQ_PRIORITY
360 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
361 #endif
362 #if DISPATCH_USE_WORKQ_OPTIONS
363 .dgq_wq_options = 0,
364 #endif
365 #endif
366 #if DISPATCH_ENABLE_THREAD_POOL
367 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
368 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
369 #endif
370 }}},
371 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
372 #if DISPATCH_USE_WORKQUEUES
373 .dgq_qos = QOS_CLASS_USER_INTERACTIVE,
374 #if DISPATCH_USE_WORKQ_PRIORITY
375 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
376 #endif
377 #if DISPATCH_USE_WORKQ_OPTIONS
378 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
379 #endif
380 #endif
381 #if DISPATCH_ENABLE_THREAD_POOL
382 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
383 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
384 #endif
385 }}},
386 };
387
388 // 6618342 Contact the team that owns the Instrument DTrace probe before
389 // renaming this symbol
390 DISPATCH_CACHELINE_ALIGN
391 struct dispatch_queue_s _dispatch_root_queues[] = {
392 #define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
393 ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
394 DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
395 DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
396 #define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
397 [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
398 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
399 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
400 .do_ctxt = &_dispatch_root_queue_contexts[ \
401 _DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
402 .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
403 .dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
404 DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
405 ((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
406 DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
407 __VA_ARGS__ \
408 }
409 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
410 .dq_label = "com.apple.root.maintenance-qos",
411 .dq_serialnum = 4,
412 ),
413 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
414 .dq_label = "com.apple.root.maintenance-qos.overcommit",
415 .dq_serialnum = 5,
416 ),
417 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
418 .dq_label = "com.apple.root.background-qos",
419 .dq_serialnum = 6,
420 ),
421 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
422 .dq_label = "com.apple.root.background-qos.overcommit",
423 .dq_serialnum = 7,
424 ),
425 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
426 .dq_label = "com.apple.root.utility-qos",
427 .dq_serialnum = 8,
428 ),
429 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
430 .dq_label = "com.apple.root.utility-qos.overcommit",
431 .dq_serialnum = 9,
432 ),
433 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
434 .dq_label = "com.apple.root.default-qos",
435 .dq_serialnum = 10,
436 ),
437 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
438 DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
439 .dq_label = "com.apple.root.default-qos.overcommit",
440 .dq_serialnum = 11,
441 ),
442 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
443 .dq_label = "com.apple.root.user-initiated-qos",
444 .dq_serialnum = 12,
445 ),
446 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
447 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
448 .dq_serialnum = 13,
449 ),
450 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
451 .dq_label = "com.apple.root.user-interactive-qos",
452 .dq_serialnum = 14,
453 ),
454 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
455 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
456 .dq_serialnum = 15,
457 ),
458 };
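/*
 * The table above defines the twelve global root queues: one regular and one
 * overcommit variant for each of the six QoS classes from maintenance up to
 * user-interactive (dq_serialnum 4 through 15). Each entry points at its
 * matching dispatch_root_queue_context_s in _dispatch_root_queue_contexts
 * via do_ctxt, and only the default-QoS pair carries
 * DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE.
 */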
459
460 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
461 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
462 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
463 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
464 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
465 &_dispatch_root_queues[
466 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
467 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
468 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
469 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
470 &_dispatch_root_queues[
471 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
472 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
473 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
474 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
475 &_dispatch_root_queues[
476 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
477 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
478 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
479 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
480 &_dispatch_root_queues[
481 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
482 };
483 #endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
484
485 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
486 static struct dispatch_queue_s _dispatch_mgr_root_queue;
487 #else
488 #define _dispatch_mgr_root_queue _dispatch_root_queues[\
489 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
490 #endif
491
492 // 6618342 Contact the team that owns the Instrument DTrace probe before
493 // renaming this symbol
494 DISPATCH_CACHELINE_ALIGN
495 struct dispatch_queue_s _dispatch_mgr_q = {
496 DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
497 .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
498 DISPATCH_QUEUE_ROLE_BASE_ANON,
499 .do_targetq = &_dispatch_mgr_root_queue,
500 .dq_label = "com.apple.libdispatch-manager",
501 .dq_atomic_flags = DQF_WIDTH(1),
502 .dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
503 DISPATCH_PRIORITY_SATURATED_OVERRIDE,
504 .dq_serialnum = 2,
505 };
506
507 dispatch_queue_t
508 dispatch_get_global_queue(long priority, unsigned long flags)
509 {
510 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
511 return DISPATCH_BAD_INPUT;
512 }
513 dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
514 #if !HAVE_PTHREAD_WORKQUEUE_QOS
515 	if (qos == DISPATCH_QOS_MAINTENANCE) {
516 		qos = DISPATCH_QOS_BACKGROUND;
517 	} else if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
518 		qos = DISPATCH_QOS_USER_INITIATED;
519 }
520 #endif
521 if (qos == DISPATCH_QOS_UNSPECIFIED) {
522 return DISPATCH_BAD_INPUT;
523 }
524 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
525 }
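/*
 * Illustrative client-side sketch (not part of this translation unit): the
 * flags argument is effectively reserved, since anything other than
 * DISPATCH_QUEUE_OVERCOMMIT is rejected above and returns DISPATCH_BAD_INPUT.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *	dispatch_async(q, ^{
 *		// work runs on the utility-QoS root queue
 *	});
 */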
526
527 DISPATCH_ALWAYS_INLINE
528 static inline dispatch_queue_t
529 _dispatch_get_current_queue(void)
530 {
531 return _dispatch_queue_get_current() ?:
532 _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
533 }
534
535 dispatch_queue_t
536 dispatch_get_current_queue(void)
537 {
538 return _dispatch_get_current_queue();
539 }
540
541 DISPATCH_NOINLINE DISPATCH_NORETURN
542 static void
543 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
544 {
545 _dispatch_client_assert_fail(
546 "Block was %sexpected to execute on queue [%s]",
547 expected ? "" : "not ", dq->dq_label ?: "");
548 }
549
550 DISPATCH_NOINLINE DISPATCH_NORETURN
551 static void
552 _dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
553 {
554 _dispatch_client_assert_fail(
555 "Block was expected to act as a barrier on queue [%s]",
556 dq->dq_label ?: "");
557 }
558
559 void
560 dispatch_assert_queue(dispatch_queue_t dq)
561 {
562 unsigned long metatype = dx_metatype(dq);
563 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
564 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
565 "dispatch_assert_queue()");
566 }
567 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
568 if (likely(_dq_state_drain_locked_by_self(dq_state))) {
569 return;
570 }
571 // we can look at the width: if it is changing while we read it,
572 // it means that a barrier is running on `dq` concurrently, which
573 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
574 //
575 // However, if thread-bound queues are possible, they mess with lock
576 // ownership and we always have to take the slowpath
577 if (likely(DISPATCH_COCOA_COMPAT || dq->dq_width > 1)) {
578 if (likely(_dispatch_thread_frame_find_queue(dq))) {
579 return;
580 }
581 }
582 _dispatch_assert_queue_fail(dq, true);
583 }
584
585 void
586 dispatch_assert_queue_not(dispatch_queue_t dq)
587 {
588 unsigned long metatype = dx_metatype(dq);
589 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
590 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
591 "dispatch_assert_queue_not()");
592 }
593 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
594 if (likely(!_dq_state_drain_locked_by_self(dq_state))) {
595 // we can look at the width: if it is changing while we read it,
596 // it means that a barrier is running on `dq` concurrently, which
597 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
598 //
599 // However, if thread-bound queues are possible, they mess with lock
600 // ownership and we always have to take the slowpath
601 if (likely(!DISPATCH_COCOA_COMPAT && dq->dq_width == 1)) {
602 return;
603 }
604 if (likely(!_dispatch_thread_frame_find_queue(dq))) {
605 return;
606 }
607 }
608 _dispatch_assert_queue_fail(dq, false);
609 }
610
611 void
612 dispatch_assert_queue_barrier(dispatch_queue_t dq)
613 {
614 dispatch_assert_queue(dq);
615
616 if (likely(dq->dq_width == 1)) {
617 return;
618 }
619
620 if (likely(dq->do_targetq)) {
621 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
622 if (likely(_dq_state_is_in_barrier(dq_state))) {
623 return;
624 }
625 }
626
627 _dispatch_assert_queue_barrier_fail(dq);
628 }
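/*
 * Illustrative sketch of how clients use the assertion family (hypothetical
 * queue label); failures never return, they funnel into
 * _dispatch_assert_queue_fail() / _dispatch_assert_queue_barrier_fail()
 * above, which call _dispatch_client_assert_fail().
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.state", NULL);
 *	dispatch_async(q, ^{
 *		dispatch_assert_queue(q);	// passes: the block runs on q
 *	});
 *	dispatch_assert_queue_not(q);		// passes when the caller is off q
 */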
629
630 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
631 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
632 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
633 #else
634 #define _dispatch_root_queue_debug(...)
635 #define _dispatch_debug_root_queue(...)
636 #endif
637
638 #pragma mark -
639 #pragma mark dispatch_init
640
641 static inline bool
642 _dispatch_root_queues_init_workq(int *wq_supported)
643 {
644 int r; (void)r;
645 bool result = false;
646 *wq_supported = 0;
647 #if DISPATCH_USE_WORKQUEUES
648 bool disable_wq = false; (void)disable_wq;
649 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
650 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
651 #endif
652 #if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
653 bool disable_qos = false;
654 #if DISPATCH_DEBUG
655 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
656 #endif
657 #if DISPATCH_USE_KEVENT_WORKQUEUE
658 bool disable_kevent_wq = false;
659 #if DISPATCH_DEBUG || DISPATCH_PROFILE
660 disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
661 #endif
662 #endif
663
664 if (!disable_wq && !disable_qos) {
665 *wq_supported = _pthread_workqueue_supported();
666 #if DISPATCH_USE_KEVENT_WORKQUEUE
667 if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
668 r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
669 (pthread_workqueue_function_kevent_t)
670 _dispatch_kevent_worker_thread,
671 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
672 #if DISPATCH_USE_MGR_THREAD
673 _dispatch_kevent_workqueue_enabled = !r;
674 #endif
675 result = !r;
676 } else
677 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
678 if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
679 #if DISPATCH_USE_MGR_THREAD
680 r = _pthread_workqueue_init(_dispatch_worker_thread3,
681 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
682 result = !r;
683 #endif
684 }
685 if (!(*wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
686 DISPATCH_INTERNAL_CRASH(*wq_supported,
687 "QoS Maintenance support required");
688 }
689 }
690 #endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
691 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
692 if (!result && !disable_wq) {
693 pthread_workqueue_setdispatchoffset_np(
694 offsetof(struct dispatch_queue_s, dq_serialnum));
695 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
696 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
697 (void)dispatch_assume_zero(r);
698 #endif
699 result = !r;
700 }
701 #endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
702 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
703 if (!result) {
704 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
705 pthread_workqueue_attr_t pwq_attr;
706 if (!disable_wq) {
707 r = pthread_workqueue_attr_init_np(&pwq_attr);
708 (void)dispatch_assume_zero(r);
709 }
710 #endif
711 size_t i;
712 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
713 pthread_workqueue_t pwq = NULL;
714 dispatch_root_queue_context_t qc;
715 qc = &_dispatch_root_queue_contexts[i];
716 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
717 if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
718 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
719 qc->dgq_wq_priority);
720 (void)dispatch_assume_zero(r);
721 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
722 qc->dgq_wq_options &
723 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
724 (void)dispatch_assume_zero(r);
725 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
726 (void)dispatch_assume_zero(r);
727 result = result || dispatch_assume(pwq);
728 }
729 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
730 if (pwq) {
731 qc->dgq_kworkqueue = pwq;
732 } else {
733 qc->dgq_kworkqueue = (void*)(~0ul);
734 // because the fastpath of _dispatch_global_queue_poke didn't
735 // know yet that we're using the internal pool implementation,
736 // we have to undo its setting of dgq_pending
737 qc->dgq_pending = 0;
738 }
739 }
740 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
741 if (!disable_wq) {
742 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
743 (void)dispatch_assume_zero(r);
744 }
745 #endif
746 }
747 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
748 #endif // DISPATCH_USE_WORKQUEUES
749 return result;
750 }
751
752 #if DISPATCH_USE_PTHREAD_POOL
753 static inline void
754 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
755 int32_t pool_size, bool overcommit)
756 {
757 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
758 int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT :
759 (int32_t)dispatch_hw_config(active_cpus);
760 if (slowpath(pool_size) && pool_size < thread_pool_size) {
761 thread_pool_size = pool_size;
762 }
763 qc->dgq_thread_pool_size = thread_pool_size;
764 #if DISPATCH_USE_WORKQUEUES
765 if (qc->dgq_qos) {
766 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
767 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
768 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
769 #if HAVE_PTHREAD_WORKQUEUE_QOS
770 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
771 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
772 #endif
773 }
774 #endif // DISPATCH_USE_WORKQUEUES
775 _dispatch_sema4_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
776 _dispatch_sema4_init(sema, _DSEMA4_POLICY_LIFO);
777 _dispatch_sema4_create(sema, _DSEMA4_POLICY_LIFO);
778 }
779 #endif // DISPATCH_USE_PTHREAD_POOL
780
781 static void
782 _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
783 {
784 int wq_supported;
785 _dispatch_fork_becomes_unsafe();
786 if (!_dispatch_root_queues_init_workq(&wq_supported)) {
787 #if DISPATCH_ENABLE_THREAD_POOL
788 size_t i;
789 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
790 bool overcommit = true;
791 #if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
792 // some software hangs if the non-overcommitting queues do not
793 // overcommit when threads block. Someday, this behavior should
794 // apply to all platforms
795 if (!(i & 1)) {
796 overcommit = false;
797 }
798 #endif
799 _dispatch_root_queue_init_pthread_pool(
800 &_dispatch_root_queue_contexts[i], 0, overcommit);
801 }
802 #else
803 DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
804 "Root queue initialization failed");
805 #endif // DISPATCH_ENABLE_THREAD_POOL
806 }
807 }
808
809 void
810 _dispatch_root_queues_init(void)
811 {
812 static dispatch_once_t _dispatch_root_queues_pred;
813 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
814 _dispatch_root_queues_init_once);
815 }
816
817 DISPATCH_EXPORT DISPATCH_NOTHROW
818 void
819 libdispatch_init(void)
820 {
821 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 2 * DISPATCH_QOS_MAX);
822
823 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
824 -DISPATCH_QUEUE_PRIORITY_HIGH);
825 dispatch_assert(countof(_dispatch_root_queues) ==
826 DISPATCH_ROOT_QUEUE_COUNT);
827 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
828 DISPATCH_ROOT_QUEUE_COUNT);
829 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
830 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
831 sizeof(_dispatch_wq2root_queues[0][0]) ==
832 WORKQ_NUM_PRIOQUEUE * 2);
833 #endif
834 #if DISPATCH_ENABLE_THREAD_POOL
835 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
836 DISPATCH_ROOT_QUEUE_COUNT);
837 #endif
838
839 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
840 offsetof(struct dispatch_object_s, do_next));
841 dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
842 offsetof(struct dispatch_object_s, do_vtable));
843 dispatch_assert(sizeof(struct dispatch_apply_s) <=
844 DISPATCH_CONTINUATION_SIZE);
845 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
846 == 0);
847 dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) % _Alignof(uint64_t) == 0);
848 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
849 DISPATCH_CACHELINE_SIZE == 0);
850
851 #if HAVE_PTHREAD_WORKQUEUE_QOS
852 dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
853 dispatch_priority_t pri = _dispatch_priority_make(qos, 0);
854 _dispatch_main_q.dq_priority = _dispatch_priority_with_override_qos(pri, qos);
855 #if DISPATCH_DEBUG
856 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
857 _dispatch_set_qos_class_enabled = 1;
858 }
859 #endif
860 #endif
861
862 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
863 _dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
864 #else
865 _dispatch_thread_key_create(&dispatch_priority_key, NULL);
866 _dispatch_thread_key_create(&dispatch_r2k_key, NULL);
867 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
868 _dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
869 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
870 _dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
871 _dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
872 NULL);
873 _dispatch_thread_key_create(&dispatch_basepri_key, NULL);
874 #if DISPATCH_INTROSPECTION
875 _dispatch_thread_key_create(&dispatch_introspection_key, NULL);
876 #elif DISPATCH_PERF_MON
877 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
878 #endif
879 _dispatch_thread_key_create(&dispatch_wlh_key, _dispatch_wlh_cleanup);
880 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
881 _dispatch_thread_key_create(&dispatch_deferred_items_key,
882 _dispatch_deferred_items_cleanup);
883 #endif
884
885 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
886 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
887 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
888 #endif
889
890 _dispatch_queue_set_current(&_dispatch_main_q);
891 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
892
893 #if DISPATCH_USE_PTHREAD_ATFORK
894 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
895 dispatch_atfork_parent, dispatch_atfork_child));
896 #endif
897 _dispatch_hw_config_init();
898 _dispatch_time_init();
899 _dispatch_vtable_init();
900 _os_object_init();
901 _voucher_init();
902 _dispatch_introspection_init();
903 }
904
905 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
906 #include <unistd.h>
907 #include <sys/syscall.h>
908
909 #ifndef __ANDROID__
910 #ifdef SYS_gettid
911 DISPATCH_ALWAYS_INLINE
912 static inline pid_t
913 gettid(void)
914 {
915 return (pid_t) syscall(SYS_gettid);
916 }
917 #else
918 #error "SYS_gettid unavailable on this system"
919 #endif /* SYS_gettid */
920 #endif /* ! __ANDROID__ */
921
922 #define _tsd_call_cleanup(k, f) do { \
923 if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
924 } while (0)
925
926 #ifdef __ANDROID__
927 static void (*_dispatch_thread_detach_callback)(void);
928
929 void
930 _dispatch_install_thread_detach_callback(dispatch_function_t cb)
931 {
932 if (os_atomic_xchg(&_dispatch_thread_detach_callback, cb, relaxed)) {
933 DISPATCH_CLIENT_CRASH(0, "Installing a thread detach callback twice");
934 }
935 }
936 #endif
937
938 void
939 _libdispatch_tsd_cleanup(void *ctx)
940 {
941 struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
942
943 _tsd_call_cleanup(dispatch_priority_key, NULL);
944 _tsd_call_cleanup(dispatch_r2k_key, NULL);
945
946 _tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
947 _tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
948 _tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
949 _tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
950 _tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
951 NULL);
952 _tsd_call_cleanup(dispatch_basepri_key, NULL);
953 #if DISPATCH_INTROSPECTION
954 _tsd_call_cleanup(dispatch_introspection_key, NULL);
955 #elif DISPATCH_PERF_MON
956 _tsd_call_cleanup(dispatch_bcounter_key, NULL);
957 #endif
958 _tsd_call_cleanup(dispatch_wlh_key, _dispatch_wlh_cleanup);
959 _tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
960 _tsd_call_cleanup(dispatch_deferred_items_key,
961 _dispatch_deferred_items_cleanup);
962 #ifdef __ANDROID__
963 if (_dispatch_thread_detach_callback) {
964 _dispatch_thread_detach_callback();
965 }
966 #endif
967 tsd->tid = 0;
968 }
969
970 DISPATCH_NOINLINE
971 void
972 libdispatch_tsd_init(void)
973 {
974 pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
975 __dispatch_tsd.tid = gettid();
976 }
977 #endif
978
979 DISPATCH_NOTHROW
980 void
981 _dispatch_queue_atfork_child(void)
982 {
983 dispatch_queue_t main_q = &_dispatch_main_q;
984 void *crash = (void *)0x100;
985 size_t i;
986
987 if (_dispatch_queue_is_thread_bound(main_q)) {
988 _dispatch_queue_set_bound_thread(main_q);
989 }
990
991 if (!_dispatch_is_multithreaded_inline()) return;
992
993 main_q->dq_items_head = crash;
994 main_q->dq_items_tail = crash;
995
996 _dispatch_mgr_q.dq_items_head = crash;
997 _dispatch_mgr_q.dq_items_tail = crash;
998
999 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1000 _dispatch_root_queues[i].dq_items_head = crash;
1001 _dispatch_root_queues[i].dq_items_tail = crash;
1002 }
1003 }
1004
1005 DISPATCH_NOINLINE
1006 void
1007 _dispatch_fork_becomes_unsafe_slow(void)
1008 {
1009 uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
1010 _DISPATCH_UNSAFE_FORK_MULTITHREADED, relaxed);
1011 if (value & _DISPATCH_UNSAFE_FORK_PROHIBIT) {
1012 DISPATCH_CLIENT_CRASH(0, "Transition to multithreaded is prohibited");
1013 }
1014 }
1015
1016 DISPATCH_NOINLINE
1017 void
1018 _dispatch_prohibit_transition_to_multithreaded(bool prohibit)
1019 {
1020 if (prohibit) {
1021 uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
1022 _DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
1023 if (value & _DISPATCH_UNSAFE_FORK_MULTITHREADED) {
1024 DISPATCH_CLIENT_CRASH(0, "The executable is already multithreaded");
1025 }
1026 } else {
1027 os_atomic_and(&_dispatch_unsafe_fork,
1028 (uint8_t)~_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
1029 }
1030 }
1031
1032 #pragma mark -
1033 #pragma mark dispatch_queue_attr_t
1034
1035 DISPATCH_ALWAYS_INLINE
1036 static inline bool
1037 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1038 {
1039 qos_class_t qos = (qos_class_t)qos_class;
1040 switch (qos) {
1041 case QOS_CLASS_MAINTENANCE:
1042 case QOS_CLASS_BACKGROUND:
1043 case QOS_CLASS_UTILITY:
1044 case QOS_CLASS_DEFAULT:
1045 case QOS_CLASS_USER_INITIATED:
1046 case QOS_CLASS_USER_INTERACTIVE:
1047 case QOS_CLASS_UNSPECIFIED:
1048 break;
1049 default:
1050 return false;
1051 }
1052 if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY) {
1053 return false;
1054 }
1055 return true;
1056 }
1057
1058 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1059 ((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
1060 DQA_INDEX_NON_OVERCOMMIT : \
1061 ((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
1062 DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
1063
1064 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1065 ((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1066
1067 #define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
1068 ((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)
1069
1070 #define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
1071 (frequency)
1072
1073 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1074
1075 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (qos)
1076
1077 static inline dispatch_queue_attr_t
1078 _dispatch_get_queue_attr(dispatch_qos_t qos, int prio,
1079 _dispatch_queue_attr_overcommit_t overcommit,
1080 dispatch_autorelease_frequency_t frequency,
1081 bool concurrent, bool inactive)
1082 {
1083 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1084 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1085 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1086 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1087 [DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
1088 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
1089 [DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
1090 }
1091
1092 dispatch_queue_attr_t
1093 _dispatch_get_default_queue_attr(void)
1094 {
1095 return _dispatch_get_queue_attr(DISPATCH_QOS_UNSPECIFIED, 0,
1096 _dispatch_queue_attr_overcommit_unspecified,
1097 DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
1098 }
1099
1100 dispatch_queue_attr_t
1101 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1102 dispatch_qos_class_t qos_class, int relpri)
1103 {
1104 if (!_dispatch_qos_class_valid(qos_class, relpri)) {
1105 return DISPATCH_BAD_INPUT;
1106 }
1107 if (!slowpath(dqa)) {
1108 dqa = _dispatch_get_default_queue_attr();
1109 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1110 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1111 }
1112 return _dispatch_get_queue_attr(_dispatch_qos_from_qos_class(qos_class),
1113 relpri, dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
1114 dqa->dqa_concurrent, dqa->dqa_inactive);
1115 }
1116
1117 dispatch_queue_attr_t
1118 dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
1119 {
1120 if (!slowpath(dqa)) {
1121 dqa = _dispatch_get_default_queue_attr();
1122 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1123 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1124 }
1125 dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
1126 return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
1127 _dispatch_priority_relpri(pri), dqa->dqa_overcommit,
1128 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
1129 }
1130
1131 dispatch_queue_attr_t
1132 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1133 bool overcommit)
1134 {
1135 if (!slowpath(dqa)) {
1136 dqa = _dispatch_get_default_queue_attr();
1137 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1138 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1139 }
1140 dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
1141 return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
1142 _dispatch_priority_relpri(pri), overcommit ?
1143 _dispatch_queue_attr_overcommit_enabled :
1144 _dispatch_queue_attr_overcommit_disabled,
1145 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
1146 dqa->dqa_inactive);
1147 }
1148
1149 dispatch_queue_attr_t
1150 dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
1151 dispatch_autorelease_frequency_t frequency)
1152 {
1153 switch (frequency) {
1154 case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
1155 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1156 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1157 break;
1158 default:
1159 return DISPATCH_BAD_INPUT;
1160 }
1161 if (!slowpath(dqa)) {
1162 dqa = _dispatch_get_default_queue_attr();
1163 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1164 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1165 }
1166 dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
1167 return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
1168 _dispatch_priority_relpri(pri), dqa->dqa_overcommit,
1169 frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
1170 }
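/*
 * All of the attribute makers above funnel through _dispatch_get_queue_attr()
 * and return interned attribute objects, so they compose by chaining.
 * Illustrative sketch (hypothetical label):
 *
 *	dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL;
 *	attr = dispatch_queue_attr_make_with_qos_class(attr,
 *			QOS_CLASS_UTILITY, 0);
 *	attr = dispatch_queue_attr_make_initially_inactive(attr);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", attr);
 *	// ... further configuration while inactive ...
 *	dispatch_activate(q);	// consumes the initially-inactive state
 */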
1171
1172 #pragma mark -
1173 #pragma mark dispatch_queue_t
1174
1175 void
1176 dispatch_queue_set_label_nocopy(dispatch_queue_t dq, const char *label)
1177 {
1178 if (dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
1179 return;
1180 }
1181 dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dq);
1182 if (unlikely(dqf & DQF_LABEL_NEEDS_FREE)) {
1183 DISPATCH_CLIENT_CRASH(dq, "Cannot change label for this queue");
1184 }
1185 dq->dq_label = label;
1186 }
1187
1188 static inline bool
1189 _dispatch_base_queue_is_wlh(dispatch_queue_t dq, dispatch_queue_t tq)
1190 {
1191 (void)dq; (void)tq;
1192 return false;
1193 }
1194
1195 static void
1196 _dispatch_queue_inherit_wlh_from_target(dispatch_queue_t dq,
1197 dispatch_queue_t tq)
1198 {
1199 uint64_t old_state, new_state, role;
1200
1201 if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
1202 role = DISPATCH_QUEUE_ROLE_INNER;
1203 } else if (_dispatch_base_queue_is_wlh(dq, tq)) {
1204 role = DISPATCH_QUEUE_ROLE_BASE_WLH;
1205 } else {
1206 role = DISPATCH_QUEUE_ROLE_BASE_ANON;
1207 }
1208
1209 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
1210 new_state = old_state & ~DISPATCH_QUEUE_ROLE_MASK;
1211 new_state |= role;
1212 if (old_state == new_state) {
1213 os_atomic_rmw_loop_give_up(break);
1214 }
1215 });
1216
1217 dispatch_wlh_t cur_wlh = _dispatch_get_wlh();
1218 if (cur_wlh == (dispatch_wlh_t)dq && !_dq_state_is_base_wlh(new_state)) {
1219 _dispatch_event_loop_leave_immediate(cur_wlh, new_state);
1220 }
1221 if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
1222 #if DISPATCH_ALLOW_NON_LEAF_RETARGET
1223 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1224 #else
1225 _dispatch_queue_atomic_flags_set_and_clear(tq, DQF_TARGETED, DQF_LEGACY);
1226 #endif
1227 }
1228 }
1229
1230 unsigned long volatile _dispatch_queue_serial_numbers =
1231 DISPATCH_QUEUE_SERIAL_NUMBER_INIT;
1232
1233 dispatch_priority_t
1234 _dispatch_queue_compute_priority_and_wlh(dispatch_queue_t dq,
1235 dispatch_wlh_t *wlh_out)
1236 {
1237 dispatch_priority_t p = dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
1238 dispatch_queue_t tq = dq->do_targetq;
1239 dispatch_priority_t tqp = tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
1240 dispatch_wlh_t wlh = DISPATCH_WLH_ANON;
1241
1242 if (_dq_state_is_base_wlh(dq->dq_state)) {
1243 wlh = (dispatch_wlh_t)dq;
1244 }
1245
1246 while (unlikely(!dx_hastypeflag(tq, QUEUE_ROOT))) {
1247 if (unlikely(tq == &_dispatch_mgr_q)) {
1248 if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
1249 return DISPATCH_PRIORITY_FLAG_MANAGER;
1250 }
1251 if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
1252 // thread-bound hierarchies are weird; we need to install
1253 // from the context of the thread this hierarchy is bound to
1254 if (wlh_out) *wlh_out = NULL;
1255 return 0;
1256 }
1257 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
1258 // this queue may not be activated yet, so the queue graph may not
1259 // have stabilized yet
1260 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
1261 if (wlh_out) *wlh_out = NULL;
1262 return 0;
1263 }
1264
1265 if (_dq_state_is_base_wlh(tq->dq_state)) {
1266 wlh = (dispatch_wlh_t)tq;
1267 } else if (unlikely(_dispatch_queue_is_legacy(tq))) {
1268 // we're not allowed to dereference tq->do_targetq
1269 _dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
1270 if (wlh_out) *wlh_out = NULL;
1271 return 0;
1272 }
1273
1274 if (!(tq->dq_priority & DISPATCH_PRIORITY_FLAG_INHERIT)) {
1275 if (p < tqp) p = tqp;
1276 }
1277 tq = tq->do_targetq;
1278 tqp = tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
1279 }
1280
1281 if (unlikely(!tqp)) {
1282 // pthread root queues opt out of QoS
1283 if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
1284 return DISPATCH_PRIORITY_FLAG_MANAGER;
1285 }
1286 if (wlh_out) *wlh_out = wlh;
1287 return _dispatch_priority_inherit_from_root_queue(p, tq);
1288 }
1289
1290 DISPATCH_NOINLINE
1291 static dispatch_queue_t
1292 _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1293 dispatch_queue_t tq, bool legacy)
1294 {
1295 if (!slowpath(dqa)) {
1296 dqa = _dispatch_get_default_queue_attr();
1297 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1298 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1299 }
1300
1301 //
1302 // Step 1: Normalize arguments (qos, overcommit, tq)
1303 //
1304
1305 dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
1306 #if !HAVE_PTHREAD_WORKQUEUE_QOS
1307 if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
1308 qos = DISPATCH_QOS_USER_INITIATED;
1309 }
1310 if (qos == DISPATCH_QOS_MAINTENANCE) {
1311 qos = DISPATCH_QOS_BACKGROUND;
1312 }
1313 #endif // !HAVE_PTHREAD_WORKQUEUE_QOS
1314
1315 _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
1316 if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
1317 if (tq->do_targetq) {
1318 DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
1319 "a non-global target queue");
1320 }
1321 }
1322
1323 if (tq && !tq->do_targetq &&
1324 tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
1325 // Handle discrepancies between attr and target queue; attributes win
1326 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1327 if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
1328 overcommit = _dispatch_queue_attr_overcommit_enabled;
1329 } else {
1330 overcommit = _dispatch_queue_attr_overcommit_disabled;
1331 }
1332 }
1333 if (qos == DISPATCH_QOS_UNSPECIFIED) {
1334 dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
1335 tq = _dispatch_get_root_queue(tq_qos,
1336 overcommit == _dispatch_queue_attr_overcommit_enabled);
1337 } else {
1338 tq = NULL;
1339 }
1340 } else if (tq && !tq->do_targetq) {
1341 // target is a pthread or runloop root queue, setting QoS or overcommit
1342 // is disallowed
1343 if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
1344 DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
1345 "and use this kind of target queue");
1346 }
1347 if (qos != DISPATCH_QOS_UNSPECIFIED) {
1348 DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
1349 "and use this kind of target queue");
1350 }
1351 } else {
1352 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1353 // Serial queues default to overcommit!
1354 overcommit = dqa->dqa_concurrent ?
1355 _dispatch_queue_attr_overcommit_disabled :
1356 _dispatch_queue_attr_overcommit_enabled;
1357 }
1358 }
1359 if (!tq) {
1360 tq = _dispatch_get_root_queue(
1361 qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
1362 overcommit == _dispatch_queue_attr_overcommit_enabled);
1363 if (slowpath(!tq)) {
1364 DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
1365 }
1366 }
1367
1368 //
1369 // Step 2: Initialize the queue
1370 //
1371
1372 if (legacy) {
1373 // if any of these attributes is specified, use non-legacy classes
1374 if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
1375 legacy = false;
1376 }
1377 }
1378
1379 const void *vtable;
1380 dispatch_queue_flags_t dqf = 0;
1381 if (legacy) {
1382 vtable = DISPATCH_VTABLE(queue);
1383 } else if (dqa->dqa_concurrent) {
1384 vtable = DISPATCH_VTABLE(queue_concurrent);
1385 } else {
1386 vtable = DISPATCH_VTABLE(queue_serial);
1387 }
1388 switch (dqa->dqa_autorelease_frequency) {
1389 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1390 dqf |= DQF_AUTORELEASE_NEVER;
1391 break;
1392 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1393 dqf |= DQF_AUTORELEASE_ALWAYS;
1394 break;
1395 }
1396 if (legacy) {
1397 dqf |= DQF_LEGACY;
1398 }
1399 if (label) {
1400 const char *tmp = _dispatch_strdup_if_mutable(label);
1401 if (tmp != label) {
1402 dqf |= DQF_LABEL_NEEDS_FREE;
1403 label = tmp;
1404 }
1405 }
1406
1407 dispatch_queue_t dq = _dispatch_object_alloc(vtable,
1408 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1409 _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
1410 DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
1411 (dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
1412
1413 dq->dq_label = label;
1414 #if HAVE_PTHREAD_WORKQUEUE_QOS
1415 dq->dq_priority = dqa->dqa_qos_and_relpri;
1416 if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
1417 dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
1418 }
1419 #endif
1420 _dispatch_retain(tq);
1421 if (qos == DISPATCH_QOS_UNSPECIFIED) {
1422 // legacy way of inheriting the QoS from the target
1423 _dispatch_queue_priority_inherit_from_target(dq, tq);
1424 }
1425 if (!dqa->dqa_inactive) {
1426 _dispatch_queue_inherit_wlh_from_target(dq, tq);
1427 }
1428 dq->do_targetq = tq;
1429 _dispatch_object_debug(dq, "%s", __func__);
1430 return _dispatch_introspection_queue_create(dq);
1431 }
1432
1433 dispatch_queue_t
1434 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1435 dispatch_queue_t tq)
1436 {
1437 return _dispatch_queue_create_with_target(label, dqa, tq, false);
1438 }
1439
1440 dispatch_queue_t
1441 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1442 {
1443 return _dispatch_queue_create_with_target(label, attr,
1444 DISPATCH_TARGET_QUEUE_DEFAULT, true);
1445 }
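/*
 * Illustrative sketch of the target-queue variant (hypothetical labels): a
 * queue created this way hangs off `funnel` rather than directly off a root
 * queue, so everything submitted to it is ultimately serialized through
 * `funnel`'s hierarchy.
 *
 *	dispatch_queue_t funnel = dispatch_queue_create("com.example.funnel",
 *			DISPATCH_QUEUE_SERIAL);
 *	dispatch_queue_t subq = dispatch_queue_create_with_target(
 *			"com.example.subsystem", DISPATCH_QUEUE_SERIAL, funnel);
 */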
1446
1447 dispatch_queue_t
1448 dispatch_queue_create_with_accounting_override_voucher(const char *label,
1449 dispatch_queue_attr_t attr, voucher_t voucher)
1450 {
1451 (void)label; (void)attr; (void)voucher;
1452 DISPATCH_CLIENT_CRASH(0, "Unsupported interface");
1453 }
1454
1455 void
1456 _dispatch_queue_destroy(dispatch_queue_t dq, bool *allow_free)
1457 {
1458 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
1459 uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
1460
1461 if (dx_hastypeflag(dq, QUEUE_ROOT)) {
1462 initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
1463 }
1464 dq_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
1465 dq_state &= ~DISPATCH_QUEUE_DIRTY;
1466 dq_state &= ~DISPATCH_QUEUE_ROLE_MASK;
1467 if (slowpath(dq_state != initial_state)) {
1468 if (_dq_state_drain_locked(dq_state)) {
1469 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
1470 "Release of a locked queue");
1471 }
1472 #ifndef __LP64__
1473 dq_state >>= 32;
1474 #endif
1475 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
1476 "Release of a queue with corrupt state");
1477 }
1478 if (slowpath(dq->dq_items_tail)) {
1479 DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
1480 "Release of a queue while items are enqueued");
1481 }
1482
1483 // trash the queue so that use after free will crash
1484 dq->dq_items_head = (void *)0x200;
1485 dq->dq_items_tail = (void *)0x200;
1486
1487 dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
1488 (void *)0x200, relaxed);
1489 if (dqsq) {
1490 _dispatch_release(dqsq);
1491 }
1492
1493 // fastpath for queues that never got their storage retained
1494 if (likely(os_atomic_load2o(dq, dq_sref_cnt, relaxed) == 0)) {
1495 // poison the state with something that is suspended and is easy to spot
1496 dq->dq_state = 0xdead000000000000;
1497 return;
1498 }
1499
1500 // Take over freeing the memory from _dispatch_object_dealloc()
1501 //
1502 // As soon as we call _dispatch_queue_release_storage(), we forfeit
1503 // the possibility for the caller of dx_dispose() to finalize the object
1504 // so that responsibility is ours.
1505 _dispatch_object_finalize(dq);
1506 *allow_free = false;
1507 dq->dq_label = "<released queue, pending free>";
1508 dq->do_targetq = NULL;
1509 dq->do_finalizer = NULL;
1510 dq->do_ctxt = NULL;
1511 return _dispatch_queue_release_storage(dq);
1512 }
1513
1514 // 6618342 Contact the team that owns the Instrument DTrace probe before
1515 // renaming this symbol
1516 void
1517 _dispatch_queue_dispose(dispatch_queue_t dq, bool *allow_free)
1518 {
1519 _dispatch_object_debug(dq, "%s", __func__);
1520 _dispatch_introspection_queue_dispose(dq);
1521 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
1522 free((void*)dq->dq_label);
1523 }
1524 _dispatch_queue_destroy(dq, allow_free);
1525 }
1526
1527 void
1528 _dispatch_queue_xref_dispose(dispatch_queue_t dq)
1529 {
1530 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
1531 if (unlikely(_dq_state_is_suspended(dq_state))) {
1532 long state = (long)dq_state;
1533 if (sizeof(long) < sizeof(uint64_t)) state = (long)(dq_state >> 32);
1534 if (unlikely(_dq_state_is_inactive(dq_state))) {
1535 // Arguments for and against this assert are within 6705399
1536 DISPATCH_CLIENT_CRASH(state, "Release of an inactive object");
1537 }
1538 DISPATCH_CLIENT_CRASH(state, "Release of a suspended object");
1539 }
1540 os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed);
1541 }
1542
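// Slow path taken when the suspend count embedded in dq_state is about to
// overflow: under the side lock, DISPATCH_QUEUE_SUSPEND_HALF worth of
// suspensions is transferred from dq_state into dq_side_suspend_cnt (minus
// the one suspension this call itself represents, and setting
// DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT the first time). If dq_state changed
// underneath us, the lock is dropped and the operation retries through the
// regular do_suspend path.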
1543 DISPATCH_NOINLINE
1544 static void
1545 _dispatch_queue_suspend_slow(dispatch_queue_t dq)
1546 {
1547 uint64_t dq_state, value, delta;
1548
1549 _dispatch_queue_sidelock_lock(dq);
1550
1551 // what we want to transfer (remove from dq_state)
1552 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1553 // but this is a suspend so add a suspend count at the same time
1554 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1555 if (dq->dq_side_suspend_cnt == 0) {
1556 // we subtract delta from dq_state, and we want to set this bit
1557 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1558 }
1559
1560 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1561 // unsigned underflow of the subtraction can happen because other
1562 // threads could have touched this value while we were trying to acquire
1563 // the lock, or because another thread raced us to do the same operation
1564 // and got to the lock first.
1565 if (unlikely(os_sub_overflow(dq_state, delta, &value))) {
1566 os_atomic_rmw_loop_give_up(goto retry);
1567 }
1568 });
1569 if (unlikely(os_add_overflow(dq->dq_side_suspend_cnt,
1570 DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
1571 DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
1572 }
1573 return _dispatch_queue_sidelock_unlock(dq);
1574
1575 retry:
1576 _dispatch_queue_sidelock_unlock(dq);
1577 return dx_vtable(dq)->do_suspend(dq);
1578 }
1579
1580 void
1581 _dispatch_queue_suspend(dispatch_queue_t dq)
1582 {
1583 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1584
1585 uint64_t dq_state, value;
1586
1587 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1588 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1589 if (unlikely(os_add_overflow(dq_state, value, &value))) {
1590 os_atomic_rmw_loop_give_up({
1591 return _dispatch_queue_suspend_slow(dq);
1592 });
1593 }
1594 });
1595
1596 if (!_dq_state_is_suspended(dq_state)) {
1597 // rdar://8181908 we need to extend the queue life for the duration
1598 // of the call to wakeup at _dispatch_queue_resume() time.
1599 _dispatch_retain_2(dq);
1600 }
1601 }
1602
1603 DISPATCH_NOINLINE
1604 static void
1605 _dispatch_queue_resume_slow(dispatch_queue_t dq)
1606 {
1607 uint64_t dq_state, value, delta;
1608
1609 _dispatch_queue_sidelock_lock(dq);
1610
1611 // what we want to transfer
1612 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1613 // but this is a resume so consume a suspend count at the same time
1614 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1615 switch (dq->dq_side_suspend_cnt) {
1616 case 0:
1617 goto retry;
1618 case DISPATCH_QUEUE_SUSPEND_HALF:
1619 // we will transition the side count to 0, so we want to clear this bit
1620 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1621 break;
1622 }
1623 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1624 // unsigned overflow of the addition can happen because other
1625 // threads could have touched this value while we were trying to acquire
1626 // the lock, or because another thread raced us to do the same operation
1627 // and got to the lock first.
1628 if (unlikely(os_add_overflow(dq_state, delta, &value))) {
1629 os_atomic_rmw_loop_give_up(goto retry);
1630 }
1631 });
1632 dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
1633 return _dispatch_queue_sidelock_unlock(dq);
1634
1635 retry:
1636 _dispatch_queue_sidelock_unlock(dq);
1637 return dx_vtable(dq)->do_resume(dq, false);
1638 }
1639
1640 DISPATCH_NOINLINE
1641 static void
1642 _dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
1643 {
1644 bool allow_resume = true;
1645 // Step 2: run the activation finalizer
1646 if (dx_vtable(dq)->do_finalize_activation) {
1647 dx_vtable(dq)->do_finalize_activation(dq, &allow_resume);
1648 }
1649 // Step 3: consume the suspend count
1650 if (allow_resume) {
1651 return dx_vtable(dq)->do_resume(dq, false);
1652 }
1653 }
1654
1655 void
1656 _dispatch_queue_resume(dispatch_queue_t dq, bool activate)
1657 {
1658 // covers all suspend and inactive bits, including side suspend bit
1659 const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
1660 uint64_t pending_barrier_width =
1661 (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
1662 uint64_t set_owner_and_set_full_width_and_in_barrier =
1663 _dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
1664 DISPATCH_QUEUE_IN_BARRIER;
1665
1666 // backward compatibility: only dispatch sources can abuse
1667 // dispatch_resume() to really mean dispatch_activate()
1668 bool is_source = (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
1669 uint64_t dq_state, value;
1670
1671 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1672
1673 // Activation is a bit tricky as it needs to finalize before the wakeup.
1674 //
1675 // If after doing its updates to the suspend count and/or inactive bit,
1676 // the last suspension related bit that would remain is the
1677 // NEEDS_ACTIVATION one, then this function:
1678 //
1679 // 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
1680 // a suspend count)
1681 // 2. runs the activation finalizer
1682 // 3. consumes the suspend count set in (1), and finishes the resume flow
1683 //
1684 // Concurrently, some property setters such as setting dispatch source
1685 // handlers or _dispatch_queue_set_target_queue try to do in-place changes
1686 // before activation. These protect their action by taking a suspend count.
1687 // Step (1) above cannot happen if such a setter has locked the object.
1688 if (activate) {
1689 // relaxed atomic because this doesn't publish anything; this is only
1690 // about picking the thread that gets to finalize the activation
1691 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1692 if ((dq_state & suspend_bits) ==
1693 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1694 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1695 value = dq_state - DISPATCH_QUEUE_INACTIVE
1696 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1697 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1698 } else if (_dq_state_is_inactive(dq_state)) {
1699 // { sc:>0 i:1 na:1 } -> { i:0 na:1 }
1700 // simple activation because sc is not 0
1701 // resume will deal with na:1 later
1702 value = dq_state - DISPATCH_QUEUE_INACTIVE;
1703 } else {
1704 // object already active, this is a no-op, just exit
1705 os_atomic_rmw_loop_give_up(return);
1706 }
1707 });
1708 } else {
1709 // release barrier needed to publish the effect of
1710 // - dispatch_set_target_queue()
1711 // - dispatch_set_*_handler()
1712 // - do_finalize_activation()
1713 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
1714 if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
1715 + DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1716 // { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
1717 value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
1718 } else if (is_source && (dq_state & suspend_bits) ==
1719 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1720 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1721 value = dq_state - DISPATCH_QUEUE_INACTIVE
1722 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1723 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1724 } else if (unlikely(os_sub_overflow(dq_state,
1725 DISPATCH_QUEUE_SUSPEND_INTERVAL, &value))) {
1726 // underflow means over-resume or a suspend count transfer
1727 // to the side count is needed
1728 os_atomic_rmw_loop_give_up({
1729 if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
1730 goto over_resume;
1731 }
1732 return _dispatch_queue_resume_slow(dq);
1733 });
1734 //
1735 // below this, value = dq_state - DISPATCH_QUEUE_SUSPEND_INTERVAL
1736 //
1737 } else if (!_dq_state_is_runnable(value)) {
1738 // Out of width or still suspended.
1739 // For the former, force _dispatch_queue_non_barrier_complete
1740 // to reconsider whether it has work to do
1741 value |= DISPATCH_QUEUE_DIRTY;
1742 } else if (_dq_state_drain_locked(value)) {
1743 // still locked by someone else, make drain_try_unlock() fail
1744 // and reconsider whether it has work to do
1745 value |= DISPATCH_QUEUE_DIRTY;
1746 } else if (!is_source && (_dq_state_has_pending_barrier(value) ||
1747 value + pending_barrier_width <
1748 DISPATCH_QUEUE_WIDTH_FULL_BIT)) {
1749 // if we can, acquire the full width drain lock
1750 // and then perform a lock transfer
1751 //
1752 // However this is never useful for a source where there are no
1753 // sync waiters, so never take the lock and do a plain wakeup
1754 value &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
1755 value |= set_owner_and_set_full_width_and_in_barrier;
1756 } else {
1757 // clear overrides and force a wakeup
1758 value &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
1759 value &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
1760 }
1761 });
1762 }
1763
1764 if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1765 // we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
1766 return _dispatch_queue_resume_finalize_activation(dq);
1767 }
1768
1769 if (activate) {
1770 // if we're still in an activate codepath here we should have
1771 // { sc:>0 na:1 }, if not we've got a corrupt state
1772 if (unlikely(!_dq_state_is_suspended(value))) {
1773 DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
1774 }
1775 return;
1776 }
1777
1778 if (_dq_state_is_suspended(value)) {
1779 return;
1780 }
1781
1782 if (_dq_state_is_dirty(dq_state)) {
1783 // <rdar://problem/14637483>
1784 // dependency ordering for dq state changes that were flushed
1785 // and not acted upon
1786 os_atomic_thread_fence(dependency);
1787 dq = os_atomic_force_dependency_on(dq, dq_state);
1788 }
1789 // Balancing the retain_2 done in suspend() for rdar://8181908
1790 dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME_2;
1791 if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
1792 flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
1793 } else if (!_dq_state_is_runnable(value)) {
1794 if (_dq_state_is_base_wlh(dq_state)) {
1795 _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
1796 }
1797 return _dispatch_release_2(dq);
1798 }
1799 dispatch_assert(!_dq_state_received_sync_wait(dq_state));
1800 dispatch_assert(!_dq_state_in_sync_transfer(dq_state));
1801 return dx_wakeup(dq, _dq_state_max_qos(dq_state), flags);
1802
1803 over_resume:
1804 if (unlikely(_dq_state_is_inactive(dq_state))) {
1805 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
1806 }
1807 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
1808 }
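// Illustrative usage sketch of the suspend/resume contract enforced above
// (not part of libdispatch; assumes only the public <dispatch/dispatch.h>
// API, do_work() is a placeholder):
//
//     dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
//     dispatch_suspend(q);                 // adds one suspend count
//     dispatch_async(q, ^{ do_work(); });  // enqueued, but held until resume
//     dispatch_resume(q);                  // suspend/resume must balance
//     dispatch_release(q);                 // releasing a still-suspended queue
//                                          // crashes in _dispatch_queue_xref_dispose()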
1809
1810 const char *
1811 dispatch_queue_get_label(dispatch_queue_t dq)
1812 {
1813 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1814 dq = _dispatch_get_current_queue();
1815 }
1816 return dq->dq_label ? dq->dq_label : "";
1817 }
1818
1819 qos_class_t
1820 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relpri_ptr)
1821 {
1822 dispatch_qos_class_t qos = _dispatch_priority_qos(dq->dq_priority);
1823 if (relpri_ptr) {
1824 *relpri_ptr = qos ? _dispatch_priority_relpri(dq->dq_priority) : 0;
1825 }
1826 return _dispatch_qos_to_qos_class(qos);
1827 }
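// Illustrative usage sketch (not part of libdispatch; assumes the public QoS
// attribute API from <dispatch/dispatch.h>):
//
//     dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
//             DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -4);
//     dispatch_queue_t q = dispatch_queue_create("com.example.util", attr);
//     int relpri;
//     qos_class_t qc = dispatch_queue_get_qos_class(q, &relpri);
//     // qc == QOS_CLASS_UTILITY, relpri == -4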
1828
1829 static void
1830 _dispatch_queue_set_width2(void *ctxt)
1831 {
1832 int w = (int)(intptr_t)ctxt; // intentional truncation
1833 uint32_t tmp;
1834 dispatch_queue_t dq = _dispatch_queue_get_current();
1835
1836 if (w >= 0) {
1837 tmp = w ? (unsigned int)w : 1;
1838 } else {
1839 dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
1840 switch (w) {
1841 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1842 tmp = _dispatch_qos_max_parallelism(qos,
1843 DISPATCH_MAX_PARALLELISM_PHYSICAL);
1844 break;
1845 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1846 tmp = _dispatch_qos_max_parallelism(qos,
1847 DISPATCH_MAX_PARALLELISM_ACTIVE);
1848 break;
1849 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1850 default:
1851 tmp = _dispatch_qos_max_parallelism(qos, 0);
1852 break;
1853 }
1854 }
1855 if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
1856 tmp = DISPATCH_QUEUE_WIDTH_MAX;
1857 }
1858
1859 dispatch_queue_flags_t old_dqf, new_dqf;
1860 os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
1861 new_dqf = (old_dqf & DQF_FLAGS_MASK) | DQF_WIDTH(tmp);
1862 });
1863 _dispatch_queue_inherit_wlh_from_target(dq, dq->do_targetq);
1864 _dispatch_object_debug(dq, "%s", __func__);
1865 }
1866
1867 void
1868 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1869 {
1870 if (unlikely(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT ||
1871 dx_hastypeflag(dq, QUEUE_ROOT) ||
1872 dx_hastypeflag(dq, QUEUE_BASE))) {
1873 return;
1874 }
1875
1876 unsigned long type = dx_type(dq);
1877 switch (type) {
1878 case DISPATCH_QUEUE_LEGACY_TYPE:
1879 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1880 break;
1881 case DISPATCH_QUEUE_SERIAL_TYPE:
1882 DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
1883 default:
1884 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1885 }
1886
1887 if (likely((int)width >= 0)) {
1888 _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
1889 _dispatch_queue_set_width2, DISPATCH_BARRIER_TRYSYNC_SUSPEND);
1890 } else {
1891 // The negative width constants need to execute on the queue to
1892 // query the queue QoS
1893 _dispatch_barrier_async_detached_f(dq, (void*)(intptr_t)width,
1894 _dispatch_queue_set_width2);
1895 }
1896 }
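// Illustrative usage sketch (not part of libdispatch; dispatch_queue_set_width()
// and the width constants are SPI declared in <dispatch/private.h>, and only
// legacy/concurrent queues may be retuned, as the type check above enforces):
//
//     dispatch_queue_t q = dispatch_queue_create("com.example.limited",
//             DISPATCH_QUEUE_CONCURRENT);
//     dispatch_queue_set_width(q, 4);  // at most 4 non-barrier blocks in flight
//     dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);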
1897
1898 static void
1899 _dispatch_queue_legacy_set_target_queue(void *ctxt)
1900 {
1901 dispatch_queue_t dq = _dispatch_queue_get_current();
1902 dispatch_queue_t tq = ctxt;
1903 dispatch_queue_t otq = dq->do_targetq;
1904
1905 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1906 #if DISPATCH_ALLOW_NON_LEAF_RETARGET
1907 _dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
1908 _dispatch_bug_deprecated("Changing the target of a queue "
1909 "already targeted by other dispatch objects");
1910 #else
1911 DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
1912 "already targeted by other dispatch objects");
1913 #endif
1914 }
1915
1916 _dispatch_queue_priority_inherit_from_target(dq, tq);
1917 _dispatch_queue_inherit_wlh_from_target(dq, tq);
1918 #if HAVE_PTHREAD_WORKQUEUE_QOS
1919 // see _dispatch_queue_class_wakeup()
1920 _dispatch_queue_sidelock_lock(dq);
1921 #endif
1922 dq->do_targetq = tq;
1923 #if HAVE_PTHREAD_WORKQUEUE_QOS
1924 // see _dispatch_queue_class_wakeup()
1925 _dispatch_queue_sidelock_unlock(dq);
1926 #endif
1927
1928 _dispatch_object_debug(dq, "%s", __func__);
1929 _dispatch_introspection_target_queue_changed(dq);
1930 _dispatch_release_tailcall(otq);
1931 }
1932
1933 void
1934 _dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
1935 {
1936 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
1937 dq->do_targetq);
1938
1939 if (unlikely(!tq)) {
1940 bool is_concurrent_q = (dq->dq_width > 1);
1941 tq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, !is_concurrent_q);
1942 }
1943
1944 if (_dispatch_queue_try_inactive_suspend(dq)) {
1945 _dispatch_object_set_target_queue_inline(dq, tq);
1946 return dx_vtable(dq)->do_resume(dq, false);
1947 }
1948
1949 #if !DISPATCH_ALLOW_NON_LEAF_RETARGET
1950 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1951 DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
1952 "already targeted by other dispatch objects");
1953 }
1954 #endif
1955
1956 if (unlikely(!_dispatch_queue_is_legacy(dq))) {
1957 #if DISPATCH_ALLOW_NON_LEAF_RETARGET
1958 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1959 DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
1960 "already targeted by other dispatch objects");
1961 }
1962 #endif
1963 DISPATCH_CLIENT_CRASH(0, "Cannot change the target of this object "
1964 "after it has been activated");
1965 }
1966
1967 unsigned long type = dx_type(dq);
1968 switch (type) {
1969 case DISPATCH_QUEUE_LEGACY_TYPE:
1970 #if DISPATCH_ALLOW_NON_LEAF_RETARGET
1971 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1972 _dispatch_bug_deprecated("Changing the target of a queue "
1973 "already targeted by other dispatch objects");
1974 }
1975 #endif
1976 break;
1977 case DISPATCH_SOURCE_KEVENT_TYPE:
1978 case DISPATCH_MACH_CHANNEL_TYPE:
1979 _dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
1980 _dispatch_bug_deprecated("Changing the target of a source "
1981 "after it has been activated");
1982 break;
1983 default:
1984 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1985 }
1986
1987 _dispatch_retain(tq);
1988 return _dispatch_barrier_trysync_or_async_f(dq, tq,
1989 _dispatch_queue_legacy_set_target_queue,
1990 DISPATCH_BARRIER_TRYSYNC_SUSPEND);
1991 }
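// Illustrative usage sketch of the public retargeting path that reaches the
// function above (not part of libdispatch; assumes <dispatch/dispatch.h>).
// Retargeting is only safe before the queue is otherwise in use:
//
//     dispatch_queue_t target = dispatch_queue_create("com.example.target", NULL);
//     dispatch_queue_t inner  = dispatch_queue_create("com.example.inner", NULL);
//     dispatch_set_target_queue(inner, target);  // inner now drains on target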
1992
1993 #pragma mark -
1994 #pragma mark dispatch_mgr_queue
1995
1996 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1997 static struct dispatch_pthread_root_queue_context_s
1998 _dispatch_mgr_root_queue_pthread_context;
1999 static struct dispatch_root_queue_context_s
2000 _dispatch_mgr_root_queue_context = {{{
2001 #if DISPATCH_USE_WORKQUEUES
2002 .dgq_kworkqueue = (void*)(~0ul),
2003 #endif
2004 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
2005 .dgq_thread_pool_size = 1,
2006 }}};
2007
2008 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
2009 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
2010 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
2011 .do_ctxt = &_dispatch_mgr_root_queue_context,
2012 .dq_label = "com.apple.root.libdispatch-manager",
2013 .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
2014 .dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
2015 DISPATCH_PRIORITY_SATURATED_OVERRIDE,
2016 .dq_serialnum = 3,
2017 };
2018 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2019
2020 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
2021 static struct {
2022 volatile int prio;
2023 volatile qos_class_t qos;
2024 int default_prio;
2025 int policy;
2026 pthread_t tid;
2027 } _dispatch_mgr_sched;
2028
2029 static dispatch_once_t _dispatch_mgr_sched_pred;
2030
2031 #if HAVE_PTHREAD_WORKQUEUE_QOS
2032 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
2033 // Must be kept in sync with list of qos classes in sys/qos.h
2034 static const int _dispatch_mgr_sched_qos2prio[] = {
2035 [QOS_CLASS_MAINTENANCE] = 4,
2036 [QOS_CLASS_BACKGROUND] = 4,
2037 [QOS_CLASS_UTILITY] = 20,
2038 [QOS_CLASS_DEFAULT] = 31,
2039 [QOS_CLASS_USER_INITIATED] = 37,
2040 [QOS_CLASS_USER_INTERACTIVE] = 47,
2041 };
2042 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
2043
2044 static void
2045 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
2046 {
2047 struct sched_param param;
2048 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2049 pthread_attr_t *attr;
2050 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
2051 #else
2052 pthread_attr_t a, *attr = &a;
2053 #endif
2054 (void)dispatch_assume_zero(pthread_attr_init(attr));
2055 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
2056 &_dispatch_mgr_sched.policy));
2057 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2058 #if HAVE_PTHREAD_WORKQUEUE_QOS
2059 qos_class_t qos = qos_class_main();
2060 if (qos == QOS_CLASS_DEFAULT) {
2061 qos = QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
2062 }
2063 if (qos) {
2064 _dispatch_mgr_sched.qos = qos;
2065 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
2066 }
2067 #endif
2068 _dispatch_mgr_sched.default_prio = param.sched_priority;
2069 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
2070 }
2071 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
2072
2073 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2074 DISPATCH_NOINLINE
2075 static pthread_t *
2076 _dispatch_mgr_root_queue_init(void)
2077 {
2078 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2079 struct sched_param param;
2080 pthread_attr_t *attr;
2081 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
2082 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
2083 PTHREAD_CREATE_DETACHED));
2084 #if !DISPATCH_DEBUG
2085 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
2086 #endif
2087 #if HAVE_PTHREAD_WORKQUEUE_QOS
2088 qos_class_t qos = _dispatch_mgr_sched.qos;
2089 if (qos) {
2090 if (_dispatch_set_qos_class_enabled) {
2091 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
2092 qos, 0));
2093 }
2094 }
2095 #endif
2096 param.sched_priority = _dispatch_mgr_sched.prio;
2097 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
2098 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
2099 }
2100 return &_dispatch_mgr_sched.tid;
2101 }
2102
2103 static inline void
2104 _dispatch_mgr_priority_apply(void)
2105 {
2106 struct sched_param param;
2107 do {
2108 param.sched_priority = _dispatch_mgr_sched.prio;
2109 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
2110 (void)dispatch_assume_zero(pthread_setschedparam(
2111 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
2112 &param));
2113 }
2114 } while (_dispatch_mgr_sched.prio > param.sched_priority);
2115 }
2116
2117 DISPATCH_NOINLINE
2118 void
2119 _dispatch_mgr_priority_init(void)
2120 {
2121 struct sched_param param;
2122 pthread_attr_t *attr;
2123 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
2124 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2125 #if HAVE_PTHREAD_WORKQUEUE_QOS
2126 qos_class_t qos = 0;
2127 (void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
2128 if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
2129 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
2130 int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
2131 if (p > param.sched_priority) {
2132 param.sched_priority = p;
2133 }
2134 }
2135 #endif
2136 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
2137 return _dispatch_mgr_priority_apply();
2138 }
2139 }
2140 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2141
2142 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2143 DISPATCH_NOINLINE
2144 static void
2145 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
2146 {
2147 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2148 struct sched_param param;
2149 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2150 #if HAVE_PTHREAD_WORKQUEUE_QOS
2151 qos_class_t q, qos = 0;
2152 (void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
2153 if (qos) {
2154 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
2155 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
2156 if (q >= qos) os_atomic_rmw_loop_give_up(break);
2157 });
2158 }
2159 #endif
2160 int p, prio = param.sched_priority;
2161 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
2162 if (p >= prio) os_atomic_rmw_loop_give_up(return);
2163 });
2164 #if DISPATCH_USE_KEVENT_WORKQUEUE
2165 _dispatch_root_queues_init();
2166 if (_dispatch_kevent_workqueue_enabled) {
2167 pthread_priority_t pp = 0;
2168 if (prio > _dispatch_mgr_sched.default_prio) {
2169 // The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
2170 // _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
2171 // problematic in this case, since the second one is only ever
2172 // used on dq_priority fields.
2173 // We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall;
2174 // it is only meaningful to libdispatch.
2175 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2176 } else if (qos) {
2177 pp = _pthread_qos_class_encode(qos, 0, 0);
2178 }
2179 if (pp) {
2180 int r = _pthread_workqueue_set_event_manager_priority(pp);
2181 (void)dispatch_assume_zero(r);
2182 }
2183 return;
2184 }
2185 #endif
2186 #if DISPATCH_USE_MGR_THREAD
2187 if (_dispatch_mgr_sched.tid) {
2188 return _dispatch_mgr_priority_apply();
2189 }
2190 #endif
2191 }
2192 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2193
2194 #if DISPATCH_USE_KEVENT_WORKQUEUE
2195 void
2196 _dispatch_kevent_workqueue_init(void)
2197 {
2198 // Initialize kevent workqueue support
2199 _dispatch_root_queues_init();
2200 if (!_dispatch_kevent_workqueue_enabled) return;
2201 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2202 qos_class_t qos = _dispatch_mgr_sched.qos;
2203 int prio = _dispatch_mgr_sched.prio;
2204 pthread_priority_t pp = 0;
2205 if (qos) {
2206 pp = _pthread_qos_class_encode(qos, 0, 0);
2207 }
2208 if (prio > _dispatch_mgr_sched.default_prio) {
2209 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2210 }
2211 if (pp) {
2212 int r = _pthread_workqueue_set_event_manager_priority(pp);
2213 (void)dispatch_assume_zero(r);
2214 }
2215 }
2216 #endif // DISPATCH_USE_KEVENT_WORKQUEUE
2217
2218 #pragma mark -
2219 #pragma mark dispatch_pthread_root_queue
2220
2221 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2222 static dispatch_queue_t
2223 _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2224 const pthread_attr_t *attr, dispatch_block_t configure,
2225 dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
2226 {
2227 dispatch_queue_t dq;
2228 dispatch_root_queue_context_t qc;
2229 dispatch_pthread_root_queue_context_t pqc;
2230 dispatch_queue_flags_t dqf = 0;
2231 size_t dqs;
2232 int32_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
2233 (int8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
2234
2235 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2236 dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
2237 dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_root), dqs +
2238 sizeof(struct dispatch_root_queue_context_s) +
2239 sizeof(struct dispatch_pthread_root_queue_context_s));
2240 qc = (void*)dq + dqs;
2241 dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
2242 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
2243 dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
2244 if (label) {
2245 const char *tmp = _dispatch_strdup_if_mutable(label);
2246 if (tmp != label) {
2247 dqf |= DQF_LABEL_NEEDS_FREE;
2248 label = tmp;
2249 }
2250 }
2251
2252 _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, 0);
2253 dq->dq_label = label;
2254 dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
2255 dq->do_ctxt = qc;
2256 dq->dq_priority = DISPATCH_PRIORITY_SATURATED_OVERRIDE;
2257
2258 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
2259 qc->dgq_ctxt = pqc;
2260 #if DISPATCH_USE_WORKQUEUES
2261 qc->dgq_kworkqueue = (void*)(~0ul);
2262 #endif
2263 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
2264
2265 if (attr) {
2266 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
2267 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
2268 } else {
2269 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
2270 }
2271 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2272 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
2273 if (configure) {
2274 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
2275 }
2276 if (observer_hooks) {
2277 pqc->dpq_observer_hooks = *observer_hooks;
2278 }
2279 _dispatch_object_debug(dq, "%s", __func__);
2280 return _dispatch_introspection_queue_create(dq);
2281 }
2282
2283 dispatch_queue_t
2284 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2285 const pthread_attr_t *attr, dispatch_block_t configure)
2286 {
2287 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2288 NULL);
2289 }
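// Illustrative usage sketch (not part of libdispatch; this entry point is SPI
// declared in <dispatch/private.h>, and the configure block runs on each newly
// created worker thread):
//
//     pthread_attr_t attr;
//     pthread_attr_init(&attr);
//     dispatch_queue_t rq = dispatch_pthread_root_queue_create(
//             "com.example.rootq", 0, &attr, ^{
//         // per-thread setup, e.g. thread naming or signal masks
//     });
//     dispatch_async(rq, ^{ /* runs on a pthread owned by this root queue */ });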
2290
2291 #if DISPATCH_IOHID_SPI
2292 dispatch_queue_t
2293 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
2294 unsigned long flags, const pthread_attr_t *attr,
2295 dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
2296 dispatch_block_t configure)
2297 {
2298 if (!observer_hooks->queue_will_execute ||
2299 !observer_hooks->queue_did_execute) {
2300 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2301 }
2302 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2303 observer_hooks);
2304 }
2305 #endif
2306
2307 dispatch_queue_t
2308 dispatch_pthread_root_queue_copy_current(void)
2309 {
2310 dispatch_queue_t dq = _dispatch_queue_get_current();
2311 if (!dq) return NULL;
2312 while (unlikely(dq->do_targetq)) {
2313 dq = dq->do_targetq;
2314 }
2315 if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
2316 dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
2317 return NULL;
2318 }
2319 return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
2320 }
2321
2322 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2323
2324 void
2325 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq, bool *allow_free)
2326 {
2327 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
2328 DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
2329 }
2330 _dispatch_object_debug(dq, "%s", __func__);
2331 _dispatch_introspection_queue_dispose(dq);
2332 #if DISPATCH_USE_PTHREAD_POOL
2333 dispatch_root_queue_context_t qc = dq->do_ctxt;
2334 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2335
2336 pthread_attr_destroy(&pqc->dpq_thread_attr);
2337 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator, NULL);
2338 if (pqc->dpq_thread_configure) {
2339 Block_release(pqc->dpq_thread_configure);
2340 }
2341 dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
2342 #endif
2343 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
2344 free((void*)dq->dq_label);
2345 }
2346 _dispatch_queue_destroy(dq, allow_free);
2347 }
2348
2349 #pragma mark -
2350 #pragma mark dispatch_queue_specific
2351
2352 struct dispatch_queue_specific_queue_s {
2353 DISPATCH_QUEUE_HEADER(queue_specific_queue);
2354 TAILQ_HEAD(dispatch_queue_specific_head_s,
2355 dispatch_queue_specific_s) dqsq_contexts;
2356 } DISPATCH_ATOMIC64_ALIGN;
2357
2358 struct dispatch_queue_specific_s {
2359 const void *dqs_key;
2360 void *dqs_ctxt;
2361 dispatch_function_t dqs_destructor;
2362 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
2363 };
2364 DISPATCH_DECL(dispatch_queue_specific);
2365
2366 void
2367 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq,
2368 bool *allow_free)
2369 {
2370 dispatch_queue_specific_t dqs, tmp;
2371 dispatch_queue_t rq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
2372
2373 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
2374 if (dqs->dqs_destructor) {
2375 dispatch_async_f(rq, dqs->dqs_ctxt, dqs->dqs_destructor);
2376 }
2377 free(dqs);
2378 }
2379 _dispatch_queue_destroy(dqsq->_as_dq, allow_free);
2380 }
2381
2382 static void
2383 _dispatch_queue_init_specific(dispatch_queue_t dq)
2384 {
2385 dispatch_queue_specific_queue_t dqsq;
2386
2387 dqsq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_specific_queue),
2388 sizeof(struct dispatch_queue_specific_queue_s));
2389 _dispatch_queue_init(dqsq->_as_dq, DQF_NONE, DISPATCH_QUEUE_WIDTH_MAX,
2390 DISPATCH_QUEUE_ROLE_BASE_ANON);
2391 dqsq->do_xref_cnt = -1;
2392 dqsq->do_targetq = _dispatch_get_root_queue(
2393 DISPATCH_QOS_USER_INITIATED, true);
2394 dqsq->dq_label = "queue-specific";
2395 TAILQ_INIT(&dqsq->dqsq_contexts);
2396 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
2397 dqsq->_as_dq, release))) {
2398 _dispatch_release(dqsq->_as_dq);
2399 }
2400 }
2401
2402 static void
2403 _dispatch_queue_set_specific(void *ctxt)
2404 {
2405 dispatch_queue_specific_t dqs, dqsn = ctxt;
2406 dispatch_queue_specific_queue_t dqsq =
2407 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2408
2409 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2410 if (dqs->dqs_key == dqsn->dqs_key) {
2411 // Destroy previous context for existing key
2412 if (dqs->dqs_destructor) {
2413 dispatch_async_f(_dispatch_get_root_queue(
2414 DISPATCH_QOS_DEFAULT, false), dqs->dqs_ctxt,
2415 dqs->dqs_destructor);
2416 }
2417 if (dqsn->dqs_ctxt) {
2418 // Copy new context for existing key
2419 dqs->dqs_ctxt = dqsn->dqs_ctxt;
2420 dqs->dqs_destructor = dqsn->dqs_destructor;
2421 } else {
2422 // Remove context storage for existing key
2423 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
2424 free(dqs);
2425 }
2426 return free(dqsn);
2427 }
2428 }
2429 // Insert context storage for new key
2430 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
2431 }
2432
2433 DISPATCH_NOINLINE
2434 void
2435 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
2436 void *ctxt, dispatch_function_t destructor)
2437 {
2438 if (slowpath(!key)) {
2439 return;
2440 }
2441 dispatch_queue_specific_t dqs;
2442
2443 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
2444 dqs->dqs_key = key;
2445 dqs->dqs_ctxt = ctxt;
2446 dqs->dqs_destructor = destructor;
2447 if (slowpath(!dq->dq_specific_q)) {
2448 _dispatch_queue_init_specific(dq);
2449 }
2450 _dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
2451 _dispatch_queue_set_specific, 0);
2452 }
2453
2454 static void
2455 _dispatch_queue_get_specific(void *ctxt)
2456 {
2457 void **ctxtp = ctxt;
2458 void *key = *ctxtp;
2459 dispatch_queue_specific_queue_t dqsq =
2460 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2461 dispatch_queue_specific_t dqs;
2462
2463 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2464 if (dqs->dqs_key == key) {
2465 *ctxtp = dqs->dqs_ctxt;
2466 return;
2467 }
2468 }
2469 *ctxtp = NULL;
2470 }
2471
2472 DISPATCH_ALWAYS_INLINE
2473 static inline void *
2474 _dispatch_queue_get_specific_inline(dispatch_queue_t dq, const void *key)
2475 {
2476 void *ctxt = NULL;
2477 if (fastpath(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE && dq->dq_specific_q)){
2478 ctxt = (void *)key;
2479 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
2480 }
2481 return ctxt;
2482 }
2483
2484 DISPATCH_NOINLINE
2485 void *
2486 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
2487 {
2488 if (slowpath(!key)) {
2489 return NULL;
2490 }
2491 return _dispatch_queue_get_specific_inline(dq, key);
2492 }
2493
2494 DISPATCH_NOINLINE
2495 void *
2496 dispatch_get_specific(const void *key)
2497 {
2498 if (slowpath(!key)) {
2499 return NULL;
2500 }
2501 void *ctxt = NULL;
2502 dispatch_queue_t dq = _dispatch_queue_get_current();
2503
2504 while (slowpath(dq)) {
2505 ctxt = _dispatch_queue_get_specific_inline(dq, key);
2506 if (ctxt) break;
2507 dq = dq->do_targetq;
2508 }
2509 return ctxt;
2510 }
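// Illustrative usage sketch of the queue-specific API implemented above (not
// part of libdispatch; assumes <dispatch/dispatch.h>, ctx and free_ctx are
// placeholders):
//
//     static char key;  // only the address matters, not the contents
//     dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
//     dispatch_queue_set_specific(q, &key, ctx, free_ctx);
//     dispatch_sync(q, ^{
//         void *v = dispatch_get_specific(&key);   // walks the target chain
//     });
//     void *direct = dispatch_queue_get_specific(q, &key);  // no queue walk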
2511
2512 #if DISPATCH_IOHID_SPI
2513 bool
2514 _dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
2515 dispatch_queue_t dq) // rdar://problem/18033810
2516 {
2517 if (dq->dq_width != 1) {
2518 DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
2519 }
2520 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2521 return _dq_state_drain_locked_by_self(dq_state);
2522 }
2523 #endif
2524
2525 #pragma mark -
2526 #pragma mark dispatch_queue_debug
2527
2528 size_t
2529 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
2530 {
2531 size_t offset = 0;
2532 dispatch_queue_t target = dq->do_targetq;
2533 const char *tlabel = target && target->dq_label ? target->dq_label : "";
2534 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2535
2536 offset += dsnprintf(&buf[offset], bufsiz - offset, "sref = %d, "
2537 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2538 dq->dq_sref_cnt + 1, tlabel, target, dq->dq_width,
2539 (unsigned long long)dq_state);
2540 if (_dq_state_is_suspended(dq_state)) {
2541 offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
2542 _dq_state_suspend_cnt(dq_state));
2543 }
2544 if (_dq_state_is_inactive(dq_state)) {
2545 offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
2546 } else if (_dq_state_needs_activation(dq_state)) {
2547 offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
2548 }
2549 if (_dq_state_is_enqueued(dq_state)) {
2550 offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
2551 }
2552 if (_dq_state_is_dirty(dq_state)) {
2553 offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
2554 }
2555 dispatch_qos_t qos = _dq_state_max_qos(dq_state);
2556 if (qos) {
2557 offset += dsnprintf(&buf[offset], bufsiz - offset, ", max qos %d", qos);
2558 }
2559 mach_port_t owner = _dq_state_drain_owner(dq_state);
2560 if (!_dispatch_queue_is_thread_bound(dq) && owner) {
2561 offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
2562 owner);
2563 }
2564 if (_dq_state_is_in_barrier(dq_state)) {
2565 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
2566 } else {
2567 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
2568 _dq_state_used_width(dq_state, dq->dq_width));
2569 }
2570 if (_dq_state_has_pending_barrier(dq_state)) {
2571 offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
2572 }
2573 if (_dispatch_queue_is_thread_bound(dq)) {
2574 offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
2575 owner);
2576 }
2577 return offset;
2578 }
2579
2580 size_t
2581 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
2582 {
2583 size_t offset = 0;
2584 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2585 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
2586 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
2587 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
2588 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2589 return offset;
2590 }
2591
2592 #if DISPATCH_DEBUG
2593 void
2594 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
2595 if (fastpath(dq)) {
2596 _dispatch_object_debug(dq, "%s", str);
2597 } else {
2598 _dispatch_log("queue[NULL]: %s", str);
2599 }
2600 }
2601 #endif
2602
2603 #if DISPATCH_PERF_MON
2604
2605 #define DISPATCH_PERF_MON_BUCKETS 8
2606
2607 static struct {
2608 uint64_t volatile time_total;
2609 uint64_t volatile count_total;
2610 uint64_t volatile thread_total;
2611 } _dispatch_stats[DISPATCH_PERF_MON_BUCKETS];
2612 DISPATCH_USED static size_t _dispatch_stat_buckets = DISPATCH_PERF_MON_BUCKETS;
2613
2614 void
2615 _dispatch_queue_merge_stats(uint64_t start, bool trace, perfmon_thread_type type)
2616 {
2617 uint64_t delta = _dispatch_absolute_time() - start;
2618 unsigned long count;
2619 int bucket = 0;
2620 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
2621 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
2622 if (count == 0) {
2623 bucket = 0;
2624 if (trace) _dispatch_ktrace1(DISPATCH_PERF_MON_worker_useless, type);
2625 } else {
2626 bucket = MIN(DISPATCH_PERF_MON_BUCKETS - 1,
2627 (int)sizeof(count) * CHAR_BIT - __builtin_clzl(count));
2628 os_atomic_add(&_dispatch_stats[bucket].count_total, count, relaxed);
2629 }
2630 os_atomic_add(&_dispatch_stats[bucket].time_total, delta, relaxed);
2631 os_atomic_inc(&_dispatch_stats[bucket].thread_total, relaxed);
2632 if (trace) {
2633 _dispatch_ktrace3(DISPATCH_PERF_MON_worker_thread_end, count, delta, type);
2634 }
2635 }
2636
2637 #endif
2638
2639 #pragma mark -
2640 #pragma mark _dispatch_set_priority_and_mach_voucher
2641 #if HAVE_PTHREAD_WORKQUEUE_QOS
2642
2643 DISPATCH_NOINLINE
2644 void
2645 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
2646 mach_voucher_t kv)
2647 {
2648 _pthread_set_flags_t pflags = 0;
2649 if (pp && _dispatch_set_qos_class_enabled) {
2650 pthread_priority_t old_pri = _dispatch_get_priority();
2651 if (pp != old_pri) {
2652 if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
2653 pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
2654 // when we unbind, the overcommit state can flip, so we need to
2655 // learn it from the defaultpri, see _dispatch_priority_compute_update
2656 pp |= (_dispatch_get_basepri() &
2657 DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
2658 } else {
2659 // else we need to keep the one that is set in the current pri
2660 pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2661 }
2662 if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2663 pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
2664 }
2665 uint64_t mgr_dq_state =
2666 os_atomic_load2o(&_dispatch_mgr_q, dq_state, relaxed);
2667 if (unlikely(_dq_state_drain_locked_by_self(mgr_dq_state))) {
2668 DISPATCH_INTERNAL_CRASH(pp,
2669 "Changing the QoS while on the manager queue");
2670 }
2671 if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2672 DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
2673 }
2674 if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
2675 DISPATCH_INTERNAL_CRASH(old_pri,
2676 "Cannot turn a manager thread into a normal one");
2677 }
2678 }
2679 }
2680 if (kv != VOUCHER_NO_MACH_VOUCHER) {
2681 #if VOUCHER_USE_MACH_VOUCHER
2682 pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
2683 #endif
2684 }
2685 if (!pflags) return;
2686 int r = _pthread_set_properties_self(pflags, pp, kv);
2687 if (r == EINVAL) {
2688 DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
2689 }
2690 (void)dispatch_assume_zero(r);
2691 }
2692
2693 DISPATCH_NOINLINE
2694 voucher_t
2695 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
2696 voucher_t v, dispatch_thread_set_self_t flags)
2697 {
2698 voucher_t ov = DISPATCH_NO_VOUCHER;
2699 mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
2700 if (v != DISPATCH_NO_VOUCHER) {
2701 bool retained = flags & DISPATCH_VOUCHER_CONSUME;
2702 ov = _voucher_get();
2703 if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
2704 if (retained && v) _voucher_release_no_dispose(v);
2705 ov = DISPATCH_NO_VOUCHER;
2706 } else {
2707 if (!retained && v) _voucher_retain(v);
2708 kv = _voucher_swap_and_get_mach_voucher(ov, v);
2709 }
2710 }
2711 if (!(flags & DISPATCH_THREAD_PARK)) {
2712 _dispatch_set_priority_and_mach_voucher_slow(priority, kv);
2713 }
2714 if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
2715 if (ov) _voucher_release(ov);
2716 ov = DISPATCH_NO_VOUCHER;
2717 }
2718 return ov;
2719 }
2720 #endif
2721 #pragma mark -
2722 #pragma mark dispatch_continuation_t
2723
2724 const struct dispatch_continuation_vtable_s _dispatch_continuation_vtables[] = {
2725 DC_VTABLE_ENTRY(ASYNC_REDIRECT,
2726 .do_kind = "dc-redirect",
2727 .do_invoke = _dispatch_async_redirect_invoke),
2728 #if HAVE_MACH
2729 DC_VTABLE_ENTRY(MACH_SEND_BARRRIER_DRAIN,
2730 .do_kind = "dc-mach-send-drain",
2731 .do_invoke = _dispatch_mach_send_barrier_drain_invoke),
2732 DC_VTABLE_ENTRY(MACH_SEND_BARRIER,
2733 .do_kind = "dc-mach-send-barrier",
2734 .do_invoke = _dispatch_mach_barrier_invoke),
2735 DC_VTABLE_ENTRY(MACH_RECV_BARRIER,
2736 .do_kind = "dc-mach-recv-barrier",
2737 .do_invoke = _dispatch_mach_barrier_invoke),
2738 DC_VTABLE_ENTRY(MACH_ASYNC_REPLY,
2739 .do_kind = "dc-mach-async-reply",
2740 .do_invoke = _dispatch_mach_msg_async_reply_invoke),
2741 #endif
2742 #if HAVE_PTHREAD_WORKQUEUE_QOS
2743 DC_VTABLE_ENTRY(OVERRIDE_STEALING,
2744 .do_kind = "dc-override-stealing",
2745 .do_invoke = _dispatch_queue_override_invoke),
2746 DC_VTABLE_ENTRY(OVERRIDE_OWNING,
2747 .do_kind = "dc-override-owning",
2748 .do_invoke = _dispatch_queue_override_invoke),
2749 #endif
2750 };
2751
2752 static void
2753 _dispatch_force_cache_cleanup(void)
2754 {
2755 dispatch_continuation_t dc;
2756 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2757 if (dc) {
2758 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
2759 _dispatch_cache_cleanup(dc);
2760 }
2761 }
2762
2763 DISPATCH_NOINLINE
2764 static void
2765 _dispatch_cache_cleanup(void *value)
2766 {
2767 dispatch_continuation_t dc, next_dc = value;
2768
2769 while ((dc = next_dc)) {
2770 next_dc = dc->do_next;
2771 _dispatch_continuation_free_to_heap(dc);
2772 }
2773 }
2774
2775 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2776 DISPATCH_NOINLINE
2777 void
2778 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
2779 {
2780 _dispatch_continuation_free_to_heap(dc);
2781 dispatch_continuation_t next_dc;
2782 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2783 int cnt;
2784 if (!dc || (cnt = dc->dc_cache_cnt -
2785 _dispatch_continuation_cache_limit) <= 0) {
2786 return;
2787 }
2788 do {
2789 next_dc = dc->do_next;
2790 _dispatch_continuation_free_to_heap(dc);
2791 } while (--cnt && (dc = next_dc));
2792 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
2793 }
2794 #endif
2795
2796 DISPATCH_NOINLINE
2797 static void
2798 _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
2799 {
2800 dx_push(dq, dc, _dispatch_continuation_override_qos(dq, dc));
2801 }
2802
2803 DISPATCH_ALWAYS_INLINE
2804 static inline void
2805 _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
2806 bool barrier)
2807 {
2808 if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
2809 return _dispatch_continuation_push(dq, dc);
2810 }
2811 return _dispatch_async_f2(dq, dc);
2812 }
2813
2814 DISPATCH_NOINLINE
2815 void
2816 _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
2817 {
2818 _dispatch_continuation_async2(dq, dc,
2819 dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
2820 }
2821
2822 #pragma mark -
2823 #pragma mark dispatch_block_create
2824
2825 #if __BLOCKS__
2826
2827 DISPATCH_ALWAYS_INLINE
2828 static inline bool
2829 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
2830 {
2831 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
2832 }
2833
2834 DISPATCH_ALWAYS_INLINE
2835 static inline dispatch_block_flags_t
2836 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
2837 {
2838 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
2839 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2840 }
2841 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
2842 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2843 }
2844 return flags;
2845 }
2846
2847 static inline dispatch_block_t
2848 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
2849 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
2850 {
2851 flags = _dispatch_block_normalize_flags(flags);
2852 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
2853
2854 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
2855 #if OS_VOUCHER_ACTIVITY_SPI
2856 voucher = VOUCHER_CURRENT;
2857 #endif
2858 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2859 }
2860 #if OS_VOUCHER_ACTIVITY_SPI
2861 if (voucher == VOUCHER_CURRENT) {
2862 voucher = _voucher_get();
2863 }
2864 #endif
2865 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
2866 pri = _dispatch_priority_propagate();
2867 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2868 }
2869 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
2870 #if DISPATCH_DEBUG
2871 dispatch_assert(_dispatch_block_get_data(db));
2872 #endif
2873 return db;
2874 }
2875
2876 dispatch_block_t
2877 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
2878 {
2879 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2880 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
2881 block);
2882 }
2883
2884 dispatch_block_t
2885 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
2886 dispatch_qos_class_t qos_class, int relative_priority,
2887 dispatch_block_t block)
2888 {
2889 if (!_dispatch_block_flags_valid(flags) ||
2890 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2891 return DISPATCH_BAD_INPUT;
2892 }
2893 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2894 pthread_priority_t pri = 0;
2895 #if HAVE_PTHREAD_WORKQUEUE_QOS
2896 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2897 #endif
2898 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
2899 pri, block);
2900 }
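// Illustrative usage sketch (not part of libdispatch; assumes the public
// dispatch_block API, q and do_background_work() are placeholders):
//
//     dispatch_block_t b = dispatch_block_create_with_qos_class(
//             DISPATCH_BLOCK_ENFORCE_QOS_CLASS, QOS_CLASS_UTILITY, 0, ^{
//         do_background_work();
//     });
//     dispatch_async(q, b);
//     Block_release(b);  // blocks returned by the create APIs must be released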
2901
2902 dispatch_block_t
2903 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
2904 voucher_t voucher, dispatch_block_t block)
2905 {
2906 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2907 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2908 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
2909 block);
2910 }
2911
2912 dispatch_block_t
2913 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
2914 voucher_t voucher, dispatch_qos_class_t qos_class,
2915 int relative_priority, dispatch_block_t block)
2916 {
2917 if (!_dispatch_block_flags_valid(flags) ||
2918 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2919 return DISPATCH_BAD_INPUT;
2920 }
2921 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
2922 pthread_priority_t pri = 0;
2923 #if HAVE_PTHREAD_WORKQUEUE_QOS
2924 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2925 #endif
2926 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
2927 pri, block);
2928 }
2929
2930 void
2931 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
2932 {
2933 if (!_dispatch_block_flags_valid(flags)) {
2934 DISPATCH_CLIENT_CRASH(flags, "Invalid flags passed to "
2935 "dispatch_block_perform()");
2936 }
2937 flags = _dispatch_block_normalize_flags(flags);
2938 struct dispatch_block_private_data_s dbpds =
2939 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
2940 return _dispatch_block_invoke_direct(&dbpds);
2941 }
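// Illustrative usage sketch (not part of libdispatch): unlike the create APIs,
// dispatch_block_perform() runs the block synchronously and never returns an
// object that needs releasing:
//
//     dispatch_block_perform(DISPATCH_BLOCK_DETACHED, ^{
//         // executes immediately, detached from the caller's QoS and voucher
//     });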
2942
2943 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2944
2945 void
2946 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s *dbcpd)
2947 {
2948 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
2949 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2950 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2951 if (slowpath(atomic_flags & DBF_WAITED)) {
2952 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2953 "run more than once and waited for");
2954 }
2955 if (atomic_flags & DBF_CANCELED) goto out;
2956
2957 pthread_priority_t op = 0, p = 0;
2958 op = _dispatch_block_invoke_should_set_priority(flags, dbpd->dbpd_priority);
2959 if (op) {
2960 p = dbpd->dbpd_priority;
2961 }
2962 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2963 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2964 v = dbpd->dbpd_voucher;
2965 }
2966 ov = _dispatch_set_priority_and_voucher(p, v, 0);
2967 dbpd->dbpd_thread = _dispatch_tid_self();
2968 _dispatch_client_callout(dbpd->dbpd_block,
2969 _dispatch_Block_invoke(dbpd->dbpd_block));
2970 _dispatch_reset_priority_and_voucher(op, ov);
2971 out:
2972 if ((atomic_flags & DBF_PERFORM) == 0) {
2973 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2974 dispatch_group_leave(_dbpd_group(dbpd));
2975 }
2976 }
2977 }
2978
2979 void
2980 _dispatch_block_sync_invoke(void *block)
2981 {
2982 dispatch_block_t b = block;
2983 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2984 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2985 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2986 if (unlikely(atomic_flags & DBF_WAITED)) {
2987 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2988 "run more than once and waited for");
2989 }
2990 if (atomic_flags & DBF_CANCELED) goto out;
2991
2992 voucher_t ov = DISPATCH_NO_VOUCHER;
2993 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2994 ov = _dispatch_adopt_priority_and_set_voucher(0, dbpd->dbpd_voucher, 0);
2995 }
2996 dbpd->dbpd_block();
2997 _dispatch_reset_voucher(ov, 0);
2998 out:
2999 if ((atomic_flags & DBF_PERFORM) == 0) {
3000 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
3001 dispatch_group_leave(_dbpd_group(dbpd));
3002 }
3003 }
3004
3005 os_mpsc_queue_t oq;
3006 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
3007 if (oq) {
3008 // balances dispatch_{,barrier_,}sync
3009 _os_object_release_internal_n(oq->_as_os_obj, 2);
3010 }
3011 }
3012
3013 static void
3014 _dispatch_block_async_invoke_reset_max_qos(dispatch_queue_t dq,
3015 dispatch_qos_t qos)
3016 {
3017 uint64_t old_state, new_state, qos_bits = _dq_state_from_qos(qos);
3018
3019 // Only dispatch queues can reach this point (as opposed to sources or more
3020 // complex objects), which allows us to handle the DIRTY bit protocol by only
3021 // looking at the tail
3022 dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);
3023
3024 again:
3025 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3026 dispatch_assert(_dq_state_is_base_wlh(old_state));
3027 if ((old_state & DISPATCH_QUEUE_MAX_QOS_MASK) <= qos_bits) {
3028 // Nothing to do if the QoS isn't going down
3029 os_atomic_rmw_loop_give_up(return);
3030 }
3031 if (_dq_state_is_dirty(old_state)) {
3032 os_atomic_rmw_loop_give_up({
3033 // just renew the drain lock with an acquire barrier, to see
3034 // what the enqueuer that set DIRTY has done.
3035 // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
3036 // is already in a register
3037 os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
3038 if (!dq->dq_items_tail) {
3039 goto again;
3040 }
3041 return;
3042 });
3043 }
3044
3045 new_state = old_state;
3046 new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
3047 new_state |= qos_bits;
3048 });
3049
3050 _dispatch_deferred_items_get()->ddi_wlh_needs_update = true;
3051 _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
3052 }
3053
3054 #define DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE 0x1
3055 #define DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET 0x2
3056
3057 DISPATCH_NOINLINE
3058 static void
3059 _dispatch_block_async_invoke2(dispatch_block_t b, unsigned long invoke_flags)
3060 {
3061 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
3062 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
3063 if (slowpath(atomic_flags & DBF_WAITED)) {
3064 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
3065 "run more than once and waited for");
3066 }
3067
3068 if (unlikely((dbpd->dbpd_flags &
3069 DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE) &&
3070 !(invoke_flags & DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET))) {
3071 dispatch_queue_t dq = _dispatch_get_current_queue();
3072 dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
3073 if ((dispatch_wlh_t)dq == _dispatch_get_wlh() && !dq->dq_items_tail) {
3074 _dispatch_block_async_invoke_reset_max_qos(dq, qos);
3075 }
3076 }
3077
3078 if (!slowpath(atomic_flags & DBF_CANCELED)) {
3079 dbpd->dbpd_block();
3080 }
3081 if ((atomic_flags & DBF_PERFORM) == 0) {
3082 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
3083 dispatch_group_leave(_dbpd_group(dbpd));
3084 }
3085 }
3086
3087 os_mpsc_queue_t oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
3088 if (oq) {
3089 // balances dispatch_{,barrier_,group_}async
3090 _os_object_release_internal_n_inline(oq->_as_os_obj, 2);
3091 }
3092
3093 if (invoke_flags & DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE) {
3094 Block_release(b);
3095 }
3096 }
3097
3098 static void
3099 _dispatch_block_async_invoke(void *block)
3100 {
3101 _dispatch_block_async_invoke2(block, 0);
3102 }
3103
3104 static void
3105 _dispatch_block_async_invoke_and_release(void *block)
3106 {
3107 _dispatch_block_async_invoke2(block, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE);
3108 }
3109
3110 static void
3111 _dispatch_block_async_invoke_and_release_mach_barrier(void *block)
3112 {
3113 _dispatch_block_async_invoke2(block, DISPATCH_BLOCK_ASYNC_INVOKE_RELEASE |
3114 DISPATCH_BLOCK_ASYNC_INVOKE_NO_OVERRIDE_RESET);
3115 }
3116
3117 DISPATCH_ALWAYS_INLINE
3118 static inline bool
3119 _dispatch_block_supports_wait_and_cancel(dispatch_block_private_data_t dbpd)
3120 {
3121 return dbpd && !(dbpd->dbpd_flags &
3122 DISPATCH_BLOCK_IF_LAST_RESET_QUEUE_QOS_OVERRIDE);
3123 }
3124
3125 void
3126 dispatch_block_cancel(dispatch_block_t db)
3127 {
3128 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3129 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
3130 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3131 "dispatch_block_cancel()");
3132 }
3133 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
3134 }
3135
3136 long
3137 dispatch_block_testcancel(dispatch_block_t db)
3138 {
3139 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3140 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
3141 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3142 "dispatch_block_testcancel()");
3143 }
3144 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
3145 }
3146
3147 long
3148 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
3149 {
3150 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3151 if (unlikely(!_dispatch_block_supports_wait_and_cancel(dbpd))) {
3152 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3153 "dispatch_block_wait()");
3154 }
3155
3156 unsigned int flags = os_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
3157 DBF_WAITING, relaxed);
3158 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
3159 DISPATCH_CLIENT_CRASH(flags, "A block object may not be waited for "
3160 "more than once");
3161 }
3162
3163 // <rdar://problem/17703192> If we know the queue where this block is
3164 // enqueued, or the thread that's executing it, then we should boost
3165 // it here.
3166
3167 pthread_priority_t pp = _dispatch_get_priority();
3168
3169 os_mpsc_queue_t boost_oq;
3170 boost_oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
3171 if (boost_oq) {
3172 // release balances dispatch_{,barrier_,group_}async.
3173 // Can't put the queue back in the timeout case: the block might
3174 // finish after we fell out of group_wait and see our NULL, so
3175 // neither of us would ever release. Side effect: After a _wait
3176 // that times out, subsequent waits will not boost the qos of the
3177 // still-running block.
3178 dx_wakeup(boost_oq, _dispatch_qos_from_pp(pp),
3179 DISPATCH_WAKEUP_BLOCK_WAIT | DISPATCH_WAKEUP_CONSUME_2);
3180 }
3181
3182 mach_port_t boost_th = dbpd->dbpd_thread;
3183 if (boost_th) {
3184 _dispatch_thread_override_start(boost_th, pp, dbpd);
3185 }
3186
3187 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3188 if (slowpath(performed > 1 || (boost_th && boost_oq))) {
3189 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3190 "run more than once and waited for");
3191 }
3192
3193 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
3194
3195 if (boost_th) {
3196 _dispatch_thread_override_end(boost_th, dbpd);
3197 }
3198
3199 if (ret) {
3200 // timed out: reverse our changes
3201 (void)os_atomic_and2o(dbpd, dbpd_atomic_flags,
3202 ~DBF_WAITING, relaxed);
3203 } else {
3204 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags,
3205 DBF_WAITED, relaxed);
3206 // don't need to re-test here: the second call would see
3207 // the first call's WAITING
3208 }
3209
3210 return ret;
3211 }
3212
3213 void
3214 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
3215 dispatch_block_t notification_block)
3216 {
3217 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3218 if (!dbpd) {
3219 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3220 "dispatch_block_notify()");
3221 }
3222 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3223 if (slowpath(performed > 1)) {
3224 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3225 "run more than once and observed");
3226 }
3227
3228 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
3229 }
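// Illustrative usage sketch tying cancel/wait/notify together (not part of
// libdispatch; q, do_work() and work_done() are placeholders, the timeout is
// arbitrary):
//
//     dispatch_block_t b = dispatch_block_create(0, ^{ do_work(); });
//     dispatch_block_notify(b, dispatch_get_main_queue(), ^{ work_done(); });
//     dispatch_async(q, b);
//     if (dispatch_block_wait(b, dispatch_time(DISPATCH_TIME_NOW,
//             NSEC_PER_SEC)) != 0) {
//         dispatch_block_cancel(b);  // timed out; cancel if it hasn't started
//     }
//     Block_release(b);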
3230
3231 DISPATCH_NOINLINE
3232 void
3233 _dispatch_continuation_init_slow(dispatch_continuation_t dc,
3234 dispatch_queue_class_t dqu, dispatch_block_flags_t flags)
3235 {
3236 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(dc->dc_ctxt);
3237 dispatch_block_flags_t block_flags = dbpd->dbpd_flags;
3238 uintptr_t dc_flags = dc->dc_flags;
3239 os_mpsc_queue_t oq = dqu._oq;
3240
3241 // balanced in d_block_async_invoke_and_release or d_block_wait
3242 if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
3243 _os_object_retain_internal_n_inline(oq->_as_os_obj, 2);
3244 }
3245
3246 if (dc_flags & DISPATCH_OBJ_MACH_BARRIER) {
3247 dispatch_assert(dc_flags & DISPATCH_OBJ_CONSUME_BIT);
3248 dc->dc_func = _dispatch_block_async_invoke_and_release_mach_barrier;
3249 } else if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3250 dc->dc_func = _dispatch_block_async_invoke_and_release;
3251 } else {
3252 dc->dc_func = _dispatch_block_async_invoke;
3253 }
3254
3255 flags |= block_flags;
3256 if (block_flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3257 _dispatch_continuation_priority_set(dc, dbpd->dbpd_priority, flags);
3258 } else {
3259 _dispatch_continuation_priority_set(dc, dc->dc_priority, flags);
3260 }
3261 if (block_flags & DISPATCH_BLOCK_BARRIER) {
3262 dc_flags |= DISPATCH_OBJ_BARRIER_BIT;
3263 }
3264 if (block_flags & DISPATCH_BLOCK_HAS_VOUCHER) {
3265 voucher_t v = dbpd->dbpd_voucher;
3266 dc->dc_voucher = v ? _voucher_retain(v) : NULL;
3267 dc_flags |= DISPATCH_OBJ_ENFORCE_VOUCHER;
3268 _dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);
3269 _dispatch_voucher_ktrace_dc_push(dc);
3270 } else {
3271 _dispatch_continuation_voucher_set(dc, oq, flags);
3272 }
3273 dc_flags |= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT;
3274 dc->dc_flags = dc_flags;
3275 }
3276
3277 #endif // __BLOCKS__
3278 #pragma mark -
3279 #pragma mark dispatch_barrier_async
3280
3281 DISPATCH_NOINLINE
3282 static void
3283 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
3284 dispatch_function_t func, pthread_priority_t pp,
3285 dispatch_block_flags_t flags, uintptr_t dc_flags)
3286 {
3287 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
3288 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3289 _dispatch_continuation_async(dq, dc);
3290 }
3291
3292 DISPATCH_ALWAYS_INLINE
3293 static inline void
3294 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
3295 dispatch_function_t func, pthread_priority_t pp,
3296 dispatch_block_flags_t flags)
3297 {
3298 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3299 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3300
3301 if (!fastpath(dc)) {
3302 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3303 }
3304
3305 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3306 _dispatch_continuation_push(dq, dc);
3307 }
3308
3309 DISPATCH_NOINLINE
3310 void
3311 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
3312 dispatch_function_t func)
3313 {
3314 _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
3315 }
3316
3317 DISPATCH_NOINLINE
3318 void
3319 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
3320 dispatch_function_t func)
3321 {
3322 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3323 dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3324 dc->dc_func = func;
3325 dc->dc_ctxt = ctxt;
3326 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3327 dc->dc_priority = DISPATCH_NO_PRIORITY;
3328 dx_push(dq, dc, 0);
3329 }
3330
3331 #ifdef __BLOCKS__
3332 void
3333 dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
3334 {
3335 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3336 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3337
3338 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3339 _dispatch_continuation_push(dq, dc);
3340 }
3341 #endif
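
/*
 * Illustrative usage sketch: dispatch_barrier_async() only acts as a barrier
 * on a concurrent queue you create yourself; on a serial queue it behaves
 * like dispatch_async(), and on a global root queue the barrier semantics do
 * not apply. The queue label below is an assumption for the example.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{
 *		// reader: may run concurrently with other readers
 *	});
 *	dispatch_barrier_async(q, ^{
 *		// writer: runs alone once prior work on q has drained
 *	});
 */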
3342
3343 #pragma mark -
3344 #pragma mark dispatch_async
3345
3346 void
3347 _dispatch_async_redirect_invoke(dispatch_continuation_t dc,
3348 dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
3349 {
3350 dispatch_thread_frame_s dtf;
3351 struct dispatch_continuation_s *other_dc = dc->dc_other;
3352 dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
3353 // if we went through _dispatch_root_queue_push_override,
3354 // the "right" root queue was stuffed into dc_func
3355 dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
3356 dispatch_queue_t dq = dc->dc_data, rq, old_dq;
3357 dispatch_priority_t old_dbp;
3358
3359 if (ctxt_flags) {
3360 flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
3361 flags |= ctxt_flags;
3362 }
3363 old_dq = _dispatch_get_current_queue();
3364 if (assumed_rq) {
3365 old_dbp = _dispatch_root_queue_identity_assume(assumed_rq);
3366 _dispatch_set_basepri(dq->dq_priority);
3367 } else {
3368 old_dbp = _dispatch_set_basepri(dq->dq_priority);
3369 }
3370
3371 _dispatch_thread_frame_push(&dtf, dq);
3372 _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
3373 DISPATCH_OBJ_CONSUME_BIT, {
3374 _dispatch_continuation_pop(other_dc, dic, flags, dq);
3375 });
3376 _dispatch_thread_frame_pop(&dtf);
3377 if (assumed_rq) _dispatch_queue_set_current(old_dq);
3378 _dispatch_reset_basepri(old_dbp);
3379
3380 rq = dq->do_targetq;
3381 while (slowpath(rq->do_targetq) && rq != old_dq) {
3382 _dispatch_queue_non_barrier_complete(rq);
3383 rq = rq->do_targetq;
3384 }
3385
3386 _dispatch_queue_non_barrier_complete(dq);
3387 _dispatch_release_tailcall(dq); // pairs with _dispatch_async_redirect_wrap
3388 }
3389
3390 DISPATCH_ALWAYS_INLINE
3391 static inline dispatch_continuation_t
3392 _dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
3393 {
3394 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3395
3396 dou._do->do_next = NULL;
3397 dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
3398 dc->dc_func = NULL;
3399 dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3400 dc->dc_data = dq;
3401 dc->dc_other = dou._do;
3402 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3403 dc->dc_priority = DISPATCH_NO_PRIORITY;
3404 _dispatch_retain(dq); // released in _dispatch_async_redirect_invoke
3405 return dc;
3406 }
3407
3408 DISPATCH_NOINLINE
3409 static void
3410 _dispatch_async_f_redirect(dispatch_queue_t dq,
3411 dispatch_object_t dou, dispatch_qos_t qos)
3412 {
3413 if (!slowpath(_dispatch_object_is_redirection(dou))) {
3414 dou._dc = _dispatch_async_redirect_wrap(dq, dou);
3415 }
3416 dq = dq->do_targetq;
3417
3418 // Find the queue to redirect to
3419 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
3420 if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
3421 break;
3422 }
3423 if (!dou._dc->dc_ctxt) {
3424 // find the first queue in descending target queue order that has
3425 // an autorelease frequency set, and use that as the frequency for
3426 // this continuation.
3427 dou._dc->dc_ctxt = (void *)
3428 (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3429 }
3430 dq = dq->do_targetq;
3431 }
3432
3433 dx_push(dq, dou, qos);
3434 }
3435
3436 DISPATCH_ALWAYS_INLINE
3437 static inline void
3438 _dispatch_continuation_redirect(dispatch_queue_t dq,
3439 struct dispatch_object_s *dc)
3440 {
3441 _dispatch_trace_continuation_pop(dq, dc);
3442 // This is a re-redirect, overrides have already been applied
3443 // by _dispatch_async_f2.
3444 // However we want to end up on the root queue matching `dc`'s qos, so pick up
3445 // the current override of `dq`, which includes dc's override (and maybe more)
3446 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3447 _dispatch_async_f_redirect(dq, dc, _dq_state_max_qos(dq_state));
3448 _dispatch_introspection_queue_item_complete(dc);
3449 }
3450
3451 DISPATCH_NOINLINE
3452 static void
3453 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
3454 {
3455 // <rdar://problem/24738102&24743140> reserving non-barrier width
3456 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier-width
3457 // equivalent), so we have to check that this thread hasn't enqueued
3458 // anything ahead of this call, or we could break ordering
3459 if (slowpath(dq->dq_items_tail)) {
3460 return _dispatch_continuation_push(dq, dc);
3461 }
3462
3463 if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
3464 return _dispatch_continuation_push(dq, dc);
3465 }
3466
3467 return _dispatch_async_f_redirect(dq, dc,
3468 _dispatch_continuation_override_qos(dq, dc));
3469 }
3470
3471 DISPATCH_ALWAYS_INLINE
3472 static inline void
3473 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3474 pthread_priority_t pp, dispatch_block_flags_t flags)
3475 {
3476 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3477 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3478
3479 if (!fastpath(dc)) {
3480 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3481 }
3482
3483 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3484 _dispatch_continuation_async2(dq, dc, false);
3485 }
3486
3487 DISPATCH_NOINLINE
3488 void
3489 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3490 {
3491 _dispatch_async_f(dq, ctxt, func, 0, 0);
3492 }
3493
3494 DISPATCH_NOINLINE
3495 void
3496 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
3497 dispatch_function_t func)
3498 {
3499 _dispatch_async_f(dq, ctxt, func, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
3500 }
3501
3502 #ifdef __BLOCKS__
3503 void
3504 dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
3505 {
3506 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3507 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3508
3509 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3510 _dispatch_continuation_async(dq, dc);
3511 }
3512 #endif
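
/*
 * Illustrative usage sketch for the function/context variant above: the
 * context pointer is handed to the function when it runs, and the caller's
 * convention decides who frees it. The struct and function names below are
 * assumptions for the example.
 *
 *	struct work_ctxt { int value; };
 *
 *	static void work_fn(void *ctxt) {
 *		struct work_ctxt *w = ctxt;
 *		// ... use w->value ...
 *		free(w);
 *	}
 *
 *	struct work_ctxt *w = malloc(sizeof(*w));
 *	w->value = 42;
 *	dispatch_async_f(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0),
 *			w, work_fn);
 */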
3513
3514 #pragma mark -
3515 #pragma mark dispatch_group_async
3516
3517 DISPATCH_ALWAYS_INLINE
3518 static inline void
3519 _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3520 dispatch_continuation_t dc)
3521 {
3522 dispatch_group_enter(dg);
3523 dc->dc_data = dg;
3524 _dispatch_continuation_async(dq, dc);
3525 }
3526
3527 DISPATCH_NOINLINE
3528 void
3529 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
3530 dispatch_function_t func)
3531 {
3532 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3533 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3534
3535 _dispatch_continuation_init_f(dc, dq, ctxt, func, 0, 0, dc_flags);
3536 _dispatch_continuation_group_async(dg, dq, dc);
3537 }
3538
3539 #ifdef __BLOCKS__
3540 void
3541 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3542 dispatch_block_t db)
3543 {
3544 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3545 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3546
3547 _dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
3548 _dispatch_continuation_group_async(dg, dq, dc);
3549 }
3550 #endif
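
/*
 * Illustrative usage sketch: dispatch_group_async() behaves like a
 * dispatch_group_enter() paired with an async submission whose completion
 * leaves the group, as the helper above shows for the enter side. The queue
 * choices below are assumptions for the example.
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
 *	for (int i = 0; i < 4; i++) {
 *		dispatch_group_async(g, q, ^{
 *			// one unit of work
 *		});
 *	}
 *	dispatch_group_notify(g, dispatch_get_main_queue(), ^{
 *		// all four work items have completed
 *	});
 */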
3551
3552 #pragma mark -
3553 #pragma mark _dispatch_sync_invoke / _dispatch_sync_complete
3554
3555 DISPATCH_NOINLINE
3556 static void
3557 _dispatch_queue_non_barrier_complete(dispatch_queue_t dq)
3558 {
3559 uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();
3560
3561 // see _dispatch_queue_resume()
3562 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3563 new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
3564 if (unlikely(_dq_state_drain_locked(old_state))) {
3565 // make drain_try_unlock() fail and reconsider whether there's
3566 // enough width now for a new item
3567 new_state |= DISPATCH_QUEUE_DIRTY;
3568 } else if (likely(_dq_state_is_runnable(new_state))) {
3569 uint64_t full_width = new_state;
3570 if (_dq_state_has_pending_barrier(old_state)) {
3571 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
3572 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
3573 full_width += DISPATCH_QUEUE_IN_BARRIER;
3574 } else {
3575 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3576 full_width += DISPATCH_QUEUE_IN_BARRIER;
3577 }
3578 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
3579 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
3580 new_state = full_width;
3581 new_state &= ~DISPATCH_QUEUE_DIRTY;
3582 new_state |= owner_self;
3583 } else if (_dq_state_is_dirty(old_state)) {
3584 new_state |= DISPATCH_QUEUE_ENQUEUED;
3585 }
3586 }
3587 });
3588
3589 if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
3590 if (_dq_state_is_dirty(old_state)) {
3591 // <rdar://problem/14637483>
3592 // dependency ordering for dq state changes that were flushed
3593 // and not acted upon
3594 os_atomic_thread_fence(dependency);
3595 dq = os_atomic_force_dependency_on(dq, old_state);
3596 }
3597 return _dispatch_queue_barrier_complete(dq, 0, 0);
3598 }
3599
3600 if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
3601 _dispatch_retain_2(dq);
3602 dispatch_assert(!_dq_state_is_base_wlh(new_state));
3603 return dx_push(dq->do_targetq, dq, _dq_state_max_qos(new_state));
3604 }
3605 }
3606
3607
3608 DISPATCH_ALWAYS_INLINE
3609 static inline void
3610 _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
3611 dispatch_function_t func)
3612 {
3613 dispatch_thread_frame_s dtf;
3614 _dispatch_thread_frame_push(&dtf, dq);
3615 _dispatch_client_callout(ctxt, func);
3616 _dispatch_perfmon_workitem_inc();
3617 _dispatch_thread_frame_pop(&dtf);
3618 }
3619
3620 DISPATCH_NOINLINE
3621 static void
3622 _dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
3623 dispatch_function_t func)
3624 {
3625 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3626 }
3627
3628 DISPATCH_NOINLINE
3629 static void
3630 _dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
3631 uintptr_t dc_flags)
3632 {
3633 bool barrier = (dc_flags & DISPATCH_OBJ_BARRIER_BIT);
3634 do {
3635 if (dq == stop_dq) return;
3636 if (barrier) {
3637 _dispatch_queue_barrier_complete(dq, 0, 0);
3638 } else {
3639 _dispatch_queue_non_barrier_complete(dq);
3640 }
3641 dq = dq->do_targetq;
3642 barrier = (dq->dq_width == 1);
3643 } while (unlikely(dq->do_targetq));
3644 }
3645
3646 DISPATCH_NOINLINE
3647 static void
3648 _dispatch_sync_invoke_and_complete_recurse(dispatch_queue_t dq, void *ctxt,
3649 dispatch_function_t func, uintptr_t dc_flags)
3650 {
3651 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3652 _dispatch_sync_complete_recurse(dq, NULL, dc_flags);
3653 }
3654
3655 DISPATCH_NOINLINE
3656 static void
3657 _dispatch_sync_invoke_and_complete(dispatch_queue_t dq, void *ctxt,
3658 dispatch_function_t func)
3659 {
3660 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3661 _dispatch_queue_non_barrier_complete(dq);
3662 }
3663
3664 /*
3665 * For queues we can cheat and inline the unlock code, which is invalid
3666 * for objects with a more complex state machine (sources or mach channels)
3667 */
3668 DISPATCH_NOINLINE
3669 static void
3670 _dispatch_queue_barrier_sync_invoke_and_complete(dispatch_queue_t dq,
3671 void *ctxt, dispatch_function_t func)
3672 {
3673 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3674 if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
3675 return _dispatch_queue_barrier_complete(dq, 0, 0);
3676 }
3677
3678 // Presence of any of these bits requires more work that only
3679 // _dispatch_queue_barrier_complete() handles properly
3680 //
3681 // Note: testing for RECEIVED_OVERRIDE or RECEIVED_SYNC_WAIT without
3682 // checking the role is sloppy, but is a super fast check, and neither of
3683 // these bits should be set if the lock was never contended/discovered.
3684 const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
3685 DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
3686 DISPATCH_QUEUE_RECEIVED_OVERRIDE | DISPATCH_QUEUE_SYNC_TRANSFER |
3687 DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
3688 uint64_t old_state, new_state;
3689
3690 // similar to _dispatch_queue_drain_try_unlock
3691 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
3692 new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
3693 new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
3694 new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
3695 if (unlikely(old_state & fail_unlock_mask)) {
3696 os_atomic_rmw_loop_give_up({
3697 return _dispatch_queue_barrier_complete(dq, 0, 0);
3698 });
3699 }
3700 });
3701 if (_dq_state_is_base_wlh(old_state)) {
3702 _dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
3703 }
3704 }
3705
3706 #pragma mark -
3707 #pragma mark _dispatch_sync_wait / _dispatch_sync_waiter_wake
3708
3709 #define DISPATCH_SYNC_WAITER_NO_UNLOCK (~0ull)
3710
3711 DISPATCH_NOINLINE
3712 static void
3713 _dispatch_sync_waiter_wake(dispatch_sync_context_t dsc,
3714 dispatch_wlh_t wlh, uint64_t old_state, uint64_t new_state)
3715 {
3716 dispatch_wlh_t waiter_wlh = dsc->dc_data;
3717
3718 if (_dq_state_in_sync_transfer(old_state) ||
3719 _dq_state_in_sync_transfer(new_state) ||
3720 (waiter_wlh != DISPATCH_WLH_ANON)) {
3721 _dispatch_event_loop_wake_owner(dsc, wlh, old_state, new_state);
3722 }
3723 if (waiter_wlh == DISPATCH_WLH_ANON) {
3724 if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
3725 _dispatch_wqthread_override_start(dsc->dsc_waiter,
3726 dsc->dsc_override_qos);
3727 }
3728 _dispatch_thread_event_signal(&dsc->dsc_event);
3729 }
3730 _dispatch_introspection_queue_item_complete(dsc->_as_dc);
3731 }
3732
3733 DISPATCH_NOINLINE
3734 static void
3735 _dispatch_sync_waiter_redirect_or_wake(dispatch_queue_t dq, uint64_t owned,
3736 dispatch_object_t dou)
3737 {
3738 dispatch_sync_context_t dsc = (dispatch_sync_context_t)dou._dc;
3739 uint64_t next_owner = 0, old_state, new_state;
3740 dispatch_wlh_t wlh = NULL;
3741
3742 _dispatch_trace_continuation_pop(dq, dsc->_as_dc);
3743
3744 if (owned == DISPATCH_SYNC_WAITER_NO_UNLOCK) {
3745 dispatch_assert(!(dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT));
3746 new_state = old_state = os_atomic_load2o(dq, dq_state, relaxed);
3747 } else {
3748 if (dsc->dc_flags & DISPATCH_OBJ_BARRIER_BIT) {
3749 next_owner = _dispatch_lock_value_from_tid(dsc->dsc_waiter);
3750 }
3751 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
3752 new_state = old_state - owned;
3753 new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
3754 new_state &= ~DISPATCH_QUEUE_DIRTY;
3755 new_state |= next_owner;
3756 if (_dq_state_is_base_wlh(old_state)) {
3757 new_state |= DISPATCH_QUEUE_SYNC_TRANSFER;
3758 }
3759 });
3760 if (_dq_state_is_base_wlh(old_state)) {
3761 wlh = (dispatch_wlh_t)dq;
3762 } else if (_dq_state_received_override(old_state)) {
3763 // Ensure that the root queue sees that this thread was overridden.
3764 _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
3765 }
3766 }
3767
3768 if (dsc->dc_data == DISPATCH_WLH_ANON) {
3769 if (dsc->dsc_override_qos < _dq_state_max_qos(old_state)) {
3770 dsc->dsc_override_qos = _dq_state_max_qos(old_state);
3771 }
3772 }
3773
3774 if (unlikely(_dq_state_is_inner_queue(old_state))) {
3775 dispatch_queue_t tq = dq->do_targetq;
3776 if (likely(tq->dq_width == 1)) {
3777 dsc->dc_flags = DISPATCH_OBJ_BARRIER_BIT |
3778 DISPATCH_OBJ_SYNC_WAITER_BIT;
3779 } else {
3780 dsc->dc_flags = DISPATCH_OBJ_SYNC_WAITER_BIT;
3781 }
3782 _dispatch_introspection_queue_item_complete(dsc->_as_dc);
3783 return _dispatch_queue_push_sync_waiter(tq, dsc, 0);
3784 }
3785
3786 return _dispatch_sync_waiter_wake(dsc, wlh, old_state, new_state);
3787 }
3788
3789 DISPATCH_NOINLINE
3790 static void
3791 _dispatch_queue_class_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos,
3792 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
3793 uint64_t owned)
3794 {
3795 uint64_t old_state, new_state, enqueue;
3796 dispatch_queue_t tq;
3797
3798 if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
3799 tq = &_dispatch_mgr_q;
3800 enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
3801 } else if (target) {
3802 tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
3803 enqueue = DISPATCH_QUEUE_ENQUEUED;
3804 } else {
3805 tq = NULL;
3806 enqueue = 0;
3807 }
3808
3809 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
3810 new_state = _dq_state_merge_qos(old_state - owned, qos);
3811 new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
3812 if (unlikely(_dq_state_is_suspended(old_state))) {
3813 if (likely(_dq_state_is_base_wlh(old_state))) {
3814 new_state &= ~DISPATCH_QUEUE_ENQUEUED;
3815 }
3816 } else if (enqueue) {
3817 if (!_dq_state_is_enqueued(old_state)) {
3818 new_state |= enqueue;
3819 }
3820 } else if (unlikely(_dq_state_is_dirty(old_state))) {
3821 os_atomic_rmw_loop_give_up({
3822 // just renew the drain lock with an acquire barrier, to see
3823 // what the enqueuer that set DIRTY has done.
3824 // the xor generates better assembly as DISPATCH_QUEUE_DIRTY
3825 // is already in a register
3826 os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
3827 flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
3828 return dx_wakeup(dq, qos, flags);
3829 });
3830 } else if (likely(_dq_state_is_base_wlh(old_state))) {
3831 new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
3832 new_state &= ~DISPATCH_QUEUE_ENQUEUED;
3833 } else {
3834 new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
3835 }
3836 });
3837 old_state -= owned;
3838 dispatch_assert(_dq_state_drain_locked_by_self(old_state));
3839 dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));
3840
3841
3842 if (_dq_state_received_override(old_state)) {
3843 // Ensure that the root queue sees that this thread was overridden.
3844 _dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
3845 }
3846
3847 if (tq) {
3848 if (likely((old_state ^ new_state) & enqueue)) {
3849 dispatch_assert(_dq_state_is_enqueued(new_state));
3850 dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
3851 return _dispatch_queue_push_queue(tq, dq, new_state);
3852 }
3853 #if HAVE_PTHREAD_WORKQUEUE_QOS
3854 // <rdar://problem/27694093> when doing sync to async handoff
3855 // if the queue received an override we have to forcefully redrive
3856 // the same override so that a new stealer is enqueued because
3857 // the previous one may be gone already
3858 if (_dq_state_should_override(new_state)) {
3859 return _dispatch_queue_class_wakeup_with_override(dq, new_state,
3860 flags);
3861 }
3862 #endif
3863 }
3864 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
3865 return _dispatch_release_2_tailcall(dq);
3866 }
3867 }
3868
3869 DISPATCH_NOINLINE
3870 static void
3871 _dispatch_queue_barrier_complete(dispatch_queue_t dq, dispatch_qos_t qos,
3872 dispatch_wakeup_flags_t flags)
3873 {
3874 dispatch_continuation_t dc_tmp, dc_start = NULL, dc_end = NULL;
3875 dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
3876 struct dispatch_object_s *dc = NULL;
3877 uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
3878 dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3879 size_t count = 0;
3880
3881 dispatch_assert(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE);
3882
3883 if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
3884 dc = _dispatch_queue_head(dq);
3885 if (!_dispatch_object_is_sync_waiter(dc)) {
3886 // not a slow item, needs to wake up
3887 } else if (likely(dq->dq_width == 1) ||
3888 _dispatch_object_is_barrier(dc)) {
3889 // rdar://problem/8290662 "barrier/writer lock transfer"
3890 dc_start = dc_end = (dispatch_continuation_t)dc;
3891 owned = 0;
3892 count = 1;
3893 dc = _dispatch_queue_next(dq, dc);
3894 } else {
3895 // <rdar://problem/10164594> "reader lock transfer"
3896 // we must not wake waiters immediately because our right to dequeue
3897 // is granted through holding the full "barrier" width,
3898 // which a signaled work item could pull out from under us
3899 dc_start = (dispatch_continuation_t)dc;
3900 do {
3901 // no check on width here because concurrent queues
3902 // do not respect width for blocked readers; the thread
3903 // is already spent anyway
3904 dc_end = (dispatch_continuation_t)dc;
3905 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
3906 count++;
3907 dc = _dispatch_queue_next(dq, dc);
3908 } while (dc && _dispatch_object_is_sync_waiter_non_barrier(dc));
3909 }
3910
3911 if (count) {
3912 do {
3913 dc_tmp = dc_start;
3914 dc_start = dc_start->do_next;
3915 _dispatch_sync_waiter_redirect_or_wake(dq, owned, dc_tmp);
3916 owned = DISPATCH_SYNC_WAITER_NO_UNLOCK;
3917 } while (dc_tmp != dc_end);
3918 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
3919 return _dispatch_release_2_tailcall(dq);
3920 }
3921 return;
3922 }
3923 if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
3924 _dispatch_retain_2(dq);
3925 flags |= DISPATCH_WAKEUP_CONSUME_2;
3926 }
3927 target = DISPATCH_QUEUE_WAKEUP_TARGET;
3928 }
3929
3930 return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,owned);
3931 }
3932
3933 #if DISPATCH_COCOA_COMPAT
3934 static void
3935 _dispatch_sync_thread_bound_invoke(void *ctxt)
3936 {
3937 dispatch_sync_context_t dsc = ctxt;
3938 dispatch_queue_t cq = _dispatch_queue_get_current();
3939 dispatch_queue_t orig_dq = dsc->dc_other;
3940 dispatch_thread_frame_s dtf;
3941 dispatch_assert(_dispatch_queue_is_thread_bound(cq));
3942
3943 // the block runs on the thread the queue is bound to and not
3944 // on the calling thread, but we want it to see the calling thread's
3945 // dispatch thread frames, so we fake the link, and then undo it
3946 _dispatch_thread_frame_push_and_rebase(&dtf, orig_dq, &dsc->dsc_dtf);
3947 _dispatch_client_callout(dsc->dsc_ctxt, dsc->dsc_func);
3948 _dispatch_thread_frame_pop(&dtf);
3949
3950 // communicate back to _dispatch_sync_wait which queue was the thread-bound
3951 // one so that we skip it during _dispatch_sync_complete_recurse
3952 dsc->dc_other = cq;
3953 dsc->dsc_func = NULL;
3954 _dispatch_thread_event_signal(&dsc->dsc_event); // release
3955 }
3956 #endif
3957
3958 DISPATCH_ALWAYS_INLINE
3959 static inline uint64_t
3960 _dispatch_sync_wait_prepare(dispatch_queue_t dq)
3961 {
3962 uint64_t old_state, new_state;
3963
3964 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3965 if (_dq_state_is_suspended(old_state) ||
3966 !_dq_state_is_base_wlh(old_state)) {
3967 os_atomic_rmw_loop_give_up(return old_state);
3968 }
3969 if (!_dq_state_drain_locked(old_state) ||
3970 _dq_state_in_sync_transfer(old_state)) {
3971 os_atomic_rmw_loop_give_up(return old_state);
3972 }
3973 new_state = old_state | DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
3974 });
3975 return new_state;
3976 }
3977
3978 static void
3979 _dispatch_sync_waiter_compute_wlh(dispatch_queue_t dq,
3980 dispatch_sync_context_t dsc)
3981 {
3982 bool needs_locking = _dispatch_queue_is_legacy(dq);
3983
3984 if (needs_locking) {
3985 dsc->dsc_release_storage = true;
3986 _dispatch_queue_sidelock_lock(dq);
3987 }
3988
3989 dispatch_queue_t tq = dq->do_targetq;
3990 uint64_t dq_state = _dispatch_sync_wait_prepare(tq);
3991
3992 if (_dq_state_is_suspended(dq_state) ||
3993 _dq_state_is_base_anon(dq_state)) {
3994 dsc->dsc_release_storage = false;
3995 dsc->dc_data = DISPATCH_WLH_ANON;
3996 } else if (_dq_state_is_base_wlh(dq_state)) {
3997 if (dsc->dsc_release_storage) {
3998 _dispatch_queue_retain_storage(tq);
3999 }
4000 dsc->dc_data = (dispatch_wlh_t)tq;
4001 } else {
4002 _dispatch_sync_waiter_compute_wlh(tq, dsc);
4003 }
4004 if (needs_locking) _dispatch_queue_sidelock_unlock(dq);
4005 }
4006
4007 DISPATCH_NOINLINE
4008 static void
4009 _dispatch_sync_wait(dispatch_queue_t top_dq, void *ctxt,
4010 dispatch_function_t func, uintptr_t top_dc_flags,
4011 dispatch_queue_t dq, uintptr_t dc_flags)
4012 {
4013 pthread_priority_t pp = _dispatch_get_priority();
4014 dispatch_tid tid = _dispatch_tid_self();
4015 dispatch_qos_t qos;
4016 uint64_t dq_state;
4017
4018 dq_state = _dispatch_sync_wait_prepare(dq);
4019 if (unlikely(_dq_state_drain_locked_by(dq_state, tid))) {
4020 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
4021 "dispatch_sync called on queue "
4022 "already owned by current thread");
4023 }
4024
4025 struct dispatch_sync_context_s dsc = {
4026 .dc_flags = dc_flags | DISPATCH_OBJ_SYNC_WAITER_BIT,
4027 .dc_other = top_dq,
4028 .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
4029 .dc_voucher = DISPATCH_NO_VOUCHER,
4030 .dsc_func = func,
4031 .dsc_ctxt = ctxt,
4032 .dsc_waiter = tid,
4033 };
4034 if (_dq_state_is_suspended(dq_state) ||
4035 _dq_state_is_base_anon(dq_state)) {
4036 dsc.dc_data = DISPATCH_WLH_ANON;
4037 } else if (_dq_state_is_base_wlh(dq_state)) {
4038 dsc.dc_data = (dispatch_wlh_t)dq;
4039 } else {
4040 _dispatch_sync_waiter_compute_wlh(dq, &dsc);
4041 }
4042 #if DISPATCH_COCOA_COMPAT
4043 // It's preferred to execute synchronous blocks on the current thread
4044 // due to thread-local side effects, etc. However, blocks submitted
4045 // to the main thread MUST be run on the main thread
4046 //
4047 // Since we don't know whether that will happen, save the frame linkage
4048 // for the sake of _dispatch_sync_thread_bound_invoke
4049 _dispatch_thread_frame_save_state(&dsc.dsc_dtf);
4050
4051 // Since the continuation doesn't have the CONSUME bit, the voucher will be
4052 // retained on adoption by the thread-bound queue if that happens, so we can
4053 // borrow this thread's reference
4054 dsc.dc_voucher = _voucher_get();
4055 dsc.dc_func = _dispatch_sync_thread_bound_invoke;
4056 dsc.dc_ctxt = &dsc;
4057 #endif
4058
4059 if (dsc.dc_data == DISPATCH_WLH_ANON) {
4060 dsc.dsc_override_qos_floor = dsc.dsc_override_qos =
4061 _dispatch_get_basepri_override_qos_floor();
4062 qos = _dispatch_qos_from_pp(pp);
4063 _dispatch_thread_event_init(&dsc.dsc_event);
4064 } else {
4065 qos = 0;
4066 }
4067 _dispatch_queue_push_sync_waiter(dq, &dsc, qos);
4068 if (dsc.dc_data == DISPATCH_WLH_ANON) {
4069 _dispatch_thread_event_wait(&dsc.dsc_event); // acquire
4070 _dispatch_thread_event_destroy(&dsc.dsc_event);
4071 // If _dispatch_sync_waiter_wake() gave this thread an override,
4072 // ensure that the root queue sees it.
4073 if (dsc.dsc_override_qos > dsc.dsc_override_qos_floor) {
4074 _dispatch_set_basepri_override_qos(dsc.dsc_override_qos);
4075 }
4076 } else {
4077 _dispatch_event_loop_wait_for_ownership(&dsc);
4078 }
4079 _dispatch_introspection_sync_begin(top_dq);
4080 #if DISPATCH_COCOA_COMPAT
4081 if (unlikely(dsc.dsc_func == NULL)) {
4082 // Queue bound to a non-dispatch thread, the continuation already ran
4083 // so just unlock all the things, except for the thread bound queue
4084 dispatch_queue_t bound_dq = dsc.dc_other;
4085 return _dispatch_sync_complete_recurse(top_dq, bound_dq, top_dc_flags);
4086 }
4087 #endif
4088 _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags);
4089 }
4090
4091 DISPATCH_NOINLINE
4092 static void
4093 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt,
4094 dispatch_function_t func, uintptr_t dc_flags)
4095 {
4096 if (unlikely(!dq->do_targetq)) {
4097 return _dispatch_sync_function_invoke(dq, ctxt, func);
4098 }
4099 _dispatch_sync_wait(dq, ctxt, func, dc_flags, dq, dc_flags);
4100 }
4101
4102 #pragma mark -
4103 #pragma mark dispatch_sync / dispatch_barrier_sync
4104
4105 DISPATCH_NOINLINE
4106 static void
4107 _dispatch_sync_recurse(dispatch_queue_t dq, void *ctxt,
4108 dispatch_function_t func, uintptr_t dc_flags)
4109 {
4110 dispatch_tid tid = _dispatch_tid_self();
4111 dispatch_queue_t tq = dq->do_targetq;
4112
4113 do {
4114 if (likely(tq->dq_width == 1)) {
4115 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
4116 return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq,
4117 DISPATCH_OBJ_BARRIER_BIT);
4118 }
4119 } else {
4120 if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
4121 return _dispatch_sync_wait(dq, ctxt, func, dc_flags, tq, 0);
4122 }
4123 }
4124 tq = tq->do_targetq;
4125 } while (unlikely(tq->do_targetq));
4126
4127 return _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, func, dc_flags);
4128 }
4129
4130 DISPATCH_NOINLINE
4131 void
4132 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
4133 dispatch_function_t func)
4134 {
4135 dispatch_tid tid = _dispatch_tid_self();
4136
4137 // The more correct thing to do would be to merge the qos of the thread
4138 // that just acquired the barrier lock into the queue state.
4139 //
4140 // However, this is too expensive for the fastpath, so skip doing it.
4141 // The chosen tradeoff is that if an enqueue on a lower-priority thread
4142 // contends with this fastpath, this thread may receive a useless override.
4143 //
4144 // Global concurrent queues and queues bound to non-dispatch threads
4145 // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
4146 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
4147 return _dispatch_sync_f_slow(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
4148 }
4149
4150 _dispatch_introspection_sync_begin(dq);
4151 if (unlikely(dq->do_targetq->do_targetq)) {
4152 return _dispatch_sync_recurse(dq, ctxt, func, DISPATCH_OBJ_BARRIER_BIT);
4153 }
4154 _dispatch_queue_barrier_sync_invoke_and_complete(dq, ctxt, func);
4155 }
4156
4157 DISPATCH_NOINLINE
4158 void
4159 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
4160 {
4161 if (likely(dq->dq_width == 1)) {
4162 return dispatch_barrier_sync_f(dq, ctxt, func);
4163 }
4164
4165 // Global concurrent queues and queues bound to non-dispatch threads
4166 // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
4167 if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
4168 return _dispatch_sync_f_slow(dq, ctxt, func, 0);
4169 }
4170
4171 _dispatch_introspection_sync_begin(dq);
4172 if (unlikely(dq->do_targetq->do_targetq)) {
4173 return _dispatch_sync_recurse(dq, ctxt, func, 0);
4174 }
4175 _dispatch_sync_invoke_and_complete(dq, ctxt, func);
4176 }
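
/*
 * Illustrative usage sketch for the function/context variant above: because
 * the call does not return until the function has run, the context can live
 * on the caller's stack. The function name below is an assumption for the
 * example; calling this on the queue you are already running on deadlocks
 * (the ownership check in _dispatch_sync_wait turns some of these cases into
 * a crash).
 *
 *	static void read_fn(void *ctxt) {
 *		int *out = ctxt;
 *		*out = 42; // read shared state while serialized on the queue
 *	}
 *
 *	int result = 0;
 *	dispatch_sync_f(q, &result, read_fn);
 */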
4177
4178 #ifdef __BLOCKS__
4179 DISPATCH_NOINLINE
4180 static void
4181 _dispatch_sync_block_with_private_data(dispatch_queue_t dq,
4182 dispatch_block_t work, dispatch_block_flags_t flags)
4183 {
4184 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(work);
4185 pthread_priority_t op = 0, p = 0;
4186
4187 flags |= dbpd->dbpd_flags;
4188 op = _dispatch_block_invoke_should_set_priority(flags, dbpd->dbpd_priority);
4189 if (op) {
4190 p = dbpd->dbpd_priority;
4191 }
4192 voucher_t ov, v = DISPATCH_NO_VOUCHER;
4193 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
4194 v = dbpd->dbpd_voucher;
4195 }
4196 ov = _dispatch_set_priority_and_voucher(p, v, 0);
4197
4198 // balanced in d_block_sync_invoke or d_block_wait
4199 if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, dq->_as_oq, relaxed)) {
4200 _dispatch_retain_2(dq);
4201 }
4202 if (flags & DISPATCH_BLOCK_BARRIER) {
4203 dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke);
4204 } else {
4205 dispatch_sync_f(dq, work, _dispatch_block_sync_invoke);
4206 }
4207 _dispatch_reset_priority_and_voucher(op, ov);
4208 }
4209
4210 void
4211 dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
4212 {
4213 if (unlikely(_dispatch_block_has_private_data(work))) {
4214 dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
4215 return _dispatch_sync_block_with_private_data(dq, work, flags);
4216 }
4217 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
4218 }
4219
4220 void
4221 dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
4222 {
4223 if (unlikely(_dispatch_block_has_private_data(work))) {
4224 return _dispatch_sync_block_with_private_data(dq, work, 0);
4225 }
4226 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
4227 }
4228 #endif // __BLOCKS__
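
/*
 * Illustrative usage sketch: a common reader/writer pattern built from the
 * two entry points above, on a private concurrent queue. The queue label and
 * variable are assumptions for the example; as noted for dispatch_sync_f,
 * calling these on the queue you are already running on deadlocks.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.state",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	__block int shared = 0;
 *
 *	dispatch_sync(q, ^{		// readers may run concurrently
 *		int v = shared; (void)v;
 *	});
 *	dispatch_barrier_sync(q, ^{	// a writer takes the whole queue width
 *		shared = 1;
 *	});
 */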
4229
4230 #pragma mark -
4231 #pragma mark dispatch_trysync
4232
4233 DISPATCH_NOINLINE
4234 static void
4235 _dispatch_barrier_trysync_or_async_f_complete(dispatch_queue_t dq,
4236 void *ctxt, dispatch_function_t func, uint32_t flags)
4237 {
4238 dispatch_wakeup_flags_t wflags = DISPATCH_WAKEUP_BARRIER_COMPLETE;
4239
4240 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
4241 if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) {
4242 uint64_t dq_state = os_atomic_sub2o(dq, dq_state,
4243 DISPATCH_QUEUE_SUSPEND_INTERVAL, relaxed);
4244 if (!_dq_state_is_suspended(dq_state)) {
4245 wflags |= DISPATCH_WAKEUP_CONSUME_2;
4246 }
4247 }
4248 dx_wakeup(dq, 0, wflags);
4249 }
4250
4251 // Use for mutation of queue-/source-internal state only;
4252 // it ignores the target queue hierarchy!
4253 DISPATCH_NOINLINE
4254 void
4255 _dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
4256 dispatch_function_t func, uint32_t flags)
4257 {
4258 dispatch_tid tid = _dispatch_tid_self();
4259 uint64_t suspend_count = (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) ? 1 : 0;
4260 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync_and_suspend(dq, tid,
4261 suspend_count))) {
4262 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
4263 }
4264 if (flags & DISPATCH_BARRIER_TRYSYNC_SUSPEND) {
4265 _dispatch_retain_2(dq); // see _dispatch_queue_suspend
4266 }
4267 _dispatch_barrier_trysync_or_async_f_complete(dq, ctxt, func, flags);
4268 }
4269
4270 DISPATCH_NOINLINE
4271 static long
4272 _dispatch_trysync_recurse(dispatch_queue_t dq, void *ctxt,
4273 dispatch_function_t f, uintptr_t dc_flags)
4274 {
4275 dispatch_tid tid = _dispatch_tid_self();
4276 dispatch_queue_t q, tq = dq->do_targetq;
4277
4278 for (;;) {
4279 if (likely(tq->do_targetq == NULL)) {
4280 _dispatch_sync_invoke_and_complete_recurse(dq, ctxt, f, dc_flags);
4281 return true;
4282 }
4283 if (unlikely(_dispatch_queue_cannot_trysync(tq))) {
4284 for (q = dq; q != tq; q = q->do_targetq) {
4285 _dispatch_queue_atomic_flags_set(q, DQF_CANNOT_TRYSYNC);
4286 }
4287 break;
4288 }
4289 if (likely(tq->dq_width == 1)) {
4290 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(tq, tid))) {
4291 break;
4292 }
4293 } else {
4294 if (unlikely(!_dispatch_queue_try_reserve_sync_width(tq))) {
4295 break;
4296 }
4297 }
4298 tq = tq->do_targetq;
4299 }
4300
4301 _dispatch_sync_complete_recurse(dq, tq, dc_flags);
4302 return false;
4303 }
4304
4305 DISPATCH_NOINLINE
4306 long
4307 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
4308 dispatch_function_t f)
4309 {
4310 dispatch_tid tid = _dispatch_tid_self();
4311 if (unlikely(!dq->do_targetq)) {
4312 DISPATCH_CLIENT_CRASH(dq, "_dispatch_trysync called on a root queue");
4313 }
4314 if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
4315 return false;
4316 }
4317 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dq, tid))) {
4318 return false;
4319 }
4320 return _dispatch_trysync_recurse(dq, ctxt, f, DISPATCH_OBJ_BARRIER_BIT);
4321 }
4322
4323 DISPATCH_NOINLINE
4324 long
4325 _dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
4326 {
4327 if (likely(dq->dq_width == 1)) {
4328 return _dispatch_barrier_trysync_f(dq, ctxt, f);
4329 }
4330 if (unlikely(!dq->do_targetq)) {
4331 DISPATCH_CLIENT_CRASH(dq, "_dispatch_trysync called on a root queue");
4332 }
4333 if (unlikely(_dispatch_queue_cannot_trysync(dq))) {
4334 return false;
4335 }
4336 if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
4337 return false;
4338 }
4339 return _dispatch_trysync_recurse(dq, ctxt, f, 0);
4340 }
4341
4342 #pragma mark -
4343 #pragma mark dispatch_queue_wakeup
4344
4345 DISPATCH_NOINLINE
4346 void
4347 _dispatch_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
4348 dispatch_wakeup_flags_t flags)
4349 {
4350 dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
4351
4352 if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
4353 return _dispatch_queue_barrier_complete(dq, qos, flags);
4354 }
4355 if (_dispatch_queue_class_probe(dq)) {
4356 target = DISPATCH_QUEUE_WAKEUP_TARGET;
4357 }
4358 return _dispatch_queue_class_wakeup(dq, qos, flags, target);
4359 }
4360
4361 #if DISPATCH_COCOA_COMPAT
4362 DISPATCH_ALWAYS_INLINE
4363 static inline bool
4364 _dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
4365 {
4366 #if TARGET_OS_MAC
4367 return MACH_PORT_VALID(handle);
4368 #elif defined(__linux__)
4369 return handle >= 0;
4370 #else
4371 #error "runloop support not implemented on this platform"
4372 #endif
4373 }
4374
4375 DISPATCH_ALWAYS_INLINE
4376 static inline dispatch_runloop_handle_t
4377 _dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
4378 {
4379 #if TARGET_OS_MAC
4380 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
4381 #elif defined(__linux__)
4382 // decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4383 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
4384 #else
4385 #error "runloop support not implemented on this platform"
4386 #endif
4387 }
4388
4389 DISPATCH_ALWAYS_INLINE
4390 static inline void
4391 _dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
4392 {
4393 #if TARGET_OS_MAC
4394 dq->do_ctxt = (void *)(uintptr_t)handle;
4395 #elif defined(__linux__)
4396 // encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4397 dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
4398 #else
4399 #error "runloop support not implemented on this platform"
4400 #endif
4401 }
4402 #endif // DISPATCH_COCOA_COMPAT
4403
4404 DISPATCH_ALWAYS_INLINE
4405 static inline dispatch_qos_t
4406 _dispatch_runloop_queue_reset_max_qos(dispatch_queue_class_t dqu)
4407 {
4408 uint64_t old_state, clear_bits = DISPATCH_QUEUE_MAX_QOS_MASK |
4409 DISPATCH_QUEUE_RECEIVED_OVERRIDE;
4410 old_state = os_atomic_and_orig2o(dqu._dq, dq_state, ~clear_bits, relaxed);
4411 return _dq_state_max_qos(old_state);
4412 }
4413
4414 void
4415 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
4416 dispatch_wakeup_flags_t flags)
4417 {
4418 #if DISPATCH_COCOA_COMPAT
4419 if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
4420 // <rdar://problem/14026816>
4421 return _dispatch_queue_wakeup(dq, qos, flags);
4422 }
4423
4424 if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
4425 os_atomic_or2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, release);
4426 }
4427 if (_dispatch_queue_class_probe(dq)) {
4428 return _dispatch_runloop_queue_poke(dq, qos, flags);
4429 }
4430
4431 qos = _dispatch_runloop_queue_reset_max_qos(dq);
4432 if (qos) {
4433 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4434 if (_dispatch_queue_class_probe(dq)) {
4435 _dispatch_runloop_queue_poke(dq, qos, flags);
4436 }
4437 _dispatch_thread_override_end(owner, dq);
4438 return;
4439 }
4440 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
4441 return _dispatch_release_2_tailcall(dq);
4442 }
4443 #else
4444 return _dispatch_queue_wakeup(dq, qos, flags);
4445 #endif
4446 }
4447
4448 void
4449 _dispatch_main_queue_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
4450 dispatch_wakeup_flags_t flags)
4451 {
4452 #if DISPATCH_COCOA_COMPAT
4453 if (_dispatch_queue_is_thread_bound(dq)) {
4454 return _dispatch_runloop_queue_wakeup(dq, qos, flags);
4455 }
4456 #endif
4457 return _dispatch_queue_wakeup(dq, qos, flags);
4458 }
4459
4460 #pragma mark -
4461 #pragma mark dispatch root queues poke
4462
4463 #if DISPATCH_COCOA_COMPAT
4464 static inline void
4465 _dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
4466 {
4467 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
4468 if (!_dispatch_runloop_handle_is_valid(handle)) {
4469 return;
4470 }
4471
4472 #if HAVE_MACH
4473 mach_port_t mp = handle;
4474 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
4475 switch (kr) {
4476 case MACH_SEND_TIMEOUT:
4477 case MACH_SEND_TIMED_OUT:
4478 case MACH_SEND_INVALID_DEST:
4479 break;
4480 default:
4481 (void)dispatch_assume_zero(kr);
4482 break;
4483 }
4484 #elif defined(__linux__)
4485 int result;
4486 do {
4487 result = eventfd_write(handle, 1);
4488 } while (result == -1 && errno == EINTR);
4489 (void)dispatch_assume_zero(result);
4490 #else
4491 #error "runloop support not implemented on this platform"
4492 #endif
4493 }
4494
4495 DISPATCH_NOINLINE
4496 static void
4497 _dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
4498 dispatch_wakeup_flags_t flags)
4499 {
4500 // it's not useful to handle WAKEUP_MAKE_DIRTY because mach_msg() will have
4501 // a release barrier, and because when runloop queues stop being thread-bound
4502 // they get a non-optional wake-up to start being a "normal" queue,
4503 // either in _dispatch_runloop_queue_xref_dispose,
4504 // or in _dispatch_queue_cleanup2() for the main thread.
4505 uint64_t old_state, new_state;
4506
4507 if (dq == &_dispatch_main_q) {
4508 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4509 _dispatch_runloop_queue_handle_init);
4510 }
4511
4512 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
4513 new_state = _dq_state_merge_qos(old_state, qos);
4514 if (old_state == new_state) {
4515 os_atomic_rmw_loop_give_up(goto no_change);
4516 }
4517 });
4518
4519 dispatch_qos_t dq_qos = _dispatch_priority_qos(dq->dq_priority);
4520 if (qos > dq_qos) {
4521 mach_port_t owner = _dq_state_drain_owner(new_state);
4522 pthread_priority_t pp = _dispatch_qos_to_pp(qos);
4523 _dispatch_thread_override_start(owner, pp, dq);
4524 if (_dq_state_max_qos(old_state) > dq_qos) {
4525 _dispatch_thread_override_end(owner, dq);
4526 }
4527 }
4528 no_change:
4529 _dispatch_runloop_queue_class_poke(dq);
4530 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
4531 return _dispatch_release_2_tailcall(dq);
4532 }
4533 }
4534 #endif
4535
4536 DISPATCH_NOINLINE
4537 static void
4538 _dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
4539 {
4540 dispatch_root_queue_context_t qc = dq->do_ctxt;
4541 int remaining = n;
4542 int r = ENOSYS;
4543
4544 _dispatch_root_queues_init();
4545 _dispatch_debug_root_queue(dq, __func__);
4546 #if DISPATCH_USE_WORKQUEUES
4547 #if DISPATCH_USE_PTHREAD_POOL
4548 if (qc->dgq_kworkqueue != (void*)(~0ul))
4549 #endif
4550 {
4551 _dispatch_root_queue_debug("requesting new worker thread for global "
4552 "queue: %p", dq);
4553 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4554 if (qc->dgq_kworkqueue) {
4555 pthread_workitem_handle_t wh;
4556 unsigned int gen_cnt;
4557 do {
4558 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
4559 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
4560 (void)dispatch_assume_zero(r);
4561 } while (--remaining);
4562 return;
4563 }
4564 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4565 #if HAVE_PTHREAD_WORKQUEUE_QOS
4566 r = _pthread_workqueue_addthreads(remaining,
4567 _dispatch_priority_to_pp(dq->dq_priority));
4568 #elif DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4569 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
4570 qc->dgq_wq_options, remaining);
4571 #endif
4572 (void)dispatch_assume_zero(r);
4573 return;
4574 }
4575 #endif // DISPATCH_USE_WORKQUEUES
4576 #if DISPATCH_USE_PTHREAD_POOL
4577 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4578 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
4579 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
4580 _dispatch_root_queue_debug("signaled sleeping worker for "
4581 "global queue: %p", dq);
4582 if (!--remaining) {
4583 return;
4584 }
4585 }
4586 }
4587
4588 bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
4589 if (overcommit) {
4590 os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
4591 } else {
4592 if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
4593 _dispatch_root_queue_debug("worker thread request still pending for "
4594 "global queue: %p", dq);
4595 return;
4596 }
4597 }
4598
4599 int32_t can_request, t_count;
4600 // seq_cst with atomic store to tail <rdar://problem/16932833>
4601 t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
4602 do {
4603 can_request = t_count < floor ? 0 : t_count - floor;
4604 if (remaining > can_request) {
4605 _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
4606 remaining, can_request);
4607 os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
4608 remaining = can_request;
4609 }
4610 if (remaining == 0) {
4611 _dispatch_root_queue_debug("pthread pool is full for root queue: "
4612 "%p", dq);
4613 return;
4614 }
4615 } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
4616 t_count - remaining, &t_count, acquire));
4617
4618 pthread_attr_t *attr = &pqc->dpq_thread_attr;
4619 pthread_t tid, *pthr = &tid;
4620 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
4621 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
4622 pthr = _dispatch_mgr_root_queue_init();
4623 }
4624 #endif
4625 do {
4626 _dispatch_retain(dq); // released in _dispatch_worker_thread
4627 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
4628 if (r != EAGAIN) {
4629 (void)dispatch_assume_zero(r);
4630 }
4631 _dispatch_temporary_resource_shortage();
4632 }
4633 } while (--remaining);
4634 #endif // DISPATCH_USE_PTHREAD_POOL
4635 }
4636
4637 DISPATCH_NOINLINE
4638 void
4639 _dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
4640 {
4641 if (!_dispatch_queue_class_probe(dq)) {
4642 return;
4643 }
4644 #if DISPATCH_USE_WORKQUEUES
4645 dispatch_root_queue_context_t qc = dq->do_ctxt;
4646 if (
4647 #if DISPATCH_USE_PTHREAD_POOL
4648 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
4649 #endif
4650 !os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4651 _dispatch_root_queue_debug("worker thread request still pending for "
4652 "global queue: %p", dq);
4653 return;
4654 }
4655 #endif // DISPATCH_USE_WORKQUEUES
4656 return _dispatch_global_queue_poke_slow(dq, n, floor);
4657 }
4658
4659 #pragma mark -
4660 #pragma mark dispatch_queue_drain
4661
4662 void
4663 _dispatch_continuation_pop(dispatch_object_t dou, dispatch_invoke_context_t dic,
4664 dispatch_invoke_flags_t flags, dispatch_queue_t dq)
4665 {
4666 _dispatch_continuation_pop_inline(dou, dic, flags, dq);
4667 }
4668
4669 void
4670 _dispatch_continuation_invoke(dispatch_object_t dou, voucher_t ov,
4671 dispatch_invoke_flags_t flags)
4672 {
4673 _dispatch_continuation_invoke_inline(dou, ov, flags);
4674 }
4675
4676 DISPATCH_NOINLINE
4677 static void
4678 _dispatch_return_to_kernel(void)
4679 {
4680 if (unlikely(_dispatch_get_wlh() == DISPATCH_WLH_ANON)) {
4681 _dispatch_clear_return_to_kernel();
4682 } else {
4683 _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
4684 }
4685 }
4686
4687 void
4688 _dispatch_poll_for_events_4launchd(void)
4689 {
4690 #if DISPATCH_USE_KEVENT_WORKQUEUE
4691 if (_dispatch_get_wlh()) {
4692 dispatch_assert(_dispatch_deferred_items_get()->ddi_wlh_servicing);
4693 _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
4694 }
4695 #endif
4696 }
4697
4698 #if HAVE_PTHREAD_WORKQUEUE_NARROWING
4699 static os_atomic(uint64_t) _dispatch_narrowing_deadlines[DISPATCH_QOS_MAX];
4700 #if !DISPATCH_TIME_UNIT_USES_NANOSECONDS
4701 static uint64_t _dispatch_narrow_check_interval_cache;
4702 #endif
4703
4704 DISPATCH_ALWAYS_INLINE
4705 static inline uint64_t
4706 _dispatch_narrow_check_interval(void)
4707 {
4708 #if DISPATCH_TIME_UNIT_USES_NANOSECONDS
4709 return 50 * NSEC_PER_MSEC;
4710 #else
4711 if (_dispatch_narrow_check_interval_cache == 0) {
4712 _dispatch_narrow_check_interval_cache =
4713 _dispatch_time_nano2mach(50 * NSEC_PER_MSEC);
4714 }
4715 return _dispatch_narrow_check_interval_cache;
4716 #endif
4717 }
4718
4719 DISPATCH_ALWAYS_INLINE
4720 static inline void
4721 _dispatch_queue_drain_init_narrowing_check_deadline(dispatch_invoke_context_t dic,
4722 dispatch_priority_t pri)
4723 {
4724 if (_dispatch_priority_qos(pri) &&
4725 !(pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT)) {
4726 dic->dic_next_narrow_check = _dispatch_approximate_time() +
4727 _dispatch_narrow_check_interval();
4728 }
4729 }
4730
4731 DISPATCH_NOINLINE
4732 static bool
4733 _dispatch_queue_drain_should_narrow_slow(uint64_t now,
4734 dispatch_invoke_context_t dic)
4735 {
4736 if (dic->dic_next_narrow_check != DISPATCH_THREAD_IS_NARROWING) {
4737 pthread_priority_t pp = _dispatch_get_priority();
4738 dispatch_qos_t qos = _dispatch_qos_from_pp(pp);
4739 if (unlikely(!qos || qos > countof(_dispatch_narrowing_deadlines))) {
4740 DISPATCH_CLIENT_CRASH(pp, "Thread QoS corruption");
4741 }
4742 size_t idx = qos - 1; // no entry needed for DISPATCH_QOS_UNSPECIFIED
4743 os_atomic(uint64_t) *deadline = &_dispatch_narrowing_deadlines[idx];
4744 uint64_t oldval, newval = now + _dispatch_narrow_check_interval();
4745
4746 dic->dic_next_narrow_check = newval;
4747 os_atomic_rmw_loop(deadline, oldval, newval, relaxed, {
4748 if (now < oldval) {
4749 os_atomic_rmw_loop_give_up(return false);
4750 }
4751 });
4752
4753 if (!_pthread_workqueue_should_narrow(pp)) {
4754 return false;
4755 }
4756 dic->dic_next_narrow_check = DISPATCH_THREAD_IS_NARROWING;
4757 }
4758 return true;
4759 }
4760
4761 DISPATCH_ALWAYS_INLINE
4762 static inline bool
4763 _dispatch_queue_drain_should_narrow(dispatch_invoke_context_t dic)
4764 {
4765 uint64_t next_check = dic->dic_next_narrow_check;
4766 if (unlikely(next_check)) {
4767 uint64_t now = _dispatch_approximate_time();
4768 if (unlikely(next_check < now)) {
4769 return _dispatch_queue_drain_should_narrow_slow(now, dic);
4770 }
4771 }
4772 return false;
4773 }
4774 #else
4775 #define _dispatch_queue_drain_init_narrowing_check_deadline(rq, dic) ((void)0)
4776 #define _dispatch_queue_drain_should_narrow(dic) false
4777 #endif
4778
4779 /*
4780 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
4781 * (redirecting or not).
4782 *
4783 * Serial
4784 * ~~~~~~
4785 * Serial drain is about serial queues (width == 1). It doesn't support
4786 * the redirecting mode, which doesn't make sense, and treats all continuations
4787 * as barriers. Bookkeeping is minimal in the serial flavour; most of the loop
4788 * is optimized away.
4789 *
4790 * Serial drain stops if the width of the queue grows to larger than 1.
4791 * Going through a serial drain prevents any recursive drain from being
4792 * redirecting.
4793 *
4794 * Concurrent
4795 * ~~~~~~~~~~
4796 * When in non-redirecting mode (meaning one of the target queues is serial),
4797 * non-barriers and barriers alike run in the context of the drain thread.
4798 * Slow non-barrier items are still all signaled so that they can make progress
4799 * toward the dispatch_sync() that will serialize them all.
4800 *
4801 * In redirecting mode, non-barrier work items are redirected downward.
4802 *
4803 * Concurrent drain stops if the width of the queue becomes 1, so that the
4804 * queue drain moves to the more efficient serial mode.
4805 */
4806 DISPATCH_ALWAYS_INLINE
4807 static dispatch_queue_wakeup_target_t
4808 _dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
4809 dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
4810 {
4811 dispatch_queue_t orig_tq = dq->do_targetq;
4812 dispatch_thread_frame_s dtf;
4813 struct dispatch_object_s *dc = NULL, *next_dc;
4814 uint64_t dq_state, owned = *owned_ptr;
4815
4816 if (unlikely(!dq->dq_items_tail)) return NULL;
4817
4818 _dispatch_thread_frame_push(&dtf, dq);
4819 if (serial_drain || _dq_state_is_in_barrier(owned)) {
4820 // we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
4821 // but width can change while draining barrier work items, so we only
4822 // convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
4823 owned = DISPATCH_QUEUE_IN_BARRIER;
4824 } else {
4825 owned &= DISPATCH_QUEUE_WIDTH_MASK;
4826 }
4827
4828 dc = _dispatch_queue_head(dq);
4829 goto first_iteration;
4830
4831 for (;;) {
4832 dc = next_dc;
4833 if (unlikely(dic->dic_deferred)) {
4834 goto out_with_deferred_compute_owned;
4835 }
4836 if (unlikely(_dispatch_needs_to_return_to_kernel())) {
4837 _dispatch_return_to_kernel();
4838 }
4839 if (unlikely(!dc)) {
4840 if (!dq->dq_items_tail) {
4841 break;
4842 }
4843 dc = _dispatch_queue_head(dq);
4844 }
4845 if (unlikely(serial_drain != (dq->dq_width == 1))) {
4846 break;
4847 }
4848 if (unlikely(_dispatch_queue_drain_should_narrow(dic))) {
4849 break;
4850 }
4851
4852 first_iteration:
4853 dq_state = os_atomic_load(&dq->dq_state, relaxed);
4854 if (unlikely(_dq_state_is_suspended(dq_state))) {
4855 break;
4856 }
4857 if (unlikely(orig_tq != dq->do_targetq)) {
4858 break;
4859 }
4860
4861 if (serial_drain || _dispatch_object_is_barrier(dc)) {
4862 if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
4863 if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
4864 goto out_with_no_width;
4865 }
4866 owned = DISPATCH_QUEUE_IN_BARRIER;
4867 }
4868 next_dc = _dispatch_queue_next(dq, dc);
4869 if (_dispatch_object_is_sync_waiter(dc)) {
4870 owned = 0;
4871 dic->dic_deferred = dc;
4872 goto out_with_deferred;
4873 }
4874 } else {
4875 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4876 // we just ran barrier work items, so we have to make their
4877 // effect visible to other sync work items on other threads
4878 // that may start coming in after this point, hence the
4879 // release barrier
4880 os_atomic_xor2o(dq, dq_state, owned, release);
4881 owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4882 } else if (unlikely(owned == 0)) {
4883 if (_dispatch_object_is_sync_waiter(dc)) {
4884 // sync "readers" don't observe the limit
4885 _dispatch_queue_reserve_sync_width(dq);
4886 } else if (!_dispatch_queue_try_acquire_async(dq)) {
4887 goto out_with_no_width;
4888 }
4889 owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
4890 }
4891
4892 next_dc = _dispatch_queue_next(dq, dc);
4893 if (_dispatch_object_is_sync_waiter(dc)) {
4894 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4895 _dispatch_sync_waiter_redirect_or_wake(dq,
4896 DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
4897 continue;
4898 }
4899
4900 if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
4901 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4902 _dispatch_continuation_redirect(dq, dc);
4903 continue;
4904 }
4905 }
4906
4907 _dispatch_continuation_pop_inline(dc, dic, flags, dq);
4908 }
4909
4910 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4911 // if we're IN_BARRIER we really own the full width too
4912 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4913 }
4914 if (dc) {
4915 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4916 }
4917 *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
4918 *owned_ptr |= owned;
4919 _dispatch_thread_frame_pop(&dtf);
4920 return dc ? dq->do_targetq : NULL;
4921
4922 out_with_no_width:
4923 *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
4924 _dispatch_thread_frame_pop(&dtf);
4925 return DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
4926
4927 out_with_deferred_compute_owned:
4928 if (serial_drain) {
4929 owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
4930 } else {
4931 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4932 // if we're IN_BARRIER we really own the full width too
4933 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4934 }
4935 if (dc) {
4936 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4937 }
4938 }
4939 out_with_deferred:
4940 *owned_ptr &= DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_ENQUEUED_ON_MGR;
4941 *owned_ptr |= owned;
4942 if (unlikely(flags & DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS)) {
4943 DISPATCH_INTERNAL_CRASH(dc,
4944 "Deferred continuation on source, mach channel or mgr");
4945 }
4946 _dispatch_thread_frame_pop(&dtf);
4947 return dq->do_targetq;
4948 }
4949
4950 DISPATCH_NOINLINE
4951 static dispatch_queue_wakeup_target_t
4952 _dispatch_queue_concurrent_drain(dispatch_queue_t dq,
4953 dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
4954 uint64_t *owned)
4955 {
4956 return _dispatch_queue_drain(dq, dic, flags, owned, false);
4957 }
4958
4959 DISPATCH_NOINLINE
4960 dispatch_queue_wakeup_target_t
4961 _dispatch_queue_serial_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
4962 dispatch_invoke_flags_t flags, uint64_t *owned)
4963 {
4964 flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
4965 return _dispatch_queue_drain(dq, dic, flags, owned, true);
4966 }
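
/*
 * Illustrative example (not part of libdispatch, not compiled): the
 * serial/concurrent drain split above is driven by the queue width and by
 * whether a continuation is a barrier. A hedged sketch of the client-visible
 * behaviour using only the public dispatch API; the queue labels and block
 * bodies are made up for the example.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_barrier_vs_width(void)
{
	// width > 1: non-barrier blocks may run concurrently; the concurrent
	// drain redirects them to the target (root) queue
	dispatch_queue_t cq = dispatch_queue_create("com.example.concurrent",
			DISPATCH_QUEUE_CONCURRENT);
	for (int i = 0; i < 4; i++) {
		dispatch_async(cq, ^{ printf("reader %d\n", i); });
	}
	// a barrier waits for prior work and excludes later work, which is
	// the "owning IN_BARRIER" state the drain tracks
	dispatch_barrier_async(cq, ^{ printf("writer\n"); });

	// width == 1: every block is effectively a barrier, so the serial
	// drain never needs the redirecting mode
	dispatch_queue_t sq = dispatch_queue_create("com.example.serial", NULL);
	dispatch_async(sq, ^{ printf("serial item\n"); });

	dispatch_barrier_sync(cq, ^{ });
	dispatch_sync(sq, ^{ });
	dispatch_release(cq);
	dispatch_release(sq);
}
#endif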
4967
4968 #if DISPATCH_COCOA_COMPAT
4969 DISPATCH_NOINLINE
4970 static void
4971 _dispatch_main_queue_update_priority_from_thread(void)
4972 {
4973 dispatch_queue_t dq = &_dispatch_main_q;
4974 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
4975 mach_port_t owner = _dq_state_drain_owner(dq_state);
4976
4977 dispatch_priority_t main_pri =
4978 _dispatch_priority_from_pp_strip_flags(_dispatch_get_priority());
4979 dispatch_qos_t main_qos = _dispatch_priority_qos(main_pri);
4980 dispatch_qos_t max_qos = _dq_state_max_qos(dq_state);
4981 dispatch_qos_t old_qos = _dispatch_priority_qos(dq->dq_priority);
4982
4983 // the main thread QoS was adjusted by someone else, learn the new QoS
4984 // and reinitialize _dispatch_main_q.dq_priority
4985 dq->dq_priority = _dispatch_priority_with_override_qos(main_pri, main_qos);
4986
4987 if (old_qos < max_qos && main_qos == DISPATCH_QOS_UNSPECIFIED) {
4988 // main thread is opted out of QoS and we had an override
4989 return _dispatch_thread_override_end(owner, dq);
4990 }
4991
4992 if (old_qos < max_qos && max_qos <= main_qos) {
4993 // main QoS was raised, and we had an override which is now useless
4994 return _dispatch_thread_override_end(owner, dq);
4995 }
4996
4997 if (main_qos < max_qos && max_qos <= old_qos) {
4998 // main thread QoS was lowered, and we actually need an override
4999 pthread_priority_t pp = _dispatch_qos_to_pp(max_qos);
5000 return _dispatch_thread_override_start(owner, pp, dq);
5001 }
5002 }
5003
5004 static void
5005 _dispatch_main_queue_drain(void)
5006 {
5007 dispatch_queue_t dq = &_dispatch_main_q;
5008 dispatch_thread_frame_s dtf;
5009
5010 if (!dq->dq_items_tail) {
5011 return;
5012 }
5013
5014 _dispatch_perfmon_start_notrace();
5015 if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
5016 DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
5017 " after dispatch_main()");
5018 }
5019 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
5020 if (unlikely(!_dq_state_drain_locked_by_self(dq_state))) {
5021 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
5022 "_dispatch_main_queue_callback_4CF called"
5023 " from the wrong thread");
5024 }
5025
5026 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5027 _dispatch_runloop_queue_handle_init);
5028
5029 // <rdar://problem/23256682> hide the frame chaining when CFRunLoop
5030 // drains the main runloop, as this should not be observable that way
5031 _dispatch_adopt_wlh_anon();
5032 _dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);
5033
5034 pthread_priority_t pp = _dispatch_get_priority();
5035 dispatch_priority_t pri = _dispatch_priority_from_pp(pp);
5036 dispatch_qos_t qos = _dispatch_priority_qos(pri);
5037 voucher_t voucher = _voucher_copy();
5038
5039 if (unlikely(qos != _dispatch_priority_qos(dq->dq_priority))) {
5040 _dispatch_main_queue_update_priority_from_thread();
5041 }
5042 dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
5043 _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
5044
5045 dispatch_invoke_context_s dic = { };
5046 struct dispatch_object_s *dc, *next_dc, *tail;
5047 dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
5048 do {
5049 next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
5050 _dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
5051 } while ((dc = next_dc));
5052
5053 dx_wakeup(dq, 0, 0);
5054 _dispatch_voucher_debug("main queue restore", voucher);
5055 _dispatch_reset_basepri(old_dbp);
5056 _dispatch_reset_basepri_override();
5057 _dispatch_reset_priority_and_voucher(pp, voucher);
5058 _dispatch_thread_frame_pop(&dtf);
5059 _dispatch_reset_wlh();
5060 _dispatch_force_cache_cleanup();
5061 _dispatch_perfmon_end_notrace();
5062 }
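
/*
 * Illustrative example (not part of libdispatch, not compiled): on Darwin,
 * CFRunLoop invokes _dispatch_main_queue_callback_4CF(), which ends up in
 * _dispatch_main_queue_drain() above. A hedged client-side sketch of that
 * round trip; the CoreFoundation calls are only one way to drive it.
 */
#if 0
#include <dispatch/dispatch.h>
#include <CoreFoundation/CoreFoundation.h>
#include <stdio.h>

int
main(void)
{
	dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^{
		// enqueuing on the main queue signals its runloop handle, waking
		// the main thread, which then drains the queue
		dispatch_async(dispatch_get_main_queue(), ^{
			printf("drained on the main thread\n");
			CFRunLoopStop(CFRunLoopGetMain());
		});
	});
	CFRunLoopRun(); // returns once CFRunLoopStop() above runs
	return 0;
}
#endif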
5063
5064 static bool
5065 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
5066 {
5067 if (!dq->dq_items_tail) {
5068 return false;
5069 }
5070 _dispatch_perfmon_start_notrace();
5071 dispatch_thread_frame_s dtf;
5072 bool should_reset_wlh = _dispatch_adopt_wlh_anon_recurse();
5073 _dispatch_thread_frame_push(&dtf, dq);
5074 pthread_priority_t pp = _dispatch_get_priority();
5075 dispatch_priority_t pri = _dispatch_priority_from_pp(pp);
5076 voucher_t voucher = _voucher_copy();
5077 dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
5078 _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
5079
5080 dispatch_invoke_context_s dic = { };
5081 struct dispatch_object_s *dc, *next_dc;
5082 dc = _dispatch_queue_head(dq);
5083 next_dc = _dispatch_queue_next(dq, dc);
5084 _dispatch_continuation_pop_inline(dc, &dic, DISPATCH_INVOKE_NONE, dq);
5085
5086 if (!next_dc) {
5087 dx_wakeup(dq, 0, 0);
5088 }
5089
5090 _dispatch_voucher_debug("runloop queue restore", voucher);
5091 _dispatch_reset_basepri(old_dbp);
5092 _dispatch_reset_basepri_override();
5093 _dispatch_reset_priority_and_voucher(pp, voucher);
5094 _dispatch_thread_frame_pop(&dtf);
5095 if (should_reset_wlh) _dispatch_reset_wlh();
5096 _dispatch_force_cache_cleanup();
5097 _dispatch_perfmon_end_notrace();
5098 return next_dc;
5099 }
5100 #endif
5101
5102 void
5103 _dispatch_mgr_queue_drain(void)
5104 {
5105 const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
5106 dispatch_invoke_context_s dic = { };
5107 dispatch_queue_t dq = &_dispatch_mgr_q;
5108 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
5109
5110 if (dq->dq_items_tail) {
5111 _dispatch_perfmon_start();
5112 _dispatch_set_basepri_override_qos(DISPATCH_QOS_SATURATED);
5113 if (slowpath(_dispatch_queue_serial_drain(dq, &dic, flags, &owned))) {
5114 DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
5115 }
5116 _dispatch_voucher_debug("mgr queue clear", NULL);
5117 _voucher_clear();
5118 _dispatch_reset_basepri_override();
5119 _dispatch_perfmon_end(perfmon_thread_manager);
5120 }
5121
5122 #if DISPATCH_USE_KEVENT_WORKQUEUE
5123 if (!_dispatch_kevent_workqueue_enabled)
5124 #endif
5125 {
5126 _dispatch_force_cache_cleanup();
5127 }
5128 }
5129
5130 #pragma mark -
5131 #pragma mark dispatch_queue_invoke
5132
5133 void
5134 _dispatch_queue_drain_sync_waiter(dispatch_queue_t dq,
5135 dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
5136 uint64_t owned)
5137 {
5138 struct dispatch_object_s *dc = dic->dic_deferred;
5139 dispatch_assert(_dispatch_object_is_sync_waiter(dc));
5140 dic->dic_deferred = NULL;
5141 if (flags & DISPATCH_INVOKE_WLH) {
5142 // Leave the enqueued bit in place; completion of the last sync waiter
5143 // in the handoff chain is responsible for dequeuing
5144 //
5145 // We currently have a +2 to consume, but we need to keep a +1
5146 // for the thread request
5147 dispatch_assert(_dq_state_is_enqueued_on_target(owned));
5148 dispatch_assert(!_dq_state_is_enqueued_on_manager(owned));
5149 owned &= ~DISPATCH_QUEUE_ENQUEUED;
5150 _dispatch_release_no_dispose(dq);
5151 } else {
5152 // The sync waiter must own a reference
5153 _dispatch_release_2_no_dispose(dq);
5154 }
5155 return _dispatch_sync_waiter_redirect_or_wake(dq, owned, dc);
5156 }
5157
5158 void
5159 _dispatch_queue_finalize_activation(dispatch_queue_t dq,
5160 DISPATCH_UNUSED bool *allow_resume)
5161 {
5162 dispatch_queue_t tq = dq->do_targetq;
5163 _dispatch_queue_priority_inherit_from_target(dq, tq);
5164 _dispatch_queue_inherit_wlh_from_target(dq, tq);
5165 }
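
/*
 * Illustrative example (not part of libdispatch, not compiled): activation
 * is when a queue captures priority and wlh from its target, as done in
 * _dispatch_queue_finalize_activation() above. A hedged sketch of the public
 * API sequence that exercises this path; the label is made up.
 */
#if 0
#include <dispatch/dispatch.h>

static dispatch_queue_t
example_make_child_queue(dispatch_queue_t target)
{
	dispatch_queue_attr_t attr =
			dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL);
	dispatch_queue_t q = dispatch_queue_create("com.example.child", attr);
	// retargeting is only allowed while the queue is inactive; activation
	// then inherits priority and workloop from `target`
	dispatch_set_target_queue(q, target);
	dispatch_activate(q);
	return q;
}
#endif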
5166
5167 DISPATCH_ALWAYS_INLINE
5168 static inline dispatch_queue_wakeup_target_t
5169 dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_context_t dic,
5170 dispatch_invoke_flags_t flags, uint64_t *owned)
5171 {
5172 dispatch_queue_t otq = dq->do_targetq;
5173 dispatch_queue_t cq = _dispatch_queue_get_current();
5174
5175 if (slowpath(cq != otq)) {
5176 return otq;
5177 }
5178 if (dq->dq_width == 1) {
5179 return _dispatch_queue_serial_drain(dq, dic, flags, owned);
5180 }
5181 return _dispatch_queue_concurrent_drain(dq, dic, flags, owned);
5182 }
5183
5184 // 6618342 Contact the team that owns the Instrument DTrace probe before
5185 // renaming this symbol
5186 DISPATCH_NOINLINE
5187 void
5188 _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_context_t dic,
5189 dispatch_invoke_flags_t flags)
5190 {
5191 _dispatch_queue_class_invoke(dq, dic, flags, 0, dispatch_queue_invoke2);
5192 }
5193
5194 #pragma mark -
5195 #pragma mark dispatch_queue_class_wakeup
5196
5197 #if HAVE_PTHREAD_WORKQUEUE_QOS
5198 void
5199 _dispatch_queue_override_invoke(dispatch_continuation_t dc,
5200 dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags)
5201 {
5202 dispatch_queue_t old_rq = _dispatch_queue_get_current();
5203 dispatch_queue_t assumed_rq = dc->dc_other;
5204 dispatch_priority_t old_dp;
5205 voucher_t ov = DISPATCH_NO_VOUCHER;
5206 dispatch_object_t dou;
5207
5208 dou._do = dc->dc_data;
5209 old_dp = _dispatch_root_queue_identity_assume(assumed_rq);
5210 if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
5211 flags |= DISPATCH_INVOKE_STEALING;
5212 } else {
5213 // balance the fake continuation push in
5214 // _dispatch_root_queue_push_override
5215 _dispatch_trace_continuation_pop(assumed_rq, dou._do);
5216 }
5217 _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
5218 if (_dispatch_object_has_vtable(dou._do)) {
5219 dx_invoke(dou._do, dic, flags);
5220 } else {
5221 _dispatch_continuation_invoke_inline(dou, ov, flags);
5222 }
5223 });
5224 _dispatch_reset_basepri(old_dp);
5225 _dispatch_queue_set_current(old_rq);
5226 }
5227
5228 DISPATCH_ALWAYS_INLINE
5229 static inline bool
5230 _dispatch_root_queue_push_needs_override(dispatch_queue_t rq,
5231 dispatch_qos_t qos)
5232 {
5233 dispatch_qos_t rqos = _dispatch_priority_qos(rq->dq_priority);
5234 bool defaultqueue = rq->dq_priority & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE;
5235
5236 if (unlikely(!rqos)) return false;
5237
5238 return defaultqueue ? qos && qos != rqos : qos > rqos;
5239 }
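
/*
 * Illustrative example (not part of libdispatch, not compiled): the check
 * above fires when a pushed item carries a higher QoS than its root queue.
 * A hedged sketch of how that arises from the public API; the specific QoS
 * classes are only an example.
 */
#if 0
#include <dispatch/dispatch.h>
#include <Block.h>

static void
example_push_with_override(void)
{
	dispatch_queue_t utility_q =
			dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
	dispatch_block_t b = dispatch_block_create_with_qos_class(
			DISPATCH_BLOCK_ENFORCE_QOS_CLASS, QOS_CLASS_USER_INITIATED, 0, ^{
		// runs at USER_INITIATED although the root queue is UTILITY,
		// which is the "needs override" case handled above
	});
	dispatch_async(utility_q, b); // dispatch_async copies the block
	Block_release(b);
}
#endif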
5240
5241 DISPATCH_ALWAYS_INLINE
5242 static inline bool
5243 _dispatch_root_queue_push_queue_override_needed(dispatch_queue_t rq,
5244 dispatch_qos_t qos)
5245 {
5246 // for root queues, the override is the guaranteed minimum override level
5247 return qos > _dispatch_priority_override_qos(rq->dq_priority);
5248 }
5249
5250 DISPATCH_NOINLINE
5251 static void
5252 _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
5253 dispatch_object_t dou, dispatch_qos_t qos)
5254 {
5255 bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
5256 dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
5257 dispatch_continuation_t dc = dou._dc;
5258
5259 if (_dispatch_object_is_redirection(dc)) {
5260 // no double-wrap is needed; _dispatch_async_redirect_invoke will do
5261 // the right thing
5262 dc->dc_func = (void *)orig_rq;
5263 } else {
5264 dc = _dispatch_continuation_alloc();
5265 dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
5266 // fake that we queued `dou` on `orig_rq` for introspection purposes
5267 _dispatch_trace_continuation_push(orig_rq, dou);
5268 dc->dc_ctxt = dc;
5269 dc->dc_other = orig_rq;
5270 dc->dc_data = dou._do;
5271 dc->dc_priority = DISPATCH_NO_PRIORITY;
5272 dc->dc_voucher = DISPATCH_NO_VOUCHER;
5273 }
5274 _dispatch_root_queue_push_inline(rq, dc, dc, 1);
5275 }
5276
5277 DISPATCH_NOINLINE
5278 static void
5279 _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
5280 dispatch_queue_t dq, dispatch_qos_t qos)
5281 {
5282 bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
5283 dispatch_queue_t rq = _dispatch_get_root_queue(qos, overcommit);
5284 dispatch_continuation_t dc = _dispatch_continuation_alloc();
5285
5286 dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
5287 _dispatch_retain_2(dq);
5288 dc->dc_func = NULL;
5289 dc->dc_ctxt = dc;
5290 dc->dc_other = orig_rq;
5291 dc->dc_data = dq;
5292 dc->dc_priority = DISPATCH_NO_PRIORITY;
5293 dc->dc_voucher = DISPATCH_NO_VOUCHER;
5294 _dispatch_root_queue_push_inline(rq, dc, dc, 1);
5295 }
5296
5297 DISPATCH_NOINLINE
5298 static void
5299 _dispatch_queue_class_wakeup_with_override_slow(dispatch_queue_t dq,
5300 uint64_t dq_state, dispatch_wakeup_flags_t flags)
5301 {
5302 dispatch_qos_t oqos, qos = _dq_state_max_qos(dq_state);
5303 dispatch_queue_t tq;
5304 bool locked;
5305
5306 if (_dq_state_is_base_anon(dq_state)) {
5307 mach_port_t owner = _dq_state_drain_owner(dq_state);
5308 if (owner) {
5309 (void)_dispatch_wqthread_override_start_check_owner(owner, qos,
5310 &dq->dq_state_lock);
5311 goto out;
5312 }
5313 }
5314
5315 tq = dq->do_targetq;
5316
5317 if (likely(!_dispatch_queue_is_legacy(dq))) {
5318 locked = false;
5319 } else if (_dispatch_is_in_root_queues_array(tq)) {
5320 // avoid locking when we recognize the target queue as a global root
5321 // queue it is gross, but is a very common case. The locking isn't
5322 // needed because these target queues cannot go away.
5323 locked = false;
5324 } else if (_dispatch_queue_sidelock_trylock(dq, qos)) {
5325 // <rdar://problem/17735825> to traverse the tq chain safely we must
5326 // lock it to ensure it cannot change
5327 locked = true;
5328 tq = dq->do_targetq;
5329 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5330 } else {
5331 //
5332 // To get to this point, the current thread has:
5333 // 1. enqueued an object on `dq`
5334 // 2. raised the max_qos value, set RECEIVED_OVERRIDE on `dq`
5335 // and didn't see an owner
5336 // 3. tried and failed to acquire the side lock
5337 //
5338 // The side lock owner can only be one of three things:
5339 //
5340 // - The suspend/resume side count code. Besides being unlikely,
5341 // it means that at this moment the queue is actually suspended,
5342 // which transfers the responsibility of applying the override to
5343 // the eventual dispatch_resume().
5344 //
5345 // - A dispatch_set_target_queue() call. The fact that we saw no `owner`
5346 // means that the trysync it does wasn't being drained when (2)
5347 // happened, which can only be explained by one of these interleavings:
5348 //
5349 // o `dq` became idle between when the object queued in (1) ran and
5350 // the set_target_queue call and we were unlucky enough that our
5351 // step (2) happened while this queue was idle. There is no reason
5352 // to override anything anymore: the queue drained to completion
5353 // while we were preempted; our job is done.
5354 //
5355 // o `dq` is queued but not draining during (1-2), then when we try
5356 // to lock at (3) the queue is now draining a set_target_queue.
5357 // This drainer must have seen the effects of (2) and that guy has
5358 // applied our override. Our job is done.
5359 //
5360 // - Another instance of _dispatch_queue_class_wakeup_with_override(),
5361 // which is fine because trylock leaves a hint that we failed our
5362 // trylock, causing the tryunlock below to fail and reassess whether
5363 // a better override needs to be applied.
5364 //
5365 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5366 goto out;
5367 }
5368
5369 apply_again:
5370 if (dx_hastypeflag(tq, QUEUE_ROOT)) {
5371 if (_dispatch_root_queue_push_queue_override_needed(tq, qos)) {
5372 _dispatch_root_queue_push_override_stealer(tq, dq, qos);
5373 }
5374 } else if (_dispatch_queue_need_override(tq, qos)) {
5375 dx_wakeup(tq, qos, 0);
5376 }
5377 while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
5378 // rdar://problem/24081326
5379 //
5380 // Another instance of _dispatch_queue_class_wakeup_with_override()
5381 // tried to acquire the side lock while we were running, and could have
5382 // had a better override than ours to apply.
5383 //
5384 oqos = _dq_state_max_qos(os_atomic_load2o(dq, dq_state, relaxed));
5385 if (oqos > qos) {
5386 qos = oqos;
5387 // The other instance had a better priority than ours, override
5388 // our thread, and apply the override that wasn't applied to `dq`
5389 // because of us.
5390 goto apply_again;
5391 }
5392 }
5393
5394 out:
5395 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
5396 return _dispatch_release_2_tailcall(dq);
5397 }
5398 }
5399
5400
5401 DISPATCH_ALWAYS_INLINE
5402 static inline void
5403 _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
5404 uint64_t dq_state, dispatch_wakeup_flags_t flags)
5405 {
5406 dispatch_assert(_dq_state_should_override(dq_state));
5407
5408 return _dispatch_queue_class_wakeup_with_override_slow(dq, dq_state, flags);
5409 }
5410 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5411
5412 DISPATCH_NOINLINE
5413 void
5414 _dispatch_root_queue_push(dispatch_queue_t rq, dispatch_object_t dou,
5415 dispatch_qos_t qos)
5416 {
5417 #if DISPATCH_USE_KEVENT_WORKQUEUE
5418 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
5419 if (unlikely(ddi && ddi->ddi_can_stash)) {
5420 dispatch_object_t old_dou = ddi->ddi_stashed_dou;
5421 dispatch_priority_t rq_overcommit;
5422 rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
5423
5424 if (likely(!old_dou._do || rq_overcommit)) {
5425 dispatch_queue_t old_rq = ddi->ddi_stashed_rq;
5426 dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
5427 ddi->ddi_stashed_rq = rq;
5428 ddi->ddi_stashed_dou = dou;
5429 ddi->ddi_stashed_qos = qos;
5430 _dispatch_debug("deferring item %p, rq %p, qos %d",
5431 dou._do, rq, qos);
5432 if (rq_overcommit) {
5433 ddi->ddi_can_stash = false;
5434 }
5435 if (likely(!old_dou._do)) {
5436 return;
5437 }
5438 // push the previously stashed item
5439 qos = old_qos;
5440 rq = old_rq;
5441 dou = old_dou;
5442 }
5443 }
5444 #endif
5445 #if HAVE_PTHREAD_WORKQUEUE_QOS
5446 if (_dispatch_root_queue_push_needs_override(rq, qos)) {
5447 return _dispatch_root_queue_push_override(rq, dou, qos);
5448 }
5449 #else
5450 (void)qos;
5451 #endif
5452 _dispatch_root_queue_push_inline(rq, dou, dou, 1);
5453 }
5454
5455 void
5456 _dispatch_root_queue_wakeup(dispatch_queue_t dq,
5457 DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
5458 {
5459 if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
5460 DISPATCH_INTERNAL_CRASH(dq->dq_priority,
5461 "Don't try to wake up or override a root queue");
5462 }
5463 if (flags & DISPATCH_WAKEUP_CONSUME_2) {
5464 return _dispatch_release_2_tailcall(dq);
5465 }
5466 }
5467
5468 DISPATCH_NOINLINE
5469 void
5470 _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
5471 dispatch_qos_t qos)
5472 {
5473 _dispatch_queue_push_inline(dq, dou, qos);
5474 }
5475
5476 DISPATCH_NOINLINE
5477 void
5478 _dispatch_queue_class_wakeup(dispatch_queue_t dq, dispatch_qos_t qos,
5479 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5480 {
5481 dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
5482
5483 if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
5484 _dispatch_retain_2(dq);
5485 flags |= DISPATCH_WAKEUP_CONSUME_2;
5486 }
5487
5488 if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
5489 //
5490 // _dispatch_queue_class_barrier_complete() is about what both regular
5491 // queues and sources need to evaluate, but the former can have sync
5492 // handoffs to perform which _dispatch_queue_class_barrier_complete()
5493 // doesn't handle, only _dispatch_queue_barrier_complete() does.
5494 //
5495 // _dispatch_queue_wakeup() is the one for plain queues that calls
5496 // _dispatch_queue_barrier_complete(), and this path is only taken for
5497 // non-queue types.
5498 //
5499 dispatch_assert(dx_metatype(dq) != _DISPATCH_QUEUE_TYPE);
5500 qos = _dispatch_queue_override_qos(dq, qos);
5501 return _dispatch_queue_class_barrier_complete(dq, qos, flags, target,
5502 DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
5503 }
5504
5505 if (target) {
5506 uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
5507 if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
5508 enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
5509 }
5510 qos = _dispatch_queue_override_qos(dq, qos);
5511 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5512 new_state = _dq_state_merge_qos(old_state, qos);
5513 if (likely(!_dq_state_is_suspended(old_state) &&
5514 !_dq_state_is_enqueued(old_state) &&
5515 (!_dq_state_drain_locked(old_state) ||
5516 (enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR &&
5517 _dq_state_is_base_wlh(old_state))))) {
5518 new_state |= enqueue;
5519 }
5520 if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
5521 new_state |= DISPATCH_QUEUE_DIRTY;
5522 } else if (new_state == old_state) {
5523 os_atomic_rmw_loop_give_up(goto done);
5524 }
5525 });
5526
5527 if (likely((old_state ^ new_state) & enqueue)) {
5528 dispatch_queue_t tq;
5529 if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5530 // the rmw_loop above has no acquire barrier, as the last block
5531 // of a queue asyncing to that queue is not an uncommon pattern
5532 // and in that case the acquire would be completely useless
5533 //
5534 // so instead use dependency ordering to read
5535 // the targetq pointer.
5536 os_atomic_thread_fence(dependency);
5537 tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
5538 (long)new_state);
5539 } else {
5540 tq = target;
5541 }
5542 dispatch_assert(_dq_state_is_enqueued(new_state));
5543 return _dispatch_queue_push_queue(tq, dq, new_state);
5544 }
5545 #if HAVE_PTHREAD_WORKQUEUE_QOS
5546 if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
5547 if (_dq_state_should_override(new_state)) {
5548 return _dispatch_queue_class_wakeup_with_override(dq, new_state,
5549 flags);
5550 }
5551 }
5552 } else if (qos) {
5553 //
5554 // Someone is trying to override the last work item of the queue.
5555 //
5556 uint64_t old_state, new_state;
5557 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
5558 if (!_dq_state_drain_locked(old_state) ||
5559 !_dq_state_is_enqueued(old_state)) {
5560 os_atomic_rmw_loop_give_up(goto done);
5561 }
5562 new_state = _dq_state_merge_qos(old_state, qos);
5563 if (new_state == old_state) {
5564 os_atomic_rmw_loop_give_up(goto done);
5565 }
5566 });
5567 if (_dq_state_should_override(new_state)) {
5568 return _dispatch_queue_class_wakeup_with_override(dq, new_state,
5569 flags);
5570 }
5571 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5572 }
5573 done:
5574 if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
5575 return _dispatch_release_2_tailcall(dq);
5576 }
5577 }
5578
5579 DISPATCH_NOINLINE
5580 static void
5581 _dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
5582 dispatch_sync_context_t dsc, dispatch_qos_t qos)
5583 {
5584 uint64_t old_state, new_state;
5585
5586 if (unlikely(dx_type(dq) == DISPATCH_QUEUE_NETWORK_EVENT_TYPE)) {
5587 DISPATCH_CLIENT_CRASH(0,
5588 "dispatch_sync onto a network event queue");
5589 }
5590
5591 _dispatch_trace_continuation_push(dq, dsc->_as_dc);
5592
5593 if (unlikely(_dispatch_queue_push_update_tail(dq, dsc->_as_do))) {
5594 // for slow waiters, we borrow the reference of the caller
5595 // so we don't need to protect the wakeup with a temporary retain
5596 _dispatch_queue_push_update_head(dq, dsc->_as_do);
5597 if (unlikely(_dispatch_queue_is_thread_bound(dq))) {
5598 return dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
5599 }
5600
5601 uint64_t pending_barrier_width =
5602 (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
5603 uint64_t set_owner_and_set_full_width_and_in_barrier =
5604 _dispatch_lock_value_for_self() |
5605 DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER;
5606 // similar to _dispatch_queue_drain_try_unlock()
5607 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5608 new_state = _dq_state_merge_qos(old_state, qos);
5609 new_state |= DISPATCH_QUEUE_DIRTY;
5610 if (unlikely(_dq_state_drain_locked(old_state) ||
5611 !_dq_state_is_runnable(old_state))) {
5612 // not runnable, so we should just handle overrides
5613 } else if (_dq_state_is_base_wlh(old_state) &&
5614 _dq_state_is_enqueued(old_state)) {
5615 // 32123779 let the event thread redrive since it's out already
5616 } else if (_dq_state_has_pending_barrier(old_state) ||
5617 new_state + pending_barrier_width <
5618 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
5619 // see _dispatch_queue_drain_try_lock
5620 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
5621 new_state |= set_owner_and_set_full_width_and_in_barrier;
5622 }
5623 });
5624
5625 if (_dq_state_is_base_wlh(old_state) &&
5626 (dsc->dsc_waiter == _dispatch_tid_self())) {
5627 dsc->dsc_wlh_was_first = true;
5628 }
5629
5630 if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
5631 return _dispatch_queue_barrier_complete(dq, qos, 0);
5632 }
5633 #if HAVE_PTHREAD_WORKQUEUE_QOS
5634 if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
5635 if (_dq_state_should_override(new_state)) {
5636 return _dispatch_queue_class_wakeup_with_override(dq,
5637 new_state, 0);
5638 }
5639 }
5640 } else if (unlikely(qos)) {
5641 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
5642 new_state = _dq_state_merge_qos(old_state, qos);
5643 if (old_state == new_state) {
5644 os_atomic_rmw_loop_give_up(return);
5645 }
5646 });
5647 if (_dq_state_should_override(new_state)) {
5648 return _dispatch_queue_class_wakeup_with_override(dq, new_state, 0);
5649 }
5650 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5651 }
5652 }
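
/*
 * Illustrative example (not part of libdispatch, not compiled):
 * _dispatch_queue_push_sync_waiter() is the slow path behind dispatch_sync()
 * when the drain lock cannot be taken inline. A hedged sketch of a calling
 * pattern that reaches it; the sleep is only there to keep the queue busy.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>
#include <unistd.h>

static void
example_sync_waiter(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.busy", NULL);
	// keep the queue busy so the following dispatch_sync() has to enqueue
	// itself as a sync waiter and wait to be redirected or woken
	dispatch_async(q, ^{ sleep(1); });
	dispatch_sync(q, ^{
		printf("ran after the async item, woken by the drain\n");
	});
	dispatch_release(q);
}
#endif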
5653
5654 #pragma mark -
5655 #pragma mark dispatch_root_queue_drain
5656
5657 DISPATCH_NOINLINE
5658 static bool
5659 _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
5660 {
5661 dispatch_root_queue_context_t qc = dq->do_ctxt;
5662 struct dispatch_object_s *const mediator = (void *)~0ul;
5663 bool pending = false, available = true;
5664 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
5665
5666 do {
5667 // Spin for a short while in case the contention is temporary -- e.g.
5668 // when starting up after dispatch_apply, or when executing a few
5669 // short continuations in a row.
5670 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
5671 goto out;
5672 }
5673 // Since we have serious contention, we need to back off.
5674 if (!pending) {
5675 // Mark this queue as pending to avoid requests for further threads
5676 (void)os_atomic_inc2o(qc, dgq_pending, relaxed);
5677 pending = true;
5678 }
5679 _dispatch_contention_usleep(sleep_time);
5680 if (fastpath(dq->dq_items_head != mediator)) goto out;
5681 sleep_time *= 2;
5682 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
5683
5684 // The ratio of work to libdispatch overhead must be bad. This
5685 // scenario implies that there are too many threads in the pool.
5686 // Create a new pending thread and then exit this thread.
5687 // The kernel will grant a new thread when the load subsides.
5688 _dispatch_debug("contention on global queue: %p", dq);
5689 available = false;
5690 out:
5691 if (pending) {
5692 (void)os_atomic_dec2o(qc, dgq_pending, relaxed);
5693 }
5694 if (!available) {
5695 _dispatch_global_queue_poke(dq, 1, 0);
5696 }
5697 return available;
5698 }
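
/*
 * Illustrative example (not part of libdispatch, not compiled): the slow
 * path above is a spin-then-exponential-backoff loop under contention. A
 * stripped-down, generic sketch of the same pattern; the constants and the
 * `ready` predicate are made up and do not match libdispatch's tuning.
 */
#if 0
#include <stdbool.h>
#include <unistd.h>

#define EXAMPLE_USLEEP_START 10u
#define EXAMPLE_USLEEP_MAX   100000u

static bool
example_wait_with_backoff(bool (*ready)(void *), void *ctxt)
{
	unsigned int sleep_time = EXAMPLE_USLEEP_START;
	do {
		if (ready(ctxt)) return true; // cheap recheck before backing off
		usleep(sleep_time);           // back off, doubling each round
		sleep_time *= 2;
	} while (sleep_time < EXAMPLE_USLEEP_MAX);
	return ready(ctxt); // caller decides how to handle persistent contention
}
#endif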
5699
5700 DISPATCH_ALWAYS_INLINE
5701 static inline bool
5702 _dispatch_root_queue_drain_one2(dispatch_queue_t dq)
5703 {
5704 // Wait for queue head and tail to be both non-empty or both empty
5705 bool available; // <rdar://problem/15917893>
5706 _dispatch_wait_until((dq->dq_items_head != NULL) ==
5707 (available = (dq->dq_items_tail != NULL)));
5708 return available;
5709 }
5710
5711 DISPATCH_ALWAYS_INLINE_NDEBUG
5712 static inline struct dispatch_object_s *
5713 _dispatch_root_queue_drain_one(dispatch_queue_t dq)
5714 {
5715 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
5716
5717 start:
5718 // The mediator value acts both as a "lock" and a signal
5719 head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
5720
5721 if (slowpath(head == NULL)) {
5722 // The first xchg on the tail will tell the enqueueing thread that it
5723 // is safe to blindly write out to the head pointer. A cmpxchg honors
5724 // the algorithm.
5725 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
5726 NULL, relaxed))) {
5727 goto start;
5728 }
5729 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
5730 _dispatch_root_queue_drain_one2(dq)) {
5731 goto start;
5732 }
5733 _dispatch_root_queue_debug("no work on global queue: %p", dq);
5734 return NULL;
5735 }
5736
5737 if (slowpath(head == mediator)) {
5738 // This thread lost the race for ownership of the queue.
5739 if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
5740 goto start;
5741 }
5742 return NULL;
5743 }
5744
5745 // Restore the head pointer to a sane value before returning.
5746 // If 'next' is NULL, then this item _might_ be the last item.
5747 next = fastpath(head->do_next);
5748
5749 if (slowpath(!next)) {
5750 os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
5751 // 22708742: set tail to NULL with release, so that NULL write to head
5752 // above doesn't clobber head from concurrent enqueuer
5753 if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
5754 // both head and tail are NULL now
5755 goto out;
5756 }
5757 // There must be a next item now.
5758 next = os_mpsc_get_next(head, do_next);
5759 }
5760
5761 os_atomic_store2o(dq, dq_items_head, next, relaxed);
5762 _dispatch_global_queue_poke(dq, 1, 0);
5763 out:
5764 return head;
5765 }
5766
5767 #if DISPATCH_USE_KEVENT_WORKQUEUE
5768 void
5769 _dispatch_root_queue_drain_deferred_wlh(dispatch_deferred_items_t ddi
5770 DISPATCH_PERF_MON_ARGS_PROTO)
5771 {
5772 dispatch_queue_t rq = ddi->ddi_stashed_rq;
5773 dispatch_queue_t dq = ddi->ddi_stashed_dou._dq;
5774 _dispatch_queue_set_current(rq);
5775 dispatch_priority_t old_pri = _dispatch_set_basepri_wlh(rq->dq_priority);
5776 dispatch_invoke_context_s dic = { };
5777 dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
5778 DISPATCH_INVOKE_REDIRECTING_DRAIN | DISPATCH_INVOKE_WLH;
5779 _dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
5780 uint64_t dq_state;
5781
5782 ddi->ddi_wlh_servicing = true;
5783 if (unlikely(_dispatch_needs_to_return_to_kernel())) {
5784 _dispatch_return_to_kernel();
5785 }
5786 retry:
5787 dispatch_assert(ddi->ddi_wlh_needs_delete);
5788 _dispatch_trace_continuation_pop(rq, dq);
5789
5790 if (_dispatch_queue_drain_try_lock_wlh(dq, &dq_state)) {
5791 dx_invoke(dq, &dic, flags);
5792 if (!ddi->ddi_wlh_needs_delete) {
5793 goto park;
5794 }
5795 dq_state = os_atomic_load2o(dq, dq_state, relaxed);
5796 if (unlikely(!_dq_state_is_base_wlh(dq_state))) { // rdar://32671286
5797 goto park;
5798 }
5799 if (unlikely(_dq_state_is_enqueued_on_target(dq_state))) {
5800 _dispatch_retain(dq);
5801 _dispatch_trace_continuation_push(dq->do_targetq, dq);
5802 goto retry;
5803 }
5804 } else {
5805 if (_dq_state_is_suspended(dq_state)) {
5806 dispatch_assert(!_dq_state_is_enqueued(dq_state));
5807 _dispatch_release_2_no_dispose(dq);
5808 } else {
5809 dispatch_assert(_dq_state_is_enqueued(dq_state));
5810 dispatch_assert(_dq_state_drain_locked(dq_state));
5811 _dispatch_release_no_dispose(dq);
5812 }
5813 }
5814
5815 _dispatch_event_loop_leave_deferred((dispatch_wlh_t)dq, dq_state);
5816
5817 park:
5818 // event thread that could steal
5819 _dispatch_perfmon_end(perfmon_thread_event_steal);
5820 _dispatch_reset_basepri(old_pri);
5821 _dispatch_reset_basepri_override();
5822 _dispatch_queue_set_current(NULL);
5823
5824 _dispatch_voucher_debug("root queue clear", NULL);
5825 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5826 }
5827
5828 void
5829 _dispatch_root_queue_drain_deferred_item(dispatch_deferred_items_t ddi
5830 DISPATCH_PERF_MON_ARGS_PROTO)
5831 {
5832 dispatch_queue_t rq = ddi->ddi_stashed_rq;
5833 _dispatch_queue_set_current(rq);
5834 dispatch_priority_t old_pri = _dispatch_set_basepri(rq->dq_priority);
5835
5836 dispatch_invoke_context_s dic = { };
5837 dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
5838 DISPATCH_INVOKE_REDIRECTING_DRAIN;
5839 #if DISPATCH_COCOA_COMPAT
5840 _dispatch_last_resort_autorelease_pool_push(&dic);
5841 #endif // DISPATCH_COCOA_COMPAT
5842 _dispatch_queue_drain_init_narrowing_check_deadline(&dic, rq->dq_priority);
5843 _dispatch_continuation_pop_inline(ddi->ddi_stashed_dou, &dic, flags, rq);
5844
5845 // event thread that could steal
5846 _dispatch_perfmon_end(perfmon_thread_event_steal);
5847 #if DISPATCH_COCOA_COMPAT
5848 _dispatch_last_resort_autorelease_pool_pop(&dic);
5849 #endif // DISPATCH_COCOA_COMPAT
5850 _dispatch_reset_basepri(old_pri);
5851 _dispatch_reset_basepri_override();
5852 _dispatch_queue_set_current(NULL);
5853
5854 _dispatch_voucher_debug("root queue clear", NULL);
5855 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5856 }
5857 #endif
5858
5859 DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
5860 static void
5861 _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pp)
5862 {
5863 #if DISPATCH_DEBUG
5864 dispatch_queue_t cq;
5865 if (slowpath(cq = _dispatch_queue_get_current())) {
5866 DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
5867 }
5868 #endif
5869 _dispatch_queue_set_current(dq);
5870 dispatch_priority_t pri = dq->dq_priority;
5871 if (!pri) pri = _dispatch_priority_from_pp(pp);
5872 dispatch_priority_t old_dbp = _dispatch_set_basepri(pri);
5873 _dispatch_adopt_wlh_anon();
5874
5875 struct dispatch_object_s *item;
5876 bool reset = false;
5877 dispatch_invoke_context_s dic = { };
5878 #if DISPATCH_COCOA_COMPAT
5879 _dispatch_last_resort_autorelease_pool_push(&dic);
5880 #endif // DISPATCH_COCOA_COMPAT
5881 dispatch_invoke_flags_t flags = DISPATCH_INVOKE_WORKER_DRAIN |
5882 DISPATCH_INVOKE_REDIRECTING_DRAIN;
5883 _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
5884 _dispatch_perfmon_start();
5885 while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
5886 if (reset) _dispatch_wqthread_override_reset();
5887 _dispatch_continuation_pop_inline(item, &dic, flags, dq);
5888 reset = _dispatch_reset_basepri_override();
5889 if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
5890 break;
5891 }
5892 }
5893
5894 // overcommit or not, this is a worker thread
5895 if (pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
5896 _dispatch_perfmon_end(perfmon_thread_worker_oc);
5897 } else {
5898 _dispatch_perfmon_end(perfmon_thread_worker_non_oc);
5899 }
5900
5901 #if DISPATCH_COCOA_COMPAT
5902 _dispatch_last_resort_autorelease_pool_pop(&dic);
5903 #endif // DISPATCH_COCOA_COMPAT
5904 _dispatch_reset_wlh();
5905 _dispatch_reset_basepri(old_dbp);
5906 _dispatch_reset_basepri_override();
5907 _dispatch_queue_set_current(NULL);
5908 }
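
/*
 * Illustrative example (not part of libdispatch, not compiled): a workqueue
 * thread entering _dispatch_root_queue_drain() above pops items from a
 * global root queue until it is empty or the thread should narrow. A hedged
 * client-side sketch of work that ends up drained this way; the iteration
 * count is arbitrary.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>

static void
example_root_queue_work(void)
{
	dispatch_queue_t gq = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
	// dispatch_apply fans the iterations out across workqueue threads,
	// each of which drains items from the root queue
	dispatch_apply(8, gq, ^(size_t i) {
		printf("iteration %zu\n", i);
	});
}
#endif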
5909
5910 #pragma mark -
5911 #pragma mark dispatch_worker_thread
5912
5913 #if HAVE_PTHREAD_WORKQUEUES
5914 static void
5915 _dispatch_worker_thread4(void *context)
5916 {
5917 dispatch_queue_t dq = context;
5918 dispatch_root_queue_context_t qc = dq->do_ctxt;
5919
5920 _dispatch_introspection_thread_add();
5921 int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
5922 dispatch_assert(pending >= 0);
5923 _dispatch_root_queue_drain(dq, _dispatch_get_priority());
5924 _dispatch_voucher_debug("root queue clear", NULL);
5925 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5926 }
5927
5928 #if HAVE_PTHREAD_WORKQUEUE_QOS
5929 static void
5930 _dispatch_worker_thread3(pthread_priority_t pp)
5931 {
5932 bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5933 dispatch_queue_t dq;
5934 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
5935 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
5936 dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);
5937 return _dispatch_worker_thread4(dq);
5938 }
5939 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5940
5941 #if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5942 // 6618342 Contact the team that owns the Instrument DTrace probe before
5943 // renaming this symbol
5944 static void
5945 _dispatch_worker_thread2(int priority, int options,
5946 void *context DISPATCH_UNUSED)
5947 {
5948 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
5949 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
5950 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
5951
5952 return _dispatch_worker_thread4(dq);
5953 }
5954 #endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5955 #endif // HAVE_PTHREAD_WORKQUEUES
5956
5957 #if DISPATCH_USE_PTHREAD_POOL
5958 // 6618342 Contact the team that owns the Instrument DTrace probe before
5959 // renaming this symbol
5960 static void *
5961 _dispatch_worker_thread(void *context)
5962 {
5963 dispatch_queue_t dq = context;
5964 dispatch_root_queue_context_t qc = dq->do_ctxt;
5965 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
5966
5967 int pending = os_atomic_dec2o(qc, dgq_pending, relaxed);
5968 if (unlikely(pending < 0)) {
5969 DISPATCH_INTERNAL_CRASH(pending, "Pending thread request underflow");
5970 }
5971
5972 if (pqc->dpq_observer_hooks.queue_will_execute) {
5973 _dispatch_set_pthread_root_queue_observer_hooks(
5974 &pqc->dpq_observer_hooks);
5975 }
5976 if (pqc->dpq_thread_configure) {
5977 pqc->dpq_thread_configure();
5978 }
5979
5980 // work around tweaks that the kernel workqueue does for us
5981 _dispatch_sigmask();
5982 _dispatch_introspection_thread_add();
5983
5984 #if DISPATCH_USE_INTERNAL_WORKQUEUE
5985 bool overcommit = (qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
5986 bool manager = (dq == &_dispatch_mgr_root_queue);
5987 bool monitored = !(overcommit || manager);
5988 if (monitored) {
5989 _dispatch_workq_worker_register(dq, qc->dgq_qos);
5990 }
5991 #endif
5992
5993 const int64_t timeout = 5ull * NSEC_PER_SEC;
5994 pthread_priority_t old_pri = _dispatch_get_priority();
5995 do {
5996 _dispatch_root_queue_drain(dq, old_pri);
5997 _dispatch_reset_priority_and_voucher(old_pri, NULL);
5998 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
5999 dispatch_time(0, timeout)) == 0);
6000
6001 #if DISPATCH_USE_INTERNAL_WORKQUEUE
6002 if (monitored) {
6003 _dispatch_workq_worker_unregister(dq, qc->dgq_qos);
6004 }
6005 #endif
6006 (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
6007 _dispatch_global_queue_poke(dq, 1, 0);
6008 _dispatch_release(dq); // retained in _dispatch_global_queue_poke_slow
6009 return NULL;
6010 }
6011 #endif // DISPATCH_USE_PTHREAD_POOL
6012
6013 #pragma mark -
6014 #pragma mark dispatch_network_root_queue
6015 #if TARGET_OS_MAC
6016
6017 dispatch_queue_t
6018 _dispatch_network_root_queue_create_4NW(const char *label,
6019 const pthread_attr_t *attrs, dispatch_block_t configure)
6020 {
6021 unsigned long flags = dispatch_pthread_root_queue_flags_pool_size(1);
6022 return dispatch_pthread_root_queue_create(label, flags, attrs, configure);
6023 }
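
/*
 * Illustrative example (not part of libdispatch, not compiled): the 4NW
 * helper above is a thin wrapper over the pthread root queue SPI. A hedged
 * sketch of direct use of that SPI; the header name, label, stack size and
 * configure block are assumptions for the example.
 */
#if 0
#include <dispatch/dispatch.h>
#include <dispatch/private.h> // SPI assumed to declare dispatch_pthread_root_queue_create()
#include <pthread.h>

static dispatch_queue_t
example_make_pthread_root_queue(void)
{
	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setstacksize(&attr, 512 * 1024);
	// worker threads of this root queue come from a private pthread pool
	// (see _dispatch_worker_thread above), not the kernel workqueue
	dispatch_queue_t rq = dispatch_pthread_root_queue_create(
			"com.example.pthread-root", 0, &attr, ^{
		// per-thread configuration runs once on each pool thread
	});
	pthread_attr_destroy(&attr);
	return rq;
}
#endif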
6024
6025 #endif // TARGET_OS_MAC
6026 #pragma mark -
6027 #pragma mark dispatch_runloop_queue
6028
6029 static bool _dispatch_program_is_probably_callback_driven;
6030
6031 #if DISPATCH_COCOA_COMPAT
6032
6033 dispatch_queue_t
6034 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
6035 {
6036 dispatch_queue_t dq;
6037 size_t dqs;
6038
6039 if (slowpath(flags)) {
6040 return DISPATCH_BAD_INPUT;
6041 }
6042 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
6043 dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
6044 _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1,
6045 DISPATCH_QUEUE_ROLE_BASE_ANON);
6046 dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
6047 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
6048 _dispatch_runloop_queue_handle_init(dq);
6049 _dispatch_queue_set_bound_thread(dq);
6050 _dispatch_object_debug(dq, "%s", __func__);
6051 return _dispatch_introspection_queue_create(dq);
6052 }
6053
6054 void
6055 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
6056 {
6057 _dispatch_object_debug(dq, "%s", __func__);
6058
6059 dispatch_qos_t qos = _dispatch_runloop_queue_reset_max_qos(dq);
6060 _dispatch_queue_clear_bound_thread(dq);
6061 dx_wakeup(dq, qos, DISPATCH_WAKEUP_MAKE_DIRTY);
6062 if (qos) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
6063 }
6064
6065 void
6066 _dispatch_runloop_queue_dispose(dispatch_queue_t dq, bool *allow_free)
6067 {
6068 _dispatch_object_debug(dq, "%s", __func__);
6069 _dispatch_introspection_queue_dispose(dq);
6070 _dispatch_runloop_queue_handle_dispose(dq);
6071 _dispatch_queue_destroy(dq, allow_free);
6072 }
6073
6074 bool
6075 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
6076 {
6077 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
6078 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
6079 }
6080 dispatch_retain(dq);
6081 bool r = _dispatch_runloop_queue_drain_one(dq);
6082 dispatch_release(dq);
6083 return r;
6084 }
6085
6086 void
6087 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
6088 {
6089 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
6090 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
6091 }
6092 _dispatch_runloop_queue_wakeup(dq, 0, false);
6093 }
6094
6095 #if TARGET_OS_MAC
6096 dispatch_runloop_handle_t
6097 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
6098 {
6099 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
6100 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
6101 }
6102 return _dispatch_runloop_queue_get_handle(dq);
6103 }
6104 #endif
6105
6106 static void
6107 _dispatch_runloop_queue_handle_init(void *ctxt)
6108 {
6109 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
6110 dispatch_runloop_handle_t handle;
6111
6112 _dispatch_fork_becomes_unsafe();
6113
6114 #if TARGET_OS_MAC
6115 mach_port_t mp;
6116 kern_return_t kr;
6117 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
6118 DISPATCH_VERIFY_MIG(kr);
6119 (void)dispatch_assume_zero(kr);
6120 kr = mach_port_insert_right(mach_task_self(), mp, mp,
6121 MACH_MSG_TYPE_MAKE_SEND);
6122 DISPATCH_VERIFY_MIG(kr);
6123 (void)dispatch_assume_zero(kr);
6124 if (dq != &_dispatch_main_q) {
6125 struct mach_port_limits limits = {
6126 .mpl_qlimit = 1,
6127 };
6128 kr = mach_port_set_attributes(mach_task_self(), mp,
6129 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
6130 sizeof(limits));
6131 DISPATCH_VERIFY_MIG(kr);
6132 (void)dispatch_assume_zero(kr);
6133 }
6134 handle = mp;
6135 #elif defined(__linux__)
6136 int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
6137 if (fd == -1) {
6138 int err = errno;
6139 switch (err) {
6140 case EMFILE:
6141 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
6142 "process is out of file descriptors");
6143 break;
6144 case ENFILE:
6145 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
6146 "system is out of file descriptors");
6147 break;
6148 case ENOMEM:
6149 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
6150 "kernel is out of memory");
6151 break;
6152 default:
6153 DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
6154 break;
6155 }
6156 }
6157 handle = fd;
6158 #else
6159 #error "runloop support not implemented on this platform"
6160 #endif
6161 _dispatch_runloop_queue_set_handle(dq, handle);
6162
6163 _dispatch_program_is_probably_callback_driven = true;
6164 }
6165
6166 static void
6167 _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
6168 {
6169 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
6170 if (!_dispatch_runloop_handle_is_valid(handle)) {
6171 return;
6172 }
6173 dq->do_ctxt = NULL;
6174 #if TARGET_OS_MAC
6175 mach_port_t mp = handle;
6176 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
6177 DISPATCH_VERIFY_MIG(kr);
6178 (void)dispatch_assume_zero(kr);
6179 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
6180 DISPATCH_VERIFY_MIG(kr);
6181 (void)dispatch_assume_zero(kr);
6182 #elif defined(__linux__)
6183 int rc = close(handle);
6184 (void)dispatch_assume_zero(rc);
6185 #else
6186 #error "runloop support not implemented on this platform"
6187 #endif
6188 }
6189
6190 #pragma mark -
6191 #pragma mark dispatch_main_queue
6192
6193 dispatch_runloop_handle_t
6194 _dispatch_get_main_queue_handle_4CF(void)
6195 {
6196 dispatch_queue_t dq = &_dispatch_main_q;
6197 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
6198 _dispatch_runloop_queue_handle_init);
6199 return _dispatch_runloop_queue_get_handle(dq);
6200 }
6201
6202 #if TARGET_OS_MAC
6203 dispatch_runloop_handle_t
6204 _dispatch_get_main_queue_port_4CF(void)
6205 {
6206 return _dispatch_get_main_queue_handle_4CF();
6207 }
6208 #endif
6209
6210 static bool main_q_is_draining;
6211
6212 // 6618342 Contact the team that owns the Instrument DTrace probe before
6213 // renaming this symbol
6214 DISPATCH_NOINLINE
6215 static void
6216 _dispatch_queue_set_mainq_drain_state(bool arg)
6217 {
6218 main_q_is_draining = arg;
6219 }
6220
6221 void
6222 _dispatch_main_queue_callback_4CF(
6223 void *ignored DISPATCH_UNUSED)
6224 {
6225 if (main_q_is_draining) {
6226 return;
6227 }
6228 _dispatch_queue_set_mainq_drain_state(true);
6229 _dispatch_main_queue_drain();
6230 _dispatch_queue_set_mainq_drain_state(false);
6231 }
6232
6233 #endif
6234
6235 void
6236 dispatch_main(void)
6237 {
6238 _dispatch_root_queues_init();
6239 #if HAVE_PTHREAD_MAIN_NP
6240 if (pthread_main_np()) {
6241 #endif
6242 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
6243 _dispatch_program_is_probably_callback_driven = true;
6244 _dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
6245 #ifdef __linux__
6246 // On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
6247 // To avoid that, just before calling pthread_exit we register a TSD destructor
6248 // that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
6249 // This relies on an implementation detail (currently true in glibc) that TSD destructors
6250 // will be called in the order of creation to cause all the TSD cleanup functions to
6251 // run before the thread becomes trapped in sigsuspend.
6252 pthread_key_t dispatch_main_key;
6253 pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
6254 pthread_setspecific(dispatch_main_key, &dispatch_main_key);
6255 _dispatch_sigmask();
6256 #endif
6257 pthread_exit(NULL);
6258 DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
6259 #if HAVE_PTHREAD_MAIN_NP
6260 }
6261 DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
6262 #endif
6263 }
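
/*
 * Illustrative example (not part of libdispatch, not compiled): the
 * canonical dispatch_main() pattern. The main thread parks (pthread_exit on
 * Darwin, the sigsuspend TSD trick above on Linux) and the main queue is
 * drained by dispatch from then on. A hedged sketch:
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdio.h>
#include <stdlib.h>

int
main(void)
{
	dispatch_async(dispatch_get_main_queue(), ^{
		printf("running on the main queue without a runloop\n");
		exit(0);
	});
	dispatch_main(); // never returns
}
#endif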
6264
6265 DISPATCH_NOINLINE DISPATCH_NORETURN
6266 static void
6267 _dispatch_sigsuspend(void)
6268 {
6269 static const sigset_t mask;
6270
6271 for (;;) {
6272 sigsuspend(&mask);
6273 }
6274 }
6275
6276 DISPATCH_NORETURN
6277 static void
6278 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
6279 {
6280 // never returns, so burn bridges behind us
6281 _dispatch_clear_stack(0);
6282 _dispatch_sigsuspend();
6283 }
6284
6285 DISPATCH_NOINLINE
6286 static void
6287 _dispatch_queue_cleanup2(void)
6288 {
6289 dispatch_queue_t dq = &_dispatch_main_q;
6290 uint64_t old_state, new_state;
6291
6292 // Turning the main queue from a runloop queue into an ordinary serial queue
6293 // is a 3-step operation:
6294 // 1. finish taking the main queue lock the usual way
6295 // 2. clear the THREAD_BOUND flag
6296 // 3. do a handoff
6297 //
6298 // If an enqueuer executes concurrently, it may do the wakeup the runloop
6299 // way, because it still believes the queue to be thread-bound, but the
6300 // dirty bit will force this codepath to notice the enqueue, and the usual
6301 // lock transfer will do the proper wakeup.
6302 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
6303 new_state = old_state & ~DISPATCH_QUEUE_DIRTY;
6304 new_state += DISPATCH_QUEUE_WIDTH_INTERVAL;
6305 new_state += DISPATCH_QUEUE_IN_BARRIER;
6306 });
6307 _dispatch_queue_atomic_flags_clear(dq, DQF_THREAD_BOUND|DQF_CANNOT_TRYSYNC);
6308 _dispatch_queue_barrier_complete(dq, 0, 0);
6309
6310 // overload the "probably" variable to mean that dispatch_main() or
6311 // similar non-POSIX API was called
6312 // this has to run before the DISPATCH_COCOA_COMPAT below
6313 // See dispatch_main for call to _dispatch_sig_thread on linux.
6314 #ifndef __linux__
6315 if (_dispatch_program_is_probably_callback_driven) {
6316 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
6317 DISPATCH_QOS_DEFAULT, true), NULL, _dispatch_sig_thread);
6318 sleep(1); // workaround 6778970
6319 }
6320 #endif
6321
6322 #if DISPATCH_COCOA_COMPAT
6323 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
6324 _dispatch_runloop_queue_handle_init);
6325 _dispatch_runloop_queue_handle_dispose(dq);
6326 #endif
6327 }
6328
6329 static void
6330 _dispatch_queue_cleanup(void *ctxt)
6331 {
6332 if (ctxt == &_dispatch_main_q) {
6333 return _dispatch_queue_cleanup2();
6334 }
6335 // POSIX defines that destructors are only called if 'ctxt' is non-null
6336 DISPATCH_INTERNAL_CRASH(ctxt,
6337 "Premature thread exit while a dispatch queue is running");
6338 }
6339
6340 static void
6341 _dispatch_wlh_cleanup(void *ctxt)
6342 {
6343 // POSIX defines that destructors are only called if 'ctxt' is non-null
6344 dispatch_queue_t wlh;
6345 wlh = (dispatch_queue_t)((uintptr_t)ctxt & ~DISPATCH_WLH_STORAGE_REF);
6346 _dispatch_queue_release_storage(wlh);
6347 }
6348
6349 DISPATCH_NORETURN
6350 static void
6351 _dispatch_deferred_items_cleanup(void *ctxt)
6352 {
6353 // POSIX defines that destructors are only called if 'ctxt' is non-null
6354 DISPATCH_INTERNAL_CRASH(ctxt,
6355 "Premature thread exit with unhandled deferred items");
6356 }
6357
6358 DISPATCH_NORETURN
6359 static void
6360 _dispatch_frame_cleanup(void *ctxt)
6361 {
6362 // POSIX defines that destructors are only called if 'ctxt' is non-null
6363 DISPATCH_INTERNAL_CRASH(ctxt,
6364 "Premature thread exit while a dispatch frame is active");
6365 }
6366
6367 DISPATCH_NORETURN
6368 static void
6369 _dispatch_context_cleanup(void *ctxt)
6370 {
6371 // POSIX defines that destructors are only called if 'ctxt' is non-null
6372 DISPATCH_INTERNAL_CRASH(ctxt,
6373 "Premature thread exit while a dispatch context is set");
6374 }