#include <sys/spawn_internal.h>
#include <mach-o/dyld.h>
-#include <libkern/OSAtomic.h>
-
#include <mach/mach_time.h>
#include <mach/mach.h>
#include <mach/task.h>
#include <pthread/qos_private.h>
+#include <sys/resource.h>
+
+#include <stdatomic.h>
+
typedef enum wake_type { WAKE_BROADCAST_ONESEM, WAKE_BROADCAST_PERTHREAD, WAKE_CHAIN, WAKE_HOP } wake_type_t; /* how workers are woken; presumably chosen by the <chain | hop | broadcast-single-sem | broadcast-per-thread> argument */
typedef enum my_policy_type { MY_POLICY_REALTIME, MY_POLICY_TIMESHARE, MY_POLICY_FIXEDPRI } my_policy_type_t; /* worker scheduling policy; presumably chosen by the <realtime | timeshare | fixed> argument */
#define COMPUTATION_NANOS (10000000ll) /* 10 ms */
#define TRACEWORTHY_NANOS (10000000ll) /* 10 ms; default for --trace (g_traceworthy_latency_ns) */
+#define DEBUG 0 /* set non-zero to compile debug_log() as printf() */
+
#if DEBUG
#define debug_log(args...) printf(args)
#else
static void selfexec_with_apptype(int argc, char *argv[]);
static void parse_args(int argc, char *argv[]);
+static __attribute__((aligned(128))) _Atomic uint32_t g_done_threads; /* workers that finished the current iteration; incremented by each worker, reset by main */
+static __attribute__((aligned(128))) _Atomic boolean_t g_churn_stop = FALSE; /* set by join_churn_threads() to make churn threads exit */
+static __attribute__((aligned(128))) _Atomic uint64_t g_churn_stopped_at = 0; /* sum of spin counts of churn threads that have exited */
+
/* Global variables (general) */
static uint32_t g_numcpus;
static uint32_t g_numthreads;
static struct mach_timebase_info g_mti;
static semaphore_t g_main_sem;
static uint64_t *g_thread_endtimes_abs;
-static volatile uint32_t g_done_threads;
static boolean_t g_verbose = FALSE;
static boolean_t g_do_affinity = FALSE;
static uint64_t g_starttime_abs; /* mach_absolute_time() at the start of the current run */
static uint32_t g_iteration_sleeptime_us = 0;
+static uint32_t g_priority = 0; /* --priority: worker sched priority; 0 leaves it unchanged */
+static uint32_t g_churn_pri = 0; /* --churn-pri: SCHED_RR priority of churn threads; 0 disables churn */
+static uint32_t g_churn_count = 0; /* --churn-count: number of churn threads; 0 means g_numcpus - 1 */
+
+static pthread_t* g_churn_threads = NULL; /* handles of the running churn threads */
/* Threshold for dropping a 'bad run' tracepoint */
static uint64_t g_traceworthy_latency_ns = TRACEWORTHY_NANOS;
/* Every thread spins until all threads have checked in */
static boolean_t g_do_all_spin = FALSE;
+/* Every thread backgrounds temporarily before parking */
+static boolean_t g_drop_priority = FALSE;
+
/* One randomly chosen thread holds up the train for a certain duration. */
static boolean_t g_do_one_long_spin = FALSE;
static uint32_t g_one_long_spin_id = 0;
return (uint64_t)(ns * (((double)g_mti.denom) / ((double)g_mti.numer)));
}
/*
 * Emit the architecture's spin-loop hint instruction ("yield" on ARM,
 * "pause" on x86). Called from busy-wait loops such as churn_thread().
 *
 * Uses the __asm__/__volatile__ spellings: plain "asm" is a GNU extension
 * keyword that is not recognized when compiling with a strict -std=cNN.
 */
static inline void
yield(void)
{
#if defined(__arm__) || defined(__arm64__) || defined(__aarch64__)
	__asm__ __volatile__("yield");
#elif defined(__x86_64__) || defined(__i386__)
	__asm__ __volatile__("pause");
#else
#error Unrecognized architecture
#endif
}
+
+static void *
+churn_thread(__unused void *arg)
+{
+ uint64_t spin_count = 0;
+
+ /*
+ * As a safety measure to avoid wedging, we will bail on the spin if
+ * it's been more than 1s after the most recent run start
+ */
+
+ while (g_churn_stop == FALSE &&
+ mach_absolute_time() < (g_starttime_abs + NSEC_PER_SEC)) {
+ spin_count++;
+ yield();
+ }
+
+ /* This is totally racy, but only here to detect if anyone stops early */
+ atomic_fetch_add_explicit(&g_churn_stopped_at, spin_count, memory_order_relaxed);
+
+ return NULL;
+}
+
+static void
+create_churn_threads()
+{
+ if (g_churn_count == 0)
+ g_churn_count = g_numcpus - 1;
+
+ errno_t err;
+
+ struct sched_param param = { .sched_priority = (int)g_churn_pri };
+ pthread_attr_t attr;
+
+ /* Array for churn threads */
+ g_churn_threads = (pthread_t*) valloc(sizeof(pthread_t) * g_churn_count);
+ assert(g_churn_threads);
+
+ if ((err = pthread_attr_init(&attr)))
+ errc(EX_OSERR, err, "pthread_attr_init");
+
+ if ((err = pthread_attr_setschedparam(&attr, ¶m)))
+ errc(EX_OSERR, err, "pthread_attr_setschedparam");
+
+ if ((err = pthread_attr_setschedpolicy(&attr, SCHED_RR)))
+ errc(EX_OSERR, err, "pthread_attr_setschedpolicy");
+
+ for (uint32_t i = 0 ; i < g_churn_count ; i++) {
+ pthread_t new_thread;
+
+ if ((err = pthread_create(&new_thread, &attr, churn_thread, NULL)))
+ errc(EX_OSERR, err, "pthread_create");
+ g_churn_threads[i] = new_thread;
+ }
+
+ if ((err = pthread_attr_destroy(&attr)))
+ errc(EX_OSERR, err, "pthread_attr_destroy");
+}
+
+static void
+join_churn_threads(void)
+{
+ if (atomic_load_explicit(&g_churn_stopped_at, memory_order_seq_cst) != 0)
+ printf("Warning: Some of the churn threads may have stopped early: %lld\n",
+ g_churn_stopped_at);
+
+ atomic_store_explicit(&g_churn_stop, TRUE, memory_order_seq_cst);
+
+ /* Rejoin churn threads */
+ for (uint32_t i = 0; i < g_churn_count; i++) {
+ errno_t err = pthread_join(g_churn_threads[i], NULL);
+ if (err) errc(EX_OSERR, err, "pthread_join %d", i);
+ }
+}
+
/*
* Figure out what thread policy to use
*/
errno_t ret;
thread_time_constraint_policy_data_t pol;
+ if (g_priority) {
+ int policy = SCHED_OTHER;
+ if (g_policy == MY_POLICY_FIXEDPRI)
+ policy = SCHED_RR;
+
+ struct sched_param param = {.sched_priority = (int)g_priority};
+ if ((ret = pthread_setschedparam(pthread_self(), policy, ¶m)))
+ errc(EX_OSERR, ret, "pthread_setschedparam: %d", my_id);
+ }
+
switch (g_policy) {
case MY_POLICY_TIMESHARE:
break;
debug_log("%d Leader thread go\n", i);
- assert_zero_t(my_id, g_done_threads);
+ assert_zero_t(my_id, atomic_load_explicit(&g_done_threads, memory_order_relaxed));
switch (g_waketype) {
case WAKE_BROADCAST_ONESEM:
}
}
- int32_t new = OSAtomicIncrement32((volatile int32_t *)&g_done_threads);
- (void)new;
+ uint32_t done_threads;
+ done_threads = atomic_fetch_add_explicit(&g_done_threads, 1, memory_order_relaxed) + 1;
- debug_log("Thread %p new value is %d, iteration %d\n", pthread_self(), new, i);
+ debug_log("Thread %p new value is %d, iteration %d\n", pthread_self(), done_threads, i);
+
+ if (g_drop_priority) {
+ /* Drop priority to BG momentarily */
+ errno_t ret = setpriority(PRIO_DARWIN_THREAD, 0, PRIO_DARWIN_BG);
+ if (ret) errc(EX_OSERR, ret, "setpriority PRIO_DARWIN_BG");
+ }
if (g_do_all_spin) {
/* Everyone spins until the last thread checks in. */
- while (g_done_threads < g_numthreads) {
+ while (atomic_load_explicit(&g_done_threads, memory_order_relaxed) < g_numthreads) {
y = y + 1.5 + x;
x = sqrt(y);
}
}
+ if (g_drop_priority) {
+ /* Restore normal priority */
+ errno_t ret = setpriority(PRIO_DARWIN_THREAD, 0, 0);
+ if (ret) errc(EX_OSERR, ret, "setpriority 0");
+ }
+
debug_log("Thread %p done spinning, iteration %d\n", pthread_self(), i);
}
kr = semaphore_create(mach_task_self(), &g_readysem, SYNC_POLICY_FIFO, 0);
mach_assert_zero(kr);
+ atomic_store_explicit(&g_done_threads, 0, memory_order_relaxed);
+
/* Create the threads */
- g_done_threads = 0;
for (uint32_t i = 0; i < g_numthreads; i++) {
ret = pthread_create(&threads[i], NULL, worker_thread, (void*)(uintptr_t)i);
if (ret) errc(EX_OSERR, ret, "pthread_create %d", i);
thread_setup(0);
+ g_starttime_abs = mach_absolute_time();
+
+ if (g_churn_pri)
+ create_churn_threads();
+
/* Let everyone get settled */
kr = semaphore_wait(g_main_sem);
mach_assert_zero(kr);
debug_log("%d Main thread reset\n", i);
- g_done_threads = 0;
- OSMemoryBarrier();
+ atomic_store_explicit(&g_done_threads, 0, memory_order_seq_cst);
g_starttime_abs = mach_absolute_time();
debug_log("%d Main thread return\n", i);
+ assert(atomic_load_explicit(&g_done_threads, memory_order_relaxed) == g_numthreads);
+
/*
* We report the worst latencies relative to start time
* and relative to the lead worker thread.
if (ret) errc(EX_OSERR, ret, "pthread_join %d", i);
}
+ if (g_churn_pri)
+ join_churn_threads();
+
compute_stats(worst_latencies_ns, g_iterations, &avg, &max, &min, &stddev);
printf("Results (from a stop):\n");
printf("Max:\t\t%.2f us\n", ((float)max) / 1000.0);
/*
 * Print the command-line synopsis, prefixed with the program name, and exit
 * with EX_USAGE. Documents the user-facing options parse_args() accepts,
 * including --priority.
 */
static void __attribute__((noreturn))
usage(void)
{
	errx(EX_USAGE, "Usage: %s <threads> <chain | hop | broadcast-single-sem | broadcast-per-thread> "
	    "<realtime | timeshare | fixed> <iterations>\n\t\t"
	    "[--trace <traceworthy latency in ns>] "
	    "[--verbose] [--spin-one] [--spin-all] [--spin-time <nanos>] [--affinity]\n\t\t"
	    "[--no-sleep] [--drop-priority] [--priority <pri>] [--churn-pri <pri>] [--churn-count <n>]",
	    getprogname());
}
+
/*
 * Long-option table and index shared between parse_args() and read_dec_arg().
 * read_dec_arg() uses them to name the offending option in error messages.
 */
static struct option* g_longopts;
static int option_index;

/*
 * Parse the current option argument (getopt's global optarg) as a decimal
 * number that must fit in 32 bits.
 *
 * Exits with EX_USAGE if the argument is empty, has trailing characters, or
 * is out of range. Values above UINT32_MAX, which includes strtoull()'s
 * wrapped results for almost all negative inputs, are rejected rather than
 * silently truncated.
 */
static uint32_t
read_dec_arg(void)
{
	char *cp;
	/* char* optarg is a magic global */

	errno = 0;
	unsigned long long arg_val = strtoull(optarg, &cp, 10);

	if (cp == optarg || *cp)
		errx(EX_USAGE, "arg --%s requires a decimal number, found \"%s\"",
		    g_longopts[option_index].name, optarg);

	if (errno == ERANGE || arg_val > UINT32_MAX)
		errx(EX_USAGE, "arg --%s value \"%s\" is out of range (max %u)",
		    g_longopts[option_index].name, optarg, (unsigned)UINT32_MAX);

	return (uint32_t)arg_val;
}
static void
parse_args(int argc, char *argv[])
{
- int ch, option_index = 0;
- char *cp;
+ enum {
+ OPT_GETOPT = 0,
+ OPT_SPIN_TIME,
+ OPT_TRACE,
+ OPT_PRIORITY,
+ OPT_CHURN_PRI,
+ OPT_CHURN_COUNT,
+ };
static struct option longopts[] = {
- { "spin-time", required_argument, NULL, 2 },
- { "trace", required_argument, NULL, 3 },
+ { "spin-time", required_argument, NULL, OPT_SPIN_TIME },
+ { "trace", required_argument, NULL, OPT_TRACE },
+ { "priority", required_argument, NULL, OPT_PRIORITY },
+ { "churn-pri", required_argument, NULL, OPT_CHURN_PRI },
+ { "churn-count", required_argument, NULL, OPT_CHURN_COUNT },
{ "switched_apptype", no_argument, (int*)&g_seen_apptype, TRUE },
{ "spin-one", no_argument, (int*)&g_do_one_long_spin, TRUE },
{ "spin-all", no_argument, (int*)&g_do_all_spin, TRUE },
{ "affinity", no_argument, (int*)&g_do_affinity, TRUE },
{ "no-sleep", no_argument, (int*)&g_do_sleep, FALSE },
+ { "drop-priority", no_argument, (int*)&g_drop_priority, TRUE },
{ "verbose", no_argument, (int*)&g_verbose, TRUE },
{ "help", no_argument, NULL, 'h' },
{ NULL, 0, NULL, 0 }
};
+ g_longopts = longopts;
+ int ch = 0;
+
while ((ch = getopt_long(argc, argv, "h", longopts, &option_index)) != -1) {
switch (ch) {
- case 0:
+ case OPT_GETOPT:
/* getopt_long set a variable */
break;
- case 2:
- /* spin-time */
+ case OPT_SPIN_TIME:
g_do_each_spin = TRUE;
- g_each_spin_duration_ns = strtoull(optarg, &cp, 10);
-
- if (cp == optarg || *cp)
- errx(EX_USAGE, "arg --%s requires a decimal number, found \"%s\"",
- longopts[option_index].name, optarg);
+ g_each_spin_duration_ns = read_dec_arg();
break;
- case 3:
- /* trace */
- g_traceworthy_latency_ns = strtoull(optarg, &cp, 10);
-
- if (cp == optarg || *cp)
- errx(EX_USAGE, "arg --%s requires a decimal number, found \"%s\"",
- longopts[option_index].name, optarg);
+ case OPT_TRACE:
+ g_traceworthy_latency_ns = read_dec_arg();
+ break;
+ case OPT_PRIORITY:
+ g_priority = read_dec_arg();
+ break;
+ case OPT_CHURN_PRI:
+ g_churn_pri = read_dec_arg();
+ break;
+ case OPT_CHURN_COUNT:
+ g_churn_count = read_dec_arg();
break;
case '?':
case 'h':
usage();
}
+ char *cp;
+
/* How many threads? */
g_numthreads = (uint32_t)strtoull(argv[0], &cp, 10);