X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/0c530ab8987f0ae6a1a3d9284f40182b88852816..2d21ac55c334faf3a56e5634905ed6987fc787d4:/bsd/kern/pthread_synch.c diff --git a/bsd/kern/pthread_synch.c b/bsd/kern/pthread_synch.c new file mode 100644 index 000000000..153a2fa81 --- /dev/null +++ b/bsd/kern/pthread_synch.c @@ -0,0 +1,2087 @@ +/* + * Copyright (c) 2000-2007 Apple Inc. All rights reserved. + * + * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ + * + * This file contains Original Code and/or Modifications of Original Code + * as defined in and that are subject to the Apple Public Source License + * Version 2.0 (the 'License'). You may not use this file except in + * compliance with the License. The rights granted to you under the License + * may not be used to create, or enable the creation or redistribution of, + * unlawful or unlicensed copies of an Apple operating system, or to + * circumvent, violate, or enable the circumvention or violation of, any + * terms of an Apple operating system software license agreement. + * + * Please obtain a copy of the License at + * http://www.opensource.apple.com/apsl/ and read it before using this file. + * + * The Original Code and all software distributed under the License are + * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER + * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, + * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. + * Please see the License for the specific language governing rights and + * limitations under the License. + * + * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ + */ +/* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */ +/* + * pthread_synch.c + */ + +#define _PTHREAD_CONDATTR_T +#define _PTHREAD_COND_T +#define _PTHREAD_MUTEXATTR_T +#define _PTHREAD_MUTEX_T +#define _PTHREAD_RWLOCKATTR_T +#define _PTHREAD_RWLOCK_T + +#undef pthread_mutexattr_t +#undef pthread_mutex_t +#undef pthread_condattr_t +#undef pthread_cond_t +#undef pthread_rwlockattr_t +#undef pthread_rwlock_t + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /* for coredump */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /* for thread_exception_return */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include ` /* for current_map() */ +#include /* for thread_resume */ +#include +#if defined(__i386__) +#include +#include +#include +#include +#endif + +#include + +#if 0 +#undef KERNEL_DEBUG +#define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT +#undef KERNEL_DEBUG1 +#define KERNEL_DEBUG1 KERNEL_DEBUG_CONSTANT1 +#endif + + +#if defined(__ppc__) || defined(__ppc64__) +#include +#endif + + +lck_grp_attr_t *pthread_lck_grp_attr; +lck_grp_t *pthread_lck_grp; +lck_attr_t *pthread_lck_attr; +lck_mtx_t * pthread_list_mlock; +extern void pthread_init(void); + +extern kern_return_t thread_getstatus(register thread_t act, int flavor, + thread_state_t tstate, mach_msg_type_number_t *count); +extern kern_return_t thread_setstatus(thread_t thread, int flavor, + thread_state_t tstate, mach_msg_type_number_t count); +extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64); +extern kern_return_t mach_port_deallocate(ipc_space_t, 
mach_port_name_t); +extern kern_return_t semaphore_signal_internal_trap(mach_port_name_t); + +static int workqueue_additem(struct workqueue *wq, int prio, user_addr_t item); +static int workqueue_removeitem(struct workqueue *wq, int prio, user_addr_t item); +static void workqueue_run_nextitem(proc_t p, thread_t th); +static void wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl, + int reuse_thread, int wake_thread, int return_directly); +static int setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl); +static int workqueue_addnewthread(struct workqueue *wq); +static void workqueue_removethread(struct workqueue *wq); +static void workqueue_lock(proc_t); +static void workqueue_lock_spin(proc_t); +static void workqueue_unlock(proc_t); + +#define C_32_STK_ALIGN 16 +#define C_64_STK_ALIGN 16 +#define C_64_REDZONE_LEN 128 +#define TRUNC_DOWN32(a,c) ((((uint32_t)a)-(c)) & ((uint32_t)(-(c)))) +#define TRUNC_DOWN64(a,c) ((((uint64_t)a)-(c)) & ((uint64_t)(-(c)))) + + +/* + * Flags filed passed to bsdthread_create and back in pthread_start +31 <---------------------------------> 0 +_________________________________________ +| flags(8) | policy(8) | importance(16) | +----------------------------------------- +*/ +void _pthread_start(pthread_t self, mach_port_t kport, void *(*fun)(void *), void * funarg, size_t stacksize, unsigned int flags); + +#define PTHREAD_START_CUSTOM 0x01000000 +#define PTHREAD_START_SETSCHED 0x02000000 +#define PTHREAD_START_DETACHED 0x04000000 +#define PTHREAD_START_POLICY_BITSHIFT 16 +#define PTHREAD_START_POLICY_MASK 0xffff +#define PTHREAD_START_IMPORTANCE_MASK 0xffff + +#define SCHED_OTHER POLICY_TIMESHARE +#define SCHED_FIFO POLICY_FIFO +#define SCHED_RR POLICY_RR + +void +pthread_init(void) +{ + + pthread_lck_grp_attr = lck_grp_attr_alloc_init(); + pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr); + + /* + * allocate the lock attribute for pthread synchronizers + */ + pthread_lck_attr = lck_attr_alloc_init(); + + pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr); + +} + +void +pthread_list_lock(void) +{ + lck_mtx_lock(pthread_list_mlock); +} + +void +pthread_list_unlock(void) +{ + lck_mtx_unlock(pthread_list_mlock); +} + + +int +__pthread_mutex_destroy(__unused struct proc *p, struct __pthread_mutex_destroy_args *uap, __unused register_t *retval) +{ + int res; + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + lck_mtx_t * lmtx; + lck_mtx_t * lmtx1; + + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) + return(EINVAL); + + MTX_LOCK(mutex->lock); + if (mutex->sig == _PTHREAD_KERN_MUTEX_SIG) + { + if (mutex->owner == (thread_t)NULL && + mutex->refcount == 1) + { + mutex->sig = _PTHREAD_NO_SIG; + lmtx = mutex->mutex; + lmtx1 = mutex->lock; + mutex->mutex = NULL; + pthread_id_mutex_remove(mutexid); + mutex->refcount --; + MTX_UNLOCK(mutex->lock); + lck_mtx_free(lmtx, pthread_lck_grp); + lck_mtx_free(lmtx1, pthread_lck_grp); + kfree((void *)mutex, sizeof(struct _pthread_mutex)); + return(0); + } + else + res = EBUSY; + } + else + res = EINVAL; + MTX_UNLOCK(mutex->lock); + pthread_mutex_release(mutex); + return (res); +} + +/* + * Initialize a mutex variable, possibly with additional attributes. 
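+ * The attribute's prioceiling, protocol, type and pshared settings are copied into the
+ * kernel object, and two lck_mtx's are allocated: 'lock' guards the object's own fields
+ * while 'mutex' is the lock that callers actually contend on.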
+ */ +static void +pthread_mutex_init_internal(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) +{ + mutex->prioceiling = attr->prioceiling; + mutex->protocol = attr->protocol; + mutex->type = attr->type; + mutex->pshared = attr->pshared; + mutex->refcount = 0; + mutex->owner = (thread_t)NULL; + mutex->owner_proc = current_proc(); + mutex->sig = _PTHREAD_KERN_MUTEX_SIG; + mutex->lock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr); + mutex->mutex = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr); +} + +/* + * Initialize a mutex variable, possibly with additional attributes. + * Public interface - so don't trust the lock - initialize it first. + */ +int +__pthread_mutex_init(__unused struct proc *p, struct __pthread_mutex_init_args *uap, __unused register_t *retval) +{ + user_addr_t umutex = uap->mutex; + pthread_mutex_t * mutex; + user_addr_t uattr = uap->attr; + pthread_mutexattr_t attr; + unsigned int addr = (unsigned int)((uintptr_t)uap->mutex); + int pmutex_sig; + int mutexid; + int error = 0; + + if ((umutex == 0) || (uattr == 0)) + return(EINVAL); + + if ((error = copyin(uattr, &attr, sizeof(pthread_mutexattr_t)))) + return(error); + + if (attr.sig != _PTHREAD_MUTEX_ATTR_SIG) + return (EINVAL); + + if ((error = copyin(umutex, &pmutex_sig, sizeof(int)))) + return(error); + + if (pmutex_sig == _PTHREAD_KERN_MUTEX_SIG) + return(EBUSY); + mutex = (pthread_mutex_t *)kalloc(sizeof(pthread_mutex_t)); + + pthread_mutex_init_internal(mutex, &attr); + + + addr += 8; + mutexid = pthread_id_mutex_add(mutex); + if (mutexid) { + if ((error = copyout(&mutexid, ((user_addr_t)((uintptr_t)(addr))), 4))) + goto cleanup; + return(0); + } else + error = ENOMEM; +cleanup: + if(mutexid) + pthread_id_mutex_remove(mutexid); + lck_mtx_free(mutex->lock, pthread_lck_grp); + lck_mtx_free(mutex->mutex, pthread_lck_grp); + kfree(mutex, sizeof(struct _pthread_mutex)); + return(error); +} + +/* + * Lock a mutex. + * TODO: Priority inheritance stuff + */ +int +__pthread_mutex_lock(struct proc *p, struct __pthread_mutex_lock_args *uap, __unused register_t *retval) +{ + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + int error; + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) + return(EINVAL); + + MTX_LOCK(mutex->lock); + + if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + MTX_UNLOCK(mutex->lock); + + lck_mtx_lock(mutex->mutex); + + MTX_LOCK(mutex->lock); + mutex->owner = current_thread(); + error = 0; +out: + MTX_UNLOCK(mutex->lock); + pthread_mutex_release(mutex); + return (error); +} + +/* + * Attempt to lock a mutex, but don't block if this isn't possible. 
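+ * On success the calling thread is recorded as the owner and 0 is returned; if the
+ * underlying lck_mtx is already held, EBUSY is returned instead.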
+ */ +int +__pthread_mutex_trylock(struct proc *p, struct __pthread_mutex_trylock_args *uap, __unused register_t *retval) +{ + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + boolean_t state; + int error; + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) + return(EINVAL); + + MTX_LOCK(mutex->lock); + + if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + MTX_UNLOCK(mutex->lock); + + state = lck_mtx_try_lock(mutex->mutex); + if (state) { + MTX_LOCK(mutex->lock); + mutex->owner = current_thread(); + MTX_UNLOCK(mutex->lock); + error = 0; + } else + error = EBUSY; + + pthread_mutex_release(mutex); + return (error); +out: + MTX_UNLOCK(mutex->lock); + pthread_mutex_release(mutex); + return (error); +} + +/* + * Unlock a mutex. + * TODO: Priority inheritance stuff + */ +int +__pthread_mutex_unlock(struct proc *p, struct __pthread_mutex_unlock_args *uap, __unused register_t *retval) +{ + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + int error; + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) + return(EINVAL); + + MTX_LOCK(mutex->lock); + + if (mutex->sig != _PTHREAD_KERN_MUTEX_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != mutex->owner_proc) && (mutex->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + MTX_UNLOCK(mutex->lock); + + lck_mtx_unlock(mutex->mutex); + + MTX_LOCK(mutex->lock); + mutex->owner = NULL; + error = 0; +out: + MTX_UNLOCK(mutex->lock); + pthread_mutex_release(mutex); + return (error); +} + + +int +__pthread_cond_init(__unused struct proc *p, struct __pthread_cond_init_args *uap, __unused register_t *retval) +{ + pthread_cond_t * cond; + pthread_condattr_t attr; + user_addr_t ucond = uap->cond; + user_addr_t uattr = uap->attr; + unsigned int addr = (unsigned int)((uintptr_t)uap->cond); + int condid, error, cond_sig; + semaphore_t sem; + kern_return_t kret; + int value = 0; + + if ((ucond == 0) || (uattr == 0)) + return(EINVAL); + + if ((error = copyin(uattr, &attr, sizeof(pthread_condattr_t)))) + return(error); + + if (attr.sig != _PTHREAD_COND_ATTR_SIG) + return (EINVAL); + + if ((error = copyin(ucond, &cond_sig, sizeof(int)))) + return(error); + + if (cond_sig == _PTHREAD_KERN_COND_SIG) + return(EBUSY); + kret = semaphore_create(kernel_task, &sem, SYNC_POLICY_FIFO, value); + if (kret != KERN_SUCCESS) + return(ENOMEM); + + cond = (pthread_cond_t *)kalloc(sizeof(pthread_cond_t)); + + cond->lock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr); + cond->pshared = attr.pshared; + cond->sig = _PTHREAD_KERN_COND_SIG; + cond->sigpending = 0; + cond->waiters = 0; + cond->refcount = 0; + cond->mutex = (pthread_mutex_t *)0; + cond->owner_proc = current_proc(); + cond->sem = sem; + + addr += 8; + condid = pthread_id_cond_add(cond); + if (condid) { + if ((error = copyout(&condid, ((user_addr_t)((uintptr_t)(addr))), 4))) + goto cleanup; + return(0); + } else + error = ENOMEM; +cleanup: + if(condid) + pthread_id_cond_remove(condid); + semaphore_destroy(kernel_task, cond->sem); + kfree(cond, sizeof(pthread_cond_t)); + return(error); +} + + +/* + * Destroy a condition variable. 
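+ * This only succeeds while the kernel object holds a single reference; the id mapping,
+ * the lock and the backing semaphore are then torn down, otherwise EBUSY is returned.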
+ */ +int +__pthread_cond_destroy(__unused struct proc *p, struct __pthread_cond_destroy_args *uap, __unused register_t *retval) +{ + pthread_cond_t *cond; + int condid = uap->condid; + semaphore_t sem; + lck_mtx_t * lmtx; + int res; + + cond = pthread_id_to_cond(condid); + if (cond == 0) + return(EINVAL); + + COND_LOCK(cond->lock); + if (cond->sig == _PTHREAD_KERN_COND_SIG) + { + if (cond->refcount == 1) + { + cond->sig = _PTHREAD_NO_SIG; + sem = cond->sem; + cond->sem = NULL; + lmtx = cond->lock; + pthread_id_cond_remove(condid); + cond->refcount --; + COND_UNLOCK(cond->lock); + lck_mtx_free(lmtx, pthread_lck_grp); + (void)semaphore_destroy(kernel_task, sem); + kfree((void *)cond, sizeof(pthread_cond_t)); + return(0); + } + else + res = EBUSY; + } + else + res = EINVAL; + COND_UNLOCK(cond->lock); + pthread_cond_release(cond); + return (res); +} + + +/* + * Signal a condition variable, waking up all threads waiting for it. + */ +int +__pthread_cond_broadcast(__unused struct proc *p, struct __pthread_cond_broadcast_args *uap, __unused register_t *retval) +{ + int condid = uap->condid; + pthread_cond_t * cond; + int error; + kern_return_t kret; + + cond = pthread_id_to_cond(condid); + if (cond == 0) + return(EINVAL); + + COND_LOCK(cond->lock); + + if (cond->sig != _PTHREAD_KERN_COND_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + COND_UNLOCK(cond->lock); + + kret = semaphore_signal_all(cond->sem); + switch (kret) { + case KERN_INVALID_ADDRESS: + case KERN_PROTECTION_FAILURE: + error = EINVAL; + break; + case KERN_ABORTED: + case KERN_OPERATION_TIMED_OUT: + error = EINTR; + break; + case KERN_SUCCESS: + error = 0; + break; + default: + error = EINVAL; + break; + } + + COND_LOCK(cond->lock); +out: + COND_UNLOCK(cond->lock); + pthread_cond_release(cond); + return (error); +} + + +/* + * Signal a condition variable, waking only one thread. 
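+ * The wakeup is delivered through the condition's kernel semaphore via semaphore_signal().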
+ */ +int +__pthread_cond_signal(__unused struct proc *p, struct __pthread_cond_signal_args *uap, __unused register_t *retval) +{ + int condid = uap->condid; + pthread_cond_t * cond; + int error; + kern_return_t kret; + + cond = pthread_id_to_cond(condid); + if (cond == 0) + return(EINVAL); + + COND_LOCK(cond->lock); + + if (cond->sig != _PTHREAD_KERN_COND_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + COND_UNLOCK(cond->lock); + + kret = semaphore_signal(cond->sem); + switch (kret) { + case KERN_INVALID_ADDRESS: + case KERN_PROTECTION_FAILURE: + error = EINVAL; + break; + case KERN_ABORTED: + case KERN_OPERATION_TIMED_OUT: + error = EINTR; + break; + case KERN_SUCCESS: + error = 0; + break; + default: + error = EINVAL; + break; + } + + COND_LOCK(cond->lock); +out: + COND_UNLOCK(cond->lock); + pthread_cond_release(cond); + return (error); +} + + +int +__pthread_cond_wait(__unused struct proc *p, struct __pthread_cond_wait_args *uap, __unused register_t *retval) +{ + int condid = uap->condid; + pthread_cond_t * cond; + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + int error; + kern_return_t kret; + + cond = pthread_id_to_cond(condid); + if (cond == 0) + return(EINVAL); + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) { + pthread_cond_release(cond); + return(EINVAL); + } + COND_LOCK(cond->lock); + + if (cond->sig != _PTHREAD_KERN_COND_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + COND_UNLOCK(cond->lock); + + kret = semaphore_wait(cond->sem); + switch (kret) { + case KERN_INVALID_ADDRESS: + case KERN_PROTECTION_FAILURE: + error = EACCES; + break; + case KERN_ABORTED: + case KERN_OPERATION_TIMED_OUT: + error = EINTR; + break; + case KERN_SUCCESS: + error = 0; + break; + default: + error = EINVAL; + break; + } + + COND_LOCK(cond->lock); +out: + COND_UNLOCK(cond->lock); + pthread_cond_release(cond); + pthread_mutex_release(mutex); + return (error); +} + +int +__pthread_cond_timedwait(__unused struct proc *p, struct __pthread_cond_timedwait_args *uap, __unused register_t *retval) +{ + int condid = uap->condid; + pthread_cond_t * cond; + int mutexid = uap->mutexid; + pthread_mutex_t * mutex; + mach_timespec_t absts; + int error; + kern_return_t kret; + + absts.tv_sec = 0; + absts.tv_nsec = 0; + + if (uap->abstime) + if ((error = copyin(uap->abstime, &absts, sizeof(mach_timespec_t )))) + return(error); + cond = pthread_id_to_cond(condid); + if (cond == 0) + return(EINVAL); + + mutex = pthread_id_to_mutex(mutexid); + if (mutex == 0) { + pthread_cond_release(cond); + return(EINVAL); + } + COND_LOCK(cond->lock); + + if (cond->sig != _PTHREAD_KERN_COND_SIG) + { + error = EINVAL; + goto out; + } + + if ((p != cond->owner_proc) && (cond->pshared != PTHREAD_PROCESS_SHARED)) { + error = EINVAL; + goto out; + } + + COND_UNLOCK(cond->lock); + + kret = semaphore_timedwait(cond->sem, absts); + switch (kret) { + case KERN_INVALID_ADDRESS: + case KERN_PROTECTION_FAILURE: + error = EACCES; + break; + case KERN_ABORTED: + case KERN_OPERATION_TIMED_OUT: + error = EINTR; + break; + case KERN_SUCCESS: + error = 0; + break; + default: + error = EINVAL; + break; + } + + COND_LOCK(cond->lock); +out: + COND_UNLOCK(cond->lock); + pthread_cond_release(cond); + pthread_mutex_release(mutex); + return (error); +} + +int +bsdthread_create(__unused struct proc *p, struct 
bsdthread_create_args *uap, user_addr_t *retval) +{ + kern_return_t kret; + void * sright; + int error = 0; + int allocated = 0; + mach_vm_offset_t stackaddr; + mach_vm_size_t th_allocsize = 0; + mach_vm_size_t user_stacksize; + mach_vm_size_t th_stacksize; + mach_vm_offset_t th_stackaddr; + mach_vm_offset_t th_stack; + mach_vm_offset_t th_pthread; + mach_port_t th_thport; + thread_t th; + user_addr_t user_func = uap->func; + user_addr_t user_funcarg = uap->func_arg; + user_addr_t user_stack = uap->stack; + user_addr_t user_pthread = uap->pthread; + unsigned int flags = (unsigned int)uap->flags; + vm_map_t vmap = current_map(); + task_t ctask = current_task(); + unsigned int policy, importance; + + int isLP64 = 0; + + +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000080 | DBG_FUNC_START, flags, 0, 0, 0, 0); +#endif + + isLP64 = IS_64BIT_PROCESS(p); + + +#if defined(__ppc__) + stackaddr = 0xF0000000; +#elif defined(__i386__) + stackaddr = 0xB0000000; +#elif defined(__arm__) + stackaddr = 0xB0000000; /* XXX ARM */ +#else +#error Need to define a stack address hint for this architecture +#endif + kret = thread_create(ctask, &th); + if (kret != KERN_SUCCESS) + return(ENOMEM); + thread_reference(th); + + sright = (void *) convert_thread_to_port(th); + th_thport = (void *)ipc_port_copyout_send(sright, get_task_ipcspace(ctask)); + + if ((flags & PTHREAD_START_CUSTOM) == 0) { + th_stacksize = (mach_vm_size_t)user_stack; /* if it is custom them it is stacksize */ + th_allocsize = th_stacksize + PTH_DEFAULT_GUARDSIZE + p->p_pthsize; + + kret = mach_vm_map(vmap, &stackaddr, + th_allocsize, + page_size-1, + VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL, + 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL, + VM_INHERIT_DEFAULT); + if (kret != KERN_SUCCESS) + kret = mach_vm_allocate(vmap, + &stackaddr, th_allocsize, + VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE); + if (kret != KERN_SUCCESS) { + error = ENOMEM; + goto out; + } +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0); +#endif + th_stackaddr = stackaddr; + allocated = 1; + /* + * The guard page is at the lowest address + * The stack base is the highest address + */ + kret = mach_vm_protect(vmap, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE); + + if (kret != KERN_SUCCESS) { + error = ENOMEM; + goto out1; + } + th_stack = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE); + th_pthread = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE); + user_stacksize = th_stacksize; + } else { + th_stack = user_stack; + user_stacksize = user_stack; + th_pthread = user_pthread; +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, 0, 0, 0, 3, 0); +#endif + } + +#if defined(__ppc__) + /* + * Set up PowerPC registers... + * internally they are always kept as 64 bit and + * since the register set is the same between 32 and 64bit modes + * we don't need 2 different methods for setting the state + */ + { + ppc_thread_state64_t state64; + ppc_thread_state64_t *ts64 = &state64; + + ts64->srr0 = (uint64_t)p->p_threadstart; + ts64->r1 = (uint64_t)(th_stack - C_ARGSAVE_LEN - C_RED_ZONE); + ts64->r3 = (uint64_t)th_pthread; + ts64->r4 = (uint64_t)((unsigned int)th_thport); + ts64->r5 = (uint64_t)user_func; + ts64->r6 = (uint64_t)user_funcarg; + ts64->r7 = (uint64_t)user_stacksize; + ts64->r8 = (uint64_t)uap->flags; + + thread_set_wq_state64(th, (thread_state_t)ts64); + + thread_set_cthreadself(th, (uint64_t)th_pthread, isLP64); + } +#elif defined(__i386__) + { + /* + * Set up i386 registers & function call. 
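+ * The arguments that _pthread_start() expects -- the pthread self pointer, the kernel
+ * thread port, the start routine and its argument, the stack size and the creation
+ * flags -- are passed in registers, and the stack pointer is set just below the top
+ * of the new thread's stack.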
+ */ + if (isLP64 == 0) { + x86_thread_state32_t state; + x86_thread_state32_t *ts = &state; + + ts->eip = (int)p->p_threadstart; + ts->eax = (unsigned int)th_pthread; + ts->ebx = (unsigned int)th_thport; + ts->ecx = (unsigned int)user_func; + ts->edx = (unsigned int)user_funcarg; + ts->edi = (unsigned int)user_stacksize; + ts->esi = (unsigned int)uap->flags; + /* + * set stack pointer + */ + ts->esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN)); + + thread_set_wq_state32(th, (thread_state_t)ts); + + } else { + x86_thread_state64_t state64; + x86_thread_state64_t *ts64 = &state64; + + ts64->rip = (uint64_t)p->p_threadstart; + ts64->rdi = (uint64_t)th_pthread; + ts64->rsi = (uint64_t)((unsigned int)(th_thport)); + ts64->rdx = (uint64_t)user_func; + ts64->rcx = (uint64_t)user_funcarg; + ts64->r8 = (uint64_t)user_stacksize; + ts64->r9 = (uint64_t)uap->flags; + /* + * set stack pointer aligned to 16 byte boundary + */ + ts64->rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN); + + thread_set_wq_state64(th, (thread_state_t)ts64); + } + } +#elif defined(__arm__) + { + int flavor=0, count=0; + void * state; + + kret = thread_getstatus(th, flavor, (thread_state_t)&state, &count); + if (kret != KERN_SUCCESS) { + error = EINVAL; + goto out1; + } + + /* XXX ARM TODO */ + + kret = thread_setstatus(th, flavor, (thread_state_t)&state, count); + if (kret != KERN_SUCCESS) + error = EINVAL; + goto out1; + } +#else +#error bsdthread_create not defined for this architecture +#endif + /* Set scheduling parameters if needed */ + if ((flags & PTHREAD_START_SETSCHED) != 0) { + thread_extended_policy_data_t extinfo; + thread_precedence_policy_data_t precedinfo; + + importance = (flags & PTHREAD_START_IMPORTANCE_MASK); + policy = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK; + + if (policy == SCHED_OTHER) + extinfo.timeshare = 1; + else + extinfo.timeshare = 0; + thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT); + + precedinfo.importance = importance; + thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT); + } + + kret = thread_resume(th); + if (kret != KERN_SUCCESS) { + error = EINVAL; + goto out1; + } + thread_deallocate(th); /* drop the creator reference */ +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, (unsigned int)th_pthread, 0, 0, 0); +#endif + *retval = th_pthread; + + return(0); + +out1: + if (allocated != 0) + (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize); +out: + (void)mach_port_deallocate(get_task_ipcspace(ctask), (mach_port_name_t)th_thport); + (void)thread_terminate(th); + (void)thread_deallocate(th); + return(error); +} + +int +bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused register_t *retval) +{ + mach_vm_offset_t freeaddr; + mach_vm_size_t freesize; + kern_return_t kret; + mach_port_name_t kthport = (mach_port_name_t)uap->port; + mach_port_name_t sem = (mach_port_name_t)uap->sem; + + freeaddr = (mach_vm_offset_t)uap->stackaddr; + freesize = uap->freesize; + +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, (unsigned int)freeaddr, (unsigned int)freesize, (unsigned int)kthport, 0xff, 0); +#endif + if (sem != MACH_PORT_NULL) { + kret = semaphore_signal_internal_trap(sem); + if (kret != KERN_SUCCESS) { + return(EINVAL); + } + } + if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) { + kret = mach_vm_deallocate(current_map(), freeaddr, freesize); + if (kret != KERN_SUCCESS) 
{ + return(EINVAL); + } + } + + (void) thread_terminate(current_thread()); + if (kthport != MACH_PORT_NULL) + mach_port_deallocate(get_task_ipcspace(current_task()), kthport); + thread_exception_return(); + panic("bsdthread_terminate: still running\n"); +#if 0 + KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_END, 0, 0, 0, 0xff, 0); +#endif + return(0); +} + + +int +bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused register_t *retval) +{ + /* syscall randomizer test can pass bogus values */ + if (uap->pthsize > MAX_PTHREAD_SIZE) { + return(EINVAL); + } + p->p_threadstart = uap->threadstart; + p->p_wqthread = uap->wqthread; + p->p_pthsize = uap->pthsize; + + return(0); +} + + + + +int wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS; +int wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS; +int wq_max_run_latency_usecs = WQ_MAX_RUN_LATENCY_USECS; +int wq_timer_interval_msecs = WQ_TIMER_INTERVAL_MSECS; + + +SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW, + &wq_stalled_window_usecs, 0, ""); + +SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW, + &wq_reduce_pool_window_usecs, 0, ""); + +SYSCTL_INT(_kern, OID_AUTO, wq_max_run_latency_usecs, CTLFLAG_RW, + &wq_max_run_latency_usecs, 0, ""); + +SYSCTL_INT(_kern, OID_AUTO, wq_timer_interval_msecs, CTLFLAG_RW, + &wq_timer_interval_msecs, 0, ""); + + + + +void +workqueue_init_lock(proc_t p) +{ + lck_mtx_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr); +} + +void +workqueue_destroy_lock(proc_t p) +{ + lck_mtx_destroy(&p->p_wqlock, pthread_lck_grp); +} + +static void +workqueue_lock(proc_t p) +{ + lck_mtx_lock(&p->p_wqlock); +} + +static void +workqueue_lock_spin(proc_t p) +{ + lck_mtx_lock_spin(&p->p_wqlock); +} + +static void +workqueue_unlock(proc_t p) +{ + lck_mtx_unlock(&p->p_wqlock); +} + + + +static void +workqueue_interval_timer_start(thread_call_t call, int interval_in_ms) +{ + uint64_t deadline; + + clock_interval_to_deadline(interval_in_ms, 1000 * 1000, &deadline); + + thread_call_enter_delayed(call, deadline); +} + + +static void +workqueue_timer(struct workqueue *wq, __unused int param1) +{ + struct timeval tv, dtv; + uint32_t i; + boolean_t added_more_threads = FALSE; + boolean_t reset_maxactive = FALSE; + boolean_t restart_timer = FALSE; + + microuptime(&tv); + + KERNEL_DEBUG(0xefffd108, (int)wq, 0, 0, 0, 0); + + /* + * check to see if the stall frequency was beyond our tolerance + * or we have work on the queue, but haven't scheduled any + * new work within our acceptable time interval because + * there were no idle threads left to schedule + * + * WQ_TIMER_WATCH will only be set if we have 1 or more affinity + * groups that have stalled (no active threads and no idle threads)... + * it will not be set if all affinity groups have at least 1 thread + * that is currently runnable... if all processors have a runnable + * thread, there is no need to add more threads even if we're not + * scheduling new work within our allowed window... it just means + * that the work items are taking a long time to complete. 
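+ * WQ_ADD_TO_POOL, by contrast, is an explicit request from the run path to grow the
+ * pool because scheduling latency exceeded the allowed window.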
+ */ + if (wq->wq_flags & (WQ_ADD_TO_POOL | WQ_TIMER_WATCH)) { + + if (wq->wq_flags & WQ_ADD_TO_POOL) + added_more_threads = TRUE; + else { + timersub(&tv, &wq->wq_lastran_ts, &dtv); + + if (((dtv.tv_sec * 1000000) + dtv.tv_usec) > wq_stalled_window_usecs) + added_more_threads = TRUE; + } + if (added_more_threads == TRUE) { + for (i = 0; i < wq->wq_affinity_max && wq->wq_nthreads < WORKQUEUE_MAXTHREADS; i++) { + (void)workqueue_addnewthread(wq); + } + } + } + timersub(&tv, &wq->wq_reduce_ts, &dtv); + + if (((dtv.tv_sec * 1000000) + dtv.tv_usec) > wq_reduce_pool_window_usecs) + reset_maxactive = TRUE; + + /* + * if the pool size has grown beyond the minimum number + * of threads needed to keep all of the processors busy, and + * the maximum number of threads scheduled concurrently during + * the last sample period didn't exceed half the current pool + * size, then its time to trim the pool size back + */ + if (added_more_threads == FALSE && + reset_maxactive == TRUE && + wq->wq_nthreads > wq->wq_affinity_max && + wq->wq_max_threads_scheduled <= (wq->wq_nthreads / 2)) { + uint32_t nthreads_to_remove; + + if ((nthreads_to_remove = (wq->wq_nthreads / 4)) == 0) + nthreads_to_remove = 1; + + for (i = 0; i < nthreads_to_remove && wq->wq_nthreads > wq->wq_affinity_max; i++) + workqueue_removethread(wq); + } + workqueue_lock_spin(wq->wq_proc); + + if (reset_maxactive == TRUE) { + wq->wq_max_threads_scheduled = 0; + microuptime(&wq->wq_reduce_ts); + } + if (added_more_threads) { + wq->wq_flags &= ~(WQ_ADD_TO_POOL | WQ_TIMER_WATCH); + + /* + * since we added more threads, we should be + * able to run some work if its still available + */ + workqueue_run_nextitem(wq->wq_proc, THREAD_NULL); + workqueue_lock_spin(wq->wq_proc); + } + if ((wq->wq_nthreads > wq->wq_affinity_max) || + (wq->wq_flags & WQ_TIMER_WATCH)) { + restart_timer = TRUE; + } else + wq->wq_flags &= ~WQ_TIMER_RUNNING; + + workqueue_unlock(wq->wq_proc); + + /* + * we needed to knock down the WQ_TIMER_RUNNING flag while behind + * the workqueue lock... however, we don't want to hold the lock + * while restarting the timer and we certainly don't want 2 or more + * instances of the timer... so set a local to indicate the need + * for a restart since the state of wq_flags may change once we + * drop the workqueue lock... 
+ */ + if (restart_timer == TRUE) + workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs); +} + + +static void +workqueue_callback( + int type, + thread_t thread) +{ + struct uthread *uth; + struct threadlist *tl; + struct workqueue *wq; + + uth = get_bsdthread_info(thread); + tl = uth->uu_threadlist; + wq = tl->th_workq; + + switch (type) { + + case SCHED_CALL_BLOCK: + { + uint32_t old_activecount; + + old_activecount = OSAddAtomic(-1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]); + + if (old_activecount == 1 && wq->wq_itemcount) { + /* + * we were the last active thread on this affinity set + * and we've got work to do + */ + workqueue_lock_spin(wq->wq_proc); + /* + * if this thread is blocking (not parking) + * and the idle list is empty for this affinity group + * we'll count it as a 'stall' + */ + if ((tl->th_flags & TH_LIST_RUNNING) && + TAILQ_EMPTY(&wq->wq_thidlelist[tl->th_affinity_tag])) + wq->wq_stalled_count++; + + workqueue_run_nextitem(wq->wq_proc, THREAD_NULL); + /* + * workqueue_run_nextitem will drop the workqueue + * lock before it returns + */ + } + KERNEL_DEBUG(0xefffd020, (int)thread, wq->wq_threads_scheduled, tl->th_affinity_tag, 0, 0); + } + break; + + case SCHED_CALL_UNBLOCK: + /* + * we cannot take the workqueue_lock here... + * an UNBLOCK can occur from a timer event which + * is run from an interrupt context... if the workqueue_lock + * is already held by this processor, we'll deadlock... + * the thread lock for the thread being UNBLOCKED + * is also held + */ + if (tl->th_unparked) + OSAddAtomic(-1, (SInt32 *)&tl->th_unparked); + else + OSAddAtomic(1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]); + + KERNEL_DEBUG(0xefffd024, (int)thread, wq->wq_threads_scheduled, tl->th_affinity_tag, 0, 0); + break; + } +} + +static void +workqueue_removethread(struct workqueue *wq) +{ + struct threadlist *tl; + uint32_t i, affinity_tag = 0; + + tl = NULL; + + workqueue_lock_spin(wq->wq_proc); + + for (i = 0; i < wq->wq_affinity_max; i++) { + + affinity_tag = wq->wq_nextaffinitytag; + + if (affinity_tag == 0) + affinity_tag = wq->wq_affinity_max - 1; + else + affinity_tag--; + wq->wq_nextaffinitytag = affinity_tag; + + /* + * look for an idle thread to steal from this affinity group + * but don't grab the only thread associated with it + */ + if (!TAILQ_EMPTY(&wq->wq_thidlelist[affinity_tag]) && wq->wq_thcount[affinity_tag] > 1) { + tl = TAILQ_FIRST(&wq->wq_thidlelist[affinity_tag]); + TAILQ_REMOVE(&wq->wq_thidlelist[affinity_tag], tl, th_entry); + + wq->wq_nthreads--; + wq->wq_thcount[affinity_tag]--; + + break; + } + } + workqueue_unlock(wq->wq_proc); + + if (tl != NULL) { + thread_sched_call(tl->th_thread, NULL); + + if ( (tl->th_flags & TH_LIST_BLOCKED) ) + wakeup(tl); + else { + /* + * thread was created, but never used... 
+ * need to clean up the stack and port ourselves + * since we're not going to spin up through the + * normal exit path triggered from Libc + */ + (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize); + (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), (mach_port_name_t)tl->th_thport); + + thread_terminate(tl->th_thread); + } + KERNEL_DEBUG(0xefffd030, (int)tl->th_thread, wq->wq_nthreads, tl->th_flags & TH_LIST_BLOCKED, 0, 0); + /* + * drop our ref on the thread + */ + thread_deallocate(tl->th_thread); + + kfree(tl, sizeof(struct threadlist)); + } +} + + +static int +workqueue_addnewthread(struct workqueue *wq) +{ + struct threadlist *tl; + struct uthread *uth; + kern_return_t kret; + thread_t th; + proc_t p; + void *sright; + mach_vm_offset_t stackaddr; + uint32_t affinity_tag; + + p = wq->wq_proc; + + kret = thread_create(wq->wq_task, &th); + + if (kret != KERN_SUCCESS) + return(EINVAL); + + tl = kalloc(sizeof(struct threadlist)); + bzero(tl, sizeof(struct threadlist)); + +#if defined(__ppc__) + stackaddr = 0xF0000000; +#elif defined(__i386__) + stackaddr = 0xB0000000; +#elif defined(__arm__) + stackaddr = 0xB0000000; /* XXX ARM */ +#else +#error Need to define a stack address hint for this architecture +#endif + tl->th_allocsize = PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE + p->p_pthsize; + + kret = mach_vm_map(wq->wq_map, &stackaddr, + tl->th_allocsize, + page_size-1, + VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL, + 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL, + VM_INHERIT_DEFAULT); + + if (kret != KERN_SUCCESS) { + kret = mach_vm_allocate(wq->wq_map, + &stackaddr, tl->th_allocsize, + VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE); + } + if (kret == KERN_SUCCESS) { + /* + * The guard page is at the lowest address + * The stack base is the highest address + */ + kret = mach_vm_protect(wq->wq_map, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE); + + if (kret != KERN_SUCCESS) + (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize); + } + if (kret != KERN_SUCCESS) { + (void) thread_terminate(th); + + kfree(tl, sizeof(struct threadlist)); + + return(EINVAL); + } + thread_reference(th); + + sright = (void *) convert_thread_to_port(th); + tl->th_thport = (void *)ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task)); + + thread_static_param(th, TRUE); + + workqueue_lock_spin(p); + + affinity_tag = wq->wq_nextaffinitytag; + wq->wq_nextaffinitytag = (affinity_tag + 1) % wq->wq_affinity_max; + + workqueue_unlock(p); + + tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED; + + tl->th_thread = th; + tl->th_workq = wq; + tl->th_stackaddr = stackaddr; + tl->th_affinity_tag = affinity_tag; + +#if defined(__ppc__) + //ml_fp_setvalid(FALSE); + thread_set_cthreadself(th, (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE), IS_64BIT_PROCESS(p)); +#endif /* __ppc__ */ + /* + * affinity tag of 0 means no affinity... + * but we want our tags to be 0 based because they + * are used to index arrays, so... 
+ * keep it 0 based internally and bump by 1 when + * calling out to set it + */ + (void)thread_affinity_set(th, affinity_tag + 1); + thread_sched_call(th, workqueue_callback); + + uth = get_bsdthread_info(tl->th_thread); + uth->uu_threadlist = (void *)tl; + + workqueue_lock_spin(p); + + TAILQ_INSERT_TAIL(&wq->wq_thidlelist[tl->th_affinity_tag], tl, th_entry); + wq->wq_nthreads++; + wq->wq_thcount[affinity_tag]++; + + KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, (int)current_thread(), affinity_tag, wq->wq_nthreads, 0, (int)tl->th_thread); + + /* + * work may have come into the queue while + * no threads were available to run... since + * we're adding a new thread, go evaluate the + * current state + */ + workqueue_run_nextitem(p, THREAD_NULL); + /* + * workqueue_run_nextitem is responsible for + * dropping the workqueue lock in all cases + */ + + return(0); +} + +int +workq_open(__unused struct proc *p, __unused struct workq_open_args *uap, __unused register_t *retval) +{ + struct workqueue * wq; + int size; + char * ptr; + int j; + uint32_t i; + int error = 0; + int num_cpus; + struct workitem * witem; + struct workitemlist *wl; + + workqueue_lock(p); + + if (p->p_wqptr == NULL) { + num_cpus = ml_get_max_cpus(); + + size = (sizeof(struct workqueue)) + + (num_cpus * sizeof(int *)) + + (num_cpus * sizeof(TAILQ_HEAD(, threadlist))); + + ptr = (char *)kalloc(size); + bzero(ptr, size); + + wq = (struct workqueue *)ptr; + wq->wq_flags = WQ_LIST_INITED; + wq->wq_proc = p; + wq->wq_affinity_max = num_cpus; + wq->wq_task = current_task(); + wq->wq_map = current_map(); + + for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) { + wl = (struct workitemlist *)&wq->wq_list[i]; + TAILQ_INIT(&wl->wl_itemlist); + TAILQ_INIT(&wl->wl_freelist); + + for (j = 0; j < WORKITEM_SIZE; j++) { + witem = &wq->wq_array[(i*WORKITEM_SIZE) + j]; + TAILQ_INSERT_TAIL(&wl->wl_freelist, witem, wi_entry); + } + } + wq->wq_thactivecount = (uint32_t *)((char *)ptr + sizeof(struct workqueue)); + wq->wq_thcount = (uint32_t *)&wq->wq_thactivecount[wq->wq_affinity_max]; + wq->wq_thidlelist = (struct wq_thidlelist *)&wq->wq_thcount[wq->wq_affinity_max]; + + for (i = 0; i < wq->wq_affinity_max; i++) + TAILQ_INIT(&wq->wq_thidlelist[i]); + + TAILQ_INIT(&wq->wq_thrunlist); + + p->p_wqptr = (void *)wq; + p->p_wqsize = size; + + workqueue_unlock(p); + + wq->wq_timer_call = thread_call_allocate((thread_call_func_t)workqueue_timer, (thread_call_param_t)wq); + + for (i = 0; i < wq->wq_affinity_max; i++) { + (void)workqueue_addnewthread(wq); + } + /* If unable to create any threads, return error */ + if (wq->wq_nthreads == 0) + error = EINVAL; + workqueue_lock_spin(p); + + microuptime(&wq->wq_reduce_ts); + microuptime(&wq->wq_lastran_ts); + wq->wq_max_threads_scheduled = 0; + wq->wq_stalled_count = 0; + } + workqueue_unlock(p); + + return(error); +} + +int +workq_ops(struct proc *p, struct workq_ops_args *uap, __unused register_t *retval) +{ + int options = uap->options; + int prio = uap->prio; /* should be used to find the right workqueue */ + user_addr_t item = uap->item; + int error = 0; + thread_t th = THREAD_NULL; + struct workqueue *wq; + + prio += 2; /* normalize prio -2 to +2 to 0 -4 */ + + switch (options) { + + case WQOPS_QUEUE_ADD: { + + KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, (int)item, 0, 0, 0, 0); + + workqueue_lock_spin(p); + + if ((wq = (struct workqueue *)p->p_wqptr) == NULL) { + workqueue_unlock(p); + return (EINVAL); + } + error = workqueue_additem(wq, prio, item); + + } + break; + case WQOPS_QUEUE_REMOVE: { + + workqueue_lock_spin(p); 
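+		/* make sure this process still has a workqueue before removing the item */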
+ + if ((wq = (struct workqueue *)p->p_wqptr) == NULL) { + workqueue_unlock(p); + return (EINVAL); + } + error = workqueue_removeitem(wq, prio, item); + } + break; + case WQOPS_THREAD_RETURN: { + + th = current_thread(); + + KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, 0, 0, 0, 0, 0); + + workqueue_lock_spin(p); + + if ((wq = (struct workqueue *)p->p_wqptr) == NULL) { + workqueue_unlock(p); + return (EINVAL); + } + } + break; + default: + return (EINVAL); + } + workqueue_run_nextitem(p, th); + /* + * workqueue_run_nextitem is responsible for + * dropping the workqueue lock in all cases + */ + return(error); +} + +void +workqueue_exit(struct proc *p) +{ + struct workqueue * wq; + struct threadlist * tl, *tlist; + uint32_t i; + + if (p->p_wqptr != NULL) { + + workqueue_lock_spin(p); + + wq = (struct workqueue *)p->p_wqptr; + p->p_wqptr = NULL; + + workqueue_unlock(p); + + if (wq == NULL) + return; + + if (wq->wq_flags & WQ_TIMER_RUNNING) + thread_call_cancel(wq->wq_timer_call); + thread_call_free(wq->wq_timer_call); + + TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) { + /* + * drop our last ref on the thread + */ + thread_sched_call(tl->th_thread, NULL); + thread_deallocate(tl->th_thread); + + TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry); + kfree(tl, sizeof(struct threadlist)); + } + for (i = 0; i < wq->wq_affinity_max; i++) { + TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist[i], th_entry, tlist) { + /* + * drop our last ref on the thread + */ + thread_sched_call(tl->th_thread, NULL); + thread_deallocate(tl->th_thread); + + TAILQ_REMOVE(&wq->wq_thidlelist[i], tl, th_entry); + kfree(tl, sizeof(struct threadlist)); + } + } + kfree(wq, p->p_wqsize); + } +} + +static int +workqueue_additem(struct workqueue *wq, int prio, user_addr_t item) +{ + struct workitem *witem; + struct workitemlist *wl; + + wl = (struct workitemlist *)&wq->wq_list[prio]; + + if (TAILQ_EMPTY(&wl->wl_freelist)) + return (ENOMEM); + + witem = (struct workitem *)TAILQ_FIRST(&wl->wl_freelist); + TAILQ_REMOVE(&wl->wl_freelist, witem, wi_entry); + + witem->wi_item = item; + TAILQ_INSERT_TAIL(&wl->wl_itemlist, witem, wi_entry); + + if (wq->wq_itemcount == 0) { + microuptime(&wq->wq_lastran_ts); + wq->wq_stalled_count = 0; + } + wq->wq_itemcount++; + + return (0); +} + +static int +workqueue_removeitem(struct workqueue *wq, int prio, user_addr_t item) +{ + struct workitem *witem; + struct workitemlist *wl; + int error = ESRCH; + + wl = (struct workitemlist *)&wq->wq_list[prio]; + + TAILQ_FOREACH(witem, &wl->wl_itemlist, wi_entry) { + if (witem->wi_item == item) { + TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry); + wq->wq_itemcount--; + + witem->wi_item = (user_addr_t)0; + TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry); + + error = 0; + break; + } + } + if (wq->wq_itemcount == 0) + wq->wq_flags &= ~(WQ_ADD_TO_POOL | WQ_TIMER_WATCH); + + return (error); +} + +/* + * workqueue_run_nextitem: + * called with the workqueue lock held... 
+ * responsible for dropping it in all cases + */ +static void +workqueue_run_nextitem(proc_t p, thread_t thread) +{ + struct workqueue *wq; + struct workitem *witem = NULL; + user_addr_t item = 0; + thread_t th_to_run = THREAD_NULL; + thread_t th_to_park = THREAD_NULL; + int wake_thread = 0; + int reuse_thread = 1; + uint32_t stalled_affinity_count = 0; + int i; + uint32_t affinity_tag; + struct threadlist *tl = NULL; + struct uthread *uth = NULL; + struct workitemlist *wl; + boolean_t start_timer = FALSE; + struct timeval tv, lat_tv; + + wq = (struct workqueue *)p->p_wqptr; + + KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, (int)thread, wq->wq_threads_scheduled, wq->wq_stalled_count, 0, 0); + + if (wq->wq_itemcount == 0) { + if ((th_to_park = thread) == THREAD_NULL) + goto out; + goto parkit; + } + if (thread != THREAD_NULL) { + /* + * we're a worker thread from the pool... currently we + * are considered 'active' which means we're counted + * in "wq_thactivecount" + */ + uth = get_bsdthread_info(thread); + tl = uth->uu_threadlist; + + if (wq->wq_thactivecount[tl->th_affinity_tag] == 1) { + /* + * we're the only active thread associated with our + * affinity group, so pick up some work and keep going + */ + th_to_run = thread; + goto pick_up_work; + } + } + for (affinity_tag = 0; affinity_tag < wq->wq_affinity_max; affinity_tag++) { + /* + * look for first affinity group that is currently not active + * and has at least 1 idle thread + */ + if (wq->wq_thactivecount[affinity_tag] == 0) { + if (!TAILQ_EMPTY(&wq->wq_thidlelist[affinity_tag])) + break; + stalled_affinity_count++; + } + } + if (thread == THREAD_NULL) { + /* + * we're not one of the 'worker' threads + */ + if (affinity_tag >= wq->wq_affinity_max) { + /* + * we've already got at least 1 thread per + * affinity group in the active state... or + * we've got no idle threads to play with + */ + if (stalled_affinity_count) { + + if ( !(wq->wq_flags & WQ_TIMER_RUNNING) ) { + wq->wq_flags |= WQ_TIMER_RUNNING; + start_timer = TRUE; + } + wq->wq_flags |= WQ_TIMER_WATCH; + } + goto out; + } + } else { + /* + * we're overbooked on the affinity group we're associated with, + * so park this thread + */ + th_to_park = thread; + + if (affinity_tag >= wq->wq_affinity_max) { + /* + * all the affinity groups have active threads + * running, or there are no idle threads to + * schedule + */ + if (stalled_affinity_count) { + + if ( !(wq->wq_flags & WQ_TIMER_RUNNING) ) { + wq->wq_flags |= WQ_TIMER_RUNNING; + start_timer = TRUE; + } + wq->wq_flags |= WQ_TIMER_WATCH; + } + goto parkit; + } + /* + * we've got a candidate (affinity group with no currently + * active threads) to start a new thread on... + * we already know there is both work available + * and an idle thread with the correct affinity tag, so + * fall into the code that pulls a new thread and workitem... 
+ * once we've kicked that thread off, we'll park this one + */ + } + tl = TAILQ_FIRST(&wq->wq_thidlelist[affinity_tag]); + TAILQ_REMOVE(&wq->wq_thidlelist[affinity_tag], tl, th_entry); + + th_to_run = tl->th_thread; + TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry); + + if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) { + tl->th_flags &= ~TH_LIST_SUSPENDED; + reuse_thread = 0; + } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) { + tl->th_flags &= ~TH_LIST_BLOCKED; + wake_thread = 1; + } + tl->th_flags |= TH_LIST_RUNNING; + + wq->wq_threads_scheduled++; + + if (wq->wq_threads_scheduled > wq->wq_max_threads_scheduled) + wq->wq_max_threads_scheduled = wq->wq_threads_scheduled; + +pick_up_work: + for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) { + wl = (struct workitemlist *)&wq->wq_list[i]; + + if (!(TAILQ_EMPTY(&wl->wl_itemlist))) { + + witem = TAILQ_FIRST(&wl->wl_itemlist); + TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry); + wq->wq_itemcount--; + + item = witem->wi_item; + witem->wi_item = (user_addr_t)0; + TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry); + + break; + } + } + if (witem == NULL) + panic("workq_run_nextitem: NULL witem"); + + if (thread != th_to_run) { + /* + * we're starting up a thread from a parked/suspended condition + */ + OSAddAtomic(1, (SInt32 *)&wq->wq_thactivecount[tl->th_affinity_tag]); + OSAddAtomic(1, (SInt32 *)&tl->th_unparked); + } + if (wq->wq_itemcount == 0) + wq->wq_flags &= ~WQ_TIMER_WATCH; + else { + microuptime(&tv); + /* + * if we had any affinity groups stall (no threads runnable) + * since we last scheduled an item... and + * the elapsed time since we last scheduled an item + * exceeds the latency tolerance... + * we ask the timer thread (which should already be running) + * to add some more threads to the pool + */ + if (wq->wq_stalled_count && !(wq->wq_flags & WQ_ADD_TO_POOL)) { + timersub(&tv, &wq->wq_lastran_ts, &lat_tv); + + if (((lat_tv.tv_sec * 1000000) + lat_tv.tv_usec) > wq_max_run_latency_usecs) + wq->wq_flags |= WQ_ADD_TO_POOL; + + KERNEL_DEBUG(0xefffd10c, wq->wq_stalled_count, lat_tv.tv_sec, lat_tv.tv_usec, wq->wq_flags, 0); + } + wq->wq_lastran_ts = tv; + } + wq->wq_stalled_count = 0; + workqueue_unlock(p); + + KERNEL_DEBUG(0xefffd02c, wq->wq_thactivecount[0], wq->wq_thactivecount[1], + wq->wq_thactivecount[2], wq->wq_thactivecount[3], 0); + + KERNEL_DEBUG(0xefffd02c, wq->wq_thactivecount[4], wq->wq_thactivecount[5], + wq->wq_thactivecount[6], wq->wq_thactivecount[7], 0); + + /* + * if current thread is reused for workitem, does not return via unix_syscall + */ + wq_runitem(p, item, th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run)); + + if (th_to_park == THREAD_NULL) { + + KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, (int)thread, (int)item, wq->wq_flags, 1, 0); + + return; + } + workqueue_lock_spin(p); + +parkit: + wq->wq_threads_scheduled--; + /* + * this is a workqueue thread with no more + * work to do... 
park it for now + */ + uth = get_bsdthread_info(th_to_park); + tl = uth->uu_threadlist; + if (tl == 0) + panic("wq thread with no threadlist "); + + TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry); + tl->th_flags &= ~TH_LIST_RUNNING; + + tl->th_flags |= TH_LIST_BLOCKED; + TAILQ_INSERT_HEAD(&wq->wq_thidlelist[tl->th_affinity_tag], tl, th_entry); + + assert_wait((caddr_t)tl, (THREAD_INTERRUPTIBLE)); + + workqueue_unlock(p); + + if (start_timer) + workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs); + + KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, (int)current_thread(), wq->wq_threads_scheduled, 0, 0, (int)th_to_park); + + thread_block((thread_continue_t)thread_exception_return); + + panic("unexpected return from thread_block"); + +out: + workqueue_unlock(p); + + if (start_timer) + workqueue_interval_timer_start(wq->wq_timer_call, wq_timer_interval_msecs); + + KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, (int)thread, 0, wq->wq_flags, 2, 0); + + return; +} + +static void +wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl, + int reuse_thread, int wake_thread, int return_directly) +{ + int ret = 0; + + KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, (int)current_thread(), (int)item, wake_thread, tl->th_affinity_tag, (int)th); + + ret = setup_wqthread(p, th, item, reuse_thread, tl); + + if (ret != 0) + panic("setup_wqthread failed %x\n", ret); + + if (return_directly) { + thread_exception_return(); + + panic("wq_runitem: thread_exception_return returned ...\n"); + } + if (wake_thread) { + KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, (int)current_thread(), 0, 0, 0, (int)th); + + wakeup(tl); + } else { + KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, (int)current_thread(), 0, 0, 0, (int)th); + + thread_resume(th); + } +} + + +int +setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl) +{ + +#if defined(__ppc__) + /* + * Set up PowerPC registers... + * internally they are always kept as 64 bit and + * since the register set is the same between 32 and 64bit modes + * we don't need 2 different methods for setting the state + */ + { + ppc_thread_state64_t state64; + ppc_thread_state64_t *ts64 = &state64; + + ts64->srr0 = (uint64_t)p->p_wqthread; + ts64->r1 = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_ARGSAVE_LEN - C_RED_ZONE); + ts64->r3 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE); + ts64->r4 = (uint64_t)((unsigned int)tl->th_thport); + ts64->r5 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE); + ts64->r6 = (uint64_t)item; + ts64->r7 = (uint64_t)reuse_thread; + ts64->r8 = (uint64_t)0; + + thread_set_wq_state64(th, (thread_state_t)ts64); + } +#elif defined(__i386__) + int isLP64 = 0; + + isLP64 = IS_64BIT_PROCESS(p); + /* + * Set up i386 registers & function call. 
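+ * The wqthread entry point receives the pthread self pointer (which sits just above
+ * the stack), the kernel thread port, the low end of the usable stack, the work item
+ * and the reuse flag in registers.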
+ */ + if (isLP64 == 0) { + x86_thread_state32_t state; + x86_thread_state32_t *ts = &state; + + ts->eip = (int)p->p_wqthread; + ts->eax = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE); + ts->ebx = (unsigned int)tl->th_thport; + ts->ecx = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE); + ts->edx = (unsigned int)item; + ts->edi = (unsigned int)reuse_thread; + ts->esi = (unsigned int)0; + /* + * set stack pointer + */ + ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_32_STK_ALIGN)); + + thread_set_wq_state32(th, (thread_state_t)ts); + + } else { + x86_thread_state64_t state64; + x86_thread_state64_t *ts64 = &state64; + + ts64->rip = (uint64_t)p->p_wqthread; + ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE); + ts64->rsi = (uint64_t)((unsigned int)(tl->th_thport)); + ts64->rdx = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE); + ts64->rcx = (uint64_t)item; + ts64->r8 = (uint64_t)reuse_thread; + ts64->r9 = (uint64_t)0; + + /* + * set stack pointer aligned to 16 byte boundary + */ + ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_64_REDZONE_LEN); + + thread_set_wq_state64(th, (thread_state_t)ts64); + } +#elif defined(__arm__) + arm_thread_state_t state; + arm_thread_state_t *ts = &state; + + /* XXX ARM add more */ + ts->pc = p->p_wqthread; + ts->sp = tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE; + + thread_set_wq_state32(th, (thread_state_t)ts); +#else +#error setup_wqthread not defined for this architecture +#endif + return(0); +} +