* pthread_synch.c
*/
+#pragma mark - Front Matter
+
#define _PTHREAD_CONDATTR_T
#define _PTHREAD_COND_T
#define _PTHREAD_MUTEXATTR_T
#include <sys/user.h> /* for coredump */
#include <sys/proc_info.h> /* for fill_procworkqueue */
-
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/semaphore.h>
#include <vm/vm_map.h>
#include <mach/thread_act.h> /* for thread_resume */
#include <machine/machine_routines.h>
+#include <mach/shared_region.h>
#include <libkern/OSAtomic.h>
#include <sys/pthread_shims.h>
#include "kern_internal.h"
-uint32_t pthread_debug_tracing = 0;
+#if DEBUG
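+// DEBUG-only: route kevent_qos_internal() calls to the stub below, which cycles
+// through canned results (output buffer half consumed with 4 events returned,
+// buffer exhausted with 4 events, then no events), presumably so that the
+// WQOPS_THREAD_KEVENT_RETURN path can be exercised before real kevent support
+// is available.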
+#define kevent_qos_internal kevent_qos_internal_stub
+static int kevent_qos_internal_stub(__unused struct proc *p, __unused int fd,
+ __unused user_addr_t changelist, __unused int nchanges,
+ __unused user_addr_t eventlist, __unused int nevents,
+ __unused user_addr_t data_out, user_size_t *data_available,
+ __unused unsigned int flags, int32_t *retval){
+ if (data_available){
+ static int i = 0;
+ switch (i++ % 4) {
+ case 0:
+ case 2:
+ *data_available = *data_available / 2;
+ *retval = 4;
+ break;
+ case 1:
+ *data_available = 0;
+ *retval = 4;
+ break;
+ case 3:
+ *retval = 0;
+ break;
+ }
+ } else {
+ *retval = 0;
+ }
+ return 0;
+}
+#endif /* DEBUG */
+
+uint32_t pthread_debug_tracing = 1;
SYSCTL_INT(_kern, OID_AUTO, pthread_debug_tracing, CTLFLAG_RW | CTLFLAG_LOCKED,
&pthread_debug_tracing, 0, "")
extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
extern void workqueue_thread_yielded(void);
-static boolean_t workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t th, boolean_t force_oc,
- boolean_t overcommit, pthread_priority_t oc_prio);
+enum run_nextreq_mode {
+	RUN_NEXTREQ_DEFAULT,
+	RUN_NEXTREQ_OVERCOMMIT,
+	RUN_NEXTREQ_DEFERRED_OVERCOMMIT,
+	RUN_NEXTREQ_UNCONSTRAINED,
+	RUN_NEXTREQ_EVENT_MANAGER
+};
+static boolean_t workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t th, enum run_nextreq_mode mode, pthread_priority_t oc_prio);
static boolean_t workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority);
-static void wq_runreq(proc_t p, boolean_t overcommit, pthread_priority_t priority, thread_t th, struct threadlist *tl,
+static void wq_runreq(proc_t p, pthread_priority_t priority, thread_t th, struct threadlist *tl,
int reuse_thread, int wake_thread, int return_directly);
-static int _setup_wqthread(proc_t p, thread_t th, boolean_t overcommit, pthread_priority_t priority, int reuse_thread, struct threadlist *tl);
+static int _setup_wqthread(proc_t p, thread_t th, pthread_priority_t priority, int reuse_thread, struct threadlist *tl);
static void wq_unpark_continue(void);
static void wq_unsuspend_continue(void);
-static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread);
+static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit);
static void workqueue_removethread(struct threadlist *tl, int fromexit);
static void workqueue_lock_spin(proc_t);
static void workqueue_unlock(proc_t);
+static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer);
+
+static mach_vm_offset_t stackaddr_hint(proc_t p);
+
int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
#define C_32_STK_ALIGN 16
#define C_64_STK_ALIGN 16
#define C_64_REDZONE_LEN 128
-#define TRUNC_DOWN32(a,c) ((((uint32_t)a)-(c)) & ((uint32_t)(-(c))))
-#define TRUNC_DOWN64(a,c) ((((uint64_t)a)-(c)) & ((uint64_t)(-(c))))
+
+#define PTHREAD_T_OFFSET 0
/*
 * Flags field passed to bsdthread_create and back in pthread_start
#define SCHED_FIFO POLICY_FIFO
#define SCHED_RR POLICY_RR
+#define BASEPRI_DEFAULT 31
+
+#pragma mark - Process/Thread Setup/Teardown syscalls
+
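+/**
+ * Returns a per-architecture hint for where thread stacks and pthread_t
+ * structures should be mapped, chosen (per the comments below) to stay clear
+ * of the nanomalloc range and the shared region. Callers still map with
+ * VM_FLAGS_ANYWHERE, so this is only a starting point.
+ */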
+static mach_vm_offset_t stackaddr_hint(proc_t p __unused){
+ mach_vm_offset_t stackaddr;
+#if defined(__i386__) || defined(__x86_64__)
+ if (proc_is64bit(p)){
+ // Above nanomalloc range (see NANOZONE_SIGNATURE)
+ stackaddr = 0x700000000000;
+ } else {
+ stackaddr = SHARED_REGION_BASE_I386 + SHARED_REGION_SIZE_I386;
+ }
+#elif defined(__arm__) || defined(__arm64__)
+ if (proc_is64bit(p)){
+ // 64 stacks below nanomalloc (see NANOZONE_SIGNATURE)
+ stackaddr = 0x170000000 - 64 * PTH_DEFAULT_STACKSIZE;
+#if defined(__arm__)
+ } else if (pthread_kern->map_is_1gb(get_task_map(pthread_kern->proc_get_task(p)))){
+ stackaddr = SHARED_REGION_BASE_ARM - 32 * PTH_DEFAULT_STACKSIZE;
+#endif
+ } else {
+ stackaddr = SHARED_REGION_BASE_ARM + SHARED_REGION_SIZE_ARM;
+ }
+#else
+#error Need to define a stack address hint for this architecture
+#endif
+ return stackaddr;
+}
+
+/**
+ * bsdthread_create system call. Used by pthread_create.
+ */
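+/*
+ * When PTHREAD_START_CUSTOM is clear, user_stack carries the requested stack
+ * size and the kernel allocates the guard page, stack and pthread_t in a
+ * single mapping; otherwise the caller supplies its own stack and pthread_t
+ * addresses.
+ */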
int
_bsdthread_create(struct proc *p, user_addr_t user_func, user_addr_t user_funcarg, user_addr_t user_stack, user_addr_t user_pthread, uint32_t flags, user_addr_t *retval)
{
int allocated = 0;
mach_vm_offset_t stackaddr;
mach_vm_size_t th_allocsize = 0;
- mach_vm_size_t user_stacksize;
- mach_vm_size_t th_stacksize;
mach_vm_size_t th_guardsize;
- mach_vm_offset_t th_stackaddr;
mach_vm_offset_t th_stack;
mach_vm_offset_t th_pthread;
mach_port_name_t th_thport;
isLP64 = proc_is64bit(p);
th_guardsize = vm_map_page_size(vmap);
-#if defined(__i386__) || defined(__x86_64__)
- stackaddr = 0xB0000000;
-#else
-#error Need to define a stack address hint for this architecture
-#endif
+ stackaddr = stackaddr_hint(p);
kret = pthread_kern->thread_create(ctask, &th);
if (kret != KERN_SUCCESS)
return(ENOMEM);
sright = (void *)pthread_kern->convert_thread_to_port(th);
th_thport = pthread_kern->ipc_port_copyout_send(sright, pthread_kern->task_get_ipcspace(ctask));
- if ((flags & PTHREAD_START_CUSTOM) == 0) {
- th_stacksize = (mach_vm_size_t)user_stack; /* if it is custom them it is stacksize */
- th_allocsize = th_stacksize + th_guardsize + pthread_kern->proc_get_pthsize(p);
-
- kret = mach_vm_map(vmap, &stackaddr,
- th_allocsize,
- page_size-1,
- VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
- 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
- VM_INHERIT_DEFAULT);
- if (kret != KERN_SUCCESS)
- kret = mach_vm_allocate(vmap,
- &stackaddr, th_allocsize,
- VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
- if (kret != KERN_SUCCESS) {
+ if ((flags & PTHREAD_START_CUSTOM) == 0) {
+ mach_vm_size_t pthread_size =
+ vm_map_round_page_mask(pthread_kern->proc_get_pthsize(p) + PTHREAD_T_OFFSET, vm_map_page_mask(vmap));
+ th_allocsize = th_guardsize + user_stack + pthread_size;
+ user_stack += PTHREAD_T_OFFSET;
+
+ kret = mach_vm_map(vmap, &stackaddr,
+ th_allocsize,
+ page_size-1,
+ VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
+ 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
+ VM_INHERIT_DEFAULT);
+ if (kret != KERN_SUCCESS){
+ kret = mach_vm_allocate(vmap,
+ &stackaddr, th_allocsize,
+ VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
+ }
+ if (kret != KERN_SUCCESS) {
error = ENOMEM;
goto out;
- }
+ }
PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0);
- th_stackaddr = stackaddr;
allocated = 1;
- /*
+ /*
* The guard page is at the lowest address
- * The stack base is the highest address
+ * The stack base is the highest address
*/
kret = mach_vm_protect(vmap, stackaddr, th_guardsize, FALSE, VM_PROT_NONE);
- if (kret != KERN_SUCCESS) {
+ if (kret != KERN_SUCCESS) {
error = ENOMEM;
goto out1;
- }
- th_stack = (stackaddr + th_stacksize + th_guardsize);
- th_pthread = (stackaddr + th_stacksize + th_guardsize);
- user_stacksize = th_stacksize;
+ }
+
+ th_pthread = stackaddr + th_guardsize + user_stack;
+ th_stack = th_pthread;
- /*
+ /*
* Pre-fault the first page of the new thread's stack and the page that will
* contain the pthread_t structure.
*/
- vm_fault( vmap,
- vm_map_trunc_page_mask(th_stack - PAGE_SIZE_64, vm_map_page_mask(vmap)),
- VM_PROT_READ | VM_PROT_WRITE,
- FALSE,
- THREAD_UNINT, NULL, 0);
+ if (vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)) !=
+ vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap))){
+ vm_fault( vmap,
+ vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)),
+ VM_PROT_READ | VM_PROT_WRITE,
+ FALSE,
+ THREAD_UNINT, NULL, 0);
+ }
vm_fault( vmap,
- vm_map_trunc_page_mask(th_pthread, vm_map_page_mask(vmap)),
- VM_PROT_READ | VM_PROT_WRITE,
- FALSE,
- THREAD_UNINT, NULL, 0);
+ vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap)),
+ VM_PROT_READ | VM_PROT_WRITE,
+ FALSE,
+ THREAD_UNINT, NULL, 0);
+
} else {
th_stack = user_stack;
- user_stacksize = user_stack;
th_pthread = user_pthread;
PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_NONE, 0, 0, 0, 3, 0);
* Set up i386 registers & function call.
*/
if (isLP64 == 0) {
- x86_thread_state32_t state;
- x86_thread_state32_t *ts = &state;
-
- ts->eip = (unsigned int)pthread_kern->proc_get_threadstart(p);
- ts->eax = (unsigned int)th_pthread;
- ts->ebx = (unsigned int)th_thport;
- ts->ecx = (unsigned int)user_func;
- ts->edx = (unsigned int)user_funcarg;
- ts->edi = (unsigned int)user_stacksize;
- ts->esi = (unsigned int)flags;
- /*
- * set stack pointer
- */
- ts->esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN));
+ x86_thread_state32_t state = {
+ .eip = (unsigned int)pthread_kern->proc_get_threadstart(p),
+ .eax = (unsigned int)th_pthread,
+ .ebx = (unsigned int)th_thport,
+ .ecx = (unsigned int)user_func,
+ .edx = (unsigned int)user_funcarg,
+ .edi = (unsigned int)user_stack,
+ .esi = (unsigned int)flags,
+ /*
+ * set stack pointer
+ */
+ .esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN))
+ };
- error = pthread_kern->thread_set_wq_state32(th, (thread_state_t)ts);
+ error = pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
if (error != KERN_SUCCESS) {
error = EINVAL;
goto out;
}
} else {
- x86_thread_state64_t state64;
- x86_thread_state64_t *ts64 = &state64;
-
- ts64->rip = (uint64_t)pthread_kern->proc_get_threadstart(p);
- ts64->rdi = (uint64_t)th_pthread;
- ts64->rsi = (uint64_t)(th_thport);
- ts64->rdx = (uint64_t)user_func;
- ts64->rcx = (uint64_t)user_funcarg;
- ts64->r8 = (uint64_t)user_stacksize;
- ts64->r9 = (uint64_t)flags;
- /*
- * set stack pointer aligned to 16 byte boundary
- */
- ts64->rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN);
+ x86_thread_state64_t state64 = {
+ .rip = (uint64_t)pthread_kern->proc_get_threadstart(p),
+ .rdi = (uint64_t)th_pthread,
+ .rsi = (uint64_t)(th_thport),
+ .rdx = (uint64_t)user_func,
+ .rcx = (uint64_t)user_funcarg,
+ .r8 = (uint64_t)user_stack,
+ .r9 = (uint64_t)flags,
+ /*
+ * set stack pointer aligned to 16 byte boundary
+ */
+ .rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN)
+ };
- error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)ts64);
+ error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)&state64);
if (error != KERN_SUCCESS) {
error = EINVAL;
goto out;
}
#elif defined(__arm__)
- arm_thread_state_t state;
- arm_thread_state_t *ts = &state;
-
- ts->pc = (int)pthread_kern->proc_get_threadstart(p);
- ts->r[0] = (unsigned int)th_pthread;
- ts->r[1] = (unsigned int)th_thport;
- ts->r[2] = (unsigned int)user_func;
- ts->r[3] = (unsigned int)user_funcarg;
- ts->r[4] = (unsigned int)user_stacksize;
- ts->r[5] = (unsigned int)flags;
-
- /* Set r7 & lr to 0 for better back tracing */
- ts->r[7] = 0;
- ts->lr = 0;
-
- /*
- * set stack pointer
- */
- ts->sp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN));
+ arm_thread_state_t state = {
+ .pc = (int)pthread_kern->proc_get_threadstart(p),
+ .r[0] = (unsigned int)th_pthread,
+ .r[1] = (unsigned int)th_thport,
+ .r[2] = (unsigned int)user_func,
+ .r[3] = (unsigned int)user_funcarg,
+ .r[4] = (unsigned int)user_stack,
+ .r[5] = (unsigned int)flags,
+
+ /* Set r7 & lr to 0 for better back tracing */
+ .r[7] = 0,
+ .lr = 0,
+
+ /*
+ * set stack pointer
+ */
+ .sp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN))
+ };
- (void) pthread_kern->thread_set_wq_state32(th, (thread_state_t)ts);
+ (void) pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
#else
#error bsdthread_create not defined for this architecture
thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
-#define BASEPRI_DEFAULT 31
precedinfo.importance = (importance - BASEPRI_DEFAULT);
thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
} else if ((flags & PTHREAD_START_QOSCLASS) != 0) {
PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_END, error, th_pthread, 0, 0, 0);
- *retval = th_pthread;
+ // cast required as mach_vm_offset_t is always 64 bits even on 32-bit platforms
+ *retval = (user_addr_t)th_pthread;
return(0);
out1:
if (allocated != 0) {
- (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
+ (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
}
out:
(void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(ctask), th_thport);
return(error);
}
+/**
+ * bsdthread_terminate system call. Used by pthread_terminate
+ */
int
_bsdthread_terminate(__unused struct proc *p,
user_addr_t stackaddr,
return(0);
}
+/**
+ * bsdthread_register system call. Performs per-process setup. Responsible for
+ * returning capability bits to userspace and receiving userspace function addresses.
+ */
int
_bsdthread_register(struct proc *p,
user_addr_t threadstart,
return(0);
}
+#pragma mark - QoS Manipulation
+
int
_bsdthread_ctl_set_qos(struct proc *p, user_addr_t __unused cmd, mach_port_name_t kport, user_addr_t tsd_priority_addr, user_addr_t arg3, int *retval)
{
*/
if (old_active == wq->wq_reqconc[old_bucket]) {
/* workqueue_run_nextreq will drop the workqueue lock in all exit paths. */
- (void)workqueue_run_nextreq(p, wq, THREAD_NULL, FALSE, FALSE, 0);
+ (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT, 0);
} else {
workqueue_unlock(p);
}
fixedpri:
if ((flags & _PTHREAD_SET_SELF_FIXEDPRIORITY_FLAG) != 0) {
- thread_extended_policy_data_t extpol;
+ thread_extended_policy_data_t extpol = {.timeshare = 0};
thread_t thread = current_thread();
- extpol.timeshare = 0;
+ struct threadlist *tl = util_get_thread_threadlist_entry(thread);
+ if (tl) {
+ /* Not allowed on workqueue threads */
+ fixedpri_rv = ENOTSUP;
+ goto done;
+ }
+
+ kr = pthread_kern->thread_policy_set_internal(thread, THREAD_EXTENDED_POLICY, (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
+ if (kr != KERN_SUCCESS) {
+ fixedpri_rv = EINVAL;
+ goto done;
+ }
+ } else if ((flags & _PTHREAD_SET_SELF_TIMESHARE_FLAG) != 0) {
+ thread_extended_policy_data_t extpol = {.timeshare = 1};
+ thread_t thread = current_thread();
struct threadlist *tl = util_get_thread_threadlist_entry(thread);
if (tl) {
- /* Not allowed on workqueue threads, since there is no symmetric clear function */
+ /* Not allowed on workqueue threads */
fixedpri_rv = ENOTSUP;
goto done;
}
}
}
+#pragma mark - Workqueue Implementation
+#pragma mark sysctls
+
uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
-
+uint32_t wq_max_concurrency = 1; // set to ncpus on load
SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW | CTLFLAG_LOCKED,
&wq_yielded_threshold, 0, "");
SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
&wq_max_constrained_threads, 0, "");
+#ifdef DEBUG
+SYSCTL_INT(_kern, OID_AUTO, wq_max_concurrency, CTLFLAG_RW | CTLFLAG_LOCKED,
+ &wq_max_concurrency, 0, "");
+
+static int wq_kevent_test SYSCTL_HANDLER_ARGS;
+SYSCTL_PROC(_debug, OID_AUTO, wq_kevent_test, CTLFLAG_MASKED | CTLFLAG_RW | CTLFLAG_LOCKED | CTLFLAG_ANYBODY | CTLTYPE_OPAQUE, NULL, 0, wq_kevent_test, 0, "-");
+#endif
static uint32_t wq_init_constrained_limit = 1;
+#pragma mark workqueue lock
void
_workqueue_init_lock(proc_t p)
lck_spin_unlock(pthread_kern->proc_get_wqlockptr(p));
}
+#pragma mark workqueue add timer
+/**
+ * Sets up the timer which will call out to workqueue_add_timer
+ */
static void
workqueue_interval_timer_start(struct workqueue *wq)
{
uint64_t deadline;
+ /* n.b. wq_timer_interval is reset to 0 in workqueue_add_timer if the
+ ATIMER_RUNNING flag is not present. The net effect here is that if a
+ sequence of threads is required, we'll double the time before we give out
+ the next one. */
if (wq->wq_timer_interval == 0) {
wq->wq_timer_interval = wq_stalled_window_usecs;
PTHREAD_TRACE(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, wq->wq_flags, wq->wq_timer_interval, 0);
}
-
+/**
+ * returns whether lastblocked_tsp is within wq_stalled_window_usecs of cur_ts
+ */
static boolean_t
wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
{
return (FALSE);
}
-
#define WQ_TIMER_NEEDED(wq, start_timer) do { \
int oldflags = wq->wq_flags; \
\
} \
} while (0)
-
-
+/**
+ * handler function for the timer
+ */
static void
workqueue_add_timer(struct workqueue *wq, __unused int param1)
{
proc_t p;
boolean_t start_timer = FALSE;
boolean_t retval;
- boolean_t add_thread;
- uint32_t busycount;
-
+
PTHREAD_TRACE(TRACE_wq_add_timer | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
p = wq->wq_proc;
again:
retval = TRUE;
- add_thread = FALSE;
-
if ( !(wq->wq_flags & WQ_EXITING)) {
+ boolean_t add_thread = FALSE;
/*
* check to see if the stall frequency was beyond our tolerance
* or we have work on the queue, but haven't scheduled any
* there were no idle threads left to schedule
*/
if (wq->wq_reqcount) {
- uint32_t priclass;
- uint32_t thactive_count;
- uint32_t i;
- uint64_t curtime;
-
- for (priclass = 0; priclass < WORKQUEUE_NUM_BUCKETS; priclass++) {
- if (wq->wq_requests[priclass])
- break;
+ uint32_t priclass = 0;
+ uint32_t thactive_count = 0;
+ uint64_t curtime = mach_absolute_time();
+ uint64_t busycount = 0;
+
+ if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
+ wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
+ priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
+ } else {
+ for (priclass = 0; priclass < WORKQUEUE_NUM_BUCKETS; priclass++) {
+ if (wq->wq_requests[priclass])
+ break;
+ }
}
- assert(priclass < WORKQUEUE_NUM_BUCKETS);
- curtime = mach_absolute_time();
- busycount = 0;
- thactive_count = 0;
-
- /*
- * check for conditions under which we would not add a thread, either
- * a) we've got as many running threads as we want in this priority
- * band and the priority bands above it
- *
- * b) check to see if the priority group has blocked threads, if the
- * last blocked timestamp is old enough, we will have already passed
- * (a) where we would have stopped if we had enough active threads.
- */
- for (i = 0; i <= priclass; i++) {
-
- thactive_count += wq->wq_thactive_count[i];
+ if (priclass < WORKQUEUE_EVENT_MANAGER_BUCKET){
+ /*
+			 * Compute a metric for how many threads are active. We
+ * find the highest priority request outstanding and then add up
+ * the number of active threads in that and all higher-priority
+ * buckets. We'll also add any "busy" threads which are not
+ * active but blocked recently enough that we can't be sure
+ * they've gone idle yet. We'll then compare this metric to our
+ * max concurrency to decide whether to add a new thread.
+ */
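+			/* e.g. if the highest outstanding request is in bucket 1, two
+			 * active threads across buckets 0 and 1 plus one recently-blocked
+			 * "busy" thread give a metric of 3, and a new thread is only added
+			 * below if 3 < wq_max_concurrency. */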
+ for (uint32_t i = 0; i <= priclass; i++) {
+ thactive_count += wq->wq_thactive_count[i];
- if (wq->wq_thscheduled_count[i]) {
- if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i]))
- busycount++;
+ // XXX why isn't this checking thscheduled_count < thactive_count ?
+ if (wq->wq_thscheduled_count[i]) {
+ if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i]))
+ busycount++;
+ }
}
}
- if (thactive_count + busycount < wq->wq_max_concurrency) {
+
+ if (thactive_count + busycount < wq->wq_max_concurrency ||
+ priclass == WORKQUEUE_EVENT_MANAGER_BUCKET) {
if (wq->wq_thidlecount == 0) {
/*
* if we have no idle threads, try to add one
*/
- retval = workqueue_addnewthread(wq, FALSE);
+ retval = workqueue_addnewthread(wq, priclass == WORKQUEUE_EVENT_MANAGER_BUCKET);
}
add_thread = TRUE;
}
* workqueue_run_nextreq is responsible for
* dropping the workqueue lock in all cases
*/
- retval = workqueue_run_nextreq(p, wq, THREAD_NULL, FALSE, FALSE, 0);
+ retval = workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT, 0);
workqueue_lock_spin(p);
if (retval == FALSE)
}
}
}
+
+ /*
+ * If we called WQ_TIMER_NEEDED above, then this flag will be set if that
+ * call marked the timer running. If so, we let the timer interval grow.
+ * Otherwise, we reset it back to 0.
+ */
if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
wq->wq_timer_interval = 0;
workqueue_unlock(p);
- if (start_timer == TRUE)
- workqueue_interval_timer_start(wq);
+ if (start_timer == TRUE)
+ workqueue_interval_timer_start(wq);
}
+#pragma mark thread state tracking
+// called by spinlock code when trying to yield to lock owner
void
_workqueue_thread_yielded(void)
{
}
}
if (wq->wq_thidlecount) {
- uint32_t priority;
- boolean_t overcommit = FALSE;
- boolean_t force_oc = FALSE;
-
- for (priority = 0; priority < WORKQUEUE_NUM_BUCKETS; priority++) {
- if (wq->wq_requests[priority]) {
- break;
- }
- }
- assert(priority < WORKQUEUE_NUM_BUCKETS);
-
- wq->wq_reqcount--;
- wq->wq_requests[priority]--;
-
- if (wq->wq_ocrequests[priority]) {
- wq->wq_ocrequests[priority]--;
- overcommit = TRUE;
- } else
- force_oc = TRUE;
-
- (void)workqueue_run_nextreq(p, wq, THREAD_NULL, force_oc, overcommit, pthread_priority_from_class_index(priority));
+ (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_UNCONSTRAINED, 0);
/*
* workqueue_run_nextreq is responsible for
* dropping the workqueue lock in all cases
old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
- if (old_activecount == wq->wq_reqconc[tl->th_priority]) {
+ /*
+ * If we blocked and were at the requested concurrency previously, we may
+ * need to spin up a new thread. Of course, if it's the event manager
+ * then that's moot, so ignore that case.
+ */
+ if (old_activecount == wq->wq_reqconc[tl->th_priority] &&
+ tl->th_priority != WORKQUEUE_EVENT_MANAGER_BUCKET) {
uint64_t curtime;
UInt64 *lastblocked_ptr;
return workqueue_callback;
}
+#pragma mark thread addition/removal
+
+/**
+ * pop goes the thread
+ */
static void
workqueue_removethread(struct threadlist *tl, int fromexit)
{
}
-/*
- * called with workq lock held
- * dropped and retaken around thread creation
- * return with workq lock held
+/**
+ * Try to add a new workqueue thread.
+ *
+ * - called with workq lock held
+ * - dropped and retaken around thread creation
+ * - return with workq lock held
*/
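+/*
+ * ignore_constrained_thread_limit is TRUE for overcommit and event manager
+ * requests, which are allowed to exceed wq_max_constrained_threads.
+ */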
static boolean_t
-workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread)
+workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit)
{
struct threadlist *tl;
struct uthread *uth;
proc_t p;
void *sright;
mach_vm_offset_t stackaddr;
- mach_vm_size_t guardsize;
- if ((wq->wq_flags & WQ_EXITING) == WQ_EXITING)
+ if ((wq->wq_flags & WQ_EXITING) == WQ_EXITING) {
+ PTHREAD_TRACE(TRACE_wq_thread_add_during_exit | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
return (FALSE);
+ }
if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (pthread_kern->config_thread_max - 20)) {
wq->wq_lflags |= WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
+
+ PTHREAD_TRACE(TRACE_wq_thread_limit_exceeded | DBG_FUNC_NONE, wq, wq->wq_nthreads, wq_max_threads,
+ pthread_kern->config_thread_max - 20, 0);
return (FALSE);
}
wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
- if (oc_thread == FALSE && wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
- /*
- * if we're not creating this thread to service an overcommit request,
- * then check the size of the constrained thread pool... if we've already
- * reached our max for threads scheduled from this pool, don't create a new
- * one... the callers of this function are prepared for failure.
+ if (ignore_constrained_thread_limit == FALSE &&
+ wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
+ /*
+ * If we're not creating this thread to service an overcommit or
+ * event manager request, then we check to see if we are over our
+ * constrained thread limit, in which case we error out.
*/
wq->wq_lflags |= WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
+
+ PTHREAD_TRACE(TRACE_wq_thread_constrained_maxed | DBG_FUNC_NONE, wq, wq->wq_constrained_threads_scheduled,
+ wq_max_constrained_threads, 0, 0);
return (FALSE);
}
if (wq->wq_constrained_threads_scheduled < wq_max_constrained_threads)
kret = pthread_kern->thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
if (kret != KERN_SUCCESS) {
+ PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 0, 0, 0);
goto failed;
}
tl = kalloc(sizeof(struct threadlist));
bzero(tl, sizeof(struct threadlist));
-#if defined(__i386__) || defined(__x86_64__)
- stackaddr = 0xB0000000;
-#else
-#error Need to define a stack address hint for this architecture
-#endif
-
- guardsize = vm_map_page_size(wq->wq_map);
- tl->th_allocsize = PTH_DEFAULT_STACKSIZE + guardsize + pthread_kern->proc_get_pthsize(p);
+ stackaddr = stackaddr_hint(p);
+
+ mach_vm_size_t guardsize = vm_map_page_size(wq->wq_map);
+ mach_vm_size_t pthread_size =
+ vm_map_round_page_mask(pthread_kern->proc_get_pthsize(p) + PTHREAD_T_OFFSET, vm_map_page_mask(wq->wq_map));
+ tl->th_allocsize = guardsize + PTH_DEFAULT_STACKSIZE + pthread_size;
kret = mach_vm_map(wq->wq_map, &stackaddr,
tl->th_allocsize,
VM_INHERIT_DEFAULT);
if (kret != KERN_SUCCESS) {
- kret = mach_vm_allocate(wq->wq_map,
- &stackaddr, tl->th_allocsize,
- VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
+ PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 1, 0, 0);
+
+ kret = mach_vm_allocate(wq->wq_map,
+ &stackaddr, tl->th_allocsize,
+ VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
}
if (kret == KERN_SUCCESS) {
- /*
+ /*
* The guard page is at the lowest address
* The stack base is the highest address
*/
- kret = mach_vm_protect(wq->wq_map, stackaddr, guardsize, FALSE, VM_PROT_NONE);
+ kret = mach_vm_protect(wq->wq_map, stackaddr, guardsize, FALSE, VM_PROT_NONE);
- if (kret != KERN_SUCCESS)
- (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
+ if (kret != KERN_SUCCESS) {
+ (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
+ PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 2, 0, 0);
+ }
}
if (kret != KERN_SUCCESS) {
(void) thread_terminate(th);
return (FALSE);
}
-
+/**
+ * Setup per-process state for the workqueue.
+ */
int
_workq_open(struct proc *p, __unused int32_t *retval)
{
wq = (struct workqueue *)ptr;
wq->wq_flags = WQ_LIST_INITED;
wq->wq_proc = p;
- wq->wq_max_concurrency = num_cpus;
+ wq->wq_max_concurrency = wq_max_concurrency;
wq->wq_task = current_task();
wq->wq_map = pthread_kern->current_map();
for (i = 0; i < WORKQUEUE_NUM_BUCKETS; i++)
wq->wq_reqconc[i] = (uint16_t)wq->wq_max_concurrency;
+	// The event manager bucket is special, so it gets a concurrency of 1
+ // though we shouldn't ever read this value for that bucket
+ wq->wq_reqconc[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
+
+ // Always start the event manager at BACKGROUND
+ wq->wq_event_manager_priority = (uint32_t)pthread_qos_class_get_priority(THREAD_QOS_BACKGROUND) | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+
TAILQ_INIT(&wq->wq_thrunlist);
TAILQ_INIT(&wq->wq_thidlelist);
return(error);
}
-
-int
-_workq_kernreturn(struct proc *p,
- int options,
- __unused user_addr_t item,
- int arg2,
- int arg3,
- __unused int32_t *retval)
-{
- struct workqueue *wq;
- int error = 0;
-
- if (pthread_kern->proc_get_register(p) == 0) {
- return EINVAL;
- }
-
- switch (options) {
- case WQOPS_QUEUE_NEWSPISUPP: {
- /*
- * arg2 = offset of serialno into dispatch queue
- */
- int offset = arg2;
-
- pthread_kern->proc_set_dispatchqueue_serialno_offset(p, (uint64_t)offset);
- break;
- }
- case WQOPS_QUEUE_REQTHREADS: {
- /*
- * arg2 = number of threads to start
- * arg3 = priority
- */
- boolean_t overcommit = FALSE;
- int reqcount = arg2;
- pthread_priority_t priority = arg3;
- int class;
-
- overcommit = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0;
- class = pthread_priority_get_class_index(priority);
-
- if ((reqcount <= 0) || (class < 0) || (class >= WORKQUEUE_NUM_BUCKETS)) {
- error = EINVAL;
- break;
- }
-
- workqueue_lock_spin(p);
-
- if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
- workqueue_unlock(p);
-
- error = EINVAL;
- break;
- }
-
- if (!overcommit) {
- wq->wq_reqcount += reqcount;
- wq->wq_requests[class] += reqcount;
-
- PTHREAD_TRACE(TRACE_wq_req_threads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
-
- while (wq->wq_reqcount) {
- if (!workqueue_run_one(p, wq, overcommit, priority))
- break;
- }
- } else {
- PTHREAD_TRACE(TRACE_wq_req_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
-
- while (reqcount) {
- if (!workqueue_run_one(p, wq, overcommit, priority))
- break;
- reqcount--;
- }
- if (reqcount) {
- /*
- * we need to delay starting some of the overcommit requests...
- * we should only fail to create the overcommit threads if
- * we're at the max thread limit... as existing threads
- * return to the kernel, we'll notice the ocrequests
- * and spin them back to user space as the overcommit variety
- */
- wq->wq_reqcount += reqcount;
- wq->wq_requests[class] += reqcount;
- wq->wq_ocrequests[class] += reqcount;
-
- PTHREAD_TRACE(TRACE_wq_delay_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
- }
- }
- workqueue_unlock(p);
- break;
- }
-
- case WQOPS_THREAD_RETURN: {
- thread_t th = current_thread();
- struct uthread *uth = pthread_kern->get_bsdthread_info(th);
- struct threadlist *tl = util_get_thread_threadlist_entry(th);
-
- /* reset signal mask on the workqueue thread to default state */
- if (pthread_kern->uthread_get_sigmask(uth) != (sigset_t)(~workq_threadmask)) {
- pthread_kern->proc_lock(p);
- pthread_kern->uthread_set_sigmask(uth, ~workq_threadmask);
- pthread_kern->proc_unlock(p);
- }
-
- /* dropping WQ override counts has to be done outside the wq lock. */
- wq_thread_override_reset(th, THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD);
-
- workqueue_lock_spin(p);
-
- if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL || !tl) {
- workqueue_unlock(p);
-
- error = EINVAL;
- break;
- }
- PTHREAD_TRACE(TRACE_wq_runitem | DBG_FUNC_END, wq, 0, 0, 0, 0);
-
-
- (void)workqueue_run_nextreq(p, wq, th, FALSE, FALSE, 0);
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
- break;
- }
-
- default:
- error = EINVAL;
- break;
- }
- return (error);
-}
-
/*
* Routine: workqueue_mark_exiting
*
* Function: Mark the work queue such that new threads will not be added to the
- * work queue after we return.
+ * work queue after we return.
*
* Conditions: Called against the current process.
*/
/*
* we now arm the timer in the callback function w/o holding the workq lock...
- * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
+ * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
 * ensure only a single timer is running and to notice that WQ_EXITING has
* been set (we don't want to start a timer once WQ_EXITING is posted)
*
}
-static boolean_t
-workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority)
-{
- boolean_t ran_one;
-
- if (wq->wq_thidlecount == 0) {
- if (overcommit == FALSE) {
- if (wq->wq_constrained_threads_scheduled < wq->wq_max_concurrency)
- workqueue_addnewthread(wq, overcommit);
- } else {
- workqueue_addnewthread(wq, overcommit);
-
- if (wq->wq_thidlecount == 0)
- return (FALSE);
- }
- }
- ran_one = workqueue_run_nextreq(p, wq, THREAD_NULL, FALSE, overcommit, priority);
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
- workqueue_lock_spin(p);
+#pragma mark workqueue thread manipulation
- return (ran_one);
-}
+/**
+ * Entry point for libdispatch to ask for threads
+ */
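+/*
+ * A request is either constrained (counted against wq_max_constrained_threads),
+ * overcommit (granted a thread immediately if possible, otherwise deferred via
+ * wq_ocrequests and the add timer), or an event manager request (routed to the
+ * special WORKQUEUE_EVENT_MANAGER_BUCKET).
+ */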
+static int wqops_queue_reqthreads(struct proc *p, int reqcount, pthread_priority_t priority){
+ struct workqueue *wq;
+ boolean_t overcommit = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0;
+ int class = pthread_priority_get_class_index(priority);
+ boolean_t event_manager = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0;
+ if (event_manager){
+ class = WORKQUEUE_EVENT_MANAGER_BUCKET;
+ }
-/*
- * workqueue_run_nextreq:
- * called with the workqueue lock held...
- * responsible for dropping it in all cases
- */
-static boolean_t
-workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t thread,
- boolean_t force_oc, boolean_t overcommit, pthread_priority_t oc_prio)
-{
- thread_t th_to_run = THREAD_NULL;
- thread_t th_to_park = THREAD_NULL;
- int wake_thread = 0;
- int reuse_thread = WQ_FLAG_THREAD_REUSE;
- uint32_t priclass, orig_class;
- uint32_t us_to_wait;
- struct threadlist *tl = NULL;
- struct uthread *uth = NULL;
+ if ((reqcount <= 0) || (class < 0) || (class >= WORKQUEUE_NUM_BUCKETS) || (overcommit && event_manager)) {
+ return EINVAL;
+ }
+
+ workqueue_lock_spin(p);
+
+ if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
+ workqueue_unlock(p);
+
+ return EINVAL;
+ }
+
+ if (overcommit == 0 && event_manager == 0) {
+ wq->wq_reqcount += reqcount;
+ wq->wq_requests[class] += reqcount;
+
+ PTHREAD_TRACE(TRACE_wq_req_threads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
+
+ while (wq->wq_reqcount) {
+ if (!workqueue_run_one(p, wq, overcommit, 0))
+ break;
+ }
+ } else if (overcommit){
+ PTHREAD_TRACE(TRACE_wq_req_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
+
+ while (reqcount) {
+ if (!workqueue_run_one(p, wq, overcommit, priority))
+ break;
+ reqcount--;
+ }
+ if (reqcount) {
+ /*
+ * we need to delay starting some of the overcommit requests...
+ * we should only fail to create the overcommit threads if
+ * we're at the max thread limit... as existing threads
+ * return to the kernel, we'll notice the ocrequests
+ * and spin them back to user space as the overcommit variety
+ */
+ wq->wq_reqcount += reqcount;
+ wq->wq_requests[class] += reqcount;
+ wq->wq_ocrequests[class] += reqcount;
+
+ PTHREAD_TRACE(TRACE_wq_delay_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
+
+ /* if we delayed this thread coming up but we're not constrained
+ * or at max threads then we need to start the timer so we don't
+ * risk dropping this request on the floor.
+ */
+ if ((wq->wq_lflags & (WQL_EXCEEDED_TOTAL_THREAD_LIMIT | WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT)) == 0) {
+ boolean_t start_timer = FALSE;
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+ if (start_timer) {
+ workqueue_interval_timer_start(wq);
+ }
+ }
+ }
+ } else if (event_manager) {
+ PTHREAD_TRACE(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, wq->wq_event_manager_priority, wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET], 0);
+
+ if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
+ wq->wq_reqcount += 1;
+ wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
+ }
+
+ // We've recorded the request for an event manager thread above. We'll
+ // let the timer pick it up as we would for a kernel callout. We can
+ // do a direct add/wakeup when that support is added for the kevent path.
+ boolean_t start_timer = FALSE;
+ if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0)
+ WQ_TIMER_NEEDED(wq, start_timer);
+ if (start_timer == TRUE)
+ workqueue_interval_timer_start(wq);
+ }
+ workqueue_unlock(p);
+
+ return 0;
+}
+
+/* Used by the kevent system to request threads. Currently count is ignored
+ * and we always return one thread per invocation.
+ */
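+/* No thread is actually handed back at present: the request is queued (or the
+ * event manager flagged) and the add timer spins the thread up, so this
+ * currently always returns THREAD_NULL.
+ */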
+thread_t _workq_reqthreads(struct proc *p, int requests_count, workq_reqthreads_req_t requests){
boolean_t start_timer = FALSE;
- boolean_t adjust_counters = TRUE;
- uint64_t curtime;
- uint32_t thactive_count;
- uint32_t busycount;
+ assert(requests_count > 0);
+
+#if DEBUG
+ // Make sure that the requests array is sorted, highest priority first
+ if (requests_count > 1){
+ __assert_only qos_class_t priority = _pthread_priority_get_qos_newest(requests[0].priority);
+ __assert_only unsigned long flags = ((_pthread_priority_get_flags(requests[0].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
+ for (int i = 1; i < requests_count; i++){
+ if (requests[i].count == 0) continue;
+ __assert_only qos_class_t next_priority = _pthread_priority_get_qos_newest(requests[i].priority);
+ __assert_only unsigned long next_flags = ((_pthread_priority_get_flags(requests[i].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
+ if (next_flags != flags){
+ flags = next_flags;
+ priority = next_priority;
+ } else {
+ assert(next_priority <= priority);
+ }
+ }
+ }
+#endif // DEBUG
- PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_reqcount, 0);
+ int error = 0;
+ struct workqueue *wq;
- if (thread != THREAD_NULL) {
- uth = pthread_kern->get_bsdthread_info(thread);
+ workqueue_lock_spin(p);
- if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL) {
- panic("wq thread with no threadlist");
+ if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
+ error = EINVAL;
+ goto done;
+ }
+
+ PTHREAD_TRACE(TRACE_wq_kevent_req_threads | DBG_FUNC_START, wq, requests_count, 0, 0, 0);
+
+ // Look for overcommit or event-manager-only requests.
+ boolean_t have_overcommit = FALSE;
+ pthread_priority_t priority = 0;
+ for (int i = 0; i < requests_count; i++){
+ if (requests[i].count == 0)
+ continue;
+ priority = requests[i].priority;
+ if (_pthread_priority_get_qos_newest(priority) == QOS_CLASS_UNSPECIFIED){
+ priority |= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ }
+ if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0){
+ goto event_manager;
+ }
+ if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0){
+ have_overcommit = TRUE;
+ break;
}
}
+ if (have_overcommit){
+ // I can't make this call, since it's not safe from some contexts yet,
+		// so just set up a delayed overcommit and let the timer do the work
+ //boolean_t success = workqueue_run_one(p, wq, TRUE, priority);
+ if (/* !success */ TRUE){
+ int class = pthread_priority_get_class_index(priority);
+ wq->wq_reqcount += 1;
+ wq->wq_requests[class] += 1;
+ wq->wq_kevent_ocrequests[class] += 1;
+
+ PTHREAD_TRACE(TRACE_wq_req_kevent_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_kevent_ocrequests[class], 1, 0);
+
+ WQ_TIMER_NEEDED(wq, start_timer);
+ }
+ goto done;
+ }
+
+ // Having no overcommit requests, try to find any request that can start
+ // There's no TOCTTOU since we hold the workqueue lock
+ for (int i = 0; i < requests_count; i++){
+ workq_reqthreads_req_t req = requests + i;
+ priority = req->priority;
+
+ if (req->count == 0)
+ continue;
+
+ int class = pthread_priority_get_class_index(priority);
+
+ // Ask if we can start a new thread at the given class. Pass NUM_BUCKETS as
+ // my class to indicate we won't reuse this thread
+ if (may_start_constrained_thread(wq, class, WORKQUEUE_NUM_BUCKETS, NULL)){
+ wq->wq_reqcount += 1;
+ wq->wq_requests[class] += 1;
+ wq->wq_kevent_requests[class] += 1;
+
+ PTHREAD_TRACE(TRACE_wq_req_kevent_threads | DBG_FUNC_NONE, wq, priority, wq->wq_kevent_requests[class], 1, 0);
+
+ // I can't make this call because it's not yet safe to make from
+ // scheduler callout context, so instead we'll just start up the timer
+			// which will spin up the thread when it fires.
+ // workqueue_run_one(p, wq, FALSE, priority);
+
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+ goto done;
+ }
+ }
+
+ // Okay, here's the fun case: we can't spin up any of the non-overcommit threads
+ // that we've seen a request for, so we kick this over to the event manager thread
+
+event_manager:
+ PTHREAD_TRACE(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, wq->wq_event_manager_priority, wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET], 0);
+
+ if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
+ wq->wq_reqcount += 1;
+ wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
+ }
+ wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
+
+ if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0)
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+done:
+ workqueue_unlock(p);
+
+ if (start_timer == TRUE)
+ workqueue_interval_timer_start(wq);
+
+ PTHREAD_TRACE(TRACE_wq_kevent_req_threads | DBG_FUNC_END, wq, start_timer, 0, 0, 0);
+
+ return THREAD_NULL;
+}
+
+
+static int wqops_thread_return(struct proc *p){
+ thread_t th = current_thread();
+ struct uthread *uth = pthread_kern->get_bsdthread_info(th);
+ struct threadlist *tl = util_get_thread_threadlist_entry(th);
+
+ /* reset signal mask on the workqueue thread to default state */
+ if (pthread_kern->uthread_get_sigmask(uth) != (sigset_t)(~workq_threadmask)) {
+ pthread_kern->proc_lock(p);
+ pthread_kern->uthread_set_sigmask(uth, ~workq_threadmask);
+ pthread_kern->proc_unlock(p);
+ }
+
+ /* dropping WQ override counts has to be done outside the wq lock. */
+ wq_thread_override_reset(th, THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD);
+
+ workqueue_lock_spin(p);
+
+ struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
+ if (wq == NULL || !tl) {
+ workqueue_unlock(p);
+
+ return EINVAL;
+ }
+ PTHREAD_TRACE(TRACE_wq_runitem | DBG_FUNC_END, wq, 0, 0, 0, 0);
+
+ (void)workqueue_run_nextreq(p, wq, th, RUN_NEXTREQ_DEFAULT, 0);
/*
- * from here until we drop the workq lock
- * we can't be pre-empted since we hold
- * the lock in spin mode... this is important
- * since we have to independently update the priority that
- * the thread is associated with and the priorty based
- * counters that "workqueue_callback" also changes and bases
- * decisons on.
+ * workqueue_run_nextreq is responsible for
+ * dropping the workqueue lock in all cases
*/
-dispatch_overcommit:
+ return 0;
+}
- if (overcommit || force_oc) {
- priclass = pthread_priority_get_class_index(oc_prio);
+/**
+ * Multiplexed call to interact with the workqueue mechanism
+ */
+int
+_workq_kernreturn(struct proc *p,
+ int options,
+ __unused user_addr_t item,
+ int arg2,
+ int arg3,
+ int32_t *retval)
+{
+ int error = 0;
+
+ if (pthread_kern->proc_get_register(p) == 0) {
+ return EINVAL;
+ }
- if (thread != THREAD_NULL) {
- th_to_run = thread;
- goto pick_up_work;
+ switch (options) {
+ case WQOPS_QUEUE_NEWSPISUPP: {
+ /*
+ * arg2 = offset of serialno into dispatch queue
+ * arg3 = kevent support
+ */
+ int offset = arg2;
+ if (arg3 & 0x01){
+ // If we get here, then userspace has indicated support for kevent delivery.
}
- goto grab_idle_thread;
+
+ pthread_kern->proc_set_dispatchqueue_serialno_offset(p, (uint64_t)offset);
+ break;
}
- if (wq->wq_reqcount) {
- for (priclass = 0; priclass < WORKQUEUE_NUM_BUCKETS; priclass++) {
- if (wq->wq_requests[priclass])
- break;
+ case WQOPS_QUEUE_REQTHREADS: {
+ /*
+ * arg2 = number of threads to start
+ * arg3 = priority
+ */
+ error = wqops_queue_reqthreads(p, arg2, arg3);
+ break;
+ }
+ case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
+ /*
+ * arg2 = priority for the manager thread
+ *
+		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set, the
+ * ~_PTHREAD_PRIORITY_FLAGS_MASK contains a scheduling priority instead
+ * of a QOS value
+ */
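+		/* A scheduling-priority request always overrides a QoS-derived value
+		 * (hence the warning below), while two requests of the same kind only
+		 * ever raise the stored priority via MAX. */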
+ pthread_priority_t pri = arg2;
+
+ workqueue_lock_spin(p);
+ struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
+ if (wq == NULL ) {
+ workqueue_unlock(p);
+ error = EINVAL;
+ break;
+ }
+ if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
+			// If userspace passes a scheduling priority, that takes precedence
+			// over any QoS. (So, userspace should take care not to accidentally
+			// lower the priority this way.)
+ uint32_t sched_pri = pri & (~_PTHREAD_PRIORITY_FLAGS_MASK);
+ if (wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
+ wq->wq_event_manager_priority = MAX(sched_pri, wq->wq_event_manager_priority & (~_PTHREAD_PRIORITY_FLAGS_MASK))
+ | _PTHREAD_PRIORITY_SCHED_PRI_FLAG | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ } else {
+ wq->wq_event_manager_priority = sched_pri
+ | _PTHREAD_PRIORITY_SCHED_PRI_FLAG | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ }
+ } else if ((wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) == 0){
+ int cur_qos = pthread_priority_get_qos_class(wq->wq_event_manager_priority);
+ int new_qos = pthread_priority_get_qos_class(pri);
+ wq->wq_event_manager_priority = (uint32_t)pthread_qos_class_get_priority(MAX(cur_qos, new_qos)) | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ }
+ workqueue_unlock(p);
+ break;
+ }
+ case WQOPS_THREAD_KEVENT_RETURN: {
+ int32_t kevent_retval;
+ int ret = kevent_qos_internal(p, -1, item, arg2, item, arg2, NULL, NULL, KEVENT_FLAG_WORKQ | KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS, &kevent_retval);
+ // We shouldn't be getting more errors out than events we put in, so
+ // reusing the input buffer should always provide enough space
+ assert(ret == KERN_SUCCESS && kevent_retval >= 0);
+ if (ret != KERN_SUCCESS){
+ error = ret;
+ break;
+ } else if (kevent_retval > 0){
+ assert(kevent_retval <= arg2);
+ *retval = kevent_retval;
+ error = 0;
+ break;
}
- assert(priclass < WORKQUEUE_NUM_BUCKETS);
+ } /* FALLTHROUGH */
+ case WQOPS_THREAD_RETURN: {
+ error = wqops_thread_return(p);
+ // NOT REACHED except in case of error
+ assert(error);
+ break;
+ }
+ default:
+ error = EINVAL;
+ break;
+ }
+ return (error);
+}
- if (wq->wq_ocrequests[priclass] && (thread != THREAD_NULL || wq->wq_thidlecount)) {
- /*
- * handle delayed overcommit request...
- * they have priority over normal requests
- * within a given priority level
- */
- wq->wq_reqcount--;
- wq->wq_requests[priclass]--;
- wq->wq_ocrequests[priclass]--;
- oc_prio = pthread_priority_from_class_index(priclass);
- overcommit = TRUE;
+static boolean_t
+workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority)
+{
+ boolean_t ran_one;
- goto dispatch_overcommit;
+ if (wq->wq_thidlecount == 0) {
+ if (overcommit == FALSE) {
+ if (wq->wq_constrained_threads_scheduled < wq->wq_max_concurrency)
+ workqueue_addnewthread(wq, overcommit);
+ } else {
+ workqueue_addnewthread(wq, overcommit);
+
+ if (wq->wq_thidlecount == 0)
+ return (FALSE);
}
}
+ ran_one = workqueue_run_nextreq(p, wq, THREAD_NULL, overcommit ? RUN_NEXTREQ_OVERCOMMIT : RUN_NEXTREQ_DEFAULT, priority);
/*
- * if we get here, the work should be handled by a constrained thread
+ * workqueue_run_nextreq is responsible for
+ * dropping the workqueue lock in all cases
*/
- if (wq->wq_reqcount == 0 || wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
- /*
- * no work to do, or we're already at or over the scheduling limit for
- * constrained threads... just return or park the thread...
- * do not start the timer for this condition... if we don't have any work,
- * we'll check again when new work arrives... if we're over the limit, we need 1 or more
- * constrained threads to return to the kernel before we can dispatch additional work
- */
- if ((th_to_park = thread) == THREAD_NULL)
- goto out_of_work;
- goto parkit;
+ workqueue_lock_spin(p);
+
+ return (ran_one);
+}
+
+/*
+ * this is a workqueue thread with no more
+ * work to do... park it for now
+ */
+static void
+parkit(struct workqueue *wq, struct threadlist *tl, thread_t thread)
+{
+ uint32_t us_to_wait;
+
+ TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
+ tl->th_flags &= ~TH_LIST_RUNNING;
+
+ tl->th_flags |= TH_LIST_BLOCKED;
+ TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
+
+ pthread_kern->thread_sched_call(thread, NULL);
+
+ OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
+ wq->wq_thscheduled_count[tl->th_priority]--;
+ wq->wq_threads_scheduled--;
+
+ if (tl->th_flags & TH_LIST_CONSTRAINED) {
+ wq->wq_constrained_threads_scheduled--;
+ wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
}
- thactive_count = 0;
- busycount = 0;
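+	/* Park with a timeout that shrinks as the idle pool grows: with few idle
+	 * threads we wait most of wq_reduce_pool_window_usecs (e.g. 10 threads
+	 * already idle means a wait of 90% of the window); at 100 or more idle
+	 * threads the wait bottoms out at 1% of the window. */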
+ if (wq->wq_thidlecount < 100)
+ us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
+ else
+ us_to_wait = wq_reduce_pool_window_usecs / 100;
+
+ wq->wq_thidlecount++;
+ wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
+
+ assert_wait_timeout_with_leeway((caddr_t)tl, (THREAD_INTERRUPTIBLE),
+ TIMEOUT_URGENCY_SYS_BACKGROUND|TIMEOUT_URGENCY_LEEWAY, us_to_wait,
+ wq_reduce_pool_window_usecs, NSEC_PER_USEC);
+
+ PTHREAD_TRACE1(TRACE_wq_thread_park | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(thread));
+}
- curtime = mach_absolute_time();
+static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer){
+ if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
+ /*
+ * we need 1 or more constrained threads to return to the kernel before
+ * we can dispatch additional work
+ */
+ return FALSE;
+ }
- thactive_count += wq->wq_thactive_count[priclass];
+ uint32_t busycount = 0;
+ uint32_t thactive_count = wq->wq_thactive_count[at_priclass];
- if (wq->wq_thscheduled_count[priclass]) {
- if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[priclass])) {
+ // Has our most recently blocked thread blocked recently enough that we
+ // should still consider it busy?
+ // XXX should this be wq->wq_thscheduled_count[at_priclass] > thactive_count ?
+ if (wq->wq_thscheduled_count[at_priclass]) {
+ if (wq_thread_is_busy(mach_absolute_time(), &wq->wq_lastblocked_ts[at_priclass])) {
busycount++;
}
}
- if (thread != THREAD_NULL) {
- if (tl->th_priority == priclass) {
- /*
- * dont't count this thread as currently active
- */
- thactive_count--;
- }
+ if (my_priclass < WORKQUEUE_NUM_BUCKETS && my_priclass == at_priclass){
+ /*
+		 * don't count this thread as currently active
+ */
+ thactive_count--;
}
+
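+	/* e.g. with wq_max_concurrency of 4, three active threads in the requested
+	 * class plus one recently-blocked "busy" thread is already enough to refuse
+	 * another constrained thread. */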
if (thactive_count + busycount >= wq->wq_max_concurrency) {
- if (busycount) {
+ if (busycount && start_timer) {
/*
* we found at least 1 thread in the
* 'busy' state... make sure we start
* to kick off the timer... we need to
* start it now...
*/
- WQ_TIMER_NEEDED(wq, start_timer);
+ WQ_TIMER_NEEDED(wq, *start_timer);
}
- PTHREAD_TRACE(TRACE_wq_overcommitted|DBG_FUNC_NONE, wq, (start_timer ? 1<<7 : 0) | pthread_priority_from_class_index(priclass), thactive_count, busycount, 0);
+ PTHREAD_TRACE(TRACE_wq_overcommitted|DBG_FUNC_NONE, wq, (start_timer ? 1<<7 : 0) | pthread_priority_from_class_index(at_priclass), thactive_count, busycount, 0);
- if ((th_to_park = thread) == THREAD_NULL) {
- goto out_of_work;
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static struct threadlist *pop_from_thidlelist(struct workqueue *wq, uint32_t priclass, int *upcall_flags, int *wake_thread){
+ struct threadlist *tl = TAILQ_FIRST(&wq->wq_thidlelist);
+ TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
+ wq->wq_thidlecount--;
+
+ TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
+
+ if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
+ tl->th_flags &= ~TH_LIST_SUSPENDED;
+ *upcall_flags &= ~WQ_FLAG_THREAD_REUSE;
+
+ } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
+ tl->th_flags &= ~TH_LIST_BLOCKED;
+ *wake_thread = 1;
+ }
+ tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
+
+ wq->wq_threads_scheduled++;
+ wq->wq_thscheduled_count[priclass]++;
+ OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
+
+ return tl;
+}
+
+static void
+reset_to_priority(struct threadlist *tl, pthread_priority_t pri){
+ kern_return_t ret;
+ thread_t th = tl->th_thread;
+
+ if (tl->th_flags & TH_LIST_EVENT_MGR_SCHED_PRI){
+ thread_precedence_policy_data_t precedinfo = {
+ .importance = 0
+ };
+ ret = pthread_kern->thread_policy_set_internal(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
+ assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
+ tl->th_flags &= ~TH_LIST_EVENT_MGR_SCHED_PRI;
+ }
+
+ thread_qos_policy_data_t qosinfo = {
+ .qos_tier = pthread_priority_get_qos_class(pri),
+ .tier_importance = 0
+ };
+ ret = pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qosinfo, THREAD_QOS_POLICY_COUNT);
+ assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
+}
+
+static void
+reset_to_schedpri(struct threadlist *tl, pthread_priority_t pri){
+ kern_return_t ret;
+ thread_t th = tl->th_thread;
+
+ thread_qos_policy_data_t qosinfo = {
+ .qos_tier = THREAD_QOS_UNSPECIFIED,
+ .tier_importance = 0
+ };
+ ret = pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qosinfo, THREAD_QOS_POLICY_COUNT);
+ assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
+
+ thread_precedence_policy_data_t precedinfo = {
+ .importance = ((pri & (~_PTHREAD_PRIORITY_FLAGS_MASK)) - BASEPRI_DEFAULT)
+ };
+ ret = pthread_kern->thread_policy_set_internal(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
+ assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
+
+ tl->th_flags |= TH_LIST_EVENT_MGR_SCHED_PRI;
+}
+
+/**
+ * grabs a thread for a request
+ *
+ * - called with the workqueue lock held...
+ * - responsible for dropping it in all cases
+ * - if provided mode is for overcommit, doesn't consume a reqcount
+ *
+ */
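+/*
+ * Callers pass RUN_NEXTREQ_DEFAULT, _OVERCOMMIT or _UNCONSTRAINED (see the
+ * asserts below); the _DEFERRED_OVERCOMMIT and _EVENT_MANAGER modes are
+ * selected internally once the queued requests have been examined.
+ */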
+static boolean_t
+workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t thread,
+ enum run_nextreq_mode mode, pthread_priority_t oc_prio)
+{
+ thread_t th_to_run = THREAD_NULL;
+ int wake_thread = 0;
+ int upcall_flags = WQ_FLAG_THREAD_REUSE;
+ uint32_t priclass;
+ struct threadlist *tl = NULL;
+ struct uthread *uth = NULL;
+ boolean_t start_timer = FALSE;
+
+ // valid modes to call this function with
+ assert(mode == RUN_NEXTREQ_DEFAULT || mode == RUN_NEXTREQ_OVERCOMMIT || mode == RUN_NEXTREQ_UNCONSTRAINED);
+ // may only have a priority if in OVERCOMMIT mode
+ assert(mode == RUN_NEXTREQ_OVERCOMMIT || oc_prio == 0);
+ // thread == thread_null means "please spin up a new workqueue thread, we can't reuse this"
+ // thread != thread_null is thread reuse, and must be the current thread
+ assert(thread == THREAD_NULL || thread == current_thread());
+
+ PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_reqcount, 0);
+
+ if (thread != THREAD_NULL) {
+ uth = pthread_kern->get_bsdthread_info(thread);
+
+ if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL) {
+ panic("wq thread with no threadlist");
}
+ }
- goto parkit;
+ /*
+ * from here until we drop the workq lock
+ * we can't be pre-empted since we hold
+ * the lock in spin mode... this is important
+ * since we have to independently update the priority that
+	 * the thread is associated with and the priority based
+	 * counters that "workqueue_callback" also changes and bases
+	 * decisions on.
+ */
+
+ if (mode == RUN_NEXTREQ_OVERCOMMIT) {
+ priclass = pthread_priority_get_class_index(oc_prio);
+ upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
+ } else if (wq->wq_reqcount == 0){
+ // no work to do. we'll check again when new work arrives.
+ goto done;
+ } else if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
+ ((wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0) ||
+ (thread != THREAD_NULL && tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET))){
+ // There's an event manager request and either:
+ // - no event manager currently running
+ // - we are re-using the event manager
+ mode = RUN_NEXTREQ_EVENT_MANAGER;
+ priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
+ upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
+ if (wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET])
+ upcall_flags |= WQ_FLAG_THREAD_KEVENT;
+ } else {
+ // Find highest priority and check for special request types
+ for (priclass = 0; priclass < WORKQUEUE_EVENT_MANAGER_BUCKET; priclass++) {
+ if (wq->wq_requests[priclass])
+ break;
+ }
+ if (priclass == WORKQUEUE_EVENT_MANAGER_BUCKET){
+			// the only remaining request must be the event manager's (the loop
+			// above doesn't scan its bucket), but we couldn't handle it since
+			// there's already an event manager running, so we fell into this case
+ assert(wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
+ wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
+ wq->wq_reqcount == 1);
+ goto done;
+ }
+
+ if (wq->wq_kevent_ocrequests[priclass]){
+ mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
+ upcall_flags |= WQ_FLAG_THREAD_KEVENT;
+ upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
+ } else if (wq->wq_ocrequests[priclass]){
+ mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
+ upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
+ } else if (wq->wq_kevent_requests[priclass]){
+ upcall_flags |= WQ_FLAG_THREAD_KEVENT;
+ }
+ }
+
+ if (mode == RUN_NEXTREQ_DEFAULT /* non-overcommit */){
+ uint32_t my_priclass = (thread != THREAD_NULL) ? tl->th_priority : WORKQUEUE_NUM_BUCKETS;
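+ /*
+ * Sketch of the admission policy (an assumption -- may_start_constrained_thread()
+ * is authoritative): a new constrained thread is admitted only while
+ * wq_constrained_threads_scheduled stays below wq_max_constrained_threads and
+ * the threads already active at this and higher priority classes leave some
+ * concurrency headroom; otherwise the callee can ask us to arm the delayed
+ * thread-add timer by setting *start_timer.
+ */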
+ if (may_start_constrained_thread(wq, priclass, my_priclass, &start_timer) == FALSE){
+ // per policy, we won't start another constrained thread
+ goto done;
+ }
}
if (thread != THREAD_NULL) {
* we pick up new work for this specific thread.
*/
th_to_run = thread;
- goto pick_up_work;
- }
-
-grab_idle_thread:
- if (wq->wq_thidlecount == 0) {
+ } else if (wq->wq_thidlecount == 0) {
/*
* we have no additional threads waiting to pick up
* work, however, there is additional work to do.
PTHREAD_TRACE(TRACE_wq_stalled, wq, wq->wq_nthreads, start_timer, 0, 0);
- goto no_thread_to_run;
+ goto done;
+ } else {
+ // there is both work available and an idle thread, so activate a thread
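+ // (pop_from_thidlelist is presumed, from the inline code it replaces, to
+ // unlink tl from wq_thidlelist, move it onto wq_thrunlist, clear
+ // TH_LIST_SUSPENDED/TH_LIST_BLOCKED -- requesting a wakeup via *wake_thread
+ // when the thread was blocked -- set TH_LIST_RUNNING|TH_LIST_BUSY, adjust
+ // *upcall_flags, and bump the scheduled/active counters for priclass)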
+ tl = pop_from_thidlelist(wq, priclass, &upcall_flags, &wake_thread);
+ th_to_run = tl->th_thread;
}
- /*
- * we already know there is both work available
- * and an idle thread, so activate a thread and then
- * fall into the code that pulls a new work request...
- */
- tl = TAILQ_FIRST(&wq->wq_thidlelist);
- TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
- wq->wq_thidlecount--;
-
- TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
-
- if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
- tl->th_flags &= ~TH_LIST_SUSPENDED;
- reuse_thread = 0;
-
- } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
- tl->th_flags &= ~TH_LIST_BLOCKED;
- wake_thread = 1;
- }
- tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
+ // Adjust counters and thread flags, i.e. consume the request
+ // TODO: It would be lovely if OVERCOMMIT consumed reqcount
+ switch (mode) {
+ case RUN_NEXTREQ_DEFAULT:
+ case RUN_NEXTREQ_UNCONSTRAINED:
+ wq->wq_reqcount--;
+ wq->wq_requests[priclass]--;
- wq->wq_threads_scheduled++;
- wq->wq_thscheduled_count[priclass]++;
- OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
+ if (mode == RUN_NEXTREQ_DEFAULT){
+ if (!(tl->th_flags & TH_LIST_CONSTRAINED)) {
+ wq->wq_constrained_threads_scheduled++;
+ tl->th_flags |= TH_LIST_CONSTRAINED;
+ }
+ } else if (mode == RUN_NEXTREQ_UNCONSTRAINED){
+ if (tl->th_flags & TH_LIST_CONSTRAINED) {
+ // XXX: Why aren't we clearing WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT here?
+ wq->wq_constrained_threads_scheduled--;
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
+ }
+ }
+ if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
+ wq->wq_kevent_requests[priclass]--;
+ }
+ break;
- adjust_counters = FALSE;
- th_to_run = tl->th_thread;
+ case RUN_NEXTREQ_EVENT_MANAGER:
+ wq->wq_reqcount--;
+ wq->wq_requests[priclass]--;
-pick_up_work:
- if (!overcommit && !force_oc) {
- wq->wq_reqcount--;
- wq->wq_requests[priclass]--;
+ if (tl->th_flags & TH_LIST_CONSTRAINED) {
+ wq->wq_constrained_threads_scheduled--;
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
+ }
+ if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
+ wq->wq_kevent_requests[priclass]--;
+ }
+ break;
- if ( !(tl->th_flags & TH_LIST_CONSTRAINED)) {
- wq->wq_constrained_threads_scheduled++;
- tl->th_flags |= TH_LIST_CONSTRAINED;
- }
- } else {
- if (tl->th_flags & TH_LIST_CONSTRAINED) {
- wq->wq_constrained_threads_scheduled--;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
- }
+ case RUN_NEXTREQ_DEFERRED_OVERCOMMIT:
+ wq->wq_reqcount--;
+ wq->wq_requests[priclass]--;
+ if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
+ wq->wq_kevent_ocrequests[priclass]--;
+ } else {
+ wq->wq_ocrequests[priclass]--;
+ }
+ /* FALLTHROUGH */
+ case RUN_NEXTREQ_OVERCOMMIT:
+ if (tl->th_flags & TH_LIST_CONSTRAINED) {
+ wq->wq_constrained_threads_scheduled--;
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
+ }
+ break;
}
- orig_class = tl->th_priority;
+ // Confirm we've maintained our counter invariants
+ assert(wq->wq_requests[priclass] < UINT16_MAX);
+ assert(wq->wq_ocrequests[priclass] < UINT16_MAX);
+ assert(wq->wq_kevent_requests[priclass] < UINT16_MAX);
+ assert(wq->wq_kevent_ocrequests[priclass] < UINT16_MAX);
+ assert(wq->wq_ocrequests[priclass] + wq->wq_kevent_requests[priclass] +
+ wq->wq_kevent_ocrequests[priclass] <=
+ wq->wq_requests[priclass]);
+
+ uint32_t orig_class = tl->th_priority;
tl->th_priority = (uint8_t)priclass;
- if (adjust_counters && (orig_class != priclass)) {
+ if ((thread != THREAD_NULL) && (orig_class != priclass)) {
/*
* we need to adjust these counters based on this
* thread's new disposition w/r to priority
workqueue_unlock(p);
- if (orig_class != priclass) {
- pthread_priority_t pri = pthread_priority_from_class_index(priclass);
-
- thread_qos_policy_data_t qosinfo;
-
- /* Set the QoS tier on the thread, along with the ceiling of max importance for this class. */
- qosinfo.qos_tier = pthread_priority_get_qos_class(pri);
- qosinfo.tier_importance = 0;
-
- PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_START, wq, thread_tid(tl->th_thread), pthread_priority_from_class_index(orig_class), 0, 0);
-
- /* All the previous implementation here now boils down to setting the QoS policy on the thread. */
- pthread_kern->thread_policy_set_internal(th_to_run, THREAD_QOS_POLICY, (thread_policy_t)&qosinfo, THREAD_QOS_POLICY_COUNT);
+ pthread_priority_t outgoing_priority;
+ if (mode == RUN_NEXTREQ_EVENT_MANAGER){
+ outgoing_priority = wq->wq_event_manager_priority;
+ } else {
+ outgoing_priority = pthread_priority_from_class_index(priclass);
+ }
- PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_END, wq, thread_tid(tl->th_thread), pthread_priority_from_class_index(priclass), qosinfo.qos_tier, 0);
+ PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_START, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
+ if (outgoing_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
+ reset_to_schedpri(tl, outgoing_priority & (~_PTHREAD_PRIORITY_FLAGS_MASK));
+ } else if (orig_class != priclass) {
+ reset_to_priority(tl, outgoing_priority);
}
+ PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_END, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
/*
* if current thread is reused for work request, does not return via unix_syscall
*/
- wq_runreq(p, overcommit, pthread_priority_from_class_index(priclass), th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run));
+ wq_runreq(p, outgoing_priority, th_to_run, tl, upcall_flags, wake_thread, (thread == th_to_run));
- PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, wq, thread_tid(th_to_run), overcommit, 1, 0);
+ PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, wq, thread_tid(th_to_run), mode == RUN_NEXTREQ_OVERCOMMIT, 1, 0);
return (TRUE);
-out_of_work:
- /*
- * we have no work to do or we are fully booked
- * w/r to running threads...
- */
-no_thread_to_run:
- workqueue_unlock(p);
-
- if (start_timer)
- workqueue_interval_timer_start(wq);
-
- PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, wq, thread_tid(thread), start_timer, 2, 0);
-
- return (FALSE);
-
-parkit:
- /*
- * this is a workqueue thread with no more
- * work to do... park it for now
- */
- TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
- tl->th_flags &= ~TH_LIST_RUNNING;
-
- tl->th_flags |= TH_LIST_BLOCKED;
- TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
-
- pthread_kern->thread_sched_call(th_to_park, NULL);
-
- OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
- wq->wq_thscheduled_count[tl->th_priority]--;
- wq->wq_threads_scheduled--;
-
- if (tl->th_flags & TH_LIST_CONSTRAINED) {
- wq->wq_constrained_threads_scheduled--;
- wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
+done:
+ if (thread != THREAD_NULL){
+ parkit(wq, tl, thread);
}
- if (wq->wq_thidlecount < 100)
- us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
- else
- us_to_wait = wq_reduce_pool_window_usecs / 100;
-
- wq->wq_thidlecount++;
- wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
-
- assert_wait_timeout_with_leeway((caddr_t)tl, (THREAD_INTERRUPTIBLE),
- TIMEOUT_URGENCY_SYS_BACKGROUND|TIMEOUT_URGENCY_LEEWAY, us_to_wait,
- wq_reduce_pool_window_usecs, NSEC_PER_USEC);
workqueue_unlock(p);
if (start_timer)
workqueue_interval_timer_start(wq);
- PTHREAD_TRACE1(TRACE_wq_thread_park | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(th_to_park));
- PTHREAD_TRACE(TRACE_wq_run_nextitem | DBG_FUNC_END, wq, thread_tid(thread), 0, 3, 0);
+ PTHREAD_TRACE(TRACE_wq_run_nextitem | DBG_FUNC_END, wq, thread_tid(thread), start_timer, 3, 0);
- thread_block((thread_continue_t)wq_unpark_continue);
- /* NOT REACHED */
+ if (thread != THREAD_NULL){
+ thread_block((thread_continue_t)wq_unpark_continue);
+ /* NOT REACHED */
+ }
return (FALSE);
}
-
+/**
+ * Continuation routine for a newly created workqueue thread; runs once the
+ * thread is resumed for the first time.
+ */
static void
wq_unsuspend_continue(void)
{
pthread_kern->thread_bootstrap_return();
}
-
+/**
+ * Continuation routine for a parked idle thread. Runs when the thread wakes
+ * up, either because new work arrived or because the idle timeout popped.
+ */
static void
wq_unpark_continue(void)
{
- struct uthread *uth = NULL;
+ struct uthread *uth;
struct threadlist *tl;
- thread_t th_to_unpark;
- proc_t p;
-
- th_to_unpark = current_thread();
- uth = pthread_kern->get_bsdthread_info(th_to_unpark);
- if (uth != NULL) {
- if ((tl = pthread_kern->uthread_get_threadlist(uth)) != NULL) {
+ thread_t th_to_unpark = current_thread();
- if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
- /*
- * a normal wakeup of this thread occurred... no need
- * for any synchronization with the timer and wq_runreq
- */
-normal_return_to_user:
- pthread_kern->thread_sched_call(th_to_unpark, workqueue_callback);
+ if ((uth = pthread_kern->get_bsdthread_info(th_to_unpark)) == NULL)
+ goto done;
+ if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL)
+ goto done;
- PTHREAD_TRACE(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
-
- pthread_kern->thread_exception_return();
- }
- p = current_proc();
+ /*
+ * check if a normal wakeup of this thread occurred... if so, there's no need
+ * for any synchronization with the timer and wq_runreq so we just skip all this.
+ */
+ if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) != TH_LIST_RUNNING) {
+ proc_t p = current_proc();
- workqueue_lock_spin(p);
+ workqueue_lock_spin(p);
- if ( !(tl->th_flags & TH_LIST_RUNNING)) {
- /*
- * the timer popped us out and we've not
- * been moved off of the idle list
- * so we should now self-destruct
- *
- * workqueue_removethread consumes the lock
- */
- workqueue_removethread(tl, 0);
- pthread_kern->thread_exception_return();
- }
+ if ( !(tl->th_flags & TH_LIST_RUNNING)) {
/*
- * the timer woke us up, but we have already
- * started to make this a runnable thread,
- * but have not yet finished that process...
- * so wait for the normal wakeup
+ * the timer popped us out and we've not
+ * been moved off of the idle list
+ * so we should now self-destruct
+ *
+ * workqueue_removethread consumes the lock
*/
- while ((tl->th_flags & TH_LIST_BUSY)) {
-
- assert_wait((caddr_t)tl, (THREAD_UNINT));
+ workqueue_removethread(tl, 0);
+ pthread_kern->unix_syscall_return(0);
+ }
- workqueue_unlock(p);
+ /*
+ * the timer woke us up, but we have already
+ * started to make this a runnable thread,
+ * but have not yet finished that process...
+ * so wait for the normal wakeup
+ */
+ while ((tl->th_flags & TH_LIST_BUSY)) {
- thread_block(THREAD_CONTINUE_NULL);
+ assert_wait((caddr_t)tl, (THREAD_UNINT));
- workqueue_lock_spin(p);
- }
- /*
- * we have finished setting up the thread's context
- * now we can return as if we got a normal wakeup
- */
workqueue_unlock(p);
- goto normal_return_to_user;
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
}
+
+ /*
+ * we have finished setting up the thread's context
+ * now we can return as if we got a normal wakeup
+ */
+ workqueue_unlock(p);
}
+
+ pthread_kern->thread_sched_call(th_to_unpark, workqueue_callback);
+
+ // FIXME: What's this?
+ PTHREAD_TRACE(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
+
+done:
+
+ // XXX should be using unix_syscall_return(EJUSTRETURN)
pthread_kern->thread_exception_return();
}
static void
-wq_runreq(proc_t p, boolean_t overcommit, pthread_priority_t priority, thread_t th, struct threadlist *tl,
- int reuse_thread, int wake_thread, int return_directly)
+wq_runreq(proc_t p, pthread_priority_t priority, thread_t th, struct threadlist *tl,
+ int flags, int wake_thread, int return_directly)
{
int ret = 0;
boolean_t need_resume = FALSE;
- PTHREAD_TRACE1(TRACE_wq_runitem | DBG_FUNC_START, tl->th_workq, overcommit, priority, thread_tid(current_thread()), thread_tid(th));
+ PTHREAD_TRACE1(TRACE_wq_runitem | DBG_FUNC_START, tl->th_workq, flags, priority, thread_tid(current_thread()), thread_tid(th));
- ret = _setup_wqthread(p, th, overcommit, priority, reuse_thread, tl);
+ ret = _setup_wqthread(p, th, priority, flags, tl);
if (ret != 0)
panic("setup_wqthread failed %x\n", ret);
if (return_directly) {
PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
+ // XXX should be using unix_syscall_return(EJUSTRETURN)
pthread_kern->thread_exception_return();
panic("wq_runreq: thread_exception_return returned ...\n");
}
workqueue_unlock(p);
} else {
- PTHREAD_TRACE1(TRACE_wq_thread_suspend | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
+ PTHREAD_TRACE1(TRACE_wq_thread_suspend | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
workqueue_lock_spin(p);
}
}
+#define KEVENT_LIST_LEN 16
+#define KEVENT_DATA_SIZE (32 * 1024)
+/**
+ * configures initial thread stack/registers to jump into:
+ * _pthread_wqthread(pthread_t self, mach_port_t kport, void *stackaddr, void *keventlist, int flags, int nkevents);
+ * to get there we jump through assembly stubs in pthread_asm.s. Those
+ * routines set up a stack frame, using the current stack pointer, and marshal
+ * arguments from registers to the stack as required by the ABI.
+ *
+ * One odd thing we do here is to start the pthread_t 4k below what would be the
+ * top of the stack otherwise. This is because usually only the first 4k of the
+ * pthread_t will be used and so we want to put it on the same 16k page as the
+ * top of the stack to save memory.
+ *
+ * When we are done the stack will look like:
+ * |-----------| th_stackaddr + th_allocsize
+ * |pthread_t | th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize + PTHREAD_T_OFFSET
+ * |kevent list| optionally - at most KEVENT_LIST_LEN events
+ * |kevent data| optionally - at most KEVENT_DATA_SIZE bytes
+ * |stack gap | bottom aligned to 16 bytes, and at least as big as stack_gap_min
+ * | STACK |
+ * | ⇓ |
+ * | |
+ * |guard page | guardsize
+ * |-----------| th_stackaddr
+ */
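+/*
+ * Worked example of the layout above (illustrative only -- assumes a 4 KiB
+ * guard page, a 512 KiB default stack, and, for simplicity, PTHREAD_T_OFFSET
+ * == 0; the real values come from the map's page size and kern_internal.h):
+ *
+ *   pthread_self_addr = th_stackaddr + 512K + 4K         // top of usable stack
+ *   kevent_list       = pthread_self_addr - KEVENT_LIST_LEN * sizeof(struct kevent_qos_s)
+ *   kevent_data_buf   = kevent_list - KEVENT_DATA_SIZE   // 32 KiB below the list
+ *   stack_top_addr    = (pthread_self_addr - stack_gap_min) & -stack_align_min,
+ *                       later lowered below the kevent area if kevents are delivered
+ *   stack_bottom_addr = th_stackaddr + 4K                // just above the guard page
+ */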
int
-_setup_wqthread(proc_t p, thread_t th, boolean_t overcommit, pthread_priority_t priority, int reuse_thread, struct threadlist *tl)
+_setup_wqthread(proc_t p, thread_t th, pthread_priority_t priority, int flags, struct threadlist *tl)
{
- uint32_t flags = reuse_thread | WQ_FLAG_THREAD_NEWSPI;
- mach_vm_size_t guardsize = vm_map_page_size(tl->th_workq->wq_map);
int error = 0;
- if (overcommit) {
- flags |= WQ_FLAG_THREAD_OVERCOMMIT;
- }
+ const vm_size_t guardsize = vm_map_page_size(tl->th_workq->wq_map);
+ const vm_size_t stack_gap_min = (proc_is64bit(p) == 0) ? C_32_STK_ALIGN : C_64_REDZONE_LEN;
+ const vm_size_t stack_align_min = (proc_is64bit(p) == 0) ? C_32_STK_ALIGN : C_64_STK_ALIGN;
+
+ user_addr_t pthread_self_addr = (user_addr_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize + PTHREAD_T_OFFSET);
+ user_addr_t stack_top_addr = (user_addr_t)((pthread_self_addr - stack_gap_min) & -stack_align_min);
+ user_addr_t stack_bottom_addr = (user_addr_t)(tl->th_stackaddr + guardsize);
/* Put the QoS class value into the lower bits of the reuse_thread register, this is where
* the thread priority used to be stored anyway.
*/
flags |= (_pthread_priority_get_qos_newest(priority) & WQ_FLAG_THREAD_PRIOMASK);
-#if defined(__i386__) || defined(__x86_64__)
- int isLP64 = proc_is64bit(p);
+ flags |= WQ_FLAG_THREAD_NEWSPI;
- /*
- * Set up i386 registers & function call.
- */
- if (isLP64 == 0) {
- x86_thread_state32_t state;
- x86_thread_state32_t *ts = &state;
-
- ts->eip = (unsigned int)pthread_kern->proc_get_wqthread(p);
- ts->eax = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize);
- ts->ebx = (unsigned int)tl->th_thport;
- ts->ecx = (unsigned int)(tl->th_stackaddr + guardsize);
- ts->edx = (unsigned int)0;
- ts->edi = (unsigned int)flags;
- ts->esi = (unsigned int)0;
- /*
- * set stack pointer
- */
- ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize) - C_32_STK_ALIGN));
+ user_addr_t kevent_list = NULL;
+ int kevent_count = 0;
+ if (flags & WQ_FLAG_THREAD_KEVENT){
+ kevent_list = pthread_self_addr - KEVENT_LIST_LEN * sizeof(struct kevent_qos_s);
+ kevent_count = KEVENT_LIST_LEN;
- (void)pthread_kern->thread_set_wq_state32(th, (thread_state_t)ts);
+ user_addr_t kevent_data_buf = kevent_list - KEVENT_DATA_SIZE;
+ user_size_t kevent_data_available = KEVENT_DATA_SIZE;
- } else {
- x86_thread_state64_t state64;
- x86_thread_state64_t *ts64 = &state64;
+ int32_t events_out = 0;
- ts64->rip = (uint64_t)pthread_kern->proc_get_wqthread(p);
- ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize);
- ts64->rsi = (uint64_t)(tl->th_thport);
- ts64->rdx = (uint64_t)(tl->th_stackaddr + guardsize);
- ts64->rcx = (uint64_t)0;
- ts64->r8 = (uint64_t)flags;
- ts64->r9 = (uint64_t)0;
+ int ret = kevent_qos_internal(p, -1, NULL, 0, kevent_list, kevent_count,
+ kevent_data_buf, &kevent_data_available,
+ KEVENT_FLAG_WORKQ | KEVENT_FLAG_STACK_DATA | KEVENT_FLAG_STACK_EVENTS | KEVENT_FLAG_IMMEDIATE,
+ &events_out);
- /*
- * set stack pointer aligned to 16 byte boundary
- */
- ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize) - C_64_REDZONE_LEN);
+ // squash any errors into just empty output on non-debug builds
+ assert(ret == KERN_SUCCESS && events_out != -1);
+ if (ret != KERN_SUCCESS || events_out == -1){
+ events_out = 0;
+ kevent_data_available = KEVENT_DATA_SIZE;
+ }
+
+ // We shouldn't get data out if there aren't events available
+ assert(events_out != 0 || kevent_data_available == KEVENT_DATA_SIZE);
+
+ if (events_out >= 0){
+ kevent_count = events_out;
+ kevent_list = pthread_self_addr - kevent_count * sizeof(struct kevent_qos_s);
+
+ if (kevent_data_available == KEVENT_DATA_SIZE){
+ stack_top_addr = (kevent_list - stack_gap_min) & -stack_align_min;
+ } else {
+ stack_top_addr = (kevent_data_buf + kevent_data_available - stack_gap_min) & -stack_align_min;
+ }
+ } else {
+ kevent_list = NULL;
+ kevent_count = 0;
+ }
+ }
+
+#if defined(__i386__) || defined(__x86_64__)
+ int isLP64 = proc_is64bit(p);
- error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)ts64);
+ if (isLP64 == 0) {
+ x86_thread_state32_t state = {
+ .eip = (unsigned int)pthread_kern->proc_get_wqthread(p),
+ .eax = /* arg0 */ (unsigned int)pthread_self_addr,
+ .ebx = /* arg1 */ (unsigned int)tl->th_thport,
+ .ecx = /* arg2 */ (unsigned int)stack_bottom_addr,
+ .edx = /* arg3 */ (unsigned int)kevent_list,
+ .edi = /* arg4 */ (unsigned int)flags,
+ .esi = /* arg5 */ (unsigned int)kevent_count,
+
+ .esp = (int)((vm_offset_t)stack_top_addr),
+ };
+
+ (void)pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
+ } else {
+ x86_thread_state64_t state64 = {
+ // x86-64 already passes all the arguments in registers, so we just put them in their final place here
+ .rip = (uint64_t)pthread_kern->proc_get_wqthread(p),
+ .rdi = (uint64_t)pthread_self_addr,
+ .rsi = (uint64_t)tl->th_thport,
+ .rdx = (uint64_t)stack_bottom_addr,
+ .rcx = (uint64_t)kevent_list,
+ .r8 = (uint64_t)flags,
+ .r9 = (uint64_t)kevent_count,
+
+ .rsp = (uint64_t)(stack_top_addr)
+ };
+
+ error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)&state64);
if (error != KERN_SUCCESS) {
error = EINVAL;
}
return error;
}
+#if DEBUG
+static int wq_kevent_test SYSCTL_HANDLER_ARGS {
+ //(struct sysctl_oid *oidp, void *arg1, int arg2, struct sysctl_req *req)
+#pragma unused(oidp, arg1, arg2)
+ int error;
+ struct workq_reqthreads_req_s requests[64] = {};
+
+ if (req->newlen > sizeof(requests) || req->newlen < sizeof(struct workq_reqthreads_req_s))
+ return EINVAL;
+
+ error = copyin(req->newptr, requests, req->newlen);
+ if (error) return error;
+
+ _workq_reqthreads(req->p, (int)(req->newlen / sizeof(struct workq_reqthreads_req_s)), requests);
+
+ return 0;
+}
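+/*
+ * Hypothetical userspace exercise of this DEBUG-only handler (a sketch; the
+ * oid registered below should surface as "debug.wq_kevent_test", and the
+ * field names are assumed from the shims header):
+ *
+ *   struct workq_reqthreads_req_s reqs[1] = {
+ *       { .priority = pri, .count = 1 },   // pri: an encoded pthread_priority_t
+ *   };
+ *   sysctlbyname("debug.wq_kevent_test", NULL, NULL, reqs, sizeof(reqs));
+ */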
+#endif // DEBUG
+
+#pragma mark - Misc
+
int
_fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
sysctl_register_oid(&sysctl__kern_wq_max_threads);
sysctl_register_oid(&sysctl__kern_wq_max_constrained_threads);
sysctl_register_oid(&sysctl__kern_pthread_debug_tracing);
+
+#if DEBUG
+ sysctl_register_oid(&sysctl__kern_wq_max_concurrency);
+ sysctl_register_oid(&sysctl__debug_wq_kevent_test);
+#endif
+
+ wq_max_concurrency = pthread_kern->ml_get_max_cpus();
+
}