/*
 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */

#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>    /* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>    /* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/param.h>
#include <sys/proc_info.h>      /* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>          /* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>
static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
struct workq_usec_var {
    uint32_t usecs;
    uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")
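/*
 * Illustrative note (not in the original source): for example,
 * WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS) below
 * expands to a static `struct workq_usec_var wq_stalled_window` plus a
 * "kern.wq_stalled_window_usecs" sysctl whose handler keeps the cached
 * mach-absolute-time value `wq_stalled_window.abstime` in sync with the
 * microsecond value written by userspace.
 */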
static LCK_GRP_DECLARE(workq_lck_grp, "workq");
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static ZONE_DECLARE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DECLARE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

static struct mpsc_daemon_queue workq_deallocate_queue;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
    struct workq_usec_var *v = arg1;
    int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
    if (error || !req->newptr) {
        return error;
    }
    clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
        &v->abstime);
    return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");
#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
    return os_atomic_load(&p->p_wqptr, relaxed);
}

static struct workqueue *
proc_get_wqptr(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);
    return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
    wq = os_atomic_xchg(&p->p_wqptr, wq, release);
    if (wq == WQPTR_IS_INITING_VALUE) {
        thread_wakeup(&p->p_wqptr);
    }
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
    struct workqueue *wq;

    proc_lock(p);
    wq = os_atomic_load(&p->p_wqptr, relaxed);

    if (wq == NULL) {
        os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
        proc_unlock(p);
        return true;
    }

    if (wq == WQPTR_IS_INITING_VALUE) {
        assert_wait(&p->p_wqptr, THREAD_UNINT);
        proc_unlock(p);
        thread_block(THREAD_CONTINUE_NULL);
    } else {
        proc_unlock(p);
    }
    return false;
}

static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
    return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
    thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
}
#pragma mark wq_thactive

#if defined(__LP64__)
// 127 - 115 : 13 bits of zeroes
// 114 - 112 : best QoS among all pending constrained requests
// 111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// 63 - 61 : best QoS among all pending constrained requests
// 60      : Manager bucket (0 or 1)
// 59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");

static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
    return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline uint8_t
_wq_bucket(thread_qos_t qos)
{
    // Map both BG and MT to the same bucket by over-shifting down and
    // clamping MT and BG together.
    switch (qos) {
    case THREAD_QOS_MAINTENANCE:
        return 0;
    default:
        return qos - 2;
    }
}

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))
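/*
 * Illustrative example (not in the original source), assuming the non-LP64
 * layout above (WQ_THACTIVE_BUCKET_WIDTH == 10): wq_thactive packs one
 * 10-bit active-thread counter per QoS bucket, so bumping the count for a
 * QoS is adding 1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH), and a
 * bucket's count is recovered with
 * (value >> (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH)) & WQ_THACTIVE_BUCKET_MASK.
 * The best pending constrained-request QoS lives in the top bits and is
 * read with WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS().
 */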
static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
    // Avoid expensive atomic operations: the three bits we're loading are in
    // a single byte, and always updated under the workqueue lock
    wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
    return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
    thread_qos_t old_qos, new_qos;
    workq_threadreq_t req;

    req = priority_queue_max(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry);
    new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
    old_qos = _wq_thactive_best_constrained_req_qos(wq);
    if (old_qos != new_qos) {
        long delta = (long)new_qos - (long)old_qos;
        wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
        /*
         * We can do an atomic add relative to the initial load because updates
         * to this qos are always serialized under the workqueue lock.
         */
        v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
        WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
            (uint64_t)(v >> 64), 0, 0);
#else
        WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
#endif
    }
}

static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
    return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
    return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
    return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
    wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
        _wq_thactive_offset_for_qos(old_qos);
    os_atomic_add(&wq->wq_thactive, v, relaxed);
    wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
    wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}
static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
    uint32_t count = 0, active;
    uint64_t curtime;

    assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

    if (busycount) {
        curtime = mach_absolute_time();
        *busycount = 0;
    }
    if (max_busycount) {
        *max_busycount = THREAD_QOS_LAST - qos;
    }

    int i = _wq_bucket(qos);
    v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
    for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
        active = v & WQ_THACTIVE_BUCKET_MASK;
        count += active;

        if (busycount && wq->wq_thscheduled_count[i] > active) {
            if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
                /*
                 * We only consider the last blocked thread for a given bucket
                 * as busy because we don't want to take the list lock in each
                 * sched callback. However this is an approximation that could
                 * contribute to thread creation storms.
                 */
                (*busycount)++;
            }
        }
    }

    return count;
}
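/*
 * Worked example (added for clarity, not in the original source): for a
 * constrained request at, say, THREAD_QOS_USER_INITIATED, the loop above
 * sums the active counts of that bucket and every higher bucket.  When a
 * bucket has more scheduled threads than currently-active ones and its
 * last-blocked timestamp is recent (workq_thread_is_busy()), it adds one
 * to *busycount, so admission checks can treat a thread that just blocked
 * as if it were still on core.
 */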
#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
    return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
    return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    return !wq || _wq_exiting(wq);
}
#pragma mark workqueue lock

static bool
workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
{
    return kdp_lck_spin_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
    lck_spin_lock_grp(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_lock_held(__assert_only struct workqueue *wq)
{
    LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
    return lck_spin_try_lock_grp(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
    lck_spin_unlock(&wq->wq_lock);
}
#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	(struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
    return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
    return MAX(workq_pri_bucket(req), req.qos_bucket);
}

static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
    workq_threadreq_param_t cur_trp, req_trp = { };

    cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
    if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
        req_trp = kqueue_threadreq_workloop_param(req);
    }

    /*
     * CPU percent flags are handled separately to policy changes, so ignore
     * them for all of these checks.
     */
    uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
    uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

    if (!req_flags && !cur_flags) {
        return false;
    }

    if (req_flags != cur_flags) {
        return true;
    }

    if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
        return true;
    }

    if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
        return true;
    }

    return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
    if (workq_thread_needs_params_change(req, uth)) {
        return true;
    }

    return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
}
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
    thread_qos_t old_bucket = old_pri.qos_bucket;
    thread_qos_t new_bucket = workq_pri_bucket(new_pri);

    if (old_bucket != new_bucket) {
        _wq_thactive_move(wq, old_bucket, new_bucket);
    }

    new_pri.qos_bucket = new_bucket;
    uth->uu_workq_pri = new_pri;

    if (workq_pri_override(old_pri) != new_bucket) {
        thread_set_workq_override(uth->uu_thread, new_bucket);
    }

    if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
        int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
        if (old_bucket > new_bucket) {
            /*
             * When lowering our bucket, we may unblock a thread request,
             * but we can't drop our priority before we have evaluated
             * whether this is the case, and if we ever drop the workqueue lock
             * that would cause a priority inversion.
             *
             * We hence have to disallow thread creation in that case.
             */
            flags = 0;
        }
        workq_schedule_creator(p, wq, flags);
    }
}
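/*
 * Illustrative note (not in the original source): for example, when an
 * override drops a thread from the UI bucket down to UT while requests are
 * pending, the counts move buckets atomically via _wq_thactive_move(), and
 * the creator is redriven with thread creation disallowed so the evaluation
 * happens before this thread's priority actually drops.
 */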
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
    assert(uth == current_uthread());
    workq_threadreq_param_t trp = { };

    if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
        trp = kqueue_threadreq_workloop_param(req);
    }

    if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
        /*
         * Going through disable when we have an existing CPU percent limit
         * set will force the ledger to refill the token bucket of the current
         * thread. Removing any penalty applied by previous thread use.
         */
        thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
        uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
    }

    if (trp.trp_flags & TRP_CPUPERCENT) {
        thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
            (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
        uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
    }
}
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
    thread_t th = uth->uu_thread;
    thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
    workq_threadreq_param_t trp = { };
    int priority = 31;
    int policy = POLICY_TIMESHARE;

    if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
        trp = kqueue_threadreq_workloop_param(req);
    }

    uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
    uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

    if (unpark) {
        uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
        // qos sent out to userspace (may differ from uu_workq_pri on param threads)
        uth->uu_save.uus_workq_park_data.qos = qos;
    }

    if (qos == WORKQ_THREAD_QOS_MANAGER) {
        uint32_t mgr_pri = wq->wq_event_manager_priority;
        assert(trp.trp_value == 0); // manager qos and thread policy don't mix

        if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
            mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
            thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
                POLICY_TIMESHARE);
            return;
        }

        qos = _pthread_priority_thread_qos(mgr_pri);
    } else {
        if (trp.trp_flags & TRP_PRIORITY) {
            qos = THREAD_QOS_UNSPECIFIED;
            priority = trp.trp_pri;
            uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
        }

        if (trp.trp_flags & TRP_POLICY) {
            policy = trp.trp_pol;
        }
    }

    thread_set_workq_pri(th, qos, priority, policy);
}
/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
    struct uu_workq_policy old_pri, new_pri;
    struct uthread *uth = current_uthread();
    struct workqueue *wq = proc_get_wqptr_fast(p);
    thread_qos_t qos = kqr->tr_kq_qos_index;

    if (uth->uu_workq_pri.qos_max == qos) {
        return;
    }

    workq_lock_spin(wq);
    old_pri = new_pri = uth->uu_workq_pri;
    new_pri.qos_max = qos;
    workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    workq_unlock(wq);
}
#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
    struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

    if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
        uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
        if (uth) {
            assert(uth->uu_save.uus_workq_park_data.has_stack);
        }
    }
    return uth;
}
static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
    uint64_t delay = wq_reduce_pool_window.abstime;
    uint16_t idle = wq->wq_thidlecount;

    /*
     * If we have less than wq_death_max_load threads, have a 5s timer.
     *
     * For the next wq_max_constrained_threads ones, decay linearly from
     * from 5s to 50ms.
     */
    if (idle <= wq_death_max_load) {
        return delay;
    }

    if (wq_max_constrained_threads > idle - wq_death_max_load) {
        delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
    }
    return delay / wq_max_constrained_threads;
}
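/*
 * Worked example (added for clarity, not in the original source; the
 * numbers below are assumed, not actual defaults): with a 5s reduce-pool
 * window, wq_death_max_load == 5 and wq_max_constrained_threads == 64, an
 * idle count of 5 or fewer keeps the full 5s delay; at idle == 37 the
 * delay is 5s * (64 - 32) / 64 = 2.5s; and once idle - wq_death_max_load
 * reaches 64 the delay bottoms out at 5s / 64, roughly 78ms.
 */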
static bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
    uint64_t delay = workq_kill_delay_for_idle_thread(wq);
    return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}
static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
    uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

    if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
        return;
    }
    os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);

    /*
     * <rdar://problem/13139182> Due to how long term timers work, the leeway
     * can't be too short, so use 500ms which is long enough that we will not
     * wake up the CPU for killing threads, but short enough that it doesn't
     * fall into long-term timer list shenanigans.
     */
    thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
        wq_reduce_pool_window.abstime / 10,
        THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
    struct uthread *uth;

    assert(wq->wq_thdying_count >= decrement);
    if ((wq->wq_thdying_count -= decrement) > 0) {
        return;
    }

    if (wq->wq_thidlecount <= 1) {
        return;
    }

    if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
        return;
    }

    uint64_t now = mach_absolute_time();
    uint64_t delay = workq_kill_delay_for_idle_thread(wq);

    if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
            wq, wq->wq_thidlecount, 0, 0, 0);
        wq->wq_thdying_count++;
        uth->uu_workq_flags |= UT_WORKQ_DYING;
        if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
            workq_thread_wakeup(uth);
        }
        return;
    }

    workq_death_call_schedule(wq,
        uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);

    workq_lock_spin(wq);
    TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
    if (uth->uu_workq_flags & UT_WORKQ_DYING) {
        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
            wq, wq->wq_thidlecount, 0, 0, 0);
        workq_death_policy_evaluate(wq, 1);
    }
    if (wq->wq_nthreads-- == wq_max_threads) {
        /*
         * We got under the thread limit again, which may have prevented
         * thread creation from happening, redrive if there are pending requests
         */
        if (wq->wq_reqcount) {
            workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
        }
    }
    workq_unlock(wq);

    thread_deallocate(uth->uu_thread);
}
static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
    struct workqueue *wq = param0;

    workq_lock_spin(wq);
    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
    os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
    workq_death_policy_evaluate(wq, 0);
    WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
    workq_unlock(wq);
}
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint8_t uu_flags,
    bool *needs_wakeup)
{
    struct uthread *uth;

    if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
        TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
    } else {
        uth = TAILQ_FIRST(&wq->wq_thnewlist);
        TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
    }
    TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

    assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
    uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
    if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
        wq->wq_constrained_threads_scheduled++;
    }
    wq->wq_threads_scheduled++;
    wq->wq_thidlecount--;

    if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
        uth->uu_workq_flags ^= UT_WORKQ_DYING;
        workq_death_policy_evaluate(wq, 1);
        *needs_wakeup = false;
    } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
        *needs_wakeup = false;
    } else {
        *needs_wakeup = true;
    }
    return uth;
}
/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
    struct uthread *uth = get_bsdthread_info(th);

    uth->uu_workq_flags = UT_WORKQ_NEW;
    uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
    uth->uu_workq_thport = MACH_PORT_NULL;
    uth->uu_workq_stackaddr = 0;
    uth->uu_workq_pthread_kill_allowed = 0;

    thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
    thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

    workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
    return workq_parked_wait_event(uth);
}
/*
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
    mach_vm_offset_t th_stackaddr;
    kern_return_t kret;
    thread_t th;

    wq->wq_nthreads++;

    workq_unlock(wq);

    vm_map_t vmap = get_task_map(p->task);

    kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
    if (kret != KERN_SUCCESS) {
        WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
            kret, 1, 0, 0);
        goto out;
    }

    kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
    if (kret != KERN_SUCCESS) {
        WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
            kret, 0, 0, 0);
        pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
        goto out;
    }

    // thread_create_workq_waiting() will return with the wq lock held
    // on success, because it calls workq_thread_init_and_wq_lock() above

    struct uthread *uth = get_bsdthread_info(th);

    wq->wq_thidlecount++;
    uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
    TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

    WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
    return true;

out:
    workq_lock_spin(wq);
    /*
     * Do not redrive here if we went under wq_max_threads again,
     * it is the responsibility of the callers of this function
     * to do so when it fails.
     */
    wq->wq_nthreads--;
    return false;
}
#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
    thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
    bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

    if (qos > WORKQ_THREAD_QOS_CLEANUP) {
        workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
        qos = WORKQ_THREAD_QOS_CLEANUP;
    }

    workq_thread_reset_cpupercent(NULL, uth);

    if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
        wq->wq_thidlecount--;
        if (first_use) {
            TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
        } else {
            TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
        }
    }
    TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

    workq_unlock(wq);

    if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
        __assert_only kern_return_t kr;
        kr = thread_set_voucher_name(MACH_PORT_NULL);
        assert(kr == KERN_SUCCESS);
    }

    uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
    thread_t th = uth->uu_thread;
    vm_map_t vmap = get_task_map(p->task);

    if (!first_use) {
        flags |= WQ_FLAG_THREAD_REUSE;
    }

    pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
        uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
    __builtin_unreachable();
}
static bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
    return wq->wq_turnstile_updater == current_thread();
}
__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
    workq_lock_held(wq);
    wq->wq_turnstile_updater = current_thread();
    operation();
    wq->wq_turnstile_updater = THREAD_NULL;
}
static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
    if (wq->wq_inheritor == inheritor) {
        return;
    }
    wq->wq_inheritor = inheritor;
    workq_perform_turnstile_operation_locked(wq, ^{
        turnstile_update_inheritor(wq->wq_turnstile, inheritor,
            flags | TURNSTILE_IMMEDIATE_UPDATE);
        turnstile_update_inheritor_complete(wq->wq_turnstile,
            TURNSTILE_INTERLOCK_HELD);
    });
}
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
    uint64_t now = mach_absolute_time();
    bool is_creator = (uth == wq->wq_creator);

    if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
        wq->wq_constrained_threads_scheduled--;
    }
    uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT);
    TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
    wq->wq_threads_scheduled--;

    if (is_creator) {
        wq->wq_creator = NULL;
        WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
            uth->uu_save.uus_workq_park_data.yields, 0);
    }

    if (wq->wq_inheritor == uth->uu_thread) {
        assert(wq->wq_creator == NULL);
        if (wq->wq_reqcount) {
            workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
        } else {
            workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
        }
    }

    if (uth->uu_workq_flags & UT_WORKQ_NEW) {
        assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
        TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
        wq->wq_thidlecount++;
        return;
    }

    if (!is_creator) {
        _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
        wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
        uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
    }

    uth->uu_save.uus_workq_park_data.idle_stamp = now;

    struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
    uint16_t cur_idle = wq->wq_thidlecount;

    if (cur_idle >= wq_max_constrained_threads ||
        (wq->wq_thdying_count == 0 && oldest &&
        workq_should_kill_idle_thread(wq, oldest, now))) {
        /*
         * Immediately kill threads if we have too many of them.
         *
         * And swap "place" with the oldest one we'd have woken up.
         * This is a relatively desperate situation where we really
         * need to kill threads quickly and it's best to kill
         * the one that's currently on core than context switching.
         */
        if (oldest) {
            oldest->uu_save.uus_workq_park_data.idle_stamp = now;
            TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
            TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
        }

        WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
            wq, cur_idle, 0, 0, 0);
        wq->wq_thdying_count++;
        uth->uu_workq_flags |= UT_WORKQ_DYING;
        uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
        workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
        __builtin_unreachable();
    }

    struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

    cur_idle += 1;
    wq->wq_thidlecount = cur_idle;

    if (cur_idle >= wq_death_max_load && tail &&
        tail->uu_save.uus_workq_park_data.has_stack) {
        uth->uu_save.uus_workq_park_data.has_stack = false;
        TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
    } else {
        uth->uu_save.uus_workq_park_data.has_stack = true;
        TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
    }

    if (!tail) {
        uint64_t delay = workq_kill_delay_for_idle_thread(wq);
        workq_death_call_schedule(wq, now + delay);
    }
}
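/*
 * Illustrative note (not in the original source): parked threads that keep
 * their stack are pushed at the head of wq_thidlelist and stackless ones at
 * the tail, so workq_pop_idle_thread() tends to reuse a warm thread first
 * while workq_oldest_killable_idle_thread() looks for victims from the tail.
 */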
#pragma mark thread requests

static inline int
workq_priority_for_req(workq_threadreq_t req)
{
    thread_qos_t qos = req->tr_qos;

    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
        assert(trp.trp_flags & TRP_PRIORITY);
        return trp.trp_pri;
    }
    return thread_workq_pri_for_qos(qos);
}
static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        return &wq->wq_special_queue;
    } else if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
        return &wq->wq_overcommit_queue;
    } else {
        return &wq->wq_constrained_queue;
    }
}
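/*
 * Illustrative summary (not in the original source): workloop requests
 * carrying a raw scheduler priority go to wq_special_queue, overcommit
 * requests to wq_overcommit_queue, and everything else competes in
 * wq_constrained_queue, whose maximum feeds the "best constrained QoS"
 * bits maintained in wq_thactive.
 */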
/*
 * returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
    assert(req->tr_state == WORKQ_TR_STATE_NEW);

    req->tr_state = WORKQ_TR_STATE_QUEUED;
    wq->wq_reqcount += req->tr_count;

    if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
        assert(wq->wq_event_manager_threadreq == NULL);
        assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
        assert(req->tr_count == 1);
        wq->wq_event_manager_threadreq = req;
        return true;
    }

    struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);
    priority_queue_entry_set_sched_pri(q, &req->tr_entry,
        workq_priority_for_req(req), false);

    if (priority_queue_insert(q, &req->tr_entry)) {
        if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
            _wq_thactive_refresh_best_constrained_req_qos(wq);
        }
        return true;
    }
    return false;
}
/*
 * returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
{
    wq->wq_reqcount--;

    if (--req->tr_count == 0) {
        if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
            assert(wq->wq_event_manager_threadreq == req);
            assert(req->tr_count == 0);
            wq->wq_event_manager_threadreq = NULL;
            return true;
        }
        if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
            &req->tr_entry)) {
            if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
                _wq_thactive_refresh_best_constrained_req_qos(wq);
            }
            return true;
        }
    }
    return false;
}
static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
    req->tr_state = WORKQ_TR_STATE_CANCELED;
    if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
        kqueue_threadreq_cancel(p, req);
    } else {
        zfree(workq_zone_threadreq, req);
    }
}
#pragma mark workqueue thread creation thread calls

static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
    uint32_t old_flags, new_flags;

    os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
        if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
            os_atomic_rmw_loop_give_up(return false);
        }
        if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
            new_flags = old_flags | pend;
        } else {
            new_flags = old_flags | sched;
        }
    });

    return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
    assert(!preemption_enabled());

    if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
        WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
        WQ_IMMEDIATE_CALL_SCHEDULED)) {
        return false;
    }

    uint64_t now = mach_absolute_time();

    if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
        /* do not change the window */
    } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
        wq->wq_timer_interval *= 2;
        if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
            wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
        }
    } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
        wq->wq_timer_interval /= 2;
        if (wq->wq_timer_interval < wq_stalled_window.abstime) {
            wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
        }
    }

    WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
        _wq_flags(wq), wq->wq_timer_interval, 0);

    thread_call_t call = wq->wq_delayed_call;
    uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
    uint64_t deadline = now + wq->wq_timer_interval;
    if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
        panic("delayed_call was already enqueued");
    }
    return true;
}
static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
    assert(!preemption_enabled());

    if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
        WQ_IMMEDIATE_CALL_PENDED, 0)) {
        WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
            _wq_flags(wq), 0, 0);

        uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
        if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
            panic("immediate_call was already enqueued");
        }
    }
}
void
workq_proc_suspended(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);

    if (wq) {
        os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
    }
}
void
workq_proc_resumed(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t wq_flags;

    if (!wq) {
        return;
    }

    wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
        WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
    if ((wq_flags & WQ_EXITING) == 0) {
        disable_preemption();
        if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
            workq_schedule_immediate_thread_creation(wq);
        } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
            workq_schedule_delayed_thread_creation(wq,
                WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
        }
        enable_preemption();
    }
}
/*
 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
 */
static bool
workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
{
    uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
    if (now <= lastblocked_ts) {
        /*
         * Because the update of the timestamp when a thread blocks
         * isn't serialized against us looking at it (i.e. we don't hold
         * the workq lock), it's possible to have a timestamp that matches
         * the current time or that even looks to be in the future relative
         * to when we grabbed the current time...
         *
         * Just treat this as a busy thread since it must have just blocked.
         */
        return true;
    }
    return (now - lastblocked_ts) < wq_stalled_window.abstime;
}
static void
workq_add_new_threads_call(void *_p, void *flags)
{
    proc_t p = _p;
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t my_flag = (uint32_t)(uintptr_t)flags;

    /*
     * workq_exit() will set the workqueue to NULL before
     * it cancels thread calls.
     */
    if (!wq) {
        return;
    }

    assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
        (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

    WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
        wq->wq_nthreads, wq->wq_thidlecount, 0);

    workq_lock_spin(wq);

    wq->wq_thread_call_last_run = mach_absolute_time();
    os_atomic_andnot(&wq->wq_flags, my_flag, release);

    /* This can drop the workqueue lock, and take it again */
    workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

    workq_unlock(wq);

    WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
        wq->wq_nthreads, wq->wq_thidlecount, 0);
}
#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
    struct uthread *uth = get_bsdthread_info(thread);
    proc_t proc = get_bsdtask_info(get_threadtask(thread));
    struct workqueue *wq = proc_get_wqptr(proc);
    thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
    wq_thactive_t old_thactive;
    bool start_timer = false;

    if (qos == WORKQ_THREAD_QOS_MANAGER) {
        return;
    }

    switch (type) {
    case SCHED_CALL_BLOCK:
        old_thactive = _wq_thactive_dec(wq, qos);
        req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

        /*
         * Remember the timestamp of the last thread that blocked in this
         * bucket, it is used by admission checks to ignore one thread
         * being inactive if this timestamp is recent enough.
         *
         * If we collide with another thread trying to update the
         * last_blocked (really unlikely since another thread would have to
         * get scheduled and then block after we start down this path), it's
         * not a problem. Either timestamp is adequate, so no need to retry
         */
        os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
            thread_last_run_time(thread), relaxed);

        if (req_qos == THREAD_QOS_UNSPECIFIED) {
            /*
             * No pending request at the moment we could unblock, move on.
             */
        } else if (qos < req_qos) {
            /*
             * The blocking thread is at a lower QoS than the highest currently
             * pending constrained request, nothing has to be redriven
             */
        } else {
            uint32_t max_busycount, old_req_count;
            old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
                req_qos, NULL, &max_busycount);
            /*
             * If it is possible that may_start_constrained_thread had refused
             * admission due to being over the max concurrency, we may need to
             * spin up a new thread.
             *
             * We take into account the maximum number of busy threads
             * that can affect may_start_constrained_thread as looking at the
             * actual number may_start_constrained_thread will see is racy.
             *
             * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
             * between NCPU (4) and NCPU - 2 (2) we need to redrive.
             */
            uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
            if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
                start_timer = workq_schedule_delayed_thread_creation(wq, 0);
            }
        }
        if (__improbable(kdebug_enable)) {
            __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
                old_thactive, qos, NULL, NULL);
            WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
                old - 1, qos | (req_qos << 8),
                wq->wq_reqcount << 1 | start_timer, 0);
        }
        break;

    case SCHED_CALL_UNBLOCK:
        /*
         * we cannot take the workqueue_lock here...
         * an UNBLOCK can occur from a timer event which
         * is run from an interrupt context... if the workqueue_lock
         * is already held by this processor, we'll deadlock...
         * the thread lock for the thread being UNBLOCKED
         * is also held
         */
        old_thactive = _wq_thactive_inc(wq, qos);
        if (__improbable(kdebug_enable)) {
            __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
                old_thactive, qos, NULL, NULL);
            req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
            WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
                old + 1, qos | (req_qos << 8),
                wq->wq_threads_scheduled, 0);
        }
        break;
    }
}
#pragma mark workq lifecycle

void
workq_reference(struct workqueue *wq)
{
    os_ref_retain(&wq->wq_refcnt);
}
static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
    struct workqueue *wq;
    struct turnstile *ts;

    wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
    assert(dq == &workq_deallocate_queue);

    turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
    assert(ts);
    turnstile_cleanup();
    turnstile_deallocate(ts);

    lck_spin_destroy(&wq->wq_lock, &workq_lck_grp);
    zfree(workq_zone_workqueue, wq);
}
static void
workq_deallocate(struct workqueue *wq)
{
    if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
        workq_deallocate_queue_invoke(&wq->wq_destroy_link,
            &workq_deallocate_queue);
    }
}
void
workq_deallocate_safe(struct workqueue *wq)
{
    if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
        mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
            MPSC_QUEUE_DISABLE_PREEMPTION);
    }
}
/*
 * Setup per-process state for the workqueue.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
    struct workqueue *wq;
    int error = 0;

    if ((p->p_lflag & P_LREGISTER) == 0) {
        return EINVAL;
    }

    if (wq_init_constrained_limit) {
        uint32_t limit, num_cpus = ml_wait_max_cpus();

        /*
         * set up the limit for the constrained pool
         * this is a virtual pool in that we don't
         * maintain it on a separate idle and run list
         */
        limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

        if (limit > wq_max_constrained_threads) {
            wq_max_constrained_threads = limit;
        }

        if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
            wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
        }
        if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
            wq_max_threads = CONFIG_THREAD_MAX - 20;
        }

        wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

        for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
            wq_max_parallelism[_wq_bucket(qos)] =
                qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
        }

        wq_init_constrained_limit = 0;
    }

    if (proc_get_wqptr(p) == NULL) {
        if (proc_init_wqptr_or_wait(p) == FALSE) {
            assert(proc_get_wqptr(p) != NULL);
            goto out;
        }

        wq = (struct workqueue *)zalloc(workq_zone_workqueue);
        bzero(wq, sizeof(struct workqueue));

        os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

        // Start the event manager at the priority hinted at by the policy engine
        thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
        pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
        wq->wq_event_manager_priority = (uint32_t)pp;
        wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;

        turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
            TURNSTILE_WORKQS);

        TAILQ_INIT(&wq->wq_thrunlist);
        TAILQ_INIT(&wq->wq_thnewlist);
        TAILQ_INIT(&wq->wq_thidlelist);
        priority_queue_init(&wq->wq_overcommit_queue);
        priority_queue_init(&wq->wq_constrained_queue);
        priority_queue_init(&wq->wq_special_queue);

        wq->wq_delayed_call = thread_call_allocate_with_options(
            workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
            THREAD_CALL_OPTIONS_ONCE);
        wq->wq_immediate_call = thread_call_allocate_with_options(
            workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
            THREAD_CALL_OPTIONS_ONCE);
        wq->wq_death_call = thread_call_allocate_with_options(
            workq_kill_old_threads_call, wq,
            THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

        lck_spin_init(&wq->wq_lock, &workq_lck_grp, LCK_ATTR_NULL);

        WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
            VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
        proc_set_wqptr(p, wq);
    }

out:
    return error;
}
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t wq_flags;
    workq_threadreq_t mgr_req;

    if (!wq) {
        return;
    }

    WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

    workq_lock_spin(wq);

    wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
    if (__improbable(wq_flags & WQ_EXITING)) {
        panic("workq_mark_exiting called twice");
    }

    /*
     * Opportunistically try to cancel thread calls that are likely in flight.
     * workq_exit() will do the proper cleanup.
     */
    if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_immediate_call);
    }
    if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_delayed_call);
    }
    if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
        thread_call_cancel(wq->wq_death_call);
    }

    mgr_req = wq->wq_event_manager_threadreq;
    wq->wq_event_manager_threadreq = NULL;
    wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
    wq->wq_creator = NULL;
    workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

    workq_unlock(wq);

    if (mgr_req) {
        kqueue_threadreq_cancel(p, mgr_req);
    }
    /*
     * No one touches the priority queues once WQ_EXITING is set.
     * It is hence safe to do the tear down without holding any lock.
     */
    priority_queue_destroy(&wq->wq_overcommit_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });
    priority_queue_destroy(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });
    priority_queue_destroy(&wq->wq_special_queue,
        struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
        workq_threadreq_destroy(p, e);
    });

    WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
}
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
    struct workqueue *wq;
    struct uthread *uth, *tmp;

    wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
    if (wq != NULL) {
        thread_t th = current_thread();

        WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

        if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
            /*
             * <rdar://problem/40111515> Make sure we will no longer call the
             * sched call, if we ever block this thread, which the cancel_wait
             * below can do.
             */
            thread_sched_call(th, NULL);
        }

        /*
         * Thread calls are always scheduled by the proc itself or under the
         * workqueue spinlock if WQ_EXITING is not yet set.
         *
         * Either way, when this runs, the proc has no threads left beside
         * the one running this very code, so we know no thread call can be
         * dispatched anymore.
         */
        thread_call_cancel_wait(wq->wq_delayed_call);
        thread_call_cancel_wait(wq->wq_immediate_call);
        thread_call_cancel_wait(wq->wq_death_call);
        thread_call_free(wq->wq_delayed_call);
        thread_call_free(wq->wq_immediate_call);
        thread_call_free(wq->wq_death_call);

        /*
         * Clean up workqueue data structures for threads that exited and
         * didn't get a chance to clean up after themselves.
         *
         * idle/new threads should have been interrupted and died on their own
         */
        TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
            thread_sched_call(uth->uu_thread, NULL);
            thread_deallocate(uth->uu_thread);
        }
        assert(TAILQ_EMPTY(&wq->wq_thnewlist));
        assert(TAILQ_EMPTY(&wq->wq_thidlelist));

        WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
            VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);

        workq_deallocate(wq);

        WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
    }
}
#pragma mark bsd thread control

static bool
_pthread_priority_to_policy(pthread_priority_t priority,
    thread_qos_policy_data_t *data)
{
    data->qos_tier = _pthread_priority_thread_qos(priority);
    data->tier_importance = _pthread_priority_relpri(priority);
    if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
        data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
        return false;
    }
    return true;
}
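/*
 * Illustrative note (not in the original source): a pthread_priority_t
 * encodes a QoS class plus a relative priority that must be zero or
 * negative (down to THREAD_QOS_MIN_TIER_IMPORTANCE); e.g. a
 * "user-initiated, relpri -2" priority becomes
 * { .qos_tier = THREAD_QOS_USER_INITIATED, .tier_importance = -2 },
 * while an unspecified QoS or a positive relpri is rejected above.
 */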
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
    struct uthread *uth = get_bsdthread_info(th);
    struct workqueue *wq = proc_get_wqptr(p);

    kern_return_t kr;
    int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
    bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

    if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
        if (!is_wq_thread) {
            unbind_rv = EINVAL;
            goto qos;
        }

        if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
            unbind_rv = EINVAL;
            goto qos;
        }

        workq_threadreq_t kqr = uth->uu_kqr_bound;
        if (kqr == NULL) {
            unbind_rv = EALREADY;
            goto qos;
        }

        if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
            unbind_rv = EINVAL;
            goto qos;
        }

        kqueue_threadreq_unbind(p, kqr);
    }

qos:
    if (flags & WORKQ_SET_SELF_QOS_FLAG) {
        thread_qos_policy_data_t new_policy;

        if (!_pthread_priority_to_policy(priority, &new_policy)) {
            qos_rv = EINVAL;
            goto voucher;
        }

        if (!is_wq_thread) {
            /*
             * Threads opted out of QoS can't change QoS
             */
            if (!thread_has_qos_policy(th)) {
                qos_rv = EPERM;
                goto voucher;
            }
        } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
            uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
            /*
             * Workqueue manager threads or threads above UI can't change QoS
             */
            qos_rv = EINVAL;
            goto voucher;
        } else {
            /*
             * For workqueue threads, possibly adjust buckets and redrive thread
             * requests.
             */
            bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
            bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
            struct uu_workq_policy old_pri, new_pri;
            bool force_run = false;

            workq_lock_spin(wq);

            if (old_overcommit != new_overcommit) {
                uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
                if (old_overcommit) {
                    wq->wq_constrained_threads_scheduled++;
                } else if (wq->wq_constrained_threads_scheduled-- ==
                    wq_max_constrained_threads) {
                    force_run = true;
                }
            }

            old_pri = new_pri = uth->uu_workq_pri;
            new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;
            workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
            workq_unlock(wq);
        }

        kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
            (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            qos_rv = EINVAL;
        }
    }

voucher:
    if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
        kr = thread_set_voucher_name(voucher);
        if (kr != KERN_SUCCESS) {
            voucher_rv = ENOENT;
            goto fixedpri;
        }
    }

fixedpri:
    if (qos_rv) {
        goto done;
    }
    if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
        thread_extended_policy_data_t extpol = {.timeshare = 0};

        if (is_wq_thread) {
            /* Not allowed on workqueue threads */
            fixedpri_rv = ENOTSUP;
            goto done;
        }

        kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
            (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            fixedpri_rv = EINVAL;
            goto done;
        }
    } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
        thread_extended_policy_data_t extpol = {.timeshare = 1};

        if (is_wq_thread) {
            /* Not allowed on workqueue threads */
            fixedpri_rv = ENOTSUP;
            goto done;
        }

        kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
            (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
        if (kr != KERN_SUCCESS) {
            fixedpri_rv = EINVAL;
            goto done;
        }
    }

done:
    if (qos_rv && voucher_rv) {
        /* Both failed, give that a unique error. */
        return EBADMSG;
    }

    if (unbind_rv) {
        return unbind_rv;
    }

    if (qos_rv) {
        return qos_rv;
    }

    if (voucher_rv) {
        return voucher_rv;
    }

    if (fixedpri_rv) {
        return fixedpri_rv;
    }

    return 0;
}
static int
bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t resource)
{
    thread_qos_t qos = _pthread_priority_thread_qos(pp);
    if (qos == THREAD_QOS_UNSPECIFIED) {
        return EINVAL;
    }

    thread_t th = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (th == THREAD_NULL) {
        return ESRCH;
    }

    int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
        resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

    thread_deallocate(th);
    return rv;
}
static int
bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
    user_addr_t resource)
{
    thread_t th = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (th == THREAD_NULL) {
        return ESRCH;
    }

    int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
        THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

    thread_deallocate(th);
    return rv;
}
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
    struct uu_workq_policy old_pri, new_pri;
    struct workqueue *wq = proc_get_wqptr(p);

    thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
    if (qos_override == THREAD_QOS_UNSPECIFIED) {
        return EINVAL;
    }

    thread_t thread = port_name_to_thread(kport,
        PORT_TO_THREAD_IN_CURRENT_TASK);
    if (thread == THREAD_NULL) {
        return ESRCH;
    }

    struct uthread *uth = get_bsdthread_info(thread);
    if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
        thread_deallocate(thread);
        return EPERM;
    }

    WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
        wq, thread_tid(thread), 1, pp, 0);

    thread_mtx_lock(thread);

    if (ulock_addr) {
        uint32_t val;
        int rc;
        /*
         * Workaround lack of explicit support for 'no-fault copyin'
         * <rdar://problem/24999882>, as disabling preemption prevents paging in
         * userspace.
         */
        disable_preemption();
        rc = copyin_atomic32(ulock_addr, &val);
        enable_preemption();
        if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
            goto out;
        }
    }

    workq_lock_spin(wq);

    old_pri = uth->uu_workq_pri;
    if (old_pri.qos_override >= qos_override) {
        /* Nothing to do. */
    } else if (thread == current_thread()) {
        new_pri = old_pri;
        new_pri.qos_override = qos_override;
        workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    } else {
        uth->uu_workq_pri.qos_override = qos_override;
        if (qos_override > workq_pri_override(old_pri)) {
            thread_set_workq_override(thread, qos_override);
        }
    }

    workq_unlock(wq);

out:
    thread_mtx_unlock(thread);
    thread_deallocate(thread);
    return 0;
}
static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
    struct uu_workq_policy old_pri, new_pri;
    struct workqueue *wq = proc_get_wqptr(p);
    struct uthread *uth = get_bsdthread_info(thread);

    if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
        return EPERM;
    }

    WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);

    workq_lock_spin(wq);
    old_pri = new_pri = uth->uu_workq_pri;
    new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
    workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
    workq_unlock(wq);
    return 0;
}
static int
workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
{
    if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
        // If the thread isn't a workqueue thread, don't set the
        // kill_allowed bit; however, we still need to return 0
        // instead of an error code since this code is executed
        // on the abort path which needs to not depend on the
        // pthread_t (returning an error depends on pthread_t via
        // cerror_nocancel)
        return 0;
    }
    struct uthread *uth = get_bsdthread_info(thread);
    uth->uu_workq_pthread_kill_allowed = enable;
    return 0;
}
static int
bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
    int *retval)
{
    static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
        _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
    static_assert(QOS_PARALLELISM_REALTIME ==
        _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");

    if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
        return EINVAL;
    }

    if (flags & QOS_PARALLELISM_REALTIME) {
        if (qos) {
            return EINVAL;
        }
    } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
        return EINVAL;
    }

    *retval = qos_max_parallelism(qos, flags);
    return 0;
}
2056 #define ENSURE_UNUSED(arg) \
2057 ({ if ((arg) != 0) { return EINVAL; } })
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		return workq_thread_reset_dispatch_override(p, current_thread());

	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);

	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
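/*
 * Sketch of the control surface above, based only on the cases visible in
 * this switch: each BSDTHREAD_CTL_* command packs its operands into
 * uap->arg1..arg3 (port names, pthread priorities, user addresses, or flags),
 * and commands that take fewer arguments enforce that the unused ones are
 * zero via ENSURE_UNUSED(). How userspace issues the bsdthread_ctl trap is a
 * libpthread implementation detail and is assumed here rather than shown.
 */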
#pragma mark workqueue thread manipulation
static void __dead2
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
	struct kqworkloop *kqwl;
	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
		return kqwl->kqwl_dynamicid;
	}

	return VM_KERNEL_ADDRHIDE(req);
}
#endif
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
	    qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
	    wq, reqcount, pp, 0, 0);

	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_flags = 0;
	req->tr_qos = qos;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
	    wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto exiting;
		}

		/*
		 * When userspace is asking for parallelism, wake up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks are
		 * enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			unpaced = workq_constrained_allowance(wq, qos, NULL, false);
			if (unpaced >= reqcount - 1) {
				unpaced = reqcount - 1;
			}
		} else {
			unpaced = reqcount - 1;
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth;
			bool needs_wakeup;
			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

			if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
				uu_flags |= UT_WORKQ_OVERCOMMIT;
			}

			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
			wq->wq_fulfilled++;

			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			if (needs_wakeup) {
				workq_thread_wakeup(uth);
			}
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
	    workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto exiting;
	}

	req->tr_count = (uint16_t)reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

exiting:
	workq_unlock(wq);
	zfree(workq_zone_threadreq, req);
	return ECANCELED;
}
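/*
 * Worked example of the pacing above (hypothetical numbers): a constrained
 * (non-overcommit) request with reqcount == 4 first computes
 * workq_constrained_allowance(); if the allowance covers at least 3 threads,
 * up to unpaced == 3 idle threads are woken immediately and early-bound,
 * while whatever is left over (tr_count) is enqueued and handled by the
 * creator mechanism, which paces any further thread creation.
 */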
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1, 0);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		assert(uth != wq->wq_creator);
		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We're called from workq_kern_threadreq_initiate()
		 * due to an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), 0, 0, 0);
		wq->wq_fulfilled++;
		kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
	} else {
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				    TURNSTILE_INTERLOCK_HELD);
			});
		}
		if (workq_threadreq_enqueue(wq, req)) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);

	return true;
}
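/*
 * The WORKQ_THREADREQ_ATTEMPT_REBIND path above is the synchronous case: a
 * servicing thread that is unbinding inside workq_thread_return() re-enters
 * here, and if the new request passes the admission check it is bound to that
 * same thread directly, avoiding a park/unpark cycle and an extra context
 * switch. Otherwise the request is enqueued and the creator is scheduled.
 */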
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	bool make_overcommit = false;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
		workq_unlock(wq);
		return;
	}

	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
		make_overcommit = (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0;
	}

	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 0, 0);

	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry)) {
		if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(make_overcommit)) {
		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
		    workq_priority_for_req(req), false);
		priority_queue_insert(pq, &req->tr_entry);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = WORKQ_TR_STATE_NEW;
	if (workq_threadreq_enqueue(wq, req)) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
void
workq_kern_threadreq_lock(struct proc *p)
{
	workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
	workq_unlock(proc_get_wqptr_fast(p));
}
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread,
		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
void
workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	workq_schedule_creator(p, wq, flags);
	workq_unlock(wq);
}
void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
	if (locked) {
		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
	} else {
		workq_schedule_immediate_thread_creation(wq);
	}
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

	if (kqr) {
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}

		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(p->task), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
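/*
 * Example of the upcall_flags computed above for a kevent servicer bound to
 * an overcommit workloop request (hypothetical combination): WQ_FLAG_THREAD_NEWSPI |
 * WQ_FLAG_THREAD_REUSE | WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT |
 * WQ_FLAG_THREAD_OVERCOMMIT | (qos_req | WQ_FLAG_THREAD_PRIO_QOS). These bits
 * are passed to pthread_functions->workq_handle_stack_events() so userspace
 * can re-derive the thread's role and priority on the way back out.
 */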
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}
		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
			    qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
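	/*
	 * Normalization example for the case above (hypothetical values): a QoS
	 * based priority keeps only its QoS class and relative-priority bits
	 * (the flag bits are cleared), while a _PTHREAD_PRIORITY_SCHED_PRI_FLAG
	 * priority keeps the flag plus the raw scheduling priority. The flag is
	 * a high-order bit, so any sched-pri encoding compares numerically
	 * greater than any QoS encoding, which is how the "scheduling priority
	 * wins over QoS" rule falls out of the single unsigned comparison
	 * against wq_event_manager_priority.
	 */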
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	case WQOPS_SETUP_DISPATCH: {
		/*
		 * item = pointer to workq_dispatch_config structure
		 * arg2 = sizeof(item)
		 */
		struct workq_dispatch_config cfg;
		bzero(&cfg, sizeof(cfg));

		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
		if (error) {
			break;
		}

		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
			error = ENOTSUP;
			break;
		}

		/* Load fields from version 1 */
		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

		/* Load fields from version 2 */
		if (cfg.wdc_version >= 2) {
			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
		}

		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
			    get_task_map(p->task), uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list. Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we dropped the lock to unset our voucher, someone came
		 * around and made us runnable. But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual. To correct for that,
		 * we just run the continuation ourselves.
		 */
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
static bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
	/*
	 * There's an event manager request and either:
	 * - no event manager currently running
	 * - we are re-using the event manager
	 */
	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
    struct uthread *uth, bool may_start_timer)
{
	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
	uint32_t count = 0;

	uint32_t max_count = wq->wq_constrained_threads_scheduled;
	if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		/*
		 * don't count the current thread as scheduled
		 */
		assert(max_count > 0);
		max_count--;
	}
	if (max_count >= wq_max_constrained_threads) {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
		    wq->wq_constrained_threads_scheduled,
		    wq_max_constrained_threads, 0);
		/*
		 * we need 1 or more constrained threads to return to the kernel before
		 * we can dispatch additional work
		 */
		return 0;
	}
	max_count = wq_max_constrained_threads - max_count;

	/*
	 * Compute a metric for how many threads are active. We find the
	 * highest priority request outstanding and then add up the number of
	 * active threads in that and all higher-priority buckets. We'll also add
	 * any "busy" threads which are not active but blocked recently enough that
	 * we can't be sure they've gone idle yet. We'll then compare this metric
	 * to our max concurrency to decide whether to add a new thread.
	 */

	uint32_t busycount, thactive_count;

	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
	    at_qos, &busycount, NULL);

	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
	    at_qos <= uth->uu_workq_pri.qos_bucket) {
		/*
		 * Don't count this thread as currently active, but only if it's not
		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
		 * managers.
		 */
		assert(thactive_count > 0);
		thactive_count--;
	}

	count = wq_max_parallelism[_wq_bucket(at_qos)];
	if (count > thactive_count + busycount) {
		count -= thactive_count + busycount;
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
		    thactive_count, busycount, 0);
		return MIN(count, max_count);
	} else {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
		    thactive_count, busycount, 0);
	}

	if (busycount && may_start_timer) {
		/*
		 * If this is called from the add timer, we won't have another timer
		 * fire when the thread exits the "busy" state, so rearm the timer.
		 */
		workq_schedule_delayed_thread_creation(wq, 0);
	}

	return 0;
}
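/*
 * Worked example (hypothetical numbers): with wq_max_parallelism[bucket] == 8,
 * thactive_count == 5 and busycount == 2, count becomes 8 - 7 == 1, so at most
 * one more constrained thread is admitted, further capped by max_count (the
 * remaining headroom under wq_max_constrained_threads). If active + busy
 * already meets or exceeds the parallelism target, the allowance is 0 and the
 * caller either narrows or defers to the delayed thread-creation timer.
 */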
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req)
{
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		return workq_may_start_event_mgr_thread(wq, uth);
	}
	if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
	}
	return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_pri->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_tmp->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}

/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 */
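/*
 * Concretely, workq_schedule_creator() below resolves to one of three
 * outcomes: override the priority of the creator that is already in flight,
 * promote an idle thread to be the new creator (inheriting the turnstile
 * push), or, with no idle threads left, fall back to thread creation (AST
 * redrive, an immediate thread-call, or delayed thread-call pacing).
 */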
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}

	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == uth->uu_thread);
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		/* Always reset the priorities on the newly chosen creator */
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		workq_turnstile_update_inheritor(wq, uth->uu_thread,
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
/*
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the base pri frozen, will unfreeze it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	workq_threadreq_t req = NULL;
	bool is_creator = (wq->wq_creator == uth);
	bool schedule_creator = false;

	if (__improbable(_wq_exiting(wq))) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
		goto park;
	}

	if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
		goto park;
	}

	req = workq_threadreq_select(wq, uth);
	if (__improbable(req == NULL)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
		goto park;
	}

	uint8_t tr_flags = req->tr_flags;
	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

	/*
	 * Attempt to setup ourselves as the new thing to run, moving all priority
	 * pushes to ourselves.
	 *
	 * If the current thread is the creator, then the fact that we are presently
	 * running is proof that we'll do something useful, so keep going.
	 *
	 * For other cases, peek at the AST to know whether the scheduler wants
	 * to preempt us, if yes, park instead, and move the thread request
	 * turnstile back to the workqueue.
	 */
	if (req_ts) {
		workq_perform_turnstile_operation_locked(wq, ^{
			turnstile_update_inheritor(req_ts, uth->uu_thread,
			    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
			turnstile_update_inheritor_complete(req_ts,
			    TURNSTILE_INTERLOCK_HELD);
		});
	}

	if (is_creator) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
		    uth->uu_save.uus_workq_park_data.yields, 0);
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
		_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
	}

	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

	thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863> to turn this back on
	if (__improbable(thread_unfreeze_base_pri(uth->uu_thread) && !is_creator)) {
		if (req_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
				    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(req_ts,
				    TURNSTILE_INTERLOCK_HELD);
			});
		}
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0, 0);
		goto park_thawed;
	}
#endif

	/*
	 * We passed all checks, dequeue the request, bind to it, and set it up
	 * to return to user.
	 */
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
	    workq_trace_req_id(req), 0, 0, 0);
	wq->wq_fulfilled++;
	schedule_creator = workq_threadreq_dequeue(wq, req);

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_prepost(p, req, uth);
		req = NULL;
	} else if (req->tr_count > 0) {
		req = NULL;
	}

	workq_thread_reset_cpupercent(req, uth);
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		uth->uu_workq_flags ^= UT_WORKQ_NEW;
		setup_flags |= WQ_SETUP_FIRST_USE;
	}
	if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
			uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled--;
		}
	} else {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
			uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled++;
		}
	}

	if (is_creator || schedule_creator) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}

	workq_unlock(wq);

	if (req) {
		zfree(workq_zone_threadreq, req);
	}

	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}
	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
	}
	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_commit(p, uth->uu_thread);
	}
	workq_setup_and_run(p, uth, setup_flags);
	__builtin_unreachable();

park:
	thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863>
park_thawed:
#endif
	workq_park_and_unlock(p, wq, uth, setup_flags);
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that. If
 *   it is not NULL, it must be a threadreq object in state NEW. If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held. Will drop it.
 * Should be called with the base pri not frozen.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
		workq_unlock(wq);
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
		(void)req;
		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	thread_freeze_base_pri(uth->uu_thread);
	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
		return false;
	}

	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
	if (wq->wq_fulfilled == snapshot) {
		return false;
	}

	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
	if (wq->wq_fulfilled - snapshot > conc) {
		/* we fulfilled more than NCPU requests since being dispatched */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
		cnt += wq->wq_thscheduled_count[i];
	}
	if (conc <= cnt) {
		/* We fulfilled requests and have more than NCPU scheduled threads */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	return false;
}
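/*
 * Worked example (hypothetical numbers): with conc == 8 for the creator's
 * bucket, a creator dispatched at fulfilled_snapshot == 100 yields once
 * wq_fulfilled passes 108 (more than NCPU requests fulfilled without it), or
 * once at least 8 threads are already scheduled at its QoS bucket or above
 * while some requests were fulfilled. Each yield is counted in
 * uus_workq_park_data.yields by workq_unpark_continue() below.
 */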
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	workq_unpark_for_death_and_unlock(p, wq, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
	__builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
		    WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port() consumes a reference */
		thread_reference(th);
		ipc_port_t port = convert_thread_to_port(th);
		uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
	    proc_get_wqptr_fast(p), 0, 0, 0, 0);
	thread_sched_call(th, workq_sched_callback);
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;
	int activecount;

	if (wq == NULL) {
		return EINVAL;
	}

	/*
	 * This is sometimes called from interrupt context by the kperf sampler.
	 * In that case, it's not safe to spin trying to take the lock since we
	 * might already hold it. So, we just try-lock it and error out if it's
	 * already held. Since this is just a debugging aid, and all our callers
	 * are able to handle an error, that's fine.
	 */
	bool locked = workq_lock_try(wq);
	if (!locked) {
		return EBUSY;
	}

	wq_thactive_t act = _wq_thactive(wq);
	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
		activecount++;
	}
	pwqinfo->pwq_nthreads = wq->wq_nthreads;
	pwqinfo->pwq_runthreads = activecount;
	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
	pwqinfo->pwq_state = 0;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	workq_unlock(wq);
	return error;
}
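/*
 * Example of the derived counts above (hypothetical numbers): with
 * wq_nthreads == 10, wq_threads_scheduled == 6 and activecount == 4, callers
 * of fill_procworkqueue() observe pwq_nthreads == 10, pwq_runthreads == 4 and
 * pwq_blockedthreads == 2 (scheduled but blocked), plus the WQ_EXCEEDED_*
 * bits when the constrained or total thread limits have been hit.
 */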
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
    boolean_t *exceeded_constrained)
{
	proc_t p = v;
	struct proc_workqueueinfo pwqinfo;
	int err;

	assert(p != NULL);
	assert(exceeded_total != NULL);
	assert(exceeded_constrained != NULL);

	err = fill_procworkqueue(p, &pwqinfo);
	if (err) {
		return FALSE;
	}
	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
		return FALSE;
	}

	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

	return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
	    kTaskWqExceededConstrainedThreadLimit);
	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
	    kTaskWqExceededTotalThreadLimit);
	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

	if (v == NULL) {
		return 0;
	}

	proc_t p = v;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
		return 0;
	}

	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	return pwq_state;
}
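/*
 * The static_asserts above pin the ABI: the WQ_* state bits returned here are
 * the same values that, shifted left by 17, become the kTaskWq* flags used in
 * kdp/stackshot task snapshots, so the two encodings cannot drift apart
 * silently.
 */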
void
workq_init(void)
{
	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
	    NSEC_PER_USEC, &wq_stalled_window.abstime);
	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);

	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
	    workq_deallocate_queue_invoke);
}