src/queue.c (libdispatch-84.5.3)
1 /*
2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22 #include "protocol.h"
23
24 void
25 dummy_function(void)
26 {
27 }
28
29 long
30 dummy_function_r0(void)
31 {
32 return 0;
33 }
34
35 static bool _dispatch_select_workaround;
36 static fd_set _dispatch_rfds;
37 static fd_set _dispatch_wfds;
38 static void *_dispatch_rfd_ptrs[FD_SETSIZE];
39 static void *_dispatch_wfd_ptrs[FD_SETSIZE];
40
41
42 static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
43 {
44 .do_vtable = &_dispatch_semaphore_vtable,
45 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
46 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
47 },
48 {
49 .do_vtable = &_dispatch_semaphore_vtable,
50 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
51 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
52 },
53 {
54 .do_vtable = &_dispatch_semaphore_vtable,
55 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
56 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
57 },
58 {
59 .do_vtable = &_dispatch_semaphore_vtable,
60 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
61 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
62 },
63 {
64 .do_vtable = &_dispatch_semaphore_vtable,
65 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
66 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
67 },
68 {
69 .do_vtable = &_dispatch_semaphore_vtable,
70 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
71 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
72 },
73 };
74
75 static struct dispatch_queue_s _dispatch_root_queues[];
76
77 static inline dispatch_queue_t
78 _dispatch_get_root_queue(long priority, bool overcommit)
79 {
80 if (overcommit) switch (priority) {
81 case DISPATCH_QUEUE_PRIORITY_LOW:
82 return &_dispatch_root_queues[1];
83 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
84 return &_dispatch_root_queues[3];
85 case DISPATCH_QUEUE_PRIORITY_HIGH:
86 return &_dispatch_root_queues[5];
87 }
88 switch (priority) {
89 case DISPATCH_QUEUE_PRIORITY_LOW:
90 return &_dispatch_root_queues[0];
91 case DISPATCH_QUEUE_PRIORITY_DEFAULT:
92 return &_dispatch_root_queues[2];
93 case DISPATCH_QUEUE_PRIORITY_HIGH:
94 return &_dispatch_root_queues[4];
95 default:
96 return NULL;
97 }
98 }
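
/*
 * Index layout of _dispatch_root_queues[] (a sketch of the mapping that the
 * switch statements above hard-code): two queues per priority level, the odd
 * index being the overcommit variant. The helper below is illustrative only
 * and is not part of this file; 'level' 0/1/2 for LOW/DEFAULT/HIGH is an
 * assumption of the sketch (the public constants are -2, 0 and 2).
 *
 *	static inline dispatch_queue_t
 *	root_queue_by_level(unsigned level, bool overcommit)
 *	{
 *		return &_dispatch_root_queues[2 * level + (overcommit ? 1 : 0)];
 *	}
 */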
99
100 #ifdef __BLOCKS__
101 dispatch_block_t
102 _dispatch_Block_copy(dispatch_block_t db)
103 {
104 dispatch_block_t rval;
105
106 while (!(rval = Block_copy(db))) {
107 sleep(1);
108 }
109
110 return rval;
111 }
112 #define _dispatch_Block_copy(x) ((typeof(x))_dispatch_Block_copy(x))
113
114 void
115 _dispatch_call_block_and_release(void *block)
116 {
117 void (^b)(void) = block;
118 b();
119 Block_release(b);
120 }
121
122 void
123 _dispatch_call_block_and_release2(void *block, void *ctxt)
124 {
125 void (^b)(void*) = block;
126 b(ctxt);
127 Block_release(b);
128 }
129
130 #endif /* __BLOCKS__ */
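
/*
 * How the Block-based entry points below use these helpers (illustrative
 * sketch only; see dispatch_async() further down; 'do_work' is hypothetical):
 *
 *	void (^work)(void) = ^{ do_work(); };
 *	// dispatch_async(q, work) is equivalent to:
 *	dispatch_async_f(q, _dispatch_Block_copy(work),
 *			_dispatch_call_block_and_release);
 *	// the continuation later invokes the heap copy and Block_release()s it
 */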
131
132 struct dispatch_queue_attr_vtable_s {
133 DISPATCH_VTABLE_HEADER(dispatch_queue_attr_s);
134 };
135
136 struct dispatch_queue_attr_s {
137 DISPATCH_STRUCT_HEADER(dispatch_queue_attr_s, dispatch_queue_attr_vtable_s);
138
139 // Public:
140 int qa_priority;
141 void* finalizer_ctxt;
142 dispatch_queue_finalizer_function_t finalizer_func;
143
144 // Private:
145 unsigned long qa_flags;
146 };
147
148 static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
149
150 #define _dispatch_queue_trylock(dq) dispatch_atomic_cmpxchg(&(dq)->dq_running, 0, 1)
151 static inline void _dispatch_queue_unlock(dispatch_queue_t dq);
152 static void _dispatch_queue_invoke(dispatch_queue_t dq);
153 static void _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq);
154 static bool _dispatch_queue_wakeup_global(dispatch_queue_t dq);
155 static struct dispatch_object_s *_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq);
156
157 static bool _dispatch_program_is_probably_callback_driven;
158
159 #if DISPATCH_COCOA_COMPAT
160 void (*dispatch_begin_thread_4GC)(void) = dummy_function;
161 void (*dispatch_end_thread_4GC)(void) = dummy_function;
162 void *(*_dispatch_begin_NSAutoReleasePool)(void) = (void *)dummy_function;
163 void (*_dispatch_end_NSAutoReleasePool)(void *) = (void *)dummy_function;
164 static void _dispatch_queue_wakeup_main(void);
165
166 static dispatch_once_t _dispatch_main_q_port_pred;
167 static bool main_q_is_draining;
168 static mach_port_t main_q_port;
169 #endif
170
171 static void _dispatch_cache_cleanup2(void *value);
172 static void _dispatch_force_cache_cleanup(void);
173
174 static const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
175 .do_type = DISPATCH_QUEUE_TYPE,
176 .do_kind = "queue",
177 .do_dispose = _dispatch_queue_dispose,
178 .do_invoke = (void *)dummy_function_r0,
179 .do_probe = (void *)dummy_function_r0,
180 .do_debug = dispatch_queue_debug,
181 };
182
183 static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
184 .do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
185 .do_kind = "global-queue",
186 .do_debug = dispatch_queue_debug,
187 .do_probe = _dispatch_queue_wakeup_global,
188 };
189
190 #define MAX_THREAD_COUNT 255
191
192 struct dispatch_root_queue_context_s {
193 pthread_workqueue_t dgq_kworkqueue;
194 uint32_t dgq_pending;
195 uint32_t dgq_thread_pool_size;
196 dispatch_semaphore_t dgq_thread_mediator;
197 };
198
199 #define DISPATCH_ROOT_QUEUE_COUNT (DISPATCH_QUEUE_PRIORITY_COUNT * 2)
200 static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
201 {
202 .dgq_thread_mediator = &_dispatch_thread_mediator[0],
203 .dgq_thread_pool_size = MAX_THREAD_COUNT,
204 },
205 {
206 .dgq_thread_mediator = &_dispatch_thread_mediator[1],
207 .dgq_thread_pool_size = MAX_THREAD_COUNT,
208 },
209 {
210 .dgq_thread_mediator = &_dispatch_thread_mediator[2],
211 .dgq_thread_pool_size = MAX_THREAD_COUNT,
212 },
213 {
214 .dgq_thread_mediator = &_dispatch_thread_mediator[3],
215 .dgq_thread_pool_size = MAX_THREAD_COUNT,
216 },
217 {
218 .dgq_thread_mediator = &_dispatch_thread_mediator[4],
219 .dgq_thread_pool_size = MAX_THREAD_COUNT,
220 },
221 {
222 .dgq_thread_mediator = &_dispatch_thread_mediator[5],
223 .dgq_thread_pool_size = MAX_THREAD_COUNT,
224 },
225 };
226
227 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
228 // dq_running is set to 2 so that barrier operations go through the slow path
229 static struct dispatch_queue_s _dispatch_root_queues[] = {
230 {
231 .do_vtable = &_dispatch_queue_root_vtable,
232 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
233 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
234 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
235 .do_ctxt = &_dispatch_root_queue_contexts[0],
236
237 .dq_label = "com.apple.root.low-priority",
238 .dq_running = 2,
239 .dq_width = UINT32_MAX,
240 .dq_serialnum = 4,
241 },
242 {
243 .do_vtable = &_dispatch_queue_root_vtable,
244 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
245 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
246 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
247 .do_ctxt = &_dispatch_root_queue_contexts[1],
248
249 .dq_label = "com.apple.root.low-overcommit-priority",
250 .dq_running = 2,
251 .dq_width = UINT32_MAX,
252 .dq_serialnum = 5,
253 },
254 {
255 .do_vtable = &_dispatch_queue_root_vtable,
256 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
257 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
258 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
259 .do_ctxt = &_dispatch_root_queue_contexts[2],
260
261 .dq_label = "com.apple.root.default-priority",
262 .dq_running = 2,
263 .dq_width = UINT32_MAX,
264 .dq_serialnum = 6,
265 },
266 {
267 .do_vtable = &_dispatch_queue_root_vtable,
268 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
269 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
270 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
271 .do_ctxt = &_dispatch_root_queue_contexts[3],
272
273 .dq_label = "com.apple.root.default-overcommit-priority",
274 .dq_running = 2,
275 .dq_width = UINT32_MAX,
276 .dq_serialnum = 7,
277 },
278 {
279 .do_vtable = &_dispatch_queue_root_vtable,
280 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
281 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
282 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
283 .do_ctxt = &_dispatch_root_queue_contexts[4],
284
285 .dq_label = "com.apple.root.high-priority",
286 .dq_running = 2,
287 .dq_width = UINT32_MAX,
288 .dq_serialnum = 8,
289 },
290 {
291 .do_vtable = &_dispatch_queue_root_vtable,
292 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
293 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
294 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
295 .do_ctxt = &_dispatch_root_queue_contexts[5],
296
297 .dq_label = "com.apple.root.high-overcommit-priority",
298 .dq_running = 2,
299 .dq_width = UINT32_MAX,
300 .dq_serialnum = 9,
301 },
302 };
303
304 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
305 struct dispatch_queue_s _dispatch_main_q = {
306 .do_vtable = &_dispatch_queue_vtable,
307 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
308 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
309 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
310 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT / 2],
311
312 .dq_label = "com.apple.main-thread",
313 .dq_running = 1,
314 .dq_width = 1,
315 .dq_serialnum = 1,
316 };
317
318 #if DISPATCH_PERF_MON
319 static OSSpinLock _dispatch_stats_lock;
320 static size_t _dispatch_bad_ratio;
321 static struct {
322 uint64_t time_total;
323 uint64_t count_total;
324 uint64_t thread_total;
325 } _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set
326 static void _dispatch_queue_merge_stats(uint64_t start);
327 #endif
328
329 static void *_dispatch_worker_thread(void *context);
330 static void _dispatch_worker_thread2(void *context);
331
332 malloc_zone_t *_dispatch_ccache_zone;
333
334 static inline void
335 _dispatch_continuation_free(dispatch_continuation_t dc)
336 {
337 dispatch_continuation_t prev_dc = _dispatch_thread_getspecific(dispatch_cache_key);
338 dc->do_next = prev_dc;
339 _dispatch_thread_setspecific(dispatch_cache_key, dc);
340 }
341
342 static inline void
343 _dispatch_continuation_pop(dispatch_object_t dou)
344 {
345 dispatch_continuation_t dc = dou._dc;
346 dispatch_group_t dg;
347
348 if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
349 return _dispatch_queue_invoke(dou._dq);
350 }
351
352 // Add the item back to the cache before calling the function. This
353 // allows the 'hot' continuation to be used for a quick callback.
354 //
355 // The ccache version is per-thread.
356 // Therefore, the object has not been reused yet.
357 // This generates better assembly.
358 if ((long)dou._do->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
359 _dispatch_continuation_free(dc);
360 }
361 if ((long)dou._do->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
362 dg = dc->dc_group;
363 } else {
364 dg = NULL;
365 }
366 dc->dc_func(dc->dc_ctxt);
367 if (dg) {
368 dispatch_group_leave(dg);
369 _dispatch_release(dg);
370 }
371 }
372
373 struct dispatch_object_s *
374 _dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
375 {
376 struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;
377
378 // The mediator value acts both as a "lock" and a signal
379 head = dispatch_atomic_xchg(&dq->dq_items_head, mediator);
380
381 if (slowpath(head == NULL)) {
382 // The first xchg on the tail will tell the enqueueing thread that it
383 // is safe to blindly write out to the head pointer. A cmpxchg honors
384 // the algorithm.
385 dispatch_atomic_cmpxchg(&dq->dq_items_head, mediator, NULL);
386 _dispatch_debug("no work on global work queue");
387 return NULL;
388 }
389
390 if (slowpath(head == mediator)) {
391 // This thread lost the race for ownership of the queue.
392 //
393 // The ratio of work to libdispatch overhead must be bad. This
394 // scenario implies that there are too many threads in the pool.
395 // Create a new pending thread and then exit this thread.
396 // The kernel will grant a new thread when the load subsides.
397 _dispatch_debug("Contention on queue: %p", dq);
398 _dispatch_queue_wakeup_global(dq);
399 #if DISPATCH_PERF_MON
400 dispatch_atomic_inc(&_dispatch_bad_ratio);
401 #endif
402 return NULL;
403 }
404
405 // Restore the head pointer to a sane value before returning.
406 // If 'next' is NULL, then this item _might_ be the last item.
407 next = fastpath(head->do_next);
408
409 if (slowpath(!next)) {
410 dq->dq_items_head = NULL;
411
412 if (dispatch_atomic_cmpxchg(&dq->dq_items_tail, head, NULL)) {
413 // both head and tail are NULL now
414 goto out;
415 }
416
417 // There must be a next item now. This thread won't wait long.
418 while (!(next = head->do_next)) {
419 _dispatch_hardware_pause();
420 }
421 }
422
423 dq->dq_items_head = next;
424 _dispatch_queue_wakeup_global(dq);
425 out:
426 return head;
427 }
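
/*
 * Condensed sketch of the drain protocol above, with GCC __sync builtins
 * standing in for the dispatch_atomic_*() macros (an assumption of this
 * sketch, not a change to the code above):
 *
 *	void *head = __sync_lock_test_and_set(&dq->dq_items_head, MEDIATOR);
 *	if (head == NULL) {                     // queue appeared empty
 *		__sync_bool_compare_and_swap(&dq->dq_items_head, MEDIATOR, NULL);
 *		return NULL;
 *	}
 *	if (head == MEDIATOR) return NULL;      // another thread owns the head
 *	// otherwise detach 'head': republish head->do_next, or clear the tail
 *	// with a cmpxchg if 'head' was the last item, then return 'head'
 */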
428
429 dispatch_queue_t
430 dispatch_get_current_queue(void)
431 {
432 return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
433 }
434
435 #undef dispatch_get_main_queue
436 __OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_NA)
437 dispatch_queue_t dispatch_get_main_queue(void);
438
439 dispatch_queue_t
440 dispatch_get_main_queue(void)
441 {
442 return &_dispatch_main_q;
443 }
444 #define dispatch_get_main_queue() (&_dispatch_main_q)
445
446 struct _dispatch_hw_config_s _dispatch_hw_config;
447
448 static void
449 _dispatch_queue_set_width_init(void)
450 {
451 size_t valsz = sizeof(uint32_t);
452
453 errno = 0;
454 sysctlbyname("hw.activecpu", &_dispatch_hw_config.cc_max_active, &valsz, NULL, 0);
455 dispatch_assume_zero(errno);
456 dispatch_assume(valsz == sizeof(uint32_t));
457
458 errno = 0;
459 sysctlbyname("hw.logicalcpu_max", &_dispatch_hw_config.cc_max_logical, &valsz, NULL, 0);
460 dispatch_assume_zero(errno);
461 dispatch_assume(valsz == sizeof(uint32_t));
462
463 errno = 0;
464 sysctlbyname("hw.physicalcpu_max", &_dispatch_hw_config.cc_max_physical, &valsz, NULL, 0);
465 dispatch_assume_zero(errno);
466 dispatch_assume(valsz == sizeof(uint32_t));
467 }
468
469 void
470 dispatch_queue_set_width(dispatch_queue_t dq, long width)
471 {
472 int w = (int)width; // intentional truncation
473 uint32_t tmp;
474
475 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
476 return;
477 }
478 if (w == 1 || w == 0) {
479 dq->dq_width = 1;
480 return;
481 }
482 if (w > 0) {
483 tmp = w;
484 } else switch (w) {
485 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
486 tmp = _dispatch_hw_config.cc_max_physical;
487 break;
488 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
489 tmp = _dispatch_hw_config.cc_max_active;
490 break;
491 default:
492 // fall through
493 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
494 tmp = _dispatch_hw_config.cc_max_logical;
495 break;
496 }
497 // multiply by two since the running count is inc/dec by two (the low bit == barrier)
498 dq->dq_width = tmp * 2;
499
500 // XXX if the queue has items and the width is increased, we should try to wake the queue
501 }
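
/*
 * Usage sketch (illustrative; in this release dispatch_queue_set_width() is
 * not part of the stable public API, and the label below is hypothetical):
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.pool", NULL);
 *	dispatch_queue_set_width(q, 4);         // allow 4 concurrent items
 *	// per the comment above, dq_width is stored as 4 * 2 == 8 internally,
 *	// because dq_running moves in steps of 2 (the low bit marks a barrier)
 */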
502
503 // skip zero
504 // 1 - main_q
505 // 2 - mgr_q
506 // 3 - _unused_
507 // 4,5,6,7,8,9 - global queues
508 // we use 'xadd' on Intel, so the initial value == next assigned
509 static unsigned long _dispatch_queue_serial_numbers = 10;
510
511 // Note to later developers: ensure that any initialization changes are
512 // made for statically allocated queues (i.e. _dispatch_main_q).
513 inline void
514 _dispatch_queue_init(dispatch_queue_t dq)
515 {
516 dq->do_vtable = &_dispatch_queue_vtable;
517 dq->do_next = DISPATCH_OBJECT_LISTLESS;
518 dq->do_ref_cnt = 1;
519 dq->do_xref_cnt = 1;
520 dq->do_targetq = _dispatch_get_root_queue(0, true);
521 dq->dq_running = 0;
522 dq->dq_width = 1;
523 dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;
524 }
525
526 dispatch_queue_t
527 dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
528 {
529 dispatch_queue_t dq;
530 size_t label_len;
531
532 if (!label) {
533 label = "";
534 }
535
536 label_len = strlen(label);
537 if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
538 label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
539 }
540
541 // XXX switch to malloc()
542 dq = calloc(1ul, sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_MIN_LABEL_SIZE + label_len + 1);
543 if (slowpath(!dq)) {
544 return dq;
545 }
546
547 _dispatch_queue_init(dq);
548 strcpy(dq->dq_label, label);
549
550 #ifndef DISPATCH_NO_LEGACY
551 if (slowpath(attr)) {
552 dq->do_targetq = _dispatch_get_root_queue(attr->qa_priority, attr->qa_flags & DISPATCH_QUEUE_OVERCOMMIT);
553 dq->dq_finalizer_ctxt = attr->finalizer_ctxt;
554 dq->dq_finalizer_func = attr->finalizer_func;
555 #ifdef __BLOCKS__
556 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
557 // if finalizer_ctxt is a Block, retain it.
558 dq->dq_finalizer_ctxt = Block_copy(dq->dq_finalizer_ctxt);
559 if (!(dq->dq_finalizer_ctxt)) {
560 goto out_bad;
561 }
562 }
563 #endif
564 }
565 #endif
566
567 return dq;
568
569 out_bad:
570 free(dq);
571 return NULL;
572 }
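
/*
 * Typical usage sketch (label and work function are hypothetical):
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.worker", NULL);
 *	if (q) {
 *		dispatch_async(q, ^{ do_work(); });
 *		dispatch_release(q);    // freed once the queue drains and refs drop
 *	}
 */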
573
574 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
575 void
576 _dispatch_queue_dispose(dispatch_queue_t dq)
577 {
578 if (slowpath(dq == _dispatch_queue_get_current())) {
579 DISPATCH_CRASH("Release of a queue by itself");
580 }
581 if (slowpath(dq->dq_items_tail)) {
582 DISPATCH_CRASH("Release of a queue while items are enqueued");
583 }
584
585 #ifndef DISPATCH_NO_LEGACY
586 if (dq->dq_finalizer_func) {
587 dq->dq_finalizer_func(dq->dq_finalizer_ctxt, dq);
588 }
589 #endif
590
591 // trash the tail queue so that use after free will crash
592 dq->dq_items_tail = (void *)0x200;
593
594 _dispatch_dispose(dq);
595 }
596
597 DISPATCH_NOINLINE
598 void
599 _dispatch_queue_push_list_slow(dispatch_queue_t dq, struct dispatch_object_s *obj)
600 {
601 // The queue must be retained before dq_items_head is written in order
602 // to ensure that the reference is still valid when _dispatch_wakeup is
603 // called. Otherwise, if preempted between the assignment to
604 // dq_items_head and _dispatch_wakeup, the blocks submitted to the
605 // queue may release the last reference to the queue when invoked by
606 // _dispatch_queue_drain. <rdar://problem/6932776>
607 _dispatch_retain(dq);
608 dq->dq_items_head = obj;
609 _dispatch_wakeup(dq);
610 _dispatch_release(dq);
611 }
612
613 DISPATCH_NOINLINE
614 static void
615 _dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
616 {
617 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
618
619 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
620 dc->dc_func = func;
621 dc->dc_ctxt = context;
622
623 _dispatch_queue_push(dq, dc);
624 }
625
626 #ifdef __BLOCKS__
627 void
628 dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
629 {
630 dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
631 }
632 #endif
633
634 DISPATCH_NOINLINE
635 void
636 dispatch_barrier_async_f(dispatch_queue_t dq, void *context, dispatch_function_t func)
637 {
638 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
639
640 if (!dc) {
641 return _dispatch_barrier_async_f_slow(dq, context, func);
642 }
643
644 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
645 dc->dc_func = func;
646 dc->dc_ctxt = context;
647
648 _dispatch_queue_push(dq, dc);
649 }
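
/*
 * A barrier continuation waits for everything ahead of it on a wide queue
 * and then runs alone. Illustrative reader/writer sketch, assuming 'q' has
 * had its width raised above 1 and that the callees are hypothetical:
 *
 *	dispatch_async(q, ^{ read_cache(); });            // may run concurrently
 *	dispatch_async(q, ^{ read_cache(); });
 *	dispatch_barrier_async(q, ^{ update_cache(); });  // runs exclusively
 */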
650
651 DISPATCH_NOINLINE
652 static void
653 _dispatch_async_f_slow(dispatch_queue_t dq, void *context, dispatch_function_t func)
654 {
655 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_from_heap());
656
657 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
658 dc->dc_func = func;
659 dc->dc_ctxt = context;
660
661 _dispatch_queue_push(dq, dc);
662 }
663
664 #ifdef __BLOCKS__
665 void
666 dispatch_async(dispatch_queue_t dq, void (^work)(void))
667 {
668 dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
669 }
670 #endif
671
672 DISPATCH_NOINLINE
673 void
674 dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
675 {
676 dispatch_continuation_t dc = fastpath(_dispatch_continuation_alloc_cacheonly());
677
678 // unlike dispatch_sync_f(), we do NOT need to check the queue width,
679 // the "drain" function will do this test
680
681 if (!dc) {
682 return _dispatch_async_f_slow(dq, ctxt, func);
683 }
684
685 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
686 dc->dc_func = func;
687 dc->dc_ctxt = ctxt;
688
689 _dispatch_queue_push(dq, dc);
690 }
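
/*
 * Plain-C usage sketch (no Blocks); 'job_create', 'job_run' and the job
 * type are hypothetical:
 *
 *	struct job *j = job_create();
 *	dispatch_async_f(q, j, job_run);    // job_run(void *ctxt) frees 'j'
 */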
691
692 struct dispatch_barrier_sync_slow2_s {
693 dispatch_queue_t dbss2_dq;
694 dispatch_function_t dbss2_func;
695 	void *dbss2_ctxt;
696 dispatch_semaphore_t dbss2_sema;
697 };
698
699 static void
700 _dispatch_barrier_sync_f_slow_invoke(void *ctxt)
701 {
702 struct dispatch_barrier_sync_slow2_s *dbss2 = ctxt;
703
704 dispatch_assert(dbss2->dbss2_dq == dispatch_get_current_queue());
705 	// ALL blocks on the main queue must be run on the main thread
706 if (dbss2->dbss2_dq == dispatch_get_main_queue()) {
707 dbss2->dbss2_func(dbss2->dbss2_ctxt);
708 } else {
709 dispatch_suspend(dbss2->dbss2_dq);
710 }
711 dispatch_semaphore_signal(dbss2->dbss2_sema);
712 }
713
714 DISPATCH_NOINLINE
715 static void
716 _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
717 {
718
719 // It's preferred to execute synchronous blocks on the current thread
720 // due to thread-local side effects, garbage collection, etc. However,
721 // blocks submitted to the main thread MUST be run on the main thread
722
723 struct dispatch_barrier_sync_slow2_s dbss2 = {
724 .dbss2_dq = dq,
725 .dbss2_func = func,
726 .dbss2_ctxt = ctxt,
727 .dbss2_sema = _dispatch_get_thread_semaphore(),
728 };
729 struct dispatch_barrier_sync_slow_s {
730 DISPATCH_CONTINUATION_HEADER(dispatch_barrier_sync_slow_s);
731 } dbss = {
732 .do_vtable = (void *)DISPATCH_OBJ_BARRIER_BIT,
733 .dc_func = _dispatch_barrier_sync_f_slow_invoke,
734 .dc_ctxt = &dbss2,
735 };
736
737 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
738 _dispatch_queue_push(dq, (void *)&dbss);
739 dispatch_semaphore_wait(dbss2.dbss2_sema, DISPATCH_TIME_FOREVER);
740
741 if (dq != dispatch_get_main_queue()) {
742 _dispatch_thread_setspecific(dispatch_queue_key, dq);
743 func(ctxt);
744 _dispatch_workitem_inc();
745 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
746 dispatch_resume(dq);
747 }
748 _dispatch_put_thread_semaphore(dbss2.dbss2_sema);
749 }
750
751 #ifdef __BLOCKS__
752 void
753 dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
754 {
755 // Blocks submitted to the main queue MUST be run on the main thread,
756 // therefore we must Block_copy in order to notify the thread-local
757 // garbage collector that the objects are transferring to the main thread
758 if (dq == dispatch_get_main_queue()) {
759 dispatch_block_t block = Block_copy(work);
760 return dispatch_barrier_sync_f(dq, block, _dispatch_call_block_and_release);
761 }
762 struct Block_basic *bb = (void *)work;
763
764 dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
765 }
766 #endif
767
768 DISPATCH_NOINLINE
769 void
770 dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
771 {
772 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
773
774 // 1) ensure that this thread hasn't enqueued anything ahead of this call
775 // 2) the queue is not suspended
776 // 3) the queue is not weird
777 if (slowpath(dq->dq_items_tail)
778 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
779 || slowpath(!_dispatch_queue_trylock(dq))) {
780 return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
781 }
782
783 _dispatch_thread_setspecific(dispatch_queue_key, dq);
784 func(ctxt);
785 _dispatch_workitem_inc();
786 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
787 _dispatch_queue_unlock(dq);
788 }
789
790 static void
791 _dispatch_sync_f_slow2(void *ctxt)
792 {
793 dispatch_queue_t dq = _dispatch_queue_get_current();
794 dispatch_atomic_add(&dq->dq_running, 2);
795 dispatch_semaphore_signal(ctxt);
796 }
797
798 DISPATCH_NOINLINE
799 static void
800 _dispatch_sync_f_slow(dispatch_queue_t dq)
801 {
802 // the global root queues do not need strict ordering
803 if (dq->do_targetq == NULL) {
804 dispatch_atomic_add(&dq->dq_running, 2);
805 return;
806 }
807
808 struct dispatch_sync_slow_s {
809 DISPATCH_CONTINUATION_HEADER(dispatch_sync_slow_s);
810 } dss = {
811 .do_vtable = NULL,
812 .dc_func = _dispatch_sync_f_slow2,
813 .dc_ctxt = _dispatch_get_thread_semaphore(),
814 };
815
816 	// XXX FIXME -- concurrent queues can become serial again
817 _dispatch_queue_push(dq, (void *)&dss);
818
819 dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
820 _dispatch_put_thread_semaphore(dss.dc_ctxt);
821 }
822
823 #ifdef __BLOCKS__
824 void
825 dispatch_sync(dispatch_queue_t dq, void (^work)(void))
826 {
827 struct Block_basic *bb = (void *)work;
828 dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
829 }
830 #endif
831
832 DISPATCH_NOINLINE
833 void
834 dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
835 {
836 typeof(dq->dq_running) prev_cnt;
837 dispatch_queue_t old_dq;
838
839 if (dq->dq_width == 1) {
840 return dispatch_barrier_sync_f(dq, ctxt, func);
841 }
842
843 // 1) ensure that this thread hasn't enqueued anything ahead of this call
844 // 2) the queue is not suspended
845 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
846 _dispatch_sync_f_slow(dq);
847 } else {
848 prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;
849
850 if (slowpath(prev_cnt & 1)) {
851 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
852 _dispatch_wakeup(dq);
853 }
854 _dispatch_sync_f_slow(dq);
855 }
856 }
857
858 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
859 _dispatch_thread_setspecific(dispatch_queue_key, dq);
860 func(ctxt);
861 _dispatch_workitem_inc();
862 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
863
864 if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
865 _dispatch_wakeup(dq);
866 }
867 }
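
/*
 * Illustrative pattern for returning a value through the context pointer
 * (names are hypothetical). Never target the queue this code is already
 * running on: a serial queue cannot recurse into itself and will deadlock.
 *
 *	static void get_count(void *ctxt) { *(size_t *)ctxt = cached_count; }
 *	...
 *	size_t count;
 *	dispatch_sync_f(q, &count, get_count);  // returns after 'count' is set
 */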
868
869 const char *
870 dispatch_queue_get_label(dispatch_queue_t dq)
871 {
872 return dq->dq_label;
873 }
874
875 #if DISPATCH_COCOA_COMPAT
876 static void
877 _dispatch_main_q_port_init(void *ctxt __attribute__((unused)))
878 {
879 kern_return_t kr;
880
881 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &main_q_port);
882 DISPATCH_VERIFY_MIG(kr);
883 dispatch_assume_zero(kr);
884 kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port, MACH_MSG_TYPE_MAKE_SEND);
885 DISPATCH_VERIFY_MIG(kr);
886 dispatch_assume_zero(kr);
887
888 _dispatch_program_is_probably_callback_driven = true;
889 _dispatch_safe_fork = false;
890 }
891
892 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
893 DISPATCH_NOINLINE
894 static void
895 _dispatch_queue_set_mainq_drain_state(bool arg)
896 {
897 main_q_is_draining = arg;
898 }
899 #endif
900
901 void
902 dispatch_main(void)
903 {
904 if (pthread_main_np()) {
905 _dispatch_program_is_probably_callback_driven = true;
906 pthread_exit(NULL);
907 DISPATCH_CRASH("pthread_exit() returned");
908 }
909 DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
910 }
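
/*
 * Sketch of a callback-driven program that parks the main thread here
 * ('setup_sources' and 'do_work' are hypothetical):
 *
 *	int main(void)
 *	{
 *		setup_sources();        // e.g. create and resume dispatch sources
 *		dispatch_async(dispatch_get_main_queue(), ^{ do_work(); });
 *		dispatch_main();        // never returns
 *	}
 */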
911
912 static void
913 _dispatch_sigsuspend(void *ctxt __attribute__((unused)))
914 {
915 static const sigset_t mask;
916
917 for (;;) {
918 sigsuspend(&mask);
919 }
920 }
921
922 DISPATCH_NOINLINE
923 static void
924 _dispatch_queue_cleanup2(void)
925 {
926 dispatch_atomic_dec(&_dispatch_main_q.dq_running);
927
928 if (dispatch_atomic_sub(&_dispatch_main_q.do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
929 _dispatch_wakeup(&_dispatch_main_q);
930 }
931
932 // overload the "probably" variable to mean that dispatch_main() or
933 // similar non-POSIX API was called
934 // this has to run before the DISPATCH_COCOA_COMPAT below
935 if (_dispatch_program_is_probably_callback_driven) {
936 dispatch_async_f(_dispatch_get_root_queue(0, 0), NULL, _dispatch_sigsuspend);
937 sleep(1); // workaround 6778970
938 }
939
940 #if DISPATCH_COCOA_COMPAT
941 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
942
943 mach_port_t mp = main_q_port;
944 kern_return_t kr;
945
946 main_q_port = 0;
947
948 if (mp) {
949 kr = mach_port_deallocate(mach_task_self(), mp);
950 DISPATCH_VERIFY_MIG(kr);
951 dispatch_assume_zero(kr);
952 kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
953 DISPATCH_VERIFY_MIG(kr);
954 dispatch_assume_zero(kr);
955 }
956 #endif
957 }
958
959 dispatch_queue_t
960 dispatch_get_concurrent_queue(long pri)
961 {
962 if (pri > 0) {
963 pri = DISPATCH_QUEUE_PRIORITY_HIGH;
964 } else if (pri < 0) {
965 pri = DISPATCH_QUEUE_PRIORITY_LOW;
966 }
967 return _dispatch_get_root_queue(pri, false);
968 }
969
970 static void
971 _dispatch_queue_cleanup(void *ctxt)
972 {
973 if (ctxt == &_dispatch_main_q) {
974 return _dispatch_queue_cleanup2();
975 }
976 // POSIX defines that destructors are only called if 'ctxt' is non-null
977 DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
978 }
979
980 dispatch_queue_t
981 dispatch_get_global_queue(long priority, unsigned long flags)
982 {
983 if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
984 return NULL;
985 }
986 return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
987 }
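
/*
 * Usage sketch: per the check above, the only accepted flag bit is
 * DISPATCH_QUEUE_OVERCOMMIT; any other flag makes this return NULL.
 * ('do_work' is hypothetical.)
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_async(q, ^{ do_work(); });
 */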
988
989 #define countof(x) (sizeof(x) / sizeof(x[0]))
990 void
991 libdispatch_init(void)
992 {
993 dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 3);
994 dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 6);
995
996 dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW == -DISPATCH_QUEUE_PRIORITY_HIGH);
997 dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT);
998 dispatch_assert(countof(_dispatch_thread_mediator) == DISPATCH_ROOT_QUEUE_COUNT);
999 dispatch_assert(countof(_dispatch_root_queue_contexts) == DISPATCH_ROOT_QUEUE_COUNT);
1000
1001 _dispatch_thread_key_init_np(dispatch_queue_key, _dispatch_queue_cleanup);
1002 _dispatch_thread_key_init_np(dispatch_sema4_key, (void (*)(void *))dispatch_release); // use the extern release
1003 _dispatch_thread_key_init_np(dispatch_cache_key, _dispatch_cache_cleanup2);
1004 #if DISPATCH_PERF_MON
1005 _dispatch_thread_key_init_np(dispatch_bcounter_key, NULL);
1006 #endif
1007
1008 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
1009
1010 _dispatch_queue_set_width_init();
1011 }
1012
1013 void
1014 _dispatch_queue_unlock(dispatch_queue_t dq)
1015 {
1016 if (slowpath(dispatch_atomic_dec(&dq->dq_running))) {
1017 return;
1018 }
1019
1020 _dispatch_wakeup(dq);
1021 }
1022
1023 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1024 dispatch_queue_t
1025 _dispatch_wakeup(dispatch_object_t dou)
1026 {
1027 dispatch_queue_t tq;
1028
1029 if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
1030 return NULL;
1031 }
1032 if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
1033 return NULL;
1034 }
1035
1036 if (!_dispatch_trylock(dou._do)) {
1037 #if DISPATCH_COCOA_COMPAT
1038 if (dou._dq == &_dispatch_main_q) {
1039 _dispatch_queue_wakeup_main();
1040 }
1041 #endif
1042 return NULL;
1043 }
1044 _dispatch_retain(dou._do);
1045 tq = dou._do->do_targetq;
1046 _dispatch_queue_push(tq, dou._do);
1047 return tq; // libdispatch doesn't need this, but the Instrument DTrace probe does
1048 }
1049
1050 #if DISPATCH_COCOA_COMPAT
1051 DISPATCH_NOINLINE
1052 void
1053 _dispatch_queue_wakeup_main(void)
1054 {
1055 kern_return_t kr;
1056
1057 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1058
1059 kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);
1060
1061 switch (kr) {
1062 case MACH_SEND_TIMEOUT:
1063 case MACH_SEND_TIMED_OUT:
1064 case MACH_SEND_INVALID_DEST:
1065 break;
1066 default:
1067 dispatch_assume_zero(kr);
1068 break;
1069 }
1070
1071 _dispatch_safe_fork = false;
1072 }
1073 #endif
1074
1075 static inline int
1076 _dispatch_rootq2wq_pri(long idx)
1077 {
1078 #ifdef WORKQ_DEFAULT_PRIOQUEUE
1079 switch (idx) {
1080 case 0:
1081 case 1:
1082 return WORKQ_LOW_PRIOQUEUE;
1083 case 2:
1084 case 3:
1085 default:
1086 return WORKQ_DEFAULT_PRIOQUEUE;
1087 case 4:
1088 case 5:
1089 return WORKQ_HIGH_PRIOQUEUE;
1090 }
1091 #else
1092 	return (int)idx;
1093 #endif
1094 }
1095
1096 static void
1097 _dispatch_root_queues_init(void *context __attribute__((unused)))
1098 {
1099 bool disable_wq = getenv("LIBDISPATCH_DISABLE_KWQ");
1100 pthread_workqueue_attr_t pwq_attr;
1101 kern_return_t kr;
1102 int i, r;
1103
1104 r = pthread_workqueue_attr_init_np(&pwq_attr);
1105 dispatch_assume_zero(r);
1106
1107 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1108 r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, _dispatch_rootq2wq_pri(i));
1109 dispatch_assume_zero(r);
1110 r = pthread_workqueue_attr_setovercommit_np(&pwq_attr, i & 1);
1111 dispatch_assume_zero(r);
1112 // some software hangs if the non-overcommitting queues do not overcommit when threads block
1113 #if 0
1114 if (!(i & 1)) {
1115 dispatch_root_queue_contexts[i].dgq_thread_pool_size = _dispatch_hw_config.cc_max_active;
1116 }
1117 #endif
1118
1119 r = 0;
1120 if (disable_wq || (r = pthread_workqueue_create_np(&_dispatch_root_queue_contexts[i].dgq_kworkqueue, &pwq_attr))) {
1121 if (r != ENOTSUP) {
1122 dispatch_assume_zero(r);
1123 }
1124 // override the default FIFO behavior for the pool semaphores
1125 kr = semaphore_create(mach_task_self(), &_dispatch_thread_mediator[i].dsema_port, SYNC_POLICY_LIFO, 0);
1126 DISPATCH_VERIFY_MIG(kr);
1127 dispatch_assume_zero(kr);
1128 dispatch_assume(_dispatch_thread_mediator[i].dsema_port);
1129 } else {
1130 dispatch_assume(_dispatch_root_queue_contexts[i].dgq_kworkqueue);
1131 }
1132 }
1133
1134 r = pthread_workqueue_attr_destroy_np(&pwq_attr);
1135 dispatch_assume_zero(r);
1136 }
1137
1138 bool
1139 _dispatch_queue_wakeup_global(dispatch_queue_t dq)
1140 {
1141 static dispatch_once_t pred;
1142 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1143 pthread_workitem_handle_t wh;
1144 unsigned int gen_cnt;
1145 pthread_t pthr;
1146 int r, t_count;
1147
1148 if (!dq->dq_items_tail) {
1149 return false;
1150 }
1151
1152 _dispatch_safe_fork = false;
1153
1154 dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
1155
1156 dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
1157
1158 if (qc->dgq_kworkqueue) {
1159 if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
1160 _dispatch_debug("requesting new worker thread");
1161
1162 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
1163 dispatch_assume_zero(r);
1164 } else {
1165 _dispatch_debug("work thread request still pending on global queue: %p", dq);
1166 }
1167 goto out;
1168 }
1169
1170 if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
1171 goto out;
1172 }
1173
1174 do {
1175 t_count = qc->dgq_thread_pool_size;
1176 if (!t_count) {
1177 _dispatch_debug("The thread pool is full: %p", dq);
1178 goto out;
1179 }
1180 } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));
1181
1182 while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
1183 if (r != EAGAIN) {
1184 dispatch_assume_zero(r);
1185 }
1186 sleep(1);
1187 }
1188 r = pthread_detach(pthr);
1189 dispatch_assume_zero(r);
1190
1191 out:
1192 return false;
1193 }
1194
1195 void
1196 _dispatch_queue_serial_drain_till_empty(dispatch_queue_t dq)
1197 {
1198 #if DISPATCH_PERF_MON
1199 uint64_t start = mach_absolute_time();
1200 #endif
1201 _dispatch_queue_drain(dq);
1202 #if DISPATCH_PERF_MON
1203 _dispatch_queue_merge_stats(start);
1204 #endif
1205 _dispatch_force_cache_cleanup();
1206 }
1207
1208 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1209 DISPATCH_NOINLINE
1210 void
1211 _dispatch_queue_invoke(dispatch_queue_t dq)
1212 {
1213 dispatch_queue_t tq = dq->do_targetq;
1214
1215 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) && fastpath(_dispatch_queue_trylock(dq))) {
1216 _dispatch_queue_drain(dq);
1217 if (tq == dq->do_targetq) {
1218 tq = dx_invoke(dq);
1219 } else {
1220 tq = dq->do_targetq;
1221 }
1222 // We do not need to check the result.
1223 // When the suspend-count lock is dropped, then the check will happen.
1224 dispatch_atomic_dec(&dq->dq_running);
1225 if (tq) {
1226 return _dispatch_queue_push(tq, dq);
1227 }
1228 }
1229
1230 dq->do_next = DISPATCH_OBJECT_LISTLESS;
1231 if (dispatch_atomic_sub(&dq->do_suspend_cnt, DISPATCH_OBJECT_SUSPEND_LOCK) == 0) {
1232 if (dq->dq_running == 0) {
1233 _dispatch_wakeup(dq); // verify that the queue is idle
1234 }
1235 }
1236 _dispatch_release(dq); // added when the queue is put on the list
1237 }
1238
1239 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1240 static void
1241 _dispatch_set_target_queue2(void *ctxt)
1242 {
1243 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();
1244
1245 prev_dq = dq->do_targetq;
1246 dq->do_targetq = ctxt;
1247 _dispatch_release(prev_dq);
1248 }
1249
1250 void
1251 dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
1252 {
1253 if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
1254 return;
1255 }
1256 // NOTE: we test for NULL target queues internally to detect root queues
1257 // therefore, if the retain crashes due to a bad input, that is OK
1258 _dispatch_retain(dq);
1259 dispatch_barrier_async_f(dou._dq, dq, _dispatch_set_target_queue2);
1260 }
1261
1262 static void
1263 _dispatch_async_f_redirect2(void *_ctxt)
1264 {
1265 struct dispatch_continuation_s *dc = _ctxt;
1266 struct dispatch_continuation_s *other_dc = dc->dc_data[1];
1267 dispatch_queue_t old_dq, dq = dc->dc_data[0];
1268
1269 old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1270 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1271 _dispatch_continuation_pop(other_dc);
1272 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1273
1274 if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
1275 _dispatch_wakeup(dq);
1276 }
1277 _dispatch_release(dq);
1278 }
1279
1280 static void
1281 _dispatch_async_f_redirect(dispatch_queue_t dq, struct dispatch_object_s *other_dc)
1282 {
1283 dispatch_continuation_t dc = (void *)other_dc;
1284 dispatch_queue_t root_dq = dq;
1285
1286 if (dc->dc_func == _dispatch_sync_f_slow2) {
1287 return dc->dc_func(dc->dc_ctxt);
1288 }
1289
1290 dispatch_atomic_add(&dq->dq_running, 2);
1291 _dispatch_retain(dq);
1292
1293 dc = _dispatch_continuation_alloc_cacheonly() ?: _dispatch_continuation_alloc_from_heap();
1294
1295 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
1296 dc->dc_func = _dispatch_async_f_redirect2;
1297 dc->dc_ctxt = dc;
1298 dc->dc_data[0] = dq;
1299 dc->dc_data[1] = other_dc;
1300
1301 do {
1302 root_dq = root_dq->do_targetq;
1303 } while (root_dq->do_targetq);
1304
1305 _dispatch_queue_push(root_dq, dc);
1306 }
1307
1308
1309 void
1310 _dispatch_queue_drain(dispatch_queue_t dq)
1311 {
1312 dispatch_queue_t orig_tq, old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
1313 struct dispatch_object_s *dc = NULL, *next_dc = NULL;
1314
1315 orig_tq = dq->do_targetq;
1316
1317 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1318
1319 while (dq->dq_items_tail) {
1320 while (!fastpath(dq->dq_items_head)) {
1321 _dispatch_hardware_pause();
1322 }
1323
1324 dc = dq->dq_items_head;
1325 dq->dq_items_head = NULL;
1326
1327 do {
1328 // Enqueue is TIGHTLY controlled, we won't wait long.
1329 do {
1330 next_dc = fastpath(dc->do_next);
1331 } while (!next_dc && !dispatch_atomic_cmpxchg(&dq->dq_items_tail, dc, NULL));
1332 if (DISPATCH_OBJECT_SUSPENDED(dq)) {
1333 goto out;
1334 }
1335 if (dq->dq_running > dq->dq_width) {
1336 goto out;
1337 }
1338 if (orig_tq != dq->do_targetq) {
1339 goto out;
1340 }
1341 if (fastpath(dq->dq_width == 1)) {
1342 _dispatch_continuation_pop(dc);
1343 _dispatch_workitem_inc();
1344 } else if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
1345 if (dq->dq_running > 1) {
1346 goto out;
1347 }
1348 _dispatch_continuation_pop(dc);
1349 _dispatch_workitem_inc();
1350 } else {
1351 _dispatch_async_f_redirect(dq, dc);
1352 }
1353 } while ((dc = next_dc));
1354 }
1355
1356 out:
1357 // if this is not a complete drain, we must undo some things
1358 if (slowpath(dc)) {
1359 // 'dc' must NOT be "popped"
1360 // 'dc' might be the last item
1361 if (next_dc || dispatch_atomic_cmpxchg(&dq->dq_items_tail, NULL, dc)) {
1362 dq->dq_items_head = dc;
1363 } else {
1364 while (!(next_dc = dq->dq_items_head)) {
1365 _dispatch_hardware_pause();
1366 }
1367 dq->dq_items_head = dc;
1368 dc->do_next = next_dc;
1369 }
1370 }
1371
1372 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
1373 }
1374
1375 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1376 void *
1377 _dispatch_worker_thread(void *context)
1378 {
1379 dispatch_queue_t dq = context;
1380 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1381 sigset_t mask;
1382 int r;
1383
1384 // workaround tweaks the kernel workqueue does for us
1385 r = sigfillset(&mask);
1386 dispatch_assume_zero(r);
1387 r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
1388 dispatch_assume_zero(r);
1389
1390 do {
1391 _dispatch_worker_thread2(context);
1392 // we use 65 seconds in case there are any timers that run once a minute
1393 } while (dispatch_semaphore_wait(qc->dgq_thread_mediator, dispatch_time(0, 65ull * NSEC_PER_SEC)) == 0);
1394
1395 dispatch_atomic_inc(&qc->dgq_thread_pool_size);
1396 if (dq->dq_items_tail) {
1397 _dispatch_queue_wakeup_global(dq);
1398 }
1399
1400 return NULL;
1401 }
1402
1403 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
1404 void
1405 _dispatch_worker_thread2(void *context)
1406 {
1407 struct dispatch_object_s *item;
1408 dispatch_queue_t dq = context;
1409 struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
1410
1411 if (_dispatch_thread_getspecific(dispatch_queue_key)) {
1412 DISPATCH_CRASH("Premature thread recycling");
1413 }
1414
1415 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1416 qc->dgq_pending = 0;
1417
1418 #if DISPATCH_COCOA_COMPAT
1419 // ensure that high-level memory management techniques do not leak/crash
1420 dispatch_begin_thread_4GC();
1421 void *pool = _dispatch_begin_NSAutoReleasePool();
1422 #endif
1423
1424 #if DISPATCH_PERF_MON
1425 uint64_t start = mach_absolute_time();
1426 #endif
1427 while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
1428 _dispatch_continuation_pop(item);
1429 }
1430 #if DISPATCH_PERF_MON
1431 _dispatch_queue_merge_stats(start);
1432 #endif
1433
1434 #if DISPATCH_COCOA_COMPAT
1435 _dispatch_end_NSAutoReleasePool(pool);
1436 dispatch_end_thread_4GC();
1437 #endif
1438
1439 _dispatch_thread_setspecific(dispatch_queue_key, NULL);
1440
1441 _dispatch_force_cache_cleanup();
1442 }
1443
1444 #if DISPATCH_PERF_MON
1445 void
1446 _dispatch_queue_merge_stats(uint64_t start)
1447 {
1448 uint64_t avg, delta = mach_absolute_time() - start;
1449 unsigned long count, bucket;
1450
1451 count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
1452 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
1453
1454 if (count) {
1455 avg = delta / count;
1456 bucket = flsll(avg);
1457 } else {
1458 bucket = 0;
1459 }
1460
1461 // 64-bit counters on 32-bit require a lock or a queue
1462 OSSpinLockLock(&_dispatch_stats_lock);
1463
1464 _dispatch_stats[bucket].time_total += delta;
1465 _dispatch_stats[bucket].count_total += count;
1466 _dispatch_stats[bucket].thread_total++;
1467
1468 OSSpinLockUnlock(&_dispatch_stats_lock);
1469 }
1470 #endif
1471
1472 size_t
1473 dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
1474 {
1475 return snprintf(buf, bufsiz, "parent = %p ", dq->do_targetq);
1476 }
1477
1478 size_t
1479 dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
1480 {
1481 size_t offset = 0;
1482 offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", dq->dq_label, dq);
1483 offset += dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
1484 offset += dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
1485 offset += snprintf(&buf[offset], bufsiz - offset, "}");
1486 return offset;
1487 }
1488
1489 #if DISPATCH_DEBUG
1490 void
1491 dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
1492 if (fastpath(dq)) {
1493 dispatch_debug(dq, "%s", str);
1494 } else {
1495 _dispatch_log("queue[NULL]: %s", str);
1496 }
1497 }
1498 #endif
1499
1500 #if DISPATCH_COCOA_COMPAT
1501 void
1502 _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg __attribute__((unused)))
1503 {
1504 if (main_q_is_draining) {
1505 return;
1506 }
1507 _dispatch_queue_set_mainq_drain_state(true);
1508 _dispatch_queue_serial_drain_till_empty(&_dispatch_main_q);
1509 _dispatch_queue_set_mainq_drain_state(false);
1510 }
1511
1512 mach_port_t
1513 _dispatch_get_main_queue_port_4CF(void)
1514 {
1515 dispatch_once_f(&_dispatch_main_q_port_pred, NULL, _dispatch_main_q_port_init);
1516 return main_q_port;
1517 }
1518 #endif
1519
1520 static void
1521 dispatch_queue_attr_dispose(dispatch_queue_attr_t attr)
1522 {
1523 dispatch_queue_attr_set_finalizer(attr, NULL);
1524 _dispatch_dispose(attr);
1525 }
1526
1527 static const struct dispatch_queue_attr_vtable_s dispatch_queue_attr_vtable = {
1528 .do_type = DISPATCH_QUEUE_ATTR_TYPE,
1529 .do_kind = "queue-attr",
1530 .do_dispose = dispatch_queue_attr_dispose,
1531 };
1532
1533 dispatch_queue_attr_t
1534 dispatch_queue_attr_create(void)
1535 {
1536 dispatch_queue_attr_t a = calloc(1, sizeof(struct dispatch_queue_attr_s));
1537
1538 if (a) {
1539 a->do_vtable = &dispatch_queue_attr_vtable;
1540 a->do_next = DISPATCH_OBJECT_LISTLESS;
1541 a->do_ref_cnt = 1;
1542 a->do_xref_cnt = 1;
1543 a->do_targetq = _dispatch_get_root_queue(0, 0);
1544 a->qa_flags = DISPATCH_QUEUE_OVERCOMMIT;
1545 }
1546 return a;
1547 }
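
/*
 * Usage sketch for this legacy attr API ('my_finalizer' and its context are
 * hypothetical; the setters used below are defined later in this file). Per
 * _dispatch_queue_dispose() above, the finalizer is invoked as
 * my_finalizer(ctxt, q) when the queue is destroyed.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_create();
 *	dispatch_queue_attr_set_priority(attr, DISPATCH_QUEUE_PRIORITY_HIGH);
 *	dispatch_queue_attr_set_finalizer_f(attr, ctxt, my_finalizer);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.hi", attr);
 *	dispatch_release(attr);
 */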
1548
1549 void
1550 dispatch_queue_attr_set_flags(dispatch_queue_attr_t attr, uint64_t flags)
1551 {
1552 dispatch_assert_zero(flags & ~DISPATCH_QUEUE_FLAGS_MASK);
1553 attr->qa_flags = (unsigned long)flags & DISPATCH_QUEUE_FLAGS_MASK;
1554 }
1555
1556 void
1557 dispatch_queue_attr_set_priority(dispatch_queue_attr_t attr, int priority)
1558 {
1559 dispatch_debug_assert(attr, "NULL pointer");
1560 dispatch_debug_assert(priority <= DISPATCH_QUEUE_PRIORITY_HIGH && priority >= DISPATCH_QUEUE_PRIORITY_LOW, "Invalid priority");
1561
1562 if (priority > 0) {
1563 priority = DISPATCH_QUEUE_PRIORITY_HIGH;
1564 } else if (priority < 0) {
1565 priority = DISPATCH_QUEUE_PRIORITY_LOW;
1566 }
1567
1568 attr->qa_priority = priority;
1569 }
1570
1571 void
1572 dispatch_queue_attr_set_finalizer_f(dispatch_queue_attr_t attr,
1573 void *context, dispatch_queue_finalizer_function_t finalizer)
1574 {
1575 #ifdef __BLOCKS__
1576 if (attr->finalizer_func == (void*)_dispatch_call_block_and_release2) {
1577 Block_release(attr->finalizer_ctxt);
1578 }
1579 #endif
1580 attr->finalizer_ctxt = context;
1581 attr->finalizer_func = finalizer;
1582 }
1583
1584 #ifdef __BLOCKS__
1585 long
1586 dispatch_queue_attr_set_finalizer(dispatch_queue_attr_t attr,
1587 dispatch_queue_finalizer_t finalizer)
1588 {
1589 void *ctxt;
1590 dispatch_queue_finalizer_function_t func;
1591
1592 if (finalizer) {
1593 if (!(ctxt = Block_copy(finalizer))) {
1594 return 1;
1595 }
1596 func = (void *)_dispatch_call_block_and_release2;
1597 } else {
1598 ctxt = NULL;
1599 func = NULL;
1600 }
1601
1602 dispatch_queue_attr_set_finalizer_f(attr, ctxt, func);
1603
1604 return 0;
1605 }
1606 #endif
1607
1608 static void
1609 _dispatch_ccache_init(void *context __attribute__((unused)))
1610 {
1611 _dispatch_ccache_zone = malloc_create_zone(0, 0);
1612 dispatch_assert(_dispatch_ccache_zone);
1613 malloc_set_zone_name(_dispatch_ccache_zone, "DispatchContinuations");
1614 }
1615
1616 dispatch_continuation_t
1617 _dispatch_continuation_alloc_from_heap(void)
1618 {
1619 static dispatch_once_t pred;
1620 dispatch_continuation_t dc;
1621
1622 dispatch_once_f(&pred, NULL, _dispatch_ccache_init);
1623
1624 while (!(dc = fastpath(malloc_zone_calloc(_dispatch_ccache_zone, 1, ROUND_UP_TO_CACHELINE_SIZE(sizeof(*dc)))))) {
1625 sleep(1);
1626 }
1627
1628 return dc;
1629 }
1630
1631 void
1632 _dispatch_force_cache_cleanup(void)
1633 {
1634 dispatch_continuation_t dc = _dispatch_thread_getspecific(dispatch_cache_key);
1635 if (dc) {
1636 _dispatch_thread_setspecific(dispatch_cache_key, NULL);
1637 _dispatch_cache_cleanup2(dc);
1638 }
1639 }
1640
1641 DISPATCH_NOINLINE
1642 void
1643 _dispatch_cache_cleanup2(void *value)
1644 {
1645 dispatch_continuation_t dc, next_dc = value;
1646
1647 while ((dc = next_dc)) {
1648 next_dc = dc->do_next;
1649 malloc_zone_free(_dispatch_ccache_zone, dc);
1650 }
1651 }
1652
1653 static char _dispatch_build[16];
1654
1655 static void
1656 _dispatch_bug_init(void *context __attribute__((unused)))
1657 {
1658 int mib[] = { CTL_KERN, KERN_OSVERSION };
1659 size_t bufsz = sizeof(_dispatch_build);
1660
1661 sysctl(mib, 2, _dispatch_build, &bufsz, NULL, 0);
1662 }
1663
1664 void
1665 _dispatch_bug(size_t line, long val)
1666 {
1667 static dispatch_once_t pred;
1668 static void *last_seen;
1669 void *ra = __builtin_return_address(0);
1670
1671 dispatch_once_f(&pred, NULL, _dispatch_bug_init);
1672 if (last_seen != ra) {
1673 last_seen = ra;
1674 _dispatch_log("BUG in libdispatch: %s - %lu - 0x%lx", _dispatch_build, line, val);
1675 }
1676 }
1677
1678 void
1679 _dispatch_abort(size_t line, long val)
1680 {
1681 _dispatch_bug(line, val);
1682 abort();
1683 }
1684
1685 void
1686 _dispatch_log(const char *msg, ...)
1687 {
1688 va_list ap;
1689
1690 va_start(ap, msg);
1691
1692 _dispatch_logv(msg, ap);
1693
1694 va_end(ap);
1695 }
1696
1697 void
1698 _dispatch_logv(const char *msg, va_list ap)
1699 {
1700 #if DISPATCH_DEBUG
1701 static FILE *logfile, *tmp;
1702 char newbuf[strlen(msg) + 2];
1703 char path[PATH_MAX];
1704
1705 sprintf(newbuf, "%s\n", msg);
1706
1707 if (!logfile) {
1708 snprintf(path, sizeof(path), "/var/tmp/libdispatch.%d.log", getpid());
1709 tmp = fopen(path, "a");
1710 assert(tmp);
1711 if (!dispatch_atomic_cmpxchg(&logfile, NULL, tmp)) {
1712 fclose(tmp);
1713 } else {
1714 struct timeval tv;
1715 gettimeofday(&tv, NULL);
1716 fprintf(logfile, "=== log file opened for %s[%u] at %ld.%06u ===\n",
1717 getprogname() ?: "", getpid(), tv.tv_sec, tv.tv_usec);
1718 }
1719 }
1720 vfprintf(logfile, newbuf, ap);
1721 fflush(logfile);
1722 #else
1723 vsyslog(LOG_NOTICE, msg, ap);
1724 #endif
1725 }
1726
1727 int
1728 _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
1729 {
1730 int r;
1731
1732 /* Workaround: 6269619 Not all signals can be delivered on any thread */
1733
1734 r = sigdelset(set, SIGILL);
1735 dispatch_assume_zero(r);
1736 r = sigdelset(set, SIGTRAP);
1737 dispatch_assume_zero(r);
1738 r = sigdelset(set, SIGEMT);
1739 dispatch_assume_zero(r);
1740 r = sigdelset(set, SIGFPE);
1741 dispatch_assume_zero(r);
1742 r = sigdelset(set, SIGBUS);
1743 dispatch_assume_zero(r);
1744 r = sigdelset(set, SIGSEGV);
1745 dispatch_assume_zero(r);
1746 r = sigdelset(set, SIGSYS);
1747 dispatch_assume_zero(r);
1748 r = sigdelset(set, SIGPIPE);
1749 dispatch_assume_zero(r);
1750
1751 return pthread_sigmask(how, set, oset);
1752 }
1753
1754 bool _dispatch_safe_fork = true;
1755
1756 void
1757 dispatch_atfork_prepare(void)
1758 {
1759 }
1760
1761 void
1762 dispatch_atfork_parent(void)
1763 {
1764 }
1765
1766 void
1767 dispatch_atfork_child(void)
1768 {
1769 void *crash = (void *)0x100;
1770 size_t i;
1771
1772 if (_dispatch_safe_fork) {
1773 return;
1774 }
1775
1776 _dispatch_main_q.dq_items_head = crash;
1777 _dispatch_main_q.dq_items_tail = crash;
1778
1779 _dispatch_mgr_q.dq_items_head = crash;
1780 _dispatch_mgr_q.dq_items_tail = crash;
1781
1782 for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
1783 _dispatch_root_queues[i].dq_items_head = crash;
1784 _dispatch_root_queues[i].dq_items_tail = crash;
1785 }
1786 }
1787
1788 void
1789 dispatch_init_pthread(pthread_t pthr __attribute__((unused)))
1790 {
1791 }
1792
1793 static int _dispatch_kq;
1794
1795 static void
1796 _dispatch_get_kq_init(void *context __attribute__((unused)))
1797 {
1798 static const struct kevent kev = {
1799 .ident = 1,
1800 .filter = EVFILT_USER,
1801 .flags = EV_ADD|EV_CLEAR,
1802 };
1803
1804 _dispatch_kq = kqueue();
1805 _dispatch_safe_fork = false;
1806 // in case we fall back to select()
1807 FD_SET(_dispatch_kq, &_dispatch_rfds);
1808
1809 if (_dispatch_kq == -1) {
1810 dispatch_assert_zero(errno);
1811 }
1812
1813 dispatch_assume_zero(kevent(_dispatch_kq, &kev, 1, NULL, 0, NULL));
1814
1815 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
1816 }
1817
1818 static int
1819 _dispatch_get_kq(void)
1820 {
1821 static dispatch_once_t pred;
1822
1823 dispatch_once_f(&pred, NULL, _dispatch_get_kq_init);
1824
1825 return _dispatch_kq;
1826 }
1827
1828 static void
1829 _dispatch_mgr_thread2(struct kevent *kev, size_t cnt)
1830 {
1831 size_t i;
1832
1833 for (i = 0; i < cnt; i++) {
1834 // EVFILT_USER isn't used by sources
1835 if (kev[i].filter == EVFILT_USER) {
1836 // If _dispatch_mgr_thread2() ever is changed to return to the
1837 // caller, then this should become _dispatch_queue_drain()
1838 _dispatch_queue_serial_drain_till_empty(&_dispatch_mgr_q);
1839 } else {
1840 _dispatch_source_drain_kevent(&kev[i]);
1841 }
1842 }
1843 }
1844
1845 static dispatch_queue_t
1846 _dispatch_mgr_invoke(dispatch_queue_t dq)
1847 {
1848 static const struct timespec timeout_immediately = { 0, 0 };
1849 struct timespec timeout;
1850 const struct timespec *timeoutp;
1851 struct timeval sel_timeout, *sel_timeoutp;
1852 fd_set tmp_rfds, tmp_wfds;
1853 struct kevent kev[1];
1854 int k_cnt, k_err, i, r;
1855
1856 _dispatch_thread_setspecific(dispatch_queue_key, dq);
1857
1858 for (;;) {
1859 _dispatch_run_timers();
1860
1861 timeoutp = _dispatch_get_next_timer_fire(&timeout);
1862
1863 if (_dispatch_select_workaround) {
1864 FD_COPY(&_dispatch_rfds, &tmp_rfds);
1865 FD_COPY(&_dispatch_wfds, &tmp_wfds);
1866 if (timeoutp) {
1867 sel_timeout.tv_sec = timeoutp->tv_sec;
1868 sel_timeout.tv_usec = (typeof(sel_timeout.tv_usec))(timeoutp->tv_nsec / 1000u);
1869 sel_timeoutp = &sel_timeout;
1870 } else {
1871 sel_timeoutp = NULL;
1872 }
1873
1874 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, sel_timeoutp);
1875 if (r == -1) {
1876 if (errno != EBADF) {
1877 dispatch_assume_zero(errno);
1878 continue;
1879 }
1880 for (i = 0; i < FD_SETSIZE; i++) {
1881 if (i == _dispatch_kq) {
1882 continue;
1883 }
1884 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)) {
1885 continue;
1886 }
1887 r = dup(i);
1888 if (r != -1) {
1889 close(r);
1890 } else {
1891 FD_CLR(i, &_dispatch_rfds);
1892 FD_CLR(i, &_dispatch_wfds);
1893 _dispatch_rfd_ptrs[i] = 0;
1894 _dispatch_wfd_ptrs[i] = 0;
1895 }
1896 }
1897 continue;
1898 }
1899
1900 if (r > 0) {
1901 for (i = 0; i < FD_SETSIZE; i++) {
1902 if (i == _dispatch_kq) {
1903 continue;
1904 }
1905 if (FD_ISSET(i, &tmp_rfds)) {
1906 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISABLE
1907 EV_SET(&kev[0], i, EVFILT_READ, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_rfd_ptrs[i]);
1908 _dispatch_rfd_ptrs[i] = 0;
1909 _dispatch_mgr_thread2(kev, 1);
1910 }
1911 if (FD_ISSET(i, &tmp_wfds)) {
1912 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISABLE
1913 EV_SET(&kev[0], i, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, _dispatch_wfd_ptrs[i]);
1914 _dispatch_wfd_ptrs[i] = 0;
1915 _dispatch_mgr_thread2(kev, 1);
1916 }
1917 }
1918 }
1919
1920 timeoutp = &timeout_immediately;
1921 }
1922
1923 k_cnt = kevent(_dispatch_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), timeoutp);
1924 k_err = errno;
1925
1926 switch (k_cnt) {
1927 case -1:
1928 if (k_err == EBADF) {
1929 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
1930 }
1931 dispatch_assume_zero(k_err);
1932 continue;
1933 default:
1934 _dispatch_mgr_thread2(kev, (size_t)k_cnt);
1935 // fall through
1936 case 0:
1937 _dispatch_force_cache_cleanup();
1938 continue;
1939 }
1940 }
1941
1942 return NULL;
1943 }
1944
1945 static bool
1946 _dispatch_mgr_wakeup(dispatch_queue_t dq)
1947 {
1948 static const struct kevent kev = {
1949 .ident = 1,
1950 .filter = EVFILT_USER,
1951 #ifdef EV_TRIGGER
1952 .flags = EV_TRIGGER,
1953 #endif
1954 #ifdef NOTE_TRIGGER
1955 .fflags = NOTE_TRIGGER,
1956 #endif
1957 };
1958
1959 _dispatch_debug("waking up the _dispatch_mgr_q: %p", dq);
1960
1961 _dispatch_update_kq(&kev);
1962
1963 return false;
1964 }
1965
1966 void
1967 _dispatch_update_kq(const struct kevent *kev)
1968 {
1969 struct kevent kev_copy = *kev;
1970 kev_copy.flags |= EV_RECEIPT;
1971
1972 if (kev_copy.flags & EV_DELETE) {
1973 switch (kev_copy.filter) {
1974 case EVFILT_READ:
1975 if (FD_ISSET((int)kev_copy.ident, &_dispatch_rfds)) {
1976 FD_CLR((int)kev_copy.ident, &_dispatch_rfds);
1977 _dispatch_rfd_ptrs[kev_copy.ident] = 0;
1978 return;
1979 }
1980 case EVFILT_WRITE:
1981 if (FD_ISSET((int)kev_copy.ident, &_dispatch_wfds)) {
1982 FD_CLR((int)kev_copy.ident, &_dispatch_wfds);
1983 _dispatch_wfd_ptrs[kev_copy.ident] = 0;
1984 return;
1985 }
1986 default:
1987 break;
1988 }
1989 }
1990
1991 int rval = kevent(_dispatch_get_kq(), &kev_copy, 1, &kev_copy, 1, NULL);
1992 if (rval == -1) {
1993 		// The kevent() call itself failed for a reason other than an error
1994 		// on an individual changelist element (those come back via EV_RECEIPT).
1995 dispatch_assume_zero(errno);
1996 //kev_copy.flags |= EV_ERROR;
1997 //kev_copy.data = error;
1998 return;
1999 }
2000
2001 // The following select workaround only applies to adding kevents
2002 if (!(kev->flags & EV_ADD)) {
2003 return;
2004 }
2005
2006 switch (kev_copy.data) {
2007 case 0:
2008 return;
2009 case EBADF:
2010 break;
2011 default:
2012 		// An error was reported for this changelist element. If the kevent
2013 		// was a read or write filter, we were most likely trying to register
2014 		// a /dev/* node, which kqueue does not support; fall back to the
2015 		// select() path for this file descriptor.
2016 switch (kev_copy.filter) {
2017 case EVFILT_READ:
2018 _dispatch_select_workaround = true;
2019 FD_SET((int)kev_copy.ident, &_dispatch_rfds);
2020 _dispatch_rfd_ptrs[kev_copy.ident] = kev_copy.udata;
2021 break;
2022 case EVFILT_WRITE:
2023 _dispatch_select_workaround = true;
2024 FD_SET((int)kev_copy.ident, &_dispatch_wfds);
2025 _dispatch_wfd_ptrs[kev_copy.ident] = kev_copy.udata;
2026 break;
2027 default:
2028 _dispatch_source_drain_kevent(&kev_copy);
2029 break;
2030 }
2031 break;
2032 }
2033 }
2034
2035 static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
2036 .do_type = DISPATCH_QUEUE_MGR_TYPE,
2037 .do_kind = "mgr-queue",
2038 .do_invoke = _dispatch_mgr_invoke,
2039 .do_debug = dispatch_queue_debug,
2040 .do_probe = _dispatch_mgr_wakeup,
2041 };
2042
2043 // 6618342 Contact the team that owns the Instrument DTrace probe before renaming this symbol
2044 struct dispatch_queue_s _dispatch_mgr_q = {
2045 .do_vtable = &_dispatch_queue_mgr_vtable,
2046 .do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2047 .do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
2048 .do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
2049 .do_targetq = &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_COUNT - 1],
2050
2051 .dq_label = "com.apple.libdispatch-manager",
2052 .dq_width = 1,
2053 .dq_serialnum = 2,
2054 };
2055
2056 const struct dispatch_queue_offsets_s dispatch_queue_offsets = {
2057 .dqo_version = 3,
2058 .dqo_label = offsetof(struct dispatch_queue_s, dq_label),
2059 .dqo_label_size = sizeof(_dispatch_main_q.dq_label),
2060 .dqo_flags = 0,
2061 .dqo_flags_size = 0,
2062 .dqo_width = offsetof(struct dispatch_queue_s, dq_width),
2063 .dqo_width_size = sizeof(_dispatch_main_q.dq_width),
2064 .dqo_serialnum = offsetof(struct dispatch_queue_s, dq_serialnum),
2065 .dqo_serialnum_size = sizeof(_dispatch_main_q.dq_serialnum),
2066 .dqo_running = offsetof(struct dispatch_queue_s, dq_running),
2067 .dqo_running_size = sizeof(_dispatch_main_q.dq_running),
2068 };
2069
2070 #ifdef __BLOCKS__
2071 void
2072 dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work)
2073 {
2074 // test before the copy of the block
2075 if (when == DISPATCH_TIME_FOREVER) {
2076 #if DISPATCH_DEBUG
2077 DISPATCH_CLIENT_CRASH("dispatch_after() called with 'when' == infinity");
2078 #endif
2079 return;
2080 }
2081 dispatch_after_f(when, queue, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
2082 }
2083 #endif
2084
2085 DISPATCH_NOINLINE
2086 void
2087 dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, void (*func)(void *))
2088 {
2089 uint64_t delta;
2090 if (when == DISPATCH_TIME_FOREVER) {
2091 #if DISPATCH_DEBUG
2092 DISPATCH_CLIENT_CRASH("dispatch_after_f() called with 'when' == infinity");
2093 #endif
2094 return;
2095 }
2096
2097 // this function can and should be optimized to not use a dispatch source
2098 again:
2099 delta = _dispatch_timeout(when);
2100 if (delta == 0) {
2101 return dispatch_async_f(queue, ctxt, func);
2102 }
2103 if (!dispatch_source_timer_create(DISPATCH_TIMER_INTERVAL, delta, 0, NULL, queue, ^(dispatch_source_t ds) {
2104 long err_dom, err_val;
2105 if ((err_dom = dispatch_source_get_error(ds, &err_val))) {
2106 dispatch_assert(err_dom == DISPATCH_ERROR_DOMAIN_POSIX);
2107 dispatch_assert(err_val == ECANCELED);
2108 func(ctxt);
2109 dispatch_release(ds); // MUST NOT be _dispatch_release()
2110 } else {
2111 dispatch_source_cancel(ds);
2112 }
2113 })) {
2114 goto again;
2115 }
2116 }
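
/*
 * Usage sketch (illustrative; 'do_work' is hypothetical):
 *
 *	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC);
 *	dispatch_after(when, dispatch_get_main_queue(), ^{
 *		do_work();
 *	});
 *	// DISPATCH_TIME_FOREVER is rejected by the checks above
 */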