git.saurik.com: apple/libdispatch.git / src/queue.c (libdispatch-500.10.1)
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
34 && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
38 !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
40 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
41 #endif
42 #if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
43 #undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
44 #define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
45 #endif
46 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48 #define pthread_workqueue_t void*
49 #endif
50
51 static void _dispatch_cache_cleanup(void *value);
52 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
53 dispatch_continuation_t dc, pthread_priority_t pp);
54 static void _dispatch_queue_cleanup(void *ctxt);
55 static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
56 unsigned int n);
57 static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
58 static inline _dispatch_thread_semaphore_t
59 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
60 static inline bool _dispatch_queue_prepare_override(dispatch_queue_t dq,
61 dispatch_queue_t tq, pthread_priority_t p);
62 static inline void _dispatch_queue_push_override(dispatch_queue_t dq,
63 dispatch_queue_t tq, pthread_priority_t p, bool owning);
64 #if HAVE_PTHREAD_WORKQUEUES
65 static void _dispatch_worker_thread4(void *context);
66 #if HAVE_PTHREAD_WORKQUEUE_QOS
67 static void _dispatch_worker_thread3(pthread_priority_t priority);
68 #endif
69 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
70 static void _dispatch_worker_thread2(int priority, int options, void *context);
71 #endif
72 #endif
73 #if DISPATCH_USE_PTHREAD_POOL
74 static void *_dispatch_worker_thread(void *context);
75 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
76 #endif
77
78 #if DISPATCH_COCOA_COMPAT
79 static dispatch_once_t _dispatch_main_q_port_pred;
80 static dispatch_queue_t _dispatch_main_queue_wakeup(void);
81 unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
82 static void _dispatch_runloop_queue_port_init(void *ctxt);
83 static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
84 #endif
85
86 static void _dispatch_root_queues_init(void *context);
87 static dispatch_once_t _dispatch_root_queues_pred;
88
89 #pragma mark -
90 #pragma mark dispatch_root_queue
91
92 struct dispatch_pthread_root_queue_context_s {
93 pthread_attr_t dpq_thread_attr;
94 dispatch_block_t dpq_thread_configure;
95 struct dispatch_semaphore_s dpq_thread_mediator;
96 dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
97 };
98 typedef struct dispatch_pthread_root_queue_context_s *
99 dispatch_pthread_root_queue_context_t;
100
101 #if DISPATCH_ENABLE_THREAD_POOL
102 static struct dispatch_pthread_root_queue_context_s
103 _dispatch_pthread_root_queue_contexts[] = {
104 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
105 .dpq_thread_mediator = {
106 .do_vtable = DISPATCH_VTABLE(semaphore),
107 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
108 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
109 }},
110 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
111 .dpq_thread_mediator = {
112 .do_vtable = DISPATCH_VTABLE(semaphore),
113 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
114 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
115 }},
116 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
117 .dpq_thread_mediator = {
118 .do_vtable = DISPATCH_VTABLE(semaphore),
119 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
120 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
121 }},
122 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
123 .dpq_thread_mediator = {
124 .do_vtable = DISPATCH_VTABLE(semaphore),
125 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
126 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
127 }},
128 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
129 .dpq_thread_mediator = {
130 .do_vtable = DISPATCH_VTABLE(semaphore),
131 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
132 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
133 }},
134 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
135 .dpq_thread_mediator = {
136 .do_vtable = DISPATCH_VTABLE(semaphore),
137 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
138 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
139 }},
140 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
141 .dpq_thread_mediator = {
142 .do_vtable = DISPATCH_VTABLE(semaphore),
143 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
144 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
145 }},
146 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
147 .dpq_thread_mediator = {
148 .do_vtable = DISPATCH_VTABLE(semaphore),
149 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
150 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
151 }},
152 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
153 .dpq_thread_mediator = {
154 .do_vtable = DISPATCH_VTABLE(semaphore),
155 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
156 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
157 }},
158 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
159 .dpq_thread_mediator = {
160 .do_vtable = DISPATCH_VTABLE(semaphore),
161 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
162 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
163 }},
164 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
165 .dpq_thread_mediator = {
166 .do_vtable = DISPATCH_VTABLE(semaphore),
167 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
168 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
169 }},
170 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
171 .dpq_thread_mediator = {
172 .do_vtable = DISPATCH_VTABLE(semaphore),
173 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
174 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
175 }},
176 };
177 #endif
178
179 #define MAX_PTHREAD_COUNT 255
180
181 struct dispatch_root_queue_context_s {
182 union {
183 struct {
184 unsigned int volatile dgq_pending;
185 #if HAVE_PTHREAD_WORKQUEUES
186 qos_class_t dgq_qos;
187 int dgq_wq_priority, dgq_wq_options;
188 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
189 pthread_workqueue_t dgq_kworkqueue;
190 #endif
191 #endif // HAVE_PTHREAD_WORKQUEUES
192 #if DISPATCH_USE_PTHREAD_POOL
193 void *dgq_ctxt;
194 uint32_t volatile dgq_thread_pool_size;
195 #endif
196 };
197 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
198 };
199 };
200 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
201
202 DISPATCH_CACHELINE_ALIGN
203 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
204 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
205 #if HAVE_PTHREAD_WORKQUEUES
206 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
207 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
208 .dgq_wq_options = 0,
209 #endif
210 #if DISPATCH_ENABLE_THREAD_POOL
211 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
212 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
213 #endif
214 }}},
215 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
216 #if HAVE_PTHREAD_WORKQUEUES
217 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
218 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
219 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
220 #endif
221 #if DISPATCH_ENABLE_THREAD_POOL
222 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
223 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
224 #endif
225 }}},
226 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
227 #if HAVE_PTHREAD_WORKQUEUES
228 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
229 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
230 .dgq_wq_options = 0,
231 #endif
232 #if DISPATCH_ENABLE_THREAD_POOL
233 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
234 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
235 #endif
236 }}},
237 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
238 #if HAVE_PTHREAD_WORKQUEUES
239 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
240 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
241 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
242 #endif
243 #if DISPATCH_ENABLE_THREAD_POOL
244 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
245 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
246 #endif
247 }}},
248 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
249 #if HAVE_PTHREAD_WORKQUEUES
250 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
251 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
252 .dgq_wq_options = 0,
253 #endif
254 #if DISPATCH_ENABLE_THREAD_POOL
255 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
256 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
257 #endif
258 }}},
259 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
260 #if HAVE_PTHREAD_WORKQUEUES
261 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
262 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
263 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
264 #endif
265 #if DISPATCH_ENABLE_THREAD_POOL
266 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
267 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
268 #endif
269 }}},
270 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
271 #if HAVE_PTHREAD_WORKQUEUES
272 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
273 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
274 .dgq_wq_options = 0,
275 #endif
276 #if DISPATCH_ENABLE_THREAD_POOL
277 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
278 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
279 #endif
280 }}},
281 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
282 #if HAVE_PTHREAD_WORKQUEUES
283 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
284 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
285 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
286 #endif
287 #if DISPATCH_ENABLE_THREAD_POOL
288 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
289 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
290 #endif
291 }}},
292 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
293 #if HAVE_PTHREAD_WORKQUEUES
294 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
295 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
296 .dgq_wq_options = 0,
297 #endif
298 #if DISPATCH_ENABLE_THREAD_POOL
299 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
300 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
301 #endif
302 }}},
303 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
304 #if HAVE_PTHREAD_WORKQUEUES
305 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
306 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
307 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
308 #endif
309 #if DISPATCH_ENABLE_THREAD_POOL
310 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
311 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
312 #endif
313 }}},
314 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
315 #if HAVE_PTHREAD_WORKQUEUES
316 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
317 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
318 .dgq_wq_options = 0,
319 #endif
320 #if DISPATCH_ENABLE_THREAD_POOL
321 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
322 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
323 #endif
324 }}},
325 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
326 #if HAVE_PTHREAD_WORKQUEUES
327 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
328 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
329 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
330 #endif
331 #if DISPATCH_ENABLE_THREAD_POOL
332 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
333 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
334 #endif
335 }}},
336 };
337
338 // 6618342 Contact the team that owns the Instrument DTrace probe before
339 // renaming this symbol
340 // dq_running is set to 2 so that barrier operations go through the slow path
341 DISPATCH_CACHELINE_ALIGN
342 struct dispatch_queue_s _dispatch_root_queues[] = {
343 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
344 .do_vtable = DISPATCH_VTABLE(queue_root),
345 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
346 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
347 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
348 .do_ctxt = &_dispatch_root_queue_contexts[
349 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
350 .dq_label = "com.apple.root.maintenance-qos",
351 .dq_running = 2,
352 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
353 .dq_override_voucher = DISPATCH_NO_VOUCHER,
354 .dq_serialnum = 4,
355 },
356 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
357 .do_vtable = DISPATCH_VTABLE(queue_root),
358 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
359 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
360 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
361 .do_ctxt = &_dispatch_root_queue_contexts[
362 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
363 .dq_label = "com.apple.root.maintenance-qos.overcommit",
364 .dq_running = 2,
365 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
366 .dq_override_voucher = DISPATCH_NO_VOUCHER,
367 .dq_serialnum = 5,
368 },
369 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
370 .do_vtable = DISPATCH_VTABLE(queue_root),
371 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
372 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
373 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
374 .do_ctxt = &_dispatch_root_queue_contexts[
375 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
376 .dq_label = "com.apple.root.background-qos",
377 .dq_running = 2,
378 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
379 .dq_override_voucher = DISPATCH_NO_VOUCHER,
380 .dq_serialnum = 6,
381 },
382 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
383 .do_vtable = DISPATCH_VTABLE(queue_root),
384 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
385 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
386 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
387 .do_ctxt = &_dispatch_root_queue_contexts[
388 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
389 .dq_label = "com.apple.root.background-qos.overcommit",
390 .dq_running = 2,
391 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
392 .dq_override_voucher = DISPATCH_NO_VOUCHER,
393 .dq_serialnum = 7,
394 },
395 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
396 .do_vtable = DISPATCH_VTABLE(queue_root),
397 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
398 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
399 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
400 .do_ctxt = &_dispatch_root_queue_contexts[
401 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
402 .dq_label = "com.apple.root.utility-qos",
403 .dq_running = 2,
404 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
405 .dq_override_voucher = DISPATCH_NO_VOUCHER,
406 .dq_serialnum = 8,
407 },
408 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
409 .do_vtable = DISPATCH_VTABLE(queue_root),
410 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
411 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
412 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
413 .do_ctxt = &_dispatch_root_queue_contexts[
414 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
415 .dq_label = "com.apple.root.utility-qos.overcommit",
416 .dq_running = 2,
417 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
418 .dq_override_voucher = DISPATCH_NO_VOUCHER,
419 .dq_serialnum = 9,
420 },
421 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
422 .do_vtable = DISPATCH_VTABLE(queue_root),
423 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
424 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
425 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
426 .do_ctxt = &_dispatch_root_queue_contexts[
427 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
428 .dq_label = "com.apple.root.default-qos",
429 .dq_running = 2,
430 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
431 .dq_override_voucher = DISPATCH_NO_VOUCHER,
432 .dq_serialnum = 10,
433 },
434 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
435 .do_vtable = DISPATCH_VTABLE(queue_root),
436 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
437 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
438 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
439 .do_ctxt = &_dispatch_root_queue_contexts[
440 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
441 .dq_label = "com.apple.root.default-qos.overcommit",
442 .dq_running = 2,
443 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
444 .dq_override_voucher = DISPATCH_NO_VOUCHER,
445 .dq_serialnum = 11,
446 },
447 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
448 .do_vtable = DISPATCH_VTABLE(queue_root),
449 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
450 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
451 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
452 .do_ctxt = &_dispatch_root_queue_contexts[
453 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
454 .dq_label = "com.apple.root.user-initiated-qos",
455 .dq_running = 2,
456 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
457 .dq_override_voucher = DISPATCH_NO_VOUCHER,
458 .dq_serialnum = 12,
459 },
460 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
461 .do_vtable = DISPATCH_VTABLE(queue_root),
462 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
463 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
464 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
465 .do_ctxt = &_dispatch_root_queue_contexts[
466 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
467 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
468 .dq_running = 2,
469 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
470 .dq_override_voucher = DISPATCH_NO_VOUCHER,
471 .dq_serialnum = 13,
472 },
473 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
474 .do_vtable = DISPATCH_VTABLE(queue_root),
475 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
476 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
477 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
478 .do_ctxt = &_dispatch_root_queue_contexts[
479 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
480 .dq_label = "com.apple.root.user-interactive-qos",
481 .dq_running = 2,
482 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
483 .dq_override_voucher = DISPATCH_NO_VOUCHER,
484 .dq_serialnum = 14,
485 },
486 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
487 .do_vtable = DISPATCH_VTABLE(queue_root),
488 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
489 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
490 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
491 .do_ctxt = &_dispatch_root_queue_contexts[
492 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
493 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
494 .dq_running = 2,
495 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
496 .dq_override_voucher = DISPATCH_NO_VOUCHER,
497 .dq_serialnum = 15,
498 },
499 };
500
501 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
502 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
503 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
504 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
505 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
506 &_dispatch_root_queues[
507 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
508 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
509 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
510 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
511 &_dispatch_root_queues[
512 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
513 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
514 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
515 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
516 &_dispatch_root_queues[
517 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
518 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
519 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
520 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
521 &_dispatch_root_queues[
522 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
523 };
524 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
525
526 #define DISPATCH_PRIORITY_COUNT 5
527
528 enum {
529 // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
530 // maintenance priority
531 DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
532 DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
533 DISPATCH_PRIORITY_IDX_LOW,
534 DISPATCH_PRIORITY_IDX_DEFAULT,
535 DISPATCH_PRIORITY_IDX_HIGH,
536 };
537
538 static qos_class_t _dispatch_priority2qos[] = {
539 [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
540 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
541 [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
542 [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
543 [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
544 };
545
546 #if HAVE_PTHREAD_WORKQUEUE_QOS
547 static const int _dispatch_priority2wq[] = {
548 [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
549 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
550 [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
551 [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
552 [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
553 };
554 #endif
555
556 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
557 static struct dispatch_queue_s _dispatch_mgr_root_queue;
558 #else
559 #define _dispatch_mgr_root_queue \
560 _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT]
561 #endif
562
563 // 6618342 Contact the team that owns the Instrument DTrace probe before
564 // renaming this symbol
565 DISPATCH_CACHELINE_ALIGN
566 struct dispatch_queue_s _dispatch_mgr_q = {
567 .do_vtable = DISPATCH_VTABLE(queue_mgr),
568 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
569 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
570 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
571 .do_targetq = &_dispatch_mgr_root_queue,
572 .dq_label = "com.apple.libdispatch-manager",
573 .dq_width = 1,
574 .dq_is_thread_bound = 1,
575 .dq_override_voucher = DISPATCH_NO_VOUCHER,
576 .dq_serialnum = 2,
577 };
578
579 dispatch_queue_t
580 dispatch_get_global_queue(long priority, unsigned long flags)
581 {
582 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
583 return NULL;
584 }
585 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
586 _dispatch_root_queues_init);
587 qos_class_t qos;
588 switch (priority) {
589 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
590 case _DISPATCH_QOS_CLASS_MAINTENANCE:
591 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
592 .dq_priority) {
593 // map maintenance to background on old kernel
594 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
595 } else {
596 qos = (qos_class_t)priority;
597 }
598 break;
599 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
600 case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
601 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
602 break;
603 case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
604 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
605 break;
606 case DISPATCH_QUEUE_PRIORITY_LOW:
607 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
608 break;
609 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
610 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
611 break;
612 case DISPATCH_QUEUE_PRIORITY_HIGH:
613 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
614 break;
615 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
616 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
617 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
618 .dq_priority) {
619 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
620 break;
621 }
622 #endif
623 // fallthrough
624 default:
625 qos = (qos_class_t)priority;
626 break;
627 }
628 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
629 }
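/*
 * Usage sketch (illustrative, not part of the original file): both legacy
 * priorities and QOS classes go through the switch above, e.g.
 *
 *     dispatch_queue_t q1 = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *     dispatch_queue_t q2 =
 *             dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
 *
 * Any flag bit other than DISPATCH_QUEUE_OVERCOMMIT makes the call return
 * NULL, as checked at the top of the function.
 */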
630
631 DISPATCH_ALWAYS_INLINE
632 static inline dispatch_queue_t
633 _dispatch_get_current_queue(void)
634 {
635 return _dispatch_queue_get_current() ?:
636 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
637 }
638
639 dispatch_queue_t
640 dispatch_get_current_queue(void)
641 {
642 return _dispatch_get_current_queue();
643 }
644
645 DISPATCH_ALWAYS_INLINE
646 static inline bool
647 _dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
648 {
649 while (dq1) {
650 if (dq1 == dq2) {
651 return true;
652 }
653 dq1 = dq1->do_targetq;
654 }
655 return false;
656 }
657
658 #define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
659 "Assertion failed: Block was run on an unexpected queue"
660
661 DISPATCH_NOINLINE
662 static void
663 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
664 {
665 char *msg;
666 asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
667 expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
668 dq->dq_label : "");
669 _dispatch_log("%s", msg);
670 _dispatch_set_crash_log_message_dynamic(msg);
671 _dispatch_hardware_crash();
672 free(msg);
673 }
674
675 void
676 dispatch_assert_queue(dispatch_queue_t dq)
677 {
678 if (slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
679 DISPATCH_CLIENT_CRASH("invalid queue passed to "
680 "dispatch_assert_queue()");
681 }
682 dispatch_queue_t cq = _dispatch_queue_get_current();
683 if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
684 return;
685 }
686 _dispatch_assert_queue_fail(dq, true);
687 }
688
689 void
690 dispatch_assert_queue_not(dispatch_queue_t dq)
691 {
692 if (slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
693 DISPATCH_CLIENT_CRASH("invalid queue passed to "
694 "dispatch_assert_queue_not()");
695 }
696 dispatch_queue_t cq = _dispatch_queue_get_current();
697 if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
698 _dispatch_assert_queue_fail(dq, false);
699 }
700 }
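/*
 * Usage sketch (illustrative, not part of the original file; expected_q and
 * main_q are assumed names): code that must only run on a particular queue
 * can assert that fact with the two functions above:
 *
 *     static void handler(void *ctxt) {
 *         dispatch_assert_queue(expected_q);    // crash unless the current
 *                                               // queue targets expected_q
 *         dispatch_assert_queue_not(main_q);    // crash if running on (or
 *                                               // targeting) main_q
 *     }
 */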
701
702 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
703 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
704 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
705 #else
706 #define _dispatch_root_queue_debug(...)
707 #define _dispatch_debug_root_queue(...)
708 #endif
709
710 #pragma mark -
711 #pragma mark dispatch_init
712
713 #if HAVE_PTHREAD_WORKQUEUE_QOS
714 int _dispatch_set_qos_class_enabled;
715 pthread_priority_t _dispatch_background_priority;
716 pthread_priority_t _dispatch_user_initiated_priority;
717
718 static void
719 _dispatch_root_queues_init_qos(int supported)
720 {
721 pthread_priority_t p;
722 qos_class_t qos;
723 unsigned int i;
724 for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
725 p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
726 qos = _pthread_qos_class_decode(p, NULL, NULL);
727 dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
728 _dispatch_priority2qos[i] = qos;
729 }
730 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
731 qos = _dispatch_root_queue_contexts[i].dgq_qos;
732 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
733 !(supported & WORKQ_FEATURE_MAINTENANCE)) {
734 continue;
735 }
736 unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
737 flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
738 if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
739 i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
740 flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
741 }
742 p = _pthread_qos_class_encode(qos, 0, flags);
743 _dispatch_root_queues[i].dq_priority = p;
744 }
745 p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
746 _dispatch_main_q.dq_priority = p;
747 _dispatch_queue_set_override_priority(&_dispatch_main_q);
748 _dispatch_background_priority = _dispatch_root_queues[
749 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS].dq_priority &
750 ~_PTHREAD_PRIORITY_FLAGS_MASK;
751 _dispatch_user_initiated_priority = _dispatch_root_queues[
752 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS].dq_priority &
753 ~_PTHREAD_PRIORITY_FLAGS_MASK;
754 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
755 _dispatch_set_qos_class_enabled = 1;
756 }
757 }
758 #endif
759
760 static inline bool
761 _dispatch_root_queues_init_workq(void)
762 {
763 bool result = false;
764 #if HAVE_PTHREAD_WORKQUEUES
765 bool disable_wq = false;
766 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
767 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
768 #endif
769 int r;
770 #if HAVE_PTHREAD_WORKQUEUE_QOS
771 bool disable_qos = false;
772 #if DISPATCH_DEBUG
773 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
774 #endif
775 if (!disable_qos && !disable_wq) {
776 r = _pthread_workqueue_supported();
777 int supported = r;
778 if (r & WORKQ_FEATURE_FINEPRIO) {
779 r = _pthread_workqueue_init(_dispatch_worker_thread3,
780 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
781 result = !r;
782 if (result) _dispatch_root_queues_init_qos(supported);
783 }
784 }
785 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
786 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
787 if (!result && !disable_wq) {
788 #if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
789 pthread_workqueue_setdispatchoffset_np(
790 offsetof(struct dispatch_queue_s, dq_serialnum));
791 #endif
792 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
793 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
794 (void)dispatch_assume_zero(r);
795 #endif
796 result = !r;
797 }
798 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
799 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
800 if (!result) {
801 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
802 pthread_workqueue_attr_t pwq_attr;
803 if (!disable_wq) {
804 r = pthread_workqueue_attr_init_np(&pwq_attr);
805 (void)dispatch_assume_zero(r);
806 }
807 #endif
808 int i;
809 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
810 pthread_workqueue_t pwq = NULL;
811 dispatch_root_queue_context_t qc;
812 qc = &_dispatch_root_queue_contexts[i];
813 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
814 if (!disable_wq) {
815 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
816 qc->dgq_wq_priority);
817 (void)dispatch_assume_zero(r);
818 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
819 qc->dgq_wq_options &
820 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
821 (void)dispatch_assume_zero(r);
822 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
823 (void)dispatch_assume_zero(r);
824 result = result || dispatch_assume(pwq);
825 }
826 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
827 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
828 }
829 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
830 if (!disable_wq) {
831 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
832 (void)dispatch_assume_zero(r);
833 }
834 #endif
835 }
836 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
837 #endif // HAVE_PTHREAD_WORKQUEUES
838 return result;
839 }
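/*
 * Note (editorial summary, not part of the original file): the function above
 * probes the available kernel workqueue interfaces in order of preference:
 * the QOS-aware _pthread_workqueue_init() first, then the older
 * pthread_workqueue_setdispatch_np() callback, then per-priority
 * pthread_workqueue_create_np() queues. It returns false only when none of
 * them is usable, in which case _dispatch_root_queues_init() falls back to
 * the internal pthread pool below.
 */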
840
841 #if DISPATCH_USE_PTHREAD_POOL
842 static inline void
843 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
844 uint8_t pool_size, bool overcommit)
845 {
846 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
847 uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
848 dispatch_hw_config(active_cpus);
849 if (slowpath(pool_size) && pool_size < thread_pool_size) {
850 thread_pool_size = pool_size;
851 }
852 qc->dgq_thread_pool_size = thread_pool_size;
853 if (qc->dgq_qos) {
854 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
855 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
856 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
857 #if HAVE_PTHREAD_WORKQUEUE_QOS
858 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
859 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
860 #endif
861 }
862 #if USE_MACH_SEM
863 // override the default FIFO behavior for the pool semaphores
864 kern_return_t kr = semaphore_create(mach_task_self(),
865 &pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
866 DISPATCH_VERIFY_MIG(kr);
867 (void)dispatch_assume_zero(kr);
868 (void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
869 #elif USE_POSIX_SEM
870 /* XXXRW: POSIX semaphores don't support LIFO? */
871 int ret = sem_init(&pqc->dpq_thread_mediator.dsema_sem, 0, 0);
872 (void)dispatch_assume_zero(ret);
873 #endif
874 }
875 #endif // DISPATCH_USE_PTHREAD_POOL
876
877 static dispatch_once_t _dispatch_root_queues_pred;
878
879 static void
880 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
881 {
882 _dispatch_safe_fork = false;
883 if (!_dispatch_root_queues_init_workq()) {
884 #if DISPATCH_ENABLE_THREAD_POOL
885 int i;
886 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
887 bool overcommit = true;
888 #if TARGET_OS_EMBEDDED
889 // some software hangs if the non-overcommitting queues do not
890 // overcommit when threads block. Someday, this behavior should
891 // apply to all platforms
892 if (!(i & 1)) {
893 overcommit = false;
894 }
895 #endif
896 _dispatch_root_queue_init_pthread_pool(
897 &_dispatch_root_queue_contexts[i], 0, overcommit);
898 }
899 #else
900 DISPATCH_CRASH("Root queue initialization failed");
901 #endif // DISPATCH_ENABLE_THREAD_POOL
902 }
903 }
904
905 #define countof(x) (sizeof(x) / sizeof(x[0]))
906
907 DISPATCH_EXPORT DISPATCH_NOTHROW
908 void
909 libdispatch_init(void)
910 {
911 dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
912 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
913
914 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
915 -DISPATCH_QUEUE_PRIORITY_HIGH);
916 dispatch_assert(countof(_dispatch_root_queues) ==
917 DISPATCH_ROOT_QUEUE_COUNT);
918 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
919 DISPATCH_ROOT_QUEUE_COUNT);
920 dispatch_assert(countof(_dispatch_priority2qos) ==
921 DISPATCH_PRIORITY_COUNT);
922 #if HAVE_PTHREAD_WORKQUEUE_QOS
923 dispatch_assert(countof(_dispatch_priority2wq) ==
924 DISPATCH_PRIORITY_COUNT);
925 #endif
926 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
927 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
928 sizeof(_dispatch_wq2root_queues[0][0]) ==
929 WORKQ_NUM_PRIOQUEUE * 2);
930 #endif
931 #if DISPATCH_ENABLE_THREAD_POOL
932 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
933 DISPATCH_ROOT_QUEUE_COUNT);
934 #endif
935
936 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
937 offsetof(struct dispatch_object_s, do_next));
938 dispatch_assert(sizeof(struct dispatch_apply_s) <=
939 DISPATCH_CONTINUATION_SIZE);
940 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
941 == 0);
942 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
943 DISPATCH_CACHELINE_SIZE == 0);
944
945 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
946 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
947 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
948 _dispatch_thread_key_create(&dispatch_io_key, NULL);
949 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
950 _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
951 _dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
952 NULL);
953 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
954 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
955 #endif
956 #if !DISPATCH_USE_OS_SEMAPHORE_CACHE
957 _dispatch_thread_key_create(&dispatch_sema4_key,
958 (void (*)(void *))_dispatch_thread_semaphore_dispose);
959 #endif
960
961 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
962 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
963 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
964 #endif
965
966 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
967 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
968
969 #if DISPATCH_USE_PTHREAD_ATFORK
970 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
971 dispatch_atfork_parent, dispatch_atfork_child));
972 #endif
973
974 _dispatch_hw_config_init();
975 _dispatch_vtable_init();
976 _os_object_init();
977 _voucher_init();
978 _dispatch_introspection_init();
979 }
980
981 #if HAVE_MACH
982 static dispatch_once_t _dispatch_mach_host_port_pred;
983 static mach_port_t _dispatch_mach_host_port;
984
985 static void
986 _dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
987 {
988 kern_return_t kr;
989 mach_port_t mp, mhp = mach_host_self();
990 kr = host_get_host_port(mhp, &mp);
991 DISPATCH_VERIFY_MIG(kr);
992 if (!kr) {
993 // mach_host_self returned the HOST_PRIV port
994 kr = mach_port_deallocate(mach_task_self(), mhp);
995 DISPATCH_VERIFY_MIG(kr);
996 (void)dispatch_assume_zero(kr);
997 mhp = mp;
998 } else if (kr != KERN_INVALID_ARGUMENT) {
999 (void)dispatch_assume_zero(kr);
1000 }
1001 if (!dispatch_assume(mhp)) {
1002 DISPATCH_CRASH("Could not get unprivileged host port");
1003 }
1004 _dispatch_mach_host_port = mhp;
1005 }
1006
1007 mach_port_t
1008 _dispatch_get_mach_host_port(void)
1009 {
1010 dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
1011 _dispatch_mach_host_port_init);
1012 return _dispatch_mach_host_port;
1013 }
1014 #endif
1015
1016 DISPATCH_EXPORT DISPATCH_NOTHROW
1017 void
1018 dispatch_atfork_child(void)
1019 {
1020 void *crash = (void *)0x100;
1021 size_t i;
1022
1023 #if HAVE_MACH
1024 _dispatch_mach_host_port_pred = 0;
1025 _dispatch_mach_host_port = MACH_VOUCHER_NULL;
1026 #endif
1027 _voucher_atfork_child();
1028 if (_dispatch_safe_fork) {
1029 return;
1030 }
1031 _dispatch_child_of_unsafe_fork = true;
1032
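// Editorial note: the queue heads and tails below are poisoned with a small
// invalid pointer so that any further use of dispatch in the child of an
// unsafe fork crashes deterministically instead of operating on state
// inherited from the parent.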
1033 _dispatch_main_q.dq_items_head = crash;
1034 _dispatch_main_q.dq_items_tail = crash;
1035
1036 _dispatch_mgr_q.dq_items_head = crash;
1037 _dispatch_mgr_q.dq_items_tail = crash;
1038
1039 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1040 _dispatch_root_queues[i].dq_items_head = crash;
1041 _dispatch_root_queues[i].dq_items_tail = crash;
1042 }
1043 }
1044
1045 #pragma mark -
1046 #pragma mark dispatch_queue_attr_t
1047
1048 DISPATCH_ALWAYS_INLINE
1049 static inline bool
1050 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1051 {
1052 qos_class_t qos = (qos_class_t)qos_class;
1053 switch (qos) {
1054 case _DISPATCH_QOS_CLASS_MAINTENANCE:
1055 case _DISPATCH_QOS_CLASS_BACKGROUND:
1056 case _DISPATCH_QOS_CLASS_UTILITY:
1057 case _DISPATCH_QOS_CLASS_DEFAULT:
1058 case _DISPATCH_QOS_CLASS_USER_INITIATED:
1059 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
1060 case _DISPATCH_QOS_CLASS_UNSPECIFIED:
1061 break;
1062 default:
1063 return false;
1064 }
1065 if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
1066 return false;
1067 }
1068 return true;
1069 }
1070
1071 #define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
1072 [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
1073
1074 static const
1075 _dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
1076 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
1077 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
1078 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
1079 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
1080 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
1081 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
1082 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
1083 };
1084
1085 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1086 ((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
1087 DQA_INDEX_NON_OVERCOMMIT : \
1088 ((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
1089 DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
1090
1091 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1092 ((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1093
1094 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1095
1096 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
1097
1098 static inline dispatch_queue_attr_t
1099 _dispatch_get_queue_attr(qos_class_t qos, int prio,
1100 _dispatch_queue_attr_overcommit_t overcommit, bool concurrent)
1101 {
1102 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1103 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1104 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1105 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1106 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)];
1107 }
1108
1109 dispatch_queue_attr_t
1110 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1111 dispatch_qos_class_t qos_class, int relative_priority)
1112 {
1113 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
1114 if (!slowpath(dqa)) {
1115 dqa = _dispatch_get_queue_attr(0, 0,
1116 _dispatch_queue_attr_overcommit_unspecified, false);
1117 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1118 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1119 }
1120 return _dispatch_get_queue_attr(qos_class, relative_priority,
1121 dqa->dqa_overcommit, dqa->dqa_concurrent);
1122 }
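/*
 * Usage sketch (illustrative, not part of the original file): the cached
 * attribute table above is reached from the public API like so:
 *
 *     dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *             DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, -1);
 *     dispatch_queue_t q = dispatch_queue_create("com.example.bg", attr);
 *
 * The call returns NULL for an invalid QOS class or a positive relative
 * priority, per _dispatch_qos_class_valid() above.
 */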
1123
1124 dispatch_queue_attr_t
1125 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1126 bool overcommit)
1127 {
1128 if (!slowpath(dqa)) {
1129 dqa = _dispatch_get_queue_attr(0, 0,
1130 _dispatch_queue_attr_overcommit_unspecified, false);
1131 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1132 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1133 }
1134 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1135 dqa->dqa_relative_priority, overcommit ?
1136 _dispatch_queue_attr_overcommit_enabled :
1137 _dispatch_queue_attr_overcommit_disabled, dqa->dqa_concurrent);
1138 }
1139
1140 #pragma mark -
1141 #pragma mark dispatch_queue_t
1142
1143 // skip zero
1144 // 1 - main_q
1145 // 2 - mgr_q
1146 // 3 - mgr_root_q
1147 // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
1148 // we use 'xadd' on Intel, so the initial value == next assigned
1149 unsigned long volatile _dispatch_queue_serial_numbers = 16;
1150
1151 dispatch_queue_t
1152 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1153 dispatch_queue_t tq)
1154 {
1155 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1156 // Be sure the root queue priorities are set
1157 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
1158 _dispatch_root_queues_init);
1159 #endif
1160 bool disallow_tq = (slowpath(dqa) && dqa != DISPATCH_QUEUE_CONCURRENT);
1161 if (!slowpath(dqa)) {
1162 dqa = _dispatch_get_queue_attr(0, 0,
1163 _dispatch_queue_attr_overcommit_unspecified, false);
1164 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1165 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1166 }
1167 dispatch_queue_t dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
1168 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1169 _dispatch_queue_init(dq);
1170 if (label) {
1171 dq->dq_label = strdup(label);
1172 }
1173 qos_class_t qos = dqa->dqa_qos_class;
1174 _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
1175 if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
1176 // Serial queues default to overcommit!
1177 overcommit = dqa->dqa_concurrent ?
1178 _dispatch_queue_attr_overcommit_disabled :
1179 _dispatch_queue_attr_overcommit_enabled;
1180 }
1181 #if HAVE_PTHREAD_WORKQUEUE_QOS
1182 dq->dq_priority = _pthread_qos_class_encode(qos, dqa->dqa_relative_priority,
1183 overcommit == _dispatch_queue_attr_overcommit_enabled ?
1184 _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
1185 #endif
1186 if (dqa->dqa_concurrent) {
1187 dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1188 }
1189 if (!tq) {
1190 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1191 qos = _DISPATCH_QOS_CLASS_DEFAULT;
1192 }
1193 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1194 if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
1195 !_dispatch_root_queues[
1196 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
1197 qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
1198 }
1199 #endif
1200 bool maintenance_fallback = false;
1201 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1202 maintenance_fallback = true;
1203 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1204 if (maintenance_fallback) {
1205 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
1206 !_dispatch_root_queues[
1207 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
1208 qos = _DISPATCH_QOS_CLASS_BACKGROUND;
1209 }
1210 }
1211
1212 tq = _dispatch_get_root_queue(qos, overcommit ==
1213 _dispatch_queue_attr_overcommit_enabled);
1214 if (slowpath(!tq)) {
1215 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1216 }
1217 } else {
1218 _dispatch_retain(tq);
1219 if (disallow_tq) {
1220 // TODO: override target queue's qos/overcommit ?
1221 DISPATCH_CLIENT_CRASH("Invalid combination of target queue & "
1222 "queue attribute");
1223 }
1224 _dispatch_queue_priority_inherit_from_target(dq, tq);
1225 }
1226 _dispatch_queue_set_override_priority(dq);
1227 dq->do_targetq = tq;
1228 _dispatch_object_debug(dq, "%s", __func__);
1229 return _dispatch_introspection_queue_create(dq);
1230 }
1231
1232 dispatch_queue_t
1233 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1234 {
1235 return dispatch_queue_create_with_target(label, attr,
1236 DISPATCH_TARGET_QUEUE_DEFAULT);
1237 }
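/*
 * Note (editorial, not part of the original file): dispatch_queue_create() is
 * the DISPATCH_TARGET_QUEUE_DEFAULT case of the function above. Passing an
 * explicit target creates the queue with that target already in place, e.g.
 * (outer_q is an assumed name):
 *
 *     dispatch_queue_t inner = dispatch_queue_create_with_target(
 *             "com.example.inner", DISPATCH_QUEUE_SERIAL, outer_q);
 */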
1238
1239 dispatch_queue_t
1240 dispatch_queue_create_with_accounting_override_voucher(const char *label,
1241 dispatch_queue_attr_t attr, voucher_t voucher)
1242 {
1243 dispatch_queue_t dq = dispatch_queue_create_with_target(label, attr,
1244 DISPATCH_TARGET_QUEUE_DEFAULT);
1245 dq->dq_override_voucher = _voucher_create_accounting_voucher(voucher);
1246 return dq;
1247 }
1248
1249 void
1250 _dispatch_queue_destroy(dispatch_object_t dou)
1251 {
1252 dispatch_queue_t dq = dou._dq;
1253 if (slowpath(dq == _dispatch_queue_get_current())) {
1254 DISPATCH_CRASH("Release of a queue by itself");
1255 }
1256 if (slowpath(dq->dq_items_tail)) {
1257 DISPATCH_CRASH("Release of a queue while items are enqueued");
1258 }
1259
1260 // trash the tail queue so that use after free will crash
1261 dq->dq_items_tail = (void *)0x200;
1262
1263 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
1264 (void *)0x200, relaxed);
1265 if (dqsq) {
1266 _dispatch_release(dqsq);
1267 }
1268 if (dq->dq_override_voucher != DISPATCH_NO_VOUCHER) {
1269 if (dq->dq_override_voucher) _voucher_release(dq->dq_override_voucher);
1270 dq->dq_override_voucher = DISPATCH_NO_VOUCHER;
1271 }
1272 }
1273
1274 // 6618342 Contact the team that owns the Instrument DTrace probe before
1275 // renaming this symbol
1276 void
1277 _dispatch_queue_dispose(dispatch_queue_t dq)
1278 {
1279 _dispatch_object_debug(dq, "%s", __func__);
1280 _dispatch_introspection_queue_dispose(dq);
1281 if (dq->dq_label) {
1282 free((void*)dq->dq_label);
1283 }
1284 _dispatch_queue_destroy(dq);
1285 }
1286
1287 const char *
1288 dispatch_queue_get_label(dispatch_queue_t dq)
1289 {
1290 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1291 dq = _dispatch_get_current_queue();
1292 }
1293 return dq->dq_label ? dq->dq_label : "";
1294 }
1295
1296 qos_class_t
1297 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
1298 {
1299 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
1300 int relative_priority = 0;
1301 #if HAVE_PTHREAD_WORKQUEUE_QOS
1302 pthread_priority_t dqp = dq->dq_priority;
1303 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
1304 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
1305 #else
1306 (void)dq;
1307 #endif
1308 if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
1309 return qos;
1310 }
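/*
 * Usage sketch (illustrative, not part of the original file): when QOS
 * support is compiled in, the QOS and relative priority encoded at creation
 * can be read back, e.g. for the queue created in the attribute example
 * above:
 *
 *     int rel;
 *     qos_class_t qc = dispatch_queue_get_qos_class(q, &rel);
 *     // qc == QOS_CLASS_BACKGROUND, rel == -1
 *
 * Without QOS support this returns QOS_CLASS_UNSPECIFIED and 0.
 */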
1311
1312 static void
1313 _dispatch_queue_set_width2(void *ctxt)
1314 {
1315 int w = (int)(intptr_t)ctxt; // intentional truncation
1316 uint32_t tmp;
1317 dispatch_queue_t dq = _dispatch_queue_get_current();
1318
1319 if (w == 1 || w == 0) {
1320 dq->dq_width = 1;
1321 _dispatch_object_debug(dq, "%s", __func__);
1322 return;
1323 }
1324 if (w > 0) {
1325 tmp = (unsigned int)w;
1326 } else switch (w) {
1327 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1328 tmp = dispatch_hw_config(physical_cpus);
1329 break;
1330 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1331 tmp = dispatch_hw_config(active_cpus);
1332 break;
1333 default:
1334 // fall through
1335 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1336 tmp = dispatch_hw_config(logical_cpus);
1337 break;
1338 }
1339 if (tmp > DISPATCH_QUEUE_WIDTH_MAX / 2) {
1340 tmp = DISPATCH_QUEUE_WIDTH_MAX / 2;
1341 }
1342 // multiply by two since the running count is inc/dec by two
1343 // (the low bit == barrier)
1344 dq->dq_width = (typeof(dq->dq_width))(tmp * 2);
1345 _dispatch_object_debug(dq, "%s", __func__);
1346 }
1347
1348 void
1349 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1350 {
1351 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1352 slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
1353 return;
1354 }
1355 _dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
1356 _dispatch_queue_set_width2);
1357 }
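/*
 * Note (editorial, not part of the original file): dispatch_queue_set_width()
 * is a legacy interface; the change is applied asynchronously on the queue
 * itself through _dispatch_barrier_trysync_f(). A width of 0 or 1 makes the
 * queue serial again, and the DISPATCH_QUEUE_WIDTH_*_CPUS constants map to
 * the corresponding dispatch_hw_config() values.
 */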
1358
1359 // 6618342 Contact the team that owns the Instrument DTrace probe before
1360 // renaming this symbol
1361 static void
1362 _dispatch_set_target_queue2(void *ctxt)
1363 {
1364 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current(), tq = ctxt;
1365 #if HAVE_PTHREAD_WORKQUEUE_QOS
1366 // see _dispatch_queue_wakeup_with_qos_slow
1367 mach_msg_timeout_t timeout = 1;
1368 mach_port_t th;
1369
1370 while (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread, MACH_PORT_NULL,
1371 _dispatch_thread_port(), &th, acquire)) {
1372 _dispatch_thread_switch(th, DISPATCH_YIELD_THREAD_SWITCH_OPTION,
1373 timeout++);
1374 }
1375 #endif
1376 _dispatch_queue_priority_inherit_from_target(dq, tq);
1377 prev_dq = dq->do_targetq;
1378 dq->do_targetq = tq;
1379 _dispatch_release(prev_dq);
1380 _dispatch_object_debug(dq, "%s", __func__);
1381 dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL, release);
1382 }
1383
1384 void
1385 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
1386 {
1387 DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
1388 if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1389 slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
1390 return;
1391 }
1392 unsigned long type = dx_metatype(dou._do);
1393 if (slowpath(!dq)) {
1394 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
1395 slowpath(dou._dq->dq_width > 1));
1396 dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1397 !is_concurrent_q);
1398 }
1399 // TODO: put into the vtable
1400 switch(type) {
1401 case _DISPATCH_QUEUE_TYPE:
1402 case _DISPATCH_SOURCE_TYPE:
1403 _dispatch_retain(dq);
1404 return _dispatch_barrier_trysync_f(dou._dq, dq,
1405 _dispatch_set_target_queue2);
1406 case _DISPATCH_IO_TYPE:
1407 return _dispatch_io_set_target_queue(dou._dchannel, dq);
1408 default: {
1409 dispatch_queue_t prev_dq;
1410 _dispatch_retain(dq);
1411 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
1412 if (prev_dq) _dispatch_release(prev_dq);
1413 _dispatch_object_debug(dou._do, "%s", __func__);
1414 return;
1415 }
1416 }
1417 }
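/*
 * Usage sketch (illustrative, not part of the original file; q1 and q2 are
 * assumed names): a common use of the function above is funnelling several
 * serial queues through one serial target so their work is mutually
 * serialized:
 *
 *     dispatch_queue_t target = dispatch_queue_create("com.example.target",
 *             DISPATCH_QUEUE_SERIAL);
 *     dispatch_set_target_queue(q1, target);
 *     dispatch_set_target_queue(q2, target);
 *
 * Passing NULL as the target resets the object to the appropriate default
 * root queue, as handled at the top of the function.
 */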
1418
1419 #pragma mark -
1420 #pragma mark dispatch_pthread_root_queue
1421
1422 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1423 static struct dispatch_pthread_root_queue_context_s
1424 _dispatch_mgr_root_queue_pthread_context;
1425 static struct dispatch_root_queue_context_s
1426 _dispatch_mgr_root_queue_context = {{{
1427 #if HAVE_PTHREAD_WORKQUEUES
1428 .dgq_kworkqueue = (void*)(~0ul),
1429 #endif
1430 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
1431 .dgq_thread_pool_size = 1,
1432 }}};
1433
1434 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
1435 .do_vtable = DISPATCH_VTABLE(queue_root),
1436 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
1437 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
1438 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
1439 .do_ctxt = &_dispatch_mgr_root_queue_context,
1440 .dq_label = "com.apple.root.libdispatch-manager",
1441 .dq_running = 2,
1442 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
1443 .dq_override_voucher = DISPATCH_NO_VOUCHER,
1444 .dq_serialnum = 3,
1445 };
1446
1447 static struct {
1448 volatile int prio;
1449 volatile qos_class_t qos;
1450 int default_prio;
1451 int policy;
1452 pthread_t tid;
1453 } _dispatch_mgr_sched;
1454
1455 static dispatch_once_t _dispatch_mgr_sched_pred;
1456
1457 // TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
1458
1459 // Must be kept in sync with list of qos classes in sys/qos.h
1460 static const int _dispatch_mgr_sched_qos2prio[] = {
1461 [_DISPATCH_QOS_CLASS_MAINTENANCE] = 4,
1462 [_DISPATCH_QOS_CLASS_BACKGROUND] = 4,
1463 [_DISPATCH_QOS_CLASS_UTILITY] = 20,
1464 [_DISPATCH_QOS_CLASS_DEFAULT] = 31,
1465 [_DISPATCH_QOS_CLASS_USER_INITIATED] = 37,
1466 [_DISPATCH_QOS_CLASS_USER_INTERACTIVE] = 47,
1467 };
1468
1469 static void
1470 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
1471 {
1472 struct sched_param param;
1473 pthread_attr_t *attr;
1474 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1475 (void)dispatch_assume_zero(pthread_attr_init(attr));
1476 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
1477 &_dispatch_mgr_sched.policy));
1478 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1479 #if HAVE_PTHREAD_WORKQUEUE_QOS
1480 qos_class_t qos = qos_class_main();
1481 if (qos == _DISPATCH_QOS_CLASS_DEFAULT) {
1482 qos = _DISPATCH_QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
1483 }
1484 if (qos) {
1485 _dispatch_mgr_sched.qos = qos;
1486 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
1487 }
1488 #endif
1489 _dispatch_mgr_sched.default_prio = param.sched_priority;
1490 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
1491 }
1492
1493 DISPATCH_NOINLINE
1494 static pthread_t *
1495 _dispatch_mgr_root_queue_init(void)
1496 {
1497 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1498 struct sched_param param;
1499 pthread_attr_t *attr;
1500 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1501 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
1502 PTHREAD_CREATE_DETACHED));
1503 #if !DISPATCH_DEBUG
1504 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
1505 #endif
1506 #if HAVE_PTHREAD_WORKQUEUE_QOS
1507 qos_class_t qos = _dispatch_mgr_sched.qos;
1508 if (qos) {
1509 if (_dispatch_set_qos_class_enabled) {
1510 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
1511 qos, 0));
1512 }
1513 _dispatch_mgr_q.dq_priority = _pthread_qos_class_encode(qos, 0, 0);
1514 _dispatch_queue_set_override_priority(&_dispatch_mgr_q);
1515 }
1516 #endif
1517 param.sched_priority = _dispatch_mgr_sched.prio;
1518 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1519 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
1520 }
1521 return &_dispatch_mgr_sched.tid;
1522 }
1523
1524 static inline void
1525 _dispatch_mgr_priority_apply(void)
1526 {
1527 struct sched_param param;
1528 do {
1529 param.sched_priority = _dispatch_mgr_sched.prio;
1530 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1531 (void)dispatch_assume_zero(pthread_setschedparam(
1532 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
1533 &param));
1534 }
1535 } while (_dispatch_mgr_sched.prio > param.sched_priority);
1536 }
1537
1538 DISPATCH_NOINLINE
1539 void
1540 _dispatch_mgr_priority_init(void)
1541 {
1542 struct sched_param param;
1543 pthread_attr_t *attr;
1544 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1545 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1546 #if HAVE_PTHREAD_WORKQUEUE_QOS
1547 qos_class_t qos = 0;
1548 (void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
1549 if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
1550 (void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
1551 int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
1552 if (p > param.sched_priority) {
1553 param.sched_priority = p;
1554 }
1555 }
1556 #endif
1557 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
1558 return _dispatch_mgr_priority_apply();
1559 }
1560 }
1561
1562 DISPATCH_NOINLINE
1563 static void
1564 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
1565 {
1566 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1567 struct sched_param param;
1568 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1569 #if HAVE_PTHREAD_WORKQUEUE_QOS
1570 qos_class_t qos = 0;
1571 (void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
1572 if (qos) {
1573 param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
1574 qos_class_t q = _dispatch_mgr_sched.qos;
1575 do if (q >= qos) {
1576 break;
1577 } while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched,
1578 qos, q, qos, &q, relaxed)));
1579 }
1580 #endif
1581 int p = _dispatch_mgr_sched.prio;
1582 do if (p >= param.sched_priority) {
1583 return;
1584 } while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
1585 p, param.sched_priority, &p, relaxed)));
1586 if (_dispatch_mgr_sched.tid) {
1587 return _dispatch_mgr_priority_apply();
1588 }
1589 }
1590
1591 static dispatch_queue_t
1592 _dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
1593 const pthread_attr_t *attr, dispatch_block_t configure,
1594 dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
1595 {
1596 dispatch_queue_t dq;
1597 dispatch_root_queue_context_t qc;
1598 dispatch_pthread_root_queue_context_t pqc;
1599 size_t dqs;
1600 uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
1601 (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
1602
1603 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
1604 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
1605 sizeof(struct dispatch_root_queue_context_s) +
1606 sizeof(struct dispatch_pthread_root_queue_context_s));
1607 qc = (void*)dq + dqs;
1608 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
1609
1610 _dispatch_queue_init(dq);
1611 if (label) {
1612 dq->dq_label = strdup(label);
1613 }
1614
1615 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
1616 dq->do_ctxt = qc;
1617 dq->do_targetq = NULL;
1618 dq->dq_running = 2;
1619 dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1620
1621 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
1622 qc->dgq_ctxt = pqc;
1623 #if HAVE_PTHREAD_WORKQUEUES
1624 qc->dgq_kworkqueue = (void*)(~0ul);
1625 #endif
1626 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
1627
1628 if (attr) {
1629 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
1630 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
1631 } else {
1632 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
1633 }
1634 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
1635 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
1636 if (configure) {
1637 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
1638 }
1639 if (observer_hooks) {
1640 pqc->dpq_observer_hooks = *observer_hooks;
1641 }
1642 _dispatch_object_debug(dq, "%s", __func__);
1643 return _dispatch_introspection_queue_create(dq);
1644 }
1645
1646 dispatch_queue_t
1647 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
1648 const pthread_attr_t *attr, dispatch_block_t configure)
1649 {
1650 return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
1651 NULL);
1652 }
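
/*
 * Illustrative usage sketch (not part of this file): the pthread root queue
 * SPI above is declared in <dispatch/private.h>. A custom root queue owns its
 * own pthread pool instead of the default workqueue threads, so other queues
 * can be targeted at it to isolate their work. Roughly:
 *
 *	#include <dispatch/dispatch.h>
 *	#include <dispatch/private.h>
 *
 *	dispatch_queue_t pool = dispatch_pthread_root_queue_create(
 *			"com.example.worker-pool", 0, NULL, ^{
 *		// configure block: runs once on each newly created worker thread
 *	});
 *	dispatch_queue_t q = dispatch_queue_create("com.example.work", NULL);
 *	dispatch_set_target_queue(q, pool);
 *	dispatch_async(q, ^{ do_work(); });	// do_work() is hypothetical
 *
 * (The labels and configure block are hypothetical; pass 0 for flags unless
 * using the internal pool-size flag handled above.)
 */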
1653
1654
1655 #endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1656
1657 void
1658 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
1659 {
1660 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1661 DISPATCH_CRASH("Global root queue disposed");
1662 }
1663 _dispatch_object_debug(dq, "%s", __func__);
1664 _dispatch_introspection_queue_dispose(dq);
1665 #if DISPATCH_USE_PTHREAD_POOL
1666 dispatch_root_queue_context_t qc = dq->do_ctxt;
1667 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
1668
1669 pthread_attr_destroy(&pqc->dpq_thread_attr);
1670 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
1671 if (pqc->dpq_thread_configure) {
1672 Block_release(pqc->dpq_thread_configure);
1673 }
1674 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1675 false);
1676 #endif
1677 if (dq->dq_label) {
1678 free((void*)dq->dq_label);
1679 }
1680 _dispatch_queue_destroy(dq);
1681 }
1682
1683 #pragma mark -
1684 #pragma mark dispatch_queue_specific
1685
1686 struct dispatch_queue_specific_queue_s {
1687 DISPATCH_STRUCT_HEADER(queue_specific_queue);
1688 DISPATCH_QUEUE_HEADER;
1689 TAILQ_HEAD(dispatch_queue_specific_head_s,
1690 dispatch_queue_specific_s) dqsq_contexts;
1691 };
1692
1693 struct dispatch_queue_specific_s {
1694 const void *dqs_key;
1695 void *dqs_ctxt;
1696 dispatch_function_t dqs_destructor;
1697 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
1698 };
1699 DISPATCH_DECL(dispatch_queue_specific);
1700
1701 void
1702 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
1703 {
1704 dispatch_queue_specific_t dqs, tmp;
1705
1706 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
1707 if (dqs->dqs_destructor) {
1708 dispatch_async_f(_dispatch_get_root_queue(
1709 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
1710 dqs->dqs_destructor);
1711 }
1712 free(dqs);
1713 }
1714 _dispatch_queue_destroy((dispatch_queue_t)dqsq);
1715 }
1716
1717 static void
1718 _dispatch_queue_init_specific(dispatch_queue_t dq)
1719 {
1720 dispatch_queue_specific_queue_t dqsq;
1721
1722 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
1723 sizeof(struct dispatch_queue_specific_queue_s));
1724 _dispatch_queue_init((dispatch_queue_t)dqsq);
1725 dqsq->do_xref_cnt = -1;
1726 dqsq->do_targetq = _dispatch_get_root_queue(
1727 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
1728 dqsq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1729 dqsq->dq_label = "queue-specific";
1730 TAILQ_INIT(&dqsq->dqsq_contexts);
1731 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
1732 (dispatch_queue_t)dqsq, release))) {
1733 _dispatch_release((dispatch_queue_t)dqsq);
1734 }
1735 }
1736
1737 static void
1738 _dispatch_queue_set_specific(void *ctxt)
1739 {
1740 dispatch_queue_specific_t dqs, dqsn = ctxt;
1741 dispatch_queue_specific_queue_t dqsq =
1742 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1743
1744 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1745 if (dqs->dqs_key == dqsn->dqs_key) {
1746 // Destroy previous context for existing key
1747 if (dqs->dqs_destructor) {
1748 dispatch_async_f(_dispatch_get_root_queue(
1749 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
1750 dqs->dqs_destructor);
1751 }
1752 if (dqsn->dqs_ctxt) {
1753 // Copy new context for existing key
1754 dqs->dqs_ctxt = dqsn->dqs_ctxt;
1755 dqs->dqs_destructor = dqsn->dqs_destructor;
1756 } else {
1757 // Remove context storage for existing key
1758 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
1759 free(dqs);
1760 }
1761 return free(dqsn);
1762 }
1763 }
1764 // Insert context storage for new key
1765 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
1766 }
1767
1768 DISPATCH_NOINLINE
1769 void
1770 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
1771 void *ctxt, dispatch_function_t destructor)
1772 {
1773 if (slowpath(!key)) {
1774 return;
1775 }
1776 dispatch_queue_specific_t dqs;
1777
1778 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
1779 dqs->dqs_key = key;
1780 dqs->dqs_ctxt = ctxt;
1781 dqs->dqs_destructor = destructor;
1782 if (slowpath(!dq->dq_specific_q)) {
1783 _dispatch_queue_init_specific(dq);
1784 }
1785 _dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
1786 _dispatch_queue_set_specific);
1787 }
1788
1789 static void
1790 _dispatch_queue_get_specific(void *ctxt)
1791 {
1792 void **ctxtp = ctxt;
1793 void *key = *ctxtp;
1794 dispatch_queue_specific_queue_t dqsq =
1795 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1796 dispatch_queue_specific_t dqs;
1797
1798 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1799 if (dqs->dqs_key == key) {
1800 *ctxtp = dqs->dqs_ctxt;
1801 return;
1802 }
1803 }
1804 *ctxtp = NULL;
1805 }
1806
1807 DISPATCH_NOINLINE
1808 void *
1809 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
1810 {
1811 if (slowpath(!key)) {
1812 return NULL;
1813 }
1814 void *ctxt = NULL;
1815
1816 if (fastpath(dq->dq_specific_q)) {
1817 ctxt = (void *)key;
1818 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
1819 }
1820 return ctxt;
1821 }
1822
1823 DISPATCH_NOINLINE
1824 void *
1825 dispatch_get_specific(const void *key)
1826 {
1827 if (slowpath(!key)) {
1828 return NULL;
1829 }
1830 void *ctxt = NULL;
1831 dispatch_queue_t dq = _dispatch_queue_get_current();
1832
1833 while (slowpath(dq)) {
1834 if (slowpath(dq->dq_specific_q)) {
1835 ctxt = (void *)key;
1836 dispatch_sync_f(dq->dq_specific_q, &ctxt,
1837 _dispatch_queue_get_specific);
1838 if (ctxt) break;
1839 }
1840 dq = dq->do_targetq;
1841 }
1842 return ctxt;
1843 }
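
/*
 * Illustrative usage sketch (not part of this file): queue-specific data is
 * keyed by pointer identity, and dispatch_get_specific() walks the current
 * queue's target-queue chain as implemented above; destructors are invoked
 * asynchronously on a default-priority root queue. Roughly:
 *
 *	static int kContextKey;	// the address is the key, not the value
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_queue_set_specific(q, &kContextKey, strdup("payload"), free);
 *
 *	dispatch_async(q, ^{
 *		char *payload = dispatch_get_specific(&kContextKey);
 *		// non-NULL while running on q (or on a queue targeting q)
 *	});
 *	char *p = dispatch_queue_get_specific(q, &kContextKey); // from anywhere
 *
 * (kContextKey and the strdup'd payload are hypothetical.)
 */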
1844
1845
1846 #pragma mark -
1847 #pragma mark dispatch_queue_debug
1848
1849 size_t
1850 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1851 {
1852 size_t offset = 0;
1853 dispatch_queue_t target = dq->do_targetq;
1854 offset += dsnprintf(&buf[offset], bufsiz - offset, "target = %s[%p], "
1855 "width = 0x%x, running = 0x%x, barrier = %d ",
1856 target && target->dq_label ? target->dq_label : "", target,
1857 dq->dq_width / 2, dq->dq_running / 2, dq->dq_running & 1);
1858 if (dq->dq_is_thread_bound) {
1859 offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
1860 _dispatch_queue_get_bound_thread(dq));
1861 }
1862 return offset;
1863 }
1864
1865 size_t
1866 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1867 {
1868 size_t offset = 0;
1869 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1870 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
1871 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1872 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1873 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
1874 return offset;
1875 }
1876
1877 #if DISPATCH_DEBUG
1878 void
1879 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1880 if (fastpath(dq)) {
1881 _dispatch_object_debug(dq, "%s", str);
1882 } else {
1883 _dispatch_log("queue[NULL]: %s", str);
1884 }
1885 }
1886 #endif
1887
1888 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
1889 static OSSpinLock _dispatch_stats_lock;
1890 static struct {
1891 uint64_t time_total;
1892 uint64_t count_total;
1893 uint64_t thread_total;
1894 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
1895
1896 static void
1897 _dispatch_queue_merge_stats(uint64_t start)
1898 {
1899 uint64_t delta = _dispatch_absolute_time() - start;
1900 unsigned long count;
1901
1902 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
1903 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1904
1905 int bucket = flsl((long)count);
1906
1907 // 64-bit counters on 32-bit require a lock or a queue
1908 OSSpinLockLock(&_dispatch_stats_lock);
1909
1910 _dispatch_stats[bucket].time_total += delta;
1911 _dispatch_stats[bucket].count_total += count;
1912 _dispatch_stats[bucket].thread_total++;
1913
1914 OSSpinLockUnlock(&_dispatch_stats_lock);
1915 }
1916 #endif
1917
1918 #pragma mark -
1919 #pragma mark dispatch_continuation_t
1920
1921 static void
1922 _dispatch_force_cache_cleanup(void)
1923 {
1924 dispatch_continuation_t dc;
1925 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1926 if (dc) {
1927 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1928 _dispatch_cache_cleanup(dc);
1929 }
1930 }
1931
1932 DISPATCH_NOINLINE
1933 static void
1934 _dispatch_cache_cleanup(void *value)
1935 {
1936 dispatch_continuation_t dc, next_dc = value;
1937
1938 while ((dc = next_dc)) {
1939 next_dc = dc->do_next;
1940 _dispatch_continuation_free_to_heap(dc);
1941 }
1942 }
1943
1944 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
1945 int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
1946
1947 DISPATCH_NOINLINE
1948 void
1949 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
1950 {
1951 _dispatch_continuation_free_to_heap(dc);
1952 dispatch_continuation_t next_dc;
1953 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1954 int cnt;
1955 if (!dc || (cnt = dc->dc_cache_cnt -
1956 _dispatch_continuation_cache_limit) <= 0){
1957 return;
1958 }
1959 do {
1960 next_dc = dc->do_next;
1961 _dispatch_continuation_free_to_heap(dc);
1962 } while (--cnt && (dc = next_dc));
1963 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
1964 }
1965 #endif
1966
1967 DISPATCH_ALWAYS_INLINE_NDEBUG
1968 static inline void
1969 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1970 {
1971 dispatch_continuation_t dc = dou._dc;
1972
1973 (void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
1974 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1975 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1976 _dispatch_trace_continuation_pop(dq, dou);
1977 _dispatch_wqthread_override_start((mach_port_t)dc->dc_data,
1978 _dispatch_queue_get_override_priority(dq));
1979 _dispatch_thread_semaphore_signal(
1980 (_dispatch_thread_semaphore_t)dc->dc_other);
1981 _dispatch_introspection_queue_item_complete(dou);
1982 } else {
1983 _dispatch_async_f_redirect(dq, dc,
1984 _dispatch_queue_get_override_priority(dq));
1985 }
1986 _dispatch_perfmon_workitem_inc();
1987 }
1988
1989 #pragma mark -
1990 #pragma mark dispatch_block_create
1991
1992 #if __BLOCKS__
1993
1994 DISPATCH_ALWAYS_INLINE
1995 static inline bool
1996 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
1997 {
1998 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
1999 }
2000
2001 DISPATCH_ALWAYS_INLINE
2002 static inline dispatch_block_flags_t
2003 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
2004 {
2005 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
2006 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2007 }
2008 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
2009 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2010 }
2011 return flags;
2012 }
2013
2014 static inline dispatch_block_t
2015 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
2016 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
2017 {
2018 flags = _dispatch_block_normalize_flags(flags);
2019 voucher_t cv = NULL;
2020 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
2021 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
2022 voucher = cv = voucher_copy();
2023 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2024 }
2025 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
2026 pri = _dispatch_priority_propagate();
2027 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2028 }
2029 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
2030 if (cv) _voucher_release(cv);
2031 #if DISPATCH_DEBUG
2032 dispatch_assert(_dispatch_block_get_data(db));
2033 #endif
2034 return db;
2035 }
2036
2037 dispatch_block_t
2038 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
2039 {
2040 if (!_dispatch_block_flags_valid(flags)) return NULL;
2041 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
2042 block);
2043 }
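
/*
 * Illustrative usage sketch (not part of this file): dispatch_block_create()
 * returns a new heap-copied block that captures QoS and voucher state
 * according to the flags; the caller owns the result. Roughly:
 *
 *	dispatch_block_t b = dispatch_block_create(DISPATCH_BLOCK_ASSIGN_CURRENT, ^{
 *		do_work();	// do_work() is hypothetical
 *	});
 *	dispatch_async(queue, b);	// runs with the creator's QoS and voucher
 *	Block_release(b);		// under manual retain/release (not ARC)
 */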
2044
2045 dispatch_block_t
2046 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
2047 dispatch_qos_class_t qos_class, int relative_priority,
2048 dispatch_block_t block)
2049 {
2050 if (!_dispatch_block_flags_valid(flags)) return NULL;
2051 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
2052 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
2053 pthread_priority_t pri = 0;
2054 #if HAVE_PTHREAD_WORKQUEUE_QOS
2055 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2056 #endif
2057 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
2058 pri, block);
2059 }
2060
2061 dispatch_block_t
2062 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
2063 voucher_t voucher, dispatch_block_t block)
2064 {
2065 if (!_dispatch_block_flags_valid(flags)) return NULL;
2066 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
2067 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
2068 block);
2069 }
2070
2071 dispatch_block_t
2072 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
2073 voucher_t voucher, dispatch_qos_class_t qos_class,
2074 int relative_priority, dispatch_block_t block)
2075 {
2076 if (!_dispatch_block_flags_valid(flags)) return NULL;
2077 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
2078 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
2079 pthread_priority_t pri = 0;
2080 #if HAVE_PTHREAD_WORKQUEUE_QOS
2081 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
2082 #endif
2083 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
2084 pri, block);
2085 }
2086
2087 void
2088 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
2089 {
2090 if (!_dispatch_block_flags_valid(flags)) {
2091 DISPATCH_CLIENT_CRASH("Invalid flags passed to "
2092 "dispatch_block_perform()");
2093 }
2094 flags = _dispatch_block_normalize_flags(flags);
2095 struct dispatch_block_private_data_s dbpds =
2096 DISPATCH_BLOCK_PRIVATE_DATA_PERFORM_INITIALIZER(flags, block);
2097 return _dispatch_block_invoke(&dbpds);
2098 }
2099
2100 #define _dbpd_group(dbpd) ((dbpd)->dbpd_group)
2101
2102 void
2103 _dispatch_block_invoke(const struct dispatch_block_private_data_s *dbcpd)
2104 {
2105 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
2106 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2107 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2108 if (slowpath(atomic_flags & DBF_WAITED)) {
2109 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2110 "than once and waited for");
2111 }
2112 if (atomic_flags & DBF_CANCELED) goto out;
2113
2114 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2115 unsigned long override = DISPATCH_VOUCHER_IGNORE_QUEUE_OVERRIDE;
2116 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2117 op = _dispatch_get_priority();
2118 p = dbpd->dbpd_priority;
2119 override |= (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ||
2120 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ?
2121 DISPATCH_PRIORITY_ENFORCE : 0;
2122 }
2123 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2124 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2125 v = dbpd->dbpd_voucher;
2126 if (v) _voucher_retain(v);
2127 }
2128 ov = _dispatch_adopt_priority_and_voucher(p, v, override);
2129 dbpd->dbpd_thread = _dispatch_thread_port();
2130 _dispatch_client_callout(dbpd->dbpd_block,
2131 _dispatch_Block_invoke(dbpd->dbpd_block));
2132 _dispatch_reset_priority_and_voucher(op, ov);
2133 out:
2134 if ((atomic_flags & DBF_PERFORM) == 0) {
2135 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2136 dispatch_group_leave(_dbpd_group(dbpd));
2137 }
2138 }
2139 }
2140
2141 static void
2142 _dispatch_block_sync_invoke(void *block)
2143 {
2144 dispatch_block_t b = block;
2145 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2146 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2147 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2148 if (slowpath(atomic_flags & DBF_WAITED)) {
2149 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2150 "than once and waited for");
2151 }
2152 if (atomic_flags & DBF_CANCELED) goto out;
2153
2154 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2155 unsigned long override = 0;
2156 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2157 op = _dispatch_get_priority();
2158 p = dbpd->dbpd_priority;
2159 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ||
2160 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ?
2161 DISPATCH_PRIORITY_ENFORCE : 0;
2162 }
2163 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2164 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2165 v = dbpd->dbpd_voucher;
2166 if (v) _voucher_retain(v);
2167 }
2168 ov = _dispatch_adopt_priority_and_voucher(p, v, override);
2169 dbpd->dbpd_block();
2170 _dispatch_reset_priority_and_voucher(op, ov);
2171 out:
2172 if ((atomic_flags & DBF_PERFORM) == 0) {
2173 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2174 dispatch_group_leave(_dbpd_group(dbpd));
2175 }
2176 }
2177
2178 dispatch_queue_t dq = _dispatch_queue_get_current();
2179 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) {
2180 // balances dispatch_{,barrier_,}sync
2181 _dispatch_release(dq);
2182 }
2183 }
2184
2185 static void
2186 _dispatch_block_async_invoke_and_release(void *block)
2187 {
2188 dispatch_block_t b = block;
2189 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2190 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2191 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2192 if (slowpath(atomic_flags & DBF_WAITED)) {
2193 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2194 "than once and waited for");
2195 }
2196 if (atomic_flags & DBF_CANCELED) goto out;
2197
2198 pthread_priority_t p = DISPATCH_NO_PRIORITY;
2199 unsigned long override = 0;
2200 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2201 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ?
2202 DISPATCH_PRIORITY_ENFORCE : 0;
2203 p = dbpd->dbpd_priority;
2204 }
2205 voucher_t v = DISPATCH_NO_VOUCHER;
2206 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2207 v = dbpd->dbpd_voucher;
2208 if (v) _voucher_retain(v);
2209 }
2210 _dispatch_adopt_priority_and_replace_voucher(p, v, override);
2211 dbpd->dbpd_block();
2212 out:
2213 if ((atomic_flags & DBF_PERFORM) == 0) {
2214 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2215 dispatch_group_leave(_dbpd_group(dbpd));
2216 }
2217 }
2218 dispatch_queue_t dq = _dispatch_queue_get_current();
2219 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) {
2220 // balances dispatch_{,barrier_,group_}async
2221 _dispatch_release(dq);
2222 }
2223 Block_release(b);
2224 }
2225
2226 void
2227 dispatch_block_cancel(dispatch_block_t db)
2228 {
2229 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2230 if (!dbpd) {
2231 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2232 "dispatch_block_cancel()");
2233 }
2234 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
2235 }
2236
2237 long
2238 dispatch_block_testcancel(dispatch_block_t db)
2239 {
2240 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2241 if (!dbpd) {
2242 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2243 "dispatch_block_testcancel()");
2244 }
2245 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
2246 }
2247
2248 long
2249 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
2250 {
2251 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2252 if (!dbpd) {
2253 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2254 "dispatch_block_wait()");
2255 }
2256
2257 unsigned int flags = dispatch_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
2258 DBF_WAITING, relaxed);
2259 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
2260 DISPATCH_CLIENT_CRASH("A block object may not be waited for "
2261 "more than once");
2262 }
2263
2264 // <rdar://problem/17703192> If we know the queue where this block is
2265 // enqueued, or the thread that's executing it, then we should boost
2266 // it here.
2267
2268 pthread_priority_t pp = _dispatch_get_priority();
2269
2270 dispatch_queue_t boost_dq;
2271 boost_dq = dispatch_atomic_xchg2o(dbpd, dbpd_queue, NULL, acquire);
2272 if (boost_dq) {
2273 // release balances dispatch_{,barrier_,group_}async.
2274 // Can't put the queue back in the timeout case: the block might
2275 // finish after we fell out of group_wait and see our NULL, so
2276 // neither of us would ever release. Side effect: After a _wait
2277 // that times out, subsequent waits will not boost the qos of the
2278 // still-running block.
2279 _dispatch_queue_wakeup_with_qos_and_release(boost_dq, pp);
2280 }
2281
2282 mach_port_t boost_th = dbpd->dbpd_thread;
2283 if (boost_th) {
2284 _dispatch_thread_override_start(boost_th, pp);
2285 }
2286
2287 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed);
2288 if (slowpath(performed > 1 || (boost_th && boost_dq))) {
2289 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2290 "than once and waited for");
2291 }
2292
2293 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
2294
2295 if (boost_th) {
2296 _dispatch_thread_override_end(boost_th);
2297 }
2298
2299 if (ret) {
2300 // timed out: reverse our changes
2301 (void)dispatch_atomic_and2o(dbpd, dbpd_atomic_flags,
2302 ~DBF_WAITING, relaxed);
2303 } else {
2304 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags,
2305 DBF_WAITED, relaxed);
2306 // don't need to re-test here: the second call would see
2307 // the first call's WAITING
2308 }
2309
2310 return ret;
2311 }
2312
2313 void
2314 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
2315 dispatch_block_t notification_block)
2316 {
2317 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2318 if (!dbpd) {
2319 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2320 "dispatch_block_notify()");
2321 }
2322 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed);
2323 if (slowpath(performed > 1)) {
2324 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2325 "than once and observed");
2326 }
2327
2328 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
2329 }
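
/*
 * Illustrative usage sketch (not part of this file): a block object created
 * with dispatch_block_create() may be observed with dispatch_block_notify(),
 * waited on at most once, and cancelled with dispatch_block_cancel() before
 * it starts executing (a running block is never interrupted). Roughly:
 *
 *	dispatch_block_t b = dispatch_block_create(0, ^{ do_work(); });
 *	dispatch_async(queue, b);
 *	dispatch_block_notify(b, dispatch_get_main_queue(), ^{
 *		// runs on the main queue once b has finished (or been cancelled)
 *	});
 *	if (dispatch_block_wait(b, dispatch_time(DISPATCH_TIME_NOW,
 *			5 * NSEC_PER_SEC)) != 0) {
 *		// timed out; b may still run later
 *	}
 *	Block_release(b);
 *
 * (do_work() and the 5 second timeout are hypothetical.)
 */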
2330
2331 #endif // __BLOCKS__
2332
2333 #pragma mark -
2334 #pragma mark dispatch_barrier_async
2335
2336 DISPATCH_NOINLINE
2337 static void
2338 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
2339 dispatch_function_t func, pthread_priority_t pp,
2340 dispatch_block_flags_t flags)
2341 {
2342 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
2343
2344 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2345 dc->dc_func = func;
2346 dc->dc_ctxt = ctxt;
2347 _dispatch_continuation_voucher_set(dc, flags);
2348 _dispatch_continuation_priority_set(dc, pp, flags);
2349
2350 pp = _dispatch_continuation_get_override_priority(dq, dc);
2351
2352 _dispatch_queue_push(dq, dc, pp);
2353 }
2354
2355 DISPATCH_ALWAYS_INLINE
2356 static inline void
2357 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
2358 dispatch_function_t func, pthread_priority_t pp,
2359 dispatch_block_flags_t flags)
2360 {
2361 dispatch_continuation_t dc;
2362
2363 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
2364 if (!dc) {
2365 return _dispatch_barrier_async_f_slow(dq, ctxt, func, pp, flags);
2366 }
2367
2368 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2369 dc->dc_func = func;
2370 dc->dc_ctxt = ctxt;
2371 _dispatch_continuation_voucher_set(dc, flags);
2372 _dispatch_continuation_priority_set(dc, pp, flags);
2373
2374 pp = _dispatch_continuation_get_override_priority(dq, dc);
2375
2376 _dispatch_queue_push(dq, dc, pp);
2377 }
2378
2379 DISPATCH_NOINLINE
2380 static void
2381 _dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
2382 dispatch_function_t func, pthread_priority_t pp,
2383 dispatch_block_flags_t flags)
2384 {
2385 return _dispatch_barrier_async_f2(dq, ctxt, func, pp, flags);
2386 }
2387
2388 DISPATCH_NOINLINE
2389 void
2390 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
2391 dispatch_function_t func)
2392 {
2393 return _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
2394 }
2395
2396 DISPATCH_NOINLINE
2397 void
2398 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
2399 dispatch_function_t func)
2400 {
2401 return _dispatch_barrier_async_f2(dq, ctxt, func, 0,
2402 DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_NO_VOUCHER);
2403 }
2404
2405 #ifdef __BLOCKS__
2406 void
2407 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
2408 {
2409 dispatch_function_t func = _dispatch_call_block_and_release;
2410 pthread_priority_t pp = 0;
2411 dispatch_block_flags_t flags = 0;
2412 if (slowpath(_dispatch_block_has_private_data(work))) {
2413 func = _dispatch_block_async_invoke_and_release;
2414 pp = _dispatch_block_get_priority(work);
2415 flags = _dispatch_block_get_flags(work);
2416 // balanced in d_block_async_invoke_and_release or d_block_wait
2417 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
2418 dbpd_queue, NULL, dq, release)) {
2419 _dispatch_retain(dq);
2420 }
2421 }
2422 _dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
2423 }
2424 #endif
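
/*
 * Illustrative usage sketch (not part of this file): on a concurrent queue
 * created by the caller, a barrier block waits for previously submitted work
 * to drain, runs alone, and only then lets later work proceed. On a global
 * root queue the barrier bit has no effect and this behaves like
 * dispatch_async(). Roughly:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{ (void)cache_lookup(key); });	// concurrent reader
 *	dispatch_barrier_async(q, ^{ cache_insert(key, value); }); // exclusive writer
 *
 * (cache_lookup()/cache_insert() and their arguments are hypothetical.)
 */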
2425
2426 #pragma mark -
2427 #pragma mark dispatch_async
2428
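/*
 * Redirection (descriptive note): work submitted with dispatch_async() to a
 * non-root queue whose width allows concurrency is wrapped in a redirect
 * continuation and pushed down the target-queue hierarchy; dq_running is
 * incremented by 2 per in-flight item at each level, with the low bit
 * reserved to mark a barrier/drain in progress. The invoke function below
 * pops the original continuation, then walks the target chain, decrementing
 * the running counts and waking any queue that drains to zero.
 */
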
2429 void
2430 _dispatch_async_redirect_invoke(void *ctxt)
2431 {
2432 struct dispatch_continuation_s *dc = ctxt;
2433 struct dispatch_continuation_s *other_dc = dc->dc_other;
2434 dispatch_queue_t old_dq, dq = dc->dc_data, rq;
2435
2436 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2437 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2438 pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
2439 _dispatch_continuation_pop(other_dc);
2440 _dispatch_reset_defaultpriority(old_dp);
2441 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2442
2443 rq = dq->do_targetq;
2444 while (slowpath(rq->do_targetq) && rq != old_dq) {
2445 if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
2446 _dispatch_queue_wakeup(rq);
2447 }
2448 rq = rq->do_targetq;
2449 }
2450
2451 if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
2452 _dispatch_queue_wakeup(dq);
2453 }
2454 _dispatch_release(dq);
2455 }
2456
2457 static inline void
2458 _dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc,
2459 pthread_priority_t pp)
2460 {
2461 uint32_t running = 2;
2462
2463 // Find the queue to redirect to
2464 do {
2465 if (slowpath(dq->dq_items_tail) ||
2466 slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
2467 slowpath(dq->dq_width == 1)) {
2468 break;
2469 }
2470 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2471 if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
2472 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2473 break;
2474 }
2475 dq = dq->do_targetq;
2476 } while (slowpath(dq->do_targetq));
2477
2478 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0);
2479 }
2480
2481 DISPATCH_NOINLINE
2482 static void
2483 _dispatch_async_f_redirect(dispatch_queue_t dq,
2484 dispatch_continuation_t other_dc, pthread_priority_t pp)
2485 {
2486 dispatch_continuation_t dc = _dispatch_continuation_alloc();
2487
2488 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2489 dc->dc_func = _dispatch_async_redirect_invoke;
2490 dc->dc_ctxt = dc;
2491 dc->dc_data = dq;
2492 dc->dc_other = other_dc;
2493 dc->dc_priority = 0;
2494 dc->dc_voucher = NULL;
2495
2496 _dispatch_retain(dq);
2497 dq = dq->do_targetq;
2498 if (slowpath(dq->do_targetq)) {
2499 return _dispatch_async_f_redirect2(dq, dc, pp);
2500 }
2501
2502 _dispatch_queue_push(dq, dc, pp);
2503 }
2504
2505 DISPATCH_NOINLINE
2506 static void
2507 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc,
2508 pthread_priority_t pp)
2509 {
2510 uint32_t running = 2;
2511
2512 do {
2513 if (slowpath(dq->dq_items_tail)
2514 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
2515 break;
2516 }
2517 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2518 if (slowpath(running > dq->dq_width)) {
2519 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2520 break;
2521 }
2522 if (!slowpath(running & 1)) {
2523 return _dispatch_async_f_redirect(dq, dc, pp);
2524 }
2525 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2526 // We might get lucky and find that the barrier has ended by now
2527 } while (!(running & 1));
2528
2529 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0);
2530 }
2531
2532 DISPATCH_NOINLINE
2533 static void
2534 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
2535 dispatch_function_t func, pthread_priority_t pp,
2536 dispatch_block_flags_t flags)
2537 {
2538 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
2539
2540 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2541 dc->dc_func = func;
2542 dc->dc_ctxt = ctxt;
2543 _dispatch_continuation_voucher_set(dc, flags);
2544 _dispatch_continuation_priority_set(dc, pp, flags);
2545
2546 pp = _dispatch_continuation_get_override_priority(dq, dc);
2547
2548 // No fastpath/slowpath hint because we simply don't know
2549 if (dq->do_targetq) {
2550 return _dispatch_async_f2(dq, dc, pp);
2551 }
2552
2553 _dispatch_queue_push(dq, dc, pp);
2554 }
2555
2556 DISPATCH_ALWAYS_INLINE
2557 static inline void
2558 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
2559 pthread_priority_t pp, dispatch_block_flags_t flags)
2560 {
2561 dispatch_continuation_t dc;
2562
2563 // No fastpath/slowpath hint because we simply don't know
2564 if (dq->dq_width == 1 || flags & DISPATCH_BLOCK_BARRIER) {
2565 return _dispatch_barrier_async_f(dq, ctxt, func, pp, flags);
2566 }
2567
2568 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
2569 if (!dc) {
2570 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags);
2571 }
2572
2573 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2574 dc->dc_func = func;
2575 dc->dc_ctxt = ctxt;
2576 _dispatch_continuation_voucher_set(dc, flags);
2577 _dispatch_continuation_priority_set(dc, pp, flags);
2578
2579 pp = _dispatch_continuation_get_override_priority(dq, dc);
2580
2581 // No fastpath/slowpath hint because we simply don't know
2582 if (dq->do_targetq) {
2583 return _dispatch_async_f2(dq, dc, pp);
2584 }
2585
2586 _dispatch_queue_push(dq, dc, pp);
2587 }
2588
2589 DISPATCH_NOINLINE
2590 void
2591 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
2592 {
2593 return _dispatch_async_f(dq, ctxt, func, 0, 0);
2594 }
2595
2596 DISPATCH_NOINLINE
2597 void
2598 dispatch_async_enforce_qos_class_f(dispatch_queue_t dq, void *ctxt,
2599 dispatch_function_t func)
2600 {
2601 return _dispatch_async_f(dq, ctxt, func, 0,
2602 DISPATCH_BLOCK_ENFORCE_QOS_CLASS);
2603 }
2604
2605 #ifdef __BLOCKS__
2606 void
2607 dispatch_async(dispatch_queue_t dq, void (^work)(void))
2608 {
2609 dispatch_function_t func = _dispatch_call_block_and_release;
2610 dispatch_block_flags_t flags = 0;
2611 pthread_priority_t pp = 0;
2612 if (slowpath(_dispatch_block_has_private_data(work))) {
2613 func = _dispatch_block_async_invoke_and_release;
2614 pp = _dispatch_block_get_priority(work);
2615 flags = _dispatch_block_get_flags(work);
2616 // balanced in d_block_async_invoke_and_release or d_block_wait
2617 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
2618 dbpd_queue, NULL, dq, release)) {
2619 _dispatch_retain(dq);
2620 }
2621 }
2622 _dispatch_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
2623 }
2624 #endif
2625
2626 #pragma mark -
2627 #pragma mark dispatch_group_async
2628
2629 DISPATCH_ALWAYS_INLINE
2630 static inline void
2631 _dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
2632 dispatch_function_t func, pthread_priority_t pp,
2633 dispatch_block_flags_t flags)
2634 {
2635 dispatch_continuation_t dc;
2636
2637 _dispatch_retain(dg);
2638 dispatch_group_enter(dg);
2639
2640 dc = _dispatch_continuation_alloc();
2641
2642 unsigned long barrier = (flags & DISPATCH_BLOCK_BARRIER) ?
2643 DISPATCH_OBJ_BARRIER_BIT : 0;
2644 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT |
2645 barrier);
2646 dc->dc_func = func;
2647 dc->dc_ctxt = ctxt;
2648 dc->dc_data = dg;
2649 _dispatch_continuation_voucher_set(dc, flags);
2650 _dispatch_continuation_priority_set(dc, pp, flags);
2651
2652 pp = _dispatch_continuation_get_override_priority(dq, dc);
2653
2654 // No fastpath/slowpath hint because we simply don't know
2655 if (dq->dq_width != 1 && !barrier && dq->do_targetq) {
2656 return _dispatch_async_f2(dq, dc, pp);
2657 }
2658
2659 _dispatch_queue_push(dq, dc, pp);
2660 }
2661
2662 DISPATCH_NOINLINE
2663 void
2664 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
2665 dispatch_function_t func)
2666 {
2667 return _dispatch_group_async_f(dg, dq, ctxt, func, 0, 0);
2668 }
2669
2670 #ifdef __BLOCKS__
2671 void
2672 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
2673 dispatch_block_t db)
2674 {
2675 dispatch_function_t func = _dispatch_call_block_and_release;
2676 dispatch_block_flags_t flags = 0;
2677 pthread_priority_t pp = 0;
2678 if (slowpath(_dispatch_block_has_private_data(db))) {
2679 func = _dispatch_block_async_invoke_and_release;
2680 pp = _dispatch_block_get_priority(db);
2681 flags = _dispatch_block_get_flags(db);
2682 // balanced in d_block_async_invoke_and_release or d_block_wait
2683 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(db),
2684 dbpd_queue, NULL, dq, release)) {
2685 _dispatch_retain(dq);
2686 }
2687 }
2688 _dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), func, pp, flags);
2689 }
2690 #endif
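
/*
 * Illustrative usage sketch (not part of this file): dispatch_group_async()
 * pairs an implicit dispatch_group_enter() with a leave when the block
 * completes, so a group can observe a set of asynchronous tasks. Roughly:
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_group_async(g, q, ^{ load_part_one(); });
 *	dispatch_group_async(g, q, ^{ load_part_two(); });
 *	dispatch_group_notify(g, dispatch_get_main_queue(), ^{ all_loaded(); });
 *	// or block the caller: dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
 *	dispatch_release(g);	// under manual retain/release
 *
 * (load_part_one()/load_part_two()/all_loaded() are hypothetical.)
 */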
2691
2692 #pragma mark -
2693 #pragma mark dispatch_function_invoke
2694
2695 static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
2696 dispatch_function_t func, pthread_priority_t pp);
2697
2698 DISPATCH_NOINLINE
2699 static void
2700 _dispatch_function_invoke_slow(dispatch_queue_t dq, void *ctxt,
2701 dispatch_function_t func)
2702 {
2703 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2704 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2705 voucher_t ov = _dispatch_adopt_queue_override_voucher(dq);
2706 _dispatch_client_callout(ctxt, func);
2707 _dispatch_perfmon_workitem_inc();
2708 _dispatch_reset_voucher(ov);
2709 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2710 }
2711
2712 DISPATCH_ALWAYS_INLINE
2713 static inline void
2714 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
2715 dispatch_function_t func)
2716 {
2717 if (slowpath(dq->dq_override_voucher != DISPATCH_NO_VOUCHER)) {
2718 return _dispatch_function_invoke_slow(dq, ctxt, func);
2719 }
2720 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2721 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2722 _dispatch_client_callout(ctxt, func);
2723 _dispatch_perfmon_workitem_inc();
2724 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2725 }
2726
2727 void
2728 _dispatch_sync_recurse_invoke(void *ctxt)
2729 {
2730 dispatch_continuation_t dc = ctxt;
2731 _dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
2732 }
2733
2734 DISPATCH_ALWAYS_INLINE
2735 static inline void
2736 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
2737 dispatch_function_t func, pthread_priority_t pp)
2738 {
2739 struct dispatch_continuation_s dc = {
2740 .dc_data = dq,
2741 .dc_func = func,
2742 .dc_ctxt = ctxt,
2743 };
2744 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
2745 }
2746
2747 #pragma mark -
2748 #pragma mark dispatch_barrier_sync
2749
2750 static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
2751 dispatch_function_t func);
2752
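/*
 * Lock transfer (descriptive note): when a barrier-sync waiter is parked at
 * the head of the queue, the drain code hands the queue directly to it
 * instead of going through a full wakeup. _dispatch_barrier_sync_f_pop()
 * recognizes the slow-path barrier-sync continuation, raises the waiting
 * thread's override priority, and, when transferring the running lock, bumps
 * the suspension and running counts on the waiter's behalf before returning
 * the waiter's thread semaphore (or MACH_PORT_DEAD) for the caller to signal.
 */
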
2753 DISPATCH_ALWAYS_INLINE_NDEBUG
2754 static inline _dispatch_thread_semaphore_t
2755 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
2756 bool lock)
2757 {
2758 _dispatch_thread_semaphore_t sema;
2759 dispatch_continuation_t dc = dou._dc;
2760 mach_port_t th;
2761
2762 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
2763 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
2764 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
2765 return 0;
2766 }
2767 _dispatch_trace_continuation_pop(dq, dc);
2768 _dispatch_perfmon_workitem_inc();
2769
2770 th = (mach_port_t)dc->dc_data;
2771 dc = dc->dc_ctxt;
2772 dq = dc->dc_data;
2773 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
2774 if (lock) {
2775 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2776 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2777 // rdar://problem/9032024 running lock must be held until sync_f_slow
2778 // returns
2779 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2780 }
2781 _dispatch_introspection_queue_item_complete(dou);
2782 _dispatch_wqthread_override_start(th,
2783 _dispatch_queue_get_override_priority(dq));
2784 return sema ? sema : MACH_PORT_DEAD;
2785 }
2786
2787 static void
2788 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
2789 {
2790 dispatch_continuation_t dc = ctxt;
2791 dispatch_queue_t dq = dc->dc_data;
2792 _dispatch_thread_semaphore_t sema;
2793 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
2794
2795 dispatch_assert(dq == _dispatch_queue_get_current());
2796 #if DISPATCH_COCOA_COMPAT
2797 if (slowpath(dq->dq_is_thread_bound)) {
2798 // The queue is bound to a non-dispatch thread (e.g. main thread)
2799 _dispatch_continuation_voucher_adopt(dc);
2800 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
2801 dispatch_atomic_store2o(dc, dc_func, NULL, release);
2802 _dispatch_thread_semaphore_signal(sema); // release
2803 return;
2804 }
2805 #endif
2806 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2807 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2808 // rdar://9032024 running lock must be held until sync_f_slow returns
2809 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2810 _dispatch_thread_semaphore_signal(sema); // release
2811 }
2812
2813 DISPATCH_NOINLINE
2814 static void
2815 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
2816 dispatch_function_t func, pthread_priority_t pp)
2817 {
2818 if (slowpath(!dq->do_targetq)) {
2819 // the global concurrent queues do not need strict ordering
2820 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2821 return _dispatch_sync_f_invoke(dq, ctxt, func);
2822 }
2823 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG);
2824 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
2825 struct dispatch_continuation_s dc = {
2826 .dc_data = dq,
2827 #if DISPATCH_COCOA_COMPAT
2828 .dc_func = func,
2829 .dc_ctxt = ctxt,
2830 #endif
2831 .dc_other = (void*)sema,
2832 };
2833 #if DISPATCH_COCOA_COMPAT
2834 // It's preferred to execute synchronous blocks on the current thread
2835 // due to thread-local side effects, garbage collection, etc. However,
2836 // blocks submitted to the main thread MUST be run on the main thread
2837 if (slowpath(dq->dq_is_thread_bound)) {
2838 _dispatch_continuation_voucher_set(&dc, 0);
2839 }
2840 #endif
2841 struct dispatch_continuation_s dbss = {
2842 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
2843 DISPATCH_OBJ_SYNC_SLOW_BIT),
2844 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
2845 .dc_ctxt = &dc,
2846 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(),
2847 .dc_priority = pp,
2848 };
2849 _dispatch_queue_push(dq, &dbss,
2850 _dispatch_continuation_get_override_priority(dq, &dbss));
2851
2852 _dispatch_thread_semaphore_wait(sema); // acquire
2853 _dispatch_put_thread_semaphore(sema);
2854
2855 pthread_priority_t p = _dispatch_queue_get_override_priority(dq);
2856 if (p > (pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
2857 // Ensure that the root queue sees that this thread was overridden.
2858 _dispatch_set_defaultpriority_override();
2859 }
2860
2861 #if DISPATCH_COCOA_COMPAT
2862 // Queue bound to a non-dispatch thread
2863 if (dc.dc_func == NULL) {
2864 return;
2865 }
2866 #endif
2867
2868 _dispatch_queue_set_thread(dq);
2869 if (slowpath(dq->do_targetq->do_targetq)) {
2870 _dispatch_function_recurse(dq, ctxt, func, pp);
2871 } else {
2872 _dispatch_function_invoke(dq, ctxt, func);
2873 }
2874 _dispatch_queue_clear_thread(dq);
2875
2876 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
2877 dq->dq_running == 2) {
2878 // rdar://problem/8290662 "lock transfer"
2879 sema = _dispatch_queue_drain_one_barrier_sync(dq);
2880 if (sema) {
2881 _dispatch_thread_semaphore_signal(sema); // release
2882 return;
2883 }
2884 }
2885 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
2886 DISPATCH_OBJECT_SUSPEND_INTERVAL, release);
2887 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
2888 _dispatch_queue_wakeup(dq);
2889 }
2890 }
2891
2892 DISPATCH_NOINLINE
2893 static void
2894 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
2895 {
2896 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
2897 // rdar://problem/8290662 "lock transfer"
2898 _dispatch_thread_semaphore_t sema;
2899 sema = _dispatch_queue_drain_one_barrier_sync(dq);
2900 if (sema) {
2901 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2902 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2903 // rdar://9032024 running lock must be held until sync_f_slow
2904 // returns: increment by 2 and decrement by 1
2905 (void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
2906 _dispatch_thread_semaphore_signal(sema);
2907 return;
2908 }
2909 }
2910 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2911 _dispatch_queue_wakeup(dq);
2912 }
2913 }
2914
2915 DISPATCH_NOINLINE
2916 static void
2917 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
2918 dispatch_function_t func)
2919 {
2920 _dispatch_queue_set_thread(dq);
2921 _dispatch_function_invoke(dq, ctxt, func);
2922 _dispatch_queue_clear_thread(dq);
2923 if (slowpath(dq->dq_items_tail)) {
2924 return _dispatch_barrier_sync_f2(dq);
2925 }
2926 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2927 _dispatch_queue_wakeup(dq);
2928 }
2929 }
2930
2931 DISPATCH_NOINLINE
2932 static void
2933 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
2934 dispatch_function_t func, pthread_priority_t pp)
2935 {
2936 _dispatch_queue_set_thread(dq);
2937 _dispatch_function_recurse(dq, ctxt, func, pp);
2938 _dispatch_queue_clear_thread(dq);
2939 if (slowpath(dq->dq_items_tail)) {
2940 return _dispatch_barrier_sync_f2(dq);
2941 }
2942 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2943 _dispatch_queue_wakeup(dq);
2944 }
2945 }
2946
2947 DISPATCH_NOINLINE
2948 static void
2949 _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
2950 dispatch_function_t func, pthread_priority_t pp)
2951 {
2952 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2953 // 2) ensure that the queue is not suspended
2954 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
2955 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
2956 }
2957 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
2958 // global concurrent queues and queues bound to non-dispatch threads
2959 // always fall into the slow case
2960 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
2961 }
2962 if (slowpath(dq->do_targetq->do_targetq)) {
2963 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
2964 }
2965 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
2966 }
2967
2968 DISPATCH_NOINLINE
2969 void
2970 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
2971 dispatch_function_t func)
2972 {
2973 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2974 // 2) ensure that the queue is not suspended
2975 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
2976 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0);
2977 }
2978 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
2979 // global concurrent queues and queues bound to non-dispatch threads
2980 // always fall into the slow case
2981 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0);
2982 }
2983 if (slowpath(dq->do_targetq->do_targetq)) {
2984 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, 0);
2985 }
2986 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
2987 }
2988
2989 #ifdef __BLOCKS__
2990 DISPATCH_NOINLINE
2991 static void
2992 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
2993 {
2994 bool has_pd = _dispatch_block_has_private_data(work);
2995 dispatch_function_t func = _dispatch_Block_invoke(work);
2996 pthread_priority_t pp = 0;
2997 if (has_pd) {
2998 func = _dispatch_block_sync_invoke;
2999 pp = _dispatch_block_get_priority(work);
3000 dispatch_block_flags_t flags = _dispatch_block_get_flags(work);
3001 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3002 pthread_priority_t tp = _dispatch_get_priority();
3003 if (pp < tp) {
3004 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3005 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) {
3006 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3007 }
3008 }
3009 // balanced in d_block_sync_invoke or d_block_wait
3010 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3011 dbpd_queue, NULL, dq, release)) {
3012 _dispatch_retain(dq);
3013 }
3014 #if DISPATCH_COCOA_COMPAT
3015 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) {
3016 // Blocks submitted to the main queue MUST be run on the main thread,
3017 // under GC we must Block_copy in order to notify the thread-local
3018 // garbage collector that the objects are transferring to another thread
3019 // rdar://problem/7176237&7181849&7458685
3020 work = _dispatch_Block_copy(work);
3021 func = _dispatch_call_block_and_release;
3022 }
3023 #endif
3024 _dispatch_barrier_sync_f(dq, work, func, pp);
3025 }
3026
3027 void
3028 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
3029 {
3030 if (slowpath(dq->dq_is_thread_bound) ||
3031 slowpath(_dispatch_block_has_private_data(work))) {
3032 return _dispatch_barrier_sync_slow(dq, work);
3033 }
3034 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
3035 }
3036 #endif
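
/*
 * Illustrative usage sketch (not part of this file): dispatch_barrier_sync()
 * combines barrier semantics with a synchronous wait, which is convenient for
 * taking a consistent snapshot of state protected by a concurrent queue.
 * Roughly:
 *
 *	__block size_t count;
 *	dispatch_barrier_sync(q, ^{ count = table_count(table); });
 *	// all earlier submissions have drained; nothing else ran concurrently
 *
 * (table/table_count() are hypothetical; q is a caller-created concurrent
 * queue as in the dispatch_barrier_async sketch above.)
 */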
3037
3038 DISPATCH_NOINLINE
3039 static void
3040 _dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
3041 dispatch_function_t func)
3042 {
3043 _dispatch_queue_set_thread(dq);
3044 _dispatch_function_invoke(dq, ctxt, func);
3045 _dispatch_queue_clear_thread(dq);
3046 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
3047 _dispatch_queue_wakeup(dq);
3048 }
3049 }
3050
3051 DISPATCH_NOINLINE
3052 void
3053 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
3054 dispatch_function_t func)
3055 {
3056 // Use for mutation of queue-/source-internal state only, ignores target
3057 // queue hierarchy!
3058 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
3059 || slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
3060 acquire))) {
3061 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
3062 }
3063 _dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
3064 }
3065
3066 #pragma mark -
3067 #pragma mark dispatch_sync
3068
3069 DISPATCH_NOINLINE
3070 static void
3071 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3072 pthread_priority_t pp, bool wakeup)
3073 {
3074 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG);
3075 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
3076 struct dispatch_continuation_s dc = {
3077 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
3078 #if DISPATCH_INTROSPECTION
3079 .dc_func = func,
3080 .dc_ctxt = ctxt,
3081 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(),
3082 #endif
3083 .dc_other = (void*)sema,
3084 .dc_priority = pp,
3085 };
3086 _dispatch_queue_push_wakeup(dq, &dc,
3087 _dispatch_continuation_get_override_priority(dq, &dc), wakeup);
3088
3089 _dispatch_thread_semaphore_wait(sema);
3090 _dispatch_put_thread_semaphore(sema);
3091
3092 pthread_priority_t p = _dispatch_queue_get_override_priority(dq);
3093 if (p > (pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3094 // Ensure that the root queue sees that this thread was overridden.
3095 _dispatch_set_defaultpriority_override();
3096 }
3097
3098 if (slowpath(dq->do_targetq->do_targetq)) {
3099 _dispatch_function_recurse(dq, ctxt, func, pp);
3100 } else {
3101 _dispatch_function_invoke(dq, ctxt, func);
3102 }
3103
3104 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
3105 _dispatch_queue_wakeup(dq);
3106 }
3107 }
3108
3109 DISPATCH_NOINLINE
3110 static void
3111 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
3112 dispatch_function_t func)
3113 {
3114 _dispatch_function_invoke(dq, ctxt, func);
3115 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
3116 _dispatch_queue_wakeup(dq);
3117 }
3118 }
3119
3120 DISPATCH_NOINLINE
3121 static void
3122 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
3123 dispatch_function_t func, pthread_priority_t pp)
3124 {
3125 _dispatch_function_recurse(dq, ctxt, func, pp);
3126 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
3127 _dispatch_queue_wakeup(dq);
3128 }
3129 }
3130
3131 static inline void
3132 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3133 pthread_priority_t pp)
3134 {
3135 // 1) ensure that this thread hasn't enqueued anything ahead of this call
3136 // 2) ensure that the queue is not suspended
3137 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
3138 return _dispatch_sync_f_slow(dq, ctxt, func, pp, false);
3139 }
3140 uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3141 // re-check suspension after barrier check <rdar://problem/15242126>
3142 if (slowpath(running & 1) || _dispatch_object_suspended(dq)) {
3143 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
3144 return _dispatch_sync_f_slow(dq, ctxt, func, pp, running == 0);
3145 }
3146 if (slowpath(dq->do_targetq->do_targetq)) {
3147 return _dispatch_sync_f_recurse(dq, ctxt, func, pp);
3148 }
3149 _dispatch_sync_f_invoke(dq, ctxt, func);
3150 }
3151
3152 DISPATCH_NOINLINE
3153 static void
3154 _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3155 pthread_priority_t pp)
3156 {
3157 if (fastpath(dq->dq_width == 1)) {
3158 return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
3159 }
3160 if (slowpath(!dq->do_targetq)) {
3161 // the global concurrent queues do not need strict ordering
3162 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3163 return _dispatch_sync_f_invoke(dq, ctxt, func);
3164 }
3165 _dispatch_sync_f2(dq, ctxt, func, pp);
3166 }
3167
3168 DISPATCH_NOINLINE
3169 void
3170 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3171 {
3172 if (fastpath(dq->dq_width == 1)) {
3173 return dispatch_barrier_sync_f(dq, ctxt, func);
3174 }
3175 if (slowpath(!dq->do_targetq)) {
3176 // the global concurrent queues do not need strict ordering
3177 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3178 return _dispatch_sync_f_invoke(dq, ctxt, func);
3179 }
3180 _dispatch_sync_f2(dq, ctxt, func, 0);
3181 }
3182
3183 #ifdef __BLOCKS__
3184 DISPATCH_NOINLINE
3185 static void
3186 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
3187 {
3188 bool has_pd = _dispatch_block_has_private_data(work);
3189 if (has_pd && (_dispatch_block_get_flags(work) & DISPATCH_BLOCK_BARRIER)) {
3190 return _dispatch_barrier_sync_slow(dq, work);
3191 }
3192 dispatch_function_t func = _dispatch_Block_invoke(work);
3193 pthread_priority_t pp = 0;
3194 if (has_pd) {
3195 func = _dispatch_block_sync_invoke;
3196 pp = _dispatch_block_get_priority(work);
3197 dispatch_block_flags_t flags = _dispatch_block_get_flags(work);
3198 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3199 pthread_priority_t tp = _dispatch_get_priority();
3200 if (pp < tp) {
3201 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3202 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) {
3203 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3204 }
3205 }
3206 // balanced in d_block_sync_invoke or d_block_wait
3207 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3208 dbpd_queue, NULL, dq, release)) {
3209 _dispatch_retain(dq);
3210 }
3211 #if DISPATCH_COCOA_COMPAT
3212 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) {
3213 // Blocks submitted to the main queue MUST be run on the main thread,
3214 // under GC we must Block_copy in order to notify the thread-local
3215 // garbage collector that the objects are transferring to another thread
3216 // rdar://problem/7176237&7181849&7458685
3217 work = _dispatch_Block_copy(work);
3218 func = _dispatch_call_block_and_release;
3219 #endif
3220 }
3221 if (slowpath(!dq->do_targetq)) {
3222 // the global concurrent queues do not need strict ordering
3223 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3224 return _dispatch_sync_f_invoke(dq, work, func);
3225 }
3226 _dispatch_sync_f2(dq, work, func, pp);
3227 }
3228
3229 void
3230 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
3231 {
3232 if (fastpath(dq->dq_width == 1)) {
3233 return dispatch_barrier_sync(dq, work);
3234 }
3235 if (slowpath(dq->dq_is_thread_bound) ||
3236 slowpath(_dispatch_block_has_private_data(work)) ) {
3237 return _dispatch_sync_slow(dq, work);
3238 }
3239 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
3240 }
3241 #endif
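
/*
 * Illustrative usage sketch (not part of this file): dispatch_sync() runs the
 * block on the target queue and does not return until it has completed;
 * targeting the serial queue the caller is already running on deadlocks
 * immediately. Roughly:
 *
 *	__block char *value;
 *	dispatch_sync(q, ^{ value = cache_lookup(key); });	// synchronous read
 *	// NEVER: dispatch_sync(q, ^{ ... }) from a block already running on q
 *
 * (cache_lookup() and its argument are hypothetical.)
 */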
3242
3243 #pragma mark -
3244 #pragma mark dispatch_after
3245
3246 void
3247 _dispatch_after_timer_callback(void *ctxt)
3248 {
3249 dispatch_continuation_t dc = ctxt, dc1;
3250 dispatch_source_t ds = dc->dc_data;
3251 dc1 = _dispatch_continuation_free_cacheonly(dc);
3252 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
3253 dispatch_source_cancel(ds);
3254 dispatch_release(ds);
3255 if (slowpath(dc1)) {
3256 _dispatch_continuation_free_to_cache_limit(dc1);
3257 }
3258 }
3259
3260 DISPATCH_NOINLINE
3261 void
3262 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
3263 dispatch_function_t func)
3264 {
3265 uint64_t delta, leeway;
3266 dispatch_source_t ds;
3267
3268 if (when == DISPATCH_TIME_FOREVER) {
3269 #if DISPATCH_DEBUG
3270 DISPATCH_CLIENT_CRASH(
3271 "dispatch_after_f() called with 'when' == infinity");
3272 #endif
3273 return;
3274 }
3275
3276 delta = _dispatch_timeout(when);
3277 if (delta == 0) {
3278 return dispatch_async_f(queue, ctxt, func);
3279 }
3280 leeway = delta / 10; // <rdar://problem/13447496>
3281 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
3282 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
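// Worked example (informational): a 2s delay gives delta == 2s and thus
// leeway == 200ms; a 5ms delay computes 0.5ms and is clamped up to
// NSEC_PER_MSEC; anything beyond 10 minutes is capped at 60s.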
3283
3284 // this function can and should be optimized to not use a dispatch source
3285 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
3286 dispatch_assert(ds);
3287
3288 // TODO: don't use a separate continuation & voucher
3289 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3290 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3291 dc->dc_func = func;
3292 dc->dc_ctxt = ctxt;
3293 dc->dc_data = ds;
3294
3295 dispatch_set_context(ds, dc);
3296 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
3297 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
3298 dispatch_resume(ds);
3299 }
3300
3301 #ifdef __BLOCKS__
3302 void
3303 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
3304 dispatch_block_t work)
3305 {
3306 // test before the copy of the block
3307 if (when == DISPATCH_TIME_FOREVER) {
3308 #if DISPATCH_DEBUG
3309 DISPATCH_CLIENT_CRASH(
3310 "dispatch_after() called with 'when' == infinity");
3311 #endif
3312 return;
3313 }
3314 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
3315 _dispatch_call_block_and_release);
3316 }
3317 #endif
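/*
 * Caller-side sketch (illustrative; the queue and block are placeholders):
 *
 *   dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC),
 *           dispatch_get_main_queue(), ^{
 *       // runs on the main queue roughly 2s from now, within the leeway
 *       // computed in dispatch_after_f() (about 200ms here)
 *   });
 *
 * As implemented above, this is a one-shot DISPATCH_SOURCE_TYPE_TIMER whose
 * handler, _dispatch_after_timer_callback(), invokes the continuation and
 * then cancels and releases the source.
 */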
3318
3319 #pragma mark -
3320 #pragma mark dispatch_queue_push
3321
3322 DISPATCH_ALWAYS_INLINE
3323 static inline void
3324 _dispatch_queue_push_list_slow2(dispatch_queue_t dq, pthread_priority_t pp,
3325 struct dispatch_object_s *obj, bool retained)
3326 {
3327 // The queue must be retained before dq_items_head is written in order
3328 // to ensure that the reference is still valid when _dispatch_wakeup is
3329 // called. Otherwise, if preempted between the assignment to
3330 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
3331 // queue may release the last reference to the queue when invoked by
3332 // _dispatch_queue_drain. <rdar://problem/6932776>
3333 if (!retained) _dispatch_retain(dq);
3334 dq->dq_items_head = obj;
3335 return _dispatch_queue_wakeup_with_qos_and_release(dq, pp);
3336 }
3337
3338 DISPATCH_NOINLINE
3339 void
3340 _dispatch_queue_push_list_slow(dispatch_queue_t dq, pthread_priority_t pp,
3341 struct dispatch_object_s *obj, unsigned int n, bool retained)
3342 {
3343 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
3344 dispatch_assert(!retained);
3345 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
3346 return _dispatch_queue_wakeup_global2(dq, n);
3347 }
3348 _dispatch_queue_push_list_slow2(dq, pp, obj, retained);
3349 }
3350
3351 DISPATCH_NOINLINE
3352 void
3353 _dispatch_queue_push_slow(dispatch_queue_t dq, pthread_priority_t pp,
3354 struct dispatch_object_s *obj, bool retained)
3355 {
3356 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
3357 dispatch_assert(!retained);
3358 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
3359 return _dispatch_queue_wakeup_global(dq);
3360 }
3361 _dispatch_queue_push_list_slow2(dq, pp, obj, retained);
3362 }
3363
3364 #pragma mark -
3365 #pragma mark dispatch_queue_probe
3366
3367 unsigned long
3368 _dispatch_queue_probe(dispatch_queue_t dq)
3369 {
3370 return _dispatch_queue_class_probe(dq);
3371 }
3372
3373 #if DISPATCH_COCOA_COMPAT
3374 unsigned long
3375 _dispatch_runloop_queue_probe(dispatch_queue_t dq)
3376 {
3377 if (_dispatch_queue_class_probe(dq)) {
3378 if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
3379 return _dispatch_runloop_queue_wakeup(dq);
3380 }
3381 return false;
3382 }
3383 #endif
3384
3385 unsigned long
3386 _dispatch_mgr_queue_probe(dispatch_queue_t dq)
3387 {
3388 if (_dispatch_queue_class_probe(dq)) {
3389 return _dispatch_mgr_wakeup(dq);
3390 }
3391 return false;
3392 }
3393
3394 unsigned long
3395 _dispatch_root_queue_probe(dispatch_queue_t dq)
3396 {
3397 _dispatch_queue_wakeup_global(dq);
3398 return false;
3399 }
3400
3401 #pragma mark -
3402 #pragma mark dispatch_wakeup
3403
3404 // 6618342 Contact the team that owns the Instrument DTrace probe before
3405 // renaming this symbol
3406 dispatch_queue_t
3407 _dispatch_wakeup(dispatch_object_t dou)
3408 {
3409 unsigned long type = dx_metatype(dou._do);
3410 if (type == _DISPATCH_QUEUE_TYPE || type == _DISPATCH_SOURCE_TYPE) {
3411 return _dispatch_queue_wakeup(dou._dq);
3412 }
3413 if (_dispatch_object_suspended(dou)) {
3414 return NULL;
3415 }
3416 if (!dx_probe(dou._do)) {
3417 return NULL;
3418 }
3419 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
3420 DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
3421 return NULL;
3422 }
3423 _dispatch_retain(dou._do);
3424 dispatch_queue_t tq = dou._do->do_targetq;
3425 _dispatch_queue_push(tq, dou._do, 0);
3426 return tq; // libdispatch does not need this, but the Instrument DTrace
3427 // probe does
3428 }
3429
3430 #if DISPATCH_COCOA_COMPAT
3431 static inline void
3432 _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
3433 {
3434 mach_port_t mp = (mach_port_t)dq->do_ctxt;
3435 if (!mp) {
3436 return;
3437 }
3438 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
3439 switch (kr) {
3440 case MACH_SEND_TIMEOUT:
3441 case MACH_SEND_TIMED_OUT:
3442 case MACH_SEND_INVALID_DEST:
3443 break;
3444 default:
3445 (void)dispatch_assume_zero(kr);
3446 break;
3447 }
3448 }
3449
3450 DISPATCH_NOINLINE DISPATCH_WEAK
3451 unsigned long
3452 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
3453 {
3454 _dispatch_runloop_queue_wakeup_thread(dq);
3455 return false;
3456 }
3457
3458 DISPATCH_NOINLINE
3459 static dispatch_queue_t
3460 _dispatch_main_queue_wakeup(void)
3461 {
3462 dispatch_queue_t dq = &_dispatch_main_q;
3463 if (!dq->dq_is_thread_bound) {
3464 return NULL;
3465 }
3466 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
3467 _dispatch_runloop_queue_port_init);
3468 _dispatch_runloop_queue_wakeup_thread(dq);
3469 return NULL;
3470 }
3471 #endif
3472
3473 DISPATCH_NOINLINE
3474 static void
3475 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
3476 {
3477 dispatch_root_queue_context_t qc = dq->do_ctxt;
3478 uint32_t i = n;
3479 int r;
3480
3481 _dispatch_debug_root_queue(dq, __func__);
3482 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
3483 _dispatch_root_queues_init);
3484
3485 #if HAVE_PTHREAD_WORKQUEUES
3486 #if DISPATCH_USE_PTHREAD_POOL
3487 if (qc->dgq_kworkqueue != (void*)(~0ul))
3488 #endif
3489 {
3490 _dispatch_root_queue_debug("requesting new worker thread for global "
3491 "queue: %p", dq);
3492 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3493 if (qc->dgq_kworkqueue) {
3494 pthread_workitem_handle_t wh;
3495 unsigned int gen_cnt;
3496 do {
3497 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
3498 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
3499 (void)dispatch_assume_zero(r);
3500 } while (--i);
3501 return;
3502 }
3503 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3504 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
3505 if (!dq->dq_priority) {
3506 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
3507 qc->dgq_wq_options, (int)i);
3508 (void)dispatch_assume_zero(r);
3509 return;
3510 }
3511 #endif
3512 #if HAVE_PTHREAD_WORKQUEUE_QOS
3513 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
3514 (void)dispatch_assume_zero(r);
3515 #endif
3516 return;
3517 }
3518 #endif // HAVE_PTHREAD_WORKQUEUES
3519 #if DISPATCH_USE_PTHREAD_POOL
3520 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
3521 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
3522 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
3523 if (!--i) {
3524 return;
3525 }
3526 }
3527 }
3528 uint32_t j, t_count;
3529 // seq_cst with atomic store to tail <rdar://problem/16932833>
3530 t_count = dispatch_atomic_load2o(qc, dgq_thread_pool_size, seq_cst);
3531 do {
3532 if (!t_count) {
3533 _dispatch_root_queue_debug("pthread pool is full for root queue: "
3534 "%p", dq);
3535 return;
3536 }
3537 j = i > t_count ? t_count : i;
3538 } while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
3539 t_count - j, &t_count, acquire));
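// The loop above atomically reserves j = MIN(i, dgq_thread_pool_size) slots
// from the pool counter; when the counter hits zero no new thread is created,
// and capacity only returns when a worker exits and re-increments the counter
// in _dispatch_worker_thread().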
3540
3541 pthread_attr_t *attr = &pqc->dpq_thread_attr;
3542 pthread_t tid, *pthr = &tid;
3543 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
3544 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
3545 pthr = _dispatch_mgr_root_queue_init();
3546 }
3547 #endif
3548 do {
3549 _dispatch_retain(dq);
3550 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
3551 if (r != EAGAIN) {
3552 (void)dispatch_assume_zero(r);
3553 }
3554 _dispatch_temporary_resource_shortage();
3555 }
3556 } while (--j);
3557 #endif // DISPATCH_USE_PTHREAD_POOL
3558 }
3559
3560 static inline void
3561 _dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
3562 {
3563 if (!_dispatch_queue_class_probe(dq)) {
3564 return;
3565 }
3566 #if HAVE_PTHREAD_WORKQUEUES
3567 dispatch_root_queue_context_t qc = dq->do_ctxt;
3568 if (
3569 #if DISPATCH_USE_PTHREAD_POOL
3570 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
3571 #endif
3572 !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
3573 _dispatch_root_queue_debug("worker thread request still pending for "
3574 "global queue: %p", dq);
3575 return;
3576 }
3577 #endif // HAVE_PTHREAD_WORKQUEUES
3578 return _dispatch_queue_wakeup_global_slow(dq, n);
3579 }
3580
3581 static inline void
3582 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
3583 {
3584 return _dispatch_queue_wakeup_global2(dq, 1);
3585 }
3586
3587 #pragma mark -
3588 #pragma mark dispatch_queue_invoke
3589
3590 DISPATCH_ALWAYS_INLINE
3591 static inline dispatch_queue_t
3592 dispatch_queue_invoke2(dispatch_object_t dou,
3593 _dispatch_thread_semaphore_t *sema_ptr)
3594 {
3595 dispatch_queue_t dq = dou._dq;
3596 dispatch_queue_t otq = dq->do_targetq;
3597 dispatch_queue_t cq = _dispatch_queue_get_current();
3598
3599 if (slowpath(cq != otq)) {
3600 return otq;
3601 }
3602
3603 *sema_ptr = _dispatch_queue_drain(dq);
3604
3605 if (slowpath(otq != dq->do_targetq)) {
3606 // An item on the queue changed the target queue
3607 return dq->do_targetq;
3608 }
3609 return NULL;
3610 }
3611
3612 // 6618342 Contact the team that owns the Instrument DTrace probe before
3613 // renaming this symbol
3614 DISPATCH_NOINLINE
3615 void
3616 _dispatch_queue_invoke(dispatch_queue_t dq, dispatch_object_t dou,
3617 dispatch_invoke_flags_t flags)
3618 {
3619 _dispatch_queue_class_invoke(dq, dou._dc, flags, dispatch_queue_invoke2);
3620 }
3621
3622 #pragma mark -
3623 #pragma mark dispatch_queue_drain
3624
3625 DISPATCH_ALWAYS_INLINE
3626 static inline struct dispatch_object_s*
3627 _dispatch_queue_head(dispatch_queue_t dq)
3628 {
3629 struct dispatch_object_s *dc;
3630 _dispatch_wait_until(dc = fastpath(dq->dq_items_head));
3631 return dc;
3632 }
3633
3634 DISPATCH_ALWAYS_INLINE
3635 static inline struct dispatch_object_s*
3636 _dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
3637 {
3638 struct dispatch_object_s *next_dc;
3639 next_dc = fastpath(dc->do_next);
3640 dq->dq_items_head = next_dc;
3641 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
3642 relaxed)) {
3643 _dispatch_wait_until(next_dc = fastpath(dc->do_next));
3644 dq->dq_items_head = next_dc;
3645 }
3646 return next_dc;
3647 }
3648
3649 _dispatch_thread_semaphore_t
3650 _dispatch_queue_drain(dispatch_object_t dou)
3651 {
3652 dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
3653 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3654 struct dispatch_object_s *dc, *next_dc;
3655 _dispatch_thread_semaphore_t sema = 0;
3656
3657 // Continue draining sources after target queue change rdar://8928171
3658 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
3659
3660 orig_tq = dq->do_targetq;
3661
3662 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3663 pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
3664
3665 pthread_priority_t op = _dispatch_queue_get_override_priority(dq);
3666 pthread_priority_t dp = _dispatch_get_defaultpriority();
3667 dp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
3668 if (op > dp) {
3669 _dispatch_wqthread_override_start(dq->dq_thread, op);
3670 }
3671
3672 //dispatch_debug_queue(dq, __func__);
3673
3674 while (dq->dq_items_tail) {
3675 dc = _dispatch_queue_head(dq);
3676 do {
3677 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
3678 goto out;
3679 }
3680 if (dq->dq_running > dq->dq_width) {
3681 goto out;
3682 }
3683 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
3684 goto out;
3685 }
3686 bool redirect = false;
3687 if (!fastpath(dq->dq_width == 1)) {
3688 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
3689 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3690 if (dq->dq_running > 1) {
3691 goto out;
3692 }
3693 } else {
3694 redirect = true;
3695 }
3696 }
3697 next_dc = _dispatch_queue_next(dq, dc);
3698 if (redirect) {
3699 _dispatch_continuation_redirect(dq, dc);
3700 continue;
3701 }
3702 if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
3703 goto out;
3704 }
3705 _dispatch_continuation_pop(dc);
3706 _dispatch_perfmon_workitem_inc();
3707 } while ((dc = next_dc));
3708 }
3709
3710 out:
3711 _dispatch_reset_defaultpriority(old_dp);
3712 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3713 return sema;
3714 }
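/*
 * Summary of the `goto out` paths above: draining stops when the queue is
 * suspended, when dq_running exceeds dq_width, when an enqueued item
 * retargeted the queue (unless it is a kevent source, rdar://8928171), when
 * a barrier item meets a non-idle concurrent queue, or when a queued barrier
 * sync hands its thread semaphore back for a lock transfer.
 */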
3715
3716 #if DISPATCH_COCOA_COMPAT
3717 static void
3718 _dispatch_main_queue_drain(void)
3719 {
3720 dispatch_queue_t dq = &_dispatch_main_q;
3721 if (!dq->dq_items_tail) {
3722 return;
3723 }
3724 struct dispatch_continuation_s marker = {
3725 .do_vtable = NULL,
3726 };
3727 struct dispatch_object_s *dmarker = (void*)&marker;
3728 _dispatch_queue_push_notrace(dq, dmarker, 0);
3729
3730 _dispatch_perfmon_start();
3731 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3732 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3733 pthread_priority_t old_pri = _dispatch_get_priority();
3734 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
3735 voucher_t voucher = _voucher_copy();
3736
3737 struct dispatch_object_s *dc, *next_dc;
3738 dc = _dispatch_queue_head(dq);
3739 do {
3740 next_dc = _dispatch_queue_next(dq, dc);
3741 if (dc == dmarker) {
3742 goto out;
3743 }
3744 _dispatch_continuation_pop(dc);
3745 _dispatch_perfmon_workitem_inc();
3746 } while ((dc = next_dc));
3747 DISPATCH_CRASH("Main queue corruption");
3748
3749 out:
3750 if (next_dc) {
3751 _dispatch_main_queue_wakeup();
3752 } else {
3753 pthread_priority_t p = _dispatch_queue_reset_override_priority(dq);
3754
3755 if (p > (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3756 _dispatch_thread_override_end(dq->dq_thread);
3757 }
3758 }
3759 _dispatch_voucher_debug("main queue restore", voucher);
3760 _dispatch_reset_priority_and_voucher(old_pri, voucher);
3761 _dispatch_reset_defaultpriority(old_dp);
3762 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3763 _dispatch_perfmon_end();
3764 _dispatch_force_cache_cleanup();
3765 }
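/*
 * The marker continuation bounds one drain pass to the items that were queued
 * when the runloop callout began: any work enqueued while draining lands
 * after the marker, leaves `next_dc` non-NULL at `out:`, and causes the main
 * queue to be woken again for a later runloop pass instead of being drained
 * indefinitely.
 */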
3766
3767 static bool
3768 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
3769 {
3770 if (!dq->dq_items_tail) {
3771 return false;
3772 }
3773 _dispatch_perfmon_start();
3774 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3775 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3776 pthread_priority_t old_pri = _dispatch_get_priority();
3777 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
3778 voucher_t voucher = _voucher_copy();
3779
3780 struct dispatch_object_s *dc, *next_dc;
3781 dc = _dispatch_queue_head(dq);
3782 next_dc = _dispatch_queue_next(dq, dc);
3783 _dispatch_continuation_pop(dc);
3784 _dispatch_perfmon_workitem_inc();
3785
3786 _dispatch_voucher_debug("runloop queue restore", voucher);
3787 _dispatch_reset_priority_and_voucher(old_pri, voucher);
3788 _dispatch_reset_defaultpriority(old_dp);
3789 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3790 _dispatch_perfmon_end();
3791 _dispatch_force_cache_cleanup();
3792 return next_dc;
3793 }
3794 #endif
3795
3796 DISPATCH_ALWAYS_INLINE_NDEBUG
3797 static inline _dispatch_thread_semaphore_t
3798 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
3799 {
3800 // rdar://problem/8290662 "lock transfer"
3801 struct dispatch_object_s *dc;
3802 _dispatch_thread_semaphore_t sema;
3803
3804 // queue is locked, or suspended and not being drained
3805 dc = dq->dq_items_head;
3806 if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
3807 return 0;
3808 }
3809 // dequeue dc, it is a barrier sync
3810 (void)_dispatch_queue_next(dq, dc);
3811 return sema;
3812 }
3813
3814 void
3815 _dispatch_mgr_queue_drain(void)
3816 {
3817 dispatch_queue_t dq = &_dispatch_mgr_q;
3818 if (!dq->dq_items_tail) {
3819 return _dispatch_force_cache_cleanup();
3820 }
3821 _dispatch_perfmon_start();
3822 if (slowpath(_dispatch_queue_drain(dq))) {
3823 DISPATCH_CRASH("Sync onto manager queue");
3824 }
3825 _dispatch_voucher_debug("mgr queue clear", NULL);
3826 _voucher_clear();
3827 _dispatch_queue_reset_override_priority(dq);
3828 _dispatch_reset_defaultpriority_override();
3829 _dispatch_perfmon_end();
3830 _dispatch_force_cache_cleanup();
3831 }
3832
3833 #pragma mark -
3834 #pragma mark _dispatch_queue_wakeup_with_qos
3835
3836 DISPATCH_NOINLINE
3837 static dispatch_queue_t
3838 _dispatch_queue_wakeup_with_qos_slow(dispatch_queue_t dq, pthread_priority_t pp,
3839 bool retained)
3840 {
3841 if (!dx_probe(dq) && (dq->dq_is_thread_bound || !dq->dq_thread)) {
3842 if (retained) _dispatch_release(dq);
3843 return NULL;
3844 }
3845 if (!dispatch_atomic_cmpxchg2o(dq, do_suspend_cnt, 0,
3846 DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
3847 bool was_overridden, override;
3848
3849 override = _dispatch_queue_override_priority(dq, &pp, &was_overridden);
3850 if (override && dq->dq_running > 1) {
3851 override = false;
3852 }
3853
3854 #if DISPATCH_COCOA_COMPAT
3855 if (dq == &_dispatch_main_q && dq->dq_is_thread_bound) {
3856 if (override) {
3857 _dispatch_thread_override_start(dq->dq_thread, pp);
3858 if (was_overridden) {
3859 _dispatch_thread_override_end(dq->dq_thread);
3860 }
3861 }
3862 return _dispatch_main_queue_wakeup();
3863 }
3864 #endif
3865 if (override) {
3866 #if HAVE_PTHREAD_WORKQUEUE_QOS
3867 mach_port_t th;
3868 // <rdar://problem/17735825> to traverse the tq chain safely we must
3869 // lock it to ensure it cannot change, unless the queue is running
3870 // and we can just override the thread itself
3871 if (dq->dq_thread) {
3872 _dispatch_wqthread_override_start(dq->dq_thread, pp);
3873 } else if (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread,
3874 MACH_PORT_NULL, _dispatch_thread_port(), &th, acquire)) {
3875 // already locked, override the owner, trysync will do a queue
3876 // wakeup when it returns, see _dispatch_set_target_queue2
3877 _dispatch_wqthread_override_start(th, pp);
3878 } else {
3879 dispatch_queue_t tq = dq->do_targetq;
3880 if (_dispatch_queue_prepare_override(dq, tq, pp)) {
3881 _dispatch_queue_push_override(dq, tq, pp, false);
3882 } else {
3883 _dispatch_queue_wakeup_with_qos(tq, pp);
3884 }
3885 dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL,
3886 release);
3887 }
3888 #endif
3889 }
3890 if (retained) _dispatch_release(dq);
3891 return NULL;
3892 }
3893
3894 dispatch_queue_t tq = dq->do_targetq;
3895 if (!retained) _dispatch_retain(dq);
3896 _dispatch_queue_push_queue(tq, dq, pp);
3897 return tq; // libdispatch does not need this, but the Instrument DTrace
3898 // probe does
3899 }
3900
3901 DISPATCH_ALWAYS_INLINE
3902 static inline dispatch_queue_t
3903 _dispatch_queue_wakeup_with_qos2(dispatch_queue_t dq, pthread_priority_t pp,
3904 bool retained)
3905 {
3906 if (_dispatch_object_suspended(dq)) {
3907 _dispatch_queue_override_priority(dq, &pp, NULL);
3908 if (retained) _dispatch_release(dq);
3909 return NULL;
3910 }
3911 return _dispatch_queue_wakeup_with_qos_slow(dq, pp, retained);
3912 }
3913
3914 DISPATCH_NOINLINE
3915 void
3916 _dispatch_queue_wakeup_with_qos_and_release(dispatch_queue_t dq,
3917 pthread_priority_t pp)
3918 {
3919 (void)_dispatch_queue_wakeup_with_qos2(dq, pp, true);
3920 }
3921
3922 DISPATCH_NOINLINE
3923 void
3924 _dispatch_queue_wakeup_with_qos(dispatch_queue_t dq, pthread_priority_t pp)
3925 {
3926 (void)_dispatch_queue_wakeup_with_qos2(dq, pp, false);
3927 }
3928
3929 DISPATCH_NOINLINE
3930 void
3931 _dispatch_queue_wakeup_and_release(dispatch_queue_t dq)
3932 {
3933 (void)_dispatch_queue_wakeup_with_qos2(dq,
3934 _dispatch_queue_get_override_priority(dq), true);
3935 }
3936
3937 DISPATCH_NOINLINE
3938 dispatch_queue_t
3939 _dispatch_queue_wakeup(dispatch_queue_t dq)
3940 {
3941 return _dispatch_queue_wakeup_with_qos2(dq,
3942 _dispatch_queue_get_override_priority(dq), false);
3943 }
3944
3945 #if HAVE_PTHREAD_WORKQUEUE_QOS
3946 DISPATCH_NOINLINE
3947 static void
3948 _dispatch_queue_override_invoke_stealing(void *ctxt)
3949 {
3950 dispatch_continuation_t dc = (dispatch_continuation_t)ctxt;
3951 dispatch_queue_t dq = dc->dc_data;
3952
3953 dx_invoke(dq, dc, DISPATCH_INVOKE_OVERRIDING | DISPATCH_INVOKE_STEALING);
3954 }
3955
3956 DISPATCH_NOINLINE
3957 static void
3958 _dispatch_queue_override_invoke_owning(void *ctxt)
3959 {
3960 dispatch_continuation_t dc = (dispatch_continuation_t)ctxt;
3961 dispatch_queue_t dq = dc->dc_data;
3962
3963 // balance the fake continuation push in _dispatch_queue_push_override
3964 _dispatch_trace_continuation_pop(dc->dc_other, dc->dc_data);
3965 dx_invoke(dq, dc, DISPATCH_INVOKE_OVERRIDING);
3966 }
3967 #endif
3968
3969 static inline bool
3970 _dispatch_queue_prepare_override(dispatch_queue_t dq, dispatch_queue_t tq,
3971 pthread_priority_t p)
3972 {
3973 #if HAVE_PTHREAD_WORKQUEUE_QOS
3974 if (dx_type(tq) != DISPATCH_QUEUE_ROOT_TYPE || !tq->dq_priority) {
3975 return false;
3976 }
3977 if (p <= (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3978 return false;
3979 }
3980 if (p <= (tq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3981 return false;
3982 }
3983 return true;
3984 #else
3985 (void)dq; (void)tq; (void)p;
3986 return false;
3987 #endif
3988 }
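/*
 * Informational summary of the checks above: an override continuation is only
 * prepared when the target is a QoS-aware root queue and the requested
 * priority class exceeds both this queue's and that root queue's own class;
 * otherwise the caller falls back to a plain wakeup or push at `pp`.
 */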
3989
3990 static inline void
3991 _dispatch_queue_push_override(dispatch_queue_t dq, dispatch_queue_t tq,
3992 pthread_priority_t p, bool owning)
3993 {
3994 #if HAVE_PTHREAD_WORKQUEUE_QOS
3995 unsigned int qosbit, idx, overcommit;
3996 overcommit = (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
3997 qosbit = (p & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
3998 _PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
3999 idx = (unsigned int)__builtin_ffs((int)qosbit);
4000 if (!idx || idx > DISPATCH_QUEUE_QOS_COUNT) {
4001 DISPATCH_CRASH("Corrupted override priority");
4002 }
4003 dispatch_queue_t rq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
4004
4005 dispatch_continuation_t dc = _dispatch_continuation_alloc();
4006 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
4007 if (owning) {
4008 // fake that we queued `dq` on `tq` for introspection purposes
4009 _dispatch_trace_continuation_push(tq, dq);
4010 dc->dc_func = _dispatch_queue_override_invoke_owning;
4011 } else {
4012 dc->dc_func = _dispatch_queue_override_invoke_stealing;
4013 _dispatch_retain(dq);
4014 }
4015 dc->dc_ctxt = dc;
4016 dc->dc_priority = 0;
4017 dc->dc_other = tq;
4018 dc->dc_voucher = NULL;
4019 dc->dc_data = dq;
4020
4021 _dispatch_queue_push(rq, dc, 0);
4022 #else
4023 (void)dq; (void)tq; (void)p;
4024 #endif
4025 }
4026
4027 void
4028 _dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_t dq,
4029 pthread_priority_t pp)
4030 {
4031 _dispatch_queue_override_priority(dq, &pp, NULL);
4032 if (_dispatch_queue_prepare_override(dq, tq, pp)) {
4033 _dispatch_queue_push_override(dq, tq, pp, true);
4034 } else {
4035 _dispatch_queue_push(tq, dq, pp);
4036 }
4037 }
4038
4039 #pragma mark -
4040 #pragma mark dispatch_root_queue_drain
4041
4042 DISPATCH_NOINLINE
4043 static bool
4044 _dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
4045 {
4046 dispatch_root_queue_context_t qc = dq->do_ctxt;
4047 struct dispatch_object_s *const mediator = (void *)~0ul;
4048 bool pending = false, available = true;
4049 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
4050
4051 do {
4052 // Spin for a short while in case the contention is temporary -- e.g.
4053 // when starting up after dispatch_apply, or when executing a few
4054 // short continuations in a row.
4055 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
4056 goto out;
4057 }
4058 // Since we have serious contention, we need to back off.
4059 if (!pending) {
4060 // Mark this queue as pending to avoid requests for further threads
4061 (void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
4062 pending = true;
4063 }
4064 _dispatch_contention_usleep(sleep_time);
4065 if (fastpath(dq->dq_items_head != mediator)) goto out;
4066 sleep_time *= 2;
4067 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
4068
4069 // The ratio of work to libdispatch overhead must be bad. This
4070 // scenario implies that there are too many threads in the pool.
4071 // Create a new pending thread and then exit this thread.
4072 // The kernel will grant a new thread when the load subsides.
4073 _dispatch_debug("contention on global queue: %p", dq);
4074 available = false;
4075 out:
4076 if (pending) {
4077 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
4078 }
4079 if (!available) {
4080 _dispatch_queue_wakeup_global(dq);
4081 }
4082 return available;
4083 }
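/*
 * Backoff sketch: the wait starts at DISPATCH_CONTENTION_USLEEP_START and
 * doubles each iteration up to DISPATCH_CONTENTION_USLEEP_MAX. While waiting,
 * dgq_pending is raised so _dispatch_queue_wakeup_global2() will not request
 * even more worker threads, and it is dropped again on the way out.
 */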
4084
4085 DISPATCH_ALWAYS_INLINE
4086 static inline bool
4087 _dispatch_queue_concurrent_drain_one2(dispatch_queue_t dq)
4088 {
4089 // Wait for queue head and tail to be both non-empty or both empty
4090 bool available; // <rdar://problem/15917893>
4091 _dispatch_wait_until((dq->dq_items_head != NULL) ==
4092 (available = (dq->dq_items_tail != NULL)));
4093 return available;
4094 }
4095
4096 DISPATCH_ALWAYS_INLINE_NDEBUG
4097 static inline struct dispatch_object_s *
4098 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
4099 {
4100 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
4101
4102 start:
4103 // The mediator value acts both as a "lock" and a signal
4104 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
4105
4106 if (slowpath(head == NULL)) {
4107 // The first xchg on the tail will tell the enqueueing thread that it
4108 // is safe to blindly write out to the head pointer. A cmpxchg honors
4109 // the algorithm.
4110 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator,
4111 NULL, relaxed))) {
4112 goto start;
4113 }
4114 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
4115 _dispatch_queue_concurrent_drain_one2(dq)) {
4116 goto start;
4117 }
4118 _dispatch_root_queue_debug("no work on global queue: %p", dq);
4119 return NULL;
4120 }
4121
4122 if (slowpath(head == mediator)) {
4123 // This thread lost the race for ownership of the queue.
4124 if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
4125 goto start;
4126 }
4127 return NULL;
4128 }
4129
4130 // Restore the head pointer to a sane value before returning.
4131 // If 'next' is NULL, then this item _might_ be the last item.
4132 next = fastpath(head->do_next);
4133
4134 if (slowpath(!next)) {
4135 dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);
4136
4137 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
4138 // both head and tail are NULL now
4139 goto out;
4140 }
4141 // There must be a next item now.
4142 _dispatch_wait_until(next = head->do_next);
4143 }
4144
4145 dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
4146 _dispatch_queue_wakeup_global(dq);
4147 out:
4148 return head;
4149 }
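/*
 * Informal sketch of the dequeue protocol above:
 *
 *   head = xchg(dq_items_head, MEDIATOR)   // claim the head, signal racers
 *   head == NULL     -> try to undo the xchg and report an empty queue
 *   head == MEDIATOR -> another drainer owns the head; back off in
 *                       _dispatch_queue_concurrent_drain_one_slow()
 *   otherwise        -> publish head->do_next (waiting for a concurrent
 *                       enqueuer to link it if necessary), wake one more
 *                       worker, and return the claimed item
 */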
4150
4151 static void
4152 _dispatch_root_queue_drain(dispatch_queue_t dq)
4153 {
4154 #if DISPATCH_DEBUG
4155 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
4156 DISPATCH_CRASH("Premature thread recycling");
4157 }
4158 #endif
4159 _dispatch_thread_setspecific(dispatch_queue_key, dq);
4160 pthread_priority_t old_pri = _dispatch_get_priority();
4161 pthread_priority_t pri = dq->dq_priority ? dq->dq_priority : old_pri;
4162 pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri);
4163
4164 #if DISPATCH_COCOA_COMPAT
4165 // ensure that high-level memory management techniques do not leak/crash
4166 if (dispatch_begin_thread_4GC) {
4167 dispatch_begin_thread_4GC();
4168 }
4169 void *pool = _dispatch_autorelease_pool_push();
4170 #endif // DISPATCH_COCOA_COMPAT
4171
4172 _dispatch_perfmon_start();
4173 struct dispatch_object_s *item;
4174 bool reset = false;
4175 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
4176 if (reset) _dispatch_wqthread_override_reset();
4177 _dispatch_continuation_pop(item);
4178 reset = _dispatch_reset_defaultpriority_override();
4179 }
4180 _dispatch_voucher_debug("root queue clear", NULL);
4181 _dispatch_reset_priority_and_voucher(old_pri, NULL);
4182 _dispatch_reset_defaultpriority(old_dp);
4183 _dispatch_perfmon_end();
4184
4185 #if DISPATCH_COCOA_COMPAT
4186 _dispatch_autorelease_pool_pop(pool);
4187 if (dispatch_end_thread_4GC) {
4188 dispatch_end_thread_4GC();
4189 }
4190 #endif // DISPATCH_COCOA_COMPAT
4191
4192 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
4193 }
4194
4195 #pragma mark -
4196 #pragma mark dispatch_worker_thread
4197
4198 #if HAVE_PTHREAD_WORKQUEUES
4199 static void
4200 _dispatch_worker_thread4(void *context)
4201 {
4202 dispatch_queue_t dq = context;
4203 dispatch_root_queue_context_t qc = dq->do_ctxt;
4204
4205 _dispatch_introspection_thread_add();
4206 int pending = (int)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
4207 dispatch_assert(pending >= 0);
4208 _dispatch_root_queue_drain(dq);
4209 __asm__(""); // prevent tailcall (for Instrument DTrace probe)
4210 }
4211
4212 #if HAVE_PTHREAD_WORKQUEUE_QOS
4213 static void
4214 _dispatch_worker_thread3(pthread_priority_t priority)
4215 {
4216 // Reset priority TSD to workaround <rdar://problem/17825261>
4217 _dispatch_thread_setspecific(dispatch_priority_key,
4218 (void*)(uintptr_t)(priority & ~_PTHREAD_PRIORITY_FLAGS_MASK));
4219 unsigned int overcommit, qosbit, idx;
4220 overcommit = (priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
4221 qosbit = (priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
4222 _PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
4223 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].
4224 dq_priority) {
4225 // If the kernel doesn't support the maintenance QoS class, the bottom
4226 // bit means background; shift it to where we expect the background bit.
4227 qosbit <<= 1;
4228 }
4229 idx = (unsigned int)__builtin_ffs((int)qosbit);
4230 dispatch_assert(idx > 0 && idx < DISPATCH_QUEUE_QOS_COUNT+1);
4231 dispatch_queue_t dq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
4232 return _dispatch_worker_thread4(dq);
4233 }
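/*
 * Mapping sketch (assumes the usual six QoS buckets with two root queues
 * each): the kernel passes a pthread_priority_t with a single QoS class bit
 * set, __builtin_ffs() turns that bit into an index in
 * 1..DISPATCH_QUEUE_QOS_COUNT, and ((idx - 1) << 1) | overcommit selects the
 * matching entry of _dispatch_root_queues[] (even slots non-overcommit, odd
 * slots overcommit).
 */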
4234 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
4235
4236 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4237 // 6618342 Contact the team that owns the Instrument DTrace probe before
4238 // renaming this symbol
4239 static void
4240 _dispatch_worker_thread2(int priority, int options,
4241 void *context DISPATCH_UNUSED)
4242 {
4243 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
4244 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
4245 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
4246
4247 return _dispatch_worker_thread4(dq);
4248 }
4249 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4250 #endif // HAVE_PTHREAD_WORKQUEUES
4251
4252 #if DISPATCH_USE_PTHREAD_POOL
4253 // 6618342 Contact the team that owns the Instrument DTrace probe before
4254 // renaming this symbol
4255 static void *
4256 _dispatch_worker_thread(void *context)
4257 {
4258 dispatch_queue_t dq = context;
4259 dispatch_root_queue_context_t qc = dq->do_ctxt;
4260 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4261
4262 if (pqc->dpq_observer_hooks.queue_will_execute) {
4263 _dispatch_set_pthread_root_queue_observer_hooks(
4264 &pqc->dpq_observer_hooks);
4265 }
4266 if (pqc->dpq_thread_configure) {
4267 pqc->dpq_thread_configure();
4268 }
4269
4270 sigset_t mask;
4271 int r;
4272 // replicate the tweaks that the kernel workqueue would otherwise do for us
4273 r = sigfillset(&mask);
4274 (void)dispatch_assume_zero(r);
4275 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
4276 (void)dispatch_assume_zero(r);
4277 _dispatch_introspection_thread_add();
4278
4279 const int64_t timeout = 5ull * NSEC_PER_SEC;
4280 do {
4281 _dispatch_root_queue_drain(dq);
4282 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
4283 dispatch_time(0, timeout)) == 0);
4284
4285 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, release);
4286 _dispatch_queue_wakeup_global(dq);
4287 _dispatch_release(dq);
4288
4289 return NULL;
4290 }
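/*
 * Pool worker lifetime: each thread drains its root queue, then parks on
 * dpq_thread_mediator for up to the 5s `timeout` above; a signal from
 * _dispatch_queue_wakeup_global_slow() sends it back to draining, while a
 * timeout makes it return its dgq_thread_pool_size slot and exit.
 */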
4291
4292 int
4293 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
4294 {
4295 int r;
4296
4297 /* Workaround: 6269619 Not all signals can be delivered on any thread */
4298
4299 r = sigdelset(set, SIGILL);
4300 (void)dispatch_assume_zero(r);
4301 r = sigdelset(set, SIGTRAP);
4302 (void)dispatch_assume_zero(r);
4303 #if HAVE_DECL_SIGEMT
4304 r = sigdelset(set, SIGEMT);
4305 (void)dispatch_assume_zero(r);
4306 #endif
4307 r = sigdelset(set, SIGFPE);
4308 (void)dispatch_assume_zero(r);
4309 r = sigdelset(set, SIGBUS);
4310 (void)dispatch_assume_zero(r);
4311 r = sigdelset(set, SIGSEGV);
4312 (void)dispatch_assume_zero(r);
4313 r = sigdelset(set, SIGSYS);
4314 (void)dispatch_assume_zero(r);
4315 r = sigdelset(set, SIGPIPE);
4316 (void)dispatch_assume_zero(r);
4317
4318 return pthread_sigmask(how, set, oset);
4319 }
4320 #endif // DISPATCH_USE_PTHREAD_POOL
4321
4322 #pragma mark -
4323 #pragma mark dispatch_runloop_queue
4324
4325 static bool _dispatch_program_is_probably_callback_driven;
4326
4327 #if DISPATCH_COCOA_COMPAT
4328
4329 dispatch_queue_t
4330 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
4331 {
4332 dispatch_queue_t dq;
4333 size_t dqs;
4334
4335 if (slowpath(flags)) {
4336 return NULL;
4337 }
4338 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
4339 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
4340 _dispatch_queue_init(dq);
4341 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
4342 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
4343 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
4344 dq->dq_running = 1;
4345 dq->dq_is_thread_bound = 1;
4346 _dispatch_runloop_queue_port_init(dq);
4347 _dispatch_queue_set_bound_thread(dq);
4348 _dispatch_object_debug(dq, "%s", __func__);
4349 return _dispatch_introspection_queue_create(dq);
4350 }
4351
4352 void
4353 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
4354 {
4355 _dispatch_object_debug(dq, "%s", __func__);
4356 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
4357 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
4358 DISPATCH_OBJECT_SUSPEND_LOCK, release);
4359 _dispatch_queue_clear_bound_thread(dq);
4360 if (suspend_cnt == 0) {
4361 _dispatch_queue_wakeup(dq);
4362 }
4363 }
4364
4365 void
4366 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
4367 {
4368 _dispatch_object_debug(dq, "%s", __func__);
4369 _dispatch_introspection_queue_dispose(dq);
4370 _dispatch_runloop_queue_port_dispose(dq);
4371 _dispatch_queue_destroy(dq);
4372 }
4373
4374 bool
4375 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
4376 {
4377 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4378 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4379 }
4380 dispatch_retain(dq);
4381 bool r = _dispatch_runloop_queue_drain_one(dq);
4382 dispatch_release(dq);
4383 return r;
4384 }
4385
4386 void
4387 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
4388 {
4389 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4390 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4391 }
4392 _dispatch_runloop_queue_probe(dq);
4393 }
4394
4395 mach_port_t
4396 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
4397 {
4398 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4399 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4400 }
4401 return (mach_port_t)dq->do_ctxt;
4402 }
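/*
 * Integration sketch (hypothetical host loop, not CoreFoundation's actual
 * code): a runloop-driven client of these 4CF hooks would create the queue,
 * watch its wakeup port, and drain one item per wakeup:
 *
 *   dispatch_queue_t q = _dispatch_runloop_root_queue_create_4CF("host", 0);
 *   mach_port_t mp = _dispatch_runloop_root_queue_get_port_4CF(q);
 *   // ... add `mp` as a runloop port source; on each message received:
 *   while (_dispatch_runloop_root_queue_perform_4CF(q)) { }
 *
 * _dispatch_runloop_root_queue_wakeup_4CF() above is what enqueuers use to
 * poke that port when new work arrives.
 */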
4403
4404 static void
4405 _dispatch_runloop_queue_port_init(void *ctxt)
4406 {
4407 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
4408 mach_port_t mp;
4409 kern_return_t kr;
4410
4411 _dispatch_safe_fork = false;
4412 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
4413 DISPATCH_VERIFY_MIG(kr);
4414 (void)dispatch_assume_zero(kr);
4415 kr = mach_port_insert_right(mach_task_self(), mp, mp,
4416 MACH_MSG_TYPE_MAKE_SEND);
4417 DISPATCH_VERIFY_MIG(kr);
4418 (void)dispatch_assume_zero(kr);
4419 if (dq != &_dispatch_main_q) {
4420 struct mach_port_limits limits = {
4421 .mpl_qlimit = 1,
4422 };
4423 kr = mach_port_set_attributes(mach_task_self(), mp,
4424 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
4425 sizeof(limits));
4426 DISPATCH_VERIFY_MIG(kr);
4427 (void)dispatch_assume_zero(kr);
4428 }
4429 dq->do_ctxt = (void*)(uintptr_t)mp;
4430
4431 _dispatch_program_is_probably_callback_driven = true;
4432 }
4433
4434 static void
4435 _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
4436 {
4437 mach_port_t mp = (mach_port_t)dq->do_ctxt;
4438 if (!mp) {
4439 return;
4440 }
4441 dq->do_ctxt = NULL;
4442 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
4443 DISPATCH_VERIFY_MIG(kr);
4444 (void)dispatch_assume_zero(kr);
4445 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
4446 DISPATCH_VERIFY_MIG(kr);
4447 (void)dispatch_assume_zero(kr);
4448 }
4449
4450 #pragma mark -
4451 #pragma mark dispatch_main_queue
4452
4453 mach_port_t
4454 _dispatch_get_main_queue_port_4CF(void)
4455 {
4456 dispatch_queue_t dq = &_dispatch_main_q;
4457 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
4458 _dispatch_runloop_queue_port_init);
4459 return (mach_port_t)dq->do_ctxt;
4460 }
4461
4462 static bool main_q_is_draining;
4463
4464 // 6618342 Contact the team that owns the Instrument DTrace probe before
4465 // renaming this symbol
4466 DISPATCH_NOINLINE
4467 static void
4468 _dispatch_queue_set_mainq_drain_state(bool arg)
4469 {
4470 main_q_is_draining = arg;
4471 }
4472
4473 void
4474 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
4475 {
4476 if (main_q_is_draining) {
4477 return;
4478 }
4479 _dispatch_queue_set_mainq_drain_state(true);
4480 _dispatch_main_queue_drain();
4481 _dispatch_queue_set_mainq_drain_state(false);
4482 }
4483
4484 #endif
4485
4486 void
4487 dispatch_main(void)
4488 {
4489 #if HAVE_PTHREAD_MAIN_NP
4490 if (pthread_main_np()) {
4491 #endif
4492 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
4493 _dispatch_program_is_probably_callback_driven = true;
4494 pthread_exit(NULL);
4495 DISPATCH_CRASH("pthread_exit() returned");
4496 #if HAVE_PTHREAD_MAIN_NP
4497 }
4498 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
4499 #endif
4500 }
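/*
 * Caller-side sketch (illustrative): a purely callback-driven program parks
 * its main thread here after installing its sources; dispatch_main() never
 * returns and all further work runs on dispatch-managed threads.
 *
 *   int main(void) {
 *       signal(SIGHUP, SIG_IGN); // let the dispatch source see SIGHUP
 *       dispatch_source_t sig = dispatch_source_create(
 *               DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0,
 *               dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
 *       dispatch_source_set_event_handler(sig, ^{ });
 *       dispatch_resume(sig);
 *       dispatch_main(); // does not return
 *   }
 */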
4501
4502 DISPATCH_NOINLINE DISPATCH_NORETURN
4503 static void
4504 _dispatch_sigsuspend(void)
4505 {
4506 static const sigset_t mask;
4507
4508 for (;;) {
4509 sigsuspend(&mask);
4510 }
4511 }
4512
4513 DISPATCH_NORETURN
4514 static void
4515 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
4516 {
4517 // never returns, so burn bridges behind us
4518 _dispatch_clear_stack(0);
4519 _dispatch_sigsuspend();
4520 }
4521
4522 DISPATCH_NOINLINE
4523 static void
4524 _dispatch_queue_cleanup2(void)
4525 {
4526 dispatch_queue_t dq = &_dispatch_main_q;
4527 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
4528 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
4529 DISPATCH_OBJECT_SUSPEND_LOCK, release);
4530 _dispatch_queue_clear_bound_thread(dq);
4531 dq->dq_is_thread_bound = 0;
4532 // no need to drop the override, the thread will die anyway
4533 _dispatch_queue_wakeup_with_qos(dq,
4534 _dispatch_queue_reset_override_priority(dq));
4535
4536 // overload the "probably" variable to mean that dispatch_main() or
4537 // similar non-POSIX API was called
4538 // this has to run before the DISPATCH_COCOA_COMPAT below
4539 if (_dispatch_program_is_probably_callback_driven) {
4540 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
4541 _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
4542 sleep(1); // workaround 6778970
4543 }
4544
4545 #if DISPATCH_COCOA_COMPAT
4546 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
4547 _dispatch_runloop_queue_port_init);
4548 _dispatch_runloop_queue_port_dispose(dq);
4549 #endif
4550 }
4551
4552 static void
4553 _dispatch_queue_cleanup(void *ctxt)
4554 {
4555 if (ctxt == &_dispatch_main_q) {
4556 return _dispatch_queue_cleanup2();
4557 }
4558 // POSIX defines that destructors are only called if 'ctxt' is non-null
4559 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
4560 }