/*
 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>    /* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>    /* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/param.h>
#include <sys/proc_info.h>      /* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>          /* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>
static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
struct workq_usec_var {
    uint32_t usecs;
    uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
    static struct workq_usec_var var = { .usecs = init }; \
    SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
        workq_sysctl_handle_usecs, "I", "")
static LCK_GRP_DECLARE(workq_lck_grp, "workq");
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static ZONE_DECLARE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DECLARE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

static struct mpsc_daemon_queue workq_deallocate_queue;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
    struct workq_usec_var *v = arg1;
    int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
    if (error || !req->newptr) {
        return error;
    }
    clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
        &v->abstime);
    return 0;
}
SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");
#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
    return os_atomic_load(&p->p_wqptr, relaxed);
}

static struct workqueue *
proc_get_wqptr(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);
    return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
    wq = os_atomic_xchg(&p->p_wqptr, wq, release);
    if (wq == WQPTR_IS_INITING_VALUE) {
        proc_lock(p);
        thread_wakeup(&p->p_wqptr);
        proc_unlock(p);
    }
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
    struct workqueue *wq;

    proc_lock(p);
    wq = os_atomic_load(&p->p_wqptr, relaxed);

    if (wq == NULL) {
        os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
        proc_unlock(p);
        return true;
    }

    if (wq == WQPTR_IS_INITING_VALUE) {
        assert_wait(&p->p_wqptr, THREAD_UNINT);
        proc_unlock(p);
        thread_block(THREAD_CONTINUE_NULL);
    } else {
        proc_unlock(p);
    }
    return false;
}
static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
    return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
    thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
}
#pragma mark wq_thactive

#if defined(__LP64__)
// Layout is:
//   127 - 115 : 13 bits of zeroes
//   114 - 112 : best QoS among all pending constrained requests
//   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
//   63 - 61 : best QoS among all pending constrained requests
//   60      : Manager bucket (0 or 1)
//   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");
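/*
 * Worked example (LP64): each QoS bucket owns a 16-bit counter inside
 * wq_thactive, so a thread becoming active in bucket 3 adds 1 << (3 * 16)
 * to the atomic value, and the "best pending constrained request" QoS sits
 * in the 3 bits starting at shift 7 * 16 = 112.  Buckets stay well below
 * their 16-bit range because workq_open() clamps wq_max_threads to
 * WQ_THACTIVE_BUCKET_HALF (32768), so increments can never spill into a
 * neighbouring bucket.
 */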
static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
    return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline uint8_t
_wq_bucket(thread_qos_t qos)
{
    // Map both BG and MT to the same bucket by over-shifting down and
    // clamping MT and BG together.
    switch (qos) {
    case THREAD_QOS_MAINTENANCE:
        return 0;
    default:
        return qos - 2;
    }
}

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
    ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
    // Avoid expensive atomic operations: the three bits we're loading are in
    // a single byte, and always updated under the workqueue lock
    wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
    return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}
static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
    thread_qos_t old_qos, new_qos;
    workq_threadreq_t req;

    req = priority_queue_max(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry);
    new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
    old_qos = _wq_thactive_best_constrained_req_qos(wq);
    if (old_qos != new_qos) {
        long delta = (long)new_qos - (long)old_qos;
        wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
        /*
         * We can do an atomic add relative to the initial load because updates
         * to this qos are always serialized under the workqueue lock.
         */
        v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
        WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
            (uint64_t)(v >> 64), 0, 0);
#else
        WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
#endif
    }
}
static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
    return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
    return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
    return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
        _wq_thactive_offset_for_qos(old_qos);
    os_atomic_add(&wq->wq_thactive, v, relaxed);
    wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
    wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}
static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
    uint32_t count = 0, active;
    uint64_t curtime;

    assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

    if (busycount) {
        curtime = mach_absolute_time();
        *busycount = 0;
    }
    if (max_busycount) {
        *max_busycount = THREAD_QOS_LAST - qos;
    }

    int i = _wq_bucket(qos);
    v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
    for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
        active = v & WQ_THACTIVE_BUCKET_MASK;
        count += active;

        if (busycount && wq->wq_thscheduled_count[i] > active) {
            if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
                /*
                 * We only consider the last blocked thread for a given bucket
                 * as busy because we don't want to take the list lock in each
                 * sched callback. However this is an approximation that could
                 * contribute to thread creation storms.
                 */
                (*busycount)++;
            }
        }
    }

    return count;
}
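/*
 * Example of the busy accounting above: if the DF bucket shows 2 active
 * threads in wq_thactive while wq_thscheduled_count[DF] is 3, one scheduled
 * thread is currently blocked.  When its last-blocked timestamp is within
 * wq_stalled_window (see workq_thread_is_busy()), the bucket reports one
 * busy thread and callers treat it as likely to resume soon instead of
 * immediately creating a replacement.
 */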
#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
    return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
    return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    return !wq || _wq_exiting(wq);
}
#pragma mark workqueue lock

bool
workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
{
    return kdp_lck_spin_is_acquired(&wq->wq_lock);
}

static void
workq_lock_spin(struct workqueue *wq)
{
    lck_spin_lock_grp(&wq->wq_lock, &workq_lck_grp);
}

static void
workq_lock_held(__assert_only struct workqueue *wq)
{
    LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
}

static bool
workq_lock_try(struct workqueue *wq)
{
    return lck_spin_try_lock_grp(&wq->wq_lock, &workq_lck_grp);
}

static void
workq_unlock(struct workqueue *wq)
{
    lck_spin_unlock(&wq->wq_lock);
}
#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
    (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
    return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
    return MAX(workq_pri_bucket(req), req.qos_bucket);
}
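/*
 * Example: a thread whose requested QoS is UT but which currently carries a
 * dispatch override at IN has workq_pri_bucket() == IN.  workq_pri_override()
 * additionally folds in qos_bucket, so a thread that is still accounted
 * against a higher bucket keeps reporting at least that bucket until
 * workq_thread_update_bucket() moves it.
 */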
static bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
    workq_threadreq_param_t cur_trp, req_trp = { };

    cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
    if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
        req_trp = kqueue_threadreq_workloop_param(req);
    }

    /*
     * CPU percent flags are handled separately from policy changes, so ignore
     * them for all of these checks.
     */
    uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
    uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

    if (!req_flags && !cur_flags) {
        return false;
    }

    if (req_flags != cur_flags) {
        return true;
    }

    if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
        return true;
    }

    if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
        return true;
    }

    return false;
}

static bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
    if (workq_thread_needs_params_change(req, uth)) {
        return true;
    }

    return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
}
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
    thread_qos_t old_bucket = old_pri.qos_bucket;
    thread_qos_t new_bucket = workq_pri_bucket(new_pri);

    if (old_bucket != new_bucket) {
        _wq_thactive_move(wq, old_bucket, new_bucket);
    }

    new_pri.qos_bucket = new_bucket;
    uth->uu_workq_pri = new_pri;

    if (workq_pri_override(old_pri) != new_bucket) {
        thread_set_workq_override(uth->uu_thread, new_bucket);
    }

    if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
        int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
        if (old_bucket > new_bucket) {
            /*
             * When lowering our bucket, we may unblock a thread request,
             * but we can't drop our priority before we have evaluated
             * whether this is the case, and if we ever drop the workqueue lock
             * that would cause a priority inversion.
             *
             * We hence have to disallow thread creation in that case.
             */
            flags = 0;
        }
        workq_schedule_creator(p, wq, flags);
    }
}
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
    assert(uth == current_uthread());
    workq_threadreq_param_t trp = { };

    if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
        trp = kqueue_threadreq_workloop_param(req);
    }

    if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
        /*
         * Going through disable when we have an existing CPU percent limit
         * set will force the ledger to refill the token bucket of the current
         * thread, removing any penalty applied by previous thread use.
         */
        thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
        uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
    }

    if (trp.trp_flags & TRP_CPUPERCENT) {
        thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
            (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
        uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
    }
}
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
    thread_t th = uth->uu_thread;
    thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
    workq_threadreq_param_t trp = { };
    int priority = 31;
    int policy = POLICY_TIMESHARE;

    if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
        trp = kqueue_threadreq_workloop_param(req);
    }

    uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
    uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

    if (unpark) {
        uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
        // qos sent out to userspace (may differ from uu_workq_pri on param threads)
        uth->uu_save.uus_workq_park_data.qos = qos;
    }

    if (qos == WORKQ_THREAD_QOS_MANAGER) {
        uint32_t mgr_pri = wq->wq_event_manager_priority;
        assert(trp.trp_value == 0); // manager qos and thread policy don't mix

        if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
            mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
            thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
                POLICY_TIMESHARE);
            return;
        }

        qos = _pthread_priority_thread_qos(mgr_pri);
    } else {
        if (trp.trp_flags & TRP_PRIORITY) {
            qos = THREAD_QOS_UNSPECIFIED;
            priority = trp.trp_pri;
            uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
        }

        if (trp.trp_flags & TRP_POLICY) {
            policy = trp.trp_pol;
        }
    }

    thread_set_workq_pri(th, qos, priority, policy);
}
/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
    struct uu_workq_policy old_pri, new_pri;
    struct uthread *uth = current_uthread();
    struct workqueue *wq = proc_get_wqptr_fast(p);
    thread_qos_t qos = kqr->tr_kq_qos_index;

    if (uth->uu_workq_pri.qos_max == qos) {
        return;
    }

    workq_lock_spin(wq);
    old_pri = new_pri = uth->uu_workq_pri;
    new_pri.qos_max = qos;
    workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    workq_unlock(wq);
}
#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
    struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

    if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
        uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
        if (uth) {
            assert(uth->uu_save.uus_workq_park_data.has_stack);
        }
    }
    return uth;
}
static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
    uint64_t delay = wq_reduce_pool_window.abstime;
    uint16_t idle = wq->wq_thidlecount;

    /*
     * If we have less than wq_death_max_load threads, have a 5s timer.
     *
     * For the next wq_max_constrained_threads ones, decay linearly from
     * that full window down to a small fraction of it.
     */
    if (idle <= wq_death_max_load) {
        return delay;
    }

    if (wq_max_constrained_threads > idle - wq_death_max_load) {
        delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
    }
    return delay / wq_max_constrained_threads;
}
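/*
 * For illustration, assuming wq_death_max_load = 8 and
 * wq_max_constrained_threads = 64: with 40 idle threads the delay becomes
 * wq_reduce_pool_window * (64 - 32) / 64, i.e. half of the window, and it
 * keeps shrinking linearly as more idle threads accumulate.
 */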
static bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
    uint64_t delay = workq_kill_delay_for_idle_thread(wq);
    return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}
static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
    uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

    if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
        return;
    }
    os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);

    /*
     * <rdar://problem/13139182> Due to how long term timers work, the leeway
     * can't be too short, so use 500ms which is long enough that we will not
     * wake up the CPU for killing threads, but short enough that it doesn't
     * fall into long-term timer list shenanigans.
     */
    thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
        wq_reduce_pool_window.abstime / 10,
        THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
    struct uthread *uth;

    assert(wq->wq_thdying_count >= decrement);
    if ((wq->wq_thdying_count -= decrement) > 0) {
        return;
    }

    if (wq->wq_thidlecount <= 1) {
        return;
    }

    if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
        return;
    }

    uint64_t now = mach_absolute_time();
    uint64_t delay = workq_kill_delay_for_idle_thread(wq);

    if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
            wq, wq->wq_thidlecount, 0, 0, 0);
        wq->wq_thdying_count++;
        uth->uu_workq_flags |= UT_WORKQ_DYING;
        if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
            workq_thread_wakeup(uth);
        }
        return;
    }

    workq_death_call_schedule(wq,
        uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);

    workq_lock_spin(wq);
    TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
    if (uth->uu_workq_flags & UT_WORKQ_DYING) {
        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
            wq, wq->wq_thidlecount, 0, 0, 0);
        workq_death_policy_evaluate(wq, 1);
    }
    if (wq->wq_nthreads-- == wq_max_threads) {
        /*
         * We got under the thread limit again, which may have prevented
         * thread creation from happening, redrive if there are pending requests
         */
        if (wq->wq_reqcount) {
            workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
        }
    }
    workq_unlock(wq);

    thread_deallocate(uth->uu_thread);
}
static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
    struct workqueue *wq = param0;

    workq_lock_spin(wq);
    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
    os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
    workq_death_policy_evaluate(wq, 0);
    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
    workq_unlock(wq);
}
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint8_t uu_flags,
    bool *needs_wakeup)
{
    struct uthread *uth;

    if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
        TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
    } else {
        uth = TAILQ_FIRST(&wq->wq_thnewlist);
        TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
    }
    TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

    assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
    uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
    if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
        wq->wq_constrained_threads_scheduled++;
    }
    wq->wq_threads_scheduled++;
    wq->wq_thidlecount--;

    if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
        uth->uu_workq_flags ^= UT_WORKQ_DYING;
        workq_death_policy_evaluate(wq, 1);
        *needs_wakeup = false;
    } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
        *needs_wakeup = false;
    } else {
        *needs_wakeup = true;
    }
    return uth;
}
/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
    struct uthread *uth = get_bsdthread_info(th);

    uth->uu_workq_flags = UT_WORKQ_NEW;
    uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
    uth->uu_workq_thport = MACH_PORT_NULL;
    uth->uu_workq_stackaddr = 0;
    uth->uu_workq_pthread_kill_allowed = 0;

    thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
    thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

    workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
    return workq_parked_wait_event(uth);
}
/*
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
    mach_vm_offset_t th_stackaddr;
    kern_return_t kret;
    thread_t th;

    wq->wq_nthreads++;

    workq_unlock(wq);

    vm_map_t vmap = get_task_map(p->task);

    kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
    if (kret != KERN_SUCCESS) {
        WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
            kret, 1, 0, 0);
        goto out;
    }

    kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
    if (kret != KERN_SUCCESS) {
        WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
            kret, 0, 0, 0);
        pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
        goto out;
    }

    // thread_create_workq_waiting() will return with the wq lock held
    // on success, because it calls workq_thread_init_and_wq_lock() above

    struct uthread *uth = get_bsdthread_info(th);

    wq->wq_thidlecount++;
    uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
    TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

    WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
    return true;

out:
    workq_lock_spin(wq);
    /*
     * Do not redrive here if we went under wq_max_threads again,
     * it is the responsibility of the callers of this function
     * to do so when it fails.
     */
    wq->wq_nthreads--;
    return false;
}
#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
    thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
    bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

    if (qos > WORKQ_THREAD_QOS_CLEANUP) {
        workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
        qos = WORKQ_THREAD_QOS_CLEANUP;
    }

    workq_thread_reset_cpupercent(NULL, uth);

    if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
        wq->wq_thidlecount--;
        if (first_use) {
            TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
        } else {
            TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
        }
    }
    TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

    workq_unlock(wq);

    if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
        __assert_only kern_return_t kr;
        kr = thread_set_voucher_name(MACH_PORT_NULL);
        assert(kr == KERN_SUCCESS);
    }

    uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
    thread_t th = uth->uu_thread;
    vm_map_t vmap = get_task_map(p->task);

    if (!first_use) {
        flags |= WQ_FLAG_THREAD_REUSE;
    }

    pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
        uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
    __builtin_unreachable();
}
bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
    return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
    workq_lock_held(wq);
    wq->wq_turnstile_updater = current_thread();
    operation();
    wq->wq_turnstile_updater = THREAD_NULL;
}
static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
    if (wq->wq_inheritor == inheritor) {
        return;
    }
    wq->wq_inheritor = inheritor;
    workq_perform_turnstile_operation_locked(wq, ^{
        turnstile_update_inheritor(wq->wq_turnstile, inheritor,
            flags | TURNSTILE_IMMEDIATE_UPDATE);
        turnstile_update_inheritor_complete(wq->wq_turnstile,
            TURNSTILE_INTERLOCK_HELD);
    });
}
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
    uint64_t now = mach_absolute_time();
    bool is_creator = (uth == wq->wq_creator);

    if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
        wq->wq_constrained_threads_scheduled--;
    }
    uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT);
    TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
    wq->wq_threads_scheduled--;

    if (is_creator) {
        wq->wq_creator = NULL;
        WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
            uth->uu_save.uus_workq_park_data.yields, 0);
    }

    if (wq->wq_inheritor == uth->uu_thread) {
        assert(wq->wq_creator == NULL);
        if (wq->wq_reqcount) {
            workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
        } else {
            workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
        }
    }

    if (uth->uu_workq_flags & UT_WORKQ_NEW) {
        assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
        TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
        wq->wq_thidlecount++;
        return;
    }

    if (!is_creator) {
        _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
        wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
        uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
    }

    uth->uu_save.uus_workq_park_data.idle_stamp = now;

    struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
    uint16_t cur_idle = wq->wq_thidlecount;

    if (cur_idle >= wq_max_constrained_threads ||
        (wq->wq_thdying_count == 0 && oldest &&
        workq_should_kill_idle_thread(wq, oldest, now))) {
        /*
         * Immediately kill threads if we have too many of them.
         *
         * And swap "place" with the oldest one we'd have woken up.
         * This is a relatively desperate situation where we really
         * need to kill threads quickly and it's best to kill
         * the one that's currently on core rather than context switching.
         */
        if (oldest) {
            oldest->uu_save.uus_workq_park_data.idle_stamp = now;
            TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
            TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
        }

        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
            wq, cur_idle, 0, 0, 0);
        wq->wq_thdying_count++;
        uth->uu_workq_flags |= UT_WORKQ_DYING;
        uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
        workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
        __builtin_unreachable();
    }

    struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

    cur_idle += 1;
    wq->wq_thidlecount = cur_idle;

    if (cur_idle >= wq_death_max_load && tail &&
        tail->uu_save.uus_workq_park_data.has_stack) {
        uth->uu_save.uus_workq_park_data.has_stack = false;
        TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
    } else {
        uth->uu_save.uus_workq_park_data.has_stack = true;
        TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
    }

    if (!tail) {
        uint64_t delay = workq_kill_delay_for_idle_thread(wq);
        workq_death_call_schedule(wq, now + delay);
    }
}
#pragma mark thread requests

static int
workq_priority_for_req(workq_threadreq_t req)
{
    thread_qos_t qos = req->tr_qos;

    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
        assert(trp.trp_flags & TRP_PRIORITY);
        return trp.trp_pri;
    }
    return thread_workq_pri_for_qos(qos);
}
static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        return &wq->wq_special_queue;
    } else if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
        return &wq->wq_overcommit_queue;
    } else {
        return &wq->wq_constrained_queue;
    }
}
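/*
 * Thread requests live in one of three priority queues: requests with
 * custom workloop scheduling parameters (WL_OUTSIDE_QOS) in wq_special_queue,
 * overcommit requests in wq_overcommit_queue, and everything else in
 * wq_constrained_queue.  Only the constrained queue is subject to the
 * constrained admission checks, and it is the queue the "best constrained
 * request QoS" bits of wq_thactive are refreshed from.
 */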
/*
 * returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
    assert(req->tr_state == WORKQ_TR_STATE_NEW);

    req->tr_state = WORKQ_TR_STATE_QUEUED;
    wq->wq_reqcount += req->tr_count;

    if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
        assert(wq->wq_event_manager_threadreq == NULL);
        assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
        assert(req->tr_count == 1);
        wq->wq_event_manager_threadreq = req;
        return true;
    }

    struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);
    priority_queue_entry_set_sched_pri(q, &req->tr_entry,
        workq_priority_for_req(req), false);

    if (priority_queue_insert(q, &req->tr_entry)) {
        if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
            _wq_thactive_refresh_best_constrained_req_qos(wq);
        }
        return true;
    }
    return false;
}
/*
 * returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
{
    wq->wq_reqcount--;

    if (--req->tr_count == 0) {
        if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
            assert(wq->wq_event_manager_threadreq == req);
            assert(req->tr_count == 0);
            wq->wq_event_manager_threadreq = NULL;
            return true;
        }

        if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
            &req->tr_entry)) {
            if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
                _wq_thactive_refresh_best_constrained_req_qos(wq);
            }
            return true;
        }
    }
    return false;
}
static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
    req->tr_state = WORKQ_TR_STATE_CANCELED;
    if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
        kqueue_threadreq_cancel(p, req);
    } else {
        zfree(workq_zone_threadreq, req);
    }
}
#pragma mark workqueue thread creation thread calls

static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
    uint32_t old_flags, new_flags;

    os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
        if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
            os_atomic_rmw_loop_give_up(return false);
        }
        if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
            new_flags = old_flags | pend;
        } else {
            new_flags = old_flags | sched;
        }
    });

    return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
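/*
 * In other words: when the process is suspended the call is only marked
 * pending (WQ_*_CALL_PENDED) and workq_proc_resumed() re-issues it later;
 * otherwise it is marked scheduled and the caller goes on to actually arm
 * the thread call.  The return value tells the caller which case applied.
 */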
#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
    assert(!preemption_enabled());

    if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
        WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
        WQ_IMMEDIATE_CALL_SCHEDULED)) {
        return false;
    }

    uint64_t now = mach_absolute_time();

    if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
        /* do not change the window */
    } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
        wq->wq_timer_interval *= 2;
        if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
            wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
        }
    } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
        wq->wq_timer_interval /= 2;
        if (wq->wq_timer_interval < wq_stalled_window.abstime) {
            wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
        }
    }

    WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
        _wq_flags(wq), wq->wq_timer_interval, 0);

    thread_call_t call = wq->wq_delayed_call;
    uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
    uint64_t deadline = now + wq->wq_timer_interval;
    if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
        panic("delayed_call was already enqueued");
    }
    return true;
}
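/*
 * Example of the adaptive window above: if the thread call keeps firing
 * within the current interval, the interval doubles each time up to
 * wq_max_timer_interval; once more than two intervals pass between runs it
 * is halved again, but never below wq_stalled_window.
 */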
static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
    assert(!preemption_enabled());

    if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
        WQ_IMMEDIATE_CALL_PENDED, 0)) {
        WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
            _wq_flags(wq), 0, 0);

        uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
        if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
            panic("immediate_call was already enqueued");
        }
    }
}
void
workq_proc_suspended(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);

    if (wq) {
        os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
    }
}

void
workq_proc_resumed(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t wq_flags;

    if (!wq) {
        return;
    }

    wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
        WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
    if ((wq_flags & WQ_EXITING) == 0) {
        disable_preemption();
        if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
            workq_schedule_immediate_thread_creation(wq);
        } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
            workq_schedule_delayed_thread_creation(wq,
                WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
        }
        enable_preemption();
    }
}
/*
 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
 */
static bool
workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
{
    uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
    if (now <= lastblocked_ts) {
        /*
         * Because the update of the timestamp when a thread blocks
         * isn't serialized against us looking at it (i.e. we don't hold
         * the workq lock), it's possible to have a timestamp that matches
         * the current time or that even looks to be in the future relative
         * to when we grabbed the current time...
         *
         * Just treat this as a busy thread since it must have just blocked.
         */
        return true;
    }
    return (now - lastblocked_ts) < wq_stalled_window.abstime;
}
static void
workq_add_new_threads_call(void *_p, void *flags)
{
    proc_t p = _p;
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t my_flag = (uint32_t)(uintptr_t)flags;

    /*
     * workq_exit() will set the workqueue to NULL before
     * it cancels thread calls.
     */
    if (!wq) {
        return;
    }

    assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
        (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

    WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
        wq->wq_nthreads, wq->wq_thidlecount, 0);

    workq_lock_spin(wq);

    wq->wq_thread_call_last_run = mach_absolute_time();
    os_atomic_andnot(&wq->wq_flags, my_flag, release);

    /* This can drop the workqueue lock, and take it again */
    workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

    workq_unlock(wq);

    WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
        wq->wq_nthreads, wq->wq_thidlecount, 0);
}
#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
    struct uthread *uth = get_bsdthread_info(thread);
    proc_t proc = get_bsdtask_info(get_threadtask(thread));
    struct workqueue *wq = proc_get_wqptr(proc);
    thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
    wq_thactive_t old_thactive;
    bool start_timer = false;

    if (qos == WORKQ_THREAD_QOS_MANAGER) {
        return;
    }

    switch (type) {
    case SCHED_CALL_BLOCK:
        old_thactive = _wq_thactive_dec(wq, qos);
        req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

        /*
         * Remember the timestamp of the last thread that blocked in this
         * bucket, it is used by admission checks to ignore one thread
         * being inactive if this timestamp is recent enough.
         *
         * If we collide with another thread trying to update the
         * last_blocked (really unlikely since another thread would have to
         * get scheduled and then block after we start down this path), it's
         * not a problem. Either timestamp is adequate, so no need to retry
         */
        os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
            thread_last_run_time(thread), relaxed);

        if (req_qos == THREAD_QOS_UNSPECIFIED) {
            /*
             * No pending request at the moment we could unblock, move on.
             */
        } else if (qos < req_qos) {
            /*
             * The blocking thread is at a lower QoS than the highest currently
             * pending constrained request, nothing has to be redriven
             */
        } else {
            uint32_t max_busycount, old_req_count;
            old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
                req_qos, NULL, &max_busycount);
            /*
             * If it is possible that may_start_constrained_thread had refused
             * admission due to being over the max concurrency, we may need to
             * spin up a new thread.
             *
             * We take into account the maximum number of busy threads
             * that can affect may_start_constrained_thread as looking at the
             * actual number may_start_constrained_thread will see is racy.
             *
             * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
             * between NCPU (4) and NCPU - 2 (2) we need to redrive.
             */
            uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
            if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
                start_timer = workq_schedule_delayed_thread_creation(wq, 0);
            }
        }
        if (__improbable(kdebug_enable)) {
            __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
                old_thactive, qos, NULL, NULL);
            WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
                old - 1, qos | (req_qos << 8),
                wq->wq_reqcount << 1 | start_timer, 0);
        }
        break;

    case SCHED_CALL_UNBLOCK:
        /*
         * we cannot take the workqueue_lock here...
         * an UNBLOCK can occur from a timer event which
         * is run from an interrupt context... if the workqueue_lock
         * is already held by this processor, we'll deadlock...
         * the thread lock for the thread being UNBLOCKED
         * is also held
         */
        old_thactive = _wq_thactive_inc(wq, qos);
        if (__improbable(kdebug_enable)) {
            __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
                old_thactive, qos, NULL, NULL);
            req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
            WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
                old + 1, qos | (req_qos << 8),
                wq->wq_threads_scheduled, 0);
        }
        break;
    }
}
#pragma mark workq lifecycle

void
workq_reference(struct workqueue *wq)
{
    os_ref_retain(&wq->wq_refcnt);
}

static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
    struct workqueue *wq;
    struct turnstile *ts;

    wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
    assert(dq == &workq_deallocate_queue);

    turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
    assert(ts);
    turnstile_cleanup();
    turnstile_deallocate(ts);

    lck_spin_destroy(&wq->wq_lock, &workq_lck_grp);
    zfree(workq_zone_workqueue, wq);
}

void
workq_deallocate(struct workqueue *wq)
{
    if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
        workq_deallocate_queue_invoke(&wq->wq_destroy_link,
            &workq_deallocate_queue);
    }
}

void
workq_deallocate_safe(struct workqueue *wq)
{
    if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
        mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
            MPSC_QUEUE_DISABLE_PREEMPTION);
    }
}
/**
 * Setup per-process state for the workqueue.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
    struct workqueue *wq;
    int error = 0;

    if ((p->p_lflag & P_LREGISTER) == 0) {
        return EINVAL;
    }

    if (wq_init_constrained_limit) {
        uint32_t limit, num_cpus = ml_wait_max_cpus();

        /*
         * set up the limit for the constrained pool
         * this is a virtual pool in that we don't
         * maintain it on a separate idle and run list
         */
        limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

        if (limit > wq_max_constrained_threads) {
            wq_max_constrained_threads = limit;
        }

        if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
            wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
        }
        if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
            wq_max_threads = CONFIG_THREAD_MAX - 20;
        }

        wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

        for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
            wq_max_parallelism[_wq_bucket(qos)] =
                qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
        }

        wq_init_constrained_limit = 0;
    }

    if (proc_get_wqptr(p) == NULL) {
        if (proc_init_wqptr_or_wait(p) == FALSE) {
            assert(proc_get_wqptr(p) != NULL);
            goto out;
        }

        wq = (struct workqueue *)zalloc(workq_zone_workqueue);
        bzero(wq, sizeof(struct workqueue));

        os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

        // Start the event manager at the priority hinted at by the policy engine
        thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
        pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
        wq->wq_event_manager_priority = (uint32_t)pp;
        wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;

        turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
            TURNSTILE_WORKQS);

        TAILQ_INIT(&wq->wq_thrunlist);
        TAILQ_INIT(&wq->wq_thnewlist);
        TAILQ_INIT(&wq->wq_thidlelist);
        priority_queue_init(&wq->wq_overcommit_queue);
        priority_queue_init(&wq->wq_constrained_queue);
        priority_queue_init(&wq->wq_special_queue);

        /* We are only using the delayed thread call for the constrained pool
         * which can't have work at >= UI QoS and so we can be fine with a
         * UI QoS thread call.
         */
        wq->wq_delayed_call = thread_call_allocate_with_qos(
            workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
            THREAD_CALL_OPTIONS_ONCE);
        wq->wq_immediate_call = thread_call_allocate_with_options(
            workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
            THREAD_CALL_OPTIONS_ONCE);
        wq->wq_death_call = thread_call_allocate_with_options(
            workq_kill_old_threads_call, wq,
            THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

        lck_spin_init(&wq->wq_lock, &workq_lck_grp, LCK_ATTR_NULL);

        WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
            VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
        proc_set_wqptr(p, wq);
    }

out:
    return error;
}
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t wq_flags;
    workq_threadreq_t mgr_req;

    if (!wq) {
        return;
    }

    WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

    workq_lock_spin(wq);

    wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
    if (__improbable(wq_flags & WQ_EXITING)) {
        panic("workq_mark_exiting called twice");
    }

    /*
     * Opportunistically try to cancel thread calls that are likely in flight.
     * workq_exit() will do the proper cleanup.
     */
    if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_immediate_call);
    }
    if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_delayed_call);
    }
    if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_death_call);
    }

    mgr_req = wq->wq_event_manager_threadreq;
    wq->wq_event_manager_threadreq = NULL;
    wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
    wq->wq_creator = NULL;
    workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

    workq_unlock(wq);

    if (mgr_req) {
        kqueue_threadreq_cancel(p, mgr_req);
    }
    /*
     * No one touches the priority queues once WQ_EXITING is set.
     * It is hence safe to do the tear down without holding any lock.
     */
    priority_queue_destroy(&wq->wq_overcommit_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });
    priority_queue_destroy(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });
    priority_queue_destroy(&wq->wq_special_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });

    WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
}
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
    struct workqueue *wq;
    struct uthread *uth, *tmp;

    wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
    if (wq != NULL) {
        thread_t th = current_thread();

        WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

        if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
            /*
             * <rdar://problem/40111515> Make sure we will no longer call the
             * sched call, if we ever block this thread, which the cancel_wait
             * below can do.
             */
            thread_sched_call(th, NULL);
        }

        /*
         * Thread calls are always scheduled by the proc itself or under the
         * workqueue spinlock if WQ_EXITING is not yet set.
         *
         * Either way, when this runs, the proc has no threads left beside
         * the one running this very code, so we know no thread call can be
         * dispatched anymore.
         */
        thread_call_cancel_wait(wq->wq_delayed_call);
        thread_call_cancel_wait(wq->wq_immediate_call);
        thread_call_cancel_wait(wq->wq_death_call);
        thread_call_free(wq->wq_delayed_call);
        thread_call_free(wq->wq_immediate_call);
        thread_call_free(wq->wq_death_call);

        /*
         * Clean up workqueue data structures for threads that exited and
         * didn't get a chance to clean up after themselves.
         *
         * idle/new threads should have been interrupted and died on their own
         */
        TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
            thread_sched_call(uth->uu_thread, NULL);
            thread_deallocate(uth->uu_thread);
        }
        assert(TAILQ_EMPTY(&wq->wq_thnewlist));
        assert(TAILQ_EMPTY(&wq->wq_thidlelist));

        WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
            VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);

        workq_deallocate(wq);

        WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
    }
}
#pragma mark bsd thread control

static bool
_pthread_priority_to_policy(pthread_priority_t priority,
    thread_qos_policy_data_t *data)
{
    data->qos_tier = _pthread_priority_thread_qos(priority);
    data->tier_importance = _pthread_priority_relpri(priority);
    if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
        data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
        return false;
    }
    return true;
}
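/*
 * Rough sketch of the accepted encoding: the QoS tier must be a valid
 * thread QoS and the relative priority must lie in
 * [THREAD_QOS_MIN_TIER_IMPORTANCE, 0].  For example, a pthread_priority_t
 * carrying (utility QoS, relpri -4) converts successfully, while any
 * positive relative priority is rejected.
 */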
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
    struct uthread *uth = get_bsdthread_info(th);
    struct workqueue *wq = proc_get_wqptr(p);

    kern_return_t kr;
    int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
    bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

    if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
        if (!is_wq_thread) {
            unbind_rv = EINVAL;
            goto qos;
        }

        if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
            unbind_rv = EINVAL;
            goto qos;
        }

        workq_threadreq_t kqr = uth->uu_kqr_bound;
        if (kqr == NULL) {
            unbind_rv = EALREADY;
            goto qos;
        }

        if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
            unbind_rv = EINVAL;
            goto qos;
        }

        kqueue_threadreq_unbind(p, kqr);
    }

qos:
    if (flags & WORKQ_SET_SELF_QOS_FLAG) {
        thread_qos_policy_data_t new_policy;

        if (!_pthread_priority_to_policy(priority, &new_policy)) {
            qos_rv = EINVAL;
            goto voucher;
        }

        if (!is_wq_thread) {
            /*
             * Threads opted out of QoS can't change QoS
             */
            if (!thread_has_qos_policy(th)) {
                qos_rv = EPERM;
                goto voucher;
            }
        } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
            uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
            /*
             * Workqueue manager threads or threads above UI can't change QoS
             */
            qos_rv = EINVAL;
            goto voucher;
        } else {
            /*
             * For workqueue threads, possibly adjust buckets and redrive thread
             * requests.
             */
            bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
            bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
            struct uu_workq_policy old_pri, new_pri;
            bool force_run = false;

            workq_lock_spin(wq);

            if (old_overcommit != new_overcommit) {
                uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
                if (old_overcommit) {
                    wq->wq_constrained_threads_scheduled++;
                } else if (wq->wq_constrained_threads_scheduled-- ==
                    wq_max_constrained_threads) {
                    force_run = true;
                }
            }

            old_pri = new_pri = uth->uu_workq_pri;
            new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;
            workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
            workq_unlock(wq);
        }

        kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
            (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            qos_rv = EINVAL;
        }
    }

voucher:
    if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
        kr = thread_set_voucher_name(voucher);
        if (kr != KERN_SUCCESS) {
            voucher_rv = ENOENT;
            goto fixedpri;
        }
    }

fixedpri:
    if (qos_rv) {
        goto done;
    }
    if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
        thread_extended_policy_data_t extpol = {.timeshare = 0};

        if (is_wq_thread) {
            /* Not allowed on workqueue threads */
            fixedpri_rv = ENOTSUP;
            goto done;
        }

        kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
            (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            fixedpri_rv = EINVAL;
            goto done;
        }
    } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
        thread_extended_policy_data_t extpol = {.timeshare = 1};

        if (is_wq_thread) {
            /* Not allowed on workqueue threads */
            fixedpri_rv = ENOTSUP;
            goto done;
        }

        kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
            (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            fixedpri_rv = EINVAL;
            goto done;
        }
    }

done:
    if (qos_rv && voucher_rv) {
        /* Both failed, give that a unique error. */
        return EBADMSG;
    }

    if (unbind_rv) {
        return unbind_rv;
    }

    if (qos_rv) {
        return qos_rv;
    }

    if (voucher_rv) {
        return voucher_rv;
    }

    if (fixedpri_rv) {
        return fixedpri_rv;
    }

    return 0;
}
static int
bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t resource)
{
    thread_qos_t qos = _pthread_priority_thread_qos(pp);
    if (qos == THREAD_QOS_UNSPECIFIED) {
        return EINVAL;
    }

    thread_t th = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (th == THREAD_NULL) {
        return ESRCH;
    }

    int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
        resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

    thread_deallocate(th);
    return rv;
}
static int
bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
    user_addr_t resource)
{
    thread_t th = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (th == THREAD_NULL) {
        return ESRCH;
    }

    int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
        THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

    thread_deallocate(th);
    return rv;
}
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
    struct uu_workq_policy old_pri, new_pri;
    struct workqueue *wq = proc_get_wqptr(p);

    thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
    if (qos_override == THREAD_QOS_UNSPECIFIED) {
        return EINVAL;
    }

    thread_t thread = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (thread == THREAD_NULL) {
        return ESRCH;
    }

    struct uthread *uth = get_bsdthread_info(thread);
    if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
        thread_deallocate(thread);
        return EPERM;
    }

    WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
        wq, thread_tid(thread), 1, pp, 0);

    thread_mtx_lock(thread);

    if (ulock_addr) {
        uint32_t val;
        int rc;
        /*
         * Workaround lack of explicit support for 'no-fault copyin'
         * <rdar://problem/24999882>, as disabling preemption prevents paging in
         * userspace.
         */
        disable_preemption();
        rc = copyin_atomic32(ulock_addr, &val);
        enable_preemption();
        if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
            goto out;
        }
    }

    workq_lock_spin(wq);

    old_pri = uth->uu_workq_pri;
    if (old_pri.qos_override >= qos_override) {
        /* Nothing to do */
    } else if (thread == current_thread()) {
        new_pri = old_pri;
        new_pri.qos_override = qos_override;
        workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    } else {
        uth->uu_workq_pri.qos_override = qos_override;
        if (qos_override > workq_pri_override(old_pri)) {
            thread_set_workq_override(thread, qos_override);
        }
    }

    workq_unlock(wq);

out:
    thread_mtx_unlock(thread);
    thread_deallocate(thread);
    return 0;
}
static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
    struct uu_workq_policy old_pri, new_pri;
    struct workqueue *wq = proc_get_wqptr(p);
    struct uthread *uth = get_bsdthread_info(thread);

    if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
        return EPERM;
    }

    WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);

    workq_lock_spin(wq);
    old_pri = new_pri = uth->uu_workq_pri;
    new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
    workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    workq_unlock(wq);
    return 0;
}
static int
workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
{
    if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
        // If the thread isn't a workqueue thread, don't set the
        // kill_allowed bit; however, we still need to return 0
        // instead of an error code since this code is executed
        // on the abort path which needs to not depend on the
        // pthread_t (returning an error depends on the pthread_t).
        return 0;
    }
    struct uthread *uth = get_bsdthread_info(thread);
    uth->uu_workq_pthread_kill_allowed = enable;
    return 0;
}
static int
bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
    int *retval)
{
    static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
        _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
    static_assert(QOS_PARALLELISM_REALTIME ==
        _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");

    if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
        return EINVAL;
    }

    if (flags & QOS_PARALLELISM_REALTIME) {
        if (qos) {
            return EINVAL;
        }
    } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
        return EINVAL;
    }

    *retval = qos_max_parallelism(qos, flags);
    return 0;
}
#define ENSURE_UNUSED(arg) \
    ({ if ((arg) != 0) { return EINVAL; } })

int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
    switch (uap->cmd) {
    case BSDTHREAD_CTL_QOS_OVERRIDE_START:
        return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
                   (pthread_priority_t)uap->arg2, uap->arg3);
    case BSDTHREAD_CTL_QOS_OVERRIDE_END:
        ENSURE_UNUSED(uap->arg3);
        return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
                   (user_addr_t)uap->arg2);

    case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
        return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
                   (pthread_priority_t)uap->arg2, uap->arg3);
    case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
        return workq_thread_reset_dispatch_override(p, current_thread());

    case BSDTHREAD_CTL_SET_SELF:
        return bsdthread_set_self(p, current_thread(),
                   (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
                   (enum workq_set_self_flags)uap->arg3);

    case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
        ENSURE_UNUSED(uap->arg3);
        return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
                   (unsigned long)uap->arg2, retval);
    case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
        ENSURE_UNUSED(uap->arg2);
        ENSURE_UNUSED(uap->arg3);
        return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);

    case BSDTHREAD_CTL_SET_QOS:
    case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
    case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
        /* no longer supported */
        return ENOTSUP;

    default:
        return EINVAL;
    }
}
2106 #pragma mark workqueue thread manipulation
static void __dead2
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;

#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
	struct kqworkloop *kqwl;
	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
		return kqwl->kqwl_dynamicid;
	}

	return VM_KERNEL_ADDRHIDE(req);
}
#endif
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
	    qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
	    wq, reqcount, pp, 0, 0);

	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_flags = 0;
	req->tr_qos = qos;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
	    wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto exiting;
		}

		/*
		 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks are
		 * enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			unpaced = workq_constrained_allowance(wq, qos, NULL, false);
			if (unpaced >= reqcount - 1) {
				unpaced = reqcount - 1;
			}
		} else {
			unpaced = reqcount - 1;
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth;
			bool needs_wakeup;
			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

			if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
				uu_flags |= UT_WORKQ_OVERCOMMIT;
			}

			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
			wq->wq_fulfilled++;

			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			if (needs_wakeup) {
				workq_thread_wakeup(uth);
			}
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
	    workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto exiting;
	}

	req->tr_count = (uint16_t)reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

exiting:
	workq_unlock(wq);
	zfree(workq_zone_threadreq, req);
	return ECANCELED;
}
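/*
 * Illustrative sketch, not part of the original source: the path above is
 * driven from userspace by libdispatch through libpthread's __workq_kernreturn()
 * wrapper (names used here as an assumption about the userspace side), roughly:
 *
 *	// hypothetical: ask for `n` constrained threads at a QoS-encoded priority
 *	__workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, n, (int)encoded_priority);
 *
 * Each such call funnels into workq_reqthreads() above, which wakes up to
 * (n - 1) idle threads without pacing and enqueues the remainder as a thread
 * request serviced by the creator path.
 */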
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1, 0);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		assert(uth != wq->wq_creator);
		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We're called from workq_kern_threadreq_initiate()
		 * due to an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), 0, 0, 0);
		wq->wq_fulfilled++;
		kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
	} else {
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}
		if (workq_threadreq_enqueue(wq, req)) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);
	return true;
}
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	bool make_overcommit = false;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
		workq_unlock(wq);
		return;
	}

	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
		make_overcommit = (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0;
	}

	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 0, 0);

	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry)) {
		if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request.
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(make_overcommit)) {
		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
		    workq_priority_for_req(req), false);
		priority_queue_insert(pq, &req->tr_entry);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = WORKQ_TR_STATE_NEW;
	if (workq_threadreq_enqueue(wq, req)) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
void
workq_kern_threadreq_lock(struct proc *p)
{
	workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
	workq_unlock(proc_get_wqptr_fast(p));
}
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread,
		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
void
workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	workq_schedule_creator(p, wq, flags);
	workq_unlock(wq);
}

void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
	if (locked) {
		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
	} else {
		workq_schedule_immediate_thread_creation(wq);
	}
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

	if (kqr) {
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}

		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(p->task), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}
		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
			    qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	case WQOPS_SETUP_DISPATCH: {
		/*
		 * item = pointer to workq_dispatch_config structure
		 * arg2 = sizeof(item)
		 */
		struct workq_dispatch_config cfg;
		bzero(&cfg, sizeof(cfg));

		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
		if (error) {
			break;
		}

		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
			error = ENOTSUP;
			break;
		}

		/* Load fields from version 1 */
		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

		/* Load fields from version 2 */
		if (cfg.wdc_version >= 2) {
			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
		}

		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
			    get_task_map(p->task), uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list.  Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable.  But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual.  To correct for that,
		 * we just run the continuation ourselves.
		 */
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
static bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
	/*
	 * There's an event manager request and either:
	 * - no event manager currently running
	 * - we are re-using the event manager
	 */
	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
    struct uthread *uth, bool may_start_timer)
{
	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
	uint32_t count = 0;

	uint32_t max_count = wq->wq_constrained_threads_scheduled;
	if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		/*
		 * don't count the current thread as scheduled
		 */
		assert(max_count > 0);
		max_count--;
	}
	if (max_count >= wq_max_constrained_threads) {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
		    wq->wq_constrained_threads_scheduled,
		    wq_max_constrained_threads, 0);
		/*
		 * we need 1 or more constrained threads to return to the kernel before
		 * we can dispatch additional work
		 */
		return 0;
	}
	max_count -= wq_max_constrained_threads;

	/*
	 * Compute a metric for how many threads are active.  We find the
	 * highest priority request outstanding and then add up the number of active
	 * threads in that and all higher-priority buckets.  We'll also add any
	 * "busy" threads which are not currently active but blocked recently enough
	 * that we can't be sure that they won't be unblocked soon and start
	 * being active again.
	 *
	 * We'll then compare this metric to our max concurrency to decide whether
	 * to add a new thread.
	 */

	uint32_t busycount, thactive_count;

	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
	    at_qos, &busycount, NULL);

	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
	    at_qos <= uth->uu_workq_pri.qos_bucket) {
		/*
		 * Don't count this thread as currently active, but only if it's not
		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
		 * managers.
		 */
		assert(thactive_count > 0);
		thactive_count--;
	}

	count = wq_max_parallelism[_wq_bucket(at_qos)];
	if (count > thactive_count + busycount) {
		count -= thactive_count + busycount;
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
		    thactive_count, busycount, 0);
		return MIN(count, max_count);
	} else {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
		    thactive_count, busycount, 0);
	}

	if (may_start_timer) {
		/*
		 * If this is called from the add timer, we won't have another timer
		 * fire when the thread exits the "busy" state, so rearm the timer.
		 */
		workq_schedule_delayed_thread_creation(wq, 0);
	}

	return 0;
}
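/*
 * Worked example for the admission metric above (illustrative numbers only,
 * not from the original source): with wq_max_parallelism[_wq_bucket(at_qos)]
 * == 8, thactive_count == 5 active threads at or above at_qos, and
 * busycount == 2 recently-blocked threads, the headroom is 8 - (5 + 2) == 1,
 * so at most one more constrained thread is admitted, further clamped by the
 * constrained-thread budget computed earlier in the function.
 */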
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req)
{
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		return workq_may_start_event_mgr_thread(wq, uth);
	}
	if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
	}
	return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_pri->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_tmp->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
/*
 * The creator is an anonymous thread that is used to make other threads; it is
 * counted as scheduled, but otherwise runs without its scheduler callback set
 * and is not tracked as active.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and set up, or the existing one is overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests, avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also acts as a natural pacing
 * mechanism for thread creation.
 *
 * By being anonymous (and not bound to anything), thread requests can be stolen
 * from this creator by threads already on core, yielding more efficient
 * scheduling and reduced context switches.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}

	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == uth->uu_thread);
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		/* Always reset the priorities on the newly chosen creator */
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		workq_turnstile_update_inheritor(wq, uth->uu_thread,
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
/*
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the base pri frozen, will unfreeze it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	workq_threadreq_t req = NULL;
	bool is_creator = (wq->wq_creator == uth);
	bool schedule_creator = false;

	if (__improbable(_wq_exiting(wq))) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
		goto park;
	}

	if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
		goto park;
	}

	req = workq_threadreq_select(wq, uth);
	if (__improbable(req == NULL)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
		goto park;
	}

	uint8_t tr_flags = req->tr_flags;
	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

	/*
	 * Attempt to setup ourselves as the new thing to run, moving all priority
	 * pushes to ourselves.
	 *
	 * If the current thread is the creator, then the fact that we are presently
	 * running is proof that we'll do something useful, so keep going.
	 *
	 * For other cases, peek at the AST to know whether the scheduler wants
	 * to preempt us, if yes, park instead, and move the thread request
	 * turnstile back to the workqueue.
	 */
	if (req_ts) {
		workq_perform_turnstile_operation_locked(wq, ^{
			turnstile_update_inheritor(req_ts, uth->uu_thread,
			TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
			turnstile_update_inheritor_complete(req_ts,
			TURNSTILE_INTERLOCK_HELD);
		});
	}

	if (is_creator) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
		    uth->uu_save.uus_workq_park_data.yields, 0);
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
		_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
	}

	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

	if (__improbable(thread_unfreeze_base_pri(uth->uu_thread) && !is_creator)) {
		if (req_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(req_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0, 0);
		goto park_thawed;
	}

	/*
	 * We passed all checks, dequeue the request, bind to it, and set it up
	 * to return to user.
	 */
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
	    workq_trace_req_id(req), 0, 0, 0);
	wq->wq_fulfilled++;
	schedule_creator = workq_threadreq_dequeue(wq, req);

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_prepost(p, req, uth);
		req = NULL;
	} else if (req->tr_count > 0) {
		req = NULL;
	}

	workq_thread_reset_cpupercent(req, uth);
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		uth->uu_workq_flags ^= UT_WORKQ_NEW;
		setup_flags |= WQ_SETUP_FIRST_USE;
	}
	if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
			uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled--;
		}
	} else {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
			uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled++;
		}
	}

	if (is_creator || schedule_creator) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}

	workq_unlock(wq);

	if (req) {
		zfree(workq_zone_threadreq, req);
	}

	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}
	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
	}
	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_commit(p, uth->uu_thread);
	}
	workq_setup_and_run(p, uth, setup_flags);
	__builtin_unreachable();

park:
	thread_unfreeze_base_pri(uth->uu_thread);
park_thawed:
	workq_park_and_unlock(p, wq, uth, setup_flags);
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that.  If
 *   it is not NULL, it must be a threadreq object in state NEW.  If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held.  Will drop it.
 * Should be called with the base pri not frozen.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
		workq_unlock(wq);
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	thread_freeze_base_pri(uth->uu_thread);
	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
		return false;
	}

	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
	if (wq->wq_fulfilled == snapshot) {
		return false;
	}

	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
	if (wq->wq_fulfilled - snapshot > conc) {
		/* we fulfilled more than NCPU requests since being dispatched */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
		cnt += wq->wq_thscheduled_count[i];
	}
	if (conc <= cnt) {
		/* We fulfilled requests and have more than NCPU scheduled threads */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	return false;
}
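/*
 * Illustrative numbers for the yield heuristic above (not from the original
 * source): with conc == 8 (roughly NCPU worth of parallelism for this bucket),
 * the creator yields either when wq_fulfilled has advanced by more than 8
 * since the creator was dispatched, or when it has advanced at all and the
 * buckets counted by the loop above already hold 8 or more scheduled threads.
 */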
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	workq_unpark_for_death_and_unlock(p, wq, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
	__builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * invariant.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
		    WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port_pinned() consumes a reference */
		thread_reference(th);
		/* Convert to immovable/pinned thread port, but port is not pinned yet */
		ipc_port_t port = convert_thread_to_port_pinned(th);
		/* Atomically, pin and copy out the port */
		uth->uu_workq_thport = ipc_port_copyout_send_pinned(port,
		    get_task_ipcspace(p->task));
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
	    proc_get_wqptr_fast(p), 0, 0, 0, 0);
	thread_sched_call(th, workq_sched_callback);
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;
	int activecount;

	if (wq == NULL) {
		return EINVAL;
	}

	/*
	 * This is sometimes called from interrupt context by the kperf sampler.
	 * In that case, it's not safe to spin trying to take the lock since we
	 * might already hold it.  So, we just try-lock it and error out if it's
	 * already held.  Since this is just a debugging aid, and all our callers
	 * are able to handle an error, that's fine.
	 */
	bool locked = workq_lock_try(wq);
	if (!locked) {
		return EBUSY;
	}

	wq_thactive_t act = _wq_thactive(wq);
	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
		activecount++;
	}
	pwqinfo->pwq_nthreads = wq->wq_nthreads;
	pwqinfo->pwq_runthreads = activecount;
	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
	pwqinfo->pwq_state = 0;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	workq_unlock(wq);
	return error;
}
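/*
 * Illustrative sketch, not part of the original source: the structure filled in
 * above is what userspace observes through proc_pidinfo() with the
 * PROC_PIDWORKQUEUEINFO flavor (via <libproc.h> and <sys/proc_info.h>), e.g.:
 *
 *	struct proc_workqueueinfo wqinfo;
 *	int ret = proc_pidinfo(pid, PROC_PIDWORKQUEUEINFO, 0,
 *	    &wqinfo, sizeof(wqinfo));
 *	if (ret == (int)sizeof(wqinfo) &&
 *	    (wqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT)) {
 *		// the process has hit the constrained workqueue thread cap
 *	}
 */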
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
    boolean_t *exceeded_constrained)
{
	proc_t p = v;
	struct proc_workqueueinfo pwqinfo;
	int err;

	assert(p != NULL);
	assert(exceeded_total != NULL);
	assert(exceeded_constrained != NULL);

	err = fill_procworkqueue(p, &pwqinfo);
	if (err) {
		return FALSE;
	}
	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
		return FALSE;
	}

	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

	return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
	    kTaskWqExceededConstrainedThreadLimit);
	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
	    kTaskWqExceededTotalThreadLimit);
	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

	if (v == NULL) {
		return 0;
	}

	proc_t p = v;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
		return 0;
	}

	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	return pwq_state;
}
	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
	    NSEC_PER_USEC, &wq_stalled_window.abstime);
	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);

	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
	    workq_deallocate_queue_invoke);
}