apple/libdispatch.git (libdispatch-703.1.4): src/queue.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
34 && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
38 !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
40 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
41 #endif
42 #if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
43 #undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
44 #define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
45 #endif
46 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48 #define pthread_workqueue_t void*
49 #endif
50
51 static void _dispatch_sig_thread(void *ctxt);
52 static void _dispatch_cache_cleanup(void *value);
53 static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
54 dispatch_function_t func, pthread_priority_t pp);
55 static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
56 static void _dispatch_queue_cleanup(void *ctxt);
57 static void _dispatch_deferred_items_cleanup(void *ctxt);
58 static void _dispatch_frame_cleanup(void *ctxt);
59 static void _dispatch_context_cleanup(void *ctxt);
60 static void _dispatch_non_barrier_complete(dispatch_queue_t dq);
61 static inline void _dispatch_global_queue_poke(dispatch_queue_t dq);
62 #if HAVE_PTHREAD_WORKQUEUES
63 static void _dispatch_worker_thread4(void *context);
64 #if HAVE_PTHREAD_WORKQUEUE_QOS
65 static void _dispatch_worker_thread3(pthread_priority_t priority);
66 #endif
67 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
68 static void _dispatch_worker_thread2(int priority, int options, void *context);
69 #endif
70 #endif
71 #if DISPATCH_USE_PTHREAD_POOL
72 static void *_dispatch_worker_thread(void *context);
73 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
74 #endif
75
76 #if DISPATCH_COCOA_COMPAT
77 static dispatch_once_t _dispatch_main_q_handle_pred;
78 static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
79 pthread_priority_t pp, dispatch_wakeup_flags_t flags);
80 static void _dispatch_runloop_queue_handle_init(void *ctxt);
81 static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
82 #endif
83
84 static void _dispatch_root_queues_init_once(void *context);
85 static dispatch_once_t _dispatch_root_queues_pred;
86
87 #pragma mark -
88 #pragma mark dispatch_root_queue
89
90 struct dispatch_pthread_root_queue_context_s {
91 pthread_attr_t dpq_thread_attr;
92 dispatch_block_t dpq_thread_configure;
93 struct dispatch_semaphore_s dpq_thread_mediator;
94 dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
95 };
96 typedef struct dispatch_pthread_root_queue_context_s *
97 dispatch_pthread_root_queue_context_t;
98
99 #if DISPATCH_ENABLE_THREAD_POOL
100 static struct dispatch_pthread_root_queue_context_s
101 _dispatch_pthread_root_queue_contexts[] = {
102 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
103 .dpq_thread_mediator = {
104 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
105 }},
106 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
107 .dpq_thread_mediator = {
108 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
109 }},
110 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
111 .dpq_thread_mediator = {
112 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
113 }},
114 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
115 .dpq_thread_mediator = {
116 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
117 }},
118 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
119 .dpq_thread_mediator = {
120 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
121 }},
122 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
123 .dpq_thread_mediator = {
124 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
125 }},
126 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
127 .dpq_thread_mediator = {
128 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
129 }},
130 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
131 .dpq_thread_mediator = {
132 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
133 }},
134 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
135 .dpq_thread_mediator = {
136 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
137 }},
138 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
139 .dpq_thread_mediator = {
140 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
141 }},
142 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
143 .dpq_thread_mediator = {
144 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
145 }},
146 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
147 .dpq_thread_mediator = {
148 DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
149 }},
150 };
151 #endif
152
153 #define MAX_PTHREAD_COUNT 255
154
155 struct dispatch_root_queue_context_s {
156 union {
157 struct {
158 unsigned int volatile dgq_pending;
159 #if HAVE_PTHREAD_WORKQUEUES
160 qos_class_t dgq_qos;
161 int dgq_wq_priority, dgq_wq_options;
162 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163 pthread_workqueue_t dgq_kworkqueue;
164 #endif
165 #endif // HAVE_PTHREAD_WORKQUEUES
166 #if DISPATCH_USE_PTHREAD_POOL
167 void *dgq_ctxt;
168 uint32_t volatile dgq_thread_pool_size;
169 #endif
170 };
171 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
172 };
173 };
174 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
175
176 #define WORKQ_PRIO_INVALID (-1)
177 #ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
178 #define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
179 #endif
180 #ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
181 #define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
182 #endif
183
184 DISPATCH_CACHELINE_ALIGN
185 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
186 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
187 #if HAVE_PTHREAD_WORKQUEUES
188 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
189 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
190 .dgq_wq_options = 0,
191 #endif
192 #if DISPATCH_ENABLE_THREAD_POOL
193 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
194 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
195 #endif
196 }}},
197 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
198 #if HAVE_PTHREAD_WORKQUEUES
199 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
200 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
201 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
202 #endif
203 #if DISPATCH_ENABLE_THREAD_POOL
204 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
205 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
206 #endif
207 }}},
208 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
209 #if HAVE_PTHREAD_WORKQUEUES
210 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
211 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
212 .dgq_wq_options = 0,
213 #endif
214 #if DISPATCH_ENABLE_THREAD_POOL
215 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
216 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
217 #endif
218 }}},
219 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
220 #if HAVE_PTHREAD_WORKQUEUES
221 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
222 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
223 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
224 #endif
225 #if DISPATCH_ENABLE_THREAD_POOL
226 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
227 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
228 #endif
229 }}},
230 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
231 #if HAVE_PTHREAD_WORKQUEUES
232 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
233 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
234 .dgq_wq_options = 0,
235 #endif
236 #if DISPATCH_ENABLE_THREAD_POOL
237 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
238 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
239 #endif
240 }}},
241 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
242 #if HAVE_PTHREAD_WORKQUEUES
243 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
244 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
245 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
246 #endif
247 #if DISPATCH_ENABLE_THREAD_POOL
248 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
249 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
250 #endif
251 }}},
252 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
253 #if HAVE_PTHREAD_WORKQUEUES
254 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
255 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
256 .dgq_wq_options = 0,
257 #endif
258 #if DISPATCH_ENABLE_THREAD_POOL
259 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
260 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
261 #endif
262 }}},
263 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
264 #if HAVE_PTHREAD_WORKQUEUES
265 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
266 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
267 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
268 #endif
269 #if DISPATCH_ENABLE_THREAD_POOL
270 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
271 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
272 #endif
273 }}},
274 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
275 #if HAVE_PTHREAD_WORKQUEUES
276 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
277 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
278 .dgq_wq_options = 0,
279 #endif
280 #if DISPATCH_ENABLE_THREAD_POOL
281 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
282 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
283 #endif
284 }}},
285 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
286 #if HAVE_PTHREAD_WORKQUEUES
287 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
288 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
289 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
290 #endif
291 #if DISPATCH_ENABLE_THREAD_POOL
292 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
293 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
294 #endif
295 }}},
296 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
297 #if HAVE_PTHREAD_WORKQUEUES
298 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
299 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
300 .dgq_wq_options = 0,
301 #endif
302 #if DISPATCH_ENABLE_THREAD_POOL
303 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
304 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
305 #endif
306 }}},
307 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
308 #if HAVE_PTHREAD_WORKQUEUES
309 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
310 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
311 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
312 #endif
313 #if DISPATCH_ENABLE_THREAD_POOL
314 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
315 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
316 #endif
317 }}},
318 };
319
320 // 6618342 Contact the team that owns the Instrument DTrace probe before
321 // renaming this symbol
322 DISPATCH_CACHELINE_ALIGN
323 struct dispatch_queue_s _dispatch_root_queues[] = {
324 #define _DISPATCH_ROOT_QUEUE_ENTRY(n, ...) \
325 [DISPATCH_ROOT_QUEUE_IDX_##n] = { \
326 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
327 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
328 .do_ctxt = &_dispatch_root_queue_contexts[ \
329 DISPATCH_ROOT_QUEUE_IDX_##n], \
330 .dq_width = DISPATCH_QUEUE_WIDTH_POOL, \
331 .dq_override_voucher = DISPATCH_NO_VOUCHER, \
332 .dq_override = DISPATCH_SATURATED_OVERRIDE, \
333 __VA_ARGS__ \
334 }
335 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS,
336 .dq_label = "com.apple.root.maintenance-qos",
337 .dq_serialnum = 4,
338 ),
339 _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE_QOS_OVERCOMMIT,
340 .dq_label = "com.apple.root.maintenance-qos.overcommit",
341 .dq_serialnum = 5,
342 ),
343 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS,
344 .dq_label = "com.apple.root.background-qos",
345 .dq_serialnum = 6,
346 ),
347 _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND_QOS_OVERCOMMIT,
348 .dq_label = "com.apple.root.background-qos.overcommit",
349 .dq_serialnum = 7,
350 ),
351 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS,
352 .dq_label = "com.apple.root.utility-qos",
353 .dq_serialnum = 8,
354 ),
355 _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY_QOS_OVERCOMMIT,
356 .dq_label = "com.apple.root.utility-qos.overcommit",
357 .dq_serialnum = 9,
358 ),
359 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS,
360 .dq_label = "com.apple.root.default-qos",
361 .dq_serialnum = 10,
362 ),
363 _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT_QOS_OVERCOMMIT,
364 .dq_label = "com.apple.root.default-qos.overcommit",
365 .dq_serialnum = 11,
366 ),
367 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS,
368 .dq_label = "com.apple.root.user-initiated-qos",
369 .dq_serialnum = 12,
370 ),
371 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED_QOS_OVERCOMMIT,
372 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
373 .dq_serialnum = 13,
374 ),
375 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS,
376 .dq_label = "com.apple.root.user-interactive-qos",
377 .dq_serialnum = 14,
378 ),
379 _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE_QOS_OVERCOMMIT,
380 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
381 .dq_serialnum = 15,
382 ),
383 };
384
385 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
386 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
387 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
388 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
389 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
390 &_dispatch_root_queues[
391 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
392 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
393 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
394 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
395 &_dispatch_root_queues[
396 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
397 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
398 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
399 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
400 &_dispatch_root_queues[
401 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
402 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
403 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
404 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
405 &_dispatch_root_queues[
406 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
407 };
408 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
409
410 #define DISPATCH_PRIORITY_COUNT 5
411
412 enum {
413 // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
414 // maintenance priority
415 DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
416 DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
417 DISPATCH_PRIORITY_IDX_LOW,
418 DISPATCH_PRIORITY_IDX_DEFAULT,
419 DISPATCH_PRIORITY_IDX_HIGH,
420 };
421
422 static qos_class_t _dispatch_priority2qos[] = {
423 [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
424 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
425 [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
426 [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
427 [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
428 };
429
430 #if HAVE_PTHREAD_WORKQUEUE_QOS
431 static const int _dispatch_priority2wq[] = {
432 [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
433 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
434 [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
435 [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
436 [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
437 };
438 #endif
439
440 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
441 static struct dispatch_queue_s _dispatch_mgr_root_queue;
442 #else
443 #define _dispatch_mgr_root_queue _dispatch_root_queues[\
444 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
445 #endif
446
447 // 6618342 Contact the team that owns the Instrument DTrace probe before
448 // renaming this symbol
449 DISPATCH_CACHELINE_ALIGN
450 struct dispatch_queue_s _dispatch_mgr_q = {
451 DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
452 .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1),
453 .do_targetq = &_dispatch_mgr_root_queue,
454 .dq_label = "com.apple.libdispatch-manager",
455 .dq_width = 1,
456 .dq_override_voucher = DISPATCH_NO_VOUCHER,
457 .dq_override = DISPATCH_SATURATED_OVERRIDE,
458 .dq_serialnum = 2,
459 };
460
461 dispatch_queue_t
462 dispatch_get_global_queue(long priority, unsigned long flags)
463 {
464 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
465 return DISPATCH_BAD_INPUT;
466 }
467 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
468 _dispatch_root_queues_init_once);
469 qos_class_t qos;
470 switch (priority) {
471 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
472 case _DISPATCH_QOS_CLASS_MAINTENANCE:
473 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
474 .dq_priority) {
475 // map maintenance to background on old kernel
476 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
477 } else {
478 qos = (qos_class_t)priority;
479 }
480 break;
481 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
482 case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
483 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
484 break;
485 case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
486 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
487 break;
488 case DISPATCH_QUEUE_PRIORITY_LOW:
489 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
490 break;
491 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
492 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
493 break;
494 case DISPATCH_QUEUE_PRIORITY_HIGH:
495 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
496 break;
497 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
498 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
499 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
500 .dq_priority) {
501 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
502 break;
503 }
504 #endif
505 // fallthrough
506 default:
507 qos = (qos_class_t)priority;
508 break;
509 }
510 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
511 }
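/*
 * Usage sketch (illustrative, not part of the original file): both the
 * legacy DISPATCH_QUEUE_PRIORITY_* constants and QoS classes are accepted
 * as the first argument; the switch above maps legacy priorities onto QoS
 * classes, and maintenance falls back to background on kernels without
 * QoS support.
 *
 *	dispatch_queue_t q1 = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_queue_t q2 = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *
 * Any flag other than the reserved overcommit bit is rejected by the check
 * at the top of the function.
 */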
512
513 DISPATCH_ALWAYS_INLINE
514 static inline dispatch_queue_t
515 _dispatch_get_current_queue(void)
516 {
517 return _dispatch_queue_get_current() ?:
518 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
519 }
520
521 dispatch_queue_t
522 dispatch_get_current_queue(void)
523 {
524 return _dispatch_get_current_queue();
525 }
526
527 DISPATCH_NOINLINE DISPATCH_NORETURN
528 static void
529 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
530 {
531 _dispatch_client_assert_fail(
532 "Block was %sexpected to execute on queue [%s]",
533 expected ? "" : "not ", dq->dq_label ?: "");
534 }
535
536 DISPATCH_NOINLINE DISPATCH_NORETURN
537 static void
538 _dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
539 {
540 _dispatch_client_assert_fail(
541 "Block was expected to act as a barrier on queue [%s]",
542 dq->dq_label ?: "");
543 }
544
545 void
546 dispatch_assert_queue(dispatch_queue_t dq)
547 {
548 unsigned long metatype = dx_metatype(dq);
549 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
550 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
551 "dispatch_assert_queue()");
552 }
553 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
554 if (unlikely(_dq_state_drain_pended(dq_state))) {
555 goto fail;
556 }
557 if (likely(_dq_state_drain_owner(dq_state) == _dispatch_tid_self())) {
558 return;
559 }
560 if (likely(dq->dq_width > 1)) {
561 // we can look at the width: if it is changing while we read it,
562 // it means that a barrier is running on `dq` concurrently, which
563 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
564 if (fastpath(_dispatch_thread_frame_find_queue(dq))) {
565 return;
566 }
567 }
568 fail:
569 _dispatch_assert_queue_fail(dq, true);
570 }
571
572 void
573 dispatch_assert_queue_not(dispatch_queue_t dq)
574 {
575 unsigned long metatype = dx_metatype(dq);
576 if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
577 DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
578 "dispatch_assert_queue_not()");
579 }
580 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
581 if (_dq_state_drain_pended(dq_state)) {
582 return;
583 }
584 if (likely(_dq_state_drain_owner(dq_state) != _dispatch_tid_self())) {
585 if (likely(dq->dq_width == 1)) {
586 // we can look at the width: if it is changing while we read it,
587 // it means that a barrier is running on `dq` concurrently, which
588 // proves that we're not on `dq`. Hence reading a stale '1' is ok.
589 return;
590 }
591 if (likely(!_dispatch_thread_frame_find_queue(dq))) {
592 return;
593 }
594 }
595 _dispatch_assert_queue_fail(dq, false);
596 }
597
598 void
599 dispatch_assert_queue_barrier(dispatch_queue_t dq)
600 {
601 dispatch_assert_queue(dq);
602
603 if (likely(dq->dq_width == 1)) {
604 return;
605 }
606
607 if (likely(dq->do_targetq)) {
608 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
609 if (likely(_dq_state_is_in_barrier(dq_state))) {
610 return;
611 }
612 }
613
614 _dispatch_assert_queue_barrier_fail(dq);
615 }
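/*
 * Usage sketch (illustrative, not part of the original file): the assert
 * family is intended for debug checks inside callbacks, for example
 *
 *	static void handler(void *ctxt)
 *	{
 *		dispatch_assert_queue(expected_q);         // running on expected_q
 *		dispatch_assert_queue_not(dispatch_get_main_queue());
 *		dispatch_assert_queue_barrier(expected_q); // holding its barrier
 *	}
 *
 * where expected_q is a placeholder for a queue captured by the caller.
 */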
616
617 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
618 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
619 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
620 #else
621 #define _dispatch_root_queue_debug(...)
622 #define _dispatch_debug_root_queue(...)
623 #endif
624
625 #pragma mark -
626 #pragma mark dispatch_init
627
628 #if HAVE_PTHREAD_WORKQUEUE_QOS
629 pthread_priority_t _dispatch_background_priority;
630 pthread_priority_t _dispatch_user_initiated_priority;
631
632 static void
633 _dispatch_root_queues_init_qos(int supported)
634 {
635 pthread_priority_t p;
636 qos_class_t qos;
637 unsigned int i;
638 for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
639 p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
640 qos = _pthread_qos_class_decode(p, NULL, NULL);
641 dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
642 _dispatch_priority2qos[i] = qos;
643 }
644 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
645 qos = _dispatch_root_queue_contexts[i].dgq_qos;
646 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
647 !(supported & WORKQ_FEATURE_MAINTENANCE)) {
648 continue;
649 }
650 unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
651 flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
652 if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
653 i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
654 flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
655 }
656 p = _pthread_qos_class_encode(qos, 0, flags);
657 _dispatch_root_queues[i].dq_priority = (dispatch_priority_t)p;
658 }
659 }
660 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
661
662 static inline bool
663 _dispatch_root_queues_init_workq(int *wq_supported)
664 {
665 int r;
666 bool result = false;
667 *wq_supported = 0;
668 #if HAVE_PTHREAD_WORKQUEUES
669 bool disable_wq = false;
670 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
671 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
672 #endif
673 #if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
674 bool disable_qos = false;
675 #if DISPATCH_DEBUG
676 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
677 #endif
678 #if DISPATCH_USE_KEVENT_WORKQUEUE
679 bool disable_kevent_wq = false;
680 #if DISPATCH_DEBUG
681 disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
682 #endif
683 #endif
684 if (!disable_wq && !disable_qos) {
685 *wq_supported = _pthread_workqueue_supported();
686 #if DISPATCH_USE_KEVENT_WORKQUEUE
687 if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
688 r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
689 (pthread_workqueue_function_kevent_t)
690 _dispatch_kevent_worker_thread,
691 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
692 #if DISPATCH_USE_MGR_THREAD
693 _dispatch_kevent_workqueue_enabled = !r;
694 #endif
695 #if DISPATCH_EVFILT_MACHPORT_PORTSET_FALLBACK
696 _dispatch_evfilt_machport_direct_enabled = !r;
697 #endif
698 result = !r;
699 } else
700 #endif
701 if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
702 #if DISPATCH_USE_MGR_THREAD
703 r = _pthread_workqueue_init(_dispatch_worker_thread3,
704 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
705 result = !r;
706 #endif
707 }
708 if (result) _dispatch_root_queues_init_qos(*wq_supported);
709 }
710 #endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
711 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
712 if (!result && !disable_wq) {
713 pthread_workqueue_setdispatchoffset_np(
714 offsetof(struct dispatch_queue_s, dq_serialnum));
715 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
716 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
717 (void)dispatch_assume_zero(r);
718 #endif
719 result = !r;
720 }
721 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
722 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
723 if (!result) {
724 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
725 pthread_workqueue_attr_t pwq_attr;
726 if (!disable_wq) {
727 r = pthread_workqueue_attr_init_np(&pwq_attr);
728 (void)dispatch_assume_zero(r);
729 }
730 #endif
731 int i;
732 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
733 pthread_workqueue_t pwq = NULL;
734 dispatch_root_queue_context_t qc;
735 qc = &_dispatch_root_queue_contexts[i];
736 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
737 if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
738 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
739 qc->dgq_wq_priority);
740 (void)dispatch_assume_zero(r);
741 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
742 qc->dgq_wq_options &
743 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
744 (void)dispatch_assume_zero(r);
745 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
746 (void)dispatch_assume_zero(r);
747 result = result || dispatch_assume(pwq);
748 }
749 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
750 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
751 }
752 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
753 if (!disable_wq) {
754 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
755 (void)dispatch_assume_zero(r);
756 }
757 #endif
758 }
759 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
760 #endif // HAVE_PTHREAD_WORKQUEUES
761 return result;
762 }
763
764 #if DISPATCH_USE_PTHREAD_POOL
765 static inline void
766 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
767 uint8_t pool_size, bool overcommit)
768 {
769 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
770 uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
771 dispatch_hw_config(active_cpus);
772 if (slowpath(pool_size) && pool_size < thread_pool_size) {
773 thread_pool_size = pool_size;
774 }
775 qc->dgq_thread_pool_size = thread_pool_size;
776 #if HAVE_PTHREAD_WORKQUEUES
777 if (qc->dgq_qos) {
778 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
779 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
780 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
781 #if HAVE_PTHREAD_WORKQUEUE_QOS
782 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
783 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
784 #endif
785 }
786 #endif // HAVE_PTHREAD_WORKQUEUES
787 #if USE_MACH_SEM
788 // override the default FIFO behavior for the pool semaphores
789 kern_return_t kr = semaphore_create(mach_task_self(),
790 &pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
791 DISPATCH_VERIFY_MIG(kr);
792 (void)dispatch_assume_zero(kr);
793 (void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
794 #elif USE_POSIX_SEM
795 /* XXXRW: POSIX semaphores don't support LIFO? */
796 int ret = sem_init(&(pqc->dpq_thread_mediator.dsema_sem), 0, 0);
797 (void)dispatch_assume_zero(ret);
798 #endif
799 }
800 #endif // DISPATCH_USE_PTHREAD_POOL
801
802 static dispatch_once_t _dispatch_root_queues_pred;
803
804 void
805 _dispatch_root_queues_init(void)
806 {
807 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
808 _dispatch_root_queues_init_once);
809 }
810
811 static void
812 _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
813 {
814 int wq_supported;
815 _dispatch_fork_becomes_unsafe();
816 if (!_dispatch_root_queues_init_workq(&wq_supported)) {
817 #if DISPATCH_ENABLE_THREAD_POOL
818 int i;
819 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
820 bool overcommit = true;
821 #if TARGET_OS_EMBEDDED
822 // some software hangs if the non-overcommitting queues do not
823 // overcommit when threads block. Someday, this behavior should
824 // apply to all platforms
825 if (!(i & 1)) {
826 overcommit = false;
827 }
828 #endif
829 _dispatch_root_queue_init_pthread_pool(
830 &_dispatch_root_queue_contexts[i], 0, overcommit);
831 }
832 #else
833 DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
834 "Root queue initialization failed");
835 #endif // DISPATCH_ENABLE_THREAD_POOL
836 }
837 }
838
839 DISPATCH_EXPORT DISPATCH_NOTHROW
840 void
841 libdispatch_init(void)
842 {
843 dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
844 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
845
846 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
847 -DISPATCH_QUEUE_PRIORITY_HIGH);
848 dispatch_assert(countof(_dispatch_root_queues) ==
849 DISPATCH_ROOT_QUEUE_COUNT);
850 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
851 DISPATCH_ROOT_QUEUE_COUNT);
852 dispatch_assert(countof(_dispatch_priority2qos) ==
853 DISPATCH_PRIORITY_COUNT);
854 #if HAVE_PTHREAD_WORKQUEUE_QOS
855 dispatch_assert(countof(_dispatch_priority2wq) ==
856 DISPATCH_PRIORITY_COUNT);
857 #endif
858 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
859 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
860 sizeof(_dispatch_wq2root_queues[0][0]) ==
861 WORKQ_NUM_PRIOQUEUE * 2);
862 #endif
863 #if DISPATCH_ENABLE_THREAD_POOL
864 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
865 DISPATCH_ROOT_QUEUE_COUNT);
866 #endif
867
868 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
869 offsetof(struct dispatch_object_s, do_next));
870 dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
871 offsetof(struct dispatch_object_s, do_vtable));
872 dispatch_assert(sizeof(struct dispatch_apply_s) <=
873 DISPATCH_CONTINUATION_SIZE);
874 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
875 == 0);
876 dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) % _Alignof(uint64_t) == 0);
877 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
878 DISPATCH_CACHELINE_SIZE == 0);
879
880
881 #if HAVE_PTHREAD_WORKQUEUE_QOS
882 // 26497968 _dispatch_user_initiated_priority should be set for qos
883 // propagation to work properly
884 pthread_priority_t p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
885 _dispatch_main_q.dq_priority = (dispatch_priority_t)p;
886 _dispatch_main_q.dq_override = p & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
887 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_USER_INITIATED, 0, 0);
888 _dispatch_user_initiated_priority = p;
889 p = _pthread_qos_class_encode(_DISPATCH_QOS_CLASS_BACKGROUND, 0, 0);
890 _dispatch_background_priority = p;
891 #if DISPATCH_DEBUG
892 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
893 _dispatch_set_qos_class_enabled = 1;
894 }
895 #endif
896 #endif
897
898 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
899 _dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
900 #else
901 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
902 _dispatch_thread_key_create(&dispatch_deferred_items_key,
903 _dispatch_deferred_items_cleanup);
904 _dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
905 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
906 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
907 _dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
908 _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
909 _dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
910 NULL);
911 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
912 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
913 #endif
914 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
915 if (DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK) {
916 _dispatch_thread_key_create(&dispatch_sema4_key,
917 _dispatch_thread_semaphore_dispose);
918 }
919 #endif
920 #endif
921
922 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
923 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
924 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
925 #endif
926
927 _dispatch_queue_set_current(&_dispatch_main_q);
928 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
929
930 #if DISPATCH_USE_PTHREAD_ATFORK
931 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
932 dispatch_atfork_parent, dispatch_atfork_child));
933 #endif
934 _dispatch_hw_config_init();
935 _dispatch_vtable_init();
936 _os_object_init();
937 _voucher_init();
938 _dispatch_introspection_init();
939 }
940
941 #if HAVE_MACH
942 static dispatch_once_t _dispatch_mach_host_port_pred;
943 static mach_port_t _dispatch_mach_host_port;
944
945 static void
946 _dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
947 {
948 kern_return_t kr;
949 mach_port_t mp, mhp = mach_host_self();
950 kr = host_get_host_port(mhp, &mp);
951 DISPATCH_VERIFY_MIG(kr);
952 if (fastpath(!kr)) {
953 // mach_host_self returned the HOST_PRIV port
954 kr = mach_port_deallocate(mach_task_self(), mhp);
955 DISPATCH_VERIFY_MIG(kr);
956 mhp = mp;
957 } else if (kr != KERN_INVALID_ARGUMENT) {
958 (void)dispatch_assume_zero(kr);
959 }
960 if (!fastpath(mhp)) {
961 DISPATCH_CLIENT_CRASH(kr, "Could not get unprivileged host port");
962 }
963 _dispatch_mach_host_port = mhp;
964 }
965
966 mach_port_t
967 _dispatch_get_mach_host_port(void)
968 {
969 dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
970 _dispatch_mach_host_port_init);
971 return _dispatch_mach_host_port;
972 }
973 #endif
974
975 #if DISPATCH_USE_THREAD_LOCAL_STORAGE
976 #include <unistd.h>
977 #include <sys/syscall.h>
978
979 #ifdef SYS_gettid
980 DISPATCH_ALWAYS_INLINE
981 static inline pid_t
982 gettid(void)
983 {
984 return (pid_t) syscall(SYS_gettid);
985 }
986 #else
987 #error "SYS_gettid unavailable on this system"
988 #endif
989
990 #define _tsd_call_cleanup(k, f) do { \
991 if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
992 } while (0)
993
994 void
995 _libdispatch_tsd_cleanup(void *ctx)
996 {
997 struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
998
999 _tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
1000 _tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
1001 _tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
1002 _tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
1003 _tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
1004 NULL);
1005 _tsd_call_cleanup(dispatch_defaultpriority_key, NULL);
1006 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
1007 _tsd_call_cleanup(dispatch_bcounter_key, NULL);
1008 #endif
1009 #if DISPATCH_LOCK_USE_SEMAPHORE_FALLBACK
1010 _tsd_call_cleanup(dispatch_sema4_key, _dispatch_thread_semaphore_dispose);
1011 #endif
1012 _tsd_call_cleanup(dispatch_priority_key, NULL);
1013 _tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
1014 _tsd_call_cleanup(dispatch_deferred_items_key,
1015 _dispatch_deferred_items_cleanup);
1016 tsd->tid = 0;
1017 }
1018
1019 DISPATCH_NOINLINE
1020 void
1021 libdispatch_tsd_init(void)
1022 {
1023 pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
1024 __dispatch_tsd.tid = gettid();
1025 }
1026 #endif
1027
1028 DISPATCH_EXPORT DISPATCH_NOTHROW
1029 void
1030 dispatch_atfork_child(void)
1031 {
1032 void *crash = (void *)0x100;
1033 size_t i;
1034
1035 #if HAVE_MACH
1036 _dispatch_mach_host_port_pred = 0;
1037 _dispatch_mach_host_port = MACH_VOUCHER_NULL;
1038 #endif
1039 _voucher_atfork_child();
1040 if (!_dispatch_is_multithreaded_inline()) {
1041 // clear the _PROHIBIT bit if set
1042 _dispatch_unsafe_fork = 0;
1043 return;
1044 }
1045 _dispatch_unsafe_fork = 0;
1046 _dispatch_child_of_unsafe_fork = true;
1047
1048 _dispatch_main_q.dq_items_head = crash;
1049 _dispatch_main_q.dq_items_tail = crash;
1050
1051 _dispatch_mgr_q.dq_items_head = crash;
1052 _dispatch_mgr_q.dq_items_tail = crash;
1053
1054 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1055 _dispatch_root_queues[i].dq_items_head = crash;
1056 _dispatch_root_queues[i].dq_items_tail = crash;
1057 }
1058 }
1059
1060 #pragma mark -
1061 #pragma mark dispatch_queue_attr_t
1062
1063 DISPATCH_ALWAYS_INLINE
1064 static inline bool
1065 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1066 {
1067 qos_class_t qos = (qos_class_t)qos_class;
1068 switch (qos) {
1069 case _DISPATCH_QOS_CLASS_MAINTENANCE:
1070 case _DISPATCH_QOS_CLASS_BACKGROUND:
1071 case _DISPATCH_QOS_CLASS_UTILITY:
1072 case _DISPATCH_QOS_CLASS_DEFAULT:
1073 case _DISPATCH_QOS_CLASS_USER_INITIATED:
1074 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
1075 case _DISPATCH_QOS_CLASS_UNSPECIFIED:
1076 break;
1077 default:
1078 return false;
1079 }
1080 if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
1081 return false;
1082 }
1083 return true;
1084 }
1085
1086 #define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
1087 [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
1088
1089 static const
1090 _dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
1091 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
1092 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
1093 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
1094 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
1095 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
1096 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
1097 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
1098 };
1099
1100 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1101 ((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
1102 DQA_INDEX_NON_OVERCOMMIT : \
1103 ((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
1104 DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
1105
1106 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1107 ((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1108
1109 #define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
1110 ((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)
1111
1112 #define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
1113 (frequency)
1114
1115 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1116
1117 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
1118
1119 static inline dispatch_queue_attr_t
1120 _dispatch_get_queue_attr(qos_class_t qos, int prio,
1121 _dispatch_queue_attr_overcommit_t overcommit,
1122 dispatch_autorelease_frequency_t frequency,
1123 bool concurrent, bool inactive)
1124 {
1125 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1126 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1127 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1128 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1129 [DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
1130 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
1131 [DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
1132 }
1133
1134 dispatch_queue_attr_t
1135 _dispatch_get_default_queue_attr(void)
1136 {
1137 return _dispatch_get_queue_attr(_DISPATCH_QOS_CLASS_UNSPECIFIED, 0,
1138 _dispatch_queue_attr_overcommit_unspecified,
1139 DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
1140 }
1141
1142 dispatch_queue_attr_t
1143 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1144 dispatch_qos_class_t qos_class, int relative_priority)
1145 {
1146 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) {
1147 return DISPATCH_BAD_INPUT;
1148 }
1149 if (!slowpath(dqa)) {
1150 dqa = _dispatch_get_default_queue_attr();
1151 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1152 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1153 }
1154 return _dispatch_get_queue_attr(qos_class, relative_priority,
1155 dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
1156 dqa->dqa_concurrent, dqa->dqa_inactive);
1157 }
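/*
 * Usage sketch (illustrative, not part of the original file): composing a
 * QoS attribute on top of a concurrent attribute. The relative priority
 * must lie in [QOS_MIN_RELATIVE_PRIORITY, 0], per the validity check above.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_UTILITY, -1);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.utility", attr);
 *
 * "com.example.utility" is a placeholder label.
 */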
1158
1159 dispatch_queue_attr_t
1160 dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
1161 {
1162 if (!slowpath(dqa)) {
1163 dqa = _dispatch_get_default_queue_attr();
1164 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1165 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1166 }
1167 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1168 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1169 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
1170 }
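/*
 * Usage sketch (illustrative, not part of the original file): an
 * initially-inactive queue holds submitted work until dispatch_activate()
 * is called, which is what the inactive/needs-activation state bits
 * handled below implement.
 *
 *	dispatch_queue_attr_t attr =
 *			dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.inactive", attr);
 *	dispatch_async_f(q, NULL, some_work);	// enqueued, not yet invoked
 *	dispatch_activate(q);			// now the work may run
 *
 * "com.example.inactive" and some_work are placeholders.
 */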
1171
1172 dispatch_queue_attr_t
1173 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1174 bool overcommit)
1175 {
1176 if (!slowpath(dqa)) {
1177 dqa = _dispatch_get_default_queue_attr();
1178 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1179 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1180 }
1181 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1182 dqa->dqa_relative_priority, overcommit ?
1183 _dispatch_queue_attr_overcommit_enabled :
1184 _dispatch_queue_attr_overcommit_disabled,
1185 dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
1186 dqa->dqa_inactive);
1187 }
1188
1189 dispatch_queue_attr_t
1190 dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
1191 dispatch_autorelease_frequency_t frequency)
1192 {
1193 switch (frequency) {
1194 case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
1195 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1196 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1197 break;
1198 default:
1199 return DISPATCH_BAD_INPUT;
1200 }
1201 if (!slowpath(dqa)) {
1202 dqa = _dispatch_get_default_queue_attr();
1203 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1204 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1205 }
1206 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1207 dqa->dqa_relative_priority, dqa->dqa_overcommit,
1208 frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
1209 }
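/*
 * Usage sketch (illustrative, not part of the original file):
 * DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM makes the queue push and pop an
 * autorelease pool around each work item it drains directly, which maps to
 * the DQF_AUTORELEASE_ALWAYS flag set during queue creation below.
 *
 *	dispatch_queue_attr_t attr =
 *			dispatch_queue_attr_make_with_autorelease_frequency(
 *			DISPATCH_QUEUE_SERIAL, DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.pool", attr);
 *
 * "com.example.pool" is a placeholder label.
 */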
1210
1211 #pragma mark -
1212 #pragma mark dispatch_queue_t
1213
1214 // skip zero
1215 // 1 - main_q
1216 // 2 - mgr_q
1217 // 3 - mgr_root_q
1218 // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
1219 // we use 'xadd' on Intel, so the initial value == next assigned
1220 unsigned long volatile _dispatch_queue_serial_numbers = 16;
1221
1222 DISPATCH_NOINLINE
1223 static dispatch_queue_t
1224 _dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1225 dispatch_queue_t tq, bool legacy)
1226 {
1227 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1228 // Be sure the root queue priorities are set
1229 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
1230 _dispatch_root_queues_init_once);
1231 #endif
1232 if (!slowpath(dqa)) {
1233 dqa = _dispatch_get_default_queue_attr();
1234 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1235 DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
1236 }
1237
1238 //
1239 // Step 1: Normalize arguments (qos, overcommit, tq)
1240 //
1241
1242 qos_class_t qos = dqa->dqa_qos_class;
1243 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1244 if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
1245 !_dispatch_root_queues[
1246 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
1247 qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
1248 }
1249 #endif
1250 bool maintenance_fallback = false;
1251 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1252 maintenance_fallback = true;
1253 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1254 if (maintenance_fallback) {
1255 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
1256 !_dispatch_root_queues[
1257 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
1258 qos = _DISPATCH_QOS_CLASS_BACKGROUND;
1259 }
1260 }
1261
1262 _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
1263 if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
1264 if (tq->do_targetq) {
1265 DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
1266 "a non-global target queue");
1267 }
1268 }
1269
1270 if (tq && !tq->do_targetq &&
1271 tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
1272 // Handle discrepancies between attr and target queue, attributes win
1273 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1274 if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
1275 overcommit = _dispatch_queue_attr_overcommit_enabled;
1276 } else {
1277 overcommit = _dispatch_queue_attr_overcommit_disabled;
1278 }
1279 }
1280 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1281 tq = _dispatch_get_root_queue_with_overcommit(tq,
1282 overcommit == _dispatch_queue_attr_overcommit_enabled);
1283 } else {
1284 tq = NULL;
1285 }
1286 } else if (tq && !tq->do_targetq) {
1287 // target is a pthread or runloop root queue, setting QoS or overcommit
1288 // is disallowed
1289 if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
1290 DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
1291 "and use this kind of target queue");
1292 }
1293 if (qos != _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1294 DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
1295 "and use this kind of target queue");
1296 }
1297 } else {
1298 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1299 // Serial queues default to overcommit!
1300 overcommit = dqa->dqa_concurrent ?
1301 _dispatch_queue_attr_overcommit_disabled :
1302 _dispatch_queue_attr_overcommit_enabled;
1303 }
1304 }
1305 if (!tq) {
1306 qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
1307 _DISPATCH_QOS_CLASS_DEFAULT : qos;
1308 tq = _dispatch_get_root_queue(tq_qos, overcommit ==
1309 _dispatch_queue_attr_overcommit_enabled);
1310 if (slowpath(!tq)) {
1311 DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
1312 }
1313 }
1314
1315 //
1316 // Step 2: Initialize the queue
1317 //
1318
1319 if (legacy) {
1320 // if any of these attributes is specified, use non-legacy classes
1321 if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
1322 legacy = false;
1323 }
1324 }
1325
1326 const void *vtable;
1327 dispatch_queue_flags_t dqf = 0;
1328 if (legacy) {
1329 vtable = DISPATCH_VTABLE(queue);
1330 } else if (dqa->dqa_concurrent) {
1331 vtable = DISPATCH_VTABLE(queue_concurrent);
1332 } else {
1333 vtable = DISPATCH_VTABLE(queue_serial);
1334 }
1335 switch (dqa->dqa_autorelease_frequency) {
1336 case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
1337 dqf |= DQF_AUTORELEASE_NEVER;
1338 break;
1339 case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
1340 dqf |= DQF_AUTORELEASE_ALWAYS;
1341 break;
1342 }
1343 if (label) {
1344 const char *tmp = _dispatch_strdup_if_mutable(label);
1345 if (tmp != label) {
1346 dqf |= DQF_LABEL_NEEDS_FREE;
1347 label = tmp;
1348 }
1349 }
1350
1351 dispatch_queue_t dq = _dispatch_alloc(vtable,
1352 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1353 _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
1354 DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
1355
1356 dq->dq_label = label;
1357
1358 #if HAVE_PTHREAD_WORKQUEUE_QOS
1359 dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
1360 dqa->dqa_relative_priority,
1361 overcommit == _dispatch_queue_attr_overcommit_enabled ?
1362 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
1363 #endif
1364 _dispatch_retain(tq);
1365 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1366 // legacy way of inheriting the QoS from the target
1367 _dispatch_queue_priority_inherit_from_target(dq, tq);
1368 }
1369 if (!dqa->dqa_inactive) {
1370 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1371 }
1372 dq->do_targetq = tq;
1373 _dispatch_object_debug(dq, "%s", __func__);
1374 return _dispatch_introspection_queue_create(dq);
1375 }
1376
1377 dispatch_queue_t
1378 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1379 dispatch_queue_t tq)
1380 {
1381 return _dispatch_queue_create_with_target(label, dqa, tq, false);
1382 }
1383
1384 dispatch_queue_t
1385 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1386 {
1387 return _dispatch_queue_create_with_target(label, attr,
1388 DISPATCH_TARGET_QUEUE_DEFAULT, true);
1389 }
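/*
 * Usage sketch (illustrative, not part of the original file): the legacy
 * entry point above defaults the target to a root queue (serial queues
 * pick the overcommit variant, cf. Step 1 of
 * _dispatch_queue_create_with_target), while the _with_target variant lets
 * the caller pick the hierarchy up front:
 *
 *	dispatch_queue_t inner = dispatch_queue_create("com.example.inner", NULL);
 *	dispatch_queue_t outer = dispatch_queue_create_with_target(
 *			"com.example.outer", DISPATCH_QUEUE_SERIAL, inner);
 *	// work submitted to outer ultimately drains through inner's hierarchy
 *
 * The "com.example.*" labels are placeholders.
 */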
1390
1391 dispatch_queue_t
1392 dispatch_queue_create_with_accounting_override_voucher(const char *label,
1393 dispatch_queue_attr_t attr, voucher_t voucher)
1394 {
1395 dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
1396 DISPATCH_TARGET_QUEUE_DEFAULT);
1397 dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
1398 return dq;
1399 }
1400
1401 void
1402 _dispatch_queue_destroy(dispatch_queue_t dq)
1403 {
1404 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
1405 uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
1406
1407 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
1408 initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
1409 }
1410 if (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE) {
1411 // dispatch_cancel_and_wait may apply overrides in a racy way with
1412 // the source cancellation finishing. This race is expensive and not
1413 // really worthwhile to resolve since the source becomes dead anyway.
1414 dq_state &= ~DISPATCH_QUEUE_HAS_OVERRIDE;
1415 }
1416 if (slowpath(dq_state != initial_state)) {
1417 if (_dq_state_drain_locked(dq_state)) {
1418 DISPATCH_CLIENT_CRASH(dq, "Release of a locked queue");
1419 }
1420 #ifndef __LP64__
1421 dq_state >>= 32;
1422 #endif
1423 DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
1424 "Release of a queue with corrupt state");
1425 }
1426 if (slowpath(dq == _dispatch_queue_get_current())) {
1427 DISPATCH_CLIENT_CRASH(dq, "Release of a queue by itself");
1428 }
1429 if (slowpath(dq->dq_items_tail)) {
1430 DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
1431 "Release of a queue while items are enqueued");
1432 }
1433
1434 // trash the queue so that use after free will crash
1435 dq->dq_items_head = (void *)0x200;
1436 dq->dq_items_tail = (void *)0x200;
1437 // poison the state with something that is suspended and is easy to spot
1438 dq->dq_state = 0xdead000000000000;
1439
1440 dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
1441 (void *)0x200, relaxed);
1442 if (dqsq) {
1443 _dispatch_release(dqsq);
1444 }
1445 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1446 if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
1447 dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
1448 }
1449 }
1450
1451 // 6618342 Contact the team that owns the Instrument DTrace probe before
1452 // renaming this symbol
1453 void
1454 _dispatch_queue_dispose(dispatch_queue_t dq)
1455 {
1456 _dispatch_object_debug(dq, "%s", __func__);
1457 _dispatch_introspection_queue_dispose(dq);
1458 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
1459 free((void*)dq->dq_label);
1460 }
1461 _dispatch_queue_destroy(dq);
1462 }
1463
1464 DISPATCH_NOINLINE
1465 static void
1466 _dispatch_queue_suspend_slow(dispatch_queue_t dq)
1467 {
1468 uint64_t dq_state, value, delta;
1469
1470 _dispatch_queue_sidelock_lock(dq);
1471
1472 // what we want to transfer (remove from dq_state)
1473 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1474 // but this is a suspend so add a suspend count at the same time
1475 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1476 if (dq->dq_side_suspend_cnt == 0) {
1477 // we subtract delta from dq_state, and we want to set this bit
1478 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1479 }
1480
1481 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1482 // unsigned underflow of the subtraction can happen because other
1483 // threads could have touched this value while we were trying to acquire
1484 // the lock, or because another thread raced us to do the same operation
1485 // and got to the lock first.
1486 if (slowpath(os_sub_overflow(dq_state, delta, &value))) {
1487 os_atomic_rmw_loop_give_up(goto retry);
1488 }
1489 });
1490 if (slowpath(os_add_overflow(dq->dq_side_suspend_cnt,
1491 DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
1492 DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
1493 }
1494 return _dispatch_queue_sidelock_unlock(dq);
1495
1496 retry:
1497 _dispatch_queue_sidelock_unlock(dq);
1498 return dx_vtable(dq)->do_suspend(dq);
1499 }
1500
1501 void
1502 _dispatch_queue_suspend(dispatch_queue_t dq)
1503 {
1504 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1505
1506 uint64_t dq_state, value;
1507
1508 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1509 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1510 if (slowpath(os_add_overflow(dq_state, value, &value))) {
1511 os_atomic_rmw_loop_give_up({
1512 return _dispatch_queue_suspend_slow(dq);
1513 });
1514 }
1515 });
1516
1517 if (!_dq_state_is_suspended(dq_state)) {
1518 // rdar://8181908 we need to extend the queue life for the duration
1519 // of the call to wakeup at _dispatch_queue_resume() time.
1520 _dispatch_retain(dq);
1521 }
1522 }
1523
1524 DISPATCH_NOINLINE
1525 static void
1526 _dispatch_queue_resume_slow(dispatch_queue_t dq)
1527 {
1528 uint64_t dq_state, value, delta;
1529
1530 _dispatch_queue_sidelock_lock(dq);
1531
1532 // what we want to transfer
1533 delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
1534 // but this is a resume so consume a suspend count at the same time
1535 delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
1536 switch (dq->dq_side_suspend_cnt) {
1537 case 0:
1538 goto retry;
1539 case DISPATCH_QUEUE_SUSPEND_HALF:
1540 // we will transition the side count to 0, so we want to clear this bit
1541 delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
1542 break;
1543 }
1544 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1545 // unsigned overflow of the addition can happen because other
1546 // threads could have touched this value while we were trying to acquire
1547 // the lock, or because another thread raced us to do the same operation
1548 // and got to the lock first.
1549 if (slowpath(os_add_overflow(dq_state, delta, &value))) {
1550 os_atomic_rmw_loop_give_up(goto retry);
1551 }
1552 });
1553 dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
1554 return _dispatch_queue_sidelock_unlock(dq);
1555
1556 retry:
1557 _dispatch_queue_sidelock_unlock(dq);
1558 return dx_vtable(dq)->do_resume(dq, false);
1559 }
1560
1561 DISPATCH_NOINLINE
1562 static void
1563 _dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
1564 {
1565 // Step 2: run the activation finalizer
1566 if (dx_vtable(dq)->do_finalize_activation) {
1567 dx_vtable(dq)->do_finalize_activation(dq);
1568 }
1569 // Step 3: consume the suspend count
1570 return dx_vtable(dq)->do_resume(dq, false);
1571 }
1572
1573 void
1574 _dispatch_queue_resume(dispatch_queue_t dq, bool activate)
1575 {
1576 // covers all suspend and inactive bits, including side suspend bit
1577 const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
1578 // backward compatibility: only dispatch sources can abuse
1579 // dispatch_resume() to really mean dispatch_activate()
1580 bool resume_can_activate = (dx_type(dq) == DISPATCH_SOURCE_KEVENT_TYPE);
1581 uint64_t dq_state, value;
1582
1583 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
1584
1585 // Activation is a bit tricky as it needs to finalize before the wakeup.
1586 //
1587 // If after doing its updates to the suspend count and/or inactive bit,
1588 // the last suspension related bit that would remain is the
1589 // NEEDS_ACTIVATION one, then this function:
1590 //
1591 // 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
1592 // a suspend count)
1593 // 2. runs the activation finalizer
1594 // 3. consumes the suspend count set in (1), and finishes the resume flow
1595 //
1596 // Concurrently, some property setters such as setting dispatch source
1597 // handlers or _dispatch_queue_set_target_queue try to do in-place changes
1598 // before activation. These protect their action by taking a suspend count.
1599 // Step (1) above cannot happen if such a setter has locked the object.
1600 if (activate) {
1601 // relaxed atomic because this doesn't publish anything; this is only
1602 // about picking the thread that gets to finalize the activation
1603 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
1604 if ((dq_state & suspend_bits) ==
1605 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1606 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1607 value = dq_state - DISPATCH_QUEUE_INACTIVE
1608 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1609 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1610 } else if (_dq_state_is_inactive(dq_state)) {
1611 // { sc:>0 i:1 na:1 } -> { i:0 na:1 }
1612 // simple activation because sc is not 0
1613 // resume will deal with na:1 later
1614 value = dq_state - DISPATCH_QUEUE_INACTIVE;
1615 } else {
1616 // object already active, this is a no-op, just exit
1617 os_atomic_rmw_loop_give_up(return);
1618 }
1619 });
1620 } else {
1621 // release barrier needed to publish the effect of
1622 // - dispatch_set_target_queue()
1623 // - dispatch_set_*_handler()
1624 // - do_finalize_activation()
1625 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
1626 if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
1627 + DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1628 // { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
1629 value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
1630 } else if (resume_can_activate && (dq_state & suspend_bits) ==
1631 DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
1632 // { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
1633 value = dq_state - DISPATCH_QUEUE_INACTIVE
1634 - DISPATCH_QUEUE_NEEDS_ACTIVATION
1635 + DISPATCH_QUEUE_SUSPEND_INTERVAL;
1636 } else {
1637 value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
1638 if (slowpath(os_sub_overflow(dq_state, value, &value))) {
1639 // underflow means over-resume or a suspend count transfer
1640 // to the side count is needed
1641 os_atomic_rmw_loop_give_up({
1642 if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
1643 goto over_resume;
1644 }
1645 return _dispatch_queue_resume_slow(dq);
1646 });
1647 }
1648 if (_dq_state_is_runnable(value) &&
1649 !_dq_state_drain_locked(value)) {
1650 uint64_t full_width = value;
1651 if (_dq_state_has_pending_barrier(value)) {
1652 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
1653 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
1654 full_width += DISPATCH_QUEUE_IN_BARRIER;
1655 } else {
1656 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
1657 full_width += DISPATCH_QUEUE_IN_BARRIER;
1658 }
1659 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
1660 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
1661 value = full_width;
1662 value &= ~DISPATCH_QUEUE_DIRTY;
1663 value |= _dispatch_tid_self();
1664 }
1665 }
1666 }
1667 });
1668 }
1669
1670 if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
1671 // we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
1672 return _dispatch_queue_resume_finalize_activation(dq);
1673 }
1674
1675 if (activate) {
1676 // if we're still in an activate codepath here we should have
1677 // { sc:>0 na:1 }; if not, we've got a corrupt state
1678 if (!fastpath(_dq_state_is_suspended(value))) {
1679 DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
1680 }
1681 return;
1682 }
1683
1684 if (_dq_state_is_suspended(value)) {
1685 return;
1686 }
1687
1688 if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
1689 _dispatch_release(dq);
1690 return _dispatch_try_lock_transfer_or_wakeup(dq);
1691 }
1692
1693 if (_dq_state_should_wakeup(value)) {
1694 // <rdar://problem/14637483>
1695 // seq_cst wrt state changes that were flushed and not acted upon
1696 os_atomic_thread_fence(acquire);
1697 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq,
1698 _dispatch_queue_is_thread_bound(dq));
1699 return dx_wakeup(dq, pp, DISPATCH_WAKEUP_CONSUME);
1700 }
1701 return _dispatch_release_tailcall(dq);
1702
1703 over_resume:
1704 if (slowpath(_dq_state_is_inactive(dq_state))) {
1705 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
1706 }
1707 DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
1708 }
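/*
 * Usage sketch (illustrative only): the activate=true path above backs
 * dispatch_activate() for queues created initially inactive, assuming the
 * initially-inactive attribute available alongside this API.
 *
 *   dispatch_queue_t q = dispatch_queue_create("com.example.inactive",
 *           dispatch_queue_attr_make_initially_inactive(DISPATCH_QUEUE_SERIAL));
 *   dispatch_set_target_queue(q, dispatch_get_global_queue(QOS_CLASS_UTILITY, 0));
 *   dispatch_async(q, ^{
 *       // held until activation
 *   });
 *   dispatch_activate(q);                             // work may now run
 *
 * The label is invented for the example.
 */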
1709
1710 const char *
1711 dispatch_queue_get_label(dispatch_queue_t dq)
1712 {
1713 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1714 dq = _dispatch_get_current_queue();
1715 }
1716 return dq->dq_label ? dq->dq_label : "";
1717 }
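/*
 * Usage sketch (illustrative only): the label is the one given at creation
 * time; DISPATCH_CURRENT_QUEUE_LABEL reads the label of the current queue.
 *
 *   dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *   printf("%s\n", dispatch_queue_get_label(q));      // "com.example.worker"
 *   dispatch_sync(q, ^{
 *       printf("%s\n", dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL));
 *   });
 *
 * The label is invented for the example.
 */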
1718
1719 qos_class_t
1720 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
1721 {
1722 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
1723 int relative_priority = 0;
1724 #if HAVE_PTHREAD_WORKQUEUE_QOS
1725 pthread_priority_t dqp = dq->dq_priority;
1726 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
1727 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
1728 #else
1729 (void)dq;
1730 #endif
1731 if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
1732 return qos;
1733 }
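/*
 * Usage sketch (illustrative only): reading back a QoS requested through
 * dispatch_queue_attr_make_with_qos_class().
 *
 *   dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *           DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -4);
 *   dispatch_queue_t q = dispatch_queue_create("com.example.qos", attr);
 *   int rel;
 *   qos_class_t qos = dispatch_queue_get_qos_class(q, &rel);
 *   // qos == QOS_CLASS_UTILITY, rel == -4
 *
 * The label and the relative priority are invented for the example.
 */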
1734
1735 static void
1736 _dispatch_queue_set_width2(void *ctxt)
1737 {
1738 int w = (int)(intptr_t)ctxt; // intentional truncation
1739 uint32_t tmp;
1740 dispatch_queue_t dq = _dispatch_queue_get_current();
1741
1742 if (w > 0) {
1743 tmp = (unsigned int)w;
1744 } else switch (w) {
1745 case 0:
1746 tmp = 1;
1747 break;
1748 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1749 tmp = dispatch_hw_config(physical_cpus);
1750 break;
1751 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1752 tmp = dispatch_hw_config(active_cpus);
1753 break;
1754 default:
1755 // fall through
1756 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1757 tmp = dispatch_hw_config(logical_cpus);
1758 break;
1759 }
1760 if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
1761 tmp = DISPATCH_QUEUE_WIDTH_MAX;
1762 }
1763
1764 dispatch_queue_flags_t old_dqf, new_dqf;
1765 os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
1766 new_dqf = old_dqf & ~DQF_WIDTH_MASK;
1767 new_dqf |= (tmp << DQF_WIDTH_SHIFT);
1768 });
1769 _dispatch_object_debug(dq, "%s", __func__);
1770 }
1771
1772 void
1773 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1774 {
1775 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1776 slowpath(dx_hastypeflag(dq, QUEUE_ROOT))) {
1777 return;
1778 }
1779
1780 unsigned long type = dx_type(dq);
1781 switch (type) {
1782 case DISPATCH_QUEUE_LEGACY_TYPE:
1783 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1784 break;
1785 case DISPATCH_QUEUE_SERIAL_TYPE:
1786 DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
1787 default:
1788 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1789 }
1790
1791 _dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
1792 _dispatch_queue_set_width2);
1793 }
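/*
 * Usage sketch (illustrative only): dispatch_queue_set_width() is a legacy
 * interface; creating the queue with DISPATCH_QUEUE_CONCURRENT is the
 * supported way to get a concurrent queue. The width change is applied via
 * the barrier trysync above, so it takes effect once prior work has drained.
 *
 *   dispatch_queue_t q = dispatch_queue_create("com.example.wide", NULL);
 *   dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 *
 * The label is invented for the example.
 */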
1794
1795 static void
1796 _dispatch_queue_legacy_set_target_queue(void *ctxt)
1797 {
1798 dispatch_queue_t dq = _dispatch_queue_get_current();
1799 dispatch_queue_t tq = ctxt;
1800 dispatch_queue_t otq = dq->do_targetq;
1801
1802 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1803 _dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
1804 _dispatch_bug_deprecated("Changing the target of a queue "
1805 "already targeted by other dispatch objects");
1806 }
1807
1808 _dispatch_queue_priority_inherit_from_target(dq, tq);
1809 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
1810 #if HAVE_PTHREAD_WORKQUEUE_QOS
1811 // see _dispatch_queue_class_wakeup()
1812 _dispatch_queue_sidelock_lock(dq);
1813 #endif
1814 dq->do_targetq = tq;
1815 #if HAVE_PTHREAD_WORKQUEUE_QOS
1816 // see _dispatch_queue_class_wakeup()
1817 _dispatch_queue_sidelock_unlock(dq);
1818 #endif
1819
1820 _dispatch_object_debug(dq, "%s", __func__);
1821 _dispatch_introspection_target_queue_changed(dq);
1822 _dispatch_release_tailcall(otq);
1823 }
1824
1825 void
1826 _dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
1827 {
1828 dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
1829 dq->do_targetq);
1830
1831 if (slowpath(!tq)) {
1832 bool is_concurrent_q = (dq->dq_width > 1);
1833 tq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1834 !is_concurrent_q);
1835 }
1836
1837 if (_dispatch_queue_try_inactive_suspend(dq)) {
1838 _dispatch_object_set_target_queue_inline(dq, tq);
1839 return dx_vtable(dq)->do_resume(dq, false);
1840 }
1841
1842 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1843 DISPATCH_CLIENT_CRASH(dq, "Cannot change the target of a queue or "
1844 "source with an accounting override voucher "
1845 "after it has been activated");
1846 }
1847
1848 unsigned long type = dx_type(dq);
1849 switch (type) {
1850 case DISPATCH_QUEUE_LEGACY_TYPE:
1851 if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
1852 _dispatch_bug_deprecated("Changing the target of a queue "
1853 "already targeted by other dispatch objects");
1854 }
1855 break;
1856 case DISPATCH_SOURCE_KEVENT_TYPE:
1857 case DISPATCH_MACH_CHANNEL_TYPE:
1858 _dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
1859 _dispatch_bug_deprecated("Changing the target of a source "
1860 "after it has been activated");
1861 break;
1862
1863 case DISPATCH_QUEUE_SERIAL_TYPE:
1864 case DISPATCH_QUEUE_CONCURRENT_TYPE:
1865 DISPATCH_CLIENT_CRASH(type, "Cannot change the target of this queue "
1866 "after it has been activated");
1867 default:
1868 DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
1869 }
1870
1871 _dispatch_retain(tq);
1872 return _dispatch_barrier_trysync_or_async_f(dq, tq,
1873 _dispatch_queue_legacy_set_target_queue);
1874 }
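/*
 * Usage sketch (illustrative only): the public entry point for this path is
 * dispatch_set_target_queue(), typically called before a queue is given any
 * work, e.g. to funnel several queues through one serial bottleneck.
 *
 *   dispatch_queue_t bottleneck = dispatch_queue_create("com.example.bneck", NULL);
 *   dispatch_queue_t a = dispatch_queue_create("com.example.a", NULL);
 *   dispatch_queue_t b = dispatch_queue_create("com.example.b", NULL);
 *   dispatch_set_target_queue(a, bottleneck);
 *   dispatch_set_target_queue(b, bottleneck);
 *
 * Labels are invented; retargeting after activation trips the deprecation
 * warnings handled above.
 */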
1875
1876 #pragma mark -
1877 #pragma mark dispatch_mgr_queue
1878
1879 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1880 static struct dispatch_pthread_root_queue_context_s
1881 _dispatch_mgr_root_queue_pthread_context;
1882 static struct dispatch_root_queue_context_s
1883 _dispatch_mgr_root_queue_context = {{{
1884 #if HAVE_PTHREAD_WORKQUEUES
1885 .dgq_kworkqueue = (void*)(~0ul),
1886 #endif
1887 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
1888 .dgq_thread_pool_size = 1,
1889 }}};
1890
1891 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
1892 DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
1893 .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
1894 .do_ctxt = &_dispatch_mgr_root_queue_context,
1895 .dq_label = "com.apple.root.libdispatch-manager",
1896 .dq_width = DISPATCH_QUEUE_WIDTH_POOL,
1897 .dq_override = DISPATCH_SATURATED_OVERRIDE,
1898 .dq_override_voucher = DISPATCH_NO_VOUCHER,
1899 .dq_serialnum = 3,
1900 };
1901 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1902
1903 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1904 static struct {
1905 volatile int prio;
1906 volatile qos_class_t qos;
1907 int default_prio;
1908 int policy;
1909 pthread_t tid;
1910 } _dispatch_mgr_sched;
1911
1912 static dispatch_once_t _dispatch_mgr_sched_pred;
1913
1914 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
1915
1916 #if HAVE_PTHREAD_WORKQUEUE_QOS
1917 // Must be kept in sync with list of qos classes in sys/qos.h
1918 static const int _dispatch_mgr_sched_qos2prio[] = {
1919 [_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
1920 [_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
1921 [_DISPATCH_QOS_CLASS_UTILITY] = 20,
1922 [_DISPATCH_QOS_CLASS_DEFAULT] = 31,
1923 [_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
1924 [_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
1925 };
1926 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
1927
1928 static void
1929 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
1930 {
1931 struct sched_param param;
1932 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1933 pthread_attr_t *attr;
1934 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1935 #else
1936 pthread_attr_t a, *attr = &a;
1937 #endif
1938 (void)dispatch_assume_zero(pthread_attr_init(attr));
1939 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
1940 &_dispatch_mgr_sched.policy));
1941 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1942 #if HAVE_PTHREAD_WORKQUEUE_QOS
1943 qos_class_t qos = qos_class_main();
1944 if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
1945 qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
1946 }
1947 if (qos) {
1948 _dispatch_mgr_sched.qos = qos;
1949 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
1950 }
1951 #endif
1952 _dispatch_mgr_sched.default_prio = param.sched_priority;
1953 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
1954 }
1955 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
1956
1957 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1958 DISPATCH_NOINLINE
1959 static pthread_t *
1960 _dispatch_mgr_root_queue_init(void)
1961 {
1962 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1963 struct sched_param param;
1964 pthread_attr_t *attr;
1965 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1966 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
1967 PTHREAD_CREATE_DETACHED));
1968 #if !DISPATCH_DEBUG
1969 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
1970 #endif
1971 #if HAVE_PTHREAD_WORKQUEUE_QOS
1972 qos_class_t qos = _dispatch_mgr_sched.qos;
1973 if (qos) {
1974 if (_dispatch_set_qos_class_enabled) {
1975 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
1976 qos, 0));
1977 }
1978 _dispatch_mgr_q.dq_priority =
1979 (dispatch_priority_t)_pthread_qos_class_encode(qos, 0, 0);
1980 }
1981 #endif
1982 param.sched_priority = _dispatch_mgr_sched.prio;
1983 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1984 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
1985 }
1986 return &_dispatch_mgr_sched.tid;
1987 }
1988
1989 static inline void
1990 _dispatch_mgr_priority_apply(void)
1991 {
1992 struct sched_param param;
1993 do {
1994 param.sched_priority = _dispatch_mgr_sched.prio;
1995 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1996 (void)dispatch_assume_zero(pthread_setschedparam(
1997 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
1998 &param));
1999 }
2000 } while (_dispatch_mgr_sched.prio > param.sched_priority);
2001 }
2002
2003 DISPATCH_NOINLINE
2004 void
2005 _dispatch_mgr_priority_init(void)
2006 {
2007 struct sched_param param;
2008 pthread_attr_t *attr;
2009 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
2010 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2011 #if HAVE_PTHREAD_WORKQUEUE_QOS
2012 qos_class_t qos = 0;
2013 (void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
2014 if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
2015 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
2016 int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
2017 if (p > param.sched_priority) {
2018 param.sched_priority = p;
2019 }
2020 }
2021 #endif
2022 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
2023 return _dispatch_mgr_priority_apply();
2024 }
2025 }
2026 #endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2027
2028 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2029 DISPATCH_NOINLINE
2030 static void
2031 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
2032 {
2033 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2034 struct sched_param param;
2035 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
2036 #if HAVE_PTHREAD_WORKQUEUE_QOS
2037 qos_class_t q, qos = 0;
2038 (void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
2039 if (qos) {
2040 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
2041 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
2042 if (q >= qos) os_atomic_rmw_loop_give_up(break);
2043 });
2044 }
2045 #endif
2046 int p, prio = param.sched_priority;
2047 os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
2048 if (p >= prio) os_atomic_rmw_loop_give_up(return);
2049 });
2050 #if DISPATCH_USE_KEVENT_WORKQUEUE
2051 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2052 _dispatch_root_queues_init_once);
2053 if (_dispatch_kevent_workqueue_enabled) {
2054 pthread_priority_t pp = 0;
2055 if (prio > _dispatch_mgr_sched.default_prio) {
2056 // The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
2057 // _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
2058 // problematic in this case, since the second one is only ever
2059 // used on dq_priority fields.
2060 // We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall;
2061 // it is meaningful to libdispatch only.
2062 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2063 } else if (qos) {
2064 pp = _pthread_qos_class_encode(qos, 0, 0);
2065 }
2066 if (pp) {
2067 int r = _pthread_workqueue_set_event_manager_priority(pp);
2068 (void)dispatch_assume_zero(r);
2069 }
2070 return;
2071 }
2072 #endif
2073 #if DISPATCH_USE_MGR_THREAD
2074 if (_dispatch_mgr_sched.tid) {
2075 return _dispatch_mgr_priority_apply();
2076 }
2077 #endif
2078 }
2079 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2080
2081 #if DISPATCH_USE_KEVENT_WORKQUEUE
2082 void
2083 _dispatch_kevent_workqueue_init(void)
2084 {
2085 // Initialize kevent workqueue support
2086 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
2087 _dispatch_root_queues_init_once);
2088 if (!_dispatch_kevent_workqueue_enabled) return;
2089 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
2090 qos_class_t qos = _dispatch_mgr_sched.qos;
2091 int prio = _dispatch_mgr_sched.prio;
2092 pthread_priority_t pp = 0;
2093 if (qos) {
2094 pp = _pthread_qos_class_encode(qos, 0, 0);
2095 _dispatch_mgr_q.dq_priority = (dispatch_priority_t)pp;
2096 }
2097 if (prio > _dispatch_mgr_sched.default_prio) {
2098 pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
2099 }
2100 if (pp) {
2101 int r = _pthread_workqueue_set_event_manager_priority(pp);
2102 (void)dispatch_assume_zero(r);
2103 }
2104 }
2105 #endif
2106
2107 #pragma mark -
2108 #pragma mark dispatch_pthread_root_queue
2109
2110 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2111 static dispatch_queue_t
2112 _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2113 const pthread_attr_t *attr, dispatch_block_t configure,
2114 dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
2115 {
2116 dispatch_queue_t dq;
2117 dispatch_root_queue_context_t qc;
2118 dispatch_pthread_root_queue_context_t pqc;
2119 dispatch_queue_flags_t dqf = 0;
2120 size_t dqs;
2121 uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
2122 (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
2123
2124 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2125 dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
2126 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
2127 sizeof(struct dispatch_root_queue_context_s) +
2128 sizeof(struct dispatch_pthread_root_queue_context_s));
2129 qc = (void*)dq + dqs;
2130 dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
2131 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
2132 dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
2133 if (label) {
2134 const char *tmp = _dispatch_strdup_if_mutable(label);
2135 if (tmp != label) {
2136 dqf |= DQF_LABEL_NEEDS_FREE;
2137 label = tmp;
2138 }
2139 }
2140
2141 _dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, false);
2142 dq->dq_label = label;
2143 dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
2144 dq->dq_override = DISPATCH_SATURATED_OVERRIDE;
2145 dq->do_ctxt = qc;
2146 dq->do_targetq = NULL;
2147
2148 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
2149 qc->dgq_ctxt = pqc;
2150 #if HAVE_PTHREAD_WORKQUEUES
2151 qc->dgq_kworkqueue = (void*)(~0ul);
2152 #endif
2153 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
2154
2155 if (attr) {
2156 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
2157 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
2158 } else {
2159 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
2160 }
2161 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
2162 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
2163 if (configure) {
2164 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
2165 }
2166 if (observer_hooks) {
2167 pqc->dpq_observer_hooks = *observer_hooks;
2168 }
2169 _dispatch_object_debug(dq, "%s", __func__);
2170 return _dispatch_introspection_queue_create(dq);
2171 }
2172
2173 dispatch_queue_t
2174 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
2175 const pthread_attr_t *attr, dispatch_block_t configure)
2176 {
2177 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2178 NULL);
2179 }
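/*
 * Usage sketch (illustrative only): a private root queue backed by its own
 * pthread pool rather than the global workqueue, with an optional per-thread
 * configure block.
 *
 *   dispatch_queue_t root = dispatch_pthread_root_queue_create(
 *           "com.example.pthread-root", 0, NULL, ^{
 *       // runs on each worker thread of this pool before it drains work
 *   });
 *   dispatch_queue_t q = dispatch_queue_create("com.example.client", NULL);
 *   dispatch_set_target_queue(q, root);
 *
 * Labels are invented for the example.
 */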
2180
2181 #if DISPATCH_IOHID_SPI
2182 dispatch_queue_t
2183 _dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
2184 unsigned long flags, const pthread_attr_t *attr,
2185 dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
2186 dispatch_block_t configure)
2187 {
2188 if (!observer_hooks->queue_will_execute ||
2189 !observer_hooks->queue_did_execute) {
2190 DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
2191 }
2192 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
2193 observer_hooks);
2194 }
2195 #endif
2196
2197 dispatch_queue_t
2198 dispatch_pthread_root_queue_copy_current(void)
2199 {
2200 dispatch_queue_t dq = _dispatch_queue_get_current();
2201 if (!dq) return NULL;
2202 while (slowpath(dq->do_targetq)) {
2203 dq = dq->do_targetq;
2204 }
2205 if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
2206 dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
2207 return NULL;
2208 }
2209 return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
2210 }
2211
2212 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2213
2214 void
2215 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
2216 {
2217 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
2218 DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
2219 }
2220 _dispatch_object_debug(dq, "%s", __func__);
2221 _dispatch_introspection_queue_dispose(dq);
2222 #if DISPATCH_USE_PTHREAD_POOL
2223 dispatch_root_queue_context_t qc = dq->do_ctxt;
2224 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2225
2226 pthread_attr_destroy(&pqc->dpq_thread_attr);
2227 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
2228 if (pqc->dpq_thread_configure) {
2229 Block_release(pqc->dpq_thread_configure);
2230 }
2231 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
2232 false);
2233 #endif
2234 if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
2235 free((void*)dq->dq_label);
2236 }
2237 _dispatch_queue_destroy(dq);
2238 }
2239
2240 #pragma mark -
2241 #pragma mark dispatch_queue_specific
2242
2243 struct dispatch_queue_specific_queue_s {
2244 DISPATCH_QUEUE_HEADER(queue_specific_queue);
2245 TAILQ_HEAD(dispatch_queue_specific_head_s,
2246 dispatch_queue_specific_s) dqsq_contexts;
2247 } DISPATCH_QUEUE_ALIGN;
2248
2249 struct dispatch_queue_specific_s {
2250 const void *dqs_key;
2251 void *dqs_ctxt;
2252 dispatch_function_t dqs_destructor;
2253 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
2254 };
2255 DISPATCH_DECL(dispatch_queue_specific);
2256
2257 void
2258 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
2259 {
2260 dispatch_queue_specific_t dqs, tmp;
2261
2262 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
2263 if (dqs->dqs_destructor) {
2264 dispatch_async_f(_dispatch_get_root_queue(
2265 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2266 dqs->dqs_destructor);
2267 }
2268 free(dqs);
2269 }
2270 _dispatch_queue_destroy(dqsq->_as_dq);
2271 }
2272
2273 static void
2274 _dispatch_queue_init_specific(dispatch_queue_t dq)
2275 {
2276 dispatch_queue_specific_queue_t dqsq;
2277
2278 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
2279 sizeof(struct dispatch_queue_specific_queue_s));
2280 _dispatch_queue_init(dqsq->_as_dq, DQF_NONE,
2281 DISPATCH_QUEUE_WIDTH_MAX, false);
2282 dqsq->do_xref_cnt = -1;
2283 dqsq->do_targetq = _dispatch_get_root_queue(
2284 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
2285 dqsq->dq_label = "queue-specific";
2286 TAILQ_INIT(&dqsq->dqsq_contexts);
2287 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
2288 dqsq->_as_dq, release))) {
2289 _dispatch_release(dqsq->_as_dq);
2290 }
2291 }
2292
2293 static void
2294 _dispatch_queue_set_specific(void *ctxt)
2295 {
2296 dispatch_queue_specific_t dqs, dqsn = ctxt;
2297 dispatch_queue_specific_queue_t dqsq =
2298 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2299
2300 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2301 if (dqs->dqs_key == dqsn->dqs_key) {
2302 // Destroy previous context for existing key
2303 if (dqs->dqs_destructor) {
2304 dispatch_async_f(_dispatch_get_root_queue(
2305 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
2306 dqs->dqs_destructor);
2307 }
2308 if (dqsn->dqs_ctxt) {
2309 // Copy new context for existing key
2310 dqs->dqs_ctxt = dqsn->dqs_ctxt;
2311 dqs->dqs_destructor = dqsn->dqs_destructor;
2312 } else {
2313 // Remove context storage for existing key
2314 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
2315 free(dqs);
2316 }
2317 return free(dqsn);
2318 }
2319 }
2320 // Insert context storage for new key
2321 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
2322 }
2323
2324 DISPATCH_NOINLINE
2325 void
2326 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
2327 void *ctxt, dispatch_function_t destructor)
2328 {
2329 if (slowpath(!key)) {
2330 return;
2331 }
2332 dispatch_queue_specific_t dqs;
2333
2334 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
2335 dqs->dqs_key = key;
2336 dqs->dqs_ctxt = ctxt;
2337 dqs->dqs_destructor = destructor;
2338 if (slowpath(!dq->dq_specific_q)) {
2339 _dispatch_queue_init_specific(dq);
2340 }
2341 _dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
2342 _dispatch_queue_set_specific);
2343 }
2344
2345 static void
2346 _dispatch_queue_get_specific(void *ctxt)
2347 {
2348 void **ctxtp = ctxt;
2349 void *key = *ctxtp;
2350 dispatch_queue_specific_queue_t dqsq =
2351 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
2352 dispatch_queue_specific_t dqs;
2353
2354 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
2355 if (dqs->dqs_key == key) {
2356 *ctxtp = dqs->dqs_ctxt;
2357 return;
2358 }
2359 }
2360 *ctxtp = NULL;
2361 }
2362
2363 DISPATCH_NOINLINE
2364 void *
2365 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
2366 {
2367 if (slowpath(!key)) {
2368 return NULL;
2369 }
2370 void *ctxt = NULL;
2371
2372 if (fastpath(dq->dq_specific_q)) {
2373 ctxt = (void *)key;
2374 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
2375 }
2376 return ctxt;
2377 }
2378
2379 DISPATCH_NOINLINE
2380 void *
2381 dispatch_get_specific(const void *key)
2382 {
2383 if (slowpath(!key)) {
2384 return NULL;
2385 }
2386 void *ctxt = NULL;
2387 dispatch_queue_t dq = _dispatch_queue_get_current();
2388
2389 while (slowpath(dq)) {
2390 if (slowpath(dq->dq_specific_q)) {
2391 ctxt = (void *)key;
2392 dispatch_sync_f(dq->dq_specific_q, &ctxt,
2393 _dispatch_queue_get_specific);
2394 if (ctxt) break;
2395 }
2396 dq = dq->do_targetq;
2397 }
2398 return ctxt;
2399 }
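/*
 * Usage sketch (illustrative only): associating context with a queue and
 * reading it back, including through the target-queue chain walked by
 * dispatch_get_specific() above.
 *
 *   static char key;                                  // its address is the key
 *   dispatch_queue_t q = dispatch_queue_create("com.example.specific", NULL);
 *   dispatch_queue_set_specific(q, &key, (void *)"payload", NULL);
 *   dispatch_async(q, ^{
 *       const char *v = dispatch_get_specific(&key);            // "payload"
 *       const char *w = dispatch_queue_get_specific(q, &key);   // same value
 *       (void)v; (void)w;
 *   });
 *
 * Key, label and payload are invented; a NULL destructor means libdispatch
 * does not manage the context's lifetime.
 */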
2400
2401 #if DISPATCH_IOHID_SPI
2402 bool
2403 _dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
2404 dispatch_queue_t dq) // rdar://problem/18033810
2405 {
2406 if (dq->dq_width != 1) {
2407 DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
2408 }
2409 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2410 return _dq_state_drain_locked_by(dq_state, _dispatch_tid_self());
2411 }
2412 #endif
2413
2414 #pragma mark -
2415 #pragma mark dispatch_queue_debug
2416
2417 size_t
2418 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
2419 {
2420 size_t offset = 0;
2421 dispatch_queue_t target = dq->do_targetq;
2422 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
2423
2424 offset += dsnprintf(&buf[offset], bufsiz - offset,
2425 "target = %s[%p], width = 0x%x, state = 0x%016llx",
2426 target && target->dq_label ? target->dq_label : "", target,
2427 dq->dq_width, (unsigned long long)dq_state);
2428 if (_dq_state_is_suspended(dq_state)) {
2429 offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
2430 _dq_state_suspend_cnt(dq_state));
2431 }
2432 if (_dq_state_is_inactive(dq_state)) {
2433 offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
2434 } else if (_dq_state_needs_activation(dq_state)) {
2435 offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
2436 }
2437 if (_dq_state_is_enqueued(dq_state)) {
2438 offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
2439 }
2440 if (_dq_state_is_dirty(dq_state)) {
2441 offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
2442 }
2443 if (_dq_state_has_override(dq_state)) {
2444 offset += dsnprintf(&buf[offset], bufsiz - offset, ", async-override");
2445 }
2446 mach_port_t owner = _dq_state_drain_owner(dq_state);
2447 if (!_dispatch_queue_is_thread_bound(dq) && owner) {
2448 offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
2449 owner);
2450 }
2451 if (_dq_state_is_in_barrier(dq_state)) {
2452 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
2453 } else {
2454 offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
2455 _dq_state_used_width(dq_state, dq->dq_width));
2456 }
2457 if (_dq_state_has_pending_barrier(dq_state)) {
2458 offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
2459 }
2460 if (_dispatch_queue_is_thread_bound(dq)) {
2461 offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
2462 owner);
2463 }
2464 return offset;
2465 }
2466
2467 size_t
2468 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
2469 {
2470 size_t offset = 0;
2471 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2472 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
2473 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
2474 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
2475 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2476 return offset;
2477 }
2478
2479 #if DISPATCH_DEBUG
2480 void
2481 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
2482 if (fastpath(dq)) {
2483 _dispatch_object_debug(dq, "%s", str);
2484 } else {
2485 _dispatch_log("queue[NULL]: %s", str);
2486 }
2487 }
2488 #endif
2489
2490 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
2491 static OSSpinLock _dispatch_stats_lock;
2492 static struct {
2493 uint64_t time_total;
2494 uint64_t count_total;
2495 uint64_t thread_total;
2496 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
2497
2498 static void
2499 _dispatch_queue_merge_stats(uint64_t start)
2500 {
2501 uint64_t delta = _dispatch_absolute_time() - start;
2502 unsigned long count;
2503
2504 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
2505 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
2506
2507 int bucket = flsl((long)count);
2508
2509 // 64-bit counters on 32-bit require a lock or a queue
2510 OSSpinLockLock(&_dispatch_stats_lock);
2511
2512 _dispatch_stats[bucket].time_total += delta;
2513 _dispatch_stats[bucket].count_total += count;
2514 _dispatch_stats[bucket].thread_total++;
2515
2516 OSSpinLockUnlock(&_dispatch_stats_lock);
2517 }
2518 #endif
2519
2520 #pragma mark -
2521 #pragma mark _dispatch_set_priority_and_mach_voucher
2522 #if HAVE_PTHREAD_WORKQUEUE_QOS
2523
2524 DISPATCH_NOINLINE
2525 void
2526 _dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
2527 mach_voucher_t kv)
2528 {
2529 _pthread_set_flags_t pflags = 0;
2530 if (pp && _dispatch_set_qos_class_enabled) {
2531 pthread_priority_t old_pri = _dispatch_get_priority();
2532 if (pp != old_pri) {
2533 if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
2534 pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
2535 // when we unbind, overcommitness can flip, so we need to learn
2536 // it from the defaultpri; see _dispatch_priority_compute_update
2537 pp |= (_dispatch_get_defaultpriority() &
2538 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2539 } else {
2540 // else we need to keep the one that is set in the current pri
2541 pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
2542 }
2543 if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
2544 pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
2545 }
2546 if (unlikely(DISPATCH_QUEUE_DRAIN_OWNER(&_dispatch_mgr_q) ==
2547 _dispatch_tid_self())) {
2548 DISPATCH_INTERNAL_CRASH(pp,
2549 "Changing the QoS while on the manager queue");
2550 }
2551 if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
2552 DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
2553 }
2554 if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
2555 DISPATCH_INTERNAL_CRASH(old_pri,
2556 "Cannot turn a manager thread into a normal one");
2557 }
2558 }
2559 }
2560 if (kv != VOUCHER_NO_MACH_VOUCHER) {
2561 #if VOUCHER_USE_MACH_VOUCHER
2562 pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
2563 #endif
2564 }
2565 if (!pflags) return;
2566 int r = _pthread_set_properties_self(pflags, pp, kv);
2567 if (r == EINVAL) {
2568 DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
2569 }
2570 (void)dispatch_assume_zero(r);
2571 }
2572
2573 DISPATCH_NOINLINE
2574 voucher_t
2575 _dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
2576 voucher_t v, _dispatch_thread_set_self_t flags)
2577 {
2578 voucher_t ov = DISPATCH_NO_VOUCHER;
2579 mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
2580 if (v != DISPATCH_NO_VOUCHER) {
2581 bool retained = flags & DISPATCH_VOUCHER_CONSUME;
2582 ov = _voucher_get();
2583 if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
2584 if (retained && v) _voucher_release_no_dispose(v);
2585 ov = DISPATCH_NO_VOUCHER;
2586 } else {
2587 if (!retained && v) _voucher_retain(v);
2588 kv = _voucher_swap_and_get_mach_voucher(ov, v);
2589 }
2590 }
2591 #if !PTHREAD_WORKQUEUE_RESETS_VOUCHER_AND_PRIORITY_ON_PARK
2592 flags &= ~(_dispatch_thread_set_self_t)DISPATCH_THREAD_PARK;
2593 #endif
2594 if (!(flags & DISPATCH_THREAD_PARK)) {
2595 _dispatch_set_priority_and_mach_voucher_slow(priority, kv);
2596 }
2597 if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
2598 if (ov) _voucher_release(ov);
2599 ov = DISPATCH_NO_VOUCHER;
2600 }
2601 return ov;
2602 }
2603 #endif
2604 #pragma mark -
2605 #pragma mark dispatch_continuation_t
2606
2607 static void
2608 _dispatch_force_cache_cleanup(void)
2609 {
2610 dispatch_continuation_t dc;
2611 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2612 if (dc) {
2613 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
2614 _dispatch_cache_cleanup(dc);
2615 }
2616 }
2617
2618 DISPATCH_NOINLINE
2619 static void
2620 _dispatch_cache_cleanup(void *value)
2621 {
2622 dispatch_continuation_t dc, next_dc = value;
2623
2624 while ((dc = next_dc)) {
2625 next_dc = dc->do_next;
2626 _dispatch_continuation_free_to_heap(dc);
2627 }
2628 }
2629
2630 #if DISPATCH_USE_MEMORYPRESSURE_SOURCE
2631 DISPATCH_NOINLINE
2632 void
2633 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
2634 {
2635 _dispatch_continuation_free_to_heap(dc);
2636 dispatch_continuation_t next_dc;
2637 dc = _dispatch_thread_getspecific(dispatch_cache_key);
2638 int cnt;
2639 if (!dc || (cnt = dc->dc_cache_cnt -
2640 _dispatch_continuation_cache_limit) <= 0) {
2641 return;
2642 }
2643 do {
2644 next_dc = dc->do_next;
2645 _dispatch_continuation_free_to_heap(dc);
2646 } while (--cnt && (dc = next_dc));
2647 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
2648 }
2649 #endif
2650
2651 DISPATCH_ALWAYS_INLINE_NDEBUG
2652 static inline void
2653 _dispatch_continuation_slow_item_signal(dispatch_queue_t dq,
2654 dispatch_object_t dou)
2655 {
2656 dispatch_continuation_t dc = dou._dc;
2657 pthread_priority_t pp = dq->dq_override;
2658
2659 _dispatch_trace_continuation_pop(dq, dc);
2660 if (pp > (dc->dc_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
2661 _dispatch_wqthread_override_start((mach_port_t)dc->dc_data, pp);
2662 }
2663 _dispatch_thread_event_signal((dispatch_thread_event_t)dc->dc_other);
2664 _dispatch_introspection_queue_item_complete(dc);
2665 }
2666
2667 DISPATCH_NOINLINE
2668 static void
2669 _dispatch_continuation_push(dispatch_queue_t dq, dispatch_continuation_t dc)
2670 {
2671 _dispatch_queue_push(dq, dc,
2672 _dispatch_continuation_get_override_priority(dq, dc));
2673 }
2674
2675 DISPATCH_NOINLINE
2676 static void
2677 _dispatch_continuation_push_sync_slow(dispatch_queue_t dq,
2678 dispatch_continuation_t dc)
2679 {
2680 _dispatch_queue_push_inline(dq, dc,
2681 _dispatch_continuation_get_override_priority(dq, dc),
2682 DISPATCH_WAKEUP_SLOW_WAITER);
2683 }
2684
2685 DISPATCH_ALWAYS_INLINE
2686 static inline void
2687 _dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
2688 bool barrier)
2689 {
2690 if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
2691 return _dispatch_continuation_push(dq, dc);
2692 }
2693 return _dispatch_async_f2(dq, dc);
2694 }
2695
2696 DISPATCH_NOINLINE
2697 void
2698 _dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
2699 {
2700 _dispatch_continuation_async2(dq, dc,
2701 dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
2702 }
2703
2704 #pragma mark -
2705 #pragma mark dispatch_block_create
2706
2707 #if __BLOCKS__
2708
2709 DISPATCH_ALWAYS_INLINE
2710 static inline bool
2711 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
2712 {
2713 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
2714 }
2715
2716 DISPATCH_ALWAYS_INLINE
2717 static inline dispatch_block_flags_t
2718 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
2719 {
2720 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
2721 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2722 }
2723 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
2724 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2725 }
2726 return flags;
2727 }
2728
2729 static inline dispatch_block_t
2730 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
2731 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
2732 {
2733 flags = _dispatch_block_normalize_flags(flags);
2734 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
2735
2736 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
2737 voucher = VOUCHER_CURRENT;
2738 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2739 }
2740 if (voucher == VOUCHER_CURRENT) {
2741 voucher = _voucher_get();
2742 }
2743 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
2744 pri = _dispatch_priority_propagate();
2745 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2746 }
2747 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
2748 #if DISPATCH_DEBUG
2749 dispatch_assert(_dispatch_block_get_data(db));
2750 #endif
2751 return db;
2752 }
2753
2754 dispatch_block_t
2755 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
2756 {
2757 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2758 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
2759 block);
2760 }
2761
2762 dispatch_block_t
2763 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
2764 dispatch_qos_class_t qos_class, int relative_priority,
2765 dispatch_block_t block)
2766 {
2767 if (!_dispatch_block_flags_valid(flags) ||
2768 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2769 return DISPATCH_BAD_INPUT;
2770 }
2771 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2772 pthread_priority_t pri = 0;
2773 #if HAVE_PTHREAD_WORKQUEUE_QOS
2774 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2775 #endif
2776 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
2777 pri, block);
2778 }
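/*
 * Usage sketch (illustrative only): creating a block that carries an explicit
 * QoS and submitting it asynchronously.
 *
 *   dispatch_block_t b = dispatch_block_create_with_qos_class(
 *           DISPATCH_BLOCK_ENFORCE_QOS_CLASS, QOS_CLASS_BACKGROUND, 0, ^{
 *       // low-priority work
 *   });
 *   dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), b);
 *   Block_release(b);                                 // manual retain/release builds
 *
 * The work itself is a placeholder.
 */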
2779
2780 dispatch_block_t
2781 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
2782 voucher_t voucher, dispatch_block_t block)
2783 {
2784 if (!_dispatch_block_flags_valid(flags)) return DISPATCH_BAD_INPUT;
2785 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2786 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
2787 block);
2788 }
2789
2790 dispatch_block_t
2791 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
2792 voucher_t voucher, dispatch_qos_class_t qos_class,
2793 int relative_priority, dispatch_block_t block)
2794 {
2795 if (!_dispatch_block_flags_valid(flags) ||
2796 !_dispatch_qos_class_valid(qos_class, relative_priority)) {
2797 return DISPATCH_BAD_INPUT;
2798 }
2799 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
2800 pthread_priority_t pri = 0;
2801 #if HAVE_PTHREAD_WORKQUEUE_QOS
2802 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2803 #endif
2804 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
2805 pri, block);
2806 }
2807
2808 void
2809 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
2810 {
2811 if (!_dispatch_block_flags_valid(flags)) {
2812 DISPATCH_CLIENT_CRASH(flags, "Invalid flags passed to "
2813 "dispatch_block_perform()");
2814 }
2815 flags = _dispatch_block_normalize_flags(flags);
2816 struct dispatch_block_private_data_s dbpds =
2817 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
2818 return _dispatch_block_invoke_direct(&dbpds);
2819 }
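/*
 * Usage sketch (illustrative only): dispatch_block_perform() runs the block
 * synchronously with the given flags, without a separate create/release.
 *
 *   dispatch_block_perform(DISPATCH_BLOCK_DETACHED, ^{
 *       // runs here, detached from the caller's QoS and voucher
 *   });
 */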
2820
2821 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2822
2823 void
2824 _dispatch_block_invoke_direct(const struct dispatch_block_private_data_s *dbcpd)
2825 {
2826 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
2827 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2828 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2829 if (slowpath(atomic_flags & DBF_WAITED)) {
2830 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2831 "run more than once and waited for");
2832 }
2833 if (atomic_flags & DBF_CANCELED) goto out;
2834
2835 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2836 _dispatch_thread_set_self_t adopt_flags = 0;
2837 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2838 op = _dispatch_get_priority();
2839 p = dbpd->dbpd_priority;
2840 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2841 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2842 }
2843 }
2844 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2845 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2846 v = dbpd->dbpd_voucher;
2847 }
2848 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2849 dbpd->dbpd_thread = _dispatch_tid_self();
2850 _dispatch_client_callout(dbpd->dbpd_block,
2851 _dispatch_Block_invoke(dbpd->dbpd_block));
2852 _dispatch_reset_priority_and_voucher(op, ov);
2853 out:
2854 if ((atomic_flags & DBF_PERFORM) == 0) {
2855 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2856 dispatch_group_leave(_dbpd_group(dbpd));
2857 }
2858 }
2859 }
2860
2861 void
2862 _dispatch_block_sync_invoke(void *block)
2863 {
2864 dispatch_block_t b = block;
2865 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2866 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2867 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2868 if (slowpath(atomic_flags & DBF_WAITED)) {
2869 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2870 "run more than once and waited for");
2871 }
2872 if (atomic_flags & DBF_CANCELED) goto out;
2873
2874 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2875 _dispatch_thread_set_self_t adopt_flags = 0;
2876 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2877 op = _dispatch_get_priority();
2878 p = dbpd->dbpd_priority;
2879 if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
2880 adopt_flags |= DISPATCH_PRIORITY_ENFORCE;
2881 }
2882 }
2883 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2884 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2885 v = dbpd->dbpd_voucher;
2886 }
2887 ov = _dispatch_adopt_priority_and_set_voucher(p, v, adopt_flags);
2888 dbpd->dbpd_block();
2889 _dispatch_reset_priority_and_voucher(op, ov);
2890 out:
2891 if ((atomic_flags & DBF_PERFORM) == 0) {
2892 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2893 dispatch_group_leave(_dbpd_group(dbpd));
2894 }
2895 }
2896
2897 os_mpsc_queue_t oq;
2898 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2899 if (oq) {
2900 // balances dispatch_{,barrier_,}sync
2901 _os_object_release_internal(oq->_as_os_obj);
2902 }
2903 }
2904
2905 DISPATCH_ALWAYS_INLINE
2906 static void
2907 _dispatch_block_async_invoke2(dispatch_block_t b, bool release)
2908 {
2909 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2910 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2911 if (slowpath(atomic_flags & DBF_WAITED)) {
2912 DISPATCH_CLIENT_CRASH(atomic_flags, "A block object may not be both "
2913 "run more than once and waited for");
2914 }
2915 if (!slowpath(atomic_flags & DBF_CANCELED)) {
2916 dbpd->dbpd_block();
2917 }
2918 if ((atomic_flags & DBF_PERFORM) == 0) {
2919 if (os_atomic_inc2o(dbpd, dbpd_performed, relaxed) == 1) {
2920 dispatch_group_leave(_dbpd_group(dbpd));
2921 }
2922 }
2923 os_mpsc_queue_t oq;
2924 oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2925 if (oq) {
2926 // balances dispatch_{,barrier_,group_}async
2927 _os_object_release_internal_inline(oq->_as_os_obj);
2928 }
2929 if (release) {
2930 Block_release(b);
2931 }
2932 }
2933
2934 static void
2935 _dispatch_block_async_invoke(void *block)
2936 {
2937 _dispatch_block_async_invoke2(block, false);
2938 }
2939
2940 static void
2941 _dispatch_block_async_invoke_and_release(void *block)
2942 {
2943 _dispatch_block_async_invoke2(block, true);
2944 }
2945
2946 void
2947 dispatch_block_cancel(dispatch_block_t db)
2948 {
2949 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2950 if (!dbpd) {
2951 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2952 "dispatch_block_cancel()");
2953 }
2954 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
2955 }
2956
2957 long
2958 dispatch_block_testcancel(dispatch_block_t db)
2959 {
2960 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2961 if (!dbpd) {
2962 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2963 "dispatch_block_testcancel()");
2964 }
2965 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
2966 }
2967
2968 long
2969 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
2970 {
2971 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2972 if (!dbpd) {
2973 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
2974 "dispatch_block_wait()");
2975 }
2976
2977 unsigned int flags = os_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
2978 DBF_WAITING, relaxed);
2979 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
2980 DISPATCH_CLIENT_CRASH(flags, "A block object may not be waited for "
2981 "more than once");
2982 }
2983
2984 // <rdar://problem/17703192> If we know the queue where this block is
2985 // enqueued, or the thread that's executing it, then we should boost
2986 // it here.
2987
2988 pthread_priority_t pp = _dispatch_get_priority();
2989
2990 os_mpsc_queue_t boost_oq;
2991 boost_oq = os_atomic_xchg2o(dbpd, dbpd_queue, NULL, relaxed);
2992 if (boost_oq) {
2993 // release balances dispatch_{,barrier_,group_}async.
2994 // Can't put the queue back in the timeout case: the block might
2995 // finish after we fall out of group_wait and see our NULL, so
2996 // neither of us would ever release. Side effect: after a _wait
2997 // that times out, subsequent waits will not boost the qos of the
2998 // still-running block.
2999 dx_wakeup(boost_oq, pp, DISPATCH_WAKEUP_OVERRIDING |
3000 DISPATCH_WAKEUP_CONSUME);
3001 }
3002
3003 mach_port_t boost_th = dbpd->dbpd_thread;
3004 if (boost_th) {
3005 _dispatch_thread_override_start(boost_th, pp, dbpd);
3006 }
3007
3008 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3009 if (slowpath(performed > 1 || (boost_th && boost_oq))) {
3010 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3011 "run more than once and waited for");
3012 }
3013
3014 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
3015
3016 if (boost_th) {
3017 _dispatch_thread_override_end(boost_th, dbpd);
3018 }
3019
3020 if (ret) {
3021 // timed out: reverse our changes
3022 (void)os_atomic_and2o(dbpd, dbpd_atomic_flags,
3023 ~DBF_WAITING, relaxed);
3024 } else {
3025 (void)os_atomic_or2o(dbpd, dbpd_atomic_flags,
3026 DBF_WAITED, relaxed);
3027 // don't need to re-test here: the second call would see
3028 // the first call's WAITING
3029 }
3030
3031 return ret;
3032 }
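/*
 * Usage sketch (illustrative only): waiting for a block with a timeout, per
 * the contract enforced above (a waited-for block may only run once).
 *
 *   dispatch_block_t b = dispatch_block_create(0, ^{
 *       // work
 *   });
 *   dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), b);
 *   if (dispatch_block_wait(b, dispatch_time(DISPATCH_TIME_NOW,
 *           5 * NSEC_PER_SEC)) != 0) {
 *       // timed out; cancellation keeps the block from starting later
 *       dispatch_block_cancel(b);
 *   }
 *   Block_release(b);                                 // manual retain/release builds
 *
 * The 5-second timeout is arbitrary for the example.
 */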
3033
3034 void
3035 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
3036 dispatch_block_t notification_block)
3037 {
3038 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
3039 if (!dbpd) {
3040 DISPATCH_CLIENT_CRASH(db, "Invalid block object passed to "
3041 "dispatch_block_notify()");
3042 }
3043 int performed = os_atomic_load2o(dbpd, dbpd_performed, relaxed);
3044 if (slowpath(performed > 1)) {
3045 DISPATCH_CLIENT_CRASH(performed, "A block object may not be both "
3046 "run more than once and observed");
3047 }
3048
3049 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
3050 }
3051
3052 DISPATCH_NOINLINE
3053 void
3054 _dispatch_continuation_init_slow(dispatch_continuation_t dc,
3055 dispatch_queue_class_t dqu, dispatch_block_flags_t flags)
3056 {
3057 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(dc->dc_ctxt);
3058 dispatch_block_flags_t block_flags = dbpd->dbpd_flags;
3059 uintptr_t dc_flags = dc->dc_flags;
3060 os_mpsc_queue_t oq = dqu._oq;
3061
3062 // balanced in d_block_async_invoke_and_release or d_block_wait
3063 if (os_atomic_cmpxchg2o(dbpd, dbpd_queue, NULL, oq, relaxed)) {
3064 _os_object_retain_internal_inline(oq->_as_os_obj);
3065 }
3066
3067 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3068 dc->dc_func = _dispatch_block_async_invoke_and_release;
3069 } else {
3070 dc->dc_func = _dispatch_block_async_invoke;
3071 }
3072
3073 flags |= block_flags;
3074 if (block_flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3075 _dispatch_continuation_priority_set(dc, dbpd->dbpd_priority, flags);
3076 } else {
3077 _dispatch_continuation_priority_set(dc, dc->dc_priority, flags);
3078 }
3079 if (block_flags & DISPATCH_BLOCK_BARRIER) {
3080 dc_flags |= DISPATCH_OBJ_BARRIER_BIT;
3081 }
3082 if (block_flags & DISPATCH_BLOCK_HAS_VOUCHER) {
3083 voucher_t v = dbpd->dbpd_voucher;
3084 dc->dc_voucher = v ? _voucher_retain(v) : NULL;
3085 dc_flags |= DISPATCH_OBJ_ENFORCE_VOUCHER;
3086 _dispatch_voucher_debug("continuation[%p] set", dc->dc_voucher, dc);
3087 _dispatch_voucher_ktrace_dc_push(dc);
3088 } else {
3089 _dispatch_continuation_voucher_set(dc, oq, flags);
3090 }
3091 dc_flags |= DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT;
3092 dc->dc_flags = dc_flags;
3093 }
3094
3095 void
3096 _dispatch_continuation_update_bits(dispatch_continuation_t dc,
3097 uintptr_t dc_flags)
3098 {
3099 dc->dc_flags = dc_flags;
3100 if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) {
3101 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3102 dc->dc_func = _dispatch_block_async_invoke_and_release;
3103 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3104 dc->dc_func = _dispatch_call_block_and_release;
3105 }
3106 } else {
3107 if (dc_flags & DISPATCH_OBJ_BLOCK_PRIVATE_DATA_BIT) {
3108 dc->dc_func = _dispatch_block_async_invoke;
3109 } else if (dc_flags & DISPATCH_OBJ_BLOCK_BIT) {
3110 dc->dc_func = _dispatch_Block_invoke(dc->dc_ctxt);
3111 }
3112 }
3113 }
3114
3115 #endif // __BLOCKS__
3116
3117 #pragma mark -
3118 #pragma mark dispatch_barrier_async
3119
3120 DISPATCH_NOINLINE
3121 static void
3122 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
3123 dispatch_function_t func, pthread_priority_t pp,
3124 dispatch_block_flags_t flags, uintptr_t dc_flags)
3125 {
3126 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
3127 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3128 _dispatch_continuation_async(dq, dc);
3129 }
3130
3131 DISPATCH_ALWAYS_INLINE
3132 static inline void
3133 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
3134 dispatch_function_t func, pthread_priority_t pp,
3135 dispatch_block_flags_t flags)
3136 {
3137 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3138 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3139
3140 if (!fastpath(dc)) {
3141 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3142 }
3143
3144 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3145 _dispatch_continuation_push(dq, dc);
3146 }
3147
3148 DISPATCH_NOINLINE
3149 void
3150 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
3151 dispatch_function_t func)
3152 {
3153 _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
3154 }
3155
3156 DISPATCH_NOINLINE
3157 void
3158 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
3159 dispatch_function_t func)
3160 {
3161 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3162 dc->dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3163 dc->dc_func = func;
3164 dc->dc_ctxt = ctxt;
3165 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3166 dc->dc_priority = DISPATCH_NO_PRIORITY;
3167 _dispatch_queue_push(dq, dc, 0);
3168 }
3169
3170 #ifdef __BLOCKS__
3171 void
3172 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
3173 {
3174 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3175 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;
3176
3177 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3178 _dispatch_continuation_push(dq, dc);
3179 }
3180 #endif
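/*
 * Usage sketch (illustrative only): the reader/writer pattern that
 * dispatch_barrier_async() enables on a concurrent queue.
 *
 *   dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *           DISPATCH_QUEUE_CONCURRENT);
 *   __block int shared = 0;
 *   dispatch_async(q, ^{ int r = shared; (void)r; });   // concurrent read
 *   dispatch_barrier_async(q, ^{ shared = 42; });       // exclusive write
 *   dispatch_async(q, ^{ int r = shared; (void)r; });   // observes the write
 *
 * Label and values are invented; on a global root queue the barrier flavor
 * degrades to a plain async.
 */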
3181
3182 #pragma mark -
3183 #pragma mark dispatch_async
3184
3185 void
3186 _dispatch_async_redirect_invoke(dispatch_continuation_t dc,
3187 dispatch_invoke_flags_t flags)
3188 {
3189 dispatch_thread_frame_s dtf;
3190 struct dispatch_continuation_s *other_dc = dc->dc_other;
3191 dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
3192 // if we went through _dispatch_root_queue_push_override,
3193 // the "right" root queue was stuffed into dc_func
3194 dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
3195 dispatch_queue_t dq = dc->dc_data, rq, old_dq;
3196 struct _dispatch_identity_s di;
3197
3198 pthread_priority_t op, dp, old_dp;
3199
3200 if (ctxt_flags) {
3201 flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
3202 flags |= ctxt_flags;
3203 }
3204 old_dq = _dispatch_get_current_queue();
3205 if (assumed_rq) {
3206 _dispatch_queue_set_current(assumed_rq);
3207 _dispatch_root_queue_identity_assume(&di, 0);
3208 }
3209
3210 old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
3211 op = dq->dq_override;
3212 if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3213 _dispatch_wqthread_override_start(_dispatch_tid_self(), op);
3214 // Ensure that the root queue sees that this thread was overridden.
3215 _dispatch_set_defaultpriority_override();
3216 }
3217
3218 _dispatch_thread_frame_push(&dtf, dq);
3219 _dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
3220 DISPATCH_OBJ_CONSUME_BIT, {
3221 _dispatch_continuation_pop(other_dc, dq, flags);
3222 });
3223 _dispatch_thread_frame_pop(&dtf);
3224 if (assumed_rq) {
3225 _dispatch_root_queue_identity_restore(&di);
3226 _dispatch_queue_set_current(old_dq);
3227 }
3228 _dispatch_reset_defaultpriority(old_dp);
3229
3230 rq = dq->do_targetq;
3231 while (slowpath(rq->do_targetq) && rq != old_dq) {
3232 _dispatch_non_barrier_complete(rq);
3233 rq = rq->do_targetq;
3234 }
3235
3236 _dispatch_non_barrier_complete(dq);
3237
3238 if (dtf.dtf_deferred) {
3239 struct dispatch_object_s *dou = dtf.dtf_deferred;
3240 return _dispatch_queue_drain_deferred_invoke(dq, flags, 0, dou);
3241 }
3242
3243 _dispatch_release_tailcall(dq);
3244 }
3245
3246 DISPATCH_ALWAYS_INLINE
3247 static inline dispatch_continuation_t
3248 _dispatch_async_redirect_wrap(dispatch_queue_t dq, dispatch_object_t dou)
3249 {
3250 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3251
3252 dou._do->do_next = NULL;
3253 dc->do_vtable = DC_VTABLE(ASYNC_REDIRECT);
3254 dc->dc_func = NULL;
3255 dc->dc_ctxt = (void *)(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3256 dc->dc_data = dq;
3257 dc->dc_other = dou._do;
3258 dc->dc_voucher = DISPATCH_NO_VOUCHER;
3259 dc->dc_priority = DISPATCH_NO_PRIORITY;
3260 _dispatch_retain(dq);
3261 return dc;
3262 }
3263
3264 DISPATCH_NOINLINE
3265 static void
3266 _dispatch_async_f_redirect(dispatch_queue_t dq,
3267 dispatch_object_t dou, pthread_priority_t pp)
3268 {
3269 if (!slowpath(_dispatch_object_is_redirection(dou))) {
3270 dou._dc = _dispatch_async_redirect_wrap(dq, dou);
3271 }
3272 dq = dq->do_targetq;
3273
3274 // Find the queue to redirect to
3275 while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
3276 if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
3277 break;
3278 }
3279 if (!dou._dc->dc_ctxt) {
3280 // find first queue in descending target queue order that has
3281 // an autorelease frequency set, and use that as the frequency for
3282 // this continuation.
3283 dou._dc->dc_ctxt = (void *)
3284 (uintptr_t)_dispatch_queue_autorelease_frequency(dq);
3285 }
3286 dq = dq->do_targetq;
3287 }
3288
3289 _dispatch_queue_push(dq, dou, pp);
3290 }
3291
3292 DISPATCH_ALWAYS_INLINE
3293 static inline void
3294 _dispatch_continuation_redirect(dispatch_queue_t dq,
3295 struct dispatch_object_s *dc)
3296 {
3297 _dispatch_trace_continuation_pop(dq, dc);
3298 // This is a re-redirect, overrides have already been applied
3299 // by _dispatch_async_f2.
3300 // However we want to end up on the root queue matching `dc`'s qos, so pick up
3301 // the current override of `dq`, which includes dc's override (and maybe more)
3302 _dispatch_async_f_redirect(dq, dc, dq->dq_override);
3303 _dispatch_introspection_queue_item_complete(dc);
3304 }
3305
3306 DISPATCH_NOINLINE
3307 static void
3308 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
3309 {
3310 // <rdar://problem/24738102&24743140> reserving non barrier width
3311 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3312 // equivalent), so we have to check that this thread hasn't enqueued
3313 // anything ahead of this call or we can break ordering
3314 if (slowpath(dq->dq_items_tail)) {
3315 return _dispatch_continuation_push(dq, dc);
3316 }
3317
3318 if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {
3319 return _dispatch_continuation_push(dq, dc);
3320 }
3321
3322 return _dispatch_async_f_redirect(dq, dc,
3323 _dispatch_continuation_get_override_priority(dq, dc));
3324 }
3325
3326 DISPATCH_ALWAYS_INLINE
3327 static inline void
3328 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3329 pthread_priority_t pp, dispatch_block_flags_t flags)
3330 {
3331 dispatch_continuation_t dc = _dispatch_continuation_alloc_cacheonly();
3332 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3333
3334 if (!fastpath(dc)) {
3335 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags, dc_flags);
3336 }
3337
3338 _dispatch_continuation_init_f(dc, dq, ctxt, func, pp, flags, dc_flags);
3339 _dispatch_continuation_async2(dq, dc, false);
3340 }
3341
3342 DISPATCH_NOINLINE
3343 void
3344 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3345 {
3346 _dispatch_async_f(dq, ctxt, func, 0, 0);
3347 }
3348
3349 DISPATCH_NOINLINE
3350 void
3351 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
3352 dispatch_function_t func)
3353 {
3354 _dispatch_async_f(dq, ctxt, func, 0, DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
3355 }
3356
3357 #ifdef __BLOCKS__
3358 void
3359 dispatch_async(dispatch_queue_t dq, void (^work)(void))
3360 {
3361 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3362 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;
3363
3364 _dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
3365 _dispatch_continuation_async(dq, dc);
3366 }
3367 #endif
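
/*
 * Usage sketch (illustrative; struct work_ctx, do_work() and update_ui() are
 * placeholders): the function-pointer variant takes an explicit context
 * pointer, while the block variant captures its context.
 *
 *	struct work_ctx *c = malloc(sizeof(*c));          // freed by do_work()
 *	dispatch_async_f(dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), c, do_work);
 *
 *	dispatch_async(dispatch_get_main_queue(), ^{
 *		update_ui();                                  // runs on the main thread
 *	});
 */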
3368
3369 #pragma mark -
3370 #pragma mark dispatch_group_async
3371
3372 DISPATCH_ALWAYS_INLINE
3373 static inline void
3374 _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3375 dispatch_continuation_t dc)
3376 {
3377 dispatch_group_enter(dg);
3378 dc->dc_data = dg;
3379 _dispatch_continuation_async(dq, dc);
3380 }
3381
3382 DISPATCH_NOINLINE
3383 void
3384 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
3385 dispatch_function_t func)
3386 {
3387 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3388 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3389
3390 _dispatch_continuation_init_f(dc, dq, ctxt, func, 0, 0, dc_flags);
3391 _dispatch_continuation_group_async(dg, dq, dc);
3392 }
3393
3394 #ifdef __BLOCKS__
3395 void
3396 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
3397 dispatch_block_t db)
3398 {
3399 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3400 uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_GROUP_BIT;
3401
3402 _dispatch_continuation_init(dc, dq, db, 0, 0, dc_flags);
3403 _dispatch_continuation_group_async(dg, dq, dc);
3404 }
3405 #endif
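
/*
 * Usage sketch (illustrative; n, process() and finished() are placeholders):
 * dispatch_group_async pairs the dispatch_group_enter() performed above with
 * a leave when the submitted work completes, so a notify block fires once
 * every member of the group has run.
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	for (size_t i = 0; i < n; i++) {
 *		dispatch_group_async(g, q, ^{ process(i); });
 *	}
 *	dispatch_group_notify(g, dispatch_get_main_queue(), ^{ finished(); });
 */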
3406
3407 #pragma mark -
3408 #pragma mark dispatch_sync / dispatch_barrier_sync recurse and invoke
3409
3410 DISPATCH_NOINLINE
3411 static void
3412 _dispatch_sync_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
3413 dispatch_function_t func)
3414 {
3415 voucher_t ov;
3416 dispatch_thread_frame_s dtf;
3417 _dispatch_thread_frame_push(&dtf, dq);
3418 ov = _dispatch_set_priority_and_voucher(0, dq->dq_override_voucher, 0);
3419 _dispatch_client_callout(ctxt, func);
3420 _dispatch_perfmon_workitem_inc();
3421 _dispatch_reset_voucher(ov, 0);
3422 _dispatch_thread_frame_pop(&dtf);
3423 }
3424
3425 DISPATCH_ALWAYS_INLINE
3426 static inline void
3427 _dispatch_sync_function_invoke_inline(dispatch_queue_t dq, void *ctxt,
3428 dispatch_function_t func)
3429 {
3430 if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
3431 return _dispatch_sync_function_invoke_slow(dq, ctxt, func);
3432 }
3433 dispatch_thread_frame_s dtf;
3434 _dispatch_thread_frame_push(&dtf, dq);
3435 _dispatch_client_callout(ctxt, func);
3436 _dispatch_perfmon_workitem_inc();
3437 _dispatch_thread_frame_pop(&dtf);
3438 }
3439
3440 DISPATCH_NOINLINE
3441 static void
3442 _dispatch_sync_function_invoke(dispatch_queue_t dq, void *ctxt,
3443 dispatch_function_t func)
3444 {
3445 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3446 }
3447
3448 void
3449 _dispatch_sync_recurse_invoke(void *ctxt)
3450 {
3451 dispatch_continuation_t dc = ctxt;
3452 _dispatch_sync_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
3453 }
3454
3455 DISPATCH_ALWAYS_INLINE
3456 static inline void
3457 _dispatch_sync_function_recurse(dispatch_queue_t dq, void *ctxt,
3458 dispatch_function_t func, pthread_priority_t pp)
3459 {
3460 struct dispatch_continuation_s dc = {
3461 .dc_data = dq,
3462 .dc_func = func,
3463 .dc_ctxt = ctxt,
3464 };
3465 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
3466 }
3467
3468 DISPATCH_NOINLINE
3469 static void
3470 _dispatch_non_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3471 dispatch_function_t func)
3472 {
3473 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3474 _dispatch_non_barrier_complete(dq);
3475 }
3476
3477 DISPATCH_NOINLINE
3478 static void
3479 _dispatch_non_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3480 dispatch_function_t func, pthread_priority_t pp)
3481 {
3482 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3483 _dispatch_non_barrier_complete(dq);
3484 }
3485
3486 DISPATCH_ALWAYS_INLINE
3487 static void
3488 _dispatch_non_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3489 dispatch_function_t func, pthread_priority_t pp)
3490 {
3491 _dispatch_introspection_non_barrier_sync_begin(dq, func);
3492 if (slowpath(dq->do_targetq->do_targetq)) {
3493 return _dispatch_non_barrier_sync_f_recurse(dq, ctxt, func, pp);
3494 }
3495 _dispatch_non_barrier_sync_f_invoke(dq, ctxt, func);
3496 }
3497
3498 #pragma mark -
3499 #pragma mark dispatch_barrier_sync
3500
3501 DISPATCH_NOINLINE
3502 static void
3503 _dispatch_barrier_complete(dispatch_queue_t dq)
3504 {
3505 uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
3506 dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3507
3508 if (slowpath(dq->dq_items_tail)) {
3509 return _dispatch_try_lock_transfer_or_wakeup(dq);
3510 }
3511
3512 if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
3513 // someone enqueued a slow item at the head
3514 // looping may be its last chance
3515 return _dispatch_try_lock_transfer_or_wakeup(dq);
3516 }
3517 }
3518
3519 DISPATCH_NOINLINE
3520 static void
3521 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3522 dispatch_function_t func, pthread_priority_t pp)
3523 {
3524 _dispatch_sync_function_recurse(dq, ctxt, func, pp);
3525 _dispatch_barrier_complete(dq);
3526 }
3527
3528 DISPATCH_NOINLINE
3529 static void
3530 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3531 dispatch_function_t func)
3532 {
3533 _dispatch_sync_function_invoke_inline(dq, ctxt, func);
3534 _dispatch_barrier_complete(dq);
3535 }
3536
3537 DISPATCH_ALWAYS_INLINE
3538 static void
3539 _dispatch_barrier_sync_f_invoke_inline(dispatch_queue_t dq, void *ctxt,
3540 dispatch_function_t func, pthread_priority_t pp)
3541 {
3542 _dispatch_introspection_barrier_sync_begin(dq, func);
3543 if (slowpath(dq->do_targetq->do_targetq)) {
3544 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
3545 }
3546 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3547 }
3548
3549 typedef struct dispatch_barrier_sync_context_s {
3550 struct dispatch_continuation_s dbsc_dc;
3551 dispatch_thread_frame_s dbsc_dtf;
3552 } *dispatch_barrier_sync_context_t;
3553
3554 static void
3555 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
3556 {
3557 dispatch_barrier_sync_context_t dbsc = ctxt;
3558 dispatch_continuation_t dc = &dbsc->dbsc_dc;
3559 dispatch_queue_t dq = dc->dc_data;
3560 dispatch_thread_event_t event = (dispatch_thread_event_t)dc->dc_other;
3561
3562 dispatch_assert(dq == _dispatch_queue_get_current());
3563 #if DISPATCH_COCOA_COMPAT
3564 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3565 dispatch_assert(_dispatch_thread_frame_get_current() == NULL);
3566
3567 // the block runs on the thread the queue is bound to and not
3568 // on the calling thread, but we want it to see the calling thread's
3569 // dispatch thread frames, so we fake the link, and then undo it
3570 _dispatch_thread_frame_set_current(&dbsc->dbsc_dtf);
3571 // The queue is bound to a non-dispatch thread (e.g. main thread)
3572 _dispatch_continuation_voucher_adopt(dc, DISPATCH_NO_VOUCHER,
3573 DISPATCH_OBJ_CONSUME_BIT);
3574 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
3575 os_atomic_store2o(dc, dc_func, NULL, release);
3576 _dispatch_thread_frame_set_current(NULL);
3577 }
3578 #endif
3579 _dispatch_thread_event_signal(event); // release
3580 }
3581
3582 DISPATCH_NOINLINE
3583 static void
3584 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
3585 dispatch_function_t func, pthread_priority_t pp)
3586 {
3587 if (slowpath(!dq->do_targetq)) {
3588 // see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
3589 return _dispatch_sync_function_invoke(dq, ctxt, func);
3590 }
3591
3592 if (!pp) {
3593 pp = _dispatch_get_priority();
3594 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3595 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3596 }
3597 dispatch_thread_event_s event;
3598 _dispatch_thread_event_init(&event);
3599 struct dispatch_barrier_sync_context_s dbsc = {
3600 .dbsc_dc = {
3601 .dc_data = dq,
3602 #if DISPATCH_COCOA_COMPAT
3603 .dc_func = func,
3604 .dc_ctxt = ctxt,
3605 #endif
3606 .dc_other = &event,
3607 }
3608 };
3609 #if DISPATCH_COCOA_COMPAT
3610 // It's preferred to execute synchronous blocks on the current thread
3611 // due to thread-local side effects, etc. However, blocks submitted
3612 // to the main thread MUST be run on the main thread
3613 if (slowpath(_dispatch_queue_is_thread_bound(dq))) {
3614 // consumed by _dispatch_barrier_sync_f_slow_invoke
3615 // or in the DISPATCH_COCOA_COMPAT hunk below
3616 _dispatch_continuation_voucher_set(&dbsc.dbsc_dc, dq, 0);
3617 // save frame linkage for _dispatch_barrier_sync_f_slow_invoke
3618 _dispatch_thread_frame_save_state(&dbsc.dbsc_dtf);
3619 // thread bound queues cannot mutate their target queue hierarchy
3620 // so it's fine to look now
3621 _dispatch_introspection_barrier_sync_begin(dq, func);
3622 }
3623 #endif
3624 uint32_t th_self = _dispatch_tid_self();
3625 struct dispatch_continuation_s dbss = {
3626 .dc_flags = DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT,
3627 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
3628 .dc_ctxt = &dbsc,
3629 .dc_data = (void*)(uintptr_t)th_self,
3630 .dc_priority = pp,
3631 .dc_other = &event,
3632 .dc_voucher = DISPATCH_NO_VOUCHER,
3633 };
3634
3635 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3636 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3637 DISPATCH_CLIENT_CRASH(dq, "dispatch_barrier_sync called on queue "
3638 "already owned by current thread");
3639 }
3640
3641 _dispatch_continuation_push_sync_slow(dq, &dbss);
3642 _dispatch_thread_event_wait(&event); // acquire
3643 _dispatch_thread_event_destroy(&event);
3644 if (_dispatch_queue_received_override(dq, pp)) {
3645 // Ensure that the root queue sees that this thread was overridden.
3646 // pairs with the _dispatch_wqthread_override_start in
3647 // _dispatch_continuation_slow_item_signal
3648 _dispatch_set_defaultpriority_override();
3649 }
3650
3651 #if DISPATCH_COCOA_COMPAT
3652 // Queue bound to a non-dispatch thread
3653 if (dbsc.dbsc_dc.dc_func == NULL) {
3654 return;
3655 } else if (dbsc.dbsc_dc.dc_voucher) {
3656 // this almost never happens, unless a dispatch_sync() onto a thread
3657 // bound queue went to the slow path at the same time dispatch_main()
3658 // is called, or the queue is detached from the runloop.
3659 _voucher_release(dbsc.dbsc_dc.dc_voucher);
3660 }
3661 #endif
3662
3663 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3664 }
3665
3666 DISPATCH_ALWAYS_INLINE
3667 static inline void
3668 _dispatch_barrier_sync_f2(dispatch_queue_t dq, void *ctxt,
3669 dispatch_function_t func, pthread_priority_t pp)
3670 {
3671 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3672 // global concurrent queues and queues bound to non-dispatch threads
3673 // always fall into the slow case
3674 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
3675 }
3676 //
3677 // TODO: the more correct thing to do would be to set dq_override to the qos
3678 // of the thread that just acquired the barrier lock here. Unwinding that
3679 // would slow down the uncontended fastpath however.
3680 //
3681 // The chosen tradeoff is that if an enqueue on a lower priority thread
3682 // contends with this fastpath, this thread may receive a useless override.
3683 // Improving this requires the override level to be part of the atomic
3684 // dq_state
3685 //
3686 _dispatch_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3687 }
3688
3689 DISPATCH_NOINLINE
3690 static void
3691 _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3692 dispatch_function_t func, pthread_priority_t pp)
3693 {
3694 _dispatch_barrier_sync_f2(dq, ctxt, func, pp);
3695 }
3696
3697 DISPATCH_NOINLINE
3698 void
3699 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
3700 dispatch_function_t func)
3701 {
3702 _dispatch_barrier_sync_f2(dq, ctxt, func, 0);
3703 }
3704
3705 #ifdef __BLOCKS__
3706 DISPATCH_NOINLINE
3707 static void
3708 _dispatch_sync_block_with_private_data(dispatch_queue_t dq,
3709 void (^work)(void), dispatch_block_flags_t flags)
3710 {
3711 pthread_priority_t pp = _dispatch_block_get_priority(work);
3712
3713 flags |= _dispatch_block_get_flags(work);
3714 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3715 pthread_priority_t tp = _dispatch_get_priority();
3716 tp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3717 if (pp < tp) {
3718 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3719 } else if (_dispatch_block_sync_should_enforce_qos_class(flags)) {
3720 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3721 }
3722 }
3723 // balanced in d_block_sync_invoke or d_block_wait
3724 if (os_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3725 dbpd_queue, NULL, dq, relaxed)) {
3726 _dispatch_retain(dq);
3727 }
3728 if (flags & DISPATCH_BLOCK_BARRIER) {
3729 _dispatch_barrier_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3730 } else {
3731 _dispatch_sync_f(dq, work, _dispatch_block_sync_invoke, pp);
3732 }
3733 }
3734
3735 void
3736 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
3737 {
3738 if (slowpath(_dispatch_block_has_private_data(work))) {
3739 dispatch_block_flags_t flags = DISPATCH_BLOCK_BARRIER;
3740 return _dispatch_sync_block_with_private_data(dq, work, flags);
3741 }
3742 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
3743 }
3744 #endif
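
/*
 * Usage sketch (illustrative; q is a concurrent queue and table_lookup(),
 * table_insert(), key and value are placeholders): dispatch_barrier_sync
 * blocks the caller until the barrier has run, which makes it a convenient
 * exclusive "writer" primitive on a concurrent queue, with plain
 * dispatch_sync serving as the shared "reader" side.
 *
 *	__block int cached;
 *	dispatch_sync(q, ^{ cached = table_lookup(key); });        // shared reader
 *	dispatch_barrier_sync(q, ^{ table_insert(key, value); });  // exclusive writer
 */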
3745
3746 DISPATCH_NOINLINE
3747 void
3748 _dispatch_barrier_trysync_or_async_f(dispatch_queue_t dq, void *ctxt,
3749 dispatch_function_t func)
3750 {
3751 // Use for mutation of queue-/source-internal state only, ignores target
3752 // queue hierarchy!
3753 if (!fastpath(_dispatch_queue_try_acquire_barrier_sync(dq))) {
3754 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
3755 }
3756 // skip the recursion because it's about the queue state only
3757 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
3758 }
3759
3760 #pragma mark -
3761 #pragma mark dispatch_sync
3762
3763 DISPATCH_NOINLINE
3764 static void
3765 _dispatch_non_barrier_complete(dispatch_queue_t dq)
3766 {
3767 uint64_t old_state, new_state;
3768
3769 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
3770 new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
3771 if (_dq_state_is_runnable(new_state)) {
3772 if (!_dq_state_is_runnable(old_state)) {
3773 // we're making a FULL -> non FULL transition
3774 new_state |= DISPATCH_QUEUE_DIRTY;
3775 }
3776 if (!_dq_state_drain_locked(new_state)) {
3777 uint64_t full_width = new_state;
3778 if (_dq_state_has_pending_barrier(new_state)) {
3779 full_width -= DISPATCH_QUEUE_PENDING_BARRIER;
3780 full_width += DISPATCH_QUEUE_WIDTH_INTERVAL;
3781 full_width += DISPATCH_QUEUE_IN_BARRIER;
3782 } else {
3783 full_width += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
3784 full_width += DISPATCH_QUEUE_IN_BARRIER;
3785 }
3786 if ((full_width & DISPATCH_QUEUE_WIDTH_MASK) ==
3787 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
3788 new_state = full_width;
3789 new_state &= ~DISPATCH_QUEUE_DIRTY;
3790 new_state |= _dispatch_tid_self();
3791 }
3792 }
3793 }
3794 });
3795
3796 if (_dq_state_is_in_barrier(new_state)) {
3797 return _dispatch_try_lock_transfer_or_wakeup(dq);
3798 }
3799 if (!_dq_state_is_runnable(old_state)) {
3800 _dispatch_queue_try_wakeup(dq, new_state, 0);
3801 }
3802 }
3803
3804 DISPATCH_NOINLINE
3805 static void
3806 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3807 pthread_priority_t pp)
3808 {
3809 dispatch_assert(dq->do_targetq);
3810 if (!pp) {
3811 pp = _dispatch_get_priority();
3812 pp &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3813 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3814 }
3815 dispatch_thread_event_s event;
3816 _dispatch_thread_event_init(&event);
3817 uint32_t th_self = _dispatch_tid_self();
3818 struct dispatch_continuation_s dc = {
3819 .dc_flags = DISPATCH_OBJ_SYNC_SLOW_BIT,
3820 #if DISPATCH_INTROSPECTION
3821 .dc_func = func,
3822 .dc_ctxt = ctxt,
3823 #endif
3824 .dc_data = (void*)(uintptr_t)th_self,
3825 .dc_other = &event,
3826 .dc_priority = pp,
3827 .dc_voucher = DISPATCH_NO_VOUCHER,
3828 };
3829
3830 uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
3831 if (unlikely(_dq_state_drain_locked_by(dq_state, th_self))) {
3832 DISPATCH_CLIENT_CRASH(dq, "dispatch_sync called on queue "
3833 "already owned by current thread");
3834 }
3835
3836 _dispatch_continuation_push_sync_slow(dq, &dc);
3837 _dispatch_thread_event_wait(&event); // acquire
3838 _dispatch_thread_event_destroy(&event);
3839 if (_dispatch_queue_received_override(dq, pp)) {
3840 // Ensure that the root queue sees that this thread was overridden.
3841 // pairs with the _dispatch_wqthread_override_start in
3842 // _dispatch_continuation_slow_item_signal
3843 _dispatch_set_defaultpriority_override();
3844 }
3845 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3846 }
3847
3848 DISPATCH_ALWAYS_INLINE
3849 static inline void
3850 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3851 pthread_priority_t pp)
3852 {
3853 // <rdar://problem/24738102&24743140> reserving non barrier width
3854 // doesn't fail if only the ENQUEUED bit is set (unlike its barrier width
3855 // equivalent), so we have to check that this thread hasn't enqueued
3856 // anything ahead of this call or we can break ordering
3857 if (slowpath(dq->dq_items_tail)) {
3858 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3859 }
3860 // concurrent queues do not respect width on sync
3861 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3862 return _dispatch_sync_f_slow(dq, ctxt, func, pp);
3863 }
3864 _dispatch_non_barrier_sync_f_invoke_inline(dq, ctxt, func, pp);
3865 }
3866
3867 DISPATCH_NOINLINE
3868 static void
3869 _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3870 pthread_priority_t pp)
3871 {
3872 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3873 return _dispatch_sync_f2(dq, ctxt, func, pp);
3874 }
3875 return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
3876 }
3877
3878 DISPATCH_NOINLINE
3879 void
3880 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3881 {
3882 if (DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width)) {
3883 return _dispatch_sync_f2(dq, ctxt, func, 0);
3884 }
3885 return dispatch_barrier_sync_f(dq, ctxt, func);
3886 }
3887
3888 #ifdef __BLOCKS__
3889 void
3890 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
3891 {
3892 if (slowpath(_dispatch_block_has_private_data(work))) {
3893 return _dispatch_sync_block_with_private_data(dq, work, 0);
3894 }
3895 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
3896 }
3897 #endif
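
/*
 * Usage sketch (illustrative; q and shared_state are placeholders):
 * dispatch_sync runs the work in the calling thread's context whenever it can
 * acquire the queue, so calling it on a queue the caller is already draining
 * deadlocks; the slow path above turns the self-deadlock it can detect into a
 * DISPATCH_CLIENT_CRASH.
 *
 *	__block bool flag;
 *	dispatch_sync(q, ^{ flag = shared_state.flag; });  // fine from another queue
 *
 *	dispatch_sync(q, ^{
 *		dispatch_sync(q, ^{ });    // never do this: deadlock / client crash
 *	});
 */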
3898
3899 #pragma mark -
3900 #pragma mark dispatch_trysync
3901
3902 struct trysync_context {
3903 dispatch_queue_t tc_dq;
3904 void *tc_ctxt;
3905 dispatch_function_t tc_func;
3906 };
3907
3908 DISPATCH_NOINLINE
3909 static int
3910 _dispatch_trysync_recurse(dispatch_queue_t dq,
3911 struct trysync_context *tc, bool barrier)
3912 {
3913 dispatch_queue_t tq = dq->do_targetq;
3914
3915 if (barrier) {
3916 if (slowpath(!_dispatch_queue_try_acquire_barrier_sync(dq))) {
3917 return EWOULDBLOCK;
3918 }
3919 } else {
3920 // <rdar://problem/24743140> check nothing was queued by the current
3921 // thread ahead of this call. _dispatch_queue_try_reserve_sync_width
3922 // ignores the ENQUEUED bit which could cause it to miss a barrier_async
3923 // made by the same thread just before.
3924 if (slowpath(dq->dq_items_tail)) {
3925 return EWOULDBLOCK;
3926 }
3927 // concurrent queues do not respect width on sync
3928 if (slowpath(!_dispatch_queue_try_reserve_sync_width(dq))) {
3929 return EWOULDBLOCK;
3930 }
3931 }
3932
3933 int rc = 0;
3934 if (_dispatch_queue_cannot_trysync(tq)) {
3935 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3936 rc = ENOTSUP;
3937 } else if (tq->do_targetq) {
3938 rc = _dispatch_trysync_recurse(tq, tc, tq->dq_width == 1);
3939 if (rc == ENOTSUP) {
3940 _dispatch_queue_atomic_flags_set(dq, DQF_CANNOT_TRYSYNC);
3941 }
3942 } else {
3943 dispatch_thread_frame_s dtf;
3944 _dispatch_thread_frame_push(&dtf, tq);
3945 _dispatch_sync_function_invoke(tc->tc_dq, tc->tc_ctxt, tc->tc_func);
3946 _dispatch_thread_frame_pop(&dtf);
3947 }
3948 if (barrier) {
3949 _dispatch_barrier_complete(dq);
3950 } else {
3951 _dispatch_non_barrier_complete(dq);
3952 }
3953 return rc;
3954 }
3955
3956 DISPATCH_NOINLINE
3957 bool
3958 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
3959 dispatch_function_t f)
3960 {
3961 if (slowpath(!dq->do_targetq)) {
3962 _dispatch_sync_function_invoke(dq, ctxt, f);
3963 return true;
3964 }
3965 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3966 return false;
3967 }
3968 struct trysync_context tc = {
3969 .tc_dq = dq,
3970 .tc_func = f,
3971 .tc_ctxt = ctxt,
3972 };
3973 return _dispatch_trysync_recurse(dq, &tc, true) == 0;
3974 }
3975
3976 DISPATCH_NOINLINE
3977 bool
3978 _dispatch_trysync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t f)
3979 {
3980 if (slowpath(!dq->do_targetq)) {
3981 _dispatch_sync_function_invoke(dq, ctxt, f);
3982 return true;
3983 }
3984 if (slowpath(_dispatch_queue_cannot_trysync(dq))) {
3985 return false;
3986 }
3987 struct trysync_context tc = {
3988 .tc_dq = dq,
3989 .tc_func = f,
3990 .tc_ctxt = ctxt,
3991 };
3992 return _dispatch_trysync_recurse(dq, &tc, dq->dq_width == 1) == 0;
3993 }
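
/*
 * Usage sketch (illustrative, internal SPI): _dispatch_trysync_f() returns
 * true only when the whole target-queue chain could be acquired without
 * blocking and `f` has already run; a caller that must never block can fall
 * back to an async submission, essentially the pattern that
 * _dispatch_barrier_trysync_or_async_f() implements above for barriers.
 *
 *	if (!_dispatch_trysync_f(dq, ctxt, f)) {
 *		dispatch_async_f(dq, ctxt, f);   // could not run in place; enqueue
 *	}
 */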
3994
3995 #pragma mark -
3996 #pragma mark dispatch_after
3997
3998 DISPATCH_ALWAYS_INLINE
3999 static inline void
4000 _dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
4001 void *ctxt, void *handler, bool block)
4002 {
4003 dispatch_source_t ds;
4004 uint64_t leeway, delta;
4005
4006 if (when == DISPATCH_TIME_FOREVER) {
4007 #if DISPATCH_DEBUG
4008 DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
4009 #endif
4010 return;
4011 }
4012
4013 delta = _dispatch_timeout(when);
4014 if (delta == 0) {
4015 if (block) {
4016 return dispatch_async(queue, handler);
4017 }
4018 return dispatch_async_f(queue, ctxt, handler);
4019 }
4020 leeway = delta / 10; // <rdar://problem/13447496>
4021
4022 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
4023 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
4024
4025 // this function can and should be optimized to not use a dispatch source
4026 ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
4027 dispatch_assert(ds);
4028
4029 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4030 if (block) {
4031 _dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
4032 } else {
4033 _dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
4034 }
4035 // reference `ds` so that it doesn't show up as a leak
4036 dc->dc_data = ds;
4037 _dispatch_source_set_event_handler_continuation(ds, dc);
4038 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
4039 dispatch_activate(ds);
4040 }
4041
4042 DISPATCH_NOINLINE
4043 void
4044 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
4045 dispatch_function_t func)
4046 {
4047 _dispatch_after(when, queue, ctxt, func, false);
4048 }
4049
4050 #ifdef __BLOCKS__
4051 void
4052 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
4053 dispatch_block_t work)
4054 {
4055 _dispatch_after(when, queue, NULL, work, true);
4056 }
4057 #endif
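
/*
 * Usage sketch (illustrative; refresh() is a placeholder): dispatch_after
 * computes its timer leeway as one tenth of the remaining delta, clamped to
 * [1ms, 60s], so a 5-second delay is scheduled with roughly 500ms of leeway.
 *
 *	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW,
 *			(int64_t)(5 * NSEC_PER_SEC));
 *	dispatch_after(when, dispatch_get_main_queue(), ^{
 *		refresh();                       // runs ~5s later on the main queue
 *	});
 */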
4058
4059 #pragma mark -
4060 #pragma mark dispatch_queue_wakeup
4061
4062 DISPATCH_NOINLINE
4063 void
4064 _dispatch_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4065 dispatch_wakeup_flags_t flags)
4066 {
4067 dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
4068
4069 if (_dispatch_queue_class_probe(dq)) {
4070 target = DISPATCH_QUEUE_WAKEUP_TARGET;
4071 }
4072 if (target) {
4073 return _dispatch_queue_class_wakeup(dq, pp, flags, target);
4074 } else if (pp) {
4075 return _dispatch_queue_class_override_drainer(dq, pp, flags);
4076 } else if (flags & DISPATCH_WAKEUP_CONSUME) {
4077 return _dispatch_release_tailcall(dq);
4078 }
4079 }
4080
4081 #if DISPATCH_COCOA_COMPAT
4082 DISPATCH_ALWAYS_INLINE
4083 static inline bool
4084 _dispatch_runloop_handle_is_valid(dispatch_runloop_handle_t handle)
4085 {
4086 #if TARGET_OS_MAC
4087 return MACH_PORT_VALID(handle);
4088 #elif defined(__linux__)
4089 return handle >= 0;
4090 #else
4091 #error "runloop support not implemented on this platform"
4092 #endif
4093 }
4094
4095 DISPATCH_ALWAYS_INLINE
4096 static inline dispatch_runloop_handle_t
4097 _dispatch_runloop_queue_get_handle(dispatch_queue_t dq)
4098 {
4099 #if TARGET_OS_MAC
4100 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt);
4101 #elif defined(__linux__)
4102 // decode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4103 return ((dispatch_runloop_handle_t)(uintptr_t)dq->do_ctxt) - 1;
4104 #else
4105 #error "runloop support not implemented on this platform"
4106 #endif
4107 }
4108
4109 DISPATCH_ALWAYS_INLINE
4110 static inline void
4111 _dispatch_runloop_queue_set_handle(dispatch_queue_t dq, dispatch_runloop_handle_t handle)
4112 {
4113 #if TARGET_OS_MAC
4114 dq->do_ctxt = (void *)(uintptr_t)handle;
4115 #elif defined(__linux__)
4116 // encode: 0 is a valid fd, so offset by 1 to distinguish from NULL
4117 dq->do_ctxt = (void *)(uintptr_t)(handle + 1);
4118 #else
4119 #error "runloop support not implemented on this platform"
4120 #endif
4121 }
4122 #endif // DISPATCH_COCOA_COMPAT
4123
4124 void
4125 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4126 dispatch_wakeup_flags_t flags)
4127 {
4128 #if DISPATCH_COCOA_COMPAT
4129 if (slowpath(_dispatch_queue_atomic_flags(dq) & DQF_RELEASED)) {
4130 // <rdar://problem/14026816>
4131 return _dispatch_queue_wakeup(dq, pp, flags);
4132 }
4133
4134 if (_dispatch_queue_class_probe(dq)) {
4135 return _dispatch_runloop_queue_poke(dq, pp, flags);
4136 }
4137
4138 pp = _dispatch_queue_reset_override_priority(dq, true);
4139 if (pp) {
4140 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4141 if (_dispatch_queue_class_probe(dq)) {
4142 _dispatch_runloop_queue_poke(dq, pp, flags);
4143 }
4144 _dispatch_thread_override_end(owner, dq);
4145 return;
4146 }
4147 if (flags & DISPATCH_WAKEUP_CONSUME) {
4148 return _dispatch_release_tailcall(dq);
4149 }
4150 #else
4151 return _dispatch_queue_wakeup(dq, pp, flags);
4152 #endif
4153 }
4154
4155 void
4156 _dispatch_main_queue_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
4157 dispatch_wakeup_flags_t flags)
4158 {
4159 #if DISPATCH_COCOA_COMPAT
4160 if (_dispatch_queue_is_thread_bound(dq)) {
4161 return _dispatch_runloop_queue_wakeup(dq, pp, flags);
4162 }
4163 #endif
4164 return _dispatch_queue_wakeup(dq, pp, flags);
4165 }
4166
4167 void
4168 _dispatch_root_queue_wakeup(dispatch_queue_t dq,
4169 pthread_priority_t pp DISPATCH_UNUSED,
4170 dispatch_wakeup_flags_t flags)
4171 {
4172 if (flags & DISPATCH_WAKEUP_CONSUME) {
4173 // see _dispatch_queue_push_set_head
4174 dispatch_assert(flags & DISPATCH_WAKEUP_FLUSH);
4175 }
4176 _dispatch_global_queue_poke(dq);
4177 }
4178
4179 #pragma mark -
4180 #pragma mark dispatch root queues poke
4181
4182 #if DISPATCH_COCOA_COMPAT
4183 static inline void
4184 _dispatch_runloop_queue_class_poke(dispatch_queue_t dq)
4185 {
4186 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
4187 if (!_dispatch_runloop_handle_is_valid(handle)) {
4188 return;
4189 }
4190
4191 #if TARGET_OS_MAC
4192 mach_port_t mp = handle;
4193 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
4194 switch (kr) {
4195 case MACH_SEND_TIMEOUT:
4196 case MACH_SEND_TIMED_OUT:
4197 case MACH_SEND_INVALID_DEST:
4198 break;
4199 default:
4200 (void)dispatch_assume_zero(kr);
4201 break;
4202 }
4203 #elif defined(__linux__)
4204 int result;
4205 do {
4206 result = eventfd_write(handle, 1);
4207 } while (result == -1 && errno == EINTR);
4208 (void)dispatch_assume_zero(result);
4209 #else
4210 #error "runloop support not implemented on this platform"
4211 #endif
4212 }
4213
4214 DISPATCH_NOINLINE
4215 static void
4216 _dispatch_runloop_queue_poke(dispatch_queue_t dq,
4217 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
4218 {
4219 // it's not useful to handle WAKEUP_FLUSH because mach_msg() will have
4220 // a release barrier, and because when runloop queues stop being thread
4221 // bound they get a non-optional wake-up to start being a "normal" queue,
4222 // either in _dispatch_runloop_queue_xref_dispose,
4223 // or in _dispatch_queue_cleanup2() for the main thread.
4224
4225 if (dq == &_dispatch_main_q) {
4226 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4227 _dispatch_runloop_queue_handle_init);
4228 }
4229 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
4230 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
4231 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4232 _dispatch_thread_override_start(owner, pp, dq);
4233 if (flags & DISPATCH_WAKEUP_WAS_OVERRIDDEN) {
4234 _dispatch_thread_override_end(owner, dq);
4235 }
4236 }
4237 _dispatch_runloop_queue_class_poke(dq);
4238 if (flags & DISPATCH_WAKEUP_CONSUME) {
4239 return _dispatch_release_tailcall(dq);
4240 }
4241 }
4242 #endif
4243
4244 DISPATCH_NOINLINE
4245 static void
4246 _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
4247 {
4248 dispatch_root_queue_context_t qc = dq->do_ctxt;
4249 uint32_t i = n;
4250 int r;
4251
4252 _dispatch_debug_root_queue(dq, __func__);
4253 #if HAVE_PTHREAD_WORKQUEUES
4254 #if DISPATCH_USE_PTHREAD_POOL
4255 if (qc->dgq_kworkqueue != (void*)(~0ul))
4256 #endif
4257 {
4258 _dispatch_root_queue_debug("requesting new worker thread for global "
4259 "queue: %p", dq);
4260 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4261 if (qc->dgq_kworkqueue) {
4262 pthread_workitem_handle_t wh;
4263 unsigned int gen_cnt;
4264 do {
4265 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
4266 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
4267 (void)dispatch_assume_zero(r);
4268 } while (--i);
4269 return;
4270 }
4271 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4272 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4273 if (!dq->dq_priority) {
4274 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
4275 qc->dgq_wq_options, (int)i);
4276 (void)dispatch_assume_zero(r);
4277 return;
4278 }
4279 #endif
4280 #if HAVE_PTHREAD_WORKQUEUE_QOS
4281 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
4282 (void)dispatch_assume_zero(r);
4283 #endif
4284 return;
4285 }
4286 #endif // HAVE_PTHREAD_WORKQUEUES
4287 #if DISPATCH_USE_PTHREAD_POOL
4288 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4289 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
4290 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
4291 if (!--i) {
4292 return;
4293 }
4294 }
4295 }
4296 uint32_t j, t_count;
4297 // seq_cst with atomic store to tail <rdar://problem/16932833>
4298 t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
4299 do {
4300 if (!t_count) {
4301 _dispatch_root_queue_debug("pthread pool is full for root queue: "
4302 "%p", dq);
4303 return;
4304 }
4305 j = i > t_count ? t_count : i;
4306 } while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
4307 t_count - j, &t_count, acquire));
4308
4309 pthread_attr_t *attr = &pqc->dpq_thread_attr;
4310 pthread_t tid, *pthr = &tid;
4311 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
4312 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
4313 pthr = _dispatch_mgr_root_queue_init();
4314 }
4315 #endif
4316 do {
4317 _dispatch_retain(dq);
4318 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
4319 if (r != EAGAIN) {
4320 (void)dispatch_assume_zero(r);
4321 }
4322 _dispatch_temporary_resource_shortage();
4323 }
4324 } while (--j);
4325 #endif // DISPATCH_USE_PTHREAD_POOL
4326 }
4327
4328 static inline void
4329 _dispatch_global_queue_poke_n(dispatch_queue_t dq, unsigned int n)
4330 {
4331 if (!_dispatch_queue_class_probe(dq)) {
4332 return;
4333 }
4334 #if HAVE_PTHREAD_WORKQUEUES
4335 dispatch_root_queue_context_t qc = dq->do_ctxt;
4336 if (
4337 #if DISPATCH_USE_PTHREAD_POOL
4338 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
4339 #endif
4340 !os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4341 _dispatch_root_queue_debug("worker thread request still pending for "
4342 "global queue: %p", dq);
4343 return;
4344 }
4345 #endif // HAVE_PTHREAD_WORKQUEUES
4346 return _dispatch_global_queue_poke_slow(dq, n);
4347 }
4348
4349 static inline void
4350 _dispatch_global_queue_poke(dispatch_queue_t dq)
4351 {
4352 return _dispatch_global_queue_poke_n(dq, 1);
4353 }
4354
4355 DISPATCH_NOINLINE
4356 void
4357 _dispatch_queue_push_list_slow(dispatch_queue_t dq, unsigned int n)
4358 {
4359 return _dispatch_global_queue_poke_n(dq, n);
4360 }
4361
4362 #pragma mark -
4363 #pragma mark dispatch_queue_drain
4364
4365 void
4366 _dispatch_continuation_pop(dispatch_object_t dou, dispatch_queue_t dq,
4367 dispatch_invoke_flags_t flags)
4368 {
4369 _dispatch_continuation_pop_inline(dou, dq, flags);
4370 }
4371
4372 void
4373 _dispatch_continuation_invoke(dispatch_object_t dou, voucher_t override_voucher,
4374 dispatch_invoke_flags_t flags)
4375 {
4376 _dispatch_continuation_invoke_inline(dou, override_voucher, flags);
4377 }
4378
4379 /*
4380 * Drain comes in 2 flavours (serial/concurrent) and 2 modes
4381 * (redirecting or not).
4382 *
4383 * Serial
4384 * ~~~~~~
4385 * Serial drain is about serial queues (width == 1). It doesn't support
4386 * the redirecting mode, which doesn't make sense, and treats all continuations
4387 * as barriers. Bookkeeping is minimal in serial flavour, most of the loop
4388 * is optimized away.
4389 *
4390 * Serial drain stops if the width of the queue grows to larger than 1.
4391 * Going through a serial drain prevents any recursive drain from being
4392 * redirecting.
4393 *
4394 * Concurrent
4395 * ~~~~~~~~~~
4396 * When in non-redirecting mode (meaning one of the target queues is serial),
4397 * non-barriers and barriers alike run in the context of the drain thread.
4398 * Slow non-barrier items are still all signaled so that they can make progress
4399 * toward the dispatch_sync() that will serialize them all.
4400 *
4401 * In redirecting mode, non-barrier work items are redirected downward.
4402 *
4403 * Concurrent drain stops if the width of the queue becomes 1, so that the
4404 * queue drain moves to the more efficient serial mode.
4405 */
4406 DISPATCH_ALWAYS_INLINE
4407 static dispatch_queue_t
4408 _dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4409 uint64_t *owned_ptr, struct dispatch_object_s **dc_out,
4410 bool serial_drain)
4411 {
4412 dispatch_queue_t orig_tq = dq->do_targetq;
4413 dispatch_thread_frame_s dtf;
4414 struct dispatch_object_s *dc = NULL, *next_dc;
4415 uint64_t owned = *owned_ptr;
4416
4417 _dispatch_thread_frame_push(&dtf, dq);
4418 if (_dq_state_is_in_barrier(owned)) {
4419 // we really own `IN_BARRIER + dq->dq_width * WIDTH_INTERVAL`
4420 // but width can change while draining barrier work items, so we only
4421 // convert to `dq->dq_width * WIDTH_INTERVAL` when we drop `IN_BARRIER`
4422 owned = DISPATCH_QUEUE_IN_BARRIER;
4423 }
4424
4425 while (dq->dq_items_tail) {
4426 dc = _dispatch_queue_head(dq);
4427 do {
4428 if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(dq))) {
4429 goto out;
4430 }
4431 if (unlikely(orig_tq != dq->do_targetq)) {
4432 goto out;
4433 }
4434 if (unlikely(serial_drain != (dq->dq_width == 1))) {
4435 goto out;
4436 }
4437 if (serial_drain || _dispatch_object_is_barrier(dc)) {
4438 if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
4439 goto out;
4440 }
4441 next_dc = _dispatch_queue_next(dq, dc);
4442 if (_dispatch_object_is_slow_item(dc)) {
4443 owned = 0;
4444 goto out_with_deferred;
4445 }
4446 } else {
4447 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4448 // we just ran barrier work items, we have to make their
4449 // effect visible to other sync work items on other threads
4450 // that may start coming in after this point, hence the
4451 // release barrier
4452 os_atomic_and2o(dq, dq_state, ~owned, release);
4453 owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4454 } else if (unlikely(owned == 0)) {
4455 if (_dispatch_object_is_slow_item(dc)) {
4456 // sync "readers" don't observe the limit
4457 _dispatch_queue_reserve_sync_width(dq);
4458 } else if (!_dispatch_queue_try_acquire_async(dq)) {
4459 goto out_with_no_width;
4460 }
4461 owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
4462 }
4463
4464 next_dc = _dispatch_queue_next(dq, dc);
4465 if (_dispatch_object_is_slow_item(dc)) {
4466 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4467 _dispatch_continuation_slow_item_signal(dq, dc);
4468 continue;
4469 }
4470
4471 if (flags & DISPATCH_INVOKE_REDIRECTING_DRAIN) {
4472 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4473 _dispatch_continuation_redirect(dq, dc);
4474 continue;
4475 }
4476 }
4477
4478 _dispatch_continuation_pop_inline(dc, dq, flags);
4479 _dispatch_perfmon_workitem_inc();
4480 if (unlikely(dtf.dtf_deferred)) {
4481 goto out_with_deferred_compute_owned;
4482 }
4483 } while ((dc = next_dc));
4484 }
4485
4486 out:
4487 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4488 // if we're IN_BARRIER we really own the full width too
4489 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4490 }
4491 if (dc) {
4492 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4493 }
4494 *owned_ptr = owned;
4495 _dispatch_thread_frame_pop(&dtf);
4496 return dc ? dq->do_targetq : NULL;
4497
4498 out_with_no_width:
4499 *owned_ptr = 0;
4500 _dispatch_thread_frame_pop(&dtf);
4501 return NULL;
4502
4503 out_with_deferred_compute_owned:
4504 if (serial_drain) {
4505 owned = DISPATCH_QUEUE_IN_BARRIER + DISPATCH_QUEUE_WIDTH_INTERVAL;
4506 } else {
4507 if (owned == DISPATCH_QUEUE_IN_BARRIER) {
4508 // if we're IN_BARRIER we really own the full width too
4509 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4510 }
4511 if (next_dc) {
4512 owned = _dispatch_queue_adjust_owned(dq, owned, next_dc);
4513 }
4514 }
4515 out_with_deferred:
4516 *owned_ptr = owned;
4517 if (unlikely(!dc_out)) {
4518 DISPATCH_INTERNAL_CRASH(dc,
4519 "Deferred continuation on source, mach channel or mgr");
4520 }
4521 *dc_out = dc;
4522 _dispatch_thread_frame_pop(&dtf);
4523 return dq->do_targetq;
4524 }
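
/*
 * Simplified model of the drain loop above (illustrative pseudocode; the
 * helper names stand in for the real _dispatch_object_is_* tests and pop /
 * signal / redirect paths): serial drain treats every item as a barrier,
 * while concurrent drain signals blocked sync waiters and, in redirecting
 * mode, pushes plain async work down to the target queue.
 *
 *	for (dc = head; dc; dc = next_dc) {
 *		if (serial_drain || is_barrier(dc)) {
 *			pop_and_invoke(dc);          // runs on the draining thread
 *		} else if (is_sync_waiter(dc)) {
 *			signal_waiter(dc);           // wakes the blocked dispatch_sync()
 *		} else if (redirecting) {
 *			redirect_to_target(dc);      // hands the work to the target queue
 *		} else {
 *			pop_and_invoke(dc);
 *		}
 *	}
 */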
4525
4526 DISPATCH_NOINLINE
4527 static dispatch_queue_t
4528 _dispatch_queue_concurrent_drain(dispatch_queue_t dq,
4529 dispatch_invoke_flags_t flags, uint64_t *owned,
4530 struct dispatch_object_s **dc_ptr)
4531 {
4532 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, false);
4533 }
4534
4535 DISPATCH_NOINLINE
4536 dispatch_queue_t
4537 _dispatch_queue_serial_drain(dispatch_queue_t dq,
4538 dispatch_invoke_flags_t flags, uint64_t *owned,
4539 struct dispatch_object_s **dc_ptr)
4540 {
4541 flags &= ~(dispatch_invoke_flags_t)DISPATCH_INVOKE_REDIRECTING_DRAIN;
4542 return _dispatch_queue_drain(dq, flags, owned, dc_ptr, true);
4543 }
4544
4545 #if DISPATCH_COCOA_COMPAT
4546 static void
4547 _dispatch_main_queue_drain(void)
4548 {
4549 dispatch_queue_t dq = &_dispatch_main_q;
4550 dispatch_thread_frame_s dtf;
4551
4552 if (!dq->dq_items_tail) {
4553 return;
4554 }
4555
4556 if (!fastpath(_dispatch_queue_is_thread_bound(dq))) {
4557 DISPATCH_CLIENT_CRASH(0, "_dispatch_main_queue_callback_4CF called"
4558 " after dispatch_main()");
4559 }
4560 mach_port_t owner = DISPATCH_QUEUE_DRAIN_OWNER(dq);
4561 if (slowpath(owner != _dispatch_tid_self())) {
4562 DISPATCH_CLIENT_CRASH(owner, "_dispatch_main_queue_callback_4CF called"
4563 " from the wrong thread");
4564 }
4565
4566 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
4567 _dispatch_runloop_queue_handle_init);
4568
4569 _dispatch_perfmon_start();
4570 // <rdar://problem/23256682> hide the frame chaining when CFRunLoop
4571 // drains the main runloop, as this should not be observable that way
4572 _dispatch_thread_frame_push_and_rebase(&dtf, dq, NULL);
4573
4574 pthread_priority_t old_pri = _dispatch_get_priority();
4575 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4576 voucher_t voucher = _voucher_copy();
4577
4578 struct dispatch_object_s *dc, *next_dc, *tail;
4579 dc = os_mpsc_capture_snapshot(dq, dq_items, &tail);
4580 do {
4581 next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
4582 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4583 _dispatch_perfmon_workitem_inc();
4584 } while ((dc = next_dc));
4585
4586 // runloop based queues use their port for the queue PUBLISH pattern
4587 // so this raw call to dx_wakeup(0) is valid
4588 dx_wakeup(dq, 0, 0);
4589 _dispatch_voucher_debug("main queue restore", voucher);
4590 _dispatch_reset_defaultpriority(old_dp);
4591 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4592 _dispatch_thread_frame_pop(&dtf);
4593 _dispatch_perfmon_end();
4594 _dispatch_force_cache_cleanup();
4595 }
4596
4597 static bool
4598 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
4599 {
4600 if (!dq->dq_items_tail) {
4601 return false;
4602 }
4603 dispatch_thread_frame_s dtf;
4604 _dispatch_perfmon_start();
4605 _dispatch_thread_frame_push(&dtf, dq);
4606 pthread_priority_t old_pri = _dispatch_get_priority();
4607 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri, NULL);
4608 voucher_t voucher = _voucher_copy();
4609
4610 struct dispatch_object_s *dc, *next_dc;
4611 dc = _dispatch_queue_head(dq);
4612 next_dc = _dispatch_queue_next(dq, dc);
4613 _dispatch_continuation_pop_inline(dc, dq, DISPATCH_INVOKE_NONE);
4614 _dispatch_perfmon_workitem_inc();
4615
4616 if (!next_dc) {
4617 // runloop based queues use their port for the queue PUBLISH pattern
4618 // so this raw call to dx_wakeup(0) is valid
4619 dx_wakeup(dq, 0, 0);
4620 }
4621
4622 _dispatch_voucher_debug("runloop queue restore", voucher);
4623 _dispatch_reset_defaultpriority(old_dp);
4624 _dispatch_reset_priority_and_voucher(old_pri, voucher);
4625 _dispatch_thread_frame_pop(&dtf);
4626 _dispatch_perfmon_end();
4627 _dispatch_force_cache_cleanup();
4628 return next_dc;
4629 }
4630 #endif
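
/*
 * Usage sketch (illustrative; handle_event() is a placeholder): the main
 * queue drain above is the path taken when the main thread's run loop asks
 * libdispatch to service blocks that other threads submitted to the
 * thread-bound main queue.
 *
 *	dispatch_async(dispatch_get_main_queue(), ^{
 *		handle_event();                  // executes on the main thread
 *	});
 */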
4631
4632 DISPATCH_NOINLINE
4633 void
4634 _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq)
4635 {
4636 dispatch_continuation_t dc_tmp, dc_start, dc_end;
4637 struct dispatch_object_s *dc = NULL;
4638 uint64_t dq_state, owned;
4639 size_t count = 0;
4640
4641 owned = DISPATCH_QUEUE_IN_BARRIER;
4642 owned += dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
4643 attempt_running_slow_head:
4644 if (slowpath(dq->dq_items_tail) && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
4645 dc = _dispatch_queue_head(dq);
4646 if (!_dispatch_object_is_slow_item(dc)) {
4647 // not a slow item, needs to wake up
4648 } else if (fastpath(dq->dq_width == 1) ||
4649 _dispatch_object_is_barrier(dc)) {
4650 // rdar://problem/8290662 "barrier/writer lock transfer"
4651 dc_start = dc_end = (dispatch_continuation_t)dc;
4652 owned = 0;
4653 count = 1;
4654 dc = _dispatch_queue_next(dq, dc);
4655 } else {
4656 // <rdar://problem/10164594> "reader lock transfer"
4657 // we must not signal semaphores immediately because our right
4658 // to dequeue is granted through holding the full "barrier" width,
4659 // which a signaled work item could pull out from under us
4660 dc_start = (dispatch_continuation_t)dc;
4661 do {
4662 // no check on width here because concurrent queues
4663 // do not respect width for blocked readers, the thread
4664 // is already spent anyway
4665 dc_end = (dispatch_continuation_t)dc;
4666 owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4667 count++;
4668 dc = _dispatch_queue_next(dq, dc);
4669 } while (dc && _dispatch_object_is_slow_non_barrier(dc));
4670 }
4671
4672 if (count) {
4673 _dispatch_queue_drain_transfer_lock(dq, owned, dc_start);
4674 do {
4675 // signaled job will release the continuation
4676 dc_tmp = dc_start;
4677 dc_start = dc_start->do_next;
4678 _dispatch_continuation_slow_item_signal(dq, dc_tmp);
4679 } while (dc_tmp != dc_end);
4680 return;
4681 }
4682 }
4683
4684 if (dc || dx_metatype(dq) != _DISPATCH_QUEUE_TYPE) {
4685 // <rdar://problem/23336992> the following wakeup is needed for sources
4686 // or mach channels: when ds_pending_data is set at the same time
4687 // as a trysync_f happens, lock transfer code above doesn't know about
4688 // ds_pending_data or the wakeup logic, but lock transfer is useless
4689 // for sources and mach channels in the first place.
4690 owned = _dispatch_queue_adjust_owned(dq, owned, dc);
4691 dq_state = _dispatch_queue_drain_unlock(dq, owned, NULL);
4692 return _dispatch_queue_try_wakeup(dq, dq_state, 0);
4693 } else if (!fastpath(_dispatch_queue_drain_try_unlock(dq, owned))) {
4694 // someone enqueued a slow item at the head
4695 // looping may be its last chance
4696 goto attempt_running_slow_head;
4697 }
4698 }
4699
4700 void
4701 _dispatch_mgr_queue_drain(void)
4702 {
4703 const dispatch_invoke_flags_t flags = DISPATCH_INVOKE_MANAGER_DRAIN;
4704 dispatch_queue_t dq = &_dispatch_mgr_q;
4705 uint64_t owned = DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
4706
4707 if (dq->dq_items_tail) {
4708 _dispatch_perfmon_start();
4709 if (slowpath(_dispatch_queue_serial_drain(dq, flags, &owned, NULL))) {
4710 DISPATCH_INTERNAL_CRASH(0, "Interrupted drain on manager queue");
4711 }
4712 _dispatch_voucher_debug("mgr queue clear", NULL);
4713 _voucher_clear();
4714 _dispatch_reset_defaultpriority_override();
4715 _dispatch_perfmon_end();
4716 }
4717
4718 #if DISPATCH_USE_KEVENT_WORKQUEUE
4719 if (!_dispatch_kevent_workqueue_enabled)
4720 #endif
4721 {
4722 _dispatch_force_cache_cleanup();
4723 }
4724 }
4725
4726 #pragma mark -
4727 #pragma mark dispatch_queue_invoke
4728
4729 void
4730 _dispatch_queue_drain_deferred_invoke(dispatch_queue_t dq,
4731 dispatch_invoke_flags_t flags, uint64_t to_unlock,
4732 struct dispatch_object_s *dc)
4733 {
4734 if (_dispatch_object_is_slow_item(dc)) {
4735 dispatch_assert(to_unlock == 0);
4736 _dispatch_queue_drain_transfer_lock(dq, to_unlock, dc);
4737 _dispatch_continuation_slow_item_signal(dq, dc);
4738 return _dispatch_release_tailcall(dq);
4739 }
4740
4741 bool should_defer_again = false, should_pend_queue = true;
4742 uint64_t old_state, new_state;
4743
4744 if (_dispatch_get_current_queue()->do_targetq) {
4745 _dispatch_thread_frame_get_current()->dtf_deferred = dc;
4746 should_defer_again = true;
4747 should_pend_queue = false;
4748 }
4749
4750 if (dq->dq_width > 1) {
4751 should_pend_queue = false;
4752 } else if (should_pend_queue) {
4753 dispatch_assert(to_unlock ==
4754 DISPATCH_QUEUE_WIDTH_INTERVAL + DISPATCH_QUEUE_IN_BARRIER);
4755 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4756 new_state = old_state;
4757 if (_dq_state_has_waiters(old_state) ||
4758 _dq_state_is_enqueued(old_state)) {
4759 os_atomic_rmw_loop_give_up(break);
4760 }
4761 new_state += DISPATCH_QUEUE_DRAIN_PENDED;
4762 new_state -= DISPATCH_QUEUE_IN_BARRIER;
4763 new_state -= DISPATCH_QUEUE_WIDTH_INTERVAL;
4764 });
4765 should_pend_queue = (new_state & DISPATCH_QUEUE_DRAIN_PENDED);
4766 }
4767
4768 if (!should_pend_queue) {
4769 if (to_unlock & DISPATCH_QUEUE_IN_BARRIER) {
4770 _dispatch_try_lock_transfer_or_wakeup(dq);
4771 _dispatch_release(dq);
4772 } else if (to_unlock) {
4773 uint64_t dq_state = _dispatch_queue_drain_unlock(dq, to_unlock, NULL);
4774 _dispatch_queue_try_wakeup(dq, dq_state, DISPATCH_WAKEUP_CONSUME);
4775 } else {
4776 _dispatch_release(dq);
4777 }
4778 dq = NULL;
4779 }
4780
4781 if (!should_defer_again) {
4782 dx_invoke(dc, flags & _DISPATCH_INVOKE_PROPAGATE_MASK);
4783 }
4784
4785 if (dq) {
4786 uint32_t self = _dispatch_tid_self();
4787 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
4788 new_state = old_state;
4789 if (!_dq_state_drain_pended(old_state) ||
4790 _dq_state_drain_owner(old_state) != self) {
4791 os_atomic_rmw_loop_give_up({
4792 // We may have been overridden, so inform the root queue
4793 _dispatch_set_defaultpriority_override();
4794 return _dispatch_release_tailcall(dq);
4795 });
4796 }
4797 new_state = DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT(new_state);
4798 });
4799 if (_dq_state_has_override(old_state)) {
4800 // Ensure that the root queue sees that this thread was overridden.
4801 _dispatch_set_defaultpriority_override();
4802 }
4803 return dx_invoke(dq, flags | DISPATCH_INVOKE_STEALING);
4804 }
4805 }
4806
4807 void
4808 _dispatch_queue_finalize_activation(dispatch_queue_t dq)
4809 {
4810 dispatch_queue_t tq = dq->do_targetq;
4811 _dispatch_queue_priority_inherit_from_target(dq, tq);
4812 _dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
4813 if (dq->dq_override_voucher == DISPATCH_NO_VOUCHER) {
4814 voucher_t v = tq->dq_override_voucher;
4815 if (v != DISPATCH_NO_VOUCHER) {
4816 if (v) _voucher_retain(v);
4817 dq->dq_override_voucher = v;
4818 }
4819 }
4820 }
4821
4822 DISPATCH_ALWAYS_INLINE
4823 static inline dispatch_queue_t
4824 dispatch_queue_invoke2(dispatch_queue_t dq, dispatch_invoke_flags_t flags,
4825 uint64_t *owned, struct dispatch_object_s **dc_ptr)
4826 {
4827 dispatch_queue_t otq = dq->do_targetq;
4828 dispatch_queue_t cq = _dispatch_queue_get_current();
4829
4830 if (slowpath(cq != otq)) {
4831 return otq;
4832 }
4833 if (dq->dq_width == 1) {
4834 return _dispatch_queue_serial_drain(dq, flags, owned, dc_ptr);
4835 }
4836 return _dispatch_queue_concurrent_drain(dq, flags, owned, dc_ptr);
4837 }
4838
4839 // 6618342 Contact the team that owns the Instrument DTrace probe before
4840 // renaming this symbol
4841 DISPATCH_NOINLINE
4842 void
4843 _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_invoke_flags_t flags)
4844 {
4845 _dispatch_queue_class_invoke(dq, flags, dispatch_queue_invoke2);
4846 }
4847
4848 #pragma mark -
4849 #pragma mark dispatch_queue_class_wakeup
4850
4851 #if HAVE_PTHREAD_WORKQUEUE_QOS
4852 void
4853 _dispatch_queue_override_invoke(dispatch_continuation_t dc,
4854 dispatch_invoke_flags_t flags)
4855 {
4856 dispatch_queue_t old_rq = _dispatch_queue_get_current();
4857 dispatch_queue_t assumed_rq = dc->dc_other;
4858 voucher_t ov = DISPATCH_NO_VOUCHER;
4859 dispatch_object_t dou;
4860
4861 dou._do = dc->dc_data;
4862 _dispatch_queue_set_current(assumed_rq);
4863 flags |= DISPATCH_INVOKE_OVERRIDING;
4864 if (dc_type(dc) == DISPATCH_CONTINUATION_TYPE(OVERRIDE_STEALING)) {
4865 flags |= DISPATCH_INVOKE_STEALING;
4866 } else {
4867 // balance the fake continuation push in
4868 // _dispatch_root_queue_push_override
4869 _dispatch_trace_continuation_pop(assumed_rq, dou._do);
4870 }
4871 _dispatch_continuation_pop_forwarded(dc, ov, DISPATCH_OBJ_CONSUME_BIT, {
4872 if (_dispatch_object_has_vtable(dou._do)) {
4873 dx_invoke(dou._do, flags);
4874 } else {
4875 _dispatch_continuation_invoke_inline(dou, ov, flags);
4876 }
4877 });
4878 _dispatch_queue_set_current(old_rq);
4879 }
4880
4881 DISPATCH_ALWAYS_INLINE
4882 static inline bool
4883 _dispatch_need_global_root_queue_push_override(dispatch_queue_t rq,
4884 pthread_priority_t pp)
4885 {
4886 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4887 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4888
4889 if (unlikely(!rqp)) return false;
4890
4891 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4892 return defaultqueue ? pp && pp != rqp : pp > rqp;
4893 }
4894
4895 DISPATCH_ALWAYS_INLINE
4896 static inline bool
4897 _dispatch_need_global_root_queue_push_override_stealer(dispatch_queue_t rq,
4898 pthread_priority_t pp)
4899 {
4900 pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4901 bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
4902
4903 if (unlikely(!rqp)) return false;
4904
4905 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
4906 return defaultqueue || pp > rqp;
4907 }
4908
4909 DISPATCH_NOINLINE
4910 static void
4911 _dispatch_root_queue_push_override(dispatch_queue_t orig_rq,
4912 dispatch_object_t dou, pthread_priority_t pp)
4913 {
4914 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4915 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4916 dispatch_continuation_t dc = dou._dc;
4917
4918 if (_dispatch_object_is_redirection(dc)) {
4919 // no double-wrap is needed, _dispatch_async_redirect_invoke will do
4920 // the right thing
4921 dc->dc_func = (void *)orig_rq;
4922 } else {
4923 dc = _dispatch_continuation_alloc();
4924 dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING);
4925 // fake that we queued `dou` on `orig_rq` for introspection purposes
4926 _dispatch_trace_continuation_push(orig_rq, dou);
4927 dc->dc_ctxt = dc;
4928 dc->dc_other = orig_rq;
4929 dc->dc_data = dou._do;
4930 dc->dc_priority = DISPATCH_NO_PRIORITY;
4931 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4932 }
4933
4934 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4935 _dispatch_queue_push_inline(rq, dc, 0, 0);
4936 }
4937
4938 DISPATCH_NOINLINE
4939 static void
4940 _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
4941 dispatch_queue_t dq, pthread_priority_t pp)
4942 {
4943 bool overcommit = orig_rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
4944 dispatch_queue_t rq = _dispatch_get_root_queue_for_priority(pp, overcommit);
4945 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4946
4947 dc->do_vtable = DC_VTABLE(OVERRIDE_STEALING);
4948 _dispatch_retain(dq);
4949 dc->dc_func = NULL;
4950 dc->dc_ctxt = dc;
4951 dc->dc_other = orig_rq;
4952 dc->dc_data = dq;
4953 dc->dc_priority = DISPATCH_NO_PRIORITY;
4954 dc->dc_voucher = DISPATCH_NO_VOUCHER;
4955
4956 DISPATCH_COMPILER_CAN_ASSUME(dx_type(rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
4957 _dispatch_queue_push_inline(rq, dc, 0, 0);
4958 }
4959
4960 DISPATCH_NOINLINE
4961 static void
4962 _dispatch_queue_class_wakeup_with_override(dispatch_queue_t dq,
4963 pthread_priority_t pp, dispatch_wakeup_flags_t flags, uint64_t dq_state)
4964 {
4965 mach_port_t owner = _dq_state_drain_owner(dq_state);
4966 pthread_priority_t pp2;
4967 dispatch_queue_t tq;
4968 bool locked;
4969
4970 if (owner) {
4971 int rc = _dispatch_wqthread_override_start_check_owner(owner, pp,
4972 &dq->dq_state_lock);
4973 // EPERM means the target of the override is not a work queue thread
4974 // and could be a thread bound queue such as the main queue.
4975 // When that happens we must get to that queue and wake it up if we
4976 		// want the override to be applied and take effect.
4977 if (rc != EPERM) {
4978 goto out;
4979 }
4980 }
4981
4982 if (_dq_state_is_suspended(dq_state)) {
4983 goto out;
4984 }
4985
4986 tq = dq->do_targetq;
4987
4988 if (_dispatch_queue_has_immutable_target(dq)) {
4989 locked = false;
4990 } else if (_dispatch_is_in_root_queues_array(tq)) {
4991 		// avoid locking when we recognize the target queue as a global root
4992 		// queue: it is gross, but it is a very common case. The locking isn't
4993 		// needed because these target queues cannot go away.
4994 locked = false;
4995 } else if (_dispatch_queue_sidelock_trylock(dq, pp)) {
4996 // <rdar://problem/17735825> to traverse the tq chain safely we must
4997 // lock it to ensure it cannot change
4998 locked = true;
4999 tq = dq->do_targetq;
5000 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5001 } else {
5002 //
5003 		// To reach this point, the current thread has:
5004 // 1. enqueued an object on `dq`
5005 // 2. raised the dq_override value of `dq`
5006 // 3. set the HAS_OVERRIDE bit and not seen an owner
5007 // 4. tried and failed to acquire the side lock
5008 //
5009 //
5010 // The side lock owner can only be one of three things:
5011 //
5012 // - The suspend/resume side count code. Besides being unlikely,
5013 // it means that at this moment the queue is actually suspended,
5014 // which transfers the responsibility of applying the override to
5015 // the eventual dispatch_resume().
5016 //
5017 // - A dispatch_set_target_queue() call. The fact that we saw no `owner`
5018 // means that the trysync it does wasn't being drained when (3)
5019 		//   happened, which can only be explained by one of these interleavings:
5020 //
5021 		//   o `dq` became idle between when the object queued in (1) ran and
5022 		//     the set_target_queue call, and we were unlucky enough that our
5023 		//     step (3) happened while this queue was idle. There is no reason
5024 		//     to override anything anymore: the queue drained to completion
5025 		//     while we were preempted, so our job is done.
5026 //
5027 		//   o `dq` is queued but not draining during (1-3); then, when we try
5028 		//     to lock at (4), the queue is draining a set_target_queue.
5029 		//     Since we set HAS_OVERRIDE with a release barrier, the effect of
5030 		//     (2) was visible to the drainer when it acquired the drain lock,
5031 		//     and that drainer has applied our override. Our job is done.
5032 //
5033 		// - Another instance of _dispatch_queue_class_wakeup_with_override(),
5034 		//   which is fine because a failed trylock leaves a hint on the lock,
5035 		//   causing the tryunlock below to fail so that we reassess whether a
5036 		//   better override needs to be applied (see the sketch after this function).
5037 //
5038 _dispatch_ktrace1(DISPATCH_PERF_mutable_target, dq);
5039 goto out;
5040 }
5041
5042 apply_again:
5043 if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5044 if (_dispatch_need_global_root_queue_push_override_stealer(tq, pp)) {
5045 _dispatch_root_queue_push_override_stealer(tq, dq, pp);
5046 }
5047 } else if (_dispatch_queue_need_override(tq, pp)) {
5048 dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
5049 }
5050 while (unlikely(locked && !_dispatch_queue_sidelock_tryunlock(dq))) {
5051 // rdar://problem/24081326
5052 //
5053 // Another instance of _dispatch_queue_class_wakeup_with_override()
5054 // tried to acquire the side lock while we were running, and could have
5055 // had a better override than ours to apply.
5056 //
5057 pp2 = dq->dq_override;
5058 if (pp2 > pp) {
5059 pp = pp2;
5060 // The other instance had a better priority than ours, override
5061 // our thread, and apply the override that wasn't applied to `dq`
5062 // because of us.
5063 goto apply_again;
5064 }
5065 }
5066
5067 out:
5068 if (flags & DISPATCH_WAKEUP_CONSUME) {
5069 return _dispatch_release_tailcall(dq);
5070 }
5071 }
5072 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
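
/*
 * Illustrative sketch (not part of libdispatch): the last bullet of the big
 * comment in _dispatch_queue_class_wakeup_with_override() relies on a side
 * lock where a failed trylock leaves a hint behind, so the owner's tryunlock
 * fails once and the owner reassesses before really unlocking. A minimal C11
 * version of that handshake, with hypothetical names; the real primitive is
 * the dq_sidelock / unfair-lock machinery in the dispatch lock shims.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdatomic.h>
#include <stdbool.h>

#define EXAMPLE_SLOCK_OWNED  1u
#define EXAMPLE_SLOCK_FAILED 2u // "a trylock failed while you held the lock"

typedef struct { atomic_uint v; } example_sidelock_t;

static bool
example_sidelock_trylock(example_sidelock_t *l)
{
	unsigned old = atomic_load_explicit(&l->v, memory_order_relaxed);
	for (;;) {
		unsigned new_value;
		if (old & EXAMPLE_SLOCK_OWNED) {
			// Leave the hint: the owner's tryunlock will fail and reassess.
			new_value = old | EXAMPLE_SLOCK_FAILED;
		} else {
			new_value = EXAMPLE_SLOCK_OWNED;
		}
		if (atomic_compare_exchange_weak_explicit(&l->v, &old, new_value,
				memory_order_acquire, memory_order_relaxed)) {
			return !(old & EXAMPLE_SLOCK_OWNED);
		}
	}
}

static bool
example_sidelock_tryunlock(example_sidelock_t *l)
{
	unsigned old = atomic_load_explicit(&l->v, memory_order_relaxed);
	for (;;) {
		unsigned new_value = (old & EXAMPLE_SLOCK_FAILED)
				? (old & ~EXAMPLE_SLOCK_FAILED) // consume hint, stay locked
				: 0;                            // really unlock
		if (atomic_compare_exchange_weak_explicit(&l->v, &old, new_value,
				memory_order_release, memory_order_relaxed)) {
			// false means: somebody contended, re-check and try again.
			return !(old & EXAMPLE_SLOCK_FAILED);
		}
	}
}

// Caller pattern, mirroring the apply_again/tryunlock loop above:
//
//	while (locked && !example_sidelock_tryunlock(&l)) {
//		/* re-read the override and apply a better one if it appeared */
//	}
#endif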
5073
5074 DISPATCH_NOINLINE
5075 void
5076 _dispatch_queue_class_override_drainer(dispatch_queue_t dq,
5077 pthread_priority_t pp, dispatch_wakeup_flags_t flags)
5078 {
5079 #if HAVE_PTHREAD_WORKQUEUE_QOS
5080 uint64_t dq_state, value;
5081
5082 //
5083 // Someone is trying to override the last work item of the queue.
5084 // Do not remember this override on the queue because we know the precise
5085 // duration the override is required for: until the current drain unlocks.
5086 //
5087 // That is why this function only tries to set HAS_OVERRIDE if we can
5088 // still observe a drainer, and doesn't need to set the DIRTY bit
5089 // because oq_override wasn't touched and there is no race to resolve
5090 //
5091 os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
5092 if (!_dq_state_drain_locked(dq_state)) {
5093 os_atomic_rmw_loop_give_up(break);
5094 }
5095 value = dq_state | DISPATCH_QUEUE_HAS_OVERRIDE;
5096 });
5097 if (_dq_state_drain_locked(dq_state)) {
5098 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5099 flags, dq_state);
5100 }
5101 #else
5102 (void)pp;
5103 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5104 if (flags & DISPATCH_WAKEUP_CONSUME) {
5105 return _dispatch_release_tailcall(dq);
5106 }
5107 }
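
/*
 * Illustrative sketch (not part of libdispatch): the os_atomic_rmw_loop2o()
 * above is a compare-and-swap retry loop that sets HAS_OVERRIDE only while a
 * drainer can still be observed. A minimal C11 version of the same shape,
 * with hypothetical state bits standing in for the real dq_state layout.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define EXAMPLE_STATE_DRAIN_LOCKED 0x1ull /* hypothetical */
#define EXAMPLE_STATE_HAS_OVERRIDE 0x2ull /* hypothetical */

// Set HAS_OVERRIDE only while the drain lock is still observed held; returns
// whether the bit was set (i.e. whether a drainer will see it on unlock).
static bool
example_set_override_while_drain_locked(_Atomic uint64_t *state)
{
	uint64_t old = atomic_load_explicit(state, memory_order_relaxed);
	do {
		if (!(old & EXAMPLE_STATE_DRAIN_LOCKED)) {
			return false; // the drainer is already gone, nothing to hand off
		}
	} while (!atomic_compare_exchange_weak_explicit(state, &old,
			old | EXAMPLE_STATE_HAS_OVERRIDE,
			memory_order_relaxed, memory_order_relaxed));
	return true;
}
#endif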
5108
5109 #if DISPATCH_USE_KEVENT_WORKQUEUE
5110 DISPATCH_NOINLINE
5111 static void
5112 _dispatch_trystash_to_deferred_items(dispatch_queue_t dq, dispatch_object_t dou,
5113 pthread_priority_t pp, dispatch_deferred_items_t ddi)
5114 {
5115 dispatch_priority_t old_pp = ddi->ddi_stashed_pp;
5116 dispatch_queue_t old_dq = ddi->ddi_stashed_dq;
5117 struct dispatch_object_s *old_dou = ddi->ddi_stashed_dou;
5118 dispatch_priority_t rq_overcommit;
5119
5120 rq_overcommit = dq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5121 if (likely(!old_pp || rq_overcommit)) {
5122 ddi->ddi_stashed_dq = dq;
5123 ddi->ddi_stashed_dou = dou._do;
5124 ddi->ddi_stashed_pp = (dispatch_priority_t)pp | rq_overcommit |
5125 _PTHREAD_PRIORITY_PRIORITY_MASK;
5126 if (likely(!old_pp)) {
5127 return;
5128 }
5129 // push the previously stashed item
5130 pp = old_pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK;
5131 dq = old_dq;
5132 dou._do = old_dou;
5133 }
5134 if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
5135 return _dispatch_root_queue_push_override(dq, dou, pp);
5136 }
5137 // bit of cheating: we should really pass `pp` but we know that we are
5138 // pushing onto a global queue at this point, and we just checked that
5139 // `pp` doesn't matter.
5140 DISPATCH_COMPILER_CAN_ASSUME(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
5141 _dispatch_queue_push_inline(dq, dou, 0, 0);
5142 }
5143 #endif
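
/*
 * Illustrative sketch (not part of libdispatch): the function above keeps at
 * most one object in the kevent worker's thread-local deferred items and
 * pushes whichever item gets displaced. A minimal version of that single-slot
 * stash with hypothetical types; example_push() is a stand-in for the real
 * root-queue push, and "prefer_new" stands in for the overcommit check.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdbool.h>
#include <stddef.h>

typedef struct example_work_s example_work_t;

struct example_stash_s {
	void           *queue;    // where the stashed item would have gone
	example_work_t *item;
	unsigned        priority; // 0 means "slot empty"
};

static void
example_push(void *queue, example_work_t *item, unsigned priority)
{
	// stand-in for the real root-queue push
	(void)queue; (void)item; (void)priority;
}

static void
example_trystash(struct example_stash_s *slot, void *queue,
		example_work_t *item, unsigned priority, bool prefer_new)
{
	if (slot->priority == 0 || prefer_new) {
		// Take the slot; whatever was stashed before gets pushed normally.
		void *old_queue = slot->queue;
		example_work_t *old_item = slot->item;
		unsigned old_priority = slot->priority;

		slot->queue = queue;
		slot->item = item;
		slot->priority = priority;
		if (old_priority == 0) {
			return; // the slot was empty, nothing was displaced
		}
		queue = old_queue;
		item = old_item;
		priority = old_priority;
	}
	example_push(queue, item, priority);
}
#endif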
5144
5145 DISPATCH_NOINLINE
5146 static void
5147 _dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
5148 pthread_priority_t pp)
5149 {
5150 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
5151 _dispatch_root_queues_init_once);
5152 _dispatch_queue_push(dq, dou, pp);
5153 }
5154
5155 DISPATCH_NOINLINE
5156 void
5157 _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
5158 pthread_priority_t pp)
5159 {
5160 _dispatch_assert_is_valid_qos_override(pp);
5161 if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
5162 #if DISPATCH_USE_KEVENT_WORKQUEUE
5163 dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
5164 if (unlikely(ddi && !(ddi->ddi_stashed_pp &
5165 (dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
5166 dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
5167 return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
5168 }
5169 #endif
5170 #if HAVE_PTHREAD_WORKQUEUE_QOS
5171 // can't use dispatch_once_f() as it would create a frame
5172 if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
5173 return _dispatch_queue_push_slow(dq, dou, pp);
5174 }
5175 if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
5176 return _dispatch_root_queue_push_override(dq, dou, pp);
5177 }
5178 #endif
5179 }
5180 _dispatch_queue_push_inline(dq, dou, pp, 0);
5181 }
5182
5183 DISPATCH_NOINLINE
5184 static void
5185 _dispatch_queue_class_wakeup_enqueue(dispatch_queue_t dq, pthread_priority_t pp,
5186 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5187 {
5188 dispatch_queue_t tq;
5189
5190 if (flags & (DISPATCH_WAKEUP_OVERRIDING | DISPATCH_WAKEUP_WAS_OVERRIDDEN)) {
5191 // _dispatch_queue_drain_try_unlock may have reset the override while
5192 // we were becoming the enqueuer
5193 _dispatch_queue_reinstate_override_priority(dq, (dispatch_priority_t)pp);
5194 }
5195 if (!(flags & DISPATCH_WAKEUP_CONSUME)) {
5196 _dispatch_retain(dq);
5197 }
5198 if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5199 // try_become_enqueuer has no acquire barrier, as the last block
5200 // of a queue asyncing to that queue is not an uncommon pattern
5201 // and in that case the acquire is completely useless
5202 //
5203 		// so instead we issue a thread fence here, just before reading the
5204 		// targetq pointer, because that is the only read that actually
5205 		// requires the barrier.
5206 os_atomic_thread_fence(acquire);
5207 tq = dq->do_targetq;
5208 } else {
5209 dispatch_assert(target == DISPATCH_QUEUE_WAKEUP_MGR);
5210 tq = &_dispatch_mgr_q;
5211 }
5212 return _dispatch_queue_push(tq, dq, pp);
5213 }
5214
5215 DISPATCH_NOINLINE
5216 void
5217 _dispatch_queue_class_wakeup(dispatch_queue_t dq, pthread_priority_t pp,
5218 dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
5219 {
5220 uint64_t old_state, new_state, bits = 0;
5221
5222 #if HAVE_PTHREAD_WORKQUEUE_QOS
5223 _dispatch_queue_override_priority(dq, /* inout */ &pp, /* inout */ &flags);
5224 #endif
5225
5226 if (flags & DISPATCH_WAKEUP_FLUSH) {
5227 bits = DISPATCH_QUEUE_DIRTY;
5228 }
5229 if (flags & DISPATCH_WAKEUP_OVERRIDING) {
5230 //
5231 		// Setting the dirty bit here is about forcing callers of
5232 		// _dispatch_queue_drain_try_unlock() to loop again when an override
5233 		// has just been set, in order to close the following race:
5234 		//
5235 		// Drainer (in drain_try_unlock()):
5236 // override_reset();
5237 // preempted....
5238 //
5239 // Enqueuer:
5240 // atomic_or(oq_override, override, relaxed);
5241 // atomic_or(dq_state, HAS_OVERRIDE, release);
5242 //
5243 // Drainer:
5244 // ... resumes
5245 // successful drain_unlock() and leaks `oq_override`
5246 //
5247 bits = DISPATCH_QUEUE_DIRTY | DISPATCH_QUEUE_HAS_OVERRIDE;
5248 }
5249
5250 if (flags & DISPATCH_WAKEUP_SLOW_WAITER) {
5251 uint64_t pending_barrier_width =
5252 (dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
5253 uint64_t xor_owner_and_set_full_width_and_in_barrier =
5254 _dispatch_tid_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
5255 DISPATCH_QUEUE_IN_BARRIER;
5256
5257 #ifdef DLOCK_NOWAITERS_BIT
5258 bits |= DLOCK_NOWAITERS_BIT;
5259 #else
5260 bits |= DLOCK_WAITERS_BIT;
5261 #endif
5262 flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
5263 dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));
5264
5265 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
5266 new_state = old_state | bits;
5267 if (_dq_state_drain_pended(old_state)) {
5268 // same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
5269 // but we want to be more efficient wrt the WAITERS_BIT
5270 new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
5271 new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
5272 }
5273 if (unlikely(_dq_state_drain_locked(new_state))) {
5274 #ifdef DLOCK_NOWAITERS_BIT
5275 new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
5276 #endif
5277 } else if (unlikely(!_dq_state_is_runnable(new_state) ||
5278 !(flags & DISPATCH_WAKEUP_FLUSH))) {
5279 				// either not runnable, or the wakeup was not for the first item
5280 				// (26700358), so we should not try to lock; handle overrides instead
5281 } else if (_dq_state_has_pending_barrier(old_state) ||
5282 new_state + pending_barrier_width <
5283 DISPATCH_QUEUE_WIDTH_FULL_BIT) {
5284 // see _dispatch_queue_drain_try_lock
5285 new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
5286 new_state ^= xor_owner_and_set_full_width_and_in_barrier;
5287 } else {
5288 new_state |= DISPATCH_QUEUE_ENQUEUED;
5289 }
5290 });
5291 if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
5292 return _dispatch_try_lock_transfer_or_wakeup(dq);
5293 }
5294 } else if (bits) {
5295 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
5296 new_state = old_state | bits;
5297 if (likely(_dq_state_should_wakeup(old_state))) {
5298 new_state |= DISPATCH_QUEUE_ENQUEUED;
5299 }
5300 });
5301 } else {
5302 os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed,{
5303 new_state = old_state;
5304 if (likely(_dq_state_should_wakeup(old_state))) {
5305 new_state |= DISPATCH_QUEUE_ENQUEUED;
5306 } else {
5307 os_atomic_rmw_loop_give_up(break);
5308 }
5309 });
5310 }
5311
5312 if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
5313 return _dispatch_queue_class_wakeup_enqueue(dq, pp, flags, target);
5314 }
5315
5316 #if HAVE_PTHREAD_WORKQUEUE_QOS
5317 if ((flags & DISPATCH_WAKEUP_OVERRIDING)
5318 && target == DISPATCH_QUEUE_WAKEUP_TARGET) {
5319 return _dispatch_queue_class_wakeup_with_override(dq, pp,
5320 flags, new_state);
5321 }
5322 #endif
5323
5324 if (flags & DISPATCH_WAKEUP_CONSUME) {
5325 return _dispatch_release_tailcall(dq);
5326 }
5327 }
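
/*
 * Illustrative sketch (not part of libdispatch): the DIRTY|HAS_OVERRIDE
 * comment above closes a race where a drainer resets the override, is
 * preempted, and then unlocks without noticing a newly published override.
 * A minimal C11 version of the enqueuer/drainer ordering: the enqueuer
 * publishes the override and then sets a dirty flag with release, and the
 * drainer re-checks that flag with acquire before committing its unlock.
 * All names and the single DIRTY bit are hypothetical stand-ins.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static _Atomic uint32_t example_override; // highest pending override QoS
static _Atomic uint32_t example_state;    // hypothetical DIRTY bit lives here
#define EXAMPLE_STATE_DIRTY 0x1u

// Enqueuer: raise the override, then mark the state dirty with release so a
// drainer that observes DIRTY also observes the new override value.
static void
example_enqueuer_raise_override(uint32_t qos)
{
	uint32_t cur = atomic_load_explicit(&example_override, memory_order_relaxed);
	while (cur < qos && !atomic_compare_exchange_weak_explicit(&example_override,
			&cur, qos, memory_order_relaxed, memory_order_relaxed)) {
		// `cur` is refreshed on failure; retry until example_override >= qos
	}
	atomic_fetch_or_explicit(&example_state, EXAMPLE_STATE_DIRTY,
			memory_order_release);
}

// Drainer: after resetting the override, re-check DIRTY with acquire before
// the final unlock; if it is set, go around again instead of leaking the
// enqueuer's override.
static bool
example_drainer_try_unlock(void)
{
	atomic_store_explicit(&example_override, 0, memory_order_relaxed);
	uint32_t st = atomic_load_explicit(&example_state, memory_order_acquire);
	if (st & EXAMPLE_STATE_DIRTY) {
		atomic_fetch_and_explicit(&example_state, ~EXAMPLE_STATE_DIRTY,
				memory_order_relaxed);
		return false; // the override published above is now visible; redo
	}
	return true; // safe to fully unlock
}
#endif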
5328
5329 #pragma mark -
5330 #pragma mark dispatch_root_queue_drain
5331
5332 DISPATCH_NOINLINE
5333 static bool
5334 _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
5335 {
5336 dispatch_root_queue_context_t qc = dq->do_ctxt;
5337 struct dispatch_object_s *const mediator = (void *)~0ul;
5338 bool pending = false, available = true;
5339 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
5340
5341 do {
5342 // Spin for a short while in case the contention is temporary -- e.g.
5343 // when starting up after dispatch_apply, or when executing a few
5344 // short continuations in a row.
5345 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
5346 goto out;
5347 }
5348 // Since we have serious contention, we need to back off.
5349 if (!pending) {
5350 // Mark this queue as pending to avoid requests for further threads
5351 (void)os_atomic_inc2o(qc, dgq_pending, relaxed);
5352 pending = true;
5353 }
5354 _dispatch_contention_usleep(sleep_time);
5355 if (fastpath(dq->dq_items_head != mediator)) goto out;
5356 sleep_time *= 2;
5357 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
5358
5359 // The ratio of work to libdispatch overhead must be bad. This
5360 // scenario implies that there are too many threads in the pool.
5361 // Create a new pending thread and then exit this thread.
5362 // The kernel will grant a new thread when the load subsides.
5363 _dispatch_debug("contention on global queue: %p", dq);
5364 available = false;
5365 out:
5366 if (pending) {
5367 (void)os_atomic_dec2o(qc, dgq_pending, relaxed);
5368 }
5369 if (!available) {
5370 _dispatch_global_queue_poke(dq);
5371 }
5372 return available;
5373 }
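
/*
 * Illustrative sketch (not part of libdispatch): the slow path above spins
 * briefly and then backs off with an exponentially growing usleep() before
 * giving up and shedding the thread. A standalone version of that backoff
 * shape; the start/max constants are hypothetical, the real ones live in the
 * libdispatch internal headers.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdbool.h>
#include <unistd.h>

#define EXAMPLE_BACKOFF_USLEEP_START 10u     /* hypothetical */
#define EXAMPLE_BACKOFF_USLEEP_MAX   100000u /* hypothetical */

// Returns true if *flag became nonzero before the backoff budget ran out.
static bool
example_wait_with_backoff(volatile int *flag)
{
	useconds_t sleep_time = EXAMPLE_BACKOFF_USLEEP_START;

	do {
		if (*flag) return true;
		usleep(sleep_time);
		sleep_time *= 2; // exponential backoff under persistent contention
	} while (sleep_time < EXAMPLE_BACKOFF_USLEEP_MAX);
	return false; // contention persisted; the caller should shed this thread
}
#endif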
5374
5375 DISPATCH_ALWAYS_INLINE
5376 static inline bool
5377 _dispatch_root_queue_drain_one2(dispatch_queue_t dq)
5378 {
5379 // Wait for queue head and tail to be both non-empty or both empty
5380 bool available; // <rdar://problem/15917893>
5381 _dispatch_wait_until((dq->dq_items_head != NULL) ==
5382 (available = (dq->dq_items_tail != NULL)));
5383 return available;
5384 }
5385
5386 DISPATCH_ALWAYS_INLINE_NDEBUG
5387 static inline struct dispatch_object_s *
5388 _dispatch_root_queue_drain_one(dispatch_queue_t dq)
5389 {
5390 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
5391
5392 start:
5393 // The mediator value acts both as a "lock" and a signal
5394 head = os_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
5395
5396 if (slowpath(head == NULL)) {
5397 // The first xchg on the tail will tell the enqueueing thread that it
5398 // is safe to blindly write out to the head pointer. A cmpxchg honors
5399 // the algorithm.
5400 if (slowpath(!os_atomic_cmpxchg2o(dq, dq_items_head, mediator,
5401 NULL, relaxed))) {
5402 goto start;
5403 }
5404 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
5405 _dispatch_root_queue_drain_one2(dq)) {
5406 goto start;
5407 }
5408 _dispatch_root_queue_debug("no work on global queue: %p", dq);
5409 return NULL;
5410 }
5411
5412 if (slowpath(head == mediator)) {
5413 // This thread lost the race for ownership of the queue.
5414 if (fastpath(_dispatch_root_queue_drain_one_slow(dq))) {
5415 goto start;
5416 }
5417 return NULL;
5418 }
5419
5420 // Restore the head pointer to a sane value before returning.
5421 // If 'next' is NULL, then this item _might_ be the last item.
5422 next = fastpath(head->do_next);
5423
5424 if (slowpath(!next)) {
5425 os_atomic_store2o(dq, dq_items_head, NULL, relaxed);
5426 // 22708742: set tail to NULL with release, so that NULL write to head
5427 // above doesn't clobber head from concurrent enqueuer
5428 if (os_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, release)) {
5429 // both head and tail are NULL now
5430 goto out;
5431 }
5432 // There must be a next item now.
5433 _dispatch_wait_until(next = head->do_next);
5434 }
5435
5436 os_atomic_store2o(dq, dq_items_head, next, relaxed);
5437 _dispatch_global_queue_poke(dq);
5438 out:
5439 return head;
5440 }
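
/*
 * Illustrative sketch (not part of libdispatch): the "mediator" in
 * _dispatch_root_queue_drain_one() above is a sentinel pointer exchanged into
 * the head so that it acts both as a dequeue lock and as a contention signal.
 * A minimal C11 version of just that claim/restore step, with hypothetical
 * types; the real function additionally coordinates with the tail pointer,
 * waits for a late do_next store, and backs off under contention.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdatomic.h>
#include <stddef.h>

typedef struct example_node_s {
	struct example_node_s *_Atomic next;
} example_node_t;

#define EXAMPLE_MEDIATOR ((example_node_t *)~0ul)

static example_node_t *_Atomic example_head;

// Returns one node, or NULL if the list looked empty or another thread
// currently owns the dequeue side.
static example_node_t *
example_drain_one(void)
{
	// Swapping in the sentinel claims the dequeue side and tells other
	// dequeuers (which will read the sentinel back) to back off.
	example_node_t *head = atomic_exchange_explicit(&example_head,
			EXAMPLE_MEDIATOR, memory_order_acquire);
	if (head == NULL) {
		// Looked empty: try to restore NULL so enqueuers see a sane head.
		example_node_t *expected = EXAMPLE_MEDIATOR;
		atomic_compare_exchange_strong_explicit(&example_head, &expected,
				NULL, memory_order_relaxed, memory_order_relaxed);
		return NULL;
	}
	if (head == EXAMPLE_MEDIATOR) {
		return NULL; // lost the race against another dequeuer
	}
	// We own `head`; publish its successor (possibly NULL) and return it.
	atomic_store_explicit(&example_head,
			atomic_load_explicit(&head->next, memory_order_relaxed),
			memory_order_relaxed);
	return head;
}
#endif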
5441
5442 void
5443 _dispatch_root_queue_drain_deferred_item(dispatch_queue_t dq,
5444 struct dispatch_object_s *dou, pthread_priority_t pp)
5445 {
5446 struct _dispatch_identity_s di;
5447
5448 // fake that we queued `dou` on `dq` for introspection purposes
5449 _dispatch_trace_continuation_push(dq, dou);
5450
5451 pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
5452 _dispatch_queue_set_current(dq);
5453 _dispatch_root_queue_identity_assume(&di, pp);
5454 #if DISPATCH_COCOA_COMPAT
5455 void *pool = _dispatch_last_resort_autorelease_pool_push();
5456 #endif // DISPATCH_COCOA_COMPAT
5457
5458 _dispatch_perfmon_start();
5459 _dispatch_continuation_pop_inline(dou, dq,
5460 DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
5461 _dispatch_perfmon_workitem_inc();
5462 _dispatch_perfmon_end();
5463
5464 #if DISPATCH_COCOA_COMPAT
5465 _dispatch_last_resort_autorelease_pool_pop(pool);
5466 #endif // DISPATCH_COCOA_COMPAT
5467 _dispatch_reset_defaultpriority(di.old_pp);
5468 _dispatch_queue_set_current(NULL);
5469
5470 _dispatch_voucher_debug("root queue clear", NULL);
5471 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5472 }
5473
5474 DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe)
5475 static void
5476 _dispatch_root_queue_drain(dispatch_queue_t dq, pthread_priority_t pri)
5477 {
5478 #if DISPATCH_DEBUG
5479 dispatch_queue_t cq;
5480 if (slowpath(cq = _dispatch_queue_get_current())) {
5481 DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
5482 }
5483 #endif
5484 _dispatch_queue_set_current(dq);
5485 if (dq->dq_priority) pri = dq->dq_priority;
5486 pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri, NULL);
5487 #if DISPATCH_COCOA_COMPAT
5488 void *pool = _dispatch_last_resort_autorelease_pool_push();
5489 #endif // DISPATCH_COCOA_COMPAT
5490
5491 _dispatch_perfmon_start();
5492 struct dispatch_object_s *item;
5493 bool reset = false;
5494 while ((item = fastpath(_dispatch_root_queue_drain_one(dq)))) {
5495 if (reset) _dispatch_wqthread_override_reset();
5496 _dispatch_continuation_pop_inline(item, dq,
5497 DISPATCH_INVOKE_WORKER_DRAIN|DISPATCH_INVOKE_REDIRECTING_DRAIN);
5498 _dispatch_perfmon_workitem_inc();
5499 reset = _dispatch_reset_defaultpriority_override();
5500 }
5501 _dispatch_perfmon_end();
5502
5503 #if DISPATCH_COCOA_COMPAT
5504 _dispatch_last_resort_autorelease_pool_pop(pool);
5505 #endif // DISPATCH_COCOA_COMPAT
5506 _dispatch_reset_defaultpriority(old_dp);
5507 _dispatch_queue_set_current(NULL);
5508 }
5509
5510 #pragma mark -
5511 #pragma mark dispatch_worker_thread
5512
5513 #if HAVE_PTHREAD_WORKQUEUES
5514 static void
5515 _dispatch_worker_thread4(void *context)
5516 {
5517 dispatch_queue_t dq = context;
5518 dispatch_root_queue_context_t qc = dq->do_ctxt;
5519
5520 _dispatch_introspection_thread_add();
5521 int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
5522 dispatch_assert(pending >= 0);
5523 _dispatch_root_queue_drain(dq, _dispatch_get_priority());
5524 _dispatch_voucher_debug("root queue clear", NULL);
5525 _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
5526 }
5527
5528 #if HAVE_PTHREAD_WORKQUEUE_QOS
5529 static void
5530 _dispatch_worker_thread3(pthread_priority_t pp)
5531 {
5532 bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
5533 dispatch_queue_t dq;
5534 pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
5535 _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
5536 dq = _dispatch_get_root_queue_for_priority(pp, overcommit);
5537 return _dispatch_worker_thread4(dq);
5538 }
5539 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
5540
5541 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5542 // 6618342 Contact the team that owns the Instrument DTrace probe before
5543 // renaming this symbol
5544 static void
5545 _dispatch_worker_thread2(int priority, int options,
5546 void *context DISPATCH_UNUSED)
5547 {
5548 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
5549 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
5550 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
5551
5552 return _dispatch_worker_thread4(dq);
5553 }
5554 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
5555 #endif // HAVE_PTHREAD_WORKQUEUES
5556
5557 #if DISPATCH_USE_PTHREAD_POOL
5558 // 6618342 Contact the team that owns the Instrument DTrace probe before
5559 // renaming this symbol
5560 static void *
5561 _dispatch_worker_thread(void *context)
5562 {
5563 dispatch_queue_t dq = context;
5564 dispatch_root_queue_context_t qc = dq->do_ctxt;
5565 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
5566
5567 if (pqc->dpq_observer_hooks.queue_will_execute) {
5568 _dispatch_set_pthread_root_queue_observer_hooks(
5569 &pqc->dpq_observer_hooks);
5570 }
5571 if (pqc->dpq_thread_configure) {
5572 pqc->dpq_thread_configure();
5573 }
5574
5575 sigset_t mask;
5576 int r;
5577 	// work around the tweaks the kernel workqueue does for us
5578 r = sigfillset(&mask);
5579 (void)dispatch_assume_zero(r);
5580 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
5581 (void)dispatch_assume_zero(r);
5582 _dispatch_introspection_thread_add();
5583
5584 const int64_t timeout = 5ull * NSEC_PER_SEC;
5585 pthread_priority_t old_pri = _dispatch_get_priority();
5586 do {
5587 _dispatch_root_queue_drain(dq, old_pri);
5588 _dispatch_reset_priority_and_voucher(old_pri, NULL);
5589 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
5590 dispatch_time(0, timeout)) == 0);
5591
5592 (void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
5593 _dispatch_global_queue_poke(dq);
5594 _dispatch_release(dq);
5595
5596 return NULL;
5597 }
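
/*
 * Illustrative sketch (not part of libdispatch): the pthread-pool worker
 * above parks on a dispatch semaphore with a five-second timeout and lets the
 * thread die when no work shows up in time. A standalone version of that
 * idle-timeout loop using only public dispatch API; example_drain() is a
 * hypothetical stand-in for the real root-queue drain.
 */
#if 0 /* standalone example, excluded from the build */
#include <dispatch/dispatch.h>
#include <stddef.h>

static void
example_drain(void)
{
	// stand-in for the real drain of the queue this worker serves
}

static void *
example_pool_worker(void *arg)
{
	dispatch_semaphore_t wakeup = arg; // signaled whenever work is enqueued
	const int64_t idle_timeout = 5LL * NSEC_PER_SEC;

	do {
		example_drain();
	} while (dispatch_semaphore_wait(wakeup,
			dispatch_time(DISPATCH_TIME_NOW, idle_timeout)) == 0);

	// Timed out with nothing to do: let the thread exit; the pool grows a
	// replacement the next time work arrives.
	return NULL;
}
#endif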
5598
5599 int
5600 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
5601 {
5602 int r;
5603
5604 /* Workaround: 6269619 Not all signals can be delivered on any thread */
5605
5606 r = sigdelset(set, SIGILL);
5607 (void)dispatch_assume_zero(r);
5608 r = sigdelset(set, SIGTRAP);
5609 (void)dispatch_assume_zero(r);
5610 #if HAVE_DECL_SIGEMT
5611 r = sigdelset(set, SIGEMT);
5612 (void)dispatch_assume_zero(r);
5613 #endif
5614 r = sigdelset(set, SIGFPE);
5615 (void)dispatch_assume_zero(r);
5616 r = sigdelset(set, SIGBUS);
5617 (void)dispatch_assume_zero(r);
5618 r = sigdelset(set, SIGSEGV);
5619 (void)dispatch_assume_zero(r);
5620 r = sigdelset(set, SIGSYS);
5621 (void)dispatch_assume_zero(r);
5622 r = sigdelset(set, SIGPIPE);
5623 (void)dispatch_assume_zero(r);
5624
5625 return pthread_sigmask(how, set, oset);
5626 }
5627 #endif // DISPATCH_USE_PTHREAD_POOL
5628
5629 #pragma mark -
5630 #pragma mark dispatch_runloop_queue
5631
5632 static bool _dispatch_program_is_probably_callback_driven;
5633
5634 #if DISPATCH_COCOA_COMPAT
5635
5636 dispatch_queue_t
5637 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
5638 {
5639 dispatch_queue_t dq;
5640 size_t dqs;
5641
5642 if (slowpath(flags)) {
5643 return DISPATCH_BAD_INPUT;
5644 }
5645 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
5646 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
5647 _dispatch_queue_init(dq, DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC, 1, false);
5648 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
5649 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
5650 _dispatch_runloop_queue_handle_init(dq);
5651 _dispatch_queue_set_bound_thread(dq);
5652 _dispatch_object_debug(dq, "%s", __func__);
5653 return _dispatch_introspection_queue_create(dq);
5654 }
5655
5656 void
5657 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
5658 {
5659 _dispatch_object_debug(dq, "%s", __func__);
5660
5661 pthread_priority_t pp = _dispatch_queue_reset_override_priority(dq, true);
5662 _dispatch_queue_clear_bound_thread(dq);
5663 dx_wakeup(dq, pp, DISPATCH_WAKEUP_FLUSH);
5664 if (pp) _dispatch_thread_override_end(DISPATCH_QUEUE_DRAIN_OWNER(dq), dq);
5665 }
5666
5667 void
5668 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
5669 {
5670 _dispatch_object_debug(dq, "%s", __func__);
5671 _dispatch_introspection_queue_dispose(dq);
5672 _dispatch_runloop_queue_handle_dispose(dq);
5673 _dispatch_queue_destroy(dq);
5674 }
5675
5676 bool
5677 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
5678 {
5679 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5680 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5681 }
5682 dispatch_retain(dq);
5683 bool r = _dispatch_runloop_queue_drain_one(dq);
5684 dispatch_release(dq);
5685 return r;
5686 }
5687
5688 void
5689 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
5690 {
5691 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5692 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5693 }
5694 _dispatch_runloop_queue_wakeup(dq, 0, false);
5695 }
5696
5697 dispatch_runloop_handle_t
5698 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
5699 {
5700 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
5701 DISPATCH_CLIENT_CRASH(dq->do_vtable, "Not a runloop queue");
5702 }
5703 return _dispatch_runloop_queue_get_handle(dq);
5704 }
5705
5706 static void
5707 _dispatch_runloop_queue_handle_init(void *ctxt)
5708 {
5709 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
5710 dispatch_runloop_handle_t handle;
5711
5712 _dispatch_fork_becomes_unsafe();
5713
5714 #if TARGET_OS_MAC
5715 mach_port_t mp;
5716 kern_return_t kr;
5717 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
5718 DISPATCH_VERIFY_MIG(kr);
5719 (void)dispatch_assume_zero(kr);
5720 kr = mach_port_insert_right(mach_task_self(), mp, mp,
5721 MACH_MSG_TYPE_MAKE_SEND);
5722 DISPATCH_VERIFY_MIG(kr);
5723 (void)dispatch_assume_zero(kr);
5724 if (dq != &_dispatch_main_q) {
5725 struct mach_port_limits limits = {
5726 .mpl_qlimit = 1,
5727 };
5728 kr = mach_port_set_attributes(mach_task_self(), mp,
5729 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
5730 sizeof(limits));
5731 DISPATCH_VERIFY_MIG(kr);
5732 (void)dispatch_assume_zero(kr);
5733 }
5734 handle = mp;
5735 #elif defined(__linux__)
5736 int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
5737 if (fd == -1) {
5738 int err = errno;
5739 switch (err) {
5740 case EMFILE:
5741 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5742 "process is out of file descriptors");
5743 break;
5744 case ENFILE:
5745 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5746 "system is out of file descriptors");
5747 break;
5748 case ENOMEM:
5749 DISPATCH_CLIENT_CRASH(err, "eventfd() failure: "
5750 "kernel is out of memory");
5751 break;
5752 default:
5753 DISPATCH_INTERNAL_CRASH(err, "eventfd() failure");
5754 break;
5755 }
5756 }
5757 handle = fd;
5758 #else
5759 #error "runloop support not implemented on this platform"
5760 #endif
5761 _dispatch_runloop_queue_set_handle(dq, handle);
5762
5763 _dispatch_program_is_probably_callback_driven = true;
5764 }
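
/*
 * Illustrative sketch (not part of libdispatch): on Linux the runloop handle
 * created above is an eventfd. A waiter polls the descriptor and drains the
 * counter with read(); a wake-up is a single 8-byte write(). A standalone,
 * Linux-only version of that handshake with hypothetical function names.
 */
#if 0 /* standalone example, excluded from the build */
#include <poll.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

static int
example_make_wakeup_handle(void)
{
	// Mirrors the flags used above; returns -1 on failure (see errno).
	return eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
}

// Waiter: block until woken, then drain the counter so the next poll() blocks.
static void
example_wait_for_wakeup(int fd)
{
	struct pollfd pfd = { .fd = fd, .events = POLLIN };
	(void)poll(&pfd, 1, -1);
	uint64_t counter;
	(void)read(fd, &counter, sizeof(counter));
}

// Waker: eventfd writes must be exactly 8 bytes.
static void
example_wakeup(int fd)
{
	uint64_t one = 1;
	(void)write(fd, &one, sizeof(one));
}
#endif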
5765
5766 static void
5767 _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq)
5768 {
5769 dispatch_runloop_handle_t handle = _dispatch_runloop_queue_get_handle(dq);
5770 if (!_dispatch_runloop_handle_is_valid(handle)) {
5771 return;
5772 }
5773 dq->do_ctxt = NULL;
5774 #if TARGET_OS_MAC
5775 mach_port_t mp = handle;
5776 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
5777 DISPATCH_VERIFY_MIG(kr);
5778 (void)dispatch_assume_zero(kr);
5779 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
5780 DISPATCH_VERIFY_MIG(kr);
5781 (void)dispatch_assume_zero(kr);
5782 #elif defined(__linux__)
5783 int rc = close(handle);
5784 (void)dispatch_assume_zero(rc);
5785 #else
5786 #error "runloop support not implemented on this platform"
5787 #endif
5788 }
5789
5790 #pragma mark -
5791 #pragma mark dispatch_main_queue
5792
5793 dispatch_runloop_handle_t
5794 _dispatch_get_main_queue_handle_4CF(void)
5795 {
5796 dispatch_queue_t dq = &_dispatch_main_q;
5797 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5798 _dispatch_runloop_queue_handle_init);
5799 return _dispatch_runloop_queue_get_handle(dq);
5800 }
5801
5802 #if TARGET_OS_MAC
5803 dispatch_runloop_handle_t
5804 _dispatch_get_main_queue_port_4CF(void)
5805 {
5806 return _dispatch_get_main_queue_handle_4CF();
5807 }
5808 #endif
5809
5810 static bool main_q_is_draining;
5811
5812 // 6618342 Contact the team that owns the Instrument DTrace probe before
5813 // renaming this symbol
5814 DISPATCH_NOINLINE
5815 static void
5816 _dispatch_queue_set_mainq_drain_state(bool arg)
5817 {
5818 main_q_is_draining = arg;
5819 }
5820
5821 void
5822 _dispatch_main_queue_callback_4CF(
5823 #if TARGET_OS_MAC
5824 mach_msg_header_t *_Null_unspecified msg
5825 #else
5826 void *ignored
5827 #endif
5828 DISPATCH_UNUSED)
5829 {
5830 if (main_q_is_draining) {
5831 return;
5832 }
5833 _dispatch_queue_set_mainq_drain_state(true);
5834 _dispatch_main_queue_drain();
5835 _dispatch_queue_set_mainq_drain_state(false);
5836 }
5837
5838 #endif
5839
5840 void
5841 dispatch_main(void)
5842 {
5843 #if HAVE_PTHREAD_MAIN_NP
5844 if (pthread_main_np()) {
5845 #endif
5846 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
5847 _dispatch_program_is_probably_callback_driven = true;
5848 _dispatch_ktrace0(ARIADNE_ENTER_DISPATCH_MAIN_CODE);
5849 #ifdef __linux__
5850 // On Linux, if the main thread calls pthread_exit, the process becomes a zombie.
5851 // To avoid that, just before calling pthread_exit we register a TSD destructor
5852 // that will call _dispatch_sig_thread -- thus capturing the main thread in sigsuspend.
5853 // This relies on an implementation detail (currently true in glibc) that TSD destructors
5854 // will be called in the order of creation to cause all the TSD cleanup functions to
5855 // run before the thread becomes trapped in sigsuspend.
5856 pthread_key_t dispatch_main_key;
5857 pthread_key_create(&dispatch_main_key, _dispatch_sig_thread);
5858 pthread_setspecific(dispatch_main_key, &dispatch_main_key);
5859 #endif
5860 pthread_exit(NULL);
5861 DISPATCH_INTERNAL_CRASH(errno, "pthread_exit() returned");
5862 #if HAVE_PTHREAD_MAIN_NP
5863 }
5864 DISPATCH_CLIENT_CRASH(0, "dispatch_main() must be called on the main thread");
5865 #endif
5866 }
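
/*
 * Illustrative sketch (not part of libdispatch): dispatch_main() never
 * returns and expects work to arrive on the main queue. A minimal usage
 * example with public API only; since dispatch_main() parks the main thread,
 * the program has to exit() from a submitted work item.
 */
#if 0 /* standalone example, excluded from the build */
#include <dispatch/dispatch.h>
#include <stdio.h>
#include <stdlib.h>

static void
example_hello(void *ctxt)
{
	(void)ctxt;
	printf("hello from the main queue\n");
	exit(0); // dispatch_main() never returns, so exit explicitly
}

int
main(void)
{
	dispatch_async_f(dispatch_get_main_queue(), NULL, example_hello);
	dispatch_main(); // parks the main thread; the runtime drains the
	                 // main queue from here on
}
#endif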
5867
5868 DISPATCH_NOINLINE DISPATCH_NORETURN
5869 static void
5870 _dispatch_sigsuspend(void)
5871 {
5872 static const sigset_t mask;
5873
5874 for (;;) {
5875 sigsuspend(&mask);
5876 }
5877 }
5878
5879 DISPATCH_NORETURN
5880 static void
5881 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
5882 {
5883 // never returns, so burn bridges behind us
5884 _dispatch_clear_stack(0);
5885 _dispatch_sigsuspend();
5886 }
5887
5888 DISPATCH_NOINLINE
5889 static void
5890 _dispatch_queue_cleanup2(void)
5891 {
5892 dispatch_queue_t dq = &_dispatch_main_q;
5893 _dispatch_queue_clear_bound_thread(dq);
5894
5895 // <rdar://problem/22623242>
5896 // Here is what happens when both this cleanup happens because of
5897 // dispatch_main() being called, and a concurrent enqueuer makes the queue
5898 // non empty.
5899 //
5900 // _dispatch_queue_cleanup2:
5901 // atomic_and(dq_is_thread_bound, ~DQF_THREAD_BOUND, relaxed);
5902 // maximal_barrier();
5903 // if (load(dq_items_tail, seq_cst)) {
5904 // // do the wake up the normal serial queue way
5905 // } else {
5906 	//     // do not wake up <----
5907 // }
5908 //
5909 // enqueuer:
5910 // store(dq_items_tail, new_tail, release);
5911 // if (load(dq_is_thread_bound, relaxed)) {
5912 // // do the wake up the runloop way <----
5913 // } else {
5914 // // do the wake up the normal serial way
5915 // }
5916 //
5917 // what would be bad is to take both paths marked <---- because the queue
5918 // wouldn't be woken up until the next time it's used (which may never
5919 // happen)
5920 //
5921 // An enqueuer that speculates the load of the old value of thread_bound
5922 // and then does the store may wake up the main queue the runloop way.
5923 // But then, the cleanup thread will see that store because the load
5924 // of dq_items_tail is sequentially consistent, and we have just thrown away
5925 // our pipeline.
5926 //
5927 // By the time cleanup2() is out of the maximally synchronizing barrier,
5928 // no other thread can speculate the wrong load anymore, and both cleanup2()
5929 	// and a concurrent enqueuer would treat the queue in the standard
5930 	// non-thread-bound way.
5931
5932 _dispatch_queue_atomic_flags_clear(dq,
5933 DQF_THREAD_BOUND | DQF_CANNOT_TRYSYNC);
5934 os_atomic_maximally_synchronizing_barrier();
5935 // no need to drop the override, the thread will die anyway
5936 // the barrier above includes an acquire, so it's ok to do this raw
5937 // call to dx_wakeup(0)
5938 dx_wakeup(dq, 0, 0);
5939
5940 // overload the "probably" variable to mean that dispatch_main() or
5941 // similar non-POSIX API was called
5942 	// this has to run before the DISPATCH_COCOA_COMPAT block below
5943 	// See dispatch_main() for the call to _dispatch_sig_thread on Linux.
5944 #ifndef __linux__
5945 if (_dispatch_program_is_probably_callback_driven) {
5946 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
5947 _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
5948 sleep(1); // workaround 6778970
5949 }
5950 #endif
5951
5952 #if DISPATCH_COCOA_COMPAT
5953 dispatch_once_f(&_dispatch_main_q_handle_pred, dq,
5954 _dispatch_runloop_queue_handle_init);
5955 _dispatch_runloop_queue_handle_dispose(dq);
5956 #endif
5957 }
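
/*
 * Illustrative sketch (not part of libdispatch): the comment above describes
 * a classic store/load (Dekker-style) race: each side stores its own flag and
 * must then observe the other side's store, which plain release/acquire does
 * not guarantee. The code above relies on one maximally synchronizing barrier
 * on the cleanup side plus a sequentially consistent load; the portable C11
 * approximation below uses a seq_cst fence on both sides. With both fences,
 * at least one side is guaranteed to observe the other's store, so the bad
 * "no wakeup at all" pairing cannot happen. Names are hypothetical.
 */
#if 0 /* standalone example, excluded from the build */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

static atomic_bool example_thread_bound = true; // cleared by the cleanup path
static void *_Atomic example_items_tail = NULL; // set by the enqueuer

// Cleanup side: clear the flag, full fence, then look for enqueued items.
static bool
example_cleanup_needs_wakeup(void)
{
	atomic_store_explicit(&example_thread_bound, false, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(&example_items_tail,
			memory_order_relaxed) != NULL;
}

// Enqueuer side: publish the item, full fence, then look at the flag to pick
// the runloop-style or the normal serial-queue wakeup.
static bool
example_enqueuer_wakes_runloop_way(void *tail)
{
	atomic_store_explicit(&example_items_tail, tail, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(&example_thread_bound, memory_order_relaxed);
}
#endif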
5958
5959 static void
5960 _dispatch_queue_cleanup(void *ctxt)
5961 {
5962 if (ctxt == &_dispatch_main_q) {
5963 return _dispatch_queue_cleanup2();
5964 }
5965 // POSIX defines that destructors are only called if 'ctxt' is non-null
5966 DISPATCH_INTERNAL_CRASH(ctxt,
5967 "Premature thread exit while a dispatch queue is running");
5968 }
5969
5970 static void
5971 _dispatch_deferred_items_cleanup(void *ctxt)
5972 {
5973 // POSIX defines that destructors are only called if 'ctxt' is non-null
5974 DISPATCH_INTERNAL_CRASH(ctxt,
5975 "Premature thread exit with unhandled deferred items");
5976 }
5977
5978 static void
5979 _dispatch_frame_cleanup(void *ctxt)
5980 {
5981 // POSIX defines that destructors are only called if 'ctxt' is non-null
5982 DISPATCH_INTERNAL_CRASH(ctxt,
5983 "Premature thread exit while a dispatch frame is active");
5984 }
5985
5986 static void
5987 _dispatch_context_cleanup(void *ctxt)
5988 {
5989 // POSIX defines that destructors are only called if 'ctxt' is non-null
5990 DISPATCH_INTERNAL_CRASH(ctxt,
5991 "Premature thread exit while a dispatch context is set");
5992 }