/*
- * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
+ * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
*
#include "internal.h"
-// semaphores are too fundamental to use the dispatch_assume*() macros
-#if USE_MACH_SEM
-#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
- if (slowpath(x)) { \
- DISPATCH_CRASH("flawed group/semaphore logic"); \
- } \
- } while (0)
-#elif USE_POSIX_SEM
-#define DISPATCH_SEMAPHORE_VERIFY_RET(x) do { \
- if (slowpath((x) == -1)) { \
- DISPATCH_CRASH("flawed group/semaphore logic"); \
- } \
- } while (0)
-#endif
-
DISPATCH_WEAK // rdar://problem/8503746
long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema);
-static void _dispatch_semaphore_dispose(dispatch_semaphore_t dsema);
-static size_t _dispatch_semaphore_debug(dispatch_semaphore_t dsema, char *buf,
- size_t bufsiz);
-static long _dispatch_group_wake(dispatch_semaphore_t dsema);
-
#pragma mark -
-#pragma mark dispatch_semaphore_t
+#pragma mark dispatch_semaphore_class_t
+
+static void
+_dispatch_semaphore_class_init(long value, dispatch_semaphore_class_t dsemau)
+{
+ struct dispatch_semaphore_header_s *dsema = dsemau._dsema_hdr;
-struct dispatch_semaphore_vtable_s {
- DISPATCH_VTABLE_HEADER(dispatch_semaphore_s);
-};
+ dsema->do_next = DISPATCH_OBJECT_LISTLESS;
+ dsema->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
+ dsema->dsema_value = value;
+ _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
+}
-const struct dispatch_semaphore_vtable_s _dispatch_semaphore_vtable = {
- .do_type = DISPATCH_SEMAPHORE_TYPE,
- .do_kind = "semaphore",
- .do_dispose = _dispatch_semaphore_dispose,
- .do_debug = _dispatch_semaphore_debug,
-};
+#pragma mark -
+#pragma mark dispatch_semaphore_t
dispatch_semaphore_t
dispatch_semaphore_create(long value)
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
- return NULL;
- }
-
- dsema = calloc(1, sizeof(struct dispatch_semaphore_s));
-
- if (fastpath(dsema)) {
- dsema->do_vtable = &_dispatch_semaphore_vtable;
- dsema->do_next = DISPATCH_OBJECT_LISTLESS;
- dsema->do_ref_cnt = 1;
- dsema->do_xref_cnt = 1;
- dsema->do_targetq = dispatch_get_global_queue(
- DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
- dsema->dsema_value = value;
- dsema->dsema_orig = value;
-#if USE_POSIX_SEM
- int ret = sem_init(&dsema->dsema_sem, 0, 0);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
-#endif
+ return DISPATCH_BAD_INPUT;
}
+ dsema = (dispatch_semaphore_t)_dispatch_object_alloc(
+ DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s));
+ _dispatch_semaphore_class_init(value, dsema);
+ dsema->dsema_orig = value;
return dsema;
}
-#if USE_MACH_SEM
-static void
-_dispatch_semaphore_create_port(semaphore_t *s4)
+void
+_dispatch_semaphore_dispose(dispatch_object_t dou,
+ DISPATCH_UNUSED bool *allow_free)
{
- kern_return_t kr;
- semaphore_t tmp;
-
- if (*s4) {
- return;
- }
-
- // lazily allocate the semaphore port
-
- // Someday:
- // 1) Switch to a doubly-linked FIFO in user-space.
- // 2) User-space timers for the timeout.
- // 3) Use the per-thread semaphore port.
-
- while ((kr = semaphore_create(mach_task_self(), &tmp,
- SYNC_POLICY_FIFO, 0))) {
- DISPATCH_VERIFY_MIG(kr);
- sleep(1);
- }
+ dispatch_semaphore_t dsema = dou._dsema;
- if (!dispatch_atomic_cmpxchg(s4, 0, tmp)) {
- kr = semaphore_destroy(mach_task_self(), tmp);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- }
-
- _dispatch_safe_fork = false;
-}
-#endif
-
-static void
-_dispatch_semaphore_dispose(dispatch_semaphore_t dsema)
-{
if (dsema->dsema_value < dsema->dsema_orig) {
- DISPATCH_CLIENT_CRASH(
- "Semaphore/group object deallocated while in use");
- }
-
-#if USE_MACH_SEM
- kern_return_t kr;
- if (dsema->dsema_port) {
- kr = semaphore_destroy(mach_task_self(), dsema->dsema_port);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- }
- if (dsema->dsema_waiter_port) {
- kr = semaphore_destroy(mach_task_self(), dsema->dsema_waiter_port);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
+ DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
+ "Semaphore object deallocated while in use");
}
-#elif USE_POSIX_SEM
- int ret = sem_destroy(&dsema->dsema_sem);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
-#endif
- _dispatch_dispose(dsema);
+ _dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
}
-static size_t
-_dispatch_semaphore_debug(dispatch_semaphore_t dsema, char *buf, size_t bufsiz)
+size_t
+_dispatch_semaphore_debug(dispatch_object_t dou, char *buf, size_t bufsiz)
{
+ dispatch_semaphore_t dsema = dou._dsema;
+
size_t offset = 0;
- offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
dx_kind(dsema), dsema);
offset += _dispatch_object_debug_attr(dsema, &buf[offset], bufsiz - offset);
#if USE_MACH_SEM
- offset += snprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ",
- dsema->dsema_port);
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ",
+ dsema->dsema_sema);
#endif
- offset += snprintf(&buf[offset], bufsiz - offset,
+ offset += dsnprintf(&buf[offset], bufsiz - offset,
"value = %ld, orig = %ld }", dsema->dsema_value, dsema->dsema_orig);
return offset;
}
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
- // Before dsema_sent_ksignals is incremented we can rely on the reference
- // held by the waiter. However, once this value is incremented the waiter
- // may return between the atomic increment and the semaphore_signal(),
- // therefore an explicit reference must be held in order to safely access
- // dsema after the atomic increment.
- _dispatch_retain(dsema);
-
- (void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals);
-
-#if USE_MACH_SEM
- _dispatch_semaphore_create_port(&dsema->dsema_port);
- kern_return_t kr = semaphore_signal(dsema->dsema_port);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
-#elif USE_POSIX_SEM
- int ret = sem_post(&dsema->dsema_sem);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
-#endif
-
- _dispatch_release(dsema);
+ _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
+ _dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
- dispatch_atomic_release_barrier();
- long value = dispatch_atomic_inc2o(dsema, dsema_value);
+ long value = os_atomic_inc2o(dsema, dsema_value, release);
if (fastpath(value > 0)) {
return 0;
}
if (slowpath(value == LONG_MIN)) {
- DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_group_leave() or "
- "dispatch_semaphore_signal()");
+ DISPATCH_CLIENT_CRASH(value,
+ "Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
{
long orig;
-again:
- // Mach semaphores appear to sometimes spuriously wake up. Therefore,
- // we keep a parallel count of the number of times a Mach semaphore is
- // signaled (6880961).
- while ((orig = dsema->dsema_sent_ksignals)) {
- if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
- orig - 1)) {
- return 0;
- }
- }
-
-#if USE_MACH_SEM
- mach_timespec_t _timeout;
- kern_return_t kr;
-
- _dispatch_semaphore_create_port(&dsema->dsema_port);
-
- // From xnu/osfmk/kern/sync_sema.c:
- // wait_semaphore->count = -1; /* we don't keep an actual count */
- //
- // The code above does not match the documentation, and that fact is
- // not surprising. The documented semantics are clumsy to use in any
- // practical way. The above hack effectively tricks the rest of the
- // Mach semaphore logic to behave like the libdispatch algorithm.
-
- switch (timeout) {
- default:
- do {
- uint64_t nsec = _dispatch_timeout(timeout);
- _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
- _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
- kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout));
- } while (kr == KERN_ABORTED);
-
- if (kr != KERN_OPERATION_TIMED_OUT) {
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- break;
- }
- // Fall through and try to undo what the fast path did to
- // dsema->dsema_value
- case DISPATCH_TIME_NOW:
- while ((orig = dsema->dsema_value) < 0) {
- if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
- return KERN_OPERATION_TIMED_OUT;
- }
- }
- // Another thread called semaphore_signal().
- // Fall through and drain the wakeup.
- case DISPATCH_TIME_FOREVER:
- do {
- kr = semaphore_wait(dsema->dsema_port);
- } while (kr == KERN_ABORTED);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- break;
- }
-#elif USE_POSIX_SEM
- struct timespec _timeout;
- int ret;
-
+ _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
- do {
- uint64_t nsec = _dispatch_timeout(timeout);
- _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
- _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
- ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout));
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1 && errno != ETIMEDOUT) {
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
+ if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
- while ((orig = dsema->dsema_value) < 0) {
- if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
- errno = ETIMEDOUT;
- return -1;
+ orig = dsema->dsema_value;
+ while (orig < 0) {
+ if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
+ &orig, relaxed)) {
+ return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
- do {
- ret = sem_wait(&dsema->dsema_sem);
- } while (ret != 0);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
+ _dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
-#endif
-
- goto again;
+ return 0;
}
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
- long value = dispatch_atomic_dec2o(dsema, dsema_value);
- dispatch_atomic_acquire_barrier();
+ long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (fastpath(value >= 0)) {
return 0;
}
#pragma mark -
#pragma mark dispatch_group_t
+DISPATCH_ALWAYS_INLINE
+static inline dispatch_group_t
+_dispatch_group_create_with_count(long count)
+{
+ dispatch_group_t dg = (dispatch_group_t)_dispatch_object_alloc(
+ DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));
+ _dispatch_semaphore_class_init(count, dg);
+ if (count) {
+ os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://problem/22318411>
+ }
+ return dg;
+}
+
dispatch_group_t
dispatch_group_create(void)
{
- return (dispatch_group_t)dispatch_semaphore_create(LONG_MAX);
+ return _dispatch_group_create_with_count(0);
+}
+
+dispatch_group_t
+_dispatch_group_create_and_enter(void)
+{
+ return _dispatch_group_create_with_count(1);
}
void
dispatch_group_enter(dispatch_group_t dg)
{
- dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
-
- (void)dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
+ long value = os_atomic_inc_orig2o(dg, dg_value, acquire);
+ if (slowpath((unsigned long)value >= (unsigned long)LONG_MAX)) {
+ DISPATCH_CLIENT_CRASH(value,
+ "Too many nested calls to dispatch_group_enter()");
+ }
+ if (value == 0) {
+ _dispatch_retain(dg); // <rdar://problem/22318411>
+ }
}
DISPATCH_NOINLINE
static long
-_dispatch_group_wake(dispatch_semaphore_t dsema)
+_dispatch_group_wake(dispatch_group_t dg, bool needs_release)
{
- struct dispatch_sema_notify_s *next, *head, *tail = NULL;
+ dispatch_continuation_t next, head, tail = NULL;
long rval;
- head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL);
+ // cannot use os_mpsc_capture_snapshot() because we can have concurrent
+ // _dispatch_group_wake() calls
+ head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
if (head) {
// snapshot before anything is notified/woken <rdar://problem/8554546>
- tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL);
+ tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
}
- rval = dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0);
+ rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
if (rval) {
// wake group waiters
-#if USE_MACH_SEM
- _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
- do {
- kern_return_t kr = semaphore_signal(dsema->dsema_waiter_port);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- } while (--rval);
-#elif USE_POSIX_SEM
- do {
- int ret = sem_post(&dsema->dsema_sem);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
- } while (--rval);
-#endif
+ _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
+ _dispatch_sema4_signal(&dg->dg_sema, rval);
}
+ uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (head) {
// async group notify blocks
do {
- dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);
- _dispatch_release(head->dsn_queue);
- next = fastpath(head->dsn_next);
- if (!next && head != tail) {
- while (!(next = fastpath(head->dsn_next))) {
- _dispatch_hardware_pause();
- }
- }
- free(head);
+ next = os_mpsc_pop_snapshot_head(head, tail, do_next);
+ dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
+ _dispatch_continuation_async(dsn_queue, head);
+ _dispatch_release(dsn_queue);
} while ((head = next));
- _dispatch_release(dsema);
+ refs++;
}
+ if (refs) _dispatch_release_n(dg, refs);
return 0;
}
void
dispatch_group_leave(dispatch_group_t dg)
{
- dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
-
- dispatch_semaphore_signal(dsema);
- if (dsema->dsema_value == dsema->dsema_orig) {
- (void)_dispatch_group_wake(dsema);
+ long value = os_atomic_dec2o(dg, dg_value, release);
+ if (slowpath(value == 0)) {
+ return (void)_dispatch_group_wake(dg, true);
+ }
+ if (slowpath(value < 0)) {
+ DISPATCH_CLIENT_CRASH(value,
+ "Unbalanced call to dispatch_group_leave()");
}
}
-DISPATCH_NOINLINE
-static long
-_dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)
+void
+_dispatch_group_dispose(dispatch_object_t dou, DISPATCH_UNUSED bool *allow_free)
{
- long orig;
+ dispatch_group_t dg = dou._dg;
-again:
- // check before we cause another signal to be sent by incrementing
- // dsema->dsema_group_waiters
- if (dsema->dsema_value == dsema->dsema_orig) {
- return _dispatch_group_wake(dsema);
- }
- // Mach semaphores appear to sometimes spuriously wake up. Therefore,
- // we keep a parallel count of the number of times a Mach semaphore is
- // signaled (6880961).
- (void)dispatch_atomic_inc2o(dsema, dsema_group_waiters);
- // check the values again in case we need to wake any threads
- if (dsema->dsema_value == dsema->dsema_orig) {
- return _dispatch_group_wake(dsema);
+ if (dg->dg_value) {
+ DISPATCH_CLIENT_CRASH(dg->dg_value,
+ "Group object deallocated while in use");
}
+ _dispatch_sema4_dispose(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
+}
+
+size_t
+_dispatch_group_debug(dispatch_object_t dou, char *buf, size_t bufsiz)
+{
+ dispatch_group_t dg = dou._dg;
+
+ size_t offset = 0;
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
+ dx_kind(dg), dg);
+ offset += _dispatch_object_debug_attr(dg, &buf[offset], bufsiz - offset);
#if USE_MACH_SEM
- mach_timespec_t _timeout;
- kern_return_t kr;
+ offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ",
+ dg->dg_sema);
+#endif
+ offset += dsnprintf(&buf[offset], bufsiz - offset,
+ "count = %ld, waiters = %d }", dg->dg_value, dg->dg_waiters);
+ return offset;
+}
- _dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
+DISPATCH_NOINLINE
+static long
+_dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout)
+{
+ long value;
+ int orig_waiters;
- // From xnu/osfmk/kern/sync_sema.c:
- // wait_semaphore->count = -1; /* we don't keep an actual count */
- //
- // The code above does not match the documentation, and that fact is
- // not surprising. The documented semantics are clumsy to use in any
- // practical way. The above hack effectively tricks the rest of the
- // Mach semaphore logic to behave like the libdispatch algorithm.
+ // check before we cause another signal to be sent by incrementing
+ // dg->dg_waiters
+ value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
+ if (value == 0) {
+ return _dispatch_group_wake(dg, false);
+ }
- switch (timeout) {
- default:
- do {
- uint64_t nsec = _dispatch_timeout(timeout);
- _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
- _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
- kr = slowpath(semaphore_timedwait(dsema->dsema_waiter_port,
- _timeout));
- } while (kr == KERN_ABORTED);
-
- if (kr != KERN_OPERATION_TIMED_OUT) {
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- break;
- }
- // Fall through and try to undo the earlier change to
- // dsema->dsema_group_waiters
- case DISPATCH_TIME_NOW:
- while ((orig = dsema->dsema_group_waiters)) {
- if (dispatch_atomic_cmpxchg2o(dsema, dsema_group_waiters, orig,
- orig - 1)) {
- return KERN_OPERATION_TIMED_OUT;
- }
- }
- // Another thread called semaphore_signal().
- // Fall through and drain the wakeup.
- case DISPATCH_TIME_FOREVER:
- do {
- kr = semaphore_wait(dsema->dsema_waiter_port);
- } while (kr == KERN_ABORTED);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
- break;
+ (void)os_atomic_inc2o(dg, dg_waiters, relaxed);
+ // check the values again in case we need to wake any threads
+ value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
+ if (value == 0) {
+ _dispatch_group_wake(dg, false);
+ // Fall through to consume the extra signal, forcing timeout to avoid
+ // useless setups as it won't block
+ timeout = DISPATCH_TIME_FOREVER;
}
-#elif USE_POSIX_SEM
- struct timespec _timeout;
- int ret;
+ _dispatch_sema4_create(&dg->dg_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
- do {
- uint64_t nsec = _dispatch_timeout(timeout);
- _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
- _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
- ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout));
- } while (ret == -1 && errno == EINTR);
-
- if (!(ret == -1 && errno == ETIMEDOUT)) {
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
+ if (!_dispatch_sema4_timedwait(&dg->dg_sema, timeout)) {
break;
}
// Fall through and try to undo the earlier change to
- // dsema->dsema_group_waiters
+ // dg->dg_waiters
case DISPATCH_TIME_NOW:
- while ((orig = dsema->dsema_group_waiters)) {
- if (dispatch_atomic_cmpxchg2o(dsema, dsema_group_waiters, orig,
- orig - 1)) {
- errno = ETIMEDOUT;
- return -1;
+ orig_waiters = dg->dg_waiters;
+ while (orig_waiters) {
+ if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters,
+ orig_waiters - 1, &orig_waiters, relaxed)) {
+ return _DSEMA4_TIMEOUT();
}
}
- // Another thread called semaphore_signal().
+ // Another thread is running _dispatch_group_wake()
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
- do {
- ret = sem_wait(&dsema->dsema_sem);
- } while (ret == -1 && errno == EINTR);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
+ _dispatch_sema4_wait(&dg->dg_sema);
break;
}
-#endif
-
- goto again;
+ return 0;
}
long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
- dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
-
- if (dsema->dsema_value == dsema->dsema_orig) {
+ if (dg->dg_value == 0) {
return 0;
}
if (timeout == 0) {
-#if USE_MACH_SEM
- return KERN_OPERATION_TIMED_OUT;
-#elif USE_POSIX_SEM
- errno = ETIMEDOUT;
- return (-1);
-#endif
+ return _DSEMA4_TIMEOUT();
}
- return _dispatch_group_wait_slow(dsema, timeout);
+ return _dispatch_group_wait_slow(dg, timeout);
}
-DISPATCH_NOINLINE
-void
-dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
- void (*func)(void *))
+DISPATCH_ALWAYS_INLINE
+static inline void
+_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
+ dispatch_continuation_t dsn)
{
- dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
- struct dispatch_sema_notify_s *dsn, *prev;
-
- // FIXME -- this should be updated to use the continuation cache
- while (!(dsn = calloc(1, sizeof(*dsn)))) {
- sleep(1);
- }
-
- dsn->dsn_queue = dq;
- dsn->dsn_ctxt = ctxt;
- dsn->dsn_func = func;
+ dsn->dc_data = dq;
+ dsn->do_next = NULL;
_dispatch_retain(dq);
- dispatch_atomic_store_barrier();
- prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn);
- if (fastpath(prev)) {
- prev->dsn_next = dsn;
- } else {
+ if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) {
_dispatch_retain(dg);
- dsema->dsema_notify_head = dsn;
- if (dsema->dsema_value == dsema->dsema_orig) {
- _dispatch_group_wake(dsema);
+ os_atomic_store2o(dg, dg_notify_head, dsn, ordered);
+ // seq_cst with atomic store to notify_head <rdar://problem/11750916>
+ if (os_atomic_load2o(dg, dg_value, ordered) == 0) {
+ _dispatch_group_wake(dg, false);
}
}
}
-#ifdef __BLOCKS__
-void
-dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
- dispatch_block_t db)
-{
- dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db),
- _dispatch_call_block_and_release);
-}
-#endif
-
-#pragma mark -
-#pragma mark _dispatch_thread_semaphore_t
-
-DISPATCH_NOINLINE
-static _dispatch_thread_semaphore_t
-_dispatch_thread_semaphore_create(void)
-{
-#if USE_MACH_SEM
- semaphore_t s4;
- kern_return_t kr;
- while (slowpath(kr = semaphore_create(mach_task_self(), &s4,
- SYNC_POLICY_FIFO, 0))) {
- DISPATCH_VERIFY_MIG(kr);
- sleep(1);
- }
- return s4;
-#elif USE_POSIX_SEM
- sem_t s4;
- int ret = sem_init(&s4, 0, 0);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
- return s4;
-#endif
-}
-
DISPATCH_NOINLINE
void
-_dispatch_thread_semaphore_dispose(_dispatch_thread_semaphore_t sema)
+dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
+ dispatch_function_t func)
{
-#if USE_MACH_SEM
- semaphore_t s4 = (semaphore_t)sema;
- kern_return_t kr = semaphore_destroy(mach_task_self(), s4);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
-#elif USE_POSIX_SEM
- sem_t s4 = (sem_t)sema;
- int ret = sem_destroy(&s4);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
-#endif
+ dispatch_continuation_t dsn = _dispatch_continuation_alloc();
+ _dispatch_continuation_init_f(dsn, dq, ctxt, func, 0, 0,
+ DISPATCH_OBJ_CONSUME_BIT);
+ _dispatch_group_notify(dg, dq, dsn);
}
+#ifdef __BLOCKS__
void
-_dispatch_thread_semaphore_signal(_dispatch_thread_semaphore_t sema)
+dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
+ dispatch_block_t db)
{
-#if USE_MACH_SEM
- semaphore_t s4 = (semaphore_t)sema;
- kern_return_t kr = semaphore_signal(s4);
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
-#elif USE_POSIX_SEM
- sem_t s4 = (sem_t)sema;
- int ret = sem_post(&s4);
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
-#endif
+ dispatch_continuation_t dsn = _dispatch_continuation_alloc();
+ _dispatch_continuation_init(dsn, dq, db, 0, 0, DISPATCH_OBJ_CONSUME_BIT);
+ _dispatch_group_notify(dg, dq, dsn);
}
-
-void
-_dispatch_thread_semaphore_wait(_dispatch_thread_semaphore_t sema)
-{
-#if USE_MACH_SEM
- semaphore_t s4 = (semaphore_t)sema;
- kern_return_t kr;
- do {
- kr = semaphore_wait(s4);
- } while (slowpath(kr == KERN_ABORTED));
- DISPATCH_SEMAPHORE_VERIFY_KR(kr);
-#elif USE_POSIX_SEM
- sem_t s4 = (sem_t)sema;
- int ret;
- do {
- ret = sem_wait(&s4);
- } while (slowpath(ret != 0));
- DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
-}
-
-_dispatch_thread_semaphore_t
-_dispatch_get_thread_semaphore(void)
-{
- _dispatch_thread_semaphore_t sema = (_dispatch_thread_semaphore_t)
- _dispatch_thread_getspecific(dispatch_sema4_key);
- if (slowpath(!sema)) {
- return _dispatch_thread_semaphore_create();
- }
- _dispatch_thread_setspecific(dispatch_sema4_key, NULL);
- return sema;
-}
-
-void
-_dispatch_put_thread_semaphore(_dispatch_thread_semaphore_t sema)
-{
- _dispatch_thread_semaphore_t old_sema = (_dispatch_thread_semaphore_t)
- _dispatch_thread_getspecific(dispatch_sema4_key);
- _dispatch_thread_setspecific(dispatch_sema4_key, (void*)sema);
- if (slowpath(old_sema)) {
- return _dispatch_thread_semaphore_dispose(old_sema);
- }
-}