apple/libdispatch (libdispatch-703.30.5): src/queue.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
34 && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
38 !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
40 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
41 #endif
42 #if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
43 #undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
44 #define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
45 #endif
46 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48 #define pthread_workqueue_t void*
49 #endif
50
51 static void _dispatch_sig_thread(void *ctxt);
52 static void _dispatch_cache_cleanup(void *value);
53 static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
54 dispatch_function_t func, pthread_priority_t pp);
55 static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
56 static void _dispatch_queue_cleanup(void *ctxt);
57 static void _dispatch_deferred_items_cleanup(void *ctxt);
58 static void _dispatch_frame_cleanup(void *ctxt);
59 static void _dispatch_context_cleanup(void *ctxt);
60 static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
61 static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
62 #if HAVE_PTHREAD_WORKQUEUES
63 static void _dispatch_worker_thread4(void *context);
64 #if HAVE_PTHREAD_WORKQUEUE_QOS
65 static void _dispatch_worker_thread3(pthread_priority_t priority);
66 #endif
67 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
68 static void _dispatch_worker_thread2(int priority, int options, void *context);
69 #endif
70 #endif
71 #if DISPATCH_USE_PTHREAD_POOL
72 static void *_dispatch_worker_thread(void *context);
73 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
74 #endif
75
76 #if DISPATCH_COCOA_COMPAT
77 static dispatch_once_t _dispatch_main_q_handle_pred;
78 static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
79 pthread_priority_t pp, dispatch_wakeup_flags_t flags);
80 static void _dispatch_runloop_queue_handle_init(void *ctxt);
81 static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
82 #endif
83
84 static void _dispatch_root_queues_init_once(void *context);
85 static dispatch_once_t _dispatch_root_queues_pred;
86
87 #pragma mark -
88 #pragma mark dispatch_root_queue
89
90 struct dispatch_pthread_root_queue_context_s {
91 pthread_attr_t dpq_thread_attr;
92 dispatch_block_t dpq_thread_configure;
93 struct dispatch_semaphore_s dpq_thread_mediator;
94 dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
95 };
96 typedef struct dispatch_pthread_root_queue_context_s *
97 dispatch_pthread_root_queue_context_t;
98
99 #if DISPATCH_ENABLE_THREAD_POOL
100 static struct dispatch_pthread_root_queue_context_s
101 _dispatch_pthread_root_queue_contexts[] = {
102 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
103 .dpq_thread_mediator = {
104 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
105 }},
106 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
107 .dpq_thread_mediator = {
108 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
109 }},
110 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
111 .dpq_thread_mediator = {
112 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
113 }},
114 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
115 .dpq_thread_mediator = {
116 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
117 }},
118 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
119 .dpq_thread_mediator = {
120 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
121 }},
122 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
123 .dpq_thread_mediator = {
124 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
125 }},
126 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
127 .dpq_thread_mediator = {
128 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
129 }},
130 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
131 .dpq_thread_mediator = {
132 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
133 }},
134 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
135 .dpq_thread_mediator = {
136 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
137 }},
138 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
139 .dpq_thread_mediator = {
140 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
141 }},
142 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
143 .dpq_thread_mediator = {
144 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
145 }},
146 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
147 .dpq_thread_mediator = {
148 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
149 }},
150 };
151 #endif
152
153 #define MAX_PTHREAD_COUNT 255
154
155 struct dispatch_root_queue_context_s {
156 union {
157 struct {
158 unsigned int volatile dgq_pending;
159 #if HAVE_PTHREAD_WORKQUEUES
160 qos_class_t dgq_qos;
161 int dgq_wq_priority, dgq_wq_options;
162 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163 pthread_workqueue_t dgq_kworkqueue;
164 #endif
165 #endif // HAVE_PTHREAD_WORKQUEUES
166 #if DISPATCH_USE_PTHREAD_POOL
167 void *dgq_ctxt;
168 uint32_t volatile dgq_thread_pool_size;
169 #endif
170 };
171 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
172 };
173 };
174 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
175
176 #define WORKQ_PRIO_INVALID (-1)
177 #ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
178 #define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
179 #endif
180 #ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
181 #define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
182 #endif
183
184 DISPATCH_CACHELINE_ALIGN
185 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
186 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
187 #if HAVE_PTHREAD_WORKQUEUES
188 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
189 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
190 .dgq_wq_options = 0,
191 #endif
192 #if DISPATCH_ENABLE_THREAD_POOL
193 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
194 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
195 #endif
196 }}},
197 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
198 #if HAVE_PTHREAD_WORKQUEUES
199 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
200 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
201 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
202 #endif
203 #if DISPATCH_ENABLE_THREAD_POOL
204 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
205 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
206 #endif
207 }}},
208 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
209 #if HAVE_PTHREAD_WORKQUEUES
210 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
211 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
212 .dgq_wq_options = 0,
213 #endif
214 #if DISPATCH_ENABLE_THREAD_POOL
215 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
216 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
217 #endif
218 }}},
219 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
220 #if HAVE_PTHREAD_WORKQUEUES
221 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
222 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
223 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
224 #endif
225 #if DISPATCH_ENABLE_THREAD_POOL
226 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
227 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
228 #endif
229 }}},
230 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
231 #if HAVE_PTHREAD_WORKQUEUES
232 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
233 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
234 .dgq_wq_options = 0,
235 #endif
236 #if DISPATCH_ENABLE_THREAD_POOL
237 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
238 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
239 #endif
240 }}},
241 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
242 #if HAVE_PTHREAD_WORKQUEUES
243 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
244 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
245 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
246 #endif
247 #if DISPATCH_ENABLE_THREAD_POOL
248 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
249 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
250 #endif
251 }}},
252 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
253 #if HAVE_PTHREAD_WORKQUEUES
254 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
255 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
256 .dgq_wq_options = 0,
257 #endif
258 #if DISPATCH_ENABLE_THREAD_POOL
259 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
260 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
261 #endif
262 }}},
263 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
264 #if HAVE_PTHREAD_WORKQUEUES
265 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
266 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
267 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
268 #endif
269 #if DISPATCH_ENABLE_THREAD_POOL
270 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
271 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
272 #endif
273 }}},
274 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
275 #if HAVE_PTHREAD_WORKQUEUES
276 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
277 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
278 .dgq_wq_options = 0,
279 #endif
280 #if DISPATCH_ENABLE_THREAD_POOL
281 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
282 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
283 #endif
284 }}},
285 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
286 #if HAVE_PTHREAD_WORKQUEUES
287 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
288 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
289 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
290 #endif
291 #if DISPATCH_ENABLE_THREAD_POOL
292 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
293 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
294 #endif
295 }}},
296 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
297 #if HAVE_PTHREAD_WORKQUEUES
298 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
299 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
300 .dgq_wq_options = 0,
301 #endif
302 #if DISPATCH_ENABLE_THREAD_POOL
303 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
304 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
305 #endif
306 }}},
307 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
308 #if HAVE_PTHREAD_WORKQUEUES
309 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
310 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
311 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
312 #endif
313 #if DISPATCH_ENABLE_THREAD_POOL
314 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
315 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
316 #endif
317 }}},
318 };
319
320 // 6618342 Contact the team that owns the Instrument DTrace probe before
321 // renaming this symbol
322 DISPATCH_CACHELINE_ALIGN
323 struct dispatch_queue_s _dispatch_root_queues[] = {
324 #define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
325 [DISPATCH_ROOT_QUEUE_IDX_##n] = { \
326 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
327 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
328 .do_ctxt = &_dispatch_root_queue_contexts[ \
329 DISPATCH_ROOT_QUEUE_IDX_##n], \
330 .dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
331 .dq_override_voucher = DISPATCH_NO_VOUCHER, \
332 .dq_override = DISPATCH_SATURATED_OVERRIDE, \
333 __VA_ARGS__ \
334 }
335 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
336 .dq_label = "com.apple.root.maintenance-qos",
337 .dq_serialnum = 4,
338 ),
339 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
340 .dq_label = "com.apple.root.maintenance-qos.overcommit",
341 .dq_serialnum = 5,
342 ),
343 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
344 .dq_label = "com.apple.root.background-qos",
345 .dq_serialnum = 6,
346 ),
347 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
348 .dq_label = "com.apple.root.background-qos.overcommit",
349 .dq_serialnum = 7,
350 ),
351 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
352 .dq_label = "com.apple.root.utility-qos",
353 .dq_serialnum = 8,
354 ),
355 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
356 .dq_label = "com.apple.root.utility-qos.overcommit",
357 .dq_serialnum = 9,
358 ),
359 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
360 .dq_label = "com.apple.root.default-qos",
361 .dq_serialnum = 10,
362 ),
363 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
364 .dq_label = "com.apple.root.default-qos.overcommit",
365 .dq_serialnum = 11,
366 ),
367 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
368 .dq_label = "com.apple.root.user-initiated-qos",
369 .dq_serialnum = 12,
370 ),
371 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
372 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
373 .dq_serialnum = 13,
374 ),
375 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
376 .dq_label = "com.apple.root.user-interactive-qos",
377 .dq_serialnum = 14,
378 ),
379 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
380 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
381 .dq_serialnum = 15,
382 ),
383 };
384
385 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
386 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
387 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
388 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
389 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
390 &_dispatch_root_queues[
391 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
392 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
393 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
394 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
395 &_dispatch_root_queues[
396 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
397 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
398 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
399 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
400 &_dispatch_root_queues[
401 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
402 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
403 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
404 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
405 &_dispatch_root_queues[
406 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
407 };
408 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
409
410 #define DISPATCH_PRIORITY_COUNT 5
411
412 enum {
413 // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
414 // maintenance priority
415 DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
416 DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
417 DISPATCH_PRIORITY_IDX_LOW,
418 DISPATCH_PRIORITY_IDX_DEFAULT,
419 DISPATCH_PRIORITY_IDX_HIGH,
420 };
421
422 static qos_class_t _dispatch_priority2qos[] = {
423 [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
424 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
425 [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
426 [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
427 [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
428 };
429
430 #if HAVE_PTHREAD_WORKQUEUE_QOS
431 static const int _dispatch_priority2wq[] = {
432 [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
433 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
434 [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
435 [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
436 [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
437 };
438 #endif
439
440 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
441 static struct dispatch_queue_s _dispatch_mgr_root_queue;
442 #else
443 #define _dispatch_mgr_root_queue _dispatch_root_queues[\
444 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
445 #endif
446
447 // 6618342 Contact the team that owns the Instrument DTrace probe before
448 // renaming this symbol
449 DISPATCH_CACHELINE_ALIGN
450 struct dispatch_queue_s _dispatch_mgr_q = {
451 DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
452 .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
453 .do_targetq = &_dispatch_mgr_root_queue,
454 .dq_label = "com.apple.libdispatch-manager",
455 .dq_width = 1,
456 .dq_override_voucher = DISPATCH_NO_VOUCHER,
457 .dq_override = DISPATCH_SATURATED_OVERRIDE,
458 .dq_serialnum = 2,
459 };
460
461 dispatch_queue_t
462 dispatch_get_global_queue(long priority, unsigned long flags)
463 {
464 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
465 return DISPATCH_BAD_INPUT;
466 }
467 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
468 _dispatch_root_queues_init_once);
469 qos_class_t qos;
470 switch (priority) {
471 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
472 case _DISPATCH_QOS_CLASS_MAINTENANCE:
473 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
474 .dq_priority) {
475 // map maintenance to background on old kernel
476 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
477 } else {
478 qos = (qos_class_t)priority;
479 }
480 break;
481 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
482 case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
483 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
484 break;
485 case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
486 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
487 break;
488 case DISPATCH_QUEUE_PRIORITY_LOW:
489 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
490 break;
491 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
492 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
493 break;
494 case DISPATCH_QUEUE_PRIORITY_HIGH:
495 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
496 break;
497 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
498 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
499 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
500 .dq_priority) {
501 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
502 break;
503 }
504 #endif
505 // fallthrough
506 default:
507 qos = (qos_class_t)priority;
508 break;
509 }
510 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
511 }
512
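/*
 * Illustrative client-side sketch (public <dispatch/dispatch.h> API only;
 * variable names here are made up): dispatch_get_global_queue() accepts both
 * the legacy DISPATCH_QUEUE_PRIORITY_* constants and QOS_CLASS_* values, and
 * either way ends up selecting one of the root queues defined above.
 *
 *	dispatch_queue_t q;
 *	q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_async(q, ^{ });	// runs at default QoS
 *	q = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *	dispatch_async(q, ^{ });	// runs at utility QoS
 */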
513 DISPATCH_ALWAYS_INLINE
514 static inline dispatch_queue_t
515 _dispatch_get_current_queue(void)
516 {
517 return _dispatch_queue_get_current() ?:
518 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
519 }
520
521 dispatch_queue_t
522 dispatch_get_current_queue(void)
523 {
524 return _dispatch_get_current_queue();
525 }
526
527 DISPATCH_NOINLINE DISPATCH_NORETURN
528 static void
529 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
530 {
531 _dispatch_client_assert_fail(
532 "Block was %sexpected to execute on queue [%s]",
533 expected ? "" : "not ", dq->dq_label ?: "");
534 }
535
536 DISPATCH_NOINLINE DISPATCH_NORETURN
537 static void
538 _dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
539 {
540 _dispatch_client_assert_fail(
541 "Block was expected to act as a barrier on queue [%s]",
542 dq->dq_label ?: "");
543 }
544
545 void
546 dispatch_assert_queue(dispatch_queue_t dq)
547 {
548 unsigned long metatype = dx_metatype(dq);
549 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
550 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
551 "dispatch_assert_queue()");
552 }
553 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
554 if (unlikely(_dq_state_drain_pended(dq_state))) {
555 goto fail;
556 }
557 if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
558 return;
559 }
560 if (likely(dq->dq_width > 1)) {
561 // we can look at the width: if it is changing while we read it,
562 // it means that a barrier is running on `dq` concurrently, which
563 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
564 if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
565 return;
566 }
567 }
568 fail:
569 _dispatch_assert_queue_fail(dq, true);
570 }
571
572 void
573 dispatch_assert_queue_not(dispatch_queue_t dq)
574 {
575 unsigned long metatype = dx_metatype(dq);
576 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
577 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
578 "dispatch_assert_queue_not()");
579 }
580 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
581 if (_dq_state_drain_pended(dq_state)) {
582 return;
583 }
584 if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
585 if (likely(dq->dq_width == 1)) {
586 // we can look at the width: if it is changing while we read it,
587 // it means that a barrier is running on `dq` concurrently, which
588 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
589 return;
590 }
591 if (likely(!_dispatch_thread_frame_find_queue(dq))) {
592 return;
593 }
594 }
595 _dispatch_assert_queue_fail(dq, false);
596 }
597
598 void
599 dispatch_assert_queue_barrier(dispatch_queue_t dq)
600 {
601 dispatch_assert_queue(dq);
602
603 if (likely(dq->dq_width == 1)) {
604 return;
605 }
606
607 if (likely(dq->do_targetq)) {
608 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
609 if (likely(_dq_state_is_in_barrier(dq_state))) {
610 return;
611 }
612 }
613
614 _dispatch_assert_queue_barrier_fail(dq);
615 }
616
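/*
 * Illustrative sketch of the assertions above (client code, public API only;
 * the queue label is made up). dispatch_assert_queue() traps through
 * _dispatch_assert_queue_fail() when the expectation does not hold.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_async(q, ^{
 *		dispatch_assert_queue(q);	// passes: the block runs on q
 *		dispatch_assert_queue_not(dispatch_get_main_queue());
 *	});
 */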
617 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
618 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
619 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
620 #else
621 #define _dispatch_root_queue_debug(...)
622 #define _dispatch_debug_root_queue(...)
623 #endif
624
625 #pragma mark -
626 #pragma mark dispatch_init
627
628 #if HAVE_PTHREAD_WORKQUEUE_QOS
629 pthread_priority_t _dispatch_background_priority;
630 pthread_priority_t _dispatch_user_initiated_priority;
631
632 static void
633 _dispatch_root_queues_init_qos(int supported)
634 {
635 pthread_priority_t p;
636 qos_class_t qos;
637 unsigned int i;
638 for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
639 p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
640 qos = _pthread_qos_class_decode(p, NULL, NULL);
641 dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
642 _dispatch_priority2qos[i] = qos;
643 }
644 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
645 qos = _dispatch_root_queue_contexts[i].dgq_qos;
646 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
647 !(supported & WORKQ_FEATURE_MAINTENANCE)) {
648 continue;
649 }
650 unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
651 flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
652 if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
653 i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
654 flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
655 }
656 p = _pthread_qos_class_encode(qos, 0, flags);
657 _dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
658 }
659 }
660 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
661
662 static inline bool
663 _dispatch_root_queues_init_workq(int *wq_supported)
664 {
665 int r;
666 bool result = false;
667 *wq_supported = 0;
668 #if HAVE_PTHREAD_WORKQUEUES
669 bool disable_wq = false;
670 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
671 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
672 #endif
673 #if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
674 bool disable_qos = false;
675 #if DISPATCH_DEBUG
676 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
677 #endif
678 #if DISPATCH_USE_KEVENT_WORKQUEUE
679 bool disable_kevent_wq = false;
680 #if DISPATCH_DEBUG
681 disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
682 #endif
683 #endif
684 if (!disable_wq && !disable_qos) {
685 *wq_supported = _pthread_workqueue_supported();
686 #if DISPATCH_USE_KEVENT_WORKQUEUE
687 if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
688 r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
689 (pthread_workqueue_function_kevent_t)
690 _dispatch_kevent_worker_thread,
691 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
692 #if DISPATCH_USE_MGR_THREAD
693 _dispatch_kevent_workqueue_enabled = !r;
694 #endif
695 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
696 _dispatch_evfilt_machport_direct_enabled = !r;
697 #endif
698 result = !r;
699 } else
700 #endif
701 if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
702 #if DISPATCH_USE_MGR_THREAD
703 r = _pthread_workqueue_init(_dispatch_worker_thread3,
704 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
705 result = !r;
706 #endif
707 }
708 if (result) _dispatch_root_queues_init_qos(*wq_supported);
709 }
710 #endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
711 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
712 if (!result && !disable_wq) {
713 pthread_workqueue_setdispatchoffset_np(
714 offsetof(struct dispatch_queue_s, dq_serialnum));
715 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
716 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
717 (void)dispatch_assume_zero(r);
718 #endif
719 result = !r;
720 }
721 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
722 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
723 if (!result) {
724 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
725 pthread_workqueue_attr_t pwq_attr;
726 if (!disable_wq) {
727 r = pthread_workqueue_attr_init_np(&pwq_attr);
728 (void)dispatch_assume_zero(r);
729 }
730 #endif
731 int i;
732 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
733 pthread_workqueue_t pwq = NULL;
734 dispatch_root_queue_context_t qc;
735 qc = &_dispatch_root_queue_contexts[i];
736 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
737 if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
738 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
739 qc->dgq_wq_priority);
740 (void)dispatch_assume_zero(r);
741 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
742 qc->dgq_wq_options &
743 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
744 (void)dispatch_assume_zero(r);
745 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
746 (void)dispatch_assume_zero(r);
747 result = result || dispatch_assume(pwq);
748 }
749 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
750 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
751 }
752 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
753 if (!disable_wq) {
754 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
755 (void)dispatch_assume_zero(r);
756 }
757 #endif
758 }
759 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
760 #endif // HAVE_PTHREAD_WORKQUEUES
761 return result;
762 }
763
764 #if DISPATCH_USE_PTHREAD_POOL
765 static inline void
766 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
767 uint8_t pool_size, bool overcommit)
768 {
769 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
770 uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
771 dispatch_hw_config(active_cpus);
772 if (slowpath(pool_size) && pool_size < thread_pool_size) {
773 thread_pool_size = pool_size;
774 }
775 qc->dgq_thread_pool_size = thread_pool_size;
776 #if HAVE_PTHREAD_WORKQUEUES
777 if (qc->dgq_qos) {
778 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
779 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
780 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
781 #if HAVE_PTHREAD_WORKQUEUE_QOS
782 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
783 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
784 #endif
785 }
786 #endif // HAVE_PTHREAD_WORKQUEUES
787 #if USE_MACH_SEM
788 // override the default FIFO behavior for the pool semaphores
789 kern_return_t kr = semaphore_create(mach_task_self(),
790 &pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
791 DISPATCH_VERIFY_MIG(kr);
792 (void)dispatch_assume_zero(kr);
793 (void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
794 #elif USE_POSIX_SEM
795 /* XXXRW: POSIX semaphores don't support LIFO? */
796 int ret = sem_init(&(pqc->dpq_thread_mediator.dsema_sem), 0, 0);
797 (void)dispatch_assume_zero(ret);
798 #endif
799 }
800 #endif // DISPATCH_USE_PTHREAD_POOL
801
802 static dispatch_once_t _dispatch_root_queues_pred;
803
804 void
805 _dispatch_root_queues_init(void)
806 {
807 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
808 _dispatch_root_queues_init_once);
809 }
810
811 static void
812 _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
813 {
814 int wq_supported;
815 _dispatch_fork_becomes_unsafe();
816 if (!_dispatch_root_queues_init_workq(&wq_supported)) {
817 #if DISPATCH_ENABLE_THREAD_POOL
818 int i;
819 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
820 bool overcommit = true;
821 #if TARGET_OS_EMBEDDED
822 // some software hangs if the non-overcommitting queues do not
823 // overcommit when threads block. Someday, this behavior should
824 // apply to all platforms
825 if (!(i & 1)) {
826 overcommit = false;
827 }
828 #endif
829 _dispatch_root_queue_init_pthread_pool(
830 &_dispatch_root_queue_contexts[i], 0, overcommit);
831 }
832 #else
833 DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
834 "Root queue initialization failed");
835 #endif // DISPATCH_ENABLE_THREAD_POOL
836 }
837 }
838
839 DISPATCH_EXPORT DISPATCH_NOTHROW
840 void
841 libdispatch_init(void)
842 {
843 dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
844 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
845
846 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
847 -DISPATCH_QUEUE_PRIORITY_HIGH);
848 dispatch_assert(countof(_dispatch_root_queues) ==
849 DISPATCH_ROOT_QUEUE_COUNT);
850 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
851 DISPATCH_ROOT_QUEUE_COUNT);
852 dispatch_assert(countof(_dispatch_priority2qos) ==
853 DISPATCH_PRIORITY_COUNT);
854 #if HAVE_PTHREAD_WORKQUEUE_QOS
855 dispatch_assert(countof(_dispatch_priority2wq) ==
856 DISPATCH_PRIORITY_COUNT);
857 #endif
858 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
859 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
860 sizeof(_dispatch_wq2root_queues[0][0]) ==
861 WORKQ_NUM_PRIOQUEUE * 2);
862 #endif
863 #if DISPATCH_ENABLE_THREAD_POOL
864 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
865 DISPATCH_ROOT_QUEUE_COUNT);
866 #endif
867
868 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
869 offsetof(struct dispatch_object_s, do_next));
870 dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
871 offsetof(struct dispatch_object_s, do_vtable));
872 dispatch_assert(sizeof(struct dispatch_apply_s) <=
873 DISPATCH_CONTINUATION_SIZE);
874 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
875 == 0);
876 dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) % _Alignof(uint64_t) == 0);
877 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
878 DISPATCH_CACHELINE_SIZE == 0);
879
880
881 #if HAVE_PTHREAD_WORKQUEUE_QOS
882 // 26497968 _dispatch_user_initiated_priority should be set for qos
883 // propagation to work properly
884 pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
885 _dispatch_main_q.dq_priority = (dispatch_priority_t)p;
886 _dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
887 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
888 _dispatch_user_initiated_priority = p;
889 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
890 _dispatch_background_priority = p;
891 #if DISPATCH_DEBUG
892 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
893 _dispatch_set_qos_class_enabled = 1;
894 }
895 #endif
896 #endif
897
898 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
899 _dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
900 #else
901 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
902 _dispatch_thread_key_create(&dispatch_deferred_items_key,
903 _dispatch_deferred_items_cleanup);
904 _dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
905 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
906 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
907 _dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
908 _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
909 _dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
910 NULL);
911 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
912 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
913 #endif
914 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
915 if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
916 _dispatch_thread_key_create(&dispatch_sema4_key,
917 _dispatch_thread_semaphore_dispose);
918 }
919 #endif
920 #endif
921
922 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
923 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
924 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
925 #endif
926
927 _dispatch_queue_set_current(&_dispatch_main_q);
928 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
929
930 #if DISPATCH_USE_PTHREAD_ATFORK
931 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
932 dispatch_atfork_parent, dispatch_atfork_child));
933 #endif
934 _dispatch_hw_config_init();
935 _dispatch_vtable_init();
936 _os_object_init();
937 _voucher_init();
938 _dispatch_introspection_init();
939 }
940
941 #if HAVE_MACH
942 static dispatch_once_t _dispatch_mach_host_port_pred;
943 static mach_port_t _dispatch_mach_host_port;
944
945 static void
946 _dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
947 {
948 kern_return_t kr;
949 mach_port_t mp, mhp = mach_host_self();
950 kr = host_get_host_port(mhp, &mp);
951 DISPATCH_VERIFY_MIG(kr);
952 if (fastpath(!kr)) {
953 // mach_host_self returned the HOST_PRIV port
954 kr = mach_port_deallocate(mach_task_self(), mhp);
955 DISPATCH_VERIFY_MIG(kr);
956 mhp = mp;
957 } else if (kr != KERN_INVALID_ARGUMENT) {
958 (void)dispatch_assume_zero(kr);
959 }
960 if (!fastpath(mhp)) {
961 DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
962 }
963 _dispatch_mach_host_port = mhp;
964 }
965
966 mach_port_t
967 _dispatch_get_mach_host_port(void)
968 {
969 dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
970 _dispatch_mach_host_port_init);
971 return _dispatch_mach_host_port;
972 }
973 #endif
974
975 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
976 #include <unistd.h>
977 #include <sys/syscall.h>
978
979 #ifdef SYS_gettid
980 DISPATCH_ALWAYS_INLINE
981 static inline pid_t
982 gettid(void)
983 {
984 return (pid_t) syscall(SYS_gettid);
985 }
986 #else
987 #error "SYS_gettid unavailable on this system"
988 #endif
989
990 #define _tsd_call_cleanup(k, f) do { \
991 if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
992 } while (0)
993
994 void
995 _libdispatch_tsd_cleanup(void *ctx)
996 {
997 struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
998
999 _tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
1000 _tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
1001 _tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
1002 _tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
1003 _tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
1004 NULL);
1005 _tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
1006 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
1007 _tsd_call_cleanup(dispatch_bcounter_key, NULL);
1008 #endif
1009 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
1010 _tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
1011 #endif
1012 _tsd_call_cleanup(dispatch_priority_key, NULL);
1013 _tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
1014 _tsd_call_cleanup(dispatch_deferred_items_key,
1015 _dispatch_deferred_items_cleanup);
1016 tsd->tid = 0;
1017 }
1018
1019 DISPATCH_NOINLINE
1020 void
1021 libdispatch_tsd_init(void)
1022 {
1023 pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
1024 __dispatch_tsd.tid = gettid();
1025 }
1026 #endif
1027
1028 DISPATCH_EXPORT DISPATCH_NOTHROW
1029 void
1030 dispatch_atfork_child(void)
1031 {
1032 void *crash = (void *)0x100;
1033 size_t i;
1034
1035 #if HAVE_MACH
1036 _dispatch_mach_host_port_pred = 0;
1037 _dispatch_mach_host_port = MACH_VOUCHER_NULL;
1038 #endif
1039 _voucher_atfork_child();
1040 if (!_dispatch_is_multithreaded_inline()) {
1041 // clear the _PROHIBIT bit if set
1042 _dispatch_unsafe_fork = 0;
1043 return;
1044 }
1045 _dispatch_unsafe_fork = 0;
1046 _dispatch_child_of_unsafe_fork = true;
1047
1048 _dispatch_main_q.dq_items_head = crash;
1049 _dispatch_main_q.dq_items_tail = crash;
1050
1051 _dispatch_mgr_q.dq_items_head = crash;
1052 _dispatch_mgr_q.dq_items_tail = crash;
1053
1054 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1055 _dispatch_root_queues[i].dq_items_head = crash;
1056 _dispatch_root_queues[i].dq_items_tail = crash;
1057 }
1058 }
1059
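/*
 * Illustrative sketch of why the queues are poisoned above: in the child of
 * a multithreaded fork() the only supported continuation is exec, and any
 * later use of dispatch is meant to fail fast on the 0x100 sentinel rather
 * than hang. (Hypothetical client code; the exec'd path is made up.)
 *
 *	pid_t pid = fork();
 *	if (pid == 0) {
 *		execl("/usr/bin/true", "true", (char *)NULL);	// safe
 *		_exit(127);
 *		// reaching dispatch_async() here would crash on the
 *		// poisoned dq_items_head/dq_items_tail set above
 *	}
 */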
1060 #pragma mark -
1061 #pragma mark dispatch_queue_attr_t
1062
1063 DISPATCH_ALWAYS_INLINE
1064 static inline bool
1065 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1066 {
1067 qos_class_t qos = (qos_class_t)qos_class;
1068 switch (qos) {
1069 case _DISPATCH_QOS_CLASS_MAINTENANCE:
1070 case _DISPATCH_QOS_CLASS_BACKGROUND:
1071 case _DISPATCH_QOS_CLASS_UTILITY:
1072 case _DISPATCH_QOS_CLASS_DEFAULT:
1073 case _DISPATCH_QOS_CLASS_USER_INITIATED:
1074 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
1075 case _DISPATCH_QOS_CLASS_UNSPECIFIED:
1076 break;
1077 default:
1078 return false;
1079 }
1080 if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
1081 return false;
1082 }
1083 return true;
1084 }
1085
1086 #define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
1087 [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
1088
1089 static const
1090 _dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
1091 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
1092 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
1093 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
1094 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
1095 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
1096 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
1097 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
1098 };
1099
1100 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1101 ((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
1102 DQA_INDEX_NON_OVERCOMMIT : \
1103 ((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
1104 DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
1105
1106 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1107 ((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1108
1109 #define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
1110 ((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)
1111
1112 #define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
1113 (frequency)
1114
1115 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1116
1117 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
1118
1119 static inline dispatch_queue_attr_t
1120 _dispatch_get_queue_attr(qos_class_t qos, int prio,
1121 _dispatch_queue_attr_overcommit_t overcommit,
1122 dispatch_autorelease_frequency_t frequency,
1123 bool concurrent, bool inactive)
1124 {
1125 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1126 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1127 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1128 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1129 [DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
1130 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
1131 [DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
1132 }
1133
1134 dispatch_queue_attr_t
1135 _dispatch_get_default_queue_attr(void)
1136 {
1137 return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
1138 _dispatch_queue_attr_overcommit_unspecified,
1139 DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
1140 }
1141
1142 dispatch_queue_attr_t
1143 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1144 dispatch_qos_class_t qos_class, int relative_priority)
1145 {
1146 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
1147 return DISPATCH_BAD_INPUT;
1148 }
1149 if (!slowpath(dqa)) {
1150 dqa = _dispatch_get_default_queue_attr();
1151 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1152 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1153 }
1154 return _dispatch_get_queue_attr(qos_class, relative_priority,
1155 dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
1156 dqa->dqa_concurrent, dqa->dqa_inactive);
1157 }
1158
1159 dispatch_queue_attr_t
1160 dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
1161 {
1162 if (!slowpath(dqa)) {
1163 dqa = _dispatch_get_default_queue_attr();
1164 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1165 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1166 }
1167 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1168 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1169 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
1170 }
1171
1172 dispatch_queue_attr_t
1173 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1174 bool overcommit)
1175 {
1176 if (!slowpath(dqa)) {
1177 dqa = _dispatch_get_default_queue_attr();
1178 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1179 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1180 }
1181 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1182 dqa->dqa_relative_priority, overcommit ?
1183 _dispatch_queue_attr_overcommit_enabled :
1184 _dispatch_queue_attr_overcommit_disabled,
1185 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
1186 dqa->dqa_inactive);
1187 }
1188
1189 dispatch_queue_attr_t
1190 dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
1191 dispatch_autorelease_frequency_t frequency)
1192 {
1193 switch (frequency) {
1194 case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
1195 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1196 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1197 break;
1198 default:
1199 return DISPATCH_BAD_INPUT;
1200 }
1201 if (!slowpath(dqa)) {
1202 dqa = _dispatch_get_default_queue_attr();
1203 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1204 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1205 }
1206 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1207 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1208 frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
1209 }
1210
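/*
 * Illustrative sketch: the attribute constructors above compose, since each
 * one simply re-indexes into _dispatch_queue_attrs via
 * _dispatch_get_queue_attr(). Public API only; names and label are made up.
 *
 *	dispatch_queue_attr_t attr = DISPATCH_QUEUE_SERIAL;
 *	attr = dispatch_queue_attr_make_with_qos_class(attr,
 *			QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_initially_inactive(attr);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", attr);
 *	// ...configure q while it is inactive...
 *	dispatch_activate(q);	// nothing submitted to q runs before this
 */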
1211 #pragma mark -
1212 #pragma mark dispatch_queue_t
1213
1214 // skip zero
1215 // 1 - main_q
1216 // 2 - mgr_q
1217 // 3 - mgr_root_q
1218 // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
1219 // we use 'xadd' on Intel, so the initial value == next assigned
1220 unsigned long volatile _dispatch_queue_serial_numbers = 16;
1221
1222 DISPATCH_NOINLINE
1223 static dispatch_queue_t
1224 _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1225 dispatch_queue_t tq, bool legacy)
1226 {
1227 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1228 // Be sure the root queue priorities are set
1229 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
1230 _dispatch_root_queues_init_once);
1231 #endif
1232 if (!slowpath(dqa)) {
1233 dqa = _dispatch_get_default_queue_attr();
1234 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1235 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1236 }
1237
1238 //
1239 // Step 1: Normalize arguments (qos, overcommit, tq)
1240 //
1241
1242 qos_class_t qos = dqa->dqa_qos_class;
1243 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1244 if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
1245 !_dispatch_root_queues[
1246 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
1247 qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
1248 }
1249 #endif
1250 bool maintenance_fallback = false;
1251 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1252 maintenance_fallback = true;
1253 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1254 if (maintenance_fallback) {
1255 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
1256 !_dispatch_root_queues[
1257 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
1258 qos = _DISPATCH_QOS_CLASS_BACKGROUND;
1259 }
1260 }
1261
1262 _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
1263 if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
1264 if (tq->do_targetq) {
1265 DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
1266 "a non-global target queue");
1267 }
1268 }
1269
1270 if (tq && !tq->do_targetq &&
1271 tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
1272 // Handle discrepancies between attr and target queue, attributes win
1273 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1274 if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
1275 overcommit = _dispatch_queue_attr_overcommit_enabled;
1276 } else {
1277 overcommit = _dispatch_queue_attr_overcommit_disabled;
1278 }
1279 }
1280 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1281 tq = _dispatch_get_root_queue_with_overcommit(tq,
1282 overcommit == _dispatch_queue_attr_overcommit_enabled);
1283 } else {
1284 tq = NULL;
1285 }
1286 } else if (tq && !tq->do_targetq) {
1287 // target is a pthread or runloop root queue, setting QoS or overcommit
1288 // is disallowed
1289 if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
1290 DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
1291 "and use this kind of target queue");
1292 }
1293 if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1294 DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
1295 "and use this kind of target queue");
1296 }
1297 } else {
1298 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1299 // Serial queues default to overcommit!
1300 overcommit = dqa->dqa_concurrent ?
1301 _dispatch_queue_attr_overcommit_disabled :
1302 _dispatch_queue_attr_overcommit_enabled;
1303 }
1304 }
1305 if (!tq) {
1306 qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
1307 _DISPATCH_QOS_CLASS_DEFAULT : qos;
1308 tq = _dispatch_get_root_queue(tq_qos, overcommit ==
1309 _dispatch_queue_attr_overcommit_enabled);
1310 if (slowpath(!tq)) {
1311 DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
1312 }
1313 }
1314
1315 //
1316 // Step 2: Initialize the queue
1317 //
1318
1319 if (legacy) {
1320 // if any of these attributes is specified, use non legacy classes
1321 if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
1322 legacy = false;
1323 }
1324 }
1325
1326 const void *vtable;
1327 dispatch_queue_flags_t dqf = 0;
1328 if (legacy) {
1329 vtable = DISPATCH_VTABLE(queue);
1330 } else if (dqa->dqa_concurrent) {
1331 vtable = DISPATCH_VTABLE(queue_concurrent);
1332 } else {
1333 vtable = DISPATCH_VTABLE(queue_serial);
1334 }
1335 switch (dqa->dqa_autorelease_frequency) {
1336 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1337 dqf |= DQF_AUTORELEASE_NEVER;
1338 break;
1339 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1340 dqf |= DQF_AUTORELEASE_ALWAYS;
1341 break;
1342 }
1343 if (label) {
1344 const char *tmp = _dispatch_strdup_if_mutable(label);
1345 if (tmp != label) {
1346 dqf |= DQF_LABEL_NEEDS_FREE;
1347 label = tmp;
1348 }
1349 }
1350
1351 dispatch_queue_t dq = _dispatch_alloc(vtable,
1352 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1353 _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
1354 DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
1355
1356 dq->dq_label = label;
1357
1358 #if HAVE_PTHREAD_WORKQUEUE_QOS
1359 dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
1360 dqa->dqa_relative_priority,
1361 overcommit == _dispatch_queue_attr_overcommit_enabled ?
1362 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
1363 #endif
1364 _dispatch_retain(tq);
1365 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1366 // legacy way of inheriting the QoS from the target
1367 _dispatch_queue_priority_inherit_from_target(dq, tq);
1368 }
1369 if (!dqa->dqa_inactive) {
1370 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1371 }
1372 dq->do_targetq = tq;
1373 _dispatch_object_debug(dq, "%s", __func__);
1374 return _dispatch_introspection_queue_create(dq);
1375 }
1376
1377 dispatch_queue_t
1378 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1379 dispatch_queue_t tq)
1380 {
1381 return _dispatch_queue_create_with_target(label, dqa, tq, false);
1382 }
1383
1384 dispatch_queue_t
1385 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1386 {
1387 return _dispatch_queue_create_with_target(label, attr,
1388 DISPATCH_TARGET_QUEUE_DEFAULT, true);
1389 }
1390
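/*
 * Illustrative sketch of dispatch_queue_create_with_target() from the client
 * side (public API only; names and label are made up). With an unspecified
 * QoS the new queue takes its QoS from the target, per
 * _dispatch_queue_priority_inherit_from_target() in Step 2 above.
 *
 *	dispatch_queue_t bg =
 *			dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
 *	dispatch_queue_t q = dispatch_queue_create_with_target(
 *			"com.example.worker", DISPATCH_QUEUE_SERIAL, bg);
 */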
1391 dispatch_queue_t
1392 dispatch_queue_create_with_accounting_override_voucher(const char *label,
1393 dispatch_queue_attr_t attr, voucher_t voucher)
1394 {
1395 dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
1396 DISPATCH_TARGET_QUEUE_DEFAULT);
1397 dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
1398 return dq;
1399 }
1400
1401 void
1402 _dispatch_queue_destroy(dispatch_queue_t dq)
1403 {
1404 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
1405 uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
1406
1407 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
1408 initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
1409 }
1410 if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
1411 // dispatch_cancel_and_wait may apply overrides in a racy way with
1412 // the source cancellation finishing. This race is expensive and not
1413 // really worthwhile to resolve since the source becomes dead anyway.
1414 dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
1415 }
1416 if (slowpath(dq_state != initial_state)) {
1417 if (_dq_state_drain_locked(dq_state)) {
1418 DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
1419 }
1420 #ifndef __LP64__
1421 dq_state >>= 32;
1422 #endif
1423 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
1424 "Release of a queue with corrupt state");
1425 }
1426 if (slowpath(dq == _dispatch_queue_get_current())) {
1427 DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
1428 }
1429 if (slowpath(dq->dq_items_tail)) {
1430 DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
1431 "Release of a queue while items are enqueued");
1432 }
1433
1434 // trash the queue so that use after free will crash
1435 dq->dq_items_head = (void *)0x200;
1436 dq->dq_items_tail = (void *)0x200;
1437 // poison the state with something that is suspended and is easy to spot
1438 dq->dq_state = 0xdead000000000000;
1439
1440 dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
1441 (void *)0x200, relaxed);
1442 if (dqsq) {
1443 _dispatch_release(dqsq);
1444 }
1445 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1446 if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
1447 dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
1448 }
1449 }
1450
1451 // 6618342 Contact the team that owns the Instrument DTrace probe before
1452 // renaming this symbol
1453 void
1454 _dispatch_queue_dispose(dispatch_queue_t dq)
1455 {
1456 _dispatch_object_debug(dq, "%s", __func__);
1457 _dispatch_introspection_queue_dispose(dq);
1458 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
1459 free((void*)dq->dq_label);
1460 }
1461 _dispatch_queue_destroy(dq);
1462 }
1463
1464 DISPATCH_NOINLINE
1465 static void
1466 _dispatch_queue_suspend_slow(dispatch_queue_t dq)
1467 {
1468 uint64_t dq_state, value, delta;
1469
1470 _dispatch_queue_sidelock_lock(dq);
1471
1472 // what we want to transfer (remove from dq_state)
1473 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1474 // but this is a suspend so add a suspend count at the same time
1475 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1476 if (dq->dq_side_suspend_cnt == 0) {
1477 // we subtract delta from dq_state, and we want to set this bit
1478 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1479 }
1480
1481 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1482 // unsigned underflow of the subtraction can happen because other
1483 // threads could have touched this value while we were trying to acquire
1484 // the lock, or because another thread raced us to do the same operation
1485 // and got to the lock first.
1486 if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
1487 os_atomic_rmw_loop_give_up(goto retry);
1488 }
1489 });
1490 if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
1491 DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
1492 DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
1493 }
1494 return _dispatch_queue_sidelock_unlock(dq);
1495
1496 retry:
1497 _dispatch_queue_sidelock_unlock(dq);
1498 return dx_vtable(dq)->do_suspend(dq);
1499 }
1500
1501 void
1502 _dispatch_queue_suspend(dispatch_queue_t dq)
1503 {
1504 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1505
1506 uint64_t dq_state, value;
1507
1508 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1509 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1510 if (slowpath(os_add_overflow(dq_state, value, &value))) {
1511 os_atomic_rmw_loop_give_up({
1512 return _dispatch_queue_suspend_slow(dq);
1513 });
1514 }
1515 });
1516
1517 if (!_dq_state_is_suspended(dq_state)) {
1518 // rdar://8181908 we need to extend the queue life for the duration
1519 // of the call to wakeup at _dispatch_queue_resume() time.
1520 _dispatch_retain(dq);
1521 }
1522 }
1523
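/*
 * Illustrative client-side sketch of the suspend arithmetic above: each
 * dispatch_suspend() adds DISPATCH_QUEUE_SUSPEND_INTERVAL to dq_state (deep
 * nesting spills into dq_side_suspend_cnt via _dispatch_queue_suspend_slow),
 * so suspensions nest and every suspend must be balanced by a resume.
 *
 *	dispatch_suspend(q);
 *	dispatch_suspend(q);		// suspend count is now 2
 *	dispatch_async(q, ^{ });	// stays enqueued while q is suspended
 *	dispatch_resume(q);		// still suspended (count 1)
 *	dispatch_resume(q);		// count 0: the block may now run
 */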
1524 DISPATCH_NOINLINE
1525 static void
1526 _dispatch_queue_resume_slow(dispatch_queue_t dq)
1527 {
1528 uint64_t dq_state, value, delta;
1529
1530 _dispatch_queue_sidelock_lock(dq);
1531
1532 // what we want to transfer
1533 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1534 // but this is a resume so consume a suspend count at the same time
1535 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1536 switch (dq->dq_side_suspend_cnt) {
1537 case 0:
1538 goto retry;
1539 case DISPATCH_QUEUE_SUSPEND_HALF:
1540 // we will transition the side count to 0, so we want to clear this bit
1541 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1542 break;
1543 }
1544 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1545 // unsigned overflow of the addition can happen because other
1546 // threads could have touched this value while we were trying to acquire
1547 // the lock, or because another thread raced us to do the same operation
1548 // and got to the lock first.
1549 if (slowpath(os_add_overflow(dq_state, delta, &value))) {
1550 os_atomic_rmw_loop_give_up(goto retry);
1551 }
1552 });
1553 dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
1554 return _dispatch_queue_sidelock_unlock(dq);
1555
1556 retry:
1557 _dispatch_queue_sidelock_unlock(dq);
1558 return dx_vtable(dq)->do_resume(dq, false);
1559 }
1560
1561 DISPATCH_NOINLINE
1562 static void
1563 _dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
1564 {
1565 // Step 2: run the activation finalizer
1566 if (dx_vtable(dq)->do_finalize_activation) {
1567 dx_vtable(dq)->do_finalize_activation(dq);
1568 }
1569 // Step 3: consume the suspend count
1570 return dx_vtable(dq)->do_resume(dq, false);
1571 }
1572
1573 void
1574 _dispatch_queue_resume(dispatch_queue_t dq, bool activate)
1575 {
1576 // covers all suspend and inactive bits, including side suspend bit
1577 const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
1578 // backward compatibility: only dispatch sources can abuse
1579 // dispatch_resume() to really mean dispatch_activate()
1580 bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
1581 uint64_t dq_state, value;
1582
1583 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1584
1585 // Activation is a bit tricky as it needs to finalize before the wakeup.
1586 //
1587 // If after doing its updates to the suspend count and/or inactive bit,
1588 // the last suspension related bit that would remain is the
1589 // NEEDS_ACTIVATION one, then this function:
1590 //
1591 // 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
1592 // a suspend count)
1593 // 2. runs the activation finalizer
1594 // 3. consumes the suspend count set in (1), and finishes the resume flow
1595 //
1596 // Concurrently, some property setters such as setting dispatch source
1597 // handlers or _dispatch_queue_set_target_queue try to do in-place changes
1598 // before activation. These protect their action by taking a suspend count.
1599 // Step (1) above cannot happen if such a setter has locked the object.
1600 if (activate) {
1601 // relaxed atomic because this doesn't publish anything; it is only
1602 // about picking the thread that gets to finalize the activation
1603 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1604 if ((dq_state & suspend_bits) ==
1605 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1606 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1607 value = dq_state - DISPATCH_QUEUE_INACTIVE
1608 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1609 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1610 } else if (_dq_state_is_inactive(dq_state)) {
1611 // { sc:>0 i:1 na:1 } -> { i:0 na:1 }
1612 // simple activation because sc is not 0
1613 // resume will deal with na:1 later
1614 value = dq_state - DISPATCH_QUEUE_INACTIVE;
1615 } else {
1616 // object is already active; this is a no-op, just exit
1617 os_atomic_rmw_loop_give_up(return);
1618 }
1619 });
1620 } else {
1621 // release barrier needed to publish the effect of
1622 // - dispatch_set_target_queue()
1623 // - dispatch_set_*_handler()
1624 // - do_finalize_activation()
1625 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
1626 if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
1627 + DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1628 // { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
1629 value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
1630 } else if (resume_can_activate && (dq_state & suspend_bits) ==
1631 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1632 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1633 value = dq_state - DISPATCH_QUEUE_INACTIVE
1634 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1635 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1636 } else {
1637 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1638 if (slowpath(os_sub_overflow(dq_state, value, &value))) {
1639 // underflow means either an over-resume, or that a suspend count
1640 // transfer from the side suspend count is needed
1641 os_atomic_rmw_loop_give_up({
1642 if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
1643 goto over_resume;
1644 }
1645 return _dispatch_queue_resume_slow(dq);
1646 });
1647 }
1648 if (_dq_state_is_runnable(value) &&
1649 !_dq_state_drain_locked(value)) {
1650 uint64_t full_width = value;
1651 if (_dq_state_has_pending_barrier(value)) {
1652 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
1653 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
1654 full_width += DISPATCH_QUEUE_IN_BARRIER;
1655 } else {
1656 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
1657 full_width += DISPATCH_QUEUE_IN_BARRIER;
1658 }
1659 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
1660 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
1661 value = full_width;
1662 value &= ~DISPATCH_QUEUE_DIRTY;
1663 value |= _dispatch_tid_self();
1664 }
1665 }
1666 }
1667 });
1668 }
1669
1670 if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1671 // we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
1672 return _dispatch_queue_resume_finalize_activation(dq);
1673 }
1674
1675 if (activate) {
1676 // if we're still in an activate codepath here we should have
1677 // { sc:>0 na:1 }; if not, we have a corrupt state
1678 if (!fastpath(_dq_state_is_suspended(value))) {
1679 DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
1680 }
1681 return;
1682 }
1683
1684 if (_dq_state_is_suspended(value)) {
1685 return;
1686 }
1687
1688 if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
1689 _dispatch_try_lock_transfer_or_wakeup(dq);
1690 } else if (_dq_state_should_wakeup(value)) {
1691 // <rdar://problem/14637483>
1692 // seq_cst wrt state changes that were flushed and not acted upon
1693 os_atomic_thread_fence(acquire);
1694 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
1695 _dispatch_queue_is_thread_bound(dq));
1696 // Balancing the retain() done in suspend() for rdar://8181908
1697 return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
1698 }
1699
1700 // Balancing the retain() done in suspend() for rdar://8181908
1701 return _dispatch_release_tailcall(dq);
1702
1703 over_resume:
1704 if (slowpath(_dq_state_is_inactive(dq_state))) {
1705 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
1706 }
1707 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
1708 }
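/*
 * Illustrative client-level view of the suspend/resume/activate state machine
 * implemented above (a sketch only, not part of this translation unit; labels
 * and do_work() are placeholders):
 *
 *	dispatch_queue_attr_t attr =
 *			dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", attr); // i:1 na:1
 *	dispatch_set_target_queue(q, tq);    // allowed while still inactive
 *	dispatch_activate(q);                // clears i/na, runs the activation
 *	                                     // finalizer, then resumes
 *
 *	dispatch_suspend(q);                 // sc += 1
 *	dispatch_async(q, ^{ do_work(); });  // enqueued but not drained
 *	dispatch_resume(q);                  // sc -= 1, wakes the queue if runnable
 */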
1709
1710 const char *
1711 dispatch_queue_get_label(dispatch_queue_t dq)
1712 {
1713 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1714 dq = _dispatch_get_current_queue();
1715 }
1716 return dq->dq_label ? dq->dq_label : "";
1717 }
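/*
 * Usage sketch (illustrative only): passing DISPATCH_CURRENT_QUEUE_LABEL
 * returns the label of the queue the caller is currently running on, or the
 * empty string if that queue has no label.
 *
 *	#include <stdio.h>
 *	#include <dispatch/dispatch.h>
 *
 *	static void
 *	log_current_queue(void)
 *	{
 *		const char *label =
 *				dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL);
 *		fprintf(stderr, "running on queue: %s\n", label);
 *	}
 */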
1718
1719 qos_class_t
1720 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
1721 {
1722 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
1723 int relative_priority = 0;
1724 #if HAVE_PTHREAD_WORKQUEUE_QOS
1725 pthread_priority_t dqp = dq->dq_priority;
1726 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
1727 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
1728 #else
1729 (void)dq;
1730 #endif
1731 if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
1732 return qos;
1733 }
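/*
 * Usage sketch (illustrative only, on QoS-aware platforms): the QoS class and
 * relative priority given at queue creation can be read back later.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -4);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.utility", attr);
 *	int rel = 0;
 *	qos_class_t qos = dispatch_queue_get_qos_class(q, &rel);
 *	// expected: qos == QOS_CLASS_UTILITY, rel == -4
 */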
1734
1735 static void
1736 _dispatch_queue_set_width2(void *ctxt)
1737 {
1738 int w = (int)(intptr_t)ctxt; // intentional truncation
1739 uint32_t tmp;
1740 dispatch_queue_t dq = _dispatch_queue_get_current();
1741
1742 if (w > 0) {
1743 tmp = (unsigned int)w;
1744 } else switch (w) {
1745 case 0:
1746 tmp = 1;
1747 break;
1748 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1749 tmp = dispatch_hw_config(physical_cpus);
1750 break;
1751 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1752 tmp = dispatch_hw_config(active_cpus);
1753 break;
1754 default:
1755 // fall through
1756 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1757 tmp = dispatch_hw_config(logical_cpus);
1758 break;
1759 }
1760 if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
1761 tmp = DISPATCH_QUEUE_WIDTH_MAX;
1762 }
1763
1764 dispatch_queue_flags_t old_dqf, new_dqf;
1765 os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
1766 new_dqf = old_dqf & ~DQF_WIDTH_MASK;
1767 new_dqf |= (tmp << DQF_WIDTH_SHIFT);
1768 });
1769 _dispatch_object_debug(dq, "%s", __func__);
1770 }
1771
1772 void
1773 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1774 {
1775 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1776 slowpath(dx_hastypeflag(dq, QUEUE_ROOT))) {
1777 return;
1778 }
1779
1780 unsigned long type = dx_type(dq);
1781 switch (type) {
1782 case DISPATCH_QUEUE_LEGACY_TYPE:
1783 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1784 break;
1785 case DISPATCH_QUEUE_SERIAL_TYPE:
1786 DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
1787 default:
1788 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1789 }
1790
1791 _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
1792 _dispatch_queue_set_width2);
1793 }
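/*
 * Usage sketch for this legacy interface (declared in libdispatch's private
 * headers; new code should simply create the queue with
 * DISPATCH_QUEUE_CONCURRENT instead):
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.pool",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	// clamp the queue's concurrency to the number of active CPUs
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 */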
1794
1795 static void
1796 _dispatch_queue_legacy_set_target_queue(void *ctxt)
1797 {
1798 dispatch_queue_t dq = _dispatch_queue_get_current();
1799 dispatch_queue_t tq = ctxt;
1800 dispatch_queue_t otq = dq->do_targetq;
1801
1802 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1803 _dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
1804 _dispatch_bug_deprecated("Changing the target of a queue "
1805 "already targeted by other dispatch objects");
1806 }
1807
1808 _dispatch_queue_priority_inherit_from_target(dq, tq);
1809 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1810 #if HAVE_PTHREAD_WORKQUEUE_QOS
1811 // see _dispatch_queue_class_wakeup()
1812 _dispatch_queue_sidelock_lock(dq);
1813 #endif
1814 dq->do_targetq = tq;
1815 #if HAVE_PTHREAD_WORKQUEUE_QOS
1816 // see _dispatch_queue_class_wakeup()
1817 _dispatch_queue_sidelock_unlock(dq);
1818 #endif
1819
1820 _dispatch_object_debug(dq, "%s", __func__);
1821 _dispatch_introspection_target_queue_changed(dq);
1822 _dispatch_release_tailcall(otq);
1823 }
1824
1825 void
1826 _dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
1827 {
1828 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
1829 dq->do_targetq);
1830
1831 if (slowpath(!tq)) {
1832 bool is_concurrent_q = (dq->dq_width > 1);
1833 tq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1834 !is_concurrent_q);
1835 }
1836
1837 if (_dispatch_queue_try_inactive_suspend(dq)) {
1838 _dispatch_object_set_target_queue_inline(dq, tq);
1839 return dx_vtable(dq)->do_resume(dq, false);
1840 }
1841
1842 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1843 DISPATCH_CLIENT_CRASH(dq, "Cannot change the target of a queue or "
1844 "source with an accounting override voucher "
1845 "after it has been activated");
1846 }
1847
1848 unsigned long type = dx_type(dq);
1849 switch (type) {
1850 case DISPATCH_QUEUE_LEGACY_TYPE:
1851 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1852 _dispatch_bug_deprecated("Changing the target of a queue "
1853 "already targeted by other dispatch objects");
1854 }
1855 break;
1856 case DISPATCH_SOURCE_KEVENT_TYPE:
1857 case DISPATCH_MACH_CHANNEL_TYPE:
1858 _dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
1859 _dispatch_bug_deprecated("Changing the target of a source "
1860 "after it has been activated");
1861 break;
1862
1863 case DISPATCH_QUEUE_SERIAL_TYPE:
1864 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1865 DISPATCH_CLIENT_CRASH(type, "Cannot change the target of this queue "
1866 "after it has been activated");
1867 default:
1868 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1869 }
1870
1871 _dispatch_retain(tq);
1872 return _dispatch_barrier_trysync_or_async_f(dq, tq,
1873 _dispatch_queue_legacy_set_target_queue);
1874 }
1875
1876 #pragma mark -
1877 #pragma mark dispatch_mgr_queue
1878
1879 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1880 static struct dispatch_pthread_root_queue_context_s
1881 _dispatch_mgr_root_queue_pthread_context;
1882 static struct dispatch_root_queue_context_s
1883 _dispatch_mgr_root_queue_context = {{{
1884 #if HAVE_PTHREAD_WORKQUEUES
1885 .dgq_kworkqueue = (void*)(~0ul),
1886 #endif
1887 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
1888 .dgq_thread_pool_size = 1,
1889 }}};
1890
1891 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
1892 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
1893 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
1894 .do_ctxt = &_dispatch_mgr_root_queue_context,
1895 .dq_label = "com.apple.root.libdispatch-manager",
1896 .dq_width = DISPATCH_QUEUE_WIDTH_POOL,
1897 .dq_override = DISPATCH_SATURATED_OVERRIDE,
1898 .dq_override_voucher = DISPATCH_NO_VOUCHER,
1899 .dq_serialnum = 3,
1900 };
1901 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1902
1903 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1904 static struct {
1905 volatile int prio;
1906 volatile qos_class_t qos;
1907 int default_prio;
1908 int policy;
1909 pthread_t tid;
1910 } _dispatch_mgr_sched;
1911
1912 static dispatch_once_t _dispatch_mgr_sched_pred;
1913
1914 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
1915
1916 #if HAVE_PTHREAD_WORKQUEUE_QOS
1917 // Must be kept in sync with list of qos classes in sys/qos.h
1918 static const int _dispatch_mgr_sched_qos2prio[] = {
1919 [_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
1920 [_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
1921 [_DISPATCH_QOS_CLASS_UTILITY] = 20,
1922 [_DISPATCH_QOS_CLASS_DEFAULT] = 31,
1923 [_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
1924 [_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
1925 };
1926 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
1927
1928 static void
1929 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
1930 {
1931 struct sched_param param;
1932 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1933 pthread_attr_t *attr;
1934 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1935 #else
1936 pthread_attr_t a, *attr = &a;
1937 #endif
1938 (void)dispatch_assume_zero(pthread_attr_init(attr));
1939 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
1940 &_dispatch_mgr_sched.policy));
1941 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1942 #if HAVE_PTHREAD_WORKQUEUE_QOS
1943 qos_class_t qos = qos_class_main();
1944 if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
1945 qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
1946 }
1947 if (qos) {
1948 _dispatch_mgr_sched.qos = qos;
1949 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
1950 }
1951 #endif
1952 _dispatch_mgr_sched.default_prio = param.sched_priority;
1953 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
1954 }
1955 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1956
1957 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1958 DISPATCH_NOINLINE
1959 static pthread_t *
1960 _dispatch_mgr_root_queue_init(void)
1961 {
1962 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1963 struct sched_param param;
1964 pthread_attr_t *attr;
1965 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1966 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
1967 PTHREAD_CREATE_DETACHED));
1968 #if !DISPATCH_DEBUG
1969 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
1970 #endif
1971 #if HAVE_PTHREAD_WORKQUEUE_QOS
1972 qos_class_t qos = _dispatch_mgr_sched.qos;
1973 if (qos) {
1974 if (_dispatch_set_qos_class_enabled) {
1975 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
1976 qos, 0));
1977 }
1978 _dispatch_mgr_q.dq_priority =
1979 (dispatch_priority_t)_pthread_qos_class_encode(qos, 0, 0);
1980 }
1981 #endif
1982 param.sched_priority = _dispatch_mgr_sched.prio;
1983 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1984 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
1985 }
1986 return &_dispatch_mgr_sched.tid;
1987 }
1988
1989 static inline void
1990 _dispatch_mgr_priority_apply(void)
1991 {
1992 struct sched_param param;
1993 do {
1994 param.sched_priority = _dispatch_mgr_sched.prio;
1995 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1996 (void)dispatch_assume_zero(pthread_setschedparam(
1997 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
1998 &param));
1999 }
2000 } while (_dispatch_mgr_sched.prio > param.sched_priority);
2001 }
2002
2003 DISPATCH_NOINLINE
2004 void
2005 _dispatch_mgr_priority_init(void)
2006 {
2007 struct sched_param param;
2008 pthread_attr_t *attr;
2009 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
2010 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2011 #if HAVE_PTHREAD_WORKQUEUE_QOS
2012 qos_class_t qos = 0;
2013 (void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
2014 if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
2015 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
2016 int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
2017 if (p > param.sched_priority) {
2018 param.sched_priority = p;
2019 }
2020 }
2021 #endif
2022 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
2023 return _dispatch_mgr_priority_apply();
2024 }
2025 }
2026 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2027
2028 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2029 DISPATCH_NOINLINE
2030 static void
2031 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
2032 {
2033 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2034 struct sched_param param;
2035 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2036 #if HAVE_PTHREAD_WORKQUEUE_QOS
2037 qos_class_t q, qos = 0;
2038 (void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
2039 if (qos) {
2040 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
2041 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
2042 if (q >= qos) os_atomic_rmw_loop_give_up(break);
2043 });
2044 }
2045 #endif
2046 int p, prio = param.sched_priority;
2047 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
2048 if (p >= prio) os_atomic_rmw_loop_give_up(return);
2049 });
2050 #if DISPATCH_USE_KEVENT_WORKQUEUE
2051 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2052 _dispatch_root_queues_init_once);
2053 if (_dispatch_kevent_workqueue_enabled) {
2054 pthread_priority_t pp = 0;
2055 if (prio > _dispatch_mgr_sched.default_prio) {
2056 // The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
2057 // _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
2058 // problematic in this case, since the second one is only ever
2059 // used on dq_priority fields.
2060 // We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall;
2061 // it is meaningful to libdispatch only.
2062 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2063 } else if (qos) {
2064 pp = _pthread_qos_class_encode(qos, 0, 0);
2065 }
2066 if (pp) {
2067 int r = _pthread_workqueue_set_event_manager_priority(pp);
2068 (void)dispatch_assume_zero(r);
2069 }
2070 return;
2071 }
2072 #endif
2073 #if DISPATCH_USE_MGR_THREAD
2074 if (_dispatch_mgr_sched.tid) {
2075 return _dispatch_mgr_priority_apply();
2076 }
2077 #endif
2078 }
2079 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2080
2081 #if DISPATCH_USE_KEVENT_WORKQUEUE
2082 void
2083 _dispatch_kevent_workqueue_init(void)
2084 {
2085 // Initialize kevent workqueue support
2086 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2087 _dispatch_root_queues_init_once);
2088 if (!_dispatch_kevent_workqueue_enabled) return;
2089 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2090 qos_class_t qos = _dispatch_mgr_sched.qos;
2091 int prio = _dispatch_mgr_sched.prio;
2092 pthread_priority_t pp = 0;
2093 if (qos) {
2094 pp = _pthread_qos_class_encode(qos, 0, 0);
2095 _dispatch_mgr_q.dq_priority = (dispatch_priority_t)pp;
2096 }
2097 if (prio > _dispatch_mgr_sched.default_prio) {
2098 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2099 }
2100 if (pp) {
2101 int r = _pthread_workqueue_set_event_manager_priority(pp);
2102 (void)dispatch_assume_zero(r);
2103 }
2104 }
2105 #endif
2106
2107 #pragma mark -
2108 #pragma mark dispatch_pthread_root_queue
2109
2110 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2111 static dispatch_queue_t
2112 _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2113 const pthread_attr_t *attr, dispatch_block_t configure,
2114 dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
2115 {
2116 dispatch_queue_t dq;
2117 dispatch_root_queue_context_t qc;
2118 dispatch_pthread_root_queue_context_t pqc;
2119 dispatch_queue_flags_t dqf = 0;
2120 size_t dqs;
2121 uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
2122 (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
2123
2124 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2125 dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
2126 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
2127 sizeof(struct dispatch_root_queue_context_s) +
2128 sizeof(struct dispatch_pthread_root_queue_context_s));
2129 qc = (void*)dq + dqs;
2130 dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
2131 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
2132 dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
2133 if (label) {
2134 const char *tmp = _dispatch_strdup_if_mutable(label);
2135 if (tmp != label) {
2136 dqf |= DQF_LABEL_NEEDS_FREE;
2137 label = tmp;
2138 }
2139 }
2140
2141 _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, false);
2142 dq->dq_label = label;
2143 dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
2144 dq->dq_override = DISPATCH_SATURATED_OVERRIDE;
2145 dq->do_ctxt = qc;
2146 dq->do_targetq = NULL;
2147
2148 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
2149 qc->dgq_ctxt = pqc;
2150 #if HAVE_PTHREAD_WORKQUEUES
2151 qc->dgq_kworkqueue = (void*)(~0ul);
2152 #endif
2153 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
2154
2155 if (attr) {
2156 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
2157 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
2158 } else {
2159 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
2160 }
2161 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2162 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
2163 if (configure) {
2164 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
2165 }
2166 if (observer_hooks) {
2167 pqc->dpq_observer_hooks = *observer_hooks;
2168 }
2169 _dispatch_object_debug(dq, "%s", __func__);
2170 return _dispatch_introspection_queue_create(dq);
2171 }
2172
2173 dispatch_queue_t
2174 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2175 const pthread_attr_t *attr, dispatch_block_t configure)
2176 {
2177 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2178 NULL);
2179 }
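/*
 * Usage sketch for this SPI (illustrative only; labels are placeholders):
 * create a root queue backed by its own pthread pool, then target another
 * queue at it so that queue's work never runs on the default workqueue
 * threads.
 *
 *	dispatch_queue_t root = dispatch_pthread_root_queue_create(
 *			"com.example.private-pool", 0, NULL, NULL);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *	dispatch_set_target_queue(q, root);
 */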
2180
2181 #if DISPATCH_IOHID_SPI
2182 dispatch_queue_t
2183 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
2184 unsigned long flags, const pthread_attr_t *attr,
2185 dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
2186 dispatch_block_t configure)
2187 {
2188 if (!observer_hooks->queue_will_execute ||
2189 !observer_hooks->queue_did_execute) {
2190 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2191 }
2192 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2193 observer_hooks);
2194 }
2195 #endif
2196
2197 dispatch_queue_t
2198 dispatch_pthread_root_queue_copy_current(void)
2199 {
2200 dispatch_queue_t dq = _dispatch_queue_get_current();
2201 if (!dq) return NULL;
2202 while (slowpath(dq->do_targetq)) {
2203 dq = dq->do_targetq;
2204 }
2205 if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
2206 dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
2207 return NULL;
2208 }
2209 return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
2210 }
2211
2212 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2213
2214 void
2215 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
2216 {
2217 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
2218 DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
2219 }
2220 _dispatch_object_debug(dq, "%s", __func__);
2221 _dispatch_introspection_queue_dispose(dq);
2222 #if DISPATCH_USE_PTHREAD_POOL
2223 dispatch_root_queue_context_t qc = dq->do_ctxt;
2224 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2225
2226 pthread_attr_destroy(&pqc->dpq_thread_attr);
2227 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
2228 if (pqc->dpq_thread_configure) {
2229 Block_release(pqc->dpq_thread_configure);
2230 }
2231 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
2232 false);
2233 #endif
2234 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
2235 free((void*)dq->dq_label);
2236 }
2237 _dispatch_queue_destroy(dq);
2238 }
2239
2240 #pragma mark -
2241 #pragma mark dispatch_queue_specific
2242
2243 struct dispatch_queue_specific_queue_s {
2244 DISPATCH_QUEUE_HEADER(queue_specific_queue);
2245 TAILQ_HEAD(dispatch_queue_specific_head_s,
2246 dispatch_queue_specific_s) dqsq_contexts;
2247 } DISPATCH_QUEUE_ALIGN;
2248
2249 struct dispatch_queue_specific_s {
2250 const void *dqs_key;
2251 void *dqs_ctxt;
2252 dispatch_function_t dqs_destructor;
2253 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
2254 };
2255 DISPATCH_DECL(dispatch_queue_specific);
2256
2257 void
2258 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
2259 {
2260 dispatch_queue_specific_t dqs, tmp;
2261
2262 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
2263 if (dqs->dqs_destructor) {
2264 dispatch_async_f(_dispatch_get_root_queue(
2265 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2266 dqs->dqs_destructor);
2267 }
2268 free(dqs);
2269 }
2270 _dispatch_queue_destroy(dqsq->_as_dq);
2271 }
2272
2273 static void
2274 _dispatch_queue_init_specific(dispatch_queue_t dq)
2275 {
2276 dispatch_queue_specific_queue_t dqsq;
2277
2278 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
2279 sizeof(struct dispatch_queue_specific_queue_s));
2280 _dispatch_queue_init(dqsq->_as_dq, DQF_NONE,
2281 DISPATCH_QUEUE_WIDTH_MAX, false);
2282 dqsq->do_xref_cnt = -1;
2283 dqsq->do_targetq = _dispatch_get_root_queue(
2284 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
2285 dqsq->dq_label = "queue-specific";
2286 TAILQ_INIT(&dqsq->dqsq_contexts);
2287 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
2288 dqsq->_as_dq, release))) {
2289 _dispatch_release(dqsq->_as_dq);
2290 }
2291 }
2292
2293 static void
2294 _dispatch_queue_set_specific(void *ctxt)
2295 {
2296 dispatch_queue_specific_t dqs, dqsn = ctxt;
2297 dispatch_queue_specific_queue_t dqsq =
2298 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2299
2300 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2301 if (dqs->dqs_key == dqsn->dqs_key) {
2302 // Destroy previous context for existing key
2303 if (dqs->dqs_destructor) {
2304 dispatch_async_f(_dispatch_get_root_queue(
2305 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2306 dqs->dqs_destructor);
2307 }
2308 if (dqsn->dqs_ctxt) {
2309 // Copy new context for existing key
2310 dqs->dqs_ctxt = dqsn->dqs_ctxt;
2311 dqs->dqs_destructor = dqsn->dqs_destructor;
2312 } else {
2313 // Remove context storage for existing key
2314 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
2315 free(dqs);
2316 }
2317 return free(dqsn);
2318 }
2319 }
2320 // Insert context storage for new key
2321 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
2322 }
2323
2324 DISPATCH_NOINLINE
2325 void
2326 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
2327 void *ctxt, dispatch_function_t destructor)
2328 {
2329 if (slowpath(!key)) {
2330 return;
2331 }
2332 dispatch_queue_specific_t dqs;
2333
2334 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
2335 dqs->dqs_key = key;
2336 dqs->dqs_ctxt = ctxt;
2337 dqs->dqs_destructor = destructor;
2338 if (slowpath(!dq->dq_specific_q)) {
2339 _dispatch_queue_init_specific(dq);
2340 }
2341 _dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
2342 _dispatch_queue_set_specific);
2343 }
2344
2345 static void
2346 _dispatch_queue_get_specific(void *ctxt)
2347 {
2348 void **ctxtp = ctxt;
2349 void *key = *ctxtp;
2350 dispatch_queue_specific_queue_t dqsq =
2351 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2352 dispatch_queue_specific_t dqs;
2353
2354 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2355 if (dqs->dqs_key == key) {
2356 *ctxtp = dqs->dqs_ctxt;
2357 return;
2358 }
2359 }
2360 *ctxtp = NULL;
2361 }
2362
2363 DISPATCH_NOINLINE
2364 void *
2365 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
2366 {
2367 if (slowpath(!key)) {
2368 return NULL;
2369 }
2370 void *ctxt = NULL;
2371
2372 if (fastpath(dq->dq_specific_q)) {
2373 ctxt = (void *)key;
2374 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
2375 }
2376 return ctxt;
2377 }
2378
2379 DISPATCH_NOINLINE
2380 void *
2381 dispatch_get_specific(const void *key)
2382 {
2383 if (slowpath(!key)) {
2384 return NULL;
2385 }
2386 void *ctxt = NULL;
2387 dispatch_queue_t dq = _dispatch_queue_get_current();
2388
2389 while (slowpath(dq)) {
2390 if (slowpath(dq->dq_specific_q)) {
2391 ctxt = (void *)key;
2392 dispatch_sync_f(dq->dq_specific_q, &ctxt,
2393 _dispatch_queue_get_specific);
2394 if (ctxt) break;
2395 }
2396 dq = dq->do_targetq;
2397 }
2398 return ctxt;
2399 }
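/*
 * Usage sketch for the queue-specific API above (illustrative; the key only
 * needs a unique address, so the address of a static variable is commonly
 * used):
 *
 *	static char example_key;	// its address is the key; the value is unused
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_queue_set_specific(q, &example_key, strdup("ctx"), free);
 *
 *	dispatch_async(q, ^{
 *		// searches the current queue and then its target queues
 *		char *ctx = dispatch_get_specific(&example_key);
 *	});
 *	// from anywhere, against an explicit queue:
 *	char *ctx = dispatch_queue_get_specific(q, &example_key);
 */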
2400
2401 #if DISPATCH_IOHID_SPI
2402 bool
2403 _dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
2404 dispatch_queue_t dq) // rdar://problem/18033810
2405 {
2406 if (dq->dq_width != 1) {
2407 DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
2408 }
2409 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2410 return _dq_state_drain_locked_by(dq_state, _dispatch_tid_self());
2411 }
2412 #endif
2413
2414 #pragma mark -
2415 #pragma mark dispatch_queue_debug
2416
2417 size_t
2418 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
2419 {
2420 size_t offset = 0;
2421 dispatch_queue_t target = dq->do_targetq;
2422 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2423
2424 offset += dsnprintf(&buf[offset], bufsiz - offset,
2425 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2426 target && target->dq_label ? target->dq_label : "", target,
2427 dq->dq_width, (unsigned long long)dq_state);
2428 if (_dq_state_is_suspended(dq_state)) {
2429 offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
2430 _dq_state_suspend_cnt(dq_state));
2431 }
2432 if (_dq_state_is_inactive(dq_state)) {
2433 offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
2434 } else if (_dq_state_needs_activation(dq_state)) {
2435 offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
2436 }
2437 if (_dq_state_is_enqueued(dq_state)) {
2438 offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
2439 }
2440 if (_dq_state_is_dirty(dq_state)) {
2441 offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
2442 }
2443 if (_dq_state_has_override(dq_state)) {
2444 offset += dsnprintf(&buf[offset], bufsiz - offset, ", async-override");
2445 }
2446 mach_port_t owner = _dq_state_drain_owner(dq_state);
2447 if (!_dispatch_queue_is_thread_bound(dq) && owner) {
2448 offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
2449 owner);
2450 }
2451 if (_dq_state_is_in_barrier(dq_state)) {
2452 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
2453 } else {
2454 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
2455 _dq_state_used_width(dq_state, dq->dq_width));
2456 }
2457 if (_dq_state_has_pending_barrier(dq_state)) {
2458 offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
2459 }
2460 if (_dispatch_queue_is_thread_bound(dq)) {
2461 offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
2462 owner);
2463 }
2464 return offset;
2465 }
2466
2467 size_t
2468 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
2469 {
2470 size_t offset = 0;
2471 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2472 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
2473 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
2474 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
2475 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2476 return offset;
2477 }
2478
2479 #if DISPATCH_DEBUG
2480 void
2481 dispatch_debug_queue(dispatch_queue_t dq, const char* str)
{
2482 if (fastpath(dq)) {
2483 _dispatch_object_debug(dq, "%s", str);
2484 } else {
2485 _dispatch_log("queue[NULL]: %s", str);
2486 }
2487 }
2488 #endif
2489
2490 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
2491 static OSSpinLock _dispatch_stats_lock;
2492 static struct {
2493 uint64_t time_total;
2494 uint64_t count_total;
2495 uint64_t thread_total;
2496 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
2497
2498 static void
2499 _dispatch_queue_merge_stats(uint64_t start)
2500 {
2501 uint64_t delta = _dispatch_absolute_time() - start;
2502 unsigned long count;
2503
2504 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
2505 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
2506
2507 int bucket = flsl((long)count);
2508
2509 // 64-bit counters on 32-bit require a lock or a queue
2510 OSSpinLockLock(&_dispatch_stats_lock);
2511
2512 _dispatch_stats[bucket].time_total += delta;
2513 _dispatch_stats[bucket].count_total += count;
2514 _dispatch_stats[bucket].thread_total++;
2515
2516 OSSpinLockUnlock(&_dispatch_stats_lock);
2517 }
2518 #endif
2519
2520 #pragma mark -
2521 #pragma mark _dispatch_set_priority_and_mach_voucher
2522 #if HAVE_PTHREAD_WORKQUEUE_QOS
2523
2524 DISPATCH_NOINLINE
2525 void
2526 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
2527 mach_voucher_t kv)
2528 {
2529 _pthread_set_flags_t pflags = 0;
2530 if (pp && _dispatch_set_qos_class_enabled) {
2531 pthread_priority_t old_pri = _dispatch_get_priority();
2532 if (pp != old_pri) {
2533 if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
2534 pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
2535 // when we unbind, the overcommit state can flip, so we need to learn
2536 // it from the defaultpri; see _dispatch_priority_compute_update
2537 pp |= (_dispatch_get_defaultpriority() &
2538 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2539 } else {
2540 // else we need to keep the one that is set in the current pri
2541 pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2542 }
2543 if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2544 pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
2545 }
2546 if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q) ==
2547 _dispatch_tid_self())) {
2548 DISPATCH_INTERNAL_CRASH(pp,
2549 "Changing the QoS while on the manager queue");
2550 }
2551 if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2552 DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
2553 }
2554 if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
2555 DISPATCH_INTERNAL_CRASH(old_pri,
2556 "Cannot turn a manager thread into a normal one");
2557 }
2558 }
2559 }
2560 if (kv != VOUCHER_NO_MACH_VOUCHER) {
2561 #if VOUCHER_USE_MACH_VOUCHER
2562 pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
2563 #endif
2564 }
2565 if (!pflags) return;
2566 int r = _pthread_set_properties_self(pflags, pp, kv);
2567 if (r == EINVAL) {
2568 DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
2569 }
2570 (void)dispatch_assume_zero(r);
2571 }
2572
2573 DISPATCH_NOINLINE
2574 voucher_t
2575 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
2576 voucher_t v, _dispatch_thread_set_self_t flags)
2577 {
2578 voucher_t ov = DISPATCH_NO_VOUCHER;
2579 mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
2580 if (v != DISPATCH_NO_VOUCHER) {
2581 bool retained = flags & DISPATCH_VOUCHER_CONSUME;
2582 ov = _voucher_get();
2583 if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
2584 if (retained && v) _voucher_release_no_dispose(v);
2585 ov = DISPATCH_NO_VOUCHER;
2586 } else {
2587 if (!retained && v) _voucher_retain(v);
2588 kv = _voucher_swap_and_get_mach_voucher(ov, v);
2589 }
2590 }
2591 #if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
2592 flags &= ~(_dispatch_thread_set_self_t)DISPATCH_THREAD_PARK;
2593 #endif
2594 if (!(flags & DISPATCH_THREAD_PARK)) {
2595 _dispatch_set_priority_and_mach_voucher_slow(priority, kv);
2596 }
2597 if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
2598 if (ov) _voucher_release(ov);
2599 ov = DISPATCH_NO_VOUCHER;
2600 }
2601 return ov;
2602 }
2603 #endif
2604 #pragma mark -
2605 #pragma mark dispatch_continuation_t
2606
2607 static void
2608 _dispatch_force_cache_cleanup(void)
2609 {
2610 dispatch_continuation_t dc;
2611 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2612 if (dc) {
2613 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
2614 _dispatch_cache_cleanup(dc);
2615 }
2616 }
2617
2618 DISPATCH_NOINLINE
2619 static void
2620 _dispatch_cache_cleanup(void *value)
2621 {
2622 dispatch_continuation_t dc, next_dc = value;
2623
2624 while ((dc = next_dc)) {
2625 next_dc = dc->do_next;
2626 _dispatch_continuation_free_to_heap(dc);
2627 }
2628 }
2629
2630 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2631 DISPATCH_NOINLINE
2632 void
2633 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
2634 {
2635 _dispatch_continuation_free_to_heap(dc);
2636 dispatch_continuation_t next_dc;
2637 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2638 int cnt;
2639 if (!dc || (cnt = dc->dc_cache_cnt -
2640 _dispatch_continuation_cache_limit) <= 0) {
2641 return;
2642 }
2643 do {
2644 next_dc = dc->do_next;
2645 _dispatch_continuation_free_to_heap(dc);
2646 } while (--cnt && (dc = next_dc));
2647 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
2648 }
2649 #endif
2650
2651 DISPATCH_ALWAYS_INLINE_NDEBUG
2652 static inline void
2653 _dispatch_continuation_slow_item_signal(dispatch_queue_t dq,
2654 dispatch_object_t dou)
2655 {
2656 dispatch_continuation_t dc = dou._dc;
2657 pthread_priority_t pp = dq->dq_override;
2658
2659 _dispatch_trace_continuation_pop(dq, dc);
2660 if (pp > (dc->dc_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
2661 _dispatch_wqthread_override_start((mach_port_t)dc->dc_data, pp);
2662 }
2663 _dispatch_thread_event_signal((dispatch_thread_event_t)dc->dc_other);
2664 _dispatch_introspection_queue_item_complete(dc);
2665 }
2666
2667 DISPATCH_NOINLINE
2668 static void
2669 _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
2670 {
2671 _dispatch_queue_push(dq, dc,
2672 _dispatch_continuation_get_override_priority(dq, dc));
2673 }
2674
2675 DISPATCH_NOINLINE
2676 static void
2677 _dispatch_continuation_push_sync_slow(dispatch_queue_t dq,
2678 dispatch_continuation_t dc)
2679 {
2680 _dispatch_queue_push_inline(dq, dc,
2681 _dispatch_continuation_get_override_priority(dq, dc),
2682 DISPATCH_WAKEUP_SLOW_WAITER);
2683 }
2684
2685 DISPATCH_ALWAYS_INLINE
2686 static inline void
2687 _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
2688 bool barrier)
2689 {
2690 if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
2691 return _dispatch_continuation_push(dq, dc);
2692 }
2693 return _dispatch_async_f2(dq, dc);
2694 }
2695
2696 DISPATCH_NOINLINE
2697 void
2698 _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
2699 {
2700 _dispatch_continuation_async2(dq, dc,
2701 dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
2702 }
2703
2704 #pragma mark -
2705 #pragma mark dispatch_block_create
2706
2707 #if __BLOCKS__
2708
2709 DISPATCH_ALWAYS_INLINE
2710 static inline bool
2711 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
2712 {
2713 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
2714 }
2715
2716 DISPATCH_ALWAYS_INLINE
2717 static inline dispatch_block_flags_t
2718 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
2719 {
2720 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
2721 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2722 }
2723 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
2724 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2725 }
2726 return flags;
2727 }
2728
2729 static inline dispatch_block_t
2730 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
2731 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
2732 {
2733 flags = _dispatch_block_normalize_flags(flags);
2734 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
2735
2736 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
2737 voucher = VOUCHER_CURRENT;
2738 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2739 }
2740 if (voucher == VOUCHER_CURRENT) {
2741 voucher = _voucher_get();
2742 }
2743 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
2744 pri = _dispatch_priority_propagate();
2745 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2746 }
2747 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
2748 #if DISPATCH_DEBUG
2749 dispatch_assert(_dispatch_block_get_data(db));
2750 #endif
2751 return db;
2752 }
2753
2754 dispatch_block_t
2755 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
2756 {
2757 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2758 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
2759 block);
2760 }
2761
2762 dispatch_block_t
2763 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
2764 dispatch_qos_class_t qos_class, int relative_priority,
2765 dispatch_block_t block)
2766 {
2767 if (!_dispatch_block_flags_valid(flags) ||
2768 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2769 return DISPATCH_BAD_INPUT;
2770 }
2771 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2772 pthread_priority_t pri = 0;
2773 #if HAVE_PTHREAD_WORKQUEUE_QOS
2774 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2775 #endif
2776 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
2777 pri, block);
2778 }
2779
2780 dispatch_block_t
2781 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
2782 voucher_t voucher, dispatch_block_t block)
2783 {
2784 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2785 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2786 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
2787 block);
2788 }
2789
2790 dispatch_block_t
2791 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
2792 voucher_t voucher, dispatch_qos_class_t qos_class,
2793 int relative_priority, dispatch_block_t block)
2794 {
2795 if (!_dispatch_block_flags_valid(flags) ||
2796 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2797 return DISPATCH_BAD_INPUT;
2798 }
2799 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
2800 pthread_priority_t pri = 0;
2801 #if HAVE_PTHREAD_WORKQUEUE_QOS
2802 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2803 #endif
2804 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
2805 pri, block);
2806 }
2807
2808 void
2809 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
2810 {
2811 if (!_dispatch_block_flags_valid(flags)) {
2812 DISPATCH_CLIENT_CRASH(flags, "Invalid flags passed to "
2813 "dispatch_block_perform()");
2814 }
2815 flags = _dispatch_block_normalize_flags(flags);
2816 struct dispatch_block_private_data_s dbpds =
2817 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
2818 return _dispatch_block_invoke_direct(&dbpds);
2819 }
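/*
 * Usage sketch (illustrative; do_cleanup is a hypothetical helper):
 * dispatch_block_perform() runs the block immediately on the calling thread
 * with the given flags applied, without allocating a heap block object.
 *
 *	dispatch_block_perform(DISPATCH_BLOCK_DETACHED, ^{
 *		do_cleanup();	// runs detached from the caller's QoS and voucher
 *	});
 */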
2820
2821 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2822
2823 void
2824 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s *dbcpd)
2825 {
2826 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
2827 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2828 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2829 if (slowpath(atomic_flags & DBF_WAITED)) {
2830 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2831 "run more than once and waited for");
2832 }
2833 if (atomic_flags & DBF_CANCELED) goto out;
2834
2835 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2836 _dispatch_thread_set_self_t adopt_flags = 0;
2837 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2838 op = _dispatch_get_priority();
2839 p = dbpd->dbpd_priority;
2840 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2841 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2842 }
2843 }
2844 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2845 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2846 v = dbpd->dbpd_voucher;
2847 }
2848 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2849 dbpd->dbpd_thread = _dispatch_tid_self();
2850 _dispatch_client_callout(dbpd->dbpd_block,
2851 _dispatch_Block_invoke(dbpd->dbpd_block));
2852 _dispatch_reset_priority_and_voucher(op, ov);
2853 out:
2854 if ((atomic_flags & DBF_PERFORM) == 0) {
2855 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2856 dispatch_group_leave(_dbpd_group(dbpd));
2857 }
2858 }
2859 }
2860
2861 void
2862 _dispatch_block_sync_invoke(void *block)
2863 {
2864 dispatch_block_t b = block;
2865 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2866 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2867 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2868 if (slowpath(atomic_flags & DBF_WAITED)) {
2869 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2870 "run more than once and waited for");
2871 }
2872 if (atomic_flags & DBF_CANCELED) goto out;
2873
2874 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2875 _dispatch_thread_set_self_t adopt_flags = 0;
2876 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2877 op = _dispatch_get_priority();
2878 p = dbpd->dbpd_priority;
2879 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2880 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2881 }
2882 }
2883 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2884 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2885 v = dbpd->dbpd_voucher;
2886 }
2887 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2888 dbpd->dbpd_block();
2889 _dispatch_reset_priority_and_voucher(op, ov);
2890 out:
2891 if ((atomic_flags & DBF_PERFORM) == 0) {
2892 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2893 dispatch_group_leave(_dbpd_group(dbpd));
2894 }
2895 }
2896
2897 os_mpsc_queue_t oq;
2898 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2899 if (oq) {
2900 // balances dispatch_{,barrier_,}sync
2901 _os_object_release_internal(oq->_as_os_obj);
2902 }
2903 }
2904
2905 DISPATCH_ALWAYS_INLINE
2906 static void
2907 _dispatch_block_async_invoke2(dispatch_block_t b, bool release)
2908 {
2909 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2910 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2911 if (slowpath(atomic_flags & DBF_WAITED)) {
2912 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2913 "run more than once and waited for");
2914 }
2915 if (!slowpath(atomic_flags & DBF_CANCELED)) {
2916 dbpd->dbpd_block();
2917 }
2918 if ((atomic_flags & DBF_PERFORM) == 0) {
2919 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2920 dispatch_group_leave(_dbpd_group(dbpd));
2921 }
2922 }
2923 os_mpsc_queue_t oq;
2924 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2925 if (oq) {
2926 // balances dispatch_{,barrier_,group_}async
2927 _os_object_release_internal_inline(oq->_as_os_obj);
2928 }
2929 if (release) {
2930 Block_release(b);
2931 }
2932 }
2933
2934 static void
2935 _dispatch_block_async_invoke(void *block)
2936 {
2937 _dispatch_block_async_invoke2(block, false);
2938 }
2939
2940 static void
2941 _dispatch_block_async_invoke_and_release(void *block)
2942 {
2943 _dispatch_block_async_invoke2(block, true);
2944 }
2945
2946 void
2947 dispatch_block_cancel(dispatch_block_t db)
2948 {
2949 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2950 if (!dbpd) {
2951 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2952 "dispatch_block_cancel()");
2953 }
2954 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
2955 }
2956
2957 long
2958 dispatch_block_testcancel(dispatch_block_t db)
2959 {
2960 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2961 if (!dbpd) {
2962 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2963 "dispatch_block_testcancel()");
2964 }
2965 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
2966 }
2967
2968 long
2969 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
2970 {
2971 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2972 if (!dbpd) {
2973 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2974 "dispatch_block_wait()");
2975 }
2976
2977 unsigned int flags = os_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
2978 DBF_WAITING, relaxed);
2979 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
2980 DISPATCH_CLIENT_CRASH(flags, "A block object may not be waited for "
2981 "more than once");
2982 }
2983
2984 // <rdar://problem/17703192> If we know the queue where this block is
2985 // enqueued, or the thread that's executing it, then we should boost
2986 // it here.
2987
2988 pthread_priority_t pp = _dispatch_get_priority();
2989
2990 os_mpsc_queue_t boost_oq;
2991 boost_oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2992 if (boost_oq) {
2993 // release balances dispatch_{,barrier_,group_}async.
2994 // Can't put the queue back in the timeout case: the block might
2995 // finish after we fell out of group_wait and see our NULL, so
2996 // neither of us would ever release. Side effect: After a _wait
2997 // that times out, subsequent waits will not boost the qos of the
2998 // still-running block.
2999 dx_wakeup(boost_oq, pp, DISPATCH_WAKEUP_OVERRIDING |
3000 DISPATCH_WAKEUP_CONSUME);
3001 }
3002
3003 mach_port_t boost_th = dbpd->dbpd_thread;
3004 if (boost_th) {
3005 _dispatch_thread_override_start(boost_th, pp, dbpd);
3006 }
3007
3008 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3009 if (slowpath(performed > 1 || (boost_th && boost_oq))) {
3010 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3011 "run more than once and waited for");
3012 }
3013
3014 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
3015
3016 if (boost_th) {
3017 _dispatch_thread_override_end(boost_th, dbpd);
3018 }
3019
3020 if (ret) {
3021 // timed out: reverse our changes
3022 (void)os_atomic_and2o(dbpd, dbpd_atomic_flags,
3023 ~DBF_WAITING, relaxed);
3024 } else {
3025 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags,
3026 DBF_WAITED, relaxed);
3027 // don't need to re-test here: the second call would see
3028 // the first call's WAITING
3029 }
3030
3031 return ret;
3032 }
3033
3034 void
3035 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
3036 dispatch_block_t notification_block)
3037 {
3038 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3039 if (!dbpd) {
3040 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3041 "dispatch_block_notify()");
3042 }
3043 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3044 if (slowpath(performed > 1)) {
3045 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3046 "run more than once and observed");
3047 }
3048
3049 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
3050 }
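/*
 * Typical client-side flow for the dispatch_block_* API above (a sketch;
 * process_request()/done() are hypothetical helpers, error handling omitted):
 *
 *	dispatch_block_t work = dispatch_block_create(0, ^{ process_request(); });
 *	dispatch_async(q, work);
 *
 *	// either wait for it (a given block may be waited on at most once)...
 *	if (dispatch_block_wait(work,
 *			dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC))) {
 *		// timed out; the block may still run later
 *	}
 *	// ...or be notified asynchronously once it has finished:
 *	// dispatch_block_notify(work, callback_queue, ^{ done(); });
 *
 *	dispatch_block_cancel(work);	// only prevents runs that haven't started
 *	Block_release(work);
 */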
3051
3052 DISPATCH_NOINLINE
3053 void
3054 _dispatch_continuation_init_slow(dispatch_continuation_t dc,
3055 dispatch_queue_class_t dqu, dispatch_block_flags_t flags)
3056 {
3057 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(dc->dc_ctxt);
3058 dispatch_block_flags_t block_flags = dbpd->dbpd_flags;
3059 uintptr_t dc_flags = dc->dc_flags;
3060 os_mpsc_queue_t oq = dqu._oq;
3061
3062 // balanced in d_block_async_invoke_and_release or d_block_wait
3063 if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
3064 _os_object_retain_internal_inline(oq->_as_os_obj);
3065 }
3066
3067 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3068 dc->dc_func = _dispatch_block_async_invoke_and_release;
3069 } else {
3070 dc->dc_func = _dispatch_block_async_invoke;
3071 }
3072
3073 flags |= block_flags;
3074 if (block_flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3075 _dispatch_continuation_priority_set(dc, dbpd->dbpd_priority, flags);
3076 } else {
3077 _dispatch_continuation_priority_set(dc, dc->dc_priority, flags);
3078 }
3079 if (block_flags & DISPATCH_BLOCK_BARRIER) {
3080 dc_flags |= DISPATCH_OBJ_BARRIER_BIT;
3081 }
3082 if (block_flags & DISPATCH_BLOCK_HAS_VOUCHER) {
3083 voucher_t v = dbpd->dbpd_voucher;
3084 dc->dc_voucher = v ? _voucher_retain(v) : NULL;
3085 dc_flags |= DISPATCH_OBJ_ENFORCE_VOUCHER;
3086 _dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);
3087 _dispatch_voucher_ktrace_dc_push(dc);
3088 } else {
3089 _dispatch_continuation_voucher_set(dc, oq, flags);
3090 }
3091 dc_flags |= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT;
3092 dc->dc_flags = dc_flags;
3093 }
3094
3095 void
3096 _dispatch_continuation_update_bits(dispatch_continuation_t dc,
3097 uintptr_t dc_flags)
3098 {
3099 dc->dc_flags = dc_flags;
3100 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3101 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3102 dc->dc_func = _dispatch_block_async_invoke_and_release;
3103 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3104 dc->dc_func = _dispatch_call_block_and_release;
3105 }
3106 } else {
3107 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3108 dc->dc_func = _dispatch_block_async_invoke;
3109 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3110 dc->dc_func = _dispatch_Block_invoke(dc->dc_ctxt);
3111 }
3112 }
3113 }
3114
3115 #endif // __BLOCKS__
3116
3117 #pragma mark -
3118 #pragma mark dispatch_barrier_async
3119
3120 DISPATCH_NOINLINE
3121 static void
3122 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
3123 dispatch_function_t func, pthread_priority_t pp,
3124 dispatch_block_flags_t flags, uintptr_t dc_flags)
3125 {
3126 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
3127 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3128 _dispatch_continuation_async(dq, dc);
3129 }
3130
3131 DISPATCH_ALWAYS_INLINE
3132 static inline void
3133 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
3134 dispatch_function_t func, pthread_priority_t pp,
3135 dispatch_block_flags_t flags)
3136 {
3137 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3138 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3139
3140 if (!fastpath(dc)) {
3141 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3142 }
3143
3144 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3145 _dispatch_continuation_push(dq, dc);
3146 }
3147
3148 DISPATCH_NOINLINE
3149 void
3150 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
3151 dispatch_function_t func)
3152 {
3153 _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
3154 }
3155
3156 DISPATCH_NOINLINE
3157 void
3158 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
3159 dispatch_function_t func)
3160 {
3161 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3162 dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3163 dc->dc_func = func;
3164 dc->dc_ctxt = ctxt;
3165 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3166 dc->dc_priority = DISPATCH_NO_PRIORITY;
3167 _dispatch_queue_push(dq, dc, 0);
3168 }
3169
3170 #ifdef __BLOCKS__
3171 void
3172 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
3173 {
3174 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3175 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3176
3177 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3178 _dispatch_continuation_push(dq, dc);
3179 }
3180 #endif
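/*
 * Illustrative reader/writer pattern built on dispatch_barrier_async() (a
 * sketch; table, key, value, lookup() and insert() are hypothetical): readers
 * run concurrently, while the barrier block runs exclusively once previously
 * submitted work on the queue has drained.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{ (void)lookup(table, key); });            // reader
 *	dispatch_barrier_async(q, ^{ insert(table, key, value); });   // writer
 *	dispatch_async(q, ^{ (void)lookup(table, key); });            // reader
 */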
3181
3182 #pragma mark -
3183 #pragma mark dispatch_async
3184
3185 void
3186 _dispatch_async_redirect_invoke(dispatch_continuation_t dc,
3187 dispatch_invoke_flags_t flags)
3188 {
3189 dispatch_thread_frame_s dtf;
3190 struct dispatch_continuation_s *other_dc = dc->dc_other;
3191 dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
3192 // if we went through _dispatch_root_queue_push_override,
3193 // the "right" root queue was stuffed into dc_func
3194 dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
3195 dispatch_queue_t dq = dc->dc_data, rq, old_dq;
3196 struct _dispatch_identity_s di;
3197
3198 pthread_priority_t op, dp, old_dp;
3199
3200 if (ctxt_flags) {
3201 flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
3202 flags |= ctxt_flags;
3203 }
3204 old_dq = _dispatch_get_current_queue();
3205 if (assumed_rq) {
3206 _dispatch_queue_set_current(assumed_rq);
3207 _dispatch_root_queue_identity_assume(&di, 0);
3208 }
3209
3210 old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
3211 op = dq->dq_override;
3212 if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3213 _dispatch_wqthread_override_start(_dispatch_tid_self(), op);
3214 // Ensure that the root queue sees that this thread was overridden.
3215 _dispatch_set_defaultpriority_override();
3216 }
3217
3218 _dispatch_thread_frame_push(&dtf, dq);
3219 _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
3220 DISPATCH_OBJ_CONSUME_BIT, {
3221 _dispatch_continuation_pop(other_dc, dq, flags);
3222 });
3223 _dispatch_thread_frame_pop(&dtf);
3224 if (assumed_rq) {
3225 _dispatch_root_queue_identity_restore(&di);
3226 _dispatch_queue_set_current(old_dq);
3227 }
3228 _dispatch_reset_defaultpriority(old_dp);
3229
3230 rq = dq->do_targetq;
3231 while (slowpath(rq->do_targetq) && rq != old_dq) {
3232 _dispatch_non_barrier_complete(rq);
3233 rq = rq->do_targetq;
3234 }
3235
3236 _dispatch_non_barrier_complete(dq);
3237
3238 if (dtf.dtf_deferred) {
3239 struct dispatch_object_s *dou = dtf.dtf_deferred;
3240 return _dispatch_queue_drain_deferred_invoke(dq, flags, 0, dou);
3241 }
3242
3243 _dispatch_release_tailcall(dq);
3244 }
3245
3246 DISPATCH_ALWAYS_INLINE
3247 static inline dispatch_continuation_t
3248 _dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
3249 {
3250 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3251
3252 dou._do->do_next = NULL;
3253 dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
3254 dc->dc_func = NULL;
3255 dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3256 dc->dc_data = dq;
3257 dc->dc_other = dou._do;
3258 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3259 dc->dc_priority = DISPATCH_NO_PRIORITY;
3260 _dispatch_retain(dq);
3261 return dc;
3262 }
3263
3264 DISPATCH_NOINLINE
3265 static void
3266 _dispatch_async_f_redirect(dispatch_queue_t dq,
3267 dispatch_object_t dou, pthread_priority_t pp)
3268 {
3269 if (!slowpath(_dispatch_object_is_redirection(dou))) {
3270 dou._dc = _dispatch_async_redirect_wrap(dq, dou);
3271 }
3272 dq = dq->do_targetq;
3273
3274 // Find the queue to redirect to
3275 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
3276 if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
3277 break;
3278 }
3279 if (!dou._dc->dc_ctxt) {
3280 // find first queue in descending target queue order that has
3281 // an autorelease frequency set, and use that as the frequency for
3282 // this continuation.
3283 dou._dc->dc_ctxt = (void *)
3284 (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3285 }
3286 dq = dq->do_targetq;
3287 }
3288
3289 _dispatch_queue_push(dq, dou, pp);
3290 }
3291
3292 DISPATCH_ALWAYS_INLINE
3293 static inline void
3294 _dispatch_continuation_redirect(dispatch_queue_t dq,
3295 struct dispatch_object_s *dc)
3296 {
3297 _dispatch_trace_continuation_pop(dq, dc);
3298 // This is a re-redirect; overrides have already been applied
3299 // by _dispatch_async_f2.
3300 // However we want to end up on the root queue matching `dc`'s qos, so pick up
3301 // the current override of `dq`, which includes dc's override (and maybe more)
3302 _dispatch_async_f_redirect(dq, dc, dq->dq_override);
3303 _dispatch_introspection_queue_item_complete(dc);
3304 }
3305
3306 DISPATCH_NOINLINE
3307 static void
3308 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
3309 {
3310 // <rdar://problem/24738102&24743140> reserving non-barrier width
3311 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3312 // equivalent), so we have to check that this thread hasn't enqueued
3313 // anything ahead of this call, or we could break ordering
3314 if (slowpath(dq->dq_items_tail)) {
3315 return _dispatch_continuation_push(dq, dc);
3316 }
3317
3318 if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
3319 return _dispatch_continuation_push(dq, dc);
3320 }
3321
3322 return _dispatch_async_f_redirect(dq, dc,
3323 _dispatch_continuation_get_override_priority(dq, dc));
3324 }
3325
3326 DISPATCH_ALWAYS_INLINE
3327 static inline void
3328 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3329 pthread_priority_t pp, dispatch_block_flags_t flags)
3330 {
3331 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3332 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3333
3334 if (!fastpath(dc)) {
3335 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3336 }
3337
3338 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3339 _dispatch_continuation_async2(dq, dc, false);
3340 }
3341
3342 DISPATCH_NOINLINE
3343 void
3344 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3345 {
3346 _dispatch_async_f(dq, ctxt, func, 0, 0);
3347 }
3348
3349 DISPATCH_NOINLINE
3350 void
3351 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
3352 dispatch_function_t func)
3353 {
3354 _dispatch_async_f(dq, ctxt, func, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
3355 }
3356
3357 #ifdef __BLOCKS__
3358 void
3359 dispatch_async(dispatch_queue_t dq, void (^work)(void))
3360 {
3361 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3362 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3363
3364 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3365 _dispatch_continuation_async(dq, dc);
3366 }
3367 #endif
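
/*
 * Illustrative usage sketch for the async API above; names are hypothetical
 * and this block is documentation only. Both forms enqueue the work and
 * return immediately; the function form takes an explicit context pointer
 * instead of a block:
 *
 *	// block form
 *	dispatch_async(dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
 *		do_background_work();
 *	});
 *
 *	// function/context form
 *	static void worker(void *ctxt) { process(ctxt); }
 *	...
 *	dispatch_async_f(dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), request, worker);
 */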
3368
3369 #pragma mark -
3370 #pragma mark dispatch_group_async
3371
3372 DISPATCH_ALWAYS_INLINE
3373 static inline void
3374 _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3375 dispatch_continuation_t dc)
3376 {
3377 dispatch_group_enter(dg);
3378 dc->dc_data = dg;
3379 _dispatch_continuation_async(dq, dc);
3380 }
3381
3382 DISPATCH_NOINLINE
3383 void
3384 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
3385 dispatch_function_t func)
3386 {
3387 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3388 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3389
3390 _dispatch_continuation_init_f(dc, dq, ctxt, func, 0, 0, dc_flags);
3391 _dispatch_continuation_group_async(dg, dq, dc);
3392 }
3393
3394 #ifdef __BLOCKS__
3395 void
3396 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3397 dispatch_block_t db)
3398 {
3399 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3400 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3401
3402 _dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
3403 _dispatch_continuation_group_async(dg, dq, dc);
3404 }
3405 #endif
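
/*
 * Illustrative usage sketch for the group-async API above; names are
 * hypothetical and this block is documentation only. Each submission enters
 * the group (see _dispatch_continuation_group_async above) and leaves it
 * when the work item finishes, so the notify block runs once all submitted
 * items have completed:
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_group_async(g, q, ^{ load_part_one(); });
 *	dispatch_group_async(g, q, ^{ load_part_two(); });
 *	dispatch_group_notify(g, dispatch_get_main_queue(), ^{
 *		all_parts_loaded();
 *	});
 */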
3406
3407 #pragma mark -
3408 #pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke
3409
3410 DISPATCH_NOINLINE
3411 static void
3412 _dispatch_sync_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
3413 dispatch_function_t func)
3414 {
3415 voucher_t ov;
3416 dispatch_thread_frame_s dtf;
3417 _dispatch_thread_frame_push(&dtf, dq);
3418 ov = _dispatch_set_priority_and_voucher(0, dq->dq_override_voucher, 0);
3419 _dispatch_client_callout(ctxt, func);
3420 _dispatch_perfmon_workitem_inc();
3421 _dispatch_reset_voucher(ov, 0);
3422 _dispatch_thread_frame_pop(&dtf);
3423 }
3424
3425 DISPATCH_ALWAYS_INLINE
3426 static inline void
3427 _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
3428 dispatch_function_t func)
3429 {
3430 if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
3431 return _dispatch_sync_function_invoke_slow(dq, ctxt, func);
3432 }
3433 dispatch_thread_frame_s dtf;
3434 _dispatch_thread_frame_push(&dtf, dq);
3435 _dispatch_client_callout(ctxt, func);
3436 _dispatch_perfmon_workitem_inc();
3437 _dispatch_thread_frame_pop(&dtf);
3438 }
3439
3440 DISPATCH_NOINLINE
3441 static void
3442 _dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
3443 dispatch_function_t func)
3444 {
3445 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3446 }
3447
3448 void
3449 _dispatch_sync_recurse_invoke(void *ctxt)
3450 {
3451 dispatch_continuation_t dc = ctxt;
3452 _dispatch_sync_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
3453 }
3454
3455 DISPATCH_ALWAYS_INLINE
3456 static inline void
3457 _dispatch_sync_function_recurse(dispatch_queue_t dq, void *ctxt,
3458 dispatch_function_t func, pthread_priority_t pp)
3459 {
3460 struct dispatch_continuation_s dc = {
3461 .dc_data = dq,
3462 .dc_func = func,
3463 .dc_ctxt = ctxt,
3464 };
3465 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
3466 }
3467
3468 DISPATCH_NOINLINE
3469 static void
3470 _dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3471 dispatch_function_t func)
3472 {
3473 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3474 _dispatch_non_barrier_complete(dq);
3475 }
3476
3477 DISPATCH_NOINLINE
3478 static void
3479 _dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3480 dispatch_function_t func, pthread_priority_t pp)
3481 {
3482 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3483 _dispatch_non_barrier_complete(dq);
3484 }
3485
3486 DISPATCH_ALWAYS_INLINE
3487 static void
3488 _dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3489 dispatch_function_t func, pthread_priority_t pp)
3490 {
3491 _dispatch_introspection_non_barrier_sync_begin(dq, func);
3492 if (slowpath(dq->do_targetq->do_targetq)) {
3493 return _dispatch_non_barrier_sync_f_recurse(dq, ctxt, func, pp);
3494 }
3495 _dispatch_non_barrier_sync_f_invoke(dq, ctxt, func);
3496 }
3497
3498 #pragma mark -
3499 #pragma mark dispatch_barrier_sync
3500
3501 DISPATCH_NOINLINE
3502 static void
3503 _dispatch_barrier_complete(dispatch_queue_t dq)
3504 {
3505 uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
3506 dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3507
3508 if (slowpath(dq->dq_items_tail)) {
3509 return _dispatch_try_lock_transfer_or_wakeup(dq);
3510 }
3511
3512 if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
3513 // someone enqueued a slow item at the head
3514 // looping may be its last chance
3515 return _dispatch_try_lock_transfer_or_wakeup(dq);
3516 }
3517 }
3518
3519 DISPATCH_NOINLINE
3520 static void
3521 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3522 dispatch_function_t func, pthread_priority_t pp)
3523 {
3524 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3525 _dispatch_barrier_complete(dq);
3526 }
3527
3528 DISPATCH_NOINLINE
3529 static void
3530 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3531 dispatch_function_t func)
3532 {
3533 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3534 _dispatch_barrier_complete(dq);
3535 }
3536
3537 DISPATCH_ALWAYS_INLINE
3538 static void
3539 _dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3540 dispatch_function_t func, pthread_priority_t pp)
3541 {
3542 _dispatch_introspection_barrier_sync_begin(dq, func);
3543 if (slowpath(dq->do_targetq->do_targetq)) {
3544 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
3545 }
3546 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3547 }
3548
3549 typedef struct dispatch_barrier_sync_context_s {
3550 struct dispatch_continuation_s dbsc_dc;
3551 dispatch_thread_frame_s dbsc_dtf;
3552 } *dispatch_barrier_sync_context_t;
3553
3554 static void
3555 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
3556 {
3557 dispatch_barrier_sync_context_t dbsc = ctxt;
3558 dispatch_continuation_t dc = &dbsc->dbsc_dc;
3559 dispatch_queue_t dq = dc->dc_data;
3560 dispatch_thread_event_t event = (dispatch_thread_event_t)dc->dc_other;
3561
3562 dispatch_assert(dq == _dispatch_queue_get_current());
3563 #if DISPATCH_COCOA_COMPAT
3564 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3565 dispatch_assert(_dispatch_thread_frame_get_current() == NULL);
3566
3567 // the block runs on the thread the queue is bound to and not
3568 // on the calling thread, but we want it to see the calling thread's
3569 // dispatch thread frames, so we fake the link, and then undo it
3570 _dispatch_thread_frame_set_current(&dbsc->dbsc_dtf);
3571 // The queue is bound to a non-dispatch thread (e.g. main thread)
3572 _dispatch_continuation_voucher_adopt(dc, DISPATCH_NO_VOUCHER,
3573 DISPATCH_OBJ_CONSUME_BIT);
3574 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
3575 os_atomic_store2o(dc, dc_func, NULL, release);
3576 _dispatch_thread_frame_set_current(NULL);
3577 }
3578 #endif
3579 _dispatch_thread_event_signal(event); // release
3580 }
3581
3582 DISPATCH_NOINLINE
3583 static void
3584 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
3585 dispatch_function_t func, pthread_priority_t pp)
3586 {
3587 if (slowpath(!dq->do_targetq)) {
3588 // see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
3589 return _dispatch_sync_function_invoke(dq, ctxt, func);
3590 }
3591
3592 if (!pp) {
3593 pp = _dispatch_get_priority();
3594 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3595 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3596 }
3597 dispatch_thread_event_s event;
3598 _dispatch_thread_event_init(&event);
3599 struct dispatch_barrier_sync_context_s dbsc = {
3600 .dbsc_dc = {
3601 .dc_data = dq,
3602 #if DISPATCH_COCOA_COMPAT
3603 .dc_func = func,
3604 .dc_ctxt = ctxt,
3605 #endif
3606 .dc_other = &event,
3607 }
3608 };
3609 #if DISPATCH_COCOA_COMPAT
3610 // It's preferred to execute synchronous blocks on the current thread
3611 // due to thread-local side effects, etc. However, blocks submitted
3612 // to the main thread MUST be run on the main thread
3613 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3614 // consumed by _dispatch_barrier_sync_f_slow_invoke
3615 // or in the DISPATCH_COCOA_COMPAT hunk below
3616 _dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
3617 // save frame linkage for _dispatch_barrier_sync_f_slow_invoke
3618 _dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
3619 // thread-bound queues cannot mutate their target queue hierarchy,
3620 // so it's fine to look now
3621 _dispatch_introspection_barrier_sync_begin(dq, func);
3622 }
3623 #endif
3624 uint32_t th_self = _dispatch_tid_self();
3625 struct dispatch_continuation_s dbss = {
3626 .dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
3627 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
3628 .dc_ctxt = &dbsc,
3629 .dc_data = (void*)(uintptr_t)th_self,
3630 .dc_priority = pp,
3631 .dc_other = &event,
3632 .dc_voucher = DISPATCH_NO_VOUCHER,
3633 };
3634
3635 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3636 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3637 DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
3638 "already owned by current thread");
3639 }
3640
3641 _dispatch_continuation_push_sync_slow(dq, &dbss);
3642 _dispatch_thread_event_wait(&event); // acquire
3643 _dispatch_thread_event_destroy(&event);
3644 if (_dispatch_queue_received_override(dq, pp)) {
3645 // Ensure that the root queue sees that this thread was overridden.
3646 // pairs with the _dispatch_wqthread_override_start in
3647 // _dispatch_continuation_slow_item_signal
3648 _dispatch_set_defaultpriority_override();
3649 }
3650
3651 #if DISPATCH_COCOA_COMPAT
3652 // Queue bound to a non-dispatch thread
3653 if (dbsc.dbsc_dc.dc_func == NULL) {
3654 return;
3655 } else if (dbsc.dbsc_dc.dc_voucher) {
3656 // this almost never happens, unless a dispatch_sync() onto a thread-
3657 // bound queue went to the slow path at the same time as dispatch_main()
3658 // is called, or the queue is detached from the runloop.
3659 _voucher_release(dbsc.dbsc_dc.dc_voucher);
3660 }
3661 #endif
3662
3663 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3664 }
3665
3666 DISPATCH_ALWAYS_INLINE
3667 static inline void
3668 _dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
3669 dispatch_function_t func, pthread_priority_t pp)
3670 {
3671 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3672 // global concurrent queues and queues bound to non-dispatch threads
3673 // always fall into the slow case
3674 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
3675 }
3676 //
3677 // TODO: the more correct thing to do would be to set dq_override to the qos
3678 // of the thread that just acquired the barrier lock here. Unwinding that
3679 // would slow down the uncontended fastpath however.
3680 //
3681 // The chosen tradeoff is that if an enqueue on a lower priority thread
3682 // contends with this fastpath, this thread may receive a useless override.
3683 // Improving this requires the override level to be part of the atomic
3684 // dq_state
3685 //
3686 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3687 }
3688
3689 DISPATCH_NOINLINE
3690 static void
3691 _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3692 dispatch_function_t func, pthread_priority_t pp)
3693 {
3694 _dispatch_barrier_sync_f2(dq, ctxt, func, pp);
3695 }
3696
3697 DISPATCH_NOINLINE
3698 void
3699 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3700 dispatch_function_t func)
3701 {
3702 _dispatch_barrier_sync_f2(dq, ctxt, func, 0);
3703 }
3704
3705 #ifdef __BLOCKS__
3706 DISPATCH_NOINLINE
3707 static void
3708 _dispatch_sync_block_with_private_data(dispatch_queue_t dq,
3709 void (^work)(void), dispatch_block_flags_t flags)
3710 {
3711 pthread_priority_t pp = _dispatch_block_get_priority(work);
3712
3713 flags |= _dispatch_block_get_flags(work);
3714 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3715 pthread_priority_t tp = _dispatch_get_priority();
3716 tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3717 if (pp < tp) {
3718 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3719 } else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
3720 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3721 }
3722 }
3723 // balanced in d_block_sync_invoke or d_block_wait
3724 if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3725 dbpd_queue, NULL, dq, relaxed)) {
3726 _dispatch_retain(dq);
3727 }
3728 if (flags & DISPATCH_BLOCK_BARRIER) {
3729 _dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3730 } else {
3731 _dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3732 }
3733 }
3734
3735 void
3736 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
3737 {
3738 if (slowpath(_dispatch_block_has_private_data(work))) {
3739 dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
3740 return _dispatch_sync_block_with_private_data(dq, work, flags);
3741 }
3742 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
3743 }
3744 #endif
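
/*
 * Illustrative usage sketch for the barrier-sync API above; names are
 * hypothetical and this block is documentation only. The call does not
 * return until the block has run with exclusive access to the queue; per
 * the check in _dispatch_barrier_sync_f_slow, calling it on a queue already
 * owned by the current thread crashes:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rwq",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_barrier_sync(q, ^{
 *		shared_value = compute_new_value();	// excludes all readers
 *	});
 */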
3745
3746 DISPATCH_NOINLINE
3747 void
3748 _dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
3749 dispatch_function_t func)
3750 {
3751 // Use for mutation of queue-/source-internal state only, ignores target
3752 // queue hierarchy!
3753 if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
3754 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
3755 }
3756 // skip the recursion because it's about the queue state only
3757 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3758 }
3759
3760 #pragma mark -
3761 #pragma mark dispatch_sync
3762
3763 DISPATCH_NOINLINE
3764 static void
3765 _dispatch_non_barrier_complete(dispatch_queue_t dq)
3766 {
3767 uint64_t old_state, new_state;
3768
3769 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3770 new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
3771 if (_dq_state_is_runnable(new_state)) {
3772 if (!_dq_state_is_runnable(old_state)) {
3773 // we're making a FULL -> non FULL transition
3774 new_state |= DISPATCH_QUEUE_DIRTY;
3775 }
3776 if (!_dq_state_drain_locked(new_state)) {
3777 uint64_t full_width = new_state;
3778 if (_dq_state_has_pending_barrier(new_state)) {
3779 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
3780 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
3781 full_width += DISPATCH_QUEUE_IN_BARRIER;
3782 } else {
3783 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3784 full_width += DISPATCH_QUEUE_IN_BARRIER;
3785 }
3786 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
3787 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
3788 new_state = full_width;
3789 new_state &= ~DISPATCH_QUEUE_DIRTY;
3790 new_state |= _dispatch_tid_self();
3791 }
3792 }
3793 }
3794 });
3795
3796 if (_dq_state_is_in_barrier(new_state)) {
3797 return _dispatch_try_lock_transfer_or_wakeup(dq);
3798 }
3799 if (!_dq_state_is_runnable(old_state)) {
3800 _dispatch_queue_try_wakeup(dq, new_state,
3801 DISPATCH_WAKEUP_WAITER_HANDOFF);
3802 }
3803 }
3804
3805 DISPATCH_NOINLINE
3806 static void
3807 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3808 pthread_priority_t pp)
3809 {
3810 dispatch_assert(dq->do_targetq);
3811 if (!pp) {
3812 pp = _dispatch_get_priority();
3813 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3814 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3815 }
3816 dispatch_thread_event_s event;
3817 _dispatch_thread_event_init(&event);
3818 uint32_t th_self = _dispatch_tid_self();
3819 struct dispatch_continuation_s dc = {
3820 .dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
3821 #if DISPATCH_INTROSPECTION
3822 .dc_func = func,
3823 .dc_ctxt = ctxt,
3824 #endif
3825 .dc_data = (void*)(uintptr_t)th_self,
3826 .dc_other = &event,
3827 .dc_priority = pp,
3828 .dc_voucher = DISPATCH_NO_VOUCHER,
3829 };
3830
3831 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3832 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3833 DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
3834 "already owned by current thread");
3835 }
3836
3837 _dispatch_continuation_push_sync_slow(dq, &dc);
3838 _dispatch_thread_event_wait(&event); // acquire
3839 _dispatch_thread_event_destroy(&event);
3840 if (_dispatch_queue_received_override(dq, pp)) {
3841 // Ensure that the root queue sees that this thread was overridden.
3842 // pairs with the _dispatch_wqthread_override_start in
3843 // _dispatch_continuation_slow_item_signal
3844 _dispatch_set_defaultpriority_override();
3845 }
3846 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3847 }
3848
3849 DISPATCH_ALWAYS_INLINE
3850 static inline void
3851 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3852 pthread_priority_t pp)
3853 {
3854 // <rdar://problem/24738102&24743140> reserving non-barrier width
3855 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3856 // equivalent), so we have to check that this thread hasn't enqueued
3857 // anything ahead of this call, or we could break ordering
3858 if (slowpath(dq->dq_items_tail)) {
3859 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3860 }
3861 // concurrent queues do not respect width on sync
3862 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3863 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3864 }
3865 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3866 }
3867
3868 DISPATCH_NOINLINE
3869 static void
3870 _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3871 pthread_priority_t pp)
3872 {
3873 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3874 return _dispatch_sync_f2(dq, ctxt, func, pp);
3875 }
3876 return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
3877 }
3878
3879 DISPATCH_NOINLINE
3880 void
3881 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3882 {
3883 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3884 return _dispatch_sync_f2(dq, ctxt, func, 0);
3885 }
3886 return dispatch_barrier_sync_f(dq, ctxt, func);
3887 }
3888
3889 #ifdef __BLOCKS__
3890 void
3891 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
3892 {
3893 if (slowpath(_dispatch_block_has_private_data(work))) {
3894 return _dispatch_sync_block_with_private_data(dq, work, 0);
3895 }
3896 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
3897 }
3898 #endif
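
/*
 * Illustrative usage sketch for the sync API above; names are hypothetical
 * and this block is documentation only. On a concurrent queue,
 * dispatch_sync() acts as a "reader" and may overlap other sync readers;
 * pairing it with barrier writers gives a simple reader-writer scheme. As
 * with dispatch_barrier_sync(), calling it on a queue the current thread
 * already owns crashes:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rwq",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	__block int snapshot;
 *	dispatch_sync(q, ^{ snapshot = shared_value; });
 */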
3899
3900 #pragma mark -
3901 #pragma mark dispatch_trysync
3902
3903 struct trysync_context {
3904 dispatch_queue_t tc_dq;
3905 void *tc_ctxt;
3906 dispatch_function_t tc_func;
3907 };
3908
3909 DISPATCH_NOINLINE
3910 static int
3911 _dispatch_trysync_recurse(dispatch_queue_t dq,
3912 struct trysync_context *tc, bool barrier)
3913 {
3914 dispatch_queue_t tq = dq->do_targetq;
3915
3916 if (barrier) {
3917 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3918 return EWOULDBLOCK;
3919 }
3920 } else {
3921 // <rdar://problem/24743140> check nothing was queued by the current
3922 // thread ahead of this call. _dispatch_queue_try_reserve_sync_width
3923 // ignores the ENQUEUED bit which could cause it to miss a barrier_async
3924 // made by the same thread just before.
3925 if (slowpath(dq->dq_items_tail)) {
3926 return EWOULDBLOCK;
3927 }
3928 // concurrent queues do not respect width on sync
3929 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3930 return EWOULDBLOCK;
3931 }
3932 }
3933
3934 int rc = 0;
3935 if (_dispatch_queue_cannot_trysync(tq)) {
3936 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3937 rc = ENOTSUP;
3938 } else if (tq->do_targetq) {
3939 rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
3940 if (rc == ENOTSUP) {
3941 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3942 }
3943 } else {
3944 dispatch_thread_frame_s dtf;
3945 _dispatch_thread_frame_push(&dtf, tq);
3946 _dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
3947 _dispatch_thread_frame_pop(&dtf);
3948 }
3949 if (barrier) {
3950 _dispatch_barrier_complete(dq);
3951 } else {
3952 _dispatch_non_barrier_complete(dq);
3953 }
3954 return rc;
3955 }
3956
3957 DISPATCH_NOINLINE
3958 bool
3959 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
3960 dispatch_function_t f)
3961 {
3962 if (slowpath(!dq->do_targetq)) {
3963 _dispatch_sync_function_invoke(dq, ctxt, f);
3964 return true;
3965 }
3966 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3967 return false;
3968 }
3969 struct trysync_context tc = {
3970 .tc_dq = dq,
3971 .tc_func = f,
3972 .tc_ctxt = ctxt,
3973 };
3974 return _dispatch_trysync_recurse(dq, &tc, true) == 0;
3975 }
3976
3977 DISPATCH_NOINLINE
3978 bool
3979 _dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
3980 {
3981 if (slowpath(!dq->do_targetq)) {
3982 _dispatch_sync_function_invoke(dq, ctxt, f);
3983 return true;
3984 }
3985 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3986 return false;
3987 }
3988 struct trysync_context tc = {
3989 .tc_dq = dq,
3990 .tc_func = f,
3991 .tc_ctxt = ctxt,
3992 };
3993 return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
3994 }
3995
3996 #pragma mark -
3997 #pragma mark dispatch_after
3998
3999 DISPATCH_ALWAYS_INLINE
4000 static inline void
4001 _dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
4002 void *ctxt, void *handler, bool block)
4003 {
4004 dispatch_source_t ds;
4005 uint64_t leeway, delta;
4006
4007 if (when == DISPATCH_TIME_FOREVER) {
4008 #if DISPATCH_DEBUG
4009 DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
4010 #endif
4011 return;
4012 }
4013
4014 delta = _dispatch_timeout(when);
4015 if (delta == 0) {
4016 if (block) {
4017 return dispatch_async(queue, handler);
4018 }
4019 return dispatch_async_f(queue, ctxt, handler);
4020 }
4021 leeway = delta / 10; // <rdar://problem/13447496>
4022
4023 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
4024 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
4025
4026 // this function can and should be optimized to not use a dispatch source
4027 ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
4028 dispatch_assert(ds);
4029
4030 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4031 if (block) {
4032 _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
4033 } else {
4034 _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
4035 }
4036 // reference `ds` so that it doesn't show up as a leak
4037 dc->dc_data = ds;
4038 _dispatch_source_set_event_handler_continuation(ds, dc);
4039 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
4040 dispatch_activate(ds);
4041 }
4042
4043 DISPATCH_NOINLINE
4044 void
4045 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
4046 dispatch_function_t func)
4047 {
4048 _dispatch_after(when, queue, ctxt, func, false);
4049 }
4050
4051 #ifdef __BLOCKS__
4052 void
4053 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
4054 dispatch_block_t work)
4055 {
4056 _dispatch_after(when, queue, NULL, work, true);
4057 }
4058 #endif
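
/*
 * Illustrative usage sketch for dispatch_after() above; names are
 * hypothetical and this block is documentation only. Per the implementation
 * above, an already-elapsed deadline degenerates into a plain
 * dispatch_async(), and the timer leeway is delta/10 clamped to the range
 * [1ms, 60s]:
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW,
 *			(int64_t)(2 * NSEC_PER_SEC)),
 *			dispatch_get_main_queue(), ^{
 *		handle_timeout();
 *	});
 */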
4059
4060 #pragma mark -
4061 #pragma mark dispatch_queue_wakeup
4062
4063 DISPATCH_NOINLINE
4064 void
4065 _dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4066 dispatch_wakeup_flags_t flags)
4067 {
4068 dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
4069
4070 if (_dispatch_queue_class_probe(dq)) {
4071 target = DISPATCH_QUEUE_WAKEUP_TARGET;
4072 }
4073 if (target) {
4074 return _dispatch_queue_class_wakeup(dq, pp, flags, target);
4075 } else if (pp) {
4076 return _dispatch_queue_class_override_drainer(dq, pp, flags);
4077 } else if (flags & DISPATCH_WAKEUP_CONSUME) {
4078 return _dispatch_release_tailcall(dq);
4079 }
4080 }
4081
4082 #if DISPATCH_COCOA_COMPAT
4083 DISPATCH_ALWAYS_INLINE
4084 static inline bool
4085 _dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
4086 {
4087 #if TARGET_OS_MAC
4088 return MACH_PORT_VALID(handle);
4089 #elif defined(__linux__)
4090 return handle >= 0;
4091 #else
4092 #error "runloop support not implemented on this platform"
4093 #endif
4094 }
4095
4096 DISPATCH_ALWAYS_INLINE
4097 static inline dispatch_runloop_handle_t
4098 _dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
4099 {
4100 #if TARGET_OS_MAC
4101 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
4102 #elif defined(__linux__)
4103 // decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4104 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
4105 #else
4106 #error "runloop support not implemented on this platform"
4107 #endif
4108 }
4109
4110 DISPATCH_ALWAYS_INLINE
4111 static inline void
4112 _dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
4113 {
4114 #if TARGET_OS_MAC
4115 dq->do_ctxt = (void *)(uintptr_t)handle;
4116 #elif defined(__linux__)
4117 // encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4118 dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
4119 #else
4120 #error "runloop support not implemented on this platform"
4121 #endif
4122 }
4123 #endif // DISPATCH_COCOA_COMPAT
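
/*
 * Illustrative round trip for the Linux encode/decode above (dq is a
 * hypothetical runloop queue; documentation only): an eventfd of 0 is a
 * valid handle, so it is stored offset by one and a NULL do_ctxt still
 * means "no handle yet":
 *
 *	_dispatch_runloop_queue_set_handle(dq, 0);	// do_ctxt == (void *)1
 *	_dispatch_runloop_queue_get_handle(dq);		// returns 0 again
 *	_dispatch_runloop_handle_is_valid(0);		// true: 0 >= 0
 */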
4124
4125 void
4126 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4127 dispatch_wakeup_flags_t flags)
4128 {
4129 #if DISPATCH_COCOA_COMPAT
4130 if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
4131 // <rdar://problem/14026816>
4132 return _dispatch_queue_wakeup(dq, pp, flags);
4133 }
4134
4135 if (_dispatch_queue_class_probe(dq)) {
4136 return _dispatch_runloop_queue_poke(dq, pp, flags);
4137 }
4138
4139 pp = _dispatch_queue_reset_override_priority(dq, true);
4140 if (pp) {
4141 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4142 if (_dispatch_queue_class_probe(dq)) {
4143 _dispatch_runloop_queue_poke(dq, pp, flags);
4144 }
4145 _dispatch_thread_override_end(owner, dq);
4146 return;
4147 }
4148 if (flags & DISPATCH_WAKEUP_CONSUME) {
4149 return _dispatch_release_tailcall(dq);
4150 }
4151 #else
4152 return _dispatch_queue_wakeup(dq, pp, flags);
4153 #endif
4154 }
4155
4156 void
4157 _dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4158 dispatch_wakeup_flags_t flags)
4159 {
4160 #if DISPATCH_COCOA_COMPAT
4161 if (_dispatch_queue_is_thread_bound(dq)) {
4162 return _dispatch_runloop_queue_wakeup(dq, pp, flags);
4163 }
4164 #endif
4165 return _dispatch_queue_wakeup(dq, pp, flags);
4166 }
4167
4168 void
4169 _dispatch_root_queue_wakeup(dispatch_queue_t dq,
4170 pthread_priority_t pp DISPATCH_UNUSED,
4171 dispatch_wakeup_flags_t flags)
4172 {
4173 if (flags & DISPATCH_WAKEUP_CONSUME) {
4174 // see _dispatch_queue_push_set_head
4175 dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
4176 }
4177 _dispatch_global_queue_poke(dq);
4178 }
4179
4180 #pragma mark -
4181 #pragma mark dispatch root queues poke
4182
4183 #if DISPATCH_COCOA_COMPAT
4184 static inline void
4185 _dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
4186 {
4187 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
4188 if (!_dispatch_runloop_handle_is_valid(handle)) {
4189 return;
4190 }
4191
4192 #if TARGET_OS_MAC
4193 mach_port_t mp = handle;
4194 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
4195 switch (kr) {
4196 case MACH_SEND_TIMEOUT:
4197 case MACH_SEND_TIMED_OUT:
4198 case MACH_SEND_INVALID_DEST:
4199 break;
4200 default:
4201 (void)dispatch_assume_zero(kr);
4202 break;
4203 }
4204 #elif defined(__linux__)
4205 int result;
4206 do {
4207 result = eventfd_write(handle, 1);
4208 } while (result == -1 && errno == EINTR);
4209 (void)dispatch_assume_zero(result);
4210 #else
4211 #error "runloop support not implemented on this platform"
4212 #endif
4213 }
4214
4215 DISPATCH_NOINLINE
4216 static void
4217 _dispatch_runloop_queue_poke(dispatch_queue_t dq,
4218 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
4219 {
4220 // it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
4221 // a release barrier, and because when runloop queues stop being thread-bound
4222 // they get a non-optional wake-up to start being a "normal" queue,
4223 // either in _dispatch_runloop_queue_xref_dispose,
4224 // or in _dispatch_queue_cleanup2() for the main thread.
4225
4226 if (dq == &_dispatch_main_q) {
4227 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4228 _dispatch_runloop_queue_handle_init);
4229 }
4230 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
4231 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
4232 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4233 _dispatch_thread_override_start(owner, pp, dq);
4234 if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
4235 _dispatch_thread_override_end(owner, dq);
4236 }
4237 }
4238 _dispatch_runloop_queue_class_poke(dq);
4239 if (flags & DISPATCH_WAKEUP_CONSUME) {
4240 return _dispatch_release_tailcall(dq);
4241 }
4242 }
4243 #endif
4244
4245 DISPATCH_NOINLINE
4246 static void
4247 _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
4248 {
4249 dispatch_root_queue_context_t qc = dq->do_ctxt;
4250 uint32_t i = n;
4251 int r;
4252
4253 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
4254 _dispatch_root_queues_init_once);
4255
4256 _dispatch_debug_root_queue(dq, __func__);
4257 #if HAVE_PTHREAD_WORKQUEUES
4258 #if DISPATCH_USE_PTHREAD_POOL
4259 if (qc->dgq_kworkqueue != (void*)(~0ul))
4260 #endif
4261 {
4262 _dispatch_root_queue_debug("requesting new worker thread for global "
4263 "queue: %p", dq);
4264 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4265 if (qc->dgq_kworkqueue) {
4266 pthread_workitem_handle_t wh;
4267 unsigned int gen_cnt;
4268 do {
4269 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
4270 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
4271 (void)dispatch_assume_zero(r);
4272 } while (--i);
4273 return;
4274 }
4275 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4276 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4277 if (!dq->dq_priority) {
4278 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
4279 qc->dgq_wq_options, (int)i);
4280 (void)dispatch_assume_zero(r);
4281 return;
4282 }
4283 #endif
4284 #if HAVE_PTHREAD_WORKQUEUE_QOS
4285 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
4286 (void)dispatch_assume_zero(r);
4287 #endif
4288 return;
4289 }
4290 #endif // HAVE_PTHREAD_WORKQUEUES
4291 #if DISPATCH_USE_PTHREAD_POOL
4292 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4293 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
4294 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
4295 if (!--i) {
4296 return;
4297 }
4298 }
4299 }
4300 uint32_t j, t_count;
4301 // seq_cst with atomic store to tail <rdar://problem/16932833>
4302 t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
4303 do {
4304 if (!t_count) {
4305 _dispatch_root_queue_debug("pthread pool is full for root queue: "
4306 "%p", dq);
4307 return;
4308 }
4309 j = i > t_count ? t_count : i;
4310 } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
4311 t_count - j, &t_count, acquire));
4312
4313 pthread_attr_t *attr = &pqc->dpq_thread_attr;
4314 pthread_t tid, *pthr = &tid;
4315 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
4316 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
4317 pthr = _dispatch_mgr_root_queue_init();
4318 }
4319 #endif
4320 do {
4321 _dispatch_retain(dq);
4322 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
4323 if (r != EAGAIN) {
4324 (void)dispatch_assume_zero(r);
4325 }
4326 _dispatch_temporary_resource_shortage();
4327 }
4328 } while (--j);
4329 #endif // DISPATCH_USE_PTHREAD_POOL
4330 }
4331
4332 static inline void
4333 _dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
4334 {
4335 if (!_dispatch_queue_class_probe(dq)) {
4336 return;
4337 }
4338 #if HAVE_PTHREAD_WORKQUEUES
4339 dispatch_root_queue_context_t qc = dq->do_ctxt;
4340 if (
4341 #if DISPATCH_USE_PTHREAD_POOL
4342 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
4343 #endif
4344 !os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4345 _dispatch_root_queue_debug("worker thread request still pending for "
4346 "global queue: %p", dq);
4347 return;
4348 }
4349 #endif // HAVE_PTHREAD_WORKQUEUES
4350 return _dispatch_global_queue_poke_slow(dq, n);
4351 }
4352
4353 static inline void
4354 _dispatch_global_queue_poke(dispatch_queue_t dq)
4355 {
4356 return _dispatch_global_queue_poke_n(dq, 1);
4357 }
4358
4359 DISPATCH_NOINLINE
4360 void
4361 _dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
4362 {
4363 return _dispatch_global_queue_poke_n(dq, n);
4364 }
4365
4366 #pragma mark -
4367 #pragma mark dispatch_queue_drain
4368
4369 void
4370 _dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
4371 dispatch_invoke_flags_t flags)
4372 {
4373 _dispatch_continuation_pop_inline(dou, dq, flags);
4374 }
4375
4376 void
4377 _dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
4378 dispatch_invoke_flags_t flags)
4379 {
4380 _dispatch_continuation_invoke_inline(dou, override_voucher, flags);
4381 }
4382
4383 /*
4384 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
4385 * (redirecting or not).
4386 *
4387 * Serial
4388 * ~~~~~~
4389 * Serial drain is about serial queues (width == 1). It doesn't support
4390 * the redirecting mode, which doesn't make sense, and treats all continuations
4391 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
4392 * is optimized away.
4393 *
4394 * Serial drain stops if the width of the queue grows to larger than 1.
4395 * Going through a serial drain prevents any recursive drain from being
4396 * redirecting.
4397 *
4398 * Concurrent
4399 * ~~~~~~~~~~
4400 * When in non-redirecting mode (meaning one of the target queues is serial),
4401 * non-barriers and barriers alike run in the context of the drain thread.
4402 * Slow non-barrier items are still all signaled so that they can make progress
4403 * toward the dispatch_sync() that will serialize them all.
4404 *
4405 * In redirecting mode, non-barrier work items are redirected downward.
4406 *
4407 * Concurrent drain stops if the width of the queue becomes 1, so that the
4408 * queue drain moves to the more efficient serial mode.
4409 */
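
/*
 * Illustrative sketch of the flavours/modes above (hypothetical queues;
 * documentation only): with
 *
 *	dispatch_queue_t serial = dispatch_queue_create("com.example.s", NULL);
 *	dispatch_queue_t conc = dispatch_queue_create("com.example.c",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_set_target_queue(conc, serial);
 *
 * `serial` drains with the serial flavour, while `conc` drains with the
 * concurrent flavour in non-redirecting mode because one of its target
 * queues is serial; had `conc` targeted a root queue directly, its
 * non-barrier items would be redirected downward instead.
 */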
4410 DISPATCH_ALWAYS_INLINE
4411 static dispatch_queue_t
4412 _dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4413 uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
4414 bool serial_drain)
4415 {
4416 dispatch_queue_t orig_tq = dq->do_targetq;
4417 dispatch_thread_frame_s dtf;
4418 struct dispatch_object_s *dc = NULL, *next_dc;
4419 uint64_t owned = *owned_ptr;
4420
4421 _dispatch_thread_frame_push(&dtf, dq);
4422 if (_dq_state_is_in_barrier(owned)) {
4423 // we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
4424 // but width can change while draining barrier work items, so we only
4425 // convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
4426 owned = DISPATCH_QUEUE_IN_BARRIER;
4427 }
4428
4429 while (dq->dq_items_tail) {
4430 dc = _dispatch_queue_head(dq);
4431 do {
4432 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
4433 goto out;
4434 }
4435 if (unlikely(orig_tq != dq->do_targetq)) {
4436 goto out;
4437 }
4438 if (unlikely(serial_drain != (dq->dq_width == 1))) {
4439 goto out;
4440 }
4441 if (serial_drain || _dispatch_object_is_barrier(dc)) {
4442 if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
4443 goto out;
4444 }
4445 next_dc = _dispatch_queue_next(dq, dc);
4446 if (_dispatch_object_is_slow_item(dc)) {
4447 owned = 0;
4448 goto out_with_deferred;
4449 }
4450 } else {
4451 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4452 // we just ran barrier work items, so we have to make their
4453 // effect visible to other sync work items on other threads
4454 // that may start coming in after this point, hence the
4455 // release barrier
4456 os_atomic_and2o(dq, dq_state, ~owned, release);
4457 owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4458 } else if (unlikely(owned == 0)) {
4459 if (_dispatch_object_is_slow_item(dc)) {
4460 // sync "readers" don't observe the limit
4461 _dispatch_queue_reserve_sync_width(dq);
4462 } else if (!_dispatch_queue_try_acquire_async(dq)) {
4463 goto out_with_no_width;
4464 }
4465 owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
4466 }
4467
4468 next_dc = _dispatch_queue_next(dq, dc);
4469 if (_dispatch_object_is_slow_item(dc)) {
4470 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4471 _dispatch_continuation_slow_item_signal(dq, dc);
4472 continue;
4473 }
4474
4475 if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
4476 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4477 _dispatch_continuation_redirect(dq, dc);
4478 continue;
4479 }
4480 }
4481
4482 _dispatch_continuation_pop_inline(dc, dq, flags);
4483 _dispatch_perfmon_workitem_inc();
4484 if (unlikely(dtf.dtf_deferred)) {
4485 goto out_with_deferred_compute_owned;
4486 }
4487 } while ((dc = next_dc));
4488 }
4489
4490 out:
4491 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4492 // if we're IN_BARRIER we really own the full width too
4493 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4494 }
4495 if (dc) {
4496 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4497 }
4498 *owned_ptr = owned;
4499 _dispatch_thread_frame_pop(&dtf);
4500 return dc ? dq->do_targetq : NULL;
4501
4502 out_with_no_width:
4503 *owned_ptr = 0;
4504 _dispatch_thread_frame_pop(&dtf);
4505 return NULL;
4506
4507 out_with_deferred_compute_owned:
4508 if (serial_drain) {
4509 owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
4510 } else {
4511 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4512 // if we're IN_BARRIER we really own the full width too
4513 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4514 }
4515 if (next_dc) {
4516 owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
4517 }
4518 }
4519 out_with_deferred:
4520 *owned_ptr = owned;
4521 if (unlikely(!dc_out)) {
4522 DISPATCH_INTERNAL_CRASH(dc,
4523 "Deferred continuation on source, mach channel or mgr");
4524 }
4525 *dc_out = dc;
4526 _dispatch_thread_frame_pop(&dtf);
4527 return dq->do_targetq;
4528 }
4529
4530 DISPATCH_NOINLINE
4531 static dispatch_queue_t
4532 _dispatch_queue_concurrent_drain(dispatch_queue_t dq,
4533 dispatch_invoke_flags_t flags, uint64_t *owned,
4534 struct dispatch_object_s **dc_ptr)
4535 {
4536 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
4537 }
4538
4539 DISPATCH_NOINLINE
4540 dispatch_queue_t
4541 _dispatch_queue_serial_drain(dispatch_queue_t dq,
4542 dispatch_invoke_flags_t flags, uint64_t *owned,
4543 struct dispatch_object_s **dc_ptr)
4544 {
4545 flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
4546 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
4547 }
4548
4549 #if DISPATCH_COCOA_COMPAT
4550 static void
4551 _dispatch_main_queue_drain(void)
4552 {
4553 dispatch_queue_t dq = &_dispatch_main_q;
4554 dispatch_thread_frame_s dtf;
4555
4556 if (!dq->dq_items_tail) {
4557 return;
4558 }
4559
4560 if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
4561 DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
4562 " after dispatch_main()");
4563 }
4564 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4565 if (slowpath(owner != _dispatch_tid_self())) {
4566 DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
4567 " from the wrong thread");
4568 }
4569
4570 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4571 _dispatch_runloop_queue_handle_init);
4572
4573 _dispatch_perfmon_start();
4574 // <rdar://problem/23256682> hide the frame chaining when CFRunLoop
4575 // drains the main runloop, as this should not be observable that way
4576 _dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);
4577
4578 pthread_priority_t old_pri = _dispatch_get_priority();
4579 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4580 voucher_t voucher = _voucher_copy();
4581
4582 struct dispatch_object_s *dc, *next_dc, *tail;
4583 dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
4584 do {
4585 next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
4586 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4587 _dispatch_perfmon_workitem_inc();
4588 } while ((dc = next_dc));
4589
4590 // runloop based queues use their port for the queue PUBLISH pattern
4591 // so this raw call to dx_wakeup(0) is valid
4592 dx_wakeup(dq, 0, 0);
4593 _dispatch_voucher_debug("main queue restore", voucher);
4594 _dispatch_reset_defaultpriority(old_dp);
4595 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4596 _dispatch_thread_frame_pop(&dtf);
4597 _dispatch_perfmon_end();
4598 _dispatch_force_cache_cleanup();
4599 }
4600
4601 static bool
4602 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
4603 {
4604 if (!dq->dq_items_tail) {
4605 return false;
4606 }
4607 dispatch_thread_frame_s dtf;
4608 _dispatch_perfmon_start();
4609 _dispatch_thread_frame_push(&dtf, dq);
4610 pthread_priority_t old_pri = _dispatch_get_priority();
4611 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4612 voucher_t voucher = _voucher_copy();
4613
4614 struct dispatch_object_s *dc, *next_dc;
4615 dc = _dispatch_queue_head(dq);
4616 next_dc = _dispatch_queue_next(dq, dc);
4617 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4618 _dispatch_perfmon_workitem_inc();
4619
4620 if (!next_dc) {
4621 // runloop based queues use their port for the queue PUBLISH pattern
4622 // so this raw call to dx_wakeup(0) is valid
4623 dx_wakeup(dq, 0, 0);
4624 }
4625
4626 _dispatch_voucher_debug("runloop queue restore", voucher);
4627 _dispatch_reset_defaultpriority(old_dp);
4628 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4629 _dispatch_thread_frame_pop(&dtf);
4630 _dispatch_perfmon_end();
4631 _dispatch_force_cache_cleanup();
4632 return next_dc;
4633 }
4634 #endif
4635
4636 DISPATCH_NOINLINE
4637 void
4638 _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
4639 {
4640 dispatch_continuation_t dc_tmp, dc_start, dc_end;
4641 struct dispatch_object_s *dc = NULL;
4642 uint64_t dq_state, owned;
4643 size_t count = 0;
4644
4645 owned = DISPATCH_QUEUE_IN_BARRIER;
4646 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4647 attempt_running_slow_head:
4648 if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
4649 dc = _dispatch_queue_head(dq);
4650 if (!_dispatch_object_is_slow_item(dc)) {
4651 // not a slow item, needs to wake up
4652 } else if (fastpath(dq->dq_width == 1) ||
4653 _dispatch_object_is_barrier(dc)) {
4654 // rdar://problem/8290662 "barrier/writer lock transfer"
4655 dc_start = dc_end = (dispatch_continuation_t)dc;
4656 owned = 0;
4657 count = 1;
4658 dc = _dispatch_queue_next(dq, dc);
4659 } else {
4660 // <rdar://problem/10164594> "reader lock transfer"
4661 // we must not signal semaphores immediately because our right
4662 // to dequeue is granted through holding the full "barrier" width,
4663 // which a signaled work item could pull out from under us
4664 dc_start = (dispatch_continuation_t)dc;
4665 do {
4666 // no check on width here because concurrent queues
4667 // do not respect width for blocked readers, the thread
4668 // is already spent anyway
4669 dc_end = (dispatch_continuation_t)dc;
4670 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4671 count++;
4672 dc = _dispatch_queue_next(dq, dc);
4673 } while (dc && _dispatch_object_is_slow_non_barrier(dc));
4674 }
4675
4676 if (count) {
4677 _dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
4678 do {
4679 // signaled job will release the continuation
4680 dc_tmp = dc_start;
4681 dc_start = dc_start->do_next;
4682 _dispatch_continuation_slow_item_signal(dq, dc_tmp);
4683 } while (dc_tmp != dc_end);
4684 return;
4685 }
4686 }
4687
4688 if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
4689 // <rdar://problem/23336992> the following wakeup is needed for sources
4690 // or mach channels: when ds_pending_data is set at the same time
4691 // as a trysync_f happens, lock transfer code above doesn't know about
4692 // ds_pending_data or the wakeup logic, but lock transfer is useless
4693 // for sources and mach channels in the first place.
4694 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4695 dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
4696 return _dispatch_queue_try_wakeup(dq, dq_state,
4697 DISPATCH_WAKEUP_WAITER_HANDOFF);
4698 } else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
4699 // someone enqueued a slow item at the head
4700 // looping may be its last chance
4701 goto attempt_running_slow_head;
4702 }
4703 }
4704
4705 void
4706 _dispatch_mgr_queue_drain(void)
4707 {
4708 const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
4709 dispatch_queue_t dq = &_dispatch_mgr_q;
4710 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
4711
4712 if (dq->dq_items_tail) {
4713 _dispatch_perfmon_start();
4714 if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
4715 DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
4716 }
4717 _dispatch_voucher_debug("mgr queue clear", NULL);
4718 _voucher_clear();
4719 _dispatch_reset_defaultpriority_override();
4720 _dispatch_perfmon_end();
4721 }
4722
4723 #if DISPATCH_USE_KEVENT_WORKQUEUE
4724 if (!_dispatch_kevent_workqueue_enabled)
4725 #endif
4726 {
4727 _dispatch_force_cache_cleanup();
4728 }
4729 }
4730
4731 #pragma mark -
4732 #pragma mark dispatch_queue_invoke
4733
4734 void
4735 _dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
4736 dispatch_invoke_flags_t flags, uint64_t to_unlock,
4737 struct dispatch_object_s *dc)
4738 {
4739 if (_dispatch_object_is_slow_item(dc)) {
4740 dispatch_assert(to_unlock == 0);
4741 _dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
4742 _dispatch_continuation_slow_item_signal(dq, dc);
4743 return _dispatch_release_tailcall(dq);
4744 }
4745
4746 bool should_defer_again = false, should_pend_queue = true;
4747 uint64_t old_state, new_state;
4748
4749 if (_dispatch_get_current_queue()->do_targetq) {
4750 _dispatch_thread_frame_get_current()->dtf_deferred = dc;
4751 should_defer_again = true;
4752 should_pend_queue = false;
4753 }
4754
4755 if (dq->dq_width > 1) {
4756 should_pend_queue = false;
4757 } else if (should_pend_queue) {
4758 dispatch_assert(to_unlock ==
4759 DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
4760 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4761 new_state = old_state;
4762 if (_dq_state_has_waiters(old_state) ||
4763 _dq_state_is_enqueued(old_state)) {
4764 os_atomic_rmw_loop_give_up(break);
4765 }
4766 new_state += DISPATCH_QUEUE_DRAIN_PENDED;
4767 new_state -= DISPATCH_QUEUE_IN_BARRIER;
4768 new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4769 });
4770 should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
4771 }
4772
4773 if (!should_pend_queue) {
4774 if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
4775 _dispatch_try_lock_transfer_or_wakeup(dq);
4776 _dispatch_release(dq);
4777 } else if (to_unlock) {
4778 uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
4779 _dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
4780 } else {
4781 _dispatch_release(dq);
4782 }
4783 dq = NULL;
4784 }
4785
4786 if (!should_defer_again) {
4787 dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
4788 }
4789
4790 if (dq) {
4791 uint32_t self = _dispatch_tid_self();
4792 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4793 new_state = old_state;
4794 if (!_dq_state_drain_pended(old_state) ||
4795 _dq_state_drain_owner(old_state) != self) {
4796 os_atomic_rmw_loop_give_up({
4797 // We may have been overridden, so inform the root queue
4798 _dispatch_set_defaultpriority_override();
4799 return _dispatch_release_tailcall(dq);
4800 });
4801 }
4802 new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
4803 });
4804 if (_dq_state_has_override(old_state)) {
4805 // Ensure that the root queue sees that this thread was overridden.
4806 _dispatch_set_defaultpriority_override();
4807 }
4808 return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
4809 }
4810 }
4811
4812 void
4813 _dispatch_queue_finalize_activation(dispatch_queue_t dq)
4814 {
4815 dispatch_queue_t tq = dq->do_targetq;
4816 _dispatch_queue_priority_inherit_from_target(dq, tq);
4817 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
4818 if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
4819 voucher_t v = tq->dq_override_voucher;
4820 if (v != DISPATCH_NO_VOUCHER) {
4821 if (v) _voucher_retain(v);
4822 dq->dq_override_voucher = v;
4823 }
4824 }
4825 }
4826
4827 DISPATCH_ALWAYS_INLINE
4828 static inline dispatch_queue_t
4829 dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4830 uint64_t *owned, struct dispatch_object_s **dc_ptr)
4831 {
4832 dispatch_queue_t otq = dq->do_targetq;
4833 dispatch_queue_t cq = _dispatch_queue_get_current();
4834
4835 if (slowpath(cq != otq)) {
4836 return otq;
4837 }
4838 if (dq->dq_width == 1) {
4839 return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
4840 }
4841 return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
4842 }
4843
4844 // 6618342 Contact the team that owns the Instrument DTrace probe before
4845 // renaming this symbol
4846 DISPATCH_NOINLINE
4847 void
4848 _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
4849 {
4850 _dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
4851 }
4852
4853 #pragma mark -
4854 #pragma mark dispatch_queue_class_wakeup
4855
4856 #if HAVE_PTHREAD_WORKQUEUE_QOS
4857 void
4858 _dispatch_queue_override_invoke(dispatch_continuation_t dc,
4859 dispatch_invoke_flags_t flags)
4860 {
4861 dispatch_queue_t old_rq = _dispatch_queue_get_current();
4862 dispatch_queue_t assumed_rq = dc->dc_other;
4863 voucher_t ov = DISPATCH_NO_VOUCHER;
4864 dispatch_object_t dou;
4865
4866 dou._do = dc->dc_data;
4867 _dispatch_queue_set_current(assumed_rq);
4868 flags |= DISPATCH_INVOKE_OVERRIDING;
4869 if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
4870 flags |= DISPATCH_INVOKE_STEALING;
4871 } else {
4872 // balance the fake continuation push in
4873 // _dispatch_root_queue_push_override
4874 _dispatch_trace_continuation_pop(assumed_rq, dou._do);
4875 }
4876 _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
4877 if (_dispatch_object_has_vtable(dou._do)) {
4878 dx_invoke(dou._do, flags);
4879 } else {
4880 _dispatch_continuation_invoke_inline(dou, ov, flags);
4881 }
4882 });
4883 _dispatch_queue_set_current(old_rq);
4884 }
4885
4886 DISPATCH_ALWAYS_INLINE
4887 static inline bool
4888 _dispatch_need_global_root_queue_override(dispatch_queue_t rq,
4889 pthread_priority_t pp)
4890 {
4891 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4892 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4893
4894 if (unlikely(!rqp)) return false;
4895
4896 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4897 return defaultqueue ? pp && pp != rqp : pp > rqp;
4898 }
4899
4900 DISPATCH_ALWAYS_INLINE
4901 static inline bool
4902 _dispatch_need_global_root_queue_override_stealer(dispatch_queue_t rq,
4903 pthread_priority_t pp, dispatch_wakeup_flags_t wflags)
4904 {
4905 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4906 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4907
4908 if (unlikely(!rqp)) return false;
4909
4910 if (wflags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
4911 if (!(wflags & _DISPATCH_WAKEUP_OVERRIDE_BITS)) {
4912 return false;
4913 }
4914 }
4915
4916 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4917 return defaultqueue || pp > rqp;
4918 }
4919
4920 DISPATCH_NOINLINE
4921 static void
4922 _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
4923 dispatch_object_t dou, pthread_priority_t pp)
4924 {
4925 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4926 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4927 dispatch_continuation_t dc = dou._dc;
4928
4929 if (_dispatch_object_is_redirection(dc)) {
4930 // no double-wrap is needed, _dispatch_async_redirect_invoke will do
4931 // the right thing
4932 dc->dc_func = (void *)orig_rq;
4933 } else {
4934 dc = _dispatch_continuation_alloc();
4935 dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
4936 // fake that we queued `dou` on `orig_rq` for introspection purposes
4937 _dispatch_trace_continuation_push(orig_rq, dou);
4938 dc->dc_ctxt = dc;
4939 dc->dc_other = orig_rq;
4940 dc->dc_data = dou._do;
4941 dc->dc_priority = DISPATCH_NO_PRIORITY;
4942 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4943 }
4944
4945 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4946 _dispatch_queue_push_inline(rq, dc, 0, 0);
4947 }
4948
4949 DISPATCH_NOINLINE
4950 static void
4951 _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
4952 dispatch_queue_t dq, pthread_priority_t pp)
4953 {
4954 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4955 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4956 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4957
4958 dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
4959 _dispatch_retain(dq);
4960 dc->dc_func = NULL;
4961 dc->dc_ctxt = dc;
4962 dc->dc_other = orig_rq;
4963 dc->dc_data = dq;
4964 dc->dc_priority = DISPATCH_NO_PRIORITY;
4965 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4966
4967 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4968 _dispatch_queue_push_inline(rq, dc, 0, 0);
4969 }
4970
4971 DISPATCH_NOINLINE
4972 static void
4973 _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
4974 pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
4975 {
4976 mach_port_t owner = _dq_state_drain_owner(dq_state);
4977 pthread_priority_t pp2;
4978 dispatch_queue_t tq;
4979 bool locked;
4980
4981 if (owner) {
4982 int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
4983 &dq->dq_state_lock);
4984 // EPERM means the target of the override is not a work queue thread
4985 // and could be a thread bound queue such as the main queue.
4986 // When that happens we must get to that queue and wake it up if we
4987 // want the override to be applied and take effect.
4988 if (rc != EPERM) {
4989 goto out;
4990 }
4991 }
4992
4993 if (_dq_state_is_suspended(dq_state)) {
4994 goto out;
4995 }
4996
4997 tq = dq->do_targetq;
4998
4999 if (_dispatch_queue_has_immutable_target(dq)) {
5000 locked = false;
5001 } else if (_dispatch_is_in_root_queues_array(tq)) {
5002 // avoid locking when we recognize the target queue as a global root
5003 // queue. It is gross, but it is a very common case. The locking isn't
5004 // needed because these target queues cannot go away.
5005 locked = false;
5006 } else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
5007 // <rdar://problem/17735825> to traverse the tq chain safely we must
5008 // lock it to ensure it cannot change
5009 locked = true;
5010 tq = dq->do_targetq;
5011 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5012 } else {
5013 //
5014 // To get here, the current thread has:
5015 // 1. enqueued an object on `dq`
5016 // 2. raised the dq_override value of `dq`
5017 // 3. set the HAS_OVERRIDE bit and not seen an owner
5018 // 4. tried and failed to acquire the side lock
5019 //
5020 //
5021 // The side lock owner can only be one of three things:
5022 //
5023 // - The suspend/resume side count code. Besides being unlikely,
5024 // it means that at this moment the queue is actually suspended,
5025 // which transfers the responsibility of applying the override to
5026 // the eventual dispatch_resume().
5027 //
5028 // - A dispatch_set_target_queue() call. The fact that we saw no `owner`
5029 // means that the trysync it performs wasn't being drained when (3)
5030 // happened which can only be explained by one of these interleavings:
5031 //
5032 // o `dq` became idle between when the object queued in (1) ran and
5033 // the set_target_queue call and we were unlucky enough that our
5034 // step (3) happened while this queue was idle. There is no reason
5035 // to override anything anymore, the queue drained to completion
5036 // while we were preempted, our job is done.
5037 //
5038 // o `dq` is queued but not draining during (1-3), then when we try
5039 // to lock at (4) the queue is now draining a set_target_queue.
5040 // Since we set HAS_OVERRIDE with a release barrier, the effect of
5041 // (2) was visible to the drainer when it acquired the drain lock,
5042 // and that drainer has applied our override. Our job is done.
5043 //
5044 // - Another instance of _dispatch_queue_class_wakeup_with_override(),
5045 // which is fine because our failed trylock leaves a hint, causing
5046 // the tryunlock below to fail and reassess whether a better
5047 // override needs to be applied.
5048 //
5049 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5050 goto out;
5051 }
5052
5053 apply_again:
5054 if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5055 if (_dispatch_need_global_root_queue_override_stealer(tq, pp, flags)) {
5056 _dispatch_root_queue_push_override_stealer(tq, dq, pp);
5057 }
5058 } else if (flags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
5059 dx_wakeup(tq, pp, flags);
5060 } else if (_dispatch_queue_need_override(tq, pp)) {
5061 dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
5062 }
5063 while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
5064 // rdar://problem/24081326
5065 //
5066 // Another instance of _dispatch_queue_class_wakeup_with_override()
5067 // tried to acquire the side lock while we were running, and could have
5068 // had a better override than ours to apply.
5069 //
5070 pp2 = dq->dq_override;
5071 if (pp2 > pp) {
5072 pp = pp2;
5073 // The other instance had a better priority than ours, override
5074 // our thread, and apply the override that wasn't applied to `dq`
5075 // because of us.
5076 goto apply_again;
5077 }
5078 }
5079
5080 out:
5081 if (flags & DISPATCH_WAKEUP_CONSUME) {
5082 return _dispatch_release_tailcall(dq);
5083 }
5084 }
5085 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5086
5087 DISPATCH_NOINLINE
5088 void
5089 _dispatch_queue_class_override_drainer(dispatch_queue_t dq,
5090 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
5091 {
5092 #if HAVE_PTHREAD_WORKQUEUE_QOS
5093 uint64_t dq_state, value;
5094
5095 //
5096 // Someone is trying to override the last work item of the queue.
5097 // Do not remember this override on the queue because we know the precise
5098 // duration the override is required for: until the current drain unlocks.
5099 //
5100 // That is why this function only tries to set HAS_OVERRIDE if we can
5101 // still observe a drainer, and doesn't need to set the DIRTY bit
5102 // because oq_override wasn't touched and there is no race to resolve
5103 //
5104 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
5105 if (!_dq_state_drain_locked(dq_state)) {
5106 os_atomic_rmw_loop_give_up(break);
5107 }
5108 value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
5109 });
5110 if (_dq_state_drain_locked(dq_state)) {
5111 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5112 flags, dq_state);
5113 }
5114 #else
5115 (void)pp;
5116 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5117 if (flags & DISPATCH_WAKEUP_CONSUME) {
5118 return _dispatch_release_tailcall(dq);
5119 }
5120 }
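
// Illustrative sketch (guarded out of the build): the os_atomic_rmw_loop2o()
// pattern used above, expressed with plain C11 atomics. The bit values and
// the field name are hypothetical stand-ins for dq_state,
// DISPATCH_QUEUE_HAS_OVERRIDE and the drain-lock bits.
#if 0
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_DRAIN_LOCKED  0x1ull
#define SKETCH_HAS_OVERRIDE  0x2ull

static bool
sketch_set_has_override_if_drain_locked(_Atomic uint64_t *state)
{
	uint64_t old_state = atomic_load_explicit(state, memory_order_relaxed);
	for (;;) {
		if (!(old_state & SKETCH_DRAIN_LOCKED)) {
			return false;               // give up: nobody is draining
		}
		uint64_t new_state = old_state | SKETCH_HAS_OVERRIDE;
		// on failure, old_state is reloaded and the loop re-evaluates
		if (atomic_compare_exchange_weak_explicit(state, &old_state,
				new_state, memory_order_relaxed, memory_order_relaxed)) {
			return true;
		}
	}
}
#endif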
5121
5122 #if DISPATCH_USE_KEVENT_WORKQUEUE
5123 DISPATCH_NOINLINE
5124 static void
5125 _dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
5126 pthread_priority_t pp, dispatch_deferred_items_t ddi)
5127 {
5128 dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
5129 dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
5130 struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
5131 dispatch_priority_t rq_overcommit;
5132
5133 rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5134 if (likely(!old_pp || rq_overcommit)) {
5135 ddi->ddi_stashed_dq = dq;
5136 ddi->ddi_stashed_dou = dou._do;
5137 ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
5138 _PTHREAD_PRIORITY_PRIORITY_MASK;
5139 if (likely(!old_pp)) {
5140 return;
5141 }
5142 // push the previously stashed item
5143 pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
5144 dq = old_dq;
5145 dou._do = old_dou;
5146 }
5147 if (_dispatch_need_global_root_queue_override(dq, pp)) {
5148 return _dispatch_root_queue_push_override(dq, dou, pp);
5149 }
5150 // bit of cheating: we should really pass `pp` but we know that we are
5151 // pushing onto a global queue at this point, and we just checked that
5152 // `pp` doesn't matter.
5153 DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
5154 _dispatch_queue_push_inline(dq, dou, 0, 0);
5155 }
5156 #endif
5157
5158 DISPATCH_NOINLINE
5159 static void
5160 _dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
5161 pthread_priority_t pp)
5162 {
5163 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
5164 _dispatch_root_queues_init_once);
5165 _dispatch_queue_push(dq, dou, pp);
5166 }
5167
5168 DISPATCH_NOINLINE
5169 void
5170 _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
5171 pthread_priority_t pp)
5172 {
5173 _dispatch_assert_is_valid_qos_override(pp);
5174 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5175 #if DISPATCH_USE_KEVENT_WORKQUEUE
5176 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
5177 if (unlikely(ddi && !(ddi->ddi_stashed_pp &
5178 (dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
5179 dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
5180 return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
5181 }
5182 #endif
5183 #if HAVE_PTHREAD_WORKQUEUE_QOS
5184 // can't use dispatch_once_f() as it would create a frame
5185 if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
5186 return _dispatch_queue_push_slow(dq, dou, pp);
5187 }
5188 if (_dispatch_need_global_root_queue_override(dq, pp)) {
5189 return _dispatch_root_queue_push_override(dq, dou, pp);
5190 }
5191 #endif
5192 }
5193 _dispatch_queue_push_inline(dq, dou, pp, 0);
5194 }
5195
5196 DISPATCH_NOINLINE
5197 static void
5198 _dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
5199 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5200 {
5201 dispatch_queue_t tq;
5202
5203 if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
5204 // _dispatch_queue_drain_try_unlock may have reset the override while
5205 // we were becoming the enqueuer
5206 _dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
5207 }
5208 if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
5209 _dispatch_retain(dq);
5210 }
5211 if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5212 // try_become_enqueuer has no acquire barrier, as the last block
5213 // of a queue asyncing to that queue is not an uncommon pattern
5214 // and in that case the acquire is completely useless
5215 //
5216 // so instead use a thread fence here when we will read the targetq
5217 // pointer because that is the only thing that really requires
5218 // that barrier.
5219 os_atomic_thread_fence(acquire);
5220 tq = dq->do_targetq;
5221 } else {
5222 dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
5223 tq = &_dispatch_mgr_q;
5224 }
5225 return _dispatch_queue_push(tq, dq, pp);
5226 }
5227
5228 DISPATCH_NOINLINE
5229 void
5230 _dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
5231 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5232 {
5233 uint64_t old_state, new_state, bits = 0;
5234
5235 #if HAVE_PTHREAD_WORKQUEUE_QOS
5236 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
5237 #endif
5238
5239 if (flags & DISPATCH_WAKEUP_FLUSH) {
5240 bits = DISPATCH_QUEUE_DIRTY;
5241 }
5242 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
5243 //
5244 // Setting the dirty bit here is about forcing callers of
5245 // _dispatch_queue_drain_try_unlock() to loop again when an override
5246 // has just been set, in order to close the following race:
5247 //
5248 // Drainer (in drain_try_unlock()):
5249 // override_reset();
5250 // preempted....
5251 //
5252 // Enqueuer:
5253 // atomic_or(oq_override, override, relaxed);
5254 // atomic_or(dq_state, HAS_OVERRIDE, release);
5255 //
5256 // Drainer:
5257 // ... resumes
5258 // successful drain_unlock() and leaks `oq_override`
5259 //
5260 bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
5261 }
5262
5263 if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
5264 uint64_t pending_barrier_width =
5265 (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
5266 uint64_t xor_owner_and_set_full_width_and_in_barrier =
5267 _dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
5268 DISPATCH_QUEUE_IN_BARRIER;
5269
5270 #ifdef DLOCK_NOWAITERS_BIT
5271 bits |= DLOCK_NOWAITERS_BIT;
5272 #else
5273 bits |= DLOCK_WAITERS_BIT;
5274 #endif
5275 flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
5276 dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));
5277
5278 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5279 new_state = old_state | bits;
5280 if (_dq_state_drain_pended(old_state)) {
5281 // same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
5282 // but we want to be more efficient wrt the WAITERS_BIT
5283 new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
5284 new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
5285 }
5286 if (unlikely(_dq_state_drain_locked(new_state))) {
5287 #ifdef DLOCK_NOWAITERS_BIT
5288 new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
5289 #endif
5290 } else if (unlikely(!_dq_state_is_runnable(new_state) ||
5291 !(flags & DISPATCH_WAKEUP_FLUSH))) {
5292 // either not runnable, or was not for the first item (26700358),
5293 // so we should not try to lock, and should handle overrides instead
5294 } else if (_dq_state_has_pending_barrier(old_state) ||
5295 new_state + pending_barrier_width <
5296 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
5297 // see _dispatch_queue_drain_try_lock
5298 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
5299 new_state ^= xor_owner_and_set_full_width_and_in_barrier;
5300 } else {
5301 new_state |= DISPATCH_QUEUE_ENQUEUED;
5302 }
5303 });
5304 if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
5305 return _dispatch_try_lock_transfer_or_wakeup(dq);
5306 }
5307 } else if (bits) {
5308 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
5309 new_state = old_state | bits;
5310 if (likely(_dq_state_should_wakeup(old_state))) {
5311 new_state |= DISPATCH_QUEUE_ENQUEUED;
5312 }
5313 });
5314 } else {
5315 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed,{
5316 new_state = old_state;
5317 if (likely(_dq_state_should_wakeup(old_state))) {
5318 new_state |= DISPATCH_QUEUE_ENQUEUED;
5319 } else {
5320 os_atomic_rmw_loop_give_up(break);
5321 }
5322 });
5323 }
5324
5325 if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
5326 return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
5327 }
5328
5329 #if HAVE_PTHREAD_WORKQUEUE_QOS
5330 if ((flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAITER_HANDOFF))
5331 && target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5332 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5333 flags, new_state);
5334 }
5335 #endif
5336
5337 if (flags & DISPATCH_WAKEUP_CONSUME) {
5338 return _dispatch_release_tailcall(dq);
5339 }
5340 }
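
// Illustrative sketch (guarded out of the build): the race that
// DISPATCH_QUEUE_DIRTY closes in the DISPATCH_WAKEUP_OVERRIDING path above,
// reduced to hypothetical simplified state bits. The drainer's unlock only
// succeeds when the state it observed is still current, so an enqueuer that
// publishes an override and then sets DIRTY with release is re-observed.
#if 0
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_DRAIN_LOCKED  0x1ull
#define SKETCH_DIRTY         0x2ull
#define SKETCH_HAS_OVERRIDE  0x4ull

// enqueuer: record the override, then mark the state dirty with release
static void
sketch_enqueuer(_Atomic uint64_t *state, _Atomic uint32_t *override, uint32_t pp)
{
	atomic_fetch_or_explicit(override, pp, memory_order_relaxed);
	atomic_fetch_or_explicit(state, SKETCH_DIRTY | SKETCH_HAS_OVERRIDE,
			memory_order_release);
}

// drainer: unlock only if the state did not change since it was last read;
// a concurrent DIRTY store makes the exchange fail and forces a re-drain
static bool
sketch_drain_try_unlock(_Atomic uint64_t *state, uint64_t expected)
{
	return atomic_compare_exchange_strong_explicit(state, &expected,
			expected & ~(SKETCH_DRAIN_LOCKED | SKETCH_DIRTY),
			memory_order_release, memory_order_acquire);
}
#endif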
5341
5342 #pragma mark -
5343 #pragma mark dispatch_root_queue_drain
5344
5345 DISPATCH_NOINLINE
5346 static bool
5347 _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
5348 {
5349 dispatch_root_queue_context_t qc = dq->do_ctxt;
5350 struct dispatch_object_s *const mediator = (void *)~0ul;
5351 bool pending = false, available = true;
5352 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
5353
5354 do {
5355 // Spin for a short while in case the contention is temporary -- e.g.
5356 // when starting up after dispatch_apply, or when executing a few
5357 // short continuations in a row.
5358 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
5359 goto out;
5360 }
5361 // Since we have serious contention, we need to back off.
5362 if (!pending) {
5363 // Mark this queue as pending to avoid requests for further threads
5364 (void)os_atomic_inc2o(qc, dgq_pending, relaxed);
5365 pending = true;
5366 }
5367 _dispatch_contention_usleep(sleep_time);
5368 if (fastpath(dq->dq_items_head != mediator)) goto out;
5369 sleep_time *= 2;
5370 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
5371
5372 // The ratio of work to libdispatch overhead must be bad. This
5373 // scenario implies that there are too many threads in the pool.
5374 // Create a new pending thread and then exit this thread.
5375 // The kernel will grant a new thread when the load subsides.
5376 _dispatch_debug("contention on global queue: %p", dq);
5377 available = false;
5378 out:
5379 if (pending) {
5380 (void)os_atomic_dec2o(qc, dgq_pending, relaxed);
5381 }
5382 if (!available) {
5383 _dispatch_global_queue_poke(dq);
5384 }
5385 return available;
5386 }
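
// Illustrative sketch (guarded out of the build): the spin-then-exponential-
// backoff shape of the contention handling above, with hypothetical constants
// and a plain predicate in place of the dq_items_head/mediator check.
#if 0
#include <stdbool.h>
#include <unistd.h>

#define SKETCH_USLEEP_START  10u      // hypothetical, cf. DISPATCH_CONTENTION_USLEEP_START
#define SKETCH_USLEEP_MAX    100000u  // hypothetical, cf. DISPATCH_CONTENTION_USLEEP_MAX

static bool
sketch_wait_with_backoff(bool (*done)(void *), void *ctxt)
{
	unsigned int sleep_time = SKETCH_USLEEP_START;
	do {
		if (done(ctxt)) return true;    // contention was temporary
		usleep(sleep_time);             // back off, doubling each round
		sleep_time *= 2;
	} while (sleep_time < SKETCH_USLEEP_MAX);
	return false;                       // give up; let the caller shed this thread
}
#endif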
5387
5388 DISPATCH_ALWAYS_INLINE
5389 static inline bool
5390 _dispatch_root_queue_drain_one2(dispatch_queue_t dq)
5391 {
5392 // Wait for queue head and tail to be both non-empty or both empty
5393 bool available; // <rdar://problem/15917893>
5394 _dispatch_wait_until((dq->dq_items_head != NULL) ==
5395 (available = (dq->dq_items_tail != NULL)));
5396 return available;
5397 }
5398
5399 DISPATCH_ALWAYS_INLINE_NDEBUG
5400 static inline struct dispatch_object_s *
5401 _dispatch_root_queue_drain_one(dispatch_queue_t dq)
5402 {
5403 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
5404
5405 start:
5406 // The mediator value acts both as a "lock" and a signal
5407 head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
5408
5409 if (slowpath(head == NULL)) {
5410 // The first xchg on the tail will tell the enqueueing thread that it
5411 // is safe to blindly write out to the head pointer. A cmpxchg honors
5412 // the algorithm.
5413 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
5414 NULL, relaxed))) {
5415 goto start;
5416 }
5417 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
5418 _dispatch_root_queue_drain_one2(dq)) {
5419 goto start;
5420 }
5421 _dispatch_root_queue_debug("no work on global queue: %p", dq);
5422 return NULL;
5423 }
5424
5425 if (slowpath(head == mediator)) {
5426 // This thread lost the race for ownership of the queue.
5427 if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
5428 goto start;
5429 }
5430 return NULL;
5431 }
5432
5433 // Restore the head pointer to a sane value before returning.
5434 // If 'next' is NULL, then this item _might_ be the last item.
5435 next = fastpath(head->do_next);
5436
5437 if (slowpath(!next)) {
5438 os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
5439 // 22708742: set tail to NULL with release, so that NULL write to head
5440 // above doesn't clobber head from concurrent enqueuer
5441 if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
5442 // both head and tail are NULL now
5443 goto out;
5444 }
5445 // There must be a next item now.
5446 _dispatch_wait_until(next = head->do_next);
5447 }
5448
5449 os_atomic_store2o(dq, dq_items_head, next, relaxed);
5450 _dispatch_global_queue_poke(dq);
5451 out:
5452 return head;
5453 }
5454
5455 void
5456 _dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
5457 struct dispatch_object_s *dou, pthread_priority_t pp)
5458 {
5459 struct _dispatch_identity_s di;
5460
5461 // fake that we queued `dou` on `dq` for introspection purposes
5462 _dispatch_trace_continuation_push(dq, dou);
5463
5464 pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
5465 _dispatch_queue_set_current(dq);
5466 _dispatch_root_queue_identity_assume(&di, pp);
5467 #if DISPATCH_COCOA_COMPAT
5468 void *pool = _dispatch_last_resort_autorelease_pool_push();
5469 #endif // DISPATCH_COCOA_COMPAT
5470
5471 _dispatch_perfmon_start();
5472 _dispatch_continuation_pop_inline(dou, dq,
5473 DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
5474 _dispatch_perfmon_workitem_inc();
5475 _dispatch_perfmon_end();
5476
5477 #if DISPATCH_COCOA_COMPAT
5478 _dispatch_last_resort_autorelease_pool_pop(pool);
5479 #endif // DISPATCH_COCOA_COMPAT
5480 _dispatch_reset_defaultpriority(di.old_pp);
5481 _dispatch_queue_set_current(NULL);
5482
5483 _dispatch_voucher_debug("root queue clear", NULL);
5484 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5485 }
5486
5487 DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
5488 static void
5489 _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
5490 {
5491 #if DISPATCH_DEBUG
5492 dispatch_queue_t cq;
5493 if (slowpath(cq = _dispatch_queue_get_current())) {
5494 DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
5495 }
5496 #endif
5497 _dispatch_queue_set_current(dq);
5498 if (dq->dq_priority) pri = dq->dq_priority;
5499 pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
5500 #if DISPATCH_COCOA_COMPAT
5501 void *pool = _dispatch_last_resort_autorelease_pool_push();
5502 #endif // DISPATCH_COCOA_COMPAT
5503
5504 _dispatch_perfmon_start();
5505 struct dispatch_object_s *item;
5506 bool reset = false;
5507 while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
5508 if (reset) _dispatch_wqthread_override_reset();
5509 _dispatch_continuation_pop_inline(item, dq,
5510 DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
5511 _dispatch_perfmon_workitem_inc();
5512 reset = _dispatch_reset_defaultpriority_override();
5513 }
5514 _dispatch_perfmon_end();
5515
5516 #if DISPATCH_COCOA_COMPAT
5517 _dispatch_last_resort_autorelease_pool_pop(pool);
5518 #endif // DISPATCH_COCOA_COMPAT
5519 _dispatch_reset_defaultpriority(old_dp);
5520 _dispatch_queue_set_current(NULL);
5521 }
5522
5523 #pragma mark -
5524 #pragma mark dispatch_worker_thread
5525
5526 #if HAVE_PTHREAD_WORKQUEUES
5527 static void
5528 _dispatch_worker_thread4(void *context)
5529 {
5530 dispatch_queue_t dq = context;
5531 dispatch_root_queue_context_t qc = dq->do_ctxt;
5532
5533 _dispatch_introspection_thread_add();
5534 int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
5535 dispatch_assert(pending >= 0);
5536 _dispatch_root_queue_drain(dq, _dispatch_get_priority());
5537 _dispatch_voucher_debug("root queue clear", NULL);
5538 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5539 }
5540
5541 #if HAVE_PTHREAD_WORKQUEUE_QOS
5542 static void
5543 _dispatch_worker_thread3(pthread_priority_t pp)
5544 {
5545 bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5546 dispatch_queue_t dq;
5547 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
5548 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
5549 dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
5550 return _dispatch_worker_thread4(dq);
5551 }
5552 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5553
5554 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5555 // 6618342 Contact the team that owns the Instrument DTrace probe before
5556 // renaming this symbol
5557 static void
5558 _dispatch_worker_thread2(int priority, int options,
5559 void *context DISPATCH_UNUSED)
5560 {
5561 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
5562 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
5563 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
5564
5565 return _dispatch_worker_thread4(dq);
5566 }
5567 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5568 #endif // HAVE_PTHREAD_WORKQUEUES
5569
5570 #if DISPATCH_USE_PTHREAD_POOL
5571 // 6618342 Contact the team that owns the Instrument DTrace probe before
5572 // renaming this symbol
5573 static void *
5574 _dispatch_worker_thread(void *context)
5575 {
5576 dispatch_queue_t dq = context;
5577 dispatch_root_queue_context_t qc = dq->do_ctxt;
5578 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
5579
5580 if (pqc->dpq_observer_hooks.queue_will_execute) {
5581 _dispatch_set_pthread_root_queue_observer_hooks(
5582 &pqc->dpq_observer_hooks);
5583 }
5584 if (pqc->dpq_thread_configure) {
5585 pqc->dpq_thread_configure();
5586 }
5587
5588 sigset_t mask;
5589 int r;
5590 // replicate the tweaks the kernel workqueue normally does for us
5591 r = sigfillset(&mask);
5592 (void)dispatch_assume_zero(r);
5593 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
5594 (void)dispatch_assume_zero(r);
5595 _dispatch_introspection_thread_add();
5596
5597 const int64_t timeout = 5ull * NSEC_PER_SEC;
5598 pthread_priority_t old_pri = _dispatch_get_priority();
5599 do {
5600 _dispatch_root_queue_drain(dq, old_pri);
5601 _dispatch_reset_priority_and_voucher(old_pri, NULL);
5602 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
5603 dispatch_time(0, timeout)) == 0);
5604
5605 (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
5606 _dispatch_global_queue_poke(dq);
5607 _dispatch_release(dq);
5608
5609 return NULL;
5610 }
5611
5612 int
5613 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
5614 {
5615 int r;
5616
5617 /* Workaround: 6269619 Not all signals can be delivered on any thread */
5618
5619 r = sigdelset(set, SIGILL);
5620 (void)dispatch_assume_zero(r);
5621 r = sigdelset(set, SIGTRAP);
5622 (void)dispatch_assume_zero(r);
5623 #if HAVE_DECL_SIGEMT
5624 r = sigdelset(set, SIGEMT);
5625 (void)dispatch_assume_zero(r);
5626 #endif
5627 r = sigdelset(set, SIGFPE);
5628 (void)dispatch_assume_zero(r);
5629 r = sigdelset(set, SIGBUS);
5630 (void)dispatch_assume_zero(r);
5631 r = sigdelset(set, SIGSEGV);
5632 (void)dispatch_assume_zero(r);
5633 r = sigdelset(set, SIGSYS);
5634 (void)dispatch_assume_zero(r);
5635 r = sigdelset(set, SIGPIPE);
5636 (void)dispatch_assume_zero(r);
5637
5638 return pthread_sigmask(how, set, oset);
5639 }
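
// Illustrative sketch (guarded out of the build): how a manually created
// worker thread can block asynchronous signals while leaving the synchronous,
// fault-style signals deliverable, mirroring the sigfillset()/sigdelset()
// dance above. Error handling is elided.
#if 0
#include <pthread.h>
#include <signal.h>

static void
sketch_block_async_signals(void)
{
	sigset_t mask;
	sigfillset(&mask);
	// keep the signals that must be delivered on the faulting thread
	sigdelset(&mask, SIGILL);
	sigdelset(&mask, SIGTRAP);
	sigdelset(&mask, SIGFPE);
	sigdelset(&mask, SIGBUS);
	sigdelset(&mask, SIGSEGV);
	sigdelset(&mask, SIGSYS);
	sigdelset(&mask, SIGPIPE);
	(void)pthread_sigmask(SIG_BLOCK, &mask, NULL);
}
#endif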
5640 #endif // DISPATCH_USE_PTHREAD_POOL
5641
5642 #pragma mark -
5643 #pragma mark dispatch_runloop_queue
5644
5645 static bool _dispatch_program_is_probably_callback_driven;
5646
5647 #if DISPATCH_COCOA_COMPAT
5648
5649 dispatch_queue_t
5650 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
5651 {
5652 dispatch_queue_t dq;
5653 size_t dqs;
5654
5655 if (slowpath(flags)) {
5656 return DISPATCH_BAD_INPUT;
5657 }
5658 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
5659 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
5660 _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
5661 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
5662 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
5663 _dispatch_runloop_queue_handle_init(dq);
5664 _dispatch_queue_set_bound_thread(dq);
5665 _dispatch_object_debug(dq, "%s", __func__);
5666 return _dispatch_introspection_queue_create(dq);
5667 }
5668
5669 void
5670 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
5671 {
5672 _dispatch_object_debug(dq, "%s", __func__);
5673
5674 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
5675 _dispatch_queue_clear_bound_thread(dq);
5676 dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
5677 if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
5678 }
5679
5680 void
5681 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
5682 {
5683 _dispatch_object_debug(dq, "%s", __func__);
5684 _dispatch_introspection_queue_dispose(dq);
5685 _dispatch_runloop_queue_handle_dispose(dq);
5686 _dispatch_queue_destroy(dq);
5687 }
5688
5689 bool
5690 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
5691 {
5692 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5693 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5694 }
5695 dispatch_retain(dq);
5696 bool r = _dispatch_runloop_queue_drain_one(dq);
5697 dispatch_release(dq);
5698 return r;
5699 }
5700
5701 void
5702 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
5703 {
5704 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5705 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5706 }
5707 _dispatch_runloop_queue_wakeup(dq, 0, false);
5708 }
5709
5710 dispatch_runloop_handle_t
5711 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
5712 {
5713 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5714 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5715 }
5716 return _dispatch_runloop_queue_get_handle(dq);
5717 }
5718
5719 static void
5720 _dispatch_runloop_queue_handle_init(void *ctxt)
5721 {
5722 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
5723 dispatch_runloop_handle_t handle;
5724
5725 _dispatch_fork_becomes_unsafe();
5726
5727 #if TARGET_OS_MAC
5728 mach_port_t mp;
5729 kern_return_t kr;
5730 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
5731 DISPATCH_VERIFY_MIG(kr);
5732 (void)dispatch_assume_zero(kr);
5733 kr = mach_port_insert_right(mach_task_self(), mp, mp,
5734 MACH_MSG_TYPE_MAKE_SEND);
5735 DISPATCH_VERIFY_MIG(kr);
5736 (void)dispatch_assume_zero(kr);
5737 if (dq != &_dispatch_main_q) {
5738 struct mach_port_limits limits = {
5739 .mpl_qlimit = 1,
5740 };
5741 kr = mach_port_set_attributes(mach_task_self(), mp,
5742 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
5743 sizeof(limits));
5744 DISPATCH_VERIFY_MIG(kr);
5745 (void)dispatch_assume_zero(kr);
5746 }
5747 handle = mp;
5748 #elif defined(__linux__)
5749 int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
5750 if (fd == -1) {
5751 int err = errno;
5752 switch (err) {
5753 case EMFILE:
5754 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5755 "process is out of file descriptors");
5756 break;
5757 case ENFILE:
5758 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5759 "system is out of file descriptors");
5760 break;
5761 case ENOMEM:
5762 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5763 "kernel is out of memory");
5764 break;
5765 default:
5766 DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
5767 break;
5768 }
5769 }
5770 handle = fd;
5771 #else
5772 #error "runloop support not implemented on this platform"
5773 #endif
5774 _dispatch_runloop_queue_set_handle(dq, handle);
5775
5776 _dispatch_program_is_probably_callback_driven = true;
5777 }
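
// Illustrative sketch (guarded out of the build): how the Linux eventfd
// handle created above can serve as a run-loop wakeup primitive -- one side
// writes to wake, the other reads to rearm. Error handling is elided and the
// helper names are hypothetical.
#if 0
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

static void
sketch_wakeup(int handle)
{
	uint64_t one = 1;
	(void)write(handle, &one, sizeof(one));        // makes the fd readable
}

static void
sketch_rearm(int handle)
{
	uint64_t counter;
	(void)read(handle, &counter, sizeof(counter)); // resets the counter to zero
}
#endif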
5778
5779 static void
5780 _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
5781 {
5782 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
5783 if (!_dispatch_runloop_handle_is_valid(handle)) {
5784 return;
5785 }
5786 dq->do_ctxt = NULL;
5787 #if TARGET_OS_MAC
5788 mach_port_t mp = handle;
5789 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
5790 DISPATCH_VERIFY_MIG(kr);
5791 (void)dispatch_assume_zero(kr);
5792 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
5793 DISPATCH_VERIFY_MIG(kr);
5794 (void)dispatch_assume_zero(kr);
5795 #elif defined(__linux__)
5796 int rc = close(handle);
5797 (void)dispatch_assume_zero(rc);
5798 #else
5799 #error "runloop support not implemented on this platform"
5800 #endif
5801 }
5802
5803 #pragma mark -
5804 #pragma mark dispatch_main_queue
5805
5806 dispatch_runloop_handle_t
5807 _dispatch_get_main_queue_handle_4CF(void)
5808 {
5809 dispatch_queue_t dq = &_dispatch_main_q;
5810 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5811 _dispatch_runloop_queue_handle_init);
5812 return _dispatch_runloop_queue_get_handle(dq);
5813 }
5814
5815 #if TARGET_OS_MAC
5816 dispatch_runloop_handle_t
5817 _dispatch_get_main_queue_port_4CF(void)
5818 {
5819 return _dispatch_get_main_queue_handle_4CF();
5820 }
5821 #endif
5822
5823 static bool main_q_is_draining;
5824
5825 // 6618342 Contact the team that owns the Instrument DTrace probe before
5826 // renaming this symbol
5827 DISPATCH_NOINLINE
5828 static void
5829 _dispatch_queue_set_mainq_drain_state(bool arg)
5830 {
5831 main_q_is_draining = arg;
5832 }
5833
5834 void
5835 _dispatch_main_queue_callback_4CF(
5836 #if TARGET_OS_MAC
5837 mach_msg_header_t *_Null_unspecified msg
5838 #else
5839 void *ignored
5840 #endif
5841 DISPATCH_UNUSED)
5842 {
5843 if (main_q_is_draining) {
5844 return;
5845 }
5846 _dispatch_queue_set_mainq_drain_state(true);
5847 _dispatch_main_queue_drain();
5848 _dispatch_queue_set_mainq_drain_state(false);
5849 }
5850
5851 #endif
5852
5853 void
5854 dispatch_main(void)
5855 {
5856 #if HAVE_PTHREAD_MAIN_NP
5857 if (pthread_main_np()) {
5858 #endif
5859 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
5860 _dispatch_program_is_probably_callback_driven = true;
5861 _dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
5862 #ifdef __linux__
5863 // On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
5864 // To avoid that, just before calling pthread_exit we register a TSD destructor
5865 // that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
5866 // This relies on an implementation detail (currently true in glibc) that TSD destructors
5867 // are called in the order of creation, so that all the other TSD cleanup functions run
5868 // before the thread becomes trapped in sigsuspend.
5869 pthread_key_t dispatch_main_key;
5870 pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
5871 pthread_setspecific(dispatch_main_key, &dispatch_main_key);
5872 #endif
5873 pthread_exit(NULL);
5874 DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
5875 #if HAVE_PTHREAD_MAIN_NP
5876 }
5877 DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
5878 #endif
5879 }
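
// Illustrative sketch (guarded out of the build): the Linux TSD-destructor
// trick used above, in isolation. A destructor registered with
// pthread_key_create() runs when the thread that set a non-NULL value calls
// pthread_exit(), which is what parks the main thread instead of letting the
// process exit. The names here are hypothetical.
#if 0
#include <pthread.h>
#include <stdio.h>

static void
sketch_parking_destructor(void *value)
{
	(void)value;
	printf("main thread is exiting; park it here instead\n");
	// libdispatch calls _dispatch_sig_thread() at this point
}

int
main(void)
{
	pthread_key_t key;
	pthread_key_create(&key, sketch_parking_destructor);
	pthread_setspecific(key, &key);     // non-NULL so the destructor fires
	pthread_exit(NULL);                 // destructor runs before the thread dies
}
#endif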
5880
5881 DISPATCH_NOINLINE DISPATCH_NORETURN
5882 static void
5883 _dispatch_sigsuspend(void)
5884 {
5885 static const sigset_t mask;
5886
5887 for (;;) {
5888 sigsuspend(&mask);
5889 }
5890 }
5891
5892 DISPATCH_NORETURN
5893 static void
5894 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
5895 {
5896 // never returns, so burn bridges behind us
5897 _dispatch_clear_stack(0);
5898 _dispatch_sigsuspend();
5899 }
5900
5901 DISPATCH_NOINLINE
5902 static void
5903 _dispatch_queue_cleanup2(void)
5904 {
5905 dispatch_queue_t dq = &_dispatch_main_q;
5906 _dispatch_queue_clear_bound_thread(dq);
5907
5908 // <rdar://problem/22623242>
5909 // Here is what happens when both this cleanup happens because of
5910 // dispatch_main() being called, and a concurrent enqueuer makes the queue
5911 // non empty.
5912 //
5913 // _dispatch_queue_cleanup2:
5914 // atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
5915 // maximal_barrier();
5916 // if (load(dq_items_tail, seq_cst)) {
5917 // // do the wake up the normal serial queue way
5918 // } else {
5919 // // do not wake up <----
5920 // }
5921 //
5922 // enqueuer:
5923 // store(dq_items_tail, new_tail, release);
5924 // if (load(dq_is_thread_bound, relaxed)) {
5925 // // do the wake up the runloop way <----
5926 // } else {
5927 // // do the wake up the normal serial way
5928 // }
5929 //
5930 // what would be bad is to take both paths marked <---- because the queue
5931 // wouldn't be woken up until the next time it's used (which may never
5932 // happen)
5933 //
5934 // An enqueuer that speculates the load of the old value of thread_bound
5935 // and then does the store may wake up the main queue the runloop way.
5936 // But then, the cleanup thread will see that store because the load
5937 // of dq_items_tail is sequentially consistent, and we have just thrown away
5938 // our pipeline.
5939 //
5940 // By the time cleanup2() is out of the maximally synchronizing barrier,
5941 // no other thread can speculate the wrong load anymore, and both cleanup2()
5942 // and a concurrent enqueuer would treat the queue in the standard non
5943 // thread bound way
5944
5945 _dispatch_queue_atomic_flags_clear(dq,
5946 DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
5947 os_atomic_maximally_synchronizing_barrier();
5948 // no need to drop the override, the thread will die anyway
5949 // the barrier above includes an acquire, so it's ok to do this raw
5950 // call to dx_wakeup(0)
5951 dx_wakeup(dq, 0, 0);
5952
5953 // overload the "probably" variable to mean that dispatch_main() or
5954 // similar non-POSIX API was called
5955 // this has to run before the DISPATCH_COCOA_COMPAT below
5956 // See dispatch_main for call to _dispatch_sig_thread on linux.
5957 #ifndef __linux__
5958 if (_dispatch_program_is_probably_callback_driven) {
5959 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
5960 _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
5961 sleep(1); // workaround 6778970
5962 }
5963 #endif
5964
5965 #if DISPATCH_COCOA_COMPAT
5966 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5967 _dispatch_runloop_queue_handle_init);
5968 _dispatch_runloop_queue_handle_dispose(dq);
5969 #endif
5970 }
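
// Illustrative sketch (guarded out of the build): the two racing paths
// described in the <rdar://problem/22623242> comment above, reduced to
// hypothetical flags. Note that the real code relies on
// os_atomic_maximally_synchronizing_barrier(), which is stronger than the
// C11 seq_cst fence used here; this only shows the shape of the race, not a
// proof that plain C11 fences would be sufficient.
#if 0
#include <stdatomic.h>
#include <stdbool.h>

static _Atomic bool  sketch_thread_bound = true;
static _Atomic(void *) sketch_tail = NULL;

// cleanup side: clear the thread-bound flag, issue the heavy barrier,
// then decide whether a normal wake-up is still needed
static bool
sketch_cleanup_must_wake(void)
{
	atomic_store_explicit(&sketch_thread_bound, false, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	// a non-NULL tail here means an item raced in: wake the queue normally
	return atomic_load_explicit(&sketch_tail, memory_order_seq_cst) != NULL;
}

// enqueuer side: publish the item, then pick a wake-up path
static bool
sketch_enqueuer_wakes_runloop(void *item)
{
	atomic_store_explicit(&sketch_tail, item, memory_order_release);
	// true: wake the runloop way; false: wake the normal serial way
	return atomic_load_explicit(&sketch_thread_bound, memory_order_relaxed);
}
#endif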
5971
5972 static void
5973 _dispatch_queue_cleanup(void *ctxt)
5974 {
5975 if (ctxt == &_dispatch_main_q) {
5976 return _dispatch_queue_cleanup2();
5977 }
5978 // POSIX defines that destructors are only called if 'ctxt' is non-null
5979 DISPATCH_INTERNAL_CRASH(ctxt,
5980 "Premature thread exit while a dispatch queue is running");
5981 }
5982
5983 static void
5984 _dispatch_deferred_items_cleanup(void *ctxt)
5985 {
5986 // POSIX defines that destructors are only called if 'ctxt' is non-null
5987 DISPATCH_INTERNAL_CRASH(ctxt,
5988 "Premature thread exit with unhandled deferred items");
5989 }
5990
5991 static void
5992 _dispatch_frame_cleanup(void *ctxt)
5993 {
5994 // POSIX defines that destructors are only called if 'ctxt' is non-null
5995 DISPATCH_INTERNAL_CRASH(ctxt,
5996 "Premature thread exit while a dispatch frame is active");
5997 }
5998
5999 static void
6000 _dispatch_context_cleanup(void *ctxt)
6001 {
6002 // POSIX defines that destructors are only called if 'ctxt' is non-null
6003 DISPATCH_INTERNAL_CRASH(ctxt,
6004 "Premature thread exit while a dispatch context is set");
6005 }