1 /*
2 * Copyright (c) 2000-2017 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29
30 #include <sys/cdefs.h>
31
32 // <rdar://problem/26158937> panic() should be marked noreturn
33 extern void panic(const char *string, ...) __printflike(1, 2) __dead2;
34
35 #include <kern/assert.h>
36 #include <kern/ast.h>
37 #include <kern/clock.h>
38 #include <kern/cpu_data.h>
39 #include <kern/kern_types.h>
40 #include <kern/policy_internal.h>
41 #include <kern/processor.h>
42 #include <kern/sched_prim.h> /* for thread_exception_return */
43 #include <kern/task.h>
44 #include <kern/thread.h>
45 #include <kern/zalloc.h>
46 #include <mach/kern_return.h>
47 #include <mach/mach_param.h>
48 #include <mach/mach_port.h>
49 #include <mach/mach_types.h>
50 #include <mach/mach_vm.h>
51 #include <mach/sync_policy.h>
52 #include <mach/task.h>
53 #include <mach/thread_act.h> /* for thread_resume */
54 #include <mach/thread_policy.h>
55 #include <mach/thread_status.h>
56 #include <mach/vm_prot.h>
57 #include <mach/vm_statistics.h>
58 #include <machine/atomic.h>
59 #include <machine/machine_routines.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_protos.h>
62
63 #include <sys/eventvar.h>
64 #include <sys/kdebug.h>
65 #include <sys/kernel.h>
66 #include <sys/lock.h>
67 #include <sys/param.h>
68 #include <sys/proc_info.h> /* for fill_procworkqueue */
69 #include <sys/proc_internal.h>
70 #include <sys/pthread_shims.h>
71 #include <sys/resourcevar.h>
72 #include <sys/signalvar.h>
73 #include <sys/sysctl.h>
74 #include <sys/sysproto.h>
75 #include <sys/systm.h>
76 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
77
78 #include <pthread/bsdthread_private.h>
79 #include <pthread/workqueue_syscalls.h>
80 #include <pthread/workqueue_internal.h>
81 #include <pthread/workqueue_trace.h>
82
83 #include <os/log.h>
84
85 extern thread_t port_name_to_thread(mach_port_name_t port_name); /* osfmk/kern/ipc_tt.h */
86
87 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
88 static void workq_schedule_creator(proc_t p, struct workqueue *wq, int flags);
89
90 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
91 workq_threadreq_t req);
92
93 static uint32_t workq_constrained_allowance(struct workqueue *wq,
94 thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);
95
96 static bool workq_thread_is_busy(uint64_t cur_ts,
97 _Atomic uint64_t *lastblocked_tsp);
98
99 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
100
101 #pragma mark globals
102
103 struct workq_usec_var {
104 uint32_t usecs;
105 uint64_t abstime;
106 };
107
108 #define WORKQ_SYSCTL_USECS(var, init) \
109 static struct workq_usec_var var = { .usecs = init }; \
110 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
111 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
112 workq_sysctl_handle_usecs, "I", "")
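/*
 * For reference, a sketch of what one instantiation below expands to, e.g.
 * WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS):
 *
 *     static struct workq_usec_var wq_stalled_window = {
 *         .usecs = WQ_STALLED_WINDOW_USECS,
 *     };
 *     SYSCTL_OID(_kern, OID_AUTO, wq_stalled_window_usecs,
 *         CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &wq_stalled_window, 0,
 *         workq_sysctl_handle_usecs, "I", "");
 *
 * i.e. each use publishes a kern.<var>_usecs tunable whose handler (defined
 * below) also refreshes the cached abstime conversion.
 */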
113
114 static lck_grp_t *workq_lck_grp;
115 static lck_attr_t *workq_lck_attr;
116 static lck_grp_attr_t *workq_lck_grp_attr;
117 os_refgrp_decl(static, workq_refgrp, "workq", NULL);
118
119 static zone_t workq_zone_workqueue;
120 static zone_t workq_zone_threadreq;
121
122 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
123 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
124 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
125 static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
126 static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
127 static uint32_t wq_init_constrained_limit = 1;
128 static uint16_t wq_death_max_load;
129 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
130
131 #pragma mark sysctls
132
133 static int
134 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
135 {
136 #pragma unused(arg2)
137 struct workq_usec_var *v = arg1;
138 int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
139 if (error || !req->newptr) {
140 return error;
141 }
142 clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
143 &v->abstime);
144 return 0;
145 }
146
147 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
148 &wq_max_threads, 0, "");
149
150 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
151 &wq_max_constrained_threads, 0, "");
152
153 #pragma mark p_wqptr
154
155 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
156
157 static struct workqueue *
158 proc_get_wqptr_fast(struct proc *p)
159 {
160 return os_atomic_load(&p->p_wqptr, relaxed);
161 }
162
163 static struct workqueue *
164 proc_get_wqptr(struct proc *p)
165 {
166 struct workqueue *wq = proc_get_wqptr_fast(p);
167 return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
168 }
169
170 static void
171 proc_set_wqptr(struct proc *p, struct workqueue *wq)
172 {
173 wq = os_atomic_xchg(&p->p_wqptr, wq, release);
174 if (wq == WQPTR_IS_INITING_VALUE) {
175 proc_lock(p);
176 thread_wakeup(&p->p_wqptr);
177 proc_unlock(p);
178 }
179 }
180
181 static bool
182 proc_init_wqptr_or_wait(struct proc *p)
183 {
184 struct workqueue *wq;
185
186 proc_lock(p);
187 wq = p->p_wqptr;
188
189 if (wq == NULL) {
190 p->p_wqptr = WQPTR_IS_INITING_VALUE;
191 proc_unlock(p);
192 return true;
193 }
194
195 if (wq == WQPTR_IS_INITING_VALUE) {
196 assert_wait(&p->p_wqptr, THREAD_UNINT);
197 proc_unlock(p);
198 thread_block(THREAD_CONTINUE_NULL);
199 } else {
200 proc_unlock(p);
201 }
202 return false;
203 }
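/*
 * Minimal sketch of the intended caller pattern (workq_open() below follows
 * this shape); the allocation step is elided here:
 *
 *     if (proc_get_wqptr(p) == NULL) {
 *         if (proc_init_wqptr_or_wait(p)) {
 *             // we won the race: allocate + initialize the workqueue,
 *             // then publish it (which also wakes any waiters)
 *             proc_set_wqptr(p, wq);
 *         } else {
 *             // someone else initialized it (or just finished doing so)
 *             assert(proc_get_wqptr(p) != NULL);
 *         }
 *     }
 */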
204
205 static inline event_t
206 workq_parked_wait_event(struct uthread *uth)
207 {
208 return (event_t)&uth->uu_workq_stackaddr;
209 }
210
211 static inline void
212 workq_thread_wakeup(struct uthread *uth)
213 {
214 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
215 thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread);
216 }
217 }
218
219 #pragma mark wq_thactive
220
221 #if defined(__LP64__)
222 // Layout is:
223 // 127 - 115 : 13 bits of zeroes
224 // 114 - 112 : best QoS among all pending constrained requests
225 // 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
226 #define WQ_THACTIVE_BUCKET_WIDTH 16
227 #define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
228 #else
229 // Layout is:
230 // 63 - 61 : best QoS among all pending constrained requests
231 // 60 : Manager bucket (0 or 1)
232 // 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
233 #define WQ_THACTIVE_BUCKET_WIDTH 10
234 #define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
235 #endif
236 #define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
237 #define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
238
239 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
240 "Make sure we have space to encode a QoS");
241
242 static inline wq_thactive_t
243 _wq_thactive(struct workqueue *wq)
244 {
245 return os_atomic_load(&wq->wq_thactive, relaxed);
246 }
247
248 static inline int
249 _wq_bucket(thread_qos_t qos)
250 {
251 // Map both BG and MT to the same bucket by over-shifting down and
252 // clamping MT and BG together.
253 switch (qos) {
254 case THREAD_QOS_MAINTENANCE:
255 return 0;
256 default:
257 return qos - 2;
258 }
259 }
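/*
 * Example of the mapping above, using the usual THREAD_QOS_* numbering
 * (MAINTENANCE == 1 ... USER_INTERACTIVE == 6): MT and BG both land in
 * bucket 0, UT in bucket 1, DF (LEGACY) in bucket 2, IN in bucket 3 and
 * UI in bucket 4, matching the bucket order described in the wq_thactive
 * layout comment above.
 */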
260
261 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
262 ((tha) >> WQ_THACTIVE_QOS_SHIFT)
263
264 static inline thread_qos_t
265 _wq_thactive_best_constrained_req_qos(struct workqueue *wq)
266 {
267 // Avoid expensive atomic operations: the three bits we're loading are in
268 // a single byte, and always updated under the workqueue lock
269 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
270 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
271 }
272
273 static void
274 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
275 {
276 thread_qos_t old_qos, new_qos;
277 workq_threadreq_t req;
278
279 req = priority_queue_max(&wq->wq_constrained_queue,
280 struct workq_threadreq_s, tr_entry);
281 new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
282 old_qos = _wq_thactive_best_constrained_req_qos(wq);
283 if (old_qos != new_qos) {
284 long delta = (long)new_qos - (long)old_qos;
285 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
286 /*
287 * We can do an atomic add relative to the initial load because updates
288 * to this qos are always serialized under the workqueue lock.
289 */
290 v = os_atomic_add(&wq->wq_thactive, v, relaxed);
291 #ifdef __LP64__
292 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
293 (uint64_t)(v >> 64), 0, 0);
294 #else
295 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0);
296 #endif
297 }
298 }
299
300 static inline wq_thactive_t
301 _wq_thactive_offset_for_qos(thread_qos_t qos)
302 {
303 return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
304 }
305
306 static inline wq_thactive_t
307 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
308 {
309 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
310 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
311 }
312
313 static inline wq_thactive_t
314 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
315 {
316 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
317 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
318 }
319
320 static inline void
321 _wq_thactive_move(struct workqueue *wq,
322 thread_qos_t old_qos, thread_qos_t new_qos)
323 {
324 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
325 _wq_thactive_offset_for_qos(old_qos);
326 os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
327 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
328 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
329 }
330
331 static inline uint32_t
332 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
333 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
334 {
335 uint32_t count = 0, active;
336 uint64_t curtime;
337
338 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);
339
340 if (busycount) {
341 curtime = mach_absolute_time();
342 *busycount = 0;
343 }
344 if (max_busycount) {
345 *max_busycount = THREAD_QOS_LAST - qos;
346 }
347
348 int i = _wq_bucket(qos);
349 v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
350 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
351 active = v & WQ_THACTIVE_BUCKET_MASK;
352 count += active;
353
354 if (busycount && wq->wq_thscheduled_count[i] > active) {
355 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
356 /*
357 * We only consider the last blocked thread for a given bucket
358 * as busy because we don't want to take the list lock in each
359 * sched callback. However this is an approximation that could
360 * contribute to thread creation storms.
361 */
362 (*busycount)++;
363 }
364 }
365 }
366
367 return count;
368 }
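/*
 * Worked example (illustrative numbers only): with 1 active UI thread,
 * 2 active IN threads and 3 active UT threads, aggregating down to
 * THREAD_QOS_USER_INITIATED sums the IN bucket and everything above it and
 * returns 3, while aggregating down to THREAD_QOS_UTILITY also includes the
 * UT bucket and returns 6. *busycount is only bumped for buckets whose
 * wq_thscheduled_count exceeds the active count and whose last blocked
 * thread blocked recently enough (see workq_thread_is_busy()).
 */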
369
370 #pragma mark wq_flags
371
372 static inline uint32_t
373 _wq_flags(struct workqueue *wq)
374 {
375 return os_atomic_load(&wq->wq_flags, relaxed);
376 }
377
378 static inline bool
379 _wq_exiting(struct workqueue *wq)
380 {
381 return _wq_flags(wq) & WQ_EXITING;
382 }
383
384 bool
385 workq_is_exiting(struct proc *p)
386 {
387 struct workqueue *wq = proc_get_wqptr(p);
388 return !wq || _wq_exiting(wq);
389 }
390
391 struct turnstile *
392 workq_turnstile(struct proc *p)
393 {
394 struct workqueue *wq = proc_get_wqptr(p);
395 return wq ? wq->wq_turnstile : TURNSTILE_NULL;
396 }
397
398 #pragma mark workqueue lock
399
400 static bool
401 workq_lock_spin_is_acquired_kdp(struct workqueue *wq)
402 {
403 return kdp_lck_spin_is_acquired(&wq->wq_lock);
404 }
405
406 static inline void
407 workq_lock_spin(struct workqueue *wq)
408 {
409 lck_spin_lock_grp(&wq->wq_lock, workq_lck_grp);
410 }
411
412 static inline void
413 workq_lock_held(__assert_only struct workqueue *wq)
414 {
415 LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED);
416 }
417
418 static inline bool
419 workq_lock_try(struct workqueue *wq)
420 {
421 return lck_spin_try_lock_grp(&wq->wq_lock, workq_lck_grp);
422 }
423
424 static inline void
425 workq_unlock(struct workqueue *wq)
426 {
427 lck_spin_unlock(&wq->wq_lock);
428 }
429
430 #pragma mark idle thread lists
431
432 #define WORKQ_POLICY_INIT(qos) \
433 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
434
435 static inline thread_qos_t
436 workq_pri_bucket(struct uu_workq_policy req)
437 {
438 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
439 }
440
441 static inline thread_qos_t
442 workq_pri_override(struct uu_workq_policy req)
443 {
444 return MAX(workq_pri_bucket(req), req.qos_bucket);
445 }
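/*
 * Example of how the fields combine: a thread whose request QoS is UT
 * (qos_req == THREAD_QOS_UTILITY) but which currently holds a kevent max
 * QoS of IN (qos_max == THREAD_QOS_USER_INITIATED) gets
 * workq_pri_bucket() == IN, and workq_pri_override() reports IN or higher,
 * depending on the bucket the thread is currently accounted against
 * (qos_bucket).
 */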
446
447 static inline bool
448 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
449 {
450 workq_threadreq_param_t cur_trp, req_trp = { };
451
452 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
453 if (req->tr_flags & TR_FLAG_WL_PARAMS) {
454 req_trp = kqueue_threadreq_workloop_param(req);
455 }
456
457 /*
458 * CPU percent flags are handled separately from policy changes, so ignore
459 * them for all of these checks.
460 */
461 uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
462 uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);
463
464 if (!req_flags && !cur_flags) {
465 return false;
466 }
467
468 if (req_flags != cur_flags) {
469 return true;
470 }
471
472 if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
473 return true;
474 }
475
476 if ((req_flags & TRP_POLICY) && cur_trp.trp_pol != cur_trp.trp_pol) {
477 return true;
478 }
479
480 return false;
481 }
482
483 static inline bool
484 workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
485 {
486 if (workq_thread_needs_params_change(req, uth)) {
487 return true;
488 }
489
490 return req->tr_qos != workq_pri_override(uth->uu_workq_pri);
491 }
492
493 static void
494 workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
495 struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
496 bool force_run)
497 {
498 thread_qos_t old_bucket = old_pri.qos_bucket;
499 thread_qos_t new_bucket = workq_pri_bucket(new_pri);
500
501 if (old_bucket != new_bucket) {
502 _wq_thactive_move(wq, old_bucket, new_bucket);
503 }
504
505 new_pri.qos_bucket = new_bucket;
506 uth->uu_workq_pri = new_pri;
507
508 if (workq_pri_override(old_pri) != new_bucket) {
509 thread_set_workq_override(uth->uu_thread, new_bucket);
510 }
511
512 if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
513 int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
514 if (old_bucket > new_bucket) {
515 /*
516 * When lowering our bucket, we may unblock a thread request,
517 * but we can't drop our priority before we have evaluated
518 * whether this is the case, and if we ever drop the workqueue lock
519 * that would cause a priority inversion.
520 *
521 * We hence have to disallow thread creation in that case.
522 */
523 flags = 0;
524 }
525 workq_schedule_creator(p, wq, flags);
526 }
527 }
528
529 /*
530 * Sets/resets the cpu percent limits on the current thread. We can't set
531 * these limits from outside of the current thread, so this function needs
532 * to be called when we're executing on the intended thread.
533 */
534 static void
535 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
536 {
537 assert(uth == current_uthread());
538 workq_threadreq_param_t trp = { };
539
540 if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
541 trp = kqueue_threadreq_workloop_param(req);
542 }
543
544 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
545 /*
546 * Going through disable when we have an existing CPU percent limit
547 * set will force the ledger to refill the token bucket of the current
548 * thread. Removing any penalty applied by previous thread use.
549 */
550 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
551 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
552 }
553
554 if (trp.trp_flags & TRP_CPUPERCENT) {
555 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
556 (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
557 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
558 }
559 }
560
561 static void
562 workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
563 workq_threadreq_t req)
564 {
565 thread_t th = uth->uu_thread;
566 thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
567 workq_threadreq_param_t trp = { };
568 int priority = 31;
569 int policy = POLICY_TIMESHARE;
570
571 if (req && (req->tr_flags & TR_FLAG_WL_PARAMS)) {
572 trp = kqueue_threadreq_workloop_param(req);
573 }
574
575 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
576 uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;
577 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
578
579 // qos sent out to userspace (may differ from uu_workq_pri on param threads)
580 uth->uu_save.uus_workq_park_data.qos = qos;
581
582 if (qos == WORKQ_THREAD_QOS_MANAGER) {
583 uint32_t mgr_pri = wq->wq_event_manager_priority;
584 assert(trp.trp_value == 0); // manager qos and thread policy don't mix
585
586 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
587 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
588 thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
589 POLICY_TIMESHARE);
590 return;
591 }
592
593 qos = _pthread_priority_thread_qos(mgr_pri);
594 } else {
595 if (trp.trp_flags & TRP_PRIORITY) {
596 qos = THREAD_QOS_UNSPECIFIED;
597 priority = trp.trp_pri;
598 uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
599 }
600
601 if (trp.trp_flags & TRP_POLICY) {
602 policy = trp.trp_pol;
603 }
604 }
605
606 thread_set_workq_pri(th, qos, priority, policy);
607 }
608
609 /*
610 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
611 * every time a servicer is being told about a new max QoS.
612 */
613 void
614 workq_thread_set_max_qos(struct proc *p, struct kqrequest *kqr)
615 {
616 struct uu_workq_policy old_pri, new_pri;
617 struct uthread *uth = get_bsdthread_info(kqr->kqr_thread);
618 struct workqueue *wq = proc_get_wqptr_fast(p);
619 thread_qos_t qos = kqr->kqr_qos_index;
620
621 if (uth->uu_workq_pri.qos_max == qos) {
622 return;
623 }
624
625 workq_lock_spin(wq);
626 old_pri = new_pri = uth->uu_workq_pri;
627 new_pri.qos_max = qos;
628 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
629 workq_unlock(wq);
630 }
631
632 #pragma mark idle threads accounting and handling
633
634 static inline struct uthread *
635 workq_oldest_killable_idle_thread(struct workqueue *wq)
636 {
637 struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
638
639 if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
640 uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
641 if (uth) {
642 assert(uth->uu_save.uus_workq_park_data.has_stack);
643 }
644 }
645 return uth;
646 }
647
648 static inline uint64_t
649 workq_kill_delay_for_idle_thread(struct workqueue *wq)
650 {
651 uint64_t delay = wq_reduce_pool_window.abstime;
652 uint16_t idle = wq->wq_thidlecount;
653
654 /*
655 * If we have fewer than wq_death_max_load threads, use a 5s timer.
656 *
657 * For the next wq_max_constrained_threads ones, decay linearly
658 * from 5s to 50ms.
659 */
660 if (idle <= wq_death_max_load) {
661 return delay;
662 }
663
664 if (wq_max_constrained_threads > idle - wq_death_max_load) {
665 delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
666 }
667 return delay / wq_max_constrained_threads;
668 }
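/*
 * Worked example with hypothetical tunables (a 5s wq_reduce_pool_window and
 * wq_max_constrained_threads == 100): at or below wq_death_max_load idle
 * threads the delay stays at 5s; one thread over that load it becomes
 * 5s * 99 / 100 ~= 4.95s, shrinking linearly until, at wq_death_max_load +
 * 100 idle threads or more, it bottoms out at 5s / 100 == 50ms.
 */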
669
670 static inline bool
671 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
672 uint64_t now)
673 {
674 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
675 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
676 }
677
678 static void
679 workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
680 {
681 uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);
682
683 if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
684 return;
685 }
686 os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
687
688 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
689
690 /*
691 * <rdar://problem/13139182> Due to how long term timers work, the leeway
692 * can't be too short, so use 500ms which is long enough that we will not
693 * wake up the CPU for killing threads, but short enough that it doesn't
694 * fall into long-term timer list shenanigans.
695 */
696 thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
697 wq_reduce_pool_window.abstime / 10,
698 THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
699 }
700
701 /*
702 * `decrement` is set to the number of threads that are no longer dying:
703 * - because they have been resuscitated just in time (workq_pop_idle_thread)
704 * - or have been killed (workq_thread_terminate).
705 */
706 static void
707 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
708 {
709 struct uthread *uth;
710
711 assert(wq->wq_thdying_count >= decrement);
712 if ((wq->wq_thdying_count -= decrement) > 0) {
713 return;
714 }
715
716 if (wq->wq_thidlecount <= 1) {
717 return;
718 }
719
720 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
721 return;
722 }
723
724 uint64_t now = mach_absolute_time();
725 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
726
727 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
728 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
729 wq, wq->wq_thidlecount, 0, 0, 0);
730 wq->wq_thdying_count++;
731 uth->uu_workq_flags |= UT_WORKQ_DYING;
732 workq_thread_wakeup(uth);
733 return;
734 }
735
736 workq_death_call_schedule(wq,
737 uth->uu_save.uus_workq_park_data.idle_stamp + delay);
738 }
739
740 void
741 workq_thread_terminate(struct proc *p, struct uthread *uth)
742 {
743 struct workqueue *wq = proc_get_wqptr_fast(p);
744
745 workq_lock_spin(wq);
746 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
747 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
748 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
749 wq, wq->wq_thidlecount, 0, 0, 0);
750 workq_death_policy_evaluate(wq, 1);
751 }
752 if (wq->wq_nthreads-- == wq_max_threads) {
753 /*
754 * We got under the thread limit again, which may have prevented
755 * thread creation from happening; redrive if there are pending requests.
756 */
757 if (wq->wq_reqcount) {
758 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
759 }
760 }
761 workq_unlock(wq);
762
763 thread_deallocate(uth->uu_thread);
764 }
765
766 static void
767 workq_kill_old_threads_call(void *param0, void *param1 __unused)
768 {
769 struct workqueue *wq = param0;
770
771 workq_lock_spin(wq);
772 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0);
773 os_atomic_and(&wq->wq_flags, ~WQ_DEATH_CALL_SCHEDULED, relaxed);
774 workq_death_policy_evaluate(wq, 0);
775 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0);
776 workq_unlock(wq);
777 }
778
779 static struct uthread *
780 workq_pop_idle_thread(struct workqueue *wq)
781 {
782 struct uthread *uth;
783
784 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
785 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
786 } else {
787 uth = TAILQ_FIRST(&wq->wq_thnewlist);
788 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
789 }
790 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
791
792 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
793 uth->uu_workq_flags |= UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT;
794 wq->wq_threads_scheduled++;
795 wq->wq_thidlecount--;
796
797 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
798 uth->uu_workq_flags ^= UT_WORKQ_DYING;
799 workq_death_policy_evaluate(wq, 1);
800 }
801 return uth;
802 }
803
804 /*
805 * Called by thread_create_workq_waiting() during thread initialization, before
806 * assert_wait, before the thread has been started.
807 */
808 event_t
809 workq_thread_init_and_wq_lock(task_t task, thread_t th)
810 {
811 struct uthread *uth = get_bsdthread_info(th);
812
813 uth->uu_workq_flags = UT_WORKQ_NEW;
814 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
815 uth->uu_workq_thport = MACH_PORT_NULL;
816 uth->uu_workq_stackaddr = 0;
817
818 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
819 thread_reset_workq_qos(th, THREAD_QOS_LEGACY);
820
821 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
822 return workq_parked_wait_event(uth);
823 }
824
825 /**
826 * Try to add a new workqueue thread.
827 *
828 * - called with workq lock held
829 * - dropped and retaken around thread creation
830 * - return with workq lock held
831 */
832 static bool
833 workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
834 {
835 mach_vm_offset_t th_stackaddr;
836 kern_return_t kret;
837 thread_t th;
838
839 wq->wq_nthreads++;
840
841 workq_unlock(wq);
842
843 vm_map_t vmap = get_task_map(p->task);
844
845 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
846 if (kret != KERN_SUCCESS) {
847 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
848 kret, 1, 0, 0);
849 goto out;
850 }
851
852 kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
853 if (kret != KERN_SUCCESS) {
854 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
855 kret, 0, 0, 0);
856 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
857 goto out;
858 }
859
860 // thread_create_workq_waiting() will return with the wq lock held
861 // on success, because it calls workq_thread_init_and_wq_lock() above
862
863 struct uthread *uth = get_bsdthread_info(th);
864
865 wq->wq_creations++;
866 wq->wq_thidlecount++;
867 uth->uu_workq_stackaddr = th_stackaddr;
868 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
869
870 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
871 return true;
872
873 out:
874 workq_lock_spin(wq);
875 /*
876 * Do not redrive here if we went under wq_max_threads again,
877 * it is the responsibility of the callers of this function
878 * to do so when it fails.
879 */
880 wq->wq_nthreads--;
881 return false;
882 }
883
884 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
885
886 __attribute__((noreturn, noinline))
887 static void
888 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
889 struct uthread *uth, uint32_t death_flags)
890 {
891 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
892 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;
893
894 if (qos > WORKQ_THREAD_QOS_CLEANUP) {
895 workq_thread_reset_pri(wq, uth, NULL);
896 qos = WORKQ_THREAD_QOS_CLEANUP;
897 }
898
899 workq_thread_reset_cpupercent(NULL, uth);
900
901 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
902 wq->wq_thidlecount--;
903 if (first_use) {
904 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
905 } else {
906 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
907 }
908 }
909 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
910
911 workq_unlock(wq);
912
913 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
914 uint32_t setup_flags = WQ_SETUP_EXIT_THREAD;
915 thread_t th = uth->uu_thread;
916 vm_map_t vmap = get_task_map(p->task);
917
918 if (!first_use) {
919 flags |= WQ_FLAG_THREAD_REUSE;
920 }
921
922 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
923 uth->uu_workq_thport, 0, setup_flags, flags);
924 __builtin_unreachable();
925 }
926
927 bool
928 workq_is_current_thread_updating_turnstile(struct workqueue *wq)
929 {
930 return wq->wq_turnstile_updater == current_thread();
931 }
932
933 __attribute__((always_inline))
934 static inline void
935 workq_perform_turnstile_operation_locked(struct workqueue *wq,
936 void (^operation)(void))
937 {
938 workq_lock_held(wq);
939 wq->wq_turnstile_updater = current_thread();
940 operation();
941 wq->wq_turnstile_updater = THREAD_NULL;
942 }
943
944 static void
945 workq_turnstile_update_inheritor(struct workqueue *wq,
946 turnstile_inheritor_t inheritor,
947 turnstile_update_flags_t flags)
948 {
949 workq_perform_turnstile_operation_locked(wq, ^{
950 turnstile_update_inheritor(wq->wq_turnstile, inheritor,
951 flags | TURNSTILE_IMMEDIATE_UPDATE);
952 turnstile_update_inheritor_complete(wq->wq_turnstile,
953 TURNSTILE_INTERLOCK_HELD);
954 });
955 }
956
957 static void
958 workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth)
959 {
960 uint64_t now = mach_absolute_time();
961
962 uth->uu_workq_flags &= ~UT_WORKQ_RUNNING;
963 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
964 wq->wq_constrained_threads_scheduled--;
965 }
966 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
967 wq->wq_threads_scheduled--;
968
969 if (wq->wq_creator == uth) {
970 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
971 uth->uu_save.uus_workq_park_data.yields, 0);
972 wq->wq_creator = NULL;
973 if (wq->wq_reqcount) {
974 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
975 } else {
976 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
977 }
978 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
979 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
980 wq->wq_thidlecount++;
981 return;
982 }
983 } else {
984 _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
985 wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
986 assert(!(uth->uu_workq_flags & UT_WORKQ_NEW));
987 uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
988 }
989
990 uth->uu_save.uus_workq_park_data.idle_stamp = now;
991
992 struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
993 uint16_t cur_idle = wq->wq_thidlecount;
994
995 if (cur_idle >= wq_max_constrained_threads ||
996 (wq->wq_thdying_count == 0 && oldest &&
997 workq_should_kill_idle_thread(wq, oldest, now))) {
998 /*
999 * Immediately kill threads if we have too many of them.
1000 *
1001 * We also swap "places" with the oldest one we'd have woken up:
1002 * this is a relatively desperate situation where we really
1003 * need to kill threads quickly, and it's better to kill
1004 * the one that's currently on core than to context switch.
1005 */
1006 if (oldest) {
1007 oldest->uu_save.uus_workq_park_data.idle_stamp = now;
1008 TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
1009 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
1010 }
1011
1012 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
1013 wq, cur_idle, 0, 0, 0);
1014 wq->wq_thdying_count++;
1015 uth->uu_workq_flags |= UT_WORKQ_DYING;
1016 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
1017 workq_unpark_for_death_and_unlock(p, wq, uth, 0);
1018 __builtin_unreachable();
1019 }
1020
1021 struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
1022
1023 cur_idle += 1;
1024 wq->wq_thidlecount = cur_idle;
1025
1026 if (cur_idle >= wq_death_max_load && tail &&
1027 tail->uu_save.uus_workq_park_data.has_stack) {
1028 uth->uu_save.uus_workq_park_data.has_stack = false;
1029 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
1030 } else {
1031 uth->uu_save.uus_workq_park_data.has_stack = true;
1032 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
1033 }
1034
1035 if (!tail) {
1036 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
1037 workq_death_call_schedule(wq, now + delay);
1038 }
1039 }
1040
1041 #pragma mark thread requests
1042
1043 static inline int
1044 workq_priority_for_req(workq_threadreq_t req)
1045 {
1046 thread_qos_t qos = req->tr_qos;
1047
1048 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
1049 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
1050 assert(trp.trp_flags & TRP_PRIORITY);
1051 return trp.trp_pri;
1052 }
1053 return thread_workq_pri_for_qos(qos);
1054 }
1055
1056 static inline struct priority_queue *
1057 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1058 {
1059 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
1060 return &wq->wq_special_queue;
1061 } else if (req->tr_flags & TR_FLAG_OVERCOMMIT) {
1062 return &wq->wq_overcommit_queue;
1063 } else {
1064 return &wq->wq_constrained_queue;
1065 }
1066 }
1067
1068 /*
1069 * returns true if the enqueued request is the highest priority item
1070 * in its priority queue.
1071 */
1072 static bool
1073 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
1074 {
1075 assert(req->tr_state == TR_STATE_NEW);
1076
1077 req->tr_state = TR_STATE_QUEUED;
1078 wq->wq_reqcount += req->tr_count;
1079
1080 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1081 assert(wq->wq_event_manager_threadreq == NULL);
1082 assert(req->tr_flags & TR_FLAG_KEVENT);
1083 assert(req->tr_count == 1);
1084 wq->wq_event_manager_threadreq = req;
1085 return true;
1086 }
1087 if (priority_queue_insert(workq_priority_queue_for_req(wq, req),
1088 &req->tr_entry, workq_priority_for_req(req),
1089 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
1090 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
1091 _wq_thactive_refresh_best_constrained_req_qos(wq);
1092 }
1093 return true;
1094 }
1095 return false;
1096 }
1097
1098 /*
1099 * returns true if the dequeued request was the highest priority item
1100 * in its priority queue.
1101 */
1102 static bool
1103 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req)
1104 {
1105 wq->wq_reqcount--;
1106
1107 if (--req->tr_count == 0) {
1108 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1109 assert(wq->wq_event_manager_threadreq == req);
1110 assert(req->tr_count == 0);
1111 wq->wq_event_manager_threadreq = NULL;
1112 return true;
1113 }
1114 if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1115 &req->tr_entry, PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
1116 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
1117 _wq_thactive_refresh_best_constrained_req_qos(wq);
1118 }
1119 return true;
1120 }
1121 }
1122 return false;
1123 }
1124
1125 static void
1126 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1127 {
1128 req->tr_state = TR_STATE_IDLE;
1129 if (req->tr_flags & (TR_FLAG_WORKLOOP | TR_FLAG_KEVENT)) {
1130 kqueue_threadreq_cancel(p, req);
1131 } else {
1132 zfree(workq_zone_threadreq, req);
1133 }
1134 }
1135
1136 /*
1137 * Mark a thread request as complete. At this point, it is treated as owned by
1138 * the submitting subsystem and you should assume it could be freed.
1139 *
1140 * Called with the workqueue lock held.
1141 */
1142 static void
1143 workq_threadreq_bind_and_unlock(proc_t p, struct workqueue *wq,
1144 workq_threadreq_t req, struct uthread *uth)
1145 {
1146 uint8_t tr_flags = req->tr_flags;
1147 bool needs_commit = false;
1148 int creator_flags = 0;
1149
1150 wq->wq_fulfilled++;
1151
1152 if (req->tr_state == TR_STATE_QUEUED) {
1153 workq_threadreq_dequeue(wq, req);
1154 creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
1155 }
1156
1157 if (wq->wq_creator == uth) {
1158 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
1159 uth->uu_save.uus_workq_park_data.yields, 0);
1160 creator_flags = WORKQ_THREADREQ_CAN_CREATE_THREADS |
1161 WORKQ_THREADREQ_CREATOR_TRANSFER;
1162 wq->wq_creator = NULL;
1163 _wq_thactive_inc(wq, req->tr_qos);
1164 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
1165 } else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
1166 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
1167 }
1168 workq_thread_reset_pri(wq, uth, req);
1169
1170 if (tr_flags & TR_FLAG_OVERCOMMIT) {
1171 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
1172 uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT;
1173 wq->wq_constrained_threads_scheduled--;
1174 }
1175 } else {
1176 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) {
1177 uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
1178 wq->wq_constrained_threads_scheduled++;
1179 }
1180 }
1181
1182 if (tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP)) {
1183 if (req->tr_state == TR_STATE_NEW) {
1184 /*
1185 * We're called from workq_kern_threadreq_initiate()
1186 * due to an unbind, with the kq req held.
1187 */
1188 assert(!creator_flags);
1189 req->tr_state = TR_STATE_IDLE;
1190 kqueue_threadreq_bind(p, req, uth->uu_thread, 0);
1191 } else {
1192 assert(req->tr_count == 0);
1193 workq_perform_turnstile_operation_locked(wq, ^{
1194 kqueue_threadreq_bind_prepost(p, req, uth->uu_thread);
1195 });
1196 needs_commit = true;
1197 }
1198 req = NULL;
1199 } else if (req->tr_count > 0) {
1200 req = NULL;
1201 }
1202
1203 if (creator_flags) {
1204 /* This can drop the workqueue lock, and take it again */
1205 workq_schedule_creator(p, wq, creator_flags);
1206 }
1207
1208 workq_unlock(wq);
1209
1210 if (req) {
1211 zfree(workq_zone_threadreq, req);
1212 }
1213 if (needs_commit) {
1214 kqueue_threadreq_bind_commit(p, uth->uu_thread);
1215 }
1216
1217 /*
1218 * Run Thread, Run!
1219 */
1220 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
1221 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1222 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
1223 } else if (tr_flags & TR_FLAG_OVERCOMMIT) {
1224 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
1225 }
1226 if (tr_flags & TR_FLAG_KEVENT) {
1227 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
1228 }
1229 if (tr_flags & TR_FLAG_WORKLOOP) {
1230 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
1231 }
1232 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
1233 }
1234
1235 #pragma mark workqueue thread creation thread calls
1236
1237 static inline bool
1238 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1239 uint32_t fail_mask)
1240 {
1241 uint32_t old_flags, new_flags;
1242
1243 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1244 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1245 os_atomic_rmw_loop_give_up(return false);
1246 }
1247 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1248 new_flags = old_flags | pend;
1249 } else {
1250 new_flags = old_flags | sched;
1251 }
1252 });
1253
1254 return (old_flags & WQ_PROC_SUSPENDED) == 0;
1255 }
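/*
 * Example sequence for a stopped process: while WQ_PROC_SUSPENDED is set,
 * workq_schedule_immediate_thread_creation() (below) only records
 * WQ_IMMEDIATE_CALL_PENDED and the prepost above returns false; when
 * workq_proc_resumed() later clears the suspension it sees the pended bit
 * and re-issues the corresponding schedule call, with preemption disabled.
 */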
1256
1257 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1258
1259 static bool
1260 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1261 {
1262 assert(!preemption_enabled());
1263
1264 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1265 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1266 WQ_IMMEDIATE_CALL_SCHEDULED)) {
1267 return false;
1268 }
1269
1270 uint64_t now = mach_absolute_time();
1271
1272 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1273 /* do not change the window */
1274 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1275 wq->wq_timer_interval *= 2;
1276 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1277 wq->wq_timer_interval = wq_max_timer_interval.abstime;
1278 }
1279 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1280 wq->wq_timer_interval /= 2;
1281 if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1282 wq->wq_timer_interval = wq_stalled_window.abstime;
1283 }
1284 }
1285
1286 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1287 _wq_flags(wq), wq->wq_timer_interval, 0);
1288
1289 thread_call_t call = wq->wq_delayed_call;
1290 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1291 uint64_t deadline = now + wq->wq_timer_interval;
1292 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1293 panic("delayed_call was already enqueued");
1294 }
1295 return true;
1296 }
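/*
 * Example of the adaptive window above, with purely illustrative values of
 * 200us for wq_stalled_window and 50ms for wq_max_timer_interval: if the
 * previous thread call ran within one interval, the window doubles
 * (200us -> 400us -> ... capped at 50ms); if it last ran more than two
 * intervals ago, the window halves again, never dropping below
 * wq_stalled_window.
 */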
1297
1298 static void
1299 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1300 {
1301 assert(!preemption_enabled());
1302
1303 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1304 WQ_IMMEDIATE_CALL_PENDED, 0)) {
1305 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1306 _wq_flags(wq), 0, 0);
1307
1308 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1309 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1310 panic("immediate_call was already enqueued");
1311 }
1312 }
1313 }
1314
1315 void
1316 workq_proc_suspended(struct proc *p)
1317 {
1318 struct workqueue *wq = proc_get_wqptr(p);
1319
1320 if (wq) {
1321 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1322 }
1323 }
1324
1325 void
1326 workq_proc_resumed(struct proc *p)
1327 {
1328 struct workqueue *wq = proc_get_wqptr(p);
1329 uint32_t wq_flags;
1330
1331 if (!wq) {
1332 return;
1333 }
1334
1335 wq_flags = os_atomic_and_orig(&wq->wq_flags, ~(WQ_PROC_SUSPENDED |
1336 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED), relaxed);
1337 if ((wq_flags & WQ_EXITING) == 0) {
1338 disable_preemption();
1339 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1340 workq_schedule_immediate_thread_creation(wq);
1341 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1342 workq_schedule_delayed_thread_creation(wq,
1343 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1344 }
1345 enable_preemption();
1346 }
1347 }
1348
1349 /**
1350 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1351 */
1352 static bool
1353 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1354 {
1355 uint64_t lastblocked_ts = os_atomic_load(lastblocked_tsp, relaxed);
1356 if (now <= lastblocked_ts) {
1357 /*
1358 * Because the update of the timestamp when a thread blocks
1359 * isn't serialized against us looking at it (i.e. we don't hold
1360 * the workq lock), it's possible to have a timestamp that matches
1361 * the current time or that even looks to be in the future relative
1362 * to when we grabbed the current time...
1363 *
1364 * Just treat this as a busy thread since it must have just blocked.
1365 */
1366 return true;
1367 }
1368 return (now - lastblocked_ts) < wq_stalled_window.abstime;
1369 }
1370
1371 static void
1372 workq_add_new_threads_call(void *_p, void *flags)
1373 {
1374 proc_t p = _p;
1375 struct workqueue *wq = proc_get_wqptr(p);
1376 uint32_t my_flag = (uint32_t)(uintptr_t)flags;
1377
1378 /*
1379 * workq_exit() will set the workqueue to NULL before
1380 * it cancels thread calls.
1381 */
1382 if (!wq) {
1383 return;
1384 }
1385
1386 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
1387 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));
1388
1389 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
1390 wq->wq_nthreads, wq->wq_thidlecount, 0);
1391
1392 workq_lock_spin(wq);
1393
1394 wq->wq_thread_call_last_run = mach_absolute_time();
1395 os_atomic_and(&wq->wq_flags, ~my_flag, release);
1396
1397 /* This can drop the workqueue lock, and take it again */
1398 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
1399
1400 workq_unlock(wq);
1401
1402 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
1403 wq->wq_nthreads, wq->wq_thidlecount, 0);
1404 }
1405
1406 #pragma mark thread state tracking
1407
1408 static void
1409 workq_sched_callback(int type, thread_t thread)
1410 {
1411 struct uthread *uth = get_bsdthread_info(thread);
1412 proc_t proc = get_bsdtask_info(get_threadtask(thread));
1413 struct workqueue *wq = proc_get_wqptr(proc);
1414 thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
1415 wq_thactive_t old_thactive;
1416 bool start_timer = false;
1417
1418 if (qos == WORKQ_THREAD_QOS_MANAGER) {
1419 return;
1420 }
1421
1422 switch (type) {
1423 case SCHED_CALL_BLOCK:
1424 old_thactive = _wq_thactive_dec(wq, qos);
1425 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1426
1427 /*
1428 * Remember the timestamp of the last thread that blocked in this
1429 * bucket; it is used by admission checks to ignore one thread
1430 * being inactive if this timestamp is recent enough.
1431 *
1432 * If we collide with another thread trying to update the
1433 * last_blocked (really unlikely since another thread would have to
1434 * get scheduled and then block after we start down this path), it's
1435 * not a problem. Either timestamp is adequate, so no need to retry
1436 */
1437 os_atomic_store(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
1438 thread_last_run_time(thread), relaxed);
1439
1440 if (req_qos == THREAD_QOS_UNSPECIFIED) {
1441 /*
1442 * No pending request at the moment we could unblock, move on.
1443 */
1444 } else if (qos < req_qos) {
1445 /*
1446 * The blocking thread is at a lower QoS than the highest currently
1447 * pending constrained request, nothing has to be redriven
1448 */
1449 } else {
1450 uint32_t max_busycount, old_req_count;
1451 old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
1452 req_qos, NULL, &max_busycount);
1453 /*
1454 * If it is possible that may_start_constrained_thread had refused
1455 * admission due to being over the max concurrency, we may need to
1456 * spin up a new thread.
1457 *
1458 * We take into account the maximum number of busy threads
1459 * that can affect may_start_constrained_thread, because looking at the
1460 * actual number may_start_constrained_thread will see is racy.
1461 *
1462 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
1463 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
1464 */
1465 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
1466 if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
1467 start_timer = workq_schedule_delayed_thread_creation(wq, 0);
1468 }
1469 }
1470 if (__improbable(kdebug_enable)) {
1471 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1472 old_thactive, qos, NULL, NULL);
1473 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
1474 old - 1, qos | (req_qos << 8),
1475 wq->wq_reqcount << 1 | start_timer, 0);
1476 }
1477 break;
1478
1479 case SCHED_CALL_UNBLOCK:
1480 /*
1481 * we cannot take the workqueue_lock here...
1482 * an UNBLOCK can occur from a timer event which
1483 * is run from an interrupt context... if the workqueue_lock
1484 * is already held by this processor, we'll deadlock...
1485 * the thread lock for the thread being UNBLOCKED
1486 * is also held
1487 */
1488 old_thactive = _wq_thactive_inc(wq, qos);
1489 if (__improbable(kdebug_enable)) {
1490 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1491 old_thactive, qos, NULL, NULL);
1492 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1493 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
1494 old + 1, qos | (req_qos << 8),
1495 wq->wq_threads_scheduled, 0);
1496 }
1497 break;
1498 }
1499 }
1500
1501 #pragma mark workq lifecycle
1502
1503 void
1504 workq_reference(struct workqueue *wq)
1505 {
1506 os_ref_retain(&wq->wq_refcnt);
1507 }
1508
1509 void
1510 workq_destroy(struct workqueue *wq)
1511 {
1512 struct turnstile *ts;
1513
1514 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts);
1515 assert(ts);
1516 turnstile_cleanup();
1517 turnstile_deallocate(ts);
1518
1519 lck_spin_destroy(&wq->wq_lock, workq_lck_grp);
1520 zfree(workq_zone_workqueue, wq);
1521 }
1522
1523 static void
1524 workq_deallocate(struct workqueue *wq)
1525 {
1526 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
1527 workq_destroy(wq);
1528 }
1529 }
1530
1531 void
1532 workq_deallocate_safe(struct workqueue *wq)
1533 {
1534 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
1535 workq_deallocate_enqueue(wq);
1536 }
1537 }
1538
1539 /**
1540 * Setup per-process state for the workqueue.
1541 */
1542 int
1543 workq_open(struct proc *p, __unused struct workq_open_args *uap,
1544 __unused int32_t *retval)
1545 {
1546 struct workqueue *wq;
1547 int error = 0;
1548
1549 if ((p->p_lflag & P_LREGISTER) == 0) {
1550 return EINVAL;
1551 }
1552
1553 if (wq_init_constrained_limit) {
1554 uint32_t limit, num_cpus = ml_get_max_cpus();
1555
1556 /*
1557 * Set up the limit for the constrained pool. This is a virtual
1558 * pool in that we don't maintain it on a separate idle and
1559 * run list.
1560 */
1561 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
1562
1563 if (limit > wq_max_constrained_threads) {
1564 wq_max_constrained_threads = limit;
1565 }
1566
1567 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
1568 wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
1569 }
1570 if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
1571 wq_max_threads = CONFIG_THREAD_MAX - 20;
1572 }
1573
1574 wq_death_max_load = (uint16_t)fls(num_cpus) + 1;
1575
1576 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
1577 wq_max_parallelism[_wq_bucket(qos)] =
1578 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
1579 }
1580
1581 wq_init_constrained_limit = 0;
1582 }
1583
1584 if (proc_get_wqptr(p) == NULL) {
1585 if (proc_init_wqptr_or_wait(p) == FALSE) {
1586 assert(proc_get_wqptr(p) != NULL);
1587 goto out;
1588 }
1589
1590 wq = (struct workqueue *)zalloc(workq_zone_workqueue);
1591 bzero(wq, sizeof(struct workqueue));
1592
1593 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);
1594
1595 // Start the event manager at the priority hinted at by the policy engine
1596 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
1597 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
1598 wq->wq_event_manager_priority = (uint32_t)pp;
1599 wq->wq_timer_interval = wq_stalled_window.abstime;
1600 wq->wq_proc = p;
1601 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
1602 TURNSTILE_WORKQS);
1603
1604 TAILQ_INIT(&wq->wq_thrunlist);
1605 TAILQ_INIT(&wq->wq_thnewlist);
1606 TAILQ_INIT(&wq->wq_thidlelist);
1607 priority_queue_init(&wq->wq_overcommit_queue,
1608 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1609 priority_queue_init(&wq->wq_constrained_queue,
1610 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1611 priority_queue_init(&wq->wq_special_queue,
1612 PRIORITY_QUEUE_BUILTIN_MAX_HEAP);
1613
1614 wq->wq_delayed_call = thread_call_allocate_with_options(
1615 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
1616 THREAD_CALL_OPTIONS_ONCE);
1617 wq->wq_immediate_call = thread_call_allocate_with_options(
1618 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
1619 THREAD_CALL_OPTIONS_ONCE);
1620 wq->wq_death_call = thread_call_allocate_with_options(
1621 workq_kill_old_threads_call, wq,
1622 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
1623
1624 lck_spin_init(&wq->wq_lock, workq_lck_grp, workq_lck_attr);
1625
1626 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
1627 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
1628 proc_set_wqptr(p, wq);
1629 }
1630 out:
1631
1632 return error;
1633 }
1634
1635 /*
1636 * Routine: workq_mark_exiting
1637 *
1638 * Function: Mark the work queue such that new threads will not be added to the
1639 * work queue after we return.
1640 *
1641 * Conditions: Called against the current process.
1642 */
1643 void
1644 workq_mark_exiting(struct proc *p)
1645 {
1646 struct workqueue *wq = proc_get_wqptr(p);
1647 uint32_t wq_flags;
1648 workq_threadreq_t mgr_req;
1649
1650 if (!wq) {
1651 return;
1652 }
1653
1654 WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);
1655
1656 workq_lock_spin(wq);
1657
1658 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
1659 if (__improbable(wq_flags & WQ_EXITING)) {
1660 panic("workq_mark_exiting called twice");
1661 }
1662
1663 /*
1664 * Opportunistically try to cancel thread calls that are likely in flight.
1665 * workq_exit() will do the proper cleanup.
1666 */
1667 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
1668 thread_call_cancel(wq->wq_immediate_call);
1669 }
1670 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
1671 thread_call_cancel(wq->wq_delayed_call);
1672 }
1673 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
1674 thread_call_cancel(wq->wq_death_call);
1675 }
1676
1677 mgr_req = wq->wq_event_manager_threadreq;
1678 wq->wq_event_manager_threadreq = NULL;
1679 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
1680 workq_turnstile_update_inheritor(wq, NULL, 0);
1681
1682 workq_unlock(wq);
1683
1684 if (mgr_req) {
1685 kqueue_threadreq_cancel(p, mgr_req);
1686 }
1687 /*
1688 * No one touches the priority queues once WQ_EXITING is set.
1689 * It is hence safe to do the tear down without holding any lock.
1690 */
1691 priority_queue_destroy(&wq->wq_overcommit_queue,
1692 struct workq_threadreq_s, tr_entry, ^(void *e){
1693 workq_threadreq_destroy(p, e);
1694 });
1695 priority_queue_destroy(&wq->wq_constrained_queue,
1696 struct workq_threadreq_s, tr_entry, ^(void *e){
1697 workq_threadreq_destroy(p, e);
1698 });
1699 priority_queue_destroy(&wq->wq_special_queue,
1700 struct workq_threadreq_s, tr_entry, ^(void *e){
1701 workq_threadreq_destroy(p, e);
1702 });
1703
1704 WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
1705 }
1706
1707 /*
1708 * Routine: workq_exit
1709 *
1710 * Function: clean up the work queue structure(s) now that there are no threads
1711 * left running inside the work queue (except possibly current_thread).
1712 *
1713 * Conditions: Called by the last thread in the process.
1714 * Called against current process.
1715 */
1716 void
1717 workq_exit(struct proc *p)
1718 {
1719 struct workqueue *wq;
1720 struct uthread *uth, *tmp;
1721
1722 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
1723 if (wq != NULL) {
1724 thread_t th = current_thread();
1725
1726 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0);
1727
1728 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
1729 /*
1730 * <rdar://problem/40111515> Make sure we will no longer call the
1731 * sched call, if we ever block this thread, which the cancel_wait
1732 * below can do.
1733 */
1734 thread_sched_call(th, NULL);
1735 }
1736
1737 /*
1738 * Thread calls are always scheduled by the proc itself or under the
1739 * workqueue spinlock if WQ_EXITING is not yet set.
1740 *
1741 * Either way, when this runs, the proc has no threads left beside
1742 * the one running this very code, so we know no thread call can be
1743 * dispatched anymore.
1744 */
1745 thread_call_cancel_wait(wq->wq_delayed_call);
1746 thread_call_cancel_wait(wq->wq_immediate_call);
1747 thread_call_cancel_wait(wq->wq_death_call);
1748 thread_call_free(wq->wq_delayed_call);
1749 thread_call_free(wq->wq_immediate_call);
1750 thread_call_free(wq->wq_death_call);
1751
1752 /*
1753 * Clean up workqueue data structures for threads that exited and
1754 * didn't get a chance to clean up after themselves.
1755 *
1756 * idle/new threads should have been interrupted and died on their own
1757 */
1758 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
1759 thread_sched_call(uth->uu_thread, NULL);
1760 thread_deallocate(uth->uu_thread);
1761 }
1762 assert(TAILQ_EMPTY(&wq->wq_thnewlist));
1763 assert(TAILQ_EMPTY(&wq->wq_thidlelist));
1764
1765 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
1766 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0);
1767
1768 workq_deallocate(wq);
1769
1770 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0);
1771 }
1772 }
1773
1774
1775 #pragma mark bsd thread control
1776
1777 static bool
1778 _pthread_priority_to_policy(pthread_priority_t priority,
1779 thread_qos_policy_data_t *data)
1780 {
1781 data->qos_tier = _pthread_priority_thread_qos(priority);
1782 data->tier_importance = _pthread_priority_relpri(priority);
1783 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
1784 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
1785 return false;
1786 }
1787 return true;
1788 }
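/*
 * Example: a pthread_priority_t encoding QoS UTILITY with a relative
 * priority of -5 converts to { .qos_tier = THREAD_QOS_UTILITY,
 * .tier_importance = -5 } and succeeds, whereas an unspecified QoS, a
 * positive relpri, or one below THREAD_QOS_MIN_TIER_IMPORTANCE makes the
 * conversion fail (return false).
 */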
1789
1790 static int
1791 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
1792 mach_port_name_t voucher, enum workq_set_self_flags flags)
1793 {
1794 struct uthread *uth = get_bsdthread_info(th);
1795 struct workqueue *wq = proc_get_wqptr(p);
1796
1797 kern_return_t kr;
1798 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
1799 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);
1800
1801 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
1802 if (!is_wq_thread) {
1803 unbind_rv = EINVAL;
1804 goto qos;
1805 }
1806
1807 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1808 unbind_rv = EINVAL;
1809 goto qos;
1810 }
1811
1812 struct kqrequest *kqr = uth->uu_kqr_bound;
1813 if (kqr == NULL) {
1814 unbind_rv = EALREADY;
1815 goto qos;
1816 }
1817
1818 if (kqr->kqr_state & KQR_WORKLOOP) {
1819 unbind_rv = EINVAL;
1820 goto qos;
1821 }
1822
1823 kqueue_threadreq_unbind(p, uth->uu_kqr_bound);
1824 }
1825
1826 qos:
1827 if (flags & WORKQ_SET_SELF_QOS_FLAG) {
1828 thread_qos_policy_data_t new_policy;
1829
1830 if (!_pthread_priority_to_policy(priority, &new_policy)) {
1831 qos_rv = EINVAL;
1832 goto voucher;
1833 }
1834
1835 if (!is_wq_thread) {
1836 /*
1837 * Threads opted out of QoS can't change QoS
1838 */
1839 if (!thread_has_qos_policy(th)) {
1840 qos_rv = EPERM;
1841 goto voucher;
1842 }
1843 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
1844 /*
1845 * Workqueue manager threads can't change QoS
1846 */
1847 qos_rv = EINVAL;
1848 goto voucher;
1849 } else {
1850 /*
1851 * For workqueue threads, possibly adjust buckets and redrive thread
1852 * requests.
1853 */
1854 bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT;
1855 bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
1856 struct uu_workq_policy old_pri, new_pri;
1857 bool force_run = false;
1858
1859 workq_lock_spin(wq);
1860
1861 if (old_overcommit != new_overcommit) {
1862 uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT;
1863 if (old_overcommit) {
1864 wq->wq_constrained_threads_scheduled++;
1865 } else if (wq->wq_constrained_threads_scheduled-- ==
1866 wq_max_constrained_threads) {
1867 force_run = true;
1868 }
1869 }
1870
1871 old_pri = new_pri = uth->uu_workq_pri;
1872 new_pri.qos_req = new_policy.qos_tier;
1873 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
1874 workq_unlock(wq);
1875 }
1876
1877 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
1878 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
1879 if (kr != KERN_SUCCESS) {
1880 qos_rv = EINVAL;
1881 }
1882 }
1883
1884 voucher:
1885 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
1886 kr = thread_set_voucher_name(voucher);
1887 if (kr != KERN_SUCCESS) {
1888 voucher_rv = ENOENT;
1889 goto fixedpri;
1890 }
1891 }
1892
1893 fixedpri:
1894 if (qos_rv) {
1895 goto done;
1896 }
1897 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
1898 thread_extended_policy_data_t extpol = {.timeshare = 0};
1899
1900 if (is_wq_thread) {
1901 /* Not allowed on workqueue threads */
1902 fixedpri_rv = ENOTSUP;
1903 goto done;
1904 }
1905
1906 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
1907 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
1908 if (kr != KERN_SUCCESS) {
1909 fixedpri_rv = EINVAL;
1910 goto done;
1911 }
1912 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
1913 thread_extended_policy_data_t extpol = {.timeshare = 1};
1914
1915 if (is_wq_thread) {
1916 /* Not allowed on workqueue threads */
1917 fixedpri_rv = ENOTSUP;
1918 goto done;
1919 }
1920
1921 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
1922 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
1923 if (kr != KERN_SUCCESS) {
1924 fixedpri_rv = EINVAL;
1925 goto done;
1926 }
1927 }
1928
1929 done:
1930 if (qos_rv && voucher_rv) {
1931 /* Both failed, give that a unique error. */
1932 return EBADMSG;
1933 }
1934
1935 if (unbind_rv) {
1936 return unbind_rv;
1937 }
1938
1939 if (qos_rv) {
1940 return qos_rv;
1941 }
1942
1943 if (voucher_rv) {
1944 return voucher_rv;
1945 }
1946
1947 if (fixedpri_rv) {
1948 return fixedpri_rv;
1949 }
1950
1951 return 0;
1952 }
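/*
 * Result aggregation for the routine above, spelled out: if both the QoS and
 * the voucher updates fail, the caller gets the distinct error EBADMSG;
 * otherwise the first recorded failure wins, checked in the order unbind_rv,
 * qos_rv, voucher_rv, fixedpri_rv, and 0 is returned only when every
 * requested operation succeeded. Note that the fixed-priority/timeshare
 * update is skipped entirely once the QoS update has failed.
 */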
1953
1954 static int
1955 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
1956 pthread_priority_t pp, user_addr_t resource)
1957 {
1958 thread_qos_t qos = _pthread_priority_thread_qos(pp);
1959 if (qos == THREAD_QOS_UNSPECIFIED) {
1960 return EINVAL;
1961 }
1962
1963 thread_t th = port_name_to_thread(kport);
1964 if (th == THREAD_NULL) {
1965 return ESRCH;
1966 }
1967
1968 int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
1969 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
1970
1971 thread_deallocate(th);
1972 return rv;
1973 }
1974
1975 static int
1976 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
1977 user_addr_t resource)
1978 {
1979 thread_t th = port_name_to_thread(kport);
1980 if (th == THREAD_NULL) {
1981 return ESRCH;
1982 }
1983
1984 int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
1985 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
1986
1987 thread_deallocate(th);
1988 return rv;
1989 }
1990
1991 static int
1992 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
1993 pthread_priority_t pp, user_addr_t ulock_addr)
1994 {
1995 struct uu_workq_policy old_pri, new_pri;
1996 struct workqueue *wq = proc_get_wqptr(p);
1997
1998 thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
1999 if (qos_override == THREAD_QOS_UNSPECIFIED) {
2000 return EINVAL;
2001 }
2002
2003 thread_t thread = port_name_to_thread(kport);
2004 if (thread == THREAD_NULL) {
2005 return ESRCH;
2006 }
2007
2008 struct uthread *uth = get_bsdthread_info(thread);
2009 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2010 thread_deallocate(thread);
2011 return EPERM;
2012 }
2013
2014 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
2015 wq, thread_tid(thread), 1, pp, 0);
2016
2017 thread_mtx_lock(thread);
2018
2019 if (ulock_addr) {
2020 uint64_t val;
2021 int rc;
2022 /*
2023 * Work around the lack of explicit support for 'no-fault copyin'
2024 * (<rdar://problem/24999882>): disabling preemption keeps the copyin from faulting pages in
2025 */
2026 disable_preemption();
2027 rc = copyin_word(ulock_addr, &val, sizeof(kport));
2028 enable_preemption();
2029 if (rc == 0 && ulock_owner_value_to_port_name((uint32_t)val) != kport) {
2030 goto out;
2031 }
2032 }
2033
2034 workq_lock_spin(wq);
2035
2036 old_pri = uth->uu_workq_pri;
2037 if (old_pri.qos_override >= qos_override) {
2038 /* Nothing to do */
2039 } else if (thread == current_thread()) {
2040 new_pri = old_pri;
2041 new_pri.qos_override = qos_override;
2042 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2043 } else {
2044 uth->uu_workq_pri.qos_override = qos_override;
2045 if (qos_override > workq_pri_override(old_pri)) {
2046 thread_set_workq_override(thread, qos_override);
2047 }
2048 }
2049
2050 workq_unlock(wq);
2051
2052 out:
2053 thread_mtx_unlock(thread);
2054 thread_deallocate(thread);
2055 return 0;
2056 }
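/*
 * A note on the ulock check above: if the copyin succeeds but the lock word
 * no longer names `kport` as the owner (for instance the lock was released
 * between the userspace decision and this syscall), the override is simply
 * skipped and the call still returns 0.
 */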
2057
2058 static int
2059 workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2060 {
2061 struct uu_workq_policy old_pri, new_pri;
2062 struct workqueue *wq = proc_get_wqptr(p);
2063 struct uthread *uth = get_bsdthread_info(thread);
2064
2065 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2066 return EPERM;
2067 }
2068
2069 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
2070
2071 workq_lock_spin(wq);
2072 old_pri = new_pri = uth->uu_workq_pri;
2073 new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2074 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2075 workq_unlock(wq);
2076 return 0;
2077 }
2078
2079 static int
2080 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2081 int *retval)
2082 {
2083 static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2084 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2085 static_assert(QOS_PARALLELISM_REALTIME ==
2086 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2087
2088 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) {
2089 return EINVAL;
2090 }
2091
2092 if (flags & QOS_PARALLELISM_REALTIME) {
2093 if (qos) {
2094 return EINVAL;
2095 }
2096 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2097 return EINVAL;
2098 }
2099
2100 *retval = qos_max_parallelism(qos, flags);
2101 return 0;
2102 }
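/*
 * Example of the argument contract above (values are illustrative): a call
 * with qos = THREAD_QOS_USER_INITIATED and flags =
 * QOS_PARALLELISM_COUNT_LOGICAL returns the logical-CPU based parallelism
 * for that QoS via qos_max_parallelism(); QOS_PARALLELISM_REALTIME requires
 * qos to be 0 (THREAD_QOS_UNSPECIFIED), and any other flag bit or an
 * out-of-range QoS yields EINVAL.
 */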
2103
2104 #define ENSURE_UNUSED(arg) \
2105 ({ if ((arg) != 0) { return EINVAL; } })
2106
2107 int
2108 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
2109 {
2110 switch (uap->cmd) {
2111 case BSDTHREAD_CTL_QOS_OVERRIDE_START:
2112 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
2113 (pthread_priority_t)uap->arg2, uap->arg3);
2114 case BSDTHREAD_CTL_QOS_OVERRIDE_END:
2115 ENSURE_UNUSED(uap->arg3);
2116 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
2117 (user_addr_t)uap->arg2);
2118
2119 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
2120 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
2121 (pthread_priority_t)uap->arg2, uap->arg3);
2122 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
2123 return workq_thread_reset_dispatch_override(p, current_thread());
2124
2125 case BSDTHREAD_CTL_SET_SELF:
2126 return bsdthread_set_self(p, current_thread(),
2127 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
2128 (enum workq_set_self_flags)uap->arg3);
2129
2130 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
2131 ENSURE_UNUSED(uap->arg3);
2132 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
2133 (unsigned long)uap->arg2, retval);
2134
2135 case BSDTHREAD_CTL_SET_QOS:
2136 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
2137 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
2138 /* no longer supported */
2139 return ENOTSUP;
2140
2141 default:
2142 return EINVAL;
2143 }
2144 }
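/*
 * A minimal sketch, from the caller's side, of how the dispatch above is
 * reached. It assumes libpthread's raw stub __bsdthread_ctl(cmd, arg1, arg2,
 * arg3) and the BSDTHREAD_CTL_* / _PTHREAD_QOS_PARALLELISM_* constants from
 * libpthread's private headers are available; the stub name, headers, and
 * return-value convention (count on success, -1/errno on failure) are
 * assumptions of this sketch, not guarantees made by this file.
 */
#if 0	/* Illustrative sketch only; not part of the kernel build. */
#include <stdint.h>

extern int __bsdthread_ctl(uintptr_t cmd, uintptr_t arg1, uintptr_t arg2,
    uintptr_t arg3);	/* assumed private syscall stub */

static int
sketch_logical_parallelism(uintptr_t thread_qos)
{
	/* arg1 = QoS, arg2 = flags, arg3 must be 0 (see ENSURE_UNUSED above) */
	return __bsdthread_ctl(BSDTHREAD_CTL_QOS_MAX_PARALLELISM, thread_qos,
	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, 0);
}
#endif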
2145
2146 #pragma mark workqueue thread manipulation
2147
2148 static void __dead2
2149 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2150 struct uthread *uth);
2151
2152 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2153
2154 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2155 static inline uint64_t
2156 workq_trace_req_id(workq_threadreq_t req)
2157 {
2158 struct kqworkloop *kqwl;
2159 if (req->tr_flags & TR_FLAG_WORKLOOP) {
2160 kqwl = __container_of(req, struct kqworkloop, kqwl_request.kqr_req);
2161 return kqwl->kqwl_dynamicid;
2162 }
2163
2164 return VM_KERNEL_ADDRHIDE(req);
2165 }
2166 #endif
2167
2168 /**
2169 * Entry point for libdispatch to ask for threads
2170 */
2171 static int
2172 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp)
2173 {
2174 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2175 struct workqueue *wq = proc_get_wqptr(p);
2176 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2177
2178 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2179 qos == THREAD_QOS_UNSPECIFIED) {
2180 return EINVAL;
2181 }
2182
2183 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2184 wq, reqcount, pp, 0, 0);
2185
2186 workq_threadreq_t req = zalloc(workq_zone_threadreq);
2187 priority_queue_entry_init(&req->tr_entry);
2188 req->tr_state = TR_STATE_NEW;
2189 req->tr_flags = 0;
2190 req->tr_qos = qos;
2191
2192 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2193 req->tr_flags |= TR_FLAG_OVERCOMMIT;
2194 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2195 }
2196
2197 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2198 wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0);
2199
2200 workq_lock_spin(wq);
2201 do {
2202 if (_wq_exiting(wq)) {
2203 goto exiting;
2204 }
2205
2206 /*
2207 * When userspace is asking for parallelism, wake up as many as (reqcount - 1)
2208 * threads without pacing, to inform the scheduler of that workload.
2209 *
2210 * The last requests, or the ones that failed the admission checks, are
2211 * enqueued and go through the regular creator codepath.
2212 *
2213 * If there aren't enough threads, add one, but re-evaluate everything
2214 * as conditions may now have changed.
2215 */
2216 if (reqcount > 1 && (req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2217 unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2218 if (unpaced >= reqcount - 1) {
2219 unpaced = reqcount - 1;
2220 }
2221 } else {
2222 unpaced = reqcount - 1;
2223 }
2224
2225 /*
2226 * This path does not currently handle custom workloop parameters
2227 * when creating threads for parallelism.
2228 */
2229 assert(!(req->tr_flags & TR_FLAG_WL_PARAMS));
2230
2231 /*
2232 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2233 */
2234 while (unpaced > 0 && wq->wq_thidlecount) {
2235 struct uthread *uth = workq_pop_idle_thread(wq);
2236
2237 _wq_thactive_inc(wq, qos);
2238 wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2239 workq_thread_reset_pri(wq, uth, req);
2240 wq->wq_fulfilled++;
2241
2242 uth->uu_workq_flags |= UT_WORKQ_EARLY_BOUND;
2243 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2244 uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT;
2245 wq->wq_constrained_threads_scheduled++;
2246 }
2247 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2248 uth->uu_save.uus_workq_park_data.thread_request = req;
2249 workq_thread_wakeup(uth);
2250 unpaced--;
2251 reqcount--;
2252 }
2253 } while (unpaced && wq->wq_nthreads < wq_max_threads &&
2254 workq_add_new_idle_thread(p, wq));
2255
2256 if (_wq_exiting(wq)) {
2257 goto exiting;
2258 }
2259
2260 req->tr_count = reqcount;
2261 if (workq_threadreq_enqueue(wq, req)) {
2262 /* This can drop the workqueue lock, and take it again */
2263 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2264 }
2265 workq_unlock(wq);
2266 return 0;
2267
2268 exiting:
2269 workq_unlock(wq);
2270 zfree(workq_zone_threadreq, req);
2271 return ECANCELED;
2272 }
2273
2274 bool
2275 workq_kern_threadreq_initiate(struct proc *p, struct kqrequest *kqr,
2276 struct turnstile *workloop_ts, thread_qos_t qos, int flags)
2277 {
2278 struct workqueue *wq = proc_get_wqptr_fast(p);
2279 workq_threadreq_t req = &kqr->kqr_req;
2280 struct uthread *uth = NULL;
2281 uint8_t tr_flags = 0;
2282
2283 if (kqr->kqr_state & KQR_WORKLOOP) {
2284 tr_flags = TR_FLAG_WORKLOOP;
2285
2286 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
2287 if (trp.trp_flags & TRP_PRIORITY) {
2288 tr_flags |= TR_FLAG_WL_OUTSIDE_QOS;
2289 qos = thread_workq_qos_for_pri(trp.trp_pri);
2290 if (qos == THREAD_QOS_UNSPECIFIED) {
2291 qos = WORKQ_THREAD_QOS_ABOVEUI;
2292 }
2293 }
2294 if (trp.trp_flags) {
2295 tr_flags |= TR_FLAG_WL_PARAMS;
2296 }
2297 } else {
2298 tr_flags = TR_FLAG_KEVENT;
2299 }
2300 if (qos != WORKQ_THREAD_QOS_MANAGER &&
2301 (kqr->kqr_state & KQR_THOVERCOMMIT)) {
2302 tr_flags |= TR_FLAG_OVERCOMMIT;
2303 }
2304
2305 assert(req->tr_state == TR_STATE_IDLE);
2306 priority_queue_entry_init(&req->tr_entry);
2307 req->tr_count = 1;
2308 req->tr_state = TR_STATE_NEW;
2309 req->tr_flags = tr_flags;
2310 req->tr_qos = qos;
2311
2312 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
2313 workq_trace_req_id(req), qos, 1, 0);
2314
2315 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
2316 /*
2317 * We're called back synchronously from the context of
2318 * kqueue_threadreq_unbind() from within workq_thread_return(),
2319 * so we can try to match up this thread with this request!
2320 */
2321 uth = current_uthread();
2322 assert(uth->uu_kqr_bound == NULL);
2323 }
2324
2325 workq_lock_spin(wq);
2326 if (_wq_exiting(wq)) {
2327 workq_unlock(wq);
2328 return false;
2329 }
2330
2331 if (uth && workq_threadreq_admissible(wq, uth, req)) {
2332 assert(uth != wq->wq_creator);
2333 workq_threadreq_bind_and_unlock(p, wq, req, uth);
2334 } else {
2335 if (workloop_ts) {
2336 workq_perform_turnstile_operation_locked(wq, ^{
2337 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
2338 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
2339 turnstile_update_inheritor_complete(workloop_ts,
2340 TURNSTILE_INTERLOCK_HELD);
2341 });
2342 }
2343 if (workq_threadreq_enqueue(wq, req)) {
2344 workq_schedule_creator(p, wq, flags);
2345 }
2346 workq_unlock(wq);
2347 }
2348
2349 return true;
2350 }
2351
2352 void
2353 workq_kern_threadreq_modify(struct proc *p, struct kqrequest *kqr,
2354 thread_qos_t qos, int flags)
2355 {
2356 struct workqueue *wq = proc_get_wqptr_fast(p);
2357 workq_threadreq_t req = &kqr->kqr_req;
2358 bool change_overcommit = false;
2359
2360 if (req->tr_flags & TR_FLAG_WL_OUTSIDE_QOS) {
2361 /* Requests outside-of-QoS shouldn't accept modify operations */
2362 return;
2363 }
2364
2365 workq_lock_spin(wq);
2366
2367 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
2368 assert(req->tr_flags & (TR_FLAG_KEVENT | TR_FLAG_WORKLOOP));
2369
2370 if (req->tr_state == TR_STATE_BINDING) {
2371 kqueue_threadreq_bind(p, req, req->tr_binding_thread, 0);
2372 workq_unlock(wq);
2373 return;
2374 }
2375
2376 change_overcommit = (bool)(kqr->kqr_state & KQR_THOVERCOMMIT) !=
2377 (bool)(req->tr_flags & TR_FLAG_OVERCOMMIT);
2378
2379 if (_wq_exiting(wq) || (req->tr_qos == qos && !change_overcommit)) {
2380 workq_unlock(wq);
2381 return;
2382 }
2383
2384 assert(req->tr_count == 1);
2385 if (req->tr_state != TR_STATE_QUEUED) {
2386 panic("Invalid thread request (%p) state %d", req, req->tr_state);
2387 }
2388
2389 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
2390 workq_trace_req_id(req), qos, 0, 0);
2391
2392 struct priority_queue *pq = workq_priority_queue_for_req(wq, req);
2393 workq_threadreq_t req_max;
2394
2395 /*
2396 * Stage 1: Dequeue the request from its priority queue.
2397 *
2398 * If we dequeue the root item of the constrained priority queue,
2399 * maintain the best constrained request qos invariant.
2400 */
2401 if (priority_queue_remove(pq, &req->tr_entry,
2402 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE)) {
2403 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2404 _wq_thactive_refresh_best_constrained_req_qos(wq);
2405 }
2406 }
2407
2408 /*
2409 * Stage 2: Apply changes to the thread request
2410 *
2411 * If the item will not become the root of the priority queue it belongs to,
2412 * then we need to wait in line: just enqueue and return quickly.
2413 */
2414 if (__improbable(change_overcommit)) {
2415 req->tr_flags ^= TR_FLAG_OVERCOMMIT;
2416 pq = workq_priority_queue_for_req(wq, req);
2417 }
2418 req->tr_qos = qos;
2419
2420 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
2421 if (req_max && req_max->tr_qos >= qos) {
2422 priority_queue_insert(pq, &req->tr_entry, workq_priority_for_req(req),
2423 PRIORITY_QUEUE_SCHED_PRI_MAX_HEAP_COMPARE);
2424 workq_unlock(wq);
2425 return;
2426 }
2427
2428 /*
2429 * Stage 3: Reevaluate whether we should run the thread request.
2430 *
2431 * Pretend the thread request is new again:
2432 * - adjust wq_reqcount to not count it anymore.
2433 * - make its state TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
2434 * properly attempts a synchronous bind)
2435 */
2436 wq->wq_reqcount--;
2437 req->tr_state = TR_STATE_NEW;
2438 if (workq_threadreq_enqueue(wq, req)) {
2439 workq_schedule_creator(p, wq, flags);
2440 }
2441 workq_unlock(wq);
2442 }
2443
2444 void
2445 workq_kern_threadreq_lock(struct proc *p)
2446 {
2447 workq_lock_spin(proc_get_wqptr_fast(p));
2448 }
2449
2450 void
2451 workq_kern_threadreq_unlock(struct proc *p)
2452 {
2453 workq_unlock(proc_get_wqptr_fast(p));
2454 }
2455
2456 void
2457 workq_kern_threadreq_update_inheritor(struct proc *p, struct kqrequest *kqr,
2458 thread_t owner, struct turnstile *wl_ts,
2459 turnstile_update_flags_t flags)
2460 {
2461 struct workqueue *wq = proc_get_wqptr_fast(p);
2462 workq_threadreq_t req = &kqr->kqr_req;
2463 turnstile_inheritor_t inheritor;
2464
2465 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
2466 assert(req->tr_flags & TR_FLAG_WORKLOOP);
2467 workq_lock_held(wq);
2468
2469 if (req->tr_state == TR_STATE_BINDING) {
2470 kqueue_threadreq_bind(p, req, req->tr_binding_thread,
2471 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
2472 return;
2473 }
2474
2475 if (_wq_exiting(wq)) {
2476 inheritor = TURNSTILE_INHERITOR_NULL;
2477 } else {
2478 if (req->tr_state != TR_STATE_QUEUED) {
2479 panic("Invalid thread request (%p) state %d", req, req->tr_state);
2480 }
2481
2482 if (owner) {
2483 inheritor = owner;
2484 flags |= TURNSTILE_INHERITOR_THREAD;
2485 } else {
2486 inheritor = wq->wq_turnstile;
2487 flags |= TURNSTILE_INHERITOR_TURNSTILE;
2488 }
2489 }
2490
2491 workq_perform_turnstile_operation_locked(wq, ^{
2492 turnstile_update_inheritor(wl_ts, inheritor, flags);
2493 });
2494 }
2495
2496 void
2497 workq_kern_threadreq_redrive(struct proc *p, int flags)
2498 {
2499 struct workqueue *wq = proc_get_wqptr_fast(p);
2500
2501 workq_lock_spin(wq);
2502 workq_schedule_creator(p, wq, flags);
2503 workq_unlock(wq);
2504 }
2505
2506 void
2507 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
2508 {
2509 if (!locked) {
2510 workq_lock_spin(wq);
2511 }
2512 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_CREATOR_SYNC_UPDATE);
2513 if (!locked) {
2514 workq_unlock(wq);
2515 }
2516 }
2517
2518 static int
2519 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
2520 struct workqueue *wq)
2521 {
2522 thread_t th = current_thread();
2523 struct uthread *uth = get_bsdthread_info(th);
2524 struct kqrequest *kqr = uth->uu_kqr_bound;
2525 workq_threadreq_param_t trp = { };
2526 int nevents = uap->affinity, error;
2527 user_addr_t eventlist = uap->item;
2528
2529 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
2530 (uth->uu_workq_flags & UT_WORKQ_DYING)) {
2531 return EINVAL;
2532 }
2533
2534 if (eventlist && nevents && kqr == NULL) {
2535 return EINVAL;
2536 }
2537
2538 /* reset signal mask on the workqueue thread to default state */
2539 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
2540 proc_lock(p);
2541 uth->uu_sigmask = ~workq_threadmask;
2542 proc_unlock(p);
2543 }
2544
2545 if (kqr && kqr->kqr_req.tr_flags & TR_FLAG_WL_PARAMS) {
2546 /*
2547 * Ensure we store the threadreq param before unbinding
2548 * the kqr from this thread.
2549 */
2550 trp = kqueue_threadreq_workloop_param(&kqr->kqr_req);
2551 }
2552
2553 if (kqr) {
2554 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
2555 if (kqr->kqr_state & KQR_WORKLOOP) {
2556 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
2557 } else {
2558 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
2559 }
2560 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
2561 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
2562 } else {
2563 if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) {
2564 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2565 }
2566 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
2567 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
2568 } else {
2569 upcall_flags |= uth->uu_workq_pri.qos_req |
2570 WQ_FLAG_THREAD_PRIO_QOS;
2571 }
2572 }
2573
2574 error = pthread_functions->workq_handle_stack_events(p, th,
2575 get_task_map(p->task), uth->uu_workq_stackaddr,
2576 uth->uu_workq_thport, eventlist, nevents, upcall_flags);
2577 if (error) {
2578 return error;
2579 }
2580
2581 // pthread is supposed to pass KEVENT_FLAG_PARKING here
2582 // which should cause the above call to either:
2583 // - not return
2584 // - return an error
2585 // - return 0 and have unbound properly
2586 assert(uth->uu_kqr_bound == NULL);
2587 }
2588
2589 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0);
2590
2591 thread_sched_call(th, NULL);
2592 thread_will_park_or_terminate(th);
2593 #if CONFIG_WORKLOOP_DEBUG
2594 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
2595 #endif
2596
2597 workq_lock_spin(wq);
2598 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2599 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
2600 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
2601 __builtin_unreachable();
2602 }
2603
2604 /**
2605 * Multiplexed call to interact with the workqueue mechanism
2606 */
2607 int
2608 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
2609 {
2610 int options = uap->options;
2611 int arg2 = uap->affinity;
2612 int arg3 = uap->prio;
2613 struct workqueue *wq = proc_get_wqptr(p);
2614 int error = 0;
2615
2616 if ((p->p_lflag & P_LREGISTER) == 0) {
2617 return EINVAL;
2618 }
2619
2620 switch (options) {
2621 case WQOPS_QUEUE_NEWSPISUPP: {
2622 /*
2623 * arg2 = offset of serialno into dispatch queue
2624 * arg3 = kevent support
2625 */
2626 int offset = arg2;
2627 if (arg3 & 0x01) {
2628 // If we get here, then userspace has indicated support for kevent delivery.
2629 }
2630
2631 p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
2632 break;
2633 }
2634 case WQOPS_QUEUE_REQTHREADS: {
2635 /*
2636 * arg2 = number of threads to start
2637 * arg3 = priority
2638 */
2639 error = workq_reqthreads(p, arg2, arg3);
2640 break;
2641 }
2642 case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
2643 /*
2644 * arg2 = priority for the manager thread
2645 *
2646 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
2647 * the low bits of the value contains a scheduling priority
2648 * instead of a QOS value
2649 */
2650 pthread_priority_t pri = arg2;
2651
2652 if (wq == NULL) {
2653 error = EINVAL;
2654 break;
2655 }
2656
2657 /*
2658 * Normalize the incoming priority so that it is ordered numerically.
2659 */
2660 if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
2661 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
2662 _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
2663 } else {
2664 thread_qos_t qos = _pthread_priority_thread_qos(pri);
2665 int relpri = _pthread_priority_relpri(pri);
2666 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
2667 qos == THREAD_QOS_UNSPECIFIED) {
2668 error = EINVAL;
2669 break;
2670 }
2671 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
2672 }
2673
2674 /*
2675 * If userspace passes a scheduling priority, that wins over any QoS.
2676 * Userspace should take care not to lower the priority this way.
2677 */
2678 workq_lock_spin(wq);
2679 if (wq->wq_event_manager_priority < (uint32_t)pri) {
2680 wq->wq_event_manager_priority = (uint32_t)pri;
2681 }
2682 workq_unlock(wq);
2683 break;
2684 }
2685 case WQOPS_THREAD_KEVENT_RETURN:
2686 case WQOPS_THREAD_WORKLOOP_RETURN:
2687 case WQOPS_THREAD_RETURN: {
2688 error = workq_thread_return(p, uap, wq);
2689 break;
2690 }
2691
2692 case WQOPS_SHOULD_NARROW: {
2693 /*
2694 * arg2 = priority to test
2695 * arg3 = unused
2696 */
2697 thread_t th = current_thread();
2698 struct uthread *uth = get_bsdthread_info(th);
2699 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
2700 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
2701 error = EINVAL;
2702 break;
2703 }
2704
2705 thread_qos_t qos = _pthread_priority_thread_qos(arg2);
2706 if (qos == THREAD_QOS_UNSPECIFIED) {
2707 error = EINVAL;
2708 break;
2709 }
2710 workq_lock_spin(wq);
2711 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
2712 workq_unlock(wq);
2713
2714 *retval = should_narrow;
2715 break;
2716 }
2717 default:
2718 error = EINVAL;
2719 break;
2720 }
2721
2722 return error;
2723 }
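/*
 * A hedged sketch of the WQOPS_QUEUE_REQTHREADS path as seen from userspace.
 * It assumes libpthread's raw stub __workq_kernreturn(options, item,
 * affinity, prio); the stub name and the exact pthread_priority_t encoding
 * passed in `prio` are assumptions here. The argument mapping mirrors
 * workq_kernreturn() above: `affinity` carries the thread count and `prio`
 * carries the priority.
 */
#if 0	/* Illustrative sketch only; not part of the kernel build. */
extern int __workq_kernreturn(int options, void *item, int affinity, int prio);

static int
sketch_request_threads(int nthreads, int pthread_priority)
{
	return __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, nthreads,
	    pthread_priority);
}
#endif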
2724
2725 /*
2726 * We have no work to do, park ourselves on the idle list.
2727 *
2728 * Consumes the workqueue lock and does not return.
2729 */
2730 __attribute__((noreturn, noinline))
2731 static void
2732 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth)
2733 {
2734 assert(uth == current_uthread());
2735 assert(uth->uu_kqr_bound == NULL);
2736 workq_push_idle_thread(p, wq, uth); // may not return
2737
2738 workq_thread_reset_cpupercent(NULL, uth);
2739
2740 if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
2741 workq_unlock(wq);
2742
2743 /*
2744 * workq_push_idle_thread() will unset `has_stack`
2745 * if it wants us to free the stack before parking.
2746 */
2747 if (!uth->uu_save.uus_workq_park_data.has_stack) {
2748 pthread_functions->workq_markfree_threadstack(p, uth->uu_thread,
2749 get_task_map(p->task), uth->uu_workq_stackaddr);
2750 }
2751
2752 /*
2753 * When we remove the voucher from the thread, we may lose our importance,
2754 * causing us to get preempted, so we do this after putting the thread on
2755 * the idle list. Then, when we get our importance back, we'll be able to
2756 * use this thread from e.g. the kevent callout to deliver a boosting
2757 * message.
2758 */
2759 __assert_only kern_return_t kr;
2760 kr = thread_set_voucher_name(MACH_PORT_NULL);
2761 assert(kr == KERN_SUCCESS);
2762
2763 workq_lock_spin(wq);
2764 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
2765 }
2766
2767 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
2768 /*
2769 * While we'd dropped the lock to unset our voucher, someone came
2770 * around and made us runnable. But because we weren't waiting on the
2771 * event, their thread_wakeup() was ineffectual. To correct for that,
2772 * we just run the continuation ourselves.
2773 */
2774 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2775 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
2776 __builtin_unreachable();
2777 }
2778
2779 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
2780 workq_unpark_for_death_and_unlock(p, wq, uth,
2781 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
2782 __builtin_unreachable();
2783 }
2784
2785 thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue);
2786 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
2787 workq_unlock(wq);
2788 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0);
2789 thread_block(workq_unpark_continue);
2790 __builtin_unreachable();
2791 }
2792
2793 static inline bool
2794 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
2795 {
2796 /*
2797 * There's an event manager request and either:
2798 * - no event manager currently running
2799 * - we are re-using the event manager
2800 */
2801 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
2802 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
2803 }
2804
2805 static uint32_t
2806 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
2807 struct uthread *uth, bool may_start_timer)
2808 {
2809 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
2810 uint32_t count = 0;
2811
2812 uint32_t max_count = wq->wq_constrained_threads_scheduled;
2813 if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) {
2814 /*
2815 * don't count the current thread as scheduled
2816 */
2817 assert(max_count > 0);
2818 max_count--;
2819 }
2820 if (max_count >= wq_max_constrained_threads) {
2821 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
2822 wq->wq_constrained_threads_scheduled,
2823 wq_max_constrained_threads, 0);
2824 /*
2825 * we need 1 or more constrained threads to return to the kernel before
2826 * we can dispatch additional work
2827 */
2828 return 0;
2829 }
2830 max_count -= wq_max_constrained_threads;
2831
2832 /*
2833 * Compute a metric for how many threads are active. We find the
2834 * highest priority request outstanding and then add up the number of
2835 * active threads in that and all higher-priority buckets. We'll also add
2836 * any "busy" threads which are not active but blocked recently enough that
2837 * we can't be sure they've gone idle yet. We'll then compare this metric
2838 * to our max concurrency to decide whether to add a new thread.
2839 */
2840
2841 uint32_t busycount, thactive_count;
2842
2843 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
2844 at_qos, &busycount, NULL);
2845
2846 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
2847 at_qos <= uth->uu_workq_pri.qos_bucket) {
2848 /*
2849 * Don't count this thread as currently active, but only if it's not
2850 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
2851 * managers.
2852 */
2853 assert(thactive_count > 0);
2854 thactive_count--;
2855 }
2856
2857 count = wq_max_parallelism[_wq_bucket(at_qos)];
2858 if (count > thactive_count + busycount) {
2859 count -= thactive_count + busycount;
2860 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
2861 thactive_count, busycount, 0);
2862 return MIN(count, max_count);
2863 } else {
2864 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
2865 thactive_count, busycount, 0);
2866 }
2867
2868 if (busycount && may_start_timer) {
2869 /*
2870 * If this is called from the add timer, we won't have another timer
2871 * fire when the thread exits the "busy" state, so rearm the timer.
2872 */
2873 workq_schedule_delayed_thread_creation(wq, 0);
2874 }
2875
2876 return 0;
2877 }
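/*
 * Worked example with hypothetical numbers: say the bucket for at_qos allows
 * wq_max_parallelism[_wq_bucket(at_qos)] == 8 threads, five threads are
 * active at or above that QoS, and one recently-blocked thread still counts
 * as busy. The metric is 5 + 1 = 6, so up to 8 - 6 = 2 more constrained
 * threads may be admitted (subject to the MIN(count, max_count) clamp
 * above). If active + busy already reaches 8, the function returns 0 and,
 * when may_start_timer is set, re-arms delayed thread creation so the
 * decision is revisited once a busy thread goes idle.
 */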
2878
2879 static bool
2880 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
2881 workq_threadreq_t req)
2882 {
2883 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
2884 return workq_may_start_event_mgr_thread(wq, uth);
2885 }
2886 if ((req->tr_flags & TR_FLAG_OVERCOMMIT) == 0) {
2887 return workq_constrained_allowance(wq, req->tr_qos, uth, true);
2888 }
2889 return true;
2890 }
2891
2892 static workq_threadreq_t
2893 workq_threadreq_select_for_creator(struct workqueue *wq)
2894 {
2895 workq_threadreq_t req_qos, req_pri, req_tmp;
2896 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
2897 uint8_t pri = 0;
2898
2899 req_tmp = wq->wq_event_manager_threadreq;
2900 if (req_tmp && workq_may_start_event_mgr_thread(wq, NULL)) {
2901 return req_tmp;
2902 }
2903
2904 /*
2905 * Compute the best priority request, and ignore the turnstile for now
2906 */
2907
2908 req_pri = priority_queue_max(&wq->wq_special_queue,
2909 struct workq_threadreq_s, tr_entry);
2910 if (req_pri) {
2911 pri = priority_queue_entry_key(&wq->wq_special_queue, &req_pri->tr_entry);
2912 }
2913
2914 /*
2915 * Compute the best QoS Request, and check whether it beats the "pri" one
2916 */
2917
2918 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
2919 struct workq_threadreq_s, tr_entry);
2920 if (req_qos) {
2921 qos = req_qos->tr_qos;
2922 }
2923
2924 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
2925 struct workq_threadreq_s, tr_entry);
2926
2927 if (req_tmp && qos < req_tmp->tr_qos) {
2928 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
2929 return req_pri;
2930 }
2931
2932 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
2933 /*
2934 * If the constrained thread request is the best one and passes
2935 * the admission check, pick it.
2936 */
2937 return req_tmp;
2938 }
2939 }
2940
2941 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
2942 return req_pri;
2943 }
2944
2945 if (req_qos) {
2946 return req_qos;
2947 }
2948
2949 /*
2950 * If we had no eligible request but we have a turnstile push,
2951 * it must be a non-overcommit thread request that failed
2952 * the admission check.
2953 *
2954 * Just fake a BG thread request so that if the push stops, the creator
2955 * priority simply drops to 4.
2956 */
2957 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
2958 static struct workq_threadreq_s workq_sync_push_fake_req = {
2959 .tr_qos = THREAD_QOS_BACKGROUND,
2960 };
2961
2962 return &workq_sync_push_fake_req;
2963 }
2964
2965 return NULL;
2966 }
2967
2968 static workq_threadreq_t
2969 workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
2970 {
2971 workq_threadreq_t req_qos, req_pri, req_tmp;
2972 uintptr_t proprietor;
2973 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
2974 uint8_t pri = 0;
2975
2976 if (uth == wq->wq_creator) {
2977 uth = NULL;
2978 }
2979
2980 req_tmp = wq->wq_event_manager_threadreq;
2981 if (req_tmp && workq_may_start_event_mgr_thread(wq, uth)) {
2982 return req_tmp;
2983 }
2984
2985 /*
2986 * Compute the best priority request (special or turnstile)
2987 */
2988
2989 pri = turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
2990 &proprietor);
2991 if (pri) {
2992 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
2993 req_pri = &kqwl->kqwl_request.kqr_req;
2994 if (req_pri->tr_state != TR_STATE_QUEUED) {
2995 panic("Invalid thread request (%p) state %d",
2996 req_pri, req_pri->tr_state);
2997 }
2998 } else {
2999 req_pri = NULL;
3000 }
3001
3002 req_tmp = priority_queue_max(&wq->wq_special_queue,
3003 struct workq_threadreq_s, tr_entry);
3004 if (req_tmp && pri < priority_queue_entry_key(&wq->wq_special_queue,
3005 &req_tmp->tr_entry)) {
3006 req_pri = req_tmp;
3007 pri = priority_queue_entry_key(&wq->wq_special_queue, &req_tmp->tr_entry);
3008 }
3009
3010 /*
3011 * Compute the best QoS Request, and check whether it beats the "pri" one
3012 */
3013
3014 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
3015 struct workq_threadreq_s, tr_entry);
3016 if (req_qos) {
3017 qos = req_qos->tr_qos;
3018 }
3019
3020 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
3021 struct workq_threadreq_s, tr_entry);
3022
3023 if (req_tmp && qos < req_tmp->tr_qos) {
3024 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
3025 return req_pri;
3026 }
3027
3028 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
3029 /*
3030 * If the constrained thread request is the best one and passes
3031 * the admission check, pick it.
3032 */
3033 return req_tmp;
3034 }
3035 }
3036
3037 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3038 return req_pri;
3039 }
3040
3041 return req_qos;
3042 }
3043
3044 /*
3045 * The creator is an anonymous thread that is counted as scheduled,
3046 * but otherwise runs without its scheduler callback set and without being
3047 * tracked as active; it is used to make other threads.
3048 *
3049 * When more requests are added or an existing one is hurried along,
3050 * a creator is elected and setup, or the existing one overridden accordingly.
3051 *
3052 * While this creator is in flight, no request has been dequeued, so
3053 * already-running threads have a chance at stealing thread requests, avoiding
3054 * useless context switches; the creator, once scheduled, may then not find any
3055 * work to do and will simply park again.
3056 *
3057 * The creator serves a dual purpose: it informs the scheduler of work that
3058 * hasn't been materialized as threads yet, and it acts as a natural pacing
3059 * mechanism for thread creation.
3060 *
3061 * By being anonymous (and not bound to anything) it means that thread requests
3062 * can be stolen from this creator by threads already on core yielding more
3063 * efficient scheduling and reduced context switches.
3064 */
3065 static void
3066 workq_schedule_creator(proc_t p, struct workqueue *wq, int flags)
3067 {
3068 workq_threadreq_t req;
3069 struct uthread *uth;
3070
3071 workq_lock_held(wq);
3072 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);
3073
3074 again:
3075 uth = wq->wq_creator;
3076
3077 if (!wq->wq_reqcount) {
3078 if (uth == NULL) {
3079 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
3080 }
3081 return;
3082 }
3083
3084 req = workq_threadreq_select_for_creator(wq);
3085 if (req == NULL) {
3086 if (flags & WORKQ_THREADREQ_CREATOR_SYNC_UPDATE) {
3087 assert((flags & WORKQ_THREADREQ_CREATOR_TRANSFER) == 0);
3088 /*
3089 * turnstile propagation code is reaching out to us,
3090 * and we still don't want to do anything, do not recurse.
3091 */
3092 } else {
3093 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
3094 }
3095 return;
3096 }
3097
3098 if (uth) {
3099 /*
3100 * We need to maybe override the creator we already have
3101 */
3102 if (workq_thread_needs_priority_change(req, uth)) {
3103 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
3104 wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0);
3105 workq_thread_reset_pri(wq, uth, req);
3106 }
3107 } else if (wq->wq_thidlecount) {
3108 /*
3109 * We need to unpark a creator thread
3110 */
3111 wq->wq_creator = uth = workq_pop_idle_thread(wq);
3112 if (workq_thread_needs_priority_change(req, uth)) {
3113 workq_thread_reset_pri(wq, uth, req);
3114 }
3115 workq_turnstile_update_inheritor(wq, uth->uu_thread,
3116 TURNSTILE_INHERITOR_THREAD);
3117 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
3118 wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0);
3119 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
3120 uth->uu_save.uus_workq_park_data.yields = 0;
3121 workq_thread_wakeup(uth);
3122 } else {
3123 /*
3124 * We need to allocate a thread...
3125 */
3126 if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
3127 /* out of threads, just go away */
3128 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
3129 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
3130 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
3131 /* This can drop the workqueue lock, and take it again */
3132 workq_schedule_immediate_thread_creation(wq);
3133 } else if (workq_add_new_idle_thread(p, wq)) {
3134 goto again;
3135 } else {
3136 workq_schedule_delayed_thread_creation(wq, 0);
3137 }
3138
3139 if (flags & WORKQ_THREADREQ_CREATOR_TRANSFER) {
3140 /*
3141 * workq_schedule_creator() failed at creating a thread,
3142 * and the responsibility of redriving is now with a thread-call.
3143 *
3144 * We still need to tell the turnstile the previous creator is gone.
3145 */
3146 workq_turnstile_update_inheritor(wq, NULL, 0);
3147 }
3148 }
3149 }
3150
3151 /**
3152 * Runs a thread request on a thread
3153 *
3154 * - if thread is THREAD_NULL, will find a thread and run the request there.
3155 * Otherwise, the thread must be the current thread.
3156 *
3157 * - if req is NULL, will find the highest priority request and run that. If
3158 * it is not NULL, it must be a threadreq object in state NEW. If it can not
3159 * be run immediately, it will be enqueued and moved to state QUEUED.
3160 *
3161 * Either way, the thread request object serviced will be moved to state
3162 * BINDING and attached to the uthread.
3163 *
3164 * Should be called with the workqueue lock held. Will drop it.
3165 */
3166 __attribute__((noreturn, noinline))
3167 static void
3168 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
3169 struct uthread *uth)
3170 {
3171 uint32_t setup_flags = 0;
3172 workq_threadreq_t req;
3173
3174 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
3175 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
3176 setup_flags |= WQ_SETUP_FIRST_USE;
3177 }
3178 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
3179 /*
3180 * This pointer is possibly freed and only used for tracing purposes.
3181 */
3182 req = uth->uu_save.uus_workq_park_data.thread_request;
3183 workq_unlock(wq);
3184 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3185 VM_KERNEL_ADDRHIDE(req), 0, 0, 0);
3186 goto run;
3187 } else if (_wq_exiting(wq)) {
3188 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
3189 } else if (wq->wq_reqcount == 0) {
3190 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0);
3191 } else if ((req = workq_threadreq_select(wq, uth)) == NULL) {
3192 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0);
3193 } else {
3194 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3195 workq_trace_req_id(req), 0, 0, 0);
3196 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
3197 uth->uu_workq_flags ^= UT_WORKQ_NEW;
3198 setup_flags |= WQ_SETUP_FIRST_USE;
3199 }
3200 workq_thread_reset_cpupercent(req, uth);
3201 workq_threadreq_bind_and_unlock(p, wq, req, uth);
3202 run:
3203 workq_setup_and_run(p, uth, setup_flags);
3204 __builtin_unreachable();
3205 }
3206
3207 workq_park_and_unlock(p, wq, uth);
3208 __builtin_unreachable();
3209 }
3210
3211 static bool
3212 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
3213 {
3214 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
3215
3216 if (qos >= THREAD_QOS_USER_INTERACTIVE) {
3217 return false;
3218 }
3219
3220 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
3221 if (wq->wq_fulfilled == snapshot) {
3222 return false;
3223 }
3224
3225 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
3226 if (wq->wq_fulfilled - snapshot > conc) {
3227 /* we fulfilled more than NCPU requests since being dispatched */
3228 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
3229 wq->wq_fulfilled, snapshot, 0);
3230 return true;
3231 }
3232
3233 for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
3234 cnt += wq->wq_thscheduled_count[i];
3235 }
3236 if (conc <= cnt) {
3237 /* We fulfilled requests and have more than NCPU scheduled threads */
3238 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
3239 wq->wq_fulfilled, snapshot, 0);
3240 return true;
3241 }
3242
3243 return false;
3244 }
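/*
 * Worked example with hypothetical numbers: for a creator whose (overridden)
 * QoS bucket allows 8 parallel threads, the creator yields either when
 * wq_fulfilled has advanced by more than 8 since the snapshot taken at
 * dispatch, or when at least one request was fulfilled and the scheduled
 * thread counts summed over the buckets scanned above already reach 8. At
 * THREAD_QOS_USER_INTERACTIVE or above it never yields on this path.
 */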
3245
3246 /**
3247 * parked thread wakes up
3248 */
3249 __attribute__((noreturn, noinline))
3250 static void
3251 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
3252 {
3253 struct uthread *uth = current_uthread();
3254 proc_t p = current_proc();
3255 struct workqueue *wq = proc_get_wqptr_fast(p);
3256
3257 workq_lock_spin(wq);
3258
3259 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
3260 /*
3261 * If the threads we already have out are able to keep up with the
3262 * demand, then we should avoid sending this creator thread to
3263 * userspace.
3264 */
3265 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
3266 uth->uu_save.uus_workq_park_data.yields++;
3267 workq_unlock(wq);
3268 thread_yield_with_continuation(workq_unpark_continue, NULL);
3269 __builtin_unreachable();
3270 }
3271
3272 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
3273 workq_select_threadreq_or_park_and_unlock(p, wq, uth);
3274 __builtin_unreachable();
3275 }
3276
3277 if (__probable(wr == THREAD_AWAKENED)) {
3278 /*
3279 * We were set running, but for the purposes of dying.
3280 */
3281 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
3282 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
3283 } else {
3284 /*
3285 * workaround for <rdar://problem/38647347>,
3286 * in case we do hit userspace, make sure calling
3287 * workq_thread_terminate() does the right thing here,
3288 * and that, if we never call it, workq_exit() will too because it sees
3289 * this thread on the runlist.
3290 */
3291 assert(wr == THREAD_INTERRUPTED);
3292 wq->wq_thdying_count++;
3293 uth->uu_workq_flags |= UT_WORKQ_DYING;
3294 }
3295
3296 workq_unpark_for_death_and_unlock(p, wq, uth,
3297 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE);
3298 __builtin_unreachable();
3299 }
3300
3301 __attribute__((noreturn, noinline))
3302 static void
3303 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
3304 {
3305 thread_t th = uth->uu_thread;
3306 vm_map_t vmap = get_task_map(p->task);
3307
3308 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
3309 /*
3310 * For preemption reasons, we want to reset the voucher as late as
3311 * possible, so we do it in two places:
3312 * - Just before parking (i.e. in workq_park_and_unlock())
3313 * - Prior to doing the setup for the next workitem (i.e. here)
3314 *
3315 * Those two places are sufficient to ensure we always reset it before
3316 * it goes back out to user space, but be careful to not break that
3317 * guarantee.
3318 */
3319 __assert_only kern_return_t kr;
3320 kr = thread_set_voucher_name(MACH_PORT_NULL);
3321 assert(kr == KERN_SUCCESS);
3322 }
3323
3324 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
3325 if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
3326 upcall_flags |= WQ_FLAG_THREAD_REUSE;
3327 }
3328
3329 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
3330 /*
3331 * For threads that have an outside-of-QoS thread priority, indicate
3332 * to userspace that setting QoS should only affect the TSD and not
3333 * change QOS in the kernel.
3334 */
3335 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
3336 } else {
3337 /*
3338 * Put the QoS class value into the lower bits of the reuse_thread
3339 * register; this is where the thread priority used to be stored
3340 * anyway.
3341 */
3342 upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
3343 WQ_FLAG_THREAD_PRIO_QOS;
3344 }
3345
3346 if (uth->uu_workq_thport == MACH_PORT_NULL) {
3347 /* convert_thread_to_port() consumes a reference */
3348 thread_reference(th);
3349 ipc_port_t port = convert_thread_to_port(th);
3350 uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task));
3351 }
3352
3353 /*
3354 * Call out to pthread; this sets up the thread, pulls in kevent structs
3355 * onto the stack, sets up the thread state and then returns to userspace.
3356 */
3357 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
3358 proc_get_wqptr_fast(p), 0, 0, 0, 0);
3359 thread_sched_call(th, workq_sched_callback);
3360 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
3361 uth->uu_workq_thport, 0, setup_flags, upcall_flags);
3362
3363 __builtin_unreachable();
3364 }
3365
3366 #pragma mark misc
3367
3368 int
3369 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
3370 {
3371 struct workqueue *wq = proc_get_wqptr(p);
3372 int error = 0;
3373 int activecount;
3374
3375 if (wq == NULL) {
3376 return EINVAL;
3377 }
3378
3379 /*
3380 * This is sometimes called from interrupt context by the kperf sampler.
3381 * In that case, it's not safe to spin trying to take the lock since we
3382 * might already hold it. So, we just try-lock it and error out if it's
3383 * already held. Since this is just a debugging aid, and all our callers
3384 * are able to handle an error, that's fine.
3385 */
3386 bool locked = workq_lock_try(wq);
3387 if (!locked) {
3388 return EBUSY;
3389 }
3390
3391 wq_thactive_t act = _wq_thactive(wq);
3392 activecount = _wq_thactive_aggregate_downto_qos(wq, act,
3393 WORKQ_THREAD_QOS_MIN, NULL, NULL);
3394 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
3395 activecount++;
3396 }
3397 pwqinfo->pwq_nthreads = wq->wq_nthreads;
3398 pwqinfo->pwq_runthreads = activecount;
3399 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
3400 pwqinfo->pwq_state = 0;
3401
3402 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
3403 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
3404 }
3405
3406 if (wq->wq_nthreads >= wq_max_threads) {
3407 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
3408 }
3409
3410 workq_unlock(wq);
3411 return error;
3412 }
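/*
 * A hedged userspace sketch of consuming the data filled in above. It
 * assumes the PROC_PIDWORKQUEUEINFO flavor of proc_pidinfo() from libproc
 * surfaces struct proc_workqueueinfo; treat the flavor, headers, and call
 * shape as assumptions of the sketch rather than guarantees of this file.
 */
#if 0	/* Illustrative sketch only; not part of the kernel build. */
#include <libproc.h>
#include <stdio.h>
#include <sys/proc_info.h>
#include <unistd.h>

int
main(void)
{
	struct proc_workqueueinfo pwqinfo;
	int ret = proc_pidinfo(getpid(), PROC_PIDWORKQUEUEINFO, 0,
	    &pwqinfo, sizeof(pwqinfo));

	if (ret == (int)sizeof(pwqinfo)) {
		/* pwq_runthreads/pwq_blockedthreads derive from activecount above */
		printf("threads: %u total, %u running, %u blocked, state 0x%x\n",
		    pwqinfo.pwq_nthreads, pwqinfo.pwq_runthreads,
		    pwqinfo.pwq_blockedthreads, pwqinfo.pwq_state);
	}
	return 0;
}
#endif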
3413
3414 boolean_t
3415 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
3416 boolean_t *exceeded_constrained)
3417 {
3418 proc_t p = v;
3419 struct proc_workqueueinfo pwqinfo;
3420 int err;
3421
3422 assert(p != NULL);
3423 assert(exceeded_total != NULL);
3424 assert(exceeded_constrained != NULL);
3425
3426 err = fill_procworkqueue(p, &pwqinfo);
3427 if (err) {
3428 return FALSE;
3429 }
3430 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
3431 return FALSE;
3432 }
3433
3434 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
3435 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
3436
3437 return TRUE;
3438 }
3439
3440 uint32_t
3441 workqueue_get_pwq_state_kdp(void * v)
3442 {
3443 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
3444 kTaskWqExceededConstrainedThreadLimit);
3445 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
3446 kTaskWqExceededTotalThreadLimit);
3447 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
3448 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
3449 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
3450
3451 if (v == NULL) {
3452 return 0;
3453 }
3454
3455 proc_t p = v;
3456 struct workqueue *wq = proc_get_wqptr(p);
3457
3458 if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) {
3459 return 0;
3460 }
3461
3462 uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
3463
3464 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
3465 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
3466 }
3467
3468 if (wq->wq_nthreads >= wq_max_threads) {
3469 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
3470 }
3471
3472 return pwq_state;
3473 }
3474
3475 void
3476 workq_init(void)
3477 {
3478 workq_lck_grp_attr = lck_grp_attr_alloc_init();
3479 workq_lck_attr = lck_attr_alloc_init();
3480 workq_lck_grp = lck_grp_alloc_init("workq", workq_lck_grp_attr);
3481
3482 workq_zone_workqueue = zinit(sizeof(struct workqueue),
3483 1024 * sizeof(struct workqueue), 8192, "workq.wq");
3484 workq_zone_threadreq = zinit(sizeof(struct workq_threadreq_s),
3485 1024 * sizeof(struct workq_threadreq_s), 8192, "workq.threadreq");
3486
3487 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
3488 NSEC_PER_USEC, &wq_stalled_window.abstime);
3489 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
3490 NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
3491 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
3492 NSEC_PER_USEC, &wq_max_timer_interval.abstime);
3493 }