2 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
30 #include <sys/cdefs.h>
32 // <rdar://problem/26158937> panic() should be marked noreturn
33 extern void panic(const char *string
, ...) __printflike(1,2) __dead2
;
35 #include <kern/assert.h>
37 #include <kern/clock.h>
38 #include <kern/cpu_data.h>
39 #include <kern/kern_types.h>
40 #include <kern/policy_internal.h>
41 #include <kern/processor.h>
42 #include <kern/sched_prim.h> /* for thread_exception_return */
43 #include <kern/task.h>
44 #include <kern/thread.h>
45 #include <kern/zalloc.h>
46 #include <mach/kern_return.h>
47 #include <mach/mach_param.h>
48 #include <mach/mach_port.h>
49 #include <mach/mach_types.h>
50 #include <mach/mach_vm.h>
51 #include <mach/sync_policy.h>
52 #include <mach/task.h>
53 #include <mach/thread_act.h> /* for thread_resume */
54 #include <mach/thread_policy.h>
55 #include <mach/thread_status.h>
56 #include <mach/vm_prot.h>
57 #include <mach/vm_statistics.h>
58 #include <machine/atomic.h>
59 #include <machine/machine_routines.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_protos.h>
63 #include <sys/eventvar.h>
64 #include <sys/kdebug.h>
65 #include <sys/kernel.h>
67 #include <sys/param.h>
68 #include <sys/proc_info.h> /* for fill_procworkqueue */
69 #include <sys/proc_internal.h>
70 #include <sys/pthread_shims.h>
71 #include <sys/resourcevar.h>
72 #include <sys/signalvar.h>
73 #include <sys/sysctl.h>
74 #include <sys/sysproto.h>
75 #include <sys/systm.h>
76 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
78 #include <pthread/bsdthread_private.h>
79 #include <pthread/workqueue_syscalls.h>
80 #include <pthread/workqueue_internal.h>
81 #include <pthread/workqueue_trace.h>
85 extern thread_t
port_name_to_thread(mach_port_name_t port_name
); /* osfmk/kern/ipc_tt.h */
87 static void workq_unpark_continue(void *uth
, wait_result_t wr
) __dead2
;
88 static void workq_schedule_creator(proc_t p
, struct workqueue
*wq
, int flags
);
90 static bool workq_threadreq_admissible(struct workqueue
*wq
, struct uthread
*uth
,
91 workq_threadreq_t req
);
93 static uint32_t workq_constrained_allowance(struct workqueue
*wq
,
94 thread_qos_t at_qos
, struct uthread
*uth
, bool may_start_timer
);
96 static bool workq_thread_is_busy(uint64_t cur_ts
,
97 _Atomic
uint64_t *lastblocked_tsp
);
99 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
;
103 struct workq_usec_var
{
108 #define WORKQ_SYSCTL_USECS(var, init) \
109 static struct workq_usec_var var = { .usecs = init }; \
110 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
111 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
112 workq_sysctl_handle_usecs, "I", "")
114 static lck_grp_t
*workq_lck_grp
;
115 static lck_attr_t
*workq_lck_attr
;
116 static lck_grp_attr_t
*workq_lck_grp_attr
;
117 os_refgrp_decl(static, workq_refgrp
, "workq", NULL
);
119 static zone_t workq_zone_workqueue
;
120 static zone_t workq_zone_threadreq
;
122 WORKQ_SYSCTL_USECS(wq_stalled_window
, WQ_STALLED_WINDOW_USECS
);
123 WORKQ_SYSCTL_USECS(wq_reduce_pool_window
, WQ_REDUCE_POOL_WINDOW_USECS
);
124 WORKQ_SYSCTL_USECS(wq_max_timer_interval
, WQ_MAX_TIMER_INTERVAL_USECS
);
125 static uint32_t wq_max_threads
= WORKQUEUE_MAXTHREADS
;
126 static uint32_t wq_max_constrained_threads
= WORKQUEUE_MAXTHREADS
/ 8;
127 static uint32_t wq_init_constrained_limit
= 1;
128 static uint16_t wq_death_max_load
;
129 static uint32_t wq_max_parallelism
[WORKQ_NUM_QOS_BUCKETS
];
134 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
137 struct workq_usec_var
*v
= arg1
;
138 int error
= sysctl_handle_int(oidp
, &v
->usecs
, 0, req
);
139 if (error
|| !req
->newptr
)
141 clock_interval_to_absolutetime_interval(v
->usecs
, NSEC_PER_USEC
,
146 SYSCTL_INT(_kern
, OID_AUTO
, wq_max_threads
, CTLFLAG_RW
| CTLFLAG_LOCKED
,
147 &wq_max_threads
, 0, "");
149 SYSCTL_INT(_kern
, OID_AUTO
, wq_max_constrained_threads
, CTLFLAG_RW
| CTLFLAG_LOCKED
,
150 &wq_max_constrained_threads
, 0, "");
154 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
156 static struct workqueue
*
157 proc_get_wqptr_fast(struct proc
*p
)
159 return os_atomic_load(&p
->p_wqptr
, relaxed
);
162 static struct workqueue
*
163 proc_get_wqptr(struct proc
*p
)
165 struct workqueue
*wq
= proc_get_wqptr_fast(p
);
166 return wq
== WQPTR_IS_INITING_VALUE
? NULL
: wq
;
170 proc_set_wqptr(struct proc
*p
, struct workqueue
*wq
)
172 wq
= os_atomic_xchg(&p
->p_wqptr
, wq
, release
);
173 if (wq
== WQPTR_IS_INITING_VALUE
) {
175 thread_wakeup(&p
->p_wqptr
);
181 proc_init_wqptr_or_wait(struct proc
*p
)
183 struct workqueue
*wq
;
189 p
->p_wqptr
= WQPTR_IS_INITING_VALUE
;
194 if (wq
== WQPTR_IS_INITING_VALUE
) {
195 assert_wait(&p
->p_wqptr
, THREAD_UNINT
);
197 thread_block(THREAD_CONTINUE_NULL
);
204 static inline event_t
205 workq_parked_wait_event(struct uthread
*uth
)
207 return (event_t
)&uth
->uu_workq_stackaddr
;
211 workq_thread_wakeup(struct uthread
*uth
)
213 if ((uth
->uu_workq_flags
& UT_WORKQ_IDLE_CLEANUP
) == 0) {
214 thread_wakeup_thread(workq_parked_wait_event(uth
), uth
->uu_thread
);
218 #pragma mark wq_thactive
220 #if defined(__LP64__)
222 // 127 - 115 : 13 bits of zeroes
223 // 114 - 112 : best QoS among all pending constrained requests
224 // 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
225 #define WQ_THACTIVE_BUCKET_WIDTH 16
226 #define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
229 // 63 - 61 : best QoS among all pending constrained requests
230 // 60 : Manager bucket (0 or 1)
231 // 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
232 #define WQ_THACTIVE_BUCKET_WIDTH 10
233 #define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
235 #define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
236 #define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
238 static_assert(sizeof(wq_thactive_t
) * CHAR_BIT
- WQ_THACTIVE_QOS_SHIFT
>= 3,
239 "Make sure we have space to encode a QoS");
241 static inline wq_thactive_t
242 _wq_thactive(struct workqueue
*wq
)
244 return os_atomic_load(&wq
->wq_thactive
, relaxed
);
248 _wq_bucket(thread_qos_t qos
)
250 // Map both BG and MT to the same bucket by over-shifting down and
251 // clamping MT and BG together.
253 case THREAD_QOS_MAINTENANCE
:
260 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
261 ((tha) >> WQ_THACTIVE_QOS_SHIFT)
263 static inline thread_qos_t
264 _wq_thactive_best_constrained_req_qos(struct workqueue
*wq
)
266 // Avoid expensive atomic operations: the three bits we're loading are in
267 // a single byte, and always updated under the workqueue lock
268 wq_thactive_t v
= *(wq_thactive_t
*)&wq
->wq_thactive
;
269 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v
);
273 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue
*wq
)
275 thread_qos_t old_qos
, new_qos
;
276 workq_threadreq_t req
;
278 req
= priority_queue_max(&wq
->wq_constrained_queue
,
279 struct workq_threadreq_s
, tr_entry
);
280 new_qos
= req
? req
->tr_qos
: THREAD_QOS_UNSPECIFIED
;
281 old_qos
= _wq_thactive_best_constrained_req_qos(wq
);
282 if (old_qos
!= new_qos
) {
283 long delta
= (long)new_qos
- (long)old_qos
;
284 wq_thactive_t v
= (wq_thactive_t
)delta
<< WQ_THACTIVE_QOS_SHIFT
;
286 * We can do an atomic add relative to the initial load because updates
287 * to this qos are always serialized under the workqueue lock.
289 v
= os_atomic_add(&wq
->wq_thactive
, v
, relaxed
);
291 WQ_TRACE_WQ(TRACE_wq_thactive_update
, wq
, (uint64_t)v
,
292 (uint64_t)(v
>> 64), 0, 0);
294 WQ_TRACE_WQ(TRACE_wq_thactive_update
, wq
, v
, 0, 0, 0);
299 static inline wq_thactive_t
300 _wq_thactive_offset_for_qos(thread_qos_t qos
)
302 return (wq_thactive_t
)1 << (_wq_bucket(qos
) * WQ_THACTIVE_BUCKET_WIDTH
);
305 static inline wq_thactive_t
306 _wq_thactive_inc(struct workqueue
*wq
, thread_qos_t qos
)
308 wq_thactive_t v
= _wq_thactive_offset_for_qos(qos
);
309 return os_atomic_add_orig(&wq
->wq_thactive
, v
, relaxed
);
312 static inline wq_thactive_t
313 _wq_thactive_dec(struct workqueue
*wq
, thread_qos_t qos
)
315 wq_thactive_t v
= _wq_thactive_offset_for_qos(qos
);
316 return os_atomic_sub_orig(&wq
->wq_thactive
, v
, relaxed
);
320 _wq_thactive_move(struct workqueue
*wq
,
321 thread_qos_t old_qos
, thread_qos_t new_qos
)
323 wq_thactive_t v
= _wq_thactive_offset_for_qos(new_qos
) -
324 _wq_thactive_offset_for_qos(old_qos
);
325 os_atomic_add_orig(&wq
->wq_thactive
, v
, relaxed
);
326 wq
->wq_thscheduled_count
[_wq_bucket(old_qos
)]--;
327 wq
->wq_thscheduled_count
[_wq_bucket(new_qos
)]++;
330 static inline uint32_t
331 _wq_thactive_aggregate_downto_qos(struct workqueue
*wq
, wq_thactive_t v
,
332 thread_qos_t qos
, uint32_t *busycount
, uint32_t *max_busycount
)
334 uint32_t count
= 0, active
;
337 assert(WORKQ_THREAD_QOS_MIN
<= qos
&& qos
<= WORKQ_THREAD_QOS_MAX
);
340 curtime
= mach_absolute_time();
344 *max_busycount
= THREAD_QOS_LAST
- qos
;
347 int i
= _wq_bucket(qos
);
348 v
>>= i
* WQ_THACTIVE_BUCKET_WIDTH
;
349 for (; i
< WORKQ_NUM_QOS_BUCKETS
; i
++, v
>>= WQ_THACTIVE_BUCKET_WIDTH
) {
350 active
= v
& WQ_THACTIVE_BUCKET_MASK
;
353 if (busycount
&& wq
->wq_thscheduled_count
[i
] > active
) {
354 if (workq_thread_is_busy(curtime
, &wq
->wq_lastblocked_ts
[i
])) {
356 * We only consider the last blocked thread for a given bucket
357 * as busy because we don't want to take the list lock in each
358 * sched callback. However this is an approximation that could
359 * contribute to thread creation storms.
369 #pragma mark wq_flags
371 static inline uint32_t
372 _wq_flags(struct workqueue
*wq
)
374 return os_atomic_load(&wq
->wq_flags
, relaxed
);
378 _wq_exiting(struct workqueue
*wq
)
380 return _wq_flags(wq
) & WQ_EXITING
;
384 workq_is_exiting(struct proc
*p
)
386 struct workqueue
*wq
= proc_get_wqptr(p
);
387 return !wq
|| _wq_exiting(wq
);
391 workq_turnstile(struct proc
*p
)
393 struct workqueue
*wq
= proc_get_wqptr(p
);
394 return wq
? wq
->wq_turnstile
: TURNSTILE_NULL
;
397 #pragma mark workqueue lock
400 workq_lock_spin_is_acquired_kdp(struct workqueue
*wq
)
402 return kdp_lck_spin_is_acquired(&wq
->wq_lock
);
406 workq_lock_spin(struct workqueue
*wq
)
408 lck_spin_lock(&wq
->wq_lock
);
412 workq_lock_held(__assert_only
struct workqueue
*wq
)
414 LCK_SPIN_ASSERT(&wq
->wq_lock
, LCK_ASSERT_OWNED
);
418 workq_lock_try(struct workqueue
*wq
)
420 return lck_spin_try_lock(&wq
->wq_lock
);
424 workq_unlock(struct workqueue
*wq
)
426 lck_spin_unlock(&wq
->wq_lock
);
429 #pragma mark idle thread lists
431 #define WORKQ_POLICY_INIT(qos) \
432 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
434 static inline thread_qos_t
435 workq_pri_bucket(struct uu_workq_policy req
)
437 return MAX(MAX(req
.qos_req
, req
.qos_max
), req
.qos_override
);
440 static inline thread_qos_t
441 workq_pri_override(struct uu_workq_policy req
)
443 return MAX(workq_pri_bucket(req
), req
.qos_bucket
);
447 workq_thread_needs_params_change(workq_threadreq_t req
, struct uthread
*uth
)
449 workq_threadreq_param_t cur_trp
, req_trp
= { };
451 cur_trp
.trp_value
= uth
->uu_save
.uus_workq_park_data
.workloop_params
;
452 if (req
->tr_flags
& TR_FLAG_WL_PARAMS
) {
453 req_trp
= kqueue_threadreq_workloop_param(req
);
457 * CPU percent flags are handled separately to policy changes, so ignore
458 * them for all of these checks.
460 uint16_t cur_flags
= (cur_trp
.trp_flags
& ~TRP_CPUPERCENT
);
461 uint16_t req_flags
= (req_trp
.trp_flags
& ~TRP_CPUPERCENT
);
463 if (!req_flags
&& !cur_flags
) {
467 if (req_flags
!= cur_flags
) {
471 if ((req_flags
& TRP_PRIORITY
) && req_trp
.trp_pri
!= cur_trp
.trp_pri
) {
475 if ((req_flags
& TRP_POLICY
) && cur_trp
.trp_pol
!= cur_trp
.trp_pol
) {
483 workq_thread_needs_priority_change(workq_threadreq_t req
, struct uthread
*uth
)
485 if (workq_thread_needs_params_change(req
, uth
)) {
489 return req
->tr_qos
!= workq_pri_override(uth
->uu_workq_pri
);
493 workq_thread_update_bucket(proc_t p
, struct workqueue
*wq
, struct uthread
*uth
,
494 struct uu_workq_policy old_pri
, struct uu_workq_policy new_pri
,
497 thread_qos_t old_bucket
= old_pri
.qos_bucket
;
498 thread_qos_t new_bucket
= workq_pri_bucket(new_pri
);
500 if (old_bucket
!= new_bucket
) {
501 _wq_thactive_move(wq
, old_bucket
, new_bucket
);
504 new_pri
.qos_bucket
= new_bucket
;
505 uth
->uu_workq_pri
= new_pri
;
507 if (workq_pri_override(old_pri
) != new_bucket
) {
508 thread_set_workq_override(uth
->uu_thread
, new_bucket
);
511 if (wq
->wq_reqcount
&& (old_bucket
> new_bucket
|| force_run
)) {
512 int flags
= WORKQ_THREADREQ_CAN_CREATE_THREADS
;
513 if (old_bucket
> new_bucket
) {
515 * When lowering our bucket, we may unblock a thread request,
516 * but we can't drop our priority before we have evaluated
517 * whether this is the case, and if we ever drop the workqueue lock
518 * that would cause a priority inversion.
520 * We hence have to disallow thread creation in that case.
524 workq_schedule_creator(p
, wq
, flags
);
529 * Sets/resets the cpu percent limits on the current thread. We can't set
530 * these limits from outside of the current thread, so this function needs
531 * to be called when we're executing on the intended
534 workq_thread_reset_cpupercent(workq_threadreq_t req
, struct uthread
*uth
)
536 assert(uth
== current_uthread());
537 workq_threadreq_param_t trp
= { };
539 if (req
&& (req
->tr_flags
& TR_FLAG_WL_PARAMS
)) {
540 trp
= kqueue_threadreq_workloop_param(req
);
543 if (uth
->uu_workq_flags
& UT_WORKQ_CPUPERCENT
) {
545 * Going through disable when we have an existing CPU percent limit
546 * set will force the ledger to refill the token bucket of the current
547 * thread. Removing any penalty applied by previous thread use.
549 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE
, 0, 0);
550 uth
->uu_workq_flags
&= ~UT_WORKQ_CPUPERCENT
;
553 if (trp
.trp_flags
& TRP_CPUPERCENT
) {
554 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK
, trp
.trp_cpupercent
,
555 (uint64_t)trp
.trp_refillms
* NSEC_PER_SEC
);
556 uth
->uu_workq_flags
|= UT_WORKQ_CPUPERCENT
;
561 workq_thread_reset_pri(struct workqueue
*wq
, struct uthread
*uth
,
562 workq_threadreq_t req
)
564 thread_t th
= uth
->uu_thread
;
565 thread_qos_t qos
= req
? req
->tr_qos
: WORKQ_THREAD_QOS_CLEANUP
;
566 workq_threadreq_param_t trp
= { };
568 int policy
= POLICY_TIMESHARE
;
570 if (req
&& (req
->tr_flags
& TR_FLAG_WL_PARAMS
)) {
571 trp
= kqueue_threadreq_workloop_param(req
);
574 uth
->uu_workq_pri
= WORKQ_POLICY_INIT(qos
);
575 uth
->uu_workq_flags
&= ~UT_WORKQ_OUTSIDE_QOS
;
576 uth
->uu_save
.uus_workq_park_data
.workloop_params
= trp
.trp_value
;
578 // qos sent out to userspace (may differ from uu_workq_pri on param threads)
579 uth
->uu_save
.uus_workq_park_data
.qos
= qos
;
581 if (qos
== WORKQ_THREAD_QOS_MANAGER
) {
582 uint32_t mgr_pri
= wq
->wq_event_manager_priority
;
583 assert(trp
.trp_value
== 0); // manager qos and thread policy don't mix
585 if (mgr_pri
& _PTHREAD_PRIORITY_SCHED_PRI_FLAG
) {
586 mgr_pri
&= _PTHREAD_PRIORITY_SCHED_PRI_MASK
;
587 thread_set_workq_pri(th
, THREAD_QOS_UNSPECIFIED
, mgr_pri
,
592 qos
= _pthread_priority_thread_qos(mgr_pri
);
594 if (trp
.trp_flags
& TRP_PRIORITY
) {
595 qos
= THREAD_QOS_UNSPECIFIED
;
596 priority
= trp
.trp_pri
;
597 uth
->uu_workq_flags
|= UT_WORKQ_OUTSIDE_QOS
;
600 if (trp
.trp_flags
& TRP_POLICY
) {
601 policy
= trp
.trp_pol
;
605 thread_set_workq_pri(th
, qos
, priority
, policy
);
609 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
610 * every time a servicer is being told about a new max QoS.
613 workq_thread_set_max_qos(struct proc
*p
, struct kqrequest
*kqr
)
615 struct uu_workq_policy old_pri
, new_pri
;
616 struct uthread
*uth
= get_bsdthread_info(kqr
->kqr_thread
);
617 struct workqueue
*wq
= proc_get_wqptr_fast(p
);
618 thread_qos_t qos
= kqr
->kqr_qos_index
;
620 if (uth
->uu_workq_pri
.qos_max
== qos
)
624 old_pri
= new_pri
= uth
->uu_workq_pri
;
625 new_pri
.qos_max
= qos
;
626 workq_thread_update_bucket(p
, wq
, uth
, old_pri
, new_pri
, false);
630 #pragma mark idle threads accounting and handling
632 static inline struct uthread
*
633 workq_oldest_killable_idle_thread(struct workqueue
*wq
)
635 struct uthread
*uth
= TAILQ_LAST(&wq
->wq_thidlelist
, workq_uthread_head
);
637 if (uth
&& !uth
->uu_save
.uus_workq_park_data
.has_stack
) {
638 uth
= TAILQ_PREV(uth
, workq_uthread_head
, uu_workq_entry
);
640 assert(uth
->uu_save
.uus_workq_park_data
.has_stack
);
646 static inline uint64_t
647 workq_kill_delay_for_idle_thread(struct workqueue
*wq
)
649 uint64_t delay
= wq_reduce_pool_window
.abstime
;
650 uint16_t idle
= wq
->wq_thidlecount
;
653 * If we have less than wq_death_max_load threads, have a 5s timer.
655 * For the next wq_max_constrained_threads ones, decay linearly from
658 if (idle
<= wq_death_max_load
) {
662 if (wq_max_constrained_threads
> idle
- wq_death_max_load
) {
663 delay
*= (wq_max_constrained_threads
- (idle
- wq_death_max_load
));
665 return delay
/ wq_max_constrained_threads
;
669 workq_should_kill_idle_thread(struct workqueue
*wq
, struct uthread
*uth
,
672 uint64_t delay
= workq_kill_delay_for_idle_thread(wq
);
673 return now
- uth
->uu_save
.uus_workq_park_data
.idle_stamp
> delay
;
677 workq_death_call_schedule(struct workqueue
*wq
, uint64_t deadline
)
679 uint32_t wq_flags
= os_atomic_load(&wq
->wq_flags
, relaxed
);
681 if (wq_flags
& (WQ_EXITING
| WQ_DEATH_CALL_SCHEDULED
)) {
684 os_atomic_or(&wq
->wq_flags
, WQ_DEATH_CALL_SCHEDULED
, relaxed
);
686 WQ_TRACE_WQ(TRACE_wq_death_call
| DBG_FUNC_NONE
, wq
, 1, 0, 0, 0);
689 * <rdar://problem/13139182> Due to how long term timers work, the leeway
690 * can't be too short, so use 500ms which is long enough that we will not
691 * wake up the CPU for killing threads, but short enough that it doesn't
692 * fall into long-term timer list shenanigans.
694 thread_call_enter_delayed_with_leeway(wq
->wq_death_call
, NULL
, deadline
,
695 wq_reduce_pool_window
.abstime
/ 10,
696 THREAD_CALL_DELAY_LEEWAY
| THREAD_CALL_DELAY_USER_BACKGROUND
);
700 * `decrement` is set to the number of threads that are no longer dying:
701 * - because they have been resuscitated just in time (workq_pop_idle_thread)
702 * - or have been killed (workq_thread_terminate).
705 workq_death_policy_evaluate(struct workqueue
*wq
, uint16_t decrement
)
709 assert(wq
->wq_thdying_count
>= decrement
);
710 if ((wq
->wq_thdying_count
-= decrement
) > 0)
713 if (wq
->wq_thidlecount
<= 1)
716 if ((uth
= workq_oldest_killable_idle_thread(wq
)) == NULL
)
719 uint64_t now
= mach_absolute_time();
720 uint64_t delay
= workq_kill_delay_for_idle_thread(wq
);
722 if (now
- uth
->uu_save
.uus_workq_park_data
.idle_stamp
> delay
) {
723 WQ_TRACE_WQ(TRACE_wq_thread_terminate
| DBG_FUNC_START
,
724 wq
, wq
->wq_thidlecount
, 0, 0, 0);
725 wq
->wq_thdying_count
++;
726 uth
->uu_workq_flags
|= UT_WORKQ_DYING
;
727 workq_thread_wakeup(uth
);
731 workq_death_call_schedule(wq
,
732 uth
->uu_save
.uus_workq_park_data
.idle_stamp
+ delay
);
736 workq_thread_terminate(struct proc
*p
, struct uthread
*uth
)
738 struct workqueue
*wq
= proc_get_wqptr_fast(p
);
741 TAILQ_REMOVE(&wq
->wq_thrunlist
, uth
, uu_workq_entry
);
742 if (uth
->uu_workq_flags
& UT_WORKQ_DYING
) {
743 WQ_TRACE_WQ(TRACE_wq_thread_terminate
| DBG_FUNC_END
,
744 wq
, wq
->wq_thidlecount
, 0, 0, 0);
745 workq_death_policy_evaluate(wq
, 1);
747 if (wq
->wq_nthreads
-- == wq_max_threads
) {
749 * We got under the thread limit again, which may have prevented
750 * thread creation from happening, redrive if there are pending requests
752 if (wq
->wq_reqcount
) {
753 workq_schedule_creator(p
, wq
, WORKQ_THREADREQ_CAN_CREATE_THREADS
);
758 thread_deallocate(uth
->uu_thread
);
762 workq_kill_old_threads_call(void *param0
, void *param1 __unused
)
764 struct workqueue
*wq
= param0
;
767 WQ_TRACE_WQ(TRACE_wq_death_call
| DBG_FUNC_START
, wq
, 0, 0, 0, 0);
768 os_atomic_and(&wq
->wq_flags
, ~WQ_DEATH_CALL_SCHEDULED
, relaxed
);
769 workq_death_policy_evaluate(wq
, 0);
770 WQ_TRACE_WQ(TRACE_wq_death_call
| DBG_FUNC_END
, wq
, 0, 0, 0, 0);
774 static struct uthread
*
775 workq_pop_idle_thread(struct workqueue
*wq
)
779 if ((uth
= TAILQ_FIRST(&wq
->wq_thidlelist
))) {
780 TAILQ_REMOVE(&wq
->wq_thidlelist
, uth
, uu_workq_entry
);
782 uth
= TAILQ_FIRST(&wq
->wq_thnewlist
);
783 TAILQ_REMOVE(&wq
->wq_thnewlist
, uth
, uu_workq_entry
);
785 TAILQ_INSERT_TAIL(&wq
->wq_thrunlist
, uth
, uu_workq_entry
);
787 assert((uth
->uu_workq_flags
& UT_WORKQ_RUNNING
) == 0);
788 uth
->uu_workq_flags
|= UT_WORKQ_RUNNING
| UT_WORKQ_OVERCOMMIT
;
789 wq
->wq_threads_scheduled
++;
790 wq
->wq_thidlecount
--;
792 if (__improbable(uth
->uu_workq_flags
& UT_WORKQ_DYING
)) {
793 uth
->uu_workq_flags
^= UT_WORKQ_DYING
;
794 workq_death_policy_evaluate(wq
, 1);
800 * Called by thread_create_workq_waiting() during thread initialization, before
801 * assert_wait, before the thread has been started.
804 workq_thread_init_and_wq_lock(task_t task
, thread_t th
)
806 struct uthread
*uth
= get_bsdthread_info(th
);
808 uth
->uu_workq_flags
= UT_WORKQ_NEW
;
809 uth
->uu_workq_pri
= WORKQ_POLICY_INIT(THREAD_QOS_LEGACY
);
810 uth
->uu_workq_thport
= MACH_PORT_NULL
;
811 uth
->uu_workq_stackaddr
= 0;
813 thread_set_tag(th
, THREAD_TAG_PTHREAD
| THREAD_TAG_WORKQUEUE
);
814 thread_reset_workq_qos(th
, THREAD_QOS_LEGACY
);
816 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task
)));
817 return workq_parked_wait_event(uth
);
821 * Try to add a new workqueue thread.
823 * - called with workq lock held
824 * - dropped and retaken around thread creation
825 * - return with workq lock held
828 workq_add_new_idle_thread(proc_t p
, struct workqueue
*wq
)
830 mach_vm_offset_t th_stackaddr
;
838 vm_map_t vmap
= get_task_map(p
->task
);
840 kret
= pthread_functions
->workq_create_threadstack(p
, vmap
, &th_stackaddr
);
841 if (kret
!= KERN_SUCCESS
) {
842 WQ_TRACE_WQ(TRACE_wq_thread_create_failed
| DBG_FUNC_NONE
, wq
,
847 kret
= thread_create_workq_waiting(p
->task
, workq_unpark_continue
, &th
);
848 if (kret
!= KERN_SUCCESS
) {
849 WQ_TRACE_WQ(TRACE_wq_thread_create_failed
| DBG_FUNC_NONE
, wq
,
851 pthread_functions
->workq_destroy_threadstack(p
, vmap
, th_stackaddr
);
855 // thread_create_workq_waiting() will return with the wq lock held
856 // on success, because it calls workq_thread_init_and_wq_lock() above
858 struct uthread
*uth
= get_bsdthread_info(th
);
861 wq
->wq_thidlecount
++;
862 uth
->uu_workq_stackaddr
= th_stackaddr
;
863 TAILQ_INSERT_TAIL(&wq
->wq_thnewlist
, uth
, uu_workq_entry
);
865 WQ_TRACE_WQ(TRACE_wq_thread_create
| DBG_FUNC_NONE
, wq
, 0, 0, 0, 0);
871 * Do not redrive here if we went under wq_max_threads again,
872 * it is the responsibility of the callers of this function
873 * to do so when it fails.
879 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
881 __attribute__((noreturn
, noinline
))
883 workq_unpark_for_death_and_unlock(proc_t p
, struct workqueue
*wq
,
884 struct uthread
*uth
, uint32_t death_flags
)
886 thread_qos_t qos
= workq_pri_override(uth
->uu_workq_pri
);
887 bool first_use
= uth
->uu_workq_flags
& UT_WORKQ_NEW
;
889 if (qos
> WORKQ_THREAD_QOS_CLEANUP
) {
890 workq_thread_reset_pri(wq
, uth
, NULL
);
891 qos
= WORKQ_THREAD_QOS_CLEANUP
;
894 workq_thread_reset_cpupercent(NULL
, uth
);
896 if (death_flags
& WORKQ_UNPARK_FOR_DEATH_WAS_IDLE
) {
897 wq
->wq_thidlecount
--;
899 TAILQ_REMOVE(&wq
->wq_thnewlist
, uth
, uu_workq_entry
);
901 TAILQ_REMOVE(&wq
->wq_thidlelist
, uth
, uu_workq_entry
);
904 TAILQ_INSERT_TAIL(&wq
->wq_thrunlist
, uth
, uu_workq_entry
);
908 uint32_t flags
= WQ_FLAG_THREAD_NEWSPI
| qos
| WQ_FLAG_THREAD_PRIO_QOS
;
909 uint32_t setup_flags
= WQ_SETUP_EXIT_THREAD
;
910 thread_t th
= uth
->uu_thread
;
911 vm_map_t vmap
= get_task_map(p
->task
);
913 if (!first_use
) flags
|= WQ_FLAG_THREAD_REUSE
;
915 pthread_functions
->workq_setup_thread(p
, th
, vmap
, uth
->uu_workq_stackaddr
,
916 uth
->uu_workq_thport
, 0, setup_flags
, flags
);
917 __builtin_unreachable();
921 workq_is_current_thread_updating_turnstile(struct workqueue
*wq
)
923 return wq
->wq_turnstile_updater
== current_thread();
926 __attribute__((always_inline
))
928 workq_perform_turnstile_operation_locked(struct workqueue
*wq
,
929 void (^operation
)(void))
932 wq
->wq_turnstile_updater
= current_thread();
934 wq
->wq_turnstile_updater
= THREAD_NULL
;
938 workq_turnstile_update_inheritor(struct workqueue
*wq
,
939 turnstile_inheritor_t inheritor
,
940 turnstile_update_flags_t flags
)
942 workq_perform_turnstile_operation_locked(wq
, ^{
943 turnstile_update_inheritor(wq
->wq_turnstile
, inheritor
,
944 flags
| TURNSTILE_IMMEDIATE_UPDATE
);
945 turnstile_update_inheritor_complete(wq
->wq_turnstile
,
946 TURNSTILE_INTERLOCK_HELD
);
951 workq_push_idle_thread(proc_t p
, struct workqueue
*wq
, struct uthread
*uth
)
953 uint64_t now
= mach_absolute_time();
955 uth
->uu_workq_flags
&= ~UT_WORKQ_RUNNING
;
956 if ((uth
->uu_workq_flags
& UT_WORKQ_OVERCOMMIT
) == 0) {
957 wq
->wq_constrained_threads_scheduled
--;
959 TAILQ_REMOVE(&wq
->wq_thrunlist
, uth
, uu_workq_entry
);
960 wq
->wq_threads_scheduled
--;
962 if (wq
->wq_creator
== uth
) {
963 WQ_TRACE_WQ(TRACE_wq_creator_select
, wq
, 3, 0,
964 uth
->uu_save
.uus_workq_park_data
.yields
, 0);
965 wq
->wq_creator
= NULL
;
966 if (wq
->wq_reqcount
) {
967 workq_turnstile_update_inheritor(wq
, wq
, TURNSTILE_INHERITOR_WORKQ
);
969 workq_turnstile_update_inheritor(wq
, TURNSTILE_INHERITOR_NULL
, 0);
971 if (uth
->uu_workq_flags
& UT_WORKQ_NEW
) {
972 TAILQ_INSERT_TAIL(&wq
->wq_thnewlist
, uth
, uu_workq_entry
);
973 wq
->wq_thidlecount
++;
977 _wq_thactive_dec(wq
, uth
->uu_workq_pri
.qos_bucket
);
978 wq
->wq_thscheduled_count
[_wq_bucket(uth
->uu_workq_pri
.qos_bucket
)]--;
979 assert(!(uth
->uu_workq_flags
& UT_WORKQ_NEW
));
980 uth
->uu_workq_flags
|= UT_WORKQ_IDLE_CLEANUP
;
983 uth
->uu_save
.uus_workq_park_data
.idle_stamp
= now
;
985 struct uthread
*oldest
= workq_oldest_killable_idle_thread(wq
);
986 uint16_t cur_idle
= wq
->wq_thidlecount
;
988 if (cur_idle
>= wq_max_constrained_threads
||
989 (wq
->wq_thdying_count
== 0 && oldest
&&
990 workq_should_kill_idle_thread(wq
, oldest
, now
))) {
992 * Immediately kill threads if we have too may of them.
994 * And swap "place" with the oldest one we'd have woken up.
995 * This is a relatively desperate situation where we really
996 * need to kill threads quickly and it's best to kill
997 * the one that's currently on core than context switching.
1000 oldest
->uu_save
.uus_workq_park_data
.idle_stamp
= now
;
1001 TAILQ_REMOVE(&wq
->wq_thidlelist
, oldest
, uu_workq_entry
);
1002 TAILQ_INSERT_HEAD(&wq
->wq_thidlelist
, oldest
, uu_workq_entry
);
1005 WQ_TRACE_WQ(TRACE_wq_thread_terminate
| DBG_FUNC_START
,
1006 wq
, cur_idle
, 0, 0, 0);
1007 wq
->wq_thdying_count
++;
1008 uth
->uu_workq_flags
|= UT_WORKQ_DYING
;
1009 uth
->uu_workq_flags
&= ~UT_WORKQ_IDLE_CLEANUP
;
1010 workq_unpark_for_death_and_unlock(p
, wq
, uth
, 0);
1011 __builtin_unreachable();
1014 struct uthread
*tail
= TAILQ_LAST(&wq
->wq_thidlelist
, workq_uthread_head
);
1017 wq
->wq_thidlecount
= cur_idle
;
1019 if (cur_idle
>= wq_death_max_load
&& tail
&&
1020 tail
->uu_save
.uus_workq_park_data
.has_stack
) {
1021 uth
->uu_save
.uus_workq_park_data
.has_stack
= false;
1022 TAILQ_INSERT_TAIL(&wq
->wq_thidlelist
, uth
, uu_workq_entry
);
1024 uth
->uu_save
.uus_workq_park_data
.has_stack
= true;
1025 TAILQ_INSERT_HEAD(&wq
->wq_thidlelist
, uth
, uu_workq_entry
);
1029 uint64_t delay
= workq_kill_delay_for_idle_thread(wq
);
1030 workq_death_call_schedule(wq
, now
+ delay
);
1034 #pragma mark thread requests
1037 workq_priority_for_req(workq_threadreq_t req
)
1039 thread_qos_t qos
= req
->tr_qos
;
1041 if (req
->tr_flags
& TR_FLAG_WL_OUTSIDE_QOS
) {
1042 workq_threadreq_param_t trp
= kqueue_threadreq_workloop_param(req
);
1043 assert(trp
.trp_flags
& TRP_PRIORITY
);
1046 return thread_workq_pri_for_qos(qos
);
1049 static inline struct priority_queue
*
1050 workq_priority_queue_for_req(struct workqueue
*wq
, workq_threadreq_t req
)
1052 if (req
->tr_flags
& TR_FLAG_WL_OUTSIDE_QOS
) {
1053 return &wq
->wq_special_queue
;
1054 } else if (req
->tr_flags
& TR_FLAG_OVERCOMMIT
) {
1055 return &wq
->wq_overcommit_queue
;
1057 return &wq
->wq_constrained_queue
;
1062 * returns true if the the enqueued request is the highest priority item
1063 * in its priority queue.
1066 workq_threadreq_enqueue(struct workqueue
*wq
, workq_threadreq_t req
)
1068 assert(req
->tr_state
== TR_STATE_NEW
);
1070 req
->tr_state
= TR_STATE_QUEUED
;
1071 wq
->wq_reqcount
+= req
->tr_count
;
1073 if (req
->tr_qos
== WORKQ_THREAD_QOS_MANAGER
) {
1074 assert(wq
->wq_event_manager_threadreq
== NULL
);
1075 assert(req
->tr_flags
& TR_FLAG_KEVENT
);
1076 assert(req
->tr_count
== 1);
1077 wq
->wq_event_manager_threadreq
= req
;
1080 if (priority_queue_insert(workq_priority_queue_for_req(wq
, req
),
1081 &req
->tr_entry
, workq_priority_for_req(req
),
1082 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE
)) {
1083 if ((req
->tr_flags
& TR_FLAG_OVERCOMMIT
) == 0) {
1084 _wq_thactive_refresh_best_constrained_req_qos(wq
);
1092 * returns true if the the dequeued request was the highest priority item
1093 * in its priority queue.
1096 workq_threadreq_dequeue(struct workqueue
*wq
, workq_threadreq_t req
)
1100 if (--req
->tr_count
== 0) {
1101 if (req
->tr_qos
== WORKQ_THREAD_QOS_MANAGER
) {
1102 assert(wq
->wq_event_manager_threadreq
== req
);
1103 assert(req
->tr_count
== 0);
1104 wq
->wq_event_manager_threadreq
= NULL
;
1107 if (priority_queue_remove(workq_priority_queue_for_req(wq
, req
),
1108 &req
->tr_entry
, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE
)) {
1109 if ((req
->tr_flags
& TR_FLAG_OVERCOMMIT
) == 0) {
1110 _wq_thactive_refresh_best_constrained_req_qos(wq
);
1119 workq_threadreq_destroy(proc_t p
, workq_threadreq_t req
)
1121 req
->tr_state
= TR_STATE_IDLE
;
1122 if (req
->tr_flags
& (TR_FLAG_WORKLOOP
| TR_FLAG_KEVENT
)) {
1123 kqueue_threadreq_cancel(p
, req
);
1125 zfree(workq_zone_threadreq
, req
);
1130 * Mark a thread request as complete. At this point, it is treated as owned by
1131 * the submitting subsystem and you should assume it could be freed.
1133 * Called with the workqueue lock held.
1136 workq_threadreq_bind_and_unlock(proc_t p
, struct workqueue
*wq
,
1137 workq_threadreq_t req
, struct uthread
*uth
)
1139 uint8_t tr_flags
= req
->tr_flags
;
1140 bool needs_commit
= false;
1141 int creator_flags
= 0;
1145 if (req
->tr_state
== TR_STATE_QUEUED
) {
1146 workq_threadreq_dequeue(wq
, req
);
1147 creator_flags
= WORKQ_THREADREQ_CAN_CREATE_THREADS
;
1150 if (wq
->wq_creator
== uth
) {
1151 WQ_TRACE_WQ(TRACE_wq_creator_select
, wq
, 4, 0,
1152 uth
->uu_save
.uus_workq_park_data
.yields
, 0);
1153 creator_flags
= WORKQ_THREADREQ_CAN_CREATE_THREADS
|
1154 WORKQ_THREADREQ_CREATOR_TRANSFER
;
1155 wq
->wq_creator
= NULL
;
1156 _wq_thactive_inc(wq
, req
->tr_qos
);
1157 wq
->wq_thscheduled_count
[_wq_bucket(req
->tr_qos
)]++;
1158 } else if (uth
->uu_workq_pri
.qos_bucket
!= req
->tr_qos
) {
1159 _wq_thactive_move(wq
, uth
->uu_workq_pri
.qos_bucket
, req
->tr_qos
);
1161 workq_thread_reset_pri(wq
, uth
, req
);
1163 if (tr_flags
& TR_FLAG_OVERCOMMIT
) {
1164 if ((uth
->uu_workq_flags
& UT_WORKQ_OVERCOMMIT
) == 0) {
1165 uth
->uu_workq_flags
|= UT_WORKQ_OVERCOMMIT
;
1166 wq
->wq_constrained_threads_scheduled
--;
1169 if ((uth
->uu_workq_flags
& UT_WORKQ_OVERCOMMIT
) != 0) {
1170 uth
->uu_workq_flags
&= ~UT_WORKQ_OVERCOMMIT
;
1171 wq
->wq_constrained_threads_scheduled
++;
1175 if (tr_flags
& (TR_FLAG_KEVENT
| TR_FLAG_WORKLOOP
)) {
1176 if (req
->tr_state
== TR_STATE_NEW
) {
1178 * We're called from workq_kern_threadreq_initiate()
1179 * due to an unbind, with the kq req held.
1181 assert(!creator_flags
);
1182 req
->tr_state
= TR_STATE_IDLE
;
1183 kqueue_threadreq_bind(p
, req
, uth
->uu_thread
, 0);
1185 assert(req
->tr_count
== 0);
1186 workq_perform_turnstile_operation_locked(wq
, ^{
1187 kqueue_threadreq_bind_prepost(p
, req
, uth
->uu_thread
);
1189 needs_commit
= true;
1192 } else if (req
->tr_count
> 0) {
1196 if (creator_flags
) {
1197 /* This can drop the workqueue lock, and take it again */
1198 workq_schedule_creator(p
, wq
, creator_flags
);
1204 zfree(workq_zone_threadreq
, req
);
1207 kqueue_threadreq_bind_commit(p
, uth
->uu_thread
);
1213 uint32_t upcall_flags
= WQ_FLAG_THREAD_NEWSPI
;
1214 if (uth
->uu_workq_pri
.qos_bucket
== WORKQ_THREAD_QOS_MANAGER
) {
1215 upcall_flags
|= WQ_FLAG_THREAD_EVENT_MANAGER
;
1216 } else if (tr_flags
& TR_FLAG_OVERCOMMIT
) {
1217 upcall_flags
|= WQ_FLAG_THREAD_OVERCOMMIT
;
1219 if (tr_flags
& TR_FLAG_KEVENT
) {
1220 upcall_flags
|= WQ_FLAG_THREAD_KEVENT
;
1222 if (tr_flags
& TR_FLAG_WORKLOOP
) {
1223 upcall_flags
|= WQ_FLAG_THREAD_WORKLOOP
| WQ_FLAG_THREAD_KEVENT
;
1225 uth
->uu_save
.uus_workq_park_data
.upcall_flags
= upcall_flags
;
1228 #pragma mark workqueue thread creation thread calls
1231 workq_thread_call_prepost(struct workqueue
*wq
, uint32_t sched
, uint32_t pend
,
1234 uint32_t old_flags
, new_flags
;
1236 os_atomic_rmw_loop(&wq
->wq_flags
, old_flags
, new_flags
, acquire
, {
1237 if (__improbable(old_flags
& (WQ_EXITING
| sched
| pend
| fail_mask
))) {
1238 os_atomic_rmw_loop_give_up(return false);
1240 if (__improbable(old_flags
& WQ_PROC_SUSPENDED
)) {
1241 new_flags
= old_flags
| pend
;
1243 new_flags
= old_flags
| sched
;
1247 return (old_flags
& WQ_PROC_SUSPENDED
) == 0;
1250 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1253 workq_schedule_delayed_thread_creation(struct workqueue
*wq
, int flags
)
1255 assert(!preemption_enabled());
1257 if (!workq_thread_call_prepost(wq
, WQ_DELAYED_CALL_SCHEDULED
,
1258 WQ_DELAYED_CALL_PENDED
, WQ_IMMEDIATE_CALL_PENDED
|
1259 WQ_IMMEDIATE_CALL_SCHEDULED
)) {
1263 uint64_t now
= mach_absolute_time();
1265 if (flags
& WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART
) {
1266 /* do not change the window */
1267 } else if (now
- wq
->wq_thread_call_last_run
<= wq
->wq_timer_interval
) {
1268 wq
->wq_timer_interval
*= 2;
1269 if (wq
->wq_timer_interval
> wq_max_timer_interval
.abstime
) {
1270 wq
->wq_timer_interval
= wq_max_timer_interval
.abstime
;
1272 } else if (now
- wq
->wq_thread_call_last_run
> 2 * wq
->wq_timer_interval
) {
1273 wq
->wq_timer_interval
/= 2;
1274 if (wq
->wq_timer_interval
< wq_stalled_window
.abstime
) {
1275 wq
->wq_timer_interval
= wq_stalled_window
.abstime
;
1279 WQ_TRACE_WQ(TRACE_wq_start_add_timer
, wq
, wq
->wq_reqcount
,
1280 _wq_flags(wq
), wq
->wq_timer_interval
, 0);
1282 thread_call_t call
= wq
->wq_delayed_call
;
1283 uintptr_t arg
= WQ_DELAYED_CALL_SCHEDULED
;
1284 uint64_t deadline
= now
+ wq
->wq_timer_interval
;
1285 if (thread_call_enter1_delayed(call
, (void *)arg
, deadline
)) {
1286 panic("delayed_call was already enqueued");
1292 workq_schedule_immediate_thread_creation(struct workqueue
*wq
)
1294 assert(!preemption_enabled());
1296 if (workq_thread_call_prepost(wq
, WQ_IMMEDIATE_CALL_SCHEDULED
,
1297 WQ_IMMEDIATE_CALL_PENDED
, 0)) {
1298 WQ_TRACE_WQ(TRACE_wq_start_add_timer
, wq
, wq
->wq_reqcount
,
1299 _wq_flags(wq
), 0, 0);
1301 uintptr_t arg
= WQ_IMMEDIATE_CALL_SCHEDULED
;
1302 if (thread_call_enter1(wq
->wq_immediate_call
, (void *)arg
)) {
1303 panic("immediate_call was already enqueued");
1309 workq_proc_suspended(struct proc
*p
)
1311 struct workqueue
*wq
= proc_get_wqptr(p
);
1313 if (wq
) os_atomic_or(&wq
->wq_flags
, WQ_PROC_SUSPENDED
, relaxed
);
1317 workq_proc_resumed(struct proc
*p
)
1319 struct workqueue
*wq
= proc_get_wqptr(p
);
1324 wq_flags
= os_atomic_and_orig(&wq
->wq_flags
, ~(WQ_PROC_SUSPENDED
|
1325 WQ_DELAYED_CALL_PENDED
| WQ_IMMEDIATE_CALL_PENDED
), relaxed
);
1326 if ((wq_flags
& WQ_EXITING
) == 0) {
1327 disable_preemption();
1328 if (wq_flags
& WQ_IMMEDIATE_CALL_PENDED
) {
1329 workq_schedule_immediate_thread_creation(wq
);
1330 } else if (wq_flags
& WQ_DELAYED_CALL_PENDED
) {
1331 workq_schedule_delayed_thread_creation(wq
,
1332 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART
);
1334 enable_preemption();
1339 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1342 workq_thread_is_busy(uint64_t now
, _Atomic
uint64_t *lastblocked_tsp
)
1344 uint64_t lastblocked_ts
= os_atomic_load(lastblocked_tsp
, relaxed
);
1345 if (now
<= lastblocked_ts
) {
1347 * Because the update of the timestamp when a thread blocks
1348 * isn't serialized against us looking at it (i.e. we don't hold
1349 * the workq lock), it's possible to have a timestamp that matches
1350 * the current time or that even looks to be in the future relative
1351 * to when we grabbed the current time...
1353 * Just treat this as a busy thread since it must have just blocked.
1357 return (now
- lastblocked_ts
) < wq_stalled_window
.abstime
;
1361 workq_add_new_threads_call(void *_p
, void *flags
)
1364 struct workqueue
*wq
= proc_get_wqptr(p
);
1365 uint32_t my_flag
= (uint32_t)(uintptr_t)flags
;
1368 * workq_exit() will set the workqueue to NULL before
1369 * it cancels thread calls.
1373 assert((my_flag
== WQ_DELAYED_CALL_SCHEDULED
) ||
1374 (my_flag
== WQ_IMMEDIATE_CALL_SCHEDULED
));
1376 WQ_TRACE_WQ(TRACE_wq_add_timer
| DBG_FUNC_START
, wq
, _wq_flags(wq
),
1377 wq
->wq_nthreads
, wq
->wq_thidlecount
, 0);
1379 workq_lock_spin(wq
);
1381 wq
->wq_thread_call_last_run
= mach_absolute_time();
1382 os_atomic_and(&wq
->wq_flags
, ~my_flag
, release
);
1384 /* This can drop the workqueue lock, and take it again */
1385 workq_schedule_creator(p
, wq
, WORKQ_THREADREQ_CAN_CREATE_THREADS
);
1389 WQ_TRACE_WQ(TRACE_wq_add_timer
| DBG_FUNC_END
, wq
, 0,
1390 wq
->wq_nthreads
, wq
->wq_thidlecount
, 0);
1393 #pragma mark thread state tracking
1396 workq_sched_callback(int type
, thread_t thread
)
1398 struct uthread
*uth
= get_bsdthread_info(thread
);
1399 proc_t proc
= get_bsdtask_info(get_threadtask(thread
));
1400 struct workqueue
*wq
= proc_get_wqptr(proc
);
1401 thread_qos_t req_qos
, qos
= uth
->uu_workq_pri
.qos_bucket
;
1402 wq_thactive_t old_thactive
;
1403 bool start_timer
= false;
1405 if (qos
== WORKQ_THREAD_QOS_MANAGER
) {
1410 case SCHED_CALL_BLOCK
:
1411 old_thactive
= _wq_thactive_dec(wq
, qos
);
1412 req_qos
= WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive
);
1415 * Remember the timestamp of the last thread that blocked in this
1416 * bucket, it used used by admission checks to ignore one thread
1417 * being inactive if this timestamp is recent enough.
1419 * If we collide with another thread trying to update the
1420 * last_blocked (really unlikely since another thread would have to
1421 * get scheduled and then block after we start down this path), it's
1422 * not a problem. Either timestamp is adequate, so no need to retry
1424 os_atomic_store(&wq
->wq_lastblocked_ts
[_wq_bucket(qos
)],
1425 thread_last_run_time(thread
), relaxed
);
1427 if (req_qos
== THREAD_QOS_UNSPECIFIED
) {
1429 * No pending request at the moment we could unblock, move on.
1431 } else if (qos
< req_qos
) {
1433 * The blocking thread is at a lower QoS than the highest currently
1434 * pending constrained request, nothing has to be redriven
1437 uint32_t max_busycount
, old_req_count
;
1438 old_req_count
= _wq_thactive_aggregate_downto_qos(wq
, old_thactive
,
1439 req_qos
, NULL
, &max_busycount
);
1441 * If it is possible that may_start_constrained_thread had refused
1442 * admission due to being over the max concurrency, we may need to
1443 * spin up a new thread.
1445 * We take into account the maximum number of busy threads
1446 * that can affect may_start_constrained_thread as looking at the
1447 * actual number may_start_constrained_thread will see is racy.
1449 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
1450 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
1452 uint32_t conc
= wq_max_parallelism
[_wq_bucket(qos
)];
1453 if (old_req_count
<= conc
&& conc
<= old_req_count
+ max_busycount
) {
1454 start_timer
= workq_schedule_delayed_thread_creation(wq
, 0);
1457 if (__improbable(kdebug_enable
)) {
1458 __unused
uint32_t old
= _wq_thactive_aggregate_downto_qos(wq
,
1459 old_thactive
, qos
, NULL
, NULL
);
1460 WQ_TRACE_WQ(TRACE_wq_thread_block
| DBG_FUNC_START
, wq
,
1461 old
- 1, qos
| (req_qos
<< 8),
1462 wq
->wq_reqcount
<< 1 | start_timer
, 0);
1466 case SCHED_CALL_UNBLOCK
:
1468 * we cannot take the workqueue_lock here...
1469 * an UNBLOCK can occur from a timer event which
1470 * is run from an interrupt context... if the workqueue_lock
1471 * is already held by this processor, we'll deadlock...
1472 * the thread lock for the thread being UNBLOCKED
1475 old_thactive
= _wq_thactive_inc(wq
, qos
);
1476 if (__improbable(kdebug_enable
)) {
1477 __unused
uint32_t old
= _wq_thactive_aggregate_downto_qos(wq
,
1478 old_thactive
, qos
, NULL
, NULL
);
1479 req_qos
= WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive
);
1480 WQ_TRACE_WQ(TRACE_wq_thread_block
| DBG_FUNC_END
, wq
,
1481 old
+ 1, qos
| (req_qos
<< 8),
1482 wq
->wq_threads_scheduled
, 0);
1488 #pragma mark workq lifecycle
1491 workq_reference(struct workqueue
*wq
)
1493 os_ref_retain(&wq
->wq_refcnt
);
1497 workq_destroy(struct workqueue
*wq
)
1499 struct turnstile
*ts
;
1501 turnstile_complete((uintptr_t)wq
, &wq
->wq_turnstile
, &ts
);
1503 turnstile_cleanup();
1504 turnstile_deallocate(ts
);
1506 lck_spin_destroy(&wq
->wq_lock
, workq_lck_grp
);
1507 zfree(workq_zone_workqueue
, wq
);
1511 workq_deallocate(struct workqueue
*wq
)
1513 if (os_ref_release_relaxed(&wq
->wq_refcnt
) == 0) {
1519 workq_deallocate_safe(struct workqueue
*wq
)
1521 if (__improbable(os_ref_release_relaxed(&wq
->wq_refcnt
) == 0)) {
1522 workq_deallocate_enqueue(wq
);
1527 * Setup per-process state for the workqueue.
1530 workq_open(struct proc
*p
, __unused
struct workq_open_args
*uap
,
1531 __unused
int32_t *retval
)
1533 struct workqueue
*wq
;
1536 if ((p
->p_lflag
& P_LREGISTER
) == 0) {
1540 if (wq_init_constrained_limit
) {
1541 uint32_t limit
, num_cpus
= ml_get_max_cpus();
1544 * set up the limit for the constrained pool
1545 * this is a virtual pool in that we don't
1546 * maintain it on a separate idle and run list
1548 limit
= num_cpus
* WORKQUEUE_CONSTRAINED_FACTOR
;
1550 if (limit
> wq_max_constrained_threads
)
1551 wq_max_constrained_threads
= limit
;
1553 if (wq_max_threads
> WQ_THACTIVE_BUCKET_HALF
) {
1554 wq_max_threads
= WQ_THACTIVE_BUCKET_HALF
;
1556 if (wq_max_threads
> CONFIG_THREAD_MAX
- 20) {
1557 wq_max_threads
= CONFIG_THREAD_MAX
- 20;
1560 wq_death_max_load
= (uint16_t)fls(num_cpus
) + 1;
1562 for (thread_qos_t qos
= WORKQ_THREAD_QOS_MIN
; qos
<= WORKQ_THREAD_QOS_MAX
; qos
++) {
1563 wq_max_parallelism
[_wq_bucket(qos
)] =
1564 qos_max_parallelism(qos
, QOS_PARALLELISM_COUNT_LOGICAL
);
1567 wq_init_constrained_limit
= 0;
1570 if (proc_get_wqptr(p
) == NULL
) {
1571 if (proc_init_wqptr_or_wait(p
) == FALSE
) {
1572 assert(proc_get_wqptr(p
) != NULL
);
1576 wq
= (struct workqueue
*)zalloc(workq_zone_workqueue
);
1577 bzero(wq
, sizeof(struct workqueue
));
1579 os_ref_init_count(&wq
->wq_refcnt
, &workq_refgrp
, 1);
1581 // Start the event manager at the priority hinted at by the policy engine
1582 thread_qos_t mgr_priority_hint
= task_get_default_manager_qos(current_task());
1583 pthread_priority_t pp
= _pthread_priority_make_from_thread_qos(mgr_priority_hint
, 0, 0);
1584 wq
->wq_event_manager_priority
= (uint32_t)pp
;
1585 wq
->wq_timer_interval
= wq_stalled_window
.abstime
;
1587 turnstile_prepare((uintptr_t)wq
, &wq
->wq_turnstile
, turnstile_alloc(),
1590 TAILQ_INIT(&wq
->wq_thrunlist
);
1591 TAILQ_INIT(&wq
->wq_thnewlist
);
1592 TAILQ_INIT(&wq
->wq_thidlelist
);
1593 priority_queue_init(&wq
->wq_overcommit_queue
,
1594 PRIORITY_QUEUE_BUILTIN_MAX_HEAP
);
1595 priority_queue_init(&wq
->wq_constrained_queue
,
1596 PRIORITY_QUEUE_BUILTIN_MAX_HEAP
);
1597 priority_queue_init(&wq
->wq_special_queue
,
1598 PRIORITY_QUEUE_BUILTIN_MAX_HEAP
);
1600 wq
->wq_delayed_call
= thread_call_allocate_with_options(
1601 workq_add_new_threads_call
, p
, THREAD_CALL_PRIORITY_KERNEL
,
1602 THREAD_CALL_OPTIONS_ONCE
);
1603 wq
->wq_immediate_call
= thread_call_allocate_with_options(
1604 workq_add_new_threads_call
, p
, THREAD_CALL_PRIORITY_KERNEL
,
1605 THREAD_CALL_OPTIONS_ONCE
);
1606 wq
->wq_death_call
= thread_call_allocate_with_options(
1607 workq_kill_old_threads_call
, wq
,
1608 THREAD_CALL_PRIORITY_USER
, THREAD_CALL_OPTIONS_ONCE
);
1610 lck_spin_init(&wq
->wq_lock
, workq_lck_grp
, workq_lck_attr
);
1612 WQ_TRACE_WQ(TRACE_wq_create
| DBG_FUNC_NONE
, wq
,
1613 VM_KERNEL_ADDRHIDE(wq
), 0, 0, 0);
1614 proc_set_wqptr(p
, wq
);
1622 * Routine: workq_mark_exiting
1624 * Function: Mark the work queue such that new threads will not be added to the
1625 * work queue after we return.
1627 * Conditions: Called against the current process.
1630 workq_mark_exiting(struct proc
*p
)
1632 struct workqueue
*wq
= proc_get_wqptr(p
);
1634 workq_threadreq_t mgr_req
;
1638 WQ_TRACE_WQ(TRACE_wq_pthread_exit
|DBG_FUNC_START
, wq
, 0, 0, 0, 0);
1640 workq_lock_spin(wq
);
1642 wq_flags
= os_atomic_or_orig(&wq
->wq_flags
, WQ_EXITING
, relaxed
);
1643 if (__improbable(wq_flags
& WQ_EXITING
)) {
1644 panic("workq_mark_exiting called twice");
1648 * Opportunistically try to cancel thread calls that are likely in flight.
1649 * workq_exit() will do the proper cleanup.
1651 if (wq_flags
& WQ_IMMEDIATE_CALL_SCHEDULED
) {
1652 thread_call_cancel(wq
->wq_immediate_call
);
1654 if (wq_flags
& WQ_DELAYED_CALL_SCHEDULED
) {
1655 thread_call_cancel(wq
->wq_delayed_call
);
1657 if (wq_flags
& WQ_DEATH_CALL_SCHEDULED
) {
1658 thread_call_cancel(wq
->wq_death_call
);
1661 mgr_req
= wq
->wq_event_manager_threadreq
;
1662 wq
->wq_event_manager_threadreq
= NULL
;
1663 wq
->wq_reqcount
= 0; /* workq_schedule_creator must not look at queues */
1664 workq_turnstile_update_inheritor(wq
, NULL
, 0);
1669 kqueue_threadreq_cancel(p
, mgr_req
);
1672 * No one touches the priority queues once WQ_EXITING is set.
1673 * It is hence safe to do the tear down without holding any lock.
1675 priority_queue_destroy(&wq
->wq_overcommit_queue
,
1676 struct workq_threadreq_s
, tr_entry
, ^(void *e
){
1677 workq_threadreq_destroy(p
, e
);
1679 priority_queue_destroy(&wq
->wq_constrained_queue
,
1680 struct workq_threadreq_s
, tr_entry
, ^(void *e
){
1681 workq_threadreq_destroy(p
, e
);
1683 priority_queue_destroy(&wq
->wq_special_queue
,
1684 struct workq_threadreq_s
, tr_entry
, ^(void *e
){
1685 workq_threadreq_destroy(p
, e
);
1688 WQ_TRACE(TRACE_wq_pthread_exit
|DBG_FUNC_END
, 0, 0, 0, 0, 0);
1692 * Routine: workq_exit
1694 * Function: clean up the work queue structure(s) now that there are no threads
1695 * left running inside the work queue (except possibly current_thread).
1697 * Conditions: Called by the last thread in the process.
1698 * Called against current process.
1701 workq_exit(struct proc
*p
)
1703 struct workqueue
*wq
;
1704 struct uthread
*uth
, *tmp
;
1706 wq
= os_atomic_xchg(&p
->p_wqptr
, NULL
, relaxed
);
1708 thread_t th
= current_thread();
1710 WQ_TRACE_WQ(TRACE_wq_workqueue_exit
|DBG_FUNC_START
, wq
, 0, 0, 0, 0);
1712 if (thread_get_tag(th
) & THREAD_TAG_WORKQUEUE
) {
1714 * <rdar://problem/40111515> Make sure we will no longer call the
1715 * sched call, if we ever block this thread, which the cancel_wait
1718 thread_sched_call(th
, NULL
);
1722 * Thread calls are always scheduled by the proc itself or under the
1723 * workqueue spinlock if WQ_EXITING is not yet set.
1725 * Either way, when this runs, the proc has no threads left beside
1726 * the one running this very code, so we know no thread call can be
1727 * dispatched anymore.
1729 thread_call_cancel_wait(wq
->wq_delayed_call
);
1730 thread_call_cancel_wait(wq
->wq_immediate_call
);
1731 thread_call_cancel_wait(wq
->wq_death_call
);
1732 thread_call_free(wq
->wq_delayed_call
);
1733 thread_call_free(wq
->wq_immediate_call
);
1734 thread_call_free(wq
->wq_death_call
);
1737 * Clean up workqueue data structures for threads that exited and
1738 * didn't get a chance to clean up after themselves.
1740 * idle/new threads should have been interrupted and died on their own
1742 TAILQ_FOREACH_SAFE(uth
, &wq
->wq_thrunlist
, uu_workq_entry
, tmp
) {
1743 thread_sched_call(uth
->uu_thread
, NULL
);
1744 thread_deallocate(uth
->uu_thread
);
1746 assert(TAILQ_EMPTY(&wq
->wq_thnewlist
));
1747 assert(TAILQ_EMPTY(&wq
->wq_thidlelist
));
1749 WQ_TRACE_WQ(TRACE_wq_destroy
| DBG_FUNC_END
, wq
,
1750 VM_KERNEL_ADDRHIDE(wq
), 0, 0, 0);
1752 workq_deallocate(wq
);
1754 WQ_TRACE(TRACE_wq_workqueue_exit
|DBG_FUNC_END
, 0, 0, 0, 0, 0);
1759 #pragma mark bsd thread control
1762 _pthread_priority_to_policy(pthread_priority_t priority
,
1763 thread_qos_policy_data_t
*data
)
1765 data
->qos_tier
= _pthread_priority_thread_qos(priority
);
1766 data
->tier_importance
= _pthread_priority_relpri(priority
);
1767 if (data
->qos_tier
== THREAD_QOS_UNSPECIFIED
|| data
->tier_importance
> 0 ||
1768 data
->tier_importance
< THREAD_QOS_MIN_TIER_IMPORTANCE
) {
1775 bsdthread_set_self(proc_t p
, thread_t th
, pthread_priority_t priority
,
1776 mach_port_name_t voucher
, enum workq_set_self_flags flags
)
1778 struct uthread
*uth
= get_bsdthread_info(th
);
1779 struct workqueue
*wq
= proc_get_wqptr(p
);
1782 int unbind_rv
= 0, qos_rv
= 0, voucher_rv
= 0, fixedpri_rv
= 0;
1783 bool is_wq_thread
= (thread_get_tag(th
) & THREAD_TAG_WORKQUEUE
);
1785 if (flags
& WORKQ_SET_SELF_WQ_KEVENT_UNBIND
) {
1786 if (!is_wq_thread
) {
1791 if (uth
->uu_workq_pri
.qos_bucket
== WORKQ_THREAD_QOS_MANAGER
) {
1796 struct kqrequest
*kqr
= uth
->uu_kqr_bound
;
1798 unbind_rv
= EALREADY
;
1802 if (kqr
->kqr_state
& KQR_WORKLOOP
) {
1807 kqueue_threadreq_unbind(p
, uth
->uu_kqr_bound
);
1811 if (flags
& WORKQ_SET_SELF_QOS_FLAG
) {
1812 thread_qos_policy_data_t new_policy
;
1814 if (!_pthread_priority_to_policy(priority
, &new_policy
)) {
1819 if (!is_wq_thread
) {
1821 * Threads opted out of QoS can't change QoS
1823 if (!thread_has_qos_policy(th
)) {
1827 } else if (uth
->uu_workq_pri
.qos_bucket
== WORKQ_THREAD_QOS_MANAGER
) {
1829 * Workqueue manager threads can't change QoS
1835 * For workqueue threads, possibly adjust buckets and redrive thread
1838 bool old_overcommit
= uth
->uu_workq_flags
& UT_WORKQ_OVERCOMMIT
;
1839 bool new_overcommit
= priority
& _PTHREAD_PRIORITY_OVERCOMMIT_FLAG
;
1840 struct uu_workq_policy old_pri
, new_pri
;
1841 bool force_run
= false;
1843 workq_lock_spin(wq
);
1845 if (old_overcommit
!= new_overcommit
) {
1846 uth
->uu_workq_flags
^= UT_WORKQ_OVERCOMMIT
;
1847 if (old_overcommit
) {
1848 wq
->wq_constrained_threads_scheduled
++;
1849 } else if (wq
->wq_constrained_threads_scheduled
-- ==
1850 wq_max_constrained_threads
) {
1855 old_pri
= new_pri
= uth
->uu_workq_pri
;
1856 new_pri
.qos_req
= new_policy
.qos_tier
;
1857 workq_thread_update_bucket(p
, wq
, uth
, old_pri
, new_pri
, force_run
);
1861 kr
= thread_policy_set_internal(th
, THREAD_QOS_POLICY
,
1862 (thread_policy_t
)&new_policy
, THREAD_QOS_POLICY_COUNT
);
1863 if (kr
!= KERN_SUCCESS
) {
1869 if (flags
& WORKQ_SET_SELF_VOUCHER_FLAG
) {
1870 kr
= thread_set_voucher_name(voucher
);
1871 if (kr
!= KERN_SUCCESS
) {
1872 voucher_rv
= ENOENT
;
1878 if (qos_rv
) goto done
;
1879 if (flags
& WORKQ_SET_SELF_FIXEDPRIORITY_FLAG
) {
1880 thread_extended_policy_data_t extpol
= {.timeshare
= 0};
1883 /* Not allowed on workqueue threads */
1884 fixedpri_rv
= ENOTSUP
;
1888 kr
= thread_policy_set_internal(th
, THREAD_EXTENDED_POLICY
,
1889 (thread_policy_t
)&extpol
, THREAD_EXTENDED_POLICY_COUNT
);
1890 if (kr
!= KERN_SUCCESS
) {
1891 fixedpri_rv
= EINVAL
;
1894 } else if (flags
& WORKQ_SET_SELF_TIMESHARE_FLAG
) {
1895 thread_extended_policy_data_t extpol
= {.timeshare
= 1};
1898 /* Not allowed on workqueue threads */
1899 fixedpri_rv
= ENOTSUP
;
1903 kr
= thread_policy_set_internal(th
, THREAD_EXTENDED_POLICY
,
1904 (thread_policy_t
)&extpol
, THREAD_EXTENDED_POLICY_COUNT
);
1905 if (kr
!= KERN_SUCCESS
) {
1906 fixedpri_rv
= EINVAL
;
1912 if (qos_rv
&& voucher_rv
) {
1913 /* Both failed, give that a unique error. */
1937 bsdthread_add_explicit_override(proc_t p
, mach_port_name_t kport
,
1938 pthread_priority_t pp
, user_addr_t resource
)
1940 thread_qos_t qos
= _pthread_priority_thread_qos(pp
);
1941 if (qos
== THREAD_QOS_UNSPECIFIED
) {
1945 thread_t th
= port_name_to_thread(kport
);
1946 if (th
== THREAD_NULL
) {
1950 int rv
= proc_thread_qos_add_override(p
->task
, th
, 0, qos
, TRUE
,
1951 resource
, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE
);
1953 thread_deallocate(th
);
1958 bsdthread_remove_explicit_override(proc_t p
, mach_port_name_t kport
,
1959 user_addr_t resource
)
1961 thread_t th
= port_name_to_thread(kport
);
1962 if (th
== THREAD_NULL
) {
1966 int rv
= proc_thread_qos_remove_override(p
->task
, th
, 0, resource
,
1967 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE
);
1969 thread_deallocate(th
);
1974 workq_thread_add_dispatch_override(proc_t p
, mach_port_name_t kport
,
1975 pthread_priority_t pp
, user_addr_t ulock_addr
)
1977 struct uu_workq_policy old_pri
, new_pri
;
1978 struct workqueue
*wq
= proc_get_wqptr(p
);
1980 thread_qos_t qos_override
= _pthread_priority_thread_qos(pp
);
1981 if (qos_override
== THREAD_QOS_UNSPECIFIED
) {
1985 thread_t thread
= port_name_to_thread(kport
);
1986 if (thread
== THREAD_NULL
) {
1990 struct uthread
*uth
= get_bsdthread_info(thread
);
1991 if ((thread_get_tag(thread
) & THREAD_TAG_WORKQUEUE
) == 0) {
1992 thread_deallocate(thread
);
1996 WQ_TRACE_WQ(TRACE_wq_override_dispatch
| DBG_FUNC_NONE
,
1997 wq
, thread_tid(thread
), 1, pp
, 0);
1999 thread_mtx_lock(thread
);
2005 * Workaround lack of explicit support for 'no-fault copyin'
2006 * <rdar://problem/24999882>, as disabling preemption prevents paging in
2008 disable_preemption();
2009 rc
= copyin_word(ulock_addr
, &val
, sizeof(kport
));
2010 enable_preemption();
2011 if (rc
== 0 && ulock_owner_value_to_port_name((uint32_t)val
) != kport
) {
2016 workq_lock_spin(wq
);
2018 old_pri
= uth
->uu_workq_pri
;
2019 if (old_pri
.qos_override
>= qos_override
) {
2021 } else if (thread
== current_thread()) {
2023 new_pri
.qos_override
= qos_override
;
2024 workq_thread_update_bucket(p
, wq
, uth
, old_pri
, new_pri
, false);
2026 uth
->uu_workq_pri
.qos_override
= qos_override
;
2027 if (qos_override
> workq_pri_override(old_pri
)) {
2028 thread_set_workq_override(thread
, qos_override
);
2035 thread_mtx_unlock(thread
);
2036 thread_deallocate(thread
);
2041 workq_thread_reset_dispatch_override(proc_t p
, thread_t thread
)
2043 struct uu_workq_policy old_pri
, new_pri
;
2044 struct workqueue
*wq
= proc_get_wqptr(p
);
2045 struct uthread
*uth
= get_bsdthread_info(thread
);
2047 if ((thread_get_tag(thread
) & THREAD_TAG_WORKQUEUE
) == 0) {
2051 WQ_TRACE_WQ(TRACE_wq_override_reset
| DBG_FUNC_NONE
, wq
, 0, 0, 0, 0);
2053 workq_lock_spin(wq
);
2054 old_pri
= new_pri
= uth
->uu_workq_pri
;
2055 new_pri
.qos_override
= THREAD_QOS_UNSPECIFIED
;
2056 workq_thread_update_bucket(p
, wq
, uth
, old_pri
, new_pri
, false);
2062 bsdthread_get_max_parallelism(thread_qos_t qos
, unsigned long flags
,
2065 static_assert(QOS_PARALLELISM_COUNT_LOGICAL
==
2066 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL
, "logical");
2067 static_assert(QOS_PARALLELISM_REALTIME
==
2068 _PTHREAD_QOS_PARALLELISM_REALTIME
, "realtime");
2070 if (flags
& ~(QOS_PARALLELISM_REALTIME
| QOS_PARALLELISM_COUNT_LOGICAL
)) {
2074 if (flags
& QOS_PARALLELISM_REALTIME
) {
2078 } else if (qos
== THREAD_QOS_UNSPECIFIED
|| qos
>= THREAD_QOS_LAST
) {
2082 *retval
= qos_max_parallelism(qos
, flags
);
2086 #define ENSURE_UNUSED(arg) \
2087 ({ if ((arg) != 0) { return EINVAL; } })
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
				(pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
				(user_addr_t)uap->arg2);

	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
				(pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		return workq_thread_reset_dispatch_override(p, current_thread());

	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
				(pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
				(enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
				(unsigned long)uap->arg2, retval);

	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
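/*
 * Illustrative sketch of how userspace reaches the
 * BSDTHREAD_CTL_QOS_MAX_PARALLELISM case above.  The __bsdthread_ctl wrapper
 * name and prototype are assumptions for illustration only (libpthread
 * provides its own SPI around this syscall); kept under #if 0 since it is
 * user code, not kernel code.
 */
#if 0
extern int __bsdthread_ctl(uintptr_t cmd, uintptr_t arg1, uintptr_t arg2,
		uintptr_t arg3);

/* Returns the logical-CPU parallelism the kernel advertises for `qos`. */
static int
max_logical_parallelism_for_qos(unsigned long qos)
{
	/* arg3 must be 0: the kernel rejects anything else via ENSURE_UNUSED() */
	return __bsdthread_ctl(BSDTHREAD_CTL_QOS_MAX_PARALLELISM, qos,
			_PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, 0);
}
#endif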
#pragma mark workqueue thread manipulation
static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
		struct uthread *uth);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
	struct kqworkloop *kqwl;
	if (req->tr_flags & TR_FLAG_WORKLOOP) {
		kqwl = __container_of(req, struct kqworkloop, kqwl_request.kqr_req);
		return kqwl->kqwl_dynamicid;
	}

	return VM_KERNEL_ADDRHIDE(req);
}
#endif
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
			qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
			wq, reqcount, pp, 0, 0);

	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = TR_STATE_NEW;
	req->tr_flags = 0;
	req->tr_qos = qos;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		req->tr_flags |= TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
			wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto exiting;
		}

		/*
		 * When userspace is asking for parallelism, wake up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks are
		 * enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		if (reqcount > 1 && (req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
			unpaced = workq_constrained_allowance(wq, qos, NULL, false);
			if (unpaced >= reqcount - 1) {
				unpaced = reqcount - 1;
			}
		} else {
			unpaced = reqcount - 1;
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth = workq_pop_idle_thread(wq);

			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req);
			wq->wq_fulfilled++;

			uth->uu_workq_flags |= UT_WORKQ_EARLY_BOUND;
			if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
				uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
				wq->wq_constrained_threads_scheduled++;
			}
			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			workq_thread_wakeup(uth);
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
			workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto exiting;
	}

	req->tr_count = reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

exiting:
	workq_unlock(wq);
	zfree(workq_zone_threadreq, req);
	return ECANCELED;
}
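/*
 * Illustrative userspace-side sketch of the path above: libdispatch reaches
 * workq_reqthreads() through the WQOPS_QUEUE_REQTHREADS command of
 * workq_kernreturn() (see below).  The __workq_kernreturn stub prototype used
 * here is an assumption for illustration; the real callers live in
 * libpthread/libdispatch.  Kept under #if 0 since it is user code.
 */
#if 0
extern int __workq_kernreturn(int options, void *item, int affinity, int prio);

/* Ask the kernel to bring up `n` workers at the given pthread priority. */
static int
request_workers(int n, int pthread_priority)
{
	/* item is unused for this op; affinity carries the count, prio the priority */
	return __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, n, pthread_priority);
}
#endif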
bool
workq_kern_threadreq_initiate(struct proc *p, struct kqrequest *kqr,
		struct turnstile *workloop_ts, thread_qos_t qos, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	struct uthread *uth = NULL;
	uint8_t tr_flags = 0;

	if (kqr->kqr_state & KQR_WORKLOOP) {
		tr_flags = TR_FLAG_WORKLOOP;

		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		if (trp.trp_flags & TRP_PRIORITY) {
			tr_flags |= TR_FLAG_WL_OUTSIDE_QOS;
			qos = thread_workq_qos_for_pri(trp.trp_pri);
			if (qos == THREAD_QOS_UNSPECIFIED) {
				qos = WORKQ_THREAD_QOS_ABOVEUI;
			}
		}
		if (trp.trp_flags) {
			tr_flags |= TR_FLAG_WL_PARAMS;
		}
	} else {
		tr_flags = TR_FLAG_KEVENT;
	}
	if (qos != WORKQ_THREAD_QOS_MANAGER &&
			(kqr->kqr_state & KQR_THOVERCOMMIT)) {
		tr_flags |= TR_FLAG_OVERCOMMIT;
	}

	assert(req->tr_state == TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = TR_STATE_NEW;
	req->tr_flags = tr_flags;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
			workq_trace_req_id(req), qos, 1, 0);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request!
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		assert(uth != wq->wq_creator);
		workq_threadreq_bind_and_unlock(p, wq, req, uth);
	} else {
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
						TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
						TURNSTILE_INTERLOCK_HELD);
			});
		}
		if (workq_threadreq_enqueue(wq, req)) {
			workq_schedule_creator(p, wq, flags);
		}
		workq_unlock(wq);
	}

	return true;
}
void
workq_kern_threadreq_modify(struct proc *p, struct kqrequest *kqr,
		thread_qos_t qos, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	bool change_overcommit = false;

	if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP));

	if (req->tr_state == TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_binding_thread, 0);
		workq_unlock(wq);
		return;
	}

	change_overcommit = (bool)(kqr->kqr_state & KQR_THOVERCOMMIT) !=
			(bool)(req->tr_flags & TR_FLAG_OVERCOMMIT);

	if (_wq_exiting(wq) || (req->tr_qos == qos && !change_overcommit)) {
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
			workq_trace_req_id(req), qos, 0, 0);

	struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry,
			PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request.
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(change_overcommit)) {
		req->tr_flags ^= TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
				PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = TR_STATE_NEW;
	if (workq_threadreq_enqueue(wq, req)) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
void
workq_kern_threadreq_lock(struct proc *p)
{
	workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
	workq_unlock(proc_get_wqptr_fast(p));
}
void
workq_kern_threadreq_update_inheritor(struct proc *p, struct kqrequest *kqr,
		thread_t owner, struct turnstile *wl_ts,
		turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	workq_threadreq_t req = &kqr->kqr_req;
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_binding_thread,
				KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
void
workq_kern_threadreq_redrive(struct proc *p, int flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	workq_schedule_creator(p, wq, flags);
	workq_unlock(wq);
}
void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
	if (!locked) workq_lock_spin(wq);
	workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_CREATOR_SYNC_UPDATE);
	if (!locked) workq_unlock(wq);
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
		struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	struct kqrequest *kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
			(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->kqr_req.tr_flags & TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(&kqr->kqr_req);
	}

	if (kqr) {
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->kqr_state & KQR_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
						WQ_FLAG_THREAD_PRIO_QOS;
			}
		}

		error = pthread_functions->workq_handle_stack_events(p, th,
				get_task_map(p->task), uth->uu_workq_stackaddr,
				uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) return error;

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth);
	__builtin_unreachable();
}
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}

		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contain a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
					_PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
					qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
				(uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

	if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
					get_task_map(p->task), uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list.  Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
	}

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable.  But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual.  To correct for that,
		 * we just run the continuation ourselves.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
		workq_select_threadreq_or_park_and_unlock(p, wq, uth);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
				WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
		__builtin_unreachable();
	}

	thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
static inline bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
	/*
	 * There's an event manager request and either:
	 * - no event manager currently running
	 * - we are re-using the event manager
	 */
	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
			(uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
		struct uthread *uth, bool may_start_timer)
{
	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
	uint32_t count = 0;

	uint32_t max_count = wq->wq_constrained_threads_scheduled;
	if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		/*
		 * don't count the current thread as scheduled
		 */
		assert(max_count > 0);
		max_count--;
	}
	if (max_count >= wq_max_constrained_threads) {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
				wq->wq_constrained_threads_scheduled,
				wq_max_constrained_threads, 0);
		/*
		 * we need 1 or more constrained threads to return to the kernel before
		 * we can dispatch additional work
		 */
		return 0;
	}
	max_count -= wq_max_constrained_threads;

	/*
	 * Compute a metric for how many threads are active.  We find the
	 * highest priority request outstanding and then add up the number of
	 * active threads in that and all higher-priority buckets.  We'll also add
	 * any "busy" threads which are not active but blocked recently enough that
	 * we can't be sure they've gone idle yet.  We'll then compare this metric
	 * to our max concurrency to decide whether to add a new thread.
	 */

	uint32_t busycount, thactive_count;

	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
			at_qos, &busycount, NULL);

	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
			at_qos <= uth->uu_workq_pri.qos_bucket) {
		/*
		 * Don't count this thread as currently active, but only if it's not
		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
		 * managers.
		 */
		assert(thactive_count > 0);
		thactive_count--;
	}

	count = wq_max_parallelism[_wq_bucket(at_qos)];
	if (count > thactive_count + busycount) {
		count -= thactive_count + busycount;
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
				thactive_count, busycount, 0);
		return MIN(count, max_count);
	} else {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
				thactive_count, busycount, 0);
	}

	if (busycount && may_start_timer) {
		/*
		 * If this is called from the add timer, we won't have another timer
		 * fire when the thread exits the "busy" state, so rearm the timer.
		 */
		workq_schedule_delayed_thread_creation(wq, 0);
	}

	return 0;
}
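/*
 * Worked example for the admission math above (illustrative numbers only):
 * with wq_max_parallelism[_wq_bucket(at_qos)] == 8, thactive_count == 5 and
 * busycount == 2, count becomes 8 - (5 + 2) == 1, so at most one additional
 * constrained thread is admitted at this QoS (still capped by max_count).
 * If thactive_count + busycount had been 8 or more, the allowance would be 0
 * and, when called from the delayed-creation timer path, the timer is rearmed
 * so the decision is revisited once the busy threads settle.
 */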
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
		workq_threadreq_t req)
{
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		return workq_may_start_event_mgr_thread(wq, uth);
	}
	if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
	}
	return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) uth = NULL;

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
			&proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request.kqr_req;
		if (req_pri->tr_state != TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
					req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
			&req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
			struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
			struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and set up, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq, int flags)
{
	workq_threadreq_t req;
	struct uthread *uth;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		if (flags & WORKQ_THREADREQ_CREATOR_SYNC_UPDATE) {
			assert((flags & WORKQ_THREADREQ_CREATOR_TRANSFER) == 0);
			/*
			 * turnstile propagation code is reaching out to us,
			 * and we still don't want to do anything, do not recurse.
			 */
		} else {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}

	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
					wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
			workq_thread_reset_pri(wq, uth, req);
		}
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq);
		if (workq_thread_needs_priority_change(req, uth)) {
			workq_thread_reset_pri(wq, uth, req);
		}
		workq_turnstile_update_inheritor(wq, uth->uu_thread,
				TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
				wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		workq_thread_wakeup(uth);
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		if (flags & WORKQ_THREADREQ_CREATOR_TRANSFER) {
			/*
			 * workq_schedule_creator() failed at creating a thread,
			 * and the responsibility of redriving is now with a thread-call.
			 *
			 * We still need to tell the turnstile the previous creator is gone.
			 */
			workq_turnstile_update_inheritor(wq, NULL, 0);
		}
	}
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that.  If
 *   it is not NULL, it must be a threadreq object in state NEW.  If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held.  Will drop it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
		struct uthread *uth)
{
	uint32_t setup_flags = 0;
	workq_threadreq_t req;

	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		req = uth->uu_save.uus_workq_park_data.thread_request;
		workq_unlock(wq);
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
				VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
		goto run;
	} else if (_wq_exiting(wq)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
	} else if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
	} else if ((req = workq_threadreq_select(wq, uth)) == NULL) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
	} else {
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
				workq_trace_req_id(req), 0, 0, 0);
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			uth->uu_workq_flags ^= UT_WORKQ_NEW;
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		workq_thread_reset_cpupercent(req, uth);
		workq_threadreq_bind_and_unlock(p, wq, req, uth);
run:
		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	workq_park_and_unlock(p, wq, uth);
	__builtin_unreachable();
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
		return false;
	}

	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
	if (wq->wq_fulfilled == snapshot) {
		return false;
	}

	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
	if (wq->wq_fulfilled - snapshot > conc) {
		/* we fulfilled more than NCPU requests since being dispatched */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
				wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
		cnt += wq->wq_thscheduled_count[i];
	}
	if (conc <= cnt) {
		/* We fulfilled requests and have more than NCPU scheduled threads */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
				wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	return false;
}
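/*
 * Illustrative numbers for the yield heuristic above: with
 * wq_max_parallelism[_wq_bucket(qos)] == 8, the creator yields either when it
 * has fulfilled more than 8 requests since it was dispatched, or when the
 * buckets summed by the loop above already hold 8 or more scheduled threads.
 * In both cases the pool is keeping up with demand, so bouncing the creator
 * out to userspace would only add context switches.
 */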
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	struct uthread *uth = current_uthread();
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_select_threadreq_or_park_and_unlock(p, wq, uth);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	workq_unpark_for_death_and_unlock(p, wq, uth,
			WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
	__builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
				WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port() consumes a reference */
		thread_reference(th);
		ipc_port_t port = convert_thread_to_port(th);
		uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
			proc_get_wqptr_fast(p), 0, 0, 0, 0);
	thread_sched_call(th, workq_sched_callback);
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
			uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;
	int activecount;

	if (wq == NULL) {
		return EINVAL;
	}

	/*
	 * This is sometimes called from interrupt context by the kperf sampler.
	 * In that case, it's not safe to spin trying to take the lock since we
	 * might already hold it.  So, we just try-lock it and error out if it's
	 * already held.  Since this is just a debugging aid, and all our callers
	 * are able to handle an error, that's fine.
	 */
	bool locked = workq_lock_try(wq);
	if (!locked) {
		return EBUSY;
	}

	wq_thactive_t act = _wq_thactive(wq);
	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
			WORKQ_THREAD_QOS_MIN, NULL, NULL);
	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
		activecount++;
	}
	pwqinfo->pwq_nthreads = wq->wq_nthreads;
	pwqinfo->pwq_runthreads = activecount;
	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
	pwqinfo->pwq_state = 0;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	workq_unlock(wq);
	return error;
}
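/*
 * Illustrative userspace-side sketch of how this data is consumed: the
 * PROC_PIDWORKQUEUEINFO flavor of proc_pidinfo() from libproc returns a
 * struct proc_workqueueinfo populated by fill_procworkqueue() above.  The
 * exact call sequence is an assumption for illustration; kept under #if 0
 * since it is user code.
 */
#if 0
#include <libproc.h>
#include <stdio.h>
#include <sys/proc_info.h>

static void
print_workqueue_info(pid_t pid)
{
	struct proc_workqueueinfo wqinfo;
	int ret = proc_pidinfo(pid, PROC_PIDWORKQUEUEINFO, 0,
			&wqinfo, sizeof(wqinfo));
	if (ret == sizeof(wqinfo)) {
		printf("threads=%u running=%u blocked=%u state=0x%x\n",
				wqinfo.pwq_nthreads, wqinfo.pwq_runthreads,
				wqinfo.pwq_blockedthreads, wqinfo.pwq_state);
	}
}
#endif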
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
		boolean_t *exceeded_constrained)
{
	proc_t p = v;
	struct proc_workqueueinfo pwqinfo;
	int err;

	assert(p != NULL);
	assert(exceeded_total != NULL);
	assert(exceeded_constrained != NULL);

	err = fill_procworkqueue(p, &pwqinfo);
	if (err) {
		return FALSE;
	}
	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
		return FALSE;
	}

	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

	return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
			kTaskWqExceededConstrainedThreadLimit);
	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
			kTaskWqExceededTotalThreadLimit);
	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
			WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

	if (v == NULL) {
		return 0;
	}

	proc_t p = v;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
		return 0;
	}

	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	return pwq_state;
}
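/*
 * Note on the static_asserts at the top of workqueue_get_pwq_state_kdp():
 * the three WQ_* bits OR to 0x7 (they occupy the low three bits), and shifting
 * them left by 17 must land exactly on the corresponding kTaskWq* flags that
 * stackshot/kdp report, so the kernel can translate the state with a single
 * shift instead of testing each bit individually.
 */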
void
workq_init(void)
{
	workq_lck_grp_attr = lck_grp_attr_alloc_init();
	workq_lck_attr = lck_attr_alloc_init();
	workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);

	workq_zone_workqueue = zinit(sizeof(struct workqueue),
			1024 * sizeof(struct workqueue), 8192, "workq.wq");
	workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
			1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");

	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
			NSEC_PER_USEC, &wq_stalled_window.abstime);
	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
			NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
			NSEC_PER_USEC, &wq_max_timer_interval.abstime);