/*
 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */

#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>	/* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>	/* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/param.h>
#include <sys/proc_info.h>	/* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>		/* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>

static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")
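
/*
 * Each WORKQ_SYSCTL_USECS(var, init) use below defines a `kern.<var>_usecs`
 * sysctl whose handler (workq_sysctl_handle_usecs) keeps var.usecs and its
 * mach absolute-time conversion var.abstime in sync.
 */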

static lck_grp_t *workq_lck_grp;
static lck_attr_t *workq_lck_attr;
static lck_grp_attr_t *workq_lck_grp_attr;
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static struct mpsc_daemon_queue workq_deallocate_queue;
static zone_t workq_zone_workqueue;
static zone_t workq_zone_threadreq;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");

#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

static struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}

static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
}

#pragma mark wq_thactive

#if defined(__LP64__)
// 127 - 115 : 13 bits of zeroes
// 114 - 112 : best QoS among all pending constrained requests
// 111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// 63 - 61 : best QoS among all pending constrained requests
//      60 : Manager bucket (0 or 1)
// 59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");

static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline int
_wq_bucket(thread_qos_t qos)
{
	// Map both BG and MT to the same bucket by over-shifting down and
	// clamping MT and BG together.
	switch (qos) {
	case THREAD_QOS_MAINTENANCE:
		return 0;
	default:
		return qos - 2;
	}
}

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	((tha) >> WQ_THACTIVE_QOS_SHIFT)

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0, 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
#endif
	}
}

static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}

static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	int i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
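
/*
 * _wq_thactive_aggregate_downto_qos() sums the active counts of the bucket
 * for `qos` and of every higher bucket; when `busycount` is passed, a bucket
 * whose scheduled count exceeds its active count and whose last-blocked
 * timestamp is recent (workq_thread_is_busy) contributes one "busy" thread.
 */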

#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return !wq || _wq_exiting(wq);
}

#pragma mark workqueue lock

bool
workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_spin_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_spin_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_lock_held(__assert_only struct workqueue *wq)
{
	LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_spin_try_lock_grp(&wq->wq_lock, workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
	lck_spin_unlock(&wq->wq_lock);
}

#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	(struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}
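
/*
 * The bucket a thread charges against is the max of its requested QoS, its
 * kevent max QoS and any dispatch override (workq_pri_bucket); e.g. a UT
 * servicer carrying an IN override is accounted for in the IN bucket until
 * the override is reset.
 */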

static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately to policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
}

static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (workq_pri_override(old_pri) != new_bucket) {
		thread_set_workq_override(uth->uu_thread, new_bucket);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}
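
/*
 * workq_thread_update_bucket() both moves the thactive accounting between
 * buckets and, when the thread drops to a lower bucket (or force_run is set)
 * while requests are pending, kicks workq_schedule_creator(): lowering a
 * bucket can free up admission room for a pending constrained request.
 */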

/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread, removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}

static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = uth->uu_thread;
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

	thread_set_workq_pri(th, qos, priority, policy);
}

/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}

#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}

static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have less than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}
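
/*
 * For example, with a 5s reduce-pool window and a constrained limit of, say,
 * 64 threads: the first wq_death_max_load idle threads get the full 5s, the
 * next one gets ~5s * 63/64, and the delay decays linearly down to 5s/64
 * (~78ms) once the excess reaches the constrained limit.
 */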

static bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}

static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}

/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, wq->wq_thidlecount, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
			workq_thread_wakeup(uth);
		}
		return;
	}

	workq_death_call_schedule(wq,
	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}

void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
		    wq, wq->wq_thidlecount, 0, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening, redrive if there are pending requests
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(uth->uu_thread);
}

static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
	workq_unlock(wq);
}

static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint8_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}

/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}

/*
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(p->task);

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}

#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}

bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
	return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}

static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		    flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		    TURNSTILE_INTERLOCK_HELD);
	});
}

static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled--;
	}
	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields, 0);
	}

	if (wq->wq_inheritor == uth->uu_thread) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}

#pragma mark thread requests

static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}

static inline struct priority_queue *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}
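
/*
 * Thread requests live in three max-heaps: wq_special_queue for workloops
 * with scheduling parameters outside the QoS world, wq_overcommit_queue for
 * overcommit requests, and wq_constrained_queue for everything else (the
 * only queue subject to the constrained-thread admission checks).
 */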

/*
 * returns true if the enqueued request is the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}
	if (priority_queue_insert(workq_priority_queue_for_req(wq, req),
	    &req->tr_entry, workq_priority_for_req(req),
	    PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}

/*
 * returns true if the dequeued request was the highest priority item
 * in its priority queue.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
{
	wq->wq_reqcount--;

	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;
			return true;
		}
		if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
		    &req->tr_entry, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
			if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
				_wq_thactive_refresh_best_constrained_req_qos(wq);
			}
			return true;
		}
	}
	return false;
}

static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
	req->tr_state = WORKQ_TR_STATE_CANCELED;
	if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
		kqueue_threadreq_cancel(p, req);
	} else {
		zfree(workq_zone_threadreq, req);
	}
}

#pragma mark workqueue thread creation thread calls

static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
	uint32_t old_flags, new_flags;

	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
			os_atomic_rmw_loop_give_up(return false);
		}
		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
			new_flags = old_flags | pend;
		} else {
			new_flags = old_flags | sched;
		}
	});

	return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
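
/*
 * While the process is suspended, prepost only records the *_PENDED bit and
 * returns false; workq_proc_resumed() later converts any pending bit back
 * into an actual immediate or delayed thread call.
 */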

#define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
	assert(!preemption_enabled());

	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
		return false;
	}

	uint64_t now = mach_absolute_time();

	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
		/* do not change the window */
	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
		wq->wq_timer_interval *= 2;
		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
			wq->wq_timer_interval = wq_max_timer_interval.abstime;
		}
	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
		wq->wq_timer_interval /= 2;
		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
			wq->wq_timer_interval = wq_stalled_window.abstime;
		}
	}

	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
	    _wq_flags(wq), wq->wq_timer_interval, 0);

	thread_call_t call = wq->wq_delayed_call;
	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
	uint64_t deadline = now + wq->wq_timer_interval;
	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
		panic("delayed_call was already enqueued");
	}
	return true;
}
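
/*
 * The creation window adapts: if the previous thread call ran within the
 * current interval the window doubles (capped at wq_max_timer_interval), and
 * if it has been quiet for more than twice the interval the window is halved
 * (floored at wq_stalled_window).
 */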

static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
	assert(!preemption_enabled());

	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
		    _wq_flags(wq), 0, 0);

		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
			panic("immediate_call was already enqueued");
		}
	}
}

void
workq_proc_suspended(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq) {
		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
	}
}

void
workq_proc_resumed(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;

	if (!wq) {
		return;
	}

	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
	if ((wq_flags & WQ_EXITING) == 0) {
		disable_preemption();
		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
			workq_schedule_immediate_thread_creation(wq);
		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
			workq_schedule_delayed_thread_creation(wq,
			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
		}
		enable_preemption();
	}
}

/*
 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
 */
static bool
workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
{
	uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
	if (now <= lastblocked_ts) {
		/*
		 * Because the update of the timestamp when a thread blocks
		 * isn't serialized against us looking at it (i.e. we don't hold
		 * the workq lock), it's possible to have a timestamp that matches
		 * the current time or that even looks to be in the future relative
		 * to when we grabbed the current time...
		 *
		 * Just treat this as a busy thread since it must have just blocked.
		 */
		return true;
	}
	return (now - lastblocked_ts) < wq_stalled_window.abstime;
}

static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount, 0);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount, 0);
}

#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
	struct uthread *uth = get_bsdthread_info(thread);
	proc_t proc = get_bsdtask_info(get_threadtask(thread));
	struct workqueue *wq = proc_get_wqptr(proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket, it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem.  Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer, 0);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled, 0);
		}
		break;
	}
}

#pragma mark workq lifecycle

void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}

static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
	struct workqueue *wq;
	struct turnstile *ts;

	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
	assert(dq == &workq_deallocate_queue);

	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_spin_destroy(&wq->wq_lock, workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}

static void
workq_deallocate(struct workqueue *wq)
{
	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
		    &workq_deallocate_queue);
	}
}

void
workq_deallocate_safe(struct workqueue *wq)
{
	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
		    MPSC_QUEUE_DISABLE_PREEMPTION);
	}
}

/**
 * Setup per-process state for the workqueue.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_get_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = (struct workqueue *)zalloc(workq_zone_workqueue);
		bzero(wq, sizeof(struct workqueue));

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = wq_stalled_window.abstime;
		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
		    TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_constrained_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
		priority_queue_init(&wq->wq_special_queue,
		    PRIORITY_QUEUE_BUILTIN_MAX_HEAP);

		wq->wq_delayed_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
			workq_kill_old_threads_call, wq,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_spin_init(&wq->wq_lock, workq_lck_grp, workq_lck_attr);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
		proc_set_wqptr(p, wq);
	}

out:
	return error;
}

/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);

	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(void *e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
}

/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_sched_call(uth->uu_thread, NULL);
			thread_deallocate(uth->uu_thread);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);

		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
	}
}

#pragma mark bsd thread control

static bool
_pthread_priority_to_policy(pthread_priority_t priority,
    thread_qos_policy_data_t *data)
{
	data->qos_tier = _pthread_priority_thread_qos(priority);
	data->tier_importance = _pthread_priority_relpri(priority);
	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
	    data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
		return false;
	}
	return true;
}

static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	if (flags & WORKQ_SET_SELF_QOS_FLAG) {
		thread_qos_policy_data_t new_policy;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 */
			bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
			bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			workq_lock_spin(wq);

			if (old_overcommit != new_overcommit) {
				uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
				if (old_overcommit) {
					wq->wq_constrained_threads_scheduled++;
				} else if (wq->wq_constrained_threads_scheduled-- ==
				    wq_max_constrained_threads) {
					force_run = true;
				}
			}

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = new_policy.qos_tier;
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}
	if (unbind_rv) {
		return unbind_rv;
	}
	if (qos_rv) {
		return qos_rv;
	}
	if (voucher_rv) {
		return voucher_rv;
	}
	return fixedpri_rv;
}

static int
bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t resource)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	if (qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t th = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
	    resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}

static int
bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
    user_addr_t resource)
{
	thread_t th = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (th == THREAD_NULL) {
		return ESRCH;
	}

	int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
	    THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);

	thread_deallocate(th);
	return rv;
}

static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport,
	    PORT_TO_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp, 0);

	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
		 * userspace memory.
		 */
		disable_preemption();
		rc = copyin_atomic32(ulock_addr, &val);
		enable_preemption();
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}

static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);
	struct uthread *uth = get_bsdthread_info(thread);

	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
	return 0;
}

static int
workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
{
	if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
		// If the thread isn't a workqueue thread, don't set the
		// kill_allowed bit; however, we still need to return 0
		// instead of an error code since this code is executed
		// on the abort path which needs to not depend on the
		// pthread_t (returning an error depends on pthread_t via
		// cerror_nocancel)
		return 0;
	}
	struct uthread *uth = get_bsdthread_info(thread);
	uth->uu_workq_pthread_kill_allowed = enable;
	return 0;
}

static int
bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
    int *retval)
{
	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
	static_assert(QOS_PARALLELISM_REALTIME ==
	    _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");

	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
		return EINVAL;
	}

	if (flags & QOS_PARALLELISM_REALTIME) {
		if (qos) {
			return EINVAL;
		}
	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
		return EINVAL;
	}

	*retval = qos_max_parallelism(qos, flags);
	return 0;
}

#define ENSURE_UNUSED(arg) \
	({ if ((arg) != 0) { return EINVAL; } })

int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		return workq_thread_reset_dispatch_override(p, current_thread());

	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);

	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
#pragma mark workqueue thread manipulation

static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
#if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)
{
	struct kqworkloop *kqwl;
	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
		return kqwl->kqwl_dynamicid;
	}

	return VM_KERNEL_ADDRHIDE(req);
}
#endif
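/*
 * Workloop requests are identified in trace points by their stable
 * kqwl_dynamicid; other requests fall back to an address obfuscated with
 * VM_KERNEL_ADDRHIDE() so raw kernel pointers never reach the kdebug stream.
 */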
/*
 * Entry point for libdispatch to ask for threads
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
	    qos == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
	    wq, reqcount, pp, 0, 0);

	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_flags = 0;
	req->tr_qos = qos;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
	    wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto exiting;
		}

		/*
		 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks are
		 * enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			unpaced = workq_constrained_allowance(wq, qos, NULL, false);
			if (unpaced >= reqcount - 1) {
				unpaced = reqcount - 1;
			}
		} else {
			unpaced = reqcount - 1;
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth;
			bool needs_wakeup;
			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

			if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
				uu_flags |= UT_WORKQ_OVERCOMMIT;
			}

			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
			wq->wq_fulfilled++;

			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			if (needs_wakeup) {
				workq_thread_wakeup(uth);
			}
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
	    workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto exiting;
	}

	req->tr_count = reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

exiting:
	workq_unlock(wq);
	zfree(workq_zone_threadreq, req);
	return ECANCELED;
}
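/*
 * workq_reqthreads() is reached from userspace through the multiplexed
 * workq_kernreturn() syscall below (WQOPS_QUEUE_REQTHREADS), where arg2 is the
 * thread count and arg3 the pthread_priority_t.  A hypothetical request for 4
 * utility-QoS threads would look roughly like the following (illustrative
 * only, not the libdispatch implementation):
 *
 *	__workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, 4, priority);
 */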
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1, 0);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		assert(uth != wq->wq_creator);
		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We're called from workq_kern_threadreq_initiate()
		 * due to an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), 0, 0, 0);
		wq->wq_fulfilled++;
		kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
	} else {
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				    TURNSTILE_INTERLOCK_HELD);
			});
		}
		if (workq_threadreq_enqueue(wq, req)) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);

	return true;
}
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	bool make_overcommit = false;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
		workq_unlock(wq);
		return;
	}

	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
		make_overcommit = (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0;
	}

	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 0, 0);

	struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry,
	    PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
		if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(make_overcommit)) {
		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
		    PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = WORKQ_TR_STATE_NEW;
	if (workq_threadreq_enqueue(wq, req)) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}

void
workq_kern_threadreq_lock(struct proc *p)
{
	workq_lock_spin(proc_get_wqptr_fast(p));
}

void
workq_kern_threadreq_unlock(struct proc *p)
{
	workq_unlock(proc_get_wqptr_fast(p));
}
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread,
		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}

void
workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	workq_schedule_creator(p, wq, flags);
	workq_unlock(wq);
}

void
workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
{
	if (locked) {
		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
	} else {
		workq_schedule_immediate_thread_creation(wq);
	}
}
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

	if (kqr) {
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}

		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(p->task), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
/*
 * Multiplexed call to interact with the workqueue mechanism
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}
		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
			    qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	case WQOPS_SETUP_DISPATCH: {
		/*
		 * item = pointer to workq_dispatch_config structure
		 * arg2 = sizeof(item)
		 */
		struct workq_dispatch_config cfg;
		bzero(&cfg, sizeof(cfg));

		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
		if (error) {
			break;
		}

		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
			error = ENOTSUP;
			break;
		}

		/* Load fields from version 1 */
		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

		/* Load fields from version 2 */
		if (cfg.wdc_version >= 2) {
			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
		}

		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
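/*
 * Note on WQOPS_SHOULD_NARROW above: the answer is the negation of the
 * constrained admission check, i.e. *retval becomes 1 when
 * workq_constrained_allowance() returns 0 for the tested QoS, telling a
 * constrained userspace thread that starting more work at that priority would
 * exceed the pool's admission limits.
 */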
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
			    get_task_map(p->task), uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list. Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
	}

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable. But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual. To correct for that,
		 * we just run the continuation ourselves.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
static bool
workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
{
	/*
	 * There's an event manager request and either:
	 * - no event manager currently running
	 * - we are re-using the event manager
	 */
	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
}
static uint32_t
workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
    struct uthread *uth, bool may_start_timer)
{
	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
	uint32_t count = 0;

	uint32_t max_count = wq->wq_constrained_threads_scheduled;
	if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		/*
		 * don't count the current thread as scheduled
		 */
		assert(max_count > 0);
		max_count--;
	}
	if (max_count >= wq_max_constrained_threads) {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
		    wq->wq_constrained_threads_scheduled,
		    wq_max_constrained_threads, 0);
		/*
		 * we need 1 or more constrained threads to return to the kernel before
		 * we can dispatch additional work
		 */
		return 0;
	}
	max_count -= wq_max_constrained_threads;

	/*
	 * Compute a metric for how many threads are active. We find the
	 * highest priority request outstanding and then add up the number of
	 * active threads in that and all higher-priority buckets. We'll also add
	 * any "busy" threads which are not active but blocked recently enough that
	 * we can't be sure they've gone idle yet. We'll then compare this metric
	 * to our max concurrency to decide whether to add a new thread.
	 */

	uint32_t busycount, thactive_count;

	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
	    at_qos, &busycount, NULL);

	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
	    at_qos <= uth->uu_workq_pri.qos_bucket) {
		/*
		 * Don't count this thread as currently active, but only if it's not
		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
		 * managers.
		 */
		assert(thactive_count > 0);
		thactive_count--;
	}

	count = wq_max_parallelism[_wq_bucket(at_qos)];
	if (count > thactive_count + busycount) {
		count -= thactive_count + busycount;
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
		    thactive_count, busycount, 0);
		return MIN(count, max_count);
	} else {
		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
		    thactive_count, busycount, 0);
	}

	if (busycount && may_start_timer) {
		/*
		 * If this is called from the add timer, we won't have another timer
		 * fire when the thread exits the "busy" state, so rearm the timer.
		 */
		workq_schedule_delayed_thread_creation(wq, 0);
	}

	return 0;
}
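/*
 * Worked example (illustrative numbers): with wq_max_parallelism[bucket] == 8,
 * thactive_count == 5 and busycount == 2, `count` becomes 8 - (5 + 2) = 1, so
 * at most one more constrained thread is admitted at that QoS.  When
 * thactive_count + busycount >= 8 the allowance is 0, and if may_start_timer
 * is set the delayed thread-creation timer is re-armed so the decision is
 * revisited once a "busy" thread is known to have gone idle.
 */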
static bool
workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req)
{
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		return workq_may_start_event_mgr_thread(wq, uth);
	}
	if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) {
		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
	}
	return true;
}
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	req_tmp = wq->wq_event_manager_threadreq;
	if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
		return req_tmp;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}

/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't been materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 */
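/*
 * In the code below, workq_schedule_creator() elects or re-prioritizes that
 * creator, workq_creator_should_yield() provides the pacing by letting the
 * creator yield while already-running threads keep up with demand, and
 * workq_threadreq_select_for_creator() / workq_threadreq_select() implement
 * the "stealing" of requests by the creator and by threads already on core.
 */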
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}

	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == uth->uu_thread);
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		if (workq_thread_needs_priority_change(req, uth)) {
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		workq_turnstile_update_inheritor(wq, uth->uu_thread,
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
/*
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the base pri frozen, will unfreeze it.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	workq_threadreq_t req = NULL;
	bool is_creator = (wq->wq_creator == uth);
	bool schedule_creator = false;

	if (__improbable(_wq_exiting(wq))) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
		goto park;
	}

	if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
		goto park;
	}

	req = workq_threadreq_select(wq, uth);
	if (__improbable(req == NULL)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
		goto park;
	}

	uint8_t tr_flags = req->tr_flags;
	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

	/*
	 * Attempt to setup ourselves as the new thing to run, moving all priority
	 * pushes to ourselves.
	 *
	 * If the current thread is the creator, then the fact that we are presently
	 * running is proof that we'll do something useful, so keep going.
	 *
	 * For other cases, peek at the AST to know whether the scheduler wants
	 * to preempt us, if yes, park instead, and move the thread request
	 * turnstile back to the workqueue.
	 */
	if (req_ts) {
		workq_perform_turnstile_operation_locked(wq, ^{
			turnstile_update_inheritor(req_ts, uth->uu_thread,
			    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
			turnstile_update_inheritor_complete(req_ts,
			    TURNSTILE_INTERLOCK_HELD);
		});
	}

	if (is_creator) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
		    uth->uu_save.uus_workq_park_data.yields, 0);
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
		_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
	}

	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

	thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863> to turn this back on
	if (__improbable(thread_unfreeze_base_pri(uth->uu_thread) && !is_creator)) {
		if (req_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
				    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(req_ts,
				    TURNSTILE_INTERLOCK_HELD);
			});
		}
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0, 0);
		goto park_thawed;
	}
#endif

	/*
	 * We passed all checks, dequeue the request, bind to it, and set it up
	 * to return to user.
	 */
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
	    workq_trace_req_id(req), 0, 0, 0);
	wq->wq_fulfilled++;
	schedule_creator = workq_threadreq_dequeue(wq, req);

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_prepost(p, req, uth);
		req = NULL;
	} else if (req->tr_count > 0) {
		req = NULL;
	}

	workq_thread_reset_cpupercent(req, uth);
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		uth->uu_workq_flags ^= UT_WORKQ_NEW;
		setup_flags |= WQ_SETUP_FIRST_USE;
	}
	if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
			uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled--;
		}
	} else {
		if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
			uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
			wq->wq_constrained_threads_scheduled++;
		}
	}

	if (is_creator || schedule_creator) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}

	workq_unlock(wq);

	if (req) {
		zfree(workq_zone_threadreq, req);
	}

	/*
	 * Run Thread, Run!
	 */
	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}
	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
	}
	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_commit(p, uth->uu_thread);
	}
	workq_setup_and_run(p, uth, setup_flags);
	__builtin_unreachable();

park:
	thread_unfreeze_base_pri(uth->uu_thread);
#if 0 // <rdar://problem/55259863>
park_thawed:
#endif
	workq_park_and_unlock(p, wq, uth, setup_flags);
}
/*
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that. If
 *   it is not NULL, it must be a threadreq object in state NEW. If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held. Will drop it.
 * Should be called with the base pri not frozen.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;

		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
		workq_unlock(wq);
		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	thread_freeze_base_pri(uth->uu_thread);
	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
}
static bool
workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);

	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
		return false;
	}

	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
	if (wq->wq_fulfilled == snapshot) {
		return false;
	}

	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
	if (wq->wq_fulfilled - snapshot > conc) {
		/* we fulfilled more than NCPU requests since being dispatched */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
		cnt += wq->wq_thscheduled_count[i];
	}
	if (conc <= cnt) {
		/* We fulfilled requests and have more than NCPU scheduled threads */
		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
		    wq->wq_fulfilled, snapshot, 0);
		return true;
	}

	return false;
}
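/*
 * Illustrative example: with wq_max_parallelism[bucket] == 8 (conc), a creator
 * that has seen wq_fulfilled advance by more than 8 since it was dispatched,
 * or that already has at least 8 threads scheduled at or above its QoS bucket,
 * reports that it should yield instead of going out to userspace.
 */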
/*
 * parked thread wakes up
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	workq_unpark_for_death_and_unlock(p, wq, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
	__builtin_unreachable();
}
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = uth->uu_thread;
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
		    WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port() consumes a reference */
		thread_reference(th);
		ipc_port_t port = convert_thread_to_port(th);
		uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
	    proc_get_wqptr_fast(p), 0, 0, 0, 0);
	thread_sched_call(th, workq_sched_callback);
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
#pragma mark misc

int
fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
{
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;
	int activecount;

	if (wq == NULL) {
		return EINVAL;
	}

	/*
	 * This is sometimes called from interrupt context by the kperf sampler.
	 * In that case, it's not safe to spin trying to take the lock since we
	 * might already hold it. So, we just try-lock it and error out if it's
	 * already held. Since this is just a debugging aid, and all our callers
	 * are able to handle an error, that's fine.
	 */
	bool locked = workq_lock_try(wq);
	if (!locked) {
		return EBUSY;
	}

	wq_thactive_t act = _wq_thactive(wq);
	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
		activecount++;
	}
	pwqinfo->pwq_nthreads = wq->wq_nthreads;
	pwqinfo->pwq_runthreads = activecount;
	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
	pwqinfo->pwq_state = 0;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	workq_unlock(wq);
	return error;
}
boolean_t
workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
    boolean_t *exceeded_constrained)
{
	proc_t p = v;
	struct proc_workqueueinfo pwqinfo;
	int err;

	assert(p != NULL);
	assert(exceeded_total != NULL);
	assert(exceeded_constrained != NULL);

	err = fill_procworkqueue(p, &pwqinfo);
	if (err) {
		return FALSE;
	}
	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
		return FALSE;
	}

	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);

	return TRUE;
}
uint32_t
workqueue_get_pwq_state_kdp(void * v)
{
	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
	    kTaskWqExceededConstrainedThreadLimit);
	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
	    kTaskWqExceededTotalThreadLimit);
	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);

	if (v == NULL) {
		return 0;
	}

	proc_t p = v;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
		return 0;
	}

	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;

	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
	}

	if (wq->wq_nthreads >= wq_max_threads) {
		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
	}

	return pwq_state;
}
void
workq_init(void)
{
	workq_lck_grp_attr = lck_grp_attr_alloc_init();
	workq_lck_attr = lck_attr_alloc_init();
	workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);

	workq_zone_workqueue = zinit(sizeof(struct workqueue),
	    1024 * sizeof(struct workqueue), 8192, "workq.wq");
	workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
	    1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");

	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
	    NSEC_PER_USEC, &wq_stalled_window.abstime);
	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);

	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
	    workq_deallocate_queue_invoke);
}