X-Git-Url: https://git.saurik.com/apple/libdispatch.git/blobdiff_plain/517da941284910bcce6aed25a1e923708f0ed33f..refs/heads/master:/src/semaphore.c diff --git a/src/semaphore.c b/src/semaphore.c index 20d9ae5..3fe94c6 100644 --- a/src/semaphore.c +++ b/src/semaphore.c @@ -20,91 +20,26 @@ #include "internal.h" -// semaphores are too fundamental to use the dispatch_assume*() macros -#if USE_MACH_SEM -#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \ - if (slowpath(x)) { \ - DISPATCH_CRASH("flawed group/semaphore logic"); \ - } \ - } while (0) -#elif USE_POSIX_SEM -#define DISPATCH_SEMAPHORE_VERIFY_RET(x) do { \ - if (slowpath((x) == -1)) { \ - DISPATCH_CRASH("flawed group/semaphore logic"); \ - } \ - } while (0) -#endif - -#if USE_WIN32_SEM -// rdar://problem/8428132 -static DWORD best_resolution = 1; // 1ms - -DWORD -_push_timer_resolution(DWORD ms) -{ - MMRESULT res; - static dispatch_once_t once; - - if (ms > 16) { - // only update timer resolution if smaller than default 15.6ms - // zero means not updated - return 0; - } - - // aim for the best resolution we can accomplish - dispatch_once(&once, ^{ - TIMECAPS tc; - MMRESULT res; - res = timeGetDevCaps(&tc, sizeof(tc)); - if (res == MMSYSERR_NOERROR) { - best_resolution = min(max(tc.wPeriodMin, best_resolution), - tc.wPeriodMax); - } - }); - - res = timeBeginPeriod(best_resolution); - if (res == TIMERR_NOERROR) { - return best_resolution; - } - // zero means not updated - return 0; -} - -// match ms parameter to result from _push_timer_resolution -void -_pop_timer_resolution(DWORD ms) -{ - if (ms) { - timeEndPeriod(ms); - } -} -#endif /* USE_WIN32_SEM */ - - DISPATCH_WEAK // rdar://problem/8503746 long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema); -static long _dispatch_group_wake(dispatch_semaphore_t dsema); - #pragma mark - -#pragma mark dispatch_semaphore_t +#pragma mark dispatch_semaphore_class_t static void -_dispatch_semaphore_init(long value, dispatch_object_t dou) +_dispatch_semaphore_class_init(long value, dispatch_semaphore_class_t dsemau) { - dispatch_semaphore_t dsema = dou._dsema; + struct dispatch_semaphore_header_s *dsema = dsemau._dsema_hdr; - dsema->do_next = (dispatch_semaphore_t)DISPATCH_OBJECT_LISTLESS; - dsema->do_targetq = dispatch_get_global_queue( - DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); + dsema->do_next = DISPATCH_OBJECT_LISTLESS; + dsema->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false); dsema->dsema_value = value; - dsema->dsema_orig = value; -#if USE_POSIX_SEM - int ret = sem_init(&dsema->dsema_sem, 0, 0); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#endif + _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); } +#pragma mark - +#pragma mark dispatch_semaphore_t + dispatch_semaphore_t dispatch_semaphore_create(long value) { @@ -114,93 +49,28 @@ dispatch_semaphore_create(long value) // equal to the number of waiting threads. Therefore it is bogus to // initialize the semaphore with a negative value. if (value < 0) { - return NULL; + return DISPATCH_BAD_INPUT; } - dsema = (dispatch_semaphore_t)_dispatch_alloc(DISPATCH_VTABLE(semaphore), - sizeof(struct dispatch_semaphore_s) - - sizeof(dsema->dsema_notify_head) - - sizeof(dsema->dsema_notify_tail)); - _dispatch_semaphore_init(value, dsema); + dsema = (dispatch_semaphore_t)_dispatch_object_alloc( + DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); + _dispatch_semaphore_class_init(value, dsema); + dsema->dsema_orig = value; return dsema; } -#if USE_MACH_SEM -static void -_dispatch_semaphore_create_port(semaphore_t *s4) -{ - kern_return_t kr; - semaphore_t tmp; - - if (*s4) { - return; - } - _dispatch_safe_fork = false; - - // lazily allocate the semaphore port - - // Someday: - // 1) Switch to a doubly-linked FIFO in user-space. - // 2) User-space timers for the timeout. - // 3) Use the per-thread semaphore port. - - while ((kr = semaphore_create(mach_task_self(), &tmp, - SYNC_POLICY_FIFO, 0))) { - DISPATCH_VERIFY_MIG(kr); - _dispatch_temporary_resource_shortage(); - } - - if (!dispatch_atomic_cmpxchg(s4, 0, tmp, relaxed)) { - kr = semaphore_destroy(mach_task_self(), tmp); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); - } -} -#elif USE_WIN32_SEM -static void -_dispatch_semaphore_create_handle(HANDLE *s4) -{ - HANDLE tmp; - - if (*s4) { - return; - } - - // lazily allocate the semaphore port - - while (!dispatch_assume(tmp = CreateSemaphore(NULL, 0, LONG_MAX, NULL))) { - _dispatch_temporary_resource_shortage(); - } - - if (!dispatch_atomic_cmpxchg(s4, 0, tmp)) { - CloseHandle(tmp); - } -} -#endif - void -_dispatch_semaphore_dispose(dispatch_object_t dou) +_dispatch_semaphore_dispose(dispatch_object_t dou, + DISPATCH_UNUSED bool *allow_free) { dispatch_semaphore_t dsema = dou._dsema; if (dsema->dsema_value < dsema->dsema_orig) { - DISPATCH_CLIENT_CRASH( - "Semaphore/group object deallocated while in use"); + DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value, + "Semaphore object deallocated while in use"); } -#if USE_MACH_SEM - kern_return_t kr; - if (dsema->dsema_port) { - kr = semaphore_destroy(mach_task_self(), dsema->dsema_port); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); - } -#elif USE_POSIX_SEM - int ret = sem_destroy(&dsema->dsema_sem); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - if (dsema->dsema_handle) { - CloseHandle(dsema->dsema_handle); - } -#endif + _dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); } size_t @@ -214,7 +84,7 @@ _dispatch_semaphore_debug(dispatch_object_t dou, char *buf, size_t bufsiz) offset += _dispatch_object_debug_attr(dsema, &buf[offset], bufsiz - offset); #if USE_MACH_SEM offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ", - dsema->dsema_port); + dsema->dsema_sema); #endif offset += dsnprintf(&buf[offset], bufsiz - offset, "value = %ld, orig = %ld }", dsema->dsema_value, dsema->dsema_orig); @@ -225,43 +95,21 @@ DISPATCH_NOINLINE long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) { - // Before dsema_sent_ksignals is incremented we can rely on the reference - // held by the waiter. However, once this value is incremented the waiter - // may return between the atomic increment and the semaphore_signal(), - // therefore an explicit reference must be held in order to safely access - // dsema after the atomic increment. - _dispatch_retain(dsema); - -#if USE_MACH_SEM || USE_POSIX_SEM - (void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals, relaxed); -#endif - -#if USE_MACH_SEM - _dispatch_semaphore_create_port(&dsema->dsema_port); - kern_return_t kr = semaphore_signal(dsema->dsema_port); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - int ret = sem_post(&dsema->dsema_sem); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - _dispatch_semaphore_create_handle(&dsema->dsema_handle); - int ret = ReleaseSemaphore(dsema->dsema_handle, 1, NULL); - dispatch_assume(ret); -#endif - - _dispatch_release(dsema); + _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); + _dispatch_sema4_signal(&dsema->dsema_sema, 1); return 1; } long dispatch_semaphore_signal(dispatch_semaphore_t dsema) { - long value = dispatch_atomic_inc2o(dsema, dsema_value, release); + long value = os_atomic_inc2o(dsema, dsema_value, release); if (fastpath(value > 0)) { return 0; } if (slowpath(value == LONG_MIN)) { - DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()"); + DISPATCH_CLIENT_CRASH(value, + "Unbalanced call to dispatch_semaphore_signal()"); } return _dispatch_semaphore_signal_slow(dsema); } @@ -273,127 +121,35 @@ _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, { long orig; -#if USE_MACH_SEM - mach_timespec_t _timeout; - kern_return_t kr; -#elif USE_POSIX_SEM - struct timespec _timeout; - int ret; -#elif USE_WIN32_SEM - uint64_t nsec; - DWORD msec; - DWORD resolution; - DWORD wait_result; -#endif - -#if USE_MACH_SEM || USE_POSIX_SEM -again: - // Mach semaphores appear to sometimes spuriously wake up. Therefore, - // we keep a parallel count of the number of times a Mach semaphore is - // signaled (6880961). - orig = dsema->dsema_sent_ksignals; - while (orig) { - if (dispatch_atomic_cmpxchgvw2o(dsema, dsema_sent_ksignals, orig, - orig - 1, &orig, relaxed)) { - return 0; - } - } -#endif - -#if USE_MACH_SEM - _dispatch_semaphore_create_port(&dsema->dsema_port); -#elif USE_WIN32_SEM - _dispatch_semaphore_create_handle(&dsema->dsema_handle); -#endif - - // From xnu/osfmk/kern/sync_sema.c: - // wait_semaphore->count = -1; /* we don't keep an actual count */ - // - // The code above does not match the documentation, and that fact is - // not surprising. The documented semantics are clumsy to use in any - // practical way. The above hack effectively tricks the rest of the - // Mach semaphore logic to behave like the libdispatch algorithm. - + _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default: -#if USE_MACH_SEM - do { - uint64_t nsec = _dispatch_timeout(timeout); - _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); - _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); - kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout)); - } while (kr == KERN_ABORTED); - - if (kr != KERN_OPERATION_TIMED_OUT) { - DISPATCH_SEMAPHORE_VERIFY_KR(kr); - break; - } -#elif USE_POSIX_SEM - do { - uint64_t nsec = _dispatch_timeout(timeout); - _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); - _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); - ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1 && errno != ETIMEDOUT) { - DISPATCH_SEMAPHORE_VERIFY_RET(ret); - break; - } -#elif USE_WIN32_SEM - nsec = _dispatch_timeout(timeout); - msec = (DWORD)(nsec / (uint64_t)1000000); - resolution = _push_timer_resolution(msec); - wait_result = WaitForSingleObject(dsema->dsema_handle, msec); - _pop_timer_resolution(resolution); - if (wait_result != WAIT_TIMEOUT) { + if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break; } -#endif // Fall through and try to undo what the fast path did to // dsema->dsema_value case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0) { - if (dispatch_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1, + if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { -#if USE_MACH_SEM - return KERN_OPERATION_TIMED_OUT; -#elif USE_POSIX_SEM || USE_WIN32_SEM - errno = ETIMEDOUT; - return -1; -#endif + return _DSEMA4_TIMEOUT(); } } // Another thread called semaphore_signal(). // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: -#if USE_MACH_SEM - do { - kr = semaphore_wait(dsema->dsema_port); - } while (kr == KERN_ABORTED); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - do { - ret = sem_wait(&dsema->dsema_sem); - } while (ret != 0); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - WaitForSingleObject(dsema->dsema_handle, INFINITE); -#endif + _dispatch_sema4_wait(&dsema->dsema_sema); break; } -#if USE_MACH_SEM || USE_POSIX_SEM - goto again; -#else return 0; -#endif } long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { - long value = dispatch_atomic_dec2o(dsema, dsema_value, acquire); + long value = os_atomic_dec2o(dsema, dsema_value, acquire); if (fastpath(value >= 0)) { return 0; } @@ -403,377 +159,220 @@ dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) #pragma mark - #pragma mark dispatch_group_t +DISPATCH_ALWAYS_INLINE +static inline dispatch_group_t +_dispatch_group_create_with_count(long count) +{ + dispatch_group_t dg = (dispatch_group_t)_dispatch_object_alloc( + DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s)); + _dispatch_semaphore_class_init(count, dg); + if (count) { + os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // + } + return dg; +} + dispatch_group_t dispatch_group_create(void) { - dispatch_group_t dg = (dispatch_group_t)_dispatch_alloc( - DISPATCH_VTABLE(group), sizeof(struct dispatch_semaphore_s)); - _dispatch_semaphore_init(LONG_MAX, dg); - return dg; + return _dispatch_group_create_with_count(0); +} + +dispatch_group_t +_dispatch_group_create_and_enter(void) +{ + return _dispatch_group_create_with_count(1); } void dispatch_group_enter(dispatch_group_t dg) { - dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; - long value = dispatch_atomic_dec2o(dsema, dsema_value, acquire); - if (slowpath(value < 0)) { - DISPATCH_CLIENT_CRASH( + long value = os_atomic_inc_orig2o(dg, dg_value, acquire); + if (slowpath((unsigned long)value >= (unsigned long)LONG_MAX)) { + DISPATCH_CLIENT_CRASH(value, "Too many nested calls to dispatch_group_enter()"); } + if (value == 0) { + _dispatch_retain(dg); // + } } DISPATCH_NOINLINE static long -_dispatch_group_wake(dispatch_semaphore_t dsema) +_dispatch_group_wake(dispatch_group_t dg, bool needs_release) { - dispatch_continuation_t next, head, tail = NULL, dc; + dispatch_continuation_t next, head, tail = NULL; long rval; - head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL, relaxed); + // cannot use os_mpsc_capture_snapshot() because we can have concurrent + // _dispatch_group_wake() calls + head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed); if (head) { // snapshot before anything is notified/woken - tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL, relaxed); + tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release); } - rval = (long)dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0, relaxed); + rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed); if (rval) { // wake group waiters -#if USE_MACH_SEM - _dispatch_semaphore_create_port(&dsema->dsema_port); - do { - kern_return_t kr = semaphore_signal(dsema->dsema_port); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); - } while (--rval); -#elif USE_POSIX_SEM - do { - int ret = sem_post(&dsema->dsema_sem); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); - } while (--rval); -#elif USE_WIN32_SEM - _dispatch_semaphore_create_handle(&dsema->dsema_handle); - int ret; - ret = ReleaseSemaphore(dsema->dsema_handle, rval, NULL); - dispatch_assume(ret); -#else -#error "No supported semaphore type" -#endif + _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO); + _dispatch_sema4_signal(&dg->dg_sema, rval); } + uint16_t refs = needs_release ? 1 : 0; // if (head) { // async group notify blocks do { - next = fastpath(head->do_next); - if (!next && head != tail) { - while (!(next = fastpath(head->do_next))) { - dispatch_hardware_pause(); - } - } + next = os_mpsc_pop_snapshot_head(head, tail, do_next); dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data; - dc = _dispatch_continuation_free_cacheonly(head); - dispatch_async_f(dsn_queue, head->dc_ctxt, head->dc_func); + _dispatch_continuation_async(dsn_queue, head); _dispatch_release(dsn_queue); - if (slowpath(dc)) { - _dispatch_continuation_free_to_cache_limit(dc); - } } while ((head = next)); - _dispatch_release(dsema); + refs++; } + if (refs) _dispatch_release_n(dg, refs); return 0; } void dispatch_group_leave(dispatch_group_t dg) { - dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; - long value = dispatch_atomic_inc2o(dsema, dsema_value, release); + long value = os_atomic_dec2o(dg, dg_value, release); + if (slowpath(value == 0)) { + return (void)_dispatch_group_wake(dg, true); + } if (slowpath(value < 0)) { - DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_group_leave()"); + DISPATCH_CLIENT_CRASH(value, + "Unbalanced call to dispatch_group_leave()"); } - if (slowpath(value == LONG_MAX)) { - (void)_dispatch_group_wake(dsema); +} + +void +_dispatch_group_dispose(dispatch_object_t dou, DISPATCH_UNUSED bool *allow_free) +{ + dispatch_group_t dg = dou._dg; + + if (dg->dg_value) { + DISPATCH_CLIENT_CRASH(dg->dg_value, + "Group object deallocated while in use"); } + + _dispatch_sema4_dispose(&dg->dg_sema, _DSEMA4_POLICY_FIFO); } -DISPATCH_NOINLINE -static long -_dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) +size_t +_dispatch_group_debug(dispatch_object_t dou, char *buf, size_t bufsiz) { - long orig; + dispatch_group_t dg = dou._dg; + size_t offset = 0; + offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", + dx_kind(dg), dg); + offset += _dispatch_object_debug_attr(dg, &buf[offset], bufsiz - offset); #if USE_MACH_SEM - mach_timespec_t _timeout; - kern_return_t kr; -#elif USE_POSIX_SEM // KVV - struct timespec _timeout; - int ret; -#elif USE_WIN32_SEM // KVV - uint64_t nsec; - DWORD msec; - DWORD resolution; - DWORD wait_result; + offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ", + dg->dg_sema); #endif + offset += dsnprintf(&buf[offset], bufsiz - offset, + "count = %ld, waiters = %d }", dg->dg_value, dg->dg_waiters); + return offset; +} + +DISPATCH_NOINLINE +static long +_dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout) +{ + long value; + int orig_waiters; -again: // check before we cause another signal to be sent by incrementing - // dsema->dsema_group_waiters - if (dsema->dsema_value == LONG_MAX) { - return _dispatch_group_wake(dsema); + // dg->dg_waiters + value = os_atomic_load2o(dg, dg_value, ordered); // 19296565 + if (value == 0) { + return _dispatch_group_wake(dg, false); } - // Mach semaphores appear to sometimes spuriously wake up. Therefore, - // we keep a parallel count of the number of times a Mach semaphore is - // signaled (6880961). - (void)dispatch_atomic_inc2o(dsema, dsema_group_waiters, relaxed); + + (void)os_atomic_inc2o(dg, dg_waiters, relaxed); // check the values again in case we need to wake any threads - if (dsema->dsema_value == LONG_MAX) { - return _dispatch_group_wake(dsema); + value = os_atomic_load2o(dg, dg_value, ordered); // 19296565 + if (value == 0) { + _dispatch_group_wake(dg, false); + // Fall through to consume the extra signal, forcing timeout to avoid + // useless setups as it won't block + timeout = DISPATCH_TIME_FOREVER; } -#if USE_MACH_SEM - _dispatch_semaphore_create_port(&dsema->dsema_port); -#elif USE_WIN32_SEM - _dispatch_semaphore_create_handle(&dsema->dsema_handle); -#endif - - // From xnu/osfmk/kern/sync_sema.c: - // wait_semaphore->count = -1; /* we don't keep an actual count */ - // - // The code above does not match the documentation, and that fact is - // not surprising. The documented semantics are clumsy to use in any - // practical way. The above hack effectively tricks the rest of the - // Mach semaphore logic to behave like the libdispatch algorithm. - + _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default: -#if USE_MACH_SEM - do { - uint64_t nsec = _dispatch_timeout(timeout); - _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); - _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); - kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout)); - } while (kr == KERN_ABORTED); - - if (kr != KERN_OPERATION_TIMED_OUT) { - DISPATCH_SEMAPHORE_VERIFY_KR(kr); - break; - } -#elif USE_POSIX_SEM - do { - uint64_t nsec = _dispatch_timeout(timeout); - _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); - _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); - ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout)); - } while (ret == -1 && errno == EINTR); - - if (!(ret == -1 && errno == ETIMEDOUT)) { - DISPATCH_SEMAPHORE_VERIFY_RET(ret); + if (!_dispatch_sema4_timedwait(&dg->dg_sema, timeout)) { break; } -#elif USE_WIN32_SEM - nsec = _dispatch_timeout(timeout); - msec = (DWORD)(nsec / (uint64_t)1000000); - resolution = _push_timer_resolution(msec); - wait_result = WaitForSingleObject(dsema->dsema_handle, msec); - _pop_timer_resolution(resolution); - if (wait_result != WAIT_TIMEOUT) { - break; - } -#endif // Fall through and try to undo the earlier change to - // dsema->dsema_group_waiters + // dg->dg_waiters case DISPATCH_TIME_NOW: - orig = dsema->dsema_group_waiters; - while (orig) { - if (dispatch_atomic_cmpxchgvw2o(dsema, dsema_group_waiters, orig, - orig - 1, &orig, relaxed)) { -#if USE_MACH_SEM - return KERN_OPERATION_TIMED_OUT; -#elif USE_POSIX_SEM || USE_WIN32_SEM - errno = ETIMEDOUT; - return -1; -#endif + orig_waiters = dg->dg_waiters; + while (orig_waiters) { + if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters, + orig_waiters - 1, &orig_waiters, relaxed)) { + return _DSEMA4_TIMEOUT(); } } - // Another thread called semaphore_signal(). + // Another thread is running _dispatch_group_wake() // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: -#if USE_MACH_SEM - do { - kr = semaphore_wait(dsema->dsema_port); - } while (kr == KERN_ABORTED); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - do { - ret = sem_wait(&dsema->dsema_sem); - } while (ret == -1 && errno == EINTR); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - WaitForSingleObject(dsema->dsema_handle, INFINITE); -#endif + _dispatch_sema4_wait(&dg->dg_sema); break; } - goto again; - } + return 0; +} long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) { - dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; - - if (dsema->dsema_value == LONG_MAX) { + if (dg->dg_value == 0) { return 0; } if (timeout == 0) { -#if USE_MACH_SEM - return KERN_OPERATION_TIMED_OUT; -#elif USE_POSIX_SEM || USE_WIN32_SEM - errno = ETIMEDOUT; - return (-1); -#endif + return _DSEMA4_TIMEOUT(); } - return _dispatch_group_wait_slow(dsema, timeout); + return _dispatch_group_wait_slow(dg, timeout); } -DISPATCH_NOINLINE -void -dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, - void (*func)(void *)) +DISPATCH_ALWAYS_INLINE +static inline void +_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, + dispatch_continuation_t dsn) { - dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg; - dispatch_continuation_t prev, dsn = _dispatch_continuation_alloc(); - dsn->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; dsn->dc_data = dq; - dsn->dc_ctxt = ctxt; - dsn->dc_func = func; dsn->do_next = NULL; _dispatch_retain(dq); - prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn, release); - if (fastpath(prev)) { - prev->do_next = dsn; - } else { + if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) { _dispatch_retain(dg); - dispatch_atomic_store2o(dsema, dsema_notify_head, dsn, seq_cst); - dispatch_atomic_barrier(seq_cst); // - if (dispatch_atomic_load2o(dsema, dsema_value, seq_cst) == LONG_MAX) { - _dispatch_group_wake(dsema); + os_atomic_store2o(dg, dg_notify_head, dsn, ordered); + // seq_cst with atomic store to notify_head + if (os_atomic_load2o(dg, dg_value, ordered) == 0) { + _dispatch_group_wake(dg, false); } } } -#ifdef __BLOCKS__ -void -dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, - dispatch_block_t db) -{ - dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db), - _dispatch_call_block_and_release); -} -#endif - -#pragma mark - -#pragma mark _dispatch_thread_semaphore_t - -_dispatch_thread_semaphore_t -_dispatch_thread_semaphore_create(void) -{ - _dispatch_safe_fork = false; -#if DISPATCH_USE_OS_SEMAPHORE_CACHE - return _os_semaphore_create(); -#elif USE_MACH_SEM - semaphore_t s4; - kern_return_t kr; - while (slowpath(kr = semaphore_create(mach_task_self(), &s4, - SYNC_POLICY_FIFO, 0))) { - DISPATCH_VERIFY_MIG(kr); - _dispatch_temporary_resource_shortage(); - } - return s4; -#elif USE_POSIX_SEM - sem_t s4; - int ret = sem_init(&s4, 0, 0); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); - return s4; -#elif USE_WIN32_SEM - HANDLE tmp; - while (!dispatch_assume(tmp = CreateSemaphore(NULL, 0, LONG_MAX, NULL))) { - _dispatch_temporary_resource_shortage(); - } - return (_dispatch_thread_semaphore_t)tmp; -#else -#error "No supported semaphore type" -#endif -} - +DISPATCH_NOINLINE void -_dispatch_thread_semaphore_dispose(_dispatch_thread_semaphore_t sema) +dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, + dispatch_function_t func) { -#if DISPATCH_USE_OS_SEMAPHORE_CACHE - return _os_semaphore_dispose(sema); -#elif USE_MACH_SEM - semaphore_t s4 = (semaphore_t)sema; - kern_return_t kr = semaphore_destroy(mach_task_self(), s4); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - sem_t s4 = (sem_t)sema; - int ret = sem_destroy(&s4); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - // XXX: signal the semaphore? - WINBOOL success; - success = CloseHandle((HANDLE)sema); - dispatch_assume(success); -#else -#error "No supported semaphore type" -#endif + dispatch_continuation_t dsn = _dispatch_continuation_alloc(); + _dispatch_continuation_init_f(dsn, dq, ctxt, func, 0, 0, + DISPATCH_OBJ_CONSUME_BIT); + _dispatch_group_notify(dg, dq, dsn); } +#ifdef __BLOCKS__ void -_dispatch_thread_semaphore_signal(_dispatch_thread_semaphore_t sema) +dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, + dispatch_block_t db) { - // assumed to contain a release barrier -#if DISPATCH_USE_OS_SEMAPHORE_CACHE - return _os_semaphore_signal(sema); -#elif USE_MACH_SEM - semaphore_t s4 = (semaphore_t)sema; - kern_return_t kr = semaphore_signal(s4); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - sem_t s4 = (sem_t)sema; - int ret = sem_post(&s4); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - int ret; - ret = ReleaseSemaphore((HANDLE)sema, 1, NULL); - dispatch_assume(ret); -#else -#error "No supported semaphore type" -#endif + dispatch_continuation_t dsn = _dispatch_continuation_alloc(); + _dispatch_continuation_init(dsn, dq, db, 0, 0, DISPATCH_OBJ_CONSUME_BIT); + _dispatch_group_notify(dg, dq, dsn); } - -void -_dispatch_thread_semaphore_wait(_dispatch_thread_semaphore_t sema) -{ - // assumed to contain an acquire barrier -#if DISPATCH_USE_OS_SEMAPHORE_CACHE - return _os_semaphore_wait(sema); -#elif USE_MACH_SEM - semaphore_t s4 = (semaphore_t)sema; - kern_return_t kr; - do { - kr = semaphore_wait(s4); - } while (slowpath(kr == KERN_ABORTED)); - DISPATCH_SEMAPHORE_VERIFY_KR(kr); -#elif USE_POSIX_SEM - sem_t s4 = (sem_t)sema; - int ret; - do { - ret = sem_wait(&s4); - } while (slowpath(ret != 0)); - DISPATCH_SEMAPHORE_VERIFY_RET(ret); -#elif USE_WIN32_SEM - DWORD wait_result; - do { - wait_result = WaitForSingleObject((HANDLE)sema, INFINITE); - } while (wait_result != WAIT_OBJECT_0); -#else -#error "No supported semaphore type" #endif -}