1 /*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #endif
25
26 #if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
27 !defined(DISPATCH_ENABLE_THREAD_POOL)
28 #define DISPATCH_ENABLE_THREAD_POOL 1
29 #endif
30 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
31 #define DISPATCH_USE_PTHREAD_POOL 1
32 #endif
33 #if HAVE_PTHREAD_WORKQUEUES && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
34 !defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
35 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
36 #endif
37 #if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
38 !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
39 #define pthread_workqueue_t void*
40 #endif
41
42 static void _dispatch_cache_cleanup(void *value);
43 static void _dispatch_async_f_redirect(dispatch_queue_t dq,
44 dispatch_continuation_t dc);
45 static void _dispatch_queue_cleanup(void *ctxt);
46 static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
47 unsigned int n);
48 static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
49 static inline _dispatch_thread_semaphore_t
50 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
51 #if HAVE_PTHREAD_WORKQUEUES
52 static void _dispatch_worker_thread3(void *context);
53 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
54 static void _dispatch_worker_thread2(int priority, int options, void *context);
55 #endif
56 #endif
57 #if DISPATCH_USE_PTHREAD_POOL
58 static void *_dispatch_worker_thread(void *context);
59 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
60 #endif
61
62 #if DISPATCH_COCOA_COMPAT
63 static dispatch_once_t _dispatch_main_q_port_pred;
64 static dispatch_queue_t _dispatch_main_queue_wakeup(void);
65 unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
66 static void _dispatch_runloop_queue_port_init(void *ctxt);
67 static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
68 #endif
69
70 #pragma mark -
71 #pragma mark dispatch_root_queue
72
73 #if DISPATCH_ENABLE_THREAD_POOL
74 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
75 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
76 .do_vtable = DISPATCH_VTABLE(semaphore),
77 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
78 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
79 },
80 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
81 .do_vtable = DISPATCH_VTABLE(semaphore),
82 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
83 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
84 },
85 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
86 .do_vtable = DISPATCH_VTABLE(semaphore),
87 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
88 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
89 },
90 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
91 .do_vtable = DISPATCH_VTABLE(semaphore),
92 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
93 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
94 },
95 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
96 .do_vtable = DISPATCH_VTABLE(semaphore),
97 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
98 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
99 },
100 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
101 .do_vtable = DISPATCH_VTABLE(semaphore),
102 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
103 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
104 },
105 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
106 .do_vtable = DISPATCH_VTABLE(semaphore),
107 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
108 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
109 },
110 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
111 .do_vtable = DISPATCH_VTABLE(semaphore),
112 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
113 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
114 },
115 };
116 #endif
117
118 #define MAX_PTHREAD_COUNT 255
119
120 struct dispatch_root_queue_context_s {
121 union {
122 struct {
123 unsigned int volatile dgq_pending;
124 #if HAVE_PTHREAD_WORKQUEUES
125 int dgq_wq_priority, dgq_wq_options;
126 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
127 pthread_workqueue_t dgq_kworkqueue;
128 #endif
129 #endif // HAVE_PTHREAD_WORKQUEUES
130 #if DISPATCH_USE_PTHREAD_POOL
131 void *dgq_ctxt;
132 dispatch_semaphore_t dgq_thread_mediator;
133 uint32_t volatile dgq_thread_pool_size;
134 #endif
135 };
136 char _dgq_pad[DISPATCH_CACHELINE_SIZE];
137 };
138 };
139 typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
140
141 DISPATCH_CACHELINE_ALIGN
142 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
143 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
144 #if HAVE_PTHREAD_WORKQUEUES
145 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
146 .dgq_wq_options = 0,
147 #endif
148 #if DISPATCH_ENABLE_THREAD_POOL
149 .dgq_thread_mediator = &_dispatch_thread_mediator[
150 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
151 #endif
152 }}},
153 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
154 #if HAVE_PTHREAD_WORKQUEUES
155 .dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
156 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
157 #endif
158 #if DISPATCH_ENABLE_THREAD_POOL
159 .dgq_thread_mediator = &_dispatch_thread_mediator[
160 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
161 #endif
162 }}},
163 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
164 #if HAVE_PTHREAD_WORKQUEUES
165 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
166 .dgq_wq_options = 0,
167 #endif
168 #if DISPATCH_ENABLE_THREAD_POOL
169 .dgq_thread_mediator = &_dispatch_thread_mediator[
170 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
171 #endif
172 }}},
173 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
174 #if HAVE_PTHREAD_WORKQUEUES
175 .dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
176 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
177 #endif
178 #if DISPATCH_ENABLE_THREAD_POOL
179 .dgq_thread_mediator = &_dispatch_thread_mediator[
180 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
181 #endif
182 }}},
183 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
184 #if HAVE_PTHREAD_WORKQUEUES
185 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
186 .dgq_wq_options = 0,
187 #endif
188 #if DISPATCH_ENABLE_THREAD_POOL
189 .dgq_thread_mediator = &_dispatch_thread_mediator[
190 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
191 #endif
192 }}},
193 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
194 #if HAVE_PTHREAD_WORKQUEUES
195 .dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
196 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
197 #endif
198 #if DISPATCH_ENABLE_THREAD_POOL
199 .dgq_thread_mediator = &_dispatch_thread_mediator[
200 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
201 #endif
202 }}},
203 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
204 #if HAVE_PTHREAD_WORKQUEUES
205 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
206 .dgq_wq_options = 0,
207 #endif
208 #if DISPATCH_ENABLE_THREAD_POOL
209 .dgq_thread_mediator = &_dispatch_thread_mediator[
210 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
211 #endif
212 }}},
213 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
214 #if HAVE_PTHREAD_WORKQUEUES
215 .dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
216 .dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
217 #endif
218 #if DISPATCH_ENABLE_THREAD_POOL
219 .dgq_thread_mediator = &_dispatch_thread_mediator[
220 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
221 #endif
222 }}},
223 };
224
225 // 6618342 Contact the team that owns the Instrument DTrace probe before
226 // renaming this symbol
227 // dq_running is set to 2 so that barrier operations go through the slow path
228 DISPATCH_CACHELINE_ALIGN
229 struct dispatch_queue_s _dispatch_root_queues[] = {
230 [DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
231 .do_vtable = DISPATCH_VTABLE(queue_root),
232 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
233 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
234 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
235 .do_ctxt = &_dispatch_root_queue_contexts[
236 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
237 .dq_label = "com.apple.root.low-priority",
238 .dq_running = 2,
239 .dq_width = UINT32_MAX,
240 .dq_serialnum = 4,
241 },
242 [DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
243 .do_vtable = DISPATCH_VTABLE(queue_root),
244 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
245 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
246 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
247 .do_ctxt = &_dispatch_root_queue_contexts[
248 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
249 .dq_label = "com.apple.root.low-overcommit-priority",
250 .dq_running = 2,
251 .dq_width = UINT32_MAX,
252 .dq_serialnum = 5,
253 },
254 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
255 .do_vtable = DISPATCH_VTABLE(queue_root),
256 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
257 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
258 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
259 .do_ctxt = &_dispatch_root_queue_contexts[
260 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
261 .dq_label = "com.apple.root.default-priority",
262 .dq_running = 2,
263 .dq_width = UINT32_MAX,
264 .dq_serialnum = 6,
265 },
266 [DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
267 .do_vtable = DISPATCH_VTABLE(queue_root),
268 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
269 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
270 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
271 .do_ctxt = &_dispatch_root_queue_contexts[
272 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
273 .dq_label = "com.apple.root.default-overcommit-priority",
274 .dq_running = 2,
275 .dq_width = UINT32_MAX,
276 .dq_serialnum = 7,
277 },
278 [DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
279 .do_vtable = DISPATCH_VTABLE(queue_root),
280 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
281 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
282 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
283 .do_ctxt = &_dispatch_root_queue_contexts[
284 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
285 .dq_label = "com.apple.root.high-priority",
286 .dq_running = 2,
287 .dq_width = UINT32_MAX,
288 .dq_serialnum = 8,
289 },
290 [DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
291 .do_vtable = DISPATCH_VTABLE(queue_root),
292 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
293 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
294 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
295 .do_ctxt = &_dispatch_root_queue_contexts[
296 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
297 .dq_label = "com.apple.root.high-overcommit-priority",
298 .dq_running = 2,
299 .dq_width = UINT32_MAX,
300 .dq_serialnum = 9,
301 },
302 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
303 .do_vtable = DISPATCH_VTABLE(queue_root),
304 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
305 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
306 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
307 .do_ctxt = &_dispatch_root_queue_contexts[
308 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
309 .dq_label = "com.apple.root.background-priority",
310 .dq_running = 2,
311 .dq_width = UINT32_MAX,
312 .dq_serialnum = 10,
313 },
314 [DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
315 .do_vtable = DISPATCH_VTABLE(queue_root),
316 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
317 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
318 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
319 .do_ctxt = &_dispatch_root_queue_contexts[
320 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
321 .dq_label = "com.apple.root.background-overcommit-priority",
322 .dq_running = 2,
323 .dq_width = UINT32_MAX,
324 .dq_serialnum = 11,
325 },
326 };
327
328 #if HAVE_PTHREAD_WORKQUEUES
329 static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
330 [WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
331 DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
332 [WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
333 &_dispatch_root_queues[
334 DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
335 [WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
336 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
337 [WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
338 &_dispatch_root_queues[
339 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
340 [WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
341 DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
342 [WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
343 &_dispatch_root_queues[
344 DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
345 [WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
346 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
347 [WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
348 &_dispatch_root_queues[
349 DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
350 };
351 #endif // HAVE_PTHREAD_WORKQUEUES
352
353 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
354 static struct dispatch_queue_s _dispatch_mgr_root_queue;
355 #else
356 #define _dispatch_mgr_root_queue \
357 _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
358 #endif
359
360 // 6618342 Contact the team that owns the Instrument DTrace probe before
361 // renaming this symbol
362 DISPATCH_CACHELINE_ALIGN
363 struct dispatch_queue_s _dispatch_mgr_q = {
364 .do_vtable = DISPATCH_VTABLE(queue_mgr),
365 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
366 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
367 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
368 .do_targetq = &_dispatch_mgr_root_queue,
369 .dq_label = "com.apple.libdispatch-manager",
370 .dq_width = 1,
371 .dq_is_thread_bound = 1,
372 .dq_serialnum = 2,
373 };
374
375 dispatch_queue_t
376 dispatch_get_global_queue(long priority, unsigned long flags)
377 {
378 if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
379 return NULL;
380 }
381 return _dispatch_get_root_queue(priority,
382 flags & DISPATCH_QUEUE_OVERCOMMIT);
383 }
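/*
 * Usage sketch (illustration only, not part of this file): a typical client
 * call to dispatch_get_global_queue(). The worker function name is
 * hypothetical; only <dispatch/dispatch.h> is assumed.
 *
 *	dispatch_queue_t bg = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
 *	dispatch_async(bg, ^{ perform_background_work(); });
 *
 * Passing any flag other than the reserved overcommit flag makes the call
 * return NULL, per the check above.
 */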
384
385 DISPATCH_ALWAYS_INLINE
386 static inline dispatch_queue_t
387 _dispatch_get_current_queue(void)
388 {
389 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
390 }
391
392 dispatch_queue_t
393 dispatch_get_current_queue(void)
394 {
395 return _dispatch_get_current_queue();
396 }
397
398 DISPATCH_ALWAYS_INLINE
399 static inline bool
400 _dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
401 {
402 while (dq1) {
403 if (dq1 == dq2) {
404 return true;
405 }
406 dq1 = dq1->do_targetq;
407 }
408 return false;
409 }
410
411 #define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
412 "Assertion failed: Block was run on an unexpected queue"
413
414 DISPATCH_NOINLINE
415 static void
416 _dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
417 {
418 char *msg;
419 asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
420 expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
421 dq->dq_label : "");
422 _dispatch_log("%s", msg);
423 _dispatch_set_crash_log_message(msg);
424 _dispatch_hardware_crash();
425 free(msg);
426 }
427
428 void
429 dispatch_assert_queue(dispatch_queue_t dq)
430 {
431 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
432 DISPATCH_CLIENT_CRASH("invalid queue passed to "
433 "dispatch_assert_queue()");
434 }
435 dispatch_queue_t cq = _dispatch_queue_get_current();
436 if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
437 return;
438 }
439 _dispatch_assert_queue_fail(dq, true);
440 }
441
442 void
443 dispatch_assert_queue_not(dispatch_queue_t dq)
444 {
445 if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
446 DISPATCH_CLIENT_CRASH("invalid queue passed to "
447 "dispatch_assert_queue_not()");
448 }
449 dispatch_queue_t cq = _dispatch_queue_get_current();
450 if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
451 _dispatch_assert_queue_fail(dq, false);
452 }
453 }
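/*
 * Usage sketch (illustration only): dispatch_assert_queue() crashes with
 * DISPATCH_ASSERT_QUEUE_MESSAGE unless the current queue is, or targets, dq.
 * The queue label below is hypothetical.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *	dispatch_async(q, ^{
 *		dispatch_assert_queue(q);	// passes: the block is running on q
 *	});
 *	dispatch_assert_queue_not(q);	// passes when the caller is not on q
 */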
454
455 #if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
456 #define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
457 #define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
458 #else
459 #define _dispatch_root_queue_debug(...)
460 #define _dispatch_debug_root_queue(...)
461 #endif
462
463 #pragma mark -
464 #pragma mark dispatch_init
465
466 static void
467 _dispatch_hw_config_init(void)
468 {
469 _dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
470 _dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
471 _dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
472 }
473
474 static inline bool
475 _dispatch_root_queues_init_workq(void)
476 {
477 bool result = false;
478 #if HAVE_PTHREAD_WORKQUEUES
479 bool disable_wq = false;
480 #if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
481 disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
482 #endif
483 int r;
484 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
485 if (!disable_wq) {
486 #if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
487 pthread_workqueue_setdispatchoffset_np(
488 offsetof(struct dispatch_queue_s, dq_serialnum));
489 #endif
490 r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
491 #if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
492 (void)dispatch_assume_zero(r);
493 #endif
494 result = !r;
495 }
496 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
497 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
498 if (!result) {
499 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
500 pthread_workqueue_attr_t pwq_attr;
501 if (!disable_wq) {
502 r = pthread_workqueue_attr_init_np(&pwq_attr);
503 (void)dispatch_assume_zero(r);
504 }
505 #endif
506 int i;
507 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
508 pthread_workqueue_t pwq = NULL;
509 dispatch_root_queue_context_t qc;
510 qc = &_dispatch_root_queue_contexts[i];
511 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
512 if (!disable_wq
513 #if DISPATCH_NO_BG_PRIORITY
514 && (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
515 #endif
516 ) {
517 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
518 qc->dgq_wq_priority);
519 (void)dispatch_assume_zero(r);
520 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
521 qc->dgq_wq_options &
522 WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
523 (void)dispatch_assume_zero(r);
524 r = pthread_workqueue_create_np(&pwq, &pwq_attr);
525 (void)dispatch_assume_zero(r);
526 result = result || dispatch_assume(pwq);
527 }
528 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
529 qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
530 }
531 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
532 if (!disable_wq) {
533 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
534 (void)dispatch_assume_zero(r);
535 }
536 #endif
537 }
538 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
539 #endif // HAVE_PTHREAD_WORKQUEUES
540 return result;
541 }
542
543 #if DISPATCH_USE_PTHREAD_POOL
544 static inline void
545 _dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
546 bool overcommit)
547 {
548 qc->dgq_thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
549 _dispatch_hw_config.cc_max_active;
550 #if USE_MACH_SEM
551 // override the default FIFO behavior for the pool semaphores
552 kern_return_t kr = semaphore_create(mach_task_self(),
553 &qc->dgq_thread_mediator->dsema_port, SYNC_POLICY_LIFO, 0);
554 DISPATCH_VERIFY_MIG(kr);
555 (void)dispatch_assume_zero(kr);
556 (void)dispatch_assume(qc->dgq_thread_mediator->dsema_port);
557 #elif USE_POSIX_SEM
558 /* XXXRW: POSIX semaphores don't support LIFO? */
559 int ret = sem_init(&qc->dgq_thread_mediator->dsema_sem, 0, 0);
560 (void)dispatch_assume_zero(ret);
561 #endif
562 }
563 #endif // DISPATCH_USE_PTHREAD_POOL
564
565 static void
566 _dispatch_root_queues_init(void *context DISPATCH_UNUSED)
567 {
568 _dispatch_safe_fork = false;
569 if (!_dispatch_root_queues_init_workq()) {
570 #if DISPATCH_ENABLE_THREAD_POOL
571 int i;
572 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
573 bool overcommit = true;
574 #if TARGET_OS_EMBEDDED
575 // some software hangs if the non-overcommitting queues do not
576 // overcommit when threads block. Someday, this behavior should
577 // apply to all platforms
578 if (!(i & 1)) {
579 overcommit = false;
580 }
581 #endif
582 _dispatch_root_queue_init_pthread_pool(
583 &_dispatch_root_queue_contexts[i], overcommit);
584 }
585 #else
586 DISPATCH_CRASH("Root queue initialization failed");
587 #endif // DISPATCH_ENABLE_THREAD_POOL
588 }
589
590 }
591
592 #define countof(x) (sizeof(x) / sizeof(x[0]))
593
594 DISPATCH_EXPORT DISPATCH_NOTHROW
595 void
596 libdispatch_init(void)
597 {
598 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
599 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);
600
601 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
602 -DISPATCH_QUEUE_PRIORITY_HIGH);
603 dispatch_assert(countof(_dispatch_root_queues) ==
604 DISPATCH_ROOT_QUEUE_COUNT);
605 dispatch_assert(countof(_dispatch_root_queue_contexts) ==
606 DISPATCH_ROOT_QUEUE_COUNT);
607 #if HAVE_PTHREAD_WORKQUEUES
608 dispatch_assert(sizeof(_dispatch_wq2root_queues) /
609 sizeof(_dispatch_wq2root_queues[0][0]) ==
610 DISPATCH_ROOT_QUEUE_COUNT);
611 #endif
612 #if DISPATCH_ENABLE_THREAD_POOL
613 dispatch_assert(countof(_dispatch_thread_mediator) ==
614 DISPATCH_ROOT_QUEUE_COUNT);
615 #endif
616
617 dispatch_assert(sizeof(struct dispatch_apply_s) <=
618 DISPATCH_CONTINUATION_SIZE);
619 dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
620 == 0);
621 dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
622 DISPATCH_CACHELINE_SIZE == 0);
623
624 _dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
625 #if !DISPATCH_USE_OS_SEMAPHORE_CACHE
626 _dispatch_thread_key_create(&dispatch_sema4_key,
627 (void (*)(void *))_dispatch_thread_semaphore_dispose);
628 #endif
629 _dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
630 _dispatch_thread_key_create(&dispatch_io_key, NULL);
631 _dispatch_thread_key_create(&dispatch_apply_key, NULL);
632 #if DISPATCH_PERF_MON
633 _dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
634 #endif
635
636 #if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
637 _dispatch_main_q.do_targetq = &_dispatch_root_queues[
638 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
639 #endif
640
641 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
642 _dispatch_queue_set_bound_thread(&_dispatch_main_q);
643
644 #if DISPATCH_USE_PTHREAD_ATFORK
645 (void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
646 dispatch_atfork_parent, dispatch_atfork_child));
647 #endif
648
649 _dispatch_hw_config_init();
650 _dispatch_vtable_init();
651 _os_object_init();
652 _dispatch_introspection_init();
653 }
654
655 DISPATCH_EXPORT DISPATCH_NOTHROW
656 void
657 dispatch_atfork_child(void)
658 {
659 void *crash = (void *)0x100;
660 size_t i;
661
662 if (_dispatch_safe_fork) {
663 return;
664 }
665 _dispatch_child_of_unsafe_fork = true;
666
667 _dispatch_main_q.dq_items_head = crash;
668 _dispatch_main_q.dq_items_tail = crash;
669
670 _dispatch_mgr_q.dq_items_head = crash;
671 _dispatch_mgr_q.dq_items_tail = crash;
672
673 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
674 _dispatch_root_queues[i].dq_items_head = crash;
675 _dispatch_root_queues[i].dq_items_tail = crash;
676 }
677 }
678
679 #pragma mark -
680 #pragma mark dispatch_queue_t
681
682 // skip zero
683 // 1 - main_q
684 // 2 - mgr_q
685 // 3 - mgr_root_q
686 // 4,5,6,7,8,9,10,11 - global queues
687 // we use 'xadd' on Intel, so the initial value == next assigned
688 unsigned long volatile _dispatch_queue_serial_numbers = 12;
689
690 dispatch_queue_t
691 dispatch_queue_create_with_target(const char *label,
692 dispatch_queue_attr_t attr, dispatch_queue_t tq)
693 {
694 dispatch_queue_t dq;
695
696 dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
697 sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
698
699 _dispatch_queue_init(dq);
700 if (label) {
701 dq->dq_label = strdup(label);
702 }
703
704 if (attr == DISPATCH_QUEUE_CONCURRENT) {
705 dq->dq_width = UINT32_MAX;
706 if (!tq) {
707 tq = _dispatch_get_root_queue(0, false);
708 }
709 } else {
710 if (!tq) {
711 // Default target queue is overcommit!
712 tq = _dispatch_get_root_queue(0, true);
713 }
714 if (slowpath(attr)) {
715 dispatch_debug_assert(!attr, "Invalid attribute");
716 }
717 }
718 dq->do_targetq = tq;
719 _dispatch_object_debug(dq, "%s", __func__);
720 return _dispatch_introspection_queue_create(dq);
721 }
722
723 dispatch_queue_t
724 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
725 {
726 return dispatch_queue_create_with_target(label, attr,
727 DISPATCH_TARGET_QUEUE_DEFAULT);
728 }
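/*
 * Usage sketch (illustration only): creating serial and concurrent queues.
 * Labels and the worker function are hypothetical. A serial queue
 * (attr == NULL) targets the overcommitting default-priority root queue;
 * DISPATCH_QUEUE_CONCURRENT targets the non-overcommitting one, as set up
 * above.
 *
 *	dispatch_queue_t serial = dispatch_queue_create(
 *			"com.example.serial", NULL);
 *	dispatch_queue_t conc = dispatch_queue_create(
 *			"com.example.concurrent", DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(serial, ^{ do_serial_work(); });
 *	dispatch_release(serial);	// with manual retain/release only
 */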
729
730 void
731 _dispatch_queue_destroy(dispatch_object_t dou)
732 {
733 dispatch_queue_t dq = dou._dq;
734 if (slowpath(dq == _dispatch_queue_get_current())) {
735 DISPATCH_CRASH("Release of a queue by itself");
736 }
737 if (slowpath(dq->dq_items_tail)) {
738 DISPATCH_CRASH("Release of a queue while items are enqueued");
739 }
740
741 // trash the tail queue so that use after free will crash
742 dq->dq_items_tail = (void *)0x200;
743
744 dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
745 (void *)0x200, relaxed);
746 if (dqsq) {
747 _dispatch_release(dqsq);
748 }
749 }
750
751 // 6618342 Contact the team that owns the Instrument DTrace probe before
752 // renaming this symbol
753 void
754 _dispatch_queue_dispose(dispatch_queue_t dq)
755 {
756 _dispatch_object_debug(dq, "%s", __func__);
757 _dispatch_introspection_queue_dispose(dq);
758 if (dq->dq_label) {
759 free((void*)dq->dq_label);
760 }
761 _dispatch_queue_destroy(dq);
762 }
763
764 const char *
765 dispatch_queue_get_label(dispatch_queue_t dq)
766 {
767 if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
768 dq = _dispatch_get_current_queue();
769 }
770 return dq->dq_label ? dq->dq_label : "";
771 }
772
773 static void
774 _dispatch_queue_set_width2(void *ctxt)
775 {
776 int w = (int)(intptr_t)ctxt; // intentional truncation
777 uint32_t tmp;
778 dispatch_queue_t dq = _dispatch_queue_get_current();
779
780 if (w == 1 || w == 0) {
781 dq->dq_width = 1;
782 _dispatch_object_debug(dq, "%s", __func__);
783 return;
784 }
785 if (w > 0) {
786 tmp = (unsigned int)w;
787 } else switch (w) {
788 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
789 tmp = _dispatch_hw_config.cc_max_physical;
790 break;
791 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
792 tmp = _dispatch_hw_config.cc_max_active;
793 break;
794 default:
795 // fall through
796 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
797 tmp = _dispatch_hw_config.cc_max_logical;
798 break;
799 }
800 // multiply by two since the running count is inc/dec by two
801 // (the low bit == barrier)
802 dq->dq_width = tmp * 2;
803 _dispatch_object_debug(dq, "%s", __func__);
804 }
805
806 void
807 dispatch_queue_set_width(dispatch_queue_t dq, long width)
808 {
809 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
810 slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
811 return;
812 }
813 _dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
814 _dispatch_queue_set_width2);
815 }
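/*
 * Usage sketch (illustration only; dispatch_queue_set_width() is a private
 * interface). The symbolic width constants map onto the CPU counts captured
 * by _dispatch_hw_config_init() above, and the stored width is doubled
 * because the running count moves in steps of two (low bit == barrier).
 *
 *	dispatch_queue_t q = dispatch_queue_create(
 *			"com.example.limited", DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_queue_set_width(q, 4);	// at most 4 concurrent non-barrier items
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 */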
816
817 // 6618342 Contact the team that owns the Instrument DTrace probe before
818 // renaming this symbol
819 static void
820 _dispatch_set_target_queue2(void *ctxt)
821 {
822 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
823
824 prev_dq = dq->do_targetq;
825 dq->do_targetq = ctxt;
826 _dispatch_release(prev_dq);
827 _dispatch_object_debug(dq, "%s", __func__);
828 }
829
830 void
831 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
832 {
833 DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
834 if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
835 slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
836 return;
837 }
838 unsigned long type = dx_metatype(dou._do);
839 if (slowpath(!dq)) {
840 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
841 slowpath(dou._dq->dq_width > 1));
842 dq = _dispatch_get_root_queue(0, !is_concurrent_q);
843 }
844 // TODO: put into the vtable
845 switch(type) {
846 case _DISPATCH_QUEUE_TYPE:
847 case _DISPATCH_SOURCE_TYPE:
848 _dispatch_retain(dq);
849 return _dispatch_barrier_trysync_f(dou._dq, dq,
850 _dispatch_set_target_queue2);
851 case _DISPATCH_IO_TYPE:
852 return _dispatch_io_set_target_queue(dou._dchannel, dq);
853 default: {
854 dispatch_queue_t prev_dq;
855 _dispatch_retain(dq);
856 prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
857 if (prev_dq) _dispatch_release(prev_dq);
858 _dispatch_object_debug(dou._do, "%s", __func__);
859 return;
860 }
861 }
862 }
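/*
 * Usage sketch (illustration only): retargeting a queue so that its blocks
 * ultimately run at a different priority. The label is hypothetical.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.logging", NULL);
 *	dispatch_set_target_queue(q,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
 *
 * Passing dq == NULL resets the target to a default root queue, chosen
 * non-overcommitting for concurrent queues per the code above.
 */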
863
864 #pragma mark -
865 #pragma mark dispatch_pthread_root_queue
866
867 struct dispatch_pthread_root_queue_context_s {
868 pthread_attr_t dpq_thread_attr;
869 dispatch_block_t dpq_thread_configure;
870 struct dispatch_semaphore_s dpq_thread_mediator;
871 };
872 typedef struct dispatch_pthread_root_queue_context_s *
873 dispatch_pthread_root_queue_context_t;
874
875 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
876 static struct dispatch_pthread_root_queue_context_s
877 _dispatch_mgr_root_queue_pthread_context;
878 static struct dispatch_root_queue_context_s
879 _dispatch_mgr_root_queue_context = {{{
880 #if HAVE_PTHREAD_WORKQUEUES
881 .dgq_kworkqueue = (void*)(~0ul),
882 #endif
883 .dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
884 .dgq_thread_pool_size = 1,
885 }}};
886 static struct dispatch_queue_s _dispatch_mgr_root_queue = {
887 .do_vtable = DISPATCH_VTABLE(queue_root),
888 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
889 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
890 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
891 .do_ctxt = &_dispatch_mgr_root_queue_context,
892 .dq_label = "com.apple.root.libdispatch-manager",
893 .dq_running = 2,
894 .dq_width = UINT32_MAX,
895 .dq_serialnum = 3,
896 };
897 static struct {
898 volatile int prio;
899 int policy;
900 pthread_t tid;
901 } _dispatch_mgr_sched;
902 static dispatch_once_t _dispatch_mgr_sched_pred;
903
904 static void
905 _dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
906 {
907 struct sched_param param;
908 pthread_attr_t *attr;
909 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
910 (void)dispatch_assume_zero(pthread_attr_init(attr));
911 (void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
912 &_dispatch_mgr_sched.policy));
913 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
914 // high-priority workq threads are at priority 2 above default
915 _dispatch_mgr_sched.prio = param.sched_priority + 2;
916 }
917
918 DISPATCH_NOINLINE
919 static pthread_t *
920 _dispatch_mgr_root_queue_init(void)
921 {
922 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
923 struct sched_param param;
924 pthread_attr_t *attr;
925 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
926 (void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
927 PTHREAD_CREATE_DETACHED));
928 #if !DISPATCH_DEBUG
929 (void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
930 #endif
931 param.sched_priority = _dispatch_mgr_sched.prio;
932 (void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
933 return &_dispatch_mgr_sched.tid;
934 }
935
936 static inline void
937 _dispatch_mgr_priority_apply(void)
938 {
939 struct sched_param param;
940 do {
941 param.sched_priority = _dispatch_mgr_sched.prio;
942 (void)dispatch_assume_zero(pthread_setschedparam(
943 _dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy, &param));
944 } while (_dispatch_mgr_sched.prio > param.sched_priority);
945 }
946
947 DISPATCH_NOINLINE
948 void
949 _dispatch_mgr_priority_init(void)
950 {
951 struct sched_param param;
952 pthread_attr_t *attr;
953 attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
954 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
955 if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
956 return _dispatch_mgr_priority_apply();
957 }
958 }
959
960 DISPATCH_NOINLINE
961 static void
962 _dispatch_mgr_priority_raise(const pthread_attr_t *attr)
963 {
964 dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
965 struct sched_param param;
966 (void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
967 int p = _dispatch_mgr_sched.prio;
968 do if (p >= param.sched_priority) {
969 return;
970 } while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
971 p, param.sched_priority, &p, relaxed)));
972 if (_dispatch_mgr_sched.tid) {
973 return _dispatch_mgr_priority_apply();
974 }
975 }
976
977 dispatch_queue_t
978 dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
979 const pthread_attr_t *attr, dispatch_block_t configure)
980 {
981 dispatch_queue_t dq;
982 dispatch_root_queue_context_t qc;
983 dispatch_pthread_root_queue_context_t pqc;
984 size_t dqs;
985
986 if (slowpath(flags)) {
987 return NULL;
988 }
989 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
990 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
991 sizeof(struct dispatch_root_queue_context_s) +
992 sizeof(struct dispatch_pthread_root_queue_context_s));
993 qc = (void*)dq + dqs;
994 pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
995
996 _dispatch_queue_init(dq);
997 if (label) {
998 dq->dq_label = strdup(label);
999 }
1000
1001 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
1002 dq->do_ctxt = qc;
1003 dq->do_targetq = NULL;
1004 dq->dq_running = 2;
1005 dq->dq_width = UINT32_MAX;
1006
1007 pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
1008 qc->dgq_thread_mediator = &pqc->dpq_thread_mediator;
1009 qc->dgq_ctxt = pqc;
1010 #if HAVE_PTHREAD_WORKQUEUES
1011 qc->dgq_kworkqueue = (void*)(~0ul);
1012 #endif
1013 _dispatch_root_queue_init_pthread_pool(qc, true); // rdar://11352331
1014
1015 if (attr) {
1016 memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
1017 _dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
1018 } else {
1019 (void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
1020 }
1021 (void)dispatch_assume_zero(pthread_attr_setdetachstate(
1022 &pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
1023 if (configure) {
1024 pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
1025 }
1026 _dispatch_object_debug(dq, "%s", __func__);
1027 return _dispatch_introspection_queue_create(dq);
1028 }
1029 #endif
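/*
 * Usage sketch (illustration only; dispatch_pthread_root_queue_create() is a
 * private interface). The configure block runs on each worker thread created
 * for the resulting root queue before it drains work. Names are hypothetical.
 *
 *	dispatch_queue_t root = dispatch_pthread_root_queue_create(
 *			"com.example.pthread-root", 0, NULL, ^{
 *		// hypothetical per-thread setup, e.g. adjusting signal masks
 *	});
 *	dispatch_async(root, ^{ do_root_work(); });
 */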
1030
1031 void
1032 _dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
1033 {
1034 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1035 DISPATCH_CRASH("Global root queue disposed");
1036 }
1037 _dispatch_object_debug(dq, "%s", __func__);
1038 _dispatch_introspection_queue_dispose(dq);
1039 #if DISPATCH_USE_PTHREAD_POOL
1040 dispatch_root_queue_context_t qc = dq->do_ctxt;
1041 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
1042
1043 _dispatch_semaphore_dispose(qc->dgq_thread_mediator);
1044 if (pqc->dpq_thread_configure) {
1045 Block_release(pqc->dpq_thread_configure);
1046 }
1047 dq->do_targetq = _dispatch_get_root_queue(0, false);
1048 #endif
1049 if (dq->dq_label) {
1050 free((void*)dq->dq_label);
1051 }
1052 _dispatch_queue_destroy(dq);
1053 }
1054
1055 #pragma mark -
1056 #pragma mark dispatch_queue_specific
1057
1058 struct dispatch_queue_specific_queue_s {
1059 DISPATCH_STRUCT_HEADER(queue_specific_queue);
1060 DISPATCH_QUEUE_HEADER;
1061 TAILQ_HEAD(dispatch_queue_specific_head_s,
1062 dispatch_queue_specific_s) dqsq_contexts;
1063 };
1064
1065 struct dispatch_queue_specific_s {
1066 const void *dqs_key;
1067 void *dqs_ctxt;
1068 dispatch_function_t dqs_destructor;
1069 TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
1070 };
1071 DISPATCH_DECL(dispatch_queue_specific);
1072
1073 void
1074 _dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
1075 {
1076 dispatch_queue_specific_t dqs, tmp;
1077
1078 TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
1079 if (dqs->dqs_destructor) {
1080 dispatch_async_f(_dispatch_get_root_queue(
1081 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
1082 dqs->dqs_destructor);
1083 }
1084 free(dqs);
1085 }
1086 _dispatch_queue_destroy((dispatch_queue_t)dqsq);
1087 }
1088
1089 static void
1090 _dispatch_queue_init_specific(dispatch_queue_t dq)
1091 {
1092 dispatch_queue_specific_queue_t dqsq;
1093
1094 dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
1095 sizeof(struct dispatch_queue_specific_queue_s));
1096 _dispatch_queue_init((dispatch_queue_t)dqsq);
1097 dqsq->do_xref_cnt = -1;
1098 dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
1099 true);
1100 dqsq->dq_width = UINT32_MAX;
1101 dqsq->dq_label = "queue-specific";
1102 TAILQ_INIT(&dqsq->dqsq_contexts);
1103 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
1104 (dispatch_queue_t)dqsq, release))) {
1105 _dispatch_release((dispatch_queue_t)dqsq);
1106 }
1107 }
1108
1109 static void
1110 _dispatch_queue_set_specific(void *ctxt)
1111 {
1112 dispatch_queue_specific_t dqs, dqsn = ctxt;
1113 dispatch_queue_specific_queue_t dqsq =
1114 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1115
1116 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1117 if (dqs->dqs_key == dqsn->dqs_key) {
1118 // Destroy previous context for existing key
1119 if (dqs->dqs_destructor) {
1120 dispatch_async_f(_dispatch_get_root_queue(
1121 DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
1122 dqs->dqs_destructor);
1123 }
1124 if (dqsn->dqs_ctxt) {
1125 // Copy new context for existing key
1126 dqs->dqs_ctxt = dqsn->dqs_ctxt;
1127 dqs->dqs_destructor = dqsn->dqs_destructor;
1128 } else {
1129 // Remove context storage for existing key
1130 TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
1131 free(dqs);
1132 }
1133 return free(dqsn);
1134 }
1135 }
1136 // Insert context storage for new key
1137 TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
1138 }
1139
1140 DISPATCH_NOINLINE
1141 void
1142 dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
1143 void *ctxt, dispatch_function_t destructor)
1144 {
1145 if (slowpath(!key)) {
1146 return;
1147 }
1148 dispatch_queue_specific_t dqs;
1149
1150 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
1151 dqs->dqs_key = key;
1152 dqs->dqs_ctxt = ctxt;
1153 dqs->dqs_destructor = destructor;
1154 if (slowpath(!dq->dq_specific_q)) {
1155 _dispatch_queue_init_specific(dq);
1156 }
1157 _dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
1158 _dispatch_queue_set_specific);
1159 }
1160
1161 static void
1162 _dispatch_queue_get_specific(void *ctxt)
1163 {
1164 void **ctxtp = ctxt;
1165 void *key = *ctxtp;
1166 dispatch_queue_specific_queue_t dqsq =
1167 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
1168 dispatch_queue_specific_t dqs;
1169
1170 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
1171 if (dqs->dqs_key == key) {
1172 *ctxtp = dqs->dqs_ctxt;
1173 return;
1174 }
1175 }
1176 *ctxtp = NULL;
1177 }
1178
1179 DISPATCH_NOINLINE
1180 void *
1181 dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
1182 {
1183 if (slowpath(!key)) {
1184 return NULL;
1185 }
1186 void *ctxt = NULL;
1187
1188 if (fastpath(dq->dq_specific_q)) {
1189 ctxt = (void *)key;
1190 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
1191 }
1192 return ctxt;
1193 }
1194
1195 DISPATCH_NOINLINE
1196 void *
1197 dispatch_get_specific(const void *key)
1198 {
1199 if (slowpath(!key)) {
1200 return NULL;
1201 }
1202 void *ctxt = NULL;
1203 dispatch_queue_t dq = _dispatch_queue_get_current();
1204
1205 while (slowpath(dq)) {
1206 if (slowpath(dq->dq_specific_q)) {
1207 ctxt = (void *)key;
1208 dispatch_sync_f(dq->dq_specific_q, &ctxt,
1209 _dispatch_queue_get_specific);
1210 if (ctxt) break;
1211 }
1212 dq = dq->do_targetq;
1213 }
1214 return ctxt;
1215 }
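/*
 * Usage sketch (illustration only): attaching and reading queue-specific
 * data. The key only needs to be a unique address; the static key, label,
 * and context value below are hypothetical. A NULL destructor is used here
 * because the context is not heap-allocated.
 *
 *	static int example_key;
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_queue_set_specific(q, &example_key, (void *)"ctxt", NULL);
 *	dispatch_async(q, ^{
 *		void *c = dispatch_get_specific(&example_key);	// "ctxt"
 *	});
 *	void *c = dispatch_queue_get_specific(q, &example_key);	// "ctxt"
 *
 * dispatch_get_specific() also walks the target queue hierarchy, per the
 * loop above.
 */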
1216
1217 #pragma mark -
1218 #pragma mark dispatch_queue_debug
1219
1220 size_t
1221 _dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1222 {
1223 size_t offset = 0;
1224 dispatch_queue_t target = dq->do_targetq;
1225 offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
1226 "running = 0x%x, barrier = %d ", target && target->dq_label ?
1227 target->dq_label : "", target, dq->dq_width / 2,
1228 dq->dq_running / 2, dq->dq_running & 1);
1229 if (dq->dq_is_thread_bound) {
1230 offset += dsnprintf(buf, bufsiz, ", thread = %p ",
1231 _dispatch_queue_get_bound_thread(dq));
1232 }
1233 return offset;
1234 }
1235
1236 size_t
1237 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1238 {
1239 size_t offset = 0;
1240 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
1241 dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
1242 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1243 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1244 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
1245 return offset;
1246 }
1247
1248 #if DISPATCH_DEBUG
1249 void
1250 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1251 if (fastpath(dq)) {
1252 _dispatch_object_debug(dq, "%s", str);
1253 } else {
1254 _dispatch_log("queue[NULL]: %s", str);
1255 }
1256 }
1257 #endif
1258
1259 #if DISPATCH_PERF_MON
1260 static OSSpinLock _dispatch_stats_lock;
1261 static struct {
1262 uint64_t time_total;
1263 uint64_t count_total;
1264 uint64_t thread_total;
1265 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
1266
1267 static void
1268 _dispatch_queue_merge_stats(uint64_t start)
1269 {
1270 uint64_t avg, delta = _dispatch_absolute_time() - start;
1271 unsigned long count, bucket;
1272
1273 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
1274 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1275
1276 if (count) {
1277 avg = delta / count;
1278 bucket = flsll(avg);
1279 } else {
1280 bucket = 0;
1281 }
1282
1283 // 64-bit counters on 32-bit require a lock or a queue
1284 OSSpinLockLock(&_dispatch_stats_lock);
1285
1286 _dispatch_stats[bucket].time_total += delta;
1287 _dispatch_stats[bucket].count_total += count;
1288 _dispatch_stats[bucket].thread_total++;
1289
1290 OSSpinLockUnlock(&_dispatch_stats_lock);
1291 }
1292 #endif
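/*
 * Note (illustration only): the histogram bucket chosen above is the position
 * of the most significant set bit of the average per-item drain time. For
 * example, if a drain retired 8 items in 1000 ticks, avg == 125 and
 * flsll(125) == 7, so the sample lands in _dispatch_stats[7]; a drain that
 * retired no items lands in bucket 0.
 */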
1293
1294 #pragma mark -
1295 #pragma mark dispatch_continuation_t
1296
1297 static void
1298 _dispatch_force_cache_cleanup(void)
1299 {
1300 dispatch_continuation_t dc;
1301 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1302 if (dc) {
1303 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1304 _dispatch_cache_cleanup(dc);
1305 }
1306 }
1307
1308 DISPATCH_NOINLINE
1309 static void
1310 _dispatch_cache_cleanup(void *value)
1311 {
1312 dispatch_continuation_t dc, next_dc = value;
1313
1314 while ((dc = next_dc)) {
1315 next_dc = dc->do_next;
1316 _dispatch_continuation_free_to_heap(dc);
1317 }
1318 }
1319
1320 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
1321 int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
1322
1323 DISPATCH_NOINLINE
1324 void
1325 _dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
1326 {
1327 _dispatch_continuation_free_to_heap(dc);
1328 dispatch_continuation_t next_dc;
1329 dc = _dispatch_thread_getspecific(dispatch_cache_key);
1330 int cnt;
1331 if (!dc || (cnt = dc->do_ref_cnt-_dispatch_continuation_cache_limit) <= 0) {
1332 return;
1333 }
1334 do {
1335 next_dc = dc->do_next;
1336 _dispatch_continuation_free_to_heap(dc);
1337 } while (--cnt && (dc = next_dc));
1338 _dispatch_thread_setspecific(dispatch_cache_key, next_dc);
1339 }
1340 #endif
1341
1342 DISPATCH_ALWAYS_INLINE_NDEBUG
1343 static inline void
1344 _dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
1345 {
1346 dispatch_continuation_t dc = dou._dc;
1347
1348 _dispatch_trace_continuation_pop(dq, dou);
1349 (void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
1350 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
1351 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
1352 _dispatch_thread_semaphore_signal(
1353 (_dispatch_thread_semaphore_t)dc->dc_other);
1354 } else {
1355 _dispatch_async_f_redirect(dq, dc);
1356 }
1357 }
1358
1359 DISPATCH_ALWAYS_INLINE_NDEBUG
1360 static inline void
1361 _dispatch_continuation_pop(dispatch_object_t dou)
1362 {
1363 dispatch_continuation_t dc = dou._dc, dc1;
1364 dispatch_group_t dg;
1365
1366 _dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
1367 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
1368 return dx_invoke(dou._do);
1369 }
1370
1371 // Add the item back to the cache before calling the function. This
1372 // allows the 'hot' continuation to be used for a quick callback.
1373 //
1374 // The ccache version is per-thread.
1375 // Therefore, the object has not been reused yet.
1376 // This generates better assembly.
1377 if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
1378 dc1 = _dispatch_continuation_free_cacheonly(dc);
1379 } else {
1380 dc1 = NULL;
1381 }
1382 if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
1383 dg = dc->dc_data;
1384 } else {
1385 dg = NULL;
1386 }
1387 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
1388 if (dg) {
1389 dispatch_group_leave(dg);
1390 _dispatch_release(dg);
1391 }
1392 if (slowpath(dc1)) {
1393 _dispatch_continuation_free_to_cache_limit(dc1);
1394 }
1395 }
1396
1397 #pragma mark -
1398 #pragma mark dispatch_barrier_async
1399
1400 DISPATCH_NOINLINE
1401 static void
1402 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
1403 dispatch_function_t func)
1404 {
1405 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1406
1407 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1408 dc->dc_func = func;
1409 dc->dc_ctxt = ctxt;
1410
1411 _dispatch_queue_push(dq, dc);
1412 }
1413
1414 DISPATCH_NOINLINE
1415 void
1416 dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
1417 dispatch_function_t func)
1418 {
1419 dispatch_continuation_t dc;
1420
1421 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1422 if (!dc) {
1423 return _dispatch_barrier_async_f_slow(dq, ctxt, func);
1424 }
1425
1426 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
1427 dc->dc_func = func;
1428 dc->dc_ctxt = ctxt;
1429
1430 _dispatch_queue_push(dq, dc);
1431 }
1432
1433 #ifdef __BLOCKS__
1434 void
1435 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
1436 {
1437 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
1438 _dispatch_call_block_and_release);
1439 }
1440 #endif
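/*
 * Usage sketch (illustration only): the classic reader/writer pattern on a
 * concurrent queue. Function names and the label are hypothetical.
 *
 *	dispatch_queue_t q = dispatch_queue_create(
 *			"com.example.rw", DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_async(q, ^{ read_shared_state(); });	// readers run concurrently
 *	dispatch_barrier_async(q, ^{ mutate_shared_state(); });	// writer runs alone
 */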
1441
1442 #pragma mark -
1443 #pragma mark dispatch_async
1444
1445 void
1446 _dispatch_async_redirect_invoke(void *ctxt)
1447 {
1448 struct dispatch_continuation_s *dc = ctxt;
1449 struct dispatch_continuation_s *other_dc = dc->dc_other;
1450 dispatch_queue_t old_dq, dq = dc->dc_data, rq;
1451
1452 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1453 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1454 _dispatch_continuation_pop(other_dc);
1455 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1456
1457 rq = dq->do_targetq;
1458 while (slowpath(rq->do_targetq) && rq != old_dq) {
1459 if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
1460 _dispatch_wakeup(rq);
1461 }
1462 rq = rq->do_targetq;
1463 }
1464
1465 if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
1466 _dispatch_wakeup(dq);
1467 }
1468 _dispatch_release(dq);
1469 }
1470
1471 static inline void
1472 _dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc)
1473 {
1474 uint32_t running = 2;
1475
1476 // Find the queue to redirect to
1477 do {
1478 if (slowpath(dq->dq_items_tail) ||
1479 slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
1480 slowpath(dq->dq_width == 1)) {
1481 break;
1482 }
1483 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1484 if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
1485 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
1486 break;
1487 }
1488 dq = dq->do_targetq;
1489 } while (slowpath(dq->do_targetq));
1490
1491 _dispatch_queue_push_wakeup(dq, dc, running == 0);
1492 }
1493
1494 DISPATCH_NOINLINE
1495 static void
1496 _dispatch_async_f_redirect(dispatch_queue_t dq,
1497 dispatch_continuation_t other_dc)
1498 {
1499 dispatch_continuation_t dc = _dispatch_continuation_alloc();
1500
1501 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1502 dc->dc_func = _dispatch_async_redirect_invoke;
1503 dc->dc_ctxt = dc;
1504 dc->dc_data = dq;
1505 dc->dc_other = other_dc;
1506
1507 _dispatch_retain(dq);
1508 dq = dq->do_targetq;
1509 if (slowpath(dq->do_targetq)) {
1510 return _dispatch_async_f_redirect2(dq, dc);
1511 }
1512
1513 _dispatch_queue_push(dq, dc);
1514 }
1515
1516 DISPATCH_NOINLINE
1517 static void
1518 _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
1519 {
1520 uint32_t running = 2;
1521
1522 do {
1523 if (slowpath(dq->dq_items_tail)
1524 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1525 break;
1526 }
1527 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1528 if (slowpath(running > dq->dq_width)) {
1529 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
1530 break;
1531 }
1532 if (!slowpath(running & 1)) {
1533 return _dispatch_async_f_redirect(dq, dc);
1534 }
1535 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
1536 // We might get lucky and find that the barrier has ended by now
1537 } while (!(running & 1));
1538
1539 _dispatch_queue_push_wakeup(dq, dc, running == 0);
1540 }
1541
1542 DISPATCH_NOINLINE
1543 static void
1544 _dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
1545 dispatch_function_t func)
1546 {
1547 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();
1548
1549 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1550 dc->dc_func = func;
1551 dc->dc_ctxt = ctxt;
1552
1553 // No fastpath/slowpath hint because we simply don't know
1554 if (dq->do_targetq) {
1555 return _dispatch_async_f2(dq, dc);
1556 }
1557
1558 _dispatch_queue_push(dq, dc);
1559 }
1560
1561 DISPATCH_NOINLINE
1562 void
1563 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1564 {
1565 dispatch_continuation_t dc;
1566
1567 // No fastpath/slowpath hint because we simply don't know
1568 if (dq->dq_width == 1) {
1569 return dispatch_barrier_async_f(dq, ctxt, func);
1570 }
1571
1572 dc = fastpath(_dispatch_continuation_alloc_cacheonly());
1573 if (!dc) {
1574 return _dispatch_async_f_slow(dq, ctxt, func);
1575 }
1576
1577 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1578 dc->dc_func = func;
1579 dc->dc_ctxt = ctxt;
1580
1581 // No fastpath/slowpath hint because we simply don't know
1582 if (dq->do_targetq) {
1583 return _dispatch_async_f2(dq, dc);
1584 }
1585
1586 _dispatch_queue_push(dq, dc);
1587 }
1588
1589 #ifdef __BLOCKS__
1590 void
1591 dispatch_async(dispatch_queue_t dq, void (^work)(void))
1592 {
1593 dispatch_async_f(dq, _dispatch_Block_copy(work),
1594 _dispatch_call_block_and_release);
1595 }
1596 #endif
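/*
 * Usage sketch (illustration only): submitting asynchronous work and hopping
 * back to the main queue when it completes. Function names are hypothetical.
 *
 *	dispatch_async(dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
 *		compute_result();
 *		dispatch_async(dispatch_get_main_queue(), ^{
 *			update_ui();
 *		});
 *	});
 */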
1597
1598 #pragma mark -
1599 #pragma mark dispatch_group_async
1600
1601 DISPATCH_NOINLINE
1602 void
1603 dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
1604 dispatch_function_t func)
1605 {
1606 dispatch_continuation_t dc;
1607
1608 _dispatch_retain(dg);
1609 dispatch_group_enter(dg);
1610
1611 dc = _dispatch_continuation_alloc();
1612
1613 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
1614 dc->dc_func = func;
1615 dc->dc_ctxt = ctxt;
1616 dc->dc_data = dg;
1617
1618 // No fastpath/slowpath hint because we simply don't know
1619 if (dq->dq_width != 1 && dq->do_targetq) {
1620 return _dispatch_async_f2(dq, dc);
1621 }
1622
1623 _dispatch_queue_push(dq, dc);
1624 }
1625
1626 #ifdef __BLOCKS__
1627 void
1628 dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
1629 dispatch_block_t db)
1630 {
1631 dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
1632 _dispatch_call_block_and_release);
1633 }
1634 #endif
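/*
 * Usage sketch (illustration only): fanning work out with a group and being
 * notified once all of it has completed. Function names are hypothetical.
 *
 *	dispatch_group_t g = dispatch_group_create();
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_group_async(g, q, ^{ load_part_one(); });
 *	dispatch_group_async(g, q, ^{ load_part_two(); });
 *	dispatch_group_notify(g, dispatch_get_main_queue(), ^{ all_parts_done(); });
 *	dispatch_release(g);	// with manual retain/release only
 */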
1635
1636 #pragma mark -
1637 #pragma mark dispatch_function_invoke
1638
1639 DISPATCH_ALWAYS_INLINE
1640 static inline void
1641 _dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
1642 dispatch_function_t func)
1643 {
1644 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1645 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1646 _dispatch_client_callout(ctxt, func);
1647 _dispatch_perfmon_workitem_inc();
1648 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1649 }
1650
1651 void
1652 _dispatch_sync_recurse_invoke(void *ctxt)
1653 {
1654 dispatch_continuation_t dc = ctxt;
1655 _dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
1656 }
1657
1658 DISPATCH_ALWAYS_INLINE
1659 static inline void
1660 _dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
1661 dispatch_function_t func)
1662 {
1663 struct dispatch_continuation_s dc = {
1664 .dc_data = dq,
1665 .dc_func = func,
1666 .dc_ctxt = ctxt,
1667 };
1668 dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke);
1669 }
1670
1671 #pragma mark -
1672 #pragma mark dispatch_barrier_sync
1673
1674 static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1675 dispatch_function_t func);
1676
1677 DISPATCH_ALWAYS_INLINE_NDEBUG
1678 static inline _dispatch_thread_semaphore_t
1679 _dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
1680 bool lock)
1681 {
1682 _dispatch_thread_semaphore_t sema;
1683 dispatch_continuation_t dc = dou._dc;
1684
1685 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
1686 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
1687 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
1688 return 0;
1689 }
1690 _dispatch_trace_continuation_pop(dq, dc);
1691 _dispatch_perfmon_workitem_inc();
1692
1693 dc = dc->dc_ctxt;
1694 dq = dc->dc_data;
1695 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
1696 if (lock) {
1697 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1698 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1699 // rdar://problem/9032024 running lock must be held until sync_f_slow
1700 // returns
1701 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1702 }
1703 return sema ? sema : MACH_PORT_DEAD;
1704 }
1705
1706 static void
1707 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
1708 {
1709 dispatch_continuation_t dc = ctxt;
1710 dispatch_queue_t dq = dc->dc_data;
1711 _dispatch_thread_semaphore_t sema;
1712 sema = (_dispatch_thread_semaphore_t)dc->dc_other;
1713
1714 dispatch_assert(dq == _dispatch_queue_get_current());
1715 #if DISPATCH_COCOA_COMPAT
1716 if (slowpath(dq->dq_is_thread_bound)) {
1717 // The queue is bound to a non-dispatch thread (e.g. main thread)
1718 dc->dc_func(dc->dc_ctxt);
1719 dispatch_atomic_store2o(dc, dc_func, NULL, release);
1720 _dispatch_thread_semaphore_signal(sema); // release
1721 return;
1722 }
1723 #endif
1724 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1725 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1726 // rdar://9032024 running lock must be held until sync_f_slow returns
1727 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1728 _dispatch_thread_semaphore_signal(sema); // release
1729 }
1730
1731 DISPATCH_NOINLINE
1732 static void
1733 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
1734 dispatch_function_t func)
1735 {
1736 if (slowpath(!dq->do_targetq)) {
1737 // the global concurrent queues do not need strict ordering
1738 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1739 return _dispatch_sync_f_invoke(dq, ctxt, func);
1740 }
1741 // It's preferred to execute synchronous blocks on the current thread
1742 // due to thread-local side effects, garbage collection, etc. However,
1743 // blocks submitted to the main thread MUST be run on the main thread
1744
1745 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
1746 struct dispatch_continuation_s dc = {
1747 .dc_data = dq,
1748 #if DISPATCH_COCOA_COMPAT
1749 .dc_func = func,
1750 .dc_ctxt = ctxt,
1751 #endif
1752 .dc_other = (void*)sema,
1753 };
1754 struct dispatch_continuation_s dbss = {
1755 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
1756 DISPATCH_OBJ_SYNC_SLOW_BIT),
1757 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
1758 .dc_ctxt = &dc,
1759 #if DISPATCH_INTROSPECTION
1760 .dc_data = (void*)_dispatch_thread_self(),
1761 #endif
1762 };
1763 _dispatch_queue_push(dq, &dbss);
1764
1765 _dispatch_thread_semaphore_wait(sema); // acquire
1766 _dispatch_put_thread_semaphore(sema);
1767
1768 #if DISPATCH_COCOA_COMPAT
1769 // Queue bound to a non-dispatch thread
1770 if (dc.dc_func == NULL) {
1771 return;
1772 }
1773 #endif
1774 if (slowpath(dq->do_targetq->do_targetq)) {
1775 _dispatch_function_recurse(dq, ctxt, func);
1776 } else {
1777 _dispatch_function_invoke(dq, ctxt, func);
1778 }
1779 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
1780 dq->dq_running == 2) {
1781 // rdar://problem/8290662 "lock transfer"
1782 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1783 if (sema) {
1784 _dispatch_thread_semaphore_signal(sema); // release
1785 return;
1786 }
1787 }
1788 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
1789 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1790 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, release) == 0)) {
1791 _dispatch_wakeup(dq);
1792 }
1793 }
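
/*
 * Rough flow of the slow path above: the caller parks on a per-thread
 * semaphore while a BARRIER|SYNC_SLOW continuation is pushed onto dq. The
 * queue's drainer (or _dispatch_barrier_sync_f2) eventually pops that
 * continuation via _dispatch_barrier_sync_f_pop(), takes the running lock on
 * the caller's behalf and signals the semaphore. The work function then runs
 * on the calling thread; on completion the lock is either handed directly to
 * the next queued barrier sync (rdar://problem/8290662 "lock transfer") or
 * the suspend/running counts are dropped and the queue woken up. On a
 * thread-bound queue (e.g. the main queue) the work runs in
 * _dispatch_barrier_sync_f_slow_invoke() on the bound thread instead, which
 * clears dc_func to tell the waiter not to run it again.
 */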
1794
1795 DISPATCH_NOINLINE
1796 static void
1797 _dispatch_barrier_sync_f2(dispatch_queue_t dq)
1798 {
1799 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
1800 // rdar://problem/8290662 "lock transfer"
1801 _dispatch_thread_semaphore_t sema;
1802 sema = _dispatch_queue_drain_one_barrier_sync(dq);
1803 if (sema) {
1804 (void)dispatch_atomic_add2o(dq, do_suspend_cnt,
1805 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
1806 // rdar://9032024 running lock must be held until sync_f_slow
1807 // returns: increment by 2 and decrement by 1
1808 (void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
1809 _dispatch_thread_semaphore_signal(sema);
1810 return;
1811 }
1812 }
1813 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1814 _dispatch_wakeup(dq);
1815 }
1816 }
1817
1818 DISPATCH_NOINLINE
1819 static void
1820 _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1821 dispatch_function_t func)
1822 {
1823 _dispatch_function_invoke(dq, ctxt, func);
1824 if (slowpath(dq->dq_items_tail)) {
1825 return _dispatch_barrier_sync_f2(dq);
1826 }
1827 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1828 _dispatch_wakeup(dq);
1829 }
1830 }
1831
1832 DISPATCH_NOINLINE
1833 static void
1834 _dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1835 dispatch_function_t func)
1836 {
1837 _dispatch_function_recurse(dq, ctxt, func);
1838 if (slowpath(dq->dq_items_tail)) {
1839 return _dispatch_barrier_sync_f2(dq);
1840 }
1841 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1842 _dispatch_wakeup(dq);
1843 }
1844 }
1845
1846 DISPATCH_NOINLINE
1847 void
1848 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
1849 dispatch_function_t func)
1850 {
1851 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1851 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1852 // 2) ensure that the queue is not suspended
1853 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1854 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1855 }
1856 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
1857 // global concurrent queues and queues bound to non-dispatch threads
1858 // always fall into the slow case
1859 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
1860 }
1861 if (slowpath(dq->do_targetq->do_targetq)) {
1862 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
1863 }
1864 _dispatch_barrier_sync_f_invoke(dq, ctxt, func);
1865 }
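
/*
 * Illustrative usage sketch (not part of the library; the queue label, the
 * helper function and the counter are made up for the example): on a queue
 * created with DISPATCH_QUEUE_CONCURRENT, dispatch_barrier_sync_f() runs the
 * function exclusively and returns only once it has executed.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	static int counter;
 *
 *	static void
 *	bump_counter(void *ctxt)
 *	{
 *		int *c = ctxt;
 *		(*c)++;	// no other work on 'q' runs concurrently with this
 *	}
 *
 *	// elsewhere, e.g. in main():
 *	dispatch_queue_t q = dispatch_queue_create("com.example.counter",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_barrier_sync_f(q, &counter, bump_counter);
 *	dispatch_release(q);
 */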
1866
1867 #ifdef __BLOCKS__
1868 #if DISPATCH_COCOA_COMPAT
1869 DISPATCH_NOINLINE
1870 static void
1871 _dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
1872 {
1873 // Blocks submitted to the main queue MUST be run on the main thread,
1874 // therefore under GC we must Block_copy in order to notify the thread-local
1875 // garbage collector that the objects are transferring to the main thread
1876 // rdar://problem/7176237&7181849&7458685
1877 if (dispatch_begin_thread_4GC) {
1878 dispatch_block_t block = _dispatch_Block_copy(work);
1879 return dispatch_barrier_sync_f(dq, block,
1880 _dispatch_call_block_and_release);
1881 }
1882 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
1883 }
1884 #endif
1885
1886 void
1887 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
1888 {
1889 #if DISPATCH_COCOA_COMPAT
1890 if (slowpath(dq->dq_is_thread_bound)) {
1891 return _dispatch_barrier_sync_slow(dq, work);
1892 }
1893 #endif
1894 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
1895 }
1896 #endif
1897
1898 DISPATCH_NOINLINE
1899 static void
1900 _dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
1901 dispatch_function_t func)
1902 {
1903 _dispatch_function_invoke(dq, ctxt, func);
1904 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
1905 _dispatch_wakeup(dq);
1906 }
1907 }
1908
1909 DISPATCH_NOINLINE
1910 void
1911 _dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
1912 dispatch_function_t func)
1913 {
1914 // Use for mutation of queue-/source-internal state only, ignores target
1915 // queue hierarchy!
1916 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
1917 || slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
1918 acquire))) {
1919 return dispatch_barrier_async_f(dq, ctxt, func);
1920 }
1921 _dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
1922 }
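
/*
 * Note: the trysync variant is internal-only. It runs 'func' synchronously
 * only when the barrier can be taken immediately (queue empty, not suspended,
 * dq_running 0 -> 1); otherwise it degrades to dispatch_barrier_async_f(),
 * so callers must not assume the work has completed when this returns.
 */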
1923
1924 #pragma mark -
1925 #pragma mark dispatch_sync
1926
1927 DISPATCH_NOINLINE
1928 static void
1929 _dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
1930 bool wakeup)
1931 {
1932 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
1933 struct dispatch_continuation_s dss = {
1934 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
1935 #if DISPATCH_INTROSPECTION
1936 .dc_func = func,
1937 .dc_ctxt = ctxt,
1938 .dc_data = (void*)_dispatch_thread_self(),
1939 #endif
1940 .dc_other = (void*)sema,
1941 };
1942 _dispatch_queue_push_wakeup(dq, &dss, wakeup);
1943
1944 _dispatch_thread_semaphore_wait(sema);
1945 _dispatch_put_thread_semaphore(sema);
1946
1947 if (slowpath(dq->do_targetq->do_targetq)) {
1948 _dispatch_function_recurse(dq, ctxt, func);
1949 } else {
1950 _dispatch_function_invoke(dq, ctxt, func);
1951 }
1952 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1953 _dispatch_wakeup(dq);
1954 }
1955 }
1956
1957 DISPATCH_NOINLINE
1958 static void
1959 _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
1960 dispatch_function_t func)
1961 {
1962 _dispatch_function_invoke(dq, ctxt, func);
1963 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1964 _dispatch_wakeup(dq);
1965 }
1966 }
1967
1968 DISPATCH_NOINLINE
1969 static void
1970 _dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
1971 dispatch_function_t func)
1972 {
1973 _dispatch_function_recurse(dq, ctxt, func);
1974 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
1975 _dispatch_wakeup(dq);
1976 }
1977 }
1978
1979 static inline void
1980 _dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
1981 {
1982 // 1) ensure that this thread hasn't enqueued anything ahead of this call
1983 // 2) ensure that the queue is not suspended
1984 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
1985 return _dispatch_sync_f_slow(dq, ctxt, func, false);
1986 }
1987 uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
1988 if (slowpath(running & 1)) {
1989 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
1990 return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
1991 }
1992 if (slowpath(dq->do_targetq->do_targetq)) {
1993 return _dispatch_sync_f_recurse(dq, ctxt, func);
1994 }
1995 _dispatch_sync_f_invoke(dq, ctxt, func);
1996 }
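
/*
 * dq_running accounting, as used above: dispatch_barrier_sync_f() takes the
 * counter from 0 to 1, while each non-barrier sync adds 2. The "running & 1"
 * test therefore detects a barrier in progress without disturbing the parity,
 * so multiple non-barrier syncs may run concurrently on a wide queue.
 */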
1997
1998 DISPATCH_NOINLINE
1999 void
2000 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
2001 {
2002 if (fastpath(dq->dq_width == 1)) {
2003 return dispatch_barrier_sync_f(dq, ctxt, func);
2004 }
2005 if (slowpath(!dq->do_targetq)) {
2006 // the global concurrent queues do not need strict ordering
2007 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
2008 return _dispatch_sync_f_invoke(dq, ctxt, func);
2009 }
2010 _dispatch_sync_f2(dq, ctxt, func);
2011 }
2012
2013 #ifdef __BLOCKS__
2014 #if DISPATCH_COCOA_COMPAT
2015 DISPATCH_NOINLINE
2016 static void
2017 _dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
2018 {
2019 // Blocks submitted to the main queue MUST be run on the main thread,
2020 // therefore under GC we must Block_copy in order to notify the thread-local
2021 // garbage collector that the objects are transferring to the main thread
2022 // rdar://problem/7176237&7181849&7458685
2023 if (dispatch_begin_thread_4GC) {
2024 dispatch_block_t block = _dispatch_Block_copy(work);
2025 return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
2026 }
2027 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
2028 }
2029 #endif
2030
2031 void
2032 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
2033 {
2034 #if DISPATCH_COCOA_COMPAT
2035 if (slowpath(dq->dq_is_thread_bound)) {
2036 return _dispatch_sync_slow(dq, work);
2037 }
2038 #endif
2039 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
2040 }
2041 #endif
2042
2043 #pragma mark -
2044 #pragma mark dispatch_after
2045
2046 void
2047 _dispatch_after_timer_callback(void *ctxt)
2048 {
2049 dispatch_continuation_t dc = ctxt, dc1;
2050 dispatch_source_t ds = dc->dc_data;
2051 dc1 = _dispatch_continuation_free_cacheonly(dc);
2052 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
2053 dispatch_source_cancel(ds);
2054 dispatch_release(ds);
2055 if (slowpath(dc1)) {
2056 _dispatch_continuation_free_to_cache_limit(dc1);
2057 }
2058 }
2059
2060 DISPATCH_NOINLINE
2061 void
2062 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
2063 dispatch_function_t func)
2064 {
2065 uint64_t delta, leeway;
2066 dispatch_source_t ds;
2067
2068 if (when == DISPATCH_TIME_FOREVER) {
2069 #if DISPATCH_DEBUG
2070 DISPATCH_CLIENT_CRASH(
2071 "dispatch_after_f() called with 'when' == infinity");
2072 #endif
2073 return;
2074 }
2075
2076 delta = _dispatch_timeout(when);
2077 if (delta == 0) {
2078 return dispatch_async_f(queue, ctxt, func);
2079 }
2080 leeway = delta / 10; // <rdar://problem/13447496>
2081 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
2082 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
2083
2084 // this function can and should be optimized to not use a dispatch source
2085 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
2086 dispatch_assert(ds);
2087
2088 dispatch_continuation_t dc = _dispatch_continuation_alloc();
2089 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
2090 dc->dc_func = func;
2091 dc->dc_ctxt = ctxt;
2092 dc->dc_data = ds;
2093
2094 dispatch_set_context(ds, dc);
2095 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
2096 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
2097 dispatch_resume(ds);
2098 }
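
/*
 * The leeway above is 10% of the remaining delay, clamped to [1ms, 60s]
 * (rdar://problem/13447496), giving the system room to defer and coalesce the
 * timer. Illustrative usage (the delay and callback are made up for the
 * example):
 *
 *	static void fire(void *ctxt) {
 *		// runs roughly three seconds later on the main queue
 *	}
 *
 *	dispatch_after_f(dispatch_time(DISPATCH_TIME_NOW, 3 * NSEC_PER_SEC),
 *			dispatch_get_main_queue(), NULL, fire);
 */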
2099
2100 #ifdef __BLOCKS__
2101 void
2102 dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
2103 dispatch_block_t work)
2104 {
2105 // test before the copy of the block
2106 if (when == DISPATCH_TIME_FOREVER) {
2107 #if DISPATCH_DEBUG
2108 DISPATCH_CLIENT_CRASH(
2109 "dispatch_after() called with 'when' == infinity");
2110 #endif
2111 return;
2112 }
2113 dispatch_after_f(when, queue, _dispatch_Block_copy(work),
2114 _dispatch_call_block_and_release);
2115 }
2116 #endif
2117
2118 #pragma mark -
2119 #pragma mark dispatch_queue_push
2120
2121 DISPATCH_NOINLINE
2122 static void
2123 _dispatch_queue_push_list_slow2(dispatch_queue_t dq,
2124 struct dispatch_object_s *obj)
2125 {
2126 // The queue must be retained before dq_items_head is written in order
2127 // to ensure that the reference is still valid when _dispatch_wakeup is
2128 // called. Otherwise, if preempted between the assignment to
2129 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
2130 // queue may release the last reference to the queue when invoked by
2131 // _dispatch_queue_drain. <rdar://problem/6932776>
2132 _dispatch_retain(dq);
2133 dq->dq_items_head = obj;
2134 _dispatch_wakeup(dq);
2135 _dispatch_release(dq);
2136 }
2137
2138 DISPATCH_NOINLINE
2139 void
2140 _dispatch_queue_push_list_slow(dispatch_queue_t dq,
2141 struct dispatch_object_s *obj, unsigned int n)
2142 {
2143 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
2144 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
2145 return _dispatch_queue_wakeup_global2(dq, n);
2146 }
2147 _dispatch_queue_push_list_slow2(dq, obj);
2148 }
2149
2150 DISPATCH_NOINLINE
2151 void
2152 _dispatch_queue_push_slow(dispatch_queue_t dq,
2153 struct dispatch_object_s *obj)
2154 {
2155 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
2156 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
2157 return _dispatch_queue_wakeup_global(dq);
2158 }
2159 _dispatch_queue_push_list_slow2(dq, obj);
2160 }
2161
2162 #pragma mark -
2163 #pragma mark dispatch_queue_probe
2164
2165 unsigned long
2166 _dispatch_queue_probe(dispatch_queue_t dq)
2167 {
2168 return (unsigned long)slowpath(dq->dq_items_tail != NULL);
2169 }
2170
2171 #if DISPATCH_COCOA_COMPAT
2172 unsigned long
2173 _dispatch_runloop_queue_probe(dispatch_queue_t dq)
2174 {
2175 if (_dispatch_queue_probe(dq)) {
2176 if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
2177 return _dispatch_runloop_queue_wakeup(dq);
2178 }
2179 return false;
2180 }
2181 #endif
2182
2183 unsigned long
2184 _dispatch_mgr_queue_probe(dispatch_queue_t dq)
2185 {
2186 if (_dispatch_queue_probe(dq)) {
2187 return _dispatch_mgr_wakeup(dq);
2188 }
2189 return false;
2190 }
2191
2192 unsigned long
2193 _dispatch_root_queue_probe(dispatch_queue_t dq)
2194 {
2195 _dispatch_queue_wakeup_global(dq);
2196 return false;
2197 }
2198
2199 #pragma mark -
2200 #pragma mark dispatch_wakeup
2201
2202 // 6618342 Contact the team that owns the Instrument DTrace probe before
2203 // renaming this symbol
2204 dispatch_queue_t
2205 _dispatch_wakeup(dispatch_object_t dou)
2206 {
2207 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
2208 return NULL;
2209 }
2210 if (!dx_probe(dou._do)) {
2211 return NULL;
2212 }
2213 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
2214 DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
2215 #if DISPATCH_COCOA_COMPAT
2216 if (dou._dq == &_dispatch_main_q) {
2217 return _dispatch_main_queue_wakeup();
2218 }
2219 #endif
2220 return NULL;
2221 }
2222 _dispatch_retain(dou._do);
2223 dispatch_queue_t tq = dou._do->do_targetq;
2224 _dispatch_queue_push(tq, dou._do);
2225 return tq; // libdispatch does not need this, but the Instrument DTrace
2226 // probe does
2227 }
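
/*
 * Note on the wakeup protocol above: winning the cmpxchg of do_suspend_cnt
 * from 0 to DISPATCH_OBJECT_SUSPEND_LOCK is what entitles a thread to retain
 * the queue and push it onto its target queue, so racing wakeups cannot
 * enqueue the same queue twice. The main queue is handled specially when the
 * cmpxchg is lost: _dispatch_main_queue_wakeup() still pokes the bound
 * runloop thread via its wakeup port.
 */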
2228
2229 #if DISPATCH_COCOA_COMPAT
2230 static inline void
2231 _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
2232 {
2233 mach_port_t mp = (mach_port_t)dq->do_ctxt;
2234 if (!mp) {
2235 return;
2236 }
2237 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
2238 switch (kr) {
2239 case MACH_SEND_TIMEOUT:
2240 case MACH_SEND_TIMED_OUT:
2241 case MACH_SEND_INVALID_DEST:
2242 break;
2243 default:
2244 (void)dispatch_assume_zero(kr);
2245 break;
2246 }
2247 }
2248
2249 DISPATCH_NOINLINE DISPATCH_WEAK
2250 unsigned long
2251 _dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
2252 {
2253 _dispatch_runloop_queue_wakeup_thread(dq);
2254 return false;
2255 }
2256
2257 DISPATCH_NOINLINE
2258 static dispatch_queue_t
2259 _dispatch_main_queue_wakeup(void)
2260 {
2261 dispatch_queue_t dq = &_dispatch_main_q;
2262 if (!dq->dq_is_thread_bound) {
2263 return NULL;
2264 }
2265 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
2266 _dispatch_runloop_queue_port_init);
2267 _dispatch_runloop_queue_wakeup_thread(dq);
2268 return NULL;
2269 }
2270 #endif
2271
2272 DISPATCH_NOINLINE
2273 static void
2274 _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
2275 {
2276 static dispatch_once_t pred;
2277 dispatch_root_queue_context_t qc = dq->do_ctxt;
2278 uint32_t i = n;
2279 int r;
2280
2281 _dispatch_debug_root_queue(dq, __func__);
2282 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
2283
2284 #if HAVE_PTHREAD_WORKQUEUES
2285 #if DISPATCH_USE_PTHREAD_POOL
2286 if (qc->dgq_kworkqueue != (void*)(~0ul))
2287 #endif
2288 {
2289 _dispatch_root_queue_debug("requesting new worker thread for global "
2290 "queue: %p", dq);
2291 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
2292 if (qc->dgq_kworkqueue) {
2293 pthread_workitem_handle_t wh;
2294 unsigned int gen_cnt;
2295 do {
2296 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
2297 _dispatch_worker_thread3, dq, &wh, &gen_cnt);
2298 (void)dispatch_assume_zero(r);
2299 } while (--i);
2300 return;
2301 }
2302 #endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
2303 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2304 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
2305 qc->dgq_wq_options, (int)i);
2306 (void)dispatch_assume_zero(r);
2307 #endif
2308 return;
2309 }
2310 #endif // HAVE_PTHREAD_WORKQUEUES
2311 #if DISPATCH_USE_PTHREAD_POOL
2312 if (fastpath(qc->dgq_thread_mediator)) {
2313 while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
2314 if (!--i) {
2315 return;
2316 }
2317 }
2318 }
2319 uint32_t j, t_count = qc->dgq_thread_pool_size;
2320 do {
2321 if (!t_count) {
2322 _dispatch_root_queue_debug("pthread pool is full for root queue: "
2323 "%p", dq);
2324 return;
2325 }
2326 j = i > t_count ? t_count : i;
2327 } while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
2328 t_count - j, &t_count, relaxed));
2329
2330 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2331 pthread_attr_t *attr = pqc ? &pqc->dpq_thread_attr : NULL;
2332 pthread_t tid, *pthr = &tid;
2333 #if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
2334 if (slowpath(dq == &_dispatch_mgr_root_queue)) {
2335 pthr = _dispatch_mgr_root_queue_init();
2336 }
2337 #endif
2338 do {
2339 _dispatch_retain(dq);
2340 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
2341 if (r != EAGAIN) {
2342 (void)dispatch_assume_zero(r);
2343 }
2344 _dispatch_temporary_resource_shortage();
2345 }
2346 if (!attr) {
2347 r = pthread_detach(*pthr);
2348 (void)dispatch_assume_zero(r);
2349 }
2350 } while (--j);
2351 #endif // DISPATCH_USE_PTHREAD_POOL
2352 }
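
/*
 * Thread supply above, roughly in order of preference: a kernel workqueue
 * request (pthread_workqueue_addthreads_np, or the legacy additem fallback),
 * then the pthread pool -- first by signaling idle workers parked on
 * dgq_thread_mediator, and only then by creating new pthreads, with
 * dgq_thread_pool_size capping the total. pthread_create() failures are
 * retried after _dispatch_temporary_resource_shortage(); only non-EAGAIN
 * errors are flagged via dispatch_assume_zero().
 */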
2353
2354 static inline void
2355 _dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
2356 {
2357 if (!dq->dq_items_tail) {
2358 return;
2359 }
2360 #if HAVE_PTHREAD_WORKQUEUES
2361 dispatch_root_queue_context_t qc = dq->do_ctxt;
2362 if (
2363 #if DISPATCH_USE_PTHREAD_POOL
2364 (qc->dgq_kworkqueue != (void*)(~0ul)) &&
2365 #endif
2366 !dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
2367 _dispatch_root_queue_debug("worker thread request still pending for "
2368 "global queue: %p", dq);
2369 return;
2370 }
2371 #endif // HAVE_PTHREAD_WORKQUEUES
2372 return _dispatch_queue_wakeup_global_slow(dq, n);
2373 }
2374
2375 static inline void
2376 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
2377 {
2378 return _dispatch_queue_wakeup_global2(dq, 1);
2379 }
2380
2381 #pragma mark -
2382 #pragma mark dispatch_queue_invoke
2383
2384 DISPATCH_ALWAYS_INLINE
2385 static inline dispatch_queue_t
2386 dispatch_queue_invoke2(dispatch_object_t dou,
2387 _dispatch_thread_semaphore_t *sema_ptr)
2388 {
2389 dispatch_queue_t dq = dou._dq;
2390 dispatch_queue_t otq = dq->do_targetq;
2391 *sema_ptr = _dispatch_queue_drain(dq);
2392
2393 if (slowpath(otq != dq->do_targetq)) {
2394 // An item on the queue changed the target queue
2395 return dq->do_targetq;
2396 }
2397 return NULL;
2398 }
2399
2400 // 6618342 Contact the team that owns the Instrument DTrace probe before
2401 // renaming this symbol
2402 DISPATCH_NOINLINE
2403 void
2404 _dispatch_queue_invoke(dispatch_queue_t dq)
2405 {
2406 _dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
2407 }
2408
2409 #pragma mark -
2410 #pragma mark dispatch_queue_drain
2411
2412 DISPATCH_ALWAYS_INLINE
2413 static inline struct dispatch_object_s*
2414 _dispatch_queue_head(dispatch_queue_t dq)
2415 {
2416 struct dispatch_object_s *dc;
2417 while (!(dc = fastpath(dq->dq_items_head))) {
2418 dispatch_hardware_pause();
2419 }
2420 return dc;
2421 }
2422
2423 DISPATCH_ALWAYS_INLINE
2424 static inline struct dispatch_object_s*
2425 _dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
2426 {
2427 struct dispatch_object_s *next_dc;
2428 next_dc = fastpath(dc->do_next);
2429 dq->dq_items_head = next_dc;
2430 if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
2431 relaxed)) {
2432 // Enqueue is TIGHTLY controlled; we won't wait long.
2433 while (!(next_dc = fastpath(dc->do_next))) {
2434 dispatch_hardware_pause();
2435 }
2436 dq->dq_items_head = next_dc;
2437 }
2438 return next_dc;
2439 }
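
/*
 * Dequeue note: dq_items_head/dq_items_tail form a multi-producer,
 * single-consumer list. When the popped item has no do_next yet, the cmpxchg
 * on dq_items_tail decides whether the list is truly empty or whether an
 * enqueuer has swapped the tail but not yet linked do_next; in the latter
 * case the short spin above waits for that store to become visible.
 */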
2440
2441 _dispatch_thread_semaphore_t
2442 _dispatch_queue_drain(dispatch_object_t dou)
2443 {
2444 dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
2445 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2446 struct dispatch_object_s *dc, *next_dc;
2447 _dispatch_thread_semaphore_t sema = 0;
2448
2449 // Continue draining sources after target queue change rdar://8928171
2450 bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);
2451
2452 orig_tq = dq->do_targetq;
2453
2454 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2455 //dispatch_debug_queue(dq, __func__);
2456
2457 while (dq->dq_items_tail) {
2458 dc = _dispatch_queue_head(dq);
2459 do {
2460 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
2461 goto out;
2462 }
2463 if (dq->dq_running > dq->dq_width) {
2464 goto out;
2465 }
2466 if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
2467 goto out;
2468 }
2469 bool redirect = false;
2470 if (!fastpath(dq->dq_width == 1)) {
2471 if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
2472 (long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
2473 if (dq->dq_running > 1) {
2474 goto out;
2475 }
2476 } else {
2477 redirect = true;
2478 }
2479 }
2480 next_dc = _dispatch_queue_next(dq, dc);
2481 if (redirect) {
2482 _dispatch_continuation_redirect(dq, dc);
2483 continue;
2484 }
2485 if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
2486 goto out;
2487 }
2488 _dispatch_continuation_pop(dc);
2489 _dispatch_perfmon_workitem_inc();
2490 } while ((dc = next_dc));
2491 }
2492
2493 out:
2494 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2495 return sema;
2496 }
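
/*
 * Drain loop exit conditions, for reference: the queue got suspended, the
 * width is exceeded (dq_running > dq_width), the target queue changed while
 * draining (except for kevent sources, rdar://8928171), or a queued barrier
 * sync was popped -- in which case the returned semaphore transfers the
 * "lock" to the waiting thread instead of finishing the drain here.
 */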
2497
2498 #if DISPATCH_COCOA_COMPAT
2499 static void
2500 _dispatch_main_queue_drain(void)
2501 {
2502 dispatch_queue_t dq = &_dispatch_main_q;
2503 if (!dq->dq_items_tail) {
2504 return;
2505 }
2506 struct dispatch_continuation_s marker = {
2507 .do_vtable = NULL,
2508 };
2509 struct dispatch_object_s *dmarker = (void*)&marker;
2510 _dispatch_queue_push_notrace(dq, dmarker);
2511
2512 _dispatch_perfmon_start();
2513 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2514 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2515
2516 struct dispatch_object_s *dc, *next_dc;
2517 dc = _dispatch_queue_head(dq);
2518 do {
2519 next_dc = _dispatch_queue_next(dq, dc);
2520 if (dc == dmarker) {
2521 goto out;
2522 }
2523 _dispatch_continuation_pop(dc);
2524 _dispatch_perfmon_workitem_inc();
2525 } while ((dc = next_dc));
2526 DISPATCH_CRASH("Main queue corruption");
2527
2528 out:
2529 if (next_dc) {
2530 _dispatch_main_queue_wakeup();
2531 }
2532 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2533 _dispatch_perfmon_end();
2534 _dispatch_force_cache_cleanup();
2535 }
2536
2537 static bool
2538 _dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
2539 {
2540 if (!dq->dq_items_tail) {
2541 return false;
2542 }
2543 _dispatch_perfmon_start();
2544 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
2545 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2546
2547 struct dispatch_object_s *dc, *next_dc;
2548 dc = _dispatch_queue_head(dq);
2549 next_dc = _dispatch_queue_next(dq, dc);
2550 _dispatch_continuation_pop(dc);
2551 _dispatch_perfmon_workitem_inc();
2552
2553 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
2554 _dispatch_perfmon_end();
2555 _dispatch_force_cache_cleanup();
2556 return next_dc;
2557 }
2558 #endif
2559
2560 DISPATCH_ALWAYS_INLINE_NDEBUG
2561 static inline _dispatch_thread_semaphore_t
2562 _dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
2563 {
2564 // rdar://problem/8290662 "lock transfer"
2565 struct dispatch_object_s *dc;
2566 _dispatch_thread_semaphore_t sema;
2567
2568 // queue is locked, or suspended and not being drained
2569 dc = dq->dq_items_head;
2570 if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
2571 return 0;
2572 }
2573 // dequeue dc, it is a barrier sync
2574 (void)_dispatch_queue_next(dq, dc);
2575 return sema;
2576 }
2577
2578 void
2579 _dispatch_mgr_queue_drain(void)
2580 {
2581 dispatch_queue_t dq = &_dispatch_mgr_q;
2582 if (!dq->dq_items_tail) {
2583 return _dispatch_force_cache_cleanup();
2584 }
2585 _dispatch_perfmon_start();
2586 if (slowpath(_dispatch_queue_drain(dq))) {
2587 DISPATCH_CRASH("Sync onto manager queue");
2588 }
2589 _dispatch_perfmon_end();
2590 _dispatch_force_cache_cleanup();
2591 }
2592
2593 #pragma mark -
2594 #pragma mark dispatch_root_queue_drain
2595
2596 #ifndef DISPATCH_CONTENTION_USE_RAND
2597 #define DISPATCH_CONTENTION_USE_RAND (!TARGET_OS_EMBEDDED)
2598 #endif
2599 #ifndef DISPATCH_CONTENTION_SPINS_MAX
2600 #define DISPATCH_CONTENTION_SPINS_MAX (128 - 1)
2601 #endif
2602 #ifndef DISPATCH_CONTENTION_SPINS_MIN
2603 #define DISPATCH_CONTENTION_SPINS_MIN (32 - 1)
2604 #endif
2605 #ifndef DISPATCH_CONTENTION_USLEEP_START
2606 #define DISPATCH_CONTENTION_USLEEP_START 500
2607 #endif
2608 #ifndef DISPATCH_CONTENTION_USLEEP_MAX
2609 #define DISPATCH_CONTENTION_USLEEP_MAX 100000
2610 #endif
2611
2612 DISPATCH_NOINLINE
2613 static bool
2614 _dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
2615 {
2616 dispatch_root_queue_context_t qc = dq->do_ctxt;
2617 struct dispatch_object_s *const mediator = (void *)~0ul;
2618 bool pending = false, available = true;
2619 unsigned int spins, sleep_time = DISPATCH_CONTENTION_USLEEP_START;
2620
2621 do {
2622 // Spin for a short while in case the contention is temporary -- e.g.
2623 // when starting up after dispatch_apply, or when executing a few
2624 // short continuations in a row.
2625 #if DISPATCH_CONTENTION_USE_RAND
2626 // Use randomness to prevent threads from resonating at the same
2627 // frequency and permanently contending. Having all threads share the same
2628 // seed value is safe with the FreeBSD rand_r implementation.
2629 static unsigned int seed;
2630 spins = (rand_r(&seed) & DISPATCH_CONTENTION_SPINS_MAX) |
2631 DISPATCH_CONTENTION_SPINS_MIN;
2632 #else
2633 spins = DISPATCH_CONTENTION_SPINS_MIN +
2634 (DISPATCH_CONTENTION_SPINS_MAX-DISPATCH_CONTENTION_SPINS_MIN)/2;
2635 #endif
2636 while (spins--) {
2637 dispatch_hardware_pause();
2638 if (fastpath(dq->dq_items_head != mediator)) goto out;
2639 };
2640 // Since we have serious contention, we need to back off.
2641 if (!pending) {
2642 // Mark this queue as pending to avoid requests for further threads
2643 (void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
2644 pending = true;
2645 }
2646 _dispatch_contention_usleep(sleep_time);
2647 if (fastpath(dq->dq_items_head != mediator)) goto out;
2648 sleep_time *= 2;
2649 } while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);
2650
2651 // The ratio of work to libdispatch overhead must be bad. This
2652 // scenario implies that there are too many threads in the pool.
2653 // Create a new pending thread and then exit this thread.
2654 // The kernel will grant a new thread when the load subsides.
2655 _dispatch_debug("contention on global queue: %p", dq);
2656 _dispatch_queue_wakeup_global(dq);
2657 available = false;
2658 out:
2659 if (pending) {
2660 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
2661 }
2662 return available;
2663 }
2664
2665 DISPATCH_ALWAYS_INLINE_NDEBUG
2666 static inline struct dispatch_object_s *
2667 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
2668 {
2669 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
2670
2671 start:
2672 // The mediator value acts both as a "lock" and a signal
2673 head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);
2674
2675 if (slowpath(head == NULL)) {
2676 // The first xchg on the tail will tell the enqueueing thread that it
2677 // is safe to blindly write out to the head pointer. A cmpxchg honors
2678 // the algorithm.
2679 (void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL,
2680 relaxed);
2681 _dispatch_root_queue_debug("no work on global queue: %p", dq);
2682 return NULL;
2683 }
2684
2685 if (slowpath(head == mediator)) {
2686 // This thread lost the race for ownership of the queue.
2687 if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
2688 goto start;
2689 }
2690 return NULL;
2691 }
2692
2693 // Restore the head pointer to a sane value before returning.
2694 // If 'next' is NULL, then this item _might_ be the last item.
2695 next = fastpath(head->do_next);
2696
2697 if (slowpath(!next)) {
2698 dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);
2699
2700 if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
2701 // both head and tail are NULL now
2702 goto out;
2703 }
2704
2705 // There must be a next item now. This thread won't wait long.
2706 while (!(next = head->do_next)) {
2707 dispatch_hardware_pause();
2708 }
2709 }
2710
2711 dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
2712 _dispatch_queue_wakeup_global(dq);
2713 out:
2714 return head;
2715 }
2716
2717 static void
2718 _dispatch_root_queue_drain(dispatch_queue_t dq)
2719 {
2720 #if DISPATCH_DEBUG
2721 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
2722 DISPATCH_CRASH("Premature thread recycling");
2723 }
2724 #endif
2725 _dispatch_thread_setspecific(dispatch_queue_key, dq);
2726
2727 #if DISPATCH_COCOA_COMPAT
2728 // ensure that high-level memory management techniques do not leak/crash
2729 if (dispatch_begin_thread_4GC) {
2730 dispatch_begin_thread_4GC();
2731 }
2732 void *pool = _dispatch_autorelease_pool_push();
2733 #endif // DISPATCH_COCOA_COMPAT
2734
2735 _dispatch_perfmon_start();
2736 struct dispatch_object_s *item;
2737 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
2738 _dispatch_continuation_pop(item);
2739 }
2740 _dispatch_perfmon_end();
2741
2742 #if DISPATCH_COCOA_COMPAT
2743 _dispatch_autorelease_pool_pop(pool);
2744 if (dispatch_end_thread_4GC) {
2745 dispatch_end_thread_4GC();
2746 }
2747 #endif // DISPATCH_COCOA_COMPAT
2748
2749 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
2750 }
2751
2752 #pragma mark -
2753 #pragma mark dispatch_worker_thread
2754
2755 #if HAVE_PTHREAD_WORKQUEUES
2756 static void
2757 _dispatch_worker_thread3(void *context)
2758 {
2759 dispatch_queue_t dq = context;
2760 dispatch_root_queue_context_t qc = dq->do_ctxt;
2761
2762 _dispatch_introspection_thread_add();
2763
2764 (void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
2765 _dispatch_root_queue_drain(dq);
2766 __asm__(""); // prevent tailcall (for Instrument DTrace probe)
2767
2768 }
2769
2770 #if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2771 // 6618342 Contact the team that owns the Instrument DTrace probe before
2772 // renaming this symbol
2773 static void
2774 _dispatch_worker_thread2(int priority, int options,
2775 void *context DISPATCH_UNUSED)
2776 {
2777 dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
2778 dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
2779 dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];
2780
2781 return _dispatch_worker_thread3(dq);
2782 }
2783 #endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
2784 #endif // HAVE_PTHREAD_WORKQUEUES
2785
2786 #if DISPATCH_USE_PTHREAD_POOL
2787 // 6618342 Contact the team that owns the Instrument DTrace probe before
2788 // renaming this symbol
2789 static void *
2790 _dispatch_worker_thread(void *context)
2791 {
2792 dispatch_queue_t dq = context;
2793 dispatch_root_queue_context_t qc = dq->do_ctxt;
2794 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
2795
2796 if (pqc && pqc->dpq_thread_configure) {
2797 pqc->dpq_thread_configure();
2798 }
2799
2800 sigset_t mask;
2801 int r;
2802 // work around the tweaks the kernel workqueue does for us
2803 r = sigfillset(&mask);
2804 (void)dispatch_assume_zero(r);
2805 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
2806 (void)dispatch_assume_zero(r);
2807 _dispatch_introspection_thread_add();
2808
2809 // Non-pthread-root-queue pthreads use a 65 second timeout in case there
2810 // are any timers that run once a minute <rdar://problem/11744973>
2811 const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;
2812
2813 do {
2814 _dispatch_root_queue_drain(dq);
2815 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
2816 dispatch_time(0, timeout)) == 0);
2817
2818 (void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
2819 _dispatch_queue_wakeup_global(dq);
2820 _dispatch_release(dq);
2821
2822 return NULL;
2823 }
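
/*
 * Worker lifecycle above: each pool thread drains the root queue, then parks
 * on dgq_thread_mediator for up to 5s (pthread root queues) or 65s (default
 * pools, to outlive once-a-minute timers, rdar://problem/11744973). When the
 * wait times out the thread returns its slot to dgq_thread_pool_size, wakes
 * the queue once more in case work raced in, and exits.
 */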
2824
2825 int
2826 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
2827 {
2828 int r;
2829
2830 /* Workaround: 6269619 Not all signals can be delivered on any thread */
2831
2832 r = sigdelset(set, SIGILL);
2833 (void)dispatch_assume_zero(r);
2834 r = sigdelset(set, SIGTRAP);
2835 (void)dispatch_assume_zero(r);
2836 #if HAVE_DECL_SIGEMT
2837 r = sigdelset(set, SIGEMT);
2838 (void)dispatch_assume_zero(r);
2839 #endif
2840 r = sigdelset(set, SIGFPE);
2841 (void)dispatch_assume_zero(r);
2842 r = sigdelset(set, SIGBUS);
2843 (void)dispatch_assume_zero(r);
2844 r = sigdelset(set, SIGSEGV);
2845 (void)dispatch_assume_zero(r);
2846 r = sigdelset(set, SIGSYS);
2847 (void)dispatch_assume_zero(r);
2848 r = sigdelset(set, SIGPIPE);
2849 (void)dispatch_assume_zero(r);
2850
2851 return pthread_sigmask(how, set, oset);
2852 }
2853 #endif // DISPATCH_USE_PTHREAD_POOL
2854
2855 #pragma mark -
2856 #pragma mark dispatch_runloop_queue
2857
2858 static bool _dispatch_program_is_probably_callback_driven;
2859
2860 #if DISPATCH_COCOA_COMPAT
2861
2862 dispatch_queue_t
2863 _dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
2864 {
2865 dispatch_queue_t dq;
2866 size_t dqs;
2867
2868 if (slowpath(flags)) {
2869 return NULL;
2870 }
2871 dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
2872 dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
2873 _dispatch_queue_init(dq);
2874 dq->do_targetq = _dispatch_get_root_queue(0, true);
2875 dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
2876 dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
2877 dq->dq_running = 1;
2878 dq->dq_is_thread_bound = 1;
2879 _dispatch_runloop_queue_port_init(dq);
2880 _dispatch_queue_set_bound_thread(dq);
2881 _dispatch_object_debug(dq, "%s", __func__);
2882 return _dispatch_introspection_queue_create(dq);
2883 }
2884
2885 void
2886 _dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
2887 {
2888 _dispatch_object_debug(dq, "%s", __func__);
2889 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
2890 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
2891 DISPATCH_OBJECT_SUSPEND_LOCK, release);
2892 _dispatch_queue_clear_bound_thread(dq);
2893 if (suspend_cnt == 0) {
2894 _dispatch_wakeup(dq);
2895 }
2896 }
2897
2898 void
2899 _dispatch_runloop_queue_dispose(dispatch_queue_t dq)
2900 {
2901 _dispatch_object_debug(dq, "%s", __func__);
2902 _dispatch_introspection_queue_dispose(dq);
2903 _dispatch_runloop_queue_port_dispose(dq);
2904 _dispatch_queue_destroy(dq);
2905 }
2906
2907 bool
2908 _dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
2909 {
2910 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2911 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2912 }
2913 dispatch_retain(dq);
2914 bool r = _dispatch_runloop_queue_drain_one(dq);
2915 dispatch_release(dq);
2916 return r;
2917 }
2918
2919 void
2920 _dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
2921 {
2922 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2923 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2924 }
2925 _dispatch_runloop_queue_probe(dq);
2926 }
2927
2928 mach_port_t
2929 _dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
2930 {
2931 if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
2932 DISPATCH_CLIENT_CRASH("Not a runloop queue");
2933 }
2934 return (mach_port_t)dq->do_ctxt;
2935 }
2936
2937 static void
2938 _dispatch_runloop_queue_port_init(void *ctxt)
2939 {
2940 dispatch_queue_t dq = (dispatch_queue_t)ctxt;
2941 mach_port_t mp;
2942 kern_return_t kr;
2943
2944 _dispatch_safe_fork = false;
2945 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
2946 DISPATCH_VERIFY_MIG(kr);
2947 (void)dispatch_assume_zero(kr);
2948 kr = mach_port_insert_right(mach_task_self(), mp, mp,
2949 MACH_MSG_TYPE_MAKE_SEND);
2950 DISPATCH_VERIFY_MIG(kr);
2951 (void)dispatch_assume_zero(kr);
2952 if (dq != &_dispatch_main_q) {
2953 struct mach_port_limits limits = {
2954 .mpl_qlimit = 1,
2955 };
2956 kr = mach_port_set_attributes(mach_task_self(), mp,
2957 MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
2958 sizeof(limits));
2959 DISPATCH_VERIFY_MIG(kr);
2960 (void)dispatch_assume_zero(kr);
2961 }
2962 dq->do_ctxt = (void*)(uintptr_t)mp;
2963
2964 _dispatch_program_is_probably_callback_driven = true;
2965 }
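
/*
 * The receive right allocated above is stored in do_ctxt and handed to the
 * runloop (via _dispatch_runloop_root_queue_get_port_4CF or
 * _dispatch_get_main_queue_port_4CF) so CoreFoundation can block on it. For
 * non-main runloop queues the port's queue limit is set to 1, presumably so
 * that repeated wakeups coalesce into a single pending message rather than
 * piling up.
 */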
2966
2967 static void
2968 _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
2969 {
2970 mach_port_t mp = (mach_port_t)dq->do_ctxt;
2971 if (!mp) {
2972 return;
2973 }
2974 dq->do_ctxt = NULL;
2975 kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
2976 DISPATCH_VERIFY_MIG(kr);
2977 (void)dispatch_assume_zero(kr);
2978 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
2979 DISPATCH_VERIFY_MIG(kr);
2980 (void)dispatch_assume_zero(kr);
2981 }
2982
2983 #pragma mark -
2984 #pragma mark dispatch_main_queue
2985
2986 mach_port_t
2987 _dispatch_get_main_queue_port_4CF(void)
2988 {
2989 dispatch_queue_t dq = &_dispatch_main_q;
2990 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
2991 _dispatch_runloop_queue_port_init);
2992 return (mach_port_t)dq->do_ctxt;
2993 }
2994
2995 static bool main_q_is_draining;
2996
2997 // 6618342 Contact the team that owns the Instrument DTrace probe before
2998 // renaming this symbol
2999 DISPATCH_NOINLINE
3000 static void
3001 _dispatch_queue_set_mainq_drain_state(bool arg)
3002 {
3003 main_q_is_draining = arg;
3004 }
3005
3006 void
3007 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
3008 {
3009 if (main_q_is_draining) {
3010 return;
3011 }
3012 _dispatch_queue_set_mainq_drain_state(true);
3013 _dispatch_main_queue_drain();
3014 _dispatch_queue_set_mainq_drain_state(false);
3015 }
3016
3017 #endif
3018
3019 void
3020 dispatch_main(void)
3021 {
3022 #if HAVE_PTHREAD_MAIN_NP
3023 if (pthread_main_np()) {
3024 #endif
3025 _dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
3026 _dispatch_program_is_probably_callback_driven = true;
3027 pthread_exit(NULL);
3028 DISPATCH_CRASH("pthread_exit() returned");
3029 #if HAVE_PTHREAD_MAIN_NP
3030 }
3031 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
3032 #endif
3033 }
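
/*
 * Note: dispatch_main() never returns. pthread_exit() on the main thread runs
 * the thread-specific-data destructors, which is expected to invoke
 * _dispatch_queue_cleanup() below with the main queue as its context; because
 * _dispatch_program_is_probably_callback_driven was set, cleanup then parks a
 * thread in _dispatch_sig_thread(), presumably so the process still has a
 * thread available for signal delivery after the main thread is gone.
 */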
3034
3035 DISPATCH_NOINLINE DISPATCH_NORETURN
3036 static void
3037 _dispatch_sigsuspend(void)
3038 {
3039 static const sigset_t mask;
3040
3041 for (;;) {
3042 sigsuspend(&mask);
3043 }
3044 }
3045
3046 DISPATCH_NORETURN
3047 static void
3048 _dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
3049 {
3050 // never returns, so burn bridges behind us
3051 _dispatch_clear_stack(0);
3052 _dispatch_sigsuspend();
3053 }
3054
3055 DISPATCH_NOINLINE
3056 static void
3057 _dispatch_queue_cleanup2(void)
3058 {
3059 dispatch_queue_t dq = &_dispatch_main_q;
3060 (void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
3061 unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
3062 DISPATCH_OBJECT_SUSPEND_LOCK, release);
3063 dq->dq_is_thread_bound = 0;
3064 if (suspend_cnt == 0) {
3065 _dispatch_wakeup(dq);
3066 }
3067
3068 // overload the "probably" variable to mean that dispatch_main() or
3069 // similar non-POSIX API was called
3070 // this has to run before the DISPATCH_COCOA_COMPAT below
3071 if (_dispatch_program_is_probably_callback_driven) {
3072 dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
3073 _dispatch_sig_thread);
3074 sleep(1); // workaround 6778970
3075 }
3076
3077 #if DISPATCH_COCOA_COMPAT
3078 dispatch_once_f(&_dispatch_main_q_port_pred, dq,
3079 _dispatch_runloop_queue_port_init);
3080 _dispatch_runloop_queue_port_dispose(dq);
3081 #endif
3082 }
3083
3084 static void
3085 _dispatch_queue_cleanup(void *ctxt)
3086 {
3087 if (ctxt == &_dispatch_main_q) {
3088 return _dispatch_queue_cleanup2();
3089 }
3090 // POSIX defines that destructors are only called if 'ctxt' is non-null
3091 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
3092 }