/*
 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */

#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>    /* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>    /* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/param.h>
#include <sys/proc_info.h>      /* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>          /* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>

static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

#pragma mark globals

struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")

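/*
 * For illustration, WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS)
 * expands to:
 *
 *	static struct workq_usec_var wq_stalled_window = { .usecs = WQ_STALLED_WINDOW_USECS };
 *	SYSCTL_OID(_kern, OID_AUTO, wq_stalled_window_usecs,
 *	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &wq_stalled_window, 0,
 *	    workq_sysctl_handle_usecs, "I", "")
 *
 * i.e. each tunable gets a kern.<var>_usecs sysctl whose handler keeps the
 * cached absolutetime conversion in sync (see workq_sysctl_handle_usecs below).
 */
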
static lck_grp_t *workq_lck_grp;
static lck_attr_t *workq_lck_attr;
static lck_grp_attr_t *workq_lck_grp_attr;
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static struct mpsc_daemon_queue workq_deallocate_queue;
static zone_t workq_zone_workqueue;
static zone_t workq_zone_threadreq;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");

#pragma mark p_wqptr

#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

static struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		thread_wakeup(&p->p_wqptr);
	}
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}

static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
}

#pragma mark wq_thactive

#if defined(__LP64__)
// Layout is:
//   127 - 115 : 13 bits of zeroes
//   114 - 112 : best QoS among all pending constrained requests
//   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
//   63 - 61 : best QoS among all pending constrained requests
//   60      : Manager bucket (0 or 1)
//   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");

static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline int
_wq_bucket(thread_qos_t qos)
{
	// Map both BG and MT to the same bucket by over-shifting down and
	// clamping MT and BG together.
	switch (qos) {
	case THREAD_QOS_MAINTENANCE:
		return 0;
	default:
		return qos - 2;
	}
}

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	((tha) >> WQ_THACTIVE_QOS_SHIFT)

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0, 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
#endif
	}
}

static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}

static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	int i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}

#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return !wq || _wq_exiting(wq);
}

#pragma mark workqueue lock

bool
workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_spin_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_spin_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_lock_held(__assert_only struct workqueue *wq)
{
	LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_spin_try_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
	lck_spin_unlock(&wq->wq_lock);
}

#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	(struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}

static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately to policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
}

static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (workq_pri_override(old_pri) != new_bucket) {
		thread_set_workq_override(uth->uu_thread, new_bucket);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}

/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread. Removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}

static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = uth->uu_thread;
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

	thread_set_workq_pri(th, qos, priority, policy);
}

/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}

#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}

static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have less than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly
	 * from 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}

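/*
 * Illustrative numbers for the decay above, assuming the default 5s
 * wq_reduce_pool_window and, say, wq_death_max_load = 5 and
 * wq_max_constrained_threads = 64 (both are machine dependent): idle
 * threads 1..5 get the full 5s; the 6th gets 5s * 63/64 ~= 4.9s; each
 * additional idle thread shaves off another 5s/64 ~= 78ms, bottoming out
 * at 5s/64 once (idle - wq_death_max_load) reaches 64.
 */
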
static bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}

static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}

/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, wq->wq_thidlecount, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
			workq_thread_wakeup(uth);
		}
		return;
	}

	workq_death_call_schedule(wq,
	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}

void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
		    wq, wq->wq_thidlecount, 0, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening, redrive if there are pending requests
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(uth->uu_thread);
}

static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
	workq_unlock(wq);
}

static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint8_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}

/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}

/*
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(p->task);

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_creations++;
	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}

#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}

bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
	return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}

static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		    flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		    TURNSTILE_INTERLOCK_HELD);
	});
}

static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled--;
	}
	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields, 0);
	}

	if (wq->wq_inheritor == uth->uu_thread) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	workq_death_call_schedule(wq, now + delay);
}

#pragma mark thread requests

static int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}

static inline struct priority_queue *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}

/*
 * returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}
	if (priority_queue_insert(workq_priority_queue_for_req(wq, req),
	    &req->tr_entry, workq_priority_for_req(req),
	    PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}

/*
 * returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
{
	wq->wq_reqcount--;

	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;
			return true;
		}
		if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
		    &req->tr_entry, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
			if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
				_wq_thactive_refresh_best_constrained_req_qos(wq);
			}
			return true;
		}
	}
	return false;
}

static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
	req->tr_state = WORKQ_TR_STATE_CANCELED;
	if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
		kqueue_threadreq_cancel(p, req);
	} else {
		zfree(workq_zone_threadreq, req);
	}
}

#pragma mark workqueue thread creation thread calls

static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
	uint32_t old_flags, new_flags;

	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
			os_atomic_rmw_loop_give_up(return false);
		}
		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
			new_flags = old_flags | pend;
		} else {
			new_flags = old_flags | sched;
		}
	});

	return (old_flags & WQ_PROC_SUSPENDED) == 0;
}

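/*
 * In other words: this returns true when the `sched` flag was posted and the
 * caller should actually enqueue its thread call. It returns false either
 * because the workqueue is exiting or a call is already scheduled/pending,
 * or because the process is suspended, in which case only the `pend` bit is
 * recorded so that workq_proc_resumed() below can re-post the call later.
 */
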
#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
	assert(!preemption_enabled());

	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
		return false;
	}

	uint64_t now = mach_absolute_time();

	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
		/* do not change the window */
	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
		wq->wq_timer_interval *= 2;
		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
			wq->wq_timer_interval = wq_max_timer_interval.abstime;
		}
	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
		wq->wq_timer_interval /= 2;
		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
			wq->wq_timer_interval = wq_stalled_window.abstime;
		}
	}

	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
	    _wq_flags(wq), wq->wq_timer_interval, 0);

	thread_call_t call = wq->wq_delayed_call;
	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
	uint64_t deadline = now + wq->wq_timer_interval;
	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
		panic("delayed_call was already enqueued");
	}
	return true;
}

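/*
 * Illustrative timeline for the adaptive window above (assuming the default
 * tunables of wq_stalled_window = 200us and wq_max_timer_interval = 50ms):
 * while the thread call keeps firing within one interval of its previous
 * run, the window doubles each time (200us, 400us, ... capped at 50ms);
 * once runs become sparse (more than two intervals apart), the window is
 * halved again, never dropping below the stalled window.
 */
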
static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
	assert(!preemption_enabled());

	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
		    _wq_flags(wq), 0, 0);

		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
			panic("immediate_call was already enqueued");
		}
	}
}

void
workq_proc_suspended(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq) {
		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
	}
}

void
workq_proc_resumed(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;

	if (!wq) {
		return;
	}

	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
	if ((wq_flags & WQ_EXITING) == 0) {
		disable_preemption();
		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
			workq_schedule_immediate_thread_creation(wq);
		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
			workq_schedule_delayed_thread_creation(wq,
			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
		}
		enable_preemption();
	}
}

/*
 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
 */
static bool
workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
{
	uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
	if (now <= lastblocked_ts) {
		/*
		 * Because the update of the timestamp when a thread blocks
		 * isn't serialized against us looking at it (i.e. we don't hold
		 * the workq lock), it's possible to have a timestamp that matches
		 * the current time or that even looks to be in the future relative
		 * to when we grabbed the current time...
		 *
		 * Just treat this as a busy thread since it must have just blocked.
		 */
		return true;
	}
	return (now - lastblocked_ts) < wq_stalled_window.abstime;
}

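/*
 * Sketch of how this predicate is consumed (see
 * _wq_thactive_aggregate_downto_qos() above): a bucket with more scheduled
 * than active threads only contributes to `busycount` when its last-blocked
 * timestamp falls within the stalled window, so one recently blocked thread
 * can make the admission checks hold off on creating another thread for
 * that bucket.
 */
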
static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount, 0);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount, 0);
}

#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
	struct uthread *uth = get_bsdthread_info(thread);
	proc_t proc = get_bsdtask_info(get_threadtask(thread));
	struct workqueue *wq = proc_get_wqptr(proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket, it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem. Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer, 0);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled, 0);
		}
		break;
	}
}

#pragma mark workq lifecycle

void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}

static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
	struct workqueue *wq;
	struct turnstile *ts;

	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
	assert(dq == &workq_deallocate_queue);

	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_spin_destroy(&wq->wq_lock, workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}

static void
workq_deallocate(struct workqueue *wq)
{
	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
		    &workq_deallocate_queue);
	}
}

void
workq_deallocate_safe(struct workqueue *wq)
{
	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
		    MPSC_QUEUE_DISABLE_PREEMPTION);
	}
}

/*
 * Setup per-process state for the workqueue.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_get_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = (struct workqueue *)zalloc(workq_zone_workqueue);
		bzero(wq, sizeof(struct workqueue));

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = wq_stalled_window.abstime;
		wq->wq_proc = p;
		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
		    TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_constrained_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_special_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);

		wq->wq_delayed_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
			workq_kill_old_threads_call, wq,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_spin_init(&wq->wq_lock, workq_lck_grp, workq_lck_attr);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
		proc_set_wqptr(p, wq);
	}
out:
	return error;
}

/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);

	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
}

/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_sched_call(uth->uu_thread, NULL);
			thread_deallocate(uth->uu_thread);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);

		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
	}
}

#pragma mark bsd thread control

static bool
_pthread_priority_to_policy(pthread_priority_t priority,
    thread_qos_policy_data_t *data)
{
	data->qos_tier = _pthread_priority_thread_qos(priority);
	data->tier_importance = _pthread_priority_relpri(priority);
	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
	    data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
		return false;
	}
	return true;
}

static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	if (flags & WORKQ_SET_SELF_QOS_FLAG) {
		thread_qos_policy_data_t new_policy;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 */
			bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
			bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			workq_lock_spin(wq);

			if (old_overcommit != new_overcommit) {
				uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
				if (old_overcommit) {
					wq->wq_constrained_threads_scheduled++;
				} else if (wq->wq_constrained_threads_scheduled-- ==
				    wq_max_constrained_threads) {
					force_run = true;
				}
			}

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = new_policy.qos_tier;
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	if (unbind_rv) {
		return unbind_rv;
	}

	if (qos_rv) {
		return qos_rv;
	}

	if (voucher_rv) {
		return voucher_rv;
	}

	if (fixedpri_rv) {
		return fixedpri_rv;
	}

	return 0;
}

static int
bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t resource)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	if (qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t th = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
	    resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}

static int
bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
    user_addr_t resource)
{
	thread_t th = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
	    THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}

static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp, 0);

	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
		 */
		disable_preemption();
		rc = copyin_atomic32(ulock_addr, &val);
		enable_preemption();
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}

static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);
	struct uthread *uth = get_bsdthread_info(thread);

	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
	return 0;
}

static int
workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
{
	if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
		// If the thread isn't a workqueue thread, don't set the
		// kill_allowed bit; however, we still need to return 0
		// instead of an error code since this code is executed
		// on the abort path which needs to not depend on the
		// pthread_t (returning an error depends on pthread_t via
		// cerror_nocancel)
		return 0;
	}
	struct uthread *uth = get_bsdthread_info(thread);
	uth->uu_workq_pthread_kill_allowed = enable;
	return 0;
}

static int
bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
    int *retval)
{
	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
	static_assert(QOS_PARALLELISM_REALTIME ==
	    _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");

	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
		return EINVAL;
	}

	if (flags & QOS_PARALLELISM_REALTIME) {
		if (qos) {
			return EINVAL;
		}
	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
		return EINVAL;
	}

	*retval = qos_max_parallelism(qos, flags);
	return 0;
}

2054 #define ENSURE_UNUSED(arg) \
2055 ({ if ((arg) != 0) { return EINVAL; } })
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
    switch (uap->cmd) {
    case BSDTHREAD_CTL_QOS_OVERRIDE_START:
        return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
                   (pthread_priority_t)uap->arg2, uap->arg3);
    case BSDTHREAD_CTL_QOS_OVERRIDE_END:
        ENSURE_UNUSED(uap->arg3);
        return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
                   (user_addr_t)uap->arg2);

    case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
        return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
                   (pthread_priority_t)uap->arg2, uap->arg3);
    case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
        return workq_thread_reset_dispatch_override(p, current_thread());

    case BSDTHREAD_CTL_SET_SELF:
        return bsdthread_set_self(p, current_thread(),
                   (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
                   (enum workq_set_self_flags)uap->arg3);

    case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
        ENSURE_UNUSED(uap->arg3);
        return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
                   (unsigned long)uap->arg2, retval);
    case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
        ENSURE_UNUSED(uap->arg2);
        ENSURE_UNUSED(uap->arg3);
        return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);

    case BSDTHREAD_CTL_SET_QOS:
    case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
    case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
        /* no longer supported */
        return ENOTSUP;

    default:
        return EINVAL;
    }
}
#pragma mark workqueue thread manipulation
static void __dead2
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
    struct kqworkloop *kqwl;
    if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
        kqwl = __container_of(req, struct kqworkloop, kqwl_request);
        return kqwl->kqwl_dynamicid;
    }

    return VM_KERNEL_ADDRHIDE(req);
}
#endif
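
/*
 * Illustrative note (sketch, not part of the original file): the
 * __container_of() above recovers the owning kqworkloop from its embedded
 * request, i.e. it is morally:
 *
 *     struct kqworkloop *kqwl = (struct kqworkloop *)
 *         ((char *)req - offsetof(struct kqworkloop, kqwl_request));
 *
 * so that workloop requests can be traced by their stable kqwl_dynamicid
 * instead of a (hidden) kernel pointer.
 */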
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
    thread_qos_t qos = _pthread_priority_thread_qos(pp);
    struct workqueue *wq = proc_get_wqptr(p);
    uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

    if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
        qos == THREAD_QOS_UNSPECIFIED) {
        return EINVAL;
    }

    WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
        wq, reqcount, pp, 0, 0);

    workq_threadreq_t req = zalloc(workq_zone_threadreq);
    priority_queue_entry_init(&req->tr_entry);
    req->tr_state = WORKQ_TR_STATE_NEW;
    req->tr_flags = 0;
    req->tr_qos = qos;

    if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
        req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
        upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
    }

    WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
        wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

    workq_lock_spin(wq);
    do {
        if (_wq_exiting(wq)) {
            goto exiting;
        }

        /*
         * When userspace is asking for parallelism, wake up to (reqcount - 1)
         * threads without pacing, to inform the scheduler of that workload.
         *
         * The last requests, or the ones that failed the admission checks are
         * enqueued and go through the regular creator codepath.
         *
         * If there aren't enough threads, add one, but re-evaluate everything
         * as conditions may now have changed.
         */
        if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
            unpaced = workq_constrained_allowance(wq, qos, NULL, false);
            if (unpaced >= reqcount - 1) {
                unpaced = reqcount - 1;
            }
        } else {
            unpaced = reqcount - 1;
        }

        /*
         * This path does not currently handle custom workloop parameters
         * when creating threads for parallelism.
         */
        assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

        /*
         * This is a trimmed down version of workq_threadreq_bind_and_unlock()
         */
        while (unpaced > 0 && wq->wq_thidlecount) {
            struct uthread *uth;
            bool needs_wakeup;
            uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

            if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
                uu_flags |= UT_WORKQ_OVERCOMMIT;
            }

            uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

            _wq_thactive_inc(wq, qos);
            wq->wq_thscheduled_count[_wq_bucket(qos)]++;
            workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
            wq->wq_fulfilled++;

            uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
            uth->uu_save.uus_workq_park_data.thread_request = req;
            if (needs_wakeup) {
                workq_thread_wakeup(uth);
            }
            unpaced--;
            reqcount--;
        }
    } while (unpaced && wq->wq_nthreads < wq_max_threads &&
        workq_add_new_idle_thread(p, wq));

    if (_wq_exiting(wq)) {
        goto exiting;
    }

    req->tr_count = reqcount;
    if (workq_threadreq_enqueue(wq, req)) {
        /* This can drop the workqueue lock, and take it again */
        workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
    }
    workq_unlock(wq);
    return 0;

exiting:
    workq_unlock(wq);
    zfree(workq_zone_threadreq, req);
    return ECANCELED;
}
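
/*
 * Illustrative sketch (assumption, not part of the original file): libdispatch
 * ends up in workq_reqthreads() via the workq_kernreturn() syscall handled
 * later in this file, roughly as:
 *
 *     // request 4 non-overcommit worker threads at UTILITY QoS
 *     pthread_priority_t pp = _pthread_qos_class_encode(QOS_CLASS_UTILITY, 0, 0);
 *     __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, 4, (int)pp);
 *
 * _pthread_qos_class_encode() and __workq_kernreturn() are the assumed
 * userspace names; the kernel-visible contract is just
 * (options, item, affinity = reqcount, prio = pthread_priority_t).
 */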
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);
    struct uthread *uth = NULL;

    assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
        qos = thread_workq_qos_for_pri(trp.trp_pri);
        if (qos == THREAD_QOS_UNSPECIFIED) {
            qos = WORKQ_THREAD_QOS_ABOVEUI;
        }
    }

    assert(req->tr_state == WORKQ_TR_STATE_IDLE);
    priority_queue_entry_init(&req->tr_entry);
    req->tr_count = 1;
    req->tr_state = WORKQ_TR_STATE_NEW;
    req->tr_qos = qos;

    WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
        workq_trace_req_id(req), qos, 1, 0);

    if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
        /*
         * we're called back synchronously from the context of
         * kqueue_threadreq_unbind from within workq_thread_return()
         * we can try to match up this thread with this request !
         */
        uth = current_uthread();
        assert(uth->uu_kqr_bound == NULL);
    }

    workq_lock_spin(wq);
    if (_wq_exiting(wq)) {
        req->tr_state = WORKQ_TR_STATE_IDLE;
        workq_unlock(wq);
        return false;
    }

    if (uth && workq_threadreq_admissible(wq, uth, req)) {
        assert(uth != wq->wq_creator);
        if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
            _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
            workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
        }
        /*
         * We're called from workq_kern_threadreq_initiate()
         * due to an unbind, with the kq req held.
         */
        WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
            workq_trace_req_id(req), 0, 0, 0);
        wq->wq_fulfilled++;
        kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
    } else {
        if (workloop_ts) {
            workq_perform_turnstile_operation_locked(wq, ^{
                turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
                    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
                turnstile_update_inheritor_complete(workloop_ts,
                    TURNSTILE_INTERLOCK_HELD);
            });
        }
        if (workq_threadreq_enqueue(wq, req)) {
            workq_schedule_creator(p, wq, flags);
        }
    }

    workq_unlock(wq);

    return true;
}
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);
    bool make_overcommit = false;

    if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
        /* Requests outside-of-QoS shouldn't accept modify operations */
        return;
    }

    workq_lock_spin(wq);

    assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
    assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

    if (req->tr_state == WORKQ_TR_STATE_BINDING) {
        kqueue_threadreq_bind(p, req, req->tr_thread, 0);
        workq_unlock(wq);
        return;
    }

    if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
        make_overcommit = (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0;
    }

    if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
        workq_unlock(wq);
        return;
    }

    assert(req->tr_count == 1);
    if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
        panic("Invalid thread request (%p) state %d", req, req->tr_state);
    }

    WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
        workq_trace_req_id(req), qos, 0, 0);

    struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
    workq_threadreq_t req_max;

    /*
     * Stage 1: Dequeue the request from its priority queue.
     *
     * If we dequeue the root item of the constrained priority queue,
     * maintain the best constrained request qos invariant.
     */
    if (priority_queue_remove(pq, &req->tr_entry,
        PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
        if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
            _wq_thactive_refresh_best_constrained_req_qos(wq);
        }
    }

    /*
     * Stage 2: Apply changes to the thread request
     *
     * If the item will not become the root of the priority queue it belongs to,
     * then we need to wait in line, just enqueue and return quickly.
     */
    if (__improbable(make_overcommit)) {
        req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
        pq = workq_priority_queue_for_req(wq, req);
    }
    req->tr_qos = qos;

    req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
    if (req_max && req_max->tr_qos >= qos) {
        priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
            PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
        workq_unlock(wq);
        return;
    }

    /*
     * Stage 3: Reevaluate whether we should run the thread request.
     *
     * Pretend the thread request is new again:
     * - adjust wq_reqcount to not count it anymore.
     * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
     *   properly attempts a synchronous bind)
     */
    wq->wq_reqcount--;
    req->tr_state = WORKQ_TR_STATE_NEW;
    if (workq_threadreq_enqueue(wq, req)) {
        workq_schedule_creator(p, wq, flags);
    }
    workq_unlock(wq);
}
void
workq_kern_threadreq_lock(struct proc *p)
{
    workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
    workq_unlock(proc_get_wqptr_fast(p));
}
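
/*
 * Illustrative sketch (assumption, not part of the original file): kqueue and
 * workloop code is expected to bracket batched thread-request updates with
 * the helpers above, e.g.:
 *
 *     workq_kern_threadreq_lock(p);
 *     workq_kern_threadreq_update_inheritor(p, kqr, owner, ts, flags);
 *     workq_kern_threadreq_unlock(p);
 *
 * since workq_kern_threadreq_update_inheritor() below asserts the workqueue
 * lock is already held (workq_lock_held()) rather than taking it itself.
 */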
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);
    turnstile_inheritor_t inheritor;

    assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
    assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
    workq_lock_held(wq);

    if (req->tr_state == WORKQ_TR_STATE_BINDING) {
        kqueue_threadreq_bind(p, req, req->tr_thread,
            KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
        return;
    }

    if (_wq_exiting(wq)) {
        inheritor = TURNSTILE_INHERITOR_NULL;
    } else {
        if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
            panic("Invalid thread request (%p) state %d", req, req->tr_state);
        }

        if (owner) {
            inheritor = owner;
            flags |= TURNSTILE_INHERITOR_THREAD;
        } else {
            inheritor = wq->wq_turnstile;
            flags |= TURNSTILE_INHERITOR_TURNSTILE;
        }
    }

    workq_perform_turnstile_operation_locked(wq, ^{
        turnstile_update_inheritor(wl_ts, inheritor, flags);
    });
}
void
workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
{
    struct workqueue *wq = proc_get_wqptr_fast(p);

    workq_lock_spin(wq);
    workq_schedule_creator(p, wq, flags);
    workq_unlock(wq);
}
void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
    if (locked) {
        workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
    } else {
        workq_schedule_immediate_thread_creation(wq);
    }
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
    thread_t th = current_thread();
    struct uthread *uth = get_bsdthread_info(th);
    workq_threadreq_t kqr = uth->uu_kqr_bound;
    workq_threadreq_param_t trp = { };
    int nevents = uap->affinity, error;
    user_addr_t eventlist = uap->item;

    if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
        (uth->uu_workq_flags & UT_WORKQ_DYING)) {
        return EINVAL;
    }

    if (eventlist && nevents && kqr == NULL) {
        return EINVAL;
    }

    /* reset signal mask on the workqueue thread to default state */
    if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
        proc_lock(p);
        uth->uu_sigmask = ~workq_threadmask;
        proc_unlock(p);
    }

    if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
        /*
         * Ensure we store the threadreq param before unbinding
         * the kqr from this thread.
         */
        trp = kqueue_threadreq_workloop_param(kqr);
    }

    /*
     * Freeze the base pri while we decide the fate of this thread.
     *
     * Either:
     * - we return to user and kevent_cleanup will have unfrozen the base pri,
     * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
     */
    thread_freeze_base_pri(th);

    if (kqr) {
        uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
        if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
            upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
        } else {
            upcall_flags |= WQ_FLAG_THREAD_KEVENT;
        }
        if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
            upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
        } else {
            if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
                upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
            }
            if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
                upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
            } else {
                upcall_flags |= uth->uu_workq_pri.qos_req |
                    WQ_FLAG_THREAD_PRIO_QOS;
            }
        }

        error = pthread_functions->workq_handle_stack_events(p, th,
            get_task_map(p->task), uth->uu_workq_stackaddr,
            uth->uu_workq_thport, eventlist, nevents, upcall_flags);
        if (error) {
            assert(uth->uu_kqr_bound == kqr);
            return error;
        }

        // pthread is supposed to pass KEVENT_FLAG_PARKING here
        // which should cause the above call to either:
        // - not return
        // - return an error
        // - return 0 and have unbound properly
        assert(uth->uu_kqr_bound == NULL);
    }

    WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

    thread_sched_call(th, NULL);
    thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
    UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

    workq_lock_spin(wq);
    WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
    uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
    workq_select_threadreq_or_park_and_unlock(p, wq, uth,
        WQ_SETUP_CLEAR_VOUCHER);
    __builtin_unreachable();
}
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
    int options = uap->options;
    int arg2 = uap->affinity;
    int arg3 = uap->prio;
    struct workqueue *wq = proc_get_wqptr(p);
    int error = 0;

    if ((p->p_lflag & P_LREGISTER) == 0) {
        return EINVAL;
    }

    switch (options) {
    case WQOPS_QUEUE_NEWSPISUPP: {
        /*
         * arg2 = offset of serialno into dispatch queue
         * arg3 = kevent support
         */
        int offset = arg2;
        if (arg3 & 0x01) {
            // If we get here, then userspace has indicated support for kevent delivery.
        }
        p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
        break;
    }
    case WQOPS_QUEUE_REQTHREADS: {
        /*
         * arg2 = number of threads to start
         * arg3 = priority
         */
        error = workq_reqthreads(p, arg2, arg3);
        break;
    }
    case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
        /*
         * arg2 = priority for the manager thread
         *
         * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
         * the low bits of the value contains a scheduling priority
         * instead of a QOS value
         */
        pthread_priority_t pri = arg2;

        if (wq == NULL) {
            error = EINVAL;
            break;
        }

        /*
         * Normalize the incoming priority so that it is ordered numerically.
         */
        if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
            pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
                _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
        } else {
            thread_qos_t qos = _pthread_priority_thread_qos(pri);
            int relpri = _pthread_priority_relpri(pri);
            if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
                qos == THREAD_QOS_UNSPECIFIED) {
                error = EINVAL;
                break;
            }
            pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
        }

        /*
         * If userspace passes a scheduling priority, that wins over any QoS.
         * Userspace should take care not to lower the priority this way.
         */
        workq_lock_spin(wq);
        if (wq->wq_event_manager_priority < (uint32_t)pri) {
            wq->wq_event_manager_priority = (uint32_t)pri;
        }
        workq_unlock(wq);
        break;
    }
    case WQOPS_THREAD_KEVENT_RETURN:
    case WQOPS_THREAD_WORKLOOP_RETURN:
    case WQOPS_THREAD_RETURN: {
        error = workq_thread_return(p, uap, wq);
        break;
    }

    case WQOPS_SHOULD_NARROW: {
        /*
         * arg2 = priority to test
         * arg3 = unused
         */
        thread_t th = current_thread();
        struct uthread *uth = get_bsdthread_info(th);
        if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
            (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
            error = EINVAL;
            break;
        }

        thread_qos_t qos = _pthread_priority_thread_qos(arg2);
        if (qos == THREAD_QOS_UNSPECIFIED) {
            error = EINVAL;
            break;
        }
        workq_lock_spin(wq);
        bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
        workq_unlock(wq);

        *retval = should_narrow;
        break;
    }
    case WQOPS_SETUP_DISPATCH: {
        /*
         * item = pointer to workq_dispatch_config structure
         * arg2 = sizeof(item)
         */
        struct workq_dispatch_config cfg;
        bzero(&cfg, sizeof(cfg));

        error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
        if (error) {
            break;
        }

        if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
            cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
            error = ENOTSUP;
            break;
        }

        /* Load fields from version 1 */
        p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

        /* Load fields from version 2 */
        if (cfg.wdc_version >= 2) {
            p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
        }

        break;
    }
    default:
        error = EINVAL;
        break;
    }

    return error;
}
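
/*
 * Illustrative sketch (assumption, not part of the original file): a
 * WQOPS_SETUP_DISPATCH caller would pass a versioned struct, with arg2
 * carrying the size actually populated:
 *
 *     struct workq_dispatch_config cfg = {
 *         .wdc_version = 2,
 *         .wdc_flags = 0,
 *         .wdc_queue_serialno_offs = offsetof(struct dispatch_queue_s, dq_serialnum),
 *         .wdc_queue_label_offs = offsetof(struct dispatch_queue_s, dq_label),
 *     };
 *     __workq_kernreturn(WQOPS_SETUP_DISPATCH, &cfg, sizeof(cfg), 0);
 *
 * The libdispatch-side names (dispatch_queue_s, dq_serialnum, dq_label) and
 * the __workq_kernreturn() stub are assumptions for illustration; the kernel
 * only copies in MIN(sizeof(cfg), arg2) bytes and validates
 * wdc_version/wdc_flags as shown above.
 */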
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
    assert(uth == current_uthread());
    assert(uth->uu_kqr_bound == NULL);
    workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

    workq_thread_reset_cpupercent(NULL, uth);

    if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
        !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
        workq_unlock(wq);

        /*
         * workq_push_idle_thread() will unset `has_stack`
         * if it wants us to free the stack before parking.
         */
        if (!uth->uu_save.uus_workq_park_data.has_stack) {
            pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
                get_task_map(p->task), uth->uu_workq_stackaddr);
        }

        /*
         * When we remove the voucher from the thread, we may lose our importance
         * causing us to get preempted, so we do this after putting the thread on
         * the idle list. Then, when we get our importance back we'll be able to
         * use this thread from e.g. the kevent call out to deliver a boosting
         * message.
         */
        __assert_only kern_return_t kr;
        kr = thread_set_voucher_name(MACH_PORT_NULL);
        assert(kr == KERN_SUCCESS);

        workq_lock_spin(wq);
        uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
        setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
    }

    if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
        /*
         * While we'd dropped the lock to unset our voucher, someone came
         * around and made us runnable. But because we weren't waiting on the
         * event their thread_wakeup() was ineffectual. To correct for that,
         * we just run the continuation ourselves.
         */
        WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
        workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
        __builtin_unreachable();
    }

    if (uth->uu_workq_flags & UT_WORKQ_DYING) {
        workq_unpark_for_death_and_unlock(p, wq, uth,
            WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
        __builtin_unreachable();
    }

    thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
    assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
    workq_unlock(wq);
    WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
    thread_block(workq_unpark_continue);
    __builtin_unreachable();
}
static bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
    /*
     * There's an event manager request and either:
     * - no event manager currently running
     * - we are re-using the event manager
     */
    return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
           (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
    struct uthread *uth, bool may_start_timer)
{
    assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
    uint32_t count = 0;

    uint32_t max_count = wq->wq_constrained_threads_scheduled;
    if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
        /*
         * don't count the current thread as scheduled
         */
        assert(max_count > 0);
        max_count--;
    }
    if (max_count >= wq_max_constrained_threads) {
        WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
            wq->wq_constrained_threads_scheduled,
            wq_max_constrained_threads, 0);
        /*
         * we need 1 or more constrained threads to return to the kernel before
         * we can dispatch additional work
         */
        return 0;
    }
    max_count -= wq_max_constrained_threads;

    /*
     * Compute a metric for how many threads are active. We find the
     * highest priority request outstanding and then add up the number of
     * active threads in that and all higher-priority buckets. We'll also add
     * any "busy" threads which are not active but blocked recently enough that
     * we can't be sure they've gone idle yet. We'll then compare this metric
     * to our max concurrency to decide whether to add a new thread.
     */

    uint32_t busycount, thactive_count;

    thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
        at_qos, &busycount, NULL);

    if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
        at_qos <= uth->uu_workq_pri.qos_bucket) {
        /*
         * Don't count this thread as currently active, but only if it's not
         * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
         * managers.
         */
        assert(thactive_count > 0);
        thactive_count--;
    }

    count = wq_max_parallelism[_wq_bucket(at_qos)];
    if (count > thactive_count + busycount) {
        count -= thactive_count + busycount;
        WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
            thactive_count, busycount, 0);
        return MIN(count, max_count);
    } else {
        WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
            thactive_count, busycount, 0);
    }

    if (busycount && may_start_timer) {
        /*
         * If this is called from the add timer, we won't have another timer
         * fire when the thread exits the "busy" state, so rearm the timer.
         */
        workq_schedule_delayed_thread_creation(wq, 0);
    }

    return 0;
}
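
/*
 * Worked example (illustrative numbers only): with wq_max_parallelism for the
 * UTILITY bucket at 8, 3 threads active at or above UTILITY, and 2 "busy"
 * (recently blocked) threads, the allowance computed above is:
 *
 *     count = 8;                     // wq_max_parallelism[_wq_bucket(at_qos)]
 *     count > 3 + 2                  // there is room beyond active + busy
 *     count -= 3 + 2;                // => 3
 *     return MIN(count, max_count);  // further bounded by the constrained
 *                                    // thread accounting done above
 *
 * so at most 3 more constrained threads would be admitted at that QoS.
 */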
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req)
{
    if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
        return workq_may_start_event_mgr_thread(wq, uth);
    }
    if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
        return workq_constrained_allowance(wq, req->tr_qos, uth, true);
    }
    return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
    workq_threadreq_t req_qos, req_pri, req_tmp;
    thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
    uint8_t pri = 0;

    req_tmp = wq->wq_event_manager_threadreq;
    if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
        return req_tmp;
    }

    /*
     * Compute the best priority request, and ignore the turnstile for now
     */

    req_pri = priority_queue_max(&wq->wq_special_queue,
        struct workq_threadreq_s, tr_entry);
    if (req_pri) {
        pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
    }

    /*
     * Compute the best QoS Request, and check whether it beats the "pri" one
     */

    req_qos = priority_queue_max(&wq->wq_overcommit_queue,
        struct workq_threadreq_s, tr_entry);
    if (req_qos) {
        qos = req_qos->tr_qos;
    }

    req_tmp = priority_queue_max(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry);

    if (req_tmp && qos < req_tmp->tr_qos) {
        if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
            return req_pri;
        }

        if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
            /*
             * If the constrained thread request is the best one and passes
             * the admission check, pick it.
             */
            return req_tmp;
        }
    }

    if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
        return req_pri;
    }

    if (req_qos) {
        return req_qos;
    }

    /*
     * If we had no eligible request but we have a turnstile push,
     * it must be a non overcommit thread request that failed
     * the admission check.
     *
     * Just fake a BG thread request so that if the push stops the creator
     * priority just drops to 4.
     */
    if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
        static struct workq_threadreq_s workq_sync_push_fake_req = {
            .tr_qos = THREAD_QOS_BACKGROUND,
        };

        return &workq_sync_push_fake_req;
    }

    return NULL;
}
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
    workq_threadreq_t req_qos, req_pri, req_tmp;
    uintptr_t proprietor;
    thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
    uint8_t pri = 0;

    if (uth == wq->wq_creator) {
        uth = NULL;
    }

    req_tmp = wq->wq_event_manager_threadreq;
    if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
        return req_tmp;
    }

    /*
     * Compute the best priority request (special or turnstile)
     */

    pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
        &proprietor);
    if (pri) {
        struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
        req_pri = &kqwl->kqwl_request;
        if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
            panic("Invalid thread request (%p) state %d",
                req_pri, req_pri->tr_state);
        }
    } else {
        req_pri = NULL;
    }

    req_tmp = priority_queue_max(&wq->wq_special_queue,
        struct workq_threadreq_s, tr_entry);
    if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
        &req_tmp->tr_entry)) {
        req_pri = req_tmp;
        pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
    }

    /*
     * Compute the best QoS Request, and check whether it beats the "pri" one
     */

    req_qos = priority_queue_max(&wq->wq_overcommit_queue,
        struct workq_threadreq_s, tr_entry);
    if (req_qos) {
        qos = req_qos->tr_qos;
    }

    req_tmp = priority_queue_max(&wq->wq_constrained_queue,
        struct workq_threadreq_s, tr_entry);

    if (req_tmp && qos < req_tmp->tr_qos) {
        if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
            return req_pri;
        }

        if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
            /*
             * If the constrained thread request is the best one and passes
             * the admission check, pick it.
             */
            return req_tmp;
        }
    }

    if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
        return req_pri;
    }

    return req_qos;
}
/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
    workq_threadreq_t req;
    struct uthread *uth;
    bool needs_wakeup;

    workq_lock_held(wq);
    assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
    uth = wq->wq_creator;

    if (!wq->wq_reqcount) {
        /*
         * There is no thread request left.
         *
         * If there is a creator, leave everything in place, so that it cleans
         * up itself in workq_push_idle_thread().
         *
         * Else, make sure the turnstile state is reset to no inheritor.
         */
        if (uth == NULL) {
            workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
        }
        return;
    }

    req = workq_threadreq_select_for_creator(wq);
    if (req == NULL) {
        /*
         * There isn't a thread request that passes the admission check.
         *
         * If there is a creator, do not touch anything, the creator will sort
         * it out when it runs.
         *
         * Else, set the inheritor to "WORKQ" so that the turnstile propagation
         * code calls us if anything changes.
         */
        if (uth == NULL) {
            workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
        }
        return;
    }

    if (uth) {
        /*
         * We need to maybe override the creator we already have
         */
        if (workq_thread_needs_priority_change(req, uth)) {
            WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
                wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
            workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
        }
        assert(wq->wq_inheritor == uth->uu_thread);
    } else if (wq->wq_thidlecount) {
        /*
         * We need to unpark a creator thread
         */
        wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
            &needs_wakeup);
        /* Always reset the priorities on the newly chosen creator */
        workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
        workq_turnstile_update_inheritor(wq, uth->uu_thread,
            TURNSTILE_INHERITOR_THREAD);
        WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
            wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
        uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
        uth->uu_save.uus_workq_park_data.yields = 0;
        if (needs_wakeup) {
            workq_thread_wakeup(uth);
        }
    } else {
        /*
         * We need to allocate a thread...
         */
        if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
            /* out of threads, just go away */
            flags = WORKQ_THREADREQ_NONE;
        } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
            act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
        } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
            /* This can drop the workqueue lock, and take it again */
            workq_schedule_immediate_thread_creation(wq);
        } else if (workq_add_new_idle_thread(p, wq)) {
            goto again;
        } else {
            workq_schedule_delayed_thread_creation(wq, 0);
        }

        /*
         * If the current thread is the inheritor:
         *
         * If we set the AST, then the thread will stay the inheritor until
         * either the AST calls workq_kern_threadreq_redrive(), or it parks
         * and calls workq_push_idle_thread().
         *
         * Else, the responsibility of the thread creation is with a thread-call
         * and we need to clear the inheritor.
         */
        if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
            wq->wq_inheritor == current_thread()) {
            workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
        }
    }
}
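
/*
 * Illustrative sketch (assumption, not part of the original file): a typical
 * redrive path, e.g. from the AST armed above with
 * WORKQ_THREADREQ_SET_AST_ON_FAILURE, ends up doing:
 *
 *     struct workqueue *wq = proc_get_wqptr_fast(p);
 *     workq_lock_spin(wq);
 *     workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
 *     workq_unlock(wq);
 *
 * which is exactly the shape of workq_kern_threadreq_redrive() earlier in
 * this file; the creator elected here then either runs a request or parks.
 */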
/*
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the base pri frozen, will unfreeze it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
    workq_threadreq_t req = NULL;
    bool is_creator = (wq->wq_creator == uth);
    bool schedule_creator = false;

    if (__improbable(_wq_exiting(wq))) {
        WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
        goto park;
    }

    if (wq->wq_reqcount == 0) {
        WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
        goto park;
    }

    req = workq_threadreq_select(wq, uth);
    if (__improbable(req == NULL)) {
        WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
        goto park;
    }

    uint8_t tr_flags = req->tr_flags;
    struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

    /*
     * Attempt to setup ourselves as the new thing to run, moving all priority
     * pushes to ourselves.
     *
     * If the current thread is the creator, then the fact that we are presently
     * running is proof that we'll do something useful, so keep going.
     *
     * For other cases, peek at the AST to know whether the scheduler wants
     * to preempt us, if yes, park instead, and move the thread request
     * turnstile back to the workqueue.
     */
    if (req_ts) {
        workq_perform_turnstile_operation_locked(wq, ^{
            turnstile_update_inheritor(req_ts, uth->uu_thread,
                TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
            turnstile_update_inheritor_complete(req_ts,
                TURNSTILE_INTERLOCK_HELD);
        });
    }

    if (is_creator) {
        WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
            uth->uu_save.uus_workq_park_data.yields, 0);
        wq->wq_creator = NULL;
        _wq_thactive_inc(wq, req->tr_qos);
        wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
    } else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
        _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
    }

    workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

    thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863> to turn this back on
    if (__improbable(thread_unfreeze_base_pri(uth->uu_thread) && !is_creator)) {
        if (req_ts) {
            workq_perform_turnstile_operation_locked(wq, ^{
                turnstile_update_inheritor(req_ts, wq->wq_turnstile,
                    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
                turnstile_update_inheritor_complete(req_ts,
                    TURNSTILE_INTERLOCK_HELD);
            });
        }
        WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0, 0);
        goto park_thawed;
    }
#endif

    /*
     * We passed all checks, dequeue the request, bind to it, and set it up
     * to return to user.
     */
    WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
        workq_trace_req_id(req), 0, 0, 0);
    wq->wq_fulfilled++;
    schedule_creator = workq_threadreq_dequeue(wq, req);

    if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
        kqueue_threadreq_bind_prepost(p, req, uth);
        req = NULL;
    } else if (req->tr_count > 0) {
        req = NULL;
    }

    workq_thread_reset_cpupercent(req, uth);
    if (uth->uu_workq_flags & UT_WORKQ_NEW) {
        uth->uu_workq_flags ^= UT_WORKQ_NEW;
        setup_flags |= WQ_SETUP_FIRST_USE;
    }
    if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
        if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
            uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
            wq->wq_constrained_threads_scheduled--;
        }
    } else {
        if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
            uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
            wq->wq_constrained_threads_scheduled++;
        }
    }

    if (is_creator || schedule_creator) {
        /* This can drop the workqueue lock, and take it again */
        workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
    }

    workq_unlock(wq);

    if (req) {
        zfree(workq_zone_threadreq, req);
    }

    uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
    if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
        upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
    } else if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
        upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
    }
    if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
        upcall_flags |= WQ_FLAG_THREAD_KEVENT;
    }
    if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
        upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
    }
    uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

    if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
        kqueue_threadreq_bind_commit(p, uth->uu_thread);
    }
    workq_setup_and_run(p, uth, setup_flags);
    __builtin_unreachable();

park:
    thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863>
park_thawed:
#endif
    workq_park_and_unlock(p, wq, uth, setup_flags);
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that. If
 *   it is not NULL, it must be a threadreq object in state NEW. If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held. Will drop it.
 * Should be called with the base pri not frozen.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
    if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
        if (uth->uu_workq_flags & UT_WORKQ_NEW) {
            setup_flags |= WQ_SETUP_FIRST_USE;
        }
        uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
        /*
         * This pointer is possibly freed and only used for tracing purposes.
         */
        workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
        workq_unlock(wq);
        WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
            VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
        (void)req;
        workq_setup_and_run(p, uth, setup_flags);
        __builtin_unreachable();
    }

    thread_freeze_base_pri(uth->uu_thread);
    workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
    thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

    if (qos >= THREAD_QOS_USER_INTERACTIVE) {
        return false;
    }

    uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
    if (wq->wq_fulfilled == snapshot) {
        return false;
    }

    uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
    if (wq->wq_fulfilled - snapshot > conc) {
        /* we fulfilled more than NCPU requests since being dispatched */
        WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
            wq->wq_fulfilled, snapshot, 0);
        return true;
    }

    for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
        cnt += wq->wq_thscheduled_count[i];
    }
    if (conc <= cnt) {
        /* We fulfilled requests and have more than NCPU scheduled threads */
        WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
            wq->wq_fulfilled, snapshot, 0);
        return true;
    }

    return false;
}
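
/*
 * Worked example (illustrative numbers only): with conc = 8 for this bucket,
 * a creator dispatched when wq_fulfilled was 100 yields once wq_fulfilled
 * reaches 109 (more than conc requests fulfilled since dispatch), or when at
 * least 8 threads are already scheduled in this and higher buckets while at
 * least one request has been fulfilled; otherwise it proceeds to userspace.
 */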
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
    thread_t th = current_thread();
    struct uthread *uth = get_bsdthread_info(th);
    proc_t p = current_proc();
    struct workqueue *wq = proc_get_wqptr_fast(p);

    workq_lock_spin(wq);

    if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
        /*
         * If the number of threads we have out are able to keep up with the
         * demand, then we should avoid sending this creator thread to
         * userspace.
         */
        uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
        uth->uu_save.uus_workq_park_data.yields++;
        workq_unlock(wq);
        thread_yield_with_continuation(workq_unpark_continue, NULL);
        __builtin_unreachable();
    }

    if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
        workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
        __builtin_unreachable();
    }

    if (__probable(wr == THREAD_AWAKENED)) {
        /*
         * We were set running, but for the purposes of dying.
         */
        assert(uth->uu_workq_flags & UT_WORKQ_DYING);
        assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
    } else {
        /*
         * workaround for <rdar://problem/38647347>,
         * in case we do hit userspace, make sure calling
         * workq_thread_terminate() does the right thing here,
         * and if we never call it, that workq_exit() will too because it sees
         * this thread on the runlist.
         */
        assert(wr == THREAD_INTERRUPTED);
        wq->wq_thdying_count++;
        uth->uu_workq_flags |= UT_WORKQ_DYING;
    }

    workq_unpark_for_death_and_unlock(p, wq, uth,
        WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
    __builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
    thread_t th = uth->uu_thread;
    vm_map_t vmap = get_task_map(p->task);

    if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
        /*
         * For preemption reasons, we want to reset the voucher as late as
         * possible, so we do it in two places:
         *   - Just before parking (i.e. in workq_park_and_unlock())
         *   - Prior to doing the setup for the next workitem (i.e. here)
         *
         * Those two places are sufficient to ensure we always reset it before
         * it goes back out to user space, but be careful to not break that
         * invariant.
         */
        __assert_only kern_return_t kr;
        kr = thread_set_voucher_name(MACH_PORT_NULL);
        assert(kr == KERN_SUCCESS);
    }

    uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
    if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
        upcall_flags |= WQ_FLAG_THREAD_REUSE;
    }

    if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
        /*
         * For threads that have an outside-of-QoS thread priority, indicate
         * to userspace that setting QoS should only affect the TSD and not
         * change QOS in the kernel.
         */
        upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
    } else {
        /*
         * Put the QoS class value into the lower bits of the reuse_thread
         * register, this is where the thread priority used to be stored
         * anyway.
         */
        upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
            WQ_FLAG_THREAD_PRIO_QOS;
    }

    if (uth->uu_workq_thport == MACH_PORT_NULL) {
        /* convert_thread_to_port() consumes a reference */
        thread_reference(th);
        ipc_port_t port = convert_thread_to_port(th);
        uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
    }

    /*
     * Call out to pthread, this sets up the thread, pulls in kevent structs
     * onto the stack, sets up the thread state and then returns to userspace.
     */
    WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
        proc_get_wqptr_fast(p), 0, 0, 0, 0);
    thread_sched_call(th, workq_sched_callback);
    pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
        uth->uu_workq_thport, 0, setup_flags, upcall_flags);

    __builtin_unreachable();
}
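
/*
 * Illustrative sketch (assumption, not part of the original file): the
 * upcall_flags assembled above travel to userspace in the "reuse" argument of
 * the pthread workqueue entry point, where libpthread decodes them roughly as:
 *
 *     if (flags & WQ_FLAG_THREAD_KEVENT)         // kevent list is on the stack
 *     if (flags & WQ_FLAG_THREAD_EVENT_MANAGER)  // this is the manager thread
 *     if (flags & WQ_FLAG_THREAD_PRIO_QOS)
 *         qos = flags & WQ_FLAG_THREAD_PRIO_MASK;  // low bits carry the QoS
 *
 * WQ_FLAG_THREAD_PRIO_MASK is an assumed name for the low-bit mask; the only
 * kernel-side invariant is the packing described in the comments above.
 */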
int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
    struct workqueue *wq = proc_get_wqptr(p);
    int error = 0;
    int activecount;

    if (wq == NULL) {
        return EINVAL;
    }

    /*
     * This is sometimes called from interrupt context by the kperf sampler.
     * In that case, it's not safe to spin trying to take the lock since we
     * might already hold it. So, we just try-lock it and error out if it's
     * already held. Since this is just a debugging aid, and all our callers
     * are able to handle an error, that's fine.
     */
    bool locked = workq_lock_try(wq);
    if (!locked) {
        return EBUSY;
    }

    wq_thactive_t act = _wq_thactive(wq);
    activecount = _wq_thactive_aggregate_downto_qos(wq, act,
        WORKQ_THREAD_QOS_MIN, NULL, NULL);
    if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
        activecount++;
    }
    pwqinfo->pwq_nthreads = wq->wq_nthreads;
    pwqinfo->pwq_runthreads = activecount;
    pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
    pwqinfo->pwq_state = 0;

    if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
        pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
    }

    if (wq->wq_nthreads >= wq_max_threads) {
        pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
    }

    workq_unlock(wq);
    return error;
}
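
/*
 * Illustrative sketch (assumption, not part of the original file): tools
 * consume this through proc_info, e.g. something like:
 *
 *     struct proc_workqueueinfo pwq;
 *     if (proc_pidinfo(pid, PROC_PIDWORKQUEUEINFO, 0, &pwq, sizeof(pwq))
 *             == sizeof(pwq)) {
 *         // pwq.pwq_nthreads / pwq_runthreads / pwq_blockedthreads / pwq_state
 *     }
 *
 * PROC_PIDWORKQUEUEINFO and proc_pidinfo() are the assumed userspace names;
 * the try-lock behaviour above is why callers must tolerate an error and
 * missing data.
 */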
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
    boolean_t *exceeded_constrained)
{
    proc_t p = v;
    struct proc_workqueueinfo pwqinfo;
    int err;

    assert(p != NULL);
    assert(exceeded_total != NULL);
    assert(exceeded_constrained != NULL);

    err = fill_procworkqueue(p, &pwqinfo);
    if (err) {
        return FALSE;
    }
    if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
        return FALSE;
    }

    *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
    *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

    return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
    static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
        kTaskWqExceededConstrainedThreadLimit);
    static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
        kTaskWqExceededTotalThreadLimit);
    static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
    static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
        WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

    if (v == NULL) {
        return 0;
    }

    proc_t p = v;
    struct workqueue *wq = proc_get_wqptr(p);

    if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
        return 0;
    }

    uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

    if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
        pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
    }

    if (wq->wq_nthreads >= wq_max_threads) {
        pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
    }

    return pwq_state;
}
void
workq_init(void)
{
    workq_lck_grp_attr = lck_grp_attr_alloc_init();
    workq_lck_attr = lck_attr_alloc_init();
    workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);

    workq_zone_workqueue = zinit(sizeof(struct workqueue),
        1024 * sizeof(struct workqueue), 8192, "workq.wq");
    workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
        1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");

    clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
        NSEC_PER_USEC, &wq_stalled_window.abstime);
    clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
        NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
    clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
        NSEC_PER_USEC, &wq_max_timer_interval.abstime);

    thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
        workq_deallocate_queue_invoke);
}