X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/c910b4d9d2451126ae3917b931cd4390c11e1d52..316670eb35587141e969394ae8537d66b9211e80:/osfmk/kern/thread_call.c diff --git a/osfmk/kern/thread_call.c b/osfmk/kern/thread_call.c index 7ae31523c..7d43919ae 100644 --- a/osfmk/kern/thread_call.c +++ b/osfmk/kern/thread_call.c @@ -41,83 +41,247 @@ #include #include - #include +#include + #include -decl_simple_lock_data(static,thread_call_lock) -static zone_t thread_call_zone; +static zone_t thread_call_zone; +static struct wait_queue daemon_wqueue; struct thread_call_group { queue_head_t pending_queue; - uint32_t pending_count; + uint32_t pending_count; queue_head_t delayed_queue; + uint32_t delayed_count; timer_call_data_t delayed_timer; + timer_call_data_t dealloc_timer; struct wait_queue idle_wqueue; - uint32_t idle_count, active_count; + uint32_t idle_count, active_count; + + integer_t pri; + uint32_t target_thread_count; + uint64_t idle_timestamp; + + uint32_t flags; + sched_call_t sched_call; }; typedef struct thread_call_group *thread_call_group_t; -static struct thread_call_group thread_call_group0; +#define TCG_PARALLEL 0x01 +#define TCG_DEALLOC_ACTIVE 0x02 + +#define THREAD_CALL_GROUP_COUNT 4 +#define THREAD_CALL_THREAD_MIN 4 +#define INTERNAL_CALL_COUNT 768 +#define THREAD_CALL_DEALLOC_INTERVAL_NS (5 * 1000 * 1000) /* 5 ms */ +#define THREAD_CALL_ADD_RATIO 4 +#define THREAD_CALL_MACH_FACTOR_CAP 3 + +static struct thread_call_group thread_call_groups[THREAD_CALL_GROUP_COUNT]; +static boolean_t thread_call_daemon_awake; +static thread_call_data_t internal_call_storage[INTERNAL_CALL_COUNT]; +static queue_head_t thread_call_internal_queue; +static uint64_t thread_call_dealloc_interval_abs; + +static __inline__ thread_call_t _internal_call_allocate(void); +static __inline__ void _internal_call_release(thread_call_t call); +static __inline__ boolean_t _pending_call_enqueue(thread_call_t call, thread_call_group_t group); +static __inline__ boolean_t _delayed_call_enqueue(thread_call_t call, thread_call_group_t group, uint64_t deadline); +static __inline__ boolean_t _call_dequeue(thread_call_t call, thread_call_group_t group); +static __inline__ void thread_call_wake(thread_call_group_t group); +static __inline__ void _set_delayed_call_timer(thread_call_t call, thread_call_group_t group); +static boolean_t _remove_from_pending_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all); +static boolean_t _remove_from_delayed_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all); +static void thread_call_daemon(void *arg); +static void thread_call_thread(thread_call_group_t group, wait_result_t wres); +extern void thread_call_delayed_timer(timer_call_param_t p0, timer_call_param_t p1); +static void thread_call_dealloc_timer(timer_call_param_t p0, timer_call_param_t p1); +static void thread_call_group_setup(thread_call_group_t group, thread_call_priority_t pri, uint32_t target_thread_count, boolean_t parallel); +static void sched_call_thread(int type, thread_t thread); +static void thread_call_start_deallocate_timer(thread_call_group_t group); +static void thread_call_wait_locked(thread_call_t call); -static boolean_t thread_call_daemon_awake; +#define qe(x) ((queue_entry_t)(x)) +#define TC(x) ((thread_call_t)(x)) -#define thread_call_thread_min 4 -#define internal_call_count 768 +lck_grp_t thread_call_queues_lck_grp; +lck_grp_t thread_call_lck_grp; +lck_attr_t thread_call_lck_attr; +lck_grp_attr_t thread_call_lck_grp_attr; -static 
thread_call_data_t internal_call_storage[internal_call_count]; -static queue_head_t thread_call_internal_queue; +#if defined(__i386__) || defined(__x86_64__) +lck_mtx_t thread_call_lock_data; +#else +lck_spin_t thread_call_lock_data; +#endif -static __inline__ thread_call_t _internal_call_allocate(void); -static __inline__ void _internal_call_release( - thread_call_t call); +#define thread_call_lock_spin() \ + lck_mtx_lock_spin_always(&thread_call_lock_data) -static __inline__ boolean_t _pending_call_enqueue( - thread_call_t call, - thread_call_group_t group), - _delayed_call_enqueue( - thread_call_t call, - thread_call_group_t group, - uint64_t deadline), - _call_dequeue( - thread_call_t call, - thread_call_group_t group); +#define thread_call_unlock() \ + lck_mtx_unlock_always(&thread_call_lock_data) -static __inline__ void thread_call_wake( - thread_call_group_t group); -static __inline__ void _set_delayed_call_timer( - thread_call_t call, - thread_call_group_t group); - -static boolean_t _remove_from_pending_queue( - thread_call_func_t func, - thread_call_param_t param0, - boolean_t remove_all), - _remove_from_delayed_queue( - thread_call_func_t func, - thread_call_param_t param0, - boolean_t remove_all); +static inline spl_t +disable_ints_and_lock(void) +{ + spl_t s; -static void thread_call_daemon( - thread_call_group_t group), - thread_call_thread( - thread_call_group_t group); + s = splsched(); + thread_call_lock_spin(); -static void thread_call_delayed_timer( - timer_call_param_t p0, - timer_call_param_t p1); + return s; +} -#define qe(x) ((queue_entry_t)(x)) -#define TC(x) ((thread_call_t)(x)) +static inline void +enable_ints_and_unlock(void) +{ + thread_call_unlock(); + (void)spllo(); +} + + +static inline boolean_t +group_isparallel(thread_call_group_t group) +{ + return ((group->flags & TCG_PARALLEL) != 0); +} + +static boolean_t +thread_call_group_should_add_thread(thread_call_group_t group) +{ + uint32_t thread_count; + + if (!group_isparallel(group)) { + if (group->pending_count > 0 && group->active_count == 0) { + return TRUE; + } + + return FALSE; + } + + if (group->pending_count > 0) { + if (group->idle_count > 0) { + panic("Pending work, but threads are idle?"); + } + + thread_count = group->active_count; + + /* + * Add a thread if either there are no threads, + * the group has fewer than its target number of + * threads, or the amount of work is large relative + * to the number of threads. In the last case, pay attention + * to the total load on the system, and back off if + * it's high. 
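+		 * As a worked illustration (numbers hypothetical): a group
+		 * already at its target with two active threads asks for a
+		 * third only once more than THREAD_CALL_ADD_RATIO * 2 = 8
+		 * calls are pending, and then only while sched_mach_factor
+		 * stays below THREAD_CALL_MACH_FACTOR_CAP (3).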
+ */ + if ((thread_count == 0) || + (thread_count < group->target_thread_count) || + ((group->pending_count > THREAD_CALL_ADD_RATIO * thread_count) && + (sched_mach_factor < THREAD_CALL_MACH_FACTOR_CAP))) { + return TRUE; + } + } + + return FALSE; +} + +static inline integer_t +thread_call_priority_to_sched_pri(thread_call_priority_t pri) +{ + switch (pri) { + case THREAD_CALL_PRIORITY_HIGH: + return BASEPRI_PREEMPT; + case THREAD_CALL_PRIORITY_KERNEL: + return BASEPRI_KERNEL; + case THREAD_CALL_PRIORITY_USER: + return BASEPRI_DEFAULT; + case THREAD_CALL_PRIORITY_LOW: + return DEPRESSPRI; + default: + panic("Invalid priority."); + } + + return 0; +} + +/* Lock held */ +static inline thread_call_group_t +thread_call_get_group( + thread_call_t call) +{ + thread_call_priority_t pri = call->tc_pri; + + assert(pri == THREAD_CALL_PRIORITY_LOW || + pri == THREAD_CALL_PRIORITY_USER || + pri == THREAD_CALL_PRIORITY_KERNEL || + pri == THREAD_CALL_PRIORITY_HIGH); + + return &thread_call_groups[pri]; +} + +static void +thread_call_group_setup( + thread_call_group_t group, + thread_call_priority_t pri, + uint32_t target_thread_count, + boolean_t parallel) +{ + queue_init(&group->pending_queue); + queue_init(&group->delayed_queue); + + timer_call_setup(&group->delayed_timer, thread_call_delayed_timer, group); + timer_call_setup(&group->dealloc_timer, thread_call_dealloc_timer, group); + + wait_queue_init(&group->idle_wqueue, SYNC_POLICY_FIFO); + + group->target_thread_count = target_thread_count; + group->pri = thread_call_priority_to_sched_pri(pri); + + group->sched_call = sched_call_thread; + if (parallel) { + group->flags |= TCG_PARALLEL; + group->sched_call = NULL; + } +} + +/* + * Simple wrapper for creating threads bound to + * thread call groups. + */ +static kern_return_t +thread_call_thread_create( + thread_call_group_t group) +{ + thread_t thread; + kern_return_t result; + + result = kernel_thread_start_priority((thread_continue_t)thread_call_thread, group, group->pri, &thread); + if (result != KERN_SUCCESS) { + return result; + } + + if (group->pri < BASEPRI_PREEMPT) { + /* + * New style doesn't get to run to completion in + * kernel if there are higher priority threads + * available. 
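+		 * Only the high-priority group runs its workers at
+		 * BASEPRI_PREEMPT (see thread_call_priority_to_sched_pri()),
+		 * so this marks the kernel, user and low groups' workers
+		 * with thread_set_eager_preempt() below.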
+ */ + thread_set_eager_preempt(thread); + } + + thread_deallocate(thread); + return KERN_SUCCESS; +} /* * thread_call_initialize: @@ -128,43 +292,51 @@ static void thread_call_delayed_timer( void thread_call_initialize(void) { - thread_call_t call; - thread_call_group_t group = &thread_call_group0; + thread_call_t call; kern_return_t result; - thread_t thread; - int i; - spl_t s; + thread_t thread; + int i; i = sizeof (thread_call_data_t); thread_call_zone = zinit(i, 4096 * i, 16 * i, "thread_call"); - - simple_lock_init(&thread_call_lock, 0); - - s = splsched(); - simple_lock(&thread_call_lock); - - queue_init(&group->pending_queue); - queue_init(&group->delayed_queue); - - timer_call_setup(&group->delayed_timer, thread_call_delayed_timer, group); - - wait_queue_init(&group->idle_wqueue, SYNC_POLICY_FIFO); - - queue_init(&thread_call_internal_queue); - for ( - call = internal_call_storage; - call < &internal_call_storage[internal_call_count]; + zone_change(thread_call_zone, Z_CALLERACCT, FALSE); + zone_change(thread_call_zone, Z_NOENCRYPT, TRUE); + + lck_attr_setdefault(&thread_call_lck_attr); + lck_grp_attr_setdefault(&thread_call_lck_grp_attr); + lck_grp_init(&thread_call_queues_lck_grp, "thread_call_queues", &thread_call_lck_grp_attr); + lck_grp_init(&thread_call_lck_grp, "thread_call", &thread_call_lck_grp_attr); + +#if defined(__i386__) || defined(__x86_64__) + lck_mtx_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr); +#else + lck_spin_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr); +#endif + + nanotime_to_absolutetime(0, THREAD_CALL_DEALLOC_INTERVAL_NS, &thread_call_dealloc_interval_abs); + wait_queue_init(&daemon_wqueue, SYNC_POLICY_FIFO); + + thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_LOW], THREAD_CALL_PRIORITY_LOW, 0, TRUE); + thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_USER], THREAD_CALL_PRIORITY_USER, 0, TRUE); + thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_KERNEL], THREAD_CALL_PRIORITY_KERNEL, 1, TRUE); + thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_HIGH], THREAD_CALL_PRIORITY_HIGH, THREAD_CALL_THREAD_MIN, FALSE); + + disable_ints_and_lock(); + + queue_init(&thread_call_internal_queue); + for ( + call = internal_call_storage; + call < &internal_call_storage[INTERNAL_CALL_COUNT]; call++) { enqueue_tail(&thread_call_internal_queue, qe(call)); - } + } thread_call_daemon_awake = TRUE; - simple_unlock(&thread_call_lock); - splx(s); + enable_ints_and_unlock(); - result = kernel_thread_start_priority((thread_continue_t)thread_call_daemon, group, BASEPRI_PREEMPT + 1, &thread); + result = kernel_thread_start_priority((thread_continue_t)thread_call_daemon, NULL, BASEPRI_PREEMPT + 1, &thread); if (result != KERN_SUCCESS) panic("thread_call_initialize"); @@ -177,7 +349,9 @@ thread_call_setup( thread_call_func_t func, thread_call_param_t param0) { - call_entry_setup(call, func, param0); + bzero(call, sizeof(*call)); + call_entry_setup((call_entry_t)call, func, param0); + call->tc_pri = THREAD_CALL_PRIORITY_HIGH; /* Default priority */ } /* @@ -213,7 +387,7 @@ _internal_call_release( thread_call_t call) { if ( call >= internal_call_storage && - call < &internal_call_storage[internal_call_count] ) + call < &internal_call_storage[INTERNAL_CALL_COUNT] ) enqueue_head(&thread_call_internal_queue, qe(call)); } @@ -233,12 +407,18 @@ _pending_call_enqueue( thread_call_t call, thread_call_group_t group) { - queue_t old_queue; + queue_head_t *old_queue; + + old_queue 
= call_entry_enqueue_tail(CE(call), &group->pending_queue); - old_queue = call_entry_enqueue_tail(call, &group->pending_queue); + if (old_queue == NULL) { + call->tc_submit_count++; + } group->pending_count++; + thread_call_wake(group); + return (old_queue != NULL); } @@ -256,16 +436,18 @@ _pending_call_enqueue( */ static __inline__ boolean_t _delayed_call_enqueue( - thread_call_t call, + thread_call_t call, thread_call_group_t group, - uint64_t deadline) + uint64_t deadline) { - queue_t old_queue; + queue_head_t *old_queue; - old_queue = call_entry_enqueue_deadline(call, &group->delayed_queue, deadline); + old_queue = call_entry_enqueue_deadline(CE(call), &group->delayed_queue, deadline); if (old_queue == &group->pending_queue) group->pending_count--; + else if (old_queue == NULL) + call->tc_submit_count++; return (old_queue != NULL); } @@ -284,12 +466,15 @@ _call_dequeue( thread_call_t call, thread_call_group_t group) { - queue_t old_queue; + queue_head_t *old_queue; - old_queue = call_entry_dequeue(call); + old_queue = call_entry_dequeue(CE(call)); - if (old_queue == &group->pending_queue) - group->pending_count--; + if (old_queue != NULL) { + call->tc_finish_count++; + if (old_queue == &group->pending_queue) + group->pending_count--; + } return (old_queue != NULL); } @@ -307,7 +492,7 @@ _set_delayed_call_timer( thread_call_t call, thread_call_group_t group) { - timer_call_enter(&group->delayed_timer, call->deadline); + timer_call_enter(&group->delayed_timer, call->tc_call.deadline, 0); } /* @@ -329,30 +514,30 @@ _remove_from_pending_queue( { boolean_t call_removed = FALSE; thread_call_t call; - thread_call_group_t group = &thread_call_group0; - - call = TC(queue_first(&group->pending_queue)); - - while (!queue_end(&group->pending_queue, qe(call))) { - if ( call->func == func && - call->param0 == param0 ) { + thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; + + call = TC(queue_first(&group->pending_queue)); + + while (!queue_end(&group->pending_queue, qe(call))) { + if (call->tc_call.func == func && + call->tc_call.param0 == param0) { thread_call_t next = TC(queue_next(qe(call))); - + _call_dequeue(call, group); _internal_call_release(call); - + call_removed = TRUE; if (!remove_all) break; - + call = next; } else call = TC(queue_next(qe(call))); - } - - return (call_removed); + } + + return (call_removed); } /* @@ -372,34 +557,36 @@ _remove_from_delayed_queue( thread_call_param_t param0, boolean_t remove_all) { - boolean_t call_removed = FALSE; - thread_call_t call; - thread_call_group_t group = &thread_call_group0; - - call = TC(queue_first(&group->delayed_queue)); - - while (!queue_end(&group->delayed_queue, qe(call))) { - if ( call->func == func && - call->param0 == param0 ) { + boolean_t call_removed = FALSE; + thread_call_t call; + thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; + + call = TC(queue_first(&group->delayed_queue)); + + while (!queue_end(&group->delayed_queue, qe(call))) { + if (call->tc_call.func == func && + call->tc_call.param0 == param0) { thread_call_t next = TC(queue_next(qe(call))); - + _call_dequeue(call, group); - + _internal_call_release(call); - + call_removed = TRUE; if (!remove_all) break; - + call = next; } else call = TC(queue_next(qe(call))); - } - - return (call_removed); + } + + return (call_removed); } +#ifndef __LP64__ + /* * thread_call_func: * @@ -414,40 +601,38 @@ thread_call_func( thread_call_param_t param, boolean_t unique_call) { - thread_call_t call; - thread_call_group_t group = 
&thread_call_group0; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); - - call = TC(queue_first(&group->pending_queue)); - + thread_call_t call; + thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; + spl_t s; + + s = splsched(); + thread_call_lock_spin(); + + call = TC(queue_first(&group->pending_queue)); + while (unique_call && !queue_end(&group->pending_queue, qe(call))) { - if ( call->func == func && - call->param0 == param ) { + if (call->tc_call.func == func && call->tc_call.param0 == param) { break; } - + call = TC(queue_next(qe(call))); - } - - if (!unique_call || queue_end(&group->pending_queue, qe(call))) { + } + + if (!unique_call || queue_end(&group->pending_queue, qe(call))) { call = _internal_call_allocate(); - call->func = func; - call->param0 = param; - call->param1 = NULL; - + call->tc_call.func = func; + call->tc_call.param0 = param; + call->tc_call.param1 = NULL; + _pending_call_enqueue(call, group); - - if (group->active_count == 0) - thread_call_wake(group); - } + } - simple_unlock(&thread_call_lock); - splx(s); + thread_call_unlock(); + splx(s); } +#endif /* __LP64__ */ + /* * thread_call_func_delayed: * @@ -456,29 +641,29 @@ thread_call_func( */ void thread_call_func_delayed( - thread_call_func_t func, - thread_call_param_t param, - uint64_t deadline) + thread_call_func_t func, + thread_call_param_t param, + uint64_t deadline) { - thread_call_t call; - thread_call_group_t group = &thread_call_group0; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); - - call = _internal_call_allocate(); - call->func = func; - call->param0 = param; - call->param1 = 0; - - _delayed_call_enqueue(call, group, deadline); - - if (queue_first(&group->delayed_queue) == qe(call)) - _set_delayed_call_timer(call, group); - - simple_unlock(&thread_call_lock); - splx(s); + thread_call_t call; + thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; + spl_t s; + + s = splsched(); + thread_call_lock_spin(); + + call = _internal_call_allocate(); + call->tc_call.func = func; + call->tc_call.param0 = param; + call->tc_call.param1 = 0; + + _delayed_call_enqueue(call, group, deadline); + + if (queue_first(&group->delayed_queue) == qe(call)) + _set_delayed_call_timer(call, group); + + thread_call_unlock(); + splx(s); } /* @@ -495,29 +680,53 @@ thread_call_func_delayed( */ boolean_t thread_call_func_cancel( - thread_call_func_t func, - thread_call_param_t param, - boolean_t cancel_all) + thread_call_func_t func, + thread_call_param_t param, + boolean_t cancel_all) { - boolean_t result; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); + boolean_t result; + spl_t s; + + s = splsched(); + thread_call_lock_spin(); - if (cancel_all) + if (cancel_all) result = _remove_from_pending_queue(func, param, cancel_all) | - _remove_from_delayed_queue(func, param, cancel_all); + _remove_from_delayed_queue(func, param, cancel_all); else result = _remove_from_pending_queue(func, param, cancel_all) || - _remove_from_delayed_queue(func, param, cancel_all); - - simple_unlock(&thread_call_lock); - splx(s); + _remove_from_delayed_queue(func, param, cancel_all); + + thread_call_unlock(); + splx(s); return (result); } +/* + * Allocate a thread call with a given priority. Importances + * other than THREAD_CALL_PRIORITY_HIGH will be run in threads + * with eager preemption enabled (i.e. may be aggressively preempted + * by higher-priority threads which are not in the normal "urgent" bands). 
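+ * Priorities numerically greater than THREAD_CALL_PRIORITY_LOW are
+ * rejected with a panic below.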
+ */ +thread_call_t +thread_call_allocate_with_priority( + thread_call_func_t func, + thread_call_param_t param0, + thread_call_priority_t pri) +{ + thread_call_t call; + + if (pri > THREAD_CALL_PRIORITY_LOW) { + panic("Invalid pri: %d\n", pri); + } + + call = thread_call_allocate(func, param0); + call->tc_pri = pri; + + return call; +} + /* * thread_call_allocate: * @@ -525,41 +734,53 @@ thread_call_func_cancel( */ thread_call_t thread_call_allocate( - thread_call_func_t func, - thread_call_param_t param0) + thread_call_func_t func, + thread_call_param_t param0) { - thread_call_t call = zalloc(thread_call_zone); + thread_call_t call = zalloc(thread_call_zone); - call_entry_setup(call, func, param0); + thread_call_setup(call, func, param0); + call->tc_refs = 1; + call->tc_flags = THREAD_CALL_ALLOC; - return (call); + return (call); } /* * thread_call_free: * - * Free a callout entry. + * Release a callout. If the callout is currently + * executing, it will be freed when all invocations + * finish. */ boolean_t thread_call_free( - thread_call_t call) + thread_call_t call) { - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); - - if (call->queue != NULL) { - simple_unlock(&thread_call_lock); + spl_t s; + int32_t refs; + + s = splsched(); + thread_call_lock_spin(); + + if (call->tc_call.queue != NULL) { + thread_call_unlock(); splx(s); return (FALSE); - } - - simple_unlock(&thread_call_lock); - splx(s); - - zfree(thread_call_zone, call); + } + + refs = --call->tc_refs; + if (refs < 0) { + panic("Refcount negative: %d\n", refs); + } + + thread_call_unlock(); + splx(s); + + if (refs == 0) { + zfree(thread_call_zone, call); + } return (TRUE); } @@ -574,53 +795,51 @@ thread_call_free( */ boolean_t thread_call_enter( - thread_call_t call) + thread_call_t call) { - boolean_t result = TRUE; - thread_call_group_t group = &thread_call_group0; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); - - if (call->queue != &group->pending_queue) { - result = _pending_call_enqueue(call, group); - - if (group->active_count == 0) - thread_call_wake(group); + boolean_t result = TRUE; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); + + s = splsched(); + thread_call_lock_spin(); + + if (call->tc_call.queue != &group->pending_queue) { + result = _pending_call_enqueue(call, group); } - call->param1 = 0; + call->tc_call.param1 = 0; - simple_unlock(&thread_call_lock); - splx(s); + thread_call_unlock(); + splx(s); return (result); } boolean_t thread_call_enter1( - thread_call_t call, - thread_call_param_t param1) + thread_call_t call, + thread_call_param_t param1) { - boolean_t result = TRUE; - thread_call_group_t group = &thread_call_group0; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); - - if (call->queue != &group->pending_queue) { - result = _pending_call_enqueue(call, group); - - if (group->active_count == 0) - thread_call_wake(group); + boolean_t result = TRUE; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); + + s = splsched(); + thread_call_lock_spin(); + + if (call->tc_call.queue != &group->pending_queue) { + result = _pending_call_enqueue(call, group); } - call->param1 = param1; + call->tc_call.param1 = param1; - simple_unlock(&thread_call_lock); - splx(s); + thread_call_unlock(); + splx(s); return (result); } @@ -636,51 +855,55 @@ thread_call_enter1( */ boolean_t thread_call_enter_delayed( - thread_call_t call, - uint64_t deadline) + thread_call_t call, + uint64_t deadline) { - boolean_t result = 
TRUE; - thread_call_group_t group = &thread_call_group0; - spl_t s; + boolean_t result = TRUE; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); - s = splsched(); - simple_lock(&thread_call_lock); + s = splsched(); + thread_call_lock_spin(); result = _delayed_call_enqueue(call, group, deadline); if (queue_first(&group->delayed_queue) == qe(call)) _set_delayed_call_timer(call, group); - call->param1 = 0; + call->tc_call.param1 = 0; - simple_unlock(&thread_call_lock); - splx(s); + thread_call_unlock(); + splx(s); return (result); } boolean_t thread_call_enter1_delayed( - thread_call_t call, - thread_call_param_t param1, - uint64_t deadline) + thread_call_t call, + thread_call_param_t param1, + uint64_t deadline) { - boolean_t result = TRUE; - thread_call_group_t group = &thread_call_group0; - spl_t s; + boolean_t result = TRUE; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); - s = splsched(); - simple_lock(&thread_call_lock); + s = splsched(); + thread_call_lock_spin(); result = _delayed_call_enqueue(call, group, deadline); if (queue_first(&group->delayed_queue) == qe(call)) _set_delayed_call_timer(call, group); - call->param1 = param1; + call->tc_call.param1 = param1; - simple_unlock(&thread_call_lock); - splx(s); + thread_call_unlock(); + splx(s); return (result); } @@ -695,23 +918,63 @@ thread_call_enter1_delayed( */ boolean_t thread_call_cancel( - thread_call_t call) + thread_call_t call) { - boolean_t result; - thread_call_group_t group = &thread_call_group0; - spl_t s; - - s = splsched(); - simple_lock(&thread_call_lock); + boolean_t result; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); + + s = splsched(); + thread_call_lock_spin(); result = _call_dequeue(call, group); - - simple_unlock(&thread_call_lock); - splx(s); + + thread_call_unlock(); + splx(s); return (result); } +/* + * Cancel a thread call. If it cannot be cancelled (i.e. + * is already in flight), waits for the most recent invocation + * to finish. Note that if clients re-submit this thread call, + * it may still be pending or in flight when thread_call_cancel_wait + * returns, but all requests to execute this work item prior + * to the call to thread_call_cancel_wait will have finished. 
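+ * May only be used on thread calls obtained from thread_call_allocate()
+ * (THREAD_CALL_ALLOC set); waiting on caller-owned storage panics below.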
+ */ +boolean_t +thread_call_cancel_wait( + thread_call_t call) +{ + boolean_t result; + thread_call_group_t group; + + if ((call->tc_flags & THREAD_CALL_ALLOC) == 0) { + panic("%s: Can't wait on thread call whose storage I don't own.", __FUNCTION__); + } + + group = thread_call_get_group(call); + + (void) splsched(); + thread_call_lock_spin(); + + result = _call_dequeue(call, group); + if (result == FALSE) { + thread_call_wait_locked(call); + } + + thread_call_unlock(); + (void) spllo(); + + return result; +} + + +#ifndef __LP64__ + /* * thread_call_is_delayed: * @@ -725,25 +988,29 @@ thread_call_is_delayed( thread_call_t call, uint64_t *deadline) { - boolean_t result = FALSE; - thread_call_group_t group = &thread_call_group0; - spl_t s; + boolean_t result = FALSE; + thread_call_group_t group; + spl_t s; + + group = thread_call_get_group(call); s = splsched(); - simple_lock(&thread_call_lock); + thread_call_lock_spin(); - if (call->queue == &group->delayed_queue) { + if (call->tc_call.queue == &group->delayed_queue) { if (deadline != NULL) - *deadline = call->deadline; + *deadline = call->tc_call.deadline; result = TRUE; } - simple_unlock(&thread_call_lock); + thread_call_unlock(); splx(s); return (result); } +#endif /* __LP64__ */ + /* * thread_call_wake: * @@ -753,48 +1020,108 @@ thread_call_is_delayed( * create additional call threads. * * Called with thread_call_lock held. + * + * For high-priority group, only does wakeup/creation if there are no threads + * running. */ static __inline__ void thread_call_wake( thread_call_group_t group) { - if (group->idle_count > 0 && wait_queue_wakeup_one(&group->idle_wqueue, NULL, THREAD_AWAKENED) == KERN_SUCCESS) { - group->idle_count--; group->active_count++; - } - else - if (!thread_call_daemon_awake) { - thread_call_daemon_awake = TRUE; - thread_wakeup_one(&thread_call_daemon_awake); + /* + * New behavior: use threads if you've got 'em. + * Traditional behavior: wake only if no threads running. + */ + if (group_isparallel(group) || group->active_count == 0) { + if (wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_AWAKENED, -1) == KERN_SUCCESS) { + group->idle_count--; group->active_count++; + + if (group->idle_count == 0) { + timer_call_cancel(&group->dealloc_timer); + group->flags &= TCG_DEALLOC_ACTIVE; + } + } else { + if (!thread_call_daemon_awake && thread_call_group_should_add_thread(group)) { + thread_call_daemon_awake = TRUE; + wait_queue_wakeup_one(&daemon_wqueue, NO_EVENT, THREAD_AWAKENED, -1); + } + } } } /* * sched_call_thread: * - * Call out invoked by the scheduler. + * Call out invoked by the scheduler. Used only for high-priority + * thread call group. */ static void sched_call_thread( - int type, -__unused thread_t thread) + int type, + __unused thread_t thread) { - thread_call_group_t group = &thread_call_group0; + thread_call_group_t group; + + group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; /* XXX */ - simple_lock(&thread_call_lock); + thread_call_lock_spin(); switch (type) { - case SCHED_CALL_BLOCK: - if (--group->active_count == 0 && group->pending_count > 0) - thread_call_wake(group); - break; + case SCHED_CALL_BLOCK: + --group->active_count; + if (group->pending_count > 0) + thread_call_wake(group); + break; + + case SCHED_CALL_UNBLOCK: + group->active_count++; + break; + } + + thread_call_unlock(); +} + +/* + * Interrupts disabled, lock held; returns the same way. + * Only called on thread calls whose storage we own. 
Wakes up + * anyone who might be waiting on this work item and frees it + * if the client has so requested. + */ +static void +thread_call_finish(thread_call_t call) +{ + boolean_t dowake = FALSE; + + call->tc_finish_count++; + call->tc_refs--; + + if ((call->tc_flags & THREAD_CALL_WAIT) != 0) { + dowake = TRUE; + call->tc_flags &= ~THREAD_CALL_WAIT; + + /* + * Dropping lock here because the sched call for the + * high-pri group can take the big lock from under + * a thread lock. + */ + thread_call_unlock(); + thread_wakeup((event_t)call); + thread_call_lock_spin(); + } + + if (call->tc_refs == 0) { + if (dowake) { + panic("Someone waiting on a thread call that is scheduled for free: %p\n", call->tc_call.func); + } + + enable_ints_and_unlock(); + + zfree(thread_call_zone, call); - case SCHED_CALL_UNBLOCK: - group->active_count++; - break; + (void)disable_ints_and_lock(); } - simple_unlock(&thread_call_lock); } /* @@ -802,16 +1129,28 @@ __unused thread_t thread) */ static void thread_call_thread( - thread_call_group_t group) + thread_call_group_t group, + wait_result_t wres) { - thread_t self = current_thread(); + thread_t self = current_thread(); + boolean_t canwait; + + /* + * A wakeup with THREAD_INTERRUPTED indicates that + * we should terminate. + */ + if (wres == THREAD_INTERRUPTED) { + thread_terminate(self); + + /* NOTREACHED */ + panic("thread_terminate() returned?"); + } - (void) splsched(); - simple_lock(&thread_call_lock); + (void)disable_ints_and_lock(); - thread_sched_call(self, sched_call_thread); + thread_sched_call(self, group->sched_call); - while (group->pending_count > 0) { + while (group->pending_count > 0) { thread_call_t call; thread_call_func_t func; thread_call_param_t param0, param1; @@ -819,136 +1158,315 @@ thread_call_thread( call = TC(dequeue_head(&group->pending_queue)); group->pending_count--; - func = call->func; - param0 = call->param0; - param1 = call->param1; - - call->queue = NULL; + func = call->tc_call.func; + param0 = call->tc_call.param0; + param1 = call->tc_call.param1; + + call->tc_call.queue = NULL; _internal_call_release(call); - simple_unlock(&thread_call_lock); - (void) spllo(); + /* + * Can only do wakeups for thread calls whose storage + * we control. + */ + if ((call->tc_flags & THREAD_CALL_ALLOC) != 0) { + canwait = TRUE; + call->tc_refs++; /* Delay free until we're done */ + } else + canwait = FALSE; + + enable_ints_and_unlock(); KERNEL_DEBUG_CONSTANT( - MACHDBG_CODE(DBG_MACH_SCHED,MACH_CALLOUT) | DBG_FUNC_NONE, - (int)func, (int)param0, (int)param1, 0, 0); + MACHDBG_CODE(DBG_MACH_SCHED,MACH_CALLOUT) | DBG_FUNC_NONE, + VM_KERNEL_UNSLIDE(func), param0, param1, 0, 0); (*func)(param0, param1); + if (get_preemption_level() != 0) { + int pl = get_preemption_level(); + panic("thread_call_thread: preemption_level %d, last callout %p(%p, %p)", + pl, (void *)VM_KERNEL_UNSLIDE(func), param0, param1); + } + (void)thread_funnel_set(self->funnel_lock, FALSE); /* XXX */ - (void) splsched(); - simple_lock(&thread_call_lock); - } + (void) disable_ints_and_lock(); + + if (canwait) { + /* Frees if so desired */ + thread_call_finish(call); + } + } thread_sched_call(self, NULL); group->active_count--; - if (group->idle_count < thread_call_thread_min) { + if (group_isparallel(group)) { + /* + * For new style of thread group, thread always blocks. + * If we have more than the target number of threads, + * and this is the first to block, and it isn't active + * already, set a timer for deallocating a thread if we + * continue to have a surplus. 
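+		 * The deallocation timer fires THREAD_CALL_DEALLOC_INTERVAL_NS
+		 * (5 ms) after the recorded idle_timestamp; see
+		 * thread_call_dealloc_timer() below.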
+ */ group->idle_count++; - wait_queue_assert_wait(&group->idle_wqueue, NULL, THREAD_UNINT, 0); - - simple_unlock(&thread_call_lock); - (void) spllo(); + if (group->idle_count == 1) { + group->idle_timestamp = mach_absolute_time(); + } + + if (((group->flags & TCG_DEALLOC_ACTIVE) == 0) && + ((group->active_count + group->idle_count) > group->target_thread_count)) { + group->flags |= TCG_DEALLOC_ACTIVE; + thread_call_start_deallocate_timer(group); + } + + /* Wait for more work (or termination) */ + wres = wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTIBLE, 0); + if (wres != THREAD_WAITING) { + panic("kcall worker unable to assert wait?"); + } + + enable_ints_and_unlock(); thread_block_parameter((thread_continue_t)thread_call_thread, group); - /* NOTREACHED */ - } + } else { + if (group->idle_count < group->target_thread_count) { + group->idle_count++; - simple_unlock(&thread_call_lock); - (void) spllo(); - - thread_terminate(self); + wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_UNINT, 0); /* Interrupted means to exit */ + + enable_ints_and_unlock(); + + thread_block_parameter((thread_continue_t)thread_call_thread, group); + /* NOTREACHED */ + } + } + + enable_ints_and_unlock(); + + thread_terminate(self); /* NOTREACHED */ } /* - * thread_call_daemon: + * thread_call_daemon: walk list of groups, allocating + * threads if appropriate (as determined by + * thread_call_group_should_add_thread()). */ static void -thread_call_daemon_continue( - thread_call_group_t group) +thread_call_daemon_continue(__unused void *arg) { - kern_return_t result; - thread_t thread; - - (void) splsched(); - simple_lock(&thread_call_lock); - - while (group->active_count == 0 && group->pending_count > 0) { - group->active_count++; - - simple_unlock(&thread_call_lock); - (void) spllo(); - - result = kernel_thread_start_priority((thread_continue_t)thread_call_thread, group, BASEPRI_PREEMPT, &thread); - if (result != KERN_SUCCESS) - panic("thread_call_daemon"); + int i; + kern_return_t kr; + thread_call_group_t group; + + (void)disable_ints_and_lock(); + + /* Starting at zero happens to be high-priority first. */ + for (i = 0; i < THREAD_CALL_GROUP_COUNT; i++) { + group = &thread_call_groups[i]; + while (thread_call_group_should_add_thread(group)) { + group->active_count++; + + enable_ints_and_unlock(); + + kr = thread_call_thread_create(group); + if (kr != KERN_SUCCESS) { + /* + * On failure, just pause for a moment and give up. + * We can try again later. 
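+				 * thread_call_wake() re-wakes the daemon the
+				 * next time a call is enqueued (or a worker
+				 * blocks) while a group still wants threads.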
+ */ + delay(10000); /* 10 ms */ + (void)disable_ints_and_lock(); + goto out; + } + + (void)disable_ints_and_lock(); + } + } - thread_deallocate(thread); +out: + thread_call_daemon_awake = FALSE; + wait_queue_assert_wait(&daemon_wqueue, NO_EVENT, THREAD_UNINT, 0); - (void) splsched(); - simple_lock(&thread_call_lock); - } + enable_ints_and_unlock(); - thread_call_daemon_awake = FALSE; - assert_wait(&thread_call_daemon_awake, THREAD_UNINT); - - simple_unlock(&thread_call_lock); - (void) spllo(); - - thread_block_parameter((thread_continue_t)thread_call_daemon_continue, group); + thread_block_parameter((thread_continue_t)thread_call_daemon_continue, NULL); /* NOTREACHED */ } static void thread_call_daemon( - thread_call_group_t group) + __unused void *arg) { thread_t self = current_thread(); self->options |= TH_OPT_VMPRIV; vm_page_free_reserve(2); /* XXX */ - - thread_call_daemon_continue(group); - /* NOTREACHED */ + + thread_call_daemon_continue(NULL); + /* NOTREACHED */ } +/* + * Schedule timer to deallocate a worker thread if we have a surplus + * of threads (in excess of the group's target) and at least one thread + * is idle the whole time. + */ static void +thread_call_start_deallocate_timer( + thread_call_group_t group) +{ + uint64_t deadline; + boolean_t onqueue; + + assert(group->idle_count > 0); + + group->flags |= TCG_DEALLOC_ACTIVE; + deadline = group->idle_timestamp + thread_call_dealloc_interval_abs; + onqueue = timer_call_enter(&group->dealloc_timer, deadline, 0); + + if (onqueue) { + panic("Deallocate timer already active?"); + } +} + +void thread_call_delayed_timer( - timer_call_param_t p0, - __unused timer_call_param_t p1 + timer_call_param_t p0, + __unused timer_call_param_t p1 ) { - thread_call_t call; + thread_call_t call; thread_call_group_t group = p0; - boolean_t new_pending = FALSE; uint64_t timestamp; - simple_lock(&thread_call_lock); + thread_call_lock_spin(); timestamp = mach_absolute_time(); - - call = TC(queue_first(&group->delayed_queue)); - - while (!queue_end(&group->delayed_queue, qe(call))) { - if (call->deadline <= timestamp) { + + call = TC(queue_first(&group->delayed_queue)); + + while (!queue_end(&group->delayed_queue, qe(call))) { + if (call->tc_call.deadline <= timestamp) { _pending_call_enqueue(call, group); - new_pending = TRUE; } else break; - + call = TC(queue_first(&group->delayed_queue)); - } + } if (!queue_end(&group->delayed_queue, qe(call))) _set_delayed_call_timer(call, group); - if (new_pending && group->active_count == 0) - thread_call_wake(group); + thread_call_unlock(); +} + +/* + * Timer callback to tell a thread to terminate if + * we have an excess of threads and at least one has been + * idle for a long time. + */ +static void +thread_call_dealloc_timer( + timer_call_param_t p0, + __unused timer_call_param_t p1) +{ + thread_call_group_t group = (thread_call_group_t)p0; + uint64_t now; + kern_return_t res; + boolean_t terminated = FALSE; + + thread_call_lock_spin(); + + now = mach_absolute_time(); + if (group->idle_count > 0) { + if (now > group->idle_timestamp + thread_call_dealloc_interval_abs) { + terminated = TRUE; + group->idle_count--; + res = wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTED, -1); + if (res != KERN_SUCCESS) { + panic("Unable to wake up idle thread for termination?"); + } + } + + } + + /* + * If we still have an excess of threads, schedule another + * invocation of this function. 
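+	 * ("Excess" here means idle_count + active_count still exceeds
+	 * the group's target_thread_count, with at least one thread idle.)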
+ */ + if (group->idle_count > 0 && (group->idle_count + group->active_count > group->target_thread_count)) { + /* + * If we killed someone just now, push out the + * next deadline. + */ + if (terminated) { + group->idle_timestamp = now; + } - simple_unlock(&thread_call_lock); + thread_call_start_deallocate_timer(group); + } else { + group->flags &= ~TCG_DEALLOC_ACTIVE; + } + + thread_call_unlock(); } + +/* + * Wait for all requested invocations of a thread call prior to now + * to finish. Can only be invoked on thread calls whose storage we manage. + * Just waits for the finish count to catch up to the submit count we find + * at the beginning of our wait. + */ +static void +thread_call_wait_locked(thread_call_t call) +{ + uint64_t submit_count; + wait_result_t res; + + assert(call->tc_flags & THREAD_CALL_ALLOC); + + submit_count = call->tc_submit_count; + + while (call->tc_finish_count < submit_count) { + call->tc_flags |= THREAD_CALL_WAIT; + + res = assert_wait(call, THREAD_UNINT); + if (res != THREAD_WAITING) { + panic("Unable to assert wait?"); + } + + thread_call_unlock(); + (void) spllo(); + + res = thread_block(NULL); + if (res != THREAD_AWAKENED) { + panic("Awoken with %d?", res); + } + + (void) splsched(); + thread_call_lock_spin(); + } +} + +/* + * Determine whether a thread call is either on a queue or + * currently being executed. + */ +boolean_t +thread_call_isactive(thread_call_t call) +{ + boolean_t active; + + disable_ints_and_lock(); + active = (call->tc_submit_count > call->tc_finish_count); + enable_ints_and_unlock(); + + return active; +} +
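
For context, here is a minimal sketch of how a kernel client might drive the interfaces this revision adds or reworks (thread_call_allocate_with_priority, thread_call_enter1_delayed, thread_call_cancel_wait, thread_call_free). The callback, the 100 ms interval, and the example_* names are hypothetical; the thread_call_* calls, the callback signature, and the nanotime_to_absolutetime()/mach_absolute_time() deadline arithmetic follow the code above, while the kern/ header paths are assumed.

#include <kern/thread_call.h>	/* interfaces shown in this diff (path assumed) */
#include <kern/clock.h>		/* nanotime_to_absolutetime(), mach_absolute_time() (path assumed) */

/* Hypothetical callback; signature matches thread_call_func_t. */
static void
example_callout(thread_call_param_t param0, thread_call_param_t param1)
{
	/* param0 was fixed at allocation, param1 at each enqueue. */
	(void)param0;
	(void)param1;
}

static thread_call_t example_call;

static void
example_schedule(void *context)
{
	uint64_t interval_abs, deadline;

	/* Run in the kernel-priority group instead of the default high group. */
	example_call = thread_call_allocate_with_priority(example_callout,
	    context, THREAD_CALL_PRIORITY_KERNEL);

	/* Fire roughly 100 ms from now (hypothetical interval). */
	nanotime_to_absolutetime(0, 100 * 1000 * 1000, &interval_abs);
	deadline = mach_absolute_time() + interval_abs;

	thread_call_enter1_delayed(example_call, NULL, deadline);
}

static void
example_teardown(void)
{
	/*
	 * Cancel, waiting out any in-flight invocation, then drop the
	 * reference; thread_call_free() defers the zfree until the last
	 * invocation has finished (tc_refs).
	 */
	(void) thread_call_cancel_wait(example_call);
	(void) thread_call_free(example_call);
	example_call = NULL;
}

Callers that do not need the cancel-and-wait guarantee can use plain thread_call_cancel(), which only dequeues a pending or delayed call and never blocks.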