/*
 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
#include <sys/cdefs.h>

// <rdar://problem/26158937> panic() should be marked noreturn
extern void panic(const char *string, ...) __printflike(1, 2) __dead2;
#include <kern/assert.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>	/* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>	/* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/param.h>
#include <sys/proc_info.h>	/* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>	/* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>
extern thread_t port_name_to_thread(mach_port_name_t port_name); /* osfmk/kern/ipc_tt.h */

static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq, int flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
		workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
		thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool workq_thread_is_busy(uint64_t cur_ts,
		_Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
			CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
			workq_sysctl_handle_usecs, "I", "")
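/*
 * Illustrative expansion (not part of the original source): an invocation
 * such as WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS)
 * produces, roughly:
 *
 *	static struct workq_usec_var wq_stalled_window = {
 *		.usecs = WQ_STALLED_WINDOW_USECS,
 *	};
 *	SYSCTL_OID(_kern, OID_AUTO, wq_stalled_window_usecs,
 *			CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &wq_stalled_window, 0,
 *			workq_sysctl_handle_usecs, "I", "");
 *
 * i.e. each tunable pairs a microsecond value settable through sysctl with
 * a cached mach-absolute-time conversion, refreshed by
 * workq_sysctl_handle_usecs() below whenever the sysctl is written.
 */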
static lck_grp_t      *workq_lck_grp;
static lck_attr_t     *workq_lck_attr;
static lck_grp_attr_t *workq_lck_grp_attr;
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static zone_t workq_zone_workqueue;
static zone_t workq_zone_threadreq;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
			&v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
		&wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
		&wq_max_constrained_threads, 0, "");
#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

static struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = p->p_wqptr;

	if (wq == NULL) {
		p->p_wqptr = WQPTR_IS_INITING_VALUE;
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
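/*
 * Usage sketch (illustrative, not from the original source): workq_open()
 * calls proc_init_wqptr_or_wait(); a true return means the caller won the
 * initialization race and must allocate the workqueue, then publish it with
 * proc_set_wqptr(), which wakes any thread that observed
 * WQPTR_IS_INITING_VALUE and blocked above. A false return means another
 * thread finished, or is finishing, the initialization.
 */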
static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
		thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
	}
}
#pragma mark wq_thactive

#if defined(__LP64__)
// Layout is:
//   127 - 115 : 13 bits of zeroes
//   114 - 112 : best QoS among all pending constrained requests
//   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
//   63 - 61 : best QoS among all pending constrained requests
//        60 : Manager bucket (0 or 1)
//   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
		"Make sure we have space to encode a QoS");
static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_thactive, relaxed);
}

static inline int
_wq_bucket(thread_qos_t qos)
{
	// Map both BG and MT to the same bucket by over-shifting down and
	// clamping MT and BG together.
	switch (qos) {
	case THREAD_QOS_MAINTENANCE:
		return 0;
	default:
		return qos - 2;
	}
}
#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	((tha) >> WQ_THACTIVE_QOS_SHIFT)

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
				(uint64_t)(v >> 64), 0, 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
#endif
	}
}
static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
		thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
			_wq_thactive_offset_for_qos(old_qos);
	os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}
static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
		thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	int i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
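/*
 * Example (illustrative, not from the original source): aggregating down to
 * THREAD_QOS_UTILITY with 3 UT, 1 DF and 0 IN threads active returns
 * 3 + 1 + 0 = 4. If the DF bucket additionally has a scheduled-but-inactive
 * thread whose last-blocked timestamp is within wq_stalled_window,
 * *busycount comes back as 1, and *max_busycount bounds how many such busy
 * threads could be reported, roughly one per bucket at or above the
 * requested QoS.
 */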
#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return !wq || _wq_exiting(wq);
}

struct turnstile *
workq_turnstile(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return wq ? wq->wq_turnstile : TURNSTILE_NULL;
}
#pragma mark workqueue lock

bool
workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_spin_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_spin_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_lock_held(__assert_only struct workqueue *wq)
{
	LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_spin_try_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
	lck_spin_unlock(&wq->wq_lock);
}
#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	(struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}
static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately to policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
}
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
		struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
		bool force_run)
{
	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (workq_pri_override(old_pri) != new_bucket) {
		thread_set_workq_override(uth->uu_thread, new_bucket);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread, removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
				(uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
		workq_threadreq_t req)
{
	thread_t th = uth->uu_thread;
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;

	// qos sent out to userspace (may differ from uu_workq_pri on param threads)
	uth->uu_save.uus_workq_park_data.qos = qos;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
					POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

	thread_set_workq_pri(th, qos, priority, policy);
}
/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, struct kqrequest *kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = get_bsdthread_info(kqr->kqr_thread);
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->kqr_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}
#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}
static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have less than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * 5s down to 0.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}
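/*
 * Worked example (illustrative, not from the original source): with
 * wq_death_max_load = 4, wq_max_constrained_threads = 64 and a 5s
 * wq_reduce_pool_window, 4 idle threads keep the full 5s delay, while
 * 36 idle threads yield 5s * (64 - 32) / 64 = 2.5s; the delay keeps
 * shrinking linearly as the idle count grows.
 */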
static inline bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
		uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}
static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
			wq_reduce_pool_window.abstime / 10,
			THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
				wq, wq->wq_thidlecount, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		workq_thread_wakeup(uth);
		return;
	}

	workq_death_call_schedule(wq,
			uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
				wq, wq->wq_thidlecount, 0, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening, redrive if there are pending requests
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(uth->uu_thread);
}
static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
	os_atomic_and(&wq->wq_flags, ~WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
	workq_unlock(wq);
}
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT;
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
	}
	return uth;
}
/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}
/*
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(p->task);

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
				kret, 1, 0, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
				kret, 0, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}
#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
		struct uthread *uth, uint32_t death_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	uint32_t setup_flags = WQ_SETUP_EXIT_THREAD;
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
			uth->uu_workq_thport, 0, setup_flags, flags);
	__builtin_unreachable();
}
bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
	return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
		void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}

static void
workq_turnstile_update_inheritor(struct workqueue *wq,
		turnstile_inheritor_t inheritor,
		turnstile_update_flags_t flags)
{
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
				flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
				TURNSTILE_INTERLOCK_HELD);
	});
}
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth)
{
	uint64_t now = mach_absolute_time();

	uth->uu_workq_flags &= ~UT_WORKQ_RUNNING;
	if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled--;
	}
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (wq->wq_creator == uth) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
				uth->uu_save.uus_workq_park_data.yields, 0);
		wq->wq_creator = NULL;
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
			wq->wq_thidlecount++;
			return;
		}
	} else {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		assert(!(uth->uu_workq_flags & UT_WORKQ_NEW));
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
			(wq->wq_thdying_count == 0 && oldest &&
			workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
				wq, cur_idle, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
			tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	workq_death_call_schedule(wq, now + delay);
}
#pragma mark thread requests

static inline priority_queue_key_t
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}
static inline struct priority_queue *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (req->tr_flags & TR_FLAG_OVERCOMMIT) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}
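/*
 * Routing summary (illustrative, not from the original source): a workloop
 * request carrying explicit scheduler parameters (TR_FLAG_WL_OUTSIDE_QOS)
 * is keyed by raw priority in wq_special_queue; an overcommit request goes
 * to wq_overcommit_queue; everything else lands in wq_constrained_queue,
 * where it also feeds the best-constrained-QoS bits maintained by
 * _wq_thactive_refresh_best_constrained_req_qos().
 */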
/*
 * Returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == TR_STATE_NEW);

	req->tr_state = TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}
	if (priority_queue_insert(workq_priority_queue_for_req(wq, req),
			&req->tr_entry, workq_priority_for_req(req),
			PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}
/*
 * Returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
{
	wq->wq_reqcount--;

	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;
			return true;
		}
		if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
				&req->tr_entry, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
			if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
				_wq_thactive_refresh_best_constrained_req_qos(wq);
			}
			return true;
		}
	}
	return false;
}
static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
	req->tr_state = TR_STATE_IDLE;
	if (req->tr_flags & (TR_FLAG_WORKLOOP | TR_FLAG_KEVENT)) {
		kqueue_threadreq_cancel(p, req);
	} else {
		zfree(workq_zone_threadreq, req);
	}
}
/*
 * Mark a thread request as complete. At this point, it is treated as owned by
 * the submitting subsystem and you should assume it could be freed.
 *
 * Called with the workqueue lock held.
 */
static void
workq_threadreq_bind_and_unlock(proc_t p, struct workqueue *wq,
		workq_threadreq_t req, struct uthread *uth)
{
	uint8_t tr_flags = req->tr_flags;
	bool needs_commit = false;
	int creator_flags = 0;

	wq->wq_fulfilled++;

	if (req->tr_state == TR_STATE_QUEUED) {
		workq_threadreq_dequeue(wq, req);
		creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
	}

	if (wq->wq_creator == uth) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
				uth->uu_save.uus_workq_park_data.yields, 0);
		creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS |
				WORKQ_THREADREQ_CREATOR_TRANSFER;
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
		_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
	}
	workq_thread_reset_pri(wq, uth, req);

	if (tr_flags & TR_FLAG_OVERCOMMIT) {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
			uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled--;
		}
	} else {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
			uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled++;
		}
	}

	if (tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP)) {
		if (req->tr_state == TR_STATE_NEW) {
			/*
			 * We're called from workq_kern_threadreq_initiate()
			 * due to an unbind, with the kq req held.
			 */
			assert(!creator_flags);
			req->tr_state = TR_STATE_IDLE;
			kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
		} else {
			assert(req->tr_count == 0);
			workq_perform_turnstile_operation_locked(wq, ^{
				kqueue_threadreq_bind_prepost(p, req, uth->uu_thread);
			});
			needs_commit = true;
		}
		req = NULL;
	} else if (req->tr_count > 0) {
		req = NULL;
	}

	if (creator_flags) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, creator_flags);
	}

	workq_unlock(wq);

	if (req) {
		zfree(workq_zone_threadreq, req);
	}
	if (needs_commit) {
		kqueue_threadreq_bind_commit(p, uth->uu_thread);
	}

	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (tr_flags & TR_FLAG_OVERCOMMIT) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}
	if (tr_flags & TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
	}
	if (tr_flags & TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
}
#pragma mark workqueue thread creation thread calls

static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
		uint32_t fail_mask)
{
	uint32_t old_flags, new_flags;

	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
			os_atomic_rmw_loop_give_up(return false);
		}
		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
			new_flags = old_flags | pend;
		} else {
			new_flags = old_flags | sched;
		}
	});

	return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
	assert(!preemption_enabled());

	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
			WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
			WQ_IMMEDIATE_CALL_SCHEDULED)) {
		return false;
	}

	uint64_t now = mach_absolute_time();

	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
		/* do not change the window */
	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
		wq->wq_timer_interval *= 2;
		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
			wq->wq_timer_interval = wq_max_timer_interval.abstime;
		}
	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
		wq->wq_timer_interval /= 2;
		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
			wq->wq_timer_interval = wq_stalled_window.abstime;
		}
	}

	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
			_wq_flags(wq), wq->wq_timer_interval, 0);

	thread_call_t call = wq->wq_delayed_call;
	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
	uint64_t deadline = now + wq->wq_timer_interval;
	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
		panic("delayed_call was already enqueued");
	}
	return true;
}
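/*
 * Illustrative trace of the adaptive window above (not from the original
 * source), assuming hypothetical tunables of wq_stalled_window = 200us and
 * wq_max_timer_interval = 50ms: if the thread call keeps firing within its
 * current window, the interval doubles each time (200us -> 400us -> ...,
 * capped at 50ms); if more than twice the window elapses between runs, it
 * halves again, floored at the stalled window. This paces thread creation
 * to the observed workload instead of waking the CPU at a fixed rate.
 */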
static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
	assert(!preemption_enabled());

	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
			WQ_IMMEDIATE_CALL_PENDED, 0)) {
		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
				_wq_flags(wq), 0, 0);

		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
			panic("immediate_call was already enqueued");
		}
	}
}
void
workq_proc_suspended(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq) {
		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
	}
}

void
workq_proc_resumed(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;

	if (!wq) {
		return;
	}

	wq_flags = os_atomic_and_orig(&wq->wq_flags, ~(WQ_PROC_SUSPENDED |
			WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED), relaxed);
	if ((wq_flags & WQ_EXITING) == 0) {
		disable_preemption();
		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
			workq_schedule_immediate_thread_creation(wq);
		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
			workq_schedule_delayed_thread_creation(wq,
					WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
		}
		enable_preemption();
	}
}
/*
 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
 */
static bool
workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
{
	uint64_t lastblocked_ts = os_atomic_load(lastblocked_tsp, relaxed);
	if (now <= lastblocked_ts) {
		/*
		 * Because the update of the timestamp when a thread blocks
		 * isn't serialized against us looking at it (i.e. we don't hold
		 * the workq lock), it's possible to have a timestamp that matches
		 * the current time or that even looks to be in the future relative
		 * to when we grabbed the current time...
		 *
		 * Just treat this as a busy thread since it must have just blocked.
		 */
		return true;
	}
	return (now - lastblocked_ts) < wq_stalled_window.abstime;
}
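/*
 * Example (illustrative, not from the original source): with a hypothetical
 * wq_stalled_window of 200us, a bucket whose last thread blocked 150us ago
 * is still reported busy (it may be about to run again), while one that
 * blocked 1ms ago is treated as stalled and no longer counts against the
 * constrained-thread admission checks.
 */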
static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
			(my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
			wq->wq_nthreads, wq->wq_thidlecount, 0);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	os_atomic_and(&wq->wq_flags, ~my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
			wq->wq_nthreads, wq->wq_thidlecount, 0);
}
#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
	struct uthread *uth = get_bsdthread_info(thread);
	proc_t proc = get_bsdtask_info(get_threadtask(thread));
	struct workqueue *wq = proc_get_wqptr(proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket, it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem. Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
				thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
					req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
					old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
					old - 1, qos | (req_qos << 8),
					wq->wq_reqcount << 1 | start_timer, 0);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
					old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
					old + 1, qos | (req_qos << 8),
					wq->wq_threads_scheduled, 0);
		}
		break;
	}
}
#pragma mark workq lifecycle

void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}

static void
workq_destroy(struct workqueue *wq)
{
	struct turnstile *ts;

	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_spin_destroy(&wq->wq_lock, workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}

static void
workq_deallocate(struct workqueue *wq)
{
	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
		workq_destroy(wq);
	}
}

void
workq_deallocate_safe(struct workqueue *wq)
{
	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
		workq_deallocate_enqueue(wq);
	}
}
/*
 * Setup per-process state for the workqueue.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
		__unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_get_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
					qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = (struct workqueue *)zalloc(workq_zone_workqueue);
		bzero(wq, sizeof(struct workqueue));

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = wq_stalled_window.abstime;

		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
				TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue,
				PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_constrained_queue,
				PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_special_queue,
				PRIORITY_QUEUE_BUILTIN_MAX_HEAP);

		wq->wq_delayed_call = thread_call_allocate_with_options(
				workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
				THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
				workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
				THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
				workq_kill_old_threads_call, wq,
				THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_spin_init(&wq->wq_lock, workq_lck_grp, workq_lck_attr);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
				VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
		proc_set_wqptr(p, wq);
	}

out:
	return error;
}
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);

	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	workq_turnstile_update_inheritor(wq, NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
			struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
			struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
}
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_sched_call(uth->uu_thread, NULL);
			thread_deallocate(uth->uu_thread);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
				VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);

		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
	}
}
#pragma mark bsd thread control

static bool
_pthread_priority_to_policy(pthread_priority_t priority,
		thread_qos_policy_data_t *data)
{
	data->qos_tier = _pthread_priority_thread_qos(priority);
	data->tier_importance = _pthread_priority_relpri(priority);
	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
			data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
		return false;
	}
	return true;
}
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
		mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		struct kqrequest *kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->kqr_state & KQR_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, uth->uu_kqr_bound);
	}

qos:
	if (flags & WORKQ_SET_SELF_QOS_FLAG) {
		thread_qos_policy_data_t new_policy;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			/*
			 * Workqueue manager threads can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 */
			bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
			bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			workq_lock_spin(wq);

			if (old_overcommit != new_overcommit) {
				uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
				if (old_overcommit) {
					wq->wq_constrained_threads_scheduled++;
				} else if (wq->wq_constrained_threads_scheduled-- ==
						wq_max_constrained_threads) {
					force_run = true;
				}
			}

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = new_policy.qos_tier;
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
				(thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
				(thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
				(thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	if (unbind_rv) {
		return unbind_rv;
	}
	if (qos_rv) {
		return qos_rv;
	}
	if (voucher_rv) {
		return voucher_rv;
	}
	if (fixedpri_rv) {
		return fixedpri_rv;
	}

	return 0;
}
static int
bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
		pthread_priority_t pp, user_addr_t resource)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	if (qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t th = port_name_to_thread(kport);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
			resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}

static int
bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
		user_addr_t resource)
{
	thread_t th = port_name_to_thread(kport);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
			THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
		pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
			wq, thread_tid(thread), 1, pp, 0);

	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint64_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging
		 * in userspace.
		 */
		disable_preemption();
		rc = copyin_word(ulock_addr, &val, sizeof(kport));
		enable_preemption();
		if (rc == 0 && ulock_owner_value_to_port_name((uint32_t)val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}
static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);
	struct uthread *uth = get_bsdthread_info(thread);

	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
	return 0;
}
static int
bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
		int *retval)
{
	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
			_PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
	static_assert(QOS_PARALLELISM_REALTIME ==
			_PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");

	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
		return EINVAL;
	}

	if (flags & QOS_PARALLELISM_REALTIME) {
		if (qos) {
			return EINVAL;
		}
	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
		return EINVAL;
	}

	*retval = qos_max_parallelism(qos, flags);
	return 0;
}

#define ENSURE_UNUSED(arg) \
	({ if ((arg) != 0) { return EINVAL; } })
2108 bsdthread_ctl(struct proc
*p
, struct bsdthread_ctl_args
*uap
, int *retval
)
2111 case BSDTHREAD_CTL_QOS_OVERRIDE_START
:
2112 return bsdthread_add_explicit_override(p
, (mach_port_name_t
)uap
->arg1
,
2113 (pthread_priority_t
)uap
->arg2
, uap
->arg3
);
2114 case BSDTHREAD_CTL_QOS_OVERRIDE_END
:
2115 ENSURE_UNUSED(uap
->arg3
);
2116 return bsdthread_remove_explicit_override(p
, (mach_port_name_t
)uap
->arg1
,
2117 (user_addr_t
)uap
->arg2
);
2119 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH
:
2120 return workq_thread_add_dispatch_override(p
, (mach_port_name_t
)uap
->arg1
,
2121 (pthread_priority_t
)uap
->arg2
, uap
->arg3
);
2122 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET
:
2123 return workq_thread_reset_dispatch_override(p
, current_thread());
2125 case BSDTHREAD_CTL_SET_SELF
:
2126 return bsdthread_set_self(p
, current_thread(),
2127 (pthread_priority_t
)uap
->arg1
, (mach_port_name_t
)uap
->arg2
,
2128 (enum workq_set_self_flags
)uap
->arg3
);
2130 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM
:
2131 ENSURE_UNUSED(uap
->arg3
);
2132 return bsdthread_get_max_parallelism((thread_qos_t
)uap
->arg1
,
2133 (unsigned long)uap
->arg2
, retval
);
2135 case BSDTHREAD_CTL_SET_QOS
:
2136 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD
:
2137 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET
:
2138 /* no longer supported */
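/*
 * Illustrative sketch (not part of the original source): the userspace side
 * of this multiplexer is a single syscall stub taking the command and three
 * arguments; the stub name below follows libpthread convention but is an
 * assumption.
 *
 *	extern int __bsdthread_ctl(uintptr_t cmd, uintptr_t arg1,
 *			uintptr_t arg2, uintptr_t arg3);
 *
 *	// Start an explicit QoS override on the thread named by kport,
 *	// tagged with an opaque `resource` so that a later
 *	// BSDTHREAD_CTL_QOS_OVERRIDE_END call can be matched to it.
 *	int err = __bsdthread_ctl(BSDTHREAD_CTL_QOS_OVERRIDE_START,
 *			kport, pp, resource);
 */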
#pragma mark workqueue thread manipulation

static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
		struct uthread *uth);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
	struct kqworkloop *kqwl;
	if (req->tr_flags & TR_FLAG_WORKLOOP) {
		kqwl = __container_of(req, struct kqworkloop, kqwl_request.kqr_req);
		return kqwl->kqwl_dynamicid;
	}

	return VM_KERNEL_ADDRHIDE(req);
}
#endif
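/*
 * A minimal sketch of what __container_of() expands to here, for readers
 * unfamiliar with the idiom:
 *
 *	struct kqworkloop *kqwl = (struct kqworkloop *)((char *)req -
 *			offsetof(struct kqworkloop, kqwl_request.kqr_req));
 *
 * which is why a TR_FLAG_WORKLOOP request can be traced by its workloop's
 * dynamic identifier instead of a hidden kernel pointer.
 */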
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
			qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
			wq, reqcount, pp, 0, 0);

	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = TR_STATE_NEW;
	req->tr_flags = 0;
	req->tr_qos   = qos;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		req->tr_flags |= TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
			wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto exiting;
		}

		/*
		 * When userspace is asking for parallelism, wake up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks,
		 * are enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		if (reqcount > 1 && (req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
			unpaced = workq_constrained_allowance(wq, qos, NULL, false);
			if (unpaced >= reqcount - 1) {
				unpaced = reqcount - 1;
			}
		} else {
			unpaced = reqcount - 1;
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth = workq_pop_idle_thread(wq);

			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req);
			wq->wq_fulfilled++;

			uth->uu_workq_flags |= UT_WORKQ_EARLY_BOUND;
			if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
				uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
				wq->wq_constrained_threads_scheduled++;
			}
			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			workq_thread_wakeup(uth);
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
			workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto exiting;
	}

	req->tr_count = reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

exiting:
	workq_unlock(wq);
	zfree(workq_zone_threadreq, req);
	return ECANCELED;
}
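/*
 * Worked example of the pacing above (illustrative numbers): for a
 * constrained (non-overcommit) request with reqcount == 4 and a constrained
 * allowance of 2, unpaced is clamped to 2. Up to two idle threads are woken
 * early-bound, reqcount drops to 2, and the request is then enqueued with
 * tr_count == 2 so the remainder is fulfilled through the creator codepath.
 */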
void
workq_kern_threadreq_initiate(struct proc *p, struct kqrequest *kqr,
		struct turnstile *workloop_ts, thread_qos_t qos, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	struct uthread *uth = NULL;
	uint8_t tr_flags = 0;

	if (kqr->kqr_state & KQR_WORKLOOP) {
		tr_flags = TR_FLAG_WORKLOOP;

		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		if (trp.trp_flags & TRP_PRIORITY) {
			tr_flags |= TR_FLAG_WL_OUTSIDE_QOS;
			qos = thread_workq_qos_for_pri(trp.trp_pri);
			if (qos == THREAD_QOS_UNSPECIFIED) {
				qos = WORKQ_THREAD_QOS_ABOVEUI;
			}
		}
		if (trp.trp_flags) {
			tr_flags |= TR_FLAG_WL_PARAMS;
		}
	} else {
		tr_flags = TR_FLAG_KEVENT;
	}
	if (qos != WORKQ_THREAD_QOS_MANAGER &&
			(kqr->kqr_state & KQR_THOVERCOMMIT)) {
		tr_flags |= TR_FLAG_OVERCOMMIT;
	}

	assert(req->tr_state == TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = TR_STATE_NEW;
	req->tr_flags = tr_flags;
	req->tr_qos   = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
			workq_trace_req_id(req), qos, 1, 0);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return(),
		 * so we can try to match up this thread with this request!
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		workq_unlock(wq);
		return;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		assert(uth != wq->wq_creator);
		workq_threadreq_bind_and_unlock(p, wq, req, uth);
	} else {
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
						TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
						TURNSTILE_INTERLOCK_HELD);
			});
		}
		if (workq_threadreq_enqueue(wq, req)) {
			workq_schedule_creator(p, wq, flags);
		}
		workq_unlock(wq);
	}
}
void
workq_kern_threadreq_modify(struct proc *p, struct kqrequest *kqr,
		thread_qos_t qos, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	bool change_overcommit = false;

	if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP));

	if (req->tr_state == TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_binding_thread, 0);
		workq_unlock(wq);
		return;
	}

	change_overcommit = (bool)(kqr->kqr_state & KQR_THOVERCOMMIT) !=
			(bool)(req->tr_flags & TR_FLAG_OVERCOMMIT);

	if (_wq_exiting(wq) || (req->tr_qos == qos && !change_overcommit)) {
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
			workq_trace_req_id(req), qos, 0, 0);

	struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry,
			PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(change_overcommit)) {
		req->tr_flags ^= TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
				PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = TR_STATE_NEW;
	if (workq_threadreq_enqueue(wq, req)) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
void
workq_kern_threadreq_lock(struct proc *p)
{
	workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
	workq_unlock(proc_get_wqptr_fast(p));
}
void
workq_kern_threadreq_update_inheritor(struct proc *p, struct kqrequest *kqr,
		thread_t owner, struct turnstile *wl_ts,
		turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_binding_thread,
				KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
void
workq_kern_threadreq_redrive(struct proc *p, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	workq_schedule_creator(p, wq, flags);
	workq_unlock(wq);
}

void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
	if (!locked) {
		workq_lock_spin(wq);
	}
	workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_CREATOR_SYNC_UPDATE);
	if (!locked) {
		workq_unlock(wq);
	}
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
		struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	struct kqrequest *kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
			(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->kqr_req.tr_flags & TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(&kqr->kqr_req);
	}

	if (kqr) {
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->kqr_state & KQR_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
						WQ_FLAG_THREAD_PRIO_QOS;
			}
		}

		error = pthread_functions->workq_handle_stack_events(p, th,
				get_task_map(p->task), uth->uu_workq_stackaddr,
				uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth);
	__builtin_unreachable();
}
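/*
 * Illustrative composition of upcall_flags above: a kevent-bound overcommit
 * thread in a regular QoS bucket would hand
 *
 *	WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE | WQ_FLAG_THREAD_KEVENT |
 *	WQ_FLAG_THREAD_OVERCOMMIT | uth->uu_workq_pri.qos_req |
 *	WQ_FLAG_THREAD_PRIO_QOS
 *
 * to workq_handle_stack_events(), letting userspace rebuild the thread's
 * state without another trip into the kernel.
 */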
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}
		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
					_PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
					qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
				(uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
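/*
 * Note on the normalization in WQOPS_SET_EVENT_MANAGER_PRIORITY above:
 * sched-pri encodings keep _PTHREAD_PRIORITY_SCHED_PRI_FLAG set while QoS
 * encodings have every flag bit stripped, so any scheduling priority
 * compares numerically above any plain QoS value. That is what lets the
 * single `wq_event_manager_priority < (uint32_t)pri` comparison implement
 * the "scheduling priority wins over QoS" policy.
 */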
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

	if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
					get_task_map(p->task), uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list. Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
	}

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable. But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual. To correct for that,
		 * we just run the continuation ourselves.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
		workq_select_threadreq_or_park_and_unlock(p, wq, uth);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
				WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
		__builtin_unreachable();
	}

	thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
static bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
	/*
	 * There's an event manager request and either:
	 * - no event manager currently running
	 * - we are re-using the event manager
	 */
	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
			(uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
		struct uthread *uth, bool may_start_timer)
{
	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
	uint32_t count = 0;

	uint32_t max_count = wq->wq_constrained_threads_scheduled;
	if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		/*
		 * don't count the current thread as scheduled
		 */
		assert(max_count > 0);
		max_count--;
	}
	if (max_count >= wq_max_constrained_threads) {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
				wq->wq_constrained_threads_scheduled,
				wq_max_constrained_threads, 0);
		/*
		 * we need 1 or more constrained threads to return to the kernel before
		 * we can dispatch additional work
		 */
		return 0;
	}
	max_count = wq_max_constrained_threads - max_count;

	/*
	 * Compute a metric for how many threads are active. We find the
	 * highest priority request outstanding and then add up the number of
	 * active threads in that and all higher-priority buckets. We'll also add
	 * any "busy" threads which are not active but blocked recently enough that
	 * we can't be sure they've gone idle yet. We'll then compare this metric
	 * to our max concurrency to decide whether to add a new thread.
	 */

	uint32_t busycount, thactive_count;

	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
			at_qos, &busycount, NULL);

	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
			at_qos <= uth->uu_workq_pri.qos_bucket) {
		/*
		 * Don't count this thread as currently active, but only if it's not
		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
		 * managers.
		 */
		assert(thactive_count > 0);
		thactive_count--;
	}

	count = wq_max_parallelism[_wq_bucket(at_qos)];
	if (count > thactive_count + busycount) {
		count -= thactive_count + busycount;
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
				thactive_count, busycount, 0);
		return MIN(count, max_count);
	} else {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
				thactive_count, busycount, 0);
	}

	if (busycount && may_start_timer) {
		/*
		 * If this is called from the add timer, we won't have another timer
		 * fire when the thread exits the "busy" state, so rearm the timer.
		 */
		workq_schedule_delayed_thread_creation(wq, 0);
	}

	return 0;
}
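/*
 * Worked example (illustrative numbers): with wq_max_constrained_threads = 64
 * and 10 constrained threads scheduled, the pool budget is 64 - 10 = 54.
 * If the bucket's parallelism is 8 with 5 active and 1 busy threads at or
 * above at_qos, the concurrency headroom is 8 - (5 + 1) = 2, so the
 * allowance returned is MIN(2, 54) = 2 more constrained threads.
 */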
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
		workq_threadreq_t req)
{
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		return workq_may_start_event_mgr_thread(wq, uth);
	}
	if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
	}
	return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
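/*
 * Worked example (illustrative): if the special queue's best request carries
 * key pri = 52 and the best overcommit request is at user-initiated QoS, the
 * "pri" request wins whenever 52 >= thread_workq_pri_for_qos(that QoS); a
 * higher-QoS constrained request is only picked over both if
 * workq_constrained_allowance() admits it.
 */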
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
			&proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request.kqr_req;
		if (req_pri->tr_state != TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
					req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
			&req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
/*
 * The creator is an anonymous thread that is counted as scheduled, but
 * otherwise has neither its scheduler callback set nor is tracked as active,
 * and that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also as a natural pacing
 * mechanism for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core, yielding more
 * efficient scheduling and reduced context switches.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq, int flags)
{
	workq_threadreq_t req;
	struct uthread *uth;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		if (flags & WORKQ_THREADREQ_CREATOR_SYNC_UPDATE) {
			assert((flags & WORKQ_THREADREQ_CREATOR_TRANSFER) == 0);
			/*
			 * turnstile propagation code is reaching out to us,
			 * and we still don't want to do anything, do not recurse.
			 */
		} else {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}

	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
					wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
			workq_thread_reset_pri(wq, uth, req);
		}
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq);
		if (workq_thread_needs_priority_change(req, uth)) {
			workq_thread_reset_pri(wq, uth, req);
		}
		workq_turnstile_update_inheritor(wq, uth->uu_thread,
				TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
				wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		workq_thread_wakeup(uth);
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		if (flags & WORKQ_THREADREQ_CREATOR_TRANSFER) {
			/*
			 * workq_schedule_creator() failed at creating a thread,
			 * and the responsibility of redriving is now with a thread-call.
			 *
			 * We still need to tell the turnstile the previous creator is gone.
			 */
			workq_turnstile_update_inheritor(wq, NULL, 0);
		}
	}
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that. If
 *   it is not NULL, it must be a threadreq object in state NEW. If it cannot
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held. Will drop it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
		struct uthread *uth)
{
	uint32_t setup_flags = 0;
	workq_threadreq_t req;

	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		req = uth->uu_save.uus_workq_park_data.thread_request;
		workq_unlock(wq);
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
				VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
		goto run;
	} else if (_wq_exiting(wq)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
	} else if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
	} else if ((req = workq_threadreq_select(wq, uth)) == NULL) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
	} else {
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
				workq_trace_req_id(req), 0, 0, 0);
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			uth->uu_workq_flags ^= UT_WORKQ_NEW;
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		workq_thread_reset_cpupercent(req, uth);
		workq_threadreq_bind_and_unlock(p, wq, req, uth);
run:
		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	workq_park_and_unlock(p, wq, uth);
	__builtin_unreachable();
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
		return false;
	}

	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
	if (wq->wq_fulfilled == snapshot) {
		return false;
	}

	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
	if (wq->wq_fulfilled - snapshot > conc) {
		/* we fulfilled more than NCPU requests since being dispatched */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
				wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
		cnt += wq->wq_thscheduled_count[i];
	}
	if (conc <= cnt) {
		/* We fulfilled requests and have more than NCPU scheduled threads */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
				wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	return false;
}
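/*
 * Worked example (illustrative numbers): with conc = 8 for the creator's
 * bucket, a creator observing wq_fulfilled - snapshot = 12 yields right
 * away; one observing only 3 fulfilled requests yields only if the buckets
 * at or above its QoS already have 8 or more threads scheduled.
 */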
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	struct uthread *uth = current_uthread();
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_select_threadreq_or_park_and_unlock(p, wq, uth);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	workq_unpark_for_death_and_unlock(p, wq, uth,
			WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
	__builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
				WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port() consumes a reference */
		thread_reference(th);
		ipc_port_t port = convert_thread_to_port(th);
		uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
			proc_get_wqptr_fast(p), 0, 0, 0, 0);
	thread_sched_call(th, workq_sched_callback);
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
			uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
#pragma mark misc

int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;
	int activecount;

	if (wq == NULL) {
		return EINVAL;
	}

	/*
	 * This is sometimes called from interrupt context by the kperf sampler.
	 * In that case, it's not safe to spin trying to take the lock since we
	 * might already hold it. So, we just try-lock it and error out if it's
	 * already held. Since this is just a debugging aid, and all our callers
	 * are able to handle an error, that's fine.
	 */
	bool locked = workq_lock_try(wq);
	if (!locked) {
		return EBUSY;
	}

	wq_thactive_t act = _wq_thactive(wq);
	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
			WORKQ_THREAD_QOS_MIN, NULL, NULL);
	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
		activecount++;
	}
	pwqinfo->pwq_nthreads = wq->wq_nthreads;
	pwqinfo->pwq_runthreads = activecount;
	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
	pwqinfo->pwq_state = 0;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	workq_unlock(wq);
	return error;
}
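/*
 * Illustrative sketch (not part of the original source) of the userspace
 * consumer of this data via libproc; reliance on the PROC_PIDWORKQUEUEINFO
 * flavor matching <sys/proc_info.h> is an assumption:
 *
 *	#include <libproc.h>
 *	#include <sys/proc_info.h>
 *
 *	struct proc_workqueueinfo wqinfo;
 *	int ret = proc_pidinfo(pid, PROC_PIDWORKQUEUEINFO, 0,
 *			&wqinfo, sizeof(wqinfo));
 *	// On success ret == sizeof(wqinfo) and pwq_nthreads/pwq_runthreads
 *	// mirror the fields filled in above.
 */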
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
		boolean_t *exceeded_constrained)
{
	proc_t p = v;
	struct proc_workqueueinfo pwqinfo;
	int err;

	assert(p != NULL);
	assert(exceeded_total != NULL);
	assert(exceeded_constrained != NULL);

	err = fill_procworkqueue(p, &pwqinfo);
	if (err) {
		return FALSE;
	}
	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
		return FALSE;
	}

	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

	return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
			kTaskWqExceededConstrainedThreadLimit);
	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
			kTaskWqExceededTotalThreadLimit);
	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
			WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

	if (v == NULL) {
		return 0;
	}

	proc_t p = v;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
		return 0;
	}

	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	return pwq_state;
}
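/*
 * The static_asserts above pin the stackshot encoding: each WQ_* state bit
 * shifted left by 17 must equal its kTaskWq* counterpart, e.g.
 * WQ_FLAGS_AVAILABLE (0x1) << 17 == kTaskWqFlagsAvailable (0x20000).
 */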
void
workq_init(void)
{
	workq_lck_grp_attr = lck_grp_attr_alloc_init();
	workq_lck_attr = lck_attr_alloc_init();
	workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);

	workq_zone_workqueue = zinit(sizeof(struct workqueue),
			1024 * sizeof(struct workqueue), 8192, "workq.wq");
	workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
			1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");

	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
			NSEC_PER_USEC, &wq_stalled_window.abstime);
	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
			NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
			NSEC_PER_USEC, &wq_max_timer_interval.abstime);
}