* Copyright (c) 2000-2012 Apple Inc. All rights reserved.
*
* @APPLE_OSREFERENCE_LICENSE_HEADER_START@
- *
+ *
* This file contains Original Code and/or Modifications of Original Code
* as defined in and that are subject to the Apple Public Source License
* Version 2.0 (the 'License'). You may not use this file except in
* unlawful or unlicensed copies of an Apple operating system, or to
* circumvent, violate, or enable the circumvention or violation of, any
* terms of an Apple operating system software license agreement.
- *
+ *
* Please obtain a copy of the License at
* http://www.opensource.apple.com/apsl/ and read it before using this file.
- *
+ *
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
* Please see the License for the specific language governing rights and
* limitations under the License.
- *
+ *
* @APPLE_OSREFERENCE_LICENSE_HEADER_END@
*/
/* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */
#include <kern/clock.h>
#include <mach/kern_return.h>
#include <kern/thread.h>
-#include <kern/sched_prim.h>
-#include <kern/kalloc.h>
+#include <kern/zalloc.h>
#include <kern/sched_prim.h> /* for thread_exception_return */
#include <kern/processor.h>
#include <kern/assert.h>
lck_grp_t *pthread_lck_grp;
lck_attr_t *pthread_lck_attr;
+zone_t pthread_zone_workqueue;
+zone_t pthread_zone_threadlist;
+zone_t pthread_zone_threadreq;
+
extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
extern void workqueue_thread_yielded(void);
-enum run_nextreq_mode {
- RUN_NEXTREQ_DEFAULT,
- RUN_NEXTREQ_DEFAULT_KEVENT,
- RUN_NEXTREQ_OVERCOMMIT,
- RUN_NEXTREQ_OVERCOMMIT_KEVENT,
- RUN_NEXTREQ_DEFERRED_OVERCOMMIT,
- RUN_NEXTREQ_UNCONSTRAINED,
- RUN_NEXTREQ_EVENT_MANAGER,
- RUN_NEXTREQ_ADD_TIMER
-};
-static thread_t workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t th,
- enum run_nextreq_mode mode, pthread_priority_t prio,
- bool kevent_bind_via_return);
-
-static boolean_t workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority);
-
-static void wq_runreq(proc_t p, thread_t th, struct workqueue *wq,
- struct threadlist *tl, boolean_t return_directly, boolean_t deferred_kevent);
-
-static void _setup_wqthread(proc_t p, thread_t th, struct workqueue *wq, struct threadlist *tl, bool first_use);
+#define WQ_SETUP_FIRST_USE 1
+#define WQ_SETUP_CLEAR_VOUCHER 2
+static void _setup_wqthread(proc_t p, thread_t th, struct workqueue *wq,
+ struct threadlist *tl, int flags);
static void reset_priority(struct threadlist *tl, pthread_priority_t pri);
static pthread_priority_t pthread_priority_from_wq_class_index(struct workqueue *wq, int index);
static void wq_unpark_continue(void* ptr, wait_result_t wait_result) __dead2;
-static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit);
-
+static bool workqueue_addnewthread(proc_t p, struct workqueue *wq);
static void workqueue_removethread(struct threadlist *tl, bool fromexit, bool first_use);
static void workqueue_lock_spin(struct workqueue *);
static void workqueue_unlock(struct workqueue *);
-static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer);
+#define WQ_RUN_TR_THROTTLED 0
+#define WQ_RUN_TR_THREAD_NEEDED 1
+#define WQ_RUN_TR_THREAD_STARTED 2
+#define WQ_RUN_TR_EXITING 3
+static int workqueue_run_threadreq_and_unlock(proc_t p, struct workqueue *wq,
+ struct threadlist *tl, struct threadreq *req, bool may_add_new_thread);
+
+static bool may_start_constrained_thread(struct workqueue *wq,
+ uint32_t at_priclass, struct threadlist *tl, bool may_start_timer);
static mach_vm_offset_t stack_addr_hint(proc_t p, vm_map_t vmap);
+static boolean_t wq_thread_is_busy(uint64_t cur_ts,
+ _Atomic uint64_t *lastblocked_tsp);
int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
#define PTHREAD_T_OFFSET 0
/*
- * Flags filed passed to bsdthread_create and back in pthread_start
+ * Flags field passed to bsdthread_create and back in pthread_start
31 <---------------------------------> 0
_________________________________________
| flags(8) | policy(8) | importance(16) |
#pragma mark sysctls
-uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
-uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
-uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
-uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
-uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
-uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
-uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
-uint32_t wq_max_concurrency = 1; // set to ncpus on load
-
-SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW | CTLFLAG_LOCKED,
- &wq_yielded_threshold, 0, "");
-
-SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
- &wq_yielded_window_usecs, 0, "");
+static uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
+static uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
+static uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
+static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
+static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
+static uint32_t wq_max_concurrency[WORKQUEUE_NUM_BUCKETS + 1]; // set to ncpus on load
SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
&wq_stalled_window_usecs, 0, "");
&wq_max_constrained_threads, 0, "");
#ifdef DEBUG
-SYSCTL_INT(_kern, OID_AUTO, wq_max_concurrency, CTLFLAG_RW | CTLFLAG_LOCKED,
- &wq_max_concurrency, 0, "");
-
static int wq_kevent_test SYSCTL_HANDLER_ARGS;
SYSCTL_PROC(_debug, OID_AUTO, wq_kevent_test, CTLFLAG_MASKED | CTLFLAG_RW | CTLFLAG_LOCKED | CTLFLAG_ANYBODY | CTLTYPE_OPAQUE, NULL, 0, wq_kevent_test, 0, "-");
#endif
SYSCTL_INT(_kern, OID_AUTO, pthread_debug_tracing, CTLFLAG_RW | CTLFLAG_LOCKED,
&pthread_debug_tracing, 0, "")
+/*
+ *       +-----+-----+-----+-----+-----+-----+-----+
+ *       | MT  | BG  | UT  | DE  | IN  | UN  | mgr |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ * | pri |  5  |  4  |  3  |  2  |  1  |  0  |  6  |
+ * | qos |  1  |  2  |  3  |  4  |  5  |  6  |  7  |
+ * +-----+-----+-----+-----+-----+-----+-----+-----+
+ */
+static inline uint32_t
+_wq_bucket_to_thread_qos(int pri)
+{
+ if (pri == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ return WORKQUEUE_EVENT_MANAGER_BUCKET + 1;
+ }
+ return WORKQUEUE_EVENT_MANAGER_BUCKET - pri;
+}
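+/*
+ * Per the table above: _wq_bucket_to_thread_qos(5) == 1 (MT),
+ * _wq_bucket_to_thread_qos(0) == 6 (UN), and the event manager bucket maps
+ * to WORKQUEUE_EVENT_MANAGER_BUCKET + 1, i.e. qos 7.
+ */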
+
+#pragma mark wq_thactive
+
+#if defined(__LP64__)
+// Layout is:
+// 7 * 16 bits for each QoS bucket request count (including manager)
+// 3 bits of best QoS among all pending constrained requests
+// 13 bits of zeroes
+#define WQ_THACTIVE_BUCKET_WIDTH 16
+#define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
+#else
+// Layout is:
+// 6 * 10 bits for each QoS bucket request count (except manager)
+// 1 bit for the manager bucket
+// 3 bits of best QoS among all pending constrained requests
+#define WQ_THACTIVE_BUCKET_WIDTH 10
+#define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
+#endif
+#define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
+#define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
+#define WQ_THACTIVE_NO_PENDING_REQUEST 6
+
+_Static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
+ "Make sure we have space to encode a QoS");
+
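+/*
+ * Given this layout, the active count of a bucket can be read as
+ * (v >> (qos * WQ_THACTIVE_BUCKET_WIDTH)) & WQ_THACTIVE_BUCKET_MASK, and the
+ * best constrained request QoS as (v >> WQ_THACTIVE_QOS_SHIFT); the helpers
+ * below encapsulate these accesses.
+ */
+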
+static inline wq_thactive_t
+_wq_thactive_fetch_and_add(struct workqueue *wq, wq_thactive_t offset)
+{
+#if PTHREAD_INLINE_RMW_ATOMICS || !defined(__LP64__)
+ return atomic_fetch_add_explicit(&wq->wq_thactive, offset,
+ memory_order_relaxed);
+#else
+ return pthread_kern->atomic_fetch_add_128_relaxed(&wq->wq_thactive, offset);
+#endif
+}
+
+static inline wq_thactive_t
+_wq_thactive(struct workqueue *wq)
+{
+#if PTHREAD_INLINE_RMW_ATOMICS || !defined(__LP64__)
+ return atomic_load_explicit(&wq->wq_thactive, memory_order_relaxed);
+#else
+ return pthread_kern->atomic_load_128_relaxed(&wq->wq_thactive);
+#endif
+}
+
+#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
+ ((tha) >> WQ_THACTIVE_QOS_SHIFT)
+
+static inline uint32_t
+_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
+{
+ // Avoid expensive atomic operations: the three bits we're loading are in
+ // a single byte, and always updated under the workqueue lock
+ wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
+ return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
+}
+
+static inline wq_thactive_t
+_wq_thactive_set_best_constrained_req_qos(struct workqueue *wq,
+ uint32_t orig_qos, uint32_t new_qos)
+{
+ wq_thactive_t v;
+ v = (wq_thactive_t)(new_qos - orig_qos) << WQ_THACTIVE_QOS_SHIFT;
+ /*
+ * We can do an atomic add relative to the initial load because updates
+ * to this qos are always serialized under the workqueue lock.
+ */
+ return _wq_thactive_fetch_and_add(wq, v) + v;
+}
+
+static inline wq_thactive_t
+_wq_thactive_offset_for_qos(int qos)
+{
+ return (wq_thactive_t)1 << (qos * WQ_THACTIVE_BUCKET_WIDTH);
+}
+
+static inline wq_thactive_t
+_wq_thactive_inc(struct workqueue *wq, int qos)
+{
+ return _wq_thactive_fetch_and_add(wq, _wq_thactive_offset_for_qos(qos));
+}
+
+static inline wq_thactive_t
+_wq_thactive_dec(struct workqueue *wq, int qos)
+{
+ return _wq_thactive_fetch_and_add(wq, -_wq_thactive_offset_for_qos(qos));
+}
+
+static inline wq_thactive_t
+_wq_thactive_move(struct workqueue *wq, int oldqos, int newqos)
+{
+ return _wq_thactive_fetch_and_add(wq, _wq_thactive_offset_for_qos(newqos) -
+ _wq_thactive_offset_for_qos(oldqos));
+}
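+/*
+ * e.g. when a thread changes buckets, _wq_thactive_move() decrements the old
+ * bucket's count and increments the new one in a single relaxed atomic add.
+ */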
+
+static inline uint32_t
+_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
+ int qos, uint32_t *busycount, uint32_t *max_busycount)
+{
+ uint32_t count = 0, active;
+ uint64_t curtime;
+
+#ifndef __LP64__
+ /*
+ * on 32bits the manager bucket is a single bit and the best constrained
+ * request QoS 3 bits are where the 10 bits of a regular QoS bucket count
+ * would be. Mask them out.
+ */
+ v &= ~(~0ull << WQ_THACTIVE_QOS_SHIFT);
+#endif
+ if (busycount) {
+ curtime = mach_absolute_time();
+ *busycount = 0;
+ }
+ if (max_busycount) {
+ *max_busycount = qos + 1;
+ }
+ for (int i = 0; i <= qos; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
+ active = v & WQ_THACTIVE_BUCKET_MASK;
+ count += active;
+ if (busycount && wq->wq_thscheduled_count[i] > active) {
+ if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
+ /*
+ * We only consider the last blocked thread for a given bucket
+ * as busy because we don't want to take the list lock in each
+ * sched callback. However this is an approximation that could
+ * contribute to thread creation storms.
+ */
+ (*busycount)++;
+ }
+ }
+ }
+ return count;
+}
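+/*
+ * Illustration: with qos == 2, the loop above sums the active counts of
+ * buckets 0..2; a bucket additionally counts as "busy" when more threads are
+ * scheduled there than are active and the last one blocked within
+ * wq_stalled_window_usecs, so at most qos + 1 buckets can be reported busy
+ * (hence *max_busycount).
+ */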
#pragma mark - Process/Thread Setup/Teardown syscalls
stackaddr = SHARED_REGION_BASE_I386 + SHARED_REGION_SIZE_I386 + aslr_offset;
}
#elif defined(__arm__) || defined(__arm64__)
- // vm_map_get_max_aslr_slide_pages ensures 1MB of slide, we do better
- aslr_offset = random() % ((proc64bit ? 4 : 2) * PTH_DEFAULT_STACKSIZE);
- aslr_offset = vm_map_trunc_page_mask((vm_map_offset_t)aslr_offset, vm_map_page_mask(vmap));
- if (proc64bit) {
- // 64 stacks below nanomalloc (see NANOZONE_SIGNATURE)
- stackaddr = 0x170000000 - 64 * PTH_DEFAULT_STACKSIZE - aslr_offset;
+ user_addr_t main_thread_stack_top = 0;
+ if (pthread_kern->proc_get_user_stack) {
+ main_thread_stack_top = pthread_kern->proc_get_user_stack(p);
+ }
+ if (proc64bit && main_thread_stack_top) {
+ // The main thread stack position is randomly slid by xnu (c.f.
+ // load_main() in mach_loader.c), so basing pthread stack allocations
+ // where the main thread stack ends is already ASLRd and doing so
+ // avoids creating a gap in the process address space that may cause
+ // extra PTE memory usage. rdar://problem/33328206
+ stackaddr = vm_map_trunc_page_mask((vm_map_offset_t)main_thread_stack_top,
+ vm_map_page_mask(vmap));
} else {
- // If you try to slide down from this point, you risk ending up in memory consumed by malloc
- stackaddr = SHARED_REGION_BASE_ARM - 32 * PTH_DEFAULT_STACKSIZE + aslr_offset;
+ // vm_map_get_max_aslr_slide_pages ensures 1MB of slide, we do better
+ aslr_offset = random() % ((proc64bit ? 4 : 2) * PTH_DEFAULT_STACKSIZE);
+ aslr_offset = vm_map_trunc_page_mask((vm_map_offset_t)aslr_offset,
+ vm_map_page_mask(vmap));
+ if (proc64bit) {
+ // 64 stacks below shared region
+ stackaddr = SHARED_REGION_BASE_ARM64 - 64 * PTH_DEFAULT_STACKSIZE - aslr_offset;
+ } else {
+ // If you try to slide down from this point, you risk ending up in memory consumed by malloc
+ stackaddr = SHARED_REGION_BASE_ARM - 32 * PTH_DEFAULT_STACKSIZE + aslr_offset;
+ }
}
#else
#error Need to define a stack address hint for this architecture
sright = (void *)pthread_kern->convert_thread_to_port(th);
th_thport = pthread_kern->ipc_port_copyout_send(sright, pthread_kern->task_get_ipcspace(ctask));
+ if (!MACH_PORT_VALID(th_thport)) {
+ error = EMFILE; // userland will convert this into a crash
+ goto out;
+ }
if ((flags & PTHREAD_START_CUSTOM) == 0) {
mach_vm_size_t pthread_size =
*/
kret = mach_vm_protect(vmap, stackaddr, th_guardsize, FALSE, VM_PROT_NONE);
- if (kret != KERN_SUCCESS) {
+ if (kret != KERN_SUCCESS) {
error = ENOMEM;
goto out1;
}
th_pthread = stackaddr + th_guardsize + user_stack;
th_stack = th_pthread;
-
+
/*
* Pre-fault the first page of the new thread's stack and the page that will
* contain the pthread_t structure.
- */
- if (vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)) !=
+ */
+ if (vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)) !=
vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap))){
vm_fault( vmap,
vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)),
VM_PROT_READ | VM_PROT_WRITE,
- FALSE,
+ FALSE,
THREAD_UNINT, NULL, 0);
}
-
+
vm_fault( vmap,
vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap)),
VM_PROT_READ | VM_PROT_WRITE,
- FALSE,
+ FALSE,
THREAD_UNINT, NULL, 0);
} else {
error = EINVAL;
goto out;
}
-
+
}
#elif defined(__arm__)
arm_thread_state_t state = {
.r[7] = 0,
.lr = 0,
- /*
+ /*
* set stack pointer
*/
.sp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN))
pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qos, THREAD_QOS_POLICY_COUNT);
}
+ if (pthread_kern->proc_get_mach_thread_self_tsd_offset) {
+ uint64_t mach_thread_self_offset =
+ pthread_kern->proc_get_mach_thread_self_tsd_offset(p);
+ if (mach_thread_self_offset && tsd_offset) {
+ bool proc64bit = proc_is64bit(p);
+ if (proc64bit) {
+ uint64_t th_thport_tsd = (uint64_t)th_thport;
+ error = copyout(&th_thport_tsd, th_pthread + tsd_offset +
+ mach_thread_self_offset, sizeof(th_thport_tsd));
+ } else {
+ uint32_t th_thport_tsd = (uint32_t)th_thport;
+ error = copyout(&th_thport_tsd, th_pthread + tsd_offset +
+ mach_thread_self_offset, sizeof(th_thport_tsd));
+ }
+ if (error) {
+ goto out1;
+ }
+ }
+ }
+
kret = pthread_kern->thread_resume(th);
if (kret != KERN_SUCCESS) {
error = EINVAL;
}
out:
(void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(ctask), th_thport);
+ if (pthread_kern->thread_will_park_or_terminate) {
+ pthread_kern->thread_will_park_or_terminate(th);
+ }
(void)thread_terminate(th);
(void)thread_deallocate(th);
return(error);
}
}
}
-
- (void) thread_terminate(th);
+
+ if (pthread_kern->thread_will_park_or_terminate) {
+ pthread_kern->thread_will_park_or_terminate(th);
+ }
+ (void)thread_terminate(th);
if (sem != MACH_PORT_NULL) {
kret = pthread_kern->semaphore_signal_internal_trap(sem);
if (kret != KERN_SUCCESS) {
return(EINVAL);
}
}
-
+
if (kthport != MACH_PORT_NULL) {
pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(current_task()), kthport);
}
uint64_t dispatchqueue_offset,
int32_t *retval)
{
- /* We have to do this first so that it resets after fork */
- pthread_kern->proc_set_stack_addr_hint(p, (user_addr_t)stack_addr_hint(p, pthread_kern->current_map()));
+ struct _pthread_registration_data data = {};
+ uint32_t max_tsd_offset;
+ kern_return_t kr;
+ size_t pthread_init_sz = 0;
- /* prevent multiple registrations */
- if (pthread_kern->proc_get_register(p) != 0) {
- return(EINVAL);
- }
/* syscall randomizer test can pass bogus values */
if (pthsize < 0 || pthsize > MAX_PTHREAD_SIZE) {
return(EINVAL);
}
+ /*
+ * if we have pthread_init_data, then we use that and target_concptr
+	 * (which is an offset) to get the data.
+ */
+ if (pthread_init_data != 0) {
+ if (pthread_init_data_size < sizeof(data.version)) {
+ return EINVAL;
+ }
+ pthread_init_sz = MIN(sizeof(data), (size_t)pthread_init_data_size);
+ int ret = copyin(pthread_init_data, &data, pthread_init_sz);
+ if (ret) {
+ return ret;
+ }
+ if (data.version != (size_t)pthread_init_data_size) {
+ return EINVAL;
+ }
+ } else {
+ data.dispatch_queue_offset = dispatchqueue_offset;
+ }
+
+ /* We have to do this before proc_get_register so that it resets after fork */
+ mach_vm_offset_t stackaddr = stack_addr_hint(p, pthread_kern->current_map());
+ pthread_kern->proc_set_stack_addr_hint(p, (user_addr_t)stackaddr);
+
+ /* prevent multiple registrations */
+ if (pthread_kern->proc_get_register(p) != 0) {
+ return(EINVAL);
+ }
+
pthread_kern->proc_set_threadstart(p, threadstart);
pthread_kern->proc_set_wqthread(p, wqthread);
pthread_kern->proc_set_pthsize(p, pthsize);
pthread_kern->proc_set_register(p);
- /* if we have pthread_init_data, then we use that and target_concptr (which is an offset) get data. */
- if (pthread_init_data != 0) {
- thread_qos_policy_data_t qos;
+ uint32_t tsd_slot_sz = proc_is64bit(p) ? sizeof(uint64_t) : sizeof(uint32_t);
+ if ((uint32_t)pthsize >= tsd_slot_sz &&
+ data.tsd_offset <= (uint32_t)(pthsize - tsd_slot_sz)) {
+ max_tsd_offset = ((uint32_t)pthsize - data.tsd_offset - tsd_slot_sz);
+ } else {
+ data.tsd_offset = 0;
+ max_tsd_offset = 0;
+ }
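+	/*
+	 * Illustration (hypothetical numbers): for a 64-bit process with
+	 * pthsize == 4096 and data.tsd_offset == 256, TSD-relative offsets up to
+	 * 4096 - 256 - 8 == 3832 are accepted; the offsets validated below are
+	 * reset to 0 whenever they exceed this bound.
+	 */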
+ pthread_kern->proc_set_pthread_tsd_offset(p, data.tsd_offset);
- struct _pthread_registration_data data = {};
- size_t pthread_init_sz = MIN(sizeof(struct _pthread_registration_data), (size_t)pthread_init_data_size);
+ if (data.dispatch_queue_offset > max_tsd_offset) {
+ data.dispatch_queue_offset = 0;
+ }
+ pthread_kern->proc_set_dispatchqueue_offset(p, data.dispatch_queue_offset);
- kern_return_t kr = copyin(pthread_init_data, &data, pthread_init_sz);
- if (kr != KERN_SUCCESS) {
- return EINVAL;
+ if (pthread_kern->proc_set_return_to_kernel_offset) {
+ if (data.return_to_kernel_offset > max_tsd_offset) {
+ data.return_to_kernel_offset = 0;
}
+ pthread_kern->proc_set_return_to_kernel_offset(p,
+ data.return_to_kernel_offset);
+ }
- /* Incoming data from the data structure */
- pthread_kern->proc_set_dispatchqueue_offset(p, data.dispatch_queue_offset);
- if (data.version > offsetof(struct _pthread_registration_data, tsd_offset)
- && data.tsd_offset < (uint32_t)pthsize) {
- pthread_kern->proc_set_pthread_tsd_offset(p, data.tsd_offset);
+ if (pthread_kern->proc_set_mach_thread_self_tsd_offset) {
+ if (data.mach_thread_self_offset > max_tsd_offset) {
+ data.mach_thread_self_offset = 0;
}
+ pthread_kern->proc_set_mach_thread_self_tsd_offset(p,
+ data.mach_thread_self_offset);
+ }
+ if (pthread_init_data != 0) {
/* Outgoing data that userspace expects as a reply */
data.version = sizeof(struct _pthread_registration_data);
if (pthread_kern->qos_main_thread_active()) {
mach_msg_type_number_t nqos = THREAD_QOS_POLICY_COUNT;
+ thread_qos_policy_data_t qos;
boolean_t gd = FALSE;
kr = pthread_kern->thread_policy_get(current_thread(), THREAD_QOS_POLICY, (thread_policy_t)&qos, &nqos, &gd);
if (kr != KERN_SUCCESS) {
return EINVAL;
}
- } else {
- pthread_kern->proc_set_dispatchqueue_offset(p, dispatchqueue_offset);
}
/* return the supported feature set as the return value. */
int
_bsdthread_ctl_set_qos(struct proc *p, user_addr_t __unused cmd, mach_port_name_t kport, user_addr_t tsd_priority_addr, user_addr_t arg3, int *retval)
{
- kern_return_t kr;
+ int rv;
thread_t th;
pthread_priority_t priority;
/* QoS is stored in a given slot in the pthread TSD. We need to copy that in and set our QoS based on it. */
if (proc_is64bit(p)) {
uint64_t v;
- kr = copyin(tsd_priority_addr, &v, sizeof(v));
- if (kr != KERN_SUCCESS) {
- return kr;
- }
+ rv = copyin(tsd_priority_addr, &v, sizeof(v));
+ if (rv) goto out;
priority = (int)(v & 0xffffffff);
} else {
uint32_t v;
- kr = copyin(tsd_priority_addr, &v, sizeof(v));
- if (kr != KERN_SUCCESS) {
- return kr;
- }
+ rv = copyin(tsd_priority_addr, &v, sizeof(v));
+ if (rv) goto out;
priority = v;
}
return EPERM;
}
- int rv = _bsdthread_ctl_set_self(p, 0, priority, 0, _PTHREAD_SET_SELF_QOS_FLAG, retval);
+ rv = _bsdthread_ctl_set_self(p, 0, priority, 0, _PTHREAD_SET_SELF_QOS_FLAG, retval);
/* Static param the thread, we just set QoS on it, so its stuck in QoS land now. */
/* pthread_kern->thread_static_param(th, TRUE); */ // see <rdar://problem/16433744>, for details
thread_deallocate(th);
+out:
return rv;
}
return NULL;
}
+boolean_t
+_workq_thread_has_been_unbound(thread_t th, int qos_class)
+{
+ struct threadlist *tl = util_get_thread_threadlist_entry(th);
+ if (!tl) {
+ return FALSE;
+ }
+
+ struct workqueue *wq = tl->th_workq;
+ workqueue_lock_spin(wq);
+
+ if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ goto failure;
+ } else if (qos_class != class_index_get_thread_qos(tl->th_priority)) {
+ goto failure;
+ }
+
+ if ((tl->th_flags & TH_LIST_KEVENT_BOUND)){
+ goto failure;
+ }
+ tl->th_flags &= ~TH_LIST_KEVENT_BOUND;
+
+ workqueue_unlock(wq);
+ return TRUE;
+
+failure:
+ workqueue_unlock(wq);
+ return FALSE;
+}
+
int
_bsdthread_ctl_set_self(struct proc *p, user_addr_t __unused cmd, pthread_priority_t priority, mach_port_name_t voucher, _pthread_set_flags_t flags, int __unused *retval)
{
thread_qos_policy_data_t qos;
mach_msg_type_number_t nqos = THREAD_QOS_POLICY_COUNT;
boolean_t gd = FALSE;
- bool was_manager_thread = false;
thread_t th = current_thread();
struct workqueue *wq = NULL;
struct threadlist *tl = NULL;
workqueue_lock_spin(wq);
if (tl->th_flags & TH_LIST_KEVENT_BOUND) {
tl->th_flags &= ~TH_LIST_KEVENT_BOUND;
- unsigned int kevent_flags = KEVENT_FLAG_WORKQ;
+ unsigned int kevent_flags = KEVENT_FLAG_WORKQ | KEVENT_FLAG_UNBIND_CHECK_FLAGS;
if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
kevent_flags |= KEVENT_FLAG_WORKQ_MANAGER;
}
workqueue_unlock(wq);
- kevent_qos_internal_unbind(p, class_index_get_thread_qos(tl->th_priority), th, kevent_flags);
+ __assert_only int ret = kevent_qos_internal_unbind(p, class_index_get_thread_qos(tl->th_priority), th, kevent_flags);
+ assert(ret == 0);
} else {
workqueue_unlock(wq);
}
goto voucher;
}
- /* If we have main-thread QoS then we don't allow a thread to come out of QOS_CLASS_UNSPECIFIED. */
- if (pthread_kern->qos_main_thread_active() && qos.qos_tier == THREAD_QOS_UNSPECIFIED) {
+ /*
+ * If we have main-thread QoS then we don't allow a thread to come out
+ * of QOS_CLASS_UNSPECIFIED.
+ */
+ if (pthread_kern->qos_main_thread_active() && qos.qos_tier ==
+ THREAD_QOS_UNSPECIFIED) {
qos_rv = EPERM;
goto voucher;
}
- /* Get the work queue for tracing, also the threadlist for bucket manipluation. */
if (!tl) {
tl = util_get_thread_threadlist_entry(th);
if (tl) wq = tl->th_workq;
qos.qos_tier = pthread_priority_get_thread_qos(priority);
qos.tier_importance = (qos.qos_tier == QOS_CLASS_UNSPECIFIED) ? 0 : _pthread_priority_get_relpri(priority);
- if (qos.qos_tier == QOS_CLASS_UNSPECIFIED) {
+ if (qos.qos_tier == QOS_CLASS_UNSPECIFIED ||
+ qos.tier_importance > 0 || qos.tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
qos_rv = EINVAL;
goto voucher;
}
- /* If we're a workqueue, the threadlist item priority needs adjusting, along with the bucket we were running in. */
+ /*
+ * If we're a workqueue, the threadlist item priority needs adjusting,
+ * along with the bucket we were running in.
+ */
if (tl) {
- workqueue_lock_spin(wq);
- bool now_under_constrained_limit = false;
-
- assert(!(tl->th_flags & TH_LIST_KEVENT_BOUND));
+ bool try_run_threadreq = false;
+ workqueue_lock_spin(wq);
kr = pthread_kern->thread_set_workq_qos(th, qos.qos_tier, qos.tier_importance);
assert(kr == KERN_SUCCESS || kr == KERN_TERMINATED);
/* Fix up counters. */
uint8_t old_bucket = tl->th_priority;
uint8_t new_bucket = pthread_priority_get_class_index(priority);
- if (old_bucket == WORKQUEUE_EVENT_MANAGER_BUCKET) {
- was_manager_thread = true;
- }
-
- uint32_t old_active = OSAddAtomic(-1, &wq->wq_thactive_count[old_bucket]);
- OSAddAtomic(1, &wq->wq_thactive_count[new_bucket]);
- wq->wq_thscheduled_count[old_bucket]--;
- wq->wq_thscheduled_count[new_bucket]++;
+ if (old_bucket != new_bucket) {
+ _wq_thactive_move(wq, old_bucket, new_bucket);
+ wq->wq_thscheduled_count[old_bucket]--;
+ wq->wq_thscheduled_count[new_bucket]++;
+ if (old_bucket == WORKQUEUE_EVENT_MANAGER_BUCKET ||
+ old_bucket < new_bucket) {
+ /*
+ * if the QoS of the thread was lowered, then this could
+ * allow for a higher QoS thread request to run, so we need
+ * to reevaluate.
+ */
+ try_run_threadreq = true;
+ }
+ tl->th_priority = new_bucket;
+ }
bool old_overcommit = !(tl->th_flags & TH_LIST_CONSTRAINED);
bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
if (!old_overcommit && new_overcommit) {
- wq->wq_constrained_threads_scheduled--;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
- if (wq->wq_constrained_threads_scheduled == wq_max_constrained_threads - 1) {
- now_under_constrained_limit = true;
+ if (wq->wq_constrained_threads_scheduled-- ==
+ wq_max_constrained_threads) {
+ try_run_threadreq = true;
}
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
} else if (old_overcommit && !new_overcommit) {
wq->wq_constrained_threads_scheduled++;
tl->th_flags |= TH_LIST_CONSTRAINED;
}
- tl->th_priority = new_bucket;
-
- /* If we were at the ceiling of threads for a given bucket, we have
- * to reevaluate whether we should start more work.
- */
- if (old_active == wq->wq_reqconc[old_bucket] || now_under_constrained_limit) {
- /* workqueue_run_nextreq will drop the workqueue lock in all exit paths. */
- (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT, 0, false);
+ if (try_run_threadreq) {
+ workqueue_run_threadreq_and_unlock(p, wq, NULL, NULL, true);
} else {
workqueue_unlock(wq);
}
if (fixedpri_rv) {
return fixedpri_rv;
}
-
+
return 0;
}
return 0;
}
+static int
+_bsdthread_ctl_max_parallelism(struct proc __unused *p, user_addr_t __unused cmd,
+ int qos, unsigned long flags, int *retval)
+{
+ _Static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
+ _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
+ _Static_assert(QOS_PARALLELISM_REALTIME ==
+ _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
+
+ if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
+ return EINVAL;
+ }
+
+ if (flags & QOS_PARALLELISM_REALTIME) {
+ if (qos) {
+ return EINVAL;
+ }
+ } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
+ return EINVAL;
+ }
+
+ *retval = pthread_kern->qos_max_parallelism(qos, flags);
+ return 0;
+}
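+/*
+ * Reached from _bsdthread_ctl() below for BSDTHREAD_CTL_QOS_MAX_PARALLELISM;
+ * after validating the flags/QoS combination, the result is simply whatever
+ * pthread_kern->qos_max_parallelism() reports for that QoS (or for realtime
+ * work when QOS_PARALLELISM_REALTIME is passed).
+ */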
+
int
_bsdthread_ctl(struct proc *p, user_addr_t cmd, user_addr_t arg1, user_addr_t arg2, user_addr_t arg3, int *retval)
{
return _bsdthread_ctl_qos_dispatch_asynchronous_override_reset(p, cmd, (int)arg1, arg2, arg3, retval);
case BSDTHREAD_CTL_SET_SELF:
return _bsdthread_ctl_set_self(p, cmd, (pthread_priority_t)arg1, (mach_port_name_t)arg2, (_pthread_set_flags_t)arg3, retval);
+ case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
+ return _bsdthread_ctl_max_parallelism(p, cmd, (int)arg1, (unsigned long)arg2, retval);
default:
return EINVAL;
}
}
#pragma mark - Workqueue Implementation
-#pragma mark workqueue lock
-static boolean_t workqueue_lock_spin_is_acquired_kdp(struct workqueue *wq) {
- return kdp_lck_spin_is_acquired(&wq->wq_lock);
-}
+#pragma mark wq_flags
-static void
-workqueue_lock_spin(struct workqueue *wq)
+static inline uint32_t
+_wq_flags(struct workqueue *wq)
{
- boolean_t interrupt_state = ml_set_interrupts_enabled(FALSE);
- lck_spin_lock(&wq->wq_lock);
- wq->wq_interrupt_state = interrupt_state;
+ return atomic_load_explicit(&wq->wq_flags, memory_order_relaxed);
}
-static void
-workqueue_unlock(struct workqueue *wq)
+static inline bool
+_wq_exiting(struct workqueue *wq)
{
- boolean_t interrupt_state = wq->wq_interrupt_state;
- lck_spin_unlock(&wq->wq_lock);
- ml_set_interrupts_enabled(interrupt_state);
+ return _wq_flags(wq) & WQ_EXITING;
}
-#pragma mark workqueue add timer
-
-/**
- * Sets up the timer which will call out to workqueue_add_timer
- */
-static void
-workqueue_interval_timer_start(struct workqueue *wq)
+static inline uint32_t
+_wq_flags_or_orig(struct workqueue *wq, uint32_t v)
{
- uint64_t deadline;
+#if PTHREAD_INLINE_RMW_ATOMICS
+ uint32_t state;
+ do {
+ state = _wq_flags(wq);
+ } while (!OSCompareAndSwap(state, state | v, &wq->wq_flags));
+ return state;
+#else
+ return atomic_fetch_or_explicit(&wq->wq_flags, v, memory_order_relaxed);
+#endif
+}
- /* n.b. wq_timer_interval is reset to 0 in workqueue_add_timer if the
- ATIMER_RUNNING flag is not present. The net effect here is that if a
- sequence of threads is required, we'll double the time before we give out
- the next one. */
- if (wq->wq_timer_interval == 0) {
- wq->wq_timer_interval = wq_stalled_window_usecs;
+static inline uint32_t
+_wq_flags_and_orig(struct workqueue *wq, uint32_t v)
+{
+#if PTHREAD_INLINE_RMW_ATOMICS
+ uint32_t state;
+ do {
+ state = _wq_flags(wq);
+ } while (!OSCompareAndSwap(state, state & v, &wq->wq_flags));
+ return state;
+#else
+ return atomic_fetch_and_explicit(&wq->wq_flags, v, memory_order_relaxed);
+#endif
+}
- } else {
- wq->wq_timer_interval = wq->wq_timer_interval * 2;
+static inline bool
+WQ_TIMER_DELAYED_NEEDED(struct workqueue *wq)
+{
+ uint32_t oldflags, newflags;
+ do {
+ oldflags = _wq_flags(wq);
+ if (oldflags & (WQ_EXITING | WQ_ATIMER_DELAYED_RUNNING)) {
+ return false;
+ }
+ newflags = oldflags | WQ_ATIMER_DELAYED_RUNNING;
+ } while (!OSCompareAndSwap(oldflags, newflags, &wq->wq_flags));
+ return true;
+}
- if (wq->wq_timer_interval > wq_max_timer_interval_usecs) {
- wq->wq_timer_interval = wq_max_timer_interval_usecs;
+static inline bool
+WQ_TIMER_IMMEDIATE_NEEDED(struct workqueue *wq)
+{
+ uint32_t oldflags, newflags;
+ do {
+ oldflags = _wq_flags(wq);
+ if (oldflags & (WQ_EXITING | WQ_ATIMER_IMMEDIATE_RUNNING)) {
+ return false;
}
- }
- clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
+ newflags = oldflags | WQ_ATIMER_IMMEDIATE_RUNNING;
+ } while (!OSCompareAndSwap(oldflags, newflags, &wq->wq_flags));
+ return true;
+}
- PTHREAD_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, wq->wq_flags, wq->wq_timer_interval, 0);
+#pragma mark thread requests pacing
- boolean_t ret = thread_call_enter1_delayed(wq->wq_atimer_delayed_call, wq->wq_atimer_delayed_call, deadline);
- if (ret) {
- panic("delayed_call was already enqueued");
- }
+static inline uint32_t
+_wq_pacing_shift_for_pri(int pri)
+{
+ return _wq_bucket_to_thread_qos(pri) - 1;
}
-/**
- * Immediately trigger the workqueue_add_timer
- */
-static void
-workqueue_interval_timer_trigger(struct workqueue *wq)
+static inline int
+_wq_highest_paced_priority(struct workqueue *wq)
{
- PTHREAD_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, wq->wq_flags, 0, 0);
+ uint8_t paced = wq->wq_paced;
+ int msb = paced ? 32 - __builtin_clz(paced) : 0; // fls(paced) == bit + 1
+ return WORKQUEUE_EVENT_MANAGER_BUCKET - msb;
+}
- boolean_t ret = thread_call_enter1(wq->wq_atimer_immediate_call, wq->wq_atimer_immediate_call);
- if (ret) {
- panic("immediate_call was already enqueued");
- }
+static inline uint8_t
+_wq_pacing_bit_for_pri(int pri)
+{
+ return 1u << _wq_pacing_shift_for_pri(pri);
}
-/**
- * returns whether lastblocked_tsp is within wq_stalled_window_usecs of cur_ts
- */
-static boolean_t
-wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
+static inline bool
+_wq_should_pace_priority(struct workqueue *wq, int pri)
{
- clock_sec_t secs;
- clock_usec_t usecs;
- uint64_t lastblocked_ts;
- uint64_t elapsed;
+ return wq->wq_paced >= _wq_pacing_bit_for_pri(pri);
+}
- /*
- * the timestamp is updated atomically w/o holding the workqueue lock
- * so we need to do an atomic read of the 64 bits so that we don't see
- * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
- * independent fashion by using OSCompareAndSwap64 to write back the
- * value we grabbed... if it succeeds, then we have a good timestamp to
- * evaluate... if it fails, we straddled grabbing the timestamp while it
- * was being updated... treat a failed update as a busy thread since
- * it implies we are about to see a really fresh timestamp anyway
- */
- lastblocked_ts = *lastblocked_tsp;
+static inline void
+_wq_pacing_start(struct workqueue *wq, struct threadlist *tl)
+{
+ uint8_t bit = _wq_pacing_bit_for_pri(tl->th_priority);
+ assert((tl->th_flags & TH_LIST_PACING) == 0);
+ assert((wq->wq_paced & bit) == 0);
+ wq->wq_paced |= bit;
+ tl->th_flags |= TH_LIST_PACING;
+}
- if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
- return (TRUE);
+static inline bool
+_wq_pacing_end(struct workqueue *wq, struct threadlist *tl)
+{
+ if (tl->th_flags & TH_LIST_PACING) {
+ uint8_t bit = _wq_pacing_bit_for_pri(tl->th_priority);
+ assert((wq->wq_paced & bit) != 0);
+ wq->wq_paced ^= bit;
+ tl->th_flags &= ~TH_LIST_PACING;
+ return wq->wq_paced < bit; // !_wq_should_pace_priority
+ }
+ return false;
+}
- if (lastblocked_ts >= cur_ts) {
+#pragma mark thread requests
+
+static void
+_threadreq_init_alloced(struct threadreq *req, int priority, int flags)
+{
+ assert((flags & TR_FLAG_ONSTACK) == 0);
+ req->tr_state = TR_STATE_NEW;
+ req->tr_priority = priority;
+ req->tr_flags = flags;
+}
+
+static void
+_threadreq_init_stack(struct threadreq *req, int priority, int flags)
+{
+ req->tr_state = TR_STATE_NEW;
+ req->tr_priority = priority;
+ req->tr_flags = flags | TR_FLAG_ONSTACK;
+}
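+/*
+ * Hypothetical usage sketch: a caller wanting a constrained request at
+ * bucket 2 (DE) backed by its own stack would do
+ *
+ *	struct threadreq req;
+ *	_threadreq_init_stack(&req, 2, 0);
+ *
+ * while zone-allocated requests go through _threadreq_init_alloced() instead.
+ */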
+
+static void
+_threadreq_copy_prepare(struct workqueue *wq)
+{
+again:
+ if (wq->wq_cached_threadreq) {
+ return;
+ }
+
+ workqueue_unlock(wq);
+ struct threadreq *req = zalloc(pthread_zone_threadreq);
+ workqueue_lock_spin(wq);
+
+ if (wq->wq_cached_threadreq) {
/*
- * because the update of the timestamp when a thread blocks isn't
- * serialized against us looking at it (i.e. we don't hold the workq lock)
- * it's possible to have a timestamp that matches the current time or
- * that even looks to be in the future relative to when we grabbed the current
- * time... just treat this as a busy thread since it must have just blocked.
+ * We lost the race and someone left behind an extra threadreq for us
+ * to use. Throw away our request and retry.
*/
- return (TRUE);
+ workqueue_unlock(wq);
+ zfree(pthread_zone_threadreq, req);
+ workqueue_lock_spin(wq);
+ goto again;
+ } else {
+ wq->wq_cached_threadreq = req;
}
- elapsed = cur_ts - lastblocked_ts;
- pthread_kern->absolutetime_to_microtime(elapsed, &secs, &usecs);
+ assert(wq->wq_cached_threadreq);
+}
- if (secs == 0 && usecs < wq_stalled_window_usecs)
- return (TRUE);
- return (FALSE);
+static bool
+_threadreq_copy_prepare_noblock(struct workqueue *wq)
+{
+ if (wq->wq_cached_threadreq) {
+ return true;
+ }
+
+ wq->wq_cached_threadreq = zalloc_noblock(pthread_zone_threadreq);
+
+ return wq->wq_cached_threadreq != NULL;
}
-static inline bool
-WQ_TIMER_DELAYED_NEEDED(struct workqueue *wq)
+static inline struct threadreq_head *
+_threadreq_list_for_req(struct workqueue *wq, const struct threadreq *req)
{
- int oldflags;
-retry:
- oldflags = wq->wq_flags;
- if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_DELAYED_RUNNING))) {
- if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_DELAYED_RUNNING, (UInt32 *)&wq->wq_flags)) {
- return true;
- } else {
- goto retry;
+ if (req->tr_flags & TR_FLAG_OVERCOMMIT) {
+ return &wq->wq_overcommit_reqlist[req->tr_priority];
+ } else {
+ return &wq->wq_reqlist[req->tr_priority];
+ }
+}
+
+static void
+_threadreq_enqueue(struct workqueue *wq, struct threadreq *req)
+{
+ assert(req && req->tr_state == TR_STATE_NEW);
+ if (req->tr_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ assert(wq->wq_event_manager_threadreq.tr_state != TR_STATE_WAITING);
+ memcpy(&wq->wq_event_manager_threadreq, req, sizeof(struct threadreq));
+ req = &wq->wq_event_manager_threadreq;
+ req->tr_flags &= ~(TR_FLAG_ONSTACK | TR_FLAG_NO_PACING);
+ } else {
+ if (req->tr_flags & TR_FLAG_ONSTACK) {
+ assert(wq->wq_cached_threadreq);
+ struct threadreq *newreq = wq->wq_cached_threadreq;
+ wq->wq_cached_threadreq = NULL;
+
+ memcpy(newreq, req, sizeof(struct threadreq));
+ newreq->tr_flags &= ~(TR_FLAG_ONSTACK | TR_FLAG_NO_PACING);
+ req->tr_state = TR_STATE_DEAD;
+ req = newreq;
}
+ TAILQ_INSERT_TAIL(_threadreq_list_for_req(wq, req), req, tr_entry);
}
- return false;
+ req->tr_state = TR_STATE_WAITING;
+ wq->wq_reqcount++;
}
-static inline bool
-WQ_TIMER_IMMEDIATE_NEEDED(struct workqueue *wq)
+static void
+_threadreq_dequeue(struct workqueue *wq, struct threadreq *req)
+{
+ if (req->tr_priority != WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ struct threadreq_head *req_list = _threadreq_list_for_req(wq, req);
+#if DEBUG
+ struct threadreq *cursor = NULL;
+ TAILQ_FOREACH(cursor, req_list, tr_entry) {
+ if (cursor == req) break;
+ }
+ assert(cursor == req);
+#endif
+ TAILQ_REMOVE(req_list, req, tr_entry);
+ }
+ wq->wq_reqcount--;
+}
+
+/*
+ * Mark a thread request as complete. At this point, it is treated as owned by
+ * the submitting subsystem and you should assume it could be freed.
+ *
+ * Called with the workqueue lock held.
+ */
+static int
+_threadreq_complete_and_unlock(proc_t p, struct workqueue *wq,
+ struct threadreq *req, struct threadlist *tl)
{
- int oldflags;
-retry:
- oldflags = wq->wq_flags;
- if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_IMMEDIATE_RUNNING))) {
- if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_IMMEDIATE_RUNNING, (UInt32 *)&wq->wq_flags)) {
- return true;
+ struct threadreq *req_tofree = NULL;
+ bool sync = (req->tr_state == TR_STATE_NEW);
+ bool workloop = req->tr_flags & TR_FLAG_WORKLOOP;
+ bool onstack = req->tr_flags & TR_FLAG_ONSTACK;
+ bool kevent = req->tr_flags & TR_FLAG_KEVENT;
+ bool unbinding = tl->th_flags & TH_LIST_UNBINDING;
+ bool locked = true;
+ bool waking_parked_thread = (tl->th_flags & TH_LIST_BUSY);
+ int ret;
+
+ req->tr_state = TR_STATE_COMPLETE;
+
+ if (!workloop && !onstack && req != &wq->wq_event_manager_threadreq) {
+ if (wq->wq_cached_threadreq) {
+ req_tofree = req;
} else {
- goto retry;
+ wq->wq_cached_threadreq = req;
}
}
- return false;
+
+ if (tl->th_flags & TH_LIST_UNBINDING) {
+ tl->th_flags &= ~TH_LIST_UNBINDING;
+ assert((tl->th_flags & TH_LIST_KEVENT_BOUND));
+ } else if (workloop || kevent) {
+ assert((tl->th_flags & TH_LIST_KEVENT_BOUND) == 0);
+ tl->th_flags |= TH_LIST_KEVENT_BOUND;
+ }
+
+ if (workloop) {
+ workqueue_unlock(wq);
+ ret = pthread_kern->workloop_fulfill_threadreq(wq->wq_proc, (void*)req,
+ tl->th_thread, sync ? WORKLOOP_FULFILL_THREADREQ_SYNC : 0);
+ assert(ret == 0);
+ locked = false;
+ } else if (kevent) {
+ unsigned int kevent_flags = KEVENT_FLAG_WORKQ;
+ if (sync) {
+ kevent_flags |= KEVENT_FLAG_SYNCHRONOUS_BIND;
+ }
+ if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ kevent_flags |= KEVENT_FLAG_WORKQ_MANAGER;
+ }
+ workqueue_unlock(wq);
+ ret = kevent_qos_internal_bind(wq->wq_proc,
+ class_index_get_thread_qos(tl->th_priority), tl->th_thread,
+ kevent_flags);
+ if (ret != 0) {
+ workqueue_lock_spin(wq);
+ tl->th_flags &= ~TH_LIST_KEVENT_BOUND;
+ locked = true;
+ } else {
+ locked = false;
+ }
+ }
+
+ /*
+ * Run Thread, Run!
+ */
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 0, 0, 0, 0);
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_runitem | DBG_FUNC_START, wq, req, tl->th_priority,
+ thread_tid(current_thread()), thread_tid(tl->th_thread));
+
+ if (waking_parked_thread) {
+ if (!locked) {
+ workqueue_lock_spin(wq);
+ }
+ tl->th_flags &= ~(TH_LIST_BUSY);
+ if ((tl->th_flags & TH_LIST_REMOVING_VOUCHER) == 0) {
+ /*
+ * If the thread is in the process of removing its voucher, then it
+ * isn't actually in the wait event yet and we don't need to wake
+ * it up. Save the trouble (and potential lock-ordering issues
+ * (see 30617015)).
+ */
+ thread_wakeup_thread(tl, tl->th_thread);
+ }
+ workqueue_unlock(wq);
+
+ if (req_tofree) zfree(pthread_zone_threadreq, req_tofree);
+ return WQ_RUN_TR_THREAD_STARTED;
+ }
+
+ assert ((tl->th_flags & TH_LIST_PACING) == 0);
+ if (locked) {
+ workqueue_unlock(wq);
+ }
+ if (req_tofree) zfree(pthread_zone_threadreq, req_tofree);
+ if (unbinding) {
+ return WQ_RUN_TR_THREAD_STARTED;
+ }
+ _setup_wqthread(p, tl->th_thread, wq, tl, WQ_SETUP_CLEAR_VOUCHER);
+ pthread_kern->unix_syscall_return(EJUSTRETURN);
+ __builtin_unreachable();
}
+/*
+ * Mark a thread request as cancelled. Has similar ownership semantics to the
+ * complete call above.
+ */
+static void
+_threadreq_cancel(struct workqueue *wq, struct threadreq *req)
+{
+ assert(req->tr_state == TR_STATE_WAITING);
+ req->tr_state = TR_STATE_DEAD;
+
+ assert((req->tr_flags & TR_FLAG_ONSTACK) == 0);
+ if (req->tr_flags & TR_FLAG_WORKLOOP) {
+ __assert_only int ret;
+ ret = pthread_kern->workloop_fulfill_threadreq(wq->wq_proc, (void*)req,
+ THREAD_NULL, WORKLOOP_FULFILL_THREADREQ_CANCEL);
+ assert(ret == 0 || ret == ECANCELED);
+ } else if (req != &wq->wq_event_manager_threadreq) {
+ zfree(pthread_zone_threadreq, req);
+ }
+}
+
+#pragma mark workqueue lock
+
+static boolean_t workqueue_lock_spin_is_acquired_kdp(struct workqueue *wq) {
+ return kdp_lck_spin_is_acquired(&wq->wq_lock);
+}
+
+static void
+workqueue_lock_spin(struct workqueue *wq)
+{
+ assert(ml_get_interrupts_enabled() == TRUE);
+ lck_spin_lock(&wq->wq_lock);
+}
+
+static bool
+workqueue_lock_try(struct workqueue *wq)
+{
+ return lck_spin_try_lock(&wq->wq_lock);
+}
+
+static void
+workqueue_unlock(struct workqueue *wq)
+{
+ lck_spin_unlock(&wq->wq_lock);
+}
+
+#pragma mark workqueue add timer
+
/**
- * handler function for the timer
+ * Sets up the timer which will call out to workqueue_add_timer
*/
static void
-workqueue_add_timer(struct workqueue *wq, thread_call_t thread_call_self)
+workqueue_interval_timer_start(struct workqueue *wq)
{
- proc_t p;
- boolean_t start_timer = FALSE;
- boolean_t retval;
+ uint64_t deadline;
+
+ /* n.b. wq_timer_interval is reset to 0 in workqueue_add_timer if the
+ ATIMER_RUNNING flag is not present. The net effect here is that if a
+ sequence of threads is required, we'll double the time before we give out
+ the next one. */
+ if (wq->wq_timer_interval == 0) {
+ wq->wq_timer_interval = wq_stalled_window_usecs;
+
+ } else {
+ wq->wq_timer_interval = wq->wq_timer_interval * 2;
+
+ if (wq->wq_timer_interval > wq_max_timer_interval_usecs) {
+ wq->wq_timer_interval = wq_max_timer_interval_usecs;
+ }
+ }
+ clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
- PTHREAD_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
+ PTHREAD_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
+ _wq_flags(wq), wq->wq_timer_interval, 0);
- p = wq->wq_proc;
+ thread_call_t call = wq->wq_atimer_delayed_call;
+ if (thread_call_enter1_delayed(call, call, deadline)) {
+ panic("delayed_call was already enqueued");
+ }
+}
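+/*
+ * For example, if wq_stalled_window_usecs were 200 and
+ * wq_max_timer_interval_usecs were 50000 (hypothetical values; both are
+ * sysctl-tunable), back-to-back re-arms would wait 200, 400, 800, ...
+ * microseconds, capped at 50000, until workqueue_add_timer() resets
+ * wq_timer_interval back to 0.
+ */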
+
+/**
+ * Immediately trigger the workqueue_add_timer
+ */
+static void
+workqueue_interval_timer_trigger(struct workqueue *wq)
+{
+ PTHREAD_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
+ _wq_flags(wq), 0, 0);
+
+ thread_call_t call = wq->wq_atimer_immediate_call;
+ if (thread_call_enter1(call, call)) {
+ panic("immediate_call was already enqueued");
+ }
+}
+
+/**
+ * returns whether lastblocked_tsp is within wq_stalled_window_usecs of cur_ts
+ */
+static boolean_t
+wq_thread_is_busy(uint64_t cur_ts, _Atomic uint64_t *lastblocked_tsp)
+{
+ clock_sec_t secs;
+ clock_usec_t usecs;
+ uint64_t lastblocked_ts;
+ uint64_t elapsed;
+
+ lastblocked_ts = atomic_load_explicit(lastblocked_tsp, memory_order_relaxed);
+ if (lastblocked_ts >= cur_ts) {
+ /*
+ * because the update of the timestamp when a thread blocks isn't
+ * serialized against us looking at it (i.e. we don't hold the workq lock)
+ * it's possible to have a timestamp that matches the current time or
+ * that even looks to be in the future relative to when we grabbed the current
+ * time... just treat this as a busy thread since it must have just blocked.
+ */
+ return (TRUE);
+ }
+ elapsed = cur_ts - lastblocked_ts;
+
+ pthread_kern->absolutetime_to_microtime(elapsed, &secs, &usecs);
+
+ return (secs == 0 && usecs < wq_stalled_window_usecs);
+}
+
+/**
+ * handler function for the timer
+ */
+static void
+workqueue_add_timer(struct workqueue *wq, thread_call_t thread_call_self)
+{
+ proc_t p = wq->wq_proc;
workqueue_lock_spin(wq);
+ PTHREAD_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq,
+ _wq_flags(wq), wq->wq_nthreads, wq->wq_thidlecount, 0);
+
/*
	 * There are two tricky issues here.
*
workqueue_lock_spin(wq);
}
+ /*
+ * Prevent _workqueue_mark_exiting() from going away
+ */
wq->wq_lflags |= WQL_ATIMER_BUSY;
/*
* Decide which timer we are and remove the RUNNING flag.
*/
if (thread_call_self == wq->wq_atimer_delayed_call) {
- if ((wq->wq_flags & WQ_ATIMER_DELAYED_RUNNING) == 0) {
- panic("workqueue_add_timer is the delayed timer but the delayed running flag isn't set");
+ uint64_t wq_flags = _wq_flags_and_orig(wq, ~WQ_ATIMER_DELAYED_RUNNING);
+ if ((wq_flags & WQ_ATIMER_DELAYED_RUNNING) == 0) {
+ panic("workqueue_add_timer(delayed) w/o WQ_ATIMER_DELAYED_RUNNING");
}
- WQ_UNSETFLAG(wq, WQ_ATIMER_DELAYED_RUNNING);
} else if (thread_call_self == wq->wq_atimer_immediate_call) {
- if ((wq->wq_flags & WQ_ATIMER_IMMEDIATE_RUNNING) == 0) {
- panic("workqueue_add_timer is the immediate timer but the immediate running flag isn't set");
+ uint64_t wq_flags = _wq_flags_and_orig(wq, ~WQ_ATIMER_IMMEDIATE_RUNNING);
+ if ((wq_flags & WQ_ATIMER_IMMEDIATE_RUNNING) == 0) {
+ panic("workqueue_add_timer(immediate) w/o WQ_ATIMER_IMMEDIATE_RUNNING");
}
- WQ_UNSETFLAG(wq, WQ_ATIMER_IMMEDIATE_RUNNING);
} else {
panic("workqueue_add_timer can't figure out which timer it is");
}
-again:
- retval = TRUE;
- if ( !(wq->wq_flags & WQ_EXITING)) {
- boolean_t add_thread = FALSE;
- /*
- * check to see if the stall frequency was beyond our tolerance
- * or we have work on the queue, but haven't scheduled any
- * new work within our acceptable time interval because
- * there were no idle threads left to schedule
- */
- if (wq->wq_reqcount) {
- uint32_t priclass = 0;
- uint32_t thactive_count = 0;
- uint64_t curtime = mach_absolute_time();
- uint64_t busycount = 0;
-
- if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
- wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
- priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
- } else {
- for (priclass = 0; priclass < WORKQUEUE_NUM_BUCKETS; priclass++) {
- if (wq->wq_requests[priclass])
- break;
- }
- }
+ int ret = WQ_RUN_TR_THREAD_STARTED;
+ while (ret == WQ_RUN_TR_THREAD_STARTED && wq->wq_reqcount) {
+ ret = workqueue_run_threadreq_and_unlock(p, wq, NULL, NULL, true);
- if (priclass < WORKQUEUE_EVENT_MANAGER_BUCKET){
- /*
- * Compute a metric for many how many threads are active. We
- * find the highest priority request outstanding and then add up
- * the number of active threads in that and all higher-priority
- * buckets. We'll also add any "busy" threads which are not
- * active but blocked recently enough that we can't be sure
- * they've gone idle yet. We'll then compare this metric to our
- * max concurrency to decide whether to add a new thread.
- */
- for (uint32_t i = 0; i <= priclass; i++) {
- thactive_count += wq->wq_thactive_count[i];
-
- if (wq->wq_thscheduled_count[i] < wq->wq_thactive_count[i]) {
- if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i]))
- busycount++;
- }
- }
- }
-
- if (thactive_count + busycount < wq->wq_max_concurrency ||
- priclass == WORKQUEUE_EVENT_MANAGER_BUCKET) {
-
- if (wq->wq_thidlecount == 0) {
- /*
- * if we have no idle threads, try to add one
- */
- retval = workqueue_addnewthread(wq, priclass == WORKQUEUE_EVENT_MANAGER_BUCKET);
- }
- add_thread = TRUE;
- }
-
- if (wq->wq_reqcount) {
- /*
- * as long as we have threads to schedule, and we successfully
- * scheduled new work, keep trying
- */
- while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
- retval = (workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_ADD_TIMER, 0, false) != THREAD_NULL);
- workqueue_lock_spin(wq);
-
- if (retval == FALSE)
- break;
- }
- if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_reqcount) {
-
- if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
- goto again;
-
- if (wq->wq_thidlecount == 0 || busycount) {
- start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
- }
-
- PTHREAD_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_NONE, wq, wq->wq_reqcount, wq->wq_thidlecount, busycount, 0);
- }
- }
- }
+ workqueue_lock_spin(wq);
}
+ _threadreq_copy_prepare(wq);
/*
* If we called WQ_TIMER_NEEDED above, then this flag will be set if that
* call marked the timer running. If so, we let the timer interval grow.
* Otherwise, we reset it back to 0.
*/
- if (!(wq->wq_flags & WQ_ATIMER_DELAYED_RUNNING)) {
+ uint32_t wq_flags = _wq_flags(wq);
+ if (!(wq_flags & WQ_ATIMER_DELAYED_RUNNING)) {
wq->wq_timer_interval = 0;
}
wq->wq_lflags &= ~WQL_ATIMER_BUSY;
- if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
+ if ((wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
/*
- * wakeup the thread hung up in _workqueue_mark_exiting or workqueue_add_timer waiting for this timer
- * to finish getting out of the way
+ * wakeup the thread hung up in _workqueue_mark_exiting or
+ * workqueue_add_timer waiting for this timer to finish getting out of
+ * the way
*/
wq->wq_lflags &= ~WQL_ATIMER_WAITING;
wakeup(wq);
}
- PTHREAD_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
+ PTHREAD_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0, wq->wq_nthreads, wq->wq_thidlecount, 0);
workqueue_unlock(wq);
-
- if (start_timer == TRUE)
- workqueue_interval_timer_start(wq);
}
#pragma mark thread state tracking
void
_workqueue_thread_yielded(void)
{
- struct workqueue *wq;
- proc_t p;
-
- p = current_proc();
-
- if ((wq = pthread_kern->proc_get_wqptr(p)) == NULL || wq->wq_reqcount == 0)
- return;
-
- workqueue_lock_spin(wq);
-
- if (wq->wq_reqcount) {
- uint64_t curtime;
- uint64_t elapsed;
- clock_sec_t secs;
- clock_usec_t usecs;
-
- if (wq->wq_thread_yielded_count++ == 0)
- wq->wq_thread_yielded_timestamp = mach_absolute_time();
-
- if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
- workqueue_unlock(wq);
- return;
- }
-
- PTHREAD_TRACE_WQ(TRACE_wq_thread_yielded | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 0, 0);
-
- wq->wq_thread_yielded_count = 0;
-
- curtime = mach_absolute_time();
- elapsed = curtime - wq->wq_thread_yielded_timestamp;
- pthread_kern->absolutetime_to_microtime(elapsed, &secs, &usecs);
-
- if (secs == 0 && usecs < wq_yielded_window_usecs) {
-
- if (wq->wq_thidlecount == 0) {
- workqueue_addnewthread(wq, TRUE);
- /*
- * 'workqueue_addnewthread' drops the workqueue lock
- * when creating the new thread and then retakes it before
- * returning... this window allows other threads to process
- * requests, so we need to recheck for available work
- * if none found, we just return... the newly created thread
- * will eventually get used (if it hasn't already)...
- */
- if (wq->wq_reqcount == 0) {
- workqueue_unlock(wq);
- return;
- }
- }
- if (wq->wq_thidlecount) {
- (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_UNCONSTRAINED, 0, false);
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
- PTHREAD_TRACE_WQ(TRACE_wq_thread_yielded | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 1, 0);
-
- return;
- }
- }
- PTHREAD_TRACE_WQ(TRACE_wq_thread_yielded | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 2, 0);
- }
- workqueue_unlock(wq);
}
static void
workqueue_callback(int type, thread_t thread)
{
- struct uthread *uth;
- struct threadlist *tl;
- struct workqueue *wq;
-
- uth = pthread_kern->get_bsdthread_info(thread);
- tl = pthread_kern->uthread_get_threadlist(uth);
- wq = tl->th_workq;
+ struct uthread *uth = pthread_kern->get_bsdthread_info(thread);
+ struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
+ struct workqueue *wq = tl->th_workq;
+ uint32_t old_count, req_qos, qos = tl->th_priority;
+ wq_thactive_t old_thactive;
switch (type) {
case SCHED_CALL_BLOCK: {
- uint32_t old_activecount;
- boolean_t start_timer = FALSE;
-
- old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
+ bool start_timer = false;
- /*
- * If we blocked and were at the requested concurrency previously, we may
- * need to spin up a new thread. Of course, if it's the event manager
- * then that's moot, so ignore that case.
- */
- if (old_activecount == wq->wq_reqconc[tl->th_priority] &&
- tl->th_priority != WORKQUEUE_EVENT_MANAGER_BUCKET) {
- uint64_t curtime;
- UInt64 *lastblocked_ptr;
+ old_thactive = _wq_thactive_dec(wq, tl->th_priority);
+ req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
+ old_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
+ qos, NULL, NULL);
+ if (old_count == wq_max_concurrency[tl->th_priority]) {
/*
- * the number of active threads at this priority
- * has fallen below the maximum number of concurrent
- * threads that we're allowed to run
+ * The number of active threads at this priority has fallen below
+ * the maximum number of concurrent threads that are allowed to run
+ *
+ * if we collide with another thread trying to update the
+ * last_blocked (really unlikely since another thread would have to
+ * get scheduled and then block after we start down this path), it's
+ * not a problem. Either timestamp is adequate, so no need to retry
*/
- lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority];
- curtime = mach_absolute_time();
+ atomic_store_explicit(&wq->wq_lastblocked_ts[qos],
+ mach_absolute_time(), memory_order_relaxed);
+ }
+ if (req_qos == WORKQUEUE_EVENT_MANAGER_BUCKET || qos > req_qos) {
/*
- * if we collide with another thread trying to update the last_blocked (really unlikely
- * since another thread would have to get scheduled and then block after we start down
- * this path), it's not a problem. Either timestamp is adequate, so no need to retry
+ * The blocking thread is at a lower QoS than the highest currently
+ * pending constrained request, nothing has to be redriven
*/
-
- OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
-
- if (wq->wq_reqcount) {
- /*
- * We have work to do so start up the timer if it's not
- * running; it'll sort out whether we need to start another
- * thread
- */
- start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
- }
-
- if (start_timer == TRUE) {
- workqueue_interval_timer_start(wq);
+ } else {
+ uint32_t max_busycount, old_req_count;
+ old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
+ req_qos, NULL, &max_busycount);
+ /*
+ * If it is possible that may_start_constrained_thread had refused
+ * admission due to being over the max concurrency, we may need to
+ * spin up a new thread.
+ *
+ * We take into account the maximum number of busy threads
+ * that can affect may_start_constrained_thread as looking at the
+ * actual number may_start_constrained_thread will see is racy.
+ *
+ * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
+ * between NCPU (4) and NCPU - 2 (2) we need to redrive.
+ */
+ if (wq_max_concurrency[req_qos] <= old_req_count + max_busycount &&
+ old_req_count <= wq_max_concurrency[req_qos]) {
+ if (WQ_TIMER_DELAYED_NEEDED(wq)) {
+ start_timer = true;
+ workqueue_interval_timer_start(wq);
+ }
}
}
- PTHREAD_TRACE1_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq, old_activecount, tl->th_priority, start_timer, thread_tid(thread));
+
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
+ old_count - 1, qos | (req_qos << 8),
+ wq->wq_reqcount << 1 | start_timer, 0);
break;
}
- case SCHED_CALL_UNBLOCK:
+ case SCHED_CALL_UNBLOCK: {
/*
* we cannot take the workqueue_lock here...
* an UNBLOCK can occur from a timer event which
* the thread lock for the thread being UNBLOCKED
* is also held
*/
- OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority]);
-
- PTHREAD_TRACE1_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, 0, thread_tid(thread));
-
+ old_thactive = _wq_thactive_inc(wq, qos);
+ if (pthread_debug_tracing) {
+ req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
+ old_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
+ qos, NULL, NULL);
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
+ old_count + 1, qos | (req_qos << 8),
+ wq->wq_threads_scheduled, 0);
+ }
break;
}
+ }
}
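/*
 * The callback above replaces the old per-bucket wq_thactive_count[] updates
 * with _wq_thactive_inc/_wq_thactive_dec and
 * _wq_thactive_aggregate_downto_qos over a single packed atomic state.  The
 * real wq_thactive_t layout is not part of this hunk; the sketch below
 * assumes a toy encoding of 8 bits per bucket in a uint64_t purely to show
 * the inc/dec/aggregate-down-to-QoS idea (lower bucket index == higher
 * priority, as with the bucket indices used in this file).
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_NUM_BUCKETS		5
#define TOY_BUCKET_WIDTH	8

static _Atomic uint64_t toy_thactive;

/* Bump the active count of one QoS bucket; returns the previous state. */
static uint64_t
toy_thactive_inc(int bucket)
{
	uint64_t one = UINT64_C(1) << (bucket * TOY_BUCKET_WIDTH);
	return atomic_fetch_add_explicit(&toy_thactive, one, memory_order_relaxed);
}

static uint64_t
toy_thactive_dec(int bucket)
{
	uint64_t one = UINT64_C(1) << (bucket * TOY_BUCKET_WIDTH);
	return atomic_fetch_sub_explicit(&toy_thactive, one, memory_order_relaxed);
}

/*
 * Sum the active counts of `qos` and every higher-priority bucket.
 */
static uint32_t
toy_aggregate_downto_qos(uint64_t state, int qos)
{
	uint32_t sum = 0;
	for (int i = 0; i <= qos && i < TOY_NUM_BUCKETS; i++) {
		sum += (uint32_t)((state >> (i * TOY_BUCKET_WIDTH)) & 0xff);
	}
	return sum;
}

int
main(void)
{
	toy_thactive_inc(0);
	toy_thactive_inc(2);
	toy_thactive_inc(2);
	uint64_t state = atomic_load_explicit(&toy_thactive, memory_order_relaxed);
	printf("active at or above bucket 2: %u\n",	/* prints 3 */
	    toy_aggregate_downto_qos(state, 2));
	toy_thactive_dec(2);
	return 0;
}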
sched_call_t
(void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, _workqueue_allocsize(wq));
}
(void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(wq->wq_task), tl->th_thport);
-
- } else {
-
- PTHREAD_TRACE1_WQ(TRACE_wq_thread_park | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
}
/*
* drop our ref on the thread
*/
thread_deallocate(tl->th_thread);
- kfree(tl, sizeof(struct threadlist));
+ zfree(pthread_zone_threadlist, tl);
}
* - dropped and retaken around thread creation
* - return with workq lock held
*/
-static boolean_t
-workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit)
+static bool
+workqueue_addnewthread(proc_t p, struct workqueue *wq)
{
- struct threadlist *tl;
- struct uthread *uth;
- kern_return_t kret;
- thread_t th;
- proc_t p;
- void *sright;
- mach_vm_offset_t stackaddr;
-
- if ((wq->wq_flags & WQ_EXITING) == WQ_EXITING) {
- PTHREAD_TRACE_WQ(TRACE_wq_thread_add_during_exit | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
- return (FALSE);
- }
-
- if (wq->wq_nthreads >= wq_max_threads) {
- PTHREAD_TRACE_WQ(TRACE_wq_thread_limit_exceeded | DBG_FUNC_NONE, wq, wq->wq_nthreads, wq_max_threads, 0, 0);
- return (FALSE);
- }
-
- if (ignore_constrained_thread_limit == FALSE &&
- wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
- /*
- * If we're not creating this thread to service an overcommit or
- * event manager request, then we check to see if we are over our
- * constrained thread limit, in which case we error out.
- */
- PTHREAD_TRACE_WQ(TRACE_wq_thread_constrained_maxed | DBG_FUNC_NONE, wq, wq->wq_constrained_threads_scheduled,
- wq_max_constrained_threads, 0, 0);
- return (FALSE);
- }
+ kern_return_t kret;
wq->wq_nthreads++;
- p = wq->wq_proc;
workqueue_unlock(wq);
- tl = kalloc(sizeof(struct threadlist));
+ struct threadlist *tl = zalloc(pthread_zone_threadlist);
bzero(tl, sizeof(struct threadlist));
+ thread_t th;
kret = pthread_kern->thread_create_workq_waiting(wq->wq_task, wq_unpark_continue, tl, &th);
if (kret != KERN_SUCCESS) {
PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 0, 0, 0);
- kfree(tl, sizeof(struct threadlist));
- goto failed;
+ goto fail_free;
}
- stackaddr = pthread_kern->proc_get_stack_addr_hint(p);
+ mach_vm_offset_t stackaddr = pthread_kern->proc_get_stack_addr_hint(p);
mach_vm_size_t guardsize = vm_map_page_size(wq->wq_map);
mach_vm_size_t pthread_size =
VM_INHERIT_DEFAULT);
if (kret != KERN_SUCCESS) {
- PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 1, 0, 0);
-
kret = mach_vm_allocate(wq->wq_map,
&stackaddr, th_allocsize,
VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
}
- if (kret == KERN_SUCCESS) {
- /*
- * The guard page is at the lowest address
- * The stack base is the highest address
- */
- kret = mach_vm_protect(wq->wq_map, stackaddr, guardsize, FALSE, VM_PROT_NONE);
- if (kret != KERN_SUCCESS) {
- (void) mach_vm_deallocate(wq->wq_map, stackaddr, th_allocsize);
- PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 2, 0, 0);
- }
- }
if (kret != KERN_SUCCESS) {
- (void) thread_terminate(th);
- thread_deallocate(th);
-
- kfree(tl, sizeof(struct threadlist));
- goto failed;
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 1, 0, 0);
+ goto fail_terminate;
}
- thread_reference(th);
- pthread_kern->thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
+ /*
+ * The guard page is at the lowest address
+ * The stack base is the highest address
+ */
+ kret = mach_vm_protect(wq->wq_map, stackaddr, guardsize, FALSE, VM_PROT_NONE);
+ if (kret != KERN_SUCCESS) {
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 2, 0, 0);
+ goto fail_vm_deallocate;
+ }
- sright = (void *)pthread_kern->convert_thread_to_port(th);
- tl->th_thport = pthread_kern->ipc_port_copyout_send(sright, pthread_kern->task_get_ipcspace(wq->wq_task));
+ pthread_kern->thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
pthread_kern->thread_static_param(th, TRUE);
- tl->th_flags = TH_LIST_INITED | TH_LIST_NEW;
+ /*
+ * convert_thread_to_port() consumes a reference
+ */
+ thread_reference(th);
+ void *sright = (void *)pthread_kern->convert_thread_to_port(th);
+ tl->th_thport = pthread_kern->ipc_port_copyout_send(sright,
+ pthread_kern->task_get_ipcspace(wq->wq_task));
+ tl->th_flags = TH_LIST_INITED | TH_LIST_NEW;
tl->th_thread = th;
tl->th_workq = wq;
tl->th_stackaddr = stackaddr;
tl->th_priority = WORKQUEUE_NUM_BUCKETS;
- uth = pthread_kern->get_bsdthread_info(tl->th_thread);
+ struct uthread *uth;
+ uth = pthread_kern->get_bsdthread_info(tl->th_thread);
+
+ workqueue_lock_spin(wq);
+
+ void *current_tl = pthread_kern->uthread_get_threadlist(uth);
+ if (current_tl == NULL) {
+ pthread_kern->uthread_set_threadlist(uth, tl);
+ TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
+ wq->wq_thidlecount++;
+ } else if (current_tl == WQ_THREADLIST_EXITING_POISON) {
+ /*
+ * Failed thread creation race: The thread already woke up and has exited.
+ */
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 3, 0, 0);
+ goto fail_unlock;
+ } else {
+ panic("Unexpected initial threadlist value");
+ }
+
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
- workqueue_lock_spin(wq);
+ return (TRUE);
- pthread_kern->uthread_set_threadlist(uth, tl);
- TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
+fail_unlock:
+ workqueue_unlock(wq);
+ (void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(wq->wq_task),
+ tl->th_thport);
- wq->wq_thidlecount++;
+fail_vm_deallocate:
+ (void) mach_vm_deallocate(wq->wq_map, stackaddr, th_allocsize);
- PTHREAD_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
+fail_terminate:
+ if (pthread_kern->thread_will_park_or_terminate) {
+ pthread_kern->thread_will_park_or_terminate(th);
+ }
+ (void)thread_terminate(th);
+ thread_deallocate(th);
- return (TRUE);
+fail_free:
+ zfree(pthread_zone_threadlist, tl);
-failed:
workqueue_lock_spin(wq);
wq->wq_nthreads--;
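/*
 * workqueue_addnewthread() above unwinds partial initialization through a
 * chain of labels (fail_unlock -> fail_vm_deallocate -> fail_terminate ->
 * fail_free), each undoing one step and falling through to the next.  A
 * minimal sketch of that staged-cleanup idiom, with hypothetical stand-ins
 * (toy_make_thread, toy_map_stack, toy_copyout_port, ...) in place of the
 * kernel calls:
 */
#include <stdbool.h>
#include <stdlib.h>

struct toy_resource { int placeholder; };

static bool toy_make_thread(struct toy_resource *r)      { (void)r; return true; }
static bool toy_map_stack(struct toy_resource *r)        { (void)r; return true; }
static bool toy_copyout_port(struct toy_resource *r)     { (void)r; return true; }
static void toy_unmap_stack(struct toy_resource *r)      { (void)r; }
static void toy_terminate_thread(struct toy_resource *r) { (void)r; }

static bool
toy_add_new_thread(void)
{
	struct toy_resource *r = calloc(1, sizeof(*r));
	if (r == NULL)
		return false;
	if (!toy_make_thread(r))
		goto fail_free;			/* nothing else to undo yet */
	if (!toy_map_stack(r))
		goto fail_terminate;		/* undo thread creation only */
	if (!toy_copyout_port(r))
		goto fail_unmap;		/* undo stack, then thread */
	/* fully constructed; the real code hands the threadlist off here */
	free(r);
	return true;

	/* Each label undoes exactly one step, then falls into the next. */
fail_unmap:
	toy_unmap_stack(r);
fail_terminate:
	toy_terminate_thread(r);
fail_free:
	free(r);
	return false;
}

int main(void) { return toy_add_new_thread() ? 0 : 1; }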
_workq_open(struct proc *p, __unused int32_t *retval)
{
struct workqueue * wq;
- int wq_size;
char * ptr;
- uint32_t i;
uint32_t num_cpus;
int error = 0;
wq_init_constrained_limit = 0;
+ if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
+ wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
+ }
if (wq_max_threads > pthread_kern->config_thread_max - 20) {
wq_max_threads = pthread_kern->config_thread_max - 20;
}
goto out;
}
- wq_size = sizeof(struct workqueue);
-
- ptr = (char *)kalloc(wq_size);
- bzero(ptr, wq_size);
+ ptr = (char *)zalloc(pthread_zone_workqueue);
+ bzero(ptr, sizeof(struct workqueue));
wq = (struct workqueue *)ptr;
- wq->wq_flags = WQ_LIST_INITED;
wq->wq_proc = p;
- wq->wq_max_concurrency = wq_max_concurrency;
wq->wq_task = current_task();
wq->wq_map = pthread_kern->current_map();
- for (i = 0; i < WORKQUEUE_NUM_BUCKETS; i++)
- wq->wq_reqconc[i] = (uint16_t)wq->wq_max_concurrency;
-
- // The event manager bucket is special, so its gets a concurrency of 1
- // though we shouldn't ever read this value for that bucket
- wq->wq_reqconc[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
-
// Start the event manager at the priority hinted at by the policy engine
int mgr_priority_hint = pthread_kern->task_get_default_manager_qos(current_task());
wq->wq_event_manager_priority = (uint32_t)thread_qos_get_pthread_priority(mgr_priority_hint) | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
TAILQ_INIT(&wq->wq_thrunlist);
TAILQ_INIT(&wq->wq_thidlelist);
+ for (int i = 0; i < WORKQUEUE_EVENT_MANAGER_BUCKET; i++) {
+ TAILQ_INIT(&wq->wq_overcommit_reqlist[i]);
+ TAILQ_INIT(&wq->wq_reqlist[i]);
+ }
wq->wq_atimer_delayed_call =
thread_call_allocate_with_priority((thread_call_func_t)workqueue_add_timer,
lck_spin_init(&wq->wq_lock, pthread_lck_grp, pthread_lck_attr);
+ wq->wq_cached_threadreq = zalloc(pthread_zone_threadreq);
+ *(wq_thactive_t *)&wq->wq_thactive =
+ (wq_thactive_t)WQ_THACTIVE_NO_PENDING_REQUEST <<
+ WQ_THACTIVE_QOS_SHIFT;
+
pthread_kern->proc_set_wqptr(p, wq);
}
_workqueue_mark_exiting(struct proc *p)
{
struct workqueue *wq = pthread_kern->proc_get_wqptr(p);
+ if (!wq) return;
- if (wq != NULL) {
-
- PTHREAD_TRACE_WQ(TRACE_wq_pthread_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
+ PTHREAD_TRACE_WQ(TRACE_wq_pthread_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
- workqueue_lock_spin(wq);
+ workqueue_lock_spin(wq);
- /*
- * We arm the add timer without holding the workqueue lock so we need
- * to synchronize with any running or soon to be running timers.
- *
- * Threads that intend to arm the timer atomically OR
- * WQ_ATIMER_{DELAYED,IMMEDIATE}_RUNNING into the wq_flags, only if
- * WQ_EXITING is not present. So, once we have set WQ_EXITING, we can
- * be sure that no new RUNNING flags will be set, but still need to
- * wait for the already running timers to complete.
- *
- * We always hold the workq lock when dropping WQ_ATIMER_RUNNING, so
- * the check for and sleep until clear is protected.
- */
- WQ_SETFLAG(wq, WQ_EXITING);
+ /*
+ * We arm the add timer without holding the workqueue lock so we need
+ * to synchronize with any running or soon to be running timers.
+ *
+ * Threads that intend to arm the timer atomically OR
+ * WQ_ATIMER_{DELAYED,IMMEDIATE}_RUNNING into the wq_flags, only if
+ * WQ_EXITING is not present. So, once we have set WQ_EXITING, we can
+ * be sure that no new RUNNING flags will be set, but still need to
+ * wait for the already running timers to complete.
+ *
+ * We always hold the workq lock when dropping WQ_ATIMER_RUNNING, so
+ * the check for and sleep until clear is protected.
+ */
+ uint64_t wq_flags = _wq_flags_or_orig(wq, WQ_EXITING);
- if (wq->wq_flags & WQ_ATIMER_DELAYED_RUNNING) {
- if (thread_call_cancel(wq->wq_atimer_delayed_call) == TRUE) {
- WQ_UNSETFLAG(wq, WQ_ATIMER_DELAYED_RUNNING);
- }
+ if (wq_flags & WQ_ATIMER_DELAYED_RUNNING) {
+ if (thread_call_cancel(wq->wq_atimer_delayed_call) == TRUE) {
+ wq_flags = _wq_flags_and_orig(wq, ~WQ_ATIMER_DELAYED_RUNNING);
}
- if (wq->wq_flags & WQ_ATIMER_IMMEDIATE_RUNNING) {
- if (thread_call_cancel(wq->wq_atimer_immediate_call) == TRUE) {
- WQ_UNSETFLAG(wq, WQ_ATIMER_IMMEDIATE_RUNNING);
- }
+ }
+ if (wq_flags & WQ_ATIMER_IMMEDIATE_RUNNING) {
+ if (thread_call_cancel(wq->wq_atimer_immediate_call) == TRUE) {
+ wq_flags = _wq_flags_and_orig(wq, ~WQ_ATIMER_IMMEDIATE_RUNNING);
}
- while (wq->wq_flags & (WQ_ATIMER_DELAYED_RUNNING | WQ_ATIMER_IMMEDIATE_RUNNING) ||
- (wq->wq_lflags & WQL_ATIMER_BUSY)) {
- assert_wait((caddr_t)wq, (THREAD_UNINT));
- workqueue_unlock(wq);
+ }
+ while ((_wq_flags(wq) & (WQ_ATIMER_DELAYED_RUNNING | WQ_ATIMER_IMMEDIATE_RUNNING)) ||
+ (wq->wq_lflags & WQL_ATIMER_BUSY)) {
+ assert_wait((caddr_t)wq, (THREAD_UNINT));
+ workqueue_unlock(wq);
- thread_block(THREAD_CONTINUE_NULL);
+ thread_block(THREAD_CONTINUE_NULL);
- workqueue_lock_spin(wq);
- }
- workqueue_unlock(wq);
+ workqueue_lock_spin(wq);
+ }
+
+ /*
+ * Save off pending requests, will complete/free them below after unlocking
+ */
+ TAILQ_HEAD(, threadreq) local_list = TAILQ_HEAD_INITIALIZER(local_list);
+
+ for (int i = 0; i < WORKQUEUE_EVENT_MANAGER_BUCKET; i++) {
+ TAILQ_CONCAT(&local_list, &wq->wq_overcommit_reqlist[i], tr_entry);
+ TAILQ_CONCAT(&local_list, &wq->wq_reqlist[i], tr_entry);
+ }
+
+ /*
+ * XXX: Can't deferred-cancel the event manager request, so just smash it.
+ */
+ assert((wq->wq_event_manager_threadreq.tr_flags & TR_FLAG_WORKLOOP) == 0);
+ wq->wq_event_manager_threadreq.tr_state = TR_STATE_DEAD;
+
+ workqueue_unlock(wq);
- PTHREAD_TRACE(TRACE_wq_pthread_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
+ struct threadreq *tr, *tr_temp;
+ TAILQ_FOREACH_SAFE(tr, &local_list, tr_entry, tr_temp) {
+ _threadreq_cancel(wq, tr);
}
+ PTHREAD_TRACE(TRACE_wq_pthread_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
}
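/*
 * _workqueue_mark_exiting() above detaches every pending request onto a
 * local list with TAILQ_CONCAT while holding the workqueue lock, then cancels
 * them after unlocking.  A minimal userspace sketch of that pattern, with a
 * pthread mutex and a toy request type standing in for the workqueue lock and
 * struct threadreq (assumes a BSD <sys/queue.h> that provides TAILQ_CONCAT
 * and TAILQ_FOREACH_SAFE):
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>

struct toy_req {
	int id;
	TAILQ_ENTRY(toy_req) entry;
};
TAILQ_HEAD(toy_req_head, toy_req);

static pthread_mutex_t toy_lock = PTHREAD_MUTEX_INITIALIZER;
static struct toy_req_head toy_pending = TAILQ_HEAD_INITIALIZER(toy_pending);

static void
toy_cancel(struct toy_req *req)
{
	/* stands in for _threadreq_cancel(); here we just report and free */
	printf("cancelled request %d\n", req->id);
	free(req);
}

static void
toy_mark_exiting(void)
{
	struct toy_req_head local = TAILQ_HEAD_INITIALIZER(local);
	struct toy_req *req, *tmp;

	pthread_mutex_lock(&toy_lock);
	/* steal the whole pending list in O(1) while the lock is held */
	TAILQ_CONCAT(&local, &toy_pending, entry);
	pthread_mutex_unlock(&toy_lock);

	/* complete/free the requests without holding the lock */
	TAILQ_FOREACH_SAFE(req, &local, entry, tmp) {
		TAILQ_REMOVE(&local, req, entry);
		toy_cancel(req);
	}
}

int
main(void)
{
	for (int i = 0; i < 3; i++) {
		struct toy_req *req = malloc(sizeof(*req));
		req->id = i;
		TAILQ_INSERT_TAIL(&toy_pending, req, entry);
	}
	toy_mark_exiting();
	return 0;
}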
/*
struct workqueue * wq;
struct threadlist * tl, *tlist;
struct uthread *uth;
- size_t wq_size = sizeof(struct workqueue);
wq = pthread_kern->proc_get_wqptr(p);
if (wq != NULL) {
*/
thread_deallocate(tl->th_thread);
- kfree(tl, sizeof(struct threadlist));
+ zfree(pthread_zone_threadlist, tl);
}
TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
assert((tl->th_flags & TH_LIST_RUNNING) == 0);
assert(tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET);
workqueue_removethread(tl, true, false);
}
+ if (wq->wq_cached_threadreq) {
+ zfree(pthread_zone_threadreq, wq->wq_cached_threadreq);
+ }
thread_call_free(wq->wq_atimer_delayed_call);
thread_call_free(wq->wq_atimer_immediate_call);
lck_spin_destroy(&wq->wq_lock, pthread_lck_grp);
- kfree(wq, wq_size);
+ for (int i = 0; i < WORKQUEUE_EVENT_MANAGER_BUCKET; i++) {
+ assert(TAILQ_EMPTY(&wq->wq_overcommit_reqlist[i]));
+ assert(TAILQ_EMPTY(&wq->wq_reqlist[i]));
+ }
+
+ zfree(pthread_zone_workqueue, wq);
PTHREAD_TRACE(TRACE_wq_workqueue_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
}
#pragma mark workqueue thread manipulation
+
/**
* Entry point for libdispatch to ask for threads
*/
-static int wqops_queue_reqthreads(struct proc *p, int reqcount, pthread_priority_t priority){
- struct workqueue *wq;
- boolean_t start_timer = FALSE;
-
- boolean_t overcommit = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0;
- int class = pthread_priority_get_class_index(priority);
-
- boolean_t event_manager = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0;
- if (event_manager){
- class = WORKQUEUE_EVENT_MANAGER_BUCKET;
- }
+static int
+wqops_queue_reqthreads(struct proc *p, int reqcount,
+ pthread_priority_t priority)
+{
+ bool overcommit = _pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
+ bool event_manager = _pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ int class = event_manager ? WORKQUEUE_EVENT_MANAGER_BUCKET :
+ pthread_priority_get_class_index(priority);
- if ((reqcount <= 0) || (class < 0) || (class >= WORKQUEUE_NUM_BUCKETS) || (overcommit && event_manager)) {
+ if ((reqcount <= 0) || (class < 0) || (class >= WORKQUEUE_NUM_BUCKETS) ||
+ (overcommit && event_manager)) {
return EINVAL;
}
-
+ struct workqueue *wq;
if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
return EINVAL;
}
workqueue_lock_spin(wq);
-
- if (overcommit == 0 && event_manager == 0) {
- wq->wq_reqcount += reqcount;
- wq->wq_requests[class] += reqcount;
-
- PTHREAD_TRACE_WQ(TRACE_wq_req_threads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
-
- while (wq->wq_reqcount) {
- if (!workqueue_run_one(p, wq, overcommit, 0))
- break;
- }
- } else if (overcommit) {
- PTHREAD_TRACE_WQ(TRACE_wq_req_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
-
- while (reqcount) {
- if (!workqueue_run_one(p, wq, overcommit, priority))
- break;
- reqcount--;
- }
- if (reqcount) {
- /*
- * We need to delay starting some of the overcommit requests.
- * We'll record the request here and as existing threads return to
- * the kernel, we'll notice the ocrequests and spin them back to
- * user space as the overcommit variety.
- */
- wq->wq_reqcount += reqcount;
- wq->wq_requests[class] += reqcount;
- wq->wq_ocrequests[class] += reqcount;
-
- PTHREAD_TRACE_WQ(TRACE_wq_delay_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
-
- /*
- * If we delayed this thread coming up but we're not constrained
- * or at max threads then we need to start the timer so we don't
- * risk dropping this request on the floor.
- */
- if ((wq->wq_constrained_threads_scheduled < wq_max_constrained_threads) &&
- (wq->wq_nthreads < wq_max_threads)){
- start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
- }
- }
- } else if (event_manager) {
- PTHREAD_TRACE_WQ(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, wq->wq_event_manager_priority, wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET], 0);
+ _threadreq_copy_prepare(wq);
- if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
- wq->wq_reqcount += 1;
- wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
- }
+ PTHREAD_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE, wq, reqcount, priority, 0, 0);
- // We've recorded the request for an event manager thread above. We'll
- // let the timer pick it up as we would for a kernel callout. We can
- // do a direct add/wakeup when that support is added for the kevent path.
- if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
- start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
- }
+ int tr_flags = 0;
+ if (overcommit) tr_flags |= TR_FLAG_OVERCOMMIT;
+ if (reqcount > 1) {
+ /*
+ * When libdispatch asks for more than one thread, it wants to achieve
+ * parallelism. Pacing would be detrimental to that goal, so treat
+ * these requests specially and skip the pacing admission check.
+ */
+ tr_flags |= TR_FLAG_NO_PACING;
}
- if (start_timer) {
- workqueue_interval_timer_start(wq);
+ while (reqcount-- && !_wq_exiting(wq)) {
+ struct threadreq req;
+ _threadreq_init_stack(&req, class, tr_flags);
+
+ workqueue_run_threadreq_and_unlock(p, wq, NULL, &req, true);
+
+ workqueue_lock_spin(wq); /* reacquire */
+ _threadreq_copy_prepare(wq);
}
workqueue_unlock(wq);
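/*
 * wqops_queue_reqthreads() above builds each request on the stack
 * (_threadreq_init_stack) and calls _threadreq_copy_prepare() with the lock
 * held before handing it off.  The body of _threadreq_copy_prepare() is not
 * in this hunk; the sketch below assumes it keeps one spare heap request
 * cached (compare wq_cached_threadreq in _workq_open/_workqueue_destroy) so
 * that a stack request can later be persisted without allocating under the
 * lock.  A simplified model of that idea, not the kernel implementation:
 */
#include <stdlib.h>
#include <string.h>

struct toy_req {
	int priority;
	int flags;
};

static struct toy_req *toy_cached_req;	/* stands in for wq_cached_threadreq */

/* Refill the cache at a point where a blocking allocation is safe. */
static void
toy_copy_prepare(void)
{
	if (toy_cached_req == NULL) {
		toy_cached_req = malloc(sizeof(*toy_cached_req));
	}
}

/* Persist an on-stack request using the cached object; never allocates. */
static struct toy_req *
toy_persist(const struct toy_req *stack_req)
{
	struct toy_req *req = toy_cached_req;
	if (req == NULL) {
		return NULL;	/* caller must fall back (e.g. the emergency path) */
	}
	toy_cached_req = NULL;
	memcpy(req, stack_req, sizeof(*req));
	return req;
}

int
main(void)
{
	toy_copy_prepare();
	struct toy_req on_stack = { .priority = 1, .flags = 0 };
	struct toy_req *persisted = toy_persist(&on_stack);
	free(persisted);
	free(toy_cached_req);
	return 0;
}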
*
* Currently count is ignored and we always return one thread per invocation.
*/
-thread_t _workq_reqthreads(struct proc *p, int requests_count, workq_reqthreads_req_t requests){
- thread_t th = THREAD_NULL;
- boolean_t do_thread_call = FALSE;
- boolean_t emergency_thread = FALSE;
- assert(requests_count > 0);
+static thread_t
+_workq_kevent_reqthreads(struct proc *p, pthread_priority_t priority,
+ bool no_emergency)
+{
+ int wq_run_tr = WQ_RUN_TR_THROTTLED;
+ bool emergency_thread = false;
+ struct threadreq req;
-#if DEBUG
- // Make sure that the requests array is sorted, highest priority first
- if (requests_count > 1){
- __assert_only qos_class_t priority = _pthread_priority_get_qos_newest(requests[0].priority);
- __assert_only unsigned long flags = ((_pthread_priority_get_flags(requests[0].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
- for (int i = 1; i < requests_count; i++){
- if (requests[i].count == 0) continue;
- __assert_only qos_class_t next_priority = _pthread_priority_get_qos_newest(requests[i].priority);
- __assert_only unsigned long next_flags = ((_pthread_priority_get_flags(requests[i].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
- if (next_flags != flags){
- flags = next_flags;
- priority = next_priority;
- } else {
- assert(next_priority <= priority);
- }
- }
- }
-#endif // DEBUG
struct workqueue *wq;
if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
return THREAD_NULL;
}
+ int class = pthread_priority_get_class_index(priority);
+
workqueue_lock_spin(wq);
+ bool has_threadreq = _threadreq_copy_prepare_noblock(wq);
- PTHREAD_TRACE_WQ(TRACE_wq_kevent_req_threads | DBG_FUNC_START, wq, requests_count, 0, 0, 0);
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_kevent_reqthreads | DBG_FUNC_NONE, wq, NULL, priority, 0, 0);
- // Look for overcommit or event-manager-only requests.
- boolean_t have_overcommit = FALSE;
- pthread_priority_t priority = 0;
- for (int i = 0; i < requests_count; i++){
- if (requests[i].count == 0)
- continue;
- priority = requests[i].priority;
- if (_pthread_priority_get_qos_newest(priority) == QOS_CLASS_UNSPECIFIED){
- priority |= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
- }
- if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0){
- goto event_manager;
- }
- if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0){
- have_overcommit = TRUE;
- break;
- }
+ /*
+ * Skip straight to event manager if that's what was requested
+ */
+ if ((_pthread_priority_get_qos_newest(priority) == QOS_CLASS_UNSPECIFIED) ||
+ (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)){
+ goto event_manager;
}
- if (have_overcommit){
- if (wq->wq_thidlecount){
- th = workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_OVERCOMMIT_KEVENT, priority, true);
- if (th != THREAD_NULL){
- goto out;
- } else {
- workqueue_lock_spin(wq); // reacquire lock
- }
- }
+ bool will_pace = _wq_should_pace_priority(wq, class);
+ if ((wq->wq_thidlecount == 0 || will_pace) && has_threadreq == false) {
+ /*
+ * We'll need to persist the request and can't, so return the emergency
+ * thread instead, which has a persistent request object.
+ */
+ emergency_thread = true;
+ goto event_manager;
+ }
- int class = pthread_priority_get_class_index(priority);
- wq->wq_reqcount += 1;
- wq->wq_requests[class] += 1;
- wq->wq_kevent_ocrequests[class] += 1;
+ /*
+ * Handle overcommit requests
+ */
+ if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0){
+ _threadreq_init_stack(&req, class, TR_FLAG_KEVENT | TR_FLAG_OVERCOMMIT);
+ wq_run_tr = workqueue_run_threadreq_and_unlock(p, wq, NULL, &req, false);
+ goto done;
+ }
+
+ /*
+ * Handle constrained requests
+ */
+ boolean_t may_start = may_start_constrained_thread(wq, class, NULL, false);
+ if (may_start || no_emergency) {
+ _threadreq_init_stack(&req, class, TR_FLAG_KEVENT);
+ wq_run_tr = workqueue_run_threadreq_and_unlock(p, wq, NULL, &req, false);
+ goto done;
+ } else {
+ emergency_thread = true;
+ }
+
+
+event_manager:
+ _threadreq_init_stack(&req, WORKQUEUE_EVENT_MANAGER_BUCKET, TR_FLAG_KEVENT);
+ wq_run_tr = workqueue_run_threadreq_and_unlock(p, wq, NULL, &req, false);
- do_thread_call = WQ_TIMER_IMMEDIATE_NEEDED(wq);
- goto deferred;
+done:
+ if (wq_run_tr == WQ_RUN_TR_THREAD_NEEDED && WQ_TIMER_IMMEDIATE_NEEDED(wq)) {
+ workqueue_interval_timer_trigger(wq);
}
+ return emergency_thread ? (void*)-1 : 0;
+}
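/*
 * A compact model of the decision tree in _workq_kevent_reqthreads() above:
 * event-manager work goes straight to the manager bucket, a request that
 * cannot be persisted (no cached request while no idle thread is ready, or
 * pacing applies) falls back to the emergency/manager path, overcommit
 * requests are always queued, and constrained requests get a preflight
 * admission check.  The bools are stand-ins for the real workqueue state;
 * this is an illustration, not the kernel logic itself.
 */
#include <stdbool.h>
#include <stdio.h>

enum toy_outcome {
	TOY_RUN_MANAGER,	/* service via the event manager request */
	TOY_RUN_OVERCOMMIT,	/* queue an overcommit thread request */
	TOY_RUN_CONSTRAINED,	/* queue a constrained thread request */
};

static enum toy_outcome
toy_kevent_reqthreads(bool is_manager, bool is_overcommit, bool can_persist,
    bool admission_ok, bool no_emergency, bool *emergency)
{
	*emergency = false;
	if (is_manager) {
		return TOY_RUN_MANAGER;
	}
	if (!can_persist) {
		/* can't keep the request around: hand it to the manager */
		*emergency = true;
		return TOY_RUN_MANAGER;
	}
	if (is_overcommit) {
		return TOY_RUN_OVERCOMMIT;
	}
	if (admission_ok || no_emergency) {
		return TOY_RUN_CONSTRAINED;
	}
	*emergency = true;
	return TOY_RUN_MANAGER;
}

int
main(void)
{
	bool emergency;
	enum toy_outcome out = toy_kevent_reqthreads(false, false, true,
	    false, false, &emergency);
	printf("outcome=%d emergency=%d\n", out, emergency);	/* outcome=0 emergency=1 */
	return 0;
}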
- // Having no overcommit requests, try to find any request that can start
- // There's no TOCTTOU since we hold the workqueue lock
- for (int i = 0; i < requests_count; i++){
- workq_reqthreads_req_t req = requests + i;
- priority = req->priority;
- int class = pthread_priority_get_class_index(priority);
+thread_t
+_workq_reqthreads(struct proc *p, __assert_only int requests_count,
+ workq_reqthreads_req_t request)
+{
+ assert(requests_count == 1);
- if (req->count == 0)
- continue;
+ pthread_priority_t priority = request->priority;
+ bool no_emergency = request->count & WORKQ_REQTHREADS_NOEMERGENCY;
- if (!may_start_constrained_thread(wq, class, WORKQUEUE_NUM_BUCKETS, NULL))
- continue;
+ return _workq_kevent_reqthreads(p, priority, no_emergency);
+}
- wq->wq_reqcount += 1;
- wq->wq_requests[class] += 1;
- wq->wq_kevent_requests[class] += 1;
- PTHREAD_TRACE_WQ(TRACE_wq_req_kevent_threads | DBG_FUNC_NONE, wq, priority, wq->wq_kevent_requests[class], 1, 0);
+int
+workq_kern_threadreq(struct proc *p, workq_threadreq_t _req,
+ enum workq_threadreq_type type, unsigned long priority, int flags)
+{
+ struct workqueue *wq;
+ int ret;
- if (wq->wq_thidlecount){
- th = workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT_KEVENT, priority, true);
- goto out;
+ if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
+ return EINVAL;
+ }
+
+ switch (type) {
+ case WORKQ_THREADREQ_KEVENT: {
+ bool no_emergency = flags & WORKQ_THREADREQ_FLAG_NOEMERGENCY;
+ (void)_workq_kevent_reqthreads(p, priority, no_emergency);
+ return 0;
+ }
+ case WORKQ_THREADREQ_WORKLOOP:
+ case WORKQ_THREADREQ_WORKLOOP_NO_THREAD_CALL: {
+ struct threadreq *req = (struct threadreq *)_req;
+ int req_class = pthread_priority_get_class_index(priority);
+ int req_flags = TR_FLAG_WORKLOOP;
+ if ((_pthread_priority_get_flags(priority) &
+ _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0){
+ req_flags |= TR_FLAG_OVERCOMMIT;
+ }
+
+ thread_t thread = current_thread();
+ struct threadlist *tl = util_get_thread_threadlist_entry(thread);
+
+ if (tl && tl != WQ_THREADLIST_EXITING_POISON &&
+ (tl->th_flags & TH_LIST_UNBINDING)) {
+ /*
+ * We're called back synchronously from the context of
+ * kevent_qos_internal_unbind(), within wqops_thread_return(),
+ * so we can try to match this thread up with this request.
+ */
} else {
- do_thread_call = WQ_TIMER_IMMEDIATE_NEEDED(wq);
- goto deferred;
+ tl = NULL;
+ }
+
+ _threadreq_init_alloced(req, req_class, req_flags);
+ workqueue_lock_spin(wq);
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_kevent_reqthreads | DBG_FUNC_NONE, wq, req, priority, 1, 0);
+ ret = workqueue_run_threadreq_and_unlock(p, wq, tl, req, false);
+ if (ret == WQ_RUN_TR_EXITING) {
+ return ECANCELED;
+ }
+ if (ret == WQ_RUN_TR_THREAD_NEEDED) {
+ if (type == WORKQ_THREADREQ_WORKLOOP_NO_THREAD_CALL) {
+ return EAGAIN;
+ }
+ if (WQ_TIMER_IMMEDIATE_NEEDED(wq)) {
+ workqueue_interval_timer_trigger(wq);
+ }
+ }
+ return 0;
+ }
+ case WORKQ_THREADREQ_REDRIVE:
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_kevent_reqthreads | DBG_FUNC_NONE, wq, 0, 0, 4, 0);
+ workqueue_lock_spin(wq);
+ ret = workqueue_run_threadreq_and_unlock(p, wq, NULL, NULL, true);
+ if (ret == WQ_RUN_TR_EXITING) {
+ return ECANCELED;
}
+ return 0;
+ default:
+ return ENOTSUP;
}
+}
- // Okay, here's the fun case: we can't spin up any of the non-overcommit threads
- // that we've seen a request for, so we kick this over to the event manager thread
- emergency_thread = TRUE;
+int
+workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t _req,
+ enum workq_threadreq_op operation, unsigned long arg1,
+ unsigned long __unused arg2)
+{
+ struct threadreq *req = (struct threadreq *)_req;
+ struct workqueue *wq;
+ int priclass, ret = 0, wq_tr_rc = WQ_RUN_TR_THROTTLED;
-event_manager:
- if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
- wq->wq_reqcount += 1;
- wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
- PTHREAD_TRACE_WQ(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, 0, wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], 1, 0);
- } else {
- PTHREAD_TRACE_WQ(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, 0, wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], 0, 0);
+ if (req == NULL || (wq = pthread_kern->proc_get_wqptr(p)) == NULL) {
+ return EINVAL;
}
- wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
- if (wq->wq_thidlecount && wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
- th = workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_EVENT_MANAGER, 0, true);
- assert(th != THREAD_NULL);
- goto out;
+ workqueue_lock_spin(wq);
+
+ if (_wq_exiting(wq)) {
+ ret = ECANCELED;
+ goto out_unlock;
}
- do_thread_call = WQ_TIMER_IMMEDIATE_NEEDED(wq);
-deferred:
- workqueue_unlock(wq);
+ /*
+ * Find/validate the referenced request structure
+ */
+ if (req->tr_state != TR_STATE_WAITING) {
+ ret = EINVAL;
+ goto out_unlock;
+ }
+ assert(req->tr_priority < WORKQUEUE_EVENT_MANAGER_BUCKET);
+ assert(req->tr_flags & TR_FLAG_WORKLOOP);
+
+ switch (operation) {
+ case WORKQ_THREADREQ_CHANGE_PRI:
+ case WORKQ_THREADREQ_CHANGE_PRI_NO_THREAD_CALL:
+ priclass = pthread_priority_get_class_index(arg1);
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_kevent_reqthreads | DBG_FUNC_NONE, wq, req, arg1, 2, 0);
+ if (req->tr_priority == priclass) {
+ goto out_unlock;
+ }
+ _threadreq_dequeue(wq, req);
+ req->tr_priority = priclass;
+ req->tr_state = TR_STATE_NEW; // what was old is new again
+ wq_tr_rc = workqueue_run_threadreq_and_unlock(p, wq, NULL, req, false);
+ goto out;
- if (do_thread_call == TRUE){
- workqueue_interval_timer_trigger(wq);
+ case WORKQ_THREADREQ_CANCEL:
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_kevent_reqthreads | DBG_FUNC_NONE, wq, req, 0, 3, 0);
+ _threadreq_dequeue(wq, req);
+ req->tr_state = TR_STATE_DEAD;
+ break;
+
+ default:
+ ret = ENOTSUP;
+ break;
}
+out_unlock:
+ workqueue_unlock(wq);
out:
- PTHREAD_TRACE_WQ(TRACE_wq_kevent_req_threads | DBG_FUNC_END, wq, do_thread_call, 0, 0, 0);
-
- return emergency_thread ? (void*)-1 : th;
+ if (wq_tr_rc == WQ_RUN_TR_THREAD_NEEDED) {
+ if (operation == WORKQ_THREADREQ_CHANGE_PRI_NO_THREAD_CALL) {
+ ret = EAGAIN;
+ } else if (WQ_TIMER_IMMEDIATE_NEEDED(wq)) {
+ workqueue_interval_timer_trigger(wq);
+ }
+ }
+ return ret;
}
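/*
 * WORKQ_THREADREQ_CHANGE_PRI above reprioritizes a waiting workloop request
 * by dequeueing it, rewriting tr_priority, marking it NEW again, and pushing
 * it back through the normal run path.  A minimal sketch of that
 * cancel-and-resubmit pattern over per-priority queues (<sys/queue.h> lists
 * and toy names stand in for wq_reqlist[] and the threadreq states):
 */
#include <assert.h>
#include <sys/queue.h>

#define TOY_NUM_PRIO 3

enum toy_state { TOY_NEW, TOY_WAITING, TOY_DEAD };

struct toy_req {
	enum toy_state state;
	int priority;
	TAILQ_ENTRY(toy_req) entry;
};
TAILQ_HEAD(toy_req_head, toy_req);

static struct toy_req_head toy_reqlist[TOY_NUM_PRIO];

static void
toy_enqueue(struct toy_req *req)
{
	assert(req->state == TOY_NEW);
	req->state = TOY_WAITING;
	TAILQ_INSERT_TAIL(&toy_reqlist[req->priority], req, entry);
}

static void
toy_change_pri(struct toy_req *req, int new_priority)
{
	assert(req->state == TOY_WAITING);
	if (req->priority == new_priority) {
		return;				/* nothing to do */
	}
	TAILQ_REMOVE(&toy_reqlist[req->priority], req, entry);
	req->priority = new_priority;
	req->state = TOY_NEW;			/* what was old is new again */
	toy_enqueue(req);			/* rerun the normal path */
}

int
main(void)
{
	for (int i = 0; i < TOY_NUM_PRIO; i++) {
		TAILQ_INIT(&toy_reqlist[i]);
	}
	struct toy_req req = { .state = TOY_NEW, .priority = 2 };
	toy_enqueue(&req);
	toy_change_pri(&req, 0);
	assert(TAILQ_FIRST(&toy_reqlist[0]) == &req);
	return 0;
}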
-static int wqops_thread_return(struct proc *p){
+static int
+wqops_thread_return(struct proc *p, struct workqueue *wq)
+{
thread_t th = current_thread();
struct uthread *uth = pthread_kern->get_bsdthread_info(th);
struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
pthread_kern->proc_unlock(p);
}
- struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
if (wq == NULL || !tl) {
return EINVAL;
}
* lowered. Of course, now our understanding of the thread's QoS is wrong,
* so we'll adjust below.
*/
- int new_qos =
- pthread_kern->proc_usynch_thread_qos_squash_override_for_resource(th,
- THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD,
- THREAD_QOS_OVERRIDE_TYPE_DISPATCH_ASYNCHRONOUS_OVERRIDE);
+ bool was_manager = (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET);
+ int new_qos;
+
+ if (!was_manager) {
+ new_qos = pthread_kern->proc_usynch_thread_qos_squash_override_for_resource(th,
+ THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD,
+ THREAD_QOS_OVERRIDE_TYPE_DISPATCH_ASYNCHRONOUS_OVERRIDE);
+ }
+
+ PTHREAD_TRACE_WQ(TRACE_wq_runitem | DBG_FUNC_END, wq, tl->th_priority, 0, 0, 0);
workqueue_lock_spin(wq);
if (tl->th_flags & TH_LIST_KEVENT_BOUND) {
unsigned int flags = KEVENT_FLAG_WORKQ;
- if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ if (was_manager) {
flags |= KEVENT_FLAG_WORKQ_MANAGER;
}
+ tl->th_flags |= TH_LIST_UNBINDING;
workqueue_unlock(wq);
kevent_qos_internal_unbind(p, class_index_get_thread_qos(tl->th_priority), th, flags);
+ if (!(tl->th_flags & TH_LIST_UNBINDING)) {
+ _setup_wqthread(p, th, wq, tl, WQ_SETUP_CLEAR_VOUCHER);
+ pthread_kern->unix_syscall_return(EJUSTRETURN);
+ __builtin_unreachable();
+ }
workqueue_lock_spin(wq);
-
- tl->th_flags &= ~TH_LIST_KEVENT_BOUND;
+ tl->th_flags &= ~(TH_LIST_KEVENT_BOUND | TH_LIST_UNBINDING);
}
- /* Fix up counters from the squash operation. */
- uint8_t old_bucket = tl->th_priority;
- uint8_t new_bucket = thread_qos_get_class_index(new_qos);
-
- if (old_bucket != new_bucket) {
- OSAddAtomic(-1, &wq->wq_thactive_count[old_bucket]);
- OSAddAtomic(1, &wq->wq_thactive_count[new_bucket]);
+ if (!was_manager) {
+ /* Fix up counters from the squash operation. */
+ uint8_t old_bucket = tl->th_priority;
+ uint8_t new_bucket = thread_qos_get_class_index(new_qos);
- wq->wq_thscheduled_count[old_bucket]--;
- wq->wq_thscheduled_count[new_bucket]++;
+ if (old_bucket != new_bucket) {
+ _wq_thactive_move(wq, old_bucket, new_bucket);
+ wq->wq_thscheduled_count[old_bucket]--;
+ wq->wq_thscheduled_count[new_bucket]++;
- tl->th_priority = new_bucket;
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_squash | DBG_FUNC_NONE, wq, tl->th_priority, new_bucket, 0, 0);
+ tl->th_priority = new_bucket;
+ PTHREAD_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_END, tl->th_workq, new_qos, 0, 0, 0);
+ }
}
- PTHREAD_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_END, tl->th_workq, new_qos, 0, 0, 0);
-
- PTHREAD_TRACE_WQ(TRACE_wq_runitem | DBG_FUNC_END, wq, 0, 0, 0, 0);
-
- (void)workqueue_run_nextreq(p, wq, th, RUN_NEXTREQ_DEFAULT, 0, false);
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
+ workqueue_run_threadreq_and_unlock(p, wq, tl, NULL, false);
return 0;
}
int arg3,
int32_t *retval)
{
+ struct workqueue *wq;
int error = 0;
if (pthread_kern->proc_get_register(p) == 0) {
*/
pthread_priority_t pri = arg2;
- struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
+ wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
if (wq == NULL) {
error = EINVAL;
break;
}
workqueue_lock_spin(wq);
if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
- // If userspace passes a scheduling priority, that takes precidence
- // over any QoS. (So, userspace should take care not to accidenatally
- // lower the priority this way.)
- uint32_t sched_pri = pri & (~_PTHREAD_PRIORITY_FLAGS_MASK);
+ /*
+ * If userspace passes a scheduling priority, that takes precedence
+ * over any QoS. (So, userspace should take care not to accidentally
+ * lower the priority this way.)
+ */
+ uint32_t sched_pri = pri & _PTHREAD_PRIORITY_SCHED_PRI_MASK;
if (wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
- wq->wq_event_manager_priority = MAX(sched_pri, wq->wq_event_manager_priority & (~_PTHREAD_PRIORITY_FLAGS_MASK))
+ wq->wq_event_manager_priority = MAX(sched_pri, wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_MASK)
| _PTHREAD_PRIORITY_SCHED_PRI_FLAG | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
} else {
wq->wq_event_manager_priority = sched_pri
break;
}
case WQOPS_THREAD_KEVENT_RETURN:
- if (item != 0) {
+ case WQOPS_THREAD_WORKLOOP_RETURN:
+ wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
+ PTHREAD_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, options, 0, 0, 0);
+ if (item != 0 && arg2 != 0) {
int32_t kevent_retval;
- int ret = kevent_qos_internal(p, -1, item, arg2, item, arg2, NULL, NULL, KEVENT_FLAG_WORKQ | KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS, &kevent_retval);
- // We shouldn't be getting more errors out than events we put in, so
- // reusing the input buffer should always provide enough space. But,
- // the assert is commented out since we get errors in edge cases in the
- // process lifecycle.
+ int ret;
+ if (options == WQOPS_THREAD_KEVENT_RETURN) {
+ ret = kevent_qos_internal(p, -1, item, arg2, item, arg2, NULL, NULL,
+ KEVENT_FLAG_WORKQ | KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS,
+ &kevent_retval);
+ } else /* options == WQOPS_THREAD_WORKLOOP_RETURN */ {
+ kqueue_id_t kevent_id = -1;
+ ret = kevent_id_internal(p, &kevent_id, item, arg2, item, arg2,
+ NULL, NULL,
+ KEVENT_FLAG_WORKLOOP | KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS,
+ &kevent_retval);
+ }
+ /*
+ * We shouldn't be getting more errors out than events we put in, so
+ * reusing the input buffer should always provide enough space. But,
+ * the assert is commented out since we get errors in edge cases in the
+ * process lifecycle.
+ */
//assert(ret == KERN_SUCCESS && kevent_retval >= 0);
if (ret != KERN_SUCCESS){
error = ret;
break;
}
}
- // FALLTHRU
+ goto thread_return;
+
case WQOPS_THREAD_RETURN:
- error = wqops_thread_return(p);
+ wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
+ PTHREAD_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, options, 0, 0, 0);
+ thread_return:
+ error = wqops_thread_return(p, wq);
// NOT REACHED except in case of error
assert(error);
break;
- default:
- error = EINVAL;
- break;
- }
- return (error);
-}
+ case WQOPS_SHOULD_NARROW: {
+ /*
+ * arg2 = priority to test
+ * arg3 = unused
+ */
+ pthread_priority_t priority = arg2;
+ thread_t th = current_thread();
+ struct threadlist *tl = util_get_thread_threadlist_entry(th);
-static boolean_t
-workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority)
-{
- boolean_t ran_one;
+ if (tl == NULL || (tl->th_flags & TH_LIST_CONSTRAINED) == 0) {
+ error = EINVAL;
+ break;
+ }
- if (wq->wq_thidlecount == 0) {
- if (overcommit == FALSE) {
- if (wq->wq_constrained_threads_scheduled < wq->wq_max_concurrency)
- workqueue_addnewthread(wq, overcommit);
- } else {
- workqueue_addnewthread(wq, overcommit);
+ int class = pthread_priority_get_class_index(priority);
+ wq = tl->th_workq;
+ workqueue_lock_spin(wq);
+ bool should_narrow = !may_start_constrained_thread(wq, class, tl, false);
+ workqueue_unlock(wq);
- if (wq->wq_thidlecount == 0)
- return (FALSE);
- }
+ *retval = should_narrow;
+ break;
+ }
+ default:
+ error = EINVAL;
+ break;
}
- ran_one = (workqueue_run_nextreq(p, wq, THREAD_NULL, overcommit ? RUN_NEXTREQ_OVERCOMMIT : RUN_NEXTREQ_DEFAULT, priority, false) != THREAD_NULL);
- /*
- * workqueue_run_nextreq is responsible for
- * dropping the workqueue lock in all cases
- */
- workqueue_lock_spin(wq);
- return (ran_one);
+ switch (options) {
+ case WQOPS_THREAD_KEVENT_RETURN:
+ case WQOPS_THREAD_WORKLOOP_RETURN:
+ case WQOPS_THREAD_RETURN:
+ PTHREAD_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START, wq, options, 0, 0, 0);
+ break;
+ }
+ return (error);
}
/*
assert(thread == tl->th_thread);
assert(thread == current_thread());
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_park | DBG_FUNC_START, wq, 0, 0, 0, 0);
+
uint32_t us_to_wait = 0;
TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
tl->th_flags &= ~TH_LIST_CONSTRAINED;
}
- OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
+ _wq_thactive_dec(wq, tl->th_priority);
wq->wq_thscheduled_count[tl->th_priority]--;
wq->wq_threads_scheduled--;
uint32_t thidlecount = ++wq->wq_thidlecount;
*/
if (TAILQ_EMPTY(&wq->wq_thidlemgrlist) &&
tl->th_priority != WORKQUEUE_EVENT_MANAGER_BUCKET){
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_reset_priority | DBG_FUNC_NONE,
+ wq, thread_tid(thread),
+ (tl->th_priority << 16) | WORKQUEUE_EVENT_MANAGER_BUCKET, 2, 0);
reset_priority(tl, pthread_priority_from_wq_class_index(wq, WORKQUEUE_EVENT_MANAGER_BUCKET));
tl->th_priority = WORKQUEUE_EVENT_MANAGER_BUCKET;
}
TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
}
- PTHREAD_TRACE_WQ(TRACE_wq_thread_park | DBG_FUNC_START, wq,
- wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, 0);
-
/*
* When we remove the voucher from the thread, we may lose our importance
* causing us to get preempted, so we do this after putting the thread on
* to use this thread from e.g. the kevent call out to deliver a boosting
* message.
*/
+ tl->th_flags |= TH_LIST_REMOVING_VOUCHER;
workqueue_unlock(wq);
- kern_return_t kr = pthread_kern->thread_set_voucher_name(MACH_PORT_NULL);
+ if (pthread_kern->thread_will_park_or_terminate) {
+ pthread_kern->thread_will_park_or_terminate(tl->th_thread);
+ }
+ __assert_only kern_return_t kr;
+ kr = pthread_kern->thread_set_voucher_name(MACH_PORT_NULL);
assert(kr == KERN_SUCCESS);
workqueue_lock_spin(wq);
+ tl->th_flags &= ~(TH_LIST_REMOVING_VOUCHER);
if ((tl->th_flags & TH_LIST_RUNNING) == 0) {
if (thidlecount < 101) {
}
}
-static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer){
- if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
+static bool
+may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass,
+ struct threadlist *tl, bool may_start_timer)
+{
+ uint32_t req_qos = _wq_thactive_best_constrained_req_qos(wq);
+ wq_thactive_t thactive;
+
+ if (may_start_timer && at_priclass < req_qos) {
+ /*
+ * When called from workqueue_run_threadreq_and_unlock(), pre-post the
+ * new, higher-priority request into the thactive state so that
+ * workqueue_callback() makes the right decision.
+ *
+ * If the admission check passes, workqueue_run_threadreq_and_unlock
+ * will reset this value before running the request.
+ */
+ thactive = _wq_thactive_set_best_constrained_req_qos(wq, req_qos,
+ at_priclass);
+#ifdef __LP64__
+ PTHREAD_TRACE_WQ(TRACE_wq_thactive_update, 1, (uint64_t)thactive,
+ (uint64_t)(thactive >> 64), 0, 0);
+#endif
+ } else {
+ thactive = _wq_thactive(wq);
+ }
+
+ uint32_t constrained_threads = wq->wq_constrained_threads_scheduled;
+ if (tl && (tl->th_flags & TH_LIST_CONSTRAINED)) {
+ /*
+ * don't count the current thread as scheduled
+ */
+ constrained_threads--;
+ }
+ if (constrained_threads >= wq_max_constrained_threads) {
+ PTHREAD_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
+ wq->wq_constrained_threads_scheduled,
+ wq_max_constrained_threads, 0);
/*
* we need 1 or more constrained threads to return to the kernel before
* we can dispatch additional work
*/
- return FALSE;
+ return false;
}
- uint32_t busycount = 0;
- uint32_t thactive_count = wq->wq_thactive_count[at_priclass];
+ /*
+ * Compute a metric for how many threads are active. We find the
+ * highest priority request outstanding and then add up the number of
+ * active threads in that and all higher-priority buckets. We'll also add
+ * any "busy" threads which are not active but blocked recently enough that
+ * we can't be sure they've gone idle yet. We'll then compare this metric
+ * to our max concurrency to decide whether to add a new thread; a
+ * simplified standalone model of this check follows the function.
+ */
- // Has our most recently blocked thread blocked recently enough that we
- // should still consider it busy?
- if (wq->wq_thscheduled_count[at_priclass] > wq->wq_thactive_count[at_priclass]) {
- if (wq_thread_is_busy(mach_absolute_time(), &wq->wq_lastblocked_ts[at_priclass])) {
- busycount++;
- }
- }
+ uint32_t busycount, thactive_count;
- if (my_priclass < WORKQUEUE_NUM_BUCKETS && my_priclass == at_priclass){
+ thactive_count = _wq_thactive_aggregate_downto_qos(wq, thactive,
+ at_priclass, &busycount, NULL);
+
+ if (tl && tl->th_priority <= at_priclass) {
/*
* don't count this thread as currently active
*/
+ assert(thactive_count > 0);
thactive_count--;
}
- if (thactive_count + busycount >= wq->wq_max_concurrency) {
- if (busycount && start_timer) {
- /*
- * we found at least 1 thread in the
- * 'busy' state... make sure we start
- * the timer because if they are the only
- * threads keeping us from scheduling
- * this work request, we won't get a callback
- * to kick off the timer... we need to
- * start it now...
- */
- *start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
+ if (thactive_count + busycount < wq_max_concurrency[at_priclass]) {
+ PTHREAD_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
+ thactive_count, busycount, 0);
+ return true;
+ } else {
+ PTHREAD_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
+ thactive_count, busycount, 0);
+ }
+
+ if (busycount && may_start_timer) {
+ /*
+ * If this is called from the add timer, we won't have another timer
+ * fire when the thread exits the "busy" state, so rearm the timer.
+ */
+ if (WQ_TIMER_DELAYED_NEEDED(wq)) {
+ workqueue_interval_timer_start(wq);
}
-
- PTHREAD_TRACE_WQ(TRACE_wq_overcommitted|DBG_FUNC_NONE, wq, ((start_timer && *start_timer) ? 1 << _PTHREAD_PRIORITY_FLAGS_SHIFT : 0) | class_index_get_pthread_priority(at_priclass), thactive_count, busycount, 0);
-
- return FALSE;
}
- return TRUE;
+
+ return false;
}
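/*
 * A simplified standalone model of the admission metric computed above:
 * count the threads active in the requested bucket and every higher-priority
 * bucket, add the at-most-one "busy" (recently blocked) thread per bucket,
 * and admit only if the total stays under the per-class concurrency limit.
 * Arrays of plain integers stand in for the packed thactive state; this is
 * an illustration, not the kernel check itself.
 */
#include <stdbool.h>
#include <stdio.h>

#define TOY_NUM_BUCKETS 5

static bool
toy_may_start_constrained(const unsigned active[TOY_NUM_BUCKETS],
    const bool busy[TOY_NUM_BUCKETS], int at_priclass,
    unsigned max_concurrency)
{
	unsigned thactive_count = 0, busycount = 0;

	/* bucket 0 is the highest priority; aggregate down to at_priclass */
	for (int i = 0; i <= at_priclass; i++) {
		thactive_count += active[i];
		if (busy[i]) {
			busycount++;
		}
	}
	return thactive_count + busycount < max_concurrency;
}

int
main(void)
{
	unsigned active[TOY_NUM_BUCKETS] = { 1, 0, 2, 0, 0 };
	bool busy[TOY_NUM_BUCKETS] = { false, true, false, false, false };

	/* 1 + 0 + 2 active plus 1 busy == 4, so a 4-wide class is full */
	printf("admit at class 2, limit 4: %d\n",
	    (int)toy_may_start_constrained(active, busy, 2, 4));	/* 0 */
	printf("admit at class 2, limit 5: %d\n",
	    (int)toy_may_start_constrained(active, busy, 2, 5));	/* 1 */
	return 0;
}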
static struct threadlist *
struct threadlist *tl = NULL;
- if (!TAILQ_EMPTY(&wq->wq_thidlemgrlist) &&
+ if (!TAILQ_EMPTY(&wq->wq_thidlemgrlist) &&
(priclass == WORKQUEUE_EVENT_MANAGER_BUCKET || TAILQ_EMPTY(&wq->wq_thidlelist))){
tl = TAILQ_FIRST(&wq->wq_thidlemgrlist);
TAILQ_REMOVE(&wq->wq_thidlemgrlist, tl, th_entry);
wq->wq_threads_scheduled++;
wq->wq_thscheduled_count[priclass]++;
- OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
-
+ _wq_thactive_inc(wq, priclass);
return tl;
}
static pthread_priority_t
-pthread_priority_from_wq_class_index(struct workqueue *wq, int index){
+pthread_priority_from_wq_class_index(struct workqueue *wq, int index)
+{
if (index == WORKQUEUE_EVENT_MANAGER_BUCKET){
return wq->wq_event_manager_priority;
} else {
}
static void
-reset_priority(struct threadlist *tl, pthread_priority_t pri){
+reset_priority(struct threadlist *tl, pthread_priority_t pri)
+{
kern_return_t ret;
thread_t th = tl->th_thread;
}
}
+/*
+ * Picks the best request to run, and returns the best overcommit fallback
+ * if the best pick is non-overcommit and risks failing its admission check.
+ */
+static struct threadreq *
+workqueue_best_threadreqs(struct workqueue *wq, struct threadlist *tl,
+ struct threadreq **fallback)
+{
+ struct threadreq *req, *best_req = NULL;
+ int priclass, prilimit;
+
+ if ((wq->wq_event_manager_threadreq.tr_state == TR_STATE_WAITING) &&
+ ((wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0) ||
+ (tl && tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET))) {
+ /*
+ * There's an event manager request and either:
+ * - no event manager currently running
+ * - we are re-using the event manager
+ */
+ req = &wq->wq_event_manager_threadreq;
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_run_threadreq_req_select | DBG_FUNC_NONE, wq, req, 1, 0, 0);
+ return req;
+ }
+
+ if (tl) {
+ prilimit = WORKQUEUE_EVENT_MANAGER_BUCKET;
+ } else {
+ prilimit = _wq_highest_paced_priority(wq);
+ }
+ for (priclass = 0; priclass < prilimit; priclass++) {
+ req = TAILQ_FIRST(&wq->wq_overcommit_reqlist[priclass]);
+ if (req) {
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_run_threadreq_req_select | DBG_FUNC_NONE, wq, req, 2, 0, 0);
+ if (best_req) {
+ *fallback = req;
+ } else {
+ best_req = req;
+ }
+ break;
+ }
+ if (!best_req) {
+ best_req = TAILQ_FIRST(&wq->wq_reqlist[priclass]);
+ if (best_req) {
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_run_threadreq_req_select | DBG_FUNC_NONE, wq, best_req, 3, 0, 0);
+ }
+ }
+ }
+ return best_req;
+}
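/*
 * A simplified model of the selection policy in workqueue_best_threadreqs()
 * above (ignoring the event manager special case): scan buckets from highest
 * priority down; the first constrained request seen becomes the best pick,
 * but the scan keeps going until it finds an overcommit request, which either
 * wins outright (nothing picked yet) or is remembered as the fallback for a
 * failed admission check.  Arrays of pointers stand in for the TAILQ heads.
 */
#include <stddef.h>
#include <stdio.h>

#define TOY_NUM_BUCKETS 5

struct toy_req { int priority; int overcommit; };

static struct toy_req *
toy_best_req(struct toy_req *oc_head[TOY_NUM_BUCKETS],
    struct toy_req *req_head[TOY_NUM_BUCKETS], struct toy_req **fallback)
{
	struct toy_req *best = NULL;
	*fallback = NULL;

	for (int prio = 0; prio < TOY_NUM_BUCKETS; prio++) {
		if (oc_head[prio]) {
			if (best) {
				*fallback = oc_head[prio];
			} else {
				best = oc_head[prio];
			}
			break;		/* overcommit found: stop scanning */
		}
		if (!best) {
			best = req_head[prio];
		}
	}
	return best;
}

int
main(void)
{
	struct toy_req constrained = { .priority = 1, .overcommit = 0 };
	struct toy_req overcommit  = { .priority = 3, .overcommit = 1 };
	struct toy_req *oc[TOY_NUM_BUCKETS] = { NULL, NULL, NULL, &overcommit, NULL };
	struct toy_req *cn[TOY_NUM_BUCKETS] = { NULL, &constrained, NULL, NULL, NULL };
	struct toy_req *fallback;

	struct toy_req *best = toy_best_req(oc, cn, &fallback);
	/*
	 * best is the QoS-1 constrained request; the QoS-3 overcommit request
	 * is kept as the fallback if the constrained admission check fails.
	 */
	printf("best prio=%d fallback prio=%d\n", best->priority, fallback->priority);
	return 0;
}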
+
/**
- * grabs a thread for a request
+ * Runs a thread request on a thread
*
- * - called with the workqueue lock held...
- * - responsible for dropping it in all cases
- * - if provided mode is for overcommit, doesn't consume a reqcount
+ * - if parking_tl is NULL, will find a thread and run the request there.
+ * Otherwise, parking_tl->th_thread must be the current thread.
*
+ * - if req is NULL, will find the highest priority request and run that. If
+ * it is not NULL, it must be a threadreq object in state NEW. If it can not
+ * be run immediately, it will be enqueued and moved to state WAITING.
+ *
+ * Either way, the thread request object serviced will be moved to state
+ * PENDING and attached to the threadlist.
+ *
+ * Should be called with the workqueue lock held. Will drop it.
+ *
+ * WARNING: _workq_kevent_reqthreads needs to be able to preflight any
+ * admission checks in this function. If you are changing this function,
+ * keep that one up-to-date.
+ *
+ * - if parking_tl is non NULL, then the current thread is parking. This will
+ * try to reuse this thread for a request. If no match is found, it will be
+ * parked.
*/
-static thread_t
-workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t thread,
- enum run_nextreq_mode mode, pthread_priority_t prio,
- bool kevent_bind_via_return)
+static int
+workqueue_run_threadreq_and_unlock(proc_t p, struct workqueue *wq,
+ struct threadlist *parking_tl, struct threadreq *req,
+ bool may_add_new_thread)
{
- thread_t th_to_run = THREAD_NULL;
- uint32_t upcall_flags = 0;
- uint32_t priclass;
- struct threadlist *tl = NULL;
- struct uthread *uth = NULL;
- boolean_t start_timer = FALSE;
+ struct threadreq *incoming_req = req;
- if (mode == RUN_NEXTREQ_ADD_TIMER) {
- mode = RUN_NEXTREQ_DEFAULT;
- }
+ struct threadlist *tl = parking_tl;
+ int rc = WQ_RUN_TR_THROTTLED;
- // valid modes to call this function with
- assert(mode == RUN_NEXTREQ_DEFAULT || mode == RUN_NEXTREQ_DEFAULT_KEVENT ||
- mode == RUN_NEXTREQ_OVERCOMMIT || mode == RUN_NEXTREQ_UNCONSTRAINED ||
- mode == RUN_NEXTREQ_EVENT_MANAGER || mode == RUN_NEXTREQ_OVERCOMMIT_KEVENT);
- // may only have a priority if in OVERCOMMIT or DEFAULT_KEVENT mode
- assert(mode == RUN_NEXTREQ_OVERCOMMIT || mode == RUN_NEXTREQ_OVERCOMMIT_KEVENT ||
- mode == RUN_NEXTREQ_DEFAULT_KEVENT || prio == 0);
- // thread == thread_null means "please spin up a new workqueue thread, we can't reuse this"
- // thread != thread_null is thread reuse, and must be the current thread
- assert(thread == THREAD_NULL || thread == current_thread());
+ assert(tl == NULL || tl->th_thread == current_thread());
+ assert(req == NULL || req->tr_state == TR_STATE_NEW);
+ assert(!may_add_new_thread || !tl);
- PTHREAD_TRACE_WQ(TRACE_wq_run_nextitem|DBG_FUNC_START, wq, thread_tid(thread), wq->wq_thidlecount, wq->wq_reqcount, 0);
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_run_threadreq | DBG_FUNC_START, wq, req,
+ tl ? thread_tid(tl->th_thread) : 0,
+ req ? (req->tr_priority << 16 | req->tr_flags) : 0, 0);
+
+ /*
+ * Special cases when provided an event manager request
+ */
+ if (req && req->tr_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
+ // Clients must not rely on identity of event manager requests
+ assert(req->tr_flags & TR_FLAG_ONSTACK);
+ // You can't be both overcommit and event manager
+ assert((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0);
+
+ /*
+ * We can only ever have one event manager request, so coalesce them if
+ * there's already one outstanding.
+ */
+ if (wq->wq_event_manager_threadreq.tr_state == TR_STATE_WAITING) {
+ PTHREAD_TRACE_WQ_REQ(TRACE_wq_run_threadreq_mgr_merge | DBG_FUNC_NONE, wq, req, 0, 0, 0);
+
+ struct threadreq *existing_req = &wq->wq_event_manager_threadreq;
+ if (req->tr_flags & TR_FLAG_KEVENT) {
+ existing_req->tr_flags |= TR_FLAG_KEVENT;
+ }
- if (thread != THREAD_NULL) {
- uth = pthread_kern->get_bsdthread_info(thread);
+ req = existing_req;
+ incoming_req = NULL;
+ }
- if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL) {
- panic("wq thread with no threadlist");
+ if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
+ (!tl || tl->th_priority != WORKQUEUE_EVENT_MANAGER_BUCKET)){
+ /*
+ * There can only be one event manager running at a time.
+ */
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 1, 0, 0, 0);
+ goto done;
}
}
- /*
- * from here until we drop the workq lock we can't be pre-empted since we
- * hold the lock in spin mode... this is important since we have to
- * independently update the priority that the thread is associated with and
- * the priorty based counters that "workqueue_callback" also changes and
- * bases decisions on.
- */
+again: // Start again after creating a thread
+
+ if (_wq_exiting(wq)) {
+ rc = WQ_RUN_TR_EXITING;
+ goto exiting;
+ }
/*
- * This giant monstrosity does three things:
- *
- * - adjusts the mode, if required
- * - selects the priclass that we'll be servicing
- * - sets any mode-specific upcall flags
- *
- * When possible special-cases should be handled here and converted into
- * non-special cases.
+ * Thread request selection and admission control
*/
- if (mode == RUN_NEXTREQ_OVERCOMMIT) {
- priclass = pthread_priority_get_class_index(prio);
- upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
- } else if (mode == RUN_NEXTREQ_OVERCOMMIT_KEVENT){
- priclass = pthread_priority_get_class_index(prio);
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- } else if (mode == RUN_NEXTREQ_EVENT_MANAGER){
- assert(wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0);
- priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
- upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
- if (wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET]){
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- }
- } else if (wq->wq_reqcount == 0){
- // no work to do. we'll check again when new work arrives.
- goto done;
- } else if (mode == RUN_NEXTREQ_DEFAULT_KEVENT) {
- assert(kevent_bind_via_return);
-
- priclass = pthread_priority_get_class_index(prio);
- assert(priclass < WORKQUEUE_EVENT_MANAGER_BUCKET);
- assert(wq->wq_kevent_requests[priclass] > 0);
-
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- mode = RUN_NEXTREQ_DEFAULT;
- } else if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
- ((wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0) ||
- (thread != THREAD_NULL && tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET))){
- // There's an event manager request and either:
- // - no event manager currently running
- // - we are re-using the event manager
- mode = RUN_NEXTREQ_EVENT_MANAGER;
- priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
- upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
- if (wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET]){
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- }
- } else {
- // Find highest priority and check for special request types
- for (priclass = 0; priclass < WORKQUEUE_EVENT_MANAGER_BUCKET; priclass++) {
- if (wq->wq_requests[priclass])
- break;
- }
- if (priclass == WORKQUEUE_EVENT_MANAGER_BUCKET){
- // only request should have been event manager since it's not in a bucket,
- // but we weren't able to handle it since there's already an event manager running,
- // so we fell into this case
- assert(wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
- wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
- wq->wq_reqcount == 1);
+ struct threadreq *fallback = NULL;
+ if (req) {
+ if ((req->tr_flags & TR_FLAG_NO_PACING) == 0 &&
+ _wq_should_pace_priority(wq, req->tr_priority)) {
+ /*
+ * If a request fails the pacing admission check, then thread
+ * requests are redriven when the pacing thread is finally scheduled
+ * and calls _wq_pacing_end() in wq_unpark_continue().
+ */
goto done;
}
-
- if (wq->wq_kevent_ocrequests[priclass]){
- mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
- } else if (wq->wq_ocrequests[priclass]){
- mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
- upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
- } else if (wq->wq_kevent_requests[priclass]){
- upcall_flags |= WQ_FLAG_THREAD_KEVENT;
- }
+ } else if (wq->wq_reqcount == 0) {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 2, 0, 0, 0);
+ goto done;
+ } else if ((req = workqueue_best_threadreqs(wq, tl, &fallback)) == NULL) {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 3, 0, 0, 0);
+ goto done;
}
- assert(mode != RUN_NEXTREQ_EVENT_MANAGER || priclass == WORKQUEUE_EVENT_MANAGER_BUCKET);
- assert(mode == RUN_NEXTREQ_EVENT_MANAGER || priclass != WORKQUEUE_EVENT_MANAGER_BUCKET);
-
- if (mode == RUN_NEXTREQ_DEFAULT /* non-overcommit */){
- uint32_t my_priclass = (thread != THREAD_NULL) ? tl->th_priority : WORKQUEUE_NUM_BUCKETS;
- if (may_start_constrained_thread(wq, priclass, my_priclass, &start_timer) == FALSE){
- // per policy, we won't start another constrained thread
- goto done;
+ if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0 &&
+ (req->tr_priority < WORKQUEUE_EVENT_MANAGER_BUCKET)) {
+ if (!may_start_constrained_thread(wq, req->tr_priority, parking_tl, true)) {
+ if (!fallback) {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 4, 0, 0, 0);
+ goto done;
+ }
+ assert(req->tr_state == TR_STATE_WAITING);
+ req = fallback;
}
}
- if (thread != THREAD_NULL) {
- /*
- * thread is non-NULL here when we return from userspace
- * in workq_kernreturn, rather than trying to find a thread
- * we pick up new work for this specific thread.
- */
- th_to_run = thread;
- upcall_flags |= WQ_FLAG_THREAD_REUSE;
- } else if (wq->wq_thidlecount == 0) {
+ /*
+ * Thread selection.
+ */
+ if (parking_tl) {
+ if (tl->th_priority != req->tr_priority) {
+ _wq_thactive_move(wq, tl->th_priority, req->tr_priority);
+ wq->wq_thscheduled_count[tl->th_priority]--;
+ wq->wq_thscheduled_count[req->tr_priority]++;
+ }
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq_thread_select | DBG_FUNC_NONE,
+ wq, 1, thread_tid(tl->th_thread), 0, 0);
+ } else if (wq->wq_thidlecount) {
+ tl = pop_from_thidlelist(wq, req->tr_priority);
/*
- * we have no additional threads waiting to pick up
- * work, however, there is additional work to do.
+ * This call will update wq_thscheduled_count and wq_thactive_count for
+ * the provided priority. It will not set the returned thread to that
+ * priority. This matches the behavior of the parking_tl clause above.
*/
- start_timer = WQ_TIMER_DELAYED_NEEDED(wq);
-
- PTHREAD_TRACE_WQ(TRACE_wq_stalled, wq, wq->wq_nthreads, start_timer, 0, 0);
-
- goto done;
- } else {
- // there is both work available and an idle thread, so activate a thread
- tl = pop_from_thidlelist(wq, priclass);
- th_to_run = tl->th_thread;
- }
-
- // Adjust counters and thread flags AKA consume the request
- // TODO: It would be lovely if OVERCOMMIT consumed reqcount
- switch (mode) {
- case RUN_NEXTREQ_DEFAULT:
- case RUN_NEXTREQ_DEFAULT_KEVENT: /* actually mapped to DEFAULT above */
- case RUN_NEXTREQ_ADD_TIMER: /* actually mapped to DEFAULT above */
- case RUN_NEXTREQ_UNCONSTRAINED:
- wq->wq_reqcount--;
- wq->wq_requests[priclass]--;
-
- if (mode == RUN_NEXTREQ_DEFAULT){
- if (!(tl->th_flags & TH_LIST_CONSTRAINED)) {
- wq->wq_constrained_threads_scheduled++;
- tl->th_flags |= TH_LIST_CONSTRAINED;
- }
- } else if (mode == RUN_NEXTREQ_UNCONSTRAINED){
- if (tl->th_flags & TH_LIST_CONSTRAINED) {
- wq->wq_constrained_threads_scheduled--;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
- }
- }
- if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
- wq->wq_kevent_requests[priclass]--;
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq_thread_select | DBG_FUNC_NONE,
+ wq, 2, thread_tid(tl->th_thread), 0, 0);
+ } else /* no idle threads */ {
+ if (!may_add_new_thread || wq->wq_nthreads >= wq_max_threads) {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 5,
+ may_add_new_thread, wq->wq_nthreads, 0);
+ if (wq->wq_nthreads < wq_max_threads) {
+ rc = WQ_RUN_TR_THREAD_NEEDED;
}
- break;
-
- case RUN_NEXTREQ_EVENT_MANAGER:
- wq->wq_reqcount--;
- wq->wq_requests[priclass]--;
+ goto done;
+ }
- if (tl->th_flags & TH_LIST_CONSTRAINED) {
- wq->wq_constrained_threads_scheduled--;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
- }
- if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
- wq->wq_kevent_requests[priclass]--;
- }
- break;
+ bool added_thread = workqueue_addnewthread(p, wq);
+ /*
+ * workqueue_addnewthread will drop and re-take the lock, so we
+ * need to ensure we still have a cached request.
+ *
+ * It also means we have to pick a new request, since our old pick may
+ * not be valid anymore.
+ */
+ req = incoming_req;
+ if (req && (req->tr_flags & TR_FLAG_ONSTACK)) {
+ _threadreq_copy_prepare(wq);
+ }
- case RUN_NEXTREQ_DEFERRED_OVERCOMMIT:
- wq->wq_reqcount--;
- wq->wq_requests[priclass]--;
- if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
- wq->wq_kevent_ocrequests[priclass]--;
- } else {
- wq->wq_ocrequests[priclass]--;
- }
- /* FALLTHROUGH */
- case RUN_NEXTREQ_OVERCOMMIT:
- case RUN_NEXTREQ_OVERCOMMIT_KEVENT:
- if (tl->th_flags & TH_LIST_CONSTRAINED) {
- wq->wq_constrained_threads_scheduled--;
- tl->th_flags &= ~TH_LIST_CONSTRAINED;
+ if (added_thread) {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq_thread_select | DBG_FUNC_NONE,
+ wq, 3, 0, 0, 0);
+ goto again;
+ } else if (_wq_exiting(wq)) {
+ rc = WQ_RUN_TR_EXITING;
+ goto exiting;
+ } else {
+ PTHREAD_TRACE_WQ(TRACE_wq_run_threadreq | DBG_FUNC_END, wq, 6, 0, 0, 0);
+ /*
+ * Something caused thread creation to fail. Kick off the timer in
+ * the hope that it'll succeed next time.
+ */
+ if (WQ_TIMER_DELAYED_NEEDED(wq)) {
+ workqueue_interval_timer_start(wq);
}
- break;
+ goto done;
+ }
}
- // Confirm we've maintained our counter invariants
- assert(wq->wq_requests[priclass] < UINT16_MAX);
- assert(wq->wq_ocrequests[priclass] < UINT16_MAX);
- assert(wq->wq_kevent_requests[priclass] < UINT16_MAX);
- assert(wq->wq_kevent_ocrequests[priclass] < UINT16_MAX);
- assert(wq->wq_ocrequests[priclass] + wq->wq_kevent_requests[priclass] +
- wq->wq_kevent_ocrequests[priclass] <=
- wq->wq_requests[priclass]);
-
- assert((tl->th_flags & TH_LIST_KEVENT_BOUND) == 0);
- if (upcall_flags & WQ_FLAG_THREAD_KEVENT) {
- tl->th_flags |= TH_LIST_KEVENT;
+ /*
+ * Setup thread, mark request as complete and run with it.
+ */
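+ /*
+ * Only a request that is already sitting on the request queue
+ * (TR_STATE_WAITING) needs to be dequeued; a freshly submitted request
+ * was never enqueued.
+ */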
+ if (req->tr_state == TR_STATE_WAITING) {
+ _threadreq_dequeue(wq, req);
+ }
+ if (tl->th_priority != req->tr_priority) {
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_reset_priority | DBG_FUNC_NONE,
+ wq, thread_tid(tl->th_thread),
+ (tl->th_priority << 16) | req->tr_priority, 1, 0);
+ reset_priority(tl, pthread_priority_from_wq_class_index(wq, req->tr_priority));
+ tl->th_priority = (uint8_t)req->tr_priority;
+ }
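+ /*
+ * Keep the constrained-thread accounting in sync with the kind of
+ * request this thread is about to service.
+ */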
+ if (req->tr_flags & TR_FLAG_OVERCOMMIT) {
+ if ((tl->th_flags & TH_LIST_CONSTRAINED) != 0) {
+ tl->th_flags &= ~TH_LIST_CONSTRAINED;
+ wq->wq_constrained_threads_scheduled--;
+ }
} else {
- tl->th_flags &= ~TH_LIST_KEVENT;
+ if ((tl->th_flags & TH_LIST_CONSTRAINED) == 0) {
+ tl->th_flags |= TH_LIST_CONSTRAINED;
+ wq->wq_constrained_threads_scheduled++;
+ }
}
- uint32_t orig_class = tl->th_priority;
- tl->th_priority = (uint8_t)priclass;
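+ /*
+ * Threads taken from the idle list are subject to pacing unless the
+ * request opted out (TR_FLAG_NO_PACING); a thread being reused on its
+ * way to park is not.
+ */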
+ if (!parking_tl && !(req->tr_flags & TR_FLAG_NO_PACING)) {
+ _wq_pacing_start(wq, tl);
+ }
+ if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
+ uint32_t old_qos, new_qos;
- if ((thread != THREAD_NULL) && (orig_class != priclass)) {
/*
- * we need to adjust these counters based on this
- * thread's new disposition w/r to priority
+ * If we are scheduling a constrained thread request, we may need to
+ * update the best constrained qos in the thactive atomic state.
*/
- OSAddAtomic(-1, &wq->wq_thactive_count[orig_class]);
- OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
-
- wq->wq_thscheduled_count[orig_class]--;
- wq->wq_thscheduled_count[priclass]++;
+ for (new_qos = 0; new_qos < WQ_THACTIVE_NO_PENDING_REQUEST; new_qos++) {
+ if (TAILQ_FIRST(&wq->wq_reqlist[new_qos]))
+ break;
+ }
+ old_qos = _wq_thactive_best_constrained_req_qos(wq);
+ if (old_qos != new_qos) {
+ wq_thactive_t v = _wq_thactive_set_best_constrained_req_qos(wq,
+ old_qos, new_qos);
+#ifdef __LP64__
+ PTHREAD_TRACE_WQ(TRACE_wq_thactive_update, 2, (uint64_t)v,
+ (uint64_t)(v >> 64), 0, 0);
+#else
+ PTHREAD_TRACE_WQ(TRACE_wq_thactive_update, 2, v, 0, 0, 0);
+#endif
+ }
}
- wq->wq_thread_yielded_count = 0;
-
- pthread_priority_t outgoing_priority = pthread_priority_from_wq_class_index(wq, tl->th_priority);
- PTHREAD_TRACE_WQ(TRACE_wq_reset_priority | DBG_FUNC_START, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
- reset_priority(tl, outgoing_priority);
- PTHREAD_TRACE_WQ(TRACE_wq_reset_priority | DBG_FUNC_END, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
-
- /*
- * persist upcall_flags so that in can be retrieved in setup_wqthread
- */
- tl->th_upcall_flags = upcall_flags >> WQ_FLAG_THREAD_PRIOSHIFT;
-
- /*
- * if current thread is reused for work request, does not return via unix_syscall
- */
- wq_runreq(p, th_to_run, wq, tl, (thread == th_to_run),
- (upcall_flags & WQ_FLAG_THREAD_KEVENT) && !kevent_bind_via_return);
-
- PTHREAD_TRACE_WQ(TRACE_wq_run_nextitem|DBG_FUNC_END, wq, thread_tid(th_to_run), mode == RUN_NEXTREQ_OVERCOMMIT, 1, 0);
-
- assert(!kevent_bind_via_return || (upcall_flags & WQ_FLAG_THREAD_KEVENT));
- if (kevent_bind_via_return && (upcall_flags & WQ_FLAG_THREAD_KEVENT)) {
- tl->th_flags |= TH_LIST_KEVENT_BOUND;
+ {
+ uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
+ if (req->tr_flags & TR_FLAG_OVERCOMMIT)
+ upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
+ if (req->tr_flags & TR_FLAG_KEVENT)
+ upcall_flags |= WQ_FLAG_THREAD_KEVENT;
+ if (req->tr_flags & TR_FLAG_WORKLOOP)
+ upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
+ if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET)
+ upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
+ tl->th_upcall_flags = upcall_flags >> WQ_FLAG_THREAD_PRIOSHIFT;
}
-
- workqueue_unlock(wq);
-
- return th_to_run;
+ if (req->tr_flags & TR_FLAG_KEVENT) {
+ tl->th_flags |= TH_LIST_KEVENT;
+ } else {
+ tl->th_flags &= ~TH_LIST_KEVENT;
+ }
+ return _threadreq_complete_and_unlock(p, wq, req, tl);
done:
- if (start_timer)
- workqueue_interval_timer_start(wq);
+ if (incoming_req) {
+ _threadreq_enqueue(wq, incoming_req);
+ }
- PTHREAD_TRACE_WQ(TRACE_wq_run_nextitem | DBG_FUNC_END, wq, thread_tid(thread), start_timer, 3, 0);
+exiting:
- if (thread != THREAD_NULL){
- parkit(wq, tl, thread);
- /* NOT REACHED */
+ if (parking_tl && !(parking_tl->th_flags & TH_LIST_UNBINDING)) {
+ parkit(wq, parking_tl, parking_tl->th_thread);
+ __builtin_unreachable();
}
workqueue_unlock(wq);
- return THREAD_NULL;
+ return rc;
}
/**
struct uthread *uth = pthread_kern->get_bsdthread_info(th);
if (uth == NULL) goto done;
- struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
- if (tl == NULL) goto done;
-
- struct workqueue *wq = tl->th_workq;
+ struct workqueue *wq = pthread_kern->proc_get_wqptr(p);
+ if (wq == NULL) goto done;
workqueue_lock_spin(wq);
+ struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
+ assert(tl != WQ_THREADLIST_EXITING_POISON);
+ if (tl == NULL) {
+ /*
+ * We woke up before addnewthread() finished setting us up. Go ahead
+ * and exit, but first poison the threadlist variable so that
+ * addnewthread() doesn't think we are still valid.
+ */
+ pthread_kern->uthread_set_threadlist(uth, WQ_THREADLIST_EXITING_POISON);
+ workqueue_unlock(wq);
+ goto done;
+ }
+
assert(tl->th_flags & TH_LIST_INITED);
if ((tl->th_flags & TH_LIST_NEW)){
workqueue_unlock(wq);
thread_block(wq_unpark_continue);
- /* NOT REACHED */
+ __builtin_unreachable();
}
}
if ((tl->th_flags & TH_LIST_RUNNING) == 0) {
assert((tl->th_flags & TH_LIST_BUSY) == 0);
+ if (!first_use) {
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_park | DBG_FUNC_END, wq, 0, 0, 0, 0);
+ }
/*
* We were set running, but not for the purposes of actually running.
* This could be because the timer elapsed. Or it could be because the
(tl->th_priority < qos_class_get_class_index(WQ_THREAD_CLEANUP_QOS) ||
(tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET))) {
// Reset the QoS to something low for the pthread cleanup
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_reset_priority | DBG_FUNC_NONE,
+ wq, thread_tid(th),
+ (tl->th_priority << 16) | qos_class_get_class_index(WQ_THREAD_CLEANUP_QOS), 3, 0);
pthread_priority_t cleanup_pri = _pthread_priority_make_newest(WQ_THREAD_CLEANUP_QOS, 0, 0);
reset_priority(tl, cleanup_pri);
}
} else {
pthread_kern->unix_syscall_return(0);
}
- /* NOT REACHED */
+ __builtin_unreachable();
}
/*
}
return_to_user:
- workqueue_unlock(wq);
- _setup_wqthread(p, th, wq, tl, first_use);
+ if (!first_use) {
+ PTHREAD_TRACE_WQ(TRACE_wq_thread_park | DBG_FUNC_END, wq, 0, 0, 0, 0);
+ }
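+ /*
+ * If this thread was the one gating pacing, releasing it may let a
+ * pending request run; try to schedule it while we still hold the lock
+ * (the call below unlocks), otherwise just drop the lock.
+ */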
+ if (_wq_pacing_end(wq, tl) && wq->wq_reqcount) {
+ workqueue_run_threadreq_and_unlock(p, wq, NULL, NULL, true);
+ } else {
+ workqueue_unlock(wq);
+ }
+ _setup_wqthread(p, th, wq, tl, first_use ? WQ_SETUP_FIRST_USE : 0);
pthread_kern->thread_sched_call(th, workqueue_callback);
done:
if (first_use){
panic("Our attempt to return to userspace failed...");
}
-/* called with workqueue lock held */
-static void
-wq_runreq(proc_t p, thread_t th, struct workqueue *wq, struct threadlist *tl,
- boolean_t return_directly, boolean_t needs_kevent_bind)
-{
- PTHREAD_TRACE1_WQ(TRACE_wq_runitem | DBG_FUNC_START, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
-
- unsigned int kevent_flags = KEVENT_FLAG_WORKQ;
- if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
- kevent_flags |= KEVENT_FLAG_WORKQ_MANAGER;
- }
-
- if (return_directly) {
- if (needs_kevent_bind) {
- assert((tl->th_flags & TH_LIST_KEVENT_BOUND) == 0);
- tl->th_flags |= TH_LIST_KEVENT_BOUND;
- }
-
- workqueue_unlock(wq);
-
- if (needs_kevent_bind) {
- kevent_qos_internal_bind(p, class_index_get_thread_qos(tl->th_priority), th, kevent_flags);
- }
-
- /*
- * For preemption reasons, we want to reset the voucher as late as
- * possible, so we do it in two places:
- * - Just before parking (i.e. in parkit())
- * - Prior to doing the setup for the next workitem (i.e. here)
- *
- * Those two places are sufficient to ensure we always reset it before
- * it goes back out to user space, but be careful to not break that
- * guarantee.
- */
- kern_return_t kr = pthread_kern->thread_set_voucher_name(MACH_PORT_NULL);
- assert(kr == KERN_SUCCESS);
-
- _setup_wqthread(p, th, wq, tl, false);
-
- PTHREAD_TRACE_WQ(TRACE_wq_run_nextitem|DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
-
- pthread_kern->unix_syscall_return(EJUSTRETURN);
- /* NOT REACHED */
- }
-
- if (needs_kevent_bind) {
- // Leave TH_LIST_BUSY set so that the thread can't beat us to calling kevent
- workqueue_unlock(wq);
- assert((tl->th_flags & TH_LIST_KEVENT_BOUND) == 0);
- kevent_qos_internal_bind(p, class_index_get_thread_qos(tl->th_priority), th, kevent_flags);
- tl->th_flags |= TH_LIST_KEVENT_BOUND;
- workqueue_lock_spin(wq);
- }
- tl->th_flags &= ~(TH_LIST_BUSY);
- thread_wakeup_thread(tl,th);
-}
-
-#define KEVENT_LIST_LEN 16 // WORKQ_KEVENT_EVENT_BUFFER_LEN
-#define KEVENT_DATA_SIZE (32 * 1024)
-
/**
* configures initial thread stack/registers to jump into:
* _pthread_wqthread(pthread_t self, mach_port_t kport, void *stackaddr, void *keventlist, int upcall_flags, int nkevents);
* When we are done the stack will look like:
* |-----------| th_stackaddr + th_allocsize
* |pthread_t | th_stackaddr + DEFAULT_STACKSIZE + guardsize + PTHREAD_STACK_OFFSET
- * |kevent list| optionally - at most KEVENT_LIST_LEN events
- * |kevent data| optionally - at most KEVENT_DATA_SIZE bytes
+ * |kevent list| optionally - at most WQ_KEVENT_LIST_LEN events
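+ * |kevent id  | optionally - a single kqueue_id_t (workloop threads only)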
+ * |kevent data| optionally - at most WQ_KEVENT_DATA_SIZE bytes
* |stack gap | bottom aligned to 16 bytes, and at least as big as stack_gap_min
* | STACK |
* | ⇓ |
* |-----------| th_stackaddr
*/
void
-_setup_wqthread(proc_t p, thread_t th, struct workqueue *wq, struct threadlist *tl,
- bool first_use)
+_setup_wqthread(proc_t p, thread_t th, struct workqueue *wq,
+ struct threadlist *tl, int setup_flags)
{
int error;
- uint32_t upcall_flags;
+ if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
+ /*
+ * For preemption reasons, we want to reset the voucher as late as
+ * possible, so we do it in two places:
+ * - Just before parking (i.e. in parkit())
+ * - Prior to doing the setup for the next workitem (i.e. here)
+ *
+ * Those two places are sufficient to ensure we always reset it before
+ * it goes back out to user space, but be careful to not break that
+ * guarantee.
+ */
+ __assert_only kern_return_t kr;
+ kr = pthread_kern->thread_set_voucher_name(MACH_PORT_NULL);
+ assert(kr == KERN_SUCCESS);
+ }
+
+ uint32_t upcall_flags = tl->th_upcall_flags << WQ_FLAG_THREAD_PRIOSHIFT;
+ if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
+ upcall_flags |= WQ_FLAG_THREAD_REUSE;
+ }
+ /*
+ * Put the QoS class value into the lower bits of the reuse_thread register;
+ * this is where the thread priority used to be stored anyway.
+ */
pthread_priority_t priority = pthread_priority_from_wq_class_index(wq, tl->th_priority);
+ upcall_flags |= (_pthread_priority_get_qos_newest(priority) & WQ_FLAG_THREAD_PRIOMASK);
const vm_size_t guardsize = vm_map_page_size(tl->th_workq->wq_map);
const vm_size_t stack_gap_min = (proc_is64bit(p) == 0) ? C_32_STK_ALIGN : C_64_REDZONE_LEN;
panic("workqueue thread start function pointer is NULL");
}
- /* Put the QoS class value into the lower bits of the reuse_thread register, this is where
- * the thread priority used to be stored anyway.
- */
- upcall_flags = tl->th_upcall_flags << WQ_FLAG_THREAD_PRIOSHIFT;
- upcall_flags |= (_pthread_priority_get_qos_newest(priority) & WQ_FLAG_THREAD_PRIOMASK);
-
- upcall_flags |= WQ_FLAG_THREAD_NEWSPI;
-
- uint32_t tsd_offset = pthread_kern->proc_get_pthread_tsd_offset(p);
- if (tsd_offset) {
- mach_vm_offset_t th_tsd_base = (mach_vm_offset_t)pthread_self_addr + tsd_offset;
- kern_return_t kret = pthread_kern->thread_set_tsd_base(th, th_tsd_base);
- if (kret == KERN_SUCCESS) {
- upcall_flags |= WQ_FLAG_THREAD_TSD_BASE_SET;
+ if (setup_flags & WQ_SETUP_FIRST_USE) {
+ uint32_t tsd_offset = pthread_kern->proc_get_pthread_tsd_offset(p);
+ if (tsd_offset) {
+ mach_vm_offset_t th_tsd_base = (mach_vm_offset_t)pthread_self_addr + tsd_offset;
+ kern_return_t kret = pthread_kern->thread_set_tsd_base(th, th_tsd_base);
+ if (kret == KERN_SUCCESS) {
+ upcall_flags |= WQ_FLAG_THREAD_TSD_BASE_SET;
+ }
}
- }
- if (first_use) {
/*
* Pre-fault the first page of the new thread's stack and the page that will
* contain the pthread_t structure.
VM_PROT_READ | VM_PROT_WRITE,
FALSE,
THREAD_UNINT, NULL, 0);
- } else {
- upcall_flags |= WQ_FLAG_THREAD_REUSE;
}
user_addr_t kevent_list = NULL;
int kevent_count = 0;
if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
- kevent_list = pthread_self_addr - KEVENT_LIST_LEN * sizeof(struct kevent_qos_s);
- kevent_count = KEVENT_LIST_LEN;
+ bool workloop = upcall_flags & WQ_FLAG_THREAD_WORKLOOP;
+
+ kevent_list = pthread_self_addr - WQ_KEVENT_LIST_LEN * sizeof(struct kevent_qos_s);
+ kevent_count = WQ_KEVENT_LIST_LEN;
+
+ user_addr_t kevent_id_addr = kevent_list;
+ if (workloop) {
+ /*
+ * The kevent ID goes just below the kevent list. Sufficiently new
+ * userspace will know to look there. Old userspace will just
+ * ignore it.
+ */
+ kevent_id_addr -= sizeof(kqueue_id_t);
+ }
- user_addr_t kevent_data_buf = kevent_list - KEVENT_DATA_SIZE;
- user_size_t kevent_data_available = KEVENT_DATA_SIZE;
+ user_addr_t kevent_data_buf = kevent_id_addr - WQ_KEVENT_DATA_SIZE;
+ user_size_t kevent_data_available = WQ_KEVENT_DATA_SIZE;
int32_t events_out = 0;
- assert(tl->th_flags | TH_LIST_KEVENT_BOUND);
+ assert(tl->th_flags & TH_LIST_KEVENT_BOUND);
- unsigned int flags = KEVENT_FLAG_WORKQ | KEVENT_FLAG_STACK_DATA | KEVENT_FLAG_IMMEDIATE;
+ unsigned int flags = KEVENT_FLAG_STACK_DATA | KEVENT_FLAG_IMMEDIATE;
if (tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET) {
flags |= KEVENT_FLAG_WORKQ_MANAGER;
}
- int ret = kevent_qos_internal(p, class_index_get_thread_qos(tl->th_priority), NULL, 0, kevent_list, kevent_count,
- kevent_data_buf, &kevent_data_available,
- flags, &events_out);
-
- // turns out there are a lot of edge cases where this will fail, so not enabled by default
- //assert((ret == KERN_SUCCESS && events_out != -1) || ret == KERN_ABORTED);
+ int ret = 0;
+ if (workloop) {
+ flags |= KEVENT_FLAG_WORKLOOP;
+ kqueue_id_t kevent_id = -1;
+ ret = kevent_id_internal(p, &kevent_id,
+ NULL, 0, kevent_list, kevent_count,
+ kevent_data_buf, &kevent_data_available,
+ flags, &events_out);
+ copyout(&kevent_id, kevent_id_addr, sizeof(kevent_id));
+ } else {
+ flags |= KEVENT_FLAG_WORKQ;
+ ret = kevent_qos_internal(p,
+ class_index_get_thread_qos(tl->th_priority),
+ NULL, 0, kevent_list, kevent_count,
+ kevent_data_buf, &kevent_data_available,
+ flags, &events_out);
+ }
- // squash any errors into just empty output on
+ // squash any errors into just empty output
if (ret != KERN_SUCCESS || events_out == -1){
events_out = 0;
- kevent_data_available = KEVENT_DATA_SIZE;
+ kevent_data_available = WQ_KEVENT_DATA_SIZE;
}
// We shouldn't get data out if there aren't events available
- assert(events_out != 0 || kevent_data_available == KEVENT_DATA_SIZE);
+ assert(events_out != 0 || kevent_data_available == WQ_KEVENT_DATA_SIZE);
if (events_out > 0){
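+ // If no out-of-line kevent data was produced, the stack can start right
+ // below the kevent area; otherwise it starts below the consumed part of
+ // the data buffer.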
- if (kevent_data_available == KEVENT_DATA_SIZE){
- stack_top_addr = (kevent_list - stack_gap_min) & -stack_align_min;
+ if (kevent_data_available == WQ_KEVENT_DATA_SIZE){
+ stack_top_addr = (kevent_id_addr - stack_gap_min) & -stack_align_min;
} else {
stack_top_addr = (kevent_data_buf + kevent_data_available - stack_gap_min) & -stack_align_min;
}
}
}
+ PTHREAD_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START, wq, 0, 0, 0, 0);
+
#if defined(__i386__) || defined(__x86_64__)
if (proc_is64bit(p) == 0) {
x86_thread_state32_t state = {
if (error) return error;
_workq_reqthreads(req->p, (int)(req->newlen / sizeof(struct workq_reqthreads_req_s)), requests);
-
+
return 0;
}
#endif // DEBUG
-
+
#pragma mark - Misc
-int
+int
_fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
struct workqueue * wq;
int error = 0;
int activecount;
- uint32_t pri;
if ((wq = pthread_kern->proc_get_wqptr(p)) == NULL) {
return EINVAL;
}
- workqueue_lock_spin(wq);
- activecount = 0;
-
- for (pri = 0; pri < WORKQUEUE_NUM_BUCKETS; pri++) {
- activecount += wq->wq_thactive_count[pri];
+ /*
+ * This is sometimes called from interrupt context by the kperf sampler.
+ * In that case, it's not safe to spin trying to take the lock since we
+ * might already hold it. So, we just try-lock it and error out if it's
+ * already held. Since this is just a debugging aid, and all our callers
+ * are able to handle an error, that's fine.
+ */
+ bool locked = workqueue_lock_try(wq);
+ if (!locked) {
+ return EBUSY;
}
+
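+ /*
+ * Aggregate the per-QoS active-thread counts out of the packed atomic
+ * thactive state.
+ */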
+ activecount = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
+ WORKQUEUE_NUM_BUCKETS - 1, NULL, NULL);
pwqinfo->pwq_nthreads = wq->wq_nthreads;
pwqinfo->pwq_runthreads = activecount;
pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
return pwq_state;
}
-int
+int
_thread_selfid(__unused struct proc *p, uint64_t *retval)
{
thread_t thread = current_thread();
{
pthread_lck_grp_attr = lck_grp_attr_alloc_init();
pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
-
+
/*
* allocate the lock attribute for pthread synchronizers
*/
pthread_lck_attr = lck_attr_alloc_init();
pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
-
+
pth_global_hashinit();
psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
psynch_zoneinit();
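+ /*
+ * Back the workqueue, threadlist and threadreq structures with
+ * dedicated zones.
+ */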
+ pthread_zone_workqueue = zinit(sizeof(struct workqueue),
+ 1024 * sizeof(struct workqueue), 8192, "pthread.workqueue");
+ pthread_zone_threadlist = zinit(sizeof(struct threadlist),
+ 1024 * sizeof(struct threadlist), 8192, "pthread.threadlist");
+ pthread_zone_threadreq = zinit(sizeof(struct threadreq),
+ 1024 * sizeof(struct threadreq), 8192, "pthread.threadreq");
+
/*
* register sysctls
*/
- sysctl_register_oid(&sysctl__kern_wq_yielded_threshold);
- sysctl_register_oid(&sysctl__kern_wq_yielded_window_usecs);
sysctl_register_oid(&sysctl__kern_wq_stalled_window_usecs);
sysctl_register_oid(&sysctl__kern_wq_reduce_pool_window_usecs);
sysctl_register_oid(&sysctl__kern_wq_max_timer_interval_usecs);
sysctl_register_oid(&sysctl__kern_pthread_debug_tracing);
#if DEBUG
- sysctl_register_oid(&sysctl__kern_wq_max_concurrency);
sysctl_register_oid(&sysctl__debug_wq_kevent_test);
#endif
- wq_max_concurrency = pthread_kern->ml_get_max_cpus();
-
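+ /*
+ * Seed the per-QoS concurrency limits from the scheduler's parallelism
+ * figures rather than a single global CPU count; the event manager
+ * bucket is always limited to a single thread.
+ */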
+ for (int i = 0; i < WORKQUEUE_NUM_BUCKETS; i++) {
+ uint32_t thread_qos = _wq_bucket_to_thread_qos(i);
+ wq_max_concurrency[i] = pthread_kern->qos_max_parallelism(thread_qos,
+ QOS_PARALLELISM_COUNT_LOGICAL);
+ }
+ wq_max_concurrency[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
}