/*
- * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
+ * Copyright (c) 2000-2009 Apple Inc. All rights reserved.
*
* @APPLE_OSREFERENCE_LICENSE_HEADER_START@
*
#include <sys/pthread_internal.h>
#include <sys/vm.h>
#include <sys/user.h> /* for coredump */
+#include <sys/proc_info.h> /* for fill_procworkqueue */
#include <mach/mach_types.h>
#include <kern/sched_prim.h> /* for thread_exception_return */
#include <kern/processor.h>
#include <kern/affinity.h>
+#include <kern/assert.h>
#include <mach/mach_vm.h>
#include <mach/mach_param.h>
#include <mach/thread_status.h>
#include <mach/message.h>
#include <mach/port.h>
#include <vm/vm_protos.h>
-#include <vm/vm_map.h>` /* for current_map() */
+#include <vm/vm_map.h> /* for current_map() */
#include <mach/thread_act.h> /* for thread_resume */
#include <machine/machine_routines.h>
#if defined(__i386__)
lck_grp_attr_t *pthread_lck_grp_attr;
lck_grp_t *pthread_lck_grp;
lck_attr_t *pthread_lck_attr;
-lck_mtx_t * pthread_list_mlock;
-extern void pthread_init(void);
extern kern_return_t thread_getstatus(register thread_t act, int flavor,
thread_state_t tstate, mach_msg_type_number_t *count);
extern kern_return_t mach_port_deallocate(ipc_space_t, mach_port_name_t);
extern kern_return_t semaphore_signal_internal_trap(mach_port_name_t);
-static int workqueue_additem(struct workqueue *wq, int prio, user_addr_t item);
+extern void workqueue_thread_yielded(void);
+
+static int workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity);
static int workqueue_removeitem(struct workqueue *wq, int prio, user_addr_t item);
-static void workqueue_run_nextitem(proc_t p, thread_t th);
+static boolean_t workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t th,
+ user_addr_t oc_item, int oc_prio, int oc_affinity);
static void wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
int reuse_thread, int wake_thread, int return_directly);
+static void wq_unpark_continue(void);
+static void wq_unsuspend_continue(void);
static int setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl);
-static int workqueue_addnewthread(struct workqueue *wq);
-static void workqueue_removethread(struct workqueue *wq);
-static void workqueue_lock(proc_t);
+static boolean_t workqueue_addnewthread(struct workqueue *wq);
+static void workqueue_removethread(struct threadlist *tl);
static void workqueue_lock_spin(proc_t);
static void workqueue_unlock(proc_t);
+int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
+int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
+
+#define WQ_MAXPRI_MIN 0 /* low prio queue num */
+#define WQ_MAXPRI_MAX 2 /* max prio queuenum */
+#define WQ_PRI_NUM 3 /* number of prio work queues */
#define C_32_STK_ALIGN 16
#define C_64_STK_ALIGN 16
#define PTHREAD_START_SETSCHED 0x02000000
#define PTHREAD_START_DETACHED 0x04000000
#define PTHREAD_START_POLICY_BITSHIFT 16
-#define PTHREAD_START_POLICY_MASK 0xffff
+#define PTHREAD_START_POLICY_MASK 0xff
#define PTHREAD_START_IMPORTANCE_MASK 0xffff
#define SCHED_OTHER POLICY_TIMESHARE
#define SCHED_FIFO POLICY_FIFO
#define SCHED_RR POLICY_RR
-void
-pthread_init(void)
-{
-
- pthread_lck_grp_attr = lck_grp_attr_alloc_init();
- pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
-
- /*
- * allocate the lock attribute for pthread synchronizers
- */
- pthread_lck_attr = lck_attr_alloc_init();
-
- pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
-
-}
-
-void
-pthread_list_lock(void)
-{
- lck_mtx_lock(pthread_list_mlock);
-}
-
-void
-pthread_list_unlock(void)
-{
- lck_mtx_unlock(pthread_list_mlock);
-}
-
-
-int
-__pthread_mutex_destroy(__unused struct proc *p, struct __pthread_mutex_destroy_args *uap, __unused register_t *retval)
-{
- int res;
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- lck_mtx_t * lmtx;
- lck_mtx_t * lmtx1;
-
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0)
- return(EINVAL);
-
- MTX_LOCK(mutex->lock);
- if (mutex->sig == _PTHREAD_KERN_MUTEX_SIG)
- {
- if (mutex->owner == (thread_t)NULL &&
- mutex->refcount == 1)
- {
- mutex->sig = _PTHREAD_NO_SIG;
- lmtx = mutex->mutex;
- lmtx1 = mutex->lock;
- mutex->mutex = NULL;
- pthread_id_mutex_remove(mutexid);
- mutex->refcount --;
- MTX_UNLOCK(mutex->lock);
- lck_mtx_free(lmtx, pthread_lck_grp);
- lck_mtx_free(lmtx1, pthread_lck_grp);
- kfree((void *)mutex, sizeof(struct _pthread_mutex));
- return(0);
- }
- else
- res = EBUSY;
- }
- else
- res = EINVAL;
- MTX_UNLOCK(mutex->lock);
- pthread_mutex_release(mutex);
- return (res);
-}
-
-/*
- * Initialize a mutex variable, possibly with additional attributes.
- */
-static void
-pthread_mutex_init_internal(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
-{
- mutex->prioceiling = attr->prioceiling;
- mutex->protocol = attr->protocol;
- mutex->type = attr->type;
- mutex->pshared = attr->pshared;
- mutex->refcount = 0;
- mutex->owner = (thread_t)NULL;
- mutex->owner_proc = current_proc();
- mutex->sig = _PTHREAD_KERN_MUTEX_SIG;
- mutex->lock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
- mutex->mutex = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
-}
-
-/*
- * Initialize a mutex variable, possibly with additional attributes.
- * Public interface - so don't trust the lock - initialize it first.
- */
-int
-__pthread_mutex_init(__unused struct proc *p, struct __pthread_mutex_init_args *uap, __unused register_t *retval)
-{
- user_addr_t umutex = uap->mutex;
- pthread_mutex_t * mutex;
- user_addr_t uattr = uap->attr;
- pthread_mutexattr_t attr;
- unsigned int addr = (unsigned int)((uintptr_t)uap->mutex);
- int pmutex_sig;
- int mutexid;
- int error = 0;
-
- if ((umutex == 0) || (uattr == 0))
- return(EINVAL);
-
- if ((error = copyin(uattr, &attr, sizeof(pthread_mutexattr_t))))
- return(error);
- if (attr.sig != _PTHREAD_MUTEX_ATTR_SIG)
- return (EINVAL);
-
- if ((error = copyin(umutex, &pmutex_sig, sizeof(int))))
- return(error);
-
- if (pmutex_sig == _PTHREAD_KERN_MUTEX_SIG)
- return(EBUSY);
- mutex = (pthread_mutex_t *)kalloc(sizeof(pthread_mutex_t));
-
- pthread_mutex_init_internal(mutex, &attr);
-
-
- addr += 8;
- mutexid = pthread_id_mutex_add(mutex);
- if (mutexid) {
- if ((error = copyout(&mutexid, ((user_addr_t)((uintptr_t)(addr))), 4)))
- goto cleanup;
- return(0);
- } else
- error = ENOMEM;
-cleanup:
- if(mutexid)
- pthread_id_mutex_remove(mutexid);
- lck_mtx_free(mutex->lock, pthread_lck_grp);
- lck_mtx_free(mutex->mutex, pthread_lck_grp);
- kfree(mutex, sizeof(struct _pthread_mutex));
- return(error);
-}
-
-/*
- * Lock a mutex.
- * TODO: Priority inheritance stuff
- */
-int
-__pthread_mutex_lock(struct proc *p, struct __pthread_mutex_lock_args *uap, __unused register_t *retval)
-{
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- int error;
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0)
- return(EINVAL);
-
- MTX_LOCK(mutex->lock);
-
- if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- MTX_UNLOCK(mutex->lock);
-
- lck_mtx_lock(mutex->mutex);
-
- MTX_LOCK(mutex->lock);
- mutex->owner = current_thread();
- error = 0;
-out:
- MTX_UNLOCK(mutex->lock);
- pthread_mutex_release(mutex);
- return (error);
-}
-
-/*
- * Attempt to lock a mutex, but don't block if this isn't possible.
- */
-int
-__pthread_mutex_trylock(struct proc *p, struct __pthread_mutex_trylock_args *uap, __unused register_t *retval)
-{
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- boolean_t state;
- int error;
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0)
- return(EINVAL);
-
- MTX_LOCK(mutex->lock);
-
- if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- MTX_UNLOCK(mutex->lock);
-
- state = lck_mtx_try_lock(mutex->mutex);
- if (state) {
- MTX_LOCK(mutex->lock);
- mutex->owner = current_thread();
- MTX_UNLOCK(mutex->lock);
- error = 0;
- } else
- error = EBUSY;
-
- pthread_mutex_release(mutex);
- return (error);
-out:
- MTX_UNLOCK(mutex->lock);
- pthread_mutex_release(mutex);
- return (error);
-}
-
-/*
- * Unlock a mutex.
- * TODO: Priority inheritance stuff
- */
-int
-__pthread_mutex_unlock(struct proc *p, struct __pthread_mutex_unlock_args *uap, __unused register_t *retval)
-{
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- int error;
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0)
- return(EINVAL);
-
- MTX_LOCK(mutex->lock);
-
- if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- MTX_UNLOCK(mutex->lock);
-
- lck_mtx_unlock(mutex->mutex);
-
- MTX_LOCK(mutex->lock);
- mutex->owner = NULL;
- error = 0;
-out:
- MTX_UNLOCK(mutex->lock);
- pthread_mutex_release(mutex);
- return (error);
-}
-
-
-int
-__pthread_cond_init(__unused struct proc *p, struct __pthread_cond_init_args *uap, __unused register_t *retval)
-{
- pthread_cond_t * cond;
- pthread_condattr_t attr;
- user_addr_t ucond = uap->cond;
- user_addr_t uattr = uap->attr;
- unsigned int addr = (unsigned int)((uintptr_t)uap->cond);
- int condid, error, cond_sig;
- semaphore_t sem;
- kern_return_t kret;
- int value = 0;
-
- if ((ucond == 0) || (uattr == 0))
- return(EINVAL);
-
- if ((error = copyin(uattr, &attr, sizeof(pthread_condattr_t))))
- return(error);
-
- if (attr.sig != _PTHREAD_COND_ATTR_SIG)
- return (EINVAL);
-
- if ((error = copyin(ucond, &cond_sig, sizeof(int))))
- return(error);
-
- if (cond_sig == _PTHREAD_KERN_COND_SIG)
- return(EBUSY);
- kret = semaphore_create(kernel_task, &sem, SYNC_POLICY_FIFO, value);
- if (kret != KERN_SUCCESS)
- return(ENOMEM);
-
- cond = (pthread_cond_t *)kalloc(sizeof(pthread_cond_t));
-
- cond->lock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
- cond->pshared = attr.pshared;
- cond->sig = _PTHREAD_KERN_COND_SIG;
- cond->sigpending = 0;
- cond->waiters = 0;
- cond->refcount = 0;
- cond->mutex = (pthread_mutex_t *)0;
- cond->owner_proc = current_proc();
- cond->sem = sem;
-
- addr += 8;
- condid = pthread_id_cond_add(cond);
- if (condid) {
- if ((error = copyout(&condid, ((user_addr_t)((uintptr_t)(addr))), 4)))
- goto cleanup;
- return(0);
- } else
- error = ENOMEM;
-cleanup:
- if(condid)
- pthread_id_cond_remove(condid);
- semaphore_destroy(kernel_task, cond->sem);
- kfree(cond, sizeof(pthread_cond_t));
- return(error);
-}
-
-
-/*
- * Destroy a condition variable.
- */
-int
-__pthread_cond_destroy(__unused struct proc *p, struct __pthread_cond_destroy_args *uap, __unused register_t *retval)
-{
- pthread_cond_t *cond;
- int condid = uap->condid;
- semaphore_t sem;
- lck_mtx_t * lmtx;
- int res;
-
- cond = pthread_id_to_cond(condid);
- if (cond == 0)
- return(EINVAL);
-
- COND_LOCK(cond->lock);
- if (cond->sig == _PTHREAD_KERN_COND_SIG)
- {
- if (cond->refcount == 1)
- {
- cond->sig = _PTHREAD_NO_SIG;
- sem = cond->sem;
- cond->sem = NULL;
- lmtx = cond->lock;
- pthread_id_cond_remove(condid);
- cond->refcount --;
- COND_UNLOCK(cond->lock);
- lck_mtx_free(lmtx, pthread_lck_grp);
- (void)semaphore_destroy(kernel_task, sem);
- kfree((void *)cond, sizeof(pthread_cond_t));
- return(0);
- }
- else
- res = EBUSY;
- }
- else
- res = EINVAL;
- COND_UNLOCK(cond->lock);
- pthread_cond_release(cond);
- return (res);
-}
-
-
-/*
- * Signal a condition variable, waking up all threads waiting for it.
- */
-int
-__pthread_cond_broadcast(__unused struct proc *p, struct __pthread_cond_broadcast_args *uap, __unused register_t *retval)
-{
- int condid = uap->condid;
- pthread_cond_t * cond;
- int error;
- kern_return_t kret;
-
- cond = pthread_id_to_cond(condid);
- if (cond == 0)
- return(EINVAL);
-
- COND_LOCK(cond->lock);
-
- if (cond->sig != _PTHREAD_KERN_COND_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- COND_UNLOCK(cond->lock);
-
- kret = semaphore_signal_all(cond->sem);
- switch (kret) {
- case KERN_INVALID_ADDRESS:
- case KERN_PROTECTION_FAILURE:
- error = EINVAL;
- break;
- case KERN_ABORTED:
- case KERN_OPERATION_TIMED_OUT:
- error = EINTR;
- break;
- case KERN_SUCCESS:
- error = 0;
- break;
- default:
- error = EINVAL;
- break;
- }
-
- COND_LOCK(cond->lock);
-out:
- COND_UNLOCK(cond->lock);
- pthread_cond_release(cond);
- return (error);
-}
-
-
-/*
- * Signal a condition variable, waking only one thread.
- */
-int
-__pthread_cond_signal(__unused struct proc *p, struct __pthread_cond_signal_args *uap, __unused register_t *retval)
-{
- int condid = uap->condid;
- pthread_cond_t * cond;
- int error;
- kern_return_t kret;
-
- cond = pthread_id_to_cond(condid);
- if (cond == 0)
- return(EINVAL);
-
- COND_LOCK(cond->lock);
-
- if (cond->sig != _PTHREAD_KERN_COND_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- COND_UNLOCK(cond->lock);
-
- kret = semaphore_signal(cond->sem);
- switch (kret) {
- case KERN_INVALID_ADDRESS:
- case KERN_PROTECTION_FAILURE:
- error = EINVAL;
- break;
- case KERN_ABORTED:
- case KERN_OPERATION_TIMED_OUT:
- error = EINTR;
- break;
- case KERN_SUCCESS:
- error = 0;
- break;
- default:
- error = EINVAL;
- break;
- }
-
- COND_LOCK(cond->lock);
-out:
- COND_UNLOCK(cond->lock);
- pthread_cond_release(cond);
- return (error);
-}
-
-
-int
-__pthread_cond_wait(__unused struct proc *p, struct __pthread_cond_wait_args *uap, __unused register_t *retval)
-{
- int condid = uap->condid;
- pthread_cond_t * cond;
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- int error;
- kern_return_t kret;
-
- cond = pthread_id_to_cond(condid);
- if (cond == 0)
- return(EINVAL);
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0) {
- pthread_cond_release(cond);
- return(EINVAL);
- }
- COND_LOCK(cond->lock);
-
- if (cond->sig != _PTHREAD_KERN_COND_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- COND_UNLOCK(cond->lock);
-
- kret = semaphore_wait(cond->sem);
- switch (kret) {
- case KERN_INVALID_ADDRESS:
- case KERN_PROTECTION_FAILURE:
- error = EACCES;
- break;
- case KERN_ABORTED:
- case KERN_OPERATION_TIMED_OUT:
- error = EINTR;
- break;
- case KERN_SUCCESS:
- error = 0;
- break;
- default:
- error = EINVAL;
- break;
- }
-
- COND_LOCK(cond->lock);
-out:
- COND_UNLOCK(cond->lock);
- pthread_cond_release(cond);
- pthread_mutex_release(mutex);
- return (error);
-}
-
-int
-__pthread_cond_timedwait(__unused struct proc *p, struct __pthread_cond_timedwait_args *uap, __unused register_t *retval)
-{
- int condid = uap->condid;
- pthread_cond_t * cond;
- int mutexid = uap->mutexid;
- pthread_mutex_t * mutex;
- mach_timespec_t absts;
- int error;
- kern_return_t kret;
-
- absts.tv_sec = 0;
- absts.tv_nsec = 0;
-
- if (uap->abstime)
- if ((error = copyin(uap->abstime, &absts, sizeof(mach_timespec_t ))))
- return(error);
- cond = pthread_id_to_cond(condid);
- if (cond == 0)
- return(EINVAL);
-
- mutex = pthread_id_to_mutex(mutexid);
- if (mutex == 0) {
- pthread_cond_release(cond);
- return(EINVAL);
- }
- COND_LOCK(cond->lock);
-
- if (cond->sig != _PTHREAD_KERN_COND_SIG)
- {
- error = EINVAL;
- goto out;
- }
-
- if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) {
- error = EINVAL;
- goto out;
- }
-
- COND_UNLOCK(cond->lock);
-
- kret = semaphore_timedwait(cond->sem, absts);
- switch (kret) {
- case KERN_INVALID_ADDRESS:
- case KERN_PROTECTION_FAILURE:
- error = EACCES;
- break;
- case KERN_ABORTED:
- case KERN_OPERATION_TIMED_OUT:
- error = EINTR;
- break;
- case KERN_SUCCESS:
- error = 0;
- break;
- default:
- error = EINVAL;
- break;
- }
-
- COND_LOCK(cond->lock);
-out:
- COND_UNLOCK(cond->lock);
- pthread_cond_release(cond);
- pthread_mutex_release(mutex);
- return (error);
-}
int
bsdthread_create(__unused struct proc *p, struct bsdthread_create_args *uap, user_addr_t *retval)
mach_vm_offset_t th_stackaddr;
mach_vm_offset_t th_stack;
mach_vm_offset_t th_pthread;
- mach_port_t th_thport;
+ mach_port_name_t th_thport;
thread_t th;
user_addr_t user_func = uap->func;
user_addr_t user_funcarg = uap->func_arg;
int isLP64 = 0;
+ if ((p->p_lflag & P_LREGISTER) == 0)
+ return(EINVAL);
#if 0
KERNEL_DEBUG_CONSTANT(0x9000080 | DBG_FUNC_START, flags, 0, 0, 0, 0);
#endif
#if defined(__ppc__)
stackaddr = 0xF0000000;
-#elif defined(__i386__)
+#elif defined(__i386__) || defined(__x86_64__)
stackaddr = 0xB0000000;
-#elif defined(__arm__)
- stackaddr = 0xB0000000; /* XXX ARM */
#else
#error Need to define a stack address hint for this architecture
#endif
thread_reference(th);
sright = (void *) convert_thread_to_port(th);
- th_thport = (void *)ipc_port_copyout_send(sright, get_task_ipcspace(ctask));
+ th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(ctask));
if ((flags & PTHREAD_START_CUSTOM) == 0) {
th_stacksize = (mach_vm_size_t)user_stack; /* if it is custom them it is stacksize */
ts64->srr0 = (uint64_t)p->p_threadstart;
ts64->r1 = (uint64_t)(th_stack - C_ARGSAVE_LEN - C_RED_ZONE);
ts64->r3 = (uint64_t)th_pthread;
- ts64->r4 = (uint64_t)((unsigned int)th_thport);
+ ts64->r4 = (uint64_t)(th_thport);
ts64->r5 = (uint64_t)user_func;
ts64->r6 = (uint64_t)user_funcarg;
ts64->r7 = (uint64_t)user_stacksize;
thread_set_cthreadself(th, (uint64_t)th_pthread, isLP64);
}
-#elif defined(__i386__)
+#elif defined(__i386__) || defined(__x86_64__)
{
/*
* Set up i386 registers & function call.
ts64->rip = (uint64_t)p->p_threadstart;
ts64->rdi = (uint64_t)th_pthread;
- ts64->rsi = (uint64_t)((unsigned int)(th_thport));
+ ts64->rsi = (uint64_t)(th_thport);
ts64->rdx = (uint64_t)user_func;
ts64->rcx = (uint64_t)user_funcarg;
ts64->r8 = (uint64_t)user_stacksize;
thread_set_wq_state64(th, (thread_state_t)ts64);
}
}
-#elif defined(__arm__)
- {
- int flavor=0, count=0;
- void * state;
-
- kret = thread_getstatus(th, flavor, (thread_state_t)&state, &count);
- if (kret != KERN_SUCCESS) {
- error = EINVAL;
- goto out1;
- }
-
- /* XXX ARM TODO */
-
- kret = thread_setstatus(th, flavor, (thread_state_t)&state, count);
- if (kret != KERN_SUCCESS)
- error = EINVAL;
- goto out1;
- }
#else
#error bsdthread_create not defined for this architecture
#endif
extinfo.timeshare = 0;
thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
- precedinfo.importance = importance;
+#define BASEPRI_DEFAULT 31
+ precedinfo.importance = (importance - BASEPRI_DEFAULT);
thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
}
}
thread_deallocate(th); /* drop the creator reference */
#if 0
- KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, (unsigned int)th_pthread, 0, 0, 0);
+ KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, th_pthread, 0, 0, 0);
#endif
*retval = th_pthread;
if (allocated != 0)
(void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
out:
- (void)mach_port_deallocate(get_task_ipcspace(ctask), (mach_port_name_t)th_thport);
+ (void)mach_port_deallocate(get_task_ipcspace(ctask), th_thport);
(void)thread_terminate(th);
(void)thread_deallocate(th);
return(error);
}
int
-bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused register_t *retval)
+bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused int32_t *retval)
{
mach_vm_offset_t freeaddr;
mach_vm_size_t freesize;
freesize = uap->freesize;
#if 0
- KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, (unsigned int)freeaddr, (unsigned int)freesize, (unsigned int)kthport, 0xff, 0);
+ KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, freeaddr, freesize, kthport, 0xff, 0);
#endif
- if (sem != MACH_PORT_NULL) {
- kret = semaphore_signal_internal_trap(sem);
+ if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
+ kret = mach_vm_deallocate(current_map(), freeaddr, freesize);
if (kret != KERN_SUCCESS) {
return(EINVAL);
}
}
- if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
- kret = mach_vm_deallocate(current_map(), freeaddr, freesize);
+
+ (void) thread_terminate(current_thread());
+ if (sem != MACH_PORT_NULL) {
+ kret = semaphore_signal_internal_trap(sem);
if (kret != KERN_SUCCESS) {
return(EINVAL);
}
}
- (void) thread_terminate(current_thread());
if (kthport != MACH_PORT_NULL)
mach_port_deallocate(get_task_ipcspace(current_task()), kthport);
thread_exception_return();
int
-bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused register_t *retval)
+bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused int32_t *retval)
{
+ /* prevent multiple registrations */
+ if ((p->p_lflag & P_LREGISTER) != 0)
+ return(EINVAL);
/* syscall randomizer test can pass bogus values */
if (uap->pthsize > MAX_PTHREAD_SIZE) {
return(EINVAL);
p->p_threadstart = uap->threadstart;
p->p_wqthread = uap->wqthread;
p->p_pthsize = uap->pthsize;
+ p->p_targconc = uap->targetconc_ptr;
+ p->p_dispatchqueue_offset = uap->dispatchqueue_offset;
+ proc_setregister(p);
return(0);
}
+uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
+uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
+uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
+uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
+uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
+uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
+SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW,
+ &wq_yielded_threshold, 0, "");
-int wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
-int wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
-int wq_max_run_latency_usecs = WQ_MAX_RUN_LATENCY_USECS;
-int wq_timer_interval_msecs = WQ_TIMER_INTERVAL_MSECS;
-
+SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW,
+ &wq_yielded_window_usecs, 0, "");
SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW,
&wq_stalled_window_usecs, 0, "");
SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW,
&wq_reduce_pool_window_usecs, 0, "");
-SYSCTL_INT(_kern, OID_AUTO, wq_max_run_latency_usecs, CTLFLAG_RW,
- &wq_max_run_latency_usecs, 0, "");
+SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW,
+ &wq_max_timer_interval_usecs, 0, "");
+
+SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW,
+ &wq_max_threads, 0, "");
+
+/*
+ * workqueue_init_lock: set up the per-process workqueue lock.
+ *
+ * Initializes the spin lock p->p_wqlock with the pthread lock group and
+ * lock attribute, and clears p->p_wqiniting.
+ */
+void
+workqueue_init_lock(proc_t p)
+{
+ lck_spin_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr);
+
+ p->p_wqiniting = FALSE;
+}
+/*
+ * workqueue_destroy_lock: tear down the spin lock p->p_wqlock that
+ * workqueue_init_lock() initialized (same pthread lock group).
+ */
+void
+workqueue_destroy_lock(proc_t p)
+{
+ lck_spin_destroy(&p->p_wqlock, pthread_lck_grp);
+}
+
+/*
+ * workqueue_lock_spin: acquire the per-process workqueue spin lock
+ * (p->p_wqlock).  Paired with workqueue_unlock().
+ */
+static void
+workqueue_lock_spin(proc_t p)
+{
+ lck_spin_lock(&p->p_wqlock);
+}
+/*
+ * workqueue_unlock: release the per-process workqueue spin lock
+ * (p->p_wqlock) taken by workqueue_lock_spin().
+ */
+static void
+workqueue_unlock(proc_t p)
+{
+ lck_spin_unlock(&p->p_wqlock);
+}
+
+/*
+ * workqueue_interval_timer_start: arm the workqueue's delayed thread call
+ * (wq->wq_atimer_call) using a doubling back-off.
+ *
+ * If wq->wq_timer_interval is 0 the interval starts at
+ * wq_stalled_window_usecs; otherwise the previous interval is doubled and
+ * clamped to wq_max_timer_interval_usecs.  The updated interval is stored
+ * back in wq->wq_timer_interval, so each successive call backs off further
+ * until something resets the field to 0.
+ *
+ * NOTE(review): the scale factor of 1000 handed to
+ * clock_interval_to_deadline() presumably means "nanoseconds per unit",
+ * i.e. the interval is in microseconds as the *_usecs tunables suggest --
+ * confirm against the clock API.
+ */
+static void
+workqueue_interval_timer_start(struct workqueue *wq)
+{
+ uint64_t deadline;
+
+ if (wq->wq_timer_interval == 0)
+ wq->wq_timer_interval = wq_stalled_window_usecs;
+ else {
+ wq->wq_timer_interval = wq->wq_timer_interval * 2;
+
+ if (wq->wq_timer_interval > wq_max_timer_interval_usecs)
+ wq->wq_timer_interval = wq_max_timer_interval_usecs;
+ }
+ clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
+
+ thread_call_enter_delayed(wq->wq_atimer_call, deadline);
+
+ KERNEL_DEBUG(0xefffd110, wq, wq->wq_itemcount, wq->wq_flags, wq->wq_timer_interval, 0);
+}
+
+/*
+ * wq_thread_is_busy: decide whether the "last blocked" timestamp stored at
+ * *lastblocked_tsp is recent enough, relative to cur_ts, to treat the
+ * corresponding thread(s) as still busy.
+ *
+ * Returns TRUE when any of the following holds:
+ *   - the timestamp could not be read consistently (the compare-and-swap
+ *     write-back failed; this check is not compiled on __ppc__),
+ *   - the timestamp is equal to or later than cur_ts,
+ *   - less than wq_stalled_window_usecs microseconds have elapsed since
+ *     the timestamp.
+ * Returns FALSE otherwise.
+ *
+ * cur_ts and the stored timestamp are in absolute-time units (the elapsed
+ * value is converted with absolutetime_to_microtime()).
+ */
+static boolean_t
+wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
+{ clock_sec_t secs;
+ clock_usec_t usecs;
+ uint64_t lastblocked_ts;
+ uint64_t elapsed;
+
+ /*
+ * the timestamp is updated atomically w/o holding the workqueue lock
+ * so we need to do an atomic read of the 64 bits so that we don't see
+ * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
+ * independent fashion by using OSCompareAndSwap64 to write back the
+ * value we grabbed... if it succeeds, then we have a good timestamp to
+ * evaluate... if it fails, we straddled grabbing the timestamp while it
+ * was being updated... treat a failed update as a busy thread since
+ * it implies we are about to see a really fresh timestamp anyway
+ */
+ lastblocked_ts = *lastblocked_tsp;
+
+#if defined(__ppc__) /* intentionally empty: on ppc the plain read above is used without the CAS re-check */
+#else
+ if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
+ return (TRUE);
+#endif
+ if (lastblocked_ts >= cur_ts) {
+ /*
+ * because the update of the timestamp when a thread blocks isn't
+ * serialized against us looking at it (i.e. we don't hold the workq lock)
+ * it's possible to have a timestamp that matches the current time or
+ * that even looks to be in the future relative to when we grabbed the current
+ * time... just treat this as a busy thread since it must have just blocked.
+ */
+ return (TRUE);
+ }
+ elapsed = cur_ts - lastblocked_ts;
+
+ absolutetime_to_microtime(elapsed, &secs, &usecs);
+
+ if (secs == 0 && usecs < wq_stalled_window_usecs)
+ return (TRUE);
+ return (FALSE);
+}
+
+/*
+ * WQ_TIMER_NEEDED(wq, start_timer): claim the right to arm the add-timer.
+ *
+ * Takes a snapshot of wq->wq_flags; if neither WQ_EXITING nor
+ * WQ_ATIMER_RUNNING is set in the snapshot, tries a single compare-and-swap
+ * to set WQ_ATIMER_RUNNING.  On success start_timer is set to TRUE; in every
+ * other case (flag already set, workqueue exiting, or the CAS lost a race)
+ * start_timer is left untouched and no retry is made.
+ *
+ * The caller is expected to act on start_timer itself, e.g. by calling
+ * workqueue_interval_timer_start() as workqueue_add_timer() does.
+ *
+ * The macro arguments are expanded without parentheses, so pass plain
+ * identifiers only.
+ */
+#define WQ_TIMER_NEEDED(wq, start_timer) do { \
+ int oldflags = wq->wq_flags; \
+ \
+ if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \
+ if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_RUNNING, (UInt32 *)&wq->wq_flags)) \
+ start_timer = TRUE; \
+ } \
+} while (0)
+/*
+ * workqueue_add_timer: periodic evaluation of a workqueue that has queued
+ * work but may lack threads able to run it.
+ * NOTE(review): presumably installed as the wq->wq_atimer_call thread-call
+ * handler (armed by workqueue_interval_timer_start()) -- the registration
+ * is not visible here; confirm against the workqueue setup code.
+ *
+ * Outline:
+ *   1. take the workqueue lock and wait until no other instance holds
+ *      WQL_ATIMER_BUSY, then claim it;
+ *   2. atomically clear WQ_ATIMER_RUNNING so a new timer may be requested
+ *      while this one is evaluating;
+ *   3. if the queue is not exiting and has items, pick the lowest-numbered
+ *      non-empty priority list and scan the affinity tags below
+ *      wq_reqconc[priority]; a new thread is added (workqueue_addnewthread)
+ *      for the first tag that has no idle thread available, no active
+ *      thread and no recently blocked scheduled thread at that priority
+ *      index or a lower-numbered one;
+ *   4. while idle threads exist, hand queued items to them with
+ *      workqueue_run_nextitem(), which drops the lock (it is retaken here);
+ *   5. if items remain, either loop back to add another thread or request
+ *      a new timer via WQ_TIMER_NEEDED;
+ *   6. release WQL_ATIMER_BUSY, wake any waiter, drop the lock and, if
+ *      requested, re-arm the timer.
+ *
+ * param1 is unused.
+ */
-SYSCTL_INT(_kern, OID_AUTO, wq_timer_interval_msecs, CTLFLAG_RW,
- &wq_timer_interval_msecs, 0, "");
+static void
+workqueue_add_timer(struct workqueue *wq, __unused int param1)
+{
+ proc_t p;
+ boolean_t start_timer = FALSE;
+ boolean_t retval;
+ boolean_t add_thread;
+ uint32_t busycount;
+
+ KERNEL_DEBUG(0xefffd108 | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
+
+ p = wq->wq_proc;
+
+ workqueue_lock_spin(p);
+
+ /*
+ * because workqueue_callback now runs w/o taking the workqueue lock
+ * we are unsynchronized w/r to a change in state of the running threads...
+ * to make sure we always evaluate that change, we allow it to start up
+ * a new timer if the current one is actively evaluating the state
+ * however, we do not need more than 2 timers fired up (1 active and 1 pending)
+ * and we certainly do not want 2 active timers evaluating the state
+ * simultaneously... so use WQL_ATIMER_BUSY to serialize the timers...
+ * note that WQL_ATIMER_BUSY is in a different flag word from WQ_ATIMER_RUNNING since
+ * it is always protected by the workq lock... WQ_ATIMER_RUNNING is evaluated
+ * and set atomically since the callback function needs to manipulate it
+ * w/o holding the workq lock...
+ *
+ * !WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == no pending timer, no active timer
+ * !WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == no pending timer, 1 active timer
+ * WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == 1 pending timer, no active timer
+ * WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == 1 pending timer, 1 active timer
+ */
+ while (wq->wq_lflags & WQL_ATIMER_BUSY) {
+ wq->wq_lflags |= WQL_ATIMER_WAITING;
+
+ assert_wait((caddr_t)wq, (THREAD_UNINT));
+ workqueue_unlock(p);
+
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
+ }
+ wq->wq_lflags |= WQL_ATIMER_BUSY;
+
+ /*
+ * the workq lock will protect us from seeing WQ_EXITING change state, but we
+ * still need to update this atomically in case someone else tries to start
+ * the timer just as we're releasing it
+ */
+ while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags & ~WQ_ATIMER_RUNNING), (UInt32 *)&wq->wq_flags)));
+
+again: /* re-entered after a thread was added successfully but none is idle and items remain */
+ retval = TRUE;
+ add_thread = FALSE;
+
+ if ( !(wq->wq_flags & WQ_EXITING)) {
+ /*
+ * check to see if the stall frequency was beyond our tolerance
+ * or we have work on the queue, but haven't scheduled any
+ * new work within our acceptable time interval because
+ * there were no idle threads left to schedule
+ */
+ if (wq->wq_itemcount) {
+ uint32_t priority;
+ uint32_t affinity_tag;
+ uint32_t i;
+ uint64_t curtime;
+
+ for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
+ if (wq->wq_list_bitmap & (1 << priority))
+ break;
+ }
+ assert(priority < WORKQUEUE_NUMPRIOS);
+
+ curtime = mach_absolute_time();
+ busycount = 0;
+
+ for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
+ /*
+ * if we have no idle threads, we can try to add them if needed
+ */
+ if (wq->wq_thidlecount == 0)
+ add_thread = TRUE;
+
+ /*
+ * look for first affinity group that is currently not active
+ * i.e. no active threads at this priority level or higher
+ * and has not been active recently at this priority level or higher
+ */
+ for (i = 0; i <= priority; i++) {
+ if (wq->wq_thactive_count[i][affinity_tag]) {
+ add_thread = FALSE;
+ break;
+ }
+ if (wq->wq_thscheduled_count[i][affinity_tag]) {
+ if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
+ add_thread = FALSE;
+ busycount++;
+ break;
+ }
+ }
+ }
+ if (add_thread == TRUE) {
+ retval = workqueue_addnewthread(wq);
+ break;
+ }
+ }
+ if (wq->wq_itemcount) {
+ /*
+ * as long as we have threads to schedule, and we successfully
+ * scheduled new work, keep trying
+ */
+ while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
+ /*
+ * workqueue_run_nextitem is responsible for
+ * dropping the workqueue lock in all cases
+ */
+ retval = workqueue_run_nextitem(p, wq, THREAD_NULL, 0, 0, 0);
+ workqueue_lock_spin(p);
+
+ if (retval == FALSE)
+ break;
+ }
+ if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_itemcount) {
+
+ if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
+ goto again;
+
+ if (wq->wq_thidlecount == 0 || busycount)
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+ KERNEL_DEBUG(0xefffd108 | DBG_FUNC_NONE, wq, wq->wq_itemcount, wq->wq_thidlecount, busycount, 0);
+ }
+ }
+ }
+ }
+ if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
+ wq->wq_timer_interval = 0; /* no timer pending: the next arming starts again from the initial interval */
+
+ wq->wq_lflags &= ~WQL_ATIMER_BUSY;
+
+ if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
+ /*
+ * wakeup the thread hung up in workqueue_exit or workqueue_add_timer waiting for this timer
+ * to finish getting out of the way
+ */
+ wq->wq_lflags &= ~WQL_ATIMER_WAITING;
+ wakeup(wq);
+ }
+ KERNEL_DEBUG(0xefffd108 | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
+
+ workqueue_unlock(p);
+ if (start_timer == TRUE) /* timer is armed only after the workqueue lock has been dropped */
+ workqueue_interval_timer_start(wq);
+}
void
-workqueue_init_lock(proc_t p)
+workqueue_thread_yielded(void)
{
- lck_mtx_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr);
-}
+ struct workqueue *wq;
+ proc_t p;
-void
-workqueue_destroy_lock(proc_t p)
-{
- lck_mtx_destroy(&p->p_wqlock, pthread_lck_grp);
-}
+ p = current_proc();
-static void
-workqueue_lock(proc_t p)
-{
- lck_mtx_lock(&p->p_wqlock);
-}
+ if ((wq = p->p_wqptr) == NULL || wq->wq_itemcount == 0)
+ return;
+
+ workqueue_lock_spin(p);
-static void
-workqueue_lock_spin(proc_t p)
-{
- lck_mtx_lock_spin(&p->p_wqlock);
-}
+ if (wq->wq_itemcount) {
+ uint64_t curtime;
+ uint64_t elapsed;
+ clock_sec_t secs;
+ clock_usec_t usecs;
-static void
-workqueue_unlock(proc_t p)
-{
- lck_mtx_unlock(&p->p_wqlock);
-}
+ if (wq->wq_thread_yielded_count++ == 0)
+ wq->wq_thread_yielded_timestamp = mach_absolute_time();
+ if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
+ workqueue_unlock(p);
+ return;
+ }
+ KERNEL_DEBUG(0xefffd138 | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 0, 0);
+ wq->wq_thread_yielded_count = 0;
-static void
-workqueue_interval_timer_start(thread_call_t call, int interval_in_ms)
-{
- uint64_t deadline;
+ curtime = mach_absolute_time();
+ elapsed = curtime - wq->wq_thread_yielded_timestamp;
+ absolutetime_to_microtime(elapsed, &secs, &usecs);
- clock_interval_to_deadline(interval_in_ms, 1000 * 1000, &deadline);
+ if (secs == 0 && usecs < wq_yielded_window_usecs) {
- thread_call_enter_delayed(call, deadline);
-}
+ if (wq->wq_thidlecount == 0) {
+ workqueue_addnewthread(wq);
+ /*
+ * 'workqueue_addnewthread' drops the workqueue lock
+ * when creating the new thread and then retakes it before
+ * returning... this window allows other threads to process
+ * work on the queue, so we need to recheck for available work
+ * if none found, we just return... the newly created thread
+ * will eventually get used (if it hasn't already)...
+ */
+ if (wq->wq_itemcount == 0) {
+ workqueue_unlock(p);
+ return;
+ }
+ }
+ if (wq->wq_thidlecount) {
+ uint32_t priority;
+ uint32_t affinity = -1;
+ user_addr_t item;
+ struct workitem *witem = NULL;
+ struct workitemlist *wl = NULL;
+ struct uthread *uth;
+ struct threadlist *tl;
+
+ uth = get_bsdthread_info(current_thread());
+ if ((tl = uth->uu_threadlist))
+ affinity = tl->th_affinity_tag;
+
+ for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
+ if (wq->wq_list_bitmap & (1 << priority)) {
+ wl = (struct workitemlist *)&wq->wq_list[priority];
+ break;
+ }
+ }
+ assert(wl != NULL);
+ assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
+ witem = TAILQ_FIRST(&wl->wl_itemlist);
+ TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
-static void
-workqueue_timer(struct workqueue *wq, __unused int param1)
-{
- struct timeval tv, dtv;
- uint32_t i;
- boolean_t added_more_threads = FALSE;
- boolean_t reset_maxactive = FALSE;
- boolean_t restart_timer = FALSE;
-
- microuptime(&tv);
+ if (TAILQ_EMPTY(&wl->wl_itemlist))
+ wq->wq_list_bitmap &= ~(1 << priority);
+ wq->wq_itemcount--;
- KERNEL_DEBUG(0xefffd108, (int)wq, 0, 0, 0, 0);
+ item = witem->wi_item;
+ witem->wi_item = (user_addr_t)0;
+ witem->wi_affinity = 0;
- /*
- * check to see if the stall frequency was beyond our tolerance
- * or we have work on the queue, but haven't scheduled any
- * new work within our acceptable time interval because
- * there were no idle threads left to schedule
- *
- * WQ_TIMER_WATCH will only be set if we have 1 or more affinity
- * groups that have stalled (no active threads and no idle threads)...
- * it will not be set if all affinity groups have at least 1 thread
- * that is currently runnable... if all processors have a runnable
- * thread, there is no need to add more threads even if we're not
- * scheduling new work within our allowed window... it just means
- * that the work items are taking a long time to complete.
- */
- if (wq->wq_flags & (WQ_ADD_TO_POOL | WQ_TIMER_WATCH)) {
+ TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
- if (wq->wq_flags & WQ_ADD_TO_POOL)
- added_more_threads = TRUE;
- else {
- timersub(&tv, &wq->wq_lastran_ts, &dtv);
+ (void)workqueue_run_nextitem(p, wq, THREAD_NULL, item, priority, affinity);
+ /*
+ * workqueue_run_nextitem is responsible for
+ * dropping the workqueue lock in all cases
+ */
+ KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 1, 0);
- if (((dtv.tv_sec * 1000000) + dtv.tv_usec) > wq_stalled_window_usecs)
- added_more_threads = TRUE;
- }
- if (added_more_threads == TRUE) {
- for (i = 0; i < wq->wq_affinity_max && wq->wq_nthreads < WORKQUEUE_MAXTHREADS; i++) {
- (void)workqueue_addnewthread(wq);
+ return;
}
}
+ KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 2, 0);
}
- timersub(&tv, &wq->wq_reduce_ts, &dtv);
-
- if (((dtv.tv_sec * 1000000) + dtv.tv_usec) > wq_reduce_pool_window_usecs)
- reset_maxactive = TRUE;
-
- /*
- * if the pool size has grown beyond the minimum number
- * of threads needed to keep all of the processors busy, and
- * the maximum number of threads scheduled concurrently during
- * the last sample period didn't exceed half the current pool
- * size, then its time to trim the pool size back
- */
- if (added_more_threads == FALSE &&
- reset_maxactive == TRUE &&
- wq->wq_nthreads > wq->wq_affinity_max &&
- wq->wq_max_threads_scheduled <= (wq->wq_nthreads / 2)) {
- uint32_t nthreads_to_remove;
-
- if ((nthreads_to_remove = (wq->wq_nthreads / 4)) == 0)
- nthreads_to_remove = 1;
-
- for (i = 0; i < nthreads_to_remove && wq->wq_nthreads > wq->wq_affinity_max; i++)
- workqueue_removethread(wq);
- }
- workqueue_lock_spin(wq->wq_proc);
-
- if (reset_maxactive == TRUE) {
- wq->wq_max_threads_scheduled = 0;
- microuptime(&wq->wq_reduce_ts);
- }
- if (added_more_threads) {
- wq->wq_flags &= ~(WQ_ADD_TO_POOL | WQ_TIMER_WATCH);
-
- /*
- * since we added more threads, we should be
- * able to run some work if its still available
- */
- workqueue_run_nextitem(wq->wq_proc, THREAD_NULL);
- workqueue_lock_spin(wq->wq_proc);
- }
- if ((wq->wq_nthreads > wq->wq_affinity_max) ||
- (wq->wq_flags & WQ_TIMER_WATCH)) {
- restart_timer = TRUE;
- } else
- wq->wq_flags &= ~WQ_TIMER_RUNNING;
-
- workqueue_unlock(wq->wq_proc);
-
- /*
- * we needed to knock down the WQ_TIMER_RUNNING flag while behind
- * the workqueue lock... however, we don't want to hold the lock
- * while restarting the timer and we certainly don't want 2 or more
- * instances of the timer... so set a local to indicate the need
- * for a restart since the state of wq_flags may change once we
- * drop the workqueue lock...
- */
- if (restart_timer == TRUE)
- workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs);
+ workqueue_unlock(p);
}
+
static void
-workqueue_callback(
- int type,
- thread_t thread)
+workqueue_callback(int type, thread_t thread)
{
struct uthread *uth;
struct threadlist *tl;
struct workqueue *wq;
uth = get_bsdthread_info(thread);
- tl = uth->uu_threadlist;
- wq = tl->th_workq;
+ tl = uth->uu_threadlist;
+ wq = tl->th_workq;
switch (type) {
{
uint32_t old_activecount;
- old_activecount = OSAddAtomic(-1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]);
+ old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
+
+ if (old_activecount == 1) {
+ boolean_t start_timer = FALSE;
+ uint64_t curtime;
+ UInt64 *lastblocked_ptr;
- if (old_activecount == 1 && wq->wq_itemcount) {
/*
* we were the last active thread on this affinity set
* and we've got work to do
*/
- workqueue_lock_spin(wq->wq_proc);
+ lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority][tl->th_affinity_tag];
+ curtime = mach_absolute_time();
+
/*
- * if this thread is blocking (not parking)
- * and the idle list is empty for this affinity group
- * we'll count it as a 'stall'
+ * if we collide with another thread trying to update the last_blocked (really unlikely
+ * since another thread would have to get scheduled and then block after we start down
+ * this path), it's not a problem. Either timestamp is adequate, so no need to retry
*/
- if ((tl->th_flags & TH_LIST_RUNNING) &&
- TAILQ_EMPTY(&wq->wq_thidlelist[tl->th_affinity_tag]))
- wq->wq_stalled_count++;
-
- workqueue_run_nextitem(wq->wq_proc, THREAD_NULL);
+#if defined(__ppc__)
/*
- * workqueue_run_nextitem will drop the workqueue
- * lock before it returns
+ * this doesn't have to actually work reliably for PPC, it just has to compile/link
*/
+ *lastblocked_ptr = (UInt64)curtime;
+#else
+ OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
+#endif
+ if (wq->wq_itemcount)
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+ if (start_timer == TRUE)
+ workqueue_interval_timer_start(wq);
}
- KERNEL_DEBUG(0xefffd020, (int)thread, wq->wq_threads_scheduled, tl->th_affinity_tag, 0, 0);
+ KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_START, wq, old_activecount, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
}
break;
* the thread lock for the thread being UNBLOCKED
* is also held
*/
- if (tl->th_unparked)
- OSAddAtomic(-1, (SInt32 *)&tl->th_unparked);
- else
- OSAddAtomic(1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]);
+ OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
- KERNEL_DEBUG(0xefffd024, (int)thread, wq->wq_threads_scheduled, tl->th_affinity_tag, 0, 0);
- break;
+ KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
+
+ break;
}
}
+
static void
-workqueue_removethread(struct workqueue *wq)
+workqueue_removethread(struct threadlist *tl)
{
- struct threadlist *tl;
- uint32_t i, affinity_tag = 0;
-
- tl = NULL;
-
- workqueue_lock_spin(wq->wq_proc);
-
- for (i = 0; i < wq->wq_affinity_max; i++) {
+ struct workqueue *wq;
+ struct uthread * uth;
- affinity_tag = wq->wq_nextaffinitytag;
+ wq = tl->th_workq;
- if (affinity_tag == 0)
- affinity_tag = wq->wq_affinity_max - 1;
- else
- affinity_tag--;
- wq->wq_nextaffinitytag = affinity_tag;
+ TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
- /*
- * look for an idle thread to steal from this affinity group
- * but don't grab the only thread associated with it
- */
- if (!TAILQ_EMPTY(&wq->wq_thidlelist[affinity_tag]) && wq->wq_thcount[affinity_tag] > 1) {
- tl = TAILQ_FIRST(&wq->wq_thidlelist[affinity_tag]);
- TAILQ_REMOVE(&wq->wq_thidlelist[affinity_tag], tl, th_entry);
+ wq->wq_nthreads--;
+ wq->wq_thidlecount--;
- wq->wq_nthreads--;
- wq->wq_thcount[affinity_tag]--;
+ /*
+ * Clear the threadlist pointer in uthread so
+ * blocked thread on wakeup for termination will
+ * not access the thread list as it is going to be
+ * freed.
+ */
+ thread_sched_call(tl->th_thread, NULL);
- break;
- }
+ uth = get_bsdthread_info(tl->th_thread);
+ if (uth != (struct uthread *)0) {
+ uth->uu_threadlist = NULL;
}
workqueue_unlock(wq->wq_proc);
- if (tl != NULL) {
- thread_sched_call(tl->th_thread, NULL);
-
- if ( (tl->th_flags & TH_LIST_BLOCKED) )
- wakeup(tl);
- else {
- /*
- * thread was created, but never used...
- * need to clean up the stack and port ourselves
- * since we're not going to spin up through the
- * normal exit path triggered from Libc
- */
- (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
- (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), (mach_port_name_t)tl->th_thport);
-
- thread_terminate(tl->th_thread);
- }
- KERNEL_DEBUG(0xefffd030, (int)tl->th_thread, wq->wq_nthreads, tl->th_flags & TH_LIST_BLOCKED, 0, 0);
+ if ( (tl->th_flags & TH_LIST_SUSPENDED) ) {
/*
- * drop our ref on the thread
+ * thread was created, but never used...
+ * need to clean up the stack and port ourselves
+ * since we're not going to spin up through the
+ * normal exit path triggered from Libc
*/
- thread_deallocate(tl->th_thread);
+ (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
+ (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), tl->th_thport);
- kfree(tl, sizeof(struct threadlist));
+ KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
+ } else {
+
+ KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
}
+ /*
+ * drop our ref on the thread
+ */
+ thread_deallocate(tl->th_thread);
+
+ kfree(tl, sizeof(struct threadlist));
}
-static int
+
+static boolean_t
workqueue_addnewthread(struct workqueue *wq)
{
struct threadlist *tl;
proc_t p;
void *sright;
mach_vm_offset_t stackaddr;
- uint32_t affinity_tag;
+
+ if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (CONFIG_THREAD_MAX - 20))
+ return (FALSE);
+ wq->wq_nthreads++;
p = wq->wq_proc;
+ workqueue_unlock(p);
- kret = thread_create(wq->wq_task, &th);
+ kret = thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
if (kret != KERN_SUCCESS)
- return(EINVAL);
+ goto failed;
tl = kalloc(sizeof(struct threadlist));
bzero(tl, sizeof(struct threadlist));
#if defined(__ppc__)
stackaddr = 0xF0000000;
-#elif defined(__i386__)
+#elif defined(__i386__) || defined(__x86_64__)
stackaddr = 0xB0000000;
-#elif defined(__arm__)
- stackaddr = 0xB0000000; /* XXX ARM */
#else
#error Need to define a stack address hint for this architecture
#endif
(void) thread_terminate(th);
kfree(tl, sizeof(struct threadlist));
-
- return(EINVAL);
+ goto failed;
}
thread_reference(th);
sright = (void *) convert_thread_to_port(th);
- tl->th_thport = (void *)ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task));
+ tl->th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task));
thread_static_param(th, TRUE);
- workqueue_lock_spin(p);
-
- affinity_tag = wq->wq_nextaffinitytag;
- wq->wq_nextaffinitytag = (affinity_tag + 1) % wq->wq_affinity_max;
-
- workqueue_unlock(p);
-
tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED;
tl->th_thread = th;
tl->th_workq = wq;
tl->th_stackaddr = stackaddr;
- tl->th_affinity_tag = affinity_tag;
+ tl->th_affinity_tag = -1;
+ tl->th_priority = WORKQUEUE_NUMPRIOS;
+ tl->th_policy = -1;
#if defined(__ppc__)
//ml_fp_setvalid(FALSE);
thread_set_cthreadself(th, (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE), IS_64BIT_PROCESS(p));
#endif /* __ppc__ */
- /*
- * affinity tag of 0 means no affinity...
- * but we want our tags to be 0 based because they
- * are used to index arrays, so...
- * keep it 0 based internally and bump by 1 when
- * calling out to set it
- */
- (void)thread_affinity_set(th, affinity_tag + 1);
- thread_sched_call(th, workqueue_callback);
uth = get_bsdthread_info(tl->th_thread);
uth->uu_threadlist = (void *)tl;
workqueue_lock_spin(p);
+
+ TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
- TAILQ_INSERT_TAIL(&wq->wq_thidlelist[tl->th_affinity_tag], tl, th_entry);
- wq->wq_nthreads++;
- wq->wq_thcount[affinity_tag]++;
+ wq->wq_thidlecount++;
- KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, (int)current_thread(), affinity_tag, wq->wq_nthreads, 0, (int)tl->th_thread);
+ KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, wq, wq->wq_nthreads, 0, thread_tid(current_thread()), thread_tid(tl->th_thread));
- /*
- * work may have come into the queue while
- * no threads were available to run... since
- * we're adding a new thread, go evaluate the
- * current state
- */
- workqueue_run_nextitem(p, THREAD_NULL);
- /*
- * workqueue_run_nextitem is responsible for
- * dropping the workqueue lock in all cases
- */
+ return (TRUE);
- return(0);
+failed:
+ workqueue_lock_spin(p);
+ wq->wq_nthreads--;
+
+ return (FALSE);
}
+
int
-workq_open(__unused struct proc *p, __unused struct workq_open_args *uap, __unused register_t *retval)
+workq_open(struct proc *p, __unused struct workq_open_args *uap, __unused int32_t *retval)
{
struct workqueue * wq;
- int size;
+ int wq_size;
char * ptr;
+ char * nptr;
int j;
uint32_t i;
+ uint32_t num_cpus;
int error = 0;
- int num_cpus;
+ boolean_t need_wakeup = FALSE;
struct workitem * witem;
struct workitemlist *wl;
- workqueue_lock(p);
+ if ((p->p_lflag & P_LREGISTER) == 0)
+ return(EINVAL);
+
+ workqueue_lock_spin(p);
if (p->p_wqptr == NULL) {
+
+ while (p->p_wqiniting == TRUE) {
+
+ assert_wait((caddr_t)&p->p_wqiniting, THREAD_UNINT);
+ workqueue_unlock(p);
+
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
+ }
+ if (p->p_wqptr != NULL)
+ goto out;
+
+ p->p_wqiniting = TRUE;
+
+ workqueue_unlock(p);
+
num_cpus = ml_get_max_cpus();
- size = (sizeof(struct workqueue)) +
- (num_cpus * sizeof(int *)) +
- (num_cpus * sizeof(TAILQ_HEAD(, threadlist)));
+ wq_size = sizeof(struct workqueue) +
+ (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
+ (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
+ (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint64_t)) +
+ sizeof(uint64_t);
- ptr = (char *)kalloc(size);
- bzero(ptr, size);
+ ptr = (char *)kalloc(wq_size);
+ bzero(ptr, wq_size);
wq = (struct workqueue *)ptr;
wq->wq_flags = WQ_LIST_INITED;
witem = &wq->wq_array[(i*WORKITEM_SIZE) + j];
TAILQ_INSERT_TAIL(&wl->wl_freelist, witem, wi_entry);
}
+ wq->wq_reqconc[i] = wq->wq_affinity_max;
}
- wq->wq_thactivecount = (uint32_t *)((char *)ptr + sizeof(struct workqueue));
- wq->wq_thcount = (uint32_t *)&wq->wq_thactivecount[wq->wq_affinity_max];
- wq->wq_thidlelist = (struct wq_thidlelist *)&wq->wq_thcount[wq->wq_affinity_max];
+ nptr = ptr + sizeof(struct workqueue);
- for (i = 0; i < wq->wq_affinity_max; i++)
- TAILQ_INIT(&wq->wq_thidlelist[i]);
+ for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
+ wq->wq_thactive_count[i] = (uint32_t *)nptr;
+ nptr += (num_cpus * sizeof(uint32_t));
+ }
+ for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
+ wq->wq_thscheduled_count[i] = (uint32_t *)nptr;
+ nptr += (num_cpus * sizeof(uint32_t));
+ }
+ /*
+ * align nptr on a 64 bit boundary so that we can do nice
+ * atomic64 operations on the timestamps...
+ * note that we requested an extra uint64_t when calculating
+ * the size for the allocation of the workqueue struct
+ */
+ nptr += (sizeof(uint64_t) - 1);
+ nptr = (char *)((long)nptr & ~(sizeof(uint64_t) - 1));
+ for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
+ wq->wq_lastblocked_ts[i] = (uint64_t *)nptr;
+ nptr += (num_cpus * sizeof(uint64_t));
+ }
TAILQ_INIT(&wq->wq_thrunlist);
+ TAILQ_INIT(&wq->wq_thidlelist);
- p->p_wqptr = (void *)wq;
- p->p_wqsize = size;
-
- workqueue_unlock(p);
-
- wq->wq_timer_call = thread_call_allocate((thread_call_func_t)workqueue_timer, (thread_call_param_t)wq);
+ wq->wq_atimer_call = thread_call_allocate((thread_call_func_t)workqueue_add_timer, (thread_call_param_t)wq);
- for (i = 0; i < wq->wq_affinity_max; i++) {
- (void)workqueue_addnewthread(wq);
- }
- /* If unable to create any threads, return error */
- if (wq->wq_nthreads == 0)
- error = EINVAL;
workqueue_lock_spin(p);
- microuptime(&wq->wq_reduce_ts);
- microuptime(&wq->wq_lastran_ts);
- wq->wq_max_threads_scheduled = 0;
- wq->wq_stalled_count = 0;
+ p->p_wqptr = (void *)wq;
+ p->p_wqsize = wq_size;
+
+ p->p_wqiniting = FALSE;
+ need_wakeup = TRUE;
}
+out:
workqueue_unlock(p);
+ if (need_wakeup == TRUE)
+ wakeup(&p->p_wqiniting);
return(error);
}
int
-workq_ops(struct proc *p, struct workq_ops_args *uap, __unused register_t *retval)
+workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, __unused int32_t *retval)
{
- int options = uap->options;
- int prio = uap->prio; /* should be used to find the right workqueue */
user_addr_t item = uap->item;
- int error = 0;
- thread_t th = THREAD_NULL;
+ int options = uap->options;
+ int prio = uap->prio; /* should be used to find the right workqueue */
+ int affinity = uap->affinity;
+ int error = 0;
+ thread_t th = THREAD_NULL;
+ user_addr_t oc_item = 0;
struct workqueue *wq;
- prio += 2; /* normalize prio -2 to +2 to 0 -4 */
+ if ((p->p_lflag & P_LREGISTER) == 0)
+ return(EINVAL);
+
+ /*
+ * affinity not yet hooked up on this path
+ */
+ affinity = -1;
switch (options) {
case WQOPS_QUEUE_ADD: {
-
- KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, (int)item, 0, 0, 0, 0);
+
+ if (prio & WORKQUEUE_OVERCOMMIT) {
+ prio &= ~WORKQUEUE_OVERCOMMIT;
+ oc_item = item;
+ }
+ if ((prio < 0) || (prio >= WORKQUEUE_NUMPRIOS))
+ return (EINVAL);
workqueue_lock_spin(p);
workqueue_unlock(p);
return (EINVAL);
}
- error = workqueue_additem(wq, prio, item);
-
+ if (wq->wq_thidlecount == 0 && (oc_item || (wq->wq_nthreads < wq->wq_affinity_max))) {
+
+ workqueue_addnewthread(wq);
+
+ if (wq->wq_thidlecount == 0)
+ oc_item = 0;
+ }
+ if (oc_item == 0)
+ error = workqueue_additem(wq, prio, item, affinity);
+
+ KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, wq, prio, affinity, oc_item, 0);
}
break;
case WQOPS_QUEUE_REMOVE: {
+ if ((prio < 0) || (prio >= WORKQUEUE_NUMPRIOS))
+ return (EINVAL);
+
workqueue_lock_spin(p);
if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
case WQOPS_THREAD_RETURN: {
th = current_thread();
+ struct uthread *uth = get_bsdthread_info(th);
+
+ /* reset signal mask on the workqueue thread to default state */
+ if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
+ proc_lock(p);
+ uth->uu_sigmask = ~workq_threadmask;
+ proc_unlock(p);
+ }
+
+ workqueue_lock_spin(p);
+
+ if ((wq = (struct workqueue *)p->p_wqptr) == NULL || (uth->uu_threadlist == NULL)) {
+ workqueue_unlock(p);
+ return (EINVAL);
+ }
+ KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, wq, 0, 0, 0, 0);
+ }
+ break;
+ case WQOPS_THREAD_SETCONC: {
- KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, 0, 0, 0, 0, 0);
+ if ((prio < 0) || (prio > WORKQUEUE_NUMPRIOS))
+ return (EINVAL);
workqueue_lock_spin(p);
workqueue_unlock(p);
return (EINVAL);
}
+ /*
+ * for this operation, we re-purpose the affinity
+ * argument as the concurrency target
+ */
+ if (prio < WORKQUEUE_NUMPRIOS)
+ wq->wq_reqconc[prio] = affinity;
+ else {
+ for (prio = 0; prio < WORKQUEUE_NUMPRIOS; prio++)
+ wq->wq_reqconc[prio] = affinity;
+
+ }
}
break;
default:
return (EINVAL);
}
- workqueue_run_nextitem(p, th);
+ (void)workqueue_run_nextitem(p, wq, th, oc_item, prio, affinity);
/*
* workqueue_run_nextitem is responsible for
* dropping the workqueue lock in all cases
*/
- return(error);
+ return (error);
+
}
void
{
struct workqueue * wq;
struct threadlist * tl, *tlist;
- uint32_t i;
+ struct uthread *uth;
+ int wq_size = 0;
if (p->p_wqptr != NULL) {
+ KERNEL_DEBUG(0x900808c | DBG_FUNC_START, p->p_wqptr, 0, 0, 0, 0);
+
workqueue_lock_spin(p);
wq = (struct workqueue *)p->p_wqptr;
- p->p_wqptr = NULL;
- workqueue_unlock(p);
+ if (wq == NULL) {
+ workqueue_unlock(p);
- if (wq == NULL)
+ KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, -1, 0);
return;
-
- if (wq->wq_flags & WQ_TIMER_RUNNING)
- thread_call_cancel(wq->wq_timer_call);
- thread_call_free(wq->wq_timer_call);
+ }
+ wq_size = p->p_wqsize;
+ p->p_wqptr = NULL;
+ p->p_wqsize = 0;
+
+ /*
+ * we now arm the timer in the callback function w/o holding the workq lock...
+ * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
+ * ensure only a single timer is running and to notice that WQ_EXITING has
+ * been set (we don't want to start a timer once WQ_EXITING is posted)
+ *
+ * so once we have successfully set WQ_EXITING, we cannot fire up a new timer...
+ * therefore no need to clear the timer state atomically from the flags
+ *
+ * since we always hold the workq lock when dropping WQ_ATIMER_RUNNING
+ * the check for and sleep until clear is protected
+ */
+ while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags | WQ_EXITING), (UInt32 *)&wq->wq_flags)));
+
+ if (wq->wq_flags & WQ_ATIMER_RUNNING) {
+ if (thread_call_cancel(wq->wq_atimer_call) == TRUE)
+ wq->wq_flags &= ~WQ_ATIMER_RUNNING;
+ }
+ while ((wq->wq_flags & WQ_ATIMER_RUNNING) || (wq->wq_lflags & WQL_ATIMER_BUSY)) {
+
+ assert_wait((caddr_t)wq, (THREAD_UNINT));
+ workqueue_unlock(p);
+
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
+ }
+ workqueue_unlock(p);
TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) {
+
+ thread_sched_call(tl->th_thread, NULL);
+
+ uth = get_bsdthread_info(tl->th_thread);
+ if (uth != (struct uthread *)0) {
+ uth->uu_threadlist = NULL;
+ }
+ TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
+
/*
* drop our last ref on the thread
*/
- thread_sched_call(tl->th_thread, NULL);
thread_deallocate(tl->th_thread);
- TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
kfree(tl, sizeof(struct threadlist));
}
- for (i = 0; i < wq->wq_affinity_max; i++) {
- TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist[i], th_entry, tlist) {
- /*
- * drop our last ref on the thread
- */
- thread_sched_call(tl->th_thread, NULL);
- thread_deallocate(tl->th_thread);
+ TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
+
+ thread_sched_call(tl->th_thread, NULL);
- TAILQ_REMOVE(&wq->wq_thidlelist[i], tl, th_entry);
- kfree(tl, sizeof(struct threadlist));
+ uth = get_bsdthread_info(tl->th_thread);
+ if (uth != (struct uthread *)0) {
+ uth->uu_threadlist = NULL;
}
+ TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
+
+ /*
+ * drop our last ref on the thread
+ */
+ thread_deallocate(tl->th_thread);
+
+ kfree(tl, sizeof(struct threadlist));
}
- kfree(wq, p->p_wqsize);
+ thread_call_free(wq->wq_atimer_call);
+
+ kfree(wq, wq_size);
+
+ KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, 0, 0);
}
}
static int
-workqueue_additem(struct workqueue *wq, int prio, user_addr_t item)
+workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity)
{
struct workitem *witem;
struct workitemlist *wl;
TAILQ_REMOVE(&wl->wl_freelist, witem, wi_entry);
witem->wi_item = item;
+ witem->wi_affinity = affinity;
TAILQ_INSERT_TAIL(&wl->wl_itemlist, witem, wi_entry);
- if (wq->wq_itemcount == 0) {
- microuptime(&wq->wq_lastran_ts);
- wq->wq_stalled_count = 0;
- }
+ wq->wq_list_bitmap |= (1 << prio);
+
wq->wq_itemcount++;
return (0);
TAILQ_FOREACH(witem, &wl->wl_itemlist, wi_entry) {
if (witem->wi_item == item) {
TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
- wq->wq_itemcount--;
+ if (TAILQ_EMPTY(&wl->wl_itemlist))
+ wq->wq_list_bitmap &= ~(1 << prio);
+ wq->wq_itemcount--;
+
witem->wi_item = (user_addr_t)0;
+ witem->wi_affinity = 0;
TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
error = 0;
break;
}
}
- if (wq->wq_itemcount == 0)
- wq->wq_flags &= ~(WQ_ADD_TO_POOL | WQ_TIMER_WATCH);
-
return (error);
}
+static int workqueue_importance[WORKQUEUE_NUMPRIOS] =
+{
+ 2, 0, -2,
+};
+
+static int workqueue_policy[WORKQUEUE_NUMPRIOS] =
+{
+ 1, 1, 1,
+};
+
+
/*
* workqueue_run_nextitem:
* called with the workqueue lock held...
* responsible for dropping it in all cases
*/
-static void
-workqueue_run_nextitem(proc_t p, thread_t thread)
+static boolean_t
+workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t thread, user_addr_t oc_item, int oc_prio, int oc_affinity)
{
- struct workqueue *wq;
struct workitem *witem = NULL;
user_addr_t item = 0;
thread_t th_to_run = THREAD_NULL;
thread_t th_to_park = THREAD_NULL;
int wake_thread = 0;
int reuse_thread = 1;
- uint32_t stalled_affinity_count = 0;
- int i;
- uint32_t affinity_tag;
+ uint32_t priority, orig_priority;
+ uint32_t affinity_tag, orig_affinity_tag;
+ uint32_t i, n;
+ uint32_t activecount;
+ uint32_t busycount;
+ uint32_t us_to_wait;
struct threadlist *tl = NULL;
+ struct threadlist *ttl = NULL;
struct uthread *uth = NULL;
- struct workitemlist *wl;
+ struct workitemlist *wl = NULL;
boolean_t start_timer = FALSE;
- struct timeval tv, lat_tv;
+ boolean_t adjust_counters = TRUE;
+ uint64_t curtime;
- wq = (struct workqueue *)p->p_wqptr;
- KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, (int)thread, wq->wq_threads_scheduled, wq->wq_stalled_count, 0, 0);
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_itemcount, 0);
+
+ /*
+ * from here until we drop the workq lock
+ * we can't be pre-empted since we hold
+ * the lock in spin mode... this is important
+ * since we have to independently update the priority
+ * and affinity that the thread is associated with
+ * and these values are used to index the multi-dimensional
+ * counter arrays in 'workqueue_callback'
+ */
+ if (oc_item) {
+ uint32_t min_scheduled = 0;
+ uint32_t scheduled_count;
+ uint32_t active_count;
+ uint32_t t_affinity = 0;
+
+ priority = oc_prio;
+ item = oc_item;
+
+ if ((affinity_tag = oc_affinity) == (uint32_t)-1) {
+ for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
+ /*
+ * look for the affinity group with the least number of threads
+ */
+ scheduled_count = 0;
+ active_count = 0;
+ for (i = 0; i <= priority; i++) {
+ scheduled_count += wq->wq_thscheduled_count[i][affinity_tag];
+ active_count += wq->wq_thactive_count[i][affinity_tag];
+ }
+ if (active_count == 0) {
+ t_affinity = affinity_tag;
+ break;
+ }
+ if (affinity_tag == 0 || scheduled_count < min_scheduled) {
+ min_scheduled = scheduled_count;
+ t_affinity = affinity_tag;
+ }
+ }
+ affinity_tag = t_affinity;
+ }
+ goto grab_idle_thread;
+ }
if (wq->wq_itemcount == 0) {
if ((th_to_park = thread) == THREAD_NULL)
- goto out;
+ goto out_of_work;
goto parkit;
}
+ for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
+ if (wq->wq_list_bitmap & (1 << priority)) {
+ wl = (struct workitemlist *)&wq->wq_list[priority];
+ break;
+ }
+ }
+ assert(wl != NULL);
+ assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
+
+ curtime = mach_absolute_time();
+
if (thread != THREAD_NULL) {
- /*
- * we're a worker thread from the pool... currently we
- * are considered 'active' which means we're counted
- * in "wq_thactivecount"
- */
uth = get_bsdthread_info(thread);
tl = uth->uu_threadlist;
+ affinity_tag = tl->th_affinity_tag;
- if (wq->wq_thactivecount[tl->th_affinity_tag] == 1) {
- /*
- * we're the only active thread associated with our
- * affinity group, so pick up some work and keep going
+ /*
+ * check to see if the affinity group this thread is
+ * associated with is still within the bounds of the
+ * specified concurrency for the priority level
+ * we're considering running work for
+ */
+ if (affinity_tag < wq->wq_reqconc[priority]) {
+ /*
+ * we're a worker thread from the pool... currently we
+ * are considered 'active' which means we're counted
+ * in "wq_thactive_count"
+ * add up the active counts of all the priority levels
+ * up to and including the one we want to schedule
*/
- th_to_run = thread;
- goto pick_up_work;
+ for (activecount = 0, i = 0; i <= priority; i++) {
+ uint32_t acount;
+
+ acount = wq->wq_thactive_count[i][affinity_tag];
+
+ if (acount == 0 && wq->wq_thscheduled_count[i][affinity_tag]) {
+ if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag]))
+ acount = 1;
+ }
+ activecount += acount;
+ }
+ if (activecount == 1) {
+ /*
+ * we're the only active thread associated with our
+ * affinity group at this priority level and higher,
+ * so pick up some work and keep going
+ */
+ th_to_run = thread;
+ goto pick_up_work;
+ }
}
+ /*
+ * there's more than 1 thread running in this affinity group
+ * or the concurrency level has been cut back for this priority...
+ * let's continue on and look for an 'empty' group to run this
+ * work item in
+ */
}
- for (affinity_tag = 0; affinity_tag < wq->wq_affinity_max; affinity_tag++) {
- /*
+ busycount = 0;
+
+ for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
+ /*
* look for first affinity group that is currently not active
- * and has at least 1 idle thread
+ * i.e. no active threads at this priority level or higher
+ * and no threads that have run recently
*/
- if (wq->wq_thactivecount[affinity_tag] == 0) {
- if (!TAILQ_EMPTY(&wq->wq_thidlelist[affinity_tag]))
- break;
- stalled_affinity_count++;
+ for (activecount = 0, i = 0; i <= priority; i++) {
+ if ((activecount = wq->wq_thactive_count[i][affinity_tag]))
+ break;
+
+ if (wq->wq_thscheduled_count[i][affinity_tag]) {
+ if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
+ busycount++;
+ break;
+ }
+ }
}
+ if (activecount == 0 && busycount == 0)
+ break;
}
- if (thread == THREAD_NULL) {
- /*
- * we're not one of the 'worker' threads
+ if (affinity_tag >= wq->wq_reqconc[priority]) {
+ /*
+ * we've already got at least 1 thread per
+ * affinity group in the active state...
*/
- if (affinity_tag >= wq->wq_affinity_max) {
- /*
- * we've already got at least 1 thread per
- * affinity group in the active state... or
- * we've got no idle threads to play with
+ if (busycount) {
+ /*
+ * we found at least 1 thread in the
+ * 'busy' state... make sure we start
+ * the timer because if they are the only
+ * threads keeping us from scheduling
+ * this workitem, we won't get a callback
+ * to kick off the timer... we need to
+ * start it now...
*/
- if (stalled_affinity_count) {
-
- if ( !(wq->wq_flags & WQ_TIMER_RUNNING) ) {
- wq->wq_flags |= WQ_TIMER_RUNNING;
- start_timer = TRUE;
- }
- wq->wq_flags |= WQ_TIMER_WATCH;
- }
- goto out;
+ WQ_TIMER_NEEDED(wq, start_timer);
}
- } else {
- /*
- * we're overbooked on the affinity group we're associated with,
- * so park this thread
- */
- th_to_park = thread;
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_NONE, wq, busycount, start_timer, 0, 0);
- if (affinity_tag >= wq->wq_affinity_max) {
- /*
- * all the affinity groups have active threads
- * running, or there are no idle threads to
- * schedule
+ if (thread != THREAD_NULL) {
+ /*
+ * go park this one for later
*/
- if (stalled_affinity_count) {
-
- if ( !(wq->wq_flags & WQ_TIMER_RUNNING) ) {
- wq->wq_flags |= WQ_TIMER_RUNNING;
- start_timer = TRUE;
- }
- wq->wq_flags |= WQ_TIMER_WATCH;
- }
+ th_to_park = thread;
goto parkit;
}
+ goto out_of_work;
+ }
+ if (thread != THREAD_NULL) {
+ /*
+ * we're overbooked on the affinity group this thread is
+ * currently associated with, but we have work to do
+ * and at least 1 idle processor, so we'll just retarget
+ * this thread to a new affinity group
+ */
+ th_to_run = thread;
+ goto pick_up_work;
+ }
+ if (wq->wq_thidlecount == 0) {
/*
- * we've got a candidate (affinity group with no currently
- * active threads) to start a new thread on...
- * we already know there is both work available
- * and an idle thread with the correct affinity tag, so
- * fall into the code that pulls a new thread and workitem...
- * once we've kicked that thread off, we'll park this one
+ * we don't have a thread to schedule, but we have
+ * work to do and at least 1 affinity group that
+ * doesn't currently have an active thread...
*/
+ WQ_TIMER_NEEDED(wq, start_timer);
+
+ KERNEL_DEBUG(0xefffd118, wq, wq->wq_nthreads, start_timer, 0, 0);
+
+ goto no_thread_to_run;
+ }
+
+grab_idle_thread:
+ /*
+ * we've got a candidate (affinity group with no currently
+ * active threads) to start a new thread on...
+ * we already know there is both work available
+ * and an idle thread, so activate a thread and then
+ * fall into the code that pulls a new workitem...
+ */
+ TAILQ_FOREACH(ttl, &wq->wq_thidlelist, th_entry) {
+ if (ttl->th_affinity_tag == affinity_tag || ttl->th_affinity_tag == (uint16_t)-1) {
+
+ TAILQ_REMOVE(&wq->wq_thidlelist, ttl, th_entry);
+ tl = ttl;
+
+ break;
+ }
}
- tl = TAILQ_FIRST(&wq->wq_thidlelist[affinity_tag]);
- TAILQ_REMOVE(&wq->wq_thidlelist[affinity_tag], tl, th_entry);
+ if (tl == NULL) {
+ tl = TAILQ_FIRST(&wq->wq_thidlelist);
+ TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
+ }
+ wq->wq_thidlecount--;
- th_to_run = tl->th_thread;
TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
- tl->th_flags &= ~TH_LIST_SUSPENDED;
+ tl->th_flags &= ~TH_LIST_SUSPENDED;
reuse_thread = 0;
+
} else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
- tl->th_flags &= ~TH_LIST_BLOCKED;
+ tl->th_flags &= ~TH_LIST_BLOCKED;
wake_thread = 1;
}
- tl->th_flags |= TH_LIST_RUNNING;
+ tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
- wq->wq_threads_scheduled++;
+ wq->wq_threads_scheduled++;
+ wq->wq_thscheduled_count[priority][affinity_tag]++;
+ OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
- if (wq->wq_threads_scheduled > wq->wq_max_threads_scheduled)
- wq->wq_max_threads_scheduled = wq->wq_threads_scheduled;
+ adjust_counters = FALSE;
+ th_to_run = tl->th_thread;
pick_up_work:
- for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
- wl = (struct workitemlist *)&wq->wq_list[i];
-
- if (!(TAILQ_EMPTY(&wl->wl_itemlist))) {
+ if (item == 0) {
+ witem = TAILQ_FIRST(&wl->wl_itemlist);
+ TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
+
+ if (TAILQ_EMPTY(&wl->wl_itemlist))
+ wq->wq_list_bitmap &= ~(1 << priority);
+ wq->wq_itemcount--;
+
+ item = witem->wi_item;
+ witem->wi_item = (user_addr_t)0;
+ witem->wi_affinity = 0;
+ TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
+ }
+ orig_priority = tl->th_priority;
+ orig_affinity_tag = tl->th_affinity_tag;
- witem = TAILQ_FIRST(&wl->wl_itemlist);
- TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
- wq->wq_itemcount--;
+ tl->th_priority = priority;
+ tl->th_affinity_tag = affinity_tag;
- item = witem->wi_item;
- witem->wi_item = (user_addr_t)0;
- TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
+ if (adjust_counters == TRUE && (orig_priority != priority || orig_affinity_tag != affinity_tag)) {
+ /*
+ * we need to adjust these counters based on this
+ * thread's new disposition w/r to affinity and priority
+ */
+ OSAddAtomic(-1, &wq->wq_thactive_count[orig_priority][orig_affinity_tag]);
+ OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
- break;
- }
+ wq->wq_thscheduled_count[orig_priority][orig_affinity_tag]--;
+ wq->wq_thscheduled_count[priority][affinity_tag]++;
}
- if (witem == NULL)
- panic("workq_run_nextitem: NULL witem");
+ wq->wq_thread_yielded_count = 0;
- if (thread != th_to_run) {
- /*
- * we're starting up a thread from a parked/suspended condition
- */
- OSAddAtomic(1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]);
- OSAddAtomic(1, (SInt32 *)&tl->th_unparked);
- }
- if (wq->wq_itemcount == 0)
- wq->wq_flags &= ~WQ_TIMER_WATCH;
- else {
- microuptime(&tv);
+ workqueue_unlock(p);
+
+ if (orig_affinity_tag != affinity_tag) {
/*
- * if we had any affinity groups stall (no threads runnable)
- * since we last scheduled an item... and
- * the elapsed time since we last scheduled an item
- * exceeds the latency tolerance...
- * we ask the timer thread (which should already be running)
- * to add some more threads to the pool
+ * this thread's affinity does not match the affinity group
+ * it's being placed on (it's either a brand new thread or
+ * we're retargeting an existing thread to a new group)...
+ * affinity tag of 0 means no affinity...
+ * but we want our tags to be 0 based because they
+ * are used to index arrays, so...
+ * keep it 0 based internally and bump by 1 when
+ * calling out to set it
*/
- if (wq->wq_stalled_count && !(wq->wq_flags & WQ_ADD_TO_POOL)) {
- timersub(&tv, &wq->wq_lastran_ts, &lat_tv);
+ KERNEL_DEBUG(0xefffd114 | DBG_FUNC_START, wq, orig_affinity_tag, 0, 0, 0);
- if (((lat_tv.tv_sec * 1000000) + lat_tv.tv_usec) > wq_max_run_latency_usecs)
- wq->wq_flags |= WQ_ADD_TO_POOL;
+ (void)thread_affinity_set(th_to_run, affinity_tag + 1);
- KERNEL_DEBUG(0xefffd10c, wq->wq_stalled_count, lat_tv.tv_sec, lat_tv.tv_usec, wq->wq_flags, 0);
- }
- wq->wq_lastran_ts = tv;
+ KERNEL_DEBUG(0xefffd114 | DBG_FUNC_END, wq, affinity_tag, 0, 0, 0);
}
- wq->wq_stalled_count = 0;
- workqueue_unlock(p);
+ if (orig_priority != priority) {
+ thread_precedence_policy_data_t precedinfo;
+ thread_extended_policy_data_t extinfo;
+ uint32_t policy;
+
+ policy = workqueue_policy[priority];
+
+ KERNEL_DEBUG(0xefffd120 | DBG_FUNC_START, wq, orig_priority, tl->th_policy, 0, 0);
+
+ if (tl->th_policy != policy) {
- KERNEL_DEBUG(0xefffd02c, wq->wq_thactivecount[0], wq->wq_thactivecount[1],
- wq->wq_thactivecount[2], wq->wq_thactivecount[3], 0);
+ extinfo.timeshare = policy;
+ (void)thread_policy_set_internal(th_to_run, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
- KERNEL_DEBUG(0xefffd02c, wq->wq_thactivecount[4], wq->wq_thactivecount[5],
- wq->wq_thactivecount[6], wq->wq_thactivecount[7], 0);
+ tl->th_policy = policy;
+ }
+ precedinfo.importance = workqueue_importance[priority];
+ (void)thread_policy_set_internal(th_to_run, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
+ KERNEL_DEBUG(0xefffd120 | DBG_FUNC_END, wq, priority, policy, 0, 0);
+ }
+ if (kdebug_enable) {
+ int lpri = -1;
+ int laffinity = -1;
+ int first = -1;
+ uint32_t code = 0xefffd02c | DBG_FUNC_START;
+
+ for (n = 0; n < WORKQUEUE_NUMPRIOS; n++) {
+ for (i = 0; i < wq->wq_affinity_max; i++) {
+ if (wq->wq_thactive_count[n][i]) {
+ if (lpri != -1) {
+ KERNEL_DEBUG(code, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
+ code = 0xefffd02c;
+ first = 0;
+ }
+ lpri = n;
+ laffinity = i;
+ }
+ }
+ }
+ if (lpri != -1) {
+ if (first == -1)
+ first = 0xeeeeeeee;
+ KERNEL_DEBUG(0xefffd02c | DBG_FUNC_END, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
+ }
+ }
/*
* if current thread is reused for workitem, does not return via unix_syscall
*/
wq_runitem(p, item, th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run));
- if (th_to_park == THREAD_NULL) {
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(th_to_run), item, 1, 0);
- KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, (int)thread, (int)item, wq->wq_flags, 1, 0);
+ return (TRUE);
- return;
- }
- workqueue_lock_spin(p);
+out_of_work:
+ /*
+ * we have no work to do or we are fully booked
+ * w/r to running threads...
+ */
+no_thread_to_run:
+ workqueue_unlock(p);
+
+ if (start_timer)
+ workqueue_interval_timer_start(wq);
+
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 2, 0);
+
+ return (FALSE);
parkit:
- wq->wq_threads_scheduled--;
/*
* this is a workqueue thread with no more
* work to do... park it for now
tl->th_flags &= ~TH_LIST_RUNNING;
tl->th_flags |= TH_LIST_BLOCKED;
- TAILQ_INSERT_HEAD(&wq->wq_thidlelist[tl->th_affinity_tag], tl, th_entry);
+ TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
+
+ thread_sched_call(th_to_park, NULL);
+
+ OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
+ wq->wq_thscheduled_count[tl->th_priority][tl->th_affinity_tag]--;
+ wq->wq_threads_scheduled--;
+
+ if (wq->wq_thidlecount < 100)
+ us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
+ else
+ us_to_wait = wq_reduce_pool_window_usecs / 100;
+
+ wq->wq_thidlecount++;
- assert_wait((caddr_t)tl, (THREAD_INTERRUPTIBLE));
+ assert_wait_timeout((caddr_t)tl, (THREAD_INTERRUPTIBLE), us_to_wait, NSEC_PER_USEC);
workqueue_unlock(p);
if (start_timer)
- workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs);
+ workqueue_interval_timer_start(wq);
- KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, (int)current_thread(), wq->wq_threads_scheduled, 0, 0, (int)th_to_park);
+ KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(th_to_park));
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 3, 0);
- thread_block((thread_continue_t)thread_exception_return);
+ thread_block((thread_continue_t)wq_unpark_continue);
+ /* NOT REACHED */
- panic("unexpected return from thread_block");
+ return (FALSE);
+}
-out:
- workqueue_unlock(p);
- if (start_timer)
- workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs);
+static void
+wq_unsuspend_continue(void)
+{
+ struct uthread *uth = NULL;
+ thread_t th_to_unsuspend;
+ struct threadlist *tl;
+ proc_t p;
+ /*
+ * runs on the thread itself (current_thread)... uses the
+ * TH_LIST_* flags in its threadlist entry to decide between
+ * heading out through thread_bootstrap_return right away,
+ * removing the thread from the workqueue, or first waiting
+ * for TH_LIST_BUSY to clear
+ * a thread with no uthread or no threadlist entry just
+ * calls thread_bootstrap_return
+ */
+ th_to_unsuspend = current_thread();
+ uth = get_bsdthread_info(th_to_unsuspend);
+
+ if (uth != NULL && (tl = uth->uu_threadlist) != NULL) {
+
+ if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
+ /*
+ * most likely a normal resume of this thread occurred...
+ * it's also possible that the thread was aborted after we
+ * finished setting it up so that it could be dispatched... if
+ * so, thread_bootstrap_return will notice the abort and put
+ * the thread on the path to self-destruction
+ */
+normal_resume_to_user:
+ thread_sched_call(th_to_unsuspend, workqueue_callback);
+
+ thread_bootstrap_return();
+ }
+ /*
+ * if we get here, it's because we've been resumed due to
+ * an abort of this thread (process is crashing)
+ */
+ p = current_proc();
+
+ workqueue_lock_spin(p);
+
+ if (tl->th_flags & TH_LIST_SUSPENDED) {
+ /*
+ * thread has been aborted while still on our idle
+ * queue... remove it from our domain...
+ * workqueue_removethread consumes the lock
+ */
+ workqueue_removethread(tl);
+
+ thread_bootstrap_return();
+ }
+ while ((tl->th_flags & TH_LIST_BUSY)) {
+ /*
+ * this thread was aborted after we started making
+ * it runnable, but before we finished dispatching it...
+ * we need to wait for that process to finish,
+ * and we need to ask for a wakeup instead of a
+ * thread_resume since the abort has already resumed us
+ */
+ tl->th_flags |= TH_LIST_NEED_WAKEUP;
+
+ assert_wait((caddr_t)tl, (THREAD_UNINT));
+
+ workqueue_unlock(p);
+
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
+ }
+ workqueue_unlock(p);
+ /*
+ * we have finished setting up the thread's context...
+ * thread_bootstrap_return will take us through the abort path
+ * where the thread will self destruct
+ */
+ goto normal_resume_to_user;
+ }
+ thread_bootstrap_return();
+}
+
+/*
+ * wq_unpark_continue:
+ * continuation handed to thread_block when workqueue_run_nextitem
+ * parks a worker thread... it runs on that thread once it comes
+ * out of the wait (either a wakeup or the wait timing out)
+ *
+ * if the threadlist entry is RUNNING and not BUSY, re-arm the
+ * workqueue_callback sched call and return to user space...
+ * if it is not RUNNING, remove the thread from the workqueue...
+ * if it is RUNNING but still BUSY, wait for TH_LIST_BUSY to
+ * clear and then take the normal return path
+ */
+static void
+wq_unpark_continue(void)
+{
+ struct uthread *uth = NULL;
+ struct threadlist *tl;
+ thread_t th_to_unpark;
+ proc_t p;
+
+ th_to_unpark = current_thread();
+ uth = get_bsdthread_info(th_to_unpark);
+
+ if (uth != NULL) {
+ if ((tl = uth->uu_threadlist) != NULL) {
+
+ if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
+ /*
+ * a normal wakeup of this thread occurred... no need
+ * for any synchronization with the timer and wq_runitem
+ */
+normal_return_to_user:
+ thread_sched_call(th_to_unpark, workqueue_callback);
+
+ KERNEL_DEBUG(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
+
+ thread_exception_return();
+ }
+ p = current_proc();
+
+ workqueue_lock_spin(p);
+
+ if ( !(tl->th_flags & TH_LIST_RUNNING)) {
+ /*
+ * the timer popped us out and we've not
+ * been moved off of the idle list
+ * so we should now self-destruct
+ *
+ * workqueue_removethread consumes the lock
+ */
+ workqueue_removethread(tl);
+
+ thread_exception_return();
+ }
+ /*
+ * the timer woke us up, but we have already
+ * started to make this a runnable thread,
+ * but have not yet finished that process...
+ * so wait for the normal wakeup
+ */
+ while ((tl->th_flags & TH_LIST_BUSY)) {
- KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, (int)thread, 0, wq->wq_flags, 2, 0);
+ assert_wait((caddr_t)tl, (THREAD_UNINT));
- return;
+ workqueue_unlock(p);
+
+ thread_block(THREAD_CONTINUE_NULL);
+
+ workqueue_lock_spin(p);
+ }
+ /*
+ * we have finished setting up the thread's context
+ * now we can return as if we got a normal wakeup
+ */
+ workqueue_unlock(p);
+
+ goto normal_return_to_user;
+ }
+ }
+ thread_exception_return();
}
+
+
static void
wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
int reuse_thread, int wake_thread, int return_directly)
{
int ret = 0;
- KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, (int)current_thread(), (int)item, wake_thread, tl->th_affinity_tag, (int)th);
+ KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, tl->th_workq, tl->th_priority, tl->th_affinity_tag, thread_tid(current_thread()), thread_tid(th));
ret = setup_wqthread(p, th, item, reuse_thread, tl);
panic("setup_wqthread failed %x\n", ret);
if (return_directly) {
+ KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
+
thread_exception_return();
panic("wq_runitem: thread_exception_return returned ...\n");
}
if (wake_thread) {
- KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, (int)current_thread(), 0, 0, 0, (int)th);
-
+ workqueue_lock_spin(p);
+
+ tl->th_flags &= ~TH_LIST_BUSY;
wakeup(tl);
+
+ workqueue_unlock(p);
} else {
- KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, (int)current_thread(), 0, 0, 0, (int)th);
+ KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
+
+ workqueue_lock_spin(p);
+
+ if (tl->th_flags & TH_LIST_NEED_WAKEUP)
+ wakeup(tl);
+ else
+ thread_resume(th);
- thread_resume(th);
+ tl->th_flags &= ~(TH_LIST_BUSY | TH_LIST_NEED_WAKEUP);
+
+ workqueue_unlock(p);
}
}
-
int
setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl)
{
-
#if defined(__ppc__)
/*
* Set up PowerPC registers...
ts64->srr0 = (uint64_t)p->p_wqthread;
ts64->r1 = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_ARGSAVE_LEN - C_RED_ZONE);
ts64->r3 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
- ts64->r4 = (uint64_t)((unsigned int)tl->th_thport);
+ ts64->r4 = (uint64_t)(tl->th_thport);
ts64->r5 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
ts64->r6 = (uint64_t)item;
ts64->r7 = (uint64_t)reuse_thread;
ts64->r8 = (uint64_t)0;
+ if ((reuse_thread != 0) && (ts64->r3 == (uint64_t)0))
+ panic("setup_wqthread: setting reuse thread with null pthread\n");
thread_set_wq_state64(th, (thread_state_t)ts64);
}
-#elif defined(__i386__)
+#elif defined(__i386__) || defined(__x86_64__)
int isLP64 = 0;
isLP64 = IS_64BIT_PROCESS(p);
*/
ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_32_STK_ALIGN));
+ if ((reuse_thread != 0) && (ts->eax == (unsigned int)0))
+ panic("setup_wqthread: setting reuse thread with null pthread\n");
thread_set_wq_state32(th, (thread_state_t)ts);
} else {
ts64->rip = (uint64_t)p->p_wqthread;
ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
- ts64->rsi = (uint64_t)((unsigned int)(tl->th_thport));
+ ts64->rsi = (uint64_t)(tl->th_thport);
ts64->rdx = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
ts64->rcx = (uint64_t)item;
ts64->r8 = (uint64_t)reuse_thread;
*/
ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_64_REDZONE_LEN);
+ if ((reuse_thread != 0) && (ts64->rdi == (uint64_t)0))
+ panic("setup_wqthread: setting reuse thread with null pthread\n");
thread_set_wq_state64(th, (thread_state_t)ts64);
}
-#elif defined(__arm__)
- arm_thread_state_t state;
- arm_thread_state_t *ts = &state;
-
- /* XXX ARM add more */
- ts->pc = p->p_wqthread;
- ts->sp = tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE;
-
- thread_set_wq_state32(th, (thread_state_t)ts);
#else
#error setup_wqthread not defined for this architecture
#endif
return(0);
}
+int
+fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
+{
+ struct workqueue * wq;
+ int error = 0;
+ int activecount;
+ uint32_t pri, affinity;
+ /*
+ * fill in pwqinfo with thread counts for p's workqueue...
+ * the workq lock is held while the counters are read
+ *
+ * pwq_runthreads is the sum of wq_thactive_count over every
+ * priority level and affinity group, pwq_blockedthreads is
+ * wq_threads_scheduled minus that sum
+ *
+ * returns EINVAL (leaving pwqinfo untouched) if the process
+ * has no workqueue, 0 otherwise
+ */
+ workqueue_lock_spin(p);
+ if ((wq = p->p_wqptr) == NULL) {
+ error = EINVAL;
+ goto out;
+ }
+ activecount = 0;
+
+ for (pri = 0; pri < WORKQUEUE_NUMPRIOS; pri++) {
+ for (affinity = 0; affinity < wq->wq_affinity_max; affinity++)
+ activecount += wq->wq_thactive_count[pri][affinity];
+ }
+ pwqinfo->pwq_nthreads = wq->wq_nthreads;
+ pwqinfo->pwq_runthreads = activecount;
+ pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
+out:
+ workqueue_unlock(p);
+ return(error);
+}
+
+/*
+ * Set the target concurrency of one of the priority queues (0,1,2)
+ * to the specified value.
+ *
+ * 'targetconc' is copied out to the int32_t slot selected by
+ * 'queuenum' in the array whose user address is kept in p_targconc
+ * of the process named by 'pid'. When 'pid' is not the calling
+ * process, a reference is held on the target and the vm map is
+ * switched to the target task's map around the copyout.
+ *
+ * Returns EINVAL if 'pid' is 0 (and is not the caller's pid), if the
+ * target has no p_targconc address, or if 'queuenum' is outside
+ * WQ_MAXPRI_MIN..WQ_MAXPRI_MAX; ESRCH if proc_find fails; otherwise
+ * the result of the copyout.
+ */
+int
+proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc)
+{
+ proc_t p, self;
+ uint64_t addr;
+ int32_t conc = targetconc;
+ int error = 0;
+ vm_map_t oldmap = VM_MAP_NULL;
+ int gotref = 0;
+
+ self = current_proc();
+ if (self->p_pid != pid) {
+ /* if not on self, hold a reference on the process */
+
+ if (pid == 0)
+ return(EINVAL);
+
+ p = proc_find(pid);
+
+ if (p == PROC_NULL)
+ return(ESRCH);
+ gotref = 1;
+
+ } else
+ p = self;
+
+ if ((addr = p->p_targconc) == (uint64_t)0) {
+ error = EINVAL;
+ goto out;
+ }
+
+
+ if ((queuenum >= WQ_MAXPRI_MIN) && (queuenum <= WQ_MAXPRI_MAX)) {
+ addr += (queuenum * sizeof(int32_t));
+ if (gotref == 1)
+ oldmap = vm_map_switch(get_task_map(p->task));
+ error = copyout(&conc, addr, sizeof(int32_t));
+ if (gotref == 1)
+ (void)vm_map_switch(oldmap);
+
+ } else {
+ error = EINVAL;
+ }
+out:
+ if (gotref == 1)
+ proc_rele(p);
+ return(error);
+}
+
+
+/*
+ * Set the target concurrency of all the priority queues at once.
+ *
+ * WQ_PRI_NUM int32_t values are copied from 'targetconcp' out to the
+ * user address kept in p_targconc of the process named by 'pid'.
+ * When 'pid' is not the calling process, a reference is held on the
+ * target and the vm map is switched to the target task's map around
+ * the copyout.
+ *
+ * Returns EINVAL if 'pid' is 0 (and is not the caller's pid) or if
+ * the target has no p_targconc address; ESRCH if proc_find fails;
+ * otherwise the result of the copyout.
+ */
+int
+proc_setalltargetconc(pid_t pid, int32_t * targetconcp)
+{
+ proc_t p, self;
+ uint64_t addr;
+ int error = 0;
+ vm_map_t oldmap = VM_MAP_NULL;
+ int gotref = 0;
+
+ self = current_proc();
+ if (self->p_pid != pid) {
+ /* if not on self, hold a reference on the process */
+
+ if (pid == 0)
+ return(EINVAL);
+
+ p = proc_find(pid);
+
+ if (p == PROC_NULL)
+ return(ESRCH);
+ gotref = 1;
+
+ } else
+ p = self;
+
+ if ((addr = (uint64_t)p->p_targconc) == (uint64_t)0) {
+ error = EINVAL;
+ goto out;
+ }
+
+
+ if (gotref == 1)
+ oldmap = vm_map_switch(get_task_map(p->task));
+
+ error = copyout(targetconcp, addr, WQ_PRI_NUM * sizeof(int32_t));
+ if (gotref == 1)
+ (void)vm_map_switch(oldmap);
+
+out:
+ if (gotref == 1)
+ proc_rele(p);
+ return(error);
+}
+/*
+ * thread_selfid:
+ * stores the thread_tid of the calling thread in *retval...
+ * 'p' and 'uap' are unused and the call always returns KERN_SUCCESS
+ */
+int thread_selfid(__unused struct proc *p, __unused struct thread_selfid_args *uap, uint64_t *retval)
+{
+ thread_t thread = current_thread();
+ *retval = thread_tid(thread);
+ return KERN_SUCCESS;
+}
+/*
+ * pthread_init:
+ * allocates the 'pthread' lock group (and its group attribute)
+ * plus the lock attribute, then calls workqueue_init_lock on the
+ * proc obtained from get_bsdtask_info(kernel_task)...
+ * when PSYNCH is configured it also allocates pthread_list_mlock,
+ * calls pth_global_hashinit and allocates the psynch_thcall
+ * thread call for psynch_wq_cleanup
+ */
+void
+pthread_init(void)
+{
+ pthread_lck_grp_attr = lck_grp_attr_alloc_init();
+ pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
+
+ /*
+ * allocate the lock attribute for pthread synchronizers
+ */
+ pthread_lck_attr = lck_attr_alloc_init();
+
+ workqueue_init_lock((proc_t) get_bsdtask_info(kernel_task));
+#if PSYNCH
+ pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
+
+ pth_global_hashinit();
+ psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
+#endif /* PSYNCH */
+}