[apple/libdispatch.git] libdispatch-703.50.37 / src / queue.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
34 && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
38 !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
40 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
41 #endif
42 #if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
43 #undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
44 #define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
45 #endif
46 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48 #define pthread_workqueue_t void*
49 #endif
50
51 static void _dispatch_sig_thread(void *ctxt);
52 static void _dispatch_cache_cleanup(void *value);
53 static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
54 dispatch_function_t func, pthread_priority_t pp);
55 static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
56 static void _dispatch_queue_cleanup(void *ctxt);
57 static void _dispatch_deferred_items_cleanup(void *ctxt);
58 static void _dispatch_frame_cleanup(void *ctxt);
59 static void _dispatch_context_cleanup(void *ctxt);
60 static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
61 static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
62 #if HAVE_PTHREAD_WORKQUEUES
63 static void _dispatch_worker_thread4(void *context);
64 #if HAVE_PTHREAD_WORKQUEUE_QOS
65 static void _dispatch_worker_thread3(pthread_priority_t priority);
66 #endif
67 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
68 static void _dispatch_worker_thread2(int priority, int options, void *context);
69 #endif
70 #endif
71 #if DISPATCH_USE_PTHREAD_POOL
72 static void *_dispatch_worker_thread(void *context);
73 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
74 #endif
75
76 #if DISPATCH_COCOA_COMPAT
77 static dispatch_once_t _dispatch_main_q_handle_pred;
78 static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
79 pthread_priority_t pp, dispatch_wakeup_flags_t flags);
80 static void _dispatch_runloop_queue_handle_init(void *ctxt);
81 static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
82 #endif
83
84 static void _dispatch_root_queues_init_once(void *context);
85 static dispatch_once_t _dispatch_root_queues_pred;
86
87 #pragma mark -
88 #pragma mark dispatch_root_queue
89
90 struct dispatch_pthread_root_queue_context_s {
91 pthread_attr_t dpq_thread_attr;
92 dispatch_block_t dpq_thread_configure;
93 struct dispatch_semaphore_s dpq_thread_mediator;
94 dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
95 };
96 typedef struct dispatch_pthread_root_queue_context_s *
97 dispatch_pthread_root_queue_context_t;
98
99 #if DISPATCH_ENABLE_THREAD_POOL
100 static struct dispatch_pthread_root_queue_context_s
101 _dispatch_pthread_root_queue_contexts[] = {
102 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
103 .dpq_thread_mediator = {
104 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
105 }},
106 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
107 .dpq_thread_mediator = {
108 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
109 }},
110 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
111 .dpq_thread_mediator = {
112 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
113 }},
114 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
115 .dpq_thread_mediator = {
116 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
117 }},
118 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
119 .dpq_thread_mediator = {
120 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
121 }},
122 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
123 .dpq_thread_mediator = {
124 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
125 }},
126 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
127 .dpq_thread_mediator = {
128 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
129 }},
130 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
131 .dpq_thread_mediator = {
132 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
133 }},
134 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
135 .dpq_thread_mediator = {
136 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
137 }},
138 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
139 .dpq_thread_mediator = {
140 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
141 }},
142 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
143 .dpq_thread_mediator = {
144 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
145 }},
146 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
147 .dpq_thread_mediator = {
148 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
149 }},
150 };
151 #endif
152
153 #define MAX_PTHREAD_COUNT 255
154
155 struct dispatch_root_queue_context_s {
156 union {
157 struct {
158 unsigned int volatile dgq_pending;
159 #if HAVE_PTHREAD_WORKQUEUES
160 qos_class_t dgq_qos;
161 int dgq_wq_priority, dgq_wq_options;
162 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163 pthread_workqueue_t dgq_kworkqueue;
164 #endif
165 #endif // HAVE_PTHREAD_WORKQUEUES
166 #if DISPATCH_USE_PTHREAD_POOL
167 void *dgq_ctxt;
168 uint32_t volatile dgq_thread_pool_size;
169 #endif
170 };
171 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
172 };
173 };
174 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
175
176 #define WORKQ_PRIO_INVALID (-1)
177 #ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
178 #define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
179 #endif
180 #ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
181 #define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
182 #endif
183
184 DISPATCH_CACHELINE_ALIGN
185 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
186 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
187 #if HAVE_PTHREAD_WORKQUEUES
188 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
189 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
190 .dgq_wq_options = 0,
191 #endif
192 #if DISPATCH_ENABLE_THREAD_POOL
193 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
194 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
195 #endif
196 }}},
197 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
198 #if HAVE_PTHREAD_WORKQUEUES
199 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
200 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
201 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
202 #endif
203 #if DISPATCH_ENABLE_THREAD_POOL
204 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
205 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
206 #endif
207 }}},
208 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
209 #if HAVE_PTHREAD_WORKQUEUES
210 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
211 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
212 .dgq_wq_options = 0,
213 #endif
214 #if DISPATCH_ENABLE_THREAD_POOL
215 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
216 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
217 #endif
218 }}},
219 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
220 #if HAVE_PTHREAD_WORKQUEUES
221 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
222 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
223 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
224 #endif
225 #if DISPATCH_ENABLE_THREAD_POOL
226 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
227 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
228 #endif
229 }}},
230 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
231 #if HAVE_PTHREAD_WORKQUEUES
232 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
233 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
234 .dgq_wq_options = 0,
235 #endif
236 #if DISPATCH_ENABLE_THREAD_POOL
237 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
238 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
239 #endif
240 }}},
241 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
242 #if HAVE_PTHREAD_WORKQUEUES
243 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
244 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
245 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
246 #endif
247 #if DISPATCH_ENABLE_THREAD_POOL
248 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
249 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
250 #endif
251 }}},
252 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
253 #if HAVE_PTHREAD_WORKQUEUES
254 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
255 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
256 .dgq_wq_options = 0,
257 #endif
258 #if DISPATCH_ENABLE_THREAD_POOL
259 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
260 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
261 #endif
262 }}},
263 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
264 #if HAVE_PTHREAD_WORKQUEUES
265 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
266 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
267 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
268 #endif
269 #if DISPATCH_ENABLE_THREAD_POOL
270 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
271 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
272 #endif
273 }}},
274 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
275 #if HAVE_PTHREAD_WORKQUEUES
276 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
277 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
278 .dgq_wq_options = 0,
279 #endif
280 #if DISPATCH_ENABLE_THREAD_POOL
281 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
282 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
283 #endif
284 }}},
285 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
286 #if HAVE_PTHREAD_WORKQUEUES
287 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
288 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
289 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
290 #endif
291 #if DISPATCH_ENABLE_THREAD_POOL
292 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
293 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
294 #endif
295 }}},
296 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
297 #if HAVE_PTHREAD_WORKQUEUES
298 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
299 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
300 .dgq_wq_options = 0,
301 #endif
302 #if DISPATCH_ENABLE_THREAD_POOL
303 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
304 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
305 #endif
306 }}},
307 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
308 #if HAVE_PTHREAD_WORKQUEUES
309 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
310 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
311 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
312 #endif
313 #if DISPATCH_ENABLE_THREAD_POOL
314 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
315 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
316 #endif
317 }}},
318 };
319
320 // 6618342 Contact the team that owns the Instrument DTrace probe before
321 // renaming this symbol
322 DISPATCH_CACHELINE_ALIGN
323 struct dispatch_queue_s _dispatch_root_queues[] = {
324 #define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
325 [DISPATCH_ROOT_QUEUE_IDX_##n] = { \
326 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
327 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
328 .do_ctxt = &_dispatch_root_queue_contexts[ \
329 DISPATCH_ROOT_QUEUE_IDX_##n], \
330 .dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
331 .dq_override_voucher = DISPATCH_NO_VOUCHER, \
332 .dq_override = DISPATCH_SATURATED_OVERRIDE, \
333 __VA_ARGS__ \
334 }
335 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
336 .dq_label = "com.apple.root.maintenance-qos",
337 .dq_serialnum = 4,
338 ),
339 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
340 .dq_label = "com.apple.root.maintenance-qos.overcommit",
341 .dq_serialnum = 5,
342 ),
343 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
344 .dq_label = "com.apple.root.background-qos",
345 .dq_serialnum = 6,
346 ),
347 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
348 .dq_label = "com.apple.root.background-qos.overcommit",
349 .dq_serialnum = 7,
350 ),
351 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
352 .dq_label = "com.apple.root.utility-qos",
353 .dq_serialnum = 8,
354 ),
355 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
356 .dq_label = "com.apple.root.utility-qos.overcommit",
357 .dq_serialnum = 9,
358 ),
359 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
360 .dq_label = "com.apple.root.default-qos",
361 .dq_serialnum = 10,
362 ),
363 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
364 .dq_label = "com.apple.root.default-qos.overcommit",
365 .dq_serialnum = 11,
366 ),
367 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
368 .dq_label = "com.apple.root.user-initiated-qos",
369 .dq_serialnum = 12,
370 ),
371 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
372 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
373 .dq_serialnum = 13,
374 ),
375 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
376 .dq_label = "com.apple.root.user-interactive-qos",
377 .dq_serialnum = 14,
378 ),
379 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
380 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
381 .dq_serialnum = 15,
382 ),
383 };
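/*
 * Illustrative sketch (kept out of the build with #if 0): the array above
 * interleaves each QoS bucket with its overcommit variant, so even indices
 * are the plain flavor and odd indices the overcommit one; the init paths
 * later in this file rely on exactly that layout via `i & 1`. The helper
 * name below is hypothetical and only demonstrates the indexing convention.
 */
#if 0
static inline dispatch_queue_t
_example_root_queue_sibling(size_t idx, bool overcommit)
{
	size_t base = idx & ~(size_t)1; // even index of the pair
	return &_dispatch_root_queues[base + (overcommit ? 1 : 0)];
}
#endif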
384
385 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
386 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
387 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
388 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
389 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
390 &_dispatch_root_queues[
391 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
392 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
393 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
394 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
395 &_dispatch_root_queues[
396 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
397 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
398 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
399 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
400 &_dispatch_root_queues[
401 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
402 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
403 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
404 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
405 &_dispatch_root_queues[
406 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
407 };
408 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
409
410 #define DISPATCH_PRIORITY_COUNT 5
411
412 enum {
413 // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
414 // maintenance priority
415 DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
416 DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
417 DISPATCH_PRIORITY_IDX_LOW,
418 DISPATCH_PRIORITY_IDX_DEFAULT,
419 DISPATCH_PRIORITY_IDX_HIGH,
420 };
421
422 static qos_class_t _dispatch_priority2qos[] = {
423 [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
424 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
425 [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
426 [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
427 [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
428 };
429
430 #if HAVE_PTHREAD_WORKQUEUE_QOS
431 static const int _dispatch_priority2wq[] = {
432 [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
433 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
434 [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
435 [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
436 [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
437 };
438 #endif
439
440 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
441 static struct dispatch_queue_s _dispatch_mgr_root_queue;
442 #else
443 #define _dispatch_mgr_root_queue _dispatch_root_queues[\
444 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
445 #endif
446
447 // 6618342 Contact the team that owns the Instrument DTrace probe before
448 // renaming this symbol
449 DISPATCH_CACHELINE_ALIGN
450 struct dispatch_queue_s _dispatch_mgr_q = {
451 DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
452 .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
453 .do_targetq = &_dispatch_mgr_root_queue,
454 .dq_label = "com.apple.libdispatch-manager",
455 .dq_width = 1,
456 .dq_override_voucher = DISPATCH_NO_VOUCHER,
457 .dq_override = DISPATCH_SATURATED_OVERRIDE,
458 .dq_serialnum = 2,
459 };
460
461 dispatch_queue_t
462 dispatch_get_global_queue(long priority, unsigned long flags)
463 {
464 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
465 return DISPATCH_BAD_INPUT;
466 }
467 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
468 _dispatch_root_queues_init_once);
469 qos_class_t qos;
470 switch (priority) {
471 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
472 case _DISPATCH_QOS_CLASS_MAINTENANCE:
473 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
474 .dq_priority) {
475 // map maintenance to background on old kernel
476 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
477 } else {
478 qos = (qos_class_t)priority;
479 }
480 break;
481 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
482 case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
483 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
484 break;
485 case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
486 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
487 break;
488 case DISPATCH_QUEUE_PRIORITY_LOW:
489 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
490 break;
491 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
492 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
493 break;
494 case DISPATCH_QUEUE_PRIORITY_HIGH:
495 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
496 break;
497 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
498 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
499 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
500 .dq_priority) {
501 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
502 break;
503 }
504 #endif
505 // fallthrough
506 default:
507 qos = (qos_class_t)priority;
508 break;
509 }
510 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
511 }
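/*
 * Minimal client-side sketch of the function above, using only the public
 * <dispatch/dispatch.h> API and fenced with #if 0 so it is not compiled into
 * this translation unit. Both the QOS_CLASS_* spelling (on platforms with QoS
 * support) and the legacy DISPATCH_QUEUE_PRIORITY_* constants resolve to one
 * of the root queues defined earlier in this file; pass 0 for flags.
 */
#if 0
#include <dispatch/dispatch.h>

static void example_work(void *ctxt) { (void)ctxt; /* do work */ }

static void
example_global_queue_usage(void)
{
	dispatch_queue_t utility_q =
			dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
	dispatch_queue_t legacy_q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

	dispatch_async_f(utility_q, NULL, example_work);
	dispatch_async_f(legacy_q, NULL, example_work);
}
#endif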
512
513 DISPATCH_ALWAYS_INLINE
514 static inline dispatch_queue_t
515 _dispatch_get_current_queue(void)
516 {
517 return _dispatch_queue_get_current() ?:
518 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
519 }
520
521 dispatch_queue_t
522 dispatch_get_current_queue(void)
523 {
524 return _dispatch_get_current_queue();
525 }
526
527 DISPATCH_NOINLINE DISPATCH_NORETURN
528 static void
529 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
530 {
531 _dispatch_client_assert_fail(
532 "Block was %sexpected to execute on queue [%s]",
533 expected ? "" : "not ", dq->dq_label ?: "");
534 }
535
536 DISPATCH_NOINLINE DISPATCH_NORETURN
537 static void
538 _dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
539 {
540 _dispatch_client_assert_fail(
541 "Block was expected to act as a barrier on queue [%s]",
542 dq->dq_label ?: "");
543 }
544
545 void
546 dispatch_assert_queue(dispatch_queue_t dq)
547 {
548 unsigned long metatype = dx_metatype(dq);
549 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
550 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
551 "dispatch_assert_queue()");
552 }
553 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
554 if (unlikely(_dq_state_drain_pended(dq_state))) {
555 goto fail;
556 }
557 if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
558 return;
559 }
560 if (likely(dq->dq_width > 1)) {
561 // we can look at the width: if it is changing while we read it,
562 // it means that a barrier is running on `dq` concurrently, which
563 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
564 if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
565 return;
566 }
567 }
568 fail:
569 _dispatch_assert_queue_fail(dq, true);
570 }
571
572 void
573 dispatch_assert_queue_not(dispatch_queue_t dq)
574 {
575 unsigned long metatype = dx_metatype(dq);
576 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
577 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
578 "dispatch_assert_queue_not()");
579 }
580 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
581 if (_dq_state_drain_pended(dq_state)) {
582 return;
583 }
584 if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
585 if (likely(dq->dq_width == 1)) {
586 // we can look at the width: if it is changing while we read it,
587 // it means that a barrier is running on `dq` concurrently, which
588 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
589 return;
590 }
591 if (likely(!_dispatch_thread_frame_find_queue(dq))) {
592 return;
593 }
594 }
595 _dispatch_assert_queue_fail(dq, false);
596 }
597
598 void
599 dispatch_assert_queue_barrier(dispatch_queue_t dq)
600 {
601 dispatch_assert_queue(dq);
602
603 if (likely(dq->dq_width == 1)) {
604 return;
605 }
606
607 if (likely(dq->do_targetq)) {
608 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
609 if (likely(_dq_state_is_in_barrier(dq_state))) {
610 return;
611 }
612 }
613
614 _dispatch_assert_queue_barrier_fail(dq);
615 }
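/*
 * Usage sketch for the three assertions above (public API; guarded by #if 0,
 * illustration only). Each one crashes through the _dispatch_assert_queue*
 * _fail helpers defined earlier when its precondition does not hold.
 */
#if 0
static dispatch_queue_t example_q;

static void
example_handler(void *ctxt)
{
	(void)ctxt;
	// Must be running on example_q (barrier-ness not required here).
	dispatch_assert_queue(example_q);
}

static void
example_cleanup(void *ctxt)
{
	(void)ctxt;
	// Must own the whole queue, i.e. run as a barrier on example_q.
	dispatch_assert_queue_barrier(example_q);
}

static void
example_caller(void)
{
	// Must NOT already be on example_q, e.g. before a dispatch_sync().
	dispatch_assert_queue_not(example_q);
	dispatch_sync_f(example_q, NULL, example_handler);
	dispatch_barrier_async_f(example_q, NULL, example_cleanup);
}
#endif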
616
617 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
618 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
619 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
620 #else
621 #define _dispatch_root_queue_debug(...)
622 #define _dispatch_debug_root_queue(...)
623 #endif
624
625 #pragma mark -
626 #pragma mark dispatch_init
627
628 #if HAVE_PTHREAD_WORKQUEUE_QOS
629 pthread_priority_t _dispatch_background_priority;
630 pthread_priority_t _dispatch_user_initiated_priority;
631
632 static void
633 _dispatch_root_queues_init_qos(int supported)
634 {
635 pthread_priority_t p;
636 qos_class_t qos;
637 unsigned int i;
638 for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
639 p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
640 qos = _pthread_qos_class_decode(p, NULL, NULL);
641 dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
642 _dispatch_priority2qos[i] = qos;
643 }
644 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
645 qos = _dispatch_root_queue_contexts[i].dgq_qos;
646 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
647 !(supported & WORKQ_FEATURE_MAINTENANCE)) {
648 continue;
649 }
650 unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
651 flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
652 if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
653 i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
654 flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
655 }
656 p = _pthread_qos_class_encode(qos, 0, flags);
657 _dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
658 }
659 }
660 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
661
662 static inline bool
663 _dispatch_root_queues_init_workq(int *wq_supported)
664 {
665 int r;
666 bool result = false;
667 *wq_supported = 0;
668 #if HAVE_PTHREAD_WORKQUEUES
669 bool disable_wq = false;
670 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
671 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
672 #endif
673 #if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
674 bool disable_qos = false;
675 #if DISPATCH_DEBUG
676 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
677 #endif
678 #if DISPATCH_USE_KEVENT_WORKQUEUE
679 bool disable_kevent_wq = false;
680 #if DISPATCH_DEBUG
681 disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
682 #endif
683 #endif
684 if (!disable_wq && !disable_qos) {
685 *wq_supported = _pthread_workqueue_supported();
686 #if DISPATCH_USE_KEVENT_WORKQUEUE
687 if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
688 r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
689 (pthread_workqueue_function_kevent_t)
690 _dispatch_kevent_worker_thread,
691 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
692 #if DISPATCH_USE_MGR_THREAD
693 _dispatch_kevent_workqueue_enabled = !r;
694 #endif
695 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
696 _dispatch_evfilt_machport_direct_enabled = !r;
697 #endif
698 result = !r;
699 } else
700 #endif
701 if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
702 #if DISPATCH_USE_MGR_THREAD
703 r = _pthread_workqueue_init(_dispatch_worker_thread3,
704 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
705 result = !r;
706 #endif
707 }
708 if (result) _dispatch_root_queues_init_qos(*wq_supported);
709 }
710 #endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
711 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
712 if (!result && !disable_wq) {
713 pthread_workqueue_setdispatchoffset_np(
714 offsetof(struct dispatch_queue_s, dq_serialnum));
715 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
716 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
717 (void)dispatch_assume_zero(r);
718 #endif
719 result = !r;
720 }
721 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
722 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
723 if (!result) {
724 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
725 pthread_workqueue_attr_t pwq_attr;
726 if (!disable_wq) {
727 r = pthread_workqueue_attr_init_np(&pwq_attr);
728 (void)dispatch_assume_zero(r);
729 }
730 #endif
731 int i;
732 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
733 pthread_workqueue_t pwq = NULL;
734 dispatch_root_queue_context_t qc;
735 qc = &_dispatch_root_queue_contexts[i];
736 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
737 if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
738 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
739 qc->dgq_wq_priority);
740 (void)dispatch_assume_zero(r);
741 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
742 qc->dgq_wq_options &
743 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
744 (void)dispatch_assume_zero(r);
745 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
746 (void)dispatch_assume_zero(r);
747 result = result || dispatch_assume(pwq);
748 }
749 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
750 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
751 }
752 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
753 if (!disable_wq) {
754 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
755 (void)dispatch_assume_zero(r);
756 }
757 #endif
758 }
759 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
760 #endif // HAVE_PTHREAD_WORKQUEUES
761 return result;
762 }
763
764 #if DISPATCH_USE_PTHREAD_POOL
765 static inline void
766 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
767 uint8_t pool_size, bool overcommit)
768 {
769 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
770 uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
771 dispatch_hw_config(active_cpus);
772 if (slowpath(pool_size) && pool_size < thread_pool_size) {
773 thread_pool_size = pool_size;
774 }
775 qc->dgq_thread_pool_size = thread_pool_size;
776 #if HAVE_PTHREAD_WORKQUEUES
777 if (qc->dgq_qos) {
778 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
779 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
780 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
781 #if HAVE_PTHREAD_WORKQUEUE_QOS
782 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
783 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
784 #endif
785 }
786 #endif // HAVE_PTHREAD_WORKQUEUES
787 _os_semaphore_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
788 _os_semaphore_init(sema, _OS_SEM_POLICY_LIFO);
789 _os_semaphore_create(sema, _OS_SEM_POLICY_LIFO);
790 }
791 #endif // DISPATCH_USE_PTHREAD_POOL
792
793 static dispatch_once_t _dispatch_root_queues_pred;
794
795 void
796 _dispatch_root_queues_init(void)
797 {
798 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
799 _dispatch_root_queues_init_once);
800 }
801
802 static void
803 _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
804 {
805 int wq_supported;
806 _dispatch_fork_becomes_unsafe();
807 if (!_dispatch_root_queues_init_workq(&wq_supported)) {
808 #if DISPATCH_ENABLE_THREAD_POOL
809 int i;
810 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
811 bool overcommit = true;
812 #if TARGET_OS_EMBEDDED
813 // some software hangs if the non-overcommitting queues do not
814 // overcommit when threads block. Someday, this behavior should
815 // apply to all platforms
816 if (!(i & 1)) {
817 overcommit = false;
818 }
819 #endif
820 _dispatch_root_queue_init_pthread_pool(
821 &_dispatch_root_queue_contexts[i], 0, overcommit);
822 }
823 #else
824 DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
825 "Root queue initialization failed");
826 #endif // DISPATCH_ENABLE_THREAD_POOL
827 }
828 }
829
830 DISPATCH_EXPORT DISPATCH_NOTHROW
831 void
832 libdispatch_init(void)
833 {
834 dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
835 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
836
837 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
838 -DISPATCH_QUEUE_PRIORITY_HIGH);
839 dispatch_assert(countof(_dispatch_root_queues) ==
840 DISPATCH_ROOT_QUEUE_COUNT);
841 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
842 DISPATCH_ROOT_QUEUE_COUNT);
843 dispatch_assert(countof(_dispatch_priority2qos) ==
844 DISPATCH_PRIORITY_COUNT);
845 #if HAVE_PTHREAD_WORKQUEUE_QOS
846 dispatch_assert(countof(_dispatch_priority2wq) ==
847 DISPATCH_PRIORITY_COUNT);
848 #endif
849 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
850 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
851 sizeof(_dispatch_wq2root_queues[0][0]) ==
852 WORKQ_NUM_PRIOQUEUE * 2);
853 #endif
854 #if DISPATCH_ENABLE_THREAD_POOL
855 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
856 DISPATCH_ROOT_QUEUE_COUNT);
857 #endif
858
859 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
860 offsetof(struct dispatch_object_s, do_next));
861 dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
862 offsetof(struct dispatch_object_s, do_vtable));
863 dispatch_assert(sizeof(struct dispatch_apply_s) <=
864 DISPATCH_CONTINUATION_SIZE);
865 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
866 == 0);
867 dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) % _Alignof(uint64_t) == 0);
868 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
869 DISPATCH_CACHELINE_SIZE == 0);
870
871
872 #if HAVE_PTHREAD_WORKQUEUE_QOS
873 // 26497968 _dispatch_user_initiated_priority should be set for qos
874 // propagation to work properly
875 pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
876 _dispatch_main_q.dq_priority = (dispatch_priority_t)p;
877 _dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
878 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
879 _dispatch_user_initiated_priority = p;
880 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
881 _dispatch_background_priority = p;
882 #if DISPATCH_DEBUG
883 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
884 _dispatch_set_qos_class_enabled = 1;
885 }
886 #endif
887 #endif
888
889 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
890 _dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
891 #else
892 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
893 _dispatch_thread_key_create(&dispatch_deferred_items_key,
894 _dispatch_deferred_items_cleanup);
895 _dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
896 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
897 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
898 _dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
899 _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
900 _dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
901 NULL);
902 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
903 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
904 #endif
905 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
906 if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
907 _dispatch_thread_key_create(&dispatch_sema4_key,
908 _dispatch_thread_semaphore_dispose);
909 }
910 #endif
911 #endif
912
913 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
914 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
915 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
916 #endif
917
918 _dispatch_queue_set_current(&_dispatch_main_q);
919 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
920
921 #if DISPATCH_USE_PTHREAD_ATFORK
922 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
923 dispatch_atfork_parent, dispatch_atfork_child));
924 #endif
925 _dispatch_hw_config_init();
926 _dispatch_vtable_init();
927 _os_object_init();
928 _voucher_init();
929 _dispatch_introspection_init();
930 }
931
932 #if HAVE_MACH
933 static dispatch_once_t _dispatch_mach_host_port_pred;
934 static mach_port_t _dispatch_mach_host_port;
935
936 static void
937 _dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
938 {
939 kern_return_t kr;
940 mach_port_t mp, mhp = mach_host_self();
941 kr = host_get_host_port(mhp, &mp);
942 DISPATCH_VERIFY_MIG(kr);
943 if (fastpath(!kr)) {
944 // mach_host_self returned the HOST_PRIV port
945 kr = mach_port_deallocate(mach_task_self(), mhp);
946 DISPATCH_VERIFY_MIG(kr);
947 mhp = mp;
948 } else if (kr != KERN_INVALID_ARGUMENT) {
949 (void)dispatch_assume_zero(kr);
950 }
951 if (!fastpath(mhp)) {
952 DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
953 }
954 _dispatch_mach_host_port = mhp;
955 }
956
957 mach_port_t
958 _dispatch_get_mach_host_port(void)
959 {
960 dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
961 _dispatch_mach_host_port_init);
962 return _dispatch_mach_host_port;
963 }
964 #endif
965
966 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
967 #include <unistd.h>
968 #include <sys/syscall.h>
969
970 #ifdef SYS_gettid
971 DISPATCH_ALWAYS_INLINE
972 static inline pid_t
973 gettid(void)
974 {
975 return (pid_t) syscall(SYS_gettid);
976 }
977 #else
978 #error "SYS_gettid unavailable on this system"
979 #endif
980
981 #define _tsd_call_cleanup(k, f) do { \
982 if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
983 } while (0)
984
985 void
986 _libdispatch_tsd_cleanup(void *ctx)
987 {
988 struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
989
990 _tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
991 _tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
992 _tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
993 _tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
994 _tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
995 NULL);
996 _tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
997 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
998 _tsd_call_cleanup(dispatch_bcounter_key, NULL);
999 #endif
1000 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
1001 _tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
1002 #endif
1003 _tsd_call_cleanup(dispatch_priority_key, NULL);
1004 _tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
1005 _tsd_call_cleanup(dispatch_deferred_items_key,
1006 _dispatch_deferred_items_cleanup);
1007 tsd->tid = 0;
1008 }
1009
1010 DISPATCH_NOINLINE
1011 void
1012 libdispatch_tsd_init(void)
1013 {
1014 pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
1015 __dispatch_tsd.tid = gettid();
1016 }
1017 #endif
1018
1019 DISPATCH_NOTHROW
1020 void
1021 _dispatch_queue_atfork_child(void)
1022 {
1023 void *crash = (void *)0x100;
1024 size_t i;
1025
1026 #if HAVE_MACH
1027 _dispatch_mach_host_port_pred = 0;
1028 _dispatch_mach_host_port = MACH_PORT_NULL;
1029 #endif
1030
1031 if (!_dispatch_is_multithreaded_inline()) return;
1032
1033 _dispatch_main_q.dq_items_head = crash;
1034 _dispatch_main_q.dq_items_tail = crash;
1035
1036 _dispatch_mgr_q.dq_items_head = crash;
1037 _dispatch_mgr_q.dq_items_tail = crash;
1038
1039 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1040 _dispatch_root_queues[i].dq_items_head = crash;
1041 _dispatch_root_queues[i].dq_items_tail = crash;
1042 }
1043 }
1044
1045 #pragma mark -
1046 #pragma mark dispatch_queue_attr_t
1047
1048 DISPATCH_ALWAYS_INLINE
1049 static inline bool
1050 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1051 {
1052 qos_class_t qos = (qos_class_t)qos_class;
1053 switch (qos) {
1054 case _DISPATCH_QOS_CLASS_MAINTENANCE:
1055 case _DISPATCH_QOS_CLASS_BACKGROUND:
1056 case _DISPATCH_QOS_CLASS_UTILITY:
1057 case _DISPATCH_QOS_CLASS_DEFAULT:
1058 case _DISPATCH_QOS_CLASS_USER_INITIATED:
1059 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
1060 case _DISPATCH_QOS_CLASS_UNSPECIFIED:
1061 break;
1062 default:
1063 return false;
1064 }
1065 	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY) {
1066 return false;
1067 }
1068 return true;
1069 }
1070
1071 #define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
1072 [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
1073
1074 static const
1075 _dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
1076 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
1077 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
1078 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
1079 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
1080 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
1081 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
1082 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
1083 };
1084
1085 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1086 ((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
1087 DQA_INDEX_NON_OVERCOMMIT : \
1088 ((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
1089 DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
1090
1091 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1092 ((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1093
1094 #define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
1095 ((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)
1096
1097 #define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
1098 (frequency)
1099
1100 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1101
1102 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
1103
1104 static inline dispatch_queue_attr_t
1105 _dispatch_get_queue_attr(qos_class_t qos, int prio,
1106 _dispatch_queue_attr_overcommit_t overcommit,
1107 dispatch_autorelease_frequency_t frequency,
1108 bool concurrent, bool inactive)
1109 {
1110 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1111 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1112 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1113 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1114 [DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
1115 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
1116 [DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
1117 }
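/*
 * Because the function above indexes a constant table, the public
 * dispatch_queue_attr_make_* entry points below hand back interned pointers:
 * the same combination of options always yields the same pointer into
 * _dispatch_queue_attrs. A tiny sketch of that property (guarded by #if 0,
 * illustration only; QOS_CLASS_* assumes a platform with QoS support):
 */
#if 0
static void
example_attr_interning(void)
{
	dispatch_queue_attr_t a = dispatch_queue_attr_make_with_qos_class(
			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, 0);
	dispatch_queue_attr_t b = dispatch_queue_attr_make_with_qos_class(
			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, 0);
	dispatch_assert(a == b); // both point into _dispatch_queue_attrs
}
#endif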
1118
1119 dispatch_queue_attr_t
1120 _dispatch_get_default_queue_attr(void)
1121 {
1122 return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
1123 _dispatch_queue_attr_overcommit_unspecified,
1124 DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
1125 }
1126
1127 dispatch_queue_attr_t
1128 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1129 dispatch_qos_class_t qos_class, int relative_priority)
1130 {
1131 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
1132 return DISPATCH_BAD_INPUT;
1133 }
1134 if (!slowpath(dqa)) {
1135 dqa = _dispatch_get_default_queue_attr();
1136 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1137 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1138 }
1139 return _dispatch_get_queue_attr(qos_class, relative_priority,
1140 dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
1141 dqa->dqa_concurrent, dqa->dqa_inactive);
1142 }
1143
1144 dispatch_queue_attr_t
1145 dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
1146 {
1147 if (!slowpath(dqa)) {
1148 dqa = _dispatch_get_default_queue_attr();
1149 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1150 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1151 }
1152 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1153 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1154 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
1155 }
1156
1157 dispatch_queue_attr_t
1158 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1159 bool overcommit)
1160 {
1161 if (!slowpath(dqa)) {
1162 dqa = _dispatch_get_default_queue_attr();
1163 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1164 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1165 }
1166 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1167 dqa->dqa_relative_priority, overcommit ?
1168 _dispatch_queue_attr_overcommit_enabled :
1169 _dispatch_queue_attr_overcommit_disabled,
1170 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
1171 dqa->dqa_inactive);
1172 }
1173
1174 dispatch_queue_attr_t
1175 dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
1176 dispatch_autorelease_frequency_t frequency)
1177 {
1178 switch (frequency) {
1179 case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
1180 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1181 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1182 break;
1183 default:
1184 return DISPATCH_BAD_INPUT;
1185 }
1186 if (!slowpath(dqa)) {
1187 dqa = _dispatch_get_default_queue_attr();
1188 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1189 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1190 }
1191 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1192 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1193 frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
1194 }
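/*
 * The attribute makers above compose: each one starts from the attribute it
 * is given (or the default attribute) and overrides a single dimension. A
 * minimal sketch of chaining them (public API; #if 0, illustration only; the
 * label is hypothetical and QOS_CLASS_* assumes QoS support):
 */
#if 0
static dispatch_queue_t
example_make_worker_queue(void)
{
	dispatch_queue_attr_t attr = DISPATCH_QUEUE_CONCURRENT;
	// relative priority must lie in [QOS_MIN_RELATIVE_PRIORITY, 0]
	attr = dispatch_queue_attr_make_with_qos_class(attr, QOS_CLASS_UTILITY, -1);
	attr = dispatch_queue_attr_make_with_autorelease_frequency(attr,
			DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM);
	attr = dispatch_queue_attr_make_initially_inactive(attr);
	// created inactive: must be dispatch_activate()d before it runs work
	return dispatch_queue_create("com.example.worker", attr);
}
#endif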
1195
1196 #pragma mark -
1197 #pragma mark dispatch_queue_t
1198
1199 // skip zero
1200 // 1 - main_q
1201 // 2 - mgr_q
1202 // 3 - mgr_root_q
1203 // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
1204 // we use 'xadd' on Intel, so the initial value == next assigned
1205 unsigned long volatile _dispatch_queue_serial_numbers = 16;
1206
1207 DISPATCH_NOINLINE
1208 static dispatch_queue_t
1209 _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1210 dispatch_queue_t tq, bool legacy)
1211 {
1212 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1213 // Be sure the root queue priorities are set
1214 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
1215 _dispatch_root_queues_init_once);
1216 #endif
1217 if (!slowpath(dqa)) {
1218 dqa = _dispatch_get_default_queue_attr();
1219 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1220 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1221 }
1222
1223 //
1224 // Step 1: Normalize arguments (qos, overcommit, tq)
1225 //
1226
1227 qos_class_t qos = dqa->dqa_qos_class;
1228 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1229 if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
1230 !_dispatch_root_queues[
1231 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
1232 qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
1233 }
1234 #endif
1235 bool maintenance_fallback = false;
1236 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1237 maintenance_fallback = true;
1238 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1239 if (maintenance_fallback) {
1240 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
1241 !_dispatch_root_queues[
1242 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
1243 qos = _DISPATCH_QOS_CLASS_BACKGROUND;
1244 }
1245 }
1246
1247 _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
1248 if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
1249 if (tq->do_targetq) {
1250 DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
1251 "a non-global target queue");
1252 }
1253 }
1254
1255 if (tq && !tq->do_targetq &&
1256 tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
1257 // Handle discrepancies between attr and target queue, attributes win
1258 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1259 if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
1260 overcommit = _dispatch_queue_attr_overcommit_enabled;
1261 } else {
1262 overcommit = _dispatch_queue_attr_overcommit_disabled;
1263 }
1264 }
1265 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1266 tq = _dispatch_get_root_queue_with_overcommit(tq,
1267 overcommit == _dispatch_queue_attr_overcommit_enabled);
1268 } else {
1269 tq = NULL;
1270 }
1271 } else if (tq && !tq->do_targetq) {
1272 // target is a pthread or runloop root queue, setting QoS or overcommit
1273 // is disallowed
1274 if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
1275 DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
1276 "and use this kind of target queue");
1277 }
1278 if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1279 DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
1280 "and use this kind of target queue");
1281 }
1282 } else {
1283 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1284 // Serial queues default to overcommit!
1285 overcommit = dqa->dqa_concurrent ?
1286 _dispatch_queue_attr_overcommit_disabled :
1287 _dispatch_queue_attr_overcommit_enabled;
1288 }
1289 }
1290 if (!tq) {
1291 qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
1292 _DISPATCH_QOS_CLASS_DEFAULT : qos;
1293 tq = _dispatch_get_root_queue(tq_qos, overcommit ==
1294 _dispatch_queue_attr_overcommit_enabled);
1295 if (slowpath(!tq)) {
1296 DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
1297 }
1298 }
1299
1300 //
1301 // Step 2: Initialize the queue
1302 //
1303
1304 if (legacy) {
1305 		// if any of these attributes is specified, use non-legacy classes
1306 if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
1307 legacy = false;
1308 }
1309 }
1310
1311 const void *vtable;
1312 dispatch_queue_flags_t dqf = 0;
1313 if (legacy) {
1314 vtable = DISPATCH_VTABLE(queue);
1315 } else if (dqa->dqa_concurrent) {
1316 vtable = DISPATCH_VTABLE(queue_concurrent);
1317 } else {
1318 vtable = DISPATCH_VTABLE(queue_serial);
1319 }
1320 switch (dqa->dqa_autorelease_frequency) {
1321 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1322 dqf |= DQF_AUTORELEASE_NEVER;
1323 break;
1324 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1325 dqf |= DQF_AUTORELEASE_ALWAYS;
1326 break;
1327 }
1328 if (label) {
1329 const char *tmp = _dispatch_strdup_if_mutable(label);
1330 if (tmp != label) {
1331 dqf |= DQF_LABEL_NEEDS_FREE;
1332 label = tmp;
1333 }
1334 }
1335
1336 dispatch_queue_t dq = _dispatch_alloc(vtable,
1337 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1338 _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
1339 DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
1340
1341 dq->dq_label = label;
1342
1343 #if HAVE_PTHREAD_WORKQUEUE_QOS
1344 dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
1345 dqa->dqa_relative_priority,
1346 overcommit == _dispatch_queue_attr_overcommit_enabled ?
1347 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
1348 #endif
1349 _dispatch_retain(tq);
1350 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1351 		// legacy way of inheriting the QoS from the target
1352 _dispatch_queue_priority_inherit_from_target(dq, tq);
1353 }
1354 if (!dqa->dqa_inactive) {
1355 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1356 }
1357 dq->do_targetq = tq;
1358 _dispatch_object_debug(dq, "%s", __func__);
1359 return _dispatch_introspection_queue_create(dq);
1360 }
1361
1362 dispatch_queue_t
1363 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1364 dispatch_queue_t tq)
1365 {
1366 return _dispatch_queue_create_with_target(label, dqa, tq, false);
1367 }
1368
1369 dispatch_queue_t
1370 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1371 {
1372 return _dispatch_queue_create_with_target(label, attr,
1373 DISPATCH_TARGET_QUEUE_DEFAULT, true);
1374 }
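/*
 * Sketch of the create-with-target entry point above (public API; kept out
 * of the build with #if 0; labels are hypothetical). Queues that target one
 * serial queue drain through it and are therefore mutually exclusive with
 * each other.
 */
#if 0
static void
example_queue_hierarchy(void)
{
	dispatch_queue_t gate = dispatch_queue_create("com.example.gate",
			DISPATCH_QUEUE_SERIAL);
	dispatch_queue_t net = dispatch_queue_create_with_target("com.example.net",
			DISPATCH_QUEUE_SERIAL, gate);
	dispatch_queue_t disk = dispatch_queue_create_with_target("com.example.disk",
			DISPATCH_QUEUE_SERIAL, gate);
	// work submitted to net and disk never runs concurrently, because both
	// queues ultimately drain through the serial `gate` queue
	(void)net; (void)disk;
}
#endif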
1375
1376 dispatch_queue_t
1377 dispatch_queue_create_with_accounting_override_voucher(const char *label,
1378 dispatch_queue_attr_t attr, voucher_t voucher)
1379 {
1380 dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
1381 DISPATCH_TARGET_QUEUE_DEFAULT);
1382 dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
1383 return dq;
1384 }
1385
1386 void
1387 _dispatch_queue_destroy(dispatch_queue_t dq)
1388 {
1389 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
1390 uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
1391
1392 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
1393 initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
1394 }
1395 if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
1396 // dispatch_cancel_and_wait may apply overrides in a racy way with
1397 // the source cancellation finishing. This race is expensive and not
1398 // really worthwhile to resolve since the source becomes dead anyway.
1399 dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
1400 }
1401 if (slowpath(dq_state != initial_state)) {
1402 if (_dq_state_drain_locked(dq_state)) {
1403 DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
1404 }
1405 #ifndef __LP64__
1406 dq_state >>= 32;
1407 #endif
1408 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
1409 "Release of a queue with corrupt state");
1410 }
1411 if (slowpath(dq == _dispatch_queue_get_current())) {
1412 DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
1413 }
1414 if (slowpath(dq->dq_items_tail)) {
1415 DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
1416 "Release of a queue while items are enqueued");
1417 }
1418
1419 // trash the queue so that use after free will crash
1420 dq->dq_items_head = (void *)0x200;
1421 dq->dq_items_tail = (void *)0x200;
1422 // poison the state with something that is suspended and is easy to spot
1423 dq->dq_state = 0xdead000000000000;
1424
1425 dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
1426 (void *)0x200, relaxed);
1427 if (dqsq) {
1428 _dispatch_release(dqsq);
1429 }
1430 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1431 if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
1432 dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
1433 }
1434 }
1435
1436 // 6618342 Contact the team that owns the Instrument DTrace probe before
1437 // renaming this symbol
1438 void
1439 _dispatch_queue_dispose(dispatch_queue_t dq)
1440 {
1441 _dispatch_object_debug(dq, "%s", __func__);
1442 _dispatch_introspection_queue_dispose(dq);
1443 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
1444 free((void*)dq->dq_label);
1445 }
1446 _dispatch_queue_destroy(dq);
1447 }
1448
1449 DISPATCH_NOINLINE
1450 static void
1451 _dispatch_queue_suspend_slow(dispatch_queue_t dq)
1452 {
1453 uint64_t dq_state, value, delta;
1454
1455 _dispatch_queue_sidelock_lock(dq);
1456
1457 // what we want to transfer (remove from dq_state)
1458 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1459 // but this is a suspend so add a suspend count at the same time
1460 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1461 if (dq->dq_side_suspend_cnt == 0) {
1462 		// we subtract delta from dq_state, and we want to set this bit
1463 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1464 }
1465
1466 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1467 		// unsigned underflow of the subtraction can happen because other
1468 // threads could have touched this value while we were trying to acquire
1469 // the lock, or because another thread raced us to do the same operation
1470 // and got to the lock first.
1471 if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
1472 os_atomic_rmw_loop_give_up(goto retry);
1473 }
1474 });
1475 if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
1476 DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
1477 DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
1478 }
1479 return _dispatch_queue_sidelock_unlock(dq);
1480
1481 retry:
1482 _dispatch_queue_sidelock_unlock(dq);
1483 return dx_vtable(dq)->do_suspend(dq);
1484 }
1485
1486 void
1487 _dispatch_queue_suspend(dispatch_queue_t dq)
1488 {
1489 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1490
1491 uint64_t dq_state, value;
1492
1493 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1494 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1495 if (slowpath(os_add_overflow(dq_state, value, &value))) {
1496 os_atomic_rmw_loop_give_up({
1497 return _dispatch_queue_suspend_slow(dq);
1498 });
1499 }
1500 });
1501
1502 if (!_dq_state_is_suspended(dq_state)) {
1503 // rdar://8181908 we need to extend the queue life for the duration
1504 // of the call to wakeup at _dispatch_queue_resume() time.
1505 _dispatch_retain(dq);
1506 }
1507 }
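/*
 * Client-side sketch for the suspend/resume/activate machinery implemented
 * here (public API; #if 0, illustration only). Suspension is counted, so
 * every dispatch_suspend() must be balanced by a dispatch_resume(); an
 * initially-inactive queue additionally needs one dispatch_activate()
 * before it runs any work.
 */
#if 0
static void example_suspended_work(void *ctxt) { (void)ctxt; }

static void
example_suspend_resume(dispatch_queue_t q)
{
	dispatch_suspend(q);                          // suspend count 0 -> 1
	dispatch_async_f(q, NULL, example_suspended_work); // enqueued, not run yet
	dispatch_suspend(q);                          // suspend count 1 -> 2
	dispatch_resume(q);                           // 2 -> 1, still suspended
	dispatch_resume(q);                           // 1 -> 0, work may now run
}

static dispatch_queue_t
example_inactive_queue(void)
{
	dispatch_queue_t q = dispatch_queue_create("com.example.inactive",
			dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL));
	// configure q (target queue, specifics, ...) while it cannot run work
	dispatch_activate(q);
	return q;
}
#endif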
1508
1509 DISPATCH_NOINLINE
1510 static void
1511 _dispatch_queue_resume_slow(dispatch_queue_t dq)
1512 {
1513 uint64_t dq_state, value, delta;
1514
1515 _dispatch_queue_sidelock_lock(dq);
1516
1517 // what we want to transfer
1518 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1519 // but this is a resume so consume a suspend count at the same time
1520 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1521 switch (dq->dq_side_suspend_cnt) {
1522 case 0:
1523 goto retry;
1524 case DISPATCH_QUEUE_SUSPEND_HALF:
1525 // we will transition the side count to 0, so we want to clear this bit
1526 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1527 break;
1528 }
1529 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1530 // unsigned overflow of the addition can happen because other
1531 // threads could have touched this value while we were trying to acquire
1532 // the lock, or because another thread raced us to do the same operation
1533 // and got to the lock first.
1534 if (slowpath(os_add_overflow(dq_state, delta, &value))) {
1535 os_atomic_rmw_loop_give_up(goto retry);
1536 }
1537 });
1538 dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
1539 return _dispatch_queue_sidelock_unlock(dq);
1540
1541 retry:
1542 _dispatch_queue_sidelock_unlock(dq);
1543 return dx_vtable(dq)->do_resume(dq, false);
1544 }
1545
1546 DISPATCH_NOINLINE
1547 static void
1548 _dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
1549 {
1550 // Step 2: run the activation finalizer
1551 if (dx_vtable(dq)->do_finalize_activation) {
1552 dx_vtable(dq)->do_finalize_activation(dq);
1553 }
1554 // Step 3: consume the suspend count
1555 return dx_vtable(dq)->do_resume(dq, false);
1556 }
1557
1558 void
1559 _dispatch_queue_resume(dispatch_queue_t dq, bool activate)
1560 {
1561 // covers all suspend and inactive bits, including side suspend bit
1562 const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
1563 // backward compatibility: only dispatch sources can abuse
1564 // dispatch_resume() to really mean dispatch_activate()
1565 bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
1566 uint64_t dq_state, value;
1567
1568 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1569
1570 // Activation is a bit tricky as it needs to finalize before the wakeup.
1571 //
1572 // If after doing its updates to the suspend count and/or inactive bit,
1573 // the last suspension related bit that would remain is the
1574 // NEEDS_ACTIVATION one, then this function:
1575 //
1576 // 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
1577 // a suspend count)
1578 // 2. runs the activation finalizer
1579 // 3. consumes the suspend count set in (1), and finishes the resume flow
1580 //
1581 // Concurrently, some property setters such as setting dispatch source
1582 // handlers or _dispatch_queue_set_target_queue try to do in-place changes
1583 // before activation. These protect their action by taking a suspend count.
1584 // Step (1) above cannot happen if such a setter has locked the object.
1585 if (activate) {
1586 // relaxed atomic because this doesn't publish anything, this is only
1587 // about picking the thread that gets to finalize the activation
1588 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1589 if ((dq_state & suspend_bits) ==
1590 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1591 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1592 value = dq_state - DISPATCH_QUEUE_INACTIVE
1593 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1594 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1595 } else if (_dq_state_is_inactive(dq_state)) {
1596 // { sc:>0 i:1 na:1 } -> { i:0 na:1 }
1597 // simple activation because sc is not 0
1598 // resume will deal with na:1 later
1599 value = dq_state - DISPATCH_QUEUE_INACTIVE;
1600 } else {
1601 // object already active, this is a no-op, just exit
1602 os_atomic_rmw_loop_give_up(return);
1603 }
1604 });
1605 } else {
1606 // release barrier needed to publish the effect of
1607 // - dispatch_set_target_queue()
1608 // - dispatch_set_*_handler()
1609 // - do_finalize_activation()
1610 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
1611 if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
1612 + DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1613 // { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
1614 value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
1615 } else if (resume_can_activate && (dq_state & suspend_bits) ==
1616 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1617 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1618 value = dq_state - DISPATCH_QUEUE_INACTIVE
1619 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1620 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1621 } else {
1622 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1623 if (slowpath(os_sub_overflow(dq_state, value, &value))) {
1624 // underflow means over-resume, or that a suspend count transfer
1625 // back from the side count is needed
1626 os_atomic_rmw_loop_give_up({
1627 if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
1628 goto over_resume;
1629 }
1630 return _dispatch_queue_resume_slow(dq);
1631 });
1632 }
1633 if (_dq_state_is_runnable(value) &&
1634 !_dq_state_drain_locked(value)) {
1635 uint64_t full_width = value;
1636 if (_dq_state_has_pending_barrier(value)) {
1637 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
1638 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
1639 full_width += DISPATCH_QUEUE_IN_BARRIER;
1640 } else {
1641 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
1642 full_width += DISPATCH_QUEUE_IN_BARRIER;
1643 }
1644 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
1645 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
1646 value = full_width;
1647 value &= ~DISPATCH_QUEUE_DIRTY;
1648 value |= _dispatch_tid_self();
1649 }
1650 }
1651 }
1652 });
1653 }
1654
1655 if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1656 // we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
1657 return _dispatch_queue_resume_finalize_activation(dq);
1658 }
1659
1660 if (activate) {
1661 // if we're still in an activate codepath here we should have
1662 // { sc:>0 na:1 }, if not we've got a corrupt state
1663 if (!fastpath(_dq_state_is_suspended(value))) {
1664 DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
1665 }
1666 return;
1667 }
1668
1669 if (_dq_state_is_suspended(value)) {
1670 return;
1671 }
1672
1673 if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
1674 _dispatch_try_lock_transfer_or_wakeup(dq);
1675 } else if (_dq_state_should_wakeup(value)) {
1676 // <rdar://problem/14637483>
1677 // seq_cst wrt state changes that were flushed and not acted upon
1678 os_atomic_thread_fence(acquire);
1679 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
1680 _dispatch_queue_is_thread_bound(dq));
1681 // Balancing the retain() done in suspend() for rdar://8181908
1682 return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
1683 }
1684
1685 // Balancing the retain() done in suspend() for rdar://8181908
1686 return _dispatch_release_tailcall(dq);
1687
1688 over_resume:
1689 if (slowpath(_dq_state_is_inactive(dq_state))) {
1690 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
1691 }
1692 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
1693 }
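
/*
 * Editorial usage sketch (not part of the original source): the over-resume
 * crash above fires when dispatch_resume() is called more often than
 * dispatch_suspend(). A balanced client-level pattern, where do_work() is a
 * hypothetical client function:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_suspend(q);                  // sc: 0 -> 1, queue stops draining
 *	dispatch_async(q, ^{ do_work(); });   // enqueued, but not yet executed
 *	dispatch_resume(q);                   // sc: 1 -> 0, the block may now run
 *	// a second dispatch_resume(q) here would be an over-resume and crash
 */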
1694
1695 const char *
1696 dispatch_queue_get_label(dispatch_queue_t dq)
1697 {
1698 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1699 dq = _dispatch_get_current_queue();
1700 }
1701 return dq->dq_label ? dq->dq_label : "";
1702 }
1703
1704 qos_class_t
1705 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
1706 {
1707 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
1708 int relative_priority = 0;
1709 #if HAVE_PTHREAD_WORKQUEUE_QOS
1710 pthread_priority_t dqp = dq->dq_priority;
1711 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
1712 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
1713 #else
1714 (void)dq;
1715 #endif
1716 if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
1717 return qos;
1718 }
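
/*
 * Editorial usage sketch (not part of the original source) for the two
 * introspection calls above, using only public API:
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -2);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.util", attr);
 *	int rel;
 *	qos_class_t qos = dispatch_queue_get_qos_class(q, &rel);
 *	// qos == QOS_CLASS_UTILITY, rel == -2
 *	const char *label = dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL);
 *	// label of whatever queue the caller is currently running on
 */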
1719
1720 static void
1721 _dispatch_queue_set_width2(void *ctxt)
1722 {
1723 int w = (int)(intptr_t)ctxt; // intentional truncation
1724 uint32_t tmp;
1725 dispatch_queue_t dq = _dispatch_queue_get_current();
1726
1727 if (w > 0) {
1728 tmp = (unsigned int)w;
1729 } else switch (w) {
1730 case 0:
1731 tmp = 1;
1732 break;
1733 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1734 tmp = dispatch_hw_config(physical_cpus);
1735 break;
1736 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1737 tmp = dispatch_hw_config(active_cpus);
1738 break;
1739 default:
1740 // fall through
1741 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1742 tmp = dispatch_hw_config(logical_cpus);
1743 break;
1744 }
1745 if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
1746 tmp = DISPATCH_QUEUE_WIDTH_MAX;
1747 }
1748
1749 dispatch_queue_flags_t old_dqf, new_dqf;
1750 os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
1751 new_dqf = old_dqf & ~DQF_WIDTH_MASK;
1752 new_dqf |= (tmp << DQF_WIDTH_SHIFT);
1753 });
1754 _dispatch_object_debug(dq, "%s", __func__);
1755 }
1756
1757 void
1758 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1759 {
1760 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1761 slowpath(dx_hastypeflag(dq, QUEUE_ROOT))) {
1762 return;
1763 }
1764
1765 unsigned long type = dx_type(dq);
1766 switch (type) {
1767 case DISPATCH_QUEUE_LEGACY_TYPE:
1768 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1769 break;
1770 case DISPATCH_QUEUE_SERIAL_TYPE:
1771 DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
1772 default:
1773 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1774 }
1775
1776 _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
1777 _dispatch_queue_set_width2);
1778 }
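
/*
 * Editorial usage sketch (not part of the original source): this SPI only
 * applies to concurrent (or legacy) queues, as enforced above, and the width
 * is clamped to DISPATCH_QUEUE_WIDTH_MAX:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.pool",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	// limit concurrency on q to the number of active CPUs
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 */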
1779
1780 static void
1781 _dispatch_queue_legacy_set_target_queue(void *ctxt)
1782 {
1783 dispatch_queue_t dq = _dispatch_queue_get_current();
1784 dispatch_queue_t tq = ctxt;
1785 dispatch_queue_t otq = dq->do_targetq;
1786
1787 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1788 _dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
1789 _dispatch_bug_deprecated("Changing the target of a queue "
1790 "already targeted by other dispatch objects");
1791 }
1792
1793 _dispatch_queue_priority_inherit_from_target(dq, tq);
1794 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1795 #if HAVE_PTHREAD_WORKQUEUE_QOS
1796 // see _dispatch_queue_class_wakeup()
1797 _dispatch_queue_sidelock_lock(dq);
1798 #endif
1799 dq->do_targetq = tq;
1800 #if HAVE_PTHREAD_WORKQUEUE_QOS
1801 // see _dispatch_queue_class_wakeup()
1802 _dispatch_queue_sidelock_unlock(dq);
1803 #endif
1804
1805 _dispatch_object_debug(dq, "%s", __func__);
1806 _dispatch_introspection_target_queue_changed(dq);
1807 _dispatch_release_tailcall(otq);
1808 }
1809
1810 void
1811 _dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
1812 {
1813 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
1814 dq->do_targetq);
1815
1816 if (slowpath(!tq)) {
1817 bool is_concurrent_q = (dq->dq_width > 1);
1818 tq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1819 !is_concurrent_q);
1820 }
1821
1822 if (_dispatch_queue_try_inactive_suspend(dq)) {
1823 _dispatch_object_set_target_queue_inline(dq, tq);
1824 return dx_vtable(dq)->do_resume(dq, false);
1825 }
1826
1827 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1828 DISPATCH_CLIENT_CRASH(dq, "Cannot change the target of a queue or "
1829 "source with an accounting override voucher "
1830 "after it has been activated");
1831 }
1832
1833 unsigned long type = dx_type(dq);
1834 switch (type) {
1835 case DISPATCH_QUEUE_LEGACY_TYPE:
1836 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1837 _dispatch_bug_deprecated("Changing the target of a queue "
1838 "already targeted by other dispatch objects");
1839 }
1840 break;
1841 case DISPATCH_SOURCE_KEVENT_TYPE:
1842 case DISPATCH_MACH_CHANNEL_TYPE:
1843 _dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
1844 _dispatch_bug_deprecated("Changing the target of a source "
1845 "after it has been activated");
1846 break;
1847
1848 case DISPATCH_QUEUE_SERIAL_TYPE:
1849 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1850 DISPATCH_CLIENT_CRASH(type, "Cannot change the target of this queue "
1851 "after it has been activated");
1852 default:
1853 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1854 }
1855
1856 _dispatch_retain(tq);
1857 return _dispatch_barrier_trysync_or_async_f(dq, tq,
1858 _dispatch_queue_legacy_set_target_queue);
1859 }
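
/*
 * Editorial usage sketch (not part of the original source): at the client
 * level, retargeting is done with the public dispatch_set_target_queue(),
 * and is only safe before other objects start targeting the queue (see the
 * deprecation warnings above). do_bg_work() is a hypothetical client
 * function:
 *
 *	dispatch_queue_t bg = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.serial", NULL);
 *	dispatch_set_target_queue(q, bg);   // q now drains on the background root
 *	dispatch_async(q, ^{ do_bg_work(); });
 */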
1860
1861 #pragma mark -
1862 #pragma mark dispatch_mgr_queue
1863
1864 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1865 static struct dispatch_pthread_root_queue_context_s
1866 _dispatch_mgr_root_queue_pthread_context;
1867 static struct dispatch_root_queue_context_s
1868 _dispatch_mgr_root_queue_context = {{{
1869 #if HAVE_PTHREAD_WORKQUEUES
1870 .dgq_kworkqueue = (void*)(~0ul),
1871 #endif
1872 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
1873 .dgq_thread_pool_size = 1,
1874 }}};
1875
1876 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
1877 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
1878 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
1879 .do_ctxt = &_dispatch_mgr_root_queue_context,
1880 .dq_label = "com.apple.root.libdispatch-manager",
1881 .dq_width = DISPATCH_QUEUE_WIDTH_POOL,
1882 .dq_override = DISPATCH_SATURATED_OVERRIDE,
1883 .dq_override_voucher = DISPATCH_NO_VOUCHER,
1884 .dq_serialnum = 3,
1885 };
1886 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1887
1888 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1889 static struct {
1890 volatile int prio;
1891 volatile qos_class_t qos;
1892 int default_prio;
1893 int policy;
1894 pthread_t tid;
1895 } _dispatch_mgr_sched;
1896
1897 static dispatch_once_t _dispatch_mgr_sched_pred;
1898
1899 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
1900
1901 #if HAVE_PTHREAD_WORKQUEUE_QOS
1902 // Must be kept in sync with list of qos classes in sys/qos.h
1903 static const int _dispatch_mgr_sched_qos2prio[] = {
1904 [_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
1905 [_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
1906 [_DISPATCH_QOS_CLASS_UTILITY] = 20,
1907 [_DISPATCH_QOS_CLASS_DEFAULT] = 31,
1908 [_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
1909 [_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
1910 };
1911 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
1912
1913 static void
1914 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
1915 {
1916 struct sched_param param;
1917 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1918 pthread_attr_t *attr;
1919 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1920 #else
1921 pthread_attr_t a, *attr = &a;
1922 #endif
1923 (void)dispatch_assume_zero(pthread_attr_init(attr));
1924 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
1925 &_dispatch_mgr_sched.policy));
1926 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1927 #if HAVE_PTHREAD_WORKQUEUE_QOS
1928 qos_class_t qos = qos_class_main();
1929 if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
1930 qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
1931 }
1932 if (qos) {
1933 _dispatch_mgr_sched.qos = qos;
1934 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
1935 }
1936 #endif
1937 _dispatch_mgr_sched.default_prio = param.sched_priority;
1938 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
1939 }
1940 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1941
1942 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1943 DISPATCH_NOINLINE
1944 static pthread_t *
1945 _dispatch_mgr_root_queue_init(void)
1946 {
1947 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1948 struct sched_param param;
1949 pthread_attr_t *attr;
1950 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1951 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
1952 PTHREAD_CREATE_DETACHED));
1953 #if !DISPATCH_DEBUG
1954 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
1955 #endif
1956 #if HAVE_PTHREAD_WORKQUEUE_QOS
1957 qos_class_t qos = _dispatch_mgr_sched.qos;
1958 if (qos) {
1959 if (_dispatch_set_qos_class_enabled) {
1960 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
1961 qos, 0));
1962 }
1963 _dispatch_mgr_q.dq_priority =
1964 (dispatch_priority_t)_pthread_qos_class_encode(qos, 0, 0);
1965 }
1966 #endif
1967 param.sched_priority = _dispatch_mgr_sched.prio;
1968 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1969 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
1970 }
1971 return &_dispatch_mgr_sched.tid;
1972 }
1973
1974 static inline void
1975 _dispatch_mgr_priority_apply(void)
1976 {
1977 struct sched_param param;
1978 do {
1979 param.sched_priority = _dispatch_mgr_sched.prio;
1980 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1981 (void)dispatch_assume_zero(pthread_setschedparam(
1982 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
1983 &param));
1984 }
1985 } while (_dispatch_mgr_sched.prio > param.sched_priority);
1986 }
1987
1988 DISPATCH_NOINLINE
1989 void
1990 _dispatch_mgr_priority_init(void)
1991 {
1992 struct sched_param param;
1993 pthread_attr_t *attr;
1994 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1995 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1996 #if HAVE_PTHREAD_WORKQUEUE_QOS
1997 qos_class_t qos = 0;
1998 (void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
1999 if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
2000 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
2001 int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
2002 if (p > param.sched_priority) {
2003 param.sched_priority = p;
2004 }
2005 }
2006 #endif
2007 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
2008 return _dispatch_mgr_priority_apply();
2009 }
2010 }
2011 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2012
2013 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2014 DISPATCH_NOINLINE
2015 static void
2016 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
2017 {
2018 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2019 struct sched_param param;
2020 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2021 #if HAVE_PTHREAD_WORKQUEUE_QOS
2022 qos_class_t q, qos = 0;
2023 (void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
2024 if (qos) {
2025 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
2026 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
2027 if (q >= qos) os_atomic_rmw_loop_give_up(break);
2028 });
2029 }
2030 #endif
2031 int p, prio = param.sched_priority;
2032 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
2033 if (p >= prio) os_atomic_rmw_loop_give_up(return);
2034 });
2035 #if DISPATCH_USE_KEVENT_WORKQUEUE
2036 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2037 _dispatch_root_queues_init_once);
2038 if (_dispatch_kevent_workqueue_enabled) {
2039 pthread_priority_t pp = 0;
2040 if (prio > _dispatch_mgr_sched.default_prio) {
2041 // The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
2042 // _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
2043 // problematic in this case, since the second one is only ever
2044 // used on dq_priority fields.
2045 // We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall,
2046 // it is meaningful to libdispatch only.
2047 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2048 } else if (qos) {
2049 pp = _pthread_qos_class_encode(qos, 0, 0);
2050 }
2051 if (pp) {
2052 int r = _pthread_workqueue_set_event_manager_priority(pp);
2053 (void)dispatch_assume_zero(r);
2054 }
2055 return;
2056 }
2057 #endif
2058 #if DISPATCH_USE_MGR_THREAD
2059 if (_dispatch_mgr_sched.tid) {
2060 return _dispatch_mgr_priority_apply();
2061 }
2062 #endif
2063 }
2064 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2065
2066 #if DISPATCH_USE_KEVENT_WORKQUEUE
2067 void
2068 _dispatch_kevent_workqueue_init(void)
2069 {
2070 // Initialize kevent workqueue support
2071 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2072 _dispatch_root_queues_init_once);
2073 if (!_dispatch_kevent_workqueue_enabled) return;
2074 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2075 qos_class_t qos = _dispatch_mgr_sched.qos;
2076 int prio = _dispatch_mgr_sched.prio;
2077 pthread_priority_t pp = 0;
2078 if (qos) {
2079 pp = _pthread_qos_class_encode(qos, 0, 0);
2080 _dispatch_mgr_q.dq_priority = (dispatch_priority_t)pp;
2081 }
2082 if (prio > _dispatch_mgr_sched.default_prio) {
2083 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2084 }
2085 if (pp) {
2086 int r = _pthread_workqueue_set_event_manager_priority(pp);
2087 (void)dispatch_assume_zero(r);
2088 }
2089 }
2090 #endif
2091
2092 #pragma mark -
2093 #pragma mark dispatch_pthread_root_queue
2094
2095 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2096 static dispatch_queue_t
2097 _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2098 const pthread_attr_t *attr, dispatch_block_t configure,
2099 dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
2100 {
2101 dispatch_queue_t dq;
2102 dispatch_root_queue_context_t qc;
2103 dispatch_pthread_root_queue_context_t pqc;
2104 dispatch_queue_flags_t dqf = 0;
2105 size_t dqs;
2106 uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
2107 (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
2108
2109 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2110 dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
2111 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
2112 sizeof(struct dispatch_root_queue_context_s) +
2113 sizeof(struct dispatch_pthread_root_queue_context_s));
2114 qc = (void*)dq + dqs;
2115 dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
2116 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
2117 dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
2118 if (label) {
2119 const char *tmp = _dispatch_strdup_if_mutable(label);
2120 if (tmp != label) {
2121 dqf |= DQF_LABEL_NEEDS_FREE;
2122 label = tmp;
2123 }
2124 }
2125
2126 _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, false);
2127 dq->dq_label = label;
2128 dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
2129 dq->dq_override = DISPATCH_SATURATED_OVERRIDE;
2130 dq->do_ctxt = qc;
2131 dq->do_targetq = NULL;
2132
2133 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
2134 qc->dgq_ctxt = pqc;
2135 #if HAVE_PTHREAD_WORKQUEUES
2136 qc->dgq_kworkqueue = (void*)(~0ul);
2137 #endif
2138 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
2139
2140 if (attr) {
2141 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
2142 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
2143 } else {
2144 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
2145 }
2146 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2147 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
2148 if (configure) {
2149 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
2150 }
2151 if (observer_hooks) {
2152 pqc->dpq_observer_hooks = *observer_hooks;
2153 }
2154 _dispatch_object_debug(dq, "%s", __func__);
2155 return _dispatch_introspection_queue_create(dq);
2156 }
2157
2158 dispatch_queue_t
2159 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2160 const pthread_attr_t *attr, dispatch_block_t configure)
2161 {
2162 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2163 NULL);
2164 }
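
/*
 * Editorial usage sketch (not part of the original source): this SPI builds
 * a root queue backed by its own pthread pool instead of the default
 * workqueue threads; serial queues can then be targeted at it:
 *
 *	dispatch_queue_t root = dispatch_pthread_root_queue_create(
 *			"com.example.pthread-root", 0, NULL, ^{
 *		// optional configure block, run by newly created pool threads
 *	});
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *	dispatch_set_target_queue(q, root);
 */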
2165
2166 #if DISPATCH_IOHID_SPI
2167 dispatch_queue_t
2168 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
2169 unsigned long flags, const pthread_attr_t *attr,
2170 dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
2171 dispatch_block_t configure)
2172 {
2173 if (!observer_hooks->queue_will_execute ||
2174 !observer_hooks->queue_did_execute) {
2175 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2176 }
2177 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2178 observer_hooks);
2179 }
2180 #endif
2181
2182 dispatch_queue_t
2183 dispatch_pthread_root_queue_copy_current(void)
2184 {
2185 dispatch_queue_t dq = _dispatch_queue_get_current();
2186 if (!dq) return NULL;
2187 while (slowpath(dq->do_targetq)) {
2188 dq = dq->do_targetq;
2189 }
2190 if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
2191 dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
2192 return NULL;
2193 }
2194 return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
2195 }
2196
2197 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2198
2199 void
2200 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
2201 {
2202 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
2203 DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
2204 }
2205 _dispatch_object_debug(dq, "%s", __func__);
2206 _dispatch_introspection_queue_dispose(dq);
2207 #if DISPATCH_USE_PTHREAD_POOL
2208 dispatch_root_queue_context_t qc = dq->do_ctxt;
2209 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2210
2211 pthread_attr_destroy(&pqc->dpq_thread_attr);
2212 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
2213 if (pqc->dpq_thread_configure) {
2214 Block_release(pqc->dpq_thread_configure);
2215 }
2216 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
2217 false);
2218 #endif
2219 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
2220 free((void*)dq->dq_label);
2221 }
2222 _dispatch_queue_destroy(dq);
2223 }
2224
2225 #pragma mark -
2226 #pragma mark dispatch_queue_specific
2227
2228 struct dispatch_queue_specific_queue_s {
2229 DISPATCH_QUEUE_HEADER(queue_specific_queue);
2230 TAILQ_HEAD(dispatch_queue_specific_head_s,
2231 dispatch_queue_specific_s) dqsq_contexts;
2232 } DISPATCH_QUEUE_ALIGN;
2233
2234 struct dispatch_queue_specific_s {
2235 const void *dqs_key;
2236 void *dqs_ctxt;
2237 dispatch_function_t dqs_destructor;
2238 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
2239 };
2240 DISPATCH_DECL(dispatch_queue_specific);
2241
2242 void
2243 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
2244 {
2245 dispatch_queue_specific_t dqs, tmp;
2246
2247 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
2248 if (dqs->dqs_destructor) {
2249 dispatch_async_f(_dispatch_get_root_queue(
2250 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2251 dqs->dqs_destructor);
2252 }
2253 free(dqs);
2254 }
2255 _dispatch_queue_destroy(dqsq->_as_dq);
2256 }
2257
2258 static void
2259 _dispatch_queue_init_specific(dispatch_queue_t dq)
2260 {
2261 dispatch_queue_specific_queue_t dqsq;
2262
2263 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
2264 sizeof(struct dispatch_queue_specific_queue_s));
2265 _dispatch_queue_init(dqsq->_as_dq, DQF_NONE,
2266 DISPATCH_QUEUE_WIDTH_MAX, false);
2267 dqsq->do_xref_cnt = -1;
2268 dqsq->do_targetq = _dispatch_get_root_queue(
2269 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
2270 dqsq->dq_label = "queue-specific";
2271 TAILQ_INIT(&dqsq->dqsq_contexts);
2272 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
2273 dqsq->_as_dq, release))) {
2274 _dispatch_release(dqsq->_as_dq);
2275 }
2276 }
2277
2278 static void
2279 _dispatch_queue_set_specific(void *ctxt)
2280 {
2281 dispatch_queue_specific_t dqs, dqsn = ctxt;
2282 dispatch_queue_specific_queue_t dqsq =
2283 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2284
2285 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2286 if (dqs->dqs_key == dqsn->dqs_key) {
2287 // Destroy previous context for existing key
2288 if (dqs->dqs_destructor) {
2289 dispatch_async_f(_dispatch_get_root_queue(
2290 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2291 dqs->dqs_destructor);
2292 }
2293 if (dqsn->dqs_ctxt) {
2294 // Copy new context for existing key
2295 dqs->dqs_ctxt = dqsn->dqs_ctxt;
2296 dqs->dqs_destructor = dqsn->dqs_destructor;
2297 } else {
2298 // Remove context storage for existing key
2299 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
2300 free(dqs);
2301 }
2302 return free(dqsn);
2303 }
2304 }
2305 // Insert context storage for new key
2306 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
2307 }
2308
2309 DISPATCH_NOINLINE
2310 void
2311 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
2312 void *ctxt, dispatch_function_t destructor)
2313 {
2314 if (slowpath(!key)) {
2315 return;
2316 }
2317 dispatch_queue_specific_t dqs;
2318
2319 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
2320 dqs->dqs_key = key;
2321 dqs->dqs_ctxt = ctxt;
2322 dqs->dqs_destructor = destructor;
2323 if (slowpath(!dq->dq_specific_q)) {
2324 _dispatch_queue_init_specific(dq);
2325 }
2326 _dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
2327 _dispatch_queue_set_specific);
2328 }
2329
2330 static void
2331 _dispatch_queue_get_specific(void *ctxt)
2332 {
2333 void **ctxtp = ctxt;
2334 void *key = *ctxtp;
2335 dispatch_queue_specific_queue_t dqsq =
2336 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2337 dispatch_queue_specific_t dqs;
2338
2339 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2340 if (dqs->dqs_key == key) {
2341 *ctxtp = dqs->dqs_ctxt;
2342 return;
2343 }
2344 }
2345 *ctxtp = NULL;
2346 }
2347
2348 DISPATCH_NOINLINE
2349 void *
2350 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
2351 {
2352 if (slowpath(!key)) {
2353 return NULL;
2354 }
2355 void *ctxt = NULL;
2356
2357 if (fastpath(dq->dq_specific_q)) {
2358 ctxt = (void *)key;
2359 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
2360 }
2361 return ctxt;
2362 }
2363
2364 DISPATCH_NOINLINE
2365 void *
2366 dispatch_get_specific(const void *key)
2367 {
2368 if (slowpath(!key)) {
2369 return NULL;
2370 }
2371 void *ctxt = NULL;
2372 dispatch_queue_t dq = _dispatch_queue_get_current();
2373
2374 while (slowpath(dq)) {
2375 if (slowpath(dq->dq_specific_q)) {
2376 ctxt = (void *)key;
2377 dispatch_sync_f(dq->dq_specific_q, &ctxt,
2378 _dispatch_queue_get_specific);
2379 if (ctxt) break;
2380 }
2381 dq = dq->do_targetq;
2382 }
2383 return ctxt;
2384 }
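
/*
 * Editorial usage sketch (not part of the original source): keys are
 * compared by pointer, destructors run asynchronously on a default-QoS root
 * queue (as above), and dispatch_get_specific() walks the caller's
 * target-queue chain. "example_key" is a hypothetical name:
 *
 *	static char example_key;	// the address is the key
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_queue_set_specific(q, &example_key, strdup("state"), free);
 *
 *	dispatch_async(q, ^{
 *		char *s = dispatch_get_specific(&example_key);	// "state"
 *	});
 *	void *direct = dispatch_queue_get_specific(q, &example_key);
 *
 *	// passing a NULL context removes the value; the old value is still
 *	// handed to its destructor
 *	dispatch_queue_set_specific(q, &example_key, NULL, NULL);
 */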
2385
2386 #if DISPATCH_IOHID_SPI
2387 bool
2388 _dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
2389 dispatch_queue_t dq) // rdar://problem/18033810
2390 {
2391 if (dq->dq_width != 1) {
2392 DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
2393 }
2394 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2395 return _dq_state_drain_locked_by(dq_state, _dispatch_tid_self());
2396 }
2397 #endif
2398
2399 #pragma mark -
2400 #pragma mark dispatch_queue_debug
2401
2402 size_t
2403 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
2404 {
2405 size_t offset = 0;
2406 dispatch_queue_t target = dq->do_targetq;
2407 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2408
2409 offset += dsnprintf(&buf[offset], bufsiz - offset,
2410 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2411 target && target->dq_label ? target->dq_label : "", target,
2412 dq->dq_width, (unsigned long long)dq_state);
2413 if (_dq_state_is_suspended(dq_state)) {
2414 offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
2415 _dq_state_suspend_cnt(dq_state));
2416 }
2417 if (_dq_state_is_inactive(dq_state)) {
2418 offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
2419 } else if (_dq_state_needs_activation(dq_state)) {
2420 offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
2421 }
2422 if (_dq_state_is_enqueued(dq_state)) {
2423 offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
2424 }
2425 if (_dq_state_is_dirty(dq_state)) {
2426 offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
2427 }
2428 if (_dq_state_has_override(dq_state)) {
2429 offset += dsnprintf(&buf[offset], bufsiz - offset, ", async-override");
2430 }
2431 mach_port_t owner = _dq_state_drain_owner(dq_state);
2432 if (!_dispatch_queue_is_thread_bound(dq) && owner) {
2433 offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
2434 owner);
2435 }
2436 if (_dq_state_is_in_barrier(dq_state)) {
2437 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
2438 } else {
2439 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
2440 _dq_state_used_width(dq_state, dq->dq_width));
2441 }
2442 if (_dq_state_has_pending_barrier(dq_state)) {
2443 offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
2444 }
2445 if (_dispatch_queue_is_thread_bound(dq)) {
2446 offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
2447 owner);
2448 }
2449 return offset;
2450 }
2451
2452 size_t
2453 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
2454 {
2455 size_t offset = 0;
2456 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2457 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
2458 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
2459 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
2460 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2461 return offset;
2462 }
2463
2464 #if DISPATCH_DEBUG
2465 void
2466 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
2467 if (fastpath(dq)) {
2468 _dispatch_object_debug(dq, "%s", str);
2469 } else {
2470 _dispatch_log("queue[NULL]: %s", str);
2471 }
2472 }
2473 #endif
2474
2475 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
2476 static OSSpinLock _dispatch_stats_lock;
2477 static struct {
2478 uint64_t time_total;
2479 uint64_t count_total;
2480 uint64_t thread_total;
2481 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
2482
2483 static void
2484 _dispatch_queue_merge_stats(uint64_t start)
2485 {
2486 uint64_t delta = _dispatch_absolute_time() - start;
2487 unsigned long count;
2488
2489 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
2490 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
2491
2492 int bucket = flsl((long)count);
2493
2494 // 64-bit counters on 32-bit platforms require a lock or a queue
2495 OSSpinLockLock(&_dispatch_stats_lock);
2496
2497 _dispatch_stats[bucket].time_total += delta;
2498 _dispatch_stats[bucket].count_total += count;
2499 _dispatch_stats[bucket].thread_total++;
2500
2501 OSSpinLockUnlock(&_dispatch_stats_lock);
2502 }
2503 #endif
2504
2505 #pragma mark -
2506 #pragma mark _dispatch_set_priority_and_mach_voucher
2507 #if HAVE_PTHREAD_WORKQUEUE_QOS
2508
2509 DISPATCH_NOINLINE
2510 void
2511 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
2512 mach_voucher_t kv)
2513 {
2514 _pthread_set_flags_t pflags = 0;
2515 if (pp && _dispatch_set_qos_class_enabled) {
2516 pthread_priority_t old_pri = _dispatch_get_priority();
2517 if (pp != old_pri) {
2518 if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
2519 pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
2520 // when we unbind, the overcommit flag can flip, so we need to learn
2521 // it from the defaultpri, see _dispatch_priority_compute_update
2522 pp |= (_dispatch_get_defaultpriority() &
2523 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2524 } else {
2525 // else we need to keep the one that is set in the current pri
2526 pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2527 }
2528 if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2529 pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
2530 }
2531 if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q) ==
2532 _dispatch_tid_self())) {
2533 DISPATCH_INTERNAL_CRASH(pp,
2534 "Changing the QoS while on the manager queue");
2535 }
2536 if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2537 DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
2538 }
2539 if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
2540 DISPATCH_INTERNAL_CRASH(old_pri,
2541 "Cannot turn a manager thread into a normal one");
2542 }
2543 }
2544 }
2545 if (kv != VOUCHER_NO_MACH_VOUCHER) {
2546 #if VOUCHER_USE_MACH_VOUCHER
2547 pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
2548 #endif
2549 }
2550 if (!pflags) return;
2551 int r = _pthread_set_properties_self(pflags, pp, kv);
2552 if (r == EINVAL) {
2553 DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
2554 }
2555 (void)dispatch_assume_zero(r);
2556 }
2557
2558 DISPATCH_NOINLINE
2559 voucher_t
2560 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
2561 voucher_t v, _dispatch_thread_set_self_t flags)
2562 {
2563 voucher_t ov = DISPATCH_NO_VOUCHER;
2564 mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
2565 if (v != DISPATCH_NO_VOUCHER) {
2566 bool retained = flags & DISPATCH_VOUCHER_CONSUME;
2567 ov = _voucher_get();
2568 if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
2569 if (retained && v) _voucher_release_no_dispose(v);
2570 ov = DISPATCH_NO_VOUCHER;
2571 } else {
2572 if (!retained && v) _voucher_retain(v);
2573 kv = _voucher_swap_and_get_mach_voucher(ov, v);
2574 }
2575 }
2576 #if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
2577 flags &= ~(_dispatch_thread_set_self_t)DISPATCH_THREAD_PARK;
2578 #endif
2579 if (!(flags & DISPATCH_THREAD_PARK)) {
2580 _dispatch_set_priority_and_mach_voucher_slow(priority, kv);
2581 }
2582 if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
2583 if (ov) _voucher_release(ov);
2584 ov = DISPATCH_NO_VOUCHER;
2585 }
2586 return ov;
2587 }
2588 #endif
2589 #pragma mark -
2590 #pragma mark dispatch_continuation_t
2591
2592 static void
2593 _dispatch_force_cache_cleanup(void)
2594 {
2595 dispatch_continuation_t dc;
2596 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2597 if (dc) {
2598 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
2599 _dispatch_cache_cleanup(dc);
2600 }
2601 }
2602
2603 DISPATCH_NOINLINE
2604 static void
2605 _dispatch_cache_cleanup(void *value)
2606 {
2607 dispatch_continuation_t dc, next_dc = value;
2608
2609 while ((dc = next_dc)) {
2610 next_dc = dc->do_next;
2611 _dispatch_continuation_free_to_heap(dc);
2612 }
2613 }
2614
2615 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2616 DISPATCH_NOINLINE
2617 void
2618 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
2619 {
2620 _dispatch_continuation_free_to_heap(dc);
2621 dispatch_continuation_t next_dc;
2622 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2623 int cnt;
2624 if (!dc || (cnt = dc->dc_cache_cnt -
2625 _dispatch_continuation_cache_limit) <= 0){
2626 return;
2627 }
2628 do {
2629 next_dc = dc->do_next;
2630 _dispatch_continuation_free_to_heap(dc);
2631 } while (--cnt && (dc = next_dc));
2632 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
2633 }
2634 #endif
2635
2636 DISPATCH_ALWAYS_INLINE_NDEBUG
2637 static inline void
2638 _dispatch_continuation_slow_item_signal(dispatch_queue_t dq,
2639 dispatch_object_t dou)
2640 {
2641 dispatch_continuation_t dc = dou._dc;
2642 pthread_priority_t pp = dq->dq_override;
2643
2644 _dispatch_trace_continuation_pop(dq, dc);
2645 if (pp > (dc->dc_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
2646 _dispatch_wqthread_override_start((mach_port_t)dc->dc_data, pp);
2647 }
2648 _dispatch_thread_event_signal((dispatch_thread_event_t)dc->dc_other);
2649 _dispatch_introspection_queue_item_complete(dc);
2650 }
2651
2652 DISPATCH_NOINLINE
2653 static void
2654 _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
2655 {
2656 _dispatch_queue_push(dq, dc,
2657 _dispatch_continuation_get_override_priority(dq, dc));
2658 }
2659
2660 DISPATCH_NOINLINE
2661 static void
2662 _dispatch_continuation_push_sync_slow(dispatch_queue_t dq,
2663 dispatch_continuation_t dc)
2664 {
2665 _dispatch_queue_push_inline(dq, dc,
2666 _dispatch_continuation_get_override_priority(dq, dc),
2667 DISPATCH_WAKEUP_SLOW_WAITER);
2668 }
2669
2670 DISPATCH_ALWAYS_INLINE
2671 static inline void
2672 _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
2673 bool barrier)
2674 {
2675 if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
2676 return _dispatch_continuation_push(dq, dc);
2677 }
2678 return _dispatch_async_f2(dq, dc);
2679 }
2680
2681 DISPATCH_NOINLINE
2682 void
2683 _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
2684 {
2685 _dispatch_continuation_async2(dq, dc,
2686 dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
2687 }
2688
2689 #pragma mark -
2690 #pragma mark dispatch_block_create
2691
2692 #if __BLOCKS__
2693
2694 DISPATCH_ALWAYS_INLINE
2695 static inline bool
2696 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
2697 {
2698 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
2699 }
2700
2701 DISPATCH_ALWAYS_INLINE
2702 static inline dispatch_block_flags_t
2703 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
2704 {
2705 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
2706 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2707 }
2708 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
2709 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2710 }
2711 return flags;
2712 }
2713
2714 static inline dispatch_block_t
2715 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
2716 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
2717 {
2718 flags = _dispatch_block_normalize_flags(flags);
2719 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
2720
2721 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
2722 #if OS_VOUCHER_ACTIVITY_SPI
2723 voucher = VOUCHER_CURRENT;
2724 #endif
2725 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2726 }
2727 #if OS_VOUCHER_ACTIVITY_SPI
2728 if (voucher == VOUCHER_CURRENT) {
2729 voucher = _voucher_get();
2730 }
2731 #endif
2732 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
2733 pri = _dispatch_priority_propagate();
2734 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2735 }
2736 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
2737 #if DISPATCH_DEBUG
2738 dispatch_assert(_dispatch_block_get_data(db));
2739 #endif
2740 return db;
2741 }
2742
2743 dispatch_block_t
2744 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
2745 {
2746 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2747 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
2748 block);
2749 }
2750
2751 dispatch_block_t
2752 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
2753 dispatch_qos_class_t qos_class, int relative_priority,
2754 dispatch_block_t block)
2755 {
2756 if (!_dispatch_block_flags_valid(flags) ||
2757 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2758 return DISPATCH_BAD_INPUT;
2759 }
2760 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2761 pthread_priority_t pri = 0;
2762 #if HAVE_PTHREAD_WORKQUEUE_QOS
2763 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2764 #endif
2765 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
2766 pri, block);
2767 }
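
/*
 * Editorial usage sketch (not part of the original source): blocks returned
 * by the dispatch_block_create*() family are heap objects and must be
 * released with Block_release(). do_utility_work() is a hypothetical client
 * function:
 *
 *	dispatch_block_t b = dispatch_block_create_with_qos_class(
 *			DISPATCH_BLOCK_ENFORCE_QOS_CLASS, QOS_CLASS_UTILITY, 0, ^{
 *		do_utility_work();
 *	});
 *	dispatch_async(dispatch_get_global_queue(QOS_CLASS_UTILITY, 0), b);
 *	dispatch_block_wait(b, DISPATCH_TIME_FOREVER);	// at most one waiter
 *	Block_release(b);
 */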
2768
2769 dispatch_block_t
2770 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
2771 voucher_t voucher, dispatch_block_t block)
2772 {
2773 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2774 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2775 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
2776 block);
2777 }
2778
2779 dispatch_block_t
2780 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
2781 voucher_t voucher, dispatch_qos_class_t qos_class,
2782 int relative_priority, dispatch_block_t block)
2783 {
2784 if (!_dispatch_block_flags_valid(flags) ||
2785 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2786 return DISPATCH_BAD_INPUT;
2787 }
2788 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
2789 pthread_priority_t pri = 0;
2790 #if HAVE_PTHREAD_WORKQUEUE_QOS
2791 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2792 #endif
2793 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
2794 pri, block);
2795 }
2796
2797 void
2798 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
2799 {
2800 if (!_dispatch_block_flags_valid(flags)) {
2801 DISPATCH_CLIENT_CRASH(flags, "Invalid flags passed to "
2802 "dispatch_block_perform()");
2803 }
2804 flags = _dispatch_block_normalize_flags(flags);
2805 struct dispatch_block_private_data_s dbpds =
2806 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
2807 return _dispatch_block_invoke_direct(&dbpds);
2808 }
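
/*
 * Editorial usage sketch (not part of the original source):
 * dispatch_block_perform() applies block flags to a one-shot, synchronous
 * invocation without the heap allocation done by dispatch_block_create():
 *
 *	dispatch_block_perform(DISPATCH_BLOCK_DETACHED, ^{
 *		// runs immediately on the calling thread, detached from the
 *		// caller's QoS and voucher
 *		do_detached_work();	// hypothetical client function
 *	});
 */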
2809
2810 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2811
2812 void
2813 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s *dbcpd)
2814 {
2815 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
2816 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2817 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2818 if (slowpath(atomic_flags & DBF_WAITED)) {
2819 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2820 "run more than once and waited for");
2821 }
2822 if (atomic_flags & DBF_CANCELED) goto out;
2823
2824 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2825 _dispatch_thread_set_self_t adopt_flags = 0;
2826 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2827 op = _dispatch_get_priority();
2828 p = dbpd->dbpd_priority;
2829 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2830 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2831 }
2832 }
2833 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2834 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2835 v = dbpd->dbpd_voucher;
2836 }
2837 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2838 dbpd->dbpd_thread = _dispatch_tid_self();
2839 _dispatch_client_callout(dbpd->dbpd_block,
2840 _dispatch_Block_invoke(dbpd->dbpd_block));
2841 _dispatch_reset_priority_and_voucher(op, ov);
2842 out:
2843 if ((atomic_flags & DBF_PERFORM) == 0) {
2844 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2845 dispatch_group_leave(_dbpd_group(dbpd));
2846 }
2847 }
2848 }
2849
2850 void
2851 _dispatch_block_sync_invoke(void *block)
2852 {
2853 dispatch_block_t b = block;
2854 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2855 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2856 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2857 if (slowpath(atomic_flags & DBF_WAITED)) {
2858 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2859 "run more than once and waited for");
2860 }
2861 if (atomic_flags & DBF_CANCELED) goto out;
2862
2863 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2864 _dispatch_thread_set_self_t adopt_flags = 0;
2865 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2866 op = _dispatch_get_priority();
2867 p = dbpd->dbpd_priority;
2868 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2869 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2870 }
2871 }
2872 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2873 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2874 v = dbpd->dbpd_voucher;
2875 }
2876 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2877 dbpd->dbpd_block();
2878 _dispatch_reset_priority_and_voucher(op, ov);
2879 out:
2880 if ((atomic_flags & DBF_PERFORM) == 0) {
2881 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2882 dispatch_group_leave(_dbpd_group(dbpd));
2883 }
2884 }
2885
2886 os_mpsc_queue_t oq;
2887 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2888 if (oq) {
2889 // balances dispatch_{,barrier_,}sync
2890 _os_object_release_internal(oq->_as_os_obj);
2891 }
2892 }
2893
2894 DISPATCH_ALWAYS_INLINE
2895 static void
2896 _dispatch_block_async_invoke2(dispatch_block_t b, bool release)
2897 {
2898 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2899 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2900 if (slowpath(atomic_flags & DBF_WAITED)) {
2901 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2902 "run more than once and waited for");
2903 }
2904 if (!slowpath(atomic_flags & DBF_CANCELED)) {
2905 dbpd->dbpd_block();
2906 }
2907 if ((atomic_flags & DBF_PERFORM) == 0) {
2908 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2909 dispatch_group_leave(_dbpd_group(dbpd));
2910 }
2911 }
2912 os_mpsc_queue_t oq;
2913 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2914 if (oq) {
2915 // balances dispatch_{,barrier_,group_}async
2916 _os_object_release_internal_inline(oq->_as_os_obj);
2917 }
2918 if (release) {
2919 Block_release(b);
2920 }
2921 }
2922
2923 static void
2924 _dispatch_block_async_invoke(void *block)
2925 {
2926 _dispatch_block_async_invoke2(block, false);
2927 }
2928
2929 static void
2930 _dispatch_block_async_invoke_and_release(void *block)
2931 {
2932 _dispatch_block_async_invoke2(block, true);
2933 }
2934
2935 void
2936 dispatch_block_cancel(dispatch_block_t db)
2937 {
2938 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2939 if (!dbpd) {
2940 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2941 "dispatch_block_cancel()");
2942 }
2943 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
2944 }
2945
2946 long
2947 dispatch_block_testcancel(dispatch_block_t db)
2948 {
2949 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2950 if (!dbpd) {
2951 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2952 "dispatch_block_testcancel()");
2953 }
2954 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
2955 }
2956
2957 long
2958 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
2959 {
2960 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2961 if (!dbpd) {
2962 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2963 "dispatch_block_wait()");
2964 }
2965
2966 unsigned int flags = os_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
2967 DBF_WAITING, relaxed);
2968 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
2969 DISPATCH_CLIENT_CRASH(flags, "A block object may not be waited for "
2970 "more than once");
2971 }
2972
2973 // <rdar://problem/17703192> If we know the queue where this block is
2974 // enqueued, or the thread that's executing it, then we should boost
2975 // it here.
2976
2977 pthread_priority_t pp = _dispatch_get_priority();
2978
2979 os_mpsc_queue_t boost_oq;
2980 boost_oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2981 if (boost_oq) {
2982 // release balances dispatch_{,barrier_,group_}async.
2983 // Can't put the queue back in the timeout case: the block might
2984 // finish after we fell out of group_wait and see our NULL, so
2985 // neither of us would ever release. Side effect: After a _wait
2986 // that times out, subsequent waits will not boost the qos of the
2987 // still-running block.
2988 dx_wakeup(boost_oq, pp, DISPATCH_WAKEUP_OVERRIDING |
2989 DISPATCH_WAKEUP_CONSUME);
2990 }
2991
2992 mach_port_t boost_th = dbpd->dbpd_thread;
2993 if (boost_th) {
2994 _dispatch_thread_override_start(boost_th, pp, dbpd);
2995 }
2996
2997 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
2998 if (slowpath(performed > 1 || (boost_th && boost_oq))) {
2999 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3000 "run more than once and waited for");
3001 }
3002
3003 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
3004
3005 if (boost_th) {
3006 _dispatch_thread_override_end(boost_th, dbpd);
3007 }
3008
3009 if (ret) {
3010 // timed out: reverse our changes
3011 (void)os_atomic_and2o(dbpd, dbpd_atomic_flags,
3012 ~DBF_WAITING, relaxed);
3013 } else {
3014 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags,
3015 DBF_WAITED, relaxed);
3016 // don't need to re-test here: the second call would see
3017 // the first call's WAITING
3018 }
3019
3020 return ret;
3021 }
3022
3023 void
3024 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
3025 dispatch_block_t notification_block)
3026 {
3027 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3028 if (!dbpd) {
3029 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3030 "dispatch_block_notify()");
3031 }
3032 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3033 if (slowpath(performed > 1)) {
3034 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3035 "run more than once and observed");
3036 }
3037
3038 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
3039 }
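
/*
 * Editorial usage sketch (not part of the original source): waiting,
 * notification and cancellation all operate on the same heap block object;
 * a block may be waited on at most once and must not be both executed more
 * than once and waited for (see the crashes above). do_work() is a
 * hypothetical client function:
 *
 *	dispatch_block_t b = dispatch_block_create(0, ^{ do_work(); });
 *	dispatch_async(q, b);	// q: some dispatch queue
 *	dispatch_block_notify(b, dispatch_get_main_queue(), ^{
 *		// runs on the main queue once b has finished
 *	});
 *	long timed_out = dispatch_block_wait(b,
 *			dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC));
 *	// dispatch_block_cancel(b) would prevent b from ever starting, but has
 *	// no effect once it is already running; dispatch_block_testcancel(b)
 *	// reports that state
 *	Block_release(b);
 */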
3040
3041 DISPATCH_NOINLINE
3042 void
3043 _dispatch_continuation_init_slow(dispatch_continuation_t dc,
3044 dispatch_queue_class_t dqu, dispatch_block_flags_t flags)
3045 {
3046 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(dc->dc_ctxt);
3047 dispatch_block_flags_t block_flags = dbpd->dbpd_flags;
3048 uintptr_t dc_flags = dc->dc_flags;
3049 os_mpsc_queue_t oq = dqu._oq;
3050
3051 // balanced in d_block_async_invoke_and_release or d_block_wait
3052 if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
3053 _os_object_retain_internal_inline(oq->_as_os_obj);
3054 }
3055
3056 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3057 dc->dc_func = _dispatch_block_async_invoke_and_release;
3058 } else {
3059 dc->dc_func = _dispatch_block_async_invoke;
3060 }
3061
3062 flags |= block_flags;
3063 if (block_flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3064 _dispatch_continuation_priority_set(dc, dbpd->dbpd_priority, flags);
3065 } else {
3066 _dispatch_continuation_priority_set(dc, dc->dc_priority, flags);
3067 }
3068 if (block_flags & DISPATCH_BLOCK_BARRIER) {
3069 dc_flags |= DISPATCH_OBJ_BARRIER_BIT;
3070 }
3071 if (block_flags & DISPATCH_BLOCK_HAS_VOUCHER) {
3072 voucher_t v = dbpd->dbpd_voucher;
3073 dc->dc_voucher = v ? _voucher_retain(v) : NULL;
3074 dc_flags |= DISPATCH_OBJ_ENFORCE_VOUCHER;
3075 _dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);
3076 _dispatch_voucher_ktrace_dc_push(dc);
3077 } else {
3078 _dispatch_continuation_voucher_set(dc, oq, flags);
3079 }
3080 dc_flags |= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT;
3081 dc->dc_flags = dc_flags;
3082 }
3083
3084 void
3085 _dispatch_continuation_update_bits(dispatch_continuation_t dc,
3086 uintptr_t dc_flags)
3087 {
3088 dc->dc_flags = dc_flags;
3089 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3090 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3091 dc->dc_func = _dispatch_block_async_invoke_and_release;
3092 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3093 dc->dc_func = _dispatch_call_block_and_release;
3094 }
3095 } else {
3096 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3097 dc->dc_func = _dispatch_block_async_invoke;
3098 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3099 dc->dc_func = _dispatch_Block_invoke(dc->dc_ctxt);
3100 }
3101 }
3102 }
3103
3104 #endif // __BLOCKS__
3105
3106 #pragma mark -
3107 #pragma mark dispatch_barrier_async
3108
3109 DISPATCH_NOINLINE
3110 static void
3111 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
3112 dispatch_function_t func, pthread_priority_t pp,
3113 dispatch_block_flags_t flags, uintptr_t dc_flags)
3114 {
3115 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
3116 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3117 _dispatch_continuation_async(dq, dc);
3118 }
3119
3120 DISPATCH_ALWAYS_INLINE
3121 static inline void
3122 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
3123 dispatch_function_t func, pthread_priority_t pp,
3124 dispatch_block_flags_t flags)
3125 {
3126 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3127 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3128
3129 if (!fastpath(dc)) {
3130 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3131 }
3132
3133 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3134 _dispatch_continuation_push(dq, dc);
3135 }
3136
3137 DISPATCH_NOINLINE
3138 void
3139 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
3140 dispatch_function_t func)
3141 {
3142 _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
3143 }
3144
3145 DISPATCH_NOINLINE
3146 void
3147 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
3148 dispatch_function_t func)
3149 {
3150 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3151 dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3152 dc->dc_func = func;
3153 dc->dc_ctxt = ctxt;
3154 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3155 dc->dc_priority = DISPATCH_NO_PRIORITY;
3156 _dispatch_queue_push(dq, dc, 0);
3157 }
3158
3159 #ifdef __BLOCKS__
3160 void
3161 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
3162 {
3163 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3164 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3165
3166 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3167 _dispatch_continuation_push(dq, dc);
3168 }
3169 #endif
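
/*
 * Editorial usage sketch (not part of the original source): the classic
 * reader/writer pattern enabled by dispatch_barrier_async() on a concurrent
 * queue (barriers degrade to plain async on the global root queues).
 * read_state()/mutate_state() are hypothetical client functions:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{ read_state(); });			// readers run concurrently
 *	dispatch_barrier_async(q, ^{ mutate_state(); });	// runs exclusively
 *	dispatch_async(q, ^{ read_state(); });			// runs after the barrier
 */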
3170
3171 #pragma mark -
3172 #pragma mark dispatch_async
3173
3174 void
3175 _dispatch_async_redirect_invoke(dispatch_continuation_t dc,
3176 dispatch_invoke_flags_t flags)
3177 {
3178 dispatch_thread_frame_s dtf;
3179 struct dispatch_continuation_s *other_dc = dc->dc_other;
3180 dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
3181 // if we went through _dispatch_root_queue_push_override,
3182 // the "right" root queue was stuffed into dc_func
3183 dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
3184 dispatch_queue_t dq = dc->dc_data, rq, old_dq;
3185 struct _dispatch_identity_s di;
3186
3187 pthread_priority_t op, dp, old_dp;
3188
3189 if (ctxt_flags) {
3190 flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
3191 flags |= ctxt_flags;
3192 }
3193 old_dq = _dispatch_get_current_queue();
3194 if (assumed_rq) {
3195 _dispatch_queue_set_current(assumed_rq);
3196 _dispatch_root_queue_identity_assume(&di, 0);
3197 }
3198
3199 old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
3200 op = dq->dq_override;
3201 if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3202 _dispatch_wqthread_override_start(_dispatch_tid_self(), op);
3203 // Ensure that the root queue sees that this thread was overridden.
3204 _dispatch_set_defaultpriority_override();
3205 }
3206
3207 _dispatch_thread_frame_push(&dtf, dq);
3208 _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
3209 DISPATCH_OBJ_CONSUME_BIT, {
3210 _dispatch_continuation_pop(other_dc, dq, flags);
3211 });
3212 _dispatch_thread_frame_pop(&dtf);
3213 if (assumed_rq) {
3214 _dispatch_root_queue_identity_restore(&di);
3215 _dispatch_queue_set_current(old_dq);
3216 }
3217 _dispatch_reset_defaultpriority(old_dp);
3218
3219 rq = dq->do_targetq;
3220 while (slowpath(rq->do_targetq) && rq != old_dq) {
3221 _dispatch_non_barrier_complete(rq);
3222 rq = rq->do_targetq;
3223 }
3224
3225 _dispatch_non_barrier_complete(dq);
3226
3227 if (dtf.dtf_deferred) {
3228 struct dispatch_object_s *dou = dtf.dtf_deferred;
3229 return _dispatch_queue_drain_deferred_invoke(dq, flags, 0, dou);
3230 }
3231
3232 _dispatch_release_tailcall(dq);
3233 }
3234
3235 DISPATCH_ALWAYS_INLINE
3236 static inline dispatch_continuation_t
3237 _dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
3238 {
3239 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3240
3241 dou._do->do_next = NULL;
3242 dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
3243 dc->dc_func = NULL;
3244 dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3245 dc->dc_data = dq;
3246 dc->dc_other = dou._do;
3247 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3248 dc->dc_priority = DISPATCH_NO_PRIORITY;
3249 _dispatch_retain(dq);
3250 return dc;
3251 }
3252
3253 DISPATCH_NOINLINE
3254 static void
3255 _dispatch_async_f_redirect(dispatch_queue_t dq,
3256 dispatch_object_t dou, pthread_priority_t pp)
3257 {
3258 if (!slowpath(_dispatch_object_is_redirection(dou))) {
3259 dou._dc = _dispatch_async_redirect_wrap(dq, dou);
3260 }
3261 dq = dq->do_targetq;
3262
3263 // Find the queue to redirect to
3264 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
3265 if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
3266 break;
3267 }
3268 if (!dou._dc->dc_ctxt) {
3269 // find the first queue, walking down the target queue hierarchy, that
3270 // has an autorelease frequency set, and use that as the frequency for
3271 // this continuation.
3272 dou._dc->dc_ctxt = (void *)
3273 (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3274 }
3275 dq = dq->do_targetq;
3276 }
3277
3278 _dispatch_queue_push(dq, dou, pp);
3279 }
3280
3281 DISPATCH_ALWAYS_INLINE
3282 static inline void
3283 _dispatch_continuation_redirect(dispatch_queue_t dq,
3284 struct dispatch_object_s *dc)
3285 {
3286 _dispatch_trace_continuation_pop(dq, dc);
3287 // This is a re-redirect, overrides have already been applied
3288 // by _dispatch_async_f2.
3289 // However we want to end up on the root queue matching `dc` qos, so pick up
3290 // the current override of `dq` which includes dc's override (and maybe more)
3291 _dispatch_async_f_redirect(dq, dc, dq->dq_override);
3292 _dispatch_introspection_queue_item_complete(dc);
3293 }
3294
3295 DISPATCH_NOINLINE
3296 static void
3297 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
3298 {
3299 // <rdar://problem/24738102&24743140> reserving non-barrier width
3300 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier-width
3301 // equivalent), so we have to check that this thread hasn't enqueued
3302 // anything ahead of this call, or we could break ordering
3303 if (slowpath(dq->dq_items_tail)) {
3304 return _dispatch_continuation_push(dq, dc);
3305 }
3306
3307 if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
3308 return _dispatch_continuation_push(dq, dc);
3309 }
3310
3311 return _dispatch_async_f_redirect(dq, dc,
3312 _dispatch_continuation_get_override_priority(dq, dc));
3313 }
3314
3315 DISPATCH_ALWAYS_INLINE
3316 static inline void
3317 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3318 pthread_priority_t pp, dispatch_block_flags_t flags)
3319 {
3320 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3321 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3322
3323 if (!fastpath(dc)) {
3324 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3325 }
3326
3327 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3328 _dispatch_continuation_async2(dq, dc, false);
3329 }
3330
3331 DISPATCH_NOINLINE
3332 void
3333 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3334 {
3335 _dispatch_async_f(dq, ctxt, func, 0, 0);
3336 }
3337
3338 DISPATCH_NOINLINE
3339 void
3340 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
3341 dispatch_function_t func)
3342 {
3343 _dispatch_async_f(dq, ctxt, func, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
3344 }
3345
3346 #ifdef __BLOCKS__
3347 void
3348 dispatch_async(dispatch_queue_t dq, void (^work)(void))
3349 {
3350 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3351 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3352
3353 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3354 _dispatch_continuation_async(dq, dc);
3355 }
3356 #endif
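
/*
 * Illustrative usage sketch (not part of the implementation): the block-based
 * and function-based forms of async submission. The work_ctxt structure and
 * the consume_value() helper are hypothetical; <stdlib.h> is assumed for
 * malloc()/free().
 *
 *	struct work_ctxt { int value; };
 *
 *	static void
 *	work_func(void *ctxt)
 *	{
 *		struct work_ctxt *w = ctxt;
 *		consume_value(w->value);
 *		free(w);                            // context ownership passed to us
 *	}
 *
 *	static void
 *	async_example(dispatch_queue_t q)
 *	{
 *		struct work_ctxt *w = malloc(sizeof(*w));
 *		w->value = 7;
 *		dispatch_async_f(q, w, work_func);  // function + context form
 *		dispatch_async(q, ^{                // block form, captures by value
 *			consume_value(7);
 *		});
 *	}
 */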
3357
3358 #pragma mark -
3359 #pragma mark dispatch_group_async
3360
3361 DISPATCH_ALWAYS_INLINE
3362 static inline void
3363 _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3364 dispatch_continuation_t dc)
3365 {
3366 dispatch_group_enter(dg);
3367 dc->dc_data = dg;
3368 _dispatch_continuation_async(dq, dc);
3369 }
3370
3371 DISPATCH_NOINLINE
3372 void
3373 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
3374 dispatch_function_t func)
3375 {
3376 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3377 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3378
3379 _dispatch_continuation_init_f(dc, dq, ctxt, func, 0, 0, dc_flags);
3380 _dispatch_continuation_group_async(dg, dq, dc);
3381 }
3382
3383 #ifdef __BLOCKS__
3384 void
3385 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3386 dispatch_block_t db)
3387 {
3388 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3389 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3390
3391 _dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
3392 _dispatch_continuation_group_async(dg, dq, dc);
3393 }
3394 #endif
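
/*
 * Illustrative usage sketch (not part of the implementation): fanning work out
 * with dispatch_group_async() and waiting for all of it to finish. The item
 * count and the process_item() helper are hypothetical.
 *
 *	static void
 *	group_example(dispatch_queue_t q)
 *	{
 *		dispatch_group_t g = dispatch_group_create();
 *		for (int i = 0; i < 8; i++) {
 *			dispatch_group_async(g, q, ^{
 *				process_item(i);        // `i` is captured by value
 *			});
 *		}
 *		dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
 *		dispatch_release(g);
 *	}
 *
 * dispatch_group_notify() is the non-blocking alternative to the wait above.
 */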
3395
3396 #pragma mark -
3397 #pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke
3398
3399 DISPATCH_NOINLINE
3400 static void
3401 _dispatch_sync_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
3402 dispatch_function_t func)
3403 {
3404 voucher_t ov;
3405 dispatch_thread_frame_s dtf;
3406 _dispatch_thread_frame_push(&dtf, dq);
3407 ov = _dispatch_set_priority_and_voucher(0, dq->dq_override_voucher, 0);
3408 _dispatch_client_callout(ctxt, func);
3409 _dispatch_perfmon_workitem_inc();
3410 _dispatch_reset_voucher(ov, 0);
3411 _dispatch_thread_frame_pop(&dtf);
3412 }
3413
3414 DISPATCH_ALWAYS_INLINE
3415 static inline void
3416 _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
3417 dispatch_function_t func)
3418 {
3419 if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
3420 return _dispatch_sync_function_invoke_slow(dq, ctxt, func);
3421 }
3422 dispatch_thread_frame_s dtf;
3423 _dispatch_thread_frame_push(&dtf, dq);
3424 _dispatch_client_callout(ctxt, func);
3425 _dispatch_perfmon_workitem_inc();
3426 _dispatch_thread_frame_pop(&dtf);
3427 }
3428
3429 DISPATCH_NOINLINE
3430 static void
3431 _dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
3432 dispatch_function_t func)
3433 {
3434 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3435 }
3436
3437 void
3438 _dispatch_sync_recurse_invoke(void *ctxt)
3439 {
3440 dispatch_continuation_t dc = ctxt;
3441 _dispatch_sync_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
3442 }
3443
3444 DISPATCH_ALWAYS_INLINE
3445 static inline void
3446 _dispatch_sync_function_recurse(dispatch_queue_t dq, void *ctxt,
3447 dispatch_function_t func, pthread_priority_t pp)
3448 {
3449 struct dispatch_continuation_s dc = {
3450 .dc_data = dq,
3451 .dc_func = func,
3452 .dc_ctxt = ctxt,
3453 };
3454 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
3455 }
3456
3457 DISPATCH_NOINLINE
3458 static void
3459 _dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3460 dispatch_function_t func)
3461 {
3462 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3463 _dispatch_non_barrier_complete(dq);
3464 }
3465
3466 DISPATCH_NOINLINE
3467 static void
3468 _dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3469 dispatch_function_t func, pthread_priority_t pp)
3470 {
3471 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3472 _dispatch_non_barrier_complete(dq);
3473 }
3474
3475 DISPATCH_ALWAYS_INLINE
3476 static void
3477 _dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3478 dispatch_function_t func, pthread_priority_t pp)
3479 {
3480 _dispatch_introspection_non_barrier_sync_begin(dq, func);
3481 if (slowpath(dq->do_targetq->do_targetq)) {
3482 return _dispatch_non_barrier_sync_f_recurse(dq, ctxt, func, pp);
3483 }
3484 _dispatch_non_barrier_sync_f_invoke(dq, ctxt, func);
3485 }
3486
3487 #pragma mark -
3488 #pragma mark dispatch_barrier_sync
3489
3490 DISPATCH_NOINLINE
3491 static void
3492 _dispatch_barrier_complete(dispatch_queue_t dq)
3493 {
3494 uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
3495 dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3496
3497 if (slowpath(dq->dq_items_tail)) {
3498 return _dispatch_try_lock_transfer_or_wakeup(dq);
3499 }
3500
3501 if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
3502 // someone enqueued a slow item at the head
3503 // looping may be its last chance
3504 return _dispatch_try_lock_transfer_or_wakeup(dq);
3505 }
3506 }
3507
3508 DISPATCH_NOINLINE
3509 static void
3510 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3511 dispatch_function_t func, pthread_priority_t pp)
3512 {
3513 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3514 _dispatch_barrier_complete(dq);
3515 }
3516
3517 DISPATCH_NOINLINE
3518 static void
3519 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3520 dispatch_function_t func)
3521 {
3522 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3523 _dispatch_barrier_complete(dq);
3524 }
3525
3526 DISPATCH_ALWAYS_INLINE
3527 static void
3528 _dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3529 dispatch_function_t func, pthread_priority_t pp)
3530 {
3531 _dispatch_introspection_barrier_sync_begin(dq, func);
3532 if (slowpath(dq->do_targetq->do_targetq)) {
3533 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
3534 }
3535 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3536 }
3537
3538 typedef struct dispatch_barrier_sync_context_s {
3539 struct dispatch_continuation_s dbsc_dc;
3540 dispatch_thread_frame_s dbsc_dtf;
3541 } *dispatch_barrier_sync_context_t;
3542
3543 static void
3544 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
3545 {
3546 dispatch_barrier_sync_context_t dbsc = ctxt;
3547 dispatch_continuation_t dc = &dbsc->dbsc_dc;
3548 dispatch_queue_t dq = dc->dc_data;
3549 dispatch_thread_event_t event = (dispatch_thread_event_t)dc->dc_other;
3550
3551 dispatch_assert(dq == _dispatch_queue_get_current());
3552 #if DISPATCH_COCOA_COMPAT
3553 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3554 dispatch_assert(_dispatch_thread_frame_get_current() == NULL);
3555
3556 // the block runs on the thread the queue is bound to, not on the
3557 // calling thread, but we want the calling thread's dispatch thread
3558 // frames to be visible, so we fake the link, and then undo it
3559 _dispatch_thread_frame_set_current(&dbsc->dbsc_dtf);
3560 // The queue is bound to a non-dispatch thread (e.g. main thread)
3561 _dispatch_continuation_voucher_adopt(dc, DISPATCH_NO_VOUCHER,
3562 DISPATCH_OBJ_CONSUME_BIT);
3563 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
3564 os_atomic_store2o(dc, dc_func, NULL, release);
3565 _dispatch_thread_frame_set_current(NULL);
3566 }
3567 #endif
3568 _dispatch_thread_event_signal(event); // release
3569 }
3570
3571 DISPATCH_NOINLINE
3572 static void
3573 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
3574 dispatch_function_t func, pthread_priority_t pp)
3575 {
3576 if (slowpath(!dq->do_targetq)) {
3577 // see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
3578 return _dispatch_sync_function_invoke(dq, ctxt, func);
3579 }
3580
3581 if (!pp) {
3582 pp = _dispatch_get_priority();
3583 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3584 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3585 }
3586 dispatch_thread_event_s event;
3587 _dispatch_thread_event_init(&event);
3588 struct dispatch_barrier_sync_context_s dbsc = {
3589 .dbsc_dc = {
3590 .dc_data = dq,
3591 #if DISPATCH_COCOA_COMPAT
3592 .dc_func = func,
3593 .dc_ctxt = ctxt,
3594 #endif
3595 .dc_other = &event,
3596 }
3597 };
3598 #if DISPATCH_COCOA_COMPAT
3599 // It's preferred to execute synchronous blocks on the current thread
3600 // due to thread-local side effects, etc. However, blocks submitted
3601 // to the main thread MUST be run on the main thread
3602 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3603 // consumed by _dispatch_barrier_sync_f_slow_invoke
3604 // or in the DISPATCH_COCOA_COMPAT hunk below
3605 _dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
3606 // save frame linkage for _dispatch_barrier_sync_f_slow_invoke
3607 _dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
3608 // thread-bound queues cannot mutate their target queue hierarchy
3609 // so it's fine to look now
3610 _dispatch_introspection_barrier_sync_begin(dq, func);
3611 }
3612 #endif
3613 uint32_t th_self = _dispatch_tid_self();
3614 struct dispatch_continuation_s dbss = {
3615 .dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
3616 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
3617 .dc_ctxt = &dbsc,
3618 .dc_data = (void*)(uintptr_t)th_self,
3619 .dc_priority = pp,
3620 .dc_other = &event,
3621 .dc_voucher = DISPATCH_NO_VOUCHER,
3622 };
3623
3624 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3625 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3626 DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
3627 "already owned by current thread");
3628 }
3629
3630 _dispatch_continuation_push_sync_slow(dq, &dbss);
3631 _dispatch_thread_event_wait(&event); // acquire
3632 _dispatch_thread_event_destroy(&event);
3633 if (_dispatch_queue_received_override(dq, pp)) {
3634 // Ensure that the root queue sees that this thread was overridden.
3635 // pairs with the _dispatch_wqthread_override_start in
3636 // _dispatch_continuation_slow_item_signal
3637 _dispatch_set_defaultpriority_override();
3638 }
3639
3640 #if DISPATCH_COCOA_COMPAT
3641 // Queue bound to a non-dispatch thread
3642 if (dbsc.dbsc_dc.dc_func == NULL) {
3643 return;
3644 } else if (dbsc.dbsc_dc.dc_voucher) {
3645 // this almost never happens, unless a dispatch_sync() onto a thread-
3646 // bound queue went to the slow path at the same time as dispatch_main()
3647 // was called, or the queue was detached from the runloop.
3648 _voucher_release(dbsc.dbsc_dc.dc_voucher);
3649 }
3650 #endif
3651
3652 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3653 }
3654
3655 DISPATCH_ALWAYS_INLINE
3656 static inline void
3657 _dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
3658 dispatch_function_t func, pthread_priority_t pp)
3659 {
3660 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3661 // global concurrent queues and queues bound to non-dispatch threads
3662 // always fall into the slow case
3663 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
3664 }
3665 //
3666 // TODO: the more correct thing to do would be to set dq_override to the qos
3667 // of the thread that just acquired the barrier lock here. Unwinding that
3668 // would slow down the uncontended fastpath however.
3669 //
3670 // The chosen tradeoff is that if an enqueue on a lower priority thread
3671 // contends with this fastpath, this thread may receive a useless override.
3672 // Improving this requires the override level to be part of the atomic
3673 // dq_state
3674 //
3675 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3676 }
3677
3678 DISPATCH_NOINLINE
3679 static void
3680 _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3681 dispatch_function_t func, pthread_priority_t pp)
3682 {
3683 _dispatch_barrier_sync_f2(dq, ctxt, func, pp);
3684 }
3685
3686 DISPATCH_NOINLINE
3687 void
3688 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3689 dispatch_function_t func)
3690 {
3691 _dispatch_barrier_sync_f2(dq, ctxt, func, 0);
3692 }
3693
3694 #ifdef __BLOCKS__
3695 DISPATCH_NOINLINE
3696 static void
3697 _dispatch_sync_block_with_private_data(dispatch_queue_t dq,
3698 void (^work)(void), dispatch_block_flags_t flags)
3699 {
3700 pthread_priority_t pp = _dispatch_block_get_priority(work);
3701
3702 flags |= _dispatch_block_get_flags(work);
3703 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3704 pthread_priority_t tp = _dispatch_get_priority();
3705 tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3706 if (pp < tp) {
3707 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3708 } else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
3709 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3710 }
3711 }
3712 // balanced in d_block_sync_invoke or d_block_wait
3713 if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3714 dbpd_queue, NULL, dq->_as_oq, relaxed)) {
3715 _dispatch_retain(dq);
3716 }
3717 if (flags & DISPATCH_BLOCK_BARRIER) {
3718 _dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3719 } else {
3720 _dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3721 }
3722 }
3723
3724 void
3725 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
3726 {
3727 if (slowpath(_dispatch_block_has_private_data(work))) {
3728 dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
3729 return _dispatch_sync_block_with_private_data(dq, work, flags);
3730 }
3731 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
3732 }
3733 #endif
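
/*
 * Illustrative usage sketch (not part of the implementation): using
 * dispatch_barrier_sync() when the caller must observe the effect of a write
 * before continuing. The cache_remove() helper and `cache_q` queue are
 * hypothetical. Note that, as _dispatch_barrier_sync_f_slow() above enforces,
 * calling this on a queue already owned by the current thread traps.
 *
 *	static void
 *	remove_and_proceed(dispatch_queue_t cache_q, const char *key)
 *	{
 *		dispatch_barrier_sync(cache_q, ^{
 *			cache_remove(key);
 *		});
 *		// by here the removal is visible to every subsequent reader
 *	}
 */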
3734
3735 DISPATCH_NOINLINE
3736 void
3737 _dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
3738 dispatch_function_t func)
3739 {
3740 // Use for mutation of queue-/source-internal state only, ignores target
3741 // queue hierarchy!
3742 if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
3743 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
3744 }
3745 // skip the recursion because it's about the queue state only
3746 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3747 }
3748
3749 #pragma mark -
3750 #pragma mark dispatch_sync
3751
3752 DISPATCH_NOINLINE
3753 static void
3754 _dispatch_non_barrier_complete(dispatch_queue_t dq)
3755 {
3756 uint64_t old_state, new_state;
3757
3758 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3759 new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
3760 if (_dq_state_is_runnable(new_state)) {
3761 if (!_dq_state_is_runnable(old_state)) {
3762 // we're making a FULL -> non FULL transition
3763 new_state |= DISPATCH_QUEUE_DIRTY;
3764 }
3765 if (!_dq_state_drain_locked(new_state)) {
3766 uint64_t full_width = new_state;
3767 if (_dq_state_has_pending_barrier(new_state)) {
3768 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
3769 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
3770 full_width += DISPATCH_QUEUE_IN_BARRIER;
3771 } else {
3772 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3773 full_width += DISPATCH_QUEUE_IN_BARRIER;
3774 }
3775 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
3776 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
3777 new_state = full_width;
3778 new_state &= ~DISPATCH_QUEUE_DIRTY;
3779 new_state |= _dispatch_tid_self();
3780 }
3781 }
3782 }
3783 });
3784
3785 if (_dq_state_is_in_barrier(new_state)) {
3786 return _dispatch_try_lock_transfer_or_wakeup(dq);
3787 }
3788 if (!_dq_state_is_runnable(old_state)) {
3789 _dispatch_queue_try_wakeup(dq, new_state,
3790 DISPATCH_WAKEUP_WAITER_HANDOFF);
3791 }
3792 }
3793
3794 DISPATCH_NOINLINE
3795 static void
3796 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3797 pthread_priority_t pp)
3798 {
3799 dispatch_assert(dq->do_targetq);
3800 if (!pp) {
3801 pp = _dispatch_get_priority();
3802 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3803 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3804 }
3805 dispatch_thread_event_s event;
3806 _dispatch_thread_event_init(&event);
3807 uint32_t th_self = _dispatch_tid_self();
3808 struct dispatch_continuation_s dc = {
3809 .dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
3810 #if DISPATCH_INTROSPECTION
3811 .dc_func = func,
3812 .dc_ctxt = ctxt,
3813 #endif
3814 .dc_data = (void*)(uintptr_t)th_self,
3815 .dc_other = &event,
3816 .dc_priority = pp,
3817 .dc_voucher = DISPATCH_NO_VOUCHER,
3818 };
3819
3820 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3821 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3822 DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
3823 "already owned by current thread");
3824 }
3825
3826 _dispatch_continuation_push_sync_slow(dq, &dc);
3827 _dispatch_thread_event_wait(&event); // acquire
3828 _dispatch_thread_event_destroy(&event);
3829 if (_dispatch_queue_received_override(dq, pp)) {
3830 // Ensure that the root queue sees that this thread was overridden.
3831 // pairs with the _dispatch_wqthread_override_start in
3832 // _dispatch_continuation_slow_item_signal
3833 _dispatch_set_defaultpriority_override();
3834 }
3835 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3836 }
3837
3838 DISPATCH_ALWAYS_INLINE
3839 static inline void
3840 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3841 pthread_priority_t pp)
3842 {
3843 // <rdar://problem/24738102&24743140> reserving non-barrier width
3844 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier-width
3845 // equivalent), so we have to check that this thread hasn't enqueued
3846 // anything ahead of this call, or we could break ordering
3847 if (slowpath(dq->dq_items_tail)) {
3848 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3849 }
3850 // concurrent queues do not respect width on sync
3851 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3852 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3853 }
3854 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3855 }
3856
3857 DISPATCH_NOINLINE
3858 static void
3859 _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3860 pthread_priority_t pp)
3861 {
3862 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3863 return _dispatch_sync_f2(dq, ctxt, func, pp);
3864 }
3865 return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
3866 }
3867
3868 DISPATCH_NOINLINE
3869 void
3870 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3871 {
3872 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3873 return _dispatch_sync_f2(dq, ctxt, func, 0);
3874 }
3875 return dispatch_barrier_sync_f(dq, ctxt, func);
3876 }
3877
3878 #ifdef __BLOCKS__
3879 void
3880 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
3881 {
3882 if (slowpath(_dispatch_block_has_private_data(work))) {
3883 return _dispatch_sync_block_with_private_data(dq, work, 0);
3884 }
3885 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
3886 }
3887 #endif
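
/*
 * Illustrative usage sketch (not part of the implementation): a synchronous
 * read from a private queue. The cache_copy_value() helper is hypothetical.
 * As _dispatch_sync_f_slow() above shows, calling dispatch_sync() on a queue
 * the current thread already owns deadlocks and is trapped with
 * "dispatch_sync called on queue already owned by current thread".
 *
 *	static char *
 *	read_value(dispatch_queue_t cache_q, const char *key)
 *	{
 *		__block char *result = NULL;
 *		dispatch_sync(cache_q, ^{
 *			result = cache_copy_value(key);
 *		});
 *		return result;
 *	}
 */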
3888
3889 #pragma mark -
3890 #pragma mark dispatch_trysync
3891
3892 struct trysync_context {
3893 dispatch_queue_t tc_dq;
3894 void *tc_ctxt;
3895 dispatch_function_t tc_func;
3896 };
3897
3898 DISPATCH_NOINLINE
3899 static int
3900 _dispatch_trysync_recurse(dispatch_queue_t dq,
3901 struct trysync_context *tc, bool barrier)
3902 {
3903 dispatch_queue_t tq = dq->do_targetq;
3904
3905 if (barrier) {
3906 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3907 return EWOULDBLOCK;
3908 }
3909 } else {
3910 // <rdar://problem/24743140> check nothing was queued by the current
3911 // thread ahead of this call. _dispatch_queue_try_reserve_sync_width
3912 // ignores the ENQUEUED bit which could cause it to miss a barrier_async
3913 // made by the same thread just before.
3914 if (slowpath(dq->dq_items_tail)) {
3915 return EWOULDBLOCK;
3916 }
3917 // concurrent queues do not respect width on sync
3918 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3919 return EWOULDBLOCK;
3920 }
3921 }
3922
3923 int rc = 0;
3924 if (_dispatch_queue_cannot_trysync(tq)) {
3925 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3926 rc = ENOTSUP;
3927 } else if (tq->do_targetq) {
3928 rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
3929 if (rc == ENOTSUP) {
3930 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3931 }
3932 } else {
3933 dispatch_thread_frame_s dtf;
3934 _dispatch_thread_frame_push(&dtf, tq);
3935 _dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
3936 _dispatch_thread_frame_pop(&dtf);
3937 }
3938 if (barrier) {
3939 _dispatch_barrier_complete(dq);
3940 } else {
3941 _dispatch_non_barrier_complete(dq);
3942 }
3943 return rc;
3944 }
3945
3946 DISPATCH_NOINLINE
3947 bool
3948 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
3949 dispatch_function_t f)
3950 {
3951 if (slowpath(!dq->do_targetq)) {
3952 _dispatch_sync_function_invoke(dq, ctxt, f);
3953 return true;
3954 }
3955 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3956 return false;
3957 }
3958 struct trysync_context tc = {
3959 .tc_dq = dq,
3960 .tc_func = f,
3961 .tc_ctxt = ctxt,
3962 };
3963 return _dispatch_trysync_recurse(dq, &tc, true) == 0;
3964 }
3965
3966 DISPATCH_NOINLINE
3967 bool
3968 _dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
3969 {
3970 if (slowpath(!dq->do_targetq)) {
3971 _dispatch_sync_function_invoke(dq, ctxt, f);
3972 return true;
3973 }
3974 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3975 return false;
3976 }
3977 struct trysync_context tc = {
3978 .tc_dq = dq,
3979 .tc_func = f,
3980 .tc_ctxt = ctxt,
3981 };
3982 return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
3983 }
3984
3985 #pragma mark -
3986 #pragma mark dispatch_after
3987
3988 DISPATCH_ALWAYS_INLINE
3989 static inline void
3990 _dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
3991 void *ctxt, void *handler, bool block)
3992 {
3993 dispatch_source_t ds;
3994 uint64_t leeway, delta;
3995
3996 if (when == DISPATCH_TIME_FOREVER) {
3997 #if DISPATCH_DEBUG
3998 DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
3999 #endif
4000 return;
4001 }
4002
4003 delta = _dispatch_timeout(when);
4004 if (delta == 0) {
4005 if (block) {
4006 return dispatch_async(queue, handler);
4007 }
4008 return dispatch_async_f(queue, ctxt, handler);
4009 }
4010 leeway = delta / 10; // <rdar://problem/13447496>
4011
4012 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
4013 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
4014
4015 // this function can and should be optimized to not use a dispatch source
4016 ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
4017 dispatch_assert(ds);
4018
4019 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4020 if (block) {
4021 _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
4022 } else {
4023 _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
4024 }
4025 // reference `ds` so that it doesn't show up as a leak
4026 dc->dc_data = ds;
4027 _dispatch_source_set_event_handler_continuation(ds, dc);
4028 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
4029 dispatch_activate(ds);
4030 }
4031
4032 DISPATCH_NOINLINE
4033 void
4034 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
4035 dispatch_function_t func)
4036 {
4037 _dispatch_after(when, queue, ctxt, func, false);
4038 }
4039
4040 #ifdef __BLOCKS__
4041 void
4042 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
4043 dispatch_block_t work)
4044 {
4045 _dispatch_after(when, queue, NULL, work, true);
4046 }
4047 #endif
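
/*
 * Illustrative usage sketch (not part of the implementation): scheduling a
 * block two seconds from now. With the leeway computation above, a ~2s delta
 * yields a ~200ms leeway (delta / 10, clamped between 1ms and 60s), which the
 * timer source may use for coalescing. The timeout_handler() name is
 * hypothetical.
 *
 *	static void
 *	after_example(dispatch_queue_t q)
 *	{
 *		dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
 *				(int64_t)(2 * NSEC_PER_SEC));
 *		dispatch_after(when, q, ^{
 *			timeout_handler();
 *		});
 *	}
 */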
4048
4049 #pragma mark -
4050 #pragma mark dispatch_queue_wakeup
4051
4052 DISPATCH_NOINLINE
4053 void
4054 _dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4055 dispatch_wakeup_flags_t flags)
4056 {
4057 dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
4058
4059 if (_dispatch_queue_class_probe(dq)) {
4060 target = DISPATCH_QUEUE_WAKEUP_TARGET;
4061 }
4062 if (target) {
4063 return _dispatch_queue_class_wakeup(dq, pp, flags, target);
4064 } else if (pp) {
4065 return _dispatch_queue_class_override_drainer(dq, pp, flags);
4066 } else if (flags & DISPATCH_WAKEUP_CONSUME) {
4067 return _dispatch_release_tailcall(dq);
4068 }
4069 }
4070
4071 #if DISPATCH_COCOA_COMPAT
4072 DISPATCH_ALWAYS_INLINE
4073 static inline bool
4074 _dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
4075 {
4076 #if TARGET_OS_MAC
4077 return MACH_PORT_VALID(handle);
4078 #elif defined(__linux__)
4079 return handle >= 0;
4080 #else
4081 #error "runloop support not implemented on this platform"
4082 #endif
4083 }
4084
4085 DISPATCH_ALWAYS_INLINE
4086 static inline dispatch_runloop_handle_t
4087 _dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
4088 {
4089 #if TARGET_OS_MAC
4090 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
4091 #elif defined(__linux__)
4092 // decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4093 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
4094 #else
4095 #error "runloop support not implemented on this platform"
4096 #endif
4097 }
4098
4099 DISPATCH_ALWAYS_INLINE
4100 static inline void
4101 _dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
4102 {
4103 #if TARGET_OS_MAC
4104 dq->do_ctxt = (void *)(uintptr_t)handle;
4105 #elif defined(__linux__)
4106 // encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4107 dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
4108 #else
4109 #error "runloop support not implemented on this platform"
4110 #endif
4111 }
4112 #endif // DISPATCH_COCOA_COMPAT
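
/*
 * A minimal sketch of the offset-by-1 encoding used above (Linux only), kept
 * outside the implementation: file descriptor 0 is valid, so storing fd + 1
 * keeps a NULL do_ctxt meaning "no handle" while remaining reversible.
 * <stdint.h> is assumed for uintptr_t.
 *
 *	static void *encode_fd(int fd) { return (void *)(uintptr_t)(fd + 1); }
 *	static int decode_fd(void *p) { return (int)((uintptr_t)p - 1); }
 *	// encode_fd(0) == (void *)1, decode_fd((void *)1) == 0,
 *	// and a NULL pointer never collides with a valid fd.
 */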
4113
4114 void
4115 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4116 dispatch_wakeup_flags_t flags)
4117 {
4118 #if DISPATCH_COCOA_COMPAT
4119 if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
4120 // <rdar://problem/14026816>
4121 return _dispatch_queue_wakeup(dq, pp, flags);
4122 }
4123
4124 if (_dispatch_queue_class_probe(dq)) {
4125 return _dispatch_runloop_queue_poke(dq, pp, flags);
4126 }
4127
4128 pp = _dispatch_queue_reset_override_priority(dq, true);
4129 if (pp) {
4130 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4131 if (_dispatch_queue_class_probe(dq)) {
4132 _dispatch_runloop_queue_poke(dq, pp, flags);
4133 }
4134 _dispatch_thread_override_end(owner, dq);
4135 return;
4136 }
4137 if (flags & DISPATCH_WAKEUP_CONSUME) {
4138 return _dispatch_release_tailcall(dq);
4139 }
4140 #else
4141 return _dispatch_queue_wakeup(dq, pp, flags);
4142 #endif
4143 }
4144
4145 void
4146 _dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4147 dispatch_wakeup_flags_t flags)
4148 {
4149 #if DISPATCH_COCOA_COMPAT
4150 if (_dispatch_queue_is_thread_bound(dq)) {
4151 return _dispatch_runloop_queue_wakeup(dq, pp, flags);
4152 }
4153 #endif
4154 return _dispatch_queue_wakeup(dq, pp, flags);
4155 }
4156
4157 void
4158 _dispatch_root_queue_wakeup(dispatch_queue_t dq,
4159 pthread_priority_t pp DISPATCH_UNUSED,
4160 dispatch_wakeup_flags_t flags)
4161 {
4162 if (flags & DISPATCH_WAKEUP_CONSUME) {
4163 // see _dispatch_queue_push_set_head
4164 dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
4165 }
4166 _dispatch_global_queue_poke(dq);
4167 }
4168
4169 #pragma mark -
4170 #pragma mark dispatch root queues poke
4171
4172 #if DISPATCH_COCOA_COMPAT
4173 static inline void
4174 _dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
4175 {
4176 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
4177 if (!_dispatch_runloop_handle_is_valid(handle)) {
4178 return;
4179 }
4180
4181 #if TARGET_OS_MAC
4182 mach_port_t mp = handle;
4183 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
4184 switch (kr) {
4185 case MACH_SEND_TIMEOUT:
4186 case MACH_SEND_TIMED_OUT:
4187 case MACH_SEND_INVALID_DEST:
4188 break;
4189 default:
4190 (void)dispatch_assume_zero(kr);
4191 break;
4192 }
4193 #elif defined(__linux__)
4194 int result;
4195 do {
4196 result = eventfd_write(handle, 1);
4197 } while (result == -1 && errno == EINTR);
4198 (void)dispatch_assume_zero(result);
4199 #else
4200 #error "runloop support not implemented on this platform"
4201 #endif
4202 }
4203
4204 DISPATCH_NOINLINE
4205 static void
4206 _dispatch_runloop_queue_poke(dispatch_queue_t dq,
4207 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
4208 {
4209 // it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
4210 // a release barrier, and because runloop queues get a non-optional
4211 // wake-up when they stop being thread-bound and become "normal" queues,
4212 // either in _dispatch_runloop_queue_xref_dispose,
4213 // or in _dispatch_queue_cleanup2() for the main thread.
4214
4215 if (dq == &_dispatch_main_q) {
4216 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4217 _dispatch_runloop_queue_handle_init);
4218 }
4219 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
4220 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
4221 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4222 _dispatch_thread_override_start(owner, pp, dq);
4223 if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
4224 _dispatch_thread_override_end(owner, dq);
4225 }
4226 }
4227 _dispatch_runloop_queue_class_poke(dq);
4228 if (flags & DISPATCH_WAKEUP_CONSUME) {
4229 return _dispatch_release_tailcall(dq);
4230 }
4231 }
4232 #endif
4233
4234 DISPATCH_NOINLINE
4235 static void
4236 _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
4237 {
4238 dispatch_root_queue_context_t qc = dq->do_ctxt;
4239 uint32_t i = n;
4240 int r;
4241
4242 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
4243 _dispatch_root_queues_init_once);
4244
4245 _dispatch_debug_root_queue(dq, __func__);
4246 #if HAVE_PTHREAD_WORKQUEUES
4247 #if DISPATCH_USE_PTHREAD_POOL
4248 if (qc->dgq_kworkqueue != (void*)(~0ul))
4249 #endif
4250 {
4251 _dispatch_root_queue_debug("requesting new worker thread for global "
4252 "queue: %p", dq);
4253 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4254 if (qc->dgq_kworkqueue) {
4255 pthread_workitem_handle_t wh;
4256 unsigned int gen_cnt;
4257 do {
4258 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
4259 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
4260 (void)dispatch_assume_zero(r);
4261 } while (--i);
4262 return;
4263 }
4264 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4265 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4266 if (!dq->dq_priority) {
4267 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
4268 qc->dgq_wq_options, (int)i);
4269 (void)dispatch_assume_zero(r);
4270 return;
4271 }
4272 #endif
4273 #if HAVE_PTHREAD_WORKQUEUE_QOS
4274 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
4275 (void)dispatch_assume_zero(r);
4276 #endif
4277 return;
4278 }
4279 #endif // HAVE_PTHREAD_WORKQUEUES
4280 #if DISPATCH_USE_PTHREAD_POOL
4281 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4282 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
4283 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
4284 if (!--i) {
4285 return;
4286 }
4287 }
4288 }
4289 uint32_t j, t_count;
4290 // seq_cst with atomic store to tail <rdar://problem/16932833>
4291 t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
4292 do {
4293 if (!t_count) {
4294 _dispatch_root_queue_debug("pthread pool is full for root queue: "
4295 "%p", dq);
4296 return;
4297 }
4298 j = i > t_count ? t_count : i;
4299 } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
4300 t_count - j, &t_count, acquire));
4301
4302 pthread_attr_t *attr = &pqc->dpq_thread_attr;
4303 pthread_t tid, *pthr = &tid;
4304 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
4305 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
4306 pthr = _dispatch_mgr_root_queue_init();
4307 }
4308 #endif
4309 do {
4310 _dispatch_retain(dq);
4311 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
4312 if (r != EAGAIN) {
4313 (void)dispatch_assume_zero(r);
4314 }
4315 _dispatch_temporary_resource_shortage();
4316 }
4317 } while (--j);
4318 #endif // DISPATCH_USE_PTHREAD_POOL
4319 }
4320
4321 static inline void
4322 _dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
4323 {
4324 if (!_dispatch_queue_class_probe(dq)) {
4325 return;
4326 }
4327 #if HAVE_PTHREAD_WORKQUEUES
4328 dispatch_root_queue_context_t qc = dq->do_ctxt;
4329 if (
4330 #if DISPATCH_USE_PTHREAD_POOL
4331 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
4332 #endif
4333 !os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4334 _dispatch_root_queue_debug("worker thread request still pending for "
4335 "global queue: %p", dq);
4336 return;
4337 }
4338 #endif // HAVE_PTHREAD_WORKQUEUES
4339 return _dispatch_global_queue_poke_slow(dq, n);
4340 }
4341
4342 static inline void
4343 _dispatch_global_queue_poke(dispatch_queue_t dq)
4344 {
4345 return _dispatch_global_queue_poke_n(dq, 1);
4346 }
4347
4348 DISPATCH_NOINLINE
4349 void
4350 _dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
4351 {
4352 return _dispatch_global_queue_poke_n(dq, n);
4353 }
4354
4355 #pragma mark -
4356 #pragma mark dispatch_queue_drain
4357
4358 void
4359 _dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
4360 dispatch_invoke_flags_t flags)
4361 {
4362 _dispatch_continuation_pop_inline(dou, dq, flags);
4363 }
4364
4365 void
4366 _dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
4367 dispatch_invoke_flags_t flags)
4368 {
4369 _dispatch_continuation_invoke_inline(dou, override_voucher, flags);
4370 }
4371
4372 /*
4373 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
4374 * (redirecting or not).
4375 *
4376 * Serial
4377 * ~~~~~~
4378 * Serial drain is about serial queues (width == 1). It doesn't support
4379 * the redirecting mode, which doesn't make sense, and treats all continuations
4380 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
4381 * is optimized away.
4382 *
4383 * Serial drain stops if the width of the queue grows to larger than 1.
4384 * Going through a serial drain prevents any recursive drain from being
4385 * redirecting.
4386 *
4387 * Concurrent
4388 * ~~~~~~~~~~
4389 * When in non-redirecting mode (meaning one of the target queues is serial),
4390 * non-barriers and barriers alike run in the context of the drain thread.
4391 * Slow non-barrier items are still all signaled so that they can make progress
4392 * toward the dispatch_sync() that will serialize them all.
4393 *
4394 * In redirecting mode, non-barrier work items are redirected downward.
4395 *
4396 * Concurrent drain stops if the width of the queue becomes 1, so that the
4397 * queue drain moves to the more efficient serial mode.
4398 */
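
/*
 * Illustrative sketch (not part of the implementation): how a client-built
 * hierarchy maps onto the drain flavours described above. The labels are
 * hypothetical.
 *
 *	dispatch_queue_t serial = dispatch_queue_create("com.example.serial",
 *			DISPATCH_QUEUE_SERIAL);         // width 1: serial drain
 *	dispatch_queue_t conc = dispatch_queue_create("com.example.concurrent",
 *			DISPATCH_QUEUE_CONCURRENT);     // width > 1: concurrent drain
 *
 *	// conc now targets a serial queue, so its non-barrier items cannot be
 *	// redirected to a root queue: its drain runs in non-redirecting mode.
 *	dispatch_set_target_queue(conc, serial);
 */
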
4399 DISPATCH_ALWAYS_INLINE
4400 static dispatch_queue_t
4401 _dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4402 uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
4403 bool serial_drain)
4404 {
4405 dispatch_queue_t orig_tq = dq->do_targetq;
4406 dispatch_thread_frame_s dtf;
4407 struct dispatch_object_s *dc = NULL, *next_dc;
4408 uint64_t owned = *owned_ptr;
4409
4410 _dispatch_thread_frame_push(&dtf, dq);
4411 if (_dq_state_is_in_barrier(owned)) {
4412 // we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
4413 // but width can change while draining barrier work items, so we only
4414 // convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
4415 owned = DISPATCH_QUEUE_IN_BARRIER;
4416 }
4417
4418 while (dq->dq_items_tail) {
4419 dc = _dispatch_queue_head(dq);
4420 do {
4421 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
4422 goto out;
4423 }
4424 if (unlikely(orig_tq != dq->do_targetq)) {
4425 goto out;
4426 }
4427 if (unlikely(serial_drain != (dq->dq_width == 1))) {
4428 goto out;
4429 }
4430 if (serial_drain || _dispatch_object_is_barrier(dc)) {
4431 if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
4432 goto out;
4433 }
4434 next_dc = _dispatch_queue_next(dq, dc);
4435 if (_dispatch_object_is_slow_item(dc)) {
4436 owned = 0;
4437 goto out_with_deferred;
4438 }
4439 } else {
4440 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4441 // we just ran barrier work items, we have to make their
4442 // effect visible to other sync work items on other threads
4443 // that may start coming in after this point, hence the
4444 // release barrier
4445 os_atomic_and2o(dq, dq_state, ~owned, release);
4446 owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4447 } else if (unlikely(owned == 0)) {
4448 if (_dispatch_object_is_slow_item(dc)) {
4449 // sync "readers" don't observe the limit
4450 _dispatch_queue_reserve_sync_width(dq);
4451 } else if (!_dispatch_queue_try_acquire_async(dq)) {
4452 goto out_with_no_width;
4453 }
4454 owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
4455 }
4456
4457 next_dc = _dispatch_queue_next(dq, dc);
4458 if (_dispatch_object_is_slow_item(dc)) {
4459 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4460 _dispatch_continuation_slow_item_signal(dq, dc);
4461 continue;
4462 }
4463
4464 if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
4465 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4466 _dispatch_continuation_redirect(dq, dc);
4467 continue;
4468 }
4469 }
4470
4471 _dispatch_continuation_pop_inline(dc, dq, flags);
4472 _dispatch_perfmon_workitem_inc();
4473 if (unlikely(dtf.dtf_deferred)) {
4474 goto out_with_deferred_compute_owned;
4475 }
4476 } while ((dc = next_dc));
4477 }
4478
4479 out:
4480 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4481 // if we're IN_BARRIER we really own the full width too
4482 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4483 }
4484 if (dc) {
4485 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4486 }
4487 *owned_ptr = owned;
4488 _dispatch_thread_frame_pop(&dtf);
4489 return dc ? dq->do_targetq : NULL;
4490
4491 out_with_no_width:
4492 *owned_ptr = 0;
4493 _dispatch_thread_frame_pop(&dtf);
4494 return NULL;
4495
4496 out_with_deferred_compute_owned:
4497 if (serial_drain) {
4498 owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
4499 } else {
4500 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4501 // if we're IN_BARRIER we really own the full width too
4502 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4503 }
4504 if (next_dc) {
4505 owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
4506 }
4507 }
4508 out_with_deferred:
4509 *owned_ptr = owned;
4510 if (unlikely(!dc_out)) {
4511 DISPATCH_INTERNAL_CRASH(dc,
4512 "Deferred continuation on source, mach channel or mgr");
4513 }
4514 *dc_out = dc;
4515 _dispatch_thread_frame_pop(&dtf);
4516 return dq->do_targetq;
4517 }
4518
4519 DISPATCH_NOINLINE
4520 static dispatch_queue_t
4521 _dispatch_queue_concurrent_drain(dispatch_queue_t dq,
4522 dispatch_invoke_flags_t flags, uint64_t *owned,
4523 struct dispatch_object_s **dc_ptr)
4524 {
4525 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
4526 }
4527
4528 DISPATCH_NOINLINE
4529 dispatch_queue_t
4530 _dispatch_queue_serial_drain(dispatch_queue_t dq,
4531 dispatch_invoke_flags_t flags, uint64_t *owned,
4532 struct dispatch_object_s **dc_ptr)
4533 {
4534 flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
4535 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
4536 }
4537
4538 #if DISPATCH_COCOA_COMPAT
4539 static void
4540 _dispatch_main_queue_drain(void)
4541 {
4542 dispatch_queue_t dq = &_dispatch_main_q;
4543 dispatch_thread_frame_s dtf;
4544
4545 if (!dq->dq_items_tail) {
4546 return;
4547 }
4548
4549 if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
4550 DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
4551 " after dispatch_main()");
4552 }
4553 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4554 if (slowpath(owner != _dispatch_tid_self())) {
4555 DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
4556 " from the wrong thread");
4557 }
4558
4559 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4560 _dispatch_runloop_queue_handle_init);
4561
4562 _dispatch_perfmon_start();
4563 // <rdar://problem/23256682> hide the frame chaining when CFRunLoop
4564 // drains the main runloop, as this should not be observable that way
4565 _dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);
4566
4567 pthread_priority_t old_pri = _dispatch_get_priority();
4568 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4569 voucher_t voucher = _voucher_copy();
4570
4571 struct dispatch_object_s *dc, *next_dc, *tail;
4572 dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
4573 do {
4574 next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
4575 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4576 _dispatch_perfmon_workitem_inc();
4577 } while ((dc = next_dc));
4578
4579 // runloop based queues use their port for the queue PUBLISH pattern
4580 // so this raw call to dx_wakeup(0) is valid
4581 dx_wakeup(dq, 0, 0);
4582 _dispatch_voucher_debug("main queue restore", voucher);
4583 _dispatch_reset_defaultpriority(old_dp);
4584 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4585 _dispatch_thread_frame_pop(&dtf);
4586 _dispatch_perfmon_end();
4587 _dispatch_force_cache_cleanup();
4588 }
4589
4590 static bool
4591 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
4592 {
4593 if (!dq->dq_items_tail) {
4594 return false;
4595 }
4596 dispatch_thread_frame_s dtf;
4597 _dispatch_perfmon_start();
4598 _dispatch_thread_frame_push(&dtf, dq);
4599 pthread_priority_t old_pri = _dispatch_get_priority();
4600 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4601 voucher_t voucher = _voucher_copy();
4602
4603 struct dispatch_object_s *dc, *next_dc;
4604 dc = _dispatch_queue_head(dq);
4605 next_dc = _dispatch_queue_next(dq, dc);
4606 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4607 _dispatch_perfmon_workitem_inc();
4608
4609 if (!next_dc) {
4610 // runloop based queues use their port for the queue PUBLISH pattern
4611 // so this raw call to dx_wakeup(0) is valid
4612 dx_wakeup(dq, 0, 0);
4613 }
4614
4615 _dispatch_voucher_debug("runloop queue restore", voucher);
4616 _dispatch_reset_defaultpriority(old_dp);
4617 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4618 _dispatch_thread_frame_pop(&dtf);
4619 _dispatch_perfmon_end();
4620 _dispatch_force_cache_cleanup();
4621 return next_dc;
4622 }
4623 #endif
4624
4625 DISPATCH_NOINLINE
4626 void
4627 _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
4628 {
4629 dispatch_continuation_t dc_tmp, dc_start, dc_end;
4630 struct dispatch_object_s *dc = NULL;
4631 uint64_t dq_state, owned;
4632 size_t count = 0;
4633
4634 owned = DISPATCH_QUEUE_IN_BARRIER;
4635 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4636 attempt_running_slow_head:
4637 if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
4638 dc = _dispatch_queue_head(dq);
4639 if (!_dispatch_object_is_slow_item(dc)) {
4640 // not a slow item, needs to wake up
4641 } else if (fastpath(dq->dq_width == 1) ||
4642 _dispatch_object_is_barrier(dc)) {
4643 // rdar://problem/8290662 "barrier/writer lock transfer"
4644 dc_start = dc_end = (dispatch_continuation_t)dc;
4645 owned = 0;
4646 count = 1;
4647 dc = _dispatch_queue_next(dq, dc);
4648 } else {
4649 // <rdar://problem/10164594> "reader lock transfer"
4650 // we must not signal semaphores immediately because our right
4651 // to dequeue is granted through holding the full "barrier" width,
4652 // which a signaled work item could relinquish out from under us
4653 dc_start = (dispatch_continuation_t)dc;
4654 do {
4655 // no check on width here because concurrent queues
4656 // do not respect width for blocked readers; the thread
4657 // is already spent anyway
4658 dc_end = (dispatch_continuation_t)dc;
4659 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4660 count++;
4661 dc = _dispatch_queue_next(dq, dc);
4662 } while (dc && _dispatch_object_is_slow_non_barrier(dc));
4663 }
4664
4665 if (count) {
4666 _dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
4667 do {
4668 // signaled job will release the continuation
4669 dc_tmp = dc_start;
4670 dc_start = dc_start->do_next;
4671 _dispatch_continuation_slow_item_signal(dq, dc_tmp);
4672 } while (dc_tmp != dc_end);
4673 return;
4674 }
4675 }
4676
4677 if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
4678 // <rdar://problem/23336992> the following wakeup is needed for sources
4679 // or mach channels: when ds_pending_data is set at the same time
4680 // as a trysync_f happens, the lock transfer code above doesn't know about
4681 // ds_pending_data or the wakeup logic, but lock transfer is useless
4682 // for sources and mach channels in the first place.
4683 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4684 dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
4685 return _dispatch_queue_try_wakeup(dq, dq_state,
4686 DISPATCH_WAKEUP_WAITER_HANDOFF);
4687 } else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
4688 // someone enqueued a slow item at the head
4689 // looping may be its last chance
4690 goto attempt_running_slow_head;
4691 }
4692 }
4693
4694 void
4695 _dispatch_mgr_queue_drain(void)
4696 {
4697 const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
4698 dispatch_queue_t dq = &_dispatch_mgr_q;
4699 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
4700
4701 if (dq->dq_items_tail) {
4702 _dispatch_perfmon_start();
4703 if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
4704 DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
4705 }
4706 _dispatch_voucher_debug("mgr queue clear", NULL);
4707 _voucher_clear();
4708 _dispatch_reset_defaultpriority_override();
4709 _dispatch_perfmon_end();
4710 }
4711
4712 #if DISPATCH_USE_KEVENT_WORKQUEUE
4713 if (!_dispatch_kevent_workqueue_enabled)
4714 #endif
4715 {
4716 _dispatch_force_cache_cleanup();
4717 }
4718 }
4719
4720 #pragma mark -
4721 #pragma mark dispatch_queue_invoke
4722
4723 void
4724 _dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
4725 dispatch_invoke_flags_t flags, uint64_t to_unlock,
4726 struct dispatch_object_s *dc)
4727 {
4728 if (_dispatch_object_is_slow_item(dc)) {
4729 dispatch_assert(to_unlock == 0);
4730 _dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
4731 _dispatch_continuation_slow_item_signal(dq, dc);
4732 return _dispatch_release_tailcall(dq);
4733 }
4734
4735 bool should_defer_again = false, should_pend_queue = true;
4736 uint64_t old_state, new_state;
4737
4738 if (_dispatch_get_current_queue()->do_targetq) {
4739 _dispatch_thread_frame_get_current()->dtf_deferred = dc;
4740 should_defer_again = true;
4741 should_pend_queue = false;
4742 }
4743
4744 if (dq->dq_width > 1) {
4745 should_pend_queue = false;
4746 } else if (should_pend_queue) {
4747 dispatch_assert(to_unlock ==
4748 DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
4749 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4750 new_state = old_state;
4751 if (_dq_state_has_waiters(old_state) ||
4752 _dq_state_is_enqueued(old_state)) {
4753 os_atomic_rmw_loop_give_up(break);
4754 }
4755 new_state += DISPATCH_QUEUE_DRAIN_PENDED;
4756 new_state -= DISPATCH_QUEUE_IN_BARRIER;
4757 new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4758 });
4759 should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
4760 }
4761
4762 if (!should_pend_queue) {
4763 if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
4764 _dispatch_try_lock_transfer_or_wakeup(dq);
4765 _dispatch_release(dq);
4766 } else if (to_unlock) {
4767 uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
4768 _dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
4769 } else {
4770 _dispatch_release(dq);
4771 }
4772 dq = NULL;
4773 }
4774
4775 if (!should_defer_again) {
4776 dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
4777 }
4778
4779 if (dq) {
4780 uint32_t self = _dispatch_tid_self();
4781 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4782 new_state = old_state;
4783 if (!_dq_state_drain_pended(old_state) ||
4784 _dq_state_drain_owner(old_state) != self) {
4785 os_atomic_rmw_loop_give_up({
4786 // We may have been overridden, so inform the root queue
4787 _dispatch_set_defaultpriority_override();
4788 return _dispatch_release_tailcall(dq);
4789 });
4790 }
4791 new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
4792 });
4793 if (_dq_state_has_override(old_state)) {
4794 // Ensure that the root queue sees that this thread was overridden.
4795 _dispatch_set_defaultpriority_override();
4796 }
4797 return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
4798 }
4799 }
4800
4801 void
4802 _dispatch_queue_finalize_activation(dispatch_queue_t dq)
4803 {
4804 dispatch_queue_t tq = dq->do_targetq;
4805 _dispatch_queue_priority_inherit_from_target(dq, tq);
4806 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
4807 if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
4808 voucher_t v = tq->dq_override_voucher;
4809 if (v != DISPATCH_NO_VOUCHER) {
4810 if (v) _voucher_retain(v);
4811 dq->dq_override_voucher = v;
4812 }
4813 }
4814 }
4815
4816 DISPATCH_ALWAYS_INLINE
4817 static inline dispatch_queue_t
4818 dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4819 uint64_t *owned, struct dispatch_object_s **dc_ptr)
4820 {
4821 dispatch_queue_t otq = dq->do_targetq;
4822 dispatch_queue_t cq = _dispatch_queue_get_current();
4823
4824 if (slowpath(cq != otq)) {
4825 return otq;
4826 }
4827 if (dq->dq_width == 1) {
4828 return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
4829 }
4830 return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
4831 }
4832
4833 // 6618342 Contact the team that owns the Instrument DTrace probe before
4834 // renaming this symbol
4835 DISPATCH_NOINLINE
4836 void
4837 _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
4838 {
4839 _dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
4840 }
4841
4842 #pragma mark -
4843 #pragma mark dispatch_queue_class_wakeup
4844
4845 #if HAVE_PTHREAD_WORKQUEUE_QOS
4846 void
4847 _dispatch_queue_override_invoke(dispatch_continuation_t dc,
4848 dispatch_invoke_flags_t flags)
4849 {
4850 dispatch_queue_t old_rq = _dispatch_queue_get_current();
4851 dispatch_queue_t assumed_rq = dc->dc_other;
4852 voucher_t ov = DISPATCH_NO_VOUCHER;
4853 dispatch_object_t dou;
4854
4855 dou._do = dc->dc_data;
4856 _dispatch_queue_set_current(assumed_rq);
4857 flags |= DISPATCH_INVOKE_OVERRIDING;
4858 if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
4859 flags |= DISPATCH_INVOKE_STEALING;
4860 } else {
4861 // balance the fake continuation push in
4862 // _dispatch_root_queue_push_override
4863 _dispatch_trace_continuation_pop(assumed_rq, dou._do);
4864 }
4865 _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
4866 if (_dispatch_object_has_vtable(dou._do)) {
4867 dx_invoke(dou._do, flags);
4868 } else {
4869 _dispatch_continuation_invoke_inline(dou, ov, flags);
4870 }
4871 });
4872 _dispatch_queue_set_current(old_rq);
4873 }
4874
4875 DISPATCH_ALWAYS_INLINE
4876 static inline bool
4877 _dispatch_need_global_root_queue_override(dispatch_queue_t rq,
4878 pthread_priority_t pp)
4879 {
4880 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4881 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4882
4883 if (unlikely(!rqp)) return false;
4884
4885 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4886 return defaultqueue ? pp && pp != rqp : pp > rqp;
4887 }
4888
4889 DISPATCH_ALWAYS_INLINE
4890 static inline bool
4891 _dispatch_need_global_root_queue_override_stealer(dispatch_queue_t rq,
4892 pthread_priority_t pp, dispatch_wakeup_flags_t wflags)
4893 {
4894 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4895 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4896
4897 if (unlikely(!rqp)) return false;
4898
4899 if (wflags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
4900 if (!(wflags & _DISPATCH_WAKEUP_OVERRIDE_BITS)) {
4901 return false;
4902 }
4903 }
4904
4905 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4906 return defaultqueue || pp > rqp;
4907 }
4908
4909 DISPATCH_NOINLINE
4910 static void
4911 _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
4912 dispatch_object_t dou, pthread_priority_t pp)
4913 {
4914 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4915 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4916 dispatch_continuation_t dc = dou._dc;
4917
4918 if (_dispatch_object_is_redirection(dc)) {
4919 // no double-wrap is needed; _dispatch_async_redirect_invoke will do
4920 // the right thing
4921 dc->dc_func = (void *)orig_rq;
4922 } else {
4923 dc = _dispatch_continuation_alloc();
4924 dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
4925 // fake that we queued `dou` on `orig_rq` for introspection purposes
4926 _dispatch_trace_continuation_push(orig_rq, dou);
4927 dc->dc_ctxt = dc;
4928 dc->dc_other = orig_rq;
4929 dc->dc_data = dou._do;
4930 dc->dc_priority = DISPATCH_NO_PRIORITY;
4931 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4932 }
4933
4934 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4935 _dispatch_queue_push_inline(rq, dc, 0, 0);
4936 }
4937
4938 DISPATCH_NOINLINE
4939 static void
4940 _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
4941 dispatch_queue_t dq, pthread_priority_t pp)
4942 {
4943 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4944 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4945 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4946
4947 dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
4948 _dispatch_retain(dq);
4949 dc->dc_func = NULL;
4950 dc->dc_ctxt = dc;
4951 dc->dc_other = orig_rq;
4952 dc->dc_data = dq;
4953 dc->dc_priority = DISPATCH_NO_PRIORITY;
4954 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4955
4956 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4957 _dispatch_queue_push_inline(rq, dc, 0, 0);
4958 }
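
// Illustrative note (not in the original source): as far as this excerpt
// shows, the two override continuations differ mainly in ownership.
// DC_VTABLE(OVERRIDE_OWNING) wraps the pushed object itself (dc_data is
// dou._do) and is consumed when it runs, which is why the non-stealing branch
// of the invoke function above pops the traced continuation. The stealing
// variant only retains the target queue (dc_data is dq), so a higher-priority
// root queue can drain it at the boosted priority without taking ownership of
// any particular work item.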
4959
4960 DISPATCH_NOINLINE
4961 static void
4962 _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
4963 pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
4964 {
4965 mach_port_t owner = _dq_state_drain_owner(dq_state);
4966 pthread_priority_t pp2;
4967 dispatch_queue_t tq;
4968 bool locked;
4969
4970 if (owner) {
4971 int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
4972 &dq->dq_state_lock);
4973 // EPERM means the target of the override is not a work queue thread
4974 // and could be a thread bound queue such as the main queue.
4975 // When that happens we must get to that queue and wake it up if we
4976 // want the override to be applied and take effect.
4977 if (rc != EPERM) {
4978 goto out;
4979 }
4980 }
4981
4982 if (_dq_state_is_suspended(dq_state)) {
4983 goto out;
4984 }
4985
4986 tq = dq->do_targetq;
4987
4988 if (_dispatch_queue_has_immutable_target(dq)) {
4989 locked = false;
4990 } else if (_dispatch_is_in_root_queues_array(tq)) {
4991 // avoid locking when we recognize the target queue as a global root
4992 // queue; this is gross, but it is a very common case. The locking isn't
4993 // needed because these target queues cannot go away.
4994 locked = false;
4995 } else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
4996 // <rdar://problem/17735825> to traverse the tq chain safely we must
4997 // lock it to ensure it cannot change
4998 locked = true;
4999 tq = dq->do_targetq;
5000 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5001 } else {
5002 //
5003 // To get here, the current thread has:
5004 // 1. enqueued an object on `dq`
5005 // 2. raised the dq_override value of `dq`
5006 // 3. set the HAS_OVERRIDE bit and not seen an owner
5007 // 4. tried and failed to acquire the side lock
5008 //
5009 //
5010 // The side lock owner can only be one of three things:
5011 //
5012 // - The suspend/resume side count code. Besides being unlikely,
5013 // it means that at this moment the queue is actually suspended,
5014 // which transfers the responsibility of applying the override to
5015 // the eventual dispatch_resume().
5016 //
5017 // - A dispatch_set_target_queue() call. The fact that we saw no `owner`
5018 // means that the trysync it does wasn't being drained when (3)
5019 // happened, which can only be explained by one of these interleavings:
5020 //
5021 // o `dq` became idle between when the object queued in (1) ran and
5022 // the set_target_queue call and we were unlucky enough that our
5023 // step (3) happened while this queue was idle. There is no reason
5024 // to override anything anymore, the queue drained to completion
5025 // while we were preempted, our job is done.
5026 //
5027 // o `dq` is queued but not draining during (1-3), then when we try
5028 // to lock at (4) the queue is now draining a set_target_queue.
5029 // Since we set HAS_OVERRIDE with a release barrier, the effect of
5030 // (2) was visible to the drainer when it acquired the drain lock,
5031 // and that drainer has applied our override. Our job is done.
5032 //
5033 // - Another instance of _dispatch_queue_class_wakeup_with_override(),
5034 // which is fine because trylock leaves a hint that we failed our
5035 // trylock, causing the tryunlock below to fail and reassess whether
5036 // a better override needs to be applied.
5037 //
5038 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5039 goto out;
5040 }
5041
5042 apply_again:
5043 if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5044 if (_dispatch_need_global_root_queue_override_stealer(tq, pp, flags)) {
5045 _dispatch_root_queue_push_override_stealer(tq, dq, pp);
5046 }
5047 } else if (flags & DISPATCH_WAKEUP_WAITER_HANDOFF) {
5048 dx_wakeup(tq, pp, flags);
5049 } else if (_dispatch_queue_need_override(tq, pp)) {
5050 dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
5051 }
5052 while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
5053 // rdar://problem/24081326
5054 //
5055 // Another instance of _dispatch_queue_class_wakeup_with_override()
5056 // tried to acquire the side lock while we were running, and could have
5057 // had a better override than ours to apply.
5058 //
5059 pp2 = dq->dq_override;
5060 if (pp2 > pp) {
5061 pp = pp2;
5062 // The other instance had a better priority than ours, override
5063 // our thread, and apply the override that wasn't applied to `dq`
5064 // because of us.
5065 goto apply_again;
5066 }
5067 }
5068
5069 out:
5070 if (flags & DISPATCH_WAKEUP_CONSUME) {
5071 return _dispatch_release_tailcall(dq);
5072 }
5073 }
5074 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5075
5076 DISPATCH_NOINLINE
5077 void
5078 _dispatch_queue_class_override_drainer(dispatch_queue_t dq,
5079 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
5080 {
5081 #if HAVE_PTHREAD_WORKQUEUE_QOS
5082 uint64_t dq_state, value;
5083
5084 //
5085 // Someone is trying to override the last work item of the queue.
5086 // Do not remember this override on the queue because we know the precise
5087 // duration the override is required for: until the current drain unlocks.
5088 //
5089 // That is why this function only tries to set HAS_OVERRIDE if we can
5090 // still observe a drainer, and doesn't need to set the DIRTY bit
5091 // because oq_override wasn't touched and there is no race to resolve.
5092 //
5093 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
5094 if (!_dq_state_drain_locked(dq_state)) {
5095 os_atomic_rmw_loop_give_up(break);
5096 }
5097 value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
5098 });
5099 if (_dq_state_drain_locked(dq_state)) {
5100 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5101 flags, dq_state);
5102 }
5103 #else
5104 (void)pp;
5105 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5106 if (flags & DISPATCH_WAKEUP_CONSUME) {
5107 return _dispatch_release_tailcall(dq);
5108 }
5109 }
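
// Illustrative sketch (not in the original source): os_atomic_rmw_loop2o()
// above is, conceptually, a compare-and-swap retry loop. In plain C11 atomics
// the HAS_OVERRIDE update would look roughly like the following; the real
// macro operates directly on the dq_state field and provides the give-up
// path used above.
//
//   _Atomic uint64_t *state = /* stands in for &dq->dq_state */;
//   uint64_t os = atomic_load_explicit(state, memory_order_relaxed), ns;
//   do {
//       if (!_dq_state_drain_locked(os)) break;     // the give-up path
//       ns = os | DISPATCH_QUEUE_HAS_OVERRIDE;
//   } while (!atomic_compare_exchange_weak_explicit(state, &os, ns,
//           memory_order_relaxed, memory_order_relaxed));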
5110
5111 #if DISPATCH_USE_KEVENT_WORKQUEUE
5112 DISPATCH_NOINLINE
5113 static void
5114 _dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
5115 pthread_priority_t pp, dispatch_deferred_items_t ddi)
5116 {
5117 dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
5118 dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
5119 struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
5120 dispatch_priority_t rq_overcommit;
5121
5122 rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5123 if (likely(!old_pp || rq_overcommit)) {
5124 ddi->ddi_stashed_dq = dq;
5125 ddi->ddi_stashed_dou = dou._do;
5126 ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
5127 _PTHREAD_PRIORITY_PRIORITY_MASK;
5128 if (likely(!old_pp)) {
5129 return;
5130 }
5131 // push the previously stashed item
5132 pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
5133 dq = old_dq;
5134 dou._do = old_dou;
5135 }
5136 if (_dispatch_need_global_root_queue_override(dq, pp)) {
5137 return _dispatch_root_queue_push_override(dq, dou, pp);
5138 }
5139 // bit of cheating: we should really pass `pp` but we know that we are
5140 // pushing onto a global queue at this point, and we just checked that
5141 // `pp` doesn't matter.
5142 DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
5143 _dispatch_queue_push_inline(dq, dou, 0, 0);
5144 }
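
// Illustrative note (not in the original source): the deferred-items stash
// above holds at most one non-overcommit item per kevent worker wakeup. A
// rough walk-through of the three cases:
//
//   1. slot empty (old_pp == 0): the new item takes the slot and the function
//      returns; the real push is deferred until the kevent worker unwinds.
//   2. slot occupied, new item targets an overcommit root queue: the new item
//      takes the slot and the previously stashed one is pushed immediately
//      (through the override path if its priority requires it).
//   3. slot occupied, new item is not overcommit: the stash is left alone and
//      the new item itself is pushed immediately.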
5145 #endif
5146
5147 DISPATCH_NOINLINE
5148 static void
5149 _dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
5150 pthread_priority_t pp)
5151 {
5152 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
5153 _dispatch_root_queues_init_once);
5154 _dispatch_queue_push(dq, dou, pp);
5155 }
5156
5157 DISPATCH_NOINLINE
5158 void
5159 _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
5160 pthread_priority_t pp)
5161 {
5162 _dispatch_assert_is_valid_qos_override(pp);
5163 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5164 #if DISPATCH_USE_KEVENT_WORKQUEUE
5165 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
5166 if (unlikely(ddi && !(ddi->ddi_stashed_pp &
5167 (dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
5168 dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
5169 return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
5170 }
5171 #endif
5172 #if HAVE_PTHREAD_WORKQUEUE_QOS
5173 // can't use dispatch_once_f() as it would create a frame
5174 if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
5175 return _dispatch_queue_push_slow(dq, dou, pp);
5176 }
5177 if (_dispatch_need_global_root_queue_override(dq, pp)) {
5178 return _dispatch_root_queue_push_override(dq, dou, pp);
5179 }
5180 #endif
5181 }
5182 _dispatch_queue_push_inline(dq, dou, pp, 0);
5183 }
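
// Illustrative usage (not in the original source): this is the push path that
// public API such as dispatch_async() eventually reaches once a continuation
// has been allocated and its priority and voucher captured. A minimal
// client-side sketch, using only public API:
//
//   #include <dispatch/dispatch.h>
//
//   dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
//   dispatch_async(q, ^{
//       // runs on a workqueue thread drained by _dispatch_root_queue_drain()
//   });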
5184
5185 DISPATCH_NOINLINE
5186 static void
5187 _dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
5188 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5189 {
5190 dispatch_queue_t tq;
5191
5192 if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
5193 // _dispatch_queue_drain_try_unlock may have reset the override while
5194 // we were becoming the enqueuer
5195 _dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
5196 }
5197 if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
5198 _dispatch_retain(dq);
5199 }
5200 if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5201 // try_become_enqueuer has no acquire barrier, as the last block
5202 // of a queue asyncing to that queue is not an uncommon pattern
5203 // and in that case the acquire is completely useless
5204 //
5205 // so instead use a thread fence here when we will read the targetq
5206 // pointer because that is the only thing that really requires
5207 // that barrier.
5208 os_atomic_thread_fence(acquire);
5209 tq = dq->do_targetq;
5210 } else {
5211 dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
5212 tq = &_dispatch_mgr_q;
5213 }
5214 return _dispatch_queue_push(tq, dq, pp);
5215 }
5216
5217 DISPATCH_NOINLINE
5218 void
5219 _dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
5220 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5221 {
5222 uint64_t old_state, new_state, bits = 0;
5223
5224 #if HAVE_PTHREAD_WORKQUEUE_QOS
5225 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
5226 #endif
5227
5228 if (flags & DISPATCH_WAKEUP_FLUSH) {
5229 bits = DISPATCH_QUEUE_DIRTY;
5230 }
5231 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
5232 //
5233 // Setting the dirty bit here is about forcing callers of
5234 // _dispatch_queue_drain_try_unlock() to loop again when an override
5235 // has just been set to close the following race:
5236 //
5237 // Drainer (in drain_try_unlock()):
5238 // override_reset();
5239 // preempted....
5240 //
5241 // Enqueuer:
5242 // atomic_or(oq_override, override, relaxed);
5243 // atomic_or(dq_state, HAS_OVERRIDE, release);
5244 //
5245 // Drainer:
5246 // ... resumes
5247 // successful drain_unlock() and leaks `oq_override`
5248 //
5249 bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
5250 }
5251
5252 if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
5253 uint64_t pending_barrier_width =
5254 (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
5255 uint64_t xor_owner_and_set_full_width_and_in_barrier =
5256 _dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
5257 DISPATCH_QUEUE_IN_BARRIER;
5258
5259 #ifdef DLOCK_NOWAITERS_BIT
5260 bits |= DLOCK_NOWAITERS_BIT;
5261 #else
5262 bits |= DLOCK_WAITERS_BIT;
5263 #endif
5264 flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
5265 dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));
5266
5267 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5268 new_state = old_state | bits;
5269 if (_dq_state_drain_pended(old_state)) {
5270 // same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
5271 // but we want to be more efficient wrt the WAITERS_BIT
5272 new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
5273 new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
5274 }
5275 if (unlikely(_dq_state_drain_locked(new_state))) {
5276 #ifdef DLOCK_NOWAITERS_BIT
5277 new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
5278 #endif
5279 } else if (unlikely(!_dq_state_is_runnable(new_state) ||
5280 !(flags & DISPATCH_WAKEUP_FLUSH))) {
5281 // either not runnable, or was not for the first item (26700358)
5282 // so we should not try to lock, and should handle overrides instead
5283 } else if (_dq_state_has_pending_barrier(old_state) ||
5284 new_state + pending_barrier_width <
5285 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
5286 // see _dispatch_queue_drain_try_lock
5287 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
5288 new_state ^= xor_owner_and_set_full_width_and_in_barrier;
5289 } else {
5290 new_state |= DISPATCH_QUEUE_ENQUEUED;
5291 }
5292 });
5293 if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
5294 return _dispatch_try_lock_transfer_or_wakeup(dq);
5295 }
5296 } else if (bits) {
5297 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5298 new_state = old_state | bits;
5299 if (likely(_dq_state_should_wakeup(old_state))) {
5300 new_state |= DISPATCH_QUEUE_ENQUEUED;
5301 }
5302 });
5303 } else {
5304 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
5305 new_state = old_state;
5306 if (likely(_dq_state_should_wakeup(old_state))) {
5307 new_state |= DISPATCH_QUEUE_ENQUEUED;
5308 } else {
5309 os_atomic_rmw_loop_give_up(break);
5310 }
5311 });
5312 }
5313
5314 if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
5315 return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
5316 }
5317
5318 #if HAVE_PTHREAD_WORKQUEUE_QOS
5319 if ((flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAITER_HANDOFF))
5320 && target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5321 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5322 flags, new_state);
5323 }
5324 #endif
5325
5326 if (flags & DISPATCH_WAKEUP_CONSUME) {
5327 return _dispatch_release_tailcall(dq);
5328 }
5329 }
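
// Illustrative note (not in the original source): in the slow-waiter branch
// above, pending_barrier_width is the extra width a full barrier would still
// have to reserve, expressed in DISPATCH_QUEUE_WIDTH_INTERVAL units, so
// `new_state + pending_barrier_width < DISPATCH_QUEUE_WIDTH_FULL_BIT` is the
// same "is there room to drain in barrier mode" test performed by
// _dispatch_queue_drain_try_lock. When that path is taken, xoring in
// xor_owner_and_set_full_width_and_in_barrier installs the waiter's tid plus
// the WIDTH_FULL and IN_BARRIER bits in a single step; when the queue is
// runnable but there is no room, the state only gains DISPATCH_QUEUE_ENQUEUED
// and the wakeup is forwarded via _dispatch_queue_class_wakeup_enqueue().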
5330
5331 #pragma mark -
5332 #pragma mark dispatch_root_queue_drain
5333
5334 DISPATCH_NOINLINE
5335 static bool
5336 _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
5337 {
5338 dispatch_root_queue_context_t qc = dq->do_ctxt;
5339 struct dispatch_object_s *const mediator = (void *)~0ul;
5340 bool pending = false, available = true;
5341 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
5342
5343 do {
5344 // Spin for a short while in case the contention is temporary -- e.g.
5345 // when starting up after dispatch_apply, or when executing a few
5346 // short continuations in a row.
5347 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
5348 goto out;
5349 }
5350 // Since we have serious contention, we need to back off.
5351 if (!pending) {
5352 // Mark this queue as pending to avoid requests for further threads
5353 (void)os_atomic_inc2o(qc, dgq_pending, relaxed);
5354 pending = true;
5355 }
5356 _dispatch_contention_usleep(sleep_time);
5357 if (fastpath(dq->dq_items_head != mediator)) goto out;
5358 sleep_time *= 2;
5359 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
5360
5361 // The ratio of work to libdispatch overhead must be bad. This
5362 // scenario implies that there are too many threads in the pool.
5363 // Create a new pending thread and then exit this thread.
5364 // The kernel will grant a new thread when the load subsides.
5365 _dispatch_debug("contention on global queue: %p", dq);
5366 available = false;
5367 out:
5368 if (pending) {
5369 (void)os_atomic_dec2o(qc, dgq_pending, relaxed);
5370 }
5371 if (!available) {
5372 _dispatch_global_queue_poke(dq);
5373 }
5374 return available;
5375 }
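
// Illustrative sketch (not in the original source): the loop above is a
// conventional spin-then-sleep exponential backoff. Stripped of the queue
// specifics (condition_became_true() is a stand-in for the
// `dq_items_head != mediator` check) it is roughly:
//
//   unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
//   do {
//       if (condition_became_true()) return true;   // cheap spinning first
//       _dispatch_contention_usleep(sleep_time);    // then sleep...
//       if (condition_became_true()) return true;
//       sleep_time *= 2;                            // ...doubling each round
//   } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
//   return false;  // give up: too many threads for the available work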
5376
5377 DISPATCH_ALWAYS_INLINE
5378 static inline bool
5379 _dispatch_root_queue_drain_one2(dispatch_queue_t dq)
5380 {
5381 // Wait for queue head and tail to be both non-empty or both empty
5382 bool available; // <rdar://problem/15917893>
5383 _dispatch_wait_until((dq->dq_items_head != NULL) ==
5384 (available = (dq->dq_items_tail != NULL)));
5385 return available;
5386 }
5387
5388 DISPATCH_ALWAYS_INLINE_NDEBUG
5389 static inline struct dispatch_object_s *
5390 _dispatch_root_queue_drain_one(dispatch_queue_t dq)
5391 {
5392 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
5393
5394 start:
5395 // The mediator value acts both as a "lock" and a signal
5396 head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
5397
5398 if (slowpath(head == NULL)) {
5399 // The first xchg on the tail will tell the enqueueing thread that it
5400 // is safe to blindly write out to the head pointer. A cmpxchg honors
5401 // the algorithm.
5402 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
5403 NULL, relaxed))) {
5404 goto start;
5405 }
5406 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
5407 _dispatch_root_queue_drain_one2(dq)) {
5408 goto start;
5409 }
5410 _dispatch_root_queue_debug("no work on global queue: %p", dq);
5411 return NULL;
5412 }
5413
5414 if (slowpath(head == mediator)) {
5415 // This thread lost the race for ownership of the queue.
5416 if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
5417 goto start;
5418 }
5419 return NULL;
5420 }
5421
5422 // Restore the head pointer to a sane value before returning.
5423 // If 'next' is NULL, then this item _might_ be the last item.
5424 next = fastpath(head->do_next);
5425
5426 if (slowpath(!next)) {
5427 os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
5428 // 22708742: set tail to NULL with release, so that NULL write to head
5429 // above doesn't clobber head from concurrent enqueuer
5430 if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
5431 // both head and tail are NULL now
5432 goto out;
5433 }
5434 // There must be a next item now.
5435 _dispatch_wait_until(next = head->do_next);
5436 }
5437
5438 os_atomic_store2o(dq, dq_items_head, next, relaxed);
5439 _dispatch_global_queue_poke(dq);
5440 out:
5441 return head;
5442 }
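
// Illustrative sketch (not in the original source): the waits above exist
// because of the shape of the MPSC push that _dispatch_queue_push_inline
// performs (roughly, under the usual os_mpsc scheme):
//
//   prev = atomic_exchange(&dq->dq_items_tail, item);  // publish tail first
//   if (prev) {
//       prev->do_next = item;        // this link may land *after* the xchg...
//   } else {
//       dq->dq_items_head = item;    // ...and so may the head store
//   }
//
// A dequeuer can therefore observe a non-NULL tail while head or do_next is
// still NULL for a moment, which is exactly what _dispatch_wait_until() and
// the mediator dance above compensate for.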
5443
5444 void
5445 _dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
5446 struct dispatch_object_s *dou, pthread_priority_t pp)
5447 {
5448 struct _dispatch_identity_s di;
5449
5450 // fake that we queued `dou` on `dq` for introspection purposes
5451 _dispatch_trace_continuation_push(dq, dou);
5452
5453 pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
5454 _dispatch_queue_set_current(dq);
5455 _dispatch_root_queue_identity_assume(&di, pp);
5456 #if DISPATCH_COCOA_COMPAT
5457 void *pool = _dispatch_last_resort_autorelease_pool_push();
5458 #endif // DISPATCH_COCOA_COMPAT
5459
5460 _dispatch_perfmon_start();
5461 _dispatch_continuation_pop_inline(dou, dq,
5462 DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
5463 _dispatch_perfmon_workitem_inc();
5464 _dispatch_perfmon_end();
5465
5466 #if DISPATCH_COCOA_COMPAT
5467 _dispatch_last_resort_autorelease_pool_pop(pool);
5468 #endif // DISPATCH_COCOA_COMPAT
5469 _dispatch_reset_defaultpriority(di.old_pp);
5470 _dispatch_queue_set_current(NULL);
5471
5472 _dispatch_voucher_debug("root queue clear", NULL);
5473 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5474 }
5475
5476 DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
5477 static void
5478 _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
5479 {
5480 #if DISPATCH_DEBUG
5481 dispatch_queue_t cq;
5482 if (slowpath(cq = _dispatch_queue_get_current())) {
5483 DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
5484 }
5485 #endif
5486 _dispatch_queue_set_current(dq);
5487 if (dq->dq_priority) pri = dq->dq_priority;
5488 pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
5489 #if DISPATCH_COCOA_COMPAT
5490 void *pool = _dispatch_last_resort_autorelease_pool_push();
5491 #endif // DISPATCH_COCOA_COMPAT
5492
5493 _dispatch_perfmon_start();
5494 struct dispatch_object_s *item;
5495 bool reset = false;
5496 while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
5497 if (reset) _dispatch_wqthread_override_reset();
5498 _dispatch_continuation_pop_inline(item, dq,
5499 DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
5500 _dispatch_perfmon_workitem_inc();
5501 reset = _dispatch_reset_defaultpriority_override();
5502 }
5503 _dispatch_perfmon_end();
5504
5505 #if DISPATCH_COCOA_COMPAT
5506 _dispatch_last_resort_autorelease_pool_pop(pool);
5507 #endif // DISPATCH_COCOA_COMPAT
5508 _dispatch_reset_defaultpriority(old_dp);
5509 _dispatch_queue_set_current(NULL);
5510 }
5511
5512 #pragma mark -
5513 #pragma mark dispatch_worker_thread
5514
5515 #if HAVE_PTHREAD_WORKQUEUES
5516 static void
5517 _dispatch_worker_thread4(void *context)
5518 {
5519 dispatch_queue_t dq = context;
5520 dispatch_root_queue_context_t qc = dq->do_ctxt;
5521
5522 _dispatch_introspection_thread_add();
5523 int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
5524 dispatch_assert(pending >= 0);
5525 _dispatch_root_queue_drain(dq, _dispatch_get_priority());
5526 _dispatch_voucher_debug("root queue clear", NULL);
5527 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5528 }
5529
5530 #if HAVE_PTHREAD_WORKQUEUE_QOS
5531 static void
5532 _dispatch_worker_thread3(pthread_priority_t pp)
5533 {
5534 bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5535 dispatch_queue_t dq;
5536 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
5537 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
5538 dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
5539 return _dispatch_worker_thread4(dq);
5540 }
5541 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5542
5543 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5544 // 6618342 Contact the team that owns the Instrument DTrace probe before
5545 // renaming this symbol
5546 static void
5547 _dispatch_worker_thread2(int priority, int options,
5548 void *context DISPATCH_UNUSED)
5549 {
5550 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
5551 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
5552 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
5553
5554 return _dispatch_worker_thread4(dq);
5555 }
5556 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5557 #endif // HAVE_PTHREAD_WORKQUEUES
5558
5559 #if DISPATCH_USE_PTHREAD_POOL
5560 // 6618342 Contact the team that owns the Instrument DTrace probe before
5561 // renaming this symbol
5562 static void *
5563 _dispatch_worker_thread(void *context)
5564 {
5565 dispatch_queue_t dq = context;
5566 dispatch_root_queue_context_t qc = dq->do_ctxt;
5567 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
5568
5569 if (pqc->dpq_observer_hooks.queue_will_execute) {
5570 _dispatch_set_pthread_root_queue_observer_hooks(
5571 &pqc->dpq_observer_hooks);
5572 }
5573 if (pqc->dpq_thread_configure) {
5574 pqc->dpq_thread_configure();
5575 }
5576
5577 sigset_t mask;
5578 int r;
5579 // workaround tweaks the kernel workqueue does for us
5580 r = sigfillset(&mask);
5581 (void)dispatch_assume_zero(r);
5582 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
5583 (void)dispatch_assume_zero(r);
5584 _dispatch_introspection_thread_add();
5585
5586 const int64_t timeout = 5ull * NSEC_PER_SEC;
5587 pthread_priority_t old_pri = _dispatch_get_priority();
5588 do {
5589 _dispatch_root_queue_drain(dq, old_pri);
5590 _dispatch_reset_priority_and_voucher(old_pri, NULL);
5591 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
5592 dispatch_time(0, timeout)) == 0);
5593
5594 (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
5595 _dispatch_global_queue_poke(dq);
5596 _dispatch_release(dq);
5597
5598 return NULL;
5599 }
5600
5601 int
5602 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
5603 {
5604 int r;
5605
5606 /* Workaround: 6269619 Not all signals can be delivered on any thread */
5607
5608 r = sigdelset(set, SIGILL);
5609 (void)dispatch_assume_zero(r);
5610 r = sigdelset(set, SIGTRAP);
5611 (void)dispatch_assume_zero(r);
5612 #if HAVE_DECL_SIGEMT
5613 r = sigdelset(set, SIGEMT);
5614 (void)dispatch_assume_zero(r);
5615 #endif
5616 r = sigdelset(set, SIGFPE);
5617 (void)dispatch_assume_zero(r);
5618 r = sigdelset(set, SIGBUS);
5619 (void)dispatch_assume_zero(r);
5620 r = sigdelset(set, SIGSEGV);
5621 (void)dispatch_assume_zero(r);
5622 r = sigdelset(set, SIGSYS);
5623 (void)dispatch_assume_zero(r);
5624 r = sigdelset(set, SIGPIPE);
5625 (void)dispatch_assume_zero(r);
5626
5627 return pthread_sigmask(how, set, oset);
5628 }
5629 #endif // DISPATCH_USE_PTHREAD_POOL
5630
5631 #pragma mark -
5632 #pragma mark dispatch_runloop_queue
5633
5634 static bool _dispatch_program_is_probably_callback_driven;
5635
5636 #if DISPATCH_COCOA_COMPAT
5637
5638 dispatch_queue_t
5639 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
5640 {
5641 dispatch_queue_t dq;
5642 size_t dqs;
5643
5644 if (slowpath(flags)) {
5645 return DISPATCH_BAD_INPUT;
5646 }
5647 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
5648 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
5649 _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
5650 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
5651 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
5652 _dispatch_runloop_queue_handle_init(dq);
5653 _dispatch_queue_set_bound_thread(dq);
5654 _dispatch_object_debug(dq, "%s", __func__);
5655 return _dispatch_introspection_queue_create(dq);
5656 }
5657
5658 void
5659 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
5660 {
5661 _dispatch_object_debug(dq, "%s", __func__);
5662
5663 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
5664 _dispatch_queue_clear_bound_thread(dq);
5665 dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
5666 if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
5667 }
5668
5669 void
5670 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
5671 {
5672 _dispatch_object_debug(dq, "%s", __func__);
5673 _dispatch_introspection_queue_dispose(dq);
5674 _dispatch_runloop_queue_handle_dispose(dq);
5675 _dispatch_queue_destroy(dq);
5676 }
5677
5678 bool
5679 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
5680 {
5681 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5682 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5683 }
5684 dispatch_retain(dq);
5685 bool r = _dispatch_runloop_queue_drain_one(dq);
5686 dispatch_release(dq);
5687 return r;
5688 }
5689
5690 void
5691 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
5692 {
5693 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5694 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5695 }
5696 _dispatch_runloop_queue_wakeup(dq, 0, false);
5697 }
5698
5699 dispatch_runloop_handle_t
5700 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
5701 {
5702 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5703 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5704 }
5705 return _dispatch_runloop_queue_get_handle(dq);
5706 }
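
// Illustrative usage (not in the original source): a hypothetical
// CoreFoundation-side client of the 4CF hooks above would, in rough outline:
//
//   dispatch_queue_t q =
//       _dispatch_runloop_root_queue_create_4CF("com.example.rl", 0);
//   dispatch_runloop_handle_t h =
//       _dispatch_runloop_root_queue_get_port_4CF(q);
//   // register `h` (a Mach port on Darwin, an eventfd on Linux) with the
//   // run loop, and when it is signaled drain the queue:
//   while (_dispatch_runloop_root_queue_perform_4CF(q)) {
//       // keep draining while items remain
//   }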
5707
5708 static void
5709 _dispatch_runloop_queue_handle_init(void *ctxt)
5710 {
5711 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
5712 dispatch_runloop_handle_t handle;
5713
5714 _dispatch_fork_becomes_unsafe();
5715
5716 #if TARGET_OS_MAC
5717 mach_port_t mp;
5718 kern_return_t kr;
5719 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
5720 DISPATCH_VERIFY_MIG(kr);
5721 (void)dispatch_assume_zero(kr);
5722 kr = mach_port_insert_right(mach_task_self(), mp, mp,
5723 MACH_MSG_TYPE_MAKE_SEND);
5724 DISPATCH_VERIFY_MIG(kr);
5725 (void)dispatch_assume_zero(kr);
5726 if (dq != &_dispatch_main_q) {
5727 struct mach_port_limits limits = {
5728 .mpl_qlimit = 1,
5729 };
5730 kr = mach_port_set_attributes(mach_task_self(), mp,
5731 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
5732 sizeof(limits));
5733 DISPATCH_VERIFY_MIG(kr);
5734 (void)dispatch_assume_zero(kr);
5735 }
5736 handle = mp;
5737 #elif defined(__linux__)
5738 int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
5739 if (fd == -1) {
5740 int err = errno;
5741 switch (err) {
5742 case EMFILE:
5743 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5744 "process is out of file descriptors");
5745 break;
5746 case ENFILE:
5747 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5748 "system is out of file descriptors");
5749 break;
5750 case ENOMEM:
5751 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5752 "kernel is out of memory");
5753 break;
5754 default:
5755 DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
5756 break;
5757 }
5758 }
5759 handle = fd;
5760 #else
5761 #error "runloop support not implemented on this platform"
5762 #endif
5763 _dispatch_runloop_queue_set_handle(dq, handle);
5764
5765 _dispatch_program_is_probably_callback_driven = true;
5766 }
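
// Illustrative note (not in the original source): on Linux the handle created
// above is an eventfd, so waking the runloop queue amounts to write()-ing a
// nonzero 64-bit counter to it and the waiter's read() consuming it; on
// Darwin the equivalent is sending a message to the receive right allocated
// above. The poke/drain side lives in _dispatch_runloop_queue_poke and the
// 4CF callbacks, outside this excerpt.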
5767
5768 static void
5769 _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
5770 {
5771 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
5772 if (!_dispatch_runloop_handle_is_valid(handle)) {
5773 return;
5774 }
5775 dq->do_ctxt = NULL;
5776 #if TARGET_OS_MAC
5777 mach_port_t mp = handle;
5778 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
5779 DISPATCH_VERIFY_MIG(kr);
5780 (void)dispatch_assume_zero(kr);
5781 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
5782 DISPATCH_VERIFY_MIG(kr);
5783 (void)dispatch_assume_zero(kr);
5784 #elif defined(__linux__)
5785 int rc = close(handle);
5786 (void)dispatch_assume_zero(rc);
5787 #else
5788 #error "runloop support not implemented on this platform"
5789 #endif
5790 }
5791
5792 #pragma mark -
5793 #pragma mark dispatch_main_queue
5794
5795 dispatch_runloop_handle_t
5796 _dispatch_get_main_queue_handle_4CF(void)
5797 {
5798 dispatch_queue_t dq = &_dispatch_main_q;
5799 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5800 _dispatch_runloop_queue_handle_init);
5801 return _dispatch_runloop_queue_get_handle(dq);
5802 }
5803
5804 #if TARGET_OS_MAC
5805 dispatch_runloop_handle_t
5806 _dispatch_get_main_queue_port_4CF(void)
5807 {
5808 return _dispatch_get_main_queue_handle_4CF();
5809 }
5810 #endif
5811
5812 static bool main_q_is_draining;
5813
5814 // 6618342 Contact the team that owns the Instrument DTrace probe before
5815 // renaming this symbol
5816 DISPATCH_NOINLINE
5817 static void
5818 _dispatch_queue_set_mainq_drain_state(bool arg)
5819 {
5820 main_q_is_draining = arg;
5821 }
5822
5823 void
5824 _dispatch_main_queue_callback_4CF(
5825 void *ignored DISPATCH_UNUSED)
5826 {
5827 if (main_q_is_draining) {
5828 return;
5829 }
5830 _dispatch_queue_set_mainq_drain_state(true);
5831 _dispatch_main_queue_drain();
5832 _dispatch_queue_set_mainq_drain_state(false);
5833 }
5834
5835 #endif
5836
5837 void
5838 dispatch_main(void)
5839 {
5840 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
5841 _dispatch_root_queues_init_once);
5842
5843 #if HAVE_PTHREAD_MAIN_NP
5844 if (pthread_main_np()) {
5845 #endif
5846 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
5847 _dispatch_program_is_probably_callback_driven = true;
5848 _dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
5849 #ifdef __linux__
5850 // On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
5851 // To avoid that, just before calling pthread_exit we register a TSD destructor
5852 // that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
5853 // This relies on an implementation detail (currently true in glibc) that TSD destructors
5854 // will be called in the order of creation to cause all the TSD cleanup functions to
5855 // run before the thread becomes trapped in sigsuspend.
5856 pthread_key_t dispatch_main_key;
5857 pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
5858 pthread_setspecific(dispatch_main_key, &dispatch_main_key);
5859 #endif
5860 pthread_exit(NULL);
5861 DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
5862 #if HAVE_PTHREAD_MAIN_NP
5863 }
5864 DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
5865 #endif
5866 }
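
// Illustrative usage (not in the original source): dispatch_main() is the
// public parking point for programs that are purely callback driven and never
// return from main(). A minimal sketch:
//
//   #include <dispatch/dispatch.h>
//   #include <stdio.h>
//   #include <stdlib.h>
//
//   int main(void) {
//       dispatch_async(dispatch_get_main_queue(), ^{
//           printf("ran on the main queue\n");
//           exit(0);                 // dispatch_main() never returns
//       });
//       dispatch_main();             // parks the main thread
//   }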
5867
5868 DISPATCH_NOINLINE DISPATCH_NORETURN
5869 static void
5870 _dispatch_sigsuspend(void)
5871 {
5872 static const sigset_t mask;
5873
5874 for (;;) {
5875 sigsuspend(&mask);
5876 }
5877 }
5878
5879 DISPATCH_NORETURN
5880 static void
5881 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
5882 {
5883 // never returns, so burn bridges behind us
5884 _dispatch_clear_stack(0);
5885 _dispatch_sigsuspend();
5886 }
5887
5888 DISPATCH_NOINLINE
5889 static void
5890 _dispatch_queue_cleanup2(void)
5891 {
5892 dispatch_queue_t dq = &_dispatch_main_q;
5893 _dispatch_queue_clear_bound_thread(dq);
5894
5895 // <rdar://problem/22623242>
5896 // Here is what happens when both this cleanup happens because of
5897 // dispatch_main() being called, and a concurrent enqueuer makes the queue
5898 // non empty.
5899 //
5900 // _dispatch_queue_cleanup2:
5901 // atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
5902 // maximal_barrier();
5903 // if (load(dq_items_tail, seq_cst)) {
5904 // // do the wake up the normal serial queue way
5905 // } else {
5906 // // do not wake up <----
5907 // }
5908 //
5909 // enqueuer:
5910 // store(dq_items_tail, new_tail, release);
5911 // if (load(dq_is_thread_bound, relaxed)) {
5912 // // do the wake up the runloop way <----
5913 // } else {
5914 // // do the wake up the normal serial way
5915 // }
5916 //
5917 // what would be bad is to take both paths marked <---- because the queue
5918 // wouldn't be woken up until the next time it's used (which may never
5919 // happen)
5920 //
5921 // An enqueuer that speculates the load of the old value of thread_bound
5922 // and then does the store may wake up the main queue the runloop way.
5923 // But then, the cleanup thread will see that store because the load
5924 // of dq_items_tail is sequentially consistent, and we have just thrown away
5925 // our pipeline.
5926 //
5927 // By the time cleanup2() is out of the maximally synchronizing barrier,
5928 // no other thread can speculate the wrong load anymore, and both cleanup2()
5929 // and a concurrent enqueuer would treat the queue in the standard
5930 // non-thread-bound way.
5931
5932 _dispatch_queue_atomic_flags_clear(dq,
5933 DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
5934 os_atomic_maximally_synchronizing_barrier();
5935 // no need to drop the override, the thread will die anyway
5936 // the barrier above includes an acquire, so it's ok to do this raw
5937 // call to dx_wakeup(0)
5938 dx_wakeup(dq, 0, 0);
5939
5940 // overload the "probably" variable to mean that dispatch_main() or
5941 // similar non-POSIX API was called
5942 // this has to run before the DISPATCH_COCOA_COMPAT below
5943 // See dispatch_main for call to _dispatch_sig_thread on linux.
5944 #ifndef __linux__
5945 if (_dispatch_program_is_probably_callback_driven) {
5946 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
5947 _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
5948 sleep(1); // workaround 6778970
5949 }
5950 #endif
5951
5952 #if DISPATCH_COCOA_COMPAT
5953 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5954 _dispatch_runloop_queue_handle_init);
5955 _dispatch_runloop_queue_handle_dispose(dq);
5956 #endif
5957 }
5958
5959 static void
5960 _dispatch_queue_cleanup(void *ctxt)
5961 {
5962 if (ctxt == &_dispatch_main_q) {
5963 return _dispatch_queue_cleanup2();
5964 }
5965 // POSIX defines that destructors are only called if 'ctxt' is non-null
5966 DISPATCH_INTERNAL_CRASH(ctxt,
5967 "Premature thread exit while a dispatch queue is running");
5968 }
5969
5970 static void
5971 _dispatch_deferred_items_cleanup(void *ctxt)
5972 {
5973 // POSIX defines that destructors are only called if 'ctxt' is non-null
5974 DISPATCH_INTERNAL_CRASH(ctxt,
5975 "Premature thread exit with unhandled deferred items");
5976 }
5977
5978 static void
5979 _dispatch_frame_cleanup(void *ctxt)
5980 {
5981 // POSIX defines that destructors are only called if 'ctxt' is non-null
5982 DISPATCH_INTERNAL_CRASH(ctxt,
5983 "Premature thread exit while a dispatch frame is active");
5984 }
5985
5986 static void
5987 _dispatch_context_cleanup(void *ctxt)
5988 {
5989 // POSIX defines that destructors are only called if 'ctxt' is non-null
5990 DISPATCH_INTERNAL_CRASH(ctxt,
5991 "Premature thread exit while a dispatch context is set");
5992 }