1/*
2 * Copyright (c) 2000-2012 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28/* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */
29/*
30 * pthread_synch.c
31 */
32
33#pragma mark - Front Matter
34
35#define _PTHREAD_CONDATTR_T
36#define _PTHREAD_COND_T
37#define _PTHREAD_MUTEXATTR_T
38#define _PTHREAD_MUTEX_T
39#define _PTHREAD_RWLOCKATTR_T
40#define _PTHREAD_RWLOCK_T
41
42#undef pthread_mutexattr_t
43#undef pthread_mutex_t
44#undef pthread_condattr_t
45#undef pthread_cond_t
46#undef pthread_rwlockattr_t
47#undef pthread_rwlock_t
48
49#include <sys/param.h>
50#include <sys/queue.h>
51#include <sys/resourcevar.h>
52//#include <sys/proc_internal.h>
53#include <sys/kauth.h>
54#include <sys/systm.h>
55#include <sys/timeb.h>
56#include <sys/times.h>
57#include <sys/acct.h>
58#include <sys/kernel.h>
59#include <sys/wait.h>
60#include <sys/signalvar.h>
61#include <sys/sysctl.h>
62#include <sys/syslog.h>
63#include <sys/stat.h>
64#include <sys/lock.h>
65#include <sys/kdebug.h>
66//#include <sys/sysproto.h>
67#include <sys/vm.h>
68#include <sys/user.h> /* for coredump */
69#include <sys/proc_info.h> /* for fill_procworkqueue */
70
71#include <mach/mach_port.h>
72#include <mach/mach_types.h>
73#include <mach/semaphore.h>
74#include <mach/sync_policy.h>
75#include <mach/task.h>
76#include <mach/vm_prot.h>
77#include <kern/kern_types.h>
78#include <kern/task.h>
79#include <kern/clock.h>
80#include <mach/kern_return.h>
81#include <kern/thread.h>
82#include <kern/sched_prim.h>
83#include <kern/kalloc.h>
84#include <kern/sched_prim.h> /* for thread_exception_return */
85#include <kern/processor.h>
86#include <kern/assert.h>
87#include <mach/mach_vm.h>
88#include <mach/mach_param.h>
89#include <mach/thread_status.h>
90#include <mach/thread_policy.h>
91#include <mach/message.h>
92#include <mach/port.h>
93//#include <vm/vm_protos.h>
94#include <vm/vm_fault.h>
95#include <vm/vm_map.h>
96#include <mach/thread_act.h> /* for thread_resume */
97#include <machine/machine_routines.h>
98#include <mach/shared_region.h>
99
100#include <libkern/OSAtomic.h>
101
102#include <sys/pthread_shims.h>
103#include "kern_internal.h"
104
105#if DEBUG
106#define kevent_qos_internal kevent_qos_internal_stub
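/*
 * Descriptive note: this debug-only stand-in for kevent_qos_internal() cycles
 * through a few (data_available, retval) combinations so the workqueue paths
 * that consume kevent results can be exercised without the real implementation.
 */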
107static int kevent_qos_internal_stub(__unused struct proc *p, __unused int fd,
108 __unused user_addr_t changelist, __unused int nchanges,
109 __unused user_addr_t eventlist, __unused int nevents,
110 __unused user_addr_t data_out, user_size_t *data_available,
111 __unused unsigned int flags, int32_t *retval){
112 if (data_available){
113 static int i = 0;
114 switch (i++ % 4) {
115 case 0:
116 case 2:
117 *data_available = *data_available / 2;
118 *retval = 4;
119 break;
120 case 1:
121 *data_available = 0;
122 *retval = 4;
123 break;
124 case 3:
125 *retval = 0;
126 break;
127 }
128 } else {
129 *retval = 0;
130 }
131 return 0;
132}
133#endif /* DEBUG */
134
135uint32_t pthread_debug_tracing = 1;
136
SYSCTL_INT(_kern, OID_AUTO, pthread_debug_tracing, CTLFLAG_RW | CTLFLAG_LOCKED,
	   &pthread_debug_tracing, 0, "");
139
// XXX: Dirty import for sys/signalvar.h that's wrapped in BSD_KERNEL_PRIVATE
141#define sigcantmask (sigmask(SIGKILL) | sigmask(SIGSTOP))
142
143lck_grp_attr_t *pthread_lck_grp_attr;
144lck_grp_t *pthread_lck_grp;
145lck_attr_t *pthread_lck_attr;
146
147extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
148extern void workqueue_thread_yielded(void);
149
150enum run_nextreq_mode {RUN_NEXTREQ_DEFAULT, RUN_NEXTREQ_OVERCOMMIT, RUN_NEXTREQ_DEFERRED_OVERCOMMIT, RUN_NEXTREQ_UNCONSTRAINED, RUN_NEXTREQ_EVENT_MANAGER};
151static boolean_t workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t th, enum run_nextreq_mode mode, pthread_priority_t oc_prio);
152
153static boolean_t workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority);
154
155static void wq_runreq(proc_t p, pthread_priority_t priority, thread_t th, struct threadlist *tl,
156 int reuse_thread, int wake_thread, int return_directly);
157
158static int _setup_wqthread(proc_t p, thread_t th, pthread_priority_t priority, int reuse_thread, struct threadlist *tl);
159
160static void wq_unpark_continue(void);
161static void wq_unsuspend_continue(void);
162
163static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit);
164static void workqueue_removethread(struct threadlist *tl, int fromexit);
165static void workqueue_lock_spin(proc_t);
166static void workqueue_unlock(proc_t);
167
168static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer);
169
170static mach_vm_offset_t stackaddr_hint(proc_t p);
171
172int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
173int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
174
175#define WQ_MAXPRI_MIN 0 /* low prio queue num */
176#define WQ_MAXPRI_MAX 2 /* max prio queuenum */
177#define WQ_PRI_NUM 3 /* number of prio work queues */
178
179#define C_32_STK_ALIGN 16
180#define C_64_STK_ALIGN 16
181#define C_64_REDZONE_LEN 128
182
183#define PTHREAD_T_OFFSET 0
184
/*
 * Flags field passed to bsdthread_create and back in pthread_start
 *
 *  31 <---------------------------------> 0
 *  _________________________________________
 *  | flags(8) | policy(8) | importance(16) |
 *  -----------------------------------------
 */
192
193#define PTHREAD_START_CUSTOM 0x01000000
194#define PTHREAD_START_SETSCHED 0x02000000
195#define PTHREAD_START_DETACHED 0x04000000
196#define PTHREAD_START_QOSCLASS 0x08000000
197#define PTHREAD_START_QOSCLASS_MASK 0xffffff
198#define PTHREAD_START_POLICY_BITSHIFT 16
199#define PTHREAD_START_POLICY_MASK 0xff
200#define PTHREAD_START_IMPORTANCE_MASK 0xffff
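/*
 * Illustrative example only (not part of the original headers): on the
 * PTHREAD_START_SETSCHED path, userspace would pack the word roughly as
 *
 *     flags = PTHREAD_START_DETACHED | PTHREAD_START_SETSCHED |
 *             ((SCHED_RR & PTHREAD_START_POLICY_MASK) << PTHREAD_START_POLICY_BITSHIFT) |
 *             (importance & PTHREAD_START_IMPORTANCE_MASK);
 *
 * and _bsdthread_create() unpacks policy/importance with the same masks and shift.
 */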
201
202#define SCHED_OTHER POLICY_TIMESHARE
203#define SCHED_FIFO POLICY_FIFO
204#define SCHED_RR POLICY_RR
205
206#define BASEPRI_DEFAULT 31
207
208#pragma mark - Process/Thread Setup/Teardown syscalls
209
210static mach_vm_offset_t stackaddr_hint(proc_t p __unused){
211 mach_vm_offset_t stackaddr;
212#if defined(__i386__) || defined(__x86_64__)
213 if (proc_is64bit(p)){
214 // Above nanomalloc range (see NANOZONE_SIGNATURE)
215 stackaddr = 0x700000000000;
216 } else {
217 stackaddr = SHARED_REGION_BASE_I386 + SHARED_REGION_SIZE_I386;
218 }
219#elif defined(__arm__) || defined(__arm64__)
220 if (proc_is64bit(p)){
221 // 64 stacks below nanomalloc (see NANOZONE_SIGNATURE)
222 stackaddr = 0x170000000 - 64 * PTH_DEFAULT_STACKSIZE;
223#if defined(__arm__)
224 } else if (pthread_kern->map_is_1gb(get_task_map(pthread_kern->proc_get_task(p)))){
225 stackaddr = SHARED_REGION_BASE_ARM - 32 * PTH_DEFAULT_STACKSIZE;
226#endif
227 } else {
228 stackaddr = SHARED_REGION_BASE_ARM + SHARED_REGION_SIZE_ARM;
229 }
230#else
231#error Need to define a stack address hint for this architecture
232#endif
233 return stackaddr;
234}
235
236/**
237 * bsdthread_create system call. Used by pthread_create.
238 */
239int
240_bsdthread_create(struct proc *p, user_addr_t user_func, user_addr_t user_funcarg, user_addr_t user_stack, user_addr_t user_pthread, uint32_t flags, user_addr_t *retval)
241{
242 kern_return_t kret;
243 void * sright;
244 int error = 0;
245 int allocated = 0;
246 mach_vm_offset_t stackaddr;
247 mach_vm_size_t th_allocsize = 0;
248 mach_vm_size_t th_guardsize;
249 mach_vm_offset_t th_stack;
250 mach_vm_offset_t th_pthread;
251 mach_port_name_t th_thport;
252 thread_t th;
253 vm_map_t vmap = pthread_kern->current_map();
254 task_t ctask = current_task();
255 unsigned int policy, importance;
256
257 int isLP64 = 0;
258
259 if (pthread_kern->proc_get_register(p) == 0) {
260 return EINVAL;
261 }
262
263 PTHREAD_TRACE(TRACE_pthread_thread_create | DBG_FUNC_START, flags, 0, 0, 0, 0);
264
265 isLP64 = proc_is64bit(p);
266 th_guardsize = vm_map_page_size(vmap);
267
268 stackaddr = stackaddr_hint(p);
269 kret = pthread_kern->thread_create(ctask, &th);
270 if (kret != KERN_SUCCESS)
271 return(ENOMEM);
272 thread_reference(th);
273
274 sright = (void *)pthread_kern->convert_thread_to_port(th);
275 th_thport = pthread_kern->ipc_port_copyout_send(sright, pthread_kern->task_get_ipcspace(ctask));
276
277 if ((flags & PTHREAD_START_CUSTOM) == 0) {
278 mach_vm_size_t pthread_size =
279 vm_map_round_page_mask(pthread_kern->proc_get_pthsize(p) + PTHREAD_T_OFFSET, vm_map_page_mask(vmap));
280 th_allocsize = th_guardsize + user_stack + pthread_size;
281 user_stack += PTHREAD_T_OFFSET;
282
283 kret = mach_vm_map(vmap, &stackaddr,
284 th_allocsize,
285 page_size-1,
286 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
287 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
288 VM_INHERIT_DEFAULT);
289 if (kret != KERN_SUCCESS){
290 kret = mach_vm_allocate(vmap,
291 &stackaddr, th_allocsize,
292 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
293 }
294 if (kret != KERN_SUCCESS) {
295 error = ENOMEM;
296 goto out;
297 }
298
299 PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0);
300
301 allocated = 1;
302 /*
303 * The guard page is at the lowest address
304 * The stack base is the highest address
305 */
306 kret = mach_vm_protect(vmap, stackaddr, th_guardsize, FALSE, VM_PROT_NONE);
307
308 if (kret != KERN_SUCCESS) {
309 error = ENOMEM;
310 goto out1;
311 }
312
313 th_pthread = stackaddr + th_guardsize + user_stack;
314 th_stack = th_pthread;
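		/*
		 * Illustrative layout of the allocation, low to high addresses
		 * (sketch inferred from the size/offset math above):
		 *
		 *   stackaddr                              th_stack == th_pthread
		 *   | guard (th_guardsize) | stack (user_stack) | pthread_t / TSD |
		 *
		 * The machine stack grows down from th_stack toward the guard page.
		 */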
315
316 /*
317 * Pre-fault the first page of the new thread's stack and the page that will
318 * contain the pthread_t structure.
319 */
320 if (vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)) !=
321 vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap))){
322 vm_fault( vmap,
323 vm_map_trunc_page_mask((vm_map_offset_t)(th_stack - C_64_REDZONE_LEN), vm_map_page_mask(vmap)),
324 VM_PROT_READ | VM_PROT_WRITE,
325 FALSE,
326 THREAD_UNINT, NULL, 0);
327 }
328
329 vm_fault( vmap,
330 vm_map_trunc_page_mask((vm_map_offset_t)th_pthread, vm_map_page_mask(vmap)),
331 VM_PROT_READ | VM_PROT_WRITE,
332 FALSE,
333 THREAD_UNINT, NULL, 0);
334
335 } else {
336 th_stack = user_stack;
337 th_pthread = user_pthread;
338
339 PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_NONE, 0, 0, 0, 3, 0);
340 }
341
342#if defined(__i386__) || defined(__x86_64__)
343 /*
344 * Set up i386 registers & function call.
345 */
346 if (isLP64 == 0) {
347 x86_thread_state32_t state = {
348 .eip = (unsigned int)pthread_kern->proc_get_threadstart(p),
349 .eax = (unsigned int)th_pthread,
350 .ebx = (unsigned int)th_thport,
351 .ecx = (unsigned int)user_func,
352 .edx = (unsigned int)user_funcarg,
353 .edi = (unsigned int)user_stack,
354 .esi = (unsigned int)flags,
355 /*
356 * set stack pointer
357 */
358 .esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN))
359 };
360
361 error = pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
362 if (error != KERN_SUCCESS) {
363 error = EINVAL;
364 goto out;
365 }
366 } else {
367 x86_thread_state64_t state64 = {
368 .rip = (uint64_t)pthread_kern->proc_get_threadstart(p),
369 .rdi = (uint64_t)th_pthread,
370 .rsi = (uint64_t)(th_thport),
371 .rdx = (uint64_t)user_func,
372 .rcx = (uint64_t)user_funcarg,
373 .r8 = (uint64_t)user_stack,
374 .r9 = (uint64_t)flags,
375 /*
376 * set stack pointer aligned to 16 byte boundary
377 */
378 .rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN)
379 };
380
381 error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)&state64);
382 if (error != KERN_SUCCESS) {
383 error = EINVAL;
384 goto out;
385 }
386
387 }
388#elif defined(__arm__)
389 arm_thread_state_t state = {
390 .pc = (int)pthread_kern->proc_get_threadstart(p),
391 .r[0] = (unsigned int)th_pthread,
392 .r[1] = (unsigned int)th_thport,
393 .r[2] = (unsigned int)user_func,
394 .r[3] = (unsigned int)user_funcarg,
395 .r[4] = (unsigned int)user_stack,
396 .r[5] = (unsigned int)flags,
397
398 /* Set r7 & lr to 0 for better back tracing */
399 .r[7] = 0,
400 .lr = 0,
401
402 /*
403 * set stack pointer
404 */
405 .sp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN))
406 };
407
408 (void) pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
409
410#else
411#error bsdthread_create not defined for this architecture
412#endif
413
414 if ((flags & PTHREAD_START_SETSCHED) != 0) {
415 /* Set scheduling parameters if needed */
416 thread_extended_policy_data_t extinfo;
417 thread_precedence_policy_data_t precedinfo;
418
419 importance = (flags & PTHREAD_START_IMPORTANCE_MASK);
420 policy = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK;
421
422 if (policy == SCHED_OTHER) {
423 extinfo.timeshare = 1;
424 } else {
425 extinfo.timeshare = 0;
426 }
427
428 thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
429
430 precedinfo.importance = (importance - BASEPRI_DEFAULT);
431 thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
432 } else if ((flags & PTHREAD_START_QOSCLASS) != 0) {
433 /* Set thread QoS class if requested. */
434 pthread_priority_t priority = (pthread_priority_t)(flags & PTHREAD_START_QOSCLASS_MASK);
435
436 thread_qos_policy_data_t qos;
437 qos.qos_tier = pthread_priority_get_qos_class(priority);
438 qos.tier_importance = (qos.qos_tier == QOS_CLASS_UNSPECIFIED) ? 0 :
439 _pthread_priority_get_relpri(priority);
440
441 pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qos, THREAD_QOS_POLICY_COUNT);
442 }
443
444 kret = pthread_kern->thread_resume(th);
445 if (kret != KERN_SUCCESS) {
446 error = EINVAL;
447 goto out1;
448 }
449 thread_deallocate(th); /* drop the creator reference */
450
451 PTHREAD_TRACE(TRACE_pthread_thread_create|DBG_FUNC_END, error, th_pthread, 0, 0, 0);
452
453 // cast required as mach_vm_offset_t is always 64 bits even on 32-bit platforms
454 *retval = (user_addr_t)th_pthread;
455
456 return(0);
457
458out1:
459 if (allocated != 0) {
460 (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
461 }
462out:
463 (void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(ctask), th_thport);
464 (void)thread_terminate(th);
465 (void)thread_deallocate(th);
466 return(error);
467}
468
469/**
470 * bsdthread_terminate system call. Used by pthread_terminate
471 */
472int
473_bsdthread_terminate(__unused struct proc *p,
474 user_addr_t stackaddr,
475 size_t size,
476 uint32_t kthport,
477 uint32_t sem,
478 __unused int32_t *retval)
479{
480 mach_vm_offset_t freeaddr;
481 mach_vm_size_t freesize;
482 kern_return_t kret;
483
484 freeaddr = (mach_vm_offset_t)stackaddr;
485 freesize = size;
486
487 PTHREAD_TRACE(TRACE_pthread_thread_terminate|DBG_FUNC_START, freeaddr, freesize, kthport, 0xff, 0);
488
489 if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
490 kret = mach_vm_deallocate(pthread_kern->current_map(), freeaddr, freesize);
491 if (kret != KERN_SUCCESS) {
492 PTHREAD_TRACE(TRACE_pthread_thread_terminate|DBG_FUNC_END, kret, 0, 0, 0, 0);
493 return(EINVAL);
494 }
495 }
496
497 (void) thread_terminate(current_thread());
498 if (sem != MACH_PORT_NULL) {
499 kret = pthread_kern->semaphore_signal_internal_trap(sem);
500 if (kret != KERN_SUCCESS) {
501 PTHREAD_TRACE(TRACE_pthread_thread_terminate|DBG_FUNC_END, kret, 0, 0, 0, 0);
502 return(EINVAL);
503 }
504 }
505
506 if (kthport != MACH_PORT_NULL) {
507 pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(current_task()), kthport);
508 }
509
510 PTHREAD_TRACE(TRACE_pthread_thread_terminate|DBG_FUNC_END, 0, 0, 0, 0, 0);
511
512 pthread_kern->thread_exception_return();
513 panic("bsdthread_terminate: still running\n");
514
515 PTHREAD_TRACE(TRACE_pthread_thread_terminate|DBG_FUNC_END, 0, 0xff, 0, 0, 0);
516
517 return(0);
518}
519
520/**
521 * bsdthread_register system call. Performs per-process setup. Responsible for
 * returning capability bits to userspace and receiving userspace function addresses.
523 */
524int
525_bsdthread_register(struct proc *p,
526 user_addr_t threadstart,
527 user_addr_t wqthread,
528 int pthsize,
529 user_addr_t pthread_init_data,
530 user_addr_t targetconc_ptr,
531 uint64_t dispatchqueue_offset,
532 int32_t *retval)
533{
534 /* prevent multiple registrations */
535 if (pthread_kern->proc_get_register(p) != 0) {
536 return(EINVAL);
537 }
538 /* syscall randomizer test can pass bogus values */
539 if (pthsize < 0 || pthsize > MAX_PTHREAD_SIZE) {
540 return(EINVAL);
541 }
542 pthread_kern->proc_set_threadstart(p, threadstart);
543 pthread_kern->proc_set_wqthread(p, wqthread);
544 pthread_kern->proc_set_pthsize(p, pthsize);
545 pthread_kern->proc_set_register(p);
546
	/* If we have pthread_init_data, then we use it; targetconc_ptr is re-used here as the size of that data. */
548 if (pthread_init_data != 0) {
549 thread_qos_policy_data_t qos;
550
551 struct _pthread_registration_data data;
552 size_t pthread_init_sz = MIN(sizeof(struct _pthread_registration_data), (size_t)targetconc_ptr);
553
554 kern_return_t kr = copyin(pthread_init_data, &data, pthread_init_sz);
555 if (kr != KERN_SUCCESS) {
556 return EINVAL;
557 }
558
559 /* Incoming data from the data structure */
560 pthread_kern->proc_set_dispatchqueue_offset(p, data.dispatch_queue_offset);
561
562 /* Outgoing data that userspace expects as a reply */
563 if (pthread_kern->qos_main_thread_active()) {
564 mach_msg_type_number_t nqos = THREAD_QOS_POLICY_COUNT;
565 boolean_t gd = FALSE;
566
567 kr = pthread_kern->thread_policy_get(current_thread(), THREAD_QOS_POLICY, (thread_policy_t)&qos, &nqos, &gd);
568 if (kr != KERN_SUCCESS || qos.qos_tier == THREAD_QOS_UNSPECIFIED) {
			/* An unspecified QoS means the kernel wants us to impose the legacy QoS class upon the thread. */
570 qos.qos_tier = THREAD_QOS_LEGACY;
571 qos.tier_importance = 0;
572
573 kr = pthread_kern->thread_policy_set_internal(current_thread(), THREAD_QOS_POLICY, (thread_policy_t)&qos, THREAD_QOS_POLICY_COUNT);
574 }
575
576 if (kr == KERN_SUCCESS) {
577 data.main_qos = pthread_qos_class_get_priority(qos.qos_tier);
578 } else {
579 data.main_qos = _pthread_priority_make_newest(QOS_CLASS_UNSPECIFIED, 0, 0);
580 }
581 } else {
582 data.main_qos = _pthread_priority_make_newest(QOS_CLASS_UNSPECIFIED, 0, 0);
583 }
584
585 kr = copyout(&data, pthread_init_data, pthread_init_sz);
586 if (kr != KERN_SUCCESS) {
587 return EINVAL;
588 }
589 } else {
590 pthread_kern->proc_set_dispatchqueue_offset(p, dispatchqueue_offset);
591 pthread_kern->proc_set_targconc(p, targetconc_ptr);
592 }
593
594 /* return the supported feature set as the return value. */
595 *retval = PTHREAD_FEATURE_SUPPORTED;
596
597 return(0);
598}
599
600#pragma mark - QoS Manipulation
601
602int
603_bsdthread_ctl_set_qos(struct proc *p, user_addr_t __unused cmd, mach_port_name_t kport, user_addr_t tsd_priority_addr, user_addr_t arg3, int *retval)
604{
605 kern_return_t kr;
606 thread_t th;
607
608 pthread_priority_t priority;
609
610 /* Unused parameters must be zero. */
611 if (arg3 != 0) {
612 return EINVAL;
613 }
614
615 /* QoS is stored in a given slot in the pthread TSD. We need to copy that in and set our QoS based on it. */
616 if (proc_is64bit(p)) {
617 uint64_t v;
618 kr = copyin(tsd_priority_addr, &v, sizeof(v));
619 if (kr != KERN_SUCCESS) {
620 return kr;
621 }
622 priority = (int)(v & 0xffffffff);
623 } else {
624 uint32_t v;
625 kr = copyin(tsd_priority_addr, &v, sizeof(v));
626 if (kr != KERN_SUCCESS) {
627 return kr;
628 }
629 priority = v;
630 }
631
632 if ((th = port_name_to_thread(kport)) == THREAD_NULL) {
633 return ESRCH;
634 }
635
636 /* <rdar://problem/16211829> Disable pthread_set_qos_class_np() on threads other than pthread_self */
637 if (th != current_thread()) {
638 thread_deallocate(th);
639 return EPERM;
640 }
641
642 int rv = _bsdthread_ctl_set_self(p, 0, priority, 0, _PTHREAD_SET_SELF_QOS_FLAG, retval);
643
	/* Statically param the thread: we just set QoS on it, so it's stuck in QoS land now. */
645 /* pthread_kern->thread_static_param(th, TRUE); */ // see <rdar://problem/16433744>, for details
646
647 thread_deallocate(th);
648
649 return rv;
650}
651
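/*
 * Helper: return the threadlist entry associated with a thread, or NULL if the
 * thread has no uthread/threadlist (i.e. it is not a workqueue worker thread).
 */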
652static inline struct threadlist *
653util_get_thread_threadlist_entry(thread_t th)
654{
655 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
656 if (uth) {
657 struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
658 return tl;
659 }
660 return NULL;
661}
662
663static inline void
664wq_thread_override_reset(thread_t th, user_addr_t resource)
665{
666 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
667 struct threadlist *tl = pthread_kern->uthread_get_threadlist(uth);
668
669 if (tl) {
670 /*
671 * Drop all outstanding overrides on this thread, done outside the wq lock
672 * because proc_usynch_thread_qos_remove_override_for_resource takes a spinlock that
673 * could cause us to panic.
674 */
675 PTHREAD_TRACE(TRACE_wq_override_reset | DBG_FUNC_NONE, tl->th_workq, 0, 0, 0, 0);
676
677 pthread_kern->proc_usynch_thread_qos_reset_override_for_resource(current_task(), uth, 0, resource, THREAD_QOS_OVERRIDE_TYPE_DISPATCH_ASYNCHRONOUS_OVERRIDE);
678 }
679}
680
681int
682_bsdthread_ctl_set_self(struct proc *p, user_addr_t __unused cmd, pthread_priority_t priority, mach_port_name_t voucher, _pthread_set_flags_t flags, int __unused *retval)
683{
684 thread_qos_policy_data_t qos;
685 mach_msg_type_number_t nqos = THREAD_QOS_POLICY_COUNT;
686 boolean_t gd = FALSE;
687
688 kern_return_t kr;
689 int qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
690
691 if ((flags & _PTHREAD_SET_SELF_QOS_FLAG) != 0) {
692 kr = pthread_kern->thread_policy_get(current_thread(), THREAD_QOS_POLICY, (thread_policy_t)&qos, &nqos, &gd);
693 if (kr != KERN_SUCCESS) {
694 qos_rv = EINVAL;
695 goto voucher;
696 }
697
698 /* If we have main-thread QoS then we don't allow a thread to come out of QOS_CLASS_UNSPECIFIED. */
699 if (pthread_kern->qos_main_thread_active() && qos.qos_tier == THREAD_QOS_UNSPECIFIED) {
700 qos_rv = EPERM;
701 goto voucher;
702 }
703
		/* Get the work queue for tracing, and the threadlist for bucket manipulation. */
705 struct workqueue *wq = NULL;
706 struct threadlist *tl = util_get_thread_threadlist_entry(current_thread());
707 if (tl) {
708 wq = tl->th_workq;
709 }
710
711 PTHREAD_TRACE(TRACE_pthread_set_qos_self | DBG_FUNC_START, wq, qos.qos_tier, qos.tier_importance, 0, 0);
712
713 qos.qos_tier = pthread_priority_get_qos_class(priority);
714 qos.tier_importance = (qos.qos_tier == QOS_CLASS_UNSPECIFIED) ? 0 : _pthread_priority_get_relpri(priority);
715
716 kr = pthread_kern->thread_policy_set_internal(current_thread(), THREAD_QOS_POLICY, (thread_policy_t)&qos, THREAD_QOS_POLICY_COUNT);
717 if (kr != KERN_SUCCESS) {
718 qos_rv = EINVAL;
719 goto voucher;
720 }
721
722 /* If we're a workqueue, the threadlist item priority needs adjusting, along with the bucket we were running in. */
723 if (tl) {
724 workqueue_lock_spin(p);
725
726 /* Fix up counters. */
727 uint8_t old_bucket = tl->th_priority;
728 uint8_t new_bucket = pthread_priority_get_class_index(priority);
729
730 uint32_t old_active = OSAddAtomic(-1, &wq->wq_thactive_count[old_bucket]);
731 OSAddAtomic(1, &wq->wq_thactive_count[new_bucket]);
732
733 wq->wq_thscheduled_count[old_bucket]--;
734 wq->wq_thscheduled_count[new_bucket]++;
735
736 tl->th_priority = new_bucket;
737
738 /* If we were at the ceiling of non-overcommitted threads for a given bucket, we have to
739 * reevaluate whether we should start more work.
740 */
741 if (old_active == wq->wq_reqconc[old_bucket]) {
742 /* workqueue_run_nextreq will drop the workqueue lock in all exit paths. */
743 (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT, 0);
744 } else {
745 workqueue_unlock(p);
746 }
747 }
748
749 PTHREAD_TRACE(TRACE_pthread_set_qos_self | DBG_FUNC_END, wq, qos.qos_tier, qos.tier_importance, 0, 0);
750 }
751
752voucher:
753 if ((flags & _PTHREAD_SET_SELF_VOUCHER_FLAG) != 0) {
754 kr = pthread_kern->thread_set_voucher_name(voucher);
755 if (kr != KERN_SUCCESS) {
756 voucher_rv = ENOENT;
757 goto fixedpri;
758 }
759 }
760
761fixedpri:
762 if ((flags & _PTHREAD_SET_SELF_FIXEDPRIORITY_FLAG) != 0) {
763 thread_extended_policy_data_t extpol = {.timeshare = 0};
764 thread_t thread = current_thread();
765
766 struct threadlist *tl = util_get_thread_threadlist_entry(thread);
767 if (tl) {
768 /* Not allowed on workqueue threads */
769 fixedpri_rv = ENOTSUP;
770 goto done;
771 }
772
773 kr = pthread_kern->thread_policy_set_internal(thread, THREAD_EXTENDED_POLICY, (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
774 if (kr != KERN_SUCCESS) {
775 fixedpri_rv = EINVAL;
776 goto done;
777 }
778 } else if ((flags & _PTHREAD_SET_SELF_TIMESHARE_FLAG) != 0) {
779 thread_extended_policy_data_t extpol = {.timeshare = 1};
780 thread_t thread = current_thread();
781
782 struct threadlist *tl = util_get_thread_threadlist_entry(thread);
783 if (tl) {
784 /* Not allowed on workqueue threads */
785 fixedpri_rv = ENOTSUP;
786 goto done;
787 }
788
789 kr = pthread_kern->thread_policy_set_internal(thread, THREAD_EXTENDED_POLICY, (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
790 if (kr != KERN_SUCCESS) {
791 fixedpri_rv = EINVAL;
792 goto done;
793 }
794 }
795
796done:
797 if (qos_rv && voucher_rv) {
798 /* Both failed, give that a unique error. */
799 return EBADMSG;
800 }
801
802 if (qos_rv) {
803 return qos_rv;
804 }
805
806 if (voucher_rv) {
807 return voucher_rv;
808 }
809
810 if (fixedpri_rv) {
811 return fixedpri_rv;
812 }
813
814 return 0;
815}
816
817int
818_bsdthread_ctl_qos_override_start(struct proc __unused *p, user_addr_t __unused cmd, mach_port_name_t kport, pthread_priority_t priority, user_addr_t resource, int __unused *retval)
819{
820 thread_t th;
821 int rv = 0;
822
823 if ((th = port_name_to_thread(kport)) == THREAD_NULL) {
824 return ESRCH;
825 }
826
827 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
828 int override_qos = pthread_priority_get_qos_class(priority);
829
830 struct threadlist *tl = util_get_thread_threadlist_entry(th);
831 if (tl) {
832 PTHREAD_TRACE(TRACE_wq_override_start | DBG_FUNC_NONE, tl->th_workq, thread_tid(th), 1, priority, 0);
833 }
834
	/* The only failure case here would be passing a tid and having it look up the thread; since we pass the uthread directly, this always succeeds. */
836 pthread_kern->proc_usynch_thread_qos_add_override_for_resource(current_task(), uth, 0, override_qos, TRUE, resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
837
838 thread_deallocate(th);
839 return rv;
840}
841
842int
843_bsdthread_ctl_qos_override_end(struct proc __unused *p, user_addr_t __unused cmd, mach_port_name_t kport, user_addr_t resource, user_addr_t arg3, int __unused *retval)
844{
845 thread_t th;
846 int rv = 0;
847
848 if (arg3 != 0) {
849 return EINVAL;
850 }
851
852 if ((th = port_name_to_thread(kport)) == THREAD_NULL) {
853 return ESRCH;
854 }
855
856 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
857
858 struct threadlist *tl = util_get_thread_threadlist_entry(th);
859 if (tl) {
860 PTHREAD_TRACE(TRACE_wq_override_end | DBG_FUNC_NONE, tl->th_workq, thread_tid(th), 0, 0, 0);
861 }
862
863 pthread_kern->proc_usynch_thread_qos_remove_override_for_resource(current_task(), uth, 0, resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
864
865 thread_deallocate(th);
866 return rv;
867}
868
869int
870_bsdthread_ctl_qos_override_dispatch(struct proc *p, user_addr_t cmd, mach_port_name_t kport, pthread_priority_t priority, user_addr_t arg3, int *retval)
871{
872 if (arg3 != 0) {
873 return EINVAL;
874 }
875
876 return _bsdthread_ctl_qos_dispatch_asynchronous_override_add(p, cmd, kport, priority, USER_ADDR_NULL, retval);
877}
878
879int
880_bsdthread_ctl_qos_dispatch_asynchronous_override_add(struct proc __unused *p, user_addr_t __unused cmd, mach_port_name_t kport, pthread_priority_t priority, user_addr_t resource, int __unused *retval)
881{
882 thread_t th;
883 int rv = 0;
884
885 if ((th = port_name_to_thread(kport)) == THREAD_NULL) {
886 return ESRCH;
887 }
888
889 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
890 int override_qos = pthread_priority_get_qos_class(priority);
891
892 struct threadlist *tl = util_get_thread_threadlist_entry(th);
893 if (!tl) {
894 thread_deallocate(th);
895 return EPERM;
896 }
897
898 PTHREAD_TRACE(TRACE_wq_override_dispatch | DBG_FUNC_NONE, tl->th_workq, thread_tid(th), 1, priority, 0);
899
	/* The only failure case here would be passing a tid and having it look up the thread; since we pass the uthread directly, this always succeeds. */
901 pthread_kern->proc_usynch_thread_qos_add_override_for_resource(current_task(), uth, 0, override_qos, TRUE, resource, THREAD_QOS_OVERRIDE_TYPE_DISPATCH_ASYNCHRONOUS_OVERRIDE);
902
903 thread_deallocate(th);
904 return rv;
905}
906
907int
908_bsdthread_ctl_qos_override_reset(struct proc *p, user_addr_t cmd, user_addr_t arg1, user_addr_t arg2, user_addr_t arg3, int *retval)
909{
910 if (arg1 != 0 || arg2 != 0 || arg3 != 0) {
911 return EINVAL;
912 }
913
914 return _bsdthread_ctl_qos_dispatch_asynchronous_override_reset(p, cmd, 1 /* reset_all */, 0, 0, retval);
915}
916
917int
918_bsdthread_ctl_qos_dispatch_asynchronous_override_reset(struct proc __unused *p, user_addr_t __unused cmd, int reset_all, user_addr_t resource, user_addr_t arg3, int __unused *retval)
919{
920 thread_t th;
921 struct threadlist *tl;
922 int rv = 0;
923
924 if ((reset_all && (resource != 0)) || arg3 != 0) {
925 return EINVAL;
926 }
927
928 th = current_thread();
929 tl = util_get_thread_threadlist_entry(th);
930
931 if (tl) {
932 wq_thread_override_reset(th, reset_all ? THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD : resource);
933 } else {
934 rv = EPERM;
935 }
936
937 return rv;
938}
939
940int
941_bsdthread_ctl(struct proc *p, user_addr_t cmd, user_addr_t arg1, user_addr_t arg2, user_addr_t arg3, int *retval)
942{
943 switch (cmd) {
944 case BSDTHREAD_CTL_SET_QOS:
945 return _bsdthread_ctl_set_qos(p, cmd, (mach_port_name_t)arg1, arg2, arg3, retval);
946 case BSDTHREAD_CTL_QOS_OVERRIDE_START:
947 return _bsdthread_ctl_qos_override_start(p, cmd, (mach_port_name_t)arg1, (pthread_priority_t)arg2, arg3, retval);
948 case BSDTHREAD_CTL_QOS_OVERRIDE_END:
949 return _bsdthread_ctl_qos_override_end(p, cmd, (mach_port_name_t)arg1, arg2, arg3, retval);
950 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
951 return _bsdthread_ctl_qos_override_reset(p, cmd, arg1, arg2, arg3, retval);
952 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
953 return _bsdthread_ctl_qos_override_dispatch(p, cmd, (mach_port_name_t)arg1, (pthread_priority_t)arg2, arg3, retval);
954 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
955 return _bsdthread_ctl_qos_dispatch_asynchronous_override_add(p, cmd, (mach_port_name_t)arg1, (pthread_priority_t)arg2, arg3, retval);
956 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
957 return _bsdthread_ctl_qos_dispatch_asynchronous_override_reset(p, cmd, (int)arg1, arg2, arg3, retval);
958 case BSDTHREAD_CTL_SET_SELF:
959 return _bsdthread_ctl_set_self(p, cmd, (pthread_priority_t)arg1, (mach_port_name_t)arg2, (_pthread_set_flags_t)arg3, retval);
960 default:
961 return EINVAL;
962 }
963}
964
965#pragma mark - Workqueue Implementation
966#pragma mark sysctls
967
968uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
969uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
970uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
971uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
972uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
973uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
974uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
975uint32_t wq_max_concurrency = 1; // set to ncpus on load
976
977SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW | CTLFLAG_LOCKED,
978 &wq_yielded_threshold, 0, "");
979
980SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
981 &wq_yielded_window_usecs, 0, "");
982
983SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
984 &wq_stalled_window_usecs, 0, "");
985
986SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
987 &wq_reduce_pool_window_usecs, 0, "");
988
989SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
990 &wq_max_timer_interval_usecs, 0, "");
991
992SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
993 &wq_max_threads, 0, "");
994
995SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
996 &wq_max_constrained_threads, 0, "");
997
998#ifdef DEBUG
999SYSCTL_INT(_kern, OID_AUTO, wq_max_concurrency, CTLFLAG_RW | CTLFLAG_LOCKED,
1000 &wq_max_concurrency, 0, "");
1001
1002static int wq_kevent_test SYSCTL_HANDLER_ARGS;
1003SYSCTL_PROC(_debug, OID_AUTO, wq_kevent_test, CTLFLAG_MASKED | CTLFLAG_RW | CTLFLAG_LOCKED | CTLFLAG_ANYBODY | CTLTYPE_OPAQUE, NULL, 0, wq_kevent_test, 0, "-");
1004#endif
1005
1006static uint32_t wq_init_constrained_limit = 1;
1007
1008#pragma mark workqueue lock
1009
1010void
1011_workqueue_init_lock(proc_t p)
1012{
1013 lck_spin_init(pthread_kern->proc_get_wqlockptr(p), pthread_lck_grp, pthread_lck_attr);
1014 *(pthread_kern->proc_get_wqinitingptr(p)) = FALSE;
1015}
1016
1017void
1018_workqueue_destroy_lock(proc_t p)
1019{
1020 lck_spin_destroy(pthread_kern->proc_get_wqlockptr(p), pthread_lck_grp);
1021}
1022
1023
1024static void
1025workqueue_lock_spin(proc_t p)
1026{
1027 lck_spin_lock(pthread_kern->proc_get_wqlockptr(p));
1028}
1029
1030static void
1031workqueue_unlock(proc_t p)
1032{
1033 lck_spin_unlock(pthread_kern->proc_get_wqlockptr(p));
1034}
1035
1036#pragma mark workqueue add timer
1037
1038/**
1039 * Sets up the timer which will call out to workqueue_add_timer
1040 */
1041static void
1042workqueue_interval_timer_start(struct workqueue *wq)
1043{
1044 uint64_t deadline;
1045
1046 /* n.b. wq_timer_interval is reset to 0 in workqueue_add_timer if the
1047 ATIMER_RUNNING flag is not present. The net effect here is that if a
1048 sequence of threads is required, we'll double the time before we give out
1049 the next one. */
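	/* For example, with the defaults the interval walks wq_stalled_window_usecs,
	   then 2x, 4x, ... capped at wq_max_timer_interval_usecs. */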
1050 if (wq->wq_timer_interval == 0) {
1051 wq->wq_timer_interval = wq_stalled_window_usecs;
1052
1053 } else {
1054 wq->wq_timer_interval = wq->wq_timer_interval * 2;
1055
1056 if (wq->wq_timer_interval > wq_max_timer_interval_usecs) {
1057 wq->wq_timer_interval = wq_max_timer_interval_usecs;
1058 }
1059 }
1060 clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
1061
1062 thread_call_enter_delayed(wq->wq_atimer_call, deadline);
1063
1064 PTHREAD_TRACE(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, wq->wq_flags, wq->wq_timer_interval, 0);
1065}
1066
1067/**
1068 * returns whether lastblocked_tsp is within wq_stalled_window_usecs of cur_ts
1069 */
1070static boolean_t
1071wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
1072{
1073 clock_sec_t secs;
1074 clock_usec_t usecs;
1075 uint64_t lastblocked_ts;
1076 uint64_t elapsed;
1077
1078 /*
1079 * the timestamp is updated atomically w/o holding the workqueue lock
1080 * so we need to do an atomic read of the 64 bits so that we don't see
1081 * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
1082 * independent fashion by using OSCompareAndSwap64 to write back the
1083 * value we grabbed... if it succeeds, then we have a good timestamp to
1084 * evaluate... if it fails, we straddled grabbing the timestamp while it
1085 * was being updated... treat a failed update as a busy thread since
1086 * it implies we are about to see a really fresh timestamp anyway
1087 */
1088 lastblocked_ts = *lastblocked_tsp;
1089
1090 if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
1091 return (TRUE);
1092
1093 if (lastblocked_ts >= cur_ts) {
1094 /*
1095 * because the update of the timestamp when a thread blocks isn't
1096 * serialized against us looking at it (i.e. we don't hold the workq lock)
1097 * it's possible to have a timestamp that matches the current time or
1098 * that even looks to be in the future relative to when we grabbed the current
1099 * time... just treat this as a busy thread since it must have just blocked.
1100 */
1101 return (TRUE);
1102 }
1103 elapsed = cur_ts - lastblocked_ts;
1104
1105 pthread_kern->absolutetime_to_microtime(elapsed, &secs, &usecs);
1106
1107 if (secs == 0 && usecs < wq_stalled_window_usecs)
1108 return (TRUE);
1109 return (FALSE);
1110}
1111
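/*
 * WQ_TIMER_NEEDED: atomically set WQ_ATIMER_RUNNING in wq_flags unless the
 * workqueue is exiting or a timer is already pending; on success, set
 * start_timer so the caller knows to arm the add timer.
 */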
1112#define WQ_TIMER_NEEDED(wq, start_timer) do { \
1113 int oldflags = wq->wq_flags; \
1114 \
1115 if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \
1116 if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_RUNNING, (UInt32 *)&wq->wq_flags)) \
1117 start_timer = TRUE; \
1118 } \
1119} while (0)
1120
1121/**
1122 * handler function for the timer
1123 */
1124static void
1125workqueue_add_timer(struct workqueue *wq, __unused int param1)
1126{
1127 proc_t p;
1128 boolean_t start_timer = FALSE;
1129 boolean_t retval;
1130
1131 PTHREAD_TRACE(TRACE_wq_add_timer | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
1132
1133 p = wq->wq_proc;
1134
1135 workqueue_lock_spin(p);
1136
1137 /*
1138 * because workqueue_callback now runs w/o taking the workqueue lock
1139 * we are unsynchronized w/r to a change in state of the running threads...
1140 * to make sure we always evaluate that change, we allow it to start up
	 * a new timer if the current one is actively evaluating the state
1142 * however, we do not need more than 2 timers fired up (1 active and 1 pending)
1143 * and we certainly do not want 2 active timers evaluating the state
1144 * simultaneously... so use WQL_ATIMER_BUSY to serialize the timers...
1145 * note that WQL_ATIMER_BUSY is in a different flag word from WQ_ATIMER_RUNNING since
1146 * it is always protected by the workq lock... WQ_ATIMER_RUNNING is evaluated
	 * and set atomically since the callback function needs to manipulate it
1148 * w/o holding the workq lock...
1149 *
1150 * !WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == no pending timer, no active timer
1151 * !WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == no pending timer, 1 active timer
1152 * WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == 1 pending timer, no active timer
1153 * WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == 1 pending timer, 1 active timer
1154 */
1155 while (wq->wq_lflags & WQL_ATIMER_BUSY) {
1156 wq->wq_lflags |= WQL_ATIMER_WAITING;
1157
1158 assert_wait((caddr_t)wq, (THREAD_UNINT));
1159 workqueue_unlock(p);
1160
1161 thread_block(THREAD_CONTINUE_NULL);
1162
1163 workqueue_lock_spin(p);
1164 }
1165 wq->wq_lflags |= WQL_ATIMER_BUSY;
1166
1167 /*
1168 * the workq lock will protect us from seeing WQ_EXITING change state, but we
1169 * still need to update this atomically in case someone else tries to start
1170 * the timer just as we're releasing it
1171 */
1172 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags & ~WQ_ATIMER_RUNNING), (UInt32 *)&wq->wq_flags)));
1173
1174again:
1175 retval = TRUE;
1176 if ( !(wq->wq_flags & WQ_EXITING)) {
1177 boolean_t add_thread = FALSE;
1178 /*
1179 * check to see if the stall frequency was beyond our tolerance
1180 * or we have work on the queue, but haven't scheduled any
1181 * new work within our acceptable time interval because
1182 * there were no idle threads left to schedule
1183 */
1184 if (wq->wq_reqcount) {
1185 uint32_t priclass = 0;
1186 uint32_t thactive_count = 0;
1187 uint64_t curtime = mach_absolute_time();
1188 uint64_t busycount = 0;
1189
1190 if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
1191 wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
1192 priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
1193 } else {
1194 for (priclass = 0; priclass < WORKQUEUE_NUM_BUCKETS; priclass++) {
1195 if (wq->wq_requests[priclass])
1196 break;
1197 }
1198 }
1199
1200 if (priclass < WORKQUEUE_EVENT_MANAGER_BUCKET){
1201 /*
				 * Compute a metric for how many threads are active. We
1203 * find the highest priority request outstanding and then add up
1204 * the number of active threads in that and all higher-priority
1205 * buckets. We'll also add any "busy" threads which are not
1206 * active but blocked recently enough that we can't be sure
1207 * they've gone idle yet. We'll then compare this metric to our
1208 * max concurrency to decide whether to add a new thread.
1209 */
1210 for (uint32_t i = 0; i <= priclass; i++) {
1211 thactive_count += wq->wq_thactive_count[i];
1212
1213 // XXX why isn't this checking thscheduled_count < thactive_count ?
1214 if (wq->wq_thscheduled_count[i]) {
1215 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i]))
1216 busycount++;
1217 }
1218 }
1219 }
1220
1221 if (thactive_count + busycount < wq->wq_max_concurrency ||
1222 priclass == WORKQUEUE_EVENT_MANAGER_BUCKET) {
1223
1224 if (wq->wq_thidlecount == 0) {
1225 /*
1226 * if we have no idle threads, try to add one
1227 */
1228 retval = workqueue_addnewthread(wq, priclass == WORKQUEUE_EVENT_MANAGER_BUCKET);
1229 }
1230 add_thread = TRUE;
1231 }
1232
1233 if (wq->wq_reqcount) {
1234 /*
1235 * as long as we have threads to schedule, and we successfully
1236 * scheduled new work, keep trying
1237 */
1238 while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
1239 /*
1240 * workqueue_run_nextreq is responsible for
1241 * dropping the workqueue lock in all cases
1242 */
1243 retval = workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_DEFAULT, 0);
1244 workqueue_lock_spin(p);
1245
1246 if (retval == FALSE)
1247 break;
1248 }
1249 if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_reqcount) {
1250
1251 if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
1252 goto again;
1253
1254 if (wq->wq_thidlecount == 0 || busycount)
1255 WQ_TIMER_NEEDED(wq, start_timer);
1256
1257 PTHREAD_TRACE(TRACE_wq_add_timer | DBG_FUNC_NONE, wq, wq->wq_reqcount, wq->wq_thidlecount, busycount, 0);
1258 }
1259 }
1260 }
1261 }
1262
1263 /*
1264 * If we called WQ_TIMER_NEEDED above, then this flag will be set if that
1265 * call marked the timer running. If so, we let the timer interval grow.
1266 * Otherwise, we reset it back to 0.
1267 */
1268 if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
1269 wq->wq_timer_interval = 0;
1270
1271 wq->wq_lflags &= ~WQL_ATIMER_BUSY;
1272
1273 if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
1274 /*
1275 * wakeup the thread hung up in workqueue_exit or workqueue_add_timer waiting for this timer
1276 * to finish getting out of the way
1277 */
1278 wq->wq_lflags &= ~WQL_ATIMER_WAITING;
1279 wakeup(wq);
1280 }
1281
1282 PTHREAD_TRACE(TRACE_wq_add_timer | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
1283
1284 workqueue_unlock(p);
1285
1286 if (start_timer == TRUE)
1287 workqueue_interval_timer_start(wq);
1288}
1289
1290#pragma mark thread state tracking
1291
1292// called by spinlock code when trying to yield to lock owner
1293void
1294_workqueue_thread_yielded(void)
1295{
1296 struct workqueue *wq;
1297 proc_t p;
1298
1299 p = current_proc();
1300
1301 if ((wq = pthread_kern->proc_get_wqptr(p)) == NULL || wq->wq_reqcount == 0)
1302 return;
1303
1304 workqueue_lock_spin(p);
1305
1306 if (wq->wq_reqcount) {
1307 uint64_t curtime;
1308 uint64_t elapsed;
1309 clock_sec_t secs;
1310 clock_usec_t usecs;
1311
1312 if (wq->wq_thread_yielded_count++ == 0)
1313 wq->wq_thread_yielded_timestamp = mach_absolute_time();
1314
1315 if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
1316 workqueue_unlock(p);
1317 return;
1318 }
1319
1320 PTHREAD_TRACE(TRACE_wq_thread_yielded | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 0, 0);
1321
1322 wq->wq_thread_yielded_count = 0;
1323
1324 curtime = mach_absolute_time();
1325 elapsed = curtime - wq->wq_thread_yielded_timestamp;
1326 pthread_kern->absolutetime_to_microtime(elapsed, &secs, &usecs);
1327
1328 if (secs == 0 && usecs < wq_yielded_window_usecs) {
1329
1330 if (wq->wq_thidlecount == 0) {
1331 workqueue_addnewthread(wq, TRUE);
1332 /*
1333 * 'workqueue_addnewthread' drops the workqueue lock
1334 * when creating the new thread and then retakes it before
1335 * returning... this window allows other threads to process
1336 * requests, so we need to recheck for available work
1337 * if none found, we just return... the newly created thread
1338 * will eventually get used (if it hasn't already)...
1339 */
1340 if (wq->wq_reqcount == 0) {
1341 workqueue_unlock(p);
1342 return;
1343 }
1344 }
1345 if (wq->wq_thidlecount) {
1346 (void)workqueue_run_nextreq(p, wq, THREAD_NULL, RUN_NEXTREQ_UNCONSTRAINED, 0);
1347 /*
1348 * workqueue_run_nextreq is responsible for
1349 * dropping the workqueue lock in all cases
1350 */
1351 PTHREAD_TRACE(TRACE_wq_thread_yielded | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 1, 0);
1352
1353 return;
1354 }
1355 }
1356 PTHREAD_TRACE(TRACE_wq_thread_yielded | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 2, 0);
1357 }
1358 workqueue_unlock(p);
1359}
1360
1361
1362
1363static void
1364workqueue_callback(int type, thread_t thread)
1365{
1366 struct uthread *uth;
1367 struct threadlist *tl;
1368 struct workqueue *wq;
1369
1370 uth = pthread_kern->get_bsdthread_info(thread);
1371 tl = pthread_kern->uthread_get_threadlist(uth);
1372 wq = tl->th_workq;
1373
1374 switch (type) {
1375 case SCHED_CALL_BLOCK: {
1376 uint32_t old_activecount;
1377 boolean_t start_timer = FALSE;
1378
1379 old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
1380
1381 /*
1382 * If we blocked and were at the requested concurrency previously, we may
1383 * need to spin up a new thread. Of course, if it's the event manager
1384 * then that's moot, so ignore that case.
1385 */
1386 if (old_activecount == wq->wq_reqconc[tl->th_priority] &&
1387 tl->th_priority != WORKQUEUE_EVENT_MANAGER_BUCKET) {
1388 uint64_t curtime;
1389 UInt64 *lastblocked_ptr;
1390
1391 /*
1392 * the number of active threads at this priority
1393 * has fallen below the maximum number of concurrent
1394 * threads that we're allowed to run
1395 */
1396 lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority];
1397 curtime = mach_absolute_time();
1398
1399 /*
1400 * if we collide with another thread trying to update the last_blocked (really unlikely
1401 * since another thread would have to get scheduled and then block after we start down
1402 * this path), it's not a problem. Either timestamp is adequate, so no need to retry
1403 */
1404
1405 OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
1406
1407 if (wq->wq_reqcount) {
1408 /*
1409 * we have work to do so start up the timer
1410 * if it's not running... we'll let it sort
1411 * out whether we really need to start up
1412 * another thread
1413 */
1414 WQ_TIMER_NEEDED(wq, start_timer);
1415 }
1416
1417 if (start_timer == TRUE) {
1418 workqueue_interval_timer_start(wq);
1419 }
1420 }
1421 PTHREAD_TRACE1(TRACE_wq_thread_block | DBG_FUNC_START, wq, old_activecount, tl->th_priority, start_timer, thread_tid(thread));
1422 break;
1423 }
1424 case SCHED_CALL_UNBLOCK:
1425 /*
1426 * we cannot take the workqueue_lock here...
1427 * an UNBLOCK can occur from a timer event which
1428 * is run from an interrupt context... if the workqueue_lock
1429 * is already held by this processor, we'll deadlock...
1430 * the thread lock for the thread being UNBLOCKED
1431 * is also held
1432 */
1433 OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority]);
1434
1435 PTHREAD_TRACE1(TRACE_wq_thread_block | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, 0, thread_tid(thread));
1436
1437 break;
1438 }
1439}
1440
1441sched_call_t
1442_workqueue_get_sched_callback(void)
1443{
1444 return workqueue_callback;
1445}
1446
1447#pragma mark thread addition/removal
1448
1449/**
1450 * pop goes the thread
1451 */
1452static void
1453workqueue_removethread(struct threadlist *tl, int fromexit)
1454{
1455 struct workqueue *wq;
1456 struct uthread * uth;
1457
1458 /*
	 * If fromexit is set, the call is from workqueue_exit(),
1460 * so some cleanups are to be avoided.
1461 */
1462 wq = tl->th_workq;
1463
1464 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1465
1466 if (fromexit == 0) {
1467 wq->wq_nthreads--;
1468 wq->wq_thidlecount--;
1469 }
1470
1471 /*
1472 * Clear the threadlist pointer in uthread so
1473 * blocked thread on wakeup for termination will
1474 * not access the thread list as it is going to be
1475 * freed.
1476 */
1477 pthread_kern->thread_sched_call(tl->th_thread, NULL);
1478
1479 uth = pthread_kern->get_bsdthread_info(tl->th_thread);
1480 if (uth != (struct uthread *)0) {
1481 pthread_kern->uthread_set_threadlist(uth, NULL);
1482 }
1483 if (fromexit == 0) {
1484 /* during exit the lock is not held */
1485 workqueue_unlock(wq->wq_proc);
1486 }
1487
1488 if ( (tl->th_flags & TH_LIST_SUSPENDED) ) {
1489 /*
1490 * thread was created, but never used...
1491 * need to clean up the stack and port ourselves
1492 * since we're not going to spin up through the
1493 * normal exit path triggered from Libc
1494 */
1495 if (fromexit == 0) {
1496 /* vm map is already deallocated when this is called from exit */
1497 (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
1498 }
1499 (void)pthread_kern->mach_port_deallocate(pthread_kern->task_get_ipcspace(wq->wq_task), tl->th_thport);
1500
1501 PTHREAD_TRACE1(TRACE_wq_thread_suspend | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
1502 } else {
1503
1504 PTHREAD_TRACE1(TRACE_wq_thread_park | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
1505 }
1506 /*
1507 * drop our ref on the thread
1508 */
1509 thread_deallocate(tl->th_thread);
1510
1511 kfree(tl, sizeof(struct threadlist));
1512}
1513
1514
1515/**
1516 * Try to add a new workqueue thread.
1517 *
1518 * - called with workq lock held
1519 * - dropped and retaken around thread creation
1520 * - return with workq lock held
1521 */
1522static boolean_t
1523workqueue_addnewthread(struct workqueue *wq, boolean_t ignore_constrained_thread_limit)
1524{
1525 struct threadlist *tl;
1526 struct uthread *uth;
1527 kern_return_t kret;
1528 thread_t th;
1529 proc_t p;
1530 void *sright;
1531 mach_vm_offset_t stackaddr;
1532
1533 if ((wq->wq_flags & WQ_EXITING) == WQ_EXITING) {
1534 PTHREAD_TRACE(TRACE_wq_thread_add_during_exit | DBG_FUNC_NONE, wq, 0, 0, 0, 0);
1535 return (FALSE);
1536 }
1537
1538 if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (pthread_kern->config_thread_max - 20)) {
1539 wq->wq_lflags |= WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1540
1541 PTHREAD_TRACE(TRACE_wq_thread_limit_exceeded | DBG_FUNC_NONE, wq, wq->wq_nthreads, wq_max_threads,
1542 pthread_kern->config_thread_max - 20, 0);
1543 return (FALSE);
1544 }
1545 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1546
1547 if (ignore_constrained_thread_limit == FALSE &&
1548 wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
1549 /*
1550 * If we're not creating this thread to service an overcommit or
1551 * event manager request, then we check to see if we are over our
1552 * constrained thread limit, in which case we error out.
1553 */
1554 wq->wq_lflags |= WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1555
1556 PTHREAD_TRACE(TRACE_wq_thread_constrained_maxed | DBG_FUNC_NONE, wq, wq->wq_constrained_threads_scheduled,
1557 wq_max_constrained_threads, 0, 0);
1558 return (FALSE);
1559 }
1560 if (wq->wq_constrained_threads_scheduled < wq_max_constrained_threads)
1561 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1562
1563 wq->wq_nthreads++;
1564
1565 p = wq->wq_proc;
1566 workqueue_unlock(p);
1567
1568 kret = pthread_kern->thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
1569 if (kret != KERN_SUCCESS) {
1570 PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 0, 0, 0);
1571 goto failed;
1572 }
1573
1574 tl = kalloc(sizeof(struct threadlist));
1575 bzero(tl, sizeof(struct threadlist));
1576
1577 stackaddr = stackaddr_hint(p);
1578
1579 mach_vm_size_t guardsize = vm_map_page_size(wq->wq_map);
1580 mach_vm_size_t pthread_size =
1581 vm_map_round_page_mask(pthread_kern->proc_get_pthsize(p) + PTHREAD_T_OFFSET, vm_map_page_mask(wq->wq_map));
1582 tl->th_allocsize = guardsize + PTH_DEFAULT_STACKSIZE + pthread_size;
1583
1584 kret = mach_vm_map(wq->wq_map, &stackaddr,
1585 tl->th_allocsize,
1586 page_size-1,
1587 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
1588 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
1589 VM_INHERIT_DEFAULT);
1590
1591 if (kret != KERN_SUCCESS) {
1592 PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 1, 0, 0);
1593
1594 kret = mach_vm_allocate(wq->wq_map,
1595 &stackaddr, tl->th_allocsize,
1596 VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
1597 }
1598 if (kret == KERN_SUCCESS) {
1599 /*
1600 * The guard page is at the lowest address
1601 * The stack base is the highest address
1602 */
1603 kret = mach_vm_protect(wq->wq_map, stackaddr, guardsize, FALSE, VM_PROT_NONE);
1604
1605 if (kret != KERN_SUCCESS) {
1606 (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
1607 PTHREAD_TRACE(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, kret, 2, 0, 0);
1608 }
1609 }
1610 if (kret != KERN_SUCCESS) {
1611 (void) thread_terminate(th);
1612 thread_deallocate(th);
1613
1614 kfree(tl, sizeof(struct threadlist));
1615 goto failed;
1616 }
1617 thread_reference(th);
1618
1619 sright = (void *)pthread_kern->convert_thread_to_port(th);
1620 tl->th_thport = pthread_kern->ipc_port_copyout_send(sright, pthread_kern->task_get_ipcspace(wq->wq_task));
1621
1622 pthread_kern->thread_static_param(th, TRUE);
1623
1624 tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED;
1625
1626 tl->th_thread = th;
1627 tl->th_workq = wq;
1628 tl->th_stackaddr = stackaddr;
1629 tl->th_priority = WORKQUEUE_NUM_BUCKETS;
1630 tl->th_policy = -1;
1631
1632 uth = pthread_kern->get_bsdthread_info(tl->th_thread);
1633
1634 workqueue_lock_spin(p);
1635
1636 pthread_kern->uthread_set_threadlist(uth, tl);
1637 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
1638
1639 wq->wq_thidlecount++;
1640
1641 PTHREAD_TRACE1(TRACE_wq_thread_suspend | DBG_FUNC_START, wq, wq->wq_nthreads, 0, thread_tid(current_thread()), thread_tid(tl->th_thread));
1642
1643 return (TRUE);
1644
1645failed:
1646 workqueue_lock_spin(p);
1647 wq->wq_nthreads--;
1648
1649 return (FALSE);
1650}
1651
1652/**
1653 * Setup per-process state for the workqueue.
1654 */
1655int
1656_workq_open(struct proc *p, __unused int32_t *retval)
1657{
1658 struct workqueue * wq;
1659 int wq_size;
1660 char * ptr;
1661 uint32_t i;
1662 uint32_t num_cpus;
1663 int error = 0;
1664 boolean_t need_wakeup = FALSE;
1665
1666 if (pthread_kern->proc_get_register(p) == 0) {
1667 return EINVAL;
1668 }
1669
1670 num_cpus = pthread_kern->ml_get_max_cpus();
1671
1672 if (wq_init_constrained_limit) {
1673 uint32_t limit;
1674 /*
1675 * set up the limit for the constrained pool
1676 * this is a virtual pool in that we don't
1677 * maintain it on a separate idle and run list
1678 */
1679 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
1680
1681 if (limit > wq_max_constrained_threads)
1682 wq_max_constrained_threads = limit;
1683
1684 wq_init_constrained_limit = 0;
1685 }
1686 workqueue_lock_spin(p);
1687
1688 if (pthread_kern->proc_get_wqptr(p) == NULL) {
1689
1690 while (*pthread_kern->proc_get_wqinitingptr(p) == TRUE) {
1691
1692 assert_wait((caddr_t)pthread_kern->proc_get_wqinitingptr(p), THREAD_UNINT);
1693 workqueue_unlock(p);
1694
1695 thread_block(THREAD_CONTINUE_NULL);
1696
1697 workqueue_lock_spin(p);
1698 }
1699 if (pthread_kern->proc_get_wqptr(p) != NULL) {
1700 goto out;
1701 }
1702
1703 *(pthread_kern->proc_get_wqinitingptr(p)) = TRUE;
1704
1705 workqueue_unlock(p);
1706
1707 wq_size = sizeof(struct workqueue);
1708
1709 ptr = (char *)kalloc(wq_size);
1710 bzero(ptr, wq_size);
1711
1712 wq = (struct workqueue *)ptr;
1713 wq->wq_flags = WQ_LIST_INITED;
1714 wq->wq_proc = p;
1715 wq->wq_max_concurrency = wq_max_concurrency;
1716 wq->wq_task = current_task();
1717 wq->wq_map = pthread_kern->current_map();
1718
1719 for (i = 0; i < WORKQUEUE_NUM_BUCKETS; i++)
1720 wq->wq_reqconc[i] = (uint16_t)wq->wq_max_concurrency;
1721
1722 // The event manager bucket is special, so it gets a concurrency of 1,
1723 // though we shouldn't ever read this value for that bucket
1724 wq->wq_reqconc[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
1725
1726 // Always start the event manager at BACKGROUND
1727 wq->wq_event_manager_priority = (uint32_t)pthread_qos_class_get_priority(THREAD_QOS_BACKGROUND) | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
1728
1729 TAILQ_INIT(&wq->wq_thrunlist);
1730 TAILQ_INIT(&wq->wq_thidlelist);
1731
1732 wq->wq_atimer_call = thread_call_allocate((thread_call_func_t)workqueue_add_timer, (thread_call_param_t)wq);
1733
1734 workqueue_lock_spin(p);
1735
1736 pthread_kern->proc_set_wqptr(p, wq);
1737 pthread_kern->proc_set_wqsize(p, wq_size);
1738
1739 *(pthread_kern->proc_get_wqinitingptr(p)) = FALSE;
1740 need_wakeup = TRUE;
1741 }
1742out:
1743 workqueue_unlock(p);
1744
1745 if (need_wakeup == TRUE) {
1746 wakeup(pthread_kern->proc_get_wqinitingptr(p));
1747 }
1748 return(error);
1749}
1750
1751/*
1752 * Routine: workqueue_mark_exiting
1753 *
1754 * Function: Mark the work queue such that new threads will not be added to the
1755 * work queue after we return.
1756 *
1757 * Conditions: Called against the current process.
1758 */
1759void
1760_workqueue_mark_exiting(struct proc *p)
1761{
1762 struct workqueue *wq = pthread_kern->proc_get_wqptr(p);
1763
1764 if (wq != NULL) {
1765
1766 PTHREAD_TRACE(TRACE_wq_pthread_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
1767
1768 workqueue_lock_spin(p);
1769
1770 /*
1771 * we now arm the timer in the callback function w/o holding the workq lock...
1772 * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
1773 * ensure only a single timer is running and to notice that WQ_EXITING has
1774 * been set (we don't want to start a timer once WQ_EXITING is posted)
1775 *
1776 * so once we have successfully set WQ_EXITING, we cannot fire up a new timer...
1777 * therefore there is no need to clear the timer state atomically from the flags
1778 *
1779 * since we always hold the workq lock when dropping WQ_ATIMER_RUNNING,
1780 * the check for it, and the sleep until it clears, are protected
1781 */
1782 while (!(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags | WQ_EXITING), (UInt32 *)&wq->wq_flags)));
1783
1784 if (wq->wq_flags & WQ_ATIMER_RUNNING) {
1785 if (thread_call_cancel(wq->wq_atimer_call) == TRUE) {
1786 wq->wq_flags &= ~WQ_ATIMER_RUNNING;
1787 }
1788 }
1789 while ((wq->wq_flags & WQ_ATIMER_RUNNING) || (wq->wq_lflags & WQL_ATIMER_BUSY)) {
1790 assert_wait((caddr_t)wq, (THREAD_UNINT));
1791 workqueue_unlock(p);
1792
1793 thread_block(THREAD_CONTINUE_NULL);
1794
1795 workqueue_lock_spin(p);
1796 }
1797 workqueue_unlock(p);
1798
1799 PTHREAD_TRACE(TRACE_wq_pthread_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
1800 }
1801}
1802
1803/*
1804 * Routine: workqueue_exit
1805 *
1806 * Function: clean up the work queue structure(s) now that there are no threads
1807 * left running inside the work queue (except possibly current_thread).
1808 *
1809 * Conditions: Called by the last thread in the process.
1810 * Called against current process.
1811 */
1812void
1813_workqueue_exit(struct proc *p)
1814{
1815 struct workqueue * wq;
1816 struct threadlist * tl, *tlist;
1817 struct uthread *uth;
1818 int wq_size = 0;
1819
1820 wq = pthread_kern->proc_get_wqptr(p);
1821 if (wq != NULL) {
1822
1823 PTHREAD_TRACE(TRACE_wq_workqueue_exit|DBG_FUNC_START, wq, 0, 0, 0, 0);
1824
1825 wq_size = pthread_kern->proc_get_wqsize(p);
1826 pthread_kern->proc_set_wqptr(p, NULL);
1827 pthread_kern->proc_set_wqsize(p, 0);
1828
1829 /*
1830 * Clean up workqueue data structures for threads that exited and
1831 * didn't get a chance to clean up after themselves.
1832 */
1833 TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) {
1834 pthread_kern->thread_sched_call(tl->th_thread, NULL);
1835
1836 uth = pthread_kern->get_bsdthread_info(tl->th_thread);
1837 if (uth != (struct uthread *)0) {
1838 pthread_kern->uthread_set_threadlist(uth, NULL);
1839 }
1840 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1841
1842 /*
1843 * drop our last ref on the thread
1844 */
1845 thread_deallocate(tl->th_thread);
1846
1847 kfree(tl, sizeof(struct threadlist));
1848 }
1849 TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
1850 workqueue_removethread(tl, 1);
1851 }
1852 thread_call_free(wq->wq_atimer_call);
1853
1854 kfree(wq, wq_size);
1855
1856 PTHREAD_TRACE(TRACE_wq_workqueue_exit|DBG_FUNC_END, 0, 0, 0, 0, 0);
1857 }
1858}
1859
1860
1861#pragma mark workqueue thread manipulation
1862
1863/**
1864 * Entry point for libdispatch to ask for threads
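 *
 * For illustration only (hypothetical values, not part of the build): a
 * request for two overcommit threads would arrive here roughly as
 *
 *     wqops_queue_reqthreads(p, 2, priority);
 *
 * where `priority` is a pthread_priority_t encoding the desired QoS class
 * with _PTHREAD_PRIORITY_OVERCOMMIT_FLAG set; setting
 * _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG instead routes the request to the
 * event manager bucket.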
1865 */
1866static int wqops_queue_reqthreads(struct proc *p, int reqcount, pthread_priority_t priority){
1867 struct workqueue *wq;
1868
1869 boolean_t overcommit = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0;
1870 int class = pthread_priority_get_class_index(priority);
1871
1872 boolean_t event_manager = (_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0;
1873 if (event_manager){
1874 class = WORKQUEUE_EVENT_MANAGER_BUCKET;
1875 }
1876
1877 if ((reqcount <= 0) || (class < 0) || (class >= WORKQUEUE_NUM_BUCKETS) || (overcommit && event_manager)) {
1878 return EINVAL;
1879 }
1880
1881 workqueue_lock_spin(p);
1882
1883 if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
1884 workqueue_unlock(p);
1885
1886 return EINVAL;
1887 }
1888
1889 if (overcommit == 0 && event_manager == 0) {
1890 wq->wq_reqcount += reqcount;
1891 wq->wq_requests[class] += reqcount;
1892
1893 PTHREAD_TRACE(TRACE_wq_req_threads | DBG_FUNC_NONE, wq, priority, wq->wq_requests[class], reqcount, 0);
1894
1895 while (wq->wq_reqcount) {
1896 if (!workqueue_run_one(p, wq, overcommit, 0))
1897 break;
1898 }
1899 } else if (overcommit){
1900 PTHREAD_TRACE(TRACE_wq_req_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
1901
1902 while (reqcount) {
1903 if (!workqueue_run_one(p, wq, overcommit, priority))
1904 break;
1905 reqcount--;
1906 }
1907 if (reqcount) {
1908 /*
1909 * we need to delay starting some of the overcommit requests...
1910 * we should only fail to create the overcommit threads if
1911 * we're at the max thread limit... as existing threads
1912 * return to the kernel, we'll notice the ocrequests
1913 * and spin them back to user space as the overcommit variety
1914 */
1915 wq->wq_reqcount += reqcount;
1916 wq->wq_requests[class] += reqcount;
1917 wq->wq_ocrequests[class] += reqcount;
1918
1919 PTHREAD_TRACE(TRACE_wq_delay_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_ocrequests[class], reqcount, 0);
1920
1921 /* if we delayed this thread coming up but we're not constrained
1922 * or at max threads, then we need to start the timer so we don't
1923 * risk dropping this request on the floor.
1924 */
1925 if ((wq->wq_lflags & (WQL_EXCEEDED_TOTAL_THREAD_LIMIT | WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT)) == 0) {
1926 boolean_t start_timer = FALSE;
1927 WQ_TIMER_NEEDED(wq, start_timer);
1928
1929 if (start_timer) {
1930 workqueue_interval_timer_start(wq);
1931 }
1932 }
1933 }
1934 } else if (event_manager) {
1935 PTHREAD_TRACE(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, wq->wq_event_manager_priority, wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET], 0);
1936
1937 if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
1938 wq->wq_reqcount += 1;
1939 wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
1940 }
1941
1942 // We've recorded the request for an event manager thread above. We'll
1943 // let the timer pick it up as we would for a kernel callout. We can
1944 // do a direct add/wakeup when that support is added for the kevent path.
1945 boolean_t start_timer = FALSE;
1946 if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0)
1947 WQ_TIMER_NEEDED(wq, start_timer);
1948 if (start_timer == TRUE)
1949 workqueue_interval_timer_start(wq);
1950 }
1951 workqueue_unlock(p);
1952
1953 return 0;
1954}
1955
1956/* Used by the kevent system to request threads. Currently count is ignored
1957 * and we always return one thread per invocation.
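 *
 * Illustrative call sketch (hypothetical values, not part of the build):
 *
 *     struct workq_reqthreads_req_s req = { .priority = kevent_pri, .count = 1 };
 *     (void)_workq_reqthreads(p, 1, &req);
 *
 * where `kevent_pri` stands in for a pthread_priority_t chosen by the caller;
 * when several requests are passed, the array is expected to be sorted
 * highest priority first (see the DEBUG assertions below).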
1958 */
1959thread_t _workq_reqthreads(struct proc *p, int requests_count, workq_reqthreads_req_t requests){
1960 boolean_t start_timer = FALSE;
1961 assert(requests_count > 0);
1962
1963#if DEBUG
1964 // Make sure that the requests array is sorted, highest priority first
1965 if (requests_count > 1){
1966 __assert_only qos_class_t priority = _pthread_priority_get_qos_newest(requests[0].priority);
1967 __assert_only unsigned long flags = ((_pthread_priority_get_flags(requests[0].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
1968 for (int i = 1; i < requests_count; i++){
1969 if (requests[i].count == 0) continue;
1970 __assert_only qos_class_t next_priority = _pthread_priority_get_qos_newest(requests[i].priority);
1971 __assert_only unsigned long next_flags = ((_pthread_priority_get_flags(requests[i].priority) & (_PTHREAD_PRIORITY_OVERCOMMIT_FLAG|_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) != 0);
1972 if (next_flags != flags){
1973 flags = next_flags;
1974 priority = next_priority;
1975 } else {
1976 assert(next_priority <= priority);
1977 }
1978 }
1979 }
1980#endif // DEBUG
1981
1982 int error = 0;
1983 struct workqueue *wq;
1984
1985 workqueue_lock_spin(p);
1986
1987 if ((wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p)) == NULL) {
1988 error = EINVAL;
1989 goto done;
1990 }
1991
1992 PTHREAD_TRACE(TRACE_wq_kevent_req_threads | DBG_FUNC_START, wq, requests_count, 0, 0, 0);
1993
1994 // Look for overcommit or event-manager-only requests.
1995 boolean_t have_overcommit = FALSE;
1996 pthread_priority_t priority = 0;
1997 for (int i = 0; i < requests_count; i++){
1998 if (requests[i].count == 0)
1999 continue;
2000 priority = requests[i].priority;
2001 if (_pthread_priority_get_qos_newest(priority) == QOS_CLASS_UNSPECIFIED){
2002 priority |= _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2003 }
2004 if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) != 0){
2005 goto event_manager;
2006 }
2007 if ((_pthread_priority_get_flags(priority) & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) != 0){
2008 have_overcommit = TRUE;
2009 break;
2010 }
2011 }
2012
2013 if (have_overcommit){
2014 // I can't make this call, since it's not safe from some contexts yet,
2015 // so just set up a delayed overcommit and let the timer do the work
2016 //boolean_t success = workqueue_run_one(p, wq, TRUE, priority);
2017 if (/* !success */ TRUE){
2018 int class = pthread_priority_get_class_index(priority);
2019 wq->wq_reqcount += 1;
2020 wq->wq_requests[class] += 1;
2021 wq->wq_kevent_ocrequests[class] += 1;
2022
2023 PTHREAD_TRACE(TRACE_wq_req_kevent_octhreads | DBG_FUNC_NONE, wq, priority, wq->wq_kevent_ocrequests[class], 1, 0);
2024
2025 WQ_TIMER_NEEDED(wq, start_timer);
2026 }
2027 goto done;
2028 }
2029
2030 // Having no overcommit requests, try to find any request that can start
2031 // There's no TOCTTOU since we hold the workqueue lock
2032 for (int i = 0; i < requests_count; i++){
2033 workq_reqthreads_req_t req = requests + i;
2034 priority = req->priority;
2035
2036 if (req->count == 0)
2037 continue;
2038
2039 int class = pthread_priority_get_class_index(priority);
2040
2041 // Ask if we can start a new thread at the given class. Pass NUM_BUCKETS as
2042 // my class to indicate we won't reuse this thread
2043 if (may_start_constrained_thread(wq, class, WORKQUEUE_NUM_BUCKETS, NULL)){
2044 wq->wq_reqcount += 1;
2045 wq->wq_requests[class] += 1;
2046 wq->wq_kevent_requests[class] += 1;
2047
2048 PTHREAD_TRACE(TRACE_wq_req_kevent_threads | DBG_FUNC_NONE, wq, priority, wq->wq_kevent_requests[class], 1, 0);
2049
2050 // I can't make this call because it's not yet safe to make from
2051 // scheduler callout context, so instead we'll just start up the timer
2052 // which will spin up the thread when it fires.
2053 // workqueue_run_one(p, wq, FALSE, priority);
2054
2055 WQ_TIMER_NEEDED(wq, start_timer);
2056
2057 goto done;
2058 }
2059 }
2060
2061 // Okay, here's the fun case: we can't spin up any of the non-overcommit threads
2062 // that we've seen a request for, so we kick this over to the event manager thread
2063
2064event_manager:
2065 PTHREAD_TRACE(TRACE_wq_req_event_manager | DBG_FUNC_NONE, wq, wq->wq_event_manager_priority, wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET], wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET], 0);
2066
2067 if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0){
2068 wq->wq_reqcount += 1;
2069 wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
2070 }
2071 wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] = 1;
2072
2073 if (wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0)
2074 WQ_TIMER_NEEDED(wq, start_timer);
2075
2076done:
2077 workqueue_unlock(p);
2078
2079 if (start_timer == TRUE)
2080 workqueue_interval_timer_start(wq);
2081
2082 PTHREAD_TRACE(TRACE_wq_kevent_req_threads | DBG_FUNC_END, wq, start_timer, 0, 0, 0);
2083
2084 return THREAD_NULL;
2085}
2086
2087
2088static int wqops_thread_return(struct proc *p){
2089 thread_t th = current_thread();
2090 struct uthread *uth = pthread_kern->get_bsdthread_info(th);
2091 struct threadlist *tl = util_get_thread_threadlist_entry(th);
2092
2093 /* reset signal mask on the workqueue thread to default state */
2094 if (pthread_kern->uthread_get_sigmask(uth) != (sigset_t)(~workq_threadmask)) {
2095 pthread_kern->proc_lock(p);
2096 pthread_kern->uthread_set_sigmask(uth, ~workq_threadmask);
2097 pthread_kern->proc_unlock(p);
2098 }
2099
2100 /* dropping WQ override counts has to be done outside the wq lock. */
2101 wq_thread_override_reset(th, THREAD_QOS_OVERRIDE_RESOURCE_WILDCARD);
2102
2103 workqueue_lock_spin(p);
2104
2105 struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
2106 if (wq == NULL || !tl) {
2107 workqueue_unlock(p);
2108
2109 return EINVAL;
2110 }
2111 PTHREAD_TRACE(TRACE_wq_runitem | DBG_FUNC_END, wq, 0, 0, 0, 0);
2112
2113 (void)workqueue_run_nextreq(p, wq, th, RUN_NEXTREQ_DEFAULT, 0);
2114 /*
2115 * workqueue_run_nextreq is responsible for
2116 * dropping the workqueue lock in all cases
2117 */
2118 return 0;
2119}
2120
2121/**
2122 * Multiplexed call to interact with the workqueue mechanism
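 *
 * For illustration only: userspace reaches this entry point through its
 * workq_kernreturn syscall stub; a hypothetical request for four threads at
 * some priority would look roughly like
 *
 *     __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, 4, priority);
 *
 * which lands here with options == WQOPS_QUEUE_REQTHREADS, arg2 == 4 and
 * arg3 == priority (the exact userspace wrapper lives outside this file).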
2123 */
2124int
2125_workq_kernreturn(struct proc *p,
2126 int options,
2127 __unused user_addr_t item,
2128 int arg2,
2129 int arg3,
2130 int32_t *retval)
2131{
2132 int error = 0;
2133
2134 if (pthread_kern->proc_get_register(p) == 0) {
2135 return EINVAL;
2136 }
2137
2138 switch (options) {
2139 case WQOPS_QUEUE_NEWSPISUPP: {
2140 /*
2141 * arg2 = offset of serialno into dispatch queue
2142 * arg3 = kevent support
2143 */
2144 int offset = arg2;
2145 if (arg3 & 0x01){
2146 // If we get here, then userspace has indicated support for kevent delivery.
2147 }
2148
2149 pthread_kern->proc_set_dispatchqueue_serialno_offset(p, (uint64_t)offset);
2150 break;
2151 }
2152 case WQOPS_QUEUE_REQTHREADS: {
2153 /*
2154 * arg2 = number of threads to start
2155 * arg3 = priority
2156 */
2157 error = wqops_queue_reqthreads(p, arg2, arg3);
2158 break;
2159 }
2160 case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
2161 /*
2162 * arg2 = priority for the manager thread
2163 *
2164 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set, the bits outside
2165 * _PTHREAD_PRIORITY_FLAGS_MASK contain a scheduling priority instead
2166 * of a QoS value
2167 */
2168 pthread_priority_t pri = arg2;
2169
2170 workqueue_lock_spin(p);
2171 struct workqueue *wq = (struct workqueue *)pthread_kern->proc_get_wqptr(p);
2172 if (wq == NULL ) {
2173 workqueue_unlock(p);
2174 error = EINVAL;
2175 break;
2176 }
2177 if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
2178 // If userspace passes a scheduling priority, that takes precedence
2179 // over any QoS. (So, userspace should take care not to accidentally
2180 // lower the priority this way.)
2181 uint32_t sched_pri = pri & (~_PTHREAD_PRIORITY_FLAGS_MASK);
2182 if (wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
2183 wq->wq_event_manager_priority = MAX(sched_pri, wq->wq_event_manager_priority & (~_PTHREAD_PRIORITY_FLAGS_MASK))
2184 | _PTHREAD_PRIORITY_SCHED_PRI_FLAG | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2185 } else {
2186 wq->wq_event_manager_priority = sched_pri
2187 | _PTHREAD_PRIORITY_SCHED_PRI_FLAG | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2188 }
2189 } else if ((wq->wq_event_manager_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) == 0){
2190 int cur_qos = pthread_priority_get_qos_class(wq->wq_event_manager_priority);
2191 int new_qos = pthread_priority_get_qos_class(pri);
2192 wq->wq_event_manager_priority = (uint32_t)pthread_qos_class_get_priority(MAX(cur_qos, new_qos)) | _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
2193 }
2194 workqueue_unlock(p);
2195 break;
2196 }
2197 case WQOPS_THREAD_KEVENT_RETURN: {
2198 int32_t kevent_retval;
2199 int ret = kevent_qos_internal(p, -1, item, arg2, item, arg2, NULL, NULL, KEVENT_FLAG_WORKQ | KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS, &kevent_retval);
2200 // We shouldn't be getting more errors out than events we put in, so
2201 // reusing the input buffer should always provide enough space
2202 assert(ret == KERN_SUCCESS && kevent_retval >= 0);
2203 if (ret != KERN_SUCCESS){
2204 error = ret;
2205 break;
2206 } else if (kevent_retval > 0){
2207 assert(kevent_retval <= arg2);
2208 *retval = kevent_retval;
2209 error = 0;
2210 break;
2211 }
2212 } /* FALLTHROUGH */
2213 case WQOPS_THREAD_RETURN: {
2214 error = wqops_thread_return(p);
2215 // NOT REACHED except in case of error
2216 assert(error);
2217 break;
2218 }
2219 default:
2220 error = EINVAL;
2221 break;
2222 }
2223 return (error);
2224}
2225
2226
2227static boolean_t
2228workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, pthread_priority_t priority)
2229{
2230 boolean_t ran_one;
2231
2232 if (wq->wq_thidlecount == 0) {
2233 if (overcommit == FALSE) {
2234 if (wq->wq_constrained_threads_scheduled < wq->wq_max_concurrency)
2235 workqueue_addnewthread(wq, overcommit);
2236 } else {
2237 workqueue_addnewthread(wq, overcommit);
2238
2239 if (wq->wq_thidlecount == 0)
2240 return (FALSE);
2241 }
2242 }
2243 ran_one = workqueue_run_nextreq(p, wq, THREAD_NULL, overcommit ? RUN_NEXTREQ_OVERCOMMIT : RUN_NEXTREQ_DEFAULT, priority);
2244 /*
2245 * workqueue_run_nextreq is responsible for
2246 * dropping the workqueue lock in all cases
2247 */
2248 workqueue_lock_spin(p);
2249
2250 return (ran_one);
2251}
2252
2253/*
2254 * this is a workqueue thread with no more
2255 * work to do... park it for now
2256 */
2257static void
2258parkit(struct workqueue *wq, struct threadlist *tl, thread_t thread)
2259{
2260 uint32_t us_to_wait;
2261
2262 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
2263 tl->th_flags &= ~TH_LIST_RUNNING;
2264
2265 tl->th_flags |= TH_LIST_BLOCKED;
2266 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
2267
2268 pthread_kern->thread_sched_call(thread, NULL);
2269
2270 OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority]);
2271 wq->wq_thscheduled_count[tl->th_priority]--;
2272 wq->wq_threads_scheduled--;
2273
2274 if (tl->th_flags & TH_LIST_CONSTRAINED) {
2275 wq->wq_constrained_threads_scheduled--;
2276 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
2277 tl->th_flags &= ~TH_LIST_CONSTRAINED;
2278 }
2279
2280 if (wq->wq_thidlecount < 100)
2281 us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
2282 else
2283 us_to_wait = wq_reduce_pool_window_usecs / 100;
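/*
 * Illustrative numbers only (wq_reduce_pool_window_usecs is a sysctl tunable
 * set elsewhere in this file): with a hypothetical window of 5,000,000us and
 * 10 threads already idle,
 *
 *     us_to_wait = 5000000 - (10 * (5000000 / 100)) = 4500000us
 *
 * so each additional idle thread shortens the park timeout and the pool
 * drains back down faster.
 */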
2284
2285 wq->wq_thidlecount++;
2286 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
2287
2288 assert_wait_timeout_with_leeway((caddr_t)tl, (THREAD_INTERRUPTIBLE),
2289 TIMEOUT_URGENCY_SYS_BACKGROUND|TIMEOUT_URGENCY_LEEWAY, us_to_wait,
2290 wq_reduce_pool_window_usecs, NSEC_PER_USEC);
2291
2292 PTHREAD_TRACE1(TRACE_wq_thread_park | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(thread));
2293}
2294
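/*
 * Illustrative sketch of the admission check below (hypothetical numbers,
 * not part of the build): with wq_max_concurrency == 4, three threads
 * currently active at the requested class and one thread that blocked
 * within the busy window,
 *
 *     thactive_count + busycount = 3 + 1 = 4 >= wq_max_concurrency
 *
 * so the constrained request is refused; because busycount != 0 the caller
 * is asked (via start_timer) to arm the add timer and retry later.
 */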
2295static boolean_t may_start_constrained_thread(struct workqueue *wq, uint32_t at_priclass, uint32_t my_priclass, boolean_t *start_timer){
2296 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
2297 /*
2298 * we need 1 or more constrained threads to return to the kernel before
2299 * we can dispatch additional work
2300 */
2301 return FALSE;
2302 }
2303
2304 uint32_t busycount = 0;
2305 uint32_t thactive_count = wq->wq_thactive_count[at_priclass];
2306
2307 // Has our most recently blocked thread blocked recently enough that we
2308 // should still consider it busy?
2309 // XXX should this be wq->wq_thscheduled_count[at_priclass] > thactive_count ?
2310 if (wq->wq_thscheduled_count[at_priclass]) {
2311 if (wq_thread_is_busy(mach_absolute_time(), &wq->wq_lastblocked_ts[at_priclass])) {
2312 busycount++;
2313 }
2314 }
2315
2316 if (my_priclass < WORKQUEUE_NUM_BUCKETS && my_priclass == at_priclass){
2317 /*
2318 * don't count this thread as currently active
2319 */
2320 thactive_count--;
2321 }
2322
2323 if (thactive_count + busycount >= wq->wq_max_concurrency) {
2324 if (busycount && start_timer) {
2325 /*
2326 * we found at least 1 thread in the
2327 * 'busy' state... make sure we start
2328 * the timer because if they are the only
2329 * threads keeping us from scheduling
2330 * this work request, we won't get a callback
2331 * to kick off the timer... we need to
2332 * start it now...
2333 */
2334 WQ_TIMER_NEEDED(wq, *start_timer);
2335 }
2336
2337 PTHREAD_TRACE(TRACE_wq_overcommitted|DBG_FUNC_NONE, wq, (start_timer ? 1<<7 : 0) | pthread_priority_from_class_index(at_priclass), thactive_count, busycount, 0);
2338
2339 return FALSE;
2340 }
2341 return TRUE;
2342}
2343
2344static struct threadlist *pop_from_thidlelist(struct workqueue *wq, uint32_t priclass, int *upcall_flags, int *wake_thread){
2345 struct threadlist *tl = TAILQ_FIRST(&wq->wq_thidlelist);
2346 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
2347 wq->wq_thidlecount--;
2348
2349 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
2350
2351 if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
2352 tl->th_flags &= ~TH_LIST_SUSPENDED;
2353 *upcall_flags &= ~WQ_FLAG_THREAD_REUSE;
2354
2355 } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
2356 tl->th_flags &= ~TH_LIST_BLOCKED;
2357 *wake_thread = 1;
2358 }
2359 tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
2360
2361 wq->wq_threads_scheduled++;
2362 wq->wq_thscheduled_count[priclass]++;
2363 OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
2364
2365 return tl;
2366}
2367
2368static void
2369reset_to_priority(struct threadlist *tl, pthread_priority_t pri){
2370 kern_return_t ret;
2371 thread_t th = tl->th_thread;
2372
2373 if (tl->th_flags & TH_LIST_EVENT_MGR_SCHED_PRI){
2374 thread_precedence_policy_data_t precedinfo = {
2375 .importance = 0
2376 };
2377 ret = pthread_kern->thread_policy_set_internal(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
2378 assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
2379 tl->th_flags &= ~TH_LIST_EVENT_MGR_SCHED_PRI;
2380 }
2381
2382 thread_qos_policy_data_t qosinfo = {
2383 .qos_tier = pthread_priority_get_qos_class(pri),
2384 .tier_importance = 0
2385 };
2386 ret = pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qosinfo, THREAD_QOS_POLICY_COUNT);
2387 assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
2388}
2389
2390static void
2391reset_to_schedpri(struct threadlist *tl, pthread_priority_t pri){
2392 kern_return_t ret;
2393 thread_t th = tl->th_thread;
2394
2395 thread_qos_policy_data_t qosinfo = {
2396 .qos_tier = THREAD_QOS_UNSPECIFIED,
2397 .tier_importance = 0
2398 };
2399 ret = pthread_kern->thread_policy_set_internal(th, THREAD_QOS_POLICY, (thread_policy_t)&qosinfo, THREAD_QOS_POLICY_COUNT);
2400 assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
2401
2402 thread_precedence_policy_data_t precedinfo = {
2403 .importance = ((pri & (~_PTHREAD_PRIORITY_FLAGS_MASK)) - BASEPRI_DEFAULT)
2404 };
2405 ret = pthread_kern->thread_policy_set_internal(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
2406 assert(ret == KERN_SUCCESS || ret == KERN_TERMINATED);
2407
2408 tl->th_flags |= TH_LIST_EVENT_MGR_SCHED_PRI;
2409}
2410
2411/**
2412 * grabs a thread for a request
2413 *
2414 * - called with the workqueue lock held...
2415 * - responsible for dropping it in all cases
2416 * - if provided mode is for overcommit, doesn't consume a reqcount
2417 *
2418 */
2419static boolean_t
2420workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t thread,
2421 enum run_nextreq_mode mode, pthread_priority_t oc_prio)
2422{
2423 thread_t th_to_run = THREAD_NULL;
2424 int wake_thread = 0;
2425 int upcall_flags = WQ_FLAG_THREAD_REUSE;
2426 uint32_t priclass;
2427 struct threadlist *tl = NULL;
2428 struct uthread *uth = NULL;
2429 boolean_t start_timer = FALSE;
2430
2431 // valid modes to call this function with
2432 assert(mode == RUN_NEXTREQ_DEFAULT || mode == RUN_NEXTREQ_OVERCOMMIT || mode == RUN_NEXTREQ_UNCONSTRAINED);
2433 // may only have a priority if in OVERCOMMIT mode
2434 assert(mode == RUN_NEXTREQ_OVERCOMMIT || oc_prio == 0);
2435 // thread == thread_null means "please spin up a new workqueue thread, we can't reuse this"
2436 // thread != thread_null is thread reuse, and must be the current thread
2437 assert(thread == THREAD_NULL || thread == current_thread());
2438
2439 PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_reqcount, 0);
2440
2441 if (thread != THREAD_NULL) {
2442 uth = pthread_kern->get_bsdthread_info(thread);
2443
2444 if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL) {
2445 panic("wq thread with no threadlist");
2446 }
2447 }
2448
2449 /*
2450 * from here until we drop the workq lock
2451 * we can't be pre-empted since we hold
2452 * the lock in spin mode... this is important
2453 * since we have to independently update the priority that
2454 * the thread is associated with and the priority-based
2455 * counters that "workqueue_callback" also changes and bases
2456 * decisions on.
2457 */
2458
2459 if (mode == RUN_NEXTREQ_OVERCOMMIT) {
2460 priclass = pthread_priority_get_class_index(oc_prio);
2461 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2462 } else if (wq->wq_reqcount == 0){
2463 // no work to do. we'll check again when new work arrives.
2464 goto done;
2465 } else if (wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] &&
2466 ((wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 0) ||
2467 (thread != THREAD_NULL && tl->th_priority == WORKQUEUE_EVENT_MANAGER_BUCKET))){
2468 // There's an event manager request and either:
2469 // - no event manager currently running
2470 // - we are re-using the event manager
2471 mode = RUN_NEXTREQ_EVENT_MANAGER;
2472 priclass = WORKQUEUE_EVENT_MANAGER_BUCKET;
2473 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
2474 if (wq->wq_kevent_requests[WORKQUEUE_EVENT_MANAGER_BUCKET])
2475 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
2476 } else {
2477 // Find highest priority and check for special request types
2478 for (priclass = 0; priclass < WORKQUEUE_EVENT_MANAGER_BUCKET; priclass++) {
2479 if (wq->wq_requests[priclass])
2480 break;
2481 }
2482 if (priclass == WORKQUEUE_EVENT_MANAGER_BUCKET){
2483 // the only remaining request must be the event manager's (it isn't kept in one of the scanned buckets),
2484 // but we weren't able to handle it above since there's already an event manager running,
2485 // so we fell into this case
2486 assert(wq->wq_requests[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
2487 wq->wq_thscheduled_count[WORKQUEUE_EVENT_MANAGER_BUCKET] == 1 &&
2488 wq->wq_reqcount == 1);
2489 goto done;
2490 }
2491
2492 if (wq->wq_kevent_ocrequests[priclass]){
2493 mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
2494 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
2495 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2496 } else if (wq->wq_ocrequests[priclass]){
2497 mode = RUN_NEXTREQ_DEFERRED_OVERCOMMIT;
2498 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2499 } else if (wq->wq_kevent_requests[priclass]){
2500 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
2501 }
2502 }
2503
2504 if (mode == RUN_NEXTREQ_DEFAULT /* non-overcommit */){
2505 uint32_t my_priclass = (thread != THREAD_NULL) ? tl->th_priority : WORKQUEUE_NUM_BUCKETS;
2506 if (may_start_constrained_thread(wq, priclass, my_priclass, &start_timer) == FALSE){
2507 // per policy, we won't start another constrained thread
2508 goto done;
2509 }
2510 }
2511
2512 if (thread != THREAD_NULL) {
2513 /*
2514 * thread is non-NULL here when we return from userspace
2515 * in workq_kernreturn; rather than trying to find a thread,
2516 * we pick up new work for this specific thread.
2517 */
2518 th_to_run = thread;
2519 } else if (wq->wq_thidlecount == 0) {
2520 /*
2521 * we have no additional threads waiting to pick up
2522 * work, however, there is additional work to do.
2523 */
2524 WQ_TIMER_NEEDED(wq, start_timer);
2525
2526 PTHREAD_TRACE(TRACE_wq_stalled, wq, wq->wq_nthreads, start_timer, 0, 0);
2527
2528 goto done;
2529 } else {
2530 // there is both work available and an idle thread, so activate a thread
2531 tl = pop_from_thidlelist(wq, priclass, &upcall_flags, &wake_thread);
2532 th_to_run = tl->th_thread;
2533 }
2534
2535 // Adjust counters and thread flags AKA consume the request
2536 // TODO: It would be lovely if OVERCOMMIT consumed reqcount
2537 switch (mode) {
2538 case RUN_NEXTREQ_DEFAULT:
2539 case RUN_NEXTREQ_UNCONSTRAINED:
2540 wq->wq_reqcount--;
2541 wq->wq_requests[priclass]--;
2542
2543 if (mode == RUN_NEXTREQ_DEFAULT){
2544 if (!(tl->th_flags & TH_LIST_CONSTRAINED)) {
2545 wq->wq_constrained_threads_scheduled++;
2546 tl->th_flags |= TH_LIST_CONSTRAINED;
2547 }
2548 } else if (mode == RUN_NEXTREQ_UNCONSTRAINED){
2549 if (tl->th_flags & TH_LIST_CONSTRAINED) {
2550 // XXX: Why aren't we unsetting CONSTRAINED_THREAD_LIMIT here
2551 wq->wq_constrained_threads_scheduled--;
2552 tl->th_flags &= ~TH_LIST_CONSTRAINED;
2553 }
2554 }
2555 if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
2556 wq->wq_kevent_requests[priclass]--;
2557 }
2558 break;
2559
2560 case RUN_NEXTREQ_EVENT_MANAGER:
2561 wq->wq_reqcount--;
2562 wq->wq_requests[priclass]--;
2563
2564 if (tl->th_flags & TH_LIST_CONSTRAINED) {
2565 wq->wq_constrained_threads_scheduled--;
2566 tl->th_flags &= ~TH_LIST_CONSTRAINED;
2567 }
2568 if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
2569 wq->wq_kevent_requests[priclass]--;
2570 }
2571 break;
2572
2573 case RUN_NEXTREQ_DEFERRED_OVERCOMMIT:
2574 wq->wq_reqcount--;
2575 wq->wq_requests[priclass]--;
2576 if (upcall_flags & WQ_FLAG_THREAD_KEVENT){
2577 wq->wq_kevent_ocrequests[priclass]--;
2578 } else {
2579 wq->wq_ocrequests[priclass]--;
2580 }
2581 /* FALLTHROUGH */
2582 case RUN_NEXTREQ_OVERCOMMIT:
2583 if (tl->th_flags & TH_LIST_CONSTRAINED) {
2584 wq->wq_constrained_threads_scheduled--;
2585 tl->th_flags &= ~TH_LIST_CONSTRAINED;
2586 }
2587 break;
2588 }
2589
2590 // Confirm we've maintained our counter invariants
2591 assert(wq->wq_requests[priclass] < UINT16_MAX);
2592 assert(wq->wq_ocrequests[priclass] < UINT16_MAX);
2593 assert(wq->wq_kevent_requests[priclass] < UINT16_MAX);
2594 assert(wq->wq_kevent_ocrequests[priclass] < UINT16_MAX);
2595 assert(wq->wq_ocrequests[priclass] + wq->wq_kevent_requests[priclass] +
2596 wq->wq_kevent_ocrequests[priclass] <=
2597 wq->wq_requests[priclass]);
2598
2599 uint32_t orig_class = tl->th_priority;
2600 tl->th_priority = (uint8_t)priclass;
2601
2602 if ((thread != THREAD_NULL) && (orig_class != priclass)) {
2603 /*
2604 * we need to adjust these counters based on this
2605 * thread's new disposition with respect to priority
2606 */
2607 OSAddAtomic(-1, &wq->wq_thactive_count[orig_class]);
2608 OSAddAtomic(1, &wq->wq_thactive_count[priclass]);
2609
2610 wq->wq_thscheduled_count[orig_class]--;
2611 wq->wq_thscheduled_count[priclass]++;
2612 }
2613 wq->wq_thread_yielded_count = 0;
2614
2615 workqueue_unlock(p);
2616
2617 pthread_priority_t outgoing_priority;
2618 if (mode == RUN_NEXTREQ_EVENT_MANAGER){
2619 outgoing_priority = wq->wq_event_manager_priority;
2620 } else {
2621 outgoing_priority = pthread_priority_from_class_index(priclass);
2622 }
2623
2624 PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_START, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
2625 if (outgoing_priority & _PTHREAD_PRIORITY_SCHED_PRI_FLAG){
2626 reset_to_schedpri(tl, outgoing_priority & (~_PTHREAD_PRIORITY_FLAGS_MASK));
2627 } else if (orig_class != priclass) {
2628 reset_to_priority(tl, outgoing_priority);
2629 }
2630 PTHREAD_TRACE(TRACE_wq_reset_priority | DBG_FUNC_END, wq, thread_tid(tl->th_thread), outgoing_priority, 0, 0);
2631
2632 /*
2633 * if the current thread is reused for the work request, this call does not return via unix_syscall
2634 */
2635 wq_runreq(p, outgoing_priority, th_to_run, tl, upcall_flags, wake_thread, (thread == th_to_run));
2636
2637 PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, wq, thread_tid(th_to_run), mode == RUN_NEXTREQ_OVERCOMMIT, 1, 0);
2638
2639 return (TRUE);
2640
2641done:
2642 if (thread != THREAD_NULL){
2643 parkit(wq,tl,thread);
2644 }
2645
2646 workqueue_unlock(p);
2647
2648 if (start_timer)
2649 workqueue_interval_timer_start(wq);
2650
2651 PTHREAD_TRACE(TRACE_wq_run_nextitem | DBG_FUNC_END, wq, thread_tid(thread), start_timer, 3, 0);
2652
2653 if (thread != THREAD_NULL){
2654 thread_block((thread_continue_t)wq_unpark_continue);
2655 /* NOT REACHED */
2656 }
2657
2658 return (FALSE);
2659}
2660
2661/**
2662 * Called when a new thread is created
2663 */
2664static void
2665wq_unsuspend_continue(void)
2666{
2667 struct uthread *uth = NULL;
2668 thread_t th_to_unsuspend;
2669 struct threadlist *tl;
2670 proc_t p;
2671
2672 th_to_unsuspend = current_thread();
2673 uth = pthread_kern->get_bsdthread_info(th_to_unsuspend);
2674
2675 if (uth != NULL && (tl = pthread_kern->uthread_get_threadlist(uth)) != NULL) {
2676
2677 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
2678 /*
2679 * most likely a normal resume of this thread occurred...
2680 * it's also possible that the thread was aborted after we
2681 * finished setting it up so that it could be dispatched... if
2682 * so, thread_bootstrap_return will notice the abort and put
2683 * the thread on the path to self-destruction
2684 */
2685normal_resume_to_user:
2686 pthread_kern->thread_sched_call(th_to_unsuspend, workqueue_callback);
2687 pthread_kern->thread_bootstrap_return();
2688 }
2689 /*
2690 * if we get here, it's because we've been resumed due to
2691 * an abort of this thread (process is crashing)
2692 */
2693 p = current_proc();
2694
2695 workqueue_lock_spin(p);
2696
2697 if (tl->th_flags & TH_LIST_SUSPENDED) {
2698 /*
2699 * thread has been aborted while still on our idle
2700 * queue... remove it from our domain...
2701 * workqueue_removethread consumes the lock
2702 */
2703 workqueue_removethread(tl, 0);
2704 pthread_kern->thread_bootstrap_return();
2705 }
2706 while ((tl->th_flags & TH_LIST_BUSY)) {
2707 /*
2708 * this thread was aborted after we started making
2709 * it runnable, but before we finished dispatching it...
2710 * we need to wait for that process to finish,
2711 * and we need to ask for a wakeup instead of a
2712 * thread_resume since the abort has already resumed us
2713 */
2714 tl->th_flags |= TH_LIST_NEED_WAKEUP;
2715
2716 assert_wait((caddr_t)tl, (THREAD_UNINT));
2717
2718 workqueue_unlock(p);
2719 thread_block(THREAD_CONTINUE_NULL);
2720 workqueue_lock_spin(p);
2721 }
2722 workqueue_unlock(p);
2723 /*
2724 * we have finished setting up the thread's context...
2725 * thread_bootstrap_return will take us through the abort path
2726 * where the thread will self destruct
2727 */
2728 goto normal_resume_to_user;
2729 }
2730 pthread_kern->thread_bootstrap_return();
2731}
2732
2733/**
2734 * parked thread wakes up
2735 */
2736static void
2737wq_unpark_continue(void)
2738{
2739 struct uthread *uth;
2740 struct threadlist *tl;
2741
2742 thread_t th_to_unpark = current_thread();
2743
2744 if ((uth = pthread_kern->get_bsdthread_info(th_to_unpark)) == NULL)
2745 goto done;
2746 if ((tl = pthread_kern->uthread_get_threadlist(uth)) == NULL)
2747 goto done;
2748
2749 /*
2750 * check if a normal wakeup of this thread occurred... if so, there's no need
2751 * for any synchronization with the timer and wq_runreq so we just skip all this.
2752 */
2753 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) != TH_LIST_RUNNING) {
2754 proc_t p = current_proc();
2755
2756 workqueue_lock_spin(p);
2757
2758 if ( !(tl->th_flags & TH_LIST_RUNNING)) {
2759 /*
2760 * the timer popped us out and we've not
2761 * been moved off of the idle list
2762 * so we should now self-destruct
2763 *
2764 * workqueue_removethread consumes the lock
2765 */
2766 workqueue_removethread(tl, 0);
2767 pthread_kern->unix_syscall_return(0);
2768 }
2769
2770 /*
2771 * the timer woke us up, but we have already
2772 * started to make this a runnable thread,
2773 * but have not yet finished that process...
2774 * so wait for the normal wakeup
2775 */
2776 while ((tl->th_flags & TH_LIST_BUSY)) {
2777
2778 assert_wait((caddr_t)tl, (THREAD_UNINT));
2779
2780 workqueue_unlock(p);
2781
2782 thread_block(THREAD_CONTINUE_NULL);
2783
2784 workqueue_lock_spin(p);
2785 }
2786
2787 /*
2788 * we have finished setting up the thread's context
2789 * now we can return as if we got a normal wakeup
2790 */
2791 workqueue_unlock(p);
2792 }
2793
2794 pthread_kern->thread_sched_call(th_to_unpark, workqueue_callback);
2795
2796 // FIXME: What's this?
2797 PTHREAD_TRACE(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
2798
2799done:
2800
2801 // XXX should be using unix_syscall_return(EJUSTRETURN)
2802 pthread_kern->thread_exception_return();
2803}
2804
2805
2806
2807static void
2808wq_runreq(proc_t p, pthread_priority_t priority, thread_t th, struct threadlist *tl,
2809 int flags, int wake_thread, int return_directly)
2810{
2811 int ret = 0;
2812 boolean_t need_resume = FALSE;
2813
2814 PTHREAD_TRACE1(TRACE_wq_runitem | DBG_FUNC_START, tl->th_workq, flags, priority, thread_tid(current_thread()), thread_tid(th));
2815
2816 ret = _setup_wqthread(p, th, priority, flags, tl);
2817
2818 if (ret != 0)
2819 panic("setup_wqthread failed %x\n", ret);
2820
2821 if (return_directly) {
2822 PTHREAD_TRACE(TRACE_wq_run_nextitem|DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
2823
2824 // XXX should be using unix_syscall_return(EJUSTRETURN)
2825 pthread_kern->thread_exception_return();
2826 panic("wq_runreq: thread_exception_return returned ...\n");
2827 }
2828 if (wake_thread) {
2829 workqueue_lock_spin(p);
2830
2831 tl->th_flags &= ~TH_LIST_BUSY;
2832 wakeup(tl);
2833
2834 workqueue_unlock(p);
2835 } else {
2836 PTHREAD_TRACE1(TRACE_wq_thread_suspend | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
2837
2838 workqueue_lock_spin(p);
2839
2840 if (tl->th_flags & TH_LIST_NEED_WAKEUP) {
2841 wakeup(tl);
2842 } else {
2843 need_resume = TRUE;
2844 }
2845
2846 tl->th_flags &= ~(TH_LIST_BUSY | TH_LIST_NEED_WAKEUP);
2847
2848 workqueue_unlock(p);
2849
2850 if (need_resume) {
2851 /*
2852 * need to do this outside of the workqueue spin lock
2853 * since thread_resume locks the thread via a full mutex
2854 */
2855 pthread_kern->thread_resume(th);
2856 }
2857 }
2858}
2859
2860#define KEVENT_LIST_LEN 16
2861#define KEVENT_DATA_SIZE (32 * 1024)
2862
2863/**
2864 * configures initial thread stack/registers to jump into:
2865 * _pthread_wqthread(pthread_t self, mach_port_t kport, void *stackaddr, void *keventlist, int flags, int nkevents);
2866 * to get there we jump through assembly stubs in pthread_asm.s. Those
2867 * routines set up a stack frame, using the current stack pointer, and marshal
2868 * arguments from registers to the stack as required by the ABI.
2869 *
2870 * One odd thing we do here is to start the pthread_t 4k below what would be the
2871 * top of the stack otherwise. This is because usually only the first 4k of the
2872 * pthread_t will be used and so we want to put it on the same 16k page as the
2873 * top of the stack to save memory.
2874 *
2875 * When we are done the stack will look like:
2876 * |-----------| th_stackaddr + th_allocsize
2877 * |pthread_t | th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize + PTHREAD_T_OFFSET
2878 * |kevent list| optionally - at most KEVENT_LIST_LEN events
2879 * |kevent data| optionally - at most KEVENT_DATA_SIZE bytes
2880 * |stack gap | bottom aligned to 16 bytes, and at least as big as stack_gap_min
2881 * | STACK |
2882 * | ⇓ |
2883 * | |
2884 * |guard page | guardsize
2885 * |-----------| th_stackaddr
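 *
 * Concretely, matching the arithmetic in the function body below:
 *
 *     pthread_self = th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize + PTHREAD_T_OFFSET
 *     stack top    = (pthread_self - stack_gap_min) & -stack_align_min
 *     stack bottom = th_stackaddr + guardsize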
2886 */
2887int
2888_setup_wqthread(proc_t p, thread_t th, pthread_priority_t priority, int flags, struct threadlist *tl)
2889{
2890 int error = 0;
2891
2892 const vm_size_t guardsize = vm_map_page_size(tl->th_workq->wq_map);
2893 const vm_size_t stack_gap_min = (proc_is64bit(p) == 0) ? C_32_STK_ALIGN : C_64_REDZONE_LEN;
2894 const vm_size_t stack_align_min = (proc_is64bit(p) == 0) ? C_32_STK_ALIGN : C_64_STK_ALIGN;
2895
2896 user_addr_t pthread_self_addr = (user_addr_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + guardsize + PTHREAD_T_OFFSET);
2897 user_addr_t stack_top_addr = (user_addr_t)((pthread_self_addr - stack_gap_min) & -stack_align_min);
2898 user_addr_t stack_bottom_addr = (user_addr_t)(tl->th_stackaddr + guardsize);
2899
2900 /* Put the QoS class value into the lower bits of the reuse_thread register; this is where
2901 * the thread priority used to be stored anyway.
2902 */
2903 flags |= (_pthread_priority_get_qos_newest(priority) & WQ_FLAG_THREAD_PRIOMASK);
2904
2905 flags |= WQ_FLAG_THREAD_NEWSPI;
2906
2907 user_addr_t kevent_list = NULL;
2908 int kevent_count = 0;
2909 if (flags & WQ_FLAG_THREAD_KEVENT){
2910 kevent_list = pthread_self_addr - KEVENT_LIST_LEN * sizeof(struct kevent_qos_s);
2911 kevent_count = KEVENT_LIST_LEN;
2912
2913 user_addr_t kevent_data_buf = kevent_list - KEVENT_DATA_SIZE;
2914 user_size_t kevent_data_available = KEVENT_DATA_SIZE;
2915
2916 int32_t events_out = 0;
2917
2918 int ret = kevent_qos_internal(p, -1, NULL, 0, kevent_list, kevent_count,
2919 kevent_data_buf, &kevent_data_available,
2920 KEVENT_FLAG_WORKQ | KEVENT_FLAG_STACK_DATA | KEVENT_FLAG_STACK_EVENTS | KEVENT_FLAG_IMMEDIATE,
2921 &events_out);
2922
2923 // squash any errors into just empty output on non-debug builds
2924 assert(ret == KERN_SUCCESS && events_out != -1);
2925 if (ret != KERN_SUCCESS || events_out == -1){
2926 events_out = 0;
2927 kevent_data_available = KEVENT_DATA_SIZE;
2928 }
2929
2930 // We shouldn't get data out if there aren't events available
2931 assert(events_out != 0 || kevent_data_available == KEVENT_DATA_SIZE);
2932
2933 if (events_out >= 0){
2934 kevent_count = events_out;
2935 kevent_list = pthread_self_addr - kevent_count * sizeof(struct kevent_qos_s);
2936
2937 if (kevent_data_available == KEVENT_DATA_SIZE){
2938 stack_top_addr = (kevent_list - stack_gap_min) & -stack_align_min;
2939 } else {
2940 stack_top_addr = (kevent_data_buf + kevent_data_available - stack_gap_min) & -stack_align_min;
2941 }
2942 } else {
2943 kevent_list = NULL;
2944 kevent_count = 0;
2945 }
2946 }
2947
2948#if defined(__i386__) || defined(__x86_64__)
2949 int isLP64 = proc_is64bit(p);
2950
2951 if (isLP64 == 0) {
2952 x86_thread_state32_t state = {
2953 .eip = (unsigned int)pthread_kern->proc_get_wqthread(p),
2954 .eax = /* arg0 */ (unsigned int)pthread_self_addr,
2955 .ebx = /* arg1 */ (unsigned int)tl->th_thport,
2956 .ecx = /* arg2 */ (unsigned int)stack_bottom_addr,
2957 .edx = /* arg3 */ (unsigned int)kevent_list,
2958 .edi = /* arg4 */ (unsigned int)flags,
2959 .esi = /* arg5 */ (unsigned int)kevent_count,
2960
2961 .esp = (int)((vm_offset_t)stack_top_addr),
2962 };
2963
2964 (void)pthread_kern->thread_set_wq_state32(th, (thread_state_t)&state);
2965 } else {
2966 x86_thread_state64_t state64 = {
2967 // x86-64 already passes all the arguments in registers, so we just put them in their final place here
2968 .rip = (uint64_t)pthread_kern->proc_get_wqthread(p),
2969 .rdi = (uint64_t)pthread_self_addr,
2970 .rsi = (uint64_t)tl->th_thport,
2971 .rdx = (uint64_t)stack_bottom_addr,
2972 .rcx = (uint64_t)kevent_list,
2973 .r8 = (uint64_t)flags,
2974 .r9 = (uint64_t)kevent_count,
2975
2976 .rsp = (uint64_t)(stack_top_addr)
2977 };
2978
2979 error = pthread_kern->thread_set_wq_state64(th, (thread_state_t)&state64);
2980 if (error != KERN_SUCCESS) {
2981 error = EINVAL;
2982 }
2983 }
2984#else
2985#error setup_wqthread not defined for this architecture
2986#endif
2987
2988 return error;
2989}
2990
2991#if DEBUG
2992static int wq_kevent_test SYSCTL_HANDLER_ARGS {
2993 //(struct sysctl_oid *oidp, void *arg1, int arg2, struct sysctl_req *req)
2994#pragma unused(oidp, arg1, arg2)
2995 int error;
2996 struct workq_reqthreads_req_s requests[64] = {};
2997
2998 if (req->newlen > sizeof(requests) || req->newlen < sizeof(struct workq_reqthreads_req_s))
2999 return EINVAL;
3000
3001 error = copyin(req->newptr, requests, req->newlen);
3002 if (error) return error;
3003
3004 _workq_reqthreads(req->p, (int)(req->newlen / sizeof(struct workq_reqthreads_req_s)), requests);
3005
3006 return 0;
3007}
3008#endif // DEBUG
3009
3010#pragma mark - Misc
3011
3012int
3013_fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
3014{
3015 struct workqueue * wq;
3016 int error = 0;
3017 int activecount;
3018 uint32_t pri;
3019
3020 workqueue_lock_spin(p);
3021 if ((wq = pthread_kern->proc_get_wqptr(p)) == NULL) {
3022 error = EINVAL;
3023 goto out;
3024 }
3025 activecount = 0;
3026
3027 for (pri = 0; pri < WORKQUEUE_NUM_BUCKETS; pri++) {
3028 activecount += wq->wq_thactive_count[pri];
3029 }
3030 pwqinfo->pwq_nthreads = wq->wq_nthreads;
3031 pwqinfo->pwq_runthreads = activecount;
3032 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
3033 pwqinfo->pwq_state = 0;
3034
3035 if (wq->wq_lflags & WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT) {
3036 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
3037 }
3038
3039 if (wq->wq_lflags & WQL_EXCEEDED_TOTAL_THREAD_LIMIT) {
3040 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
3041 }
3042
3043out:
3044 workqueue_unlock(p);
3045 return(error);
3046}
3047
3048int
3049_thread_selfid(__unused struct proc *p, uint64_t *retval)
3050{
3051 thread_t thread = current_thread();
3052 *retval = thread_tid(thread);
3053 return KERN_SUCCESS;
3054}
3055
3056void
3057_pthread_init(void)
3058{
3059 pthread_lck_grp_attr = lck_grp_attr_alloc_init();
3060 pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
3061
3062 /*
3063 * allocate the lock attribute for pthread synchronizers
3064 */
3065 pthread_lck_attr = lck_attr_alloc_init();
3066
3067 _workqueue_init_lock((proc_t)get_bsdtask_info(kernel_task));
3068 pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
3069
3070 pth_global_hashinit();
3071 psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
3072 psynch_zoneinit();
3073
3074 /*
3075 * register sysctls
3076 */
3077 sysctl_register_oid(&sysctl__kern_wq_yielded_threshold);
3078 sysctl_register_oid(&sysctl__kern_wq_yielded_window_usecs);
3079 sysctl_register_oid(&sysctl__kern_wq_stalled_window_usecs);
3080 sysctl_register_oid(&sysctl__kern_wq_reduce_pool_window_usecs);
3081 sysctl_register_oid(&sysctl__kern_wq_max_timer_interval_usecs);
3082 sysctl_register_oid(&sysctl__kern_wq_max_threads);
3083 sysctl_register_oid(&sysctl__kern_wq_max_constrained_threads);
3084 sysctl_register_oid(&sysctl__kern_pthread_debug_tracing);
3085
3086#if DEBUG
3087 sysctl_register_oid(&sysctl__kern_wq_max_concurrency);
3088 sysctl_register_oid(&sysctl__debug_wq_kevent_test);
3089#endif
3090
3091 wq_max_concurrency = pthread_kern->ml_get_max_cpus();
3092
3093}