]> git.saurik.com Git - apple/libdispatch.git/blob - src/queue.c
libdispatch-442.1.4.tar.gz
[apple/libdispatch.git] / src / queue.c
1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
34 && !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
38 !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
39 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
40 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
41 #endif
42 #if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
43 #undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
44 #define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
45 #endif
46 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
47 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
48 #define pthread_workqueue_t void*
49 #endif
50
51 static void _dispatch_cache_cleanup(void *value);
52 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
53 dispatch_continuation_t dc, pthread_priority_t pp);
54 static void _dispatch_queue_cleanup(void *ctxt);
55 static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
56 unsigned int n);
57 static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
58 static inline _dispatch_thread_semaphore_t
59 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
60 static inline bool _dispatch_queue_prepare_override(dispatch_queue_t dq,
61 dispatch_queue_t tq, pthread_priority_t p);
62 static inline void _dispatch_queue_push_override(dispatch_queue_t dq,
63 dispatch_queue_t tq, pthread_priority_t p);
64 #if HAVE_PTHREAD_WORKQUEUES
65 static void _dispatch_worker_thread4(void *context);
66 #if HAVE_PTHREAD_WORKQUEUE_QOS
67 static void _dispatch_worker_thread3(pthread_priority_t priority);
68 #endif
69 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
70 static void _dispatch_worker_thread2(int priority, int options, void *context);
71 #endif
72 #endif
73 #if DISPATCH_USE_PTHREAD_POOL
74 static void *_dispatch_worker_thread(void *context);
75 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
76 #endif
77
78 #if DISPATCH_COCOA_COMPAT
79 static dispatch_once_t _dispatch_main_q_port_pred;
80 static dispatch_queue_t _dispatch_main_queue_wakeup(void);
81 unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
82 static void _dispatch_runloop_queue_port_init(void *ctxt);
83 static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
84 #endif
85
86 static void _dispatch_root_queues_init(void *context);
87 static dispatch_once_t _dispatch_root_queues_pred;
88
89 #pragma mark -
90 #pragma mark dispatch_root_queue
91
92 struct dispatch_pthread_root_queue_context_s {
93 pthread_attr_t dpq_thread_attr;
94 dispatch_block_t dpq_thread_configure;
95 struct dispatch_semaphore_s dpq_thread_mediator;
96 };
97 typedef struct dispatch_pthread_root_queue_context_s *
98 dispatch_pthread_root_queue_context_t;
99
100 #if DISPATCH_ENABLE_THREAD_POOL
101 static struct dispatch_pthread_root_queue_context_s
102 _dispatch_pthread_root_queue_contexts[] = {
103 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
104 .dpq_thread_mediator = {
105 .do_vtable = DISPATCH_VTABLE(semaphore),
106 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
107 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
108 }},
109 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
110 .dpq_thread_mediator = {
111 .do_vtable = DISPATCH_VTABLE(semaphore),
112 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
113 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
114 }},
115 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
116 .dpq_thread_mediator = {
117 .do_vtable = DISPATCH_VTABLE(semaphore),
118 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
119 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
120 }},
121 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
122 .dpq_thread_mediator = {
123 .do_vtable = DISPATCH_VTABLE(semaphore),
124 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
125 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
126 }},
127 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
128 .dpq_thread_mediator = {
129 .do_vtable = DISPATCH_VTABLE(semaphore),
130 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
131 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
132 }},
133 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
134 .dpq_thread_mediator = {
135 .do_vtable = DISPATCH_VTABLE(semaphore),
136 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
137 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
138 }},
139 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
140 .dpq_thread_mediator = {
141 .do_vtable = DISPATCH_VTABLE(semaphore),
142 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
143 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
144 }},
145 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
146 .dpq_thread_mediator = {
147 .do_vtable = DISPATCH_VTABLE(semaphore),
148 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
149 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
150 }},
151 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
152 .dpq_thread_mediator = {
153 .do_vtable = DISPATCH_VTABLE(semaphore),
154 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
155 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
156 }},
157 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
158 .dpq_thread_mediator = {
159 .do_vtable = DISPATCH_VTABLE(semaphore),
160 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
161 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
162 }},
163 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
164 .dpq_thread_mediator = {
165 .do_vtable = DISPATCH_VTABLE(semaphore),
166 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
167 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
168 }},
169 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
170 .dpq_thread_mediator = {
171 .do_vtable = DISPATCH_VTABLE(semaphore),
172 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
173 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
174 }},
175 };
176 #endif
177
178 #define MAX_PTHREAD_COUNT 255
179
180 struct dispatch_root_queue_context_s {
181 union {
182 struct {
183 unsigned int volatile dgq_pending;
184 #if HAVE_PTHREAD_WORKQUEUES
185 qos_class_t dgq_qos;
186 int dgq_wq_priority, dgq_wq_options;
187 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
188 pthread_workqueue_t dgq_kworkqueue;
189 #endif
190 #endif // HAVE_PTHREAD_WORKQUEUES
191 #if DISPATCH_USE_PTHREAD_POOL
192 void *dgq_ctxt;
193 uint32_t volatile dgq_thread_pool_size;
194 #endif
195 };
196 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
197 };
198 };
199 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
200
201 DISPATCH_CACHELINE_ALIGN
202 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
203 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
204 #if HAVE_PTHREAD_WORKQUEUES
205 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
206 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
207 .dgq_wq_options = 0,
208 #endif
209 #if DISPATCH_ENABLE_THREAD_POOL
210 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
211 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
212 #endif
213 }}},
214 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
215 #if HAVE_PTHREAD_WORKQUEUES
216 .dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
217 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
218 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
219 #endif
220 #if DISPATCH_ENABLE_THREAD_POOL
221 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
222 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
223 #endif
224 }}},
225 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
226 #if HAVE_PTHREAD_WORKQUEUES
227 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
228 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
229 .dgq_wq_options = 0,
230 #endif
231 #if DISPATCH_ENABLE_THREAD_POOL
232 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
233 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
234 #endif
235 }}},
236 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
237 #if HAVE_PTHREAD_WORKQUEUES
238 .dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
239 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
240 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
241 #endif
242 #if DISPATCH_ENABLE_THREAD_POOL
243 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
244 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
245 #endif
246 }}},
247 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
248 #if HAVE_PTHREAD_WORKQUEUES
249 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
250 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
251 .dgq_wq_options = 0,
252 #endif
253 #if DISPATCH_ENABLE_THREAD_POOL
254 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
255 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
256 #endif
257 }}},
258 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
259 #if HAVE_PTHREAD_WORKQUEUES
260 .dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
261 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
262 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
263 #endif
264 #if DISPATCH_ENABLE_THREAD_POOL
265 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
266 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
267 #endif
268 }}},
269 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
270 #if HAVE_PTHREAD_WORKQUEUES
271 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
272 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
273 .dgq_wq_options = 0,
274 #endif
275 #if DISPATCH_ENABLE_THREAD_POOL
276 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
277 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
278 #endif
279 }}},
280 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
281 #if HAVE_PTHREAD_WORKQUEUES
282 .dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
283 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
284 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
285 #endif
286 #if DISPATCH_ENABLE_THREAD_POOL
287 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
288 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
289 #endif
290 }}},
291 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
292 #if HAVE_PTHREAD_WORKQUEUES
293 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
294 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
295 .dgq_wq_options = 0,
296 #endif
297 #if DISPATCH_ENABLE_THREAD_POOL
298 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
299 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
300 #endif
301 }}},
302 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
303 #if HAVE_PTHREAD_WORKQUEUES
304 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
305 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
306 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
307 #endif
308 #if DISPATCH_ENABLE_THREAD_POOL
309 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
310 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
311 #endif
312 }}},
313 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
314 #if HAVE_PTHREAD_WORKQUEUES
315 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
316 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
317 .dgq_wq_options = 0,
318 #endif
319 #if DISPATCH_ENABLE_THREAD_POOL
320 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
321 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
322 #endif
323 }}},
324 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
325 #if HAVE_PTHREAD_WORKQUEUES
326 .dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
327 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
328 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
329 #endif
330 #if DISPATCH_ENABLE_THREAD_POOL
331 .dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
332 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
333 #endif
334 }}},
335 };
336
337 // 6618342 Contact the team that owns the Instrument DTrace probe before
338 // renaming this symbol
339 // dq_running is set to 2 so that barrier operations go through the slow path
340 DISPATCH_CACHELINE_ALIGN
341 struct dispatch_queue_s _dispatch_root_queues[] = {
342 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
343 .do_vtable = DISPATCH_VTABLE(queue_root),
344 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
345 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
346 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
347 .do_ctxt = &_dispatch_root_queue_contexts[
348 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
349 .dq_label = "com.apple.root.maintenance-qos",
350 .dq_running = 2,
351 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
352 .dq_serialnum = 4,
353 },
354 [DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
355 .do_vtable = DISPATCH_VTABLE(queue_root),
356 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
357 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
358 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
359 .do_ctxt = &_dispatch_root_queue_contexts[
360 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
361 .dq_label = "com.apple.root.maintenance-qos.overcommit",
362 .dq_running = 2,
363 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
364 .dq_serialnum = 5,
365 },
366 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
367 .do_vtable = DISPATCH_VTABLE(queue_root),
368 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
369 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
370 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
371 .do_ctxt = &_dispatch_root_queue_contexts[
372 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
373 .dq_label = "com.apple.root.background-qos",
374 .dq_running = 2,
375 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
376 .dq_serialnum = 6,
377 },
378 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
379 .do_vtable = DISPATCH_VTABLE(queue_root),
380 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
381 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
382 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
383 .do_ctxt = &_dispatch_root_queue_contexts[
384 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
385 .dq_label = "com.apple.root.background-qos.overcommit",
386 .dq_running = 2,
387 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
388 .dq_serialnum = 7,
389 },
390 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
391 .do_vtable = DISPATCH_VTABLE(queue_root),
392 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
393 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
394 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
395 .do_ctxt = &_dispatch_root_queue_contexts[
396 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
397 .dq_label = "com.apple.root.utility-qos",
398 .dq_running = 2,
399 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
400 .dq_serialnum = 8,
401 },
402 [DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
403 .do_vtable = DISPATCH_VTABLE(queue_root),
404 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
405 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
406 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
407 .do_ctxt = &_dispatch_root_queue_contexts[
408 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
409 .dq_label = "com.apple.root.utility-qos.overcommit",
410 .dq_running = 2,
411 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
412 .dq_serialnum = 9,
413 },
414 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
415 .do_vtable = DISPATCH_VTABLE(queue_root),
416 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
417 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
418 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
419 .do_ctxt = &_dispatch_root_queue_contexts[
420 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
421 .dq_label = "com.apple.root.default-qos",
422 .dq_running = 2,
423 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
424 .dq_serialnum = 10,
425 },
426 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
427 .do_vtable = DISPATCH_VTABLE(queue_root),
428 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
429 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
430 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
431 .do_ctxt = &_dispatch_root_queue_contexts[
432 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
433 .dq_label = "com.apple.root.default-qos.overcommit",
434 .dq_running = 2,
435 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
436 .dq_serialnum = 11,
437 },
438 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
439 .do_vtable = DISPATCH_VTABLE(queue_root),
440 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
441 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
442 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
443 .do_ctxt = &_dispatch_root_queue_contexts[
444 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
445 .dq_label = "com.apple.root.user-initiated-qos",
446 .dq_running = 2,
447 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
448 .dq_serialnum = 12,
449 },
450 [DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
451 .do_vtable = DISPATCH_VTABLE(queue_root),
452 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
453 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
454 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
455 .do_ctxt = &_dispatch_root_queue_contexts[
456 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
457 .dq_label = "com.apple.root.user-initiated-qos.overcommit",
458 .dq_running = 2,
459 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
460 .dq_serialnum = 13,
461 },
462 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
463 .do_vtable = DISPATCH_VTABLE(queue_root),
464 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
465 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
466 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
467 .do_ctxt = &_dispatch_root_queue_contexts[
468 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
469 .dq_label = "com.apple.root.user-interactive-qos",
470 .dq_running = 2,
471 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
472 .dq_serialnum = 14,
473 },
474 [DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
475 .do_vtable = DISPATCH_VTABLE(queue_root),
476 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
477 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
478 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
479 .do_ctxt = &_dispatch_root_queue_contexts[
480 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
481 .dq_label = "com.apple.root.user-interactive-qos.overcommit",
482 .dq_running = 2,
483 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
484 .dq_serialnum = 15,
485 },
486 };
487
488 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
489 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
490 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
491 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
492 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
493 &_dispatch_root_queues[
494 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
495 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
496 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
497 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
498 &_dispatch_root_queues[
499 DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
500 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
501 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
502 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
503 &_dispatch_root_queues[
504 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
505 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
506 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
507 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
508 &_dispatch_root_queues[
509 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
510 };
511 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
512
513 #define DISPATCH_PRIORITY_COUNT 5
514
515 enum {
516 // No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
517 // maintenance priority
518 DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
519 DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
520 DISPATCH_PRIORITY_IDX_LOW,
521 DISPATCH_PRIORITY_IDX_DEFAULT,
522 DISPATCH_PRIORITY_IDX_HIGH,
523 };
524
525 static qos_class_t _dispatch_priority2qos[] = {
526 [DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
527 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
528 [DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
529 [DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
530 [DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
531 };
532
533 #if HAVE_PTHREAD_WORKQUEUE_QOS
534 static const int _dispatch_priority2wq[] = {
535 [DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
536 [DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
537 [DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
538 [DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
539 [DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
540 };
541 #endif
542
543 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
544 static struct dispatch_queue_s _dispatch_mgr_root_queue;
545 #else
546 #define _dispatch_mgr_root_queue \
547 _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
548 #endif
549
550 // 6618342 Contact the team that owns the Instrument DTrace probe before
551 // renaming this symbol
552 DISPATCH_CACHELINE_ALIGN
553 struct dispatch_queue_s _dispatch_mgr_q = {
554 .do_vtable = DISPATCH_VTABLE(queue_mgr),
555 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
556 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
557 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
558 .do_targetq = &_dispatch_mgr_root_queue,
559 .dq_label = "com.apple.libdispatch-manager",
560 .dq_width = 1,
561 .dq_is_thread_bound = 1,
562 .dq_serialnum = 2,
563 };
564
565 dispatch_queue_t
566 dispatch_get_global_queue(long priority, unsigned long flags)
567 {
568 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
569 return NULL;
570 }
571 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
572 _dispatch_root_queues_init);
573 qos_class_t qos;
574 switch (priority) {
575 #if !RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
576 case _DISPATCH_QOS_CLASS_MAINTENANCE:
577 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
578 .dq_priority) {
579 // map maintenance to background on old kernel
580 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
581 } else {
582 qos = (qos_class_t)priority;
583 }
584 break;
585 #endif // RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
586 case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
587 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
588 break;
589 case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
590 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
591 break;
592 case DISPATCH_QUEUE_PRIORITY_LOW:
593 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
594 break;
595 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
596 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
597 break;
598 case DISPATCH_QUEUE_PRIORITY_HIGH:
599 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
600 break;
601 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
602 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
603 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
604 .dq_priority) {
605 qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
606 break;
607 }
608 #endif
609 // fallthrough
610 default:
611 qos = (qos_class_t)priority;
612 break;
613 }
614 return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
615 }
616
617 DISPATCH_ALWAYS_INLINE
618 static inline dispatch_queue_t
619 _dispatch_get_current_queue(void)
620 {
621 return _dispatch_queue_get_current() ?:
622 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
623 }
624
625 dispatch_queue_t
626 dispatch_get_current_queue(void)
627 {
628 return _dispatch_get_current_queue();
629 }
630
631 DISPATCH_ALWAYS_INLINE
632 static inline bool
633 _dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
634 {
635 while (dq1) {
636 if (dq1 == dq2) {
637 return true;
638 }
639 dq1 = dq1->do_targetq;
640 }
641 return false;
642 }
643
644 #define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
645 "Assertion failed: Block was run on an unexpected queue"
646
647 DISPATCH_NOINLINE
648 static void
649 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
650 {
651 char *msg;
652 asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
653 expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
654 dq->dq_label : "");
655 _dispatch_log("%s", msg);
656 _dispatch_set_crash_log_message(msg);
657 _dispatch_hardware_crash();
658 free(msg);
659 }
660
661 void
662 dispatch_assert_queue(dispatch_queue_t dq)
663 {
664 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
665 DISPATCH_CLIENT_CRASH("invalid queue passed to "
666 "dispatch_assert_queue()");
667 }
668 dispatch_queue_t cq = _dispatch_queue_get_current();
669 if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
670 return;
671 }
672 _dispatch_assert_queue_fail(dq, true);
673 }
674
675 void
676 dispatch_assert_queue_not(dispatch_queue_t dq)
677 {
678 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
679 DISPATCH_CLIENT_CRASH("invalid queue passed to "
680 "dispatch_assert_queue_not()");
681 }
682 dispatch_queue_t cq = _dispatch_queue_get_current();
683 if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
684 _dispatch_assert_queue_fail(dq, false);
685 }
686 }
687
688 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
689 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
690 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
691 #else
692 #define _dispatch_root_queue_debug(...)
693 #define _dispatch_debug_root_queue(...)
694 #endif
695
696 #pragma mark -
697 #pragma mark dispatch_init
698
699 #if HAVE_PTHREAD_WORKQUEUE_QOS
700 int _dispatch_set_qos_class_enabled;
701 pthread_priority_t _dispatch_background_priority;
702 pthread_priority_t _dispatch_user_initiated_priority;
703
704 static void
705 _dispatch_root_queues_init_qos(int supported)
706 {
707 pthread_priority_t p;
708 qos_class_t qos;
709 unsigned int i;
710 for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
711 p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
712 qos = _pthread_qos_class_decode(p, NULL, NULL);
713 dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
714 _dispatch_priority2qos[i] = qos;
715 }
716 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
717 qos = _dispatch_root_queue_contexts[i].dgq_qos;
718 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
719 !(supported & WORKQ_FEATURE_MAINTENANCE)) {
720 continue;
721 }
722 unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
723 flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
724 if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
725 i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
726 flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
727 }
728 p = _pthread_qos_class_encode(qos, 0, flags);
729 _dispatch_root_queues[i].dq_priority = p;
730 }
731 p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
732 _dispatch_main_q.dq_priority = p;
733 _dispatch_queue_set_override_priority(&_dispatch_main_q);
734 _dispatch_background_priority = _dispatch_root_queues[
735 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS].dq_priority &
736 ~_PTHREAD_PRIORITY_FLAGS_MASK;
737 _dispatch_user_initiated_priority = _dispatch_root_queues[
738 DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS].dq_priority &
739 ~_PTHREAD_PRIORITY_FLAGS_MASK;
740 if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
741 _dispatch_set_qos_class_enabled = 1;
742 }
743 }
744 #endif
745
746 static inline bool
747 _dispatch_root_queues_init_workq(void)
748 {
749 bool result = false;
750 #if HAVE_PTHREAD_WORKQUEUES
751 bool disable_wq = false;
752 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
753 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
754 #endif
755 int r;
756 #if HAVE_PTHREAD_WORKQUEUE_QOS
757 bool disable_qos = false;
758 #if DISPATCH_DEBUG
759 disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
760 #endif
761 if (!disable_qos && !disable_wq) {
762 r = _pthread_workqueue_supported();
763 int supported = r;
764 if (r & WORKQ_FEATURE_FINEPRIO) {
765 r = _pthread_workqueue_init(_dispatch_worker_thread3,
766 offsetof(struct dispatch_queue_s, dq_serialnum), 0);
767 result = !r;
768 if (result) _dispatch_root_queues_init_qos(supported);
769 }
770 }
771 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
772 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
773 if (!result && !disable_wq) {
774 #if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
775 pthread_workqueue_setdispatchoffset_np(
776 offsetof(struct dispatch_queue_s, dq_serialnum));
777 #endif
778 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
779 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
780 (void)dispatch_assume_zero(r);
781 #endif
782 result = !r;
783 }
784 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
785 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
786 if (!result) {
787 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
788 pthread_workqueue_attr_t pwq_attr;
789 if (!disable_wq) {
790 r = pthread_workqueue_attr_init_np(&pwq_attr);
791 (void)dispatch_assume_zero(r);
792 }
793 #endif
794 int i;
795 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
796 pthread_workqueue_t pwq = NULL;
797 dispatch_root_queue_context_t qc;
798 qc = &_dispatch_root_queue_contexts[i];
799 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
800 if (!disable_wq) {
801 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
802 qc->dgq_wq_priority);
803 (void)dispatch_assume_zero(r);
804 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
805 qc->dgq_wq_options &
806 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
807 (void)dispatch_assume_zero(r);
808 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
809 (void)dispatch_assume_zero(r);
810 result = result || dispatch_assume(pwq);
811 }
812 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
813 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
814 }
815 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
816 if (!disable_wq) {
817 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
818 (void)dispatch_assume_zero(r);
819 }
820 #endif
821 }
822 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
823 #endif // HAVE_PTHREAD_WORKQUEUES
824 return result;
825 }
826
827 #if DISPATCH_USE_PTHREAD_POOL
828 static inline void
829 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
830 uint8_t pool_size, bool overcommit)
831 {
832 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
833 uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
834 dispatch_hw_config(active_cpus);
835 if (slowpath(pool_size) && pool_size < thread_pool_size) {
836 thread_pool_size = pool_size;
837 }
838 qc->dgq_thread_pool_size = thread_pool_size;
839 if (qc->dgq_qos) {
840 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
841 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
842 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
843 #if HAVE_PTHREAD_WORKQUEUE_QOS
844 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
845 &pqc->dpq_thread_attr, qc->dgq_qos, 0));
846 #endif
847 }
848 #if USE_MACH_SEM
849 // override the default FIFO behavior for the pool semaphores
850 kern_return_t kr = semaphore_create(mach_task_self(),
851 &pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
852 DISPATCH_VERIFY_MIG(kr);
853 (void)dispatch_assume_zero(kr);
854 (void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
855 #elif USE_POSIX_SEM
856 /* XXXRW: POSIX semaphores don't support LIFO? */
857 int ret = sem_init(&pqc->dpq_thread_mediator.dsema_sem), 0, 0);
858 (void)dispatch_assume_zero(ret);
859 #endif
860 }
861 #endif // DISPATCH_USE_PTHREAD_POOL
862
863 static dispatch_once_t _dispatch_root_queues_pred;
864
865 static void
866 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
867 {
868 _dispatch_safe_fork = false;
869 if (!_dispatch_root_queues_init_workq()) {
870 #if DISPATCH_ENABLE_THREAD_POOL
871 int i;
872 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
873 bool overcommit = true;
874 #if TARGET_OS_EMBEDDED
875 // some software hangs if the non-overcommitting queues do not
876 // overcommit when threads block. Someday, this behavior should
877 // apply to all platforms
878 if (!(i & 1)) {
879 overcommit = false;
880 }
881 #endif
882 _dispatch_root_queue_init_pthread_pool(
883 &_dispatch_root_queue_contexts[i], 0, overcommit);
884 }
885 #else
886 DISPATCH_CRASH("Root queue initialization failed");
887 #endif // DISPATCH_ENABLE_THREAD_POOL
888 }
889 }
890
891 #define countof(x) (sizeof(x) / sizeof(x[0]))
892
893 DISPATCH_EXPORT DISPATCH_NOTHROW
894 void
895 libdispatch_init(void)
896 {
897 dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
898 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);
899
900 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
901 -DISPATCH_QUEUE_PRIORITY_HIGH);
902 dispatch_assert(countof(_dispatch_root_queues) ==
903 DISPATCH_ROOT_QUEUE_COUNT);
904 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
905 DISPATCH_ROOT_QUEUE_COUNT);
906 dispatch_assert(countof(_dispatch_priority2qos) ==
907 DISPATCH_PRIORITY_COUNT);
908 #if HAVE_PTHREAD_WORKQUEUE_QOS
909 dispatch_assert(countof(_dispatch_priority2wq) ==
910 DISPATCH_PRIORITY_COUNT);
911 #endif
912 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
913 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
914 sizeof(_dispatch_wq2root_queues[0][0]) ==
915 WORKQ_NUM_PRIOQUEUE * 2);
916 #endif
917 #if DISPATCH_ENABLE_THREAD_POOL
918 dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
919 DISPATCH_ROOT_QUEUE_COUNT);
920 #endif
921
922 dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
923 offsetof(struct dispatch_object_s, do_next));
924 dispatch_assert(sizeof(struct dispatch_apply_s) <=
925 DISPATCH_CONTINUATION_SIZE);
926 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
927 == 0);
928 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
929 DISPATCH_CACHELINE_SIZE == 0);
930
931 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
932 _dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
933 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
934 _dispatch_thread_key_create(&dispatch_io_key, NULL);
935 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
936 _dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
937 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
938 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
939 #endif
940 #if !DISPATCH_USE_OS_SEMAPHORE_CACHE
941 _dispatch_thread_key_create(&dispatch_sema4_key,
942 (void (*)(void *))_dispatch_thread_semaphore_dispose);
943 #endif
944
945 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
946 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
947 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
948 #endif
949
950 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
951 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
952
953 #if DISPATCH_USE_PTHREAD_ATFORK
954 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
955 dispatch_atfork_parent, dispatch_atfork_child));
956 #endif
957
958 _dispatch_hw_config_init();
959 _dispatch_vtable_init();
960 _os_object_init();
961 _voucher_init();
962 _dispatch_introspection_init();
963 }
964
965 #if HAVE_MACH
966 static dispatch_once_t _dispatch_mach_host_port_pred;
967 static mach_port_t _dispatch_mach_host_port;
968
969 static void
970 _dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
971 {
972 kern_return_t kr;
973 mach_port_t mp, mhp = mach_host_self();
974 kr = host_get_host_port(mhp, &mp);
975 DISPATCH_VERIFY_MIG(kr);
976 if (!kr) {
977 // mach_host_self returned the HOST_PRIV port
978 kr = mach_port_deallocate(mach_task_self(), mhp);
979 DISPATCH_VERIFY_MIG(kr);
980 (void)dispatch_assume_zero(kr);
981 mhp = mp;
982 } else if (kr != KERN_INVALID_ARGUMENT) {
983 (void)dispatch_assume_zero(kr);
984 }
985 if (!dispatch_assume(mhp)) {
986 DISPATCH_CRASH("Could not get unprivileged host port");
987 }
988 _dispatch_mach_host_port = mhp;
989 }
990
991 mach_port_t
992 _dispatch_get_mach_host_port(void)
993 {
994 dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
995 _dispatch_mach_host_port_init);
996 return _dispatch_mach_host_port;
997 }
998 #endif
999
1000 DISPATCH_EXPORT DISPATCH_NOTHROW
1001 void
1002 dispatch_atfork_child(void)
1003 {
1004 void *crash = (void *)0x100;
1005 size_t i;
1006
1007 #if HAVE_MACH
1008 _dispatch_mach_host_port_pred = 0;
1009 _dispatch_mach_host_port = MACH_VOUCHER_NULL;
1010 #endif
1011 _voucher_atfork_child();
1012 if (_dispatch_safe_fork) {
1013 return;
1014 }
1015 _dispatch_child_of_unsafe_fork = true;
1016
1017 _dispatch_main_q.dq_items_head = crash;
1018 _dispatch_main_q.dq_items_tail = crash;
1019
1020 _dispatch_mgr_q.dq_items_head = crash;
1021 _dispatch_mgr_q.dq_items_tail = crash;
1022
1023 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1024 _dispatch_root_queues[i].dq_items_head = crash;
1025 _dispatch_root_queues[i].dq_items_tail = crash;
1026 }
1027 }
1028
1029 #pragma mark -
1030 #pragma mark dispatch_queue_attr_t
1031
1032 DISPATCH_ALWAYS_INLINE
1033 static inline bool
1034 _dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
1035 {
1036 qos_class_t qos = (qos_class_t)qos_class;
1037 switch (qos) {
1038 case _DISPATCH_QOS_CLASS_MAINTENANCE:
1039 case _DISPATCH_QOS_CLASS_BACKGROUND:
1040 case _DISPATCH_QOS_CLASS_UTILITY:
1041 case _DISPATCH_QOS_CLASS_DEFAULT:
1042 case _DISPATCH_QOS_CLASS_USER_INITIATED:
1043 case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
1044 case _DISPATCH_QOS_CLASS_UNSPECIFIED:
1045 break;
1046 default:
1047 return false;
1048 }
1049 if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
1050 return false;
1051 }
1052 return true;
1053 }
1054
1055 #define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
1056 [_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos
1057
1058 static const
1059 _dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
1060 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
1061 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
1062 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
1063 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
1064 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
1065 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
1066 DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
1067 };
1068
1069 #define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
1070 (overcommit ? DQA_INDEX_OVERCOMMIT : DQA_INDEX_NON_OVERCOMMIT)
1071
1072 #define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
1073 (concurrent ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
1074
1075 #define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
1076
1077 #define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])
1078
1079 static inline dispatch_queue_attr_t
1080 _dispatch_get_queue_attr(qos_class_t qos, int prio, bool overcommit,
1081 bool concurrent)
1082 {
1083 return (dispatch_queue_attr_t)&_dispatch_queue_attrs
1084 [DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
1085 [DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
1086 [DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
1087 [DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)];
1088 }
1089
1090 dispatch_queue_attr_t
1091 dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
1092 dispatch_qos_class_t qos_class, int relative_priority)
1093 {
1094 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
1095 if (!slowpath(dqa)) {
1096 dqa = _dispatch_get_queue_attr(0, 0, false, false);
1097 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1098 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1099 }
1100 return _dispatch_get_queue_attr(qos_class, relative_priority,
1101 dqa->dqa_overcommit, dqa->dqa_concurrent);
1102 }
1103
1104 dispatch_queue_attr_t
1105 dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
1106 bool overcommit)
1107 {
1108 if (!slowpath(dqa)) {
1109 dqa = _dispatch_get_queue_attr(0, 0, false, false);
1110 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1111 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1112 }
1113 return _dispatch_get_queue_attr(dqa->dqa_qos_class,
1114 dqa->dqa_relative_priority, overcommit, dqa->dqa_concurrent);
1115 }
1116
1117 #pragma mark -
1118 #pragma mark dispatch_queue_t
1119
1120 // skip zero
1121 // 1 - main_q
1122 // 2 - mgr_q
1123 // 3 - mgr_root_q
1124 // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
1125 // we use 'xadd' on Intel, so the initial value == next assigned
1126 unsigned long volatile _dispatch_queue_serial_numbers = 16;
1127
1128 dispatch_queue_t
1129 dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
1130 dispatch_queue_t tq)
1131 {
1132 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1133 // Be sure the root queue priorities are set
1134 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
1135 _dispatch_root_queues_init);
1136 #endif
1137 bool disallow_tq = (slowpath(dqa) && dqa != DISPATCH_QUEUE_CONCURRENT);
1138 if (!slowpath(dqa)) {
1139 dqa = _dispatch_get_queue_attr(0, 0, false, false);
1140 } else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
1141 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1142 }
1143 dispatch_queue_t dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
1144 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
1145 _dispatch_queue_init(dq);
1146 if (label) {
1147 dq->dq_label = strdup(label);
1148 }
1149 qos_class_t qos = dqa->dqa_qos_class;
1150 bool overcommit = dqa->dqa_overcommit;
1151 #if HAVE_PTHREAD_WORKQUEUE_QOS
1152 dq->dq_priority = _pthread_qos_class_encode(qos, dqa->dqa_relative_priority,
1153 overcommit);
1154 #endif
1155 if (dqa->dqa_concurrent) {
1156 dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1157 } else {
1158 // Default serial queue target queue is overcommit!
1159 overcommit = true;
1160 }
1161 if (!tq) {
1162 if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
1163 qos = _DISPATCH_QOS_CLASS_DEFAULT;
1164 }
1165 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1166 if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
1167 !_dispatch_root_queues[
1168 DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
1169 qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
1170 }
1171 #endif
1172 bool maintenance_fallback = false;
1173 #if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1174 maintenance_fallback = true;
1175 #endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
1176 if (maintenance_fallback) {
1177 if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
1178 !_dispatch_root_queues[
1179 DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
1180 qos = _DISPATCH_QOS_CLASS_BACKGROUND;
1181 }
1182 }
1183
1184 tq = _dispatch_get_root_queue(qos, overcommit);
1185 if (slowpath(!tq)) {
1186 DISPATCH_CLIENT_CRASH("Invalid queue attribute");
1187 }
1188 } else {
1189 _dispatch_retain(tq);
1190 if (disallow_tq) {
1191 // TODO: override target queue's qos/overcommit ?
1192 DISPATCH_CLIENT_CRASH("Invalid combination of target queue & "
1193 "queue attribute");
1194 }
1195 _dispatch_queue_priority_inherit_from_target(dq, tq);
1196 }
1197 _dispatch_queue_set_override_priority(dq);
1198 dq->do_targetq = tq;
1199 _dispatch_object_debug(dq, "%s", __func__);
1200 return _dispatch_introspection_queue_create(dq);
1201 }
1202
1203 dispatch_queue_t
1204 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
1205 {
1206 return dispatch_queue_create_with_target(label, attr,
1207 DISPATCH_TARGET_QUEUE_DEFAULT);
1208 }
1209
1210 void
1211 _dispatch_queue_destroy(dispatch_object_t dou)
1212 {
1213 dispatch_queue_t dq = dou._dq;
1214 if (slowpath(dq == _dispatch_queue_get_current())) {
1215 DISPATCH_CRASH("Release of a queue by itself");
1216 }
1217 if (slowpath(dq->dq_items_tail)) {
1218 DISPATCH_CRASH("Release of a queue while items are enqueued");
1219 }
1220
1221 // trash the tail queue so that use after free will crash
1222 dq->dq_items_tail = (void *)0x200;
1223
1224 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
1225 (void *)0x200, relaxed);
1226 if (dqsq) {
1227 _dispatch_release(dqsq);
1228 }
1229 }
1230
1231 // 6618342 Contact the team that owns the Instrument DTrace probe before
1232 // renaming this symbol
1233 void
1234 _dispatch_queue_dispose(dispatch_queue_t dq)
1235 {
1236 _dispatch_object_debug(dq, "%s", __func__);
1237 _dispatch_introspection_queue_dispose(dq);
1238 if (dq->dq_label) {
1239 free((void*)dq->dq_label);
1240 }
1241 _dispatch_queue_destroy(dq);
1242 }
1243
1244 const char *
1245 dispatch_queue_get_label(dispatch_queue_t dq)
1246 {
1247 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
1248 dq = _dispatch_get_current_queue();
1249 }
1250 return dq->dq_label ? dq->dq_label : "";
1251 }
1252
1253 qos_class_t
1254 dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
1255 {
1256 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
1257 int relative_priority = 0;
1258 #if HAVE_PTHREAD_WORKQUEUE_QOS
1259 pthread_priority_t dqp = dq->dq_priority;
1260 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
1261 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
1262 #else
1263 (void)dq;
1264 #endif
1265 if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
1266 return qos;
1267 }
1268
1269 static void
1270 _dispatch_queue_set_width2(void *ctxt)
1271 {
1272 int w = (int)(intptr_t)ctxt; // intentional truncation
1273 uint32_t tmp;
1274 dispatch_queue_t dq = _dispatch_queue_get_current();
1275
1276 if (w == 1 || w == 0) {
1277 dq->dq_width = 1;
1278 _dispatch_object_debug(dq, "%s", __func__);
1279 return;
1280 }
1281 if (w > 0) {
1282 tmp = (unsigned int)w;
1283 } else switch (w) {
1284 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
1285 tmp = dispatch_hw_config(physical_cpus);
1286 break;
1287 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
1288 tmp = dispatch_hw_config(active_cpus);
1289 break;
1290 default:
1291 // fall through
1292 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
1293 tmp = dispatch_hw_config(logical_cpus);
1294 break;
1295 }
1296 if (tmp > DISPATCH_QUEUE_WIDTH_MAX / 2) {
1297 tmp = DISPATCH_QUEUE_WIDTH_MAX / 2;
1298 }
1299 // multiply by two since the running count is inc/dec by two
1300 // (the low bit == barrier)
1301 dq->dq_width = (typeof(dq->dq_width))(tmp * 2);
1302 _dispatch_object_debug(dq, "%s", __func__);
1303 }
1304
1305 void
1306 dispatch_queue_set_width(dispatch_queue_t dq, long width)
1307 {
1308 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1309 slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
1310 return;
1311 }
1312 _dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
1313 _dispatch_queue_set_width2);
1314 }
1315
1316 // 6618342 Contact the team that owns the Instrument DTrace probe before
1317 // renaming this symbol
1318 static void
1319 _dispatch_set_target_queue2(void *ctxt)
1320 {
1321 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current(), tq = ctxt;
1322 mach_port_t th;
1323
1324 while (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread, MACH_PORT_NULL,
1325 _dispatch_thread_port(), &th, acquire)) {
1326 _dispatch_thread_switch(th, DISPATCH_YIELD_THREAD_SWITCH_OPTION,
1327 DISPATCH_CONTENTION_USLEEP_START);
1328 }
1329 _dispatch_queue_priority_inherit_from_target(dq, tq);
1330 prev_dq = dq->do_targetq;
1331 dq->do_targetq = tq;
1332 _dispatch_release(prev_dq);
1333 _dispatch_object_debug(dq, "%s", __func__);
1334 dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL, release);
1335 }
1336
1337 void
1338 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
1339 {
1340 DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
1341 if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
1342 slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
1343 return;
1344 }
1345 unsigned long type = dx_metatype(dou._do);
1346 if (slowpath(!dq)) {
1347 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
1348 slowpath(dou._dq->dq_width > 1));
1349 dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1350 !is_concurrent_q);
1351 }
1352 // TODO: put into the vtable
1353 switch(type) {
1354 case _DISPATCH_QUEUE_TYPE:
1355 case _DISPATCH_SOURCE_TYPE:
1356 _dispatch_retain(dq);
1357 return _dispatch_barrier_trysync_f(dou._dq, dq,
1358 _dispatch_set_target_queue2);
1359 case _DISPATCH_IO_TYPE:
1360 return _dispatch_io_set_target_queue(dou._dchannel, dq);
1361 default: {
1362 dispatch_queue_t prev_dq;
1363 _dispatch_retain(dq);
1364 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
1365 if (prev_dq) _dispatch_release(prev_dq);
1366 _dispatch_object_debug(dou._do, "%s", __func__);
1367 return;
1368 }
1369 }
1370 }
1371
1372 #pragma mark -
1373 #pragma mark dispatch_pthread_root_queue
1374
1375 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
1376 static struct dispatch_pthread_root_queue_context_s
1377 _dispatch_mgr_root_queue_pthread_context;
1378 static struct dispatch_root_queue_context_s
1379 _dispatch_mgr_root_queue_context = {{{
1380 #if HAVE_PTHREAD_WORKQUEUES
1381 .dgq_kworkqueue = (void*)(~0ul),
1382 #endif
1383 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
1384 .dgq_thread_pool_size = 1,
1385 }}};
1386 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
1387 .do_vtable = DISPATCH_VTABLE(queue_root),
1388 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
1389 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
1390 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
1391 .do_ctxt = &_dispatch_mgr_root_queue_context,
1392 .dq_label = "com.apple.root.libdispatch-manager",
1393 .dq_running = 2,
1394 .dq_width = DISPATCH_QUEUE_WIDTH_MAX,
1395 .dq_serialnum = 3,
1396 };
1397 static struct {
1398 volatile int prio;
1399 int default_prio;
1400 int policy;
1401 pthread_t tid;
1402 } _dispatch_mgr_sched;
1403 static dispatch_once_t _dispatch_mgr_sched_pred;
1404
1405 static void
1406 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
1407 {
1408 struct sched_param param;
1409 pthread_attr_t *attr;
1410 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1411 (void)dispatch_assume_zero(pthread_attr_init(attr));
1412 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
1413 &_dispatch_mgr_sched.policy));
1414 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1415 // legacy priority calls allowed when requesting above default priority
1416 _dispatch_mgr_sched.default_prio = param.sched_priority;
1417 _dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
1418 }
1419
1420 DISPATCH_NOINLINE
1421 static pthread_t *
1422 _dispatch_mgr_root_queue_init(void)
1423 {
1424 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1425 struct sched_param param;
1426 pthread_attr_t *attr;
1427 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1428 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
1429 PTHREAD_CREATE_DETACHED));
1430 #if !DISPATCH_DEBUG
1431 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
1432 #endif
1433 #if HAVE_PTHREAD_WORKQUEUE_QOS
1434 if (_dispatch_set_qos_class_enabled) {
1435 qos_class_t qos = qos_class_main();
1436 (void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr, qos, 0));
1437 _dispatch_mgr_q.dq_priority = _pthread_qos_class_encode(qos, 0, 0);
1438 _dispatch_queue_set_override_priority(&_dispatch_mgr_q);
1439 }
1440 #endif
1441 param.sched_priority = _dispatch_mgr_sched.prio;
1442 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1443 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
1444 }
1445 return &_dispatch_mgr_sched.tid;
1446 }
1447
1448 static inline void
1449 _dispatch_mgr_priority_apply(void)
1450 {
1451 struct sched_param param;
1452 do {
1453 param.sched_priority = _dispatch_mgr_sched.prio;
1454 if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
1455 (void)dispatch_assume_zero(pthread_setschedparam(
1456 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
1457 &param));
1458 }
1459 } while (_dispatch_mgr_sched.prio > param.sched_priority);
1460 }
1461
1462 DISPATCH_NOINLINE
1463 void
1464 _dispatch_mgr_priority_init(void)
1465 {
1466 struct sched_param param;
1467 pthread_attr_t *attr;
1468 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
1469 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1470 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
1471 return _dispatch_mgr_priority_apply();
1472 }
1473 }
1474
1475 DISPATCH_NOINLINE
1476 static void
1477 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
1478 {
1479 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
1480 struct sched_param param;
1481 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
1482 int p = _dispatch_mgr_sched.prio;
1483 do if (p >= param.sched_priority) {
1484 return;
1485 } while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
1486 p, param.sched_priority, &p, relaxed)));
1487 if (_dispatch_mgr_sched.tid) {
1488 return _dispatch_mgr_priority_apply();
1489 }
1490 }
1491
1492 dispatch_queue_t
1493 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
1494 const pthread_attr_t *attr, dispatch_block_t configure)
1495 {
1496 dispatch_queue_t dq;
1497 dispatch_root_queue_context_t qc;
1498 dispatch_pthread_root_queue_context_t pqc;
1499 size_t dqs;
1500 uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
1501 (uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
1502
1503 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
1504 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
1505 sizeof(struct dispatch_root_queue_context_s) +
1506 sizeof(struct dispatch_pthread_root_queue_context_s));
1507 qc = (void*)dq + dqs;
1508 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
1509
1510 _dispatch_queue_init(dq);
1511 if (label) {
1512 dq->dq_label = strdup(label);
1513 }
1514
1515 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
1516 dq->do_ctxt = qc;
1517 dq->do_targetq = NULL;
1518 dq->dq_running = 2;
1519 dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1520
1521 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
1522 qc->dgq_ctxt = pqc;
1523 #if HAVE_PTHREAD_WORKQUEUES
1524 qc->dgq_kworkqueue = (void*)(~0ul);
1525 #endif
1526 _dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
1527
1528 if (attr) {
1529 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
1530 #if HAVE_PTHREAD_WORKQUEUE_QOS
1531 qos_class_t qos = 0;
1532 if (!pthread_attr_get_qos_class_np(&pqc->dpq_thread_attr, &qos, NULL)
1533 && qos > _DISPATCH_QOS_CLASS_DEFAULT) {
1534 DISPATCH_CLIENT_CRASH("pthread root queues do not support "
1535 "explicit QoS attributes");
1536 }
1537 #endif
1538 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
1539 } else {
1540 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
1541 }
1542 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
1543 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
1544 if (configure) {
1545 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
1546 }
1547 _dispatch_object_debug(dq, "%s", __func__);
1548 return _dispatch_introspection_queue_create(dq);
1549 }
1550 #endif
1551
1552 void
1553 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
1554 {
1555 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1556 DISPATCH_CRASH("Global root queue disposed");
1557 }
1558 _dispatch_object_debug(dq, "%s", __func__);
1559 _dispatch_introspection_queue_dispose(dq);
1560 #if DISPATCH_USE_PTHREAD_POOL
1561 dispatch_root_queue_context_t qc = dq->do_ctxt;
1562 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
1563
1564 pthread_attr_destroy(&pqc->dpq_thread_attr);
1565 _dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
1566 if (pqc->dpq_thread_configure) {
1567 Block_release(pqc->dpq_thread_configure);
1568 }
1569 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1570 false);
1571 #endif
1572 if (dq->dq_label) {
1573 free((void*)dq->dq_label);
1574 }
1575 _dispatch_queue_destroy(dq);
1576 }
1577
1578 #pragma mark -
1579 #pragma mark dispatch_queue_specific
1580
1581 struct dispatch_queue_specific_queue_s {
1582 DISPATCH_STRUCT_HEADER(queue_specific_queue);
1583 DISPATCH_QUEUE_HEADER;
1584 TAILQ_HEAD(dispatch_queue_specific_head_s,
1585 dispatch_queue_specific_s) dqsq_contexts;
1586 };
1587
1588 struct dispatch_queue_specific_s {
1589 const void *dqs_key;
1590 void *dqs_ctxt;
1591 dispatch_function_t dqs_destructor;
1592 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
1593 };
1594 DISPATCH_DECL(dispatch_queue_specific);
1595
1596 void
1597 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
1598 {
1599 dispatch_queue_specific_t dqs, tmp;
1600
1601 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
1602 if (dqs->dqs_destructor) {
1603 dispatch_async_f(_dispatch_get_root_queue(
1604 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
1605 dqs->dqs_destructor);
1606 }
1607 free(dqs);
1608 }
1609 _dispatch_queue_destroy((dispatch_queue_t)dqsq);
1610 }
1611
1612 static void
1613 _dispatch_queue_init_specific(dispatch_queue_t dq)
1614 {
1615 dispatch_queue_specific_queue_t dqsq;
1616
1617 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
1618 sizeof(struct dispatch_queue_specific_queue_s));
1619 _dispatch_queue_init((dispatch_queue_t)dqsq);
1620 dqsq->do_xref_cnt = -1;
1621 dqsq->do_targetq = _dispatch_get_root_queue(
1622 _DISPATCH_QOS_CLASS_USER_INITIATED, true);
1623 dqsq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1624 dqsq->dq_label = "queue-specific";
1625 TAILQ_INIT(&dqsq->dqsq_contexts);
1626 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
1627 (dispatch_queue_t)dqsq, release))) {
1628 _dispatch_release((dispatch_queue_t)dqsq);
1629 }
1630 }
1631
1632 static void
1633 _dispatch_queue_set_specific(void *ctxt)
1634 {
1635 dispatch_queue_specific_t dqs, dqsn = ctxt;
1636 dispatch_queue_specific_queue_t dqsq =
1637 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1638
1639 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1640 if (dqs->dqs_key == dqsn->dqs_key) {
1641 // Destroy previous context for existing key
1642 if (dqs->dqs_destructor) {
1643 dispatch_async_f(_dispatch_get_root_queue(
1644 _DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
1645 dqs->dqs_destructor);
1646 }
1647 if (dqsn->dqs_ctxt) {
1648 // Copy new context for existing key
1649 dqs->dqs_ctxt = dqsn->dqs_ctxt;
1650 dqs->dqs_destructor = dqsn->dqs_destructor;
1651 } else {
1652 // Remove context storage for existing key
1653 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
1654 free(dqs);
1655 }
1656 return free(dqsn);
1657 }
1658 }
1659 // Insert context storage for new key
1660 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
1661 }
1662
1663 DISPATCH_NOINLINE
1664 void
1665 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
1666 void *ctxt, dispatch_function_t destructor)
1667 {
1668 if (slowpath(!key)) {
1669 return;
1670 }
1671 dispatch_queue_specific_t dqs;
1672
1673 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
1674 dqs->dqs_key = key;
1675 dqs->dqs_ctxt = ctxt;
1676 dqs->dqs_destructor = destructor;
1677 if (slowpath(!dq->dq_specific_q)) {
1678 _dispatch_queue_init_specific(dq);
1679 }
1680 _dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
1681 _dispatch_queue_set_specific);
1682 }
1683
1684 static void
1685 _dispatch_queue_get_specific(void *ctxt)
1686 {
1687 void **ctxtp = ctxt;
1688 void *key = *ctxtp;
1689 dispatch_queue_specific_queue_t dqsq =
1690 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1691 dispatch_queue_specific_t dqs;
1692
1693 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1694 if (dqs->dqs_key == key) {
1695 *ctxtp = dqs->dqs_ctxt;
1696 return;
1697 }
1698 }
1699 *ctxtp = NULL;
1700 }
1701
1702 DISPATCH_NOINLINE
1703 void *
1704 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
1705 {
1706 if (slowpath(!key)) {
1707 return NULL;
1708 }
1709 void *ctxt = NULL;
1710
1711 if (fastpath(dq->dq_specific_q)) {
1712 ctxt = (void *)key;
1713 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
1714 }
1715 return ctxt;
1716 }
1717
1718 DISPATCH_NOINLINE
1719 void *
1720 dispatch_get_specific(const void *key)
1721 {
1722 if (slowpath(!key)) {
1723 return NULL;
1724 }
1725 void *ctxt = NULL;
1726 dispatch_queue_t dq = _dispatch_queue_get_current();
1727
1728 while (slowpath(dq)) {
1729 if (slowpath(dq->dq_specific_q)) {
1730 ctxt = (void *)key;
1731 dispatch_sync_f(dq->dq_specific_q, &ctxt,
1732 _dispatch_queue_get_specific);
1733 if (ctxt) break;
1734 }
1735 dq = dq->do_targetq;
1736 }
1737 return ctxt;
1738 }
1739
1740 #pragma mark -
1741 #pragma mark dispatch_queue_debug
1742
1743 size_t
1744 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1745 {
1746 size_t offset = 0;
1747 dispatch_queue_t target = dq->do_targetq;
1748 offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
1749 "running = 0x%x, barrier = %d ", target && target->dq_label ?
1750 target->dq_label : "", target, dq->dq_width / 2,
1751 dq->dq_running / 2, dq->dq_running & 1);
1752 if (dq->dq_is_thread_bound) {
1753 offset += dsnprintf(buf, bufsiz, ", thread = 0x%x ",
1754 _dispatch_queue_get_bound_thread(dq));
1755 }
1756 return offset;
1757 }
1758
1759 size_t
1760 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1761 {
1762 size_t offset = 0;
1763 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1764 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
1765 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1766 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1767 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
1768 return offset;
1769 }
1770
1771 #if DISPATCH_DEBUG
1772 void
1773 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1774 if (fastpath(dq)) {
1775 _dispatch_object_debug(dq, "%s", str);
1776 } else {
1777 _dispatch_log("queue[NULL]: %s", str);
1778 }
1779 }
1780 #endif
1781
1782 #if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
1783 static OSSpinLock _dispatch_stats_lock;
1784 static struct {
1785 uint64_t time_total;
1786 uint64_t count_total;
1787 uint64_t thread_total;
1788 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
1789
1790 static void
1791 _dispatch_queue_merge_stats(uint64_t start)
1792 {
1793 uint64_t delta = _dispatch_absolute_time() - start;
1794 unsigned long count;
1795
1796 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
1797 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1798
1799 int bucket = flsl((long)count);
1800
1801 // 64-bit counters on 32-bit require a lock or a queue
1802 OSSpinLockLock(&_dispatch_stats_lock);
1803
1804 _dispatch_stats[bucket].time_total += delta;
1805 _dispatch_stats[bucket].count_total += count;
1806 _dispatch_stats[bucket].thread_total++;
1807
1808 OSSpinLockUnlock(&_dispatch_stats_lock);
1809 }
1810 #endif
1811
1812 #pragma mark -
1813 #pragma mark dispatch_continuation_t
1814
1815 static void
1816 _dispatch_force_cache_cleanup(void)
1817 {
1818 dispatch_continuation_t dc;
1819 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1820 if (dc) {
1821 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1822 _dispatch_cache_cleanup(dc);
1823 }
1824 }
1825
1826 DISPATCH_NOINLINE
1827 static void
1828 _dispatch_cache_cleanup(void *value)
1829 {
1830 dispatch_continuation_t dc, next_dc = value;
1831
1832 while ((dc = next_dc)) {
1833 next_dc = dc->do_next;
1834 _dispatch_continuation_free_to_heap(dc);
1835 }
1836 }
1837
1838 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
1839 int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
1840
1841 DISPATCH_NOINLINE
1842 void
1843 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
1844 {
1845 _dispatch_continuation_free_to_heap(dc);
1846 dispatch_continuation_t next_dc;
1847 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1848 int cnt;
1849 if (!dc || (cnt = dc->dc_cache_cnt -
1850 _dispatch_continuation_cache_limit) <= 0){
1851 return;
1852 }
1853 do {
1854 next_dc = dc->do_next;
1855 _dispatch_continuation_free_to_heap(dc);
1856 } while (--cnt && (dc = next_dc));
1857 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
1858 }
1859 #endif
1860
1861 DISPATCH_ALWAYS_INLINE_NDEBUG
1862 static inline void
1863 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1864 {
1865 dispatch_continuation_t dc = dou._dc;
1866
1867 (void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
1868 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1869 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1870 _dispatch_trace_continuation_pop(dq, dou);
1871 _dispatch_thread_semaphore_signal(
1872 (_dispatch_thread_semaphore_t)dc->dc_other);
1873 _dispatch_introspection_queue_item_complete(dou);
1874 } else {
1875 _dispatch_async_f_redirect(dq, dc,
1876 _dispatch_queue_get_override_priority(dq));
1877 }
1878 _dispatch_perfmon_workitem_inc();
1879 }
1880
1881 #pragma mark -
1882 #pragma mark dispatch_block_create
1883
1884 #if __BLOCKS__
1885
1886 DISPATCH_ALWAYS_INLINE
1887 static inline bool
1888 _dispatch_block_flags_valid(dispatch_block_flags_t flags)
1889 {
1890 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0);
1891 }
1892
1893 DISPATCH_ALWAYS_INLINE
1894 static inline dispatch_block_flags_t
1895 _dispatch_block_normalize_flags(dispatch_block_flags_t flags)
1896 {
1897 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) {
1898 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
1899 }
1900 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) {
1901 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
1902 }
1903 return flags;
1904 }
1905
1906 static inline dispatch_block_t
1907 _dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags,
1908 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block)
1909 {
1910 flags = _dispatch_block_normalize_flags(flags);
1911 voucher_t cv = NULL;
1912 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT);
1913 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) {
1914 voucher = cv = voucher_copy();
1915 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
1916 }
1917 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
1918 pri = _dispatch_priority_propagate();
1919 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
1920 }
1921 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block);
1922 if (cv) _voucher_release(cv);
1923 #if DISPATCH_DEBUG
1924 dispatch_assert(_dispatch_block_get_data(db));
1925 #endif
1926 return db;
1927 }
1928
1929 dispatch_block_t
1930 dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block)
1931 {
1932 if (!_dispatch_block_flags_valid(flags)) return NULL;
1933 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0,
1934 block);
1935 }
1936
1937 dispatch_block_t
1938 dispatch_block_create_with_qos_class(dispatch_block_flags_t flags,
1939 dispatch_qos_class_t qos_class, int relative_priority,
1940 dispatch_block_t block)
1941 {
1942 if (!_dispatch_block_flags_valid(flags)) return NULL;
1943 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
1944 flags |= DISPATCH_BLOCK_HAS_PRIORITY;
1945 pthread_priority_t pri = 0;
1946 #if HAVE_PTHREAD_WORKQUEUE_QOS
1947 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
1948 #endif
1949 return _dispatch_block_create_with_voucher_and_priority(flags, NULL,
1950 pri, block);
1951 }
1952
1953 dispatch_block_t
1954 dispatch_block_create_with_voucher(dispatch_block_flags_t flags,
1955 voucher_t voucher, dispatch_block_t block)
1956 {
1957 if (!_dispatch_block_flags_valid(flags)) return NULL;
1958 flags |= DISPATCH_BLOCK_HAS_VOUCHER;
1959 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0,
1960 block);
1961 }
1962
1963 dispatch_block_t
1964 dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags,
1965 voucher_t voucher, dispatch_qos_class_t qos_class,
1966 int relative_priority, dispatch_block_t block)
1967 {
1968 if (!_dispatch_block_flags_valid(flags)) return NULL;
1969 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
1970 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY);
1971 pthread_priority_t pri = 0;
1972 #if HAVE_PTHREAD_WORKQUEUE_QOS
1973 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0);
1974 #endif
1975 return _dispatch_block_create_with_voucher_and_priority(flags, voucher,
1976 pri, block);
1977 }
1978
1979 void
1980 dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block)
1981 {
1982 if (!_dispatch_block_flags_valid(flags)) {
1983 DISPATCH_CLIENT_CRASH("Invalid flags passed to "
1984 "dispatch_block_perform()");
1985 }
1986 flags = _dispatch_block_normalize_flags(flags);
1987 struct dispatch_block_private_data_s dbpds =
1988 DISPATCH_BLOCK_PRIVATE_DATA_INITIALIZER(flags, NULL, 0, block);
1989 dbpds.dbpd_atomic_flags |= DBF_PERFORM; // no group_leave at end of invoke
1990 return _dispatch_block_invoke(&dbpds);
1991 }
1992
1993 #define _dbpd_group(dbpd) ((dispatch_group_t)&(dbpd)->dbpd_group)
1994
1995 void
1996 _dispatch_block_invoke(const struct dispatch_block_private_data_s *dbcpd)
1997 {
1998 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd;
1999 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2000 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2001 if (slowpath(atomic_flags & DBF_WAITED)) {
2002 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2003 "than once and waited for");
2004 }
2005 if (atomic_flags & DBF_CANCELED) goto out;
2006
2007 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2008 unsigned long override = 0;
2009 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2010 op = _dispatch_get_priority();
2011 p = dbpd->dbpd_priority;
2012 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ||
2013 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ?
2014 DISPATCH_PRIORITY_ENFORCE : 0;
2015 }
2016 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2017 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2018 v = dbpd->dbpd_voucher;
2019 if (v) _voucher_retain(v);
2020 }
2021 ov = _dispatch_adopt_priority_and_voucher(p, v, override);
2022 dbpd->dbpd_thread = _dispatch_thread_port();
2023 dbpd->dbpd_block();
2024 _dispatch_set_priority_and_replace_voucher(op, ov);
2025 out:
2026 if ((atomic_flags & DBF_PERFORM) == 0) {
2027 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2028 dispatch_group_leave(_dbpd_group(dbpd));
2029 }
2030 }
2031 }
2032
2033 static void
2034 _dispatch_block_sync_invoke(void *block)
2035 {
2036 dispatch_block_t b = block;
2037 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2038 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2039 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2040 if (slowpath(atomic_flags & DBF_WAITED)) {
2041 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2042 "than once and waited for");
2043 }
2044 if (atomic_flags & DBF_CANCELED) goto out;
2045
2046 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY;
2047 unsigned long override = 0;
2048 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2049 op = _dispatch_get_priority();
2050 p = dbpd->dbpd_priority;
2051 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ||
2052 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ?
2053 DISPATCH_PRIORITY_ENFORCE : 0;
2054 }
2055 voucher_t ov, v = DISPATCH_NO_VOUCHER;
2056 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2057 v = dbpd->dbpd_voucher;
2058 if (v) _voucher_retain(v);
2059 }
2060 ov = _dispatch_adopt_priority_and_voucher(p, v, override);
2061 dbpd->dbpd_block();
2062 _dispatch_set_priority_and_replace_voucher(op, ov);
2063 out:
2064 if ((atomic_flags & DBF_PERFORM) == 0) {
2065 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2066 dispatch_group_leave(_dbpd_group(dbpd));
2067 }
2068 }
2069
2070 dispatch_queue_t dq = _dispatch_queue_get_current();
2071 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) {
2072 // balances dispatch_{,barrier_,}sync
2073 _dispatch_release(dq);
2074 }
2075 }
2076
2077 static void
2078 _dispatch_block_async_invoke_and_release(void *block)
2079 {
2080 dispatch_block_t b = block;
2081 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b);
2082 dispatch_block_flags_t flags = dbpd->dbpd_flags;
2083 unsigned int atomic_flags = dbpd->dbpd_atomic_flags;
2084 if (slowpath(atomic_flags & DBF_WAITED)) {
2085 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2086 "than once and waited for");
2087 }
2088 if (atomic_flags & DBF_CANCELED) goto out;
2089
2090 pthread_priority_t p = DISPATCH_NO_PRIORITY;
2091 unsigned long override = 0;
2092 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2093 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ?
2094 DISPATCH_PRIORITY_ENFORCE : 0;
2095 p = dbpd->dbpd_priority;
2096 }
2097 voucher_t v = DISPATCH_NO_VOUCHER;
2098 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) {
2099 v = dbpd->dbpd_voucher;
2100 if (v) _voucher_retain(v);
2101 }
2102 _dispatch_adopt_priority_and_replace_voucher(p, v, override);
2103 dbpd->dbpd_block();
2104 out:
2105 if ((atomic_flags & DBF_PERFORM) == 0) {
2106 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) {
2107 dispatch_group_leave(_dbpd_group(dbpd));
2108 }
2109 }
2110 dispatch_queue_t dq = _dispatch_queue_get_current();
2111 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) {
2112 // balances dispatch_{,barrier_,group_}async
2113 _dispatch_release(dq);
2114 }
2115 Block_release(b);
2116 }
2117
2118 void
2119 dispatch_block_cancel(dispatch_block_t db)
2120 {
2121 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2122 if (!dbpd) {
2123 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2124 "dispatch_block_cancel()");
2125 }
2126 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed);
2127 }
2128
2129 long
2130 dispatch_block_testcancel(dispatch_block_t db)
2131 {
2132 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2133 if (!dbpd) {
2134 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2135 "dispatch_block_testcancel()");
2136 }
2137 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED);
2138 }
2139
2140 long
2141 dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout)
2142 {
2143 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2144 if (!dbpd) {
2145 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2146 "dispatch_block_wait()");
2147 }
2148
2149 unsigned int flags = dispatch_atomic_or_orig2o(dbpd, dbpd_atomic_flags,
2150 DBF_WAITING, relaxed);
2151 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) {
2152 DISPATCH_CLIENT_CRASH("A block object may not be waited for "
2153 "more than once");
2154 }
2155
2156 // <rdar://problem/17703192> If we know the queue where this block is
2157 // enqueued, or the thread that's executing it, then we should boost
2158 // it here.
2159
2160 pthread_priority_t pp = _dispatch_get_priority();
2161
2162 dispatch_queue_t boost_dq;
2163 boost_dq = dispatch_atomic_xchg2o(dbpd, dbpd_queue, NULL, acquire);
2164 if (boost_dq) {
2165 // release balances dispatch_{,barrier_,group_}async.
2166 // Can't put the queue back in the timeout case: the block might
2167 // finish after we fell out of group_wait and see our NULL, so
2168 // neither of us would ever release. Side effect: After a _wait
2169 // that times out, subsequent waits will not boost the qos of the
2170 // still-running block.
2171 _dispatch_queue_wakeup_with_qos_and_release(boost_dq, pp);
2172 }
2173
2174 mach_port_t boost_th = dbpd->dbpd_thread;
2175 if (boost_th) {
2176 _dispatch_thread_override_start(boost_th, pp);
2177 }
2178
2179 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed);
2180 if (slowpath(performed > 1 || (boost_th && boost_dq))) {
2181 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2182 "than once and waited for");
2183 }
2184
2185 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout);
2186
2187 if (boost_th) {
2188 _dispatch_thread_override_end(boost_th);
2189 }
2190
2191 if (ret) {
2192 // timed out: reverse our changes
2193 (void)dispatch_atomic_and2o(dbpd, dbpd_atomic_flags,
2194 ~DBF_WAITING, relaxed);
2195 } else {
2196 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags,
2197 DBF_WAITED, relaxed);
2198 // don't need to re-test here: the second call would see
2199 // the first call's WAITING
2200 }
2201
2202 return ret;
2203 }
2204
2205 void
2206 dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue,
2207 dispatch_block_t notification_block)
2208 {
2209 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db);
2210 if (!dbpd) {
2211 DISPATCH_CLIENT_CRASH("Invalid block object passed to "
2212 "dispatch_block_notify()");
2213 }
2214 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed);
2215 if (slowpath(performed > 1)) {
2216 DISPATCH_CLIENT_CRASH("A block object may not be both run more "
2217 "than once and observed");
2218 }
2219
2220 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block);
2221 }
2222
2223 #endif // __BLOCKS__
2224
2225 #pragma mark -
2226 #pragma mark dispatch_barrier_async
2227
2228 DISPATCH_NOINLINE
2229 static void
2230 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
2231 dispatch_function_t func, pthread_priority_t pp,
2232 dispatch_block_flags_t flags)
2233 {
2234 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
2235
2236 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2237 dc->dc_func = func;
2238 dc->dc_ctxt = ctxt;
2239 _dispatch_continuation_voucher_set(dc, flags);
2240 _dispatch_continuation_priority_set(dc, pp, flags);
2241
2242 pp = _dispatch_continuation_get_override_priority(dq, dc);
2243
2244 _dispatch_queue_push(dq, dc, pp);
2245 }
2246
2247 DISPATCH_ALWAYS_INLINE
2248 static inline void
2249 _dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt,
2250 dispatch_function_t func, pthread_priority_t pp,
2251 dispatch_block_flags_t flags)
2252 {
2253 dispatch_continuation_t dc;
2254
2255 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
2256 if (!dc) {
2257 return _dispatch_barrier_async_f_slow(dq, ctxt, func, pp, flags);
2258 }
2259
2260 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2261 dc->dc_func = func;
2262 dc->dc_ctxt = ctxt;
2263 _dispatch_continuation_voucher_set(dc, flags);
2264 _dispatch_continuation_priority_set(dc, pp, flags);
2265
2266 pp = _dispatch_continuation_get_override_priority(dq, dc);
2267
2268 _dispatch_queue_push(dq, dc, pp);
2269 }
2270
2271 DISPATCH_NOINLINE
2272 static void
2273 _dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
2274 dispatch_function_t func, pthread_priority_t pp,
2275 dispatch_block_flags_t flags)
2276 {
2277 return _dispatch_barrier_async_f2(dq, ctxt, func, pp, flags);
2278 }
2279
2280 DISPATCH_NOINLINE
2281 void
2282 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
2283 dispatch_function_t func)
2284 {
2285 return _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0);
2286 }
2287
2288 DISPATCH_NOINLINE
2289 void
2290 _dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt,
2291 dispatch_function_t func)
2292 {
2293 return _dispatch_barrier_async_f2(dq, ctxt, func, 0,
2294 DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_NO_VOUCHER);
2295 }
2296
2297 #ifdef __BLOCKS__
2298 void
2299 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
2300 {
2301 dispatch_function_t func = _dispatch_call_block_and_release;
2302 pthread_priority_t pp = 0;
2303 dispatch_block_flags_t flags = 0;
2304 if (slowpath(_dispatch_block_has_private_data(work))) {
2305 func = _dispatch_block_async_invoke_and_release;
2306 pp = _dispatch_block_get_priority(work);
2307 flags = _dispatch_block_get_flags(work);
2308 // balanced in d_block_async_invoke_and_release or d_block_wait
2309 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
2310 dbpd_queue, NULL, dq, release)) {
2311 _dispatch_retain(dq);
2312 }
2313 }
2314 _dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
2315 }
2316 #endif
2317
2318 #pragma mark -
2319 #pragma mark dispatch_async
2320
2321 void
2322 _dispatch_async_redirect_invoke(void *ctxt)
2323 {
2324 struct dispatch_continuation_s *dc = ctxt;
2325 struct dispatch_continuation_s *other_dc = dc->dc_other;
2326 dispatch_queue_t old_dq, dq = dc->dc_data, rq;
2327
2328 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2329 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2330 pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
2331 _dispatch_continuation_pop(other_dc);
2332 _dispatch_reset_defaultpriority(old_dp);
2333 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2334
2335 rq = dq->do_targetq;
2336 while (slowpath(rq->do_targetq) && rq != old_dq) {
2337 if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
2338 _dispatch_queue_wakeup(rq);
2339 }
2340 rq = rq->do_targetq;
2341 }
2342
2343 if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
2344 _dispatch_queue_wakeup(dq);
2345 }
2346 _dispatch_release(dq);
2347 }
2348
2349 static inline void
2350 _dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc,
2351 pthread_priority_t pp)
2352 {
2353 uint32_t running = 2;
2354
2355 // Find the queue to redirect to
2356 do {
2357 if (slowpath(dq->dq_items_tail) ||
2358 slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
2359 slowpath(dq->dq_width == 1)) {
2360 break;
2361 }
2362 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2363 if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
2364 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2365 break;
2366 }
2367 dq = dq->do_targetq;
2368 } while (slowpath(dq->do_targetq));
2369
2370 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0);
2371 }
2372
2373 DISPATCH_NOINLINE
2374 static void
2375 _dispatch_async_f_redirect(dispatch_queue_t dq,
2376 dispatch_continuation_t other_dc, pthread_priority_t pp)
2377 {
2378 dispatch_continuation_t dc = _dispatch_continuation_alloc();
2379
2380 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2381 dc->dc_func = _dispatch_async_redirect_invoke;
2382 dc->dc_ctxt = dc;
2383 dc->dc_data = dq;
2384 dc->dc_other = other_dc;
2385 dc->dc_priority = 0;
2386 dc->dc_voucher = NULL;
2387
2388 _dispatch_retain(dq);
2389 dq = dq->do_targetq;
2390 if (slowpath(dq->do_targetq)) {
2391 return _dispatch_async_f_redirect2(dq, dc, pp);
2392 }
2393
2394 _dispatch_queue_push(dq, dc, pp);
2395 }
2396
2397 DISPATCH_NOINLINE
2398 static void
2399 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc,
2400 pthread_priority_t pp)
2401 {
2402 uint32_t running = 2;
2403
2404 do {
2405 if (slowpath(dq->dq_items_tail)
2406 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
2407 break;
2408 }
2409 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2410 if (slowpath(running > dq->dq_width)) {
2411 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2412 break;
2413 }
2414 if (!slowpath(running & 1)) {
2415 return _dispatch_async_f_redirect(dq, dc, pp);
2416 }
2417 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2418 // We might get lucky and find that the barrier has ended by now
2419 } while (!(running & 1));
2420
2421 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0);
2422 }
2423
2424 DISPATCH_NOINLINE
2425 static void
2426 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
2427 dispatch_function_t func, pthread_priority_t pp,
2428 dispatch_block_flags_t flags)
2429 {
2430 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
2431
2432 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2433 dc->dc_func = func;
2434 dc->dc_ctxt = ctxt;
2435 _dispatch_continuation_voucher_set(dc, flags);
2436 _dispatch_continuation_priority_set(dc, pp, flags);
2437
2438 pp = _dispatch_continuation_get_override_priority(dq, dc);
2439
2440 // No fastpath/slowpath hint because we simply don't know
2441 if (dq->do_targetq) {
2442 return _dispatch_async_f2(dq, dc, pp);
2443 }
2444
2445 _dispatch_queue_push(dq, dc, pp);
2446 }
2447
2448 DISPATCH_ALWAYS_INLINE
2449 static inline void
2450 _dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
2451 pthread_priority_t pp, dispatch_block_flags_t flags)
2452 {
2453 dispatch_continuation_t dc;
2454
2455 // No fastpath/slowpath hint because we simply don't know
2456 if (dq->dq_width == 1 || flags & DISPATCH_BLOCK_BARRIER) {
2457 return _dispatch_barrier_async_f(dq, ctxt, func, pp, flags);
2458 }
2459
2460 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
2461 if (!dc) {
2462 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags);
2463 }
2464
2465 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
2466 dc->dc_func = func;
2467 dc->dc_ctxt = ctxt;
2468 _dispatch_continuation_voucher_set(dc, flags);
2469 _dispatch_continuation_priority_set(dc, pp, flags);
2470
2471 pp = _dispatch_continuation_get_override_priority(dq, dc);
2472
2473 // No fastpath/slowpath hint because we simply don't know
2474 if (dq->do_targetq) {
2475 return _dispatch_async_f2(dq, dc, pp);
2476 }
2477
2478 _dispatch_queue_push(dq, dc, pp);
2479 }
2480
2481 DISPATCH_NOINLINE
2482 void
2483 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
2484 {
2485 return _dispatch_async_f(dq, ctxt, func, 0, 0);
2486 }
2487
2488 #ifdef __BLOCKS__
2489 void
2490 dispatch_async(dispatch_queue_t dq, void (^work)(void))
2491 {
2492 dispatch_function_t func = _dispatch_call_block_and_release;
2493 dispatch_block_flags_t flags = 0;
2494 pthread_priority_t pp = 0;
2495 if (slowpath(_dispatch_block_has_private_data(work))) {
2496 func = _dispatch_block_async_invoke_and_release;
2497 pp = _dispatch_block_get_priority(work);
2498 flags = _dispatch_block_get_flags(work);
2499 // balanced in d_block_async_invoke_and_release or d_block_wait
2500 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
2501 dbpd_queue, NULL, dq, release)) {
2502 _dispatch_retain(dq);
2503 }
2504 }
2505 _dispatch_async_f(dq, _dispatch_Block_copy(work), func, pp, flags);
2506 }
2507 #endif
2508
2509 #pragma mark -
2510 #pragma mark dispatch_group_async
2511
2512 DISPATCH_ALWAYS_INLINE
2513 static inline void
2514 _dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
2515 dispatch_function_t func, pthread_priority_t pp,
2516 dispatch_block_flags_t flags)
2517 {
2518 dispatch_continuation_t dc;
2519
2520 _dispatch_retain(dg);
2521 dispatch_group_enter(dg);
2522
2523 dc = _dispatch_continuation_alloc();
2524
2525 unsigned long barrier = (flags & DISPATCH_BLOCK_BARRIER) ?
2526 DISPATCH_OBJ_BARRIER_BIT : 0;
2527 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT |
2528 barrier);
2529 dc->dc_func = func;
2530 dc->dc_ctxt = ctxt;
2531 dc->dc_data = dg;
2532 _dispatch_continuation_voucher_set(dc, flags);
2533 _dispatch_continuation_priority_set(dc, pp, flags);
2534
2535 pp = _dispatch_continuation_get_override_priority(dq, dc);
2536
2537 // No fastpath/slowpath hint because we simply don't know
2538 if (dq->dq_width != 1 && !barrier && dq->do_targetq) {
2539 return _dispatch_async_f2(dq, dc, pp);
2540 }
2541
2542 _dispatch_queue_push(dq, dc, pp);
2543 }
2544
2545 DISPATCH_NOINLINE
2546 void
2547 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
2548 dispatch_function_t func)
2549 {
2550 return _dispatch_group_async_f(dg, dq, ctxt, func, 0, 0);
2551 }
2552
2553 #ifdef __BLOCKS__
2554 void
2555 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
2556 dispatch_block_t db)
2557 {
2558 dispatch_function_t func = _dispatch_call_block_and_release;
2559 dispatch_block_flags_t flags = 0;
2560 pthread_priority_t pp = 0;
2561 if (slowpath(_dispatch_block_has_private_data(db))) {
2562 func = _dispatch_block_async_invoke_and_release;
2563 pp = _dispatch_block_get_priority(db);
2564 flags = _dispatch_block_get_flags(db);
2565 // balanced in d_block_async_invoke_and_release or d_block_wait
2566 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(db),
2567 dbpd_queue, NULL, dq, release)) {
2568 _dispatch_retain(dq);
2569 }
2570 }
2571 _dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), func, pp, flags);
2572 }
2573 #endif
2574
2575 #pragma mark -
2576 #pragma mark dispatch_function_invoke
2577
2578 static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt,
2579 dispatch_function_t func, pthread_priority_t pp);
2580
2581 DISPATCH_ALWAYS_INLINE
2582 static inline void
2583 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
2584 dispatch_function_t func)
2585 {
2586 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2587 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2588 _dispatch_client_callout(ctxt, func);
2589 _dispatch_perfmon_workitem_inc();
2590 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2591 }
2592
2593 void
2594 _dispatch_sync_recurse_invoke(void *ctxt)
2595 {
2596 dispatch_continuation_t dc = ctxt;
2597 _dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
2598 }
2599
2600 DISPATCH_ALWAYS_INLINE
2601 static inline void
2602 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
2603 dispatch_function_t func, pthread_priority_t pp)
2604 {
2605 struct dispatch_continuation_s dc = {
2606 .dc_data = dq,
2607 .dc_func = func,
2608 .dc_ctxt = ctxt,
2609 };
2610 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp);
2611 }
2612
2613 #pragma mark -
2614 #pragma mark dispatch_barrier_sync
2615
2616 static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
2617 dispatch_function_t func);
2618
2619 DISPATCH_ALWAYS_INLINE_NDEBUG
2620 static inline _dispatch_thread_semaphore_t
2621 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
2622 bool lock)
2623 {
2624 _dispatch_thread_semaphore_t sema;
2625 dispatch_continuation_t dc = dou._dc;
2626 mach_port_t th;
2627
2628 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
2629 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
2630 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
2631 return 0;
2632 }
2633 _dispatch_trace_continuation_pop(dq, dc);
2634 _dispatch_perfmon_workitem_inc();
2635
2636 th = (mach_port_t)dc->dc_data;
2637 dc = dc->dc_ctxt;
2638 dq = dc->dc_data;
2639 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
2640 if (lock) {
2641 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2642 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2643 // rdar://problem/9032024 running lock must be held until sync_f_slow
2644 // returns
2645 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2646 }
2647 _dispatch_introspection_queue_item_complete(dou);
2648 _dispatch_wqthread_override_start(th,
2649 _dispatch_queue_get_override_priority(dq));
2650 return sema ? sema : MACH_PORT_DEAD;
2651 }
2652
2653 static void
2654 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
2655 {
2656 dispatch_continuation_t dc = ctxt;
2657 dispatch_queue_t dq = dc->dc_data;
2658 _dispatch_thread_semaphore_t sema;
2659 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
2660
2661 dispatch_assert(dq == _dispatch_queue_get_current());
2662 #if DISPATCH_COCOA_COMPAT
2663 if (slowpath(dq->dq_is_thread_bound)) {
2664 // The queue is bound to a non-dispatch thread (e.g. main thread)
2665 _dispatch_continuation_voucher_adopt(dc);
2666 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
2667 dispatch_atomic_store2o(dc, dc_func, NULL, release);
2668 _dispatch_thread_semaphore_signal(sema); // release
2669 return;
2670 }
2671 #endif
2672 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2673 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2674 // rdar://9032024 running lock must be held until sync_f_slow returns
2675 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2676 _dispatch_thread_semaphore_signal(sema); // release
2677 }
2678
2679 DISPATCH_NOINLINE
2680 static void
2681 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
2682 dispatch_function_t func, pthread_priority_t pp)
2683 {
2684 if (slowpath(!dq->do_targetq)) {
2685 // the global concurrent queues do not need strict ordering
2686 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2687 return _dispatch_sync_f_invoke(dq, ctxt, func);
2688 }
2689 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG);
2690 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
2691 struct dispatch_continuation_s dc = {
2692 .dc_data = dq,
2693 #if DISPATCH_COCOA_COMPAT
2694 .dc_func = func,
2695 .dc_ctxt = ctxt,
2696 #endif
2697 .dc_other = (void*)sema,
2698 };
2699 #if DISPATCH_COCOA_COMPAT
2700 // It's preferred to execute synchronous blocks on the current thread
2701 // due to thread-local side effects, garbage collection, etc. However,
2702 // blocks submitted to the main thread MUST be run on the main thread
2703 if (slowpath(dq->dq_is_thread_bound)) {
2704 _dispatch_continuation_voucher_set(&dc, 0);
2705 }
2706 #endif
2707 struct dispatch_continuation_s dbss = {
2708 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
2709 DISPATCH_OBJ_SYNC_SLOW_BIT),
2710 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
2711 .dc_ctxt = &dc,
2712 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(),
2713 .dc_priority = pp,
2714 };
2715 _dispatch_queue_push(dq, &dbss,
2716 _dispatch_continuation_get_override_priority(dq, &dbss));
2717
2718 _dispatch_thread_semaphore_wait(sema); // acquire
2719 _dispatch_put_thread_semaphore(sema);
2720
2721 #if DISPATCH_COCOA_COMPAT
2722 // Queue bound to a non-dispatch thread
2723 if (dc.dc_func == NULL) {
2724 return;
2725 }
2726 #endif
2727
2728 _dispatch_queue_set_thread(dq);
2729 if (slowpath(dq->do_targetq->do_targetq)) {
2730 _dispatch_function_recurse(dq, ctxt, func, pp);
2731 } else {
2732 _dispatch_function_invoke(dq, ctxt, func);
2733 }
2734 _dispatch_queue_clear_thread(dq);
2735
2736 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
2737 dq->dq_running == 2) {
2738 // rdar://problem/8290662 "lock transfer"
2739 sema = _dispatch_queue_drain_one_barrier_sync(dq);
2740 if (sema) {
2741 _dispatch_thread_semaphore_signal(sema); // release
2742 return;
2743 }
2744 }
2745 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
2746 DISPATCH_OBJECT_SUSPEND_INTERVAL, release);
2747 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
2748 _dispatch_queue_wakeup(dq);
2749 }
2750 }
2751
2752 DISPATCH_NOINLINE
2753 static void
2754 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
2755 {
2756 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
2757 // rdar://problem/8290662 "lock transfer"
2758 _dispatch_thread_semaphore_t sema;
2759 sema = _dispatch_queue_drain_one_barrier_sync(dq);
2760 if (sema) {
2761 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
2762 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
2763 // rdar://9032024 running lock must be held until sync_f_slow
2764 // returns: increment by 2 and decrement by 1
2765 (void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
2766 _dispatch_thread_semaphore_signal(sema);
2767 return;
2768 }
2769 }
2770 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2771 _dispatch_queue_wakeup(dq);
2772 }
2773 }
2774
2775 DISPATCH_NOINLINE
2776 static void
2777 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
2778 dispatch_function_t func)
2779 {
2780 _dispatch_queue_set_thread(dq);
2781 _dispatch_function_invoke(dq, ctxt, func);
2782 _dispatch_queue_clear_thread(dq);
2783 if (slowpath(dq->dq_items_tail)) {
2784 return _dispatch_barrier_sync_f2(dq);
2785 }
2786 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2787 _dispatch_queue_wakeup(dq);
2788 }
2789 }
2790
2791 DISPATCH_NOINLINE
2792 static void
2793 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
2794 dispatch_function_t func, pthread_priority_t pp)
2795 {
2796 _dispatch_queue_set_thread(dq);
2797 _dispatch_function_recurse(dq, ctxt, func, pp);
2798 _dispatch_queue_clear_thread(dq);
2799 if (slowpath(dq->dq_items_tail)) {
2800 return _dispatch_barrier_sync_f2(dq);
2801 }
2802 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2803 _dispatch_queue_wakeup(dq);
2804 }
2805 }
2806
2807 DISPATCH_NOINLINE
2808 static void
2809 _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
2810 dispatch_function_t func, pthread_priority_t pp)
2811 {
2812 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2813 // 2) the queue is not suspended
2814 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
2815 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
2816 }
2817 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
2818 // global concurrent queues and queues bound to non-dispatch threads
2819 // always fall into the slow case
2820 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp);
2821 }
2822 if (slowpath(dq->do_targetq->do_targetq)) {
2823 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp);
2824 }
2825 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
2826 }
2827
2828 DISPATCH_NOINLINE
2829 void
2830 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
2831 dispatch_function_t func)
2832 {
2833 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2834 // 2) the queue is not suspended
2835 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
2836 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0);
2837 }
2838 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
2839 // global concurrent queues and queues bound to non-dispatch threads
2840 // always fall into the slow case
2841 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0);
2842 }
2843 if (slowpath(dq->do_targetq->do_targetq)) {
2844 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, 0);
2845 }
2846 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
2847 }
2848
2849 #ifdef __BLOCKS__
2850 DISPATCH_NOINLINE
2851 static void
2852 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
2853 {
2854 bool has_pd = _dispatch_block_has_private_data(work);
2855 dispatch_function_t func = _dispatch_Block_invoke(work);
2856 pthread_priority_t pp = 0;
2857 if (has_pd) {
2858 func = _dispatch_block_sync_invoke;
2859 pp = _dispatch_block_get_priority(work);
2860 dispatch_block_flags_t flags = _dispatch_block_get_flags(work);
2861 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
2862 pthread_priority_t tp = _dispatch_get_priority();
2863 if (pp < tp) {
2864 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
2865 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) {
2866 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
2867 }
2868 }
2869 // balanced in d_block_sync_invoke or d_block_wait
2870 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
2871 dbpd_queue, NULL, dq, release)) {
2872 _dispatch_retain(dq);
2873 }
2874 #if DISPATCH_COCOA_COMPAT
2875 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) {
2876 // Blocks submitted to the main queue MUST be run on the main thread,
2877 // under GC we must Block_copy in order to notify the thread-local
2878 // garbage collector that the objects are transferring to another thread
2879 // rdar://problem/7176237&7181849&7458685
2880 work = _dispatch_Block_copy(work);
2881 func = _dispatch_call_block_and_release;
2882 }
2883 #endif
2884 _dispatch_barrier_sync_f(dq, work, func, pp);
2885 }
2886
2887 void
2888 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
2889 {
2890 if (slowpath(dq->dq_is_thread_bound) ||
2891 slowpath(_dispatch_block_has_private_data(work))) {
2892 return _dispatch_barrier_sync_slow(dq, work);
2893 }
2894 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
2895 }
2896 #endif
2897
2898 DISPATCH_NOINLINE
2899 static void
2900 _dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
2901 dispatch_function_t func)
2902 {
2903 _dispatch_queue_set_thread(dq);
2904 _dispatch_function_invoke(dq, ctxt, func);
2905 _dispatch_queue_clear_thread(dq);
2906 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
2907 _dispatch_queue_wakeup(dq);
2908 }
2909 }
2910
2911 DISPATCH_NOINLINE
2912 void
2913 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
2914 dispatch_function_t func)
2915 {
2916 // Use for mutation of queue-/source-internal state only, ignores target
2917 // queue hierarchy!
2918 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
2919 || slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
2920 acquire))) {
2921 return _dispatch_barrier_async_detached_f(dq, ctxt, func);
2922 }
2923 _dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
2924 }
2925
2926 #pragma mark -
2927 #pragma mark dispatch_sync
2928
2929 DISPATCH_NOINLINE
2930 static void
2931 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
2932 pthread_priority_t pp, bool wakeup)
2933 {
2934 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG);
2935 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
2936 struct dispatch_continuation_s dc = {
2937 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
2938 #if DISPATCH_INTROSPECTION
2939 .dc_func = func,
2940 .dc_ctxt = ctxt,
2941 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(),
2942 #endif
2943 .dc_other = (void*)sema,
2944 .dc_priority = pp,
2945 };
2946 _dispatch_queue_push_wakeup(dq, &dc,
2947 _dispatch_continuation_get_override_priority(dq, &dc), wakeup);
2948
2949 _dispatch_thread_semaphore_wait(sema);
2950 _dispatch_put_thread_semaphore(sema);
2951
2952 if (slowpath(dq->do_targetq->do_targetq)) {
2953 _dispatch_function_recurse(dq, ctxt, func, pp);
2954 } else {
2955 _dispatch_function_invoke(dq, ctxt, func);
2956 }
2957
2958 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
2959 _dispatch_queue_wakeup(dq);
2960 }
2961 }
2962
2963 DISPATCH_NOINLINE
2964 static void
2965 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
2966 dispatch_function_t func)
2967 {
2968 _dispatch_function_invoke(dq, ctxt, func);
2969 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
2970 _dispatch_queue_wakeup(dq);
2971 }
2972 }
2973
2974 DISPATCH_NOINLINE
2975 static void
2976 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
2977 dispatch_function_t func, pthread_priority_t pp)
2978 {
2979 _dispatch_function_recurse(dq, ctxt, func, pp);
2980 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
2981 _dispatch_queue_wakeup(dq);
2982 }
2983 }
2984
2985 static inline void
2986 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
2987 pthread_priority_t pp)
2988 {
2989 // 1) ensure that this thread hasn't enqueued anything ahead of this call
2990 // 2) the queue is not suspended
2991 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
2992 return _dispatch_sync_f_slow(dq, ctxt, func, pp, false);
2993 }
2994 uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2995 // re-check suspension after barrier check <rdar://problem/15242126>
2996 if (slowpath(running & 1) || _dispatch_object_suspended(dq)) {
2997 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
2998 return _dispatch_sync_f_slow(dq, ctxt, func, pp, running == 0);
2999 }
3000 if (slowpath(dq->do_targetq->do_targetq)) {
3001 return _dispatch_sync_f_recurse(dq, ctxt, func, pp);
3002 }
3003 _dispatch_sync_f_invoke(dq, ctxt, func);
3004 }
3005
3006 DISPATCH_NOINLINE
3007 static void
3008 _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
3009 pthread_priority_t pp)
3010 {
3011 if (fastpath(dq->dq_width == 1)) {
3012 return _dispatch_barrier_sync_f(dq, ctxt, func, pp);
3013 }
3014 if (slowpath(!dq->do_targetq)) {
3015 // the global concurrent queues do not need strict ordering
3016 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3017 return _dispatch_sync_f_invoke(dq, ctxt, func);
3018 }
3019 _dispatch_sync_f2(dq, ctxt, func, pp);
3020 }
3021
3022 DISPATCH_NOINLINE
3023 void
3024 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
3025 {
3026 if (fastpath(dq->dq_width == 1)) {
3027 return dispatch_barrier_sync_f(dq, ctxt, func);
3028 }
3029 if (slowpath(!dq->do_targetq)) {
3030 // the global concurrent queues do not need strict ordering
3031 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3032 return _dispatch_sync_f_invoke(dq, ctxt, func);
3033 }
3034 _dispatch_sync_f2(dq, ctxt, func, 0);
3035 }
3036
3037 #ifdef __BLOCKS__
3038 DISPATCH_NOINLINE
3039 static void
3040 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
3041 {
3042 bool has_pd = _dispatch_block_has_private_data(work);
3043 if (has_pd && (_dispatch_block_get_flags(work) & DISPATCH_BLOCK_BARRIER)) {
3044 return _dispatch_barrier_sync_slow(dq, work);
3045 }
3046 dispatch_function_t func = _dispatch_Block_invoke(work);
3047 pthread_priority_t pp = 0;
3048 if (has_pd) {
3049 func = _dispatch_block_sync_invoke;
3050 pp = _dispatch_block_get_priority(work);
3051 dispatch_block_flags_t flags = _dispatch_block_get_flags(work);
3052 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) {
3053 pthread_priority_t tp = _dispatch_get_priority();
3054 if (pp < tp) {
3055 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG;
3056 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) {
3057 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG;
3058 }
3059 }
3060 // balanced in d_block_sync_invoke or d_block_wait
3061 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work),
3062 dbpd_queue, NULL, dq, release)) {
3063 _dispatch_retain(dq);
3064 }
3065 #if DISPATCH_COCOA_COMPAT
3066 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) {
3067 // Blocks submitted to the main queue MUST be run on the main thread,
3068 // under GC we must Block_copy in order to notify the thread-local
3069 // garbage collector that the objects are transferring to another thread
3070 // rdar://problem/7176237&7181849&7458685
3071 work = _dispatch_Block_copy(work);
3072 func = _dispatch_call_block_and_release;
3073 #endif
3074 }
3075 if (slowpath(!dq->do_targetq)) {
3076 // the global concurrent queues do not need strict ordering
3077 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
3078 return _dispatch_sync_f_invoke(dq, work, func);
3079 }
3080 _dispatch_sync_f2(dq, work, func, pp);
3081 }
3082
3083 void
3084 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
3085 {
3086 if (fastpath(dq->dq_width == 1)) {
3087 return dispatch_barrier_sync(dq, work);
3088 }
3089 if (slowpath(dq->dq_is_thread_bound) ||
3090 slowpath(_dispatch_block_has_private_data(work)) ) {
3091 return _dispatch_sync_slow(dq, work);
3092 }
3093 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
3094 }
3095 #endif
3096
3097 #pragma mark -
3098 #pragma mark dispatch_after
3099
3100 void
3101 _dispatch_after_timer_callback(void *ctxt)
3102 {
3103 dispatch_continuation_t dc = ctxt, dc1;
3104 dispatch_source_t ds = dc->dc_data;
3105 dc1 = _dispatch_continuation_free_cacheonly(dc);
3106 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
3107 dispatch_source_cancel(ds);
3108 dispatch_release(ds);
3109 if (slowpath(dc1)) {
3110 _dispatch_continuation_free_to_cache_limit(dc1);
3111 }
3112 }
3113
3114 DISPATCH_NOINLINE
3115 void
3116 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
3117 dispatch_function_t func)
3118 {
3119 uint64_t delta, leeway;
3120 dispatch_source_t ds;
3121
3122 if (when == DISPATCH_TIME_FOREVER) {
3123 #if DISPATCH_DEBUG
3124 DISPATCH_CLIENT_CRASH(
3125 "dispatch_after_f() called with 'when' == infinity");
3126 #endif
3127 return;
3128 }
3129
3130 delta = _dispatch_timeout(when);
3131 if (delta == 0) {
3132 return dispatch_async_f(queue, ctxt, func);
3133 }
3134 leeway = delta / 10; // <rdar://problem/13447496>
3135 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
3136 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
3137
3138 // this function can and should be optimized to not use a dispatch source
3139 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
3140 dispatch_assert(ds);
3141
3142 // TODO: don't use a separate continuation & voucher
3143 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3144 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3145 dc->dc_func = func;
3146 dc->dc_ctxt = ctxt;
3147 dc->dc_data = ds;
3148
3149 dispatch_set_context(ds, dc);
3150 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
3151 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
3152 dispatch_resume(ds);
3153 }
3154
3155 #ifdef __BLOCKS__
3156 void
3157 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
3158 dispatch_block_t work)
3159 {
3160 // test before the copy of the block
3161 if (when == DISPATCH_TIME_FOREVER) {
3162 #if DISPATCH_DEBUG
3163 DISPATCH_CLIENT_CRASH(
3164 "dispatch_after() called with 'when' == infinity");
3165 #endif
3166 return;
3167 }
3168 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
3169 _dispatch_call_block_and_release);
3170 }
3171 #endif
3172
3173 #pragma mark -
3174 #pragma mark dispatch_queue_push
3175
3176 DISPATCH_ALWAYS_INLINE
3177 static inline void
3178 _dispatch_queue_push_list_slow2(dispatch_queue_t dq, pthread_priority_t pp,
3179 struct dispatch_object_s *obj, bool retained)
3180 {
3181 // The queue must be retained before dq_items_head is written in order
3182 // to ensure that the reference is still valid when _dispatch_wakeup is
3183 // called. Otherwise, if preempted between the assignment to
3184 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
3185 // queue may release the last reference to the queue when invoked by
3186 // _dispatch_queue_drain. <rdar://problem/6932776>
3187 if (!retained) _dispatch_retain(dq);
3188 dq->dq_items_head = obj;
3189 return _dispatch_queue_wakeup_with_qos_and_release(dq, pp);
3190 }
3191
3192 DISPATCH_NOINLINE
3193 void
3194 _dispatch_queue_push_list_slow(dispatch_queue_t dq, pthread_priority_t pp,
3195 struct dispatch_object_s *obj, unsigned int n, bool retained)
3196 {
3197 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
3198 dispatch_assert(!retained);
3199 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
3200 return _dispatch_queue_wakeup_global2(dq, n);
3201 }
3202 _dispatch_queue_push_list_slow2(dq, pp, obj, retained);
3203 }
3204
3205 DISPATCH_NOINLINE
3206 void
3207 _dispatch_queue_push_slow(dispatch_queue_t dq, pthread_priority_t pp,
3208 struct dispatch_object_s *obj, bool retained)
3209 {
3210 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
3211 dispatch_assert(!retained);
3212 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
3213 return _dispatch_queue_wakeup_global(dq);
3214 }
3215 _dispatch_queue_push_list_slow2(dq, pp, obj, retained);
3216 }
3217
3218 #pragma mark -
3219 #pragma mark dispatch_queue_probe
3220
3221 unsigned long
3222 _dispatch_queue_probe(dispatch_queue_t dq)
3223 {
3224 return _dispatch_queue_class_probe(dq);
3225 }
3226
3227 #if DISPATCH_COCOA_COMPAT
3228 unsigned long
3229 _dispatch_runloop_queue_probe(dispatch_queue_t dq)
3230 {
3231 if (_dispatch_queue_class_probe(dq)) {
3232 if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
3233 return _dispatch_runloop_queue_wakeup(dq);
3234 }
3235 return false;
3236 }
3237 #endif
3238
3239 unsigned long
3240 _dispatch_mgr_queue_probe(dispatch_queue_t dq)
3241 {
3242 if (_dispatch_queue_class_probe(dq)) {
3243 return _dispatch_mgr_wakeup(dq);
3244 }
3245 return false;
3246 }
3247
3248 unsigned long
3249 _dispatch_root_queue_probe(dispatch_queue_t dq)
3250 {
3251 _dispatch_queue_wakeup_global(dq);
3252 return false;
3253 }
3254
3255 #pragma mark -
3256 #pragma mark dispatch_wakeup
3257
3258 // 6618342 Contact the team that owns the Instrument DTrace probe before
3259 // renaming this symbol
3260 dispatch_queue_t
3261 _dispatch_wakeup(dispatch_object_t dou)
3262 {
3263 unsigned long type = dx_metatype(dou._do);
3264 if (type == _DISPATCH_QUEUE_TYPE || type == _DISPATCH_SOURCE_TYPE) {
3265 return _dispatch_queue_wakeup(dou._dq);
3266 }
3267 if (_dispatch_object_suspended(dou)) {
3268 return NULL;
3269 }
3270 if (!dx_probe(dou._do)) {
3271 return NULL;
3272 }
3273 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
3274 DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
3275 return NULL;
3276 }
3277 _dispatch_retain(dou._do);
3278 dispatch_queue_t tq = dou._do->do_targetq;
3279 _dispatch_queue_push(tq, dou._do, 0);
3280 return tq; // libdispatch does not need this, but the Instrument DTrace
3281 // probe does
3282 }
3283
3284 #if DISPATCH_COCOA_COMPAT
3285 static inline void
3286 _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
3287 {
3288 mach_port_t mp = (mach_port_t)dq->do_ctxt;
3289 if (!mp) {
3290 return;
3291 }
3292 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
3293 switch (kr) {
3294 case MACH_SEND_TIMEOUT:
3295 case MACH_SEND_TIMED_OUT:
3296 case MACH_SEND_INVALID_DEST:
3297 break;
3298 default:
3299 (void)dispatch_assume_zero(kr);
3300 break;
3301 }
3302 }
3303
3304 DISPATCH_NOINLINE DISPATCH_WEAK
3305 unsigned long
3306 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
3307 {
3308 _dispatch_runloop_queue_wakeup_thread(dq);
3309 return false;
3310 }
3311
3312 DISPATCH_NOINLINE
3313 static dispatch_queue_t
3314 _dispatch_main_queue_wakeup(void)
3315 {
3316 dispatch_queue_t dq = &_dispatch_main_q;
3317 if (!dq->dq_is_thread_bound) {
3318 return NULL;
3319 }
3320 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
3321 _dispatch_runloop_queue_port_init);
3322 _dispatch_runloop_queue_wakeup_thread(dq);
3323 return NULL;
3324 }
3325 #endif
3326
3327 DISPATCH_NOINLINE
3328 static void
3329 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
3330 {
3331 dispatch_root_queue_context_t qc = dq->do_ctxt;
3332 uint32_t i = n;
3333 int r;
3334
3335 _dispatch_debug_root_queue(dq, __func__);
3336 dispatch_once_f(&_dispatch_root_queues_pred, NULL,
3337 _dispatch_root_queues_init);
3338
3339 #if HAVE_PTHREAD_WORKQUEUES
3340 #if DISPATCH_USE_PTHREAD_POOL
3341 if (qc->dgq_kworkqueue != (void*)(~0ul))
3342 #endif
3343 {
3344 _dispatch_root_queue_debug("requesting new worker thread for global "
3345 "queue: %p", dq);
3346 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3347 if (qc->dgq_kworkqueue) {
3348 pthread_workitem_handle_t wh;
3349 unsigned int gen_cnt;
3350 do {
3351 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
3352 _dispatch_worker_thread4, dq, &wh, &gen_cnt);
3353 (void)dispatch_assume_zero(r);
3354 } while (--i);
3355 return;
3356 }
3357 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
3358 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
3359 if (!dq->dq_priority) {
3360 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
3361 qc->dgq_wq_options, (int)i);
3362 (void)dispatch_assume_zero(r);
3363 return;
3364 }
3365 #endif
3366 #if HAVE_PTHREAD_WORKQUEUE_QOS
3367 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority);
3368 (void)dispatch_assume_zero(r);
3369 #endif
3370 return;
3371 }
3372 #endif // HAVE_PTHREAD_WORKQUEUES
3373 #if DISPATCH_USE_PTHREAD_POOL
3374 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
3375 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
3376 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
3377 if (!--i) {
3378 return;
3379 }
3380 }
3381 }
3382 uint32_t j, t_count;
3383 // seq_cst with atomic store to tail <rdar://problem/16932833>
3384 t_count = dispatch_atomic_load2o(qc, dgq_thread_pool_size, seq_cst);
3385 do {
3386 if (!t_count) {
3387 _dispatch_root_queue_debug("pthread pool is full for root queue: "
3388 "%p", dq);
3389 return;
3390 }
3391 j = i > t_count ? t_count : i;
3392 } while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
3393 t_count - j, &t_count, acquire));
3394
3395 pthread_attr_t *attr = &pqc->dpq_thread_attr;
3396 pthread_t tid, *pthr = &tid;
3397 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
3398 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
3399 pthr = _dispatch_mgr_root_queue_init();
3400 }
3401 #endif
3402 do {
3403 _dispatch_retain(dq);
3404 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
3405 if (r != EAGAIN) {
3406 (void)dispatch_assume_zero(r);
3407 }
3408 _dispatch_temporary_resource_shortage();
3409 }
3410 } while (--j);
3411 #endif // DISPATCH_USE_PTHREAD_POOL
3412 }
3413
3414 static inline void
3415 _dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
3416 {
3417 if (!_dispatch_queue_class_probe(dq)) {
3418 return;
3419 }
3420 #if HAVE_PTHREAD_WORKQUEUES
3421 dispatch_root_queue_context_t qc = dq->do_ctxt;
3422 if (
3423 #if DISPATCH_USE_PTHREAD_POOL
3424 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
3425 #endif
3426 !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
3427 _dispatch_root_queue_debug("worker thread request still pending for "
3428 "global queue: %p", dq);
3429 return;
3430 }
3431 #endif // HAVE_PTHREAD_WORKQUEUES
3432 return _dispatch_queue_wakeup_global_slow(dq, n);
3433 }
3434
3435 static inline void
3436 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
3437 {
3438 return _dispatch_queue_wakeup_global2(dq, 1);
3439 }
3440
3441 #pragma mark -
3442 #pragma mark dispatch_queue_invoke
3443
3444 DISPATCH_ALWAYS_INLINE
3445 static inline dispatch_queue_t
3446 dispatch_queue_invoke2(dispatch_object_t dou,
3447 _dispatch_thread_semaphore_t *sema_ptr)
3448 {
3449 dispatch_queue_t dq = dou._dq;
3450 dispatch_queue_t otq = dq->do_targetq;
3451 dispatch_queue_t cq = _dispatch_queue_get_current();
3452
3453 if (slowpath(cq != otq)) {
3454 return otq;
3455 }
3456
3457 *sema_ptr = _dispatch_queue_drain(dq);
3458
3459 if (slowpath(otq != dq->do_targetq)) {
3460 // An item on the queue changed the target queue
3461 return dq->do_targetq;
3462 }
3463 return NULL;
3464 }
3465
3466 // 6618342 Contact the team that owns the Instrument DTrace probe before
3467 // renaming this symbol
3468 DISPATCH_NOINLINE
3469 void
3470 _dispatch_queue_invoke(dispatch_queue_t dq)
3471 {
3472 _dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
3473 }
3474
3475 #pragma mark -
3476 #pragma mark dispatch_queue_drain
3477
3478 DISPATCH_ALWAYS_INLINE
3479 static inline struct dispatch_object_s*
3480 _dispatch_queue_head(dispatch_queue_t dq)
3481 {
3482 struct dispatch_object_s *dc;
3483 _dispatch_wait_until(dc = fastpath(dq->dq_items_head));
3484 return dc;
3485 }
3486
3487 DISPATCH_ALWAYS_INLINE
3488 static inline struct dispatch_object_s*
3489 _dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
3490 {
3491 struct dispatch_object_s *next_dc;
3492 next_dc = fastpath(dc->do_next);
3493 dq->dq_items_head = next_dc;
3494 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
3495 relaxed)) {
3496 _dispatch_wait_until(next_dc = fastpath(dc->do_next));
3497 dq->dq_items_head = next_dc;
3498 }
3499 return next_dc;
3500 }
3501
3502 _dispatch_thread_semaphore_t
3503 _dispatch_queue_drain(dispatch_object_t dou)
3504 {
3505 dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
3506 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3507 struct dispatch_object_s *dc, *next_dc;
3508 _dispatch_thread_semaphore_t sema = 0;
3509
3510 // Continue draining sources after target queue change rdar://8928171
3511 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
3512
3513 orig_tq = dq->do_targetq;
3514
3515 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3516 pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
3517
3518 pthread_priority_t op = _dispatch_queue_get_override_priority(dq);
3519 pthread_priority_t dp = _dispatch_get_defaultpriority();
3520 dp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
3521 if (op > dp) {
3522 _dispatch_wqthread_override_start(dq->dq_thread, op);
3523 }
3524
3525 //dispatch_debug_queue(dq, __func__);
3526
3527 while (dq->dq_items_tail) {
3528 dc = _dispatch_queue_head(dq);
3529 do {
3530 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
3531 goto out;
3532 }
3533 if (dq->dq_running > dq->dq_width) {
3534 goto out;
3535 }
3536 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
3537 goto out;
3538 }
3539 bool redirect = false;
3540 if (!fastpath(dq->dq_width == 1)) {
3541 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
3542 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3543 if (dq->dq_running > 1) {
3544 goto out;
3545 }
3546 } else {
3547 redirect = true;
3548 }
3549 }
3550 next_dc = _dispatch_queue_next(dq, dc);
3551 if (redirect) {
3552 _dispatch_continuation_redirect(dq, dc);
3553 continue;
3554 }
3555 if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
3556 goto out;
3557 }
3558 _dispatch_continuation_pop(dc);
3559 _dispatch_perfmon_workitem_inc();
3560 } while ((dc = next_dc));
3561 }
3562
3563 out:
3564 _dispatch_reset_defaultpriority(old_dp);
3565 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3566 return sema;
3567 }
3568
3569 #if DISPATCH_COCOA_COMPAT
3570 static void
3571 _dispatch_main_queue_drain(void)
3572 {
3573 dispatch_queue_t dq = &_dispatch_main_q;
3574 if (!dq->dq_items_tail) {
3575 return;
3576 }
3577 struct dispatch_continuation_s marker = {
3578 .do_vtable = NULL,
3579 };
3580 struct dispatch_object_s *dmarker = (void*)&marker;
3581 _dispatch_queue_push_notrace(dq, dmarker, 0);
3582
3583 _dispatch_perfmon_start();
3584 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3585 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3586 pthread_priority_t old_pri = _dispatch_get_priority();
3587 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
3588 voucher_t voucher = _voucher_copy();
3589
3590 struct dispatch_object_s *dc, *next_dc;
3591 dc = _dispatch_queue_head(dq);
3592 do {
3593 next_dc = _dispatch_queue_next(dq, dc);
3594 if (dc == dmarker) {
3595 goto out;
3596 }
3597 _dispatch_continuation_pop(dc);
3598 _dispatch_perfmon_workitem_inc();
3599 } while ((dc = next_dc));
3600 DISPATCH_CRASH("Main queue corruption");
3601
3602 out:
3603 if (next_dc) {
3604 _dispatch_main_queue_wakeup();
3605 }
3606 _dispatch_voucher_debug("main queue restore", voucher);
3607 _dispatch_set_priority_and_replace_voucher(old_pri, voucher);
3608 _dispatch_queue_reset_override_priority(dq);
3609 _dispatch_reset_defaultpriority(old_dp);
3610 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3611 _dispatch_perfmon_end();
3612 _dispatch_force_cache_cleanup();
3613 }
3614
3615 static bool
3616 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
3617 {
3618 if (!dq->dq_items_tail) {
3619 return false;
3620 }
3621 _dispatch_perfmon_start();
3622 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
3623 _dispatch_thread_setspecific(dispatch_queue_key, dq);
3624 pthread_priority_t old_pri = _dispatch_get_priority();
3625 pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
3626 voucher_t voucher = _voucher_copy();
3627
3628 struct dispatch_object_s *dc, *next_dc;
3629 dc = _dispatch_queue_head(dq);
3630 next_dc = _dispatch_queue_next(dq, dc);
3631 _dispatch_continuation_pop(dc);
3632 _dispatch_perfmon_workitem_inc();
3633
3634 _dispatch_voucher_debug("runloop queue restore", voucher);
3635 _dispatch_set_priority_and_replace_voucher(old_pri, voucher);
3636 _dispatch_reset_defaultpriority(old_dp);
3637 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
3638 _dispatch_perfmon_end();
3639 _dispatch_force_cache_cleanup();
3640 return next_dc;
3641 }
3642 #endif
3643
3644 DISPATCH_ALWAYS_INLINE_NDEBUG
3645 static inline _dispatch_thread_semaphore_t
3646 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
3647 {
3648 // rdar://problem/8290662 "lock transfer"
3649 struct dispatch_object_s *dc;
3650 _dispatch_thread_semaphore_t sema;
3651
3652 // queue is locked, or suspended and not being drained
3653 dc = dq->dq_items_head;
3654 if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
3655 return 0;
3656 }
3657 // dequeue dc, it is a barrier sync
3658 (void)_dispatch_queue_next(dq, dc);
3659 return sema;
3660 }
3661
3662 void
3663 _dispatch_mgr_queue_drain(void)
3664 {
3665 dispatch_queue_t dq = &_dispatch_mgr_q;
3666 if (!dq->dq_items_tail) {
3667 return _dispatch_force_cache_cleanup();
3668 }
3669 _dispatch_perfmon_start();
3670 if (slowpath(_dispatch_queue_drain(dq))) {
3671 DISPATCH_CRASH("Sync onto manager queue");
3672 }
3673 _dispatch_voucher_debug("mgr queue clear", NULL);
3674 _voucher_clear();
3675 _dispatch_queue_reset_override_priority(dq);
3676 _dispatch_reset_defaultpriority_override();
3677 _dispatch_perfmon_end();
3678 _dispatch_force_cache_cleanup();
3679 }
3680
3681 #pragma mark -
3682 #pragma mark _dispatch_queue_wakeup_with_qos
3683
3684 DISPATCH_NOINLINE
3685 static dispatch_queue_t
3686 _dispatch_queue_wakeup_with_qos_slow(dispatch_queue_t dq, pthread_priority_t pp,
3687 bool retained)
3688 {
3689 if (!dx_probe(dq) && (dq->dq_is_thread_bound || !dq->dq_thread)) {
3690 if (retained) _dispatch_release(dq);
3691 return NULL;
3692 }
3693 pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
3694 bool override = _dispatch_queue_override_priority(dq, pp);
3695 if (override && dq->dq_running > 1) {
3696 override = false;
3697 }
3698
3699 if (!dispatch_atomic_cmpxchg2o(dq, do_suspend_cnt, 0,
3700 DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
3701 #if DISPATCH_COCOA_COMPAT
3702 if (dq == &_dispatch_main_q && dq->dq_is_thread_bound) {
3703 return _dispatch_main_queue_wakeup();
3704 }
3705 #endif
3706 if (override) {
3707 mach_port_t th;
3708 // <rdar://problem/17735825> to traverse the tq chain safely we must
3709 // lock it to ensure it cannot change, unless the queue is running
3710 // and we can just override the thread itself
3711 if (dq->dq_thread) {
3712 _dispatch_wqthread_override_start(dq->dq_thread, pp);
3713 } else if (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread,
3714 MACH_PORT_NULL, _dispatch_thread_port(), &th, acquire)) {
3715 // already locked, override the owner, trysync will do a queue
3716 // wakeup when it returns.
3717 _dispatch_wqthread_override_start(th, pp);
3718 } else {
3719 dispatch_queue_t tq = dq->do_targetq;
3720 if (_dispatch_queue_prepare_override(dq, tq, pp)) {
3721 _dispatch_queue_push_override(dq, tq, pp);
3722 } else {
3723 _dispatch_queue_wakeup_with_qos(tq, pp);
3724 }
3725 dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL,
3726 release);
3727 }
3728 }
3729 if (retained) _dispatch_release(dq);
3730 return NULL;
3731 }
3732 dispatch_queue_t tq = dq->do_targetq;
3733 if (!retained) _dispatch_retain(dq);
3734 if (override) {
3735 override = _dispatch_queue_prepare_override(dq, tq, pp);
3736 }
3737 _dispatch_queue_push(tq, dq, pp);
3738 if (override) {
3739 _dispatch_queue_push_override(dq, tq, pp);
3740 }
3741 return tq; // libdispatch does not need this, but the Instrument DTrace
3742 // probe does
3743 }
3744
3745 DISPATCH_ALWAYS_INLINE
3746 static inline dispatch_queue_t
3747 _dispatch_queue_wakeup_with_qos2(dispatch_queue_t dq, pthread_priority_t pp,
3748 bool retained)
3749 {
3750 if (_dispatch_object_suspended(dq)) {
3751 _dispatch_queue_override_priority(dq, pp);
3752 if (retained) _dispatch_release(dq);
3753 return NULL;
3754 }
3755 return _dispatch_queue_wakeup_with_qos_slow(dq, pp, retained);
3756 }
3757
3758 DISPATCH_NOINLINE
3759 void
3760 _dispatch_queue_wakeup_with_qos_and_release(dispatch_queue_t dq,
3761 pthread_priority_t pp)
3762 {
3763 (void)_dispatch_queue_wakeup_with_qos2(dq, pp, true);
3764 }
3765
3766 DISPATCH_NOINLINE
3767 void
3768 _dispatch_queue_wakeup_with_qos(dispatch_queue_t dq, pthread_priority_t pp)
3769 {
3770 (void)_dispatch_queue_wakeup_with_qos2(dq, pp, false);
3771 }
3772
3773 DISPATCH_NOINLINE
3774 dispatch_queue_t
3775 _dispatch_queue_wakeup(dispatch_queue_t dq)
3776 {
3777 return _dispatch_queue_wakeup_with_qos2(dq,
3778 _dispatch_queue_get_override_priority(dq), false);
3779 }
3780
3781 #if HAVE_PTHREAD_WORKQUEUE_QOS
3782 static void
3783 _dispatch_queue_override_invoke(void *ctxt)
3784 {
3785 dispatch_continuation_t dc = (dispatch_continuation_t)ctxt;
3786 dispatch_queue_t dq = dc->dc_data;
3787 pthread_priority_t p = 0;
3788
3789 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
3790 fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
3791 _dispatch_queue_set_thread(dq);
3792
3793 _dispatch_object_debug(dq, "stolen onto thread 0x%x, 0x%lx",
3794 dq->dq_thread, _dispatch_get_defaultpriority());
3795
3796 pthread_priority_t old_dp = _dispatch_get_defaultpriority();
3797 _dispatch_reset_defaultpriority(dc->dc_priority);
3798
3799 dispatch_queue_t tq = NULL;
3800 _dispatch_thread_semaphore_t sema = 0;
3801 tq = dispatch_queue_invoke2(dq, &sema);
3802
3803 _dispatch_queue_clear_thread(dq);
3804 _dispatch_reset_defaultpriority(old_dp);
3805
3806 uint32_t running = dispatch_atomic_dec2o(dq, dq_running, release);
3807 if (sema) {
3808 _dispatch_thread_semaphore_signal(sema);
3809 } else if (!tq && running == 0) {
3810 p = _dispatch_queue_reset_override_priority(dq);
3811 if (p > (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3812 _dispatch_wqthread_override_reset();
3813 }
3814 }
3815 _dispatch_introspection_queue_item_complete(dq);
3816 if (running == 0) {
3817 return _dispatch_queue_wakeup_with_qos_and_release(dq, p);
3818 }
3819 } else {
3820 mach_port_t th = dq->dq_thread;
3821 if (th) {
3822 p = _dispatch_queue_get_override_priority(dq);
3823 _dispatch_object_debug(dq, "overriding thr 0x%x to priority 0x%lx",
3824 th, p);
3825 _dispatch_wqthread_override_start(th, p);
3826 }
3827 }
3828 _dispatch_release(dq); // added when we pushed the override block
3829 }
3830 #endif
3831
3832 static inline bool
3833 _dispatch_queue_prepare_override(dispatch_queue_t dq, dispatch_queue_t tq,
3834 pthread_priority_t p)
3835 {
3836 #if HAVE_PTHREAD_WORKQUEUE_QOS
3837 if (dx_type(tq) != DISPATCH_QUEUE_ROOT_TYPE || !tq->dq_priority) {
3838 return false;
3839 }
3840 if (p <= (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3841 return false;
3842 }
3843 if (p <= (tq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
3844 return false;
3845 }
3846 _dispatch_retain(dq);
3847 return true;
3848 #else
3849 (void)dq; (void)tq; (void)p;
3850 return false;
3851 #endif
3852 }
3853
3854 static inline void
3855 _dispatch_queue_push_override(dispatch_queue_t dq, dispatch_queue_t tq,
3856 pthread_priority_t p)
3857 {
3858 #if HAVE_PTHREAD_WORKQUEUE_QOS
3859 unsigned int qosbit, idx, overcommit;
3860 overcommit = (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
3861 qosbit = (p & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
3862 _PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
3863 idx = (unsigned int)__builtin_ffs((int)qosbit);
3864 if (!idx || idx > DISPATCH_QUEUE_QOS_COUNT) {
3865 DISPATCH_CRASH("Corrupted override priority");
3866 }
3867 dispatch_queue_t rq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
3868
3869 dispatch_continuation_t dc = _dispatch_continuation_alloc();
3870 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
3871 dc->dc_func = _dispatch_queue_override_invoke;
3872 dc->dc_ctxt = dc;
3873 dc->dc_priority = tq->dq_priority;
3874 dc->dc_voucher = NULL;
3875 dc->dc_data = dq;
3876 // dq retained by _dispatch_queue_prepare_override
3877
3878 _dispatch_queue_push(rq, dc, 0);
3879 #else
3880 (void)dq; (void)tq; (void)p;
3881 #endif
3882 }
3883
3884 #pragma mark -
3885 #pragma mark dispatch_root_queue_drain
3886
3887 DISPATCH_NOINLINE
3888 static bool
3889 _dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
3890 {
3891 dispatch_root_queue_context_t qc = dq->do_ctxt;
3892 struct dispatch_object_s *const mediator = (void *)~0ul;
3893 bool pending = false, available = true;
3894 unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;
3895
3896 do {
3897 // Spin for a short while in case the contention is temporary -- e.g.
3898 // when starting up after dispatch_apply, or when executing a few
3899 // short continuations in a row.
3900 if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
3901 goto out;
3902 }
3903 // Since we have serious contention, we need to back off.
3904 if (!pending) {
3905 // Mark this queue as pending to avoid requests for further threads
3906 (void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
3907 pending = true;
3908 }
3909 _dispatch_contention_usleep(sleep_time);
3910 if (fastpath(dq->dq_items_head != mediator)) goto out;
3911 sleep_time *= 2;
3912 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
3913
3914 // The ratio of work to libdispatch overhead must be bad. This
3915 // scenario implies that there are too many threads in the pool.
3916 // Create a new pending thread and then exit this thread.
3917 // The kernel will grant a new thread when the load subsides.
3918 _dispatch_debug("contention on global queue: %p", dq);
3919 available = false;
3920 out:
3921 if (pending) {
3922 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
3923 }
3924 if (!available) {
3925 _dispatch_queue_wakeup_global(dq);
3926 }
3927 return available;
3928 }
3929
3930 DISPATCH_ALWAYS_INLINE
3931 static inline bool
3932 _dispatch_queue_concurrent_drain_one2(dispatch_queue_t dq)
3933 {
3934 // Wait for queue head and tail to be both non-empty or both empty
3935 bool available; // <rdar://problem/15917893>
3936 _dispatch_wait_until((dq->dq_items_head != NULL) ==
3937 (available = (dq->dq_items_tail != NULL)));
3938 return available;
3939 }
3940
3941 DISPATCH_ALWAYS_INLINE_NDEBUG
3942 static inline struct dispatch_object_s *
3943 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
3944 {
3945 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
3946
3947 start:
3948 // The mediator value acts both as a "lock" and a signal
3949 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
3950
3951 if (slowpath(head == NULL)) {
3952 // The first xchg on the tail will tell the enqueueing thread that it
3953 // is safe to blindly write out to the head pointer. A cmpxchg honors
3954 // the algorithm.
3955 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator,
3956 NULL, relaxed))) {
3957 goto start;
3958 }
3959 if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
3960 _dispatch_queue_concurrent_drain_one2(dq)) {
3961 goto start;
3962 }
3963 _dispatch_root_queue_debug("no work on global queue: %p", dq);
3964 return NULL;
3965 }
3966
3967 if (slowpath(head == mediator)) {
3968 // This thread lost the race for ownership of the queue.
3969 if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
3970 goto start;
3971 }
3972 return NULL;
3973 }
3974
3975 // Restore the head pointer to a sane value before returning.
3976 // If 'next' is NULL, then this item _might_ be the last item.
3977 next = fastpath(head->do_next);
3978
3979 if (slowpath(!next)) {
3980 dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);
3981
3982 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
3983 // both head and tail are NULL now
3984 goto out;
3985 }
3986 // There must be a next item now.
3987 _dispatch_wait_until(next = head->do_next);
3988 }
3989
3990 dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
3991 _dispatch_queue_wakeup_global(dq);
3992 out:
3993 return head;
3994 }
3995
3996 static void
3997 _dispatch_root_queue_drain(dispatch_queue_t dq)
3998 {
3999 #if DISPATCH_DEBUG
4000 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
4001 DISPATCH_CRASH("Premature thread recycling");
4002 }
4003 #endif
4004 _dispatch_thread_setspecific(dispatch_queue_key, dq);
4005 pthread_priority_t old_pri = _dispatch_get_priority();
4006 pthread_priority_t pri = dq->dq_priority ? dq->dq_priority : old_pri;
4007 pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri);
4008
4009 #if DISPATCH_COCOA_COMPAT
4010 // ensure that high-level memory management techniques do not leak/crash
4011 if (dispatch_begin_thread_4GC) {
4012 dispatch_begin_thread_4GC();
4013 }
4014 void *pool = _dispatch_autorelease_pool_push();
4015 #endif // DISPATCH_COCOA_COMPAT
4016
4017 _dispatch_perfmon_start();
4018 struct dispatch_object_s *item;
4019 bool reset = false;
4020 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
4021 if (reset) _dispatch_wqthread_override_reset();
4022 _dispatch_continuation_pop(item);
4023 reset = _dispatch_reset_defaultpriority_override();
4024 }
4025 _dispatch_voucher_debug("root queue clear", NULL);
4026 _dispatch_set_priority_and_replace_voucher(old_pri, NULL);
4027 _dispatch_reset_defaultpriority(old_dp);
4028 _dispatch_perfmon_end();
4029
4030 #if DISPATCH_COCOA_COMPAT
4031 _dispatch_autorelease_pool_pop(pool);
4032 if (dispatch_end_thread_4GC) {
4033 dispatch_end_thread_4GC();
4034 }
4035 #endif // DISPATCH_COCOA_COMPAT
4036
4037 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
4038 }
4039
4040 #pragma mark -
4041 #pragma mark dispatch_worker_thread
4042
4043 #if HAVE_PTHREAD_WORKQUEUES
4044 static void
4045 _dispatch_worker_thread4(void *context)
4046 {
4047 dispatch_queue_t dq = context;
4048 dispatch_root_queue_context_t qc = dq->do_ctxt;
4049
4050 _dispatch_introspection_thread_add();
4051 int pending = (int)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
4052 dispatch_assert(pending >= 0);
4053 _dispatch_root_queue_drain(dq);
4054 __asm__(""); // prevent tailcall (for Instrument DTrace probe)
4055 }
4056
4057 #if HAVE_PTHREAD_WORKQUEUE_QOS
4058 static void
4059 _dispatch_worker_thread3(pthread_priority_t priority)
4060 {
4061 // Reset priority TSD to workaround <rdar://problem/17825261>
4062 _dispatch_thread_setspecific(dispatch_priority_key,
4063 (void*)(uintptr_t)(priority & ~_PTHREAD_PRIORITY_FLAGS_MASK));
4064 unsigned int overcommit, qosbit, idx;
4065 overcommit = (priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
4066 qosbit = (priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
4067 _PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
4068 if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].
4069 dq_priority) {
4070 // If kernel doesn't support maintenance, bottom bit is background.
4071 // Shift to our idea of where background bit is.
4072 qosbit <<= 1;
4073 }
4074 idx = (unsigned int)__builtin_ffs((int)qosbit);
4075 dispatch_assert(idx > 0 && idx < DISPATCH_QUEUE_QOS_COUNT+1);
4076 dispatch_queue_t dq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
4077 return _dispatch_worker_thread4(dq);
4078 }
4079 #endif // HAVE_PTHREAD_WORKQUEUE_QOS
4080
4081 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4082 // 6618342 Contact the team that owns the Instrument DTrace probe before
4083 // renaming this symbol
4084 static void
4085 _dispatch_worker_thread2(int priority, int options,
4086 void *context DISPATCH_UNUSED)
4087 {
4088 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
4089 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
4090 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
4091
4092 return _dispatch_worker_thread4(dq);
4093 }
4094 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
4095 #endif // HAVE_PTHREAD_WORKQUEUES
4096
4097 #if DISPATCH_USE_PTHREAD_POOL
4098 // 6618342 Contact the team that owns the Instrument DTrace probe before
4099 // renaming this symbol
4100 static void *
4101 _dispatch_worker_thread(void *context)
4102 {
4103 dispatch_queue_t dq = context;
4104 dispatch_root_queue_context_t qc = dq->do_ctxt;
4105 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
4106
4107 if (pqc->dpq_thread_configure) {
4108 pqc->dpq_thread_configure();
4109 }
4110
4111 sigset_t mask;
4112 int r;
4113 // workaround tweaks the kernel workqueue does for us
4114 r = sigfillset(&mask);
4115 (void)dispatch_assume_zero(r);
4116 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
4117 (void)dispatch_assume_zero(r);
4118 _dispatch_introspection_thread_add();
4119
4120 const int64_t timeout = 5ull * NSEC_PER_SEC;
4121 do {
4122 _dispatch_root_queue_drain(dq);
4123 } while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
4124 dispatch_time(0, timeout)) == 0);
4125
4126 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, release);
4127 _dispatch_queue_wakeup_global(dq);
4128 _dispatch_release(dq);
4129
4130 return NULL;
4131 }
4132
4133 int
4134 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
4135 {
4136 int r;
4137
4138 /* Workaround: 6269619 Not all signals can be delivered on any thread */
4139
4140 r = sigdelset(set, SIGILL);
4141 (void)dispatch_assume_zero(r);
4142 r = sigdelset(set, SIGTRAP);
4143 (void)dispatch_assume_zero(r);
4144 #if HAVE_DECL_SIGEMT
4145 r = sigdelset(set, SIGEMT);
4146 (void)dispatch_assume_zero(r);
4147 #endif
4148 r = sigdelset(set, SIGFPE);
4149 (void)dispatch_assume_zero(r);
4150 r = sigdelset(set, SIGBUS);
4151 (void)dispatch_assume_zero(r);
4152 r = sigdelset(set, SIGSEGV);
4153 (void)dispatch_assume_zero(r);
4154 r = sigdelset(set, SIGSYS);
4155 (void)dispatch_assume_zero(r);
4156 r = sigdelset(set, SIGPIPE);
4157 (void)dispatch_assume_zero(r);
4158
4159 return pthread_sigmask(how, set, oset);
4160 }
4161 #endif // DISPATCH_USE_PTHREAD_POOL
4162
4163 #pragma mark -
4164 #pragma mark dispatch_runloop_queue
4165
4166 static bool _dispatch_program_is_probably_callback_driven;
4167
4168 #if DISPATCH_COCOA_COMPAT
4169
4170 dispatch_queue_t
4171 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
4172 {
4173 dispatch_queue_t dq;
4174 size_t dqs;
4175
4176 if (slowpath(flags)) {
4177 return NULL;
4178 }
4179 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
4180 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
4181 _dispatch_queue_init(dq);
4182 dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,true);
4183 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
4184 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
4185 dq->dq_running = 1;
4186 dq->dq_is_thread_bound = 1;
4187 _dispatch_runloop_queue_port_init(dq);
4188 _dispatch_queue_set_bound_thread(dq);
4189 _dispatch_object_debug(dq, "%s", __func__);
4190 return _dispatch_introspection_queue_create(dq);
4191 }
4192
4193 void
4194 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
4195 {
4196 _dispatch_object_debug(dq, "%s", __func__);
4197 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
4198 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
4199 DISPATCH_OBJECT_SUSPEND_LOCK, release);
4200 _dispatch_queue_clear_bound_thread(dq);
4201 if (suspend_cnt == 0) {
4202 _dispatch_queue_wakeup(dq);
4203 }
4204 }
4205
4206 void
4207 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
4208 {
4209 _dispatch_object_debug(dq, "%s", __func__);
4210 _dispatch_introspection_queue_dispose(dq);
4211 _dispatch_runloop_queue_port_dispose(dq);
4212 _dispatch_queue_destroy(dq);
4213 }
4214
4215 bool
4216 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
4217 {
4218 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4219 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4220 }
4221 dispatch_retain(dq);
4222 bool r = _dispatch_runloop_queue_drain_one(dq);
4223 dispatch_release(dq);
4224 return r;
4225 }
4226
4227 void
4228 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
4229 {
4230 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4231 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4232 }
4233 _dispatch_runloop_queue_probe(dq);
4234 }
4235
4236 mach_port_t
4237 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
4238 {
4239 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
4240 DISPATCH_CLIENT_CRASH("Not a runloop queue");
4241 }
4242 return (mach_port_t)dq->do_ctxt;
4243 }
4244
4245 static void
4246 _dispatch_runloop_queue_port_init(void *ctxt)
4247 {
4248 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
4249 mach_port_t mp;
4250 kern_return_t kr;
4251
4252 _dispatch_safe_fork = false;
4253 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
4254 DISPATCH_VERIFY_MIG(kr);
4255 (void)dispatch_assume_zero(kr);
4256 kr = mach_port_insert_right(mach_task_self(), mp, mp,
4257 MACH_MSG_TYPE_MAKE_SEND);
4258 DISPATCH_VERIFY_MIG(kr);
4259 (void)dispatch_assume_zero(kr);
4260 if (dq != &_dispatch_main_q) {
4261 struct mach_port_limits limits = {
4262 .mpl_qlimit = 1,
4263 };
4264 kr = mach_port_set_attributes(mach_task_self(), mp,
4265 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
4266 sizeof(limits));
4267 DISPATCH_VERIFY_MIG(kr);
4268 (void)dispatch_assume_zero(kr);
4269 }
4270 dq->do_ctxt = (void*)(uintptr_t)mp;
4271
4272 _dispatch_program_is_probably_callback_driven = true;
4273 }
4274
4275 static void
4276 _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
4277 {
4278 mach_port_t mp = (mach_port_t)dq->do_ctxt;
4279 if (!mp) {
4280 return;
4281 }
4282 dq->do_ctxt = NULL;
4283 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
4284 DISPATCH_VERIFY_MIG(kr);
4285 (void)dispatch_assume_zero(kr);
4286 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
4287 DISPATCH_VERIFY_MIG(kr);
4288 (void)dispatch_assume_zero(kr);
4289 }
4290
4291 #pragma mark -
4292 #pragma mark dispatch_main_queue
4293
4294 mach_port_t
4295 _dispatch_get_main_queue_port_4CF(void)
4296 {
4297 dispatch_queue_t dq = &_dispatch_main_q;
4298 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
4299 _dispatch_runloop_queue_port_init);
4300 return (mach_port_t)dq->do_ctxt;
4301 }
4302
4303 static bool main_q_is_draining;
4304
4305 // 6618342 Contact the team that owns the Instrument DTrace probe before
4306 // renaming this symbol
4307 DISPATCH_NOINLINE
4308 static void
4309 _dispatch_queue_set_mainq_drain_state(bool arg)
4310 {
4311 main_q_is_draining = arg;
4312 }
4313
4314 void
4315 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
4316 {
4317 if (main_q_is_draining) {
4318 return;
4319 }
4320 _dispatch_queue_set_mainq_drain_state(true);
4321 _dispatch_main_queue_drain();
4322 _dispatch_queue_set_mainq_drain_state(false);
4323 }
4324
4325 #endif
4326
4327 void
4328 dispatch_main(void)
4329 {
4330 #if HAVE_PTHREAD_MAIN_NP
4331 if (pthread_main_np()) {
4332 #endif
4333 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
4334 _dispatch_program_is_probably_callback_driven = true;
4335 pthread_exit(NULL);
4336 DISPATCH_CRASH("pthread_exit() returned");
4337 #if HAVE_PTHREAD_MAIN_NP
4338 }
4339 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
4340 #endif
4341 }
4342
4343 DISPATCH_NOINLINE DISPATCH_NORETURN
4344 static void
4345 _dispatch_sigsuspend(void)
4346 {
4347 static const sigset_t mask;
4348
4349 for (;;) {
4350 sigsuspend(&mask);
4351 }
4352 }
4353
4354 DISPATCH_NORETURN
4355 static void
4356 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
4357 {
4358 // never returns, so burn bridges behind us
4359 _dispatch_clear_stack(0);
4360 _dispatch_sigsuspend();
4361 }
4362
4363 DISPATCH_NOINLINE
4364 static void
4365 _dispatch_queue_cleanup2(void)
4366 {
4367 dispatch_queue_t dq = &_dispatch_main_q;
4368 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
4369 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
4370 DISPATCH_OBJECT_SUSPEND_LOCK, release);
4371 dq->dq_is_thread_bound = 0;
4372 if (suspend_cnt == 0) {
4373 _dispatch_queue_wakeup(dq);
4374 }
4375
4376 // overload the "probably" variable to mean that dispatch_main() or
4377 // similar non-POSIX API was called
4378 // this has to run before the DISPATCH_COCOA_COMPAT below
4379 if (_dispatch_program_is_probably_callback_driven) {
4380 _dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
4381 _DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
4382 sleep(1); // workaround 6778970
4383 }
4384
4385 #if DISPATCH_COCOA_COMPAT
4386 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
4387 _dispatch_runloop_queue_port_init);
4388 _dispatch_runloop_queue_port_dispose(dq);
4389 #endif
4390 }
4391
4392 static void
4393 _dispatch_queue_cleanup(void *ctxt)
4394 {
4395 if (ctxt == &_dispatch_main_q) {
4396 return _dispatch_queue_cleanup2();
4397 }
4398 // POSIX defines that destructors are only called if 'ctxt' is non-null
4399 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
4400 }