#include <unistd.h>
#include <stdio.h>
#include <math.h>
-#include <sys/wait.h>
-#include <sys/param.h>
-#include <sys/syscall.h>
-#include <sys/types.h>
-#include <sys/ptrace.h>
-#include <semaphore.h>
+#include <sys/kdebug.h>
#include <stdlib.h>
#include <pthread.h>
-#include <fcntl.h>
#include <errno.h>
#include <err.h>
#include <string.h>
+#include <assert.h>
+#include <sysexits.h>
+#include <sys/sysctl.h>
+#include <getopt.h>
#include <spawn.h>
#include <spawn_private.h>
#include <sys/spawn_internal.h>
#include <mach-o/dyld.h>
-#include <libkern/OSAtomic.h>
-
#include <mach/mach_time.h>
#include <mach/mach.h>
#include <mach/task.h>
#include <mach/semaphore.h>
-typedef enum wake_type { WAKE_BROADCAST_ONESEM, WAKE_BROADCAST_PERTHREAD, WAKE_CHAIN } wake_type_t;
+#include <pthread/qos_private.h>
+
+#include <sys/resource.h>
+
+#include <stdatomic.h>
+
+typedef enum wake_type { WAKE_BROADCAST_ONESEM, WAKE_BROADCAST_PERTHREAD, WAKE_CHAIN, WAKE_HOP } wake_type_t;
typedef enum my_policy_type { MY_POLICY_REALTIME, MY_POLICY_TIMESHARE, MY_POLICY_FIXEDPRI } my_policy_type_t;
-#define assert(truth, label) do { if(!(truth)) { printf("Thread %p: failure on line %d\n", pthread_self(), __LINE__); goto label; } } while (0)
+#define mach_assert_zero(error) do { if ((error) != 0) { fprintf(stderr, "[FAIL] error %d (%s) ", (error), mach_error_string(error)); assert(error == 0); } } while (0)
+#define mach_assert_zero_t(tid, error) do { if ((error) != 0) { fprintf(stderr, "[FAIL] Thread %d error %d (%s) ", (tid), (error), mach_error_string(error)); assert(error == 0); } } while (0)
+#define assert_zero_t(tid, error) do { if ((error) != 0) { fprintf(stderr, "[FAIL] Thread %d error %d ", (tid), (error)); assert(error == 0); } } while (0)
#define CONSTRAINT_NANOS (20000000ll) /* 20 ms */
#define COMPUTATION_NANOS (10000000ll) /* 10 ms */
#define TRACEWORTHY_NANOS (10000000ll) /* 10 ms */
+#define DEBUG 0
+
#if DEBUG
#define debug_log(args...) printf(args)
#else
#endif
/* Declarations */
-void* child_thread_func(void *arg);
-void print_usage();
-int thread_setup(int my_id);
-my_policy_type_t parse_thread_policy(const char *str);
-int thread_finish_iteration();
-void selfexec_with_apptype(int argc, char *argv[]);
+static void* worker_thread(void *arg);
+static void usage();
+static int thread_setup(uint32_t my_id);
+static my_policy_type_t parse_thread_policy(const char *str);
+static void selfexec_with_apptype(int argc, char *argv[]);
+static void parse_args(int argc, char *argv[]);
+
+static __attribute__((aligned(128))) _Atomic uint32_t g_done_threads;
+static __attribute__((aligned(128))) _Atomic boolean_t g_churn_stop = FALSE;
+static __attribute__((aligned(128))) _Atomic uint64_t g_churn_stopped_at = 0;
/* Global variables (general) */
-int g_numthreads;
-wake_type_t g_waketype;
-policy_t g_policy;
-int g_iterations;
-struct mach_timebase_info g_mti;
-semaphore_t g_main_sem;
-uint64_t *g_thread_endtimes_abs;
-volatile int32_t g_done_threads;
-boolean_t g_do_spin = FALSE;
-boolean_t g_verbose = FALSE;
-boolean_t g_do_affinity = FALSE;
-uint64_t g_starttime_abs;
-#if MIMIC_DIGI_LEAD_TIME
-int g_long_spinid;
-uint64_t g_spinlength_abs;
-#endif /* MIMIC_DIGI_LEAD_TIME */
+static uint32_t g_numcpus;
+static uint32_t g_numthreads;
+static wake_type_t g_waketype;
+static policy_t g_policy;
+static uint32_t g_iterations;
+static struct mach_timebase_info g_mti;
+static semaphore_t g_main_sem;
+static uint64_t *g_thread_endtimes_abs;
+static boolean_t g_verbose = FALSE;
+static boolean_t g_do_affinity = FALSE;
+static uint64_t g_starttime_abs;
+static uint32_t g_iteration_sleeptime_us = 0;
+static uint32_t g_priority = 0;
+static uint32_t g_churn_pri = 0;
+static uint32_t g_churn_count = 0;
+
+static pthread_t* g_churn_threads = NULL;
+
+/* Threshold for dropping a 'bad run' tracepoint */
+static uint64_t g_traceworthy_latency_ns = TRACEWORTHY_NANOS;
+
+/* Have we re-execed to set apptype? */
+static boolean_t g_seen_apptype = FALSE;
+
+/* usleep in between iterations */
+static boolean_t g_do_sleep = TRUE;
+
+/* Every thread spins until all threads have checked in */
+static boolean_t g_do_all_spin = FALSE;
+
+/* Every thread backgrounds temporarily before parking */
+static boolean_t g_drop_priority = FALSE;
+
+/* One randomly chosen thread holds up the train for a certain duration. */
+static boolean_t g_do_one_long_spin = FALSE;
+static uint32_t g_one_long_spin_id = 0;
+static uint64_t g_one_long_spin_length_abs = 0;
+static uint64_t g_one_long_spin_length_ns = 0;
+
+/* Each thread spins for a certain duration after waking up before blocking again. */
+static boolean_t g_do_each_spin = FALSE;
+static uint64_t g_each_spin_duration_abs = 0;
+static uint64_t g_each_spin_duration_ns = 0;
/* Global variables (broadcast) */
-semaphore_t g_machsem;
-semaphore_t g_leadersem;
+static semaphore_t g_broadcastsem;
+static semaphore_t g_leadersem;
+static semaphore_t g_readysem;
+static semaphore_t g_donesem;
/* Global variables (chain) */
-semaphore_t *g_semarr;
+static semaphore_t *g_semarr;
-uint64_t
+static uint64_t
abs_to_nanos(uint64_t abstime)
{
return (uint64_t)(abstime * (((double)g_mti.numer) / ((double)g_mti.denom)));
}
-uint64_t
+static uint64_t
nanos_to_abs(uint64_t ns)
{
return (uint64_t)(ns * (((double)g_mti.denom) / ((double)g_mti.numer)));
}
+/*
+ * Emit one spin-wait hint instruction: "pause" on x86, "yield" on ARM.
+ * Called from the churn thread's busy loop.  Building for any other
+ * architecture is a compile-time error.
+ */
+static inline void
+yield(void)
+{
+#if defined(__x86_64__) || defined(__i386__)
+    __asm__ volatile("pause");
+#elif defined(__arm__) || defined(__arm64__)
+    __asm__ volatile("yield");
+#else
+#error Unrecognized architecture
+#endif
+}
+
+/*
+ * Body of a churn thread: busy-spin, counting loop passes, until
+ * g_churn_stop is set or the safety deadline passes, then add the pass
+ * count to g_churn_stopped_at.
+ *
+ * arg is unused.  Always returns NULL.
+ */
+static void *
+churn_thread(__unused void *arg)
+{
+    uint64_t spin_count = 0;
+
+    /*
+     * As a safety measure to avoid wedging, we will bail on the spin if
+     * it's been more than 1s after the most recent run start.
+     *
+     * g_starttime_abs is a mach_absolute_time() value, so the one-second
+     * window must be converted from nanoseconds before it is added; the
+     * two units only coincide when the timebase is 1/1.
+     */
+    const uint64_t window_abs = nanos_to_abs(NSEC_PER_SEC);
+
+    while (g_churn_stop == FALSE &&
+        mach_absolute_time() < (g_starttime_abs + window_abs)) {
+        spin_count++;
+        yield();
+    }
+
+    /* This is totally racy, but only here to detect if anyone stops early */
+    atomic_fetch_add_explicit(&g_churn_stopped_at, spin_count, memory_order_relaxed);
+
+    return NULL;
+}
+
+/*
+ * Start the churn threads and record their handles in g_churn_threads.
+ *
+ * If no explicit count was requested (g_churn_count == 0) the count
+ * becomes g_numcpus - 1.  Each thread runs churn_thread() and is created
+ * with SCHED_RR at priority g_churn_pri.  Any pthread failure terminates
+ * the process with EX_OSERR via errc().
+ */
+static void
+create_churn_threads()
+{
+    if (g_churn_count == 0)
+        g_churn_count = g_numcpus - 1;
+
+    errno_t err;
+
+    struct sched_param param = { .sched_priority = (int)g_churn_pri };
+    pthread_attr_t attr;
+
+    /* Array for churn threads */
+    g_churn_threads = (pthread_t*) valloc(sizeof(pthread_t) * g_churn_count);
+    assert(g_churn_threads);
+
+    if ((err = pthread_attr_init(&attr)))
+        errc(EX_OSERR, err, "pthread_attr_init");
+
+    /* The scheduling parameters are passed by address. */
+    if ((err = pthread_attr_setschedparam(&attr, &param)))
+        errc(EX_OSERR, err, "pthread_attr_setschedparam");
+
+    if ((err = pthread_attr_setschedpolicy(&attr, SCHED_RR)))
+        errc(EX_OSERR, err, "pthread_attr_setschedpolicy");
+
+    for (uint32_t i = 0 ; i < g_churn_count ; i++) {
+        pthread_t new_thread;
+
+        if ((err = pthread_create(&new_thread, &attr, churn_thread, NULL)))
+            errc(EX_OSERR, err, "pthread_create");
+        g_churn_threads[i] = new_thread;
+    }
+
+    /* The attribute object is only needed while creating the threads. */
+    if ((err = pthread_attr_destroy(&attr)))
+        errc(EX_OSERR, err, "pthread_attr_destroy");
+}
+
+/*
+ * Stop the churn threads, wait for each one to exit, and release the
+ * handle array.
+ *
+ * A pthread_join() failure terminates the process with EX_OSERR.
+ */
+static void
+join_churn_threads(void)
+{
+    /*
+     * A churn thread only adds to g_churn_stopped_at after leaving its
+     * spin loop, so a non-zero value seen before g_churn_stop is raised
+     * means at least one of them gave up early.  Load once so the value
+     * tested is the value printed.
+     */
+    uint64_t stopped_at = atomic_load_explicit(&g_churn_stopped_at, memory_order_seq_cst);
+
+    if (stopped_at != 0) {
+        printf("Warning: Some of the churn threads may have stopped early: %llu\n",
+            (unsigned long long)stopped_at);
+    }
+
+    atomic_store_explicit(&g_churn_stop, TRUE, memory_order_seq_cst);
+
+    /* Rejoin churn threads */
+    for (uint32_t i = 0; i < g_churn_count; i++) {
+        errno_t err = pthread_join(g_churn_threads[i], NULL);
+        if (err) {
+            errc(EX_OSERR, err, "pthread_join %u", i);
+        }
+    }
+
+    /* All threads are gone; the handle array is no longer needed. */
+    free(g_churn_threads);
+    g_churn_threads = NULL;
+}
+
/*
* Figure out what thread policy to use
*/
-my_policy_type_t
+static my_policy_type_t
parse_thread_policy(const char *str)
{
if (strcmp(str, "timeshare") == 0) {
} else if (strcmp(str, "fixed") == 0) {
return MY_POLICY_FIXEDPRI;
} else {
- printf("Invalid thread policy %s\n", str);
- exit(1);
+ errx(EX_USAGE, "Invalid thread policy \"%s\"", str);
}
}
/*
* Figure out what wakeup pattern to use
*/
-wake_type_t
+static wake_type_t
parse_wakeup_pattern(const char *str)
{
if (strcmp(str, "chain") == 0) {
return WAKE_CHAIN;
+ } else if (strcmp(str, "hop") == 0) {
+ return WAKE_HOP;
} else if (strcmp(str, "broadcast-single-sem") == 0) {
return WAKE_BROADCAST_ONESEM;
} else if (strcmp(str, "broadcast-per-thread") == 0) {
return WAKE_BROADCAST_PERTHREAD;
} else {
- print_usage();
- exit(1);
+ errx(EX_USAGE, "Invalid wakeup pattern \"%s\"", str);
}
}
/*
* Set policy
*/
-int
-thread_setup(int my_id)
+static int
+thread_setup(uint32_t my_id)
{
- int res;
+ kern_return_t kr;
+ errno_t ret;
+ thread_time_constraint_policy_data_t pol;
+
+ if (g_priority) {
+ int policy = SCHED_OTHER;
+ if (g_policy == MY_POLICY_FIXEDPRI)
+ policy = SCHED_RR;
+
+ struct sched_param param = {.sched_priority = (int)g_priority};
+ if ((ret = pthread_setschedparam(pthread_self(), policy, ¶m)))
+ errc(EX_OSERR, ret, "pthread_setschedparam: %d", my_id);
+ }
switch (g_policy) {
case MY_POLICY_TIMESHARE:
- {
- res = KERN_SUCCESS;
break;
- }
- case MY_POLICY_REALTIME:
- {
- thread_time_constraint_policy_data_t pol;
-
+ case MY_POLICY_REALTIME:
/* Hard-coded realtime parameters (similar to what Digi uses) */
- pol.period = 100000;
- pol.constraint = nanos_to_abs(CONSTRAINT_NANOS);
- pol.computation = nanos_to_abs(COMPUTATION_NANOS);
+ pol.period = 100000;
+ pol.constraint = (uint32_t) nanos_to_abs(CONSTRAINT_NANOS);
+ pol.computation = (uint32_t) nanos_to_abs(COMPUTATION_NANOS);
pol.preemptible = 0; /* Ignored by OS */
- res = thread_policy_set(mach_thread_self(), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t) &pol, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
- assert(res == 0, fail);
+ kr = thread_policy_set(mach_thread_self(), THREAD_TIME_CONSTRAINT_POLICY,
+ (thread_policy_t) &pol, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
+ mach_assert_zero_t(my_id, kr);
break;
- }
- case MY_POLICY_FIXEDPRI:
- {
- thread_extended_policy_data_t pol;
- pol.timeshare = 0;
-
- res = thread_policy_set(mach_thread_self(), THREAD_EXTENDED_POLICY, (thread_policy_t) &pol, THREAD_EXTENDED_POLICY_COUNT);
- assert(res == 0, fail);
+ case MY_POLICY_FIXEDPRI:
+ ret = pthread_set_fixedpriority_self();
+ if (ret) errc(EX_OSERR, ret, "pthread_set_fixedpriority_self");
break;
- }
default:
- {
- printf("invalid policy type\n");
- return 1;
- }
+ errx(EX_USAGE, "invalid policy type %d", g_policy);
}
if (g_do_affinity) {
affinity.affinity_tag = my_id % 2;
- res = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (thread_policy_t)&affinity, THREAD_AFFINITY_POLICY_COUNT);
- assert(res == 0, fail);
+ kr = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY,
+ (thread_policy_t)&affinity, THREAD_AFFINITY_POLICY_COUNT);
+ mach_assert_zero_t(my_id, kr);
}
return 0;
-fail:
- return 1;
}
/*
- * Wake up main thread if everyone's done
+ * Wait for a wakeup, potentially wake up another of the "0-N" threads,
+ * and notify the main thread when done.
*/
-int
-thread_finish_iteration(int id)
+static void*
+worker_thread(void *arg)
{
- int32_t new;
- int res = 0;
- volatile float x = 0.0;
- volatile float y = 0.0;
+ uint32_t my_id = (uint32_t)(uintptr_t)arg;
+ kern_return_t kr;
- debug_log("Thread %p finished iteration.\n", pthread_self());
-
-#if MIMIC_DIGI_LEAD_TIME
- /*
- * One randomly chosen thread determines when everybody gets to stop.
- */
- if (g_do_spin) {
- if (g_long_spinid == id) {
- uint64_t endspin;
+ volatile double x = 0.0;
+ volatile double y = 0.0;
- /* This thread took up fully half of his computation */
- endspin = g_starttime_abs + g_spinlength_abs;
- while (mach_absolute_time() < endspin) {
- y = y + 1.5 + x;
- x = sqrt(y);
- }
- }
- }
-#endif /* MIMIC_DIGI_LEAD_TIME */
-
- new = OSAtomicIncrement32(&g_done_threads);
+ /* Set policy and so forth */
+ thread_setup(my_id);
- debug_log("New value is %d\n", new);
+ for (uint32_t i = 0; i < g_iterations; i++) {
+ if (my_id == 0) {
+ /*
+ * Leader thread either wakes everyone up or starts the chain going.
+ */
- /*
- * When the last thread finishes, everyone gets to go back to sleep.
- */
- if (new == g_numthreads) {
- debug_log("Thread %p signalling main thread.\n", pthread_self());
- res = semaphore_signal(g_main_sem);
- } else {
-#ifndef MIMIC_DIGI_LEAD_TIME
- if (g_do_spin) {
- while (g_done_threads < g_numthreads) {
- y = y + 1.5 + x;
- x = sqrt(y);
- }
- }
-#endif
- }
+ /* Give the worker threads undisturbed time to finish before waiting on them */
+ if (g_do_sleep)
+ usleep(g_iteration_sleeptime_us);
- return res;
-}
+ debug_log("%d Leader thread wait for ready\n", i);
-/*
- * Wait for a wakeup, potentially wake up another of the "0-N" threads,
- * and notify the main thread when done.
- */
-void*
-child_thread_func(void *arg)
-{
- int my_id = (int)(uintptr_t)arg;
- int res;
- int i, j;
- int32_t new;
+ /*
+ * Wait for everyone else to declare ready
+ * Is there a better way to do this that won't interfere with the rest of the chain?
+ * TODO: Invent 'semaphore wait for N signals'
+ */
- /* Set policy and so forth */
- thread_setup(my_id);
+ for (uint32_t j = 0 ; j < g_numthreads - 1; j++) {
+ kr = semaphore_wait(g_readysem);
+ mach_assert_zero_t(my_id, kr);
+ }
- /* Tell main thread when everyone has set up */
- new = OSAtomicIncrement32(&g_done_threads);
- semaphore_signal(g_main_sem);
+ debug_log("%d Leader thread wait\n", i);
+
+ /* Signal main thread and wait for start of iteration */
+
+ kr = semaphore_wait_signal(g_leadersem, g_main_sem);
+ mach_assert_zero_t(my_id, kr);
- /* For each iteration */
- for (i = 0; i < g_iterations; i++) {
- /*
- * Leader thread either wakes everyone up or starts the chain going.
- */
- if (my_id == 0) {
- res = semaphore_wait(g_leadersem);
- assert(res == 0, fail);
-
g_thread_endtimes_abs[my_id] = mach_absolute_time();
-#if MIMIC_DIGI_LEAD_TIME
- g_long_spinid = rand() % g_numthreads;
-#endif /* MIMIC_DIGI_LEAD_TIME */
+ debug_log("%d Leader thread go\n", i);
+
+ assert_zero_t(my_id, atomic_load_explicit(&g_done_threads, memory_order_relaxed));
switch (g_waketype) {
- case WAKE_CHAIN:
- semaphore_signal(g_semarr[my_id + 1]);
- break;
- case WAKE_BROADCAST_ONESEM:
- semaphore_signal_all(g_machsem);
+ case WAKE_BROADCAST_ONESEM:
+ kr = semaphore_signal_all(g_broadcastsem);
+ mach_assert_zero_t(my_id, kr);
break;
case WAKE_BROADCAST_PERTHREAD:
- for (j = 1; j < g_numthreads; j++) {
- semaphore_signal(g_semarr[j]);
+ for (uint32_t j = 1; j < g_numthreads; j++) {
+ kr = semaphore_signal(g_semarr[j]);
+ mach_assert_zero_t(my_id, kr);
}
break;
- default:
- printf("Invalid wakeup type?!\n");
- exit(1);
+ case WAKE_CHAIN:
+ kr = semaphore_signal(g_semarr[my_id + 1]);
+ mach_assert_zero_t(my_id, kr);
+ break;
+ case WAKE_HOP:
+ kr = semaphore_wait_signal(g_donesem, g_semarr[my_id + 1]);
+ mach_assert_zero_t(my_id, kr);
+ break;
}
} else {
/*
* Everyone else waits to be woken up,
- * records when she wake up, and possibly
+ * records when she wakes up, and possibly
* wakes up a friend.
*/
switch(g_waketype) {
case WAKE_BROADCAST_ONESEM:
- res = semaphore_wait(g_machsem);
- assert(res == KERN_SUCCESS, fail);
+ kr = semaphore_wait_signal(g_broadcastsem, g_readysem);
+ mach_assert_zero_t(my_id, kr);
g_thread_endtimes_abs[my_id] = mach_absolute_time();
-
break;
- /*
- * For the chain wakeup case:
- * wait, record time, signal next thread if appropriate
- */
+
case WAKE_BROADCAST_PERTHREAD:
- res = semaphore_wait(g_semarr[my_id]);
- assert(res == 0, fail);
+ kr = semaphore_wait_signal(g_semarr[my_id], g_readysem);
+ mach_assert_zero_t(my_id, kr);
g_thread_endtimes_abs[my_id] = mach_absolute_time();
break;
case WAKE_CHAIN:
- res = semaphore_wait(g_semarr[my_id]);
- assert(res == 0, fail);
+ kr = semaphore_wait_signal(g_semarr[my_id], g_readysem);
+ mach_assert_zero_t(my_id, kr);
+
+ /* Signal the next thread *after* recording wake time */
+
+ g_thread_endtimes_abs[my_id] = mach_absolute_time();
+
+ if (my_id < (g_numthreads - 1)) {
+ kr = semaphore_signal(g_semarr[my_id + 1]);
+ mach_assert_zero_t(my_id, kr);
+ }
+
+ break;
+
+ case WAKE_HOP:
+ kr = semaphore_wait_signal(g_semarr[my_id], g_readysem);
+ mach_assert_zero_t(my_id, kr);
+
+ /* Signal the next thread *after* recording wake time */
g_thread_endtimes_abs[my_id] = mach_absolute_time();
if (my_id < (g_numthreads - 1)) {
- res = semaphore_signal(g_semarr[my_id + 1]);
- assert(res == 0, fail);
+ kr = semaphore_wait_signal(g_donesem, g_semarr[my_id + 1]);
+ mach_assert_zero_t(my_id, kr);
+ } else {
+ kr = semaphore_signal_all(g_donesem);
+ mach_assert_zero_t(my_id, kr);
}
break;
- default:
- printf("Invalid wake type.\n");
- goto fail;
}
}
- res = thread_finish_iteration(my_id);
- assert(res == 0, fail);
+ debug_log("Thread %p woke up for iteration %d.\n", pthread_self(), i);
+
+ if (g_do_one_long_spin && g_one_long_spin_id == my_id) {
+ /* One randomly chosen thread holds up the train for a while. */
+
+ uint64_t endspin = g_starttime_abs + g_one_long_spin_length_abs;
+ while (mach_absolute_time() < endspin) {
+ y = y + 1.5 + x;
+ x = sqrt(y);
+ }
+ }
+
+ if (g_do_each_spin) {
+ /* Each thread spins for a certain duration after waking up before blocking again. */
+
+ uint64_t endspin = mach_absolute_time() + g_each_spin_duration_abs;
+ while (mach_absolute_time() < endspin) {
+ y = y + 1.5 + x;
+ x = sqrt(y);
+ }
+ }
+
+ uint32_t done_threads;
+ done_threads = atomic_fetch_add_explicit(&g_done_threads, 1, memory_order_relaxed) + 1;
+
+ debug_log("Thread %p new value is %d, iteration %d\n", pthread_self(), done_threads, i);
+
+ if (g_drop_priority) {
+ /* Drop priority to BG momentarily */
+ errno_t ret = setpriority(PRIO_DARWIN_THREAD, 0, PRIO_DARWIN_BG);
+ if (ret) errc(EX_OSERR, ret, "setpriority PRIO_DARWIN_BG");
+ }
+
+ if (g_do_all_spin) {
+ /* Everyone spins until the last thread checks in. */
+
+ while (atomic_load_explicit(&g_done_threads, memory_order_relaxed) < g_numthreads) {
+ y = y + 1.5 + x;
+ x = sqrt(y);
+ }
+ }
+
+ if (g_drop_priority) {
+ /* Restore normal priority */
+ errno_t ret = setpriority(PRIO_DARWIN_THREAD, 0, 0);
+ if (ret) errc(EX_OSERR, ret, "setpriority 0");
+ }
+
+ debug_log("Thread %p done spinning, iteration %d\n", pthread_self(), i);
}
- return 0;
-fail:
- exit(1);
-}
+ if (my_id == 0) {
+ /* Give the worker threads undisturbed time to finish before waiting on them */
+ if (g_do_sleep)
+ usleep(g_iteration_sleeptime_us);
-/*
- * Admittedly not very attractive.
- */
-void
-print_usage()
-{
- printf("Usage: zn <num threads> <chain | broadcast-single-sem | broadcast-per-thread> <realtime | timeshare | fixed> <num iterations> [-trace <traceworthy latency in ns>] [-spin] [-affinity] [-verbose]\n");
+ /* Wait for the worker threads to finish */
+ for (uint32_t i = 0 ; i < g_numthreads - 1; i++) {
+ kr = semaphore_wait(g_readysem);
+ mach_assert_zero_t(my_id, kr);
+ }
+
+ /* Tell everyone and the main thread that the last iteration is done */
+ debug_log("%d Leader thread done\n", i);
+
+ kr = semaphore_signal_all(g_main_sem);
+ mach_assert_zero_t(my_id, kr);
+ } else {
+ /* Hold up thread teardown so it doesn't affect the last iteration */
+ kr = semaphore_wait_signal(g_main_sem, g_readysem);
+ mach_assert_zero_t(my_id, kr);
+ }
+
+ return 0;
}
/*
* Given an array of uint64_t values, compute average, max, min, and standard deviation
*/
-void
+static void
compute_stats(uint64_t *values, uint64_t count, float *averagep, uint64_t *maxp, uint64_t *minp, float *stddevp)
{
- int i;
+ uint32_t i;
uint64_t _sum = 0;
uint64_t _max = 0;
uint64_t _min = UINT64_MAX;
int
main(int argc, char **argv)
{
- int i;
- int res;
+ errno_t ret;
+ kern_return_t kr;
+
pthread_t *threads;
uint64_t *worst_latencies_ns;
uint64_t *worst_latencies_from_first_ns;
- uint64_t last_end;
uint64_t max, min;
- uint64_t traceworthy_latency_ns = TRACEWORTHY_NANOS;
float avg, stddev;
- boolean_t seen_apptype = FALSE;
- srand(time(NULL));
+ for (int i = 0; i < argc; i++)
+ if (strcmp(argv[i], "--switched_apptype") == 0)
+ g_seen_apptype = TRUE;
- if (argc < 5 || argc > 10) {
- print_usage();
- goto fail;
- }
+ if (!g_seen_apptype)
+ selfexec_with_apptype(argc, argv);
- /* How many threads? */
- g_numthreads = atoi(argv[1]);
+ parse_args(argc, argv);
- /* What wakeup pattern? */
- g_waketype = parse_wakeup_pattern(argv[2]);
+ srand((unsigned int)time(NULL));
- /* Policy */
- g_policy = parse_thread_policy(argv[3]);
+ mach_timebase_info(&g_mti);
- /* Iterations */
- g_iterations = atoi(argv[4]);
-
- /* Optional args */
- for (i = 5; i < argc; i++) {
- if (strcmp(argv[i], "-spin") == 0) {
- g_do_spin = TRUE;
- } else if (strcmp(argv[i], "-verbose") == 0) {
- g_verbose = TRUE;
- } else if ((strcmp(argv[i], "-trace") == 0) &&
- (i < (argc - 1))) {
- traceworthy_latency_ns = strtoull(argv[++i], NULL, 10);
- } else if (strcmp(argv[i], "-affinity") == 0) {
- g_do_affinity = TRUE;
- } else if (strcmp(argv[i], "-switched_apptype") == 0) {
- seen_apptype = TRUE;
- } else {
- print_usage();
- goto fail;
- }
- }
+ size_t ncpu_size = sizeof(g_numcpus);
+ ret = sysctlbyname("hw.ncpu", &g_numcpus, &ncpu_size, NULL, 0);
+ if (ret) err(EX_OSERR, "Failed sysctlbyname(hw.ncpu)");
- if (!seen_apptype) {
- selfexec_with_apptype(argc, argv);
+ if (g_do_each_spin)
+ g_each_spin_duration_abs = nanos_to_abs(g_each_spin_duration_ns);
+
+ /* Configure the long-spin thread to take up half of its computation */
+ if (g_do_one_long_spin) {
+ g_one_long_spin_length_ns = COMPUTATION_NANOS / 2;
+ g_one_long_spin_length_abs = nanos_to_abs(g_one_long_spin_length_ns);
}
- mach_timebase_info(&g_mti);
+ /* Estimate the amount of time the cleanup phase needs to back off */
+ g_iteration_sleeptime_us = g_numthreads * 20;
-#if MIMIC_DIGI_LEAD_TIME
- g_spinlength_abs = nanos_to_abs(COMPUTATION_NANOS) / 2;
-#endif /* MIMIC_DIGI_LEAD_TIME */
+ uint32_t threads_per_core = (g_numthreads / g_numcpus) + 1;
+ if (g_do_each_spin)
+ g_iteration_sleeptime_us += threads_per_core * (g_each_spin_duration_ns / NSEC_PER_USEC);
+ if (g_do_one_long_spin)
+ g_iteration_sleeptime_us += g_one_long_spin_length_ns / NSEC_PER_USEC;
/* Arrays for threads and their wakeup times */
- threads = (pthread_t*) malloc(sizeof(pthread_t) * g_numthreads);
- assert(threads, fail);
+ threads = (pthread_t*) valloc(sizeof(pthread_t) * g_numthreads);
+ assert(threads);
+
+ size_t endtimes_size = sizeof(uint64_t) * g_numthreads;
+
+ g_thread_endtimes_abs = (uint64_t*) valloc(endtimes_size);
+ assert(g_thread_endtimes_abs);
+
+ /* Ensure the allocation is pre-faulted */
+ ret = memset_s(g_thread_endtimes_abs, endtimes_size, 0, endtimes_size);
+ if (ret) errc(EX_OSERR, ret, "memset_s endtimes");
+
+ size_t latencies_size = sizeof(uint64_t) * g_iterations;
+
+ worst_latencies_ns = (uint64_t*) valloc(latencies_size);
+ assert(worst_latencies_ns);
- g_thread_endtimes_abs = (uint64_t*) malloc(sizeof(uint64_t) * g_numthreads);
- assert(g_thread_endtimes_abs, fail);
+ /* Ensure the allocation is pre-faulted */
+ ret = memset_s(worst_latencies_ns, latencies_size, 0, latencies_size);
+ if (ret) errc(EX_OSERR, ret, "memset_s latencies");
- worst_latencies_ns = (uint64_t*) malloc(sizeof(uint64_t) * g_iterations);
- assert(worst_latencies_ns, fail);
+ worst_latencies_from_first_ns = (uint64_t*) valloc(latencies_size);
+ assert(worst_latencies_from_first_ns);
- worst_latencies_from_first_ns = (uint64_t*) malloc(sizeof(uint64_t) * g_iterations);
- assert(worst_latencies_from_first_ns, fail);
- res = semaphore_create(mach_task_self(), &g_main_sem, SYNC_POLICY_FIFO, 0);
- assert(res == KERN_SUCCESS, fail);
+ /* Ensure the allocation is pre-faulted */
+ ret = memset_s(worst_latencies_from_first_ns, latencies_size, 0, latencies_size);
+ if (ret) errc(EX_OSERR, ret, "memset_s latencies_from_first");
+
+ kr = semaphore_create(mach_task_self(), &g_main_sem, SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
/* Either one big semaphore or one per thread */
- if (g_waketype == WAKE_CHAIN || g_waketype == WAKE_BROADCAST_PERTHREAD) {
- g_semarr = malloc(sizeof(semaphore_t) * g_numthreads);
- assert(g_semarr != NULL, fail);
+ if (g_waketype == WAKE_CHAIN ||
+ g_waketype == WAKE_BROADCAST_PERTHREAD ||
+ g_waketype == WAKE_HOP) {
+
+ g_semarr = valloc(sizeof(semaphore_t) * g_numthreads);
+ assert(g_semarr);
- for (i = 0; i < g_numthreads; i++) {
- res = semaphore_create(mach_task_self(), &g_semarr[i], SYNC_POLICY_FIFO, 0);
- assert(res == KERN_SUCCESS, fail);
+ for (uint32_t i = 0; i < g_numthreads; i++) {
+ kr = semaphore_create(mach_task_self(), &g_semarr[i], SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
}
-
+
g_leadersem = g_semarr[0];
} else {
- res = semaphore_create(mach_task_self(), &g_machsem, SYNC_POLICY_FIFO, 0);
- assert(res == KERN_SUCCESS, fail);
- res = semaphore_create(mach_task_self(), &g_leadersem, SYNC_POLICY_FIFO, 0);
- assert(res == KERN_SUCCESS, fail);
+ kr = semaphore_create(mach_task_self(), &g_broadcastsem, SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
+ kr = semaphore_create(mach_task_self(), &g_leadersem, SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
}
+ if (g_waketype == WAKE_HOP) {
+ kr = semaphore_create(mach_task_self(), &g_donesem, SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
+ }
+
+ kr = semaphore_create(mach_task_self(), &g_readysem, SYNC_POLICY_FIFO, 0);
+ mach_assert_zero(kr);
+
+ atomic_store_explicit(&g_done_threads, 0, memory_order_relaxed);
+
/* Create the threads */
- g_done_threads = 0;
- for (i = 0; i < g_numthreads; i++) {
- res = pthread_create(&threads[i], NULL, child_thread_func, (void*)(uintptr_t)i);
- assert(res == 0, fail);
+ for (uint32_t i = 0; i < g_numthreads; i++) {
+ ret = pthread_create(&threads[i], NULL, worker_thread, (void*)(uintptr_t)i);
+ if (ret) errc(EX_OSERR, ret, "pthread_create %d", i);
}
- res = setpriority(PRIO_DARWIN_ROLE, 0, PRIO_DARWIN_ROLE_UI_FOCAL);
- assert(res == 0, fail);
+ ret = setpriority(PRIO_DARWIN_ROLE, 0, PRIO_DARWIN_ROLE_UI_FOCAL);
+ if (ret) errc(EX_OSERR, ret, "setpriority");
+
thread_setup(0);
- /* Switching to fixed pri may have stripped our main thread QoS and priority, so re-instate */
- if (g_policy == MY_POLICY_FIXEDPRI) {
- thread_precedence_policy_data_t prec;
- mach_msg_type_number_t count;
- boolean_t get_default = FALSE;
-
- count = THREAD_PRECEDENCE_POLICY_COUNT;
- res = thread_policy_get(mach_thread_self(), THREAD_PRECEDENCE_POLICY, (thread_policy_t) &prec, &count, &get_default);
- assert(res == 0, fail);
-
- prec.importance += 16; /* 47 - 31 */
- res = thread_policy_set(mach_thread_self(), THREAD_PRECEDENCE_POLICY, (thread_policy_t) &prec, THREAD_PRECEDENCE_POLICY_COUNT);
- assert(res == 0, fail);
- }
+ g_starttime_abs = mach_absolute_time();
+
+ if (g_churn_pri)
+ create_churn_threads();
/* Let everyone get settled */
- for (i = 0; i < g_numthreads; i++) {
- res = semaphore_wait(g_main_sem);
- assert(res == 0, fail);
- }
- /* Let worker threads get back to sleep... */
- usleep(g_numthreads * 10);
+ kr = semaphore_wait(g_main_sem);
+ mach_assert_zero(kr);
+
+ /* Give the system a bit more time to settle */
+ if (g_do_sleep)
+ usleep(g_iteration_sleeptime_us);
/* Go! */
- for (i = 0; i < g_iterations; i++) {
- int j;
+ for (uint32_t i = 0; i < g_iterations; i++) {
+ uint32_t j;
uint64_t worst_abs = 0, best_abs = UINT64_MAX;
- g_done_threads = 0;
- OSMemoryBarrier();
+ if (g_do_one_long_spin)
+ g_one_long_spin_id = (uint32_t)rand() % g_numthreads;
+
+ debug_log("%d Main thread reset\n", i);
+
+ atomic_store_explicit(&g_done_threads, 0, memory_order_seq_cst);
g_starttime_abs = mach_absolute_time();
- /* Fire them off */
- semaphore_signal(g_leadersem);
+ /* Fire them off and wait for worker threads to finish */
+ kr = semaphore_wait_signal(g_main_sem, g_leadersem);
+ mach_assert_zero(kr);
- /* Wait for worker threads to finish */
- semaphore_wait(g_main_sem);
- assert(res == KERN_SUCCESS, fail);
+ debug_log("%d Main thread return\n", i);
- /*
+ assert(atomic_load_explicit(&g_done_threads, memory_order_relaxed) == g_numthreads);
+
+ /*
* We report the worst latencies relative to start time
* and relative to the lead worker thread.
*/
for (j = 0; j < g_numthreads; j++) {
uint64_t latency_abs;
-
+
latency_abs = g_thread_endtimes_abs[j] - g_starttime_abs;
worst_abs = worst_abs < latency_abs ? latency_abs : worst_abs;
}
/*
* In the event of a bad run, cut a trace point.
*/
- if (worst_latencies_from_first_ns[i] > traceworthy_latency_ns) {
- int _tmp;
+ if (worst_latencies_from_first_ns[i] > g_traceworthy_latency_ns) {
+ /* Ariadne's ad-hoc test signpost */
+ kdebug_trace(ARIADNEDBG_CODE(0, 0), worst_latencies_from_first_ns[i], g_traceworthy_latency_ns, 0, 0);
- if (g_verbose) {
+ if (g_verbose)
printf("Worst on this round was %.2f us.\n", ((float)worst_latencies_from_first_ns[i]) / 1000.0);
- }
-
- _tmp = syscall(SYS_kdebug_trace, 0xEEEEEEEE, 0, 0, 0, 0);
}
- /* Let worker threads get back to sleep... */
- usleep(g_numthreads * 10);
+ /* Give the system a bit more time to settle */
+ if (g_do_sleep)
+ usleep(g_iteration_sleeptime_us);
}
/* Rejoin threads */
- last_end = 0;
- for (i = 0; i < g_numthreads; i++) {
- res = pthread_join(threads[i], NULL);
- assert(res == 0, fail);
+ for (uint32_t i = 0; i < g_numthreads; i++) {
+ ret = pthread_join(threads[i], NULL);
+ if (ret) errc(EX_OSERR, ret, "pthread_join %d", i);
}
+ if (g_churn_pri)
+ join_churn_threads();
+
compute_stats(worst_latencies_ns, g_iterations, &avg, &max, &min, &stddev);
printf("Results (from a stop):\n");
printf("Max:\t\t%.2f us\n", ((float)max) / 1000.0);
printf("Stddev:\t\t%.2f us\n", stddev / 1000.0);
#if 0
- for (i = 0; i < g_iterations; i++) {
+ for (uint32_t i = 0; i < g_iterations; i++) {
printf("Iteration %d: %f us\n", i, worst_latencies_ns[i] / 1000.0);
}
-#endif
+#endif
+
+ free(threads);
+ free(g_thread_endtimes_abs);
+ free(worst_latencies_ns);
+ free(worst_latencies_from_first_ns);
return 0;
-fail:
- return 1;
}
/*
* apps. We use it here for a test tool only to opt into QoS using the same
* policies. Do not use this outside xnu or libxpc/launchd.
*/
-void
+static void
selfexec_with_apptype(int argc, char *argv[])
{
int ret;
char *new_argv[argc + 1 + 1 /* NULL */];
int i;
char prog[PATH_MAX];
- int32_t prog_size = PATH_MAX;
+ uint32_t prog_size = PATH_MAX;
ret = _NSGetExecutablePath(prog, &prog_size);
- if (ret != 0) err(1, "_NSGetExecutablePath");
+ if (ret) err(EX_OSERR, "_NSGetExecutablePath");
for (i=0; i < argc; i++) {
new_argv[i] = argv[i];
}
- new_argv[i] = "-switched_apptype";
+ new_argv[i] = "--switched_apptype";
new_argv[i+1] = NULL;
ret = posix_spawnattr_init(&attr);
- if (ret != 0) errc(1, ret, "posix_spawnattr_init");
+ if (ret) errc(EX_OSERR, ret, "posix_spawnattr_init");
ret = posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETEXEC);
- if (ret != 0) errc(1, ret, "posix_spawnattr_setflags");
+ if (ret) errc(EX_OSERR, ret, "posix_spawnattr_setflags");
ret = posix_spawnattr_setprocesstype_np(&attr, POSIX_SPAWN_PROC_TYPE_APP_DEFAULT);
- if (ret != 0) errc(1, ret, "posix_spawnattr_setprocesstype_np");
+ if (ret) errc(EX_OSERR, ret, "posix_spawnattr_setprocesstype_np");
ret = posix_spawn(NULL, prog, NULL, &attr, new_argv, environ);
- if (ret != 0) errc(1, ret, "posix_spawn");
+ if (ret) errc(EX_OSERR, ret, "posix_spawn");
}
+
+/*
+ * Print the command-line synopsis and exit.
+ *
+ * errx() writes the message to stderr and terminates the process with
+ * EX_USAGE, so this function never returns.  Every long option accepted
+ * by parse_args() should be listed here.
+ */
+static void __attribute__((noreturn))
+usage()
+{
+    errx(EX_USAGE, "Usage: %s <threads> <chain | hop | broadcast-single-sem | broadcast-per-thread> "
+        "<realtime | timeshare | fixed> <iterations>\n\t\t"
+        "[--trace <traceworthy latency in ns>] "
+        "[--verbose] [--spin-one] [--spin-all] [--spin-time <nanos>] [--affinity]\n\t\t"
+        "[--no-sleep] [--drop-priority] [--priority <pri>] [--churn-pri <pri>] [--churn-count <n>]",
+        getprogname());
+}
+
+static struct option* g_longopts;
+static int option_index;
+
+/*
+ * Parse the argument of the option getopt_long() just returned as an
+ * unsigned decimal number that fits in 32 bits.
+ *
+ * Reads the globals optarg, g_longopts and option_index.  Exits with
+ * EX_USAGE if the text is not a complete decimal number, is negative, or
+ * does not fit in a uint32_t.
+ */
+static uint32_t
+read_dec_arg()
+{
+    char *cp;
+    /* char* optarg is a magic global */
+
+    errno = 0;
+    unsigned long long arg_val = strtoull(optarg, &cp, 10);
+
+    if (cp == optarg || *cp)
+        errx(EX_USAGE, "arg --%s requires a decimal number, found \"%s\"",
+            g_longopts[option_index].name, optarg);
+
+    /*
+     * strtoull() accepts a leading '-' and negates the result, and a plain
+     * cast to uint32_t would silently drop the high bits; refuse both
+     * instead of running with a value the user did not ask for.
+     */
+    if (errno == ERANGE || arg_val > UINT32_MAX || strchr(optarg, '-') != NULL)
+        errx(EX_USAGE, "arg --%s value \"%s\" is out of range (0-%u)",
+            g_longopts[option_index].name, optarg, UINT32_MAX);
+
+    return (uint32_t)arg_val;
+}
+
+/*
+ * Parse the command line into the g_* configuration globals.
+ *
+ * Long options are handled first with getopt_long(); the four required
+ * positional arguments <threads> <waketype> <policy> <iterations> follow.
+ * Any malformed, missing, surplus or out-of-range argument terminates the
+ * process with EX_USAGE.
+ */
+static void
+parse_args(int argc, char *argv[])
+{
+    enum {
+        OPT_GETOPT = 0,    /* returned by getopt_long() when it stored a flag itself */
+        OPT_SPIN_TIME,
+        OPT_TRACE,
+        OPT_PRIORITY,
+        OPT_CHURN_PRI,
+        OPT_CHURN_COUNT,
+    };
+
+    static struct option longopts[] = {
+        { "spin-time",          required_argument,      NULL,                           OPT_SPIN_TIME },
+        { "trace",              required_argument,      NULL,                           OPT_TRACE },
+        { "priority",           required_argument,      NULL,                           OPT_PRIORITY },
+        { "churn-pri",          required_argument,      NULL,                           OPT_CHURN_PRI },
+        { "churn-count",        required_argument,      NULL,                           OPT_CHURN_COUNT },
+        { "switched_apptype",   no_argument,            (int*)&g_seen_apptype,          TRUE },
+        { "spin-one",           no_argument,            (int*)&g_do_one_long_spin,      TRUE },
+        { "spin-all",           no_argument,            (int*)&g_do_all_spin,           TRUE },
+        { "affinity",           no_argument,            (int*)&g_do_affinity,           TRUE },
+        { "no-sleep",           no_argument,            (int*)&g_do_sleep,              FALSE },
+        { "drop-priority",      no_argument,            (int*)&g_drop_priority,         TRUE },
+        { "verbose",            no_argument,            (int*)&g_verbose,               TRUE },
+        { "help",               no_argument,            NULL,                           'h' },
+        { NULL,                 0,                      NULL,                           0 }
+    };
+
+    /* read_dec_arg() names the offending option through these globals. */
+    g_longopts = longopts;
+    int ch = 0;
+
+    while ((ch = getopt_long(argc, argv, "h", longopts, &option_index)) != -1) {
+        switch (ch) {
+        case OPT_GETOPT:
+            /* getopt_long set a variable */
+            break;
+        case OPT_SPIN_TIME:
+            g_do_each_spin = TRUE;
+            g_each_spin_duration_ns = read_dec_arg();
+            break;
+        case OPT_TRACE:
+            g_traceworthy_latency_ns = read_dec_arg();
+            break;
+        case OPT_PRIORITY:
+            g_priority = read_dec_arg();
+            break;
+        case OPT_CHURN_PRI:
+            g_churn_pri = read_dec_arg();
+            break;
+        case OPT_CHURN_COUNT:
+            g_churn_count = read_dec_arg();
+            break;
+        case '?':
+        case 'h':
+        default:
+            usage();
+            /* NORETURN */
+        }
+    }
+
+    /*
+     * getopt_long reorders all the options to the beginning of the argv array.
+     * Jump past them to the non-option arguments.
+     */
+
+    argc -= optind;
+    argv += optind;
+
+    if (argc > 4) {
+        warnx("Too many non-option arguments passed");
+        usage();
+    }
+
+    if (argc != 4) {
+        warnx("Missing required <threads> <waketype> <policy> <iterations> arguments");
+        usage();
+    }
+
+    char *cp;
+    unsigned long long val;
+
+    /* How many threads? */
+    errno = 0;
+    val = strtoull(argv[0], &cp, 10);
+
+    if (cp == argv[0] || *cp)
+        errx(EX_USAGE, "numthreads requires a decimal number, found \"%s\"", argv[0]);
+
+    /* The count is stored in 32 bits; refuse values that would be truncated. */
+    if (errno == ERANGE || val > UINT32_MAX)
+        errx(EX_USAGE, "numthreads \"%s\" is too large", argv[0]);
+
+    g_numthreads = (uint32_t)val;
+
+    if (g_numthreads < 1)
+        errx(EX_USAGE, "Must use at least one thread");
+
+    /* What wakeup pattern? */
+    g_waketype = parse_wakeup_pattern(argv[1]);
+
+    /* Policy */
+    g_policy = parse_thread_policy(argv[2]);
+
+    /* Iterations */
+    errno = 0;
+    val = strtoull(argv[3], &cp, 10);
+
+    if (cp == argv[3] || *cp)
+        errx(EX_USAGE, "iterations requires a decimal number, found \"%s\"", argv[3]);
+
+    if (errno == ERANGE || val > UINT32_MAX)
+        errx(EX_USAGE, "iterations \"%s\" is too large", argv[3]);
+
+    g_iterations = (uint32_t)val;
+
+    if (g_iterations < 1)
+        errx(EX_USAGE, "Must have at least one iteration");
+
+    /* Chain and hop both hand the wakeup from thread 0 to thread 1. */
+    if (g_numthreads == 1 && g_waketype == WAKE_CHAIN)
+        errx(EX_USAGE, "chain mode requires more than one thread");
+
+    if (g_numthreads == 1 && g_waketype == WAKE_HOP)
+        errx(EX_USAGE, "hop mode requires more than one thread");
+}
+
+