2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
23 // semaphores are too fundamental to use the dispatch_assume*() macros
25 // rdar://problem/8428132
// Cached best Windows multimedia timer resolution, in milliseconds.
// Written at most once (under dispatch_once) by _push_timer_resolution()
// below, starting from the 1ms floor.
26 static DWORD best_resolution
= 1; // 1ms
// Temporarily raise the Windows multimedia timer resolution before a timed
// wait, so short timeouts are not quantized to the default ~15.6ms tick.
// Returns the resolution actually requested (to be handed back to
// _pop_timer_resolution()), or zero when the resolution was not updated.
// NOTE(review): several original lines are missing from this excerpt
// (declarations of `res` and `tc`, #if guards, braces) — comments below
// describe only what the visible code shows.
29 _push_timer_resolution(DWORD ms
)
// dispatch_once guard: the device capabilities are queried only once.
32 static dispatch_once_t once
;
35 // only update timer resolution if smaller than default 15.6ms
36 // zero means not updated
40 // aim for the best resolution we can accomplish
41 dispatch_once(&once
, ^{
// Ask winmm for the supported period range.
44 res
= timeGetDevCaps(&tc
, sizeof(tc
));
45 if (res
== MMSYSERR_NOERROR
) {
// Clamp to the device's minimum supported period (never below the
// current best_resolution floor).
46 best_resolution
= min(max(tc
.wPeriodMin
, best_resolution
),
// Request the period; on success report it to the caller so it can be
// matched by _pop_timer_resolution().
51 res
= timeBeginPeriod(best_resolution
);
52 if (res
== TIMERR_NOERROR
) {
53 return best_resolution
;
55 // zero means not updated
// Undo a matching _push_timer_resolution() call; `ms` must be the value
// that _push_timer_resolution() returned (zero meaning "nothing to undo").
// NOTE(review): the function body is not visible in this excerpt —
// presumably it calls timeEndPeriod(ms) when ms != 0; verify upstream.
59 // match ms parameter to result from _push_timer_resolution
61 _pop_timer_resolution(DWORD ms
)
67 #endif /* USE_WIN32_SEM */
// Forward declaration of the signal slow path defined later in this file;
// DISPATCH_WEAK keeps it out-of-line (see the rdar reference).
70 DISPATCH_WEAK
// rdar://problem/8503746
71 long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema
);
74 #pragma mark dispatch_semaphore_class_t
// Shared initialization for the semaphore/group object header: records the
// initial value, detaches the object from any object list, and targets the
// default-QoS root queue.  The POSIX-sem branch also initializes the
// unnamed semaphore to 0.
// NOTE(review): #if directives and braces are missing from this excerpt.
77 _dispatch_semaphore_class_init(long value
, dispatch_semaphore_class_t dsemau
)
79 struct dispatch_semaphore_header_s
*dsema
= dsemau
._dsema_hdr
;
// Not on any object list yet.
81 dsema
->do_next
= DISPATCH_OBJECT_LISTLESS
;
82 dsema
->do_targetq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,
// Caller-supplied starting value.
84 dsema
->dsema_value
= value
;
// POSIX backend: process-private unnamed semaphore, initial count 0.
86 int ret
= sem_init(&dsema
->dsema_sem
, 0, 0);
87 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Shared teardown for the semaphore/group header: releases whichever
// platform primitive was lazily created (Mach port, POSIX sem, or Win32
// HANDLE).  NOTE(review): the #if backend guards are missing from this
// excerpt; each branch below belongs to a different platform.
92 _dispatch_semaphore_class_dispose(dispatch_semaphore_class_t dsemau
)
94 struct dispatch_semaphore_header_s
*dsema
= dsemau
._dsema_hdr
;
// Mach backend: destroy the lazily-created port, then poison the field
// with MACH_PORT_DEAD to catch use-after-dispose.
98 if (dsema
->dsema_port
) {
99 kr
= semaphore_destroy(mach_task_self(), dsema
->dsema_port
);
100 DISPATCH_VERIFY_MIG(kr
);
101 DISPATCH_SEMAPHORE_VERIFY_KR(kr
);
103 dsema
->dsema_port
= MACH_PORT_DEAD
;
// POSIX backend.
105 int ret
= sem_destroy(&dsema
->dsema_sem
);
106 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Win32 backend: handle is only closed if it was ever created.
108 if (dsema
->dsema_handle
) {
109 CloseHandle(dsema
->dsema_handle
);
115 #pragma mark dispatch_semaphore_t
// Public constructor.  A negative starting value is rejected because the
// implementation encodes the waiter count as a negative internal value
// (see the comment below); `dsema_orig` remembers the starting value so
// dispose can detect "deallocated while in use".
118 dispatch_semaphore_create(long value
)
120 dispatch_semaphore_t dsema
;
122 // If the internal value is negative, then the absolute of the value is
123 // equal to the number of waiting threads. Therefore it is bogus to
124 // initialize the semaphore with a negative value.
126 return DISPATCH_BAD_INPUT
;
129 dsema
= (dispatch_semaphore_t
)_dispatch_alloc(DISPATCH_VTABLE(semaphore
),
130 sizeof(struct dispatch_semaphore_s
));
131 _dispatch_semaphore_class_init(value
, dsema
);
// Remember the creation value for the dispose-time sanity check.
132 dsema
->dsema_orig
= value
;
// Lazily create the Mach semaphore port stored at *s4.  Retries
// semaphore_create() on resource shortage; if another thread won the
// race to publish a port, the freshly-created one is destroyed.
138 _dispatch_semaphore_create_port(semaphore_t
*s4
)
// Mach semaphores are not inherited across fork().
146 _dispatch_fork_becomes_unsafe();
148 // lazily allocate the semaphore port
151 // 1) Switch to a doubly-linked FIFO in user-space.
152 // 2) User-space timers for the timeout.
153 // 3) Use the per-thread semaphore port.
// Retry creation until it succeeds, backing off on shortage.
155 while ((kr
= semaphore_create(mach_task_self(), &tmp
,
156 SYNC_POLICY_FIFO
, 0))) {
157 DISPATCH_VERIFY_MIG(kr
);
158 _dispatch_temporary_resource_shortage();
// Publish tmp only if *s4 is still 0; otherwise destroy the loser.
161 if (!os_atomic_cmpxchg(s4
, 0, tmp
, relaxed
)) {
162 kr
= semaphore_destroy(mach_task_self(), tmp
);
163 DISPATCH_VERIFY_MIG(kr
);
164 DISPATCH_SEMAPHORE_VERIFY_KR(kr
);
// Win32 counterpart of _dispatch_semaphore_create_port(): lazily create
// the semaphore HANDLE stored at *s4, retrying on shortage, and discard
// the new handle if another thread already published one.
// NOTE(review): the losing-race CloseHandle line is not visible in this
// excerpt — verify upstream.
169 _dispatch_semaphore_create_handle(HANDLE
*s4
)
177 // lazily allocate the semaphore port
// Initial count 0, effectively unbounded maximum.
179 while (!dispatch_assume(tmp
= CreateSemaphore(NULL
, 0, LONG_MAX
, NULL
))) {
180 _dispatch_temporary_resource_shortage();
183 if (!os_atomic_cmpxchg(s4
, 0, tmp
)) {
// Finalizer for dispatch_semaphore_t.  Crashes the client if the current
// value is below the creation value — i.e. threads are still waiting (or
// owed signals) when the last reference is dropped.
190 _dispatch_semaphore_dispose(dispatch_object_t dou
)
192 dispatch_semaphore_t dsema
= dou
._dsema
;
194 if (dsema
->dsema_value
< dsema
->dsema_orig
) {
// Crash payload is the number of outstanding waiters/signals.
195 DISPATCH_CLIENT_CRASH(dsema
->dsema_orig
- dsema
->dsema_value
,
196 "Semaphore object deallocated while in use");
199 _dispatch_semaphore_class_dispose(dsema
);
// Debug formatter: appends "kind[ptr] = { ...attrs..., port, value, orig }"
// to buf, bounded by bufsiz.  NOTE(review): the declaration of `offset`
// and the port-formatting argument are not visible in this excerpt.
203 _dispatch_semaphore_debug(dispatch_object_t dou
, char *buf
, size_t bufsiz
)
205 dispatch_semaphore_t dsema
= dou
._dsema
;
// Header: object kind and address.
208 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "%s[%p] = { ",
209 dx_kind(dsema
), dsema
);
// Common dispatch-object attributes (refcounts, target queue, ...).
210 offset
+= _dispatch_object_debug_attr(dsema
, &buf
[offset
], bufsiz
- offset
);
212 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "port = 0x%u, ",
215 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
,
216 "value = %ld, orig = %ld }", dsema
->dsema_value
, dsema
->dsema_orig
);
// Slow path of dispatch_semaphore_signal(): a waiter exists (value went
// non-positive), so wake one via the platform primitive.  Each branch
// below belongs to a different backend (Mach / POSIX / Win32); the #if
// guards are missing from this excerpt.
222 _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema
)
// Mach: make sure the port exists, then signal one waiter.
225 _dispatch_semaphore_create_port(&dsema
->dsema_port
);
226 kern_return_t kr
= semaphore_signal(dsema
->dsema_port
);
227 DISPATCH_SEMAPHORE_VERIFY_KR(kr
);
// POSIX.
229 int ret
= sem_post(&dsema
->dsema_sem
);
230 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Win32: release one unit on the lazily-created handle.
232 _dispatch_semaphore_create_handle(&dsema
->dsema_handle
);
233 int ret
= ReleaseSemaphore(dsema
->dsema_handle
, 1, NULL
);
234 dispatch_assume(ret
);
// Public signal.  Fast path: atomically increment the value with release
// ordering; a positive result means no thread was waiting and we are done.
// LONG_MIN after increment indicates overflow from unbalanced signals.
// Otherwise fall into the slow path to wake a waiter.
240 dispatch_semaphore_signal(dispatch_semaphore_t dsema
)
242 long value
= os_atomic_inc2o(dsema
, dsema_value
, release
)
;
243 if (fastpath(value
> 0)) {
246 if (slowpath(value
== LONG_MIN
)) {
247 DISPATCH_CLIENT_CRASH(value
,
248 "Unbalanced call to dispatch_semaphore_signal()");
250 return _dispatch_semaphore_signal_slow(dsema
);
// Slow path of dispatch_semaphore_wait(): the fast-path decrement went
// negative, so block on the platform primitive until signaled or the
// timeout expires.  Structure (per backend, #if guards and the
// switch (timeout) header are missing from this excerpt):
//   default:  timed wait, retried while interrupted (KERN_ABORTED/EINTR);
//   DISPATCH_TIME_NOW: undo the fast-path decrement via cmpxchg and
//     report KERN_OPERATION_TIMED_OUT immediately;
//   DISPATCH_TIME_FOREVER: untimed wait, retried while interrupted.
255 _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema
,
256 dispatch_time_t timeout
)
// Per-backend timeout representation.
261 mach_timespec_t _timeout
;
264 struct timespec _timeout
;
// Ensure the platform primitive exists before blocking on it.
274 _dispatch_semaphore_create_port(&dsema
->dsema_port
);
276 _dispatch_semaphore_create_handle(&dsema
->dsema_handle
);
// Mach timed wait: convert the remaining nanoseconds to a relative
// mach_timespec_t and retry while the wait is aborted.
283 uint64_t nsec
= _dispatch_timeout(timeout
);
284 _timeout
.tv_sec
= (typeof(_timeout
.tv_sec
))(nsec
/ NSEC_PER_SEC
);
285 _timeout
.tv_nsec
= (typeof(_timeout
.tv_nsec
))(nsec
% NSEC_PER_SEC
);
286 kr
= slowpath(semaphore_timedwait(dsema
->dsema_port
, _timeout
));
287 } while (kr
== KERN_ABORTED
);
289 if (kr
!= KERN_OPERATION_TIMED_OUT
) {
290 DISPATCH_SEMAPHORE_VERIFY_KR(kr
);
// POSIX timed wait: sem_timedwait takes an absolute deadline, hence the
// nanoseconds-since-epoch conversion; retry on EINTR.
295 uint64_t nsec
= _dispatch_time_nanoseconds_since_epoch(timeout
);
296 _timeout
.tv_sec
= (typeof(_timeout
.tv_sec
))(nsec
/ NSEC_PER_SEC
);
297 _timeout
.tv_nsec
= (typeof(_timeout
.tv_nsec
))(nsec
% NSEC_PER_SEC
);
298 ret
= slowpath(sem_timedwait(&dsema
->dsema_sem
, &_timeout
));
299 } while (ret
== -1 && errno
== EINTR
);
301 if (!(ret
== -1 && errno
== ETIMEDOUT
)) {
302 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Win32 timed wait: millisecond granularity, so temporarily raise the
// timer resolution around WaitForSingleObject.
306 nsec
= _dispatch_timeout(timeout
);
307 msec
= (DWORD
)(nsec
/ (uint64_t)1000000);
308 resolution
= _push_timer_resolution(msec
);
309 wait_result
= WaitForSingleObject(dsema
->dsema_handle
, msec
);
310 _pop_timer_resolution(resolution
);
311 if (wait_result
!= WAIT_TIMEOUT
) {
315 // Fall through and try to undo what the fast path did to
316 // dsema->dsema_value
317 case DISPATCH_TIME_NOW
:
// Re-increment the value (cmpxchg loop, only while it stays negative)
// so a poll-style wait leaves the count unchanged.
318 orig
= dsema
->dsema_value
;
320 if (os_atomic_cmpxchgvw2o(dsema
, dsema_value
, orig
, orig
+ 1,
323 return KERN_OPERATION_TIMED_OUT
;
324 #elif USE_POSIX_SEM || USE_WIN32_SEM
330 // Another thread called semaphore_signal().
331 // Fall through and drain the wakeup.
332 case DISPATCH_TIME_FOREVER
:
// Untimed wait, per backend, retried on interruption.
335 kr
= semaphore_wait(dsema
->dsema_port
);
336 } while (kr
== KERN_ABORTED
);
337 DISPATCH_SEMAPHORE_VERIFY_KR(kr
);
340 ret
= sem_wait(&dsema
->dsema_sem
);
342 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
344 WaitForSingleObject(dsema
->dsema_handle
, INFINITE
);
// Public wait.  Fast path: atomically decrement with acquire ordering; a
// non-negative result means a unit was available and no blocking is
// needed.  Otherwise defer to the slow path, which blocks (or times out).
352 dispatch_semaphore_wait(dispatch_semaphore_t dsema
, dispatch_time_t timeout
)
354 long value
= os_atomic_dec2o(dsema
, dsema_value
, acquire
);
355 if (fastpath(value
>= 0)) {
358 return _dispatch_semaphore_wait_slow(dsema
, timeout
);
362 #pragma mark dispatch_group_t
// Internal group constructor: allocates the group object, runs the shared
// semaphore-class init with `count` as the starting value, and seeds the
// internal reference count (see the rdar reference on the store).
364 DISPATCH_ALWAYS_INLINE
365 static inline dispatch_group_t
366 _dispatch_group_create_with_count(long count
)
368 dispatch_group_t dg
= (dispatch_group_t
)_dispatch_alloc(
369 DISPATCH_VTABLE(group
), sizeof(struct dispatch_group_s
));
370 _dispatch_semaphore_class_init(count
, dg
);
372 os_atomic_store2o(dg
, do_ref_cnt
, 1, relaxed
); // <rdar://problem/22318411>
// Public constructor: a fresh group with no outstanding members.
378 dispatch_group_create(void)
380 return _dispatch_group_create_with_count(0);
// Internal convenience: create a group that has already been entered once
// (count starts at 1), saving a separate dispatch_group_enter() call.
384 _dispatch_group_create_and_enter(void)
386 return _dispatch_group_create_with_count(1);
// Public enter: atomically bump the outstanding-work count (acquire,
// returning the previous value).  Crashes on LONG_MAX overflow from
// runaway nesting; the retain keeps the group alive until the matching
// leave (see the rdar reference).
390 dispatch_group_enter(dispatch_group_t dg
)
392 long value
= os_atomic_inc_orig2o(dg
, dg_value
, acquire
);
// Unsigned compare catches both overflow and negative garbage.
393 if (slowpath((unsigned long)value
>= (unsigned long)LONG_MAX
)) {
394 DISPATCH_CLIENT_CRASH(value
,
395 "Too many nested calls to dispatch_group_enter()");
398 _dispatch_retain(dg
); // <rdar://problem/22318411>
// Wake everything attached to a group whose count reached zero: signal
// all blocked dispatch_group_wait() callers (dg_waiters, exchanged to 0)
// and submit every queued notify continuation to its target queue.
// `needs_release` balances the retain taken by dispatch_group_enter().
// NOTE(review): #if backend guards, the waiter-signal loop header, and
// some braces are missing from this excerpt.
404 _dispatch_group_wake(dispatch_group_t dg
, bool needs_release
)
406 dispatch_continuation_t next
, head
, tail
= NULL
;
409 // cannot use os_mpsc_capture_snapshot() because we can have concurrent
410 // _dispatch_group_wake() calls
// Detach the notify list head first...
411 head
= os_atomic_xchg2o(dg
, dg_notify_head
, NULL
, relaxed
);
413 // snapshot before anything is notified/woken <rdar://problem/8554546>
// ...then the tail, with release ordering.
414 tail
= os_atomic_xchg2o(dg
, dg_notify_tail
, NULL
, release
);
// Claim all current waiters; they each get one wakeup below.
416 rval
= (long)os_atomic_xchg2o(dg
, dg_waiters
, 0, relaxed
);
418 // wake group waiters
// Mach backend: one semaphore_signal per waiter.
420 _dispatch_semaphore_create_port(&dg
->dg_port
);
422 kern_return_t kr
= semaphore_signal(dg
->dg_port
);
423 DISPATCH_GROUP_VERIFY_KR(kr
);
// POSIX backend.
427 int ret
= sem_post(&dg
->dg_sem
);
428 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Win32 backend: release all waiters in a single call.
431 _dispatch_semaphore_create_handle(&dg
->dg_handle
);
433 ret
= ReleaseSemaphore(dg
->dg_handle
, rval
, NULL
);
434 dispatch_assume(ret
);
436 #error "No supported semaphore type"
440 // async group notify blocks
// Walk the detached snapshot; each continuation carries its target queue
// in dc_data and is submitted asynchronously.
442 next
= os_mpsc_pop_snapshot_head(head
, tail
, do_next
);
443 dispatch_queue_t dsn_queue
= (dispatch_queue_t
)head
->dc_data
;
444 _dispatch_continuation_async(dsn_queue
, head
);
445 _dispatch_release(dsn_queue
);
446 } while ((head
= next
));
// Drop the reference taken when the notify list became non-empty.
447 _dispatch_release(dg
);
450 _dispatch_release(dg
); // <rdar://problem/22318411>
// Public leave: atomically decrement the outstanding-work count with
// release ordering.  Reaching zero wakes waiters and notify blocks (with
// needs_release=true to balance enter's retain); going negative means an
// unmatched leave and crashes the client.
456 dispatch_group_leave(dispatch_group_t dg
)
458 long value
= os_atomic_dec2o(dg
, dg_value
, release
);
459 if (slowpath(value
== 0)) {
460 return (void)_dispatch_group_wake(dg
, true);
462 if (slowpath(value
< 0)) {
463 DISPATCH_CLIENT_CRASH(value
,
464 "Unbalanced call to dispatch_group_leave()");
// Finalizer for dispatch_group_t.  Crashes the client when the group is
// deallocated with outstanding enters (dg_value is the crash payload),
// then runs the shared semaphore-class teardown.
469 _dispatch_group_dispose(dispatch_object_t dou
)
471 dispatch_group_t dg
= dou
._dg
;
474 DISPATCH_CLIENT_CRASH(dg
->dg_value
,
475 "Group object deallocated while in use");
478 _dispatch_semaphore_class_dispose(dg
);
// Debug formatter for groups, mirroring _dispatch_semaphore_debug():
// appends "{ ...attrs..., port, count, waiters }" to buf within bufsiz.
// NOTE(review): the `offset` declaration and some format arguments are
// not visible in this excerpt.
482 _dispatch_group_debug(dispatch_object_t dou
, char *buf
, size_t bufsiz
)
484 dispatch_group_t dg
= dou
._dg
;
487 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "%s[%p] = { ",
// Common dispatch-object attributes.
489 offset
+= _dispatch_object_debug_attr(dg
, &buf
[offset
], bufsiz
- offset
);
491 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
, "port = 0x%u, ",
494 offset
+= dsnprintf(&buf
[offset
], bufsiz
- offset
,
495 "count = %ld, waiters = %d }", dg
->dg_value
, dg
->dg_waiters
);
// Slow path of dispatch_group_wait(): registers as a waiter and blocks on
// the platform primitive until the group empties or the timeout fires.
// Structure mirrors _dispatch_semaphore_wait_slow(); #if guards, the
// switch (timeout) header, loop openers and braces are missing from this
// excerpt.  Key subtlety: dg_value is re-checked before and after the
// dg_waiters increment (rdar 19296565) so a concurrent leave cannot be
// missed; if it emptied in that window we wake, then force
// DISPATCH_TIME_FOREVER so the fallthrough drains our own extra signal
// without blocking.
501 _dispatch_group_wait_slow(dispatch_group_t dg
, dispatch_time_t timeout
)
// Per-backend timeout representation.
507 mach_timespec_t _timeout
;
509 #elif USE_POSIX_SEM // KVV
510 struct timespec _timeout
;
512 #elif USE_WIN32_SEM // KVV
519 // check before we cause another signal to be sent by incrementing
521 value
= os_atomic_load2o(dg
, dg_value
, ordered
); // 19296565
523 return _dispatch_group_wake(dg
, false);
// Register this thread as a waiter.
526 (void)os_atomic_inc2o(dg
, dg_waiters
, relaxed
);
527 // check the values again in case we need to wake any threads
528 value
= os_atomic_load2o(dg
, dg_value
, ordered
); // 19296565
530 _dispatch_group_wake(dg
, false);
531 // Fall through to consume the extra signal, forcing timeout to avoid
532 // useless setups as it won't block
533 timeout
= DISPATCH_TIME_FOREVER
;
// Ensure the platform primitive exists before blocking.
537 _dispatch_semaphore_create_port(&dg
->dg_port
);
539 _dispatch_semaphore_create_handle(&dg
->dg_handle
);
// Mach timed wait: relative timespec, retried while KERN_ABORTED.
546 uint64_t nsec
= _dispatch_timeout(timeout
);
547 _timeout
.tv_sec
= (typeof(_timeout
.tv_sec
))(nsec
/ NSEC_PER_SEC
);
548 _timeout
.tv_nsec
= (typeof(_timeout
.tv_nsec
))(nsec
% NSEC_PER_SEC
);
549 kr
= slowpath(semaphore_timedwait(dg
->dg_port
, _timeout
));
550 } while (kr
== KERN_ABORTED
);
552 if (kr
!= KERN_OPERATION_TIMED_OUT
) {
553 DISPATCH_GROUP_VERIFY_KR(kr
);
// POSIX timed wait: absolute deadline, retried on EINTR.
558 uint64_t nsec
= _dispatch_time_nanoseconds_since_epoch(timeout
);
559 _timeout
.tv_sec
= (typeof(_timeout
.tv_sec
))(nsec
/ NSEC_PER_SEC
);
560 _timeout
.tv_nsec
= (typeof(_timeout
.tv_nsec
))(nsec
% NSEC_PER_SEC
);
561 ret
= slowpath(sem_timedwait(&dg
->dg_sem
, &_timeout
));
562 } while (ret
== -1 && errno
== EINTR
);
564 if (!(ret
== -1 && errno
== ETIMEDOUT
)) {
565 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
// Win32 timed wait with temporarily raised timer resolution.
569 nsec
= _dispatch_timeout(timeout
);
570 msec
= (DWORD
)(nsec
/ (uint64_t)1000000);
571 resolution
= _push_timer_resolution(msec
);
572 wait_result
= WaitForSingleObject(dg
->dg_handle
, msec
);
573 _pop_timer_resolution(resolution
);
574 if (wait_result
!= WAIT_TIMEOUT
) {
578 // Fall through and try to undo the earlier change to
580 case DISPATCH_TIME_NOW
:
// Poll: deregister ourselves from dg_waiters (cmpxchg loop) and report
// an immediate timeout.
581 orig_waiters
= dg
->dg_waiters
;
582 while (orig_waiters
) {
583 if (os_atomic_cmpxchgvw2o(dg
, dg_waiters
, orig_waiters
,
584 orig_waiters
- 1, &orig_waiters
, relaxed
)) {
586 return KERN_OPERATION_TIMED_OUT
;
587 #elif USE_POSIX_SEM || USE_WIN32_SEM
593 // Another thread called semaphore_signal().
594 // Fall through and drain the wakeup.
595 case DISPATCH_TIME_FOREVER
:
// Untimed wait, per backend, retried while interrupted.
598 kr
= semaphore_wait(dg
->dg_port
);
599 } while (kr
== KERN_ABORTED
);
600 DISPATCH_GROUP_VERIFY_KR(kr
);
603 ret
= sem_wait(&dg
->dg_sem
);
604 } while (ret
== -1 && errno
== EINTR
);
605 DISPATCH_SEMAPHORE_VERIFY_RET(ret
);
607 WaitForSingleObject(dg
->dg_handle
, INFINITE
);
// Public wait.  Fast path: an already-empty group returns success without
// blocking; a zero timeout returns KERN_OPERATION_TIMED_OUT immediately
// (the POSIX/Win32 translation of that code is behind the #elif, not
// visible here).  Otherwise block in the slow path.
615 dispatch_group_wait(dispatch_group_t dg
, dispatch_time_t timeout
)
617 if (dg
->dg_value
== 0) {
622 return KERN_OPERATION_TIMED_OUT
;
623 #elif USE_POSIX_SEM || USE_WIN32_SEM
628 return _dispatch_group_wait_slow(dg
, timeout
);
// Internal notify: push continuation `dsn` (targeting queue `dq`) onto
// the group's MPSC notify list.  The first pusher also publishes the list
// head (seq_cst store, see rdar below) and retains the group; if the
// group is already empty at that point, wake immediately so the notify
// fires without requiring another leave.
631 DISPATCH_ALWAYS_INLINE
633 _dispatch_group_notify(dispatch_group_t dg
, dispatch_queue_t dq
,
634 dispatch_continuation_t dsn
)
// The target queue is retained until the continuation runs.
638 _dispatch_retain(dq
);
// True when this push made the list non-empty (we own head publication).
639 if (os_mpsc_push_update_tail(dg
, dg_notify
, dsn
, do_next
)) {
640 _dispatch_retain(dg
);
641 os_atomic_store2o(dg
, dg_notify_head
, dsn
, ordered
);
642 // seq_cst with atomic store to notify_head <rdar://problem/11750916>
643 if (os_atomic_load2o(dg
, dg_value
, ordered
) == 0) {
644 _dispatch_group_wake(dg
, false);
// Public function-pointer variant of dispatch_group_notify(): wraps
// (func, ctxt) in a consumed-on-invoke continuation targeting dq and
// hands it to the internal notify path.
651 dispatch_group_notify_f(dispatch_group_t dg
, dispatch_queue_t dq
, void *ctxt
,
652 dispatch_function_t func
)
654 dispatch_continuation_t dsn
= _dispatch_continuation_alloc();
655 _dispatch_continuation_init_f(dsn
, dq
, ctxt
, func
, 0, 0,
656 DISPATCH_OBJ_CONSUME_BIT
);
657 _dispatch_group_notify(dg
, dq
, dsn
);
662 dispatch_group_notify(dispatch_group_t dg
, dispatch_queue_t dq
,
665 dispatch_continuation_t dsn
= _dispatch_continuation_alloc();
666 _dispatch_continuation_init(dsn
, dq
, db
, 0, 0, DISPATCH_OBJ_CONSUME_BIT
);
667 _dispatch_group_notify(dg
, dq
, dsn
);