1 /*
2 * Copyright (c) 2000-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */
29 /*
30 * pthread_synch.c
31 */
32
33 #define _PTHREAD_CONDATTR_T
34 #define _PTHREAD_COND_T
35 #define _PTHREAD_MUTEXATTR_T
36 #define _PTHREAD_MUTEX_T
37 #define _PTHREAD_RWLOCKATTR_T
38 #define _PTHREAD_RWLOCK_T
39
40 #undef pthread_mutexattr_t
41 #undef pthread_mutex_t
42 #undef pthread_condattr_t
43 #undef pthread_cond_t
44 #undef pthread_rwlockattr_t
45 #undef pthread_rwlock_t
46
47 #include <sys/param.h>
48 #include <sys/queue.h>
49 #include <sys/resourcevar.h>
50 #include <sys/proc_internal.h>
51 #include <sys/kauth.h>
52 #include <sys/systm.h>
53 #include <sys/timeb.h>
54 #include <sys/times.h>
55 #include <sys/acct.h>
56 #include <sys/kernel.h>
57 #include <sys/wait.h>
58 #include <sys/signalvar.h>
59 #include <sys/syslog.h>
60 #include <sys/stat.h>
61 #include <sys/lock.h>
62 #include <sys/kdebug.h>
63 #include <sys/sysproto.h>
64 #include <sys/pthread_internal.h>
65 #include <sys/vm.h>
66 #include <sys/user.h> /* for coredump */
67 #include <sys/proc_info.h> /* for fill_procworkqueue */
68
69
70 #include <mach/mach_types.h>
71 #include <mach/vm_prot.h>
72 #include <mach/semaphore.h>
73 #include <mach/sync_policy.h>
74 #include <mach/task.h>
75 #include <kern/kern_types.h>
76 #include <kern/task.h>
77 #include <kern/clock.h>
78 #include <mach/kern_return.h>
79 #include <kern/thread.h>
80 #include <kern/sched_prim.h>
81 #include <kern/kalloc.h>
82 #include <kern/sched_prim.h> /* for thread_exception_return */
83 #include <kern/processor.h>
84 #include <kern/affinity.h>
85 #include <kern/assert.h>
86 #include <mach/mach_vm.h>
87 #include <mach/mach_param.h>
88 #include <mach/thread_status.h>
89 #include <mach/thread_policy.h>
90 #include <mach/message.h>
91 #include <mach/port.h>
92 #include <vm/vm_protos.h>
93 #include <vm/vm_map.h> /* for current_map() */
94 #include <vm/vm_fault.h>
95 #include <mach/thread_act.h> /* for thread_resume */
96 #include <machine/machine_routines.h>
97 #if defined(__i386__)
98 #include <i386/machine_routines.h>
99 #include <i386/eflags.h>
100 #include <i386/psl.h>
101 #include <i386/seg.h>
102 #endif
103
104 #include <libkern/OSAtomic.h>
105
106 #if 0
107 #undef KERNEL_DEBUG
108 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
109 #undef KERNEL_DEBUG1
110 #define KERNEL_DEBUG1 KERNEL_DEBUG_CONSTANT1
111 #endif
112
113 lck_grp_attr_t *pthread_lck_grp_attr;
114 lck_grp_t *pthread_lck_grp;
115 lck_attr_t *pthread_lck_attr;
116
117 extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
118 extern kern_return_t mach_port_deallocate(ipc_space_t, mach_port_name_t);
119 extern kern_return_t semaphore_signal_internal_trap(mach_port_name_t);
120
121 extern void workqueue_thread_yielded(void);
122
123 #if defined(__i386__) || defined(__x86_64__)
124 extern boolean_t is_useraddr64_canonical(uint64_t addr64);
125 #endif
126
127 static boolean_t workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t th, boolean_t force_oc,
128 boolean_t overcommit, int oc_prio, int oc_affinity);
129
130 static boolean_t workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, int priority);
131
132 static void wq_runreq(proc_t p, boolean_t overcommit, uint32_t priority, thread_t th, struct threadlist *tl,
133 int reuse_thread, int wake_thread, int return_directly);
134
135 static int setup_wqthread(proc_t p, thread_t th, boolean_t overcommit, uint32_t priority, int reuse_thread, struct threadlist *tl);
136
137 static void wq_unpark_continue(void);
138 static void wq_unsuspend_continue(void);
139
140 static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread);
141 static void workqueue_removethread(struct threadlist *tl, int fromexit);
142 static void workqueue_lock_spin(proc_t);
143 static void workqueue_unlock(proc_t);
144
145 int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
146 int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
147
148 #define WQ_MAXPRI_MIN 0 /* low prio queue num */
149 #define WQ_MAXPRI_MAX 2 /* max prio queuenum */
150 #define WQ_PRI_NUM 3 /* number of prio work queues */
151
152 #define C_32_STK_ALIGN 16
153 #define C_64_STK_ALIGN 16
154 #define C_64_REDZONE_LEN 128
155 #define TRUNC_DOWN32(a,c) ((((uint32_t)a)-(c)) & ((uint32_t)(-(c))))
156 #define TRUNC_DOWN64(a,c) ((((uint64_t)a)-(c)) & ((uint64_t)(-(c))))
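/*
 * Worked example (sketch): TRUNC_DOWN64 reserves at least 'c' bytes below
 * 'a' and rounds the result down to a 'c'-byte boundary. With a = 0x1005
 * and c = C_64_STK_ALIGN (16):
 *
 *	(0x1005 - 16) & (uint64_t)(-16)  ==  0xff5 & ~0xf  ==  0xff0
 *
 * so the returned address is 16-byte aligned and strictly below 'a'.
 */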
157
158
159 /* flag values for reuse field in the libc side _pthread_wqthread */
160 #define WQ_FLAG_THREAD_PRIOMASK 0x0000ffff
161 #define WQ_FLAG_THREAD_OVERCOMMIT 0x00010000 /* thread is with overcommit prio */
162 #define WQ_FLAG_THREAD_REUSE 0x00020000 /* thread is being reused */
163 #define WQ_FLAG_THREAD_NEWSPI 0x00040000 /* the call is with new SPIs */
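/*
 * Sketch (assuming the bit layout above): the 'reuse' word handed back to
 * the libc-side _pthread_wqthread carries the priority in the low 16 bits
 * with the flag bits ORed in, along the lines of
 *
 *	flags = (priority & WQ_FLAG_THREAD_PRIOMASK) |
 *	    WQ_FLAG_THREAD_NEWSPI |
 *	    (overcommit ? WQ_FLAG_THREAD_OVERCOMMIT : 0) |
 *	    (reuse_thread ? WQ_FLAG_THREAD_REUSE : 0);
 */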
164
165 /*
166 * Flags field passed to bsdthread_create and back in pthread_start
167 31 <---------------------------------> 0
168 _________________________________________
169 | flags(8) | policy(8) | importance(16) |
170 -----------------------------------------
171 */
172 void _pthread_start(pthread_t self, mach_port_t kport, void *(*fun)(void *), void * funarg, size_t stacksize, unsigned int flags);
173
174 #define PTHREAD_START_CUSTOM 0x01000000
175 #define PTHREAD_START_SETSCHED 0x02000000
176 #define PTHREAD_START_DETACHED 0x04000000
177 #define PTHREAD_START_POLICY_BITSHIFT 16
178 #define PTHREAD_START_POLICY_MASK 0xff
179 #define PTHREAD_START_IMPORTANCE_MASK 0xffff
180
181 #define SCHED_OTHER POLICY_TIMESHARE
182 #define SCHED_FIFO POLICY_FIFO
183 #define SCHED_RR POLICY_RR
184
185
186
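/*
 * Sketch (following the diagram above): a caller that wants explicit
 * scheduling packs the flags word as
 *
 *	flags = PTHREAD_START_SETSCHED |
 *	    ((policy & PTHREAD_START_POLICY_MASK) << PTHREAD_START_POLICY_BITSHIFT) |
 *	    (importance & PTHREAD_START_IMPORTANCE_MASK);
 *
 * which bsdthread_create below unpacks with the same shift and masks.
 */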
187 int
188 bsdthread_create(__unused struct proc *p, struct bsdthread_create_args *uap, user_addr_t *retval)
189 {
190 kern_return_t kret;
191 void * sright;
192 int error = 0;
193 int allocated = 0;
194 mach_vm_offset_t stackaddr;
195 mach_vm_size_t th_allocsize = 0;
196 mach_vm_size_t user_stacksize;
197 mach_vm_size_t th_stacksize;
198 mach_vm_offset_t th_stackaddr;
199 mach_vm_offset_t th_stack;
200 mach_vm_offset_t th_pthread;
201 mach_port_name_t th_thport;
202 thread_t th;
203 user_addr_t user_func = uap->func;
204 user_addr_t user_funcarg = uap->func_arg;
205 user_addr_t user_stack = uap->stack;
206 user_addr_t user_pthread = uap->pthread;
207 unsigned int flags = (unsigned int)uap->flags;
208 vm_map_t vmap = current_map();
209 task_t ctask = current_task();
210 unsigned int policy, importance;
211
212 int isLP64 = 0;
213
214
215 if ((p->p_lflag & P_LREGISTER) == 0)
216 return(EINVAL);
217 #if 0
218 KERNEL_DEBUG_CONSTANT(0x9000080 | DBG_FUNC_START, flags, 0, 0, 0, 0);
219 #endif
220
221 isLP64 = IS_64BIT_PROCESS(p);
222
223
224 #if defined(__i386__) || defined(__x86_64__)
225 stackaddr = 0xB0000000;
226 #else
227 #error Need to define a stack address hint for this architecture
228 #endif
229 kret = thread_create(ctask, &th);
230 if (kret != KERN_SUCCESS)
231 return(ENOMEM);
232 thread_reference(th);
233
234 sright = (void *) convert_thread_to_port(th);
235 th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(ctask));
236
237 if ((flags & PTHREAD_START_CUSTOM) == 0) {
238 th_stacksize = (mach_vm_size_t)user_stack; /* when not custom, the stack arg is the requested stacksize */
239 th_allocsize = th_stacksize + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
240
241 kret = mach_vm_map(vmap, &stackaddr,
242 th_allocsize,
243 page_size-1,
244 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
245 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
246 VM_INHERIT_DEFAULT);
247 if (kret != KERN_SUCCESS)
248 kret = mach_vm_allocate(vmap,
249 &stackaddr, th_allocsize,
250 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
251 if (kret != KERN_SUCCESS) {
252 error = ENOMEM;
253 goto out;
254 }
255 #if 0
256 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0);
257 #endif
258 th_stackaddr = stackaddr;
259 allocated = 1;
260 /*
261 * The guard page is at the lowest address
262 * The stack base is the highest address
263 */
264 kret = mach_vm_protect(vmap, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
265
266 if (kret != KERN_SUCCESS) {
267 error = ENOMEM;
268 goto out1;
269 }
270 th_stack = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
271 th_pthread = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
272 user_stacksize = th_stacksize;
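/*
 * Resulting layout of the allocation (low to high addresses):
 *
 *	stackaddr                  guard page, PTH_DEFAULT_GUARDSIZE, VM_PROT_NONE
 *	+ PTH_DEFAULT_GUARDSIZE    bottom of the usable stack
 *	+ th_stacksize             top of stack == th_stack == th_pthread
 *	+ p->p_pthsize             end of the th_allocsize region (holds the pthread_t)
 */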
273
274 /*
275 * Pre-fault the first page of the new thread's stack and the page that will
276 * contain the pthread_t structure.
277 */
278 vm_fault( vmap,
279 vm_map_trunc_page(th_stack - PAGE_SIZE_64),
280 VM_PROT_READ | VM_PROT_WRITE,
281 FALSE,
282 THREAD_UNINT, NULL, 0);
283
284 vm_fault( vmap,
285 vm_map_trunc_page(th_pthread),
286 VM_PROT_READ | VM_PROT_WRITE,
287 FALSE,
288 THREAD_UNINT, NULL, 0);
289 } else {
290 th_stack = user_stack;
291 user_stacksize = user_stack;
292 th_pthread = user_pthread;
293 #if 0
294 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, 0, 0, 0, 3, 0);
295 #endif
296 }
297
298 #if defined(__i386__) || defined(__x86_64__)
299 {
300 /*
301 * Set up i386 registers & function call.
302 */
303 if (isLP64 == 0) {
304 x86_thread_state32_t state;
305 x86_thread_state32_t *ts = &state;
306
307 ts->eip = (int)p->p_threadstart;
308 ts->eax = (unsigned int)th_pthread;
309 ts->ebx = (unsigned int)th_thport;
310 ts->ecx = (unsigned int)user_func;
311 ts->edx = (unsigned int)user_funcarg;
312 ts->edi = (unsigned int)user_stacksize;
313 ts->esi = (unsigned int)uap->flags;
314 /*
315 * set stack pointer
316 */
317 ts->esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN));
318
319 thread_set_wq_state32(th, (thread_state_t)ts);
320
321 } else {
322 x86_thread_state64_t state64;
323 x86_thread_state64_t *ts64 = &state64;
324
325 ts64->rip = (uint64_t)p->p_threadstart;
326 ts64->rdi = (uint64_t)th_pthread;
327 ts64->rsi = (uint64_t)(th_thport);
328 ts64->rdx = (uint64_t)user_func;
329 ts64->rcx = (uint64_t)user_funcarg;
330 ts64->r8 = (uint64_t)user_stacksize;
331 ts64->r9 = (uint64_t)uap->flags;
332 /*
333 * set stack pointer aligned to 16 byte boundary
334 */
335 ts64->rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN);
336
337 /* Disallow setting non-canonical PC or stack */
338 if (!is_useraddr64_canonical(ts64->rsp) ||
339 !is_useraddr64_canonical(ts64->rip)) {
340 error = EINVAL;
341 goto out;
342 }
343
344 thread_set_wq_state64(th, (thread_state_t)ts64);
345 }
346 }
347 #else
348 #error bsdthread_create not defined for this architecture
349 #endif
350 /* Set scheduling parameters if needed */
351 if ((flags & PTHREAD_START_SETSCHED) != 0) {
352 thread_extended_policy_data_t extinfo;
353 thread_precedence_policy_data_t precedinfo;
354 #if CONFIG_EMBEDDED
355 int ret = 0;
356 #endif /* CONFIG_EMBEDDED */
357
358 importance = (flags & PTHREAD_START_IMPORTANCE_MASK);
359 #if CONFIG_EMBEDDED
360 /* sets the saved importance for an Apple iOS daemon if backgrounded; otherwise returns 0 */
361 ret = proc_setthread_saved_importance(th, importance);
362 if (ret == 0) {
363 #endif /* CONFIG_EMBEDDED */
364 policy = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK;
365
366 if (policy == SCHED_OTHER)
367 extinfo.timeshare = 1;
368 else
369 extinfo.timeshare = 0;
370 thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
371
372 #define BASEPRI_DEFAULT 31
373 precedinfo.importance = (importance - BASEPRI_DEFAULT);
374 thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
375 #if CONFIG_EMBEDDED
376 }
377 #endif /* CONFIG_EMBEDDED */
378 }
379
380 kret = thread_resume(th);
381 if (kret != KERN_SUCCESS) {
382 error = EINVAL;
383 goto out1;
384 }
385 thread_deallocate(th); /* drop the creator reference */
386 #if 0
387 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, th_pthread, 0, 0, 0);
388 #endif
389 *retval = th_pthread;
390
391 return(0);
392
393 out1:
394 if (allocated != 0)
395 (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
396 out:
397 (void)mach_port_deallocate(get_task_ipcspace(ctask), th_thport);
398 (void)thread_terminate(th);
399 (void)thread_deallocate(th);
400 return(error);
401 }
402
403 int
404 bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused int32_t *retval)
405 {
406 mach_vm_offset_t freeaddr;
407 mach_vm_size_t freesize;
408 kern_return_t kret;
409 mach_port_name_t kthport = (mach_port_name_t)uap->port;
410 mach_port_name_t sem = (mach_port_name_t)uap->sem;
411
412 freeaddr = (mach_vm_offset_t)uap->stackaddr;
413 freesize = uap->freesize;
414
415 #if 0
416 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, freeaddr, freesize, kthport, 0xff, 0);
417 #endif
418 if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
419 kret = mach_vm_deallocate(current_map(), freeaddr, freesize);
420 if (kret != KERN_SUCCESS) {
421 return(EINVAL);
422 }
423 }
424
425 (void) thread_terminate(current_thread());
426 if (sem != MACH_PORT_NULL) {
427 kret = semaphore_signal_internal_trap(sem);
428 if (kret != KERN_SUCCESS) {
429 return(EINVAL);
430 }
431 }
432
433 if (kthport != MACH_PORT_NULL)
434 mach_port_deallocate(get_task_ipcspace(current_task()), kthport);
435 thread_exception_return();
436 panic("bsdthread_terminate: still running\n");
437 #if 0
438 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_END, 0, 0, 0, 0xff, 0);
439 #endif
440 return(0);
441 }
442
443
444 int
445 bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused int32_t *retval)
446 {
447 /* prevent multiple registrations */
448 if ((p->p_lflag & P_LREGISTER) != 0)
449 return(EINVAL);
450 /* syscall randomizer test can pass bogus values */
451 if (uap->pthsize > MAX_PTHREAD_SIZE) {
452 return(EINVAL);
453 }
454 p->p_threadstart = uap->threadstart;
455 p->p_wqthread = uap->wqthread;
456 p->p_pthsize = uap->pthsize;
457 p->p_targconc = uap->targetconc_ptr;
458 p->p_dispatchqueue_offset = uap->dispatchqueue_offset;
459 proc_setregister(p);
460
461 return(0);
462 }
463
464 uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
465 uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
466 uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
467 uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
468 uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
469 uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
470 uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
471
472
473 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW | CTLFLAG_LOCKED,
474 &wq_yielded_threshold, 0, "");
475
476 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
477 &wq_yielded_window_usecs, 0, "");
478
479 SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
480 &wq_stalled_window_usecs, 0, "");
481
482 SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
483 &wq_reduce_pool_window_usecs, 0, "");
484
485 SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
486 &wq_max_timer_interval_usecs, 0, "");
487
488 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
489 &wq_max_threads, 0, "");
490
491 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
492 &wq_max_constrained_threads, 0, "");
493
494
495 static uint32_t wq_init_constrained_limit = 1;
496
497
498 void
499 workqueue_init_lock(proc_t p)
500 {
501 lck_spin_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr);
502
503 p->p_wqiniting = FALSE;
504 }
505
506 void
507 workqueue_destroy_lock(proc_t p)
508 {
509 lck_spin_destroy(&p->p_wqlock, pthread_lck_grp);
510 }
511
512
513 static void
514 workqueue_lock_spin(proc_t p)
515 {
516 lck_spin_lock(&p->p_wqlock);
517 }
518
519 static void
520 workqueue_unlock(proc_t p)
521 {
522 lck_spin_unlock(&p->p_wqlock);
523 }
524
525
526 static void
527 workqueue_interval_timer_start(struct workqueue *wq)
528 {
529 uint64_t deadline;
530
531 if (wq->wq_timer_interval == 0)
532 wq->wq_timer_interval = wq_stalled_window_usecs;
533 else {
534 wq->wq_timer_interval = wq->wq_timer_interval * 2;
535
536 if (wq->wq_timer_interval > wq_max_timer_interval_usecs)
537 wq->wq_timer_interval = wq_max_timer_interval_usecs;
538 }
539 clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
540
541 thread_call_enter_delayed(wq->wq_atimer_call, deadline);
542
543 KERNEL_DEBUG(0xefffd110, wq, wq->wq_reqcount, wq->wq_flags, wq->wq_timer_interval, 0);
544 }
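/*
 * Note: successive arming backs off exponentially, i.e. the delay sequence
 * is wq_stalled_window_usecs, then 2x, 4x, ... capped at
 * wq_max_timer_interval_usecs; wq_timer_interval is reset to 0 (and hence
 * back to the stalled window) once WQ_ATIMER_RUNNING is dropped in
 * workqueue_add_timer.
 */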
545
546
547 static boolean_t
548 wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
549 { clock_sec_t secs;
550 clock_usec_t usecs;
551 uint64_t lastblocked_ts;
552 uint64_t elapsed;
553
554 /*
555 * the timestamp is updated atomically w/o holding the workqueue lock
556 * so we need to do an atomic read of the 64 bits so that we don't see
557 * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
558 * independent fashion by using OSCompareAndSwap64 to write back the
559 * value we grabbed... if it succeeds, then we have a good timestamp to
560 * evaluate... if it fails, we straddled grabbing the timestamp while it
561 * was being updated... treat a failed update as a busy thread since
562 * it implies we are about to see a really fresh timestamp anyway
563 */
564 lastblocked_ts = *lastblocked_tsp;
565
566 if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
567 return (TRUE);
568
569 if (lastblocked_ts >= cur_ts) {
570 /*
571 * because the update of the timestamp when a thread blocks isn't
572 * serialized against us looking at it (i.e. we don't hold the workq lock)
573 * it's possible to have a timestamp that matches the current time or
574 * that even looks to be in the future relative to when we grabbed the current
575 * time... just treat this as a busy thread since it must have just blocked.
576 */
577 return (TRUE);
578 }
579 elapsed = cur_ts - lastblocked_ts;
580
581 absolutetime_to_microtime(elapsed, &secs, &usecs);
582
583 if (secs == 0 && usecs < wq_stalled_window_usecs)
584 return (TRUE);
585 return (FALSE);
586 }
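/*
 * The atomic-read idiom used above, in isolation (sketch): a compare-and-swap
 * that writes back the value just read either confirms the 64-bit load was
 * consistent, or fails because the writer raced with us:
 *
 *	uint64_t v = *tsp;
 *	if (OSCompareAndSwap64((UInt64)v, (UInt64)v, tsp)) {
 *		// v is a coherent snapshot of the timestamp
 *	} else {
 *		// straddled an update... treat the thread as busy
 *	}
 */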
587
588
589 #define WQ_TIMER_NEEDED(wq, start_timer) do { \
590 int oldflags = wq->wq_flags; \
591 \
592 if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \
593 if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_RUNNING, (UInt32 *)&wq->wq_flags)) \
594 start_timer = TRUE; \
595 } \
596 } while (0)
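/*
 * Typical usage pattern (as in workqueue_callback and workqueue_run_nextreq
 * below):
 *
 *	boolean_t start_timer = FALSE;
 *
 *	if (wq->wq_reqcount)
 *		WQ_TIMER_NEEDED(wq, start_timer);
 *	if (start_timer == TRUE)
 *		workqueue_interval_timer_start(wq);
 */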
597
598
599
600 static void
601 workqueue_add_timer(struct workqueue *wq, __unused int param1)
602 {
603 proc_t p;
604 boolean_t start_timer = FALSE;
605 boolean_t retval;
606 boolean_t add_thread;
607 uint32_t busycount;
608
609 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
610
611 p = wq->wq_proc;
612
613 workqueue_lock_spin(p);
614
615 /*
616 * because workqueue_callback now runs w/o taking the workqueue lock
617 * we are unsynchronized w/r to a change in state of the running threads...
618 * to make sure we always evaluate that change, we allow it to start up
619 * a new timer if the current one is actively evaluating the state
620 * however, we do not need more than 2 timers fired up (1 active and 1 pending)
621 * and we certainly do not want 2 active timers evaluating the state
622 * simultaneously... so use WQL_ATIMER_BUSY to serialize the timers...
623 * note that WQL_ATIMER_BUSY is in a different flag word from WQ_ATIMER_RUNNING since
624 * it is always protected by the workq lock... WQ_ATIMER_RUNNING is evaluated
625 * and set atomically since the callback function needs to manipulate it
626 * w/o holding the workq lock...
627 *
628 * !WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == no pending timer, no active timer
629 * !WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == no pending timer, 1 active timer
630 * WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == 1 pending timer, no active timer
631 * WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == 1 pending timer, 1 active timer
632 */
633 while (wq->wq_lflags & WQL_ATIMER_BUSY) {
634 wq->wq_lflags |= WQL_ATIMER_WAITING;
635
636 assert_wait((caddr_t)wq, (THREAD_UNINT));
637 workqueue_unlock(p);
638
639 thread_block(THREAD_CONTINUE_NULL);
640
641 workqueue_lock_spin(p);
642 }
643 wq->wq_lflags |= WQL_ATIMER_BUSY;
644
645 /*
646 * the workq lock will protect us from seeing WQ_EXITING change state, but we
647 * still need to update this atomically in case someone else tries to start
648 * the timer just as we're releasing it
649 */
650 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags & ~WQ_ATIMER_RUNNING), (UInt32 *)&wq->wq_flags)));
651
652 again:
653 retval = TRUE;
654 add_thread = FALSE;
655
656 if ( !(wq->wq_flags & WQ_EXITING)) {
657 /*
658 * check to see if the stall frequency was beyond our tolerance
659 * or we have work on the queue, but haven't scheduled any
660 * new work within our acceptable time interval because
661 * there were no idle threads left to schedule
662 */
663 if (wq->wq_reqcount) {
664 uint32_t priority;
665 uint32_t affinity_tag;
666 uint32_t i;
667 uint64_t curtime;
668
669 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
670 if (wq->wq_requests[priority])
671 break;
672 }
673 assert(priority < WORKQUEUE_NUMPRIOS);
674
675 curtime = mach_absolute_time();
676 busycount = 0;
677
678 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
679 /*
680 * if we have no idle threads, we can try to add them if needed
681 */
682 if (wq->wq_thidlecount == 0)
683 add_thread = TRUE;
684
685 /*
686 * look for first affinity group that is currently not active
687 * i.e. no active threads at this priority level or higher
688 * and has not been active recently at this priority level or higher
689 */
690 for (i = 0; i <= priority; i++) {
691 if (wq->wq_thactive_count[i][affinity_tag]) {
692 add_thread = FALSE;
693 break;
694 }
695 if (wq->wq_thscheduled_count[i][affinity_tag]) {
696 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
697 add_thread = FALSE;
698 busycount++;
699 break;
700 }
701 }
702 }
703 if (add_thread == TRUE) {
704 retval = workqueue_addnewthread(wq, FALSE);
705 break;
706 }
707 }
708 if (wq->wq_reqcount) {
709 /*
710 * as long as we have threads to schedule, and we successfully
711 * scheduled new work, keep trying
712 */
713 while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
714 /*
715 * workqueue_run_nextreq is responsible for
716 * dropping the workqueue lock in all cases
717 */
718 retval = workqueue_run_nextreq(p, wq, THREAD_NULL, FALSE, FALSE, 0, 0);
719 workqueue_lock_spin(p);
720
721 if (retval == FALSE)
722 break;
723 }
724 if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_reqcount) {
725
726 if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
727 goto again;
728
729 if (wq->wq_thidlecount == 0 || busycount)
730 WQ_TIMER_NEEDED(wq, start_timer);
731
732 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_NONE, wq, wq->wq_reqcount, wq->wq_thidlecount, busycount, 0);
733 }
734 }
735 }
736 }
737 if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
738 wq->wq_timer_interval = 0;
739
740 wq->wq_lflags &= ~WQL_ATIMER_BUSY;
741
742 if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
743 /*
744 * wakeup the thread hung up in workqueue_exit or workqueue_add_timer waiting for this timer
745 * to finish getting out of the way
746 */
747 wq->wq_lflags &= ~WQL_ATIMER_WAITING;
748 wakeup(wq);
749 }
750 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
751
752 workqueue_unlock(p);
753
754 if (start_timer == TRUE)
755 workqueue_interval_timer_start(wq);
756 }
757
758
759 void
760 workqueue_thread_yielded(void)
761 {
762 struct workqueue *wq;
763 proc_t p;
764
765 p = current_proc();
766
767 if ((wq = p->p_wqptr) == NULL || wq->wq_reqcount == 0)
768 return;
769
770 workqueue_lock_spin(p);
771
772 if (wq->wq_reqcount) {
773 uint64_t curtime;
774 uint64_t elapsed;
775 clock_sec_t secs;
776 clock_usec_t usecs;
777
778 if (wq->wq_thread_yielded_count++ == 0)
779 wq->wq_thread_yielded_timestamp = mach_absolute_time();
780
781 if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
782 workqueue_unlock(p);
783 return;
784 }
785 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 0, 0);
786
787 wq->wq_thread_yielded_count = 0;
788
789 curtime = mach_absolute_time();
790 elapsed = curtime - wq->wq_thread_yielded_timestamp;
791 absolutetime_to_microtime(elapsed, &secs, &usecs);
792
793 if (secs == 0 && usecs < wq_yielded_window_usecs) {
794
795 if (wq->wq_thidlecount == 0) {
796 workqueue_addnewthread(wq, TRUE);
797 /*
798 * 'workqueue_addnewthread' drops the workqueue lock
799 * when creating the new thread and then retakes it before
800 * returning... this window allows other threads to process
801 * requests, so we need to recheck for available work
802 * if none found, we just return... the newly created thread
803 * will eventually get used (if it hasn't already)...
804 */
805 if (wq->wq_reqcount == 0) {
806 workqueue_unlock(p);
807 return;
808 }
809 }
810 if (wq->wq_thidlecount) {
811 uint32_t priority;
812 uint32_t affinity = -1;
813 boolean_t overcommit = FALSE;
814 boolean_t force_oc = FALSE;
815 struct uthread *uth;
816 struct threadlist *tl;
817
818 uth = get_bsdthread_info(current_thread());
819 if ((tl = uth->uu_threadlist))
820 affinity = tl->th_affinity_tag;
821
822 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
823 if (wq->wq_requests[priority])
824 break;
825 }
826 assert(priority < WORKQUEUE_NUMPRIOS);
827
828 wq->wq_reqcount--;
829 wq->wq_requests[priority]--;
830
831 if (wq->wq_ocrequests[priority]) {
832 wq->wq_ocrequests[priority]--;
833 overcommit = TRUE;
834 } else
835 force_oc = TRUE;
836
837 (void)workqueue_run_nextreq(p, wq, THREAD_NULL, force_oc, overcommit, priority, affinity);
838 /*
839 * workqueue_run_nextreq is responsible for
840 * dropping the workqueue lock in all cases
841 */
842 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 1, 0);
843
844 return;
845 }
846 }
847 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_reqcount, 2, 0);
848 }
849 workqueue_unlock(p);
850 }
851
852
853
854 static void
855 workqueue_callback(int type, thread_t thread)
856 {
857 struct uthread *uth;
858 struct threadlist *tl;
859 struct workqueue *wq;
860
861 uth = get_bsdthread_info(thread);
862 tl = uth->uu_threadlist;
863 wq = tl->th_workq;
864
865 switch (type) {
866
867 case SCHED_CALL_BLOCK:
868 {
869 uint32_t old_activecount;
870
871 old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
872
873 if (old_activecount == 1) {
874 boolean_t start_timer = FALSE;
875 uint64_t curtime;
876 UInt64 *lastblocked_ptr;
877
878 /*
879 * we were the last active thread on this affinity set
880 * and we've got work to do
881 */
882 lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority][tl->th_affinity_tag];
883 curtime = mach_absolute_time();
884
885 /*
886 * if we collide with another thread trying to update the last_blocked (really unlikely
887 * since another thread would have to get scheduled and then block after we start down
888 * this path), it's not a problem. Either timestamp is adequate, so no need to retry
889 */
890
891 OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
892
893 if (wq->wq_reqcount)
894 WQ_TIMER_NEEDED(wq, start_timer);
895
896 if (start_timer == TRUE)
897 workqueue_interval_timer_start(wq);
898 }
899 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_START, wq, old_activecount, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
900 }
901 break;
902
903 case SCHED_CALL_UNBLOCK:
904 /*
905 * we cannot take the workqueue_lock here...
906 * an UNBLOCK can occur from a timer event which
907 * is run from an interrupt context... if the workqueue_lock
908 * is already held by this processor, we'll deadlock...
909 * the thread lock for the thread being UNBLOCKED
910 * is also held
911 */
912 OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
913
914 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
915
916 break;
917 }
918 }
919
920
921 static void
922 workqueue_removethread(struct threadlist *tl, int fromexit)
923 {
924 struct workqueue *wq;
925 struct uthread * uth;
926
927 /*
928 * If fromexit is set, the call is from workqueue_exit(),
929 * so some cleanups are to be avoided.
930 */
931 wq = tl->th_workq;
932
933 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
934
935 if (fromexit == 0) {
936 wq->wq_nthreads--;
937 wq->wq_thidlecount--;
938 }
939
940 /*
941 * Clear the threadlist pointer in the uthread so a
942 * thread woken up for termination will not
943 * access the thread list, which is about to be
944 * freed.
945 */
946 thread_sched_call(tl->th_thread, NULL);
947
948 uth = get_bsdthread_info(tl->th_thread);
949 if (uth != (struct uthread *)0) {
950 uth->uu_threadlist = NULL;
951 }
952 if (fromexit == 0) {
953 /* during exit the lock is not held */
954 workqueue_unlock(wq->wq_proc);
955 }
956
957 if ( (tl->th_flags & TH_LIST_SUSPENDED) ) {
958 /*
959 * thread was created, but never used...
960 * need to clean up the stack and port ourselves
961 * since we're not going to spin up through the
962 * normal exit path triggered from Libc
963 */
964 if (fromexit == 0) {
965 /* vm map is already deallocated when this is called from exit */
966 (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
967 }
968 (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), tl->th_thport);
969
970 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
971 } else {
972
973 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
974 }
975 /*
976 * drop our ref on the thread
977 */
978 thread_deallocate(tl->th_thread);
979
980 kfree(tl, sizeof(struct threadlist));
981 }
982
983
984 /*
985 * called with workq lock held
986 * dropped and retaken around thread creation
987 * return with workq lock held
988 */
989 static boolean_t
990 workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread)
991 {
992 struct threadlist *tl;
993 struct uthread *uth;
994 kern_return_t kret;
995 thread_t th;
996 proc_t p;
997 void *sright;
998 mach_vm_offset_t stackaddr;
999
1000 if ((wq->wq_flags & WQ_EXITING) == WQ_EXITING)
1001 return (FALSE);
1002
1003 if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (CONFIG_THREAD_MAX - 20)) {
1004 wq->wq_lflags |= WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1005 return (FALSE);
1006 }
1007 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1008
1009 if (oc_thread == FALSE && wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
1010 /*
1011 * if we're not creating this thread to service an overcommit request,
1012 * then check the size of the constrained thread pool... if we've already
1013 * reached our max for threads scheduled from this pool, don't create a new
1014 * one... the callers of this function are prepared for failure.
1015 */
1016 wq->wq_lflags |= WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1017 return (FALSE);
1018 }
1019 if (wq->wq_constrained_threads_scheduled < wq_max_constrained_threads)
1020 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1021
1022 wq->wq_nthreads++;
1023
1024 p = wq->wq_proc;
1025 workqueue_unlock(p);
1026
1027 kret = thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
1028
1029 if (kret != KERN_SUCCESS)
1030 goto failed;
1031
1032 tl = kalloc(sizeof(struct threadlist));
1033 bzero(tl, sizeof(struct threadlist));
1034
1035 #if defined(__i386__) || defined(__x86_64__)
1036 stackaddr = 0xB0000000;
1037 #else
1038 #error Need to define a stack address hint for this architecture
1039 #endif
1040 tl->th_allocsize = PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
1041
1042 kret = mach_vm_map(wq->wq_map, &stackaddr,
1043 tl->th_allocsize,
1044 page_size-1,
1045 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
1046 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
1047 VM_INHERIT_DEFAULT);
1048
1049 if (kret != KERN_SUCCESS) {
1050 kret = mach_vm_allocate(wq->wq_map,
1051 &stackaddr, tl->th_allocsize,
1052 VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
1053 }
1054 if (kret == KERN_SUCCESS) {
1055 /*
1056 * The guard page is at the lowest address
1057 * The stack base is the highest address
1058 */
1059 kret = mach_vm_protect(wq->wq_map, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
1060
1061 if (kret != KERN_SUCCESS)
1062 (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
1063 }
1064 if (kret != KERN_SUCCESS) {
1065 (void) thread_terminate(th);
1066 thread_deallocate(th);
1067
1068 kfree(tl, sizeof(struct threadlist));
1069 goto failed;
1070 }
1071 thread_reference(th);
1072
1073 sright = (void *) convert_thread_to_port(th);
1074 tl->th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task));
1075
1076 thread_static_param(th, TRUE);
1077
1078 tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED;
1079
1080 tl->th_thread = th;
1081 tl->th_workq = wq;
1082 tl->th_stackaddr = stackaddr;
1083 tl->th_affinity_tag = -1;
1084 tl->th_priority = WORKQUEUE_NUMPRIOS;
1085 tl->th_policy = -1;
1086
1087 uth = get_bsdthread_info(tl->th_thread);
1088
1089 workqueue_lock_spin(p);
1090
1091 uth->uu_threadlist = (void *)tl;
1092 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
1093
1094 wq->wq_thidlecount++;
1095
1096 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, wq, wq->wq_nthreads, 0, thread_tid(current_thread()), thread_tid(tl->th_thread));
1097
1098 return (TRUE);
1099
1100 failed:
1101 workqueue_lock_spin(p);
1102 wq->wq_nthreads--;
1103
1104 return (FALSE);
1105 }
1106
1107
1108 int
1109 workq_open(struct proc *p, __unused struct workq_open_args *uap, __unused int32_t *retval)
1110 {
1111 struct workqueue * wq;
1112 int wq_size;
1113 char * ptr;
1114 char * nptr;
1115 uint32_t i;
1116 uint32_t num_cpus;
1117 int error = 0;
1118 boolean_t need_wakeup = FALSE;
1119
1120
1121 if ((p->p_lflag & P_LREGISTER) == 0)
1122 return(EINVAL);
1123
1124 num_cpus = ml_get_max_cpus();
1125
1126 if (wq_init_constrained_limit) {
1127 uint32_t limit;
1128 /*
1129 * set up the limit for the constrained pool
1130 * this is a virtual pool in that we don't
1131 * maintain it on a separate idle and run list
1132 */
1133 limit = num_cpus * (WORKQUEUE_NUMPRIOS + 1);
1134
1135 if (limit > wq_max_constrained_threads)
1136 wq_max_constrained_threads = limit;
1137
1138 wq_init_constrained_limit = 0;
1139 }
1140 workqueue_lock_spin(p);
1141
1142 if (p->p_wqptr == NULL) {
1143
1144 while (p->p_wqiniting == TRUE) {
1145
1146 assert_wait((caddr_t)&p->p_wqiniting, THREAD_UNINT);
1147 workqueue_unlock(p);
1148
1149 thread_block(THREAD_CONTINUE_NULL);
1150
1151 workqueue_lock_spin(p);
1152 }
1153 if (p->p_wqptr != NULL)
1154 goto out;
1155
1156 p->p_wqiniting = TRUE;
1157
1158 workqueue_unlock(p);
1159
1160 wq_size = sizeof(struct workqueue) +
1161 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint16_t)) +
1162 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
1163 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint64_t)) +
1164 sizeof(uint32_t) + sizeof(uint64_t);
1165
1166 ptr = (char *)kalloc(wq_size);
1167 bzero(ptr, wq_size);
1168
1169 wq = (struct workqueue *)ptr;
1170 wq->wq_flags = WQ_LIST_INITED;
1171 wq->wq_proc = p;
1172 wq->wq_affinity_max = num_cpus;
1173 wq->wq_task = current_task();
1174 wq->wq_map = current_map();
1175
1176 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++)
1177 wq->wq_reqconc[i] = wq->wq_affinity_max;
1178
1179 nptr = ptr + sizeof(struct workqueue);
1180
1181 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1182 wq->wq_thscheduled_count[i] = (uint16_t *)nptr;
1183 nptr += (num_cpus * sizeof(uint16_t));
1184 }
1185 nptr += (sizeof(uint32_t) - 1);
1186 nptr = (char *)((uintptr_t)nptr & ~(sizeof(uint32_t) - 1));
1187
1188 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1189 wq->wq_thactive_count[i] = (uint32_t *)nptr;
1190 nptr += (num_cpus * sizeof(uint32_t));
1191 }
1192 /*
1193 * align nptr on a 64 bit boundary so that we can do nice
1194 * atomic64 operations on the timestamps...
1195 * note that we requested an extra uint64_t when calculating
1196 * the size for the allocation of the workqueue struct
1197 */
1198 nptr += (sizeof(uint64_t) - 1);
1199 nptr = (char *)((uintptr_t)nptr & ~(sizeof(uint64_t) - 1));
1200
1201 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1202 wq->wq_lastblocked_ts[i] = (uint64_t *)nptr;
1203 nptr += (num_cpus * sizeof(uint64_t));
1204 }
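/*
 * Resulting carve-up of the single kalloc'd block (sketch):
 *
 *	struct workqueue
 *	wq_thscheduled_count[prio]   WORKQUEUE_NUMPRIOS arrays of num_cpus uint16_t
 *	(padding to a 4 byte boundary)
 *	wq_thactive_count[prio]      WORKQUEUE_NUMPRIOS arrays of num_cpus uint32_t
 *	(padding to an 8 byte boundary)
 *	wq_lastblocked_ts[prio]      WORKQUEUE_NUMPRIOS arrays of num_cpus uint64_t
 */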
1205 TAILQ_INIT(&wq->wq_thrunlist);
1206 TAILQ_INIT(&wq->wq_thidlelist);
1207
1208 wq->wq_atimer_call = thread_call_allocate((thread_call_func_t)workqueue_add_timer, (thread_call_param_t)wq);
1209
1210 workqueue_lock_spin(p);
1211
1212 p->p_wqptr = (void *)wq;
1213 p->p_wqsize = wq_size;
1214
1215 p->p_wqiniting = FALSE;
1216 need_wakeup = TRUE;
1217 }
1218 out:
1219 workqueue_unlock(p);
1220
1221 if (need_wakeup == TRUE)
1222 wakeup(&p->p_wqiniting);
1223 return(error);
1224 }
1225
1226
1227 int
1228 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, __unused int32_t *retval)
1229 {
1230 struct workqueue *wq;
1231 int error = 0;
1232
1233 if ((p->p_lflag & P_LREGISTER) == 0)
1234 return(EINVAL);
1235
1236 switch (uap->options) {
1237
1238 case WQOPS_QUEUE_NEWSPISUPP:
1239 break;
1240
1241 case WQOPS_QUEUE_REQTHREADS: {
1242 /*
1243 * for this operation, we re-purpose the affinity
1244 * argument as the number of threads to start
1245 */
1246 boolean_t overcommit = FALSE;
1247 int priority = uap->prio;
1248 int reqcount = uap->affinity;
1249
1250 if (priority & WORKQUEUE_OVERCOMMIT) {
1251 priority &= ~WORKQUEUE_OVERCOMMIT;
1252 overcommit = TRUE;
1253 }
1254 if ((reqcount <= 0) || (priority < 0) || (priority >= WORKQUEUE_NUMPRIOS)) {
1255 error = EINVAL;
1256 break;
1257 }
1258 workqueue_lock_spin(p);
1259
1260 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1261 workqueue_unlock(p);
1262
1263 error = EINVAL;
1264 break;
1265 }
1266 if (overcommit == FALSE) {
1267 wq->wq_reqcount += reqcount;
1268 wq->wq_requests[priority] += reqcount;
1269
1270 KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, wq, priority, wq->wq_requests[priority], reqcount, 0);
1271
1272 while (wq->wq_reqcount) {
1273 if (workqueue_run_one(p, wq, overcommit, priority) == FALSE)
1274 break;
1275 }
1276 } else {
1277 KERNEL_DEBUG(0xefffd13c | DBG_FUNC_NONE, wq, priority, wq->wq_requests[priority], reqcount, 0);
1278
1279 while (reqcount) {
1280 if (workqueue_run_one(p, wq, overcommit, priority) == FALSE)
1281 break;
1282 reqcount--;
1283 }
1284 if (reqcount) {
1285 /*
1286 * we need to delay starting some of the overcommit requests...
1287 * we should only fail to create the overcommit threads if
1288 * we're at the max thread limit... as existing threads
1289 * return to the kernel, we'll notice the ocrequests
1290 * and spin them back to user space as the overcommit variety
1291 */
1292 wq->wq_reqcount += reqcount;
1293 wq->wq_requests[priority] += reqcount;
1294 wq->wq_ocrequests[priority] += reqcount;
1295
1296 KERNEL_DEBUG(0xefffd140 | DBG_FUNC_NONE, wq, priority, wq->wq_requests[priority], reqcount, 0);
1297 }
1298 }
1299 workqueue_unlock(p);
1300
1301 }
1302 break;
1303
1304 case WQOPS_THREAD_RETURN: {
1305 thread_t th = current_thread();
1306 struct uthread *uth = get_bsdthread_info(th);
1307
1308 /* reset signal mask on the workqueue thread to default state */
1309 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
1310 proc_lock(p);
1311 uth->uu_sigmask = ~workq_threadmask;
1312 proc_unlock(p);
1313 }
1314 workqueue_lock_spin(p);
1315
1316 if ((wq = (struct workqueue *)p->p_wqptr) == NULL || (uth->uu_threadlist == NULL)) {
1317 workqueue_unlock(p);
1318
1319 error = EINVAL;
1320 break;
1321 }
1322 KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, wq, 0, 0, 0, 0);
1323
1324 (void)workqueue_run_nextreq(p, wq, th, FALSE, FALSE, 0, -1);
1325 /*
1326 * workqueue_run_nextreq is responsible for
1327 * dropping the workqueue lock in all cases
1328 */
1329 }
1330 break;
1331
1332 default:
1333 error = EINVAL;
1334 break;
1335 }
1336 return (error);
1337 }
1338
1339 /*
1340 * Routine: workqueue_mark_exiting
1341 *
1342 * Function: Mark the work queue such that new threads will not be added to the
1343 * work queue after we return.
1344 *
1345 * Conditions: Called against the current process.
1346 */
1347 void
1348 workqueue_mark_exiting(struct proc *p)
1349 {
1350 struct workqueue * wq;
1351
1352 wq = p->p_wqptr;
1353 if (wq != NULL) {
1354
1355 KERNEL_DEBUG(0x9008088 | DBG_FUNC_START, p->p_wqptr, 0, 0, 0, 0);
1356
1357 workqueue_lock_spin(p);
1358
1359 /*
1360 * we now arm the timer in the callback function w/o holding the workq lock...
1361 * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
1362 * ensure only a single timer is running and to notice that WQ_EXITING has
1363 * been set (we don't want to start a timer once WQ_EXITING is posted)
1364 *
1365 * so once we have successfully set WQ_EXITING, we cannot fire up a new timer...
1366 * therefore there is no need to clear the timer state atomically from the flags
1367 *
1368 * since we always hold the workq lock when dropping WQ_ATIMER_RUNNING
1369 * the check for and sleep until clear is protected
1370 */
1371 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags | WQ_EXITING), (UInt32 *)&wq->wq_flags)));
1372
1373 if (wq->wq_flags & WQ_ATIMER_RUNNING) {
1374 if (thread_call_cancel(wq->wq_atimer_call) == TRUE)
1375 wq->wq_flags &= ~WQ_ATIMER_RUNNING;
1376 }
1377 while ((wq->wq_flags & WQ_ATIMER_RUNNING) || (wq->wq_lflags & WQL_ATIMER_BUSY)) {
1378
1379 assert_wait((caddr_t)wq, (THREAD_UNINT));
1380 workqueue_unlock(p);
1381
1382 thread_block(THREAD_CONTINUE_NULL);
1383
1384 workqueue_lock_spin(p);
1385 }
1386 workqueue_unlock(p);
1387
1388 KERNEL_DEBUG(0x9008088 | DBG_FUNC_END, 0, 0, 0, 0, 0);
1389 }
1390 }
1391
1392 /*
1393 * Routine: workqueue_exit
1394 *
1395 * Function: clean up the work queue structure(s) now that there are no threads
1396 * left running inside the work queue (except possibly current_thread).
1397 *
1398 * Conditions: Called by the last thread in the process.
1399 * Called against current process.
1400 */
1401 void
1402 workqueue_exit(struct proc *p)
1403 {
1404 struct workqueue * wq;
1405 struct threadlist * tl, *tlist;
1406 struct uthread *uth;
1407 int wq_size = 0;
1408
1409 wq = (struct workqueue *)p->p_wqptr;
1410 if (wq != NULL) {
1411
1412 KERNEL_DEBUG(0x900808c | DBG_FUNC_START, p->p_wqptr, 0, 0, 0, 0);
1413
1414 wq_size = p->p_wqsize;
1415 p->p_wqptr = NULL;
1416 p->p_wqsize = 0;
1417
1418 /*
1419 * Clean up workqueue data structures for threads that exited and
1420 * didn't get a chance to clean up after themselves.
1421 */
1422 TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) {
1423
1424 thread_sched_call(tl->th_thread, NULL);
1425
1426 uth = get_bsdthread_info(tl->th_thread);
1427 if (uth != (struct uthread *)0) {
1428 uth->uu_threadlist = NULL;
1429 }
1430 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1431
1432 /*
1433 * drop our last ref on the thread
1434 */
1435 thread_deallocate(tl->th_thread);
1436
1437 kfree(tl, sizeof(struct threadlist));
1438 }
1439 TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
1440 workqueue_removethread(tl, 1);
1441 }
1442 thread_call_free(wq->wq_atimer_call);
1443
1444 kfree(wq, wq_size);
1445
1446 KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, 0, 0);
1447 }
1448 }
1449
1450
1451 static int workqueue_importance[WORKQUEUE_NUMPRIOS] =
1452 {
1453 2, 0, -2, INT_MIN,
1454 };
1455
1456 #define WORKQ_POLICY_TIMESHARE 1
1457
1458 static int workqueue_policy[WORKQUEUE_NUMPRIOS] =
1459 {
1460 WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE
1461 };
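/*
 * These tables are indexed by the workqueue priority band: 0 is the highest
 * band and importance decreases with the index (matching the "first
 * non-empty wq_requests[priority]" scan order used throughout), with the
 * last entry's INT_MIN importance belonging to the background queue. They
 * feed THREAD_PRECEDENCE_POLICY / THREAD_EXTENDED_POLICY in
 * workqueue_run_nextreq below.
 */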
1462
1463
1464
1465 static boolean_t
1466 workqueue_run_one(proc_t p, struct workqueue *wq, boolean_t overcommit, int priority)
1467 {
1468 boolean_t ran_one;
1469
1470 if (wq->wq_thidlecount == 0) {
1471 if (overcommit == FALSE) {
1472 if (wq->wq_constrained_threads_scheduled < wq->wq_affinity_max)
1473 workqueue_addnewthread(wq, overcommit);
1474 } else {
1475 workqueue_addnewthread(wq, overcommit);
1476
1477 if (wq->wq_thidlecount == 0)
1478 return (FALSE);
1479 }
1480 }
1481 ran_one = workqueue_run_nextreq(p, wq, THREAD_NULL, FALSE, overcommit, priority, -1);
1482 /*
1483 * workqueue_run_nextreq is responsible for
1484 * dropping the workqueue lock in all cases
1485 */
1486 workqueue_lock_spin(p);
1487
1488 return (ran_one);
1489 }
1490
1491
1492
1493 /*
1494 * workqueue_run_nextreq:
1495 * called with the workqueue lock held...
1496 * responsible for dropping it in all cases
1497 */
1498 static boolean_t
1499 workqueue_run_nextreq(proc_t p, struct workqueue *wq, thread_t thread,
1500 boolean_t force_oc, boolean_t overcommit, int oc_prio, int oc_affinity)
1501 {
1502 thread_t th_to_run = THREAD_NULL;
1503 thread_t th_to_park = THREAD_NULL;
1504 int wake_thread = 0;
1505 int reuse_thread = WQ_FLAG_THREAD_REUSE;
1506 uint32_t priority, orig_priority;
1507 uint32_t affinity_tag, orig_affinity_tag;
1508 uint32_t i, n;
1509 uint32_t busycount;
1510 uint32_t us_to_wait;
1511 struct threadlist *tl = NULL;
1512 struct threadlist *ttl = NULL;
1513 struct uthread *uth = NULL;
1514 boolean_t start_timer = FALSE;
1515 boolean_t adjust_counters = TRUE;
1516 uint64_t curtime;
1517
1518
1519 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_reqcount, 0);
1520
1521 if (thread != THREAD_NULL) {
1522 uth = get_bsdthread_info(thread);
1523
1524 if ( (tl = uth->uu_threadlist) == NULL)
1525 panic("wq thread with no threadlist ");
1526 }
1527 /*
1528 * from here until we drop the workq lock
1529 * we can't be pre-empted since we hold
1530 * the lock in spin mode... this is important
1531 * since we have to independently update the priority
1532 * and affinity that the thread is associated with
1533 * and these values are used to index the multi-dimensional
1534 * counter arrays in 'workqueue_callback'
1535 */
1536 dispatch_overcommit:
1537
1538 if (overcommit == TRUE || force_oc == TRUE) {
1539 uint32_t min_scheduled = 0;
1540 uint32_t scheduled_count;
1541 uint32_t active_count;
1542 uint32_t t_affinity = 0;
1543
1544 priority = oc_prio;
1545
1546 if ((affinity_tag = oc_affinity) == (uint32_t)-1) {
1547 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1548 /*
1549 * look for the affinity group with the least number of threads
1550 */
1551 scheduled_count = 0;
1552 active_count = 0;
1553
1554 for (i = 0; i <= priority; i++) {
1555 scheduled_count += wq->wq_thscheduled_count[i][affinity_tag];
1556 active_count += wq->wq_thactive_count[i][affinity_tag];
1557 }
1558 if (active_count == 0) {
1559 t_affinity = affinity_tag;
1560 break;
1561 }
1562 if (affinity_tag == 0 || scheduled_count < min_scheduled) {
1563 min_scheduled = scheduled_count;
1564 t_affinity = affinity_tag;
1565 }
1566 }
1567 affinity_tag = t_affinity;
1568 }
1569 if (thread != THREAD_NULL) {
1570 th_to_run = thread;
1571 goto pick_up_work;
1572 }
1573 goto grab_idle_thread;
1574 }
1575 if (wq->wq_reqcount) {
1576 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
1577 if (wq->wq_requests[priority])
1578 break;
1579 }
1580 assert(priority < WORKQUEUE_NUMPRIOS);
1581
1582 if (wq->wq_ocrequests[priority] && (thread != THREAD_NULL || wq->wq_thidlecount)) {
1583 /*
1584 * handle delayed overcommit request...
1585 * they have priority over normal requests
1586 * within a given priority level
1587 */
1588 wq->wq_reqcount--;
1589 wq->wq_requests[priority]--;
1590 wq->wq_ocrequests[priority]--;
1591
1592 oc_prio = priority;
1593 overcommit = TRUE;
1594
1595 goto dispatch_overcommit;
1596 }
1597 }
1598 /*
1599 * if we get here, the work should be handled by a constrained thread
1600 */
1601 if (wq->wq_reqcount == 0 || wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
1602 /*
1603 * no work to do, or we're already at or over the scheduling limit for
1604 * constrained threads... just return or park the thread...
1605 * do not start the timer for this condition... if we don't have any work,
1606 * we'll check again when new work arrives... if we're over the limit, we need 1 or more
1607 * constrained threads to return to the kernel before we can dispatch additional work
1608 */
1609 if ((th_to_park = thread) == THREAD_NULL)
1610 goto out_of_work;
1611 goto parkit;
1612 }
1613
1614 curtime = mach_absolute_time();
1615
1616 if (thread != THREAD_NULL) {
1617
1618 affinity_tag = tl->th_affinity_tag;
1619
1620 /*
1621 * check to see if the affinity group this thread is
1622 * associated with is still within the bounds of the
1623 * specified concurrency for the priority level
1624 * we're considering running work for
1625 */
1626 if (affinity_tag < wq->wq_reqconc[priority]) {
1627 uint32_t bcount = 0;
1628 uint32_t acount = 0;
1629 uint32_t tcount = 0;
1630
1631 /*
1632 * we're a worker thread from the pool... currently we
1633 * are considered 'active' which means we're counted
1634 * in "wq_thactive_count"
1635 * add up the active counts of all the priority levels
1636 * up to and including the one we want to schedule
1637 */
1638 for (i = 0; i <= priority; i++) {
1639
1640 tcount = wq->wq_thactive_count[i][affinity_tag];
1641 acount += tcount;
1642
1643 if (tcount == 0 && wq->wq_thscheduled_count[i][affinity_tag]) {
1644 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag]))
1645 bcount++;
1646 }
1647 }
1648 if ((acount + bcount) == 1) {
1649 /*
1650 * we're the only active thread associated with our
1651 * affinity group at this priority level and higher,
1652 * and there are no threads considered 'busy',
1653 * so pick up some work and keep going
1654 */
1655 th_to_run = thread;
1656 goto pick_up_work;
1657 }
1658 if (wq->wq_reqconc[priority] == 1) {
1659 /*
1660 * we have at least one other active or busy thread running at this
1661 * priority level or higher and since we only have
1662 * 1 affinity group to schedule against, no need
1663 * to try and find another... we can't start up another thread to
1664 * service the request and we already have the info
1665 * needed to determine if we need to start a timer or not
1666 */
1667 if (acount == 1) {
1668 /*
1669 * we're the only active thread, but we must have found
1670 * at least 1 busy thread, so indicate that we need
1671 * to start a timer
1672 */
1673 busycount = 1;
1674 } else
1675 busycount = 0;
1676
1677 affinity_tag = 1;
1678 goto cant_schedule;
1679 }
1680 }
1681 /*
1682 * there's more than 1 thread running in this affinity group
1683 * or the concurrency level has been cut back for this priority...
1684 * let's continue on and look for an 'empty' group to run this
1685 * work request in
1686 */
1687 }
1688 busycount = 0;
1689
1690 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1691 boolean_t can_schedule;
1692
1693 /*
1694 * look for first affinity group that is currently not active
1695 * i.e. no active threads at this priority level or higher
1696 * and no threads that have run recently
1697 */
1698 for (i = 0; i <= priority; i++) {
1699 can_schedule = FALSE;
1700
1701 if (wq->wq_thactive_count[i][affinity_tag])
1702 break;
1703
1704 if (wq->wq_thscheduled_count[i][affinity_tag] &&
1705 wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
1706 busycount++;
1707 break;
1708 }
1709 can_schedule = TRUE;
1710 }
1711 if (can_schedule == TRUE)
1712 break;
1713 }
1714 cant_schedule:
1715
1716 if (affinity_tag >= wq->wq_reqconc[priority]) {
1717 /*
1718 * we've already got at least 1 thread per
1719 * affinity group in the active state...
1720 */
1721 if (busycount) {
1722 /*
1723 * we found at least 1 thread in the
1724 * 'busy' state... make sure we start
1725 * the timer because if they are the only
1726 * threads keeping us from scheduling
1727 * this work request, we won't get a callback
1728 * to kick off the timer... we need to
1729 * start it now...
1730 */
1731 WQ_TIMER_NEEDED(wq, start_timer);
1732 }
1733 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_NONE, wq, busycount, start_timer, 0, 0);
1734
1735 if (thread != THREAD_NULL) {
1736 /*
1737 * go park this one for later
1738 */
1739 th_to_park = thread;
1740 goto parkit;
1741 }
1742 goto out_of_work;
1743 }
1744 if (thread != THREAD_NULL) {
1745 /*
1746 * we're overbooked on the affinity group this thread is
1747 * currently associated with, but we have work to do
1748 * and at least 1 idle processor, so we'll just retarget
1749 * this thread to a new affinity group
1750 */
1751 th_to_run = thread;
1752 goto pick_up_work;
1753 }
1754
1755 grab_idle_thread:
1756 if (wq->wq_thidlecount == 0) {
1757 /*
1758 * we don't have a thread to schedule, but we have
1759 * work to do and at least 1 affinity group that
1760 * doesn't currently have an active thread...
1761 */
1762 WQ_TIMER_NEEDED(wq, start_timer);
1763
1764 KERNEL_DEBUG(0xefffd118, wq, wq->wq_nthreads, start_timer, 0, 0);
1765
1766 goto no_thread_to_run;
1767 }
1768 /*
1769 * we've got a candidate (affinity group with no currently
1770 * active threads) to start a new thread on...
1771 * we already know there is both work available
1772 * and an idle thread, so activate a thread and then
1773 * fall into the code that pulls a new work request...
1774 */
1775 TAILQ_FOREACH(ttl, &wq->wq_thidlelist, th_entry) {
1776 if (ttl->th_affinity_tag == affinity_tag || ttl->th_affinity_tag == (uint16_t)-1) {
1777
1778 TAILQ_REMOVE(&wq->wq_thidlelist, ttl, th_entry);
1779 tl = ttl;
1780
1781 break;
1782 }
1783 }
1784 if (tl == NULL) {
1785 tl = TAILQ_FIRST(&wq->wq_thidlelist);
1786 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1787 }
1788 wq->wq_thidlecount--;
1789
1790 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
1791
1792 if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
1793 tl->th_flags &= ~TH_LIST_SUSPENDED;
1794 reuse_thread = 0;
1795
1796 } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
1797 tl->th_flags &= ~TH_LIST_BLOCKED;
1798 wake_thread = 1;
1799 }
1800 tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
1801
1802 wq->wq_threads_scheduled++;
1803 wq->wq_thscheduled_count[priority][affinity_tag]++;
1804 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1805
1806 adjust_counters = FALSE;
1807 th_to_run = tl->th_thread;
1808
1809 pick_up_work:
1810 if (overcommit == FALSE && force_oc == FALSE) {
1811 wq->wq_reqcount--;
1812 wq->wq_requests[priority]--;
1813
1814 if ( !(tl->th_flags & TH_LIST_CONSTRAINED)) {
1815 wq->wq_constrained_threads_scheduled++;
1816 tl->th_flags |= TH_LIST_CONSTRAINED;
1817 }
1818 } else {
1819 if (tl->th_flags & TH_LIST_CONSTRAINED) {
1820 wq->wq_constrained_threads_scheduled--;
1821 tl->th_flags &= ~TH_LIST_CONSTRAINED;
1822 }
1823 }
1824 orig_priority = tl->th_priority;
1825 orig_affinity_tag = tl->th_affinity_tag;
1826
1827 tl->th_priority = priority;
1828 tl->th_affinity_tag = affinity_tag;
1829
1830 if (adjust_counters == TRUE && (orig_priority != priority || orig_affinity_tag != affinity_tag)) {
1831 /*
1832 * we need to adjust these counters based on this
1833 * thread's new disposition with respect to affinity and priority
1834 */
1835 OSAddAtomic(-1, &wq->wq_thactive_count[orig_priority][orig_affinity_tag]);
1836 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1837
1838 wq->wq_thscheduled_count[orig_priority][orig_affinity_tag]--;
1839 wq->wq_thscheduled_count[priority][affinity_tag]++;
1840 }
1841 wq->wq_thread_yielded_count = 0;
1842
1843 workqueue_unlock(p);
1844
1845 if (orig_affinity_tag != affinity_tag) {
1846 /*
1847 * this thread's affinity does not match the affinity group
1848 * it's being placed on (it's either a brand new thread or
1849 * we're retargeting an existing thread to a new group)...
1850 * an affinity tag of 0 means no affinity...
1851 * but we want our tags to be 0-based because they
1852 * are used to index arrays, so...
1853 * keep them 0-based internally and bump by 1 when
1854 * calling out to set the affinity
1855 */
1856 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_START, wq, orig_affinity_tag, 0, 0, 0);
1857
1858 (void)thread_affinity_set(th_to_run, affinity_tag + 1);
1859
1860 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_END, wq, affinity_tag, 0, 0, 0);
1861 }
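/*
 * map the workqueue priority band onto scheduler parameters:
 * workqueue_policy[] supplies the timeshare setting and
 * workqueue_importance[] the precedence importance... on
 * CONFIG_EMBEDDED, a backgrounded daemon's importance change
 * may instead be saved for later, in which case the policy
 * update below is skipped
 */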
1862 if (orig_priority != priority) {
1863 thread_precedence_policy_data_t precedinfo;
1864 thread_extended_policy_data_t extinfo;
1865 uint32_t policy;
1866 #if CONFIG_EMBEDDED
1867 int retval = 0;
1868
1869 /* sets the saved importance for an Apple iOS daemon if it is backgrounded; otherwise returns 0 */
1870 retval = proc_setthread_saved_importance(th_to_run, workqueue_importance[priority]);
1871 if (retval == 0) {
1872 #endif /* CONFIG_EMBEDDED */
1873 policy = workqueue_policy[priority];
1874
1875 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_START, wq, orig_priority, tl->th_policy, 0, 0);
1876
1877 if ((orig_priority == WORKQUEUE_BG_PRIOQUEUE) || (priority == WORKQUEUE_BG_PRIOQUEUE)) {
1878 if (orig_priority == WORKQUEUE_BG_PRIOQUEUE) {
1879 /* remove the disk throttle; importance will be reset in any case */
1880 proc_restore_workq_bgthreadpolicy(th_to_run);
1881 }
1882
1883 if (priority == WORKQUEUE_BG_PRIOQUEUE) {
1884 proc_apply_workq_bgthreadpolicy(th_to_run);
1885 }
1886 }
1887
1888 if (tl->th_policy != policy) {
1889 extinfo.timeshare = policy;
1890 (void)thread_policy_set_internal(th_to_run, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
1891
1892 tl->th_policy = policy;
1893 }
1894
1895 precedinfo.importance = workqueue_importance[priority];
1896 (void)thread_policy_set_internal(th_to_run, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
1897
1898
1899 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_END, wq, priority, policy, 0, 0);
1900 #if CONFIG_EMBEDDED
1901 }
1902 #endif /* CONFIG_EMBEDDED */
1903 }
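/*
 * when tracing is enabled, emit one debug record per
 * (priority, affinity) bucket that currently has active
 * threads, so the trace captures how running workqueue
 * threads are distributed at dispatch time
 */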
1904 if (kdebug_enable) {
1905 int lpri = -1;
1906 int laffinity = -1;
1907 int first = -1;
1908 uint32_t code = 0xefffd02c | DBG_FUNC_START;
1909
1910 for (n = 0; n < WORKQUEUE_NUMPRIOS; n++) {
1911 for (i = 0; i < wq->wq_affinity_max; i++) {
1912 if (wq->wq_thactive_count[n][i]) {
1913 if (lpri != -1) {
1914 KERNEL_DEBUG(code, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1915 code = 0xefffd02c;
1916 first = 0;
1917 }
1918 lpri = n;
1919 laffinity = i;
1920 }
1921 }
1922 }
1923 if (lpri != -1) {
1924 if (first == -1)
1925 first = 0xeeeeeeee;
1926 KERNEL_DEBUG(0xefffd02c | DBG_FUNC_END, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1927 }
1928 }
1929 /*
1930 * if the current thread is reused for the work request, this call does not return via unix_syscall
1931 */
1932 wq_runreq(p, overcommit, priority, th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run));
1933
1934 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(th_to_run), overcommit, 1, 0);
1935
1936 return (TRUE);
1937
1938 out_of_work:
1939 /*
1940 * we have no work to do or we are fully booked
1941 * with respect to running threads...
1942 */
1943 no_thread_to_run:
1944 workqueue_unlock(p);
1945
1946 if (start_timer)
1947 workqueue_interval_timer_start(wq);
1948
1949 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 2, 0);
1950
1951 return (FALSE);
1952
1953 parkit:
1954 /*
1955 * this is a workqueue thread with no more
1956 * work to do... park it for now
1957 */
1958 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1959 tl->th_flags &= ~TH_LIST_RUNNING;
1960
1961 tl->th_flags |= TH_LIST_BLOCKED;
1962 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
1963
1964 thread_sched_call(th_to_park, NULL);
1965
1966 OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
1967 wq->wq_thscheduled_count[tl->th_priority][tl->th_affinity_tag]--;
1968 wq->wq_threads_scheduled--;
1969
1970 if (tl->th_flags & TH_LIST_CONSTRAINED) {
1971 wq->wq_constrained_threads_scheduled--;
1972 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1973 tl->th_flags &= ~TH_LIST_CONSTRAINED;
1974 }
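/*
 * scale the park timeout inversely with the depth of the idle
 * pool: an empty pool waits the full wq_reduce_pool_window_usecs
 * and each thread already parked shaves off 1% of the window,
 * down to a 1% floor... so the deeper the pool, the sooner
 * wq_unpark_continue() reaps this thread if no work shows up
 * (e.g. assuming the default 5 second window, 40 idle threads
 * means a ~3 second wait, 100 or more means 50 ms)
 */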
1975 if (wq->wq_thidlecount < 100)
1976 us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
1977 else
1978 us_to_wait = wq_reduce_pool_window_usecs / 100;
1979
1980 wq->wq_thidlecount++;
1981 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1982
1983 assert_wait_timeout((caddr_t)tl, (THREAD_INTERRUPTIBLE), us_to_wait, NSEC_PER_USEC);
1984
1985 workqueue_unlock(p);
1986
1987 if (start_timer)
1988 workqueue_interval_timer_start(wq);
1989
1990 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(th_to_park));
1991 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 3, 0);
1992
1993 thread_block((thread_continue_t)wq_unpark_continue);
1994 /* NOT REACHED */
1995
1996 return (FALSE);
1997 }
1998
1999
2000 static void
2001 wq_unsuspend_continue(void)
2002 {
2003 struct uthread *uth = NULL;
2004 thread_t th_to_unsuspend;
2005 struct threadlist *tl;
2006 proc_t p;
2007
2008 th_to_unsuspend = current_thread();
2009 uth = get_bsdthread_info(th_to_unsuspend);
2010
2011 if (uth != NULL && (tl = uth->uu_threadlist) != NULL) {
2012
2013 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
2014 /*
2015 * most likely a normal resume of this thread occurred...
2016 * it's also possible that the thread was aborted after we
2017 * finished setting it up so that it could be dispatched... if
2018 * so, thread_bootstrap_return will notice the abort and put
2019 * the thread on the path to self-destruction
2020 */
2021 normal_resume_to_user:
2022 thread_sched_call(th_to_unsuspend, workqueue_callback);
2023
2024 thread_bootstrap_return();
2025 }
2026 /*
2027 * if we get here, it's because we've been resumed due to
2028 * an abort of this thread (process is crashing)
2029 */
2030 p = current_proc();
2031
2032 workqueue_lock_spin(p);
2033
2034 if (tl->th_flags & TH_LIST_SUSPENDED) {
2035 /*
2036 * thread has been aborted while still on our idle
2037 * queue... remove it from our domain...
2038 * workqueue_removethread consumes the lock
2039 */
2040 workqueue_removethread(tl, 0);
2041
2042 thread_bootstrap_return();
2043 }
2044 while ((tl->th_flags & TH_LIST_BUSY)) {
2045 /*
2046 * this thread was aborted after we started making
2047 * it runnable, but before we finished dispatching it...
2048 * we need to wait for that process to finish,
2049 * and we need to ask for a wakeup instead of a
2050 * thread_resume since the abort has already resumed us
2051 */
2052 tl->th_flags |= TH_LIST_NEED_WAKEUP;
2053
2054 assert_wait((caddr_t)tl, (THREAD_UNINT));
2055
2056 workqueue_unlock(p);
2057
2058 thread_block(THREAD_CONTINUE_NULL);
2059
2060 workqueue_lock_spin(p);
2061 }
2062 workqueue_unlock(p);
2063 /*
2064 * we have finished setting up the thread's context...
2065 * thread_bootstrap_return will take us through the abort path
2066 * where the thread will self-destruct
2067 */
2068 goto normal_resume_to_user;
2069 }
2070 thread_bootstrap_return();
2071 }
2072
2073
2074 static void
2075 wq_unpark_continue(void)
2076 {
2077 struct uthread *uth = NULL;
2078 struct threadlist *tl;
2079 thread_t th_to_unpark;
2080 proc_t p;
2081
2082 th_to_unpark = current_thread();
2083 uth = get_bsdthread_info(th_to_unpark);
2084
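/*
 * three cases are distinguished below via th_flags:
 * RUNNING without BUSY means wq_runreq did a normal wakeup and
 * we can head straight back to user space... RUNNING clear
 * means the park timeout expired while we were still on the
 * idle list, so the thread removes itself... RUNNING with BUSY
 * still set means a dispatch is in flight, so wait for the
 * final wakeup
 */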
2085 if (uth != NULL) {
2086 if ((tl = uth->uu_threadlist) != NULL) {
2087
2088 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
2089 /*
2090 * a normal wakeup of this thread occurred... no need
2091 * for any synchronization with the timer and wq_runreq
2092 */
2093 normal_return_to_user:
2094 thread_sched_call(th_to_unpark, workqueue_callback);
2095
2096 KERNEL_DEBUG(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
2097
2098 thread_exception_return();
2099 }
2100 p = current_proc();
2101
2102 workqueue_lock_spin(p);
2103
2104 if ( !(tl->th_flags & TH_LIST_RUNNING)) {
2105 /*
2106 * the timer popped us out and we've not
2107 * been moved off of the idle list
2108 * so we should now self-destruct
2109 *
2110 * workqueue_removethread consumes the lock
2111 */
2112 workqueue_removethread(tl, 0);
2113
2114 thread_exception_return();
2115 }
2116 /*
2117 * the timer woke us up, but we have already
2118 * started to make this a runnable thread...
2119 * that process has not yet finished, so
2120 * wait here for the normal wakeup
2121 */
2122 while ((tl->th_flags & TH_LIST_BUSY)) {
2123
2124 assert_wait((caddr_t)tl, (THREAD_UNINT));
2125
2126 workqueue_unlock(p);
2127
2128 thread_block(THREAD_CONTINUE_NULL);
2129
2130 workqueue_lock_spin(p);
2131 }
2132 /*
2133 * we have finished setting up the thread's context
2134 * now we can return as if we got a normal wakeup
2135 */
2136 workqueue_unlock(p);
2137
2138 goto normal_return_to_user;
2139 }
2140 }
2141 thread_exception_return();
2142 }
2143
2144
2145
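/*
 * wq_runreq hands the chosen work request to 'th': it loads the
 * thread's user register state via setup_wqthread and then either
 * returns directly to user space (when the calling thread is
 * itself being reused), wakes a thread parked on the idle list,
 * or resumes (or wakes, if it is already waiting in
 * wq_unsuspend_continue) a newly created thread that was started
 * suspended
 */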
2146 static void
2147 wq_runreq(proc_t p, boolean_t overcommit, uint32_t priority, thread_t th, struct threadlist *tl,
2148 int reuse_thread, int wake_thread, int return_directly)
2149 {
2150 int ret = 0;
2151 boolean_t need_resume = FALSE;
2152
2153 KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, tl->th_workq, tl->th_priority, tl->th_affinity_tag, thread_tid(current_thread()), thread_tid(th));
2154
2155 ret = setup_wqthread(p, th, overcommit, priority, reuse_thread, tl);
2156
2157 if (ret != 0)
2158 panic("setup_wqthread failed %x\n", ret);
2159
2160 if (return_directly) {
2161 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
2162
2163 thread_exception_return();
2164
2165 panic("wq_runreq: thread_exception_return returned ...\n");
2166 }
2167 if (wake_thread) {
2168 workqueue_lock_spin(p);
2169
2170 tl->th_flags &= ~TH_LIST_BUSY;
2171 wakeup(tl);
2172
2173 workqueue_unlock(p);
2174 } else {
2175 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
2176
2177 workqueue_lock_spin(p);
2178
2179 if (tl->th_flags & TH_LIST_NEED_WAKEUP)
2180 wakeup(tl);
2181 else
2182 need_resume = TRUE;
2183
2184 tl->th_flags &= ~(TH_LIST_BUSY | TH_LIST_NEED_WAKEUP);
2185
2186 workqueue_unlock(p);
2187
2188 if (need_resume) {
2189 /*
2190 * need to do this outside of the workqueue spin lock
2191 * since thread_resume locks the thread via a full mutex
2192 */
2193 thread_resume(th);
2194 }
2195 }
2196 }
2197
2198
2199 int
2200 setup_wqthread(proc_t p, thread_t th, boolean_t overcommit, uint32_t priority, int reuse_thread, struct threadlist *tl)
2201 {
2202 uint32_t flags = reuse_thread | WQ_FLAG_THREAD_NEWSPI;
2203
2204 if (overcommit == TRUE)
2205 flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2206
2207 flags |= priority;
2208
2209 #if defined(__i386__) || defined(__x86_64__)
2210 int isLP64 = 0;
2211
2212 isLP64 = IS_64BIT_PROCESS(p);
2213 /*
2214 * Set up the x86 register state & entry point for the new workqueue thread.
2215 */
2216 if (isLP64 == 0) {
2217 x86_thread_state32_t state;
2218 x86_thread_state32_t *ts = &state;
2219
2220 ts->eip = (int)p->p_wqthread;
2221 ts->eax = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2222 ts->ebx = (unsigned int)tl->th_thport;
2223 ts->ecx = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2224 ts->edx = (unsigned int)0;
2225 ts->edi = (unsigned int)flags;
2226 ts->esi = (unsigned int)0;
2227 /*
2228 * set stack pointer
2229 */
2230 ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_32_STK_ALIGN));
2231
2232 thread_set_wq_state32(th, (thread_state_t)ts);
2233
2234 } else {
2235 x86_thread_state64_t state64;
2236 x86_thread_state64_t *ts64 = &state64;
2237
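/*
 * rdi through r9 are the six integer argument registers of the
 * SysV AMD64 ABI, so p_wqthread starts with:
 *   arg0 = top of the stack allocation (where user space
 *          presumably keeps the pthread_t itself),
 *   arg1 = the kernel thread port,
 *   arg2 = the usable stack base,
 *   arg3 = 0,
 *   arg4 = the flags word built above,
 *   arg5 = 0
 * how libpthread's _pthread_wqthread entry interprets these is
 * an assumption about the user side, which is not part of this
 * file
 */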
2238 ts64->rip = (uint64_t)p->p_wqthread;
2239 ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2240 ts64->rsi = (uint64_t)(tl->th_thport);
2241 ts64->rdx = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2242 ts64->rcx = (uint64_t)0;
2243 ts64->r8 = (uint64_t)flags;
2244 ts64->r9 = (uint64_t)0;
2245
2246 /*
2247 * set stack pointer aligned to 16 byte boundary
2248 */
2249 ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_64_REDZONE_LEN);
2250
2251 thread_set_wq_state64(th, (thread_state_t)ts64);
2252 }
2253 #else
2254 #error setup_wqthread not defined for this architecture
2255 #endif
2256 return(0);
2257 }
2258
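/*
 * snapshot of this process's workqueue state for the proc_info
 * interface (presumably reached through proc_pidinfo() with
 * PROC_PIDWORKQUEUEINFO from libproc; that plumbing lives
 * outside this file)... active threads are summed across every
 * priority/affinity bucket, and blocked threads are those
 * scheduled but not currently active
 */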
2259 int
2260 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
2261 {
2262 struct workqueue * wq;
2263 int error = 0;
2264 int activecount;
2265 uint32_t pri, affinity;
2266
2267 workqueue_lock_spin(p);
2268 if ((wq = p->p_wqptr) == NULL) {
2269 error = EINVAL;
2270 goto out;
2271 }
2272 activecount = 0;
2273
2274 for (pri = 0; pri < WORKQUEUE_NUMPRIOS; pri++) {
2275 for (affinity = 0; affinity < wq->wq_affinity_max; affinity++)
2276 activecount += wq->wq_thactive_count[pri][affinity];
2277 }
2278 pwqinfo->pwq_nthreads = wq->wq_nthreads;
2279 pwqinfo->pwq_runthreads = activecount;
2280 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
2281 pwqinfo->pwq_state = 0;
2282
2283 if (wq->wq_lflags & WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT)
2284 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
2285
2286 if (wq->wq_lflags & WQL_EXCEEDED_TOTAL_THREAD_LIMIT)
2287 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
2288
2289 out:
2290 workqueue_unlock(p);
2291 return(error);
2292 }
2293
2294 /* Set the target concurrency of one of the priority queues (0, 1, 2) to the specified value */
2295 int
2296 proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc)
2297 {
2298 proc_t p, self;
2299 uint64_t addr;
2300 int32_t conc = targetconc;
2301 int error = 0;
2302 vm_map_t oldmap = VM_MAP_NULL;
2303 int gotref = 0;
2304
2305 self = current_proc();
2306 if (self->p_pid != pid) {
2307 /* if not operating on self, hold a reference on the process */
2308
2309 if (pid == 0)
2310 return(EINVAL);
2311
2312 p = proc_find(pid);
2313
2314 if (p == PROC_NULL)
2315 return(ESRCH);
2316 gotref = 1;
2317
2318 } else
2319 p = self;
2320
2321 if ((addr = p->p_targconc) == (uint64_t)0) {
2322 error = EINVAL;
2323 goto out;
2324 }
2325
2326
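/*
 * p_targconc is the user-space address of the process's
 * per-priority target concurrency array... write the single
 * 32-bit slot for this queue, temporarily switching to the
 * target process's map when acting on another process since
 * copyout() operates on the current map
 */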
2327 if ((queuenum >= WQ_MAXPRI_MIN) && (queuenum <= WQ_MAXPRI_MAX)) {
2328 addr += (queuenum * sizeof(int32_t));
2329 if (gotref == 1)
2330 oldmap = vm_map_switch(get_task_map(p->task));
2331 error = copyout(&conc, addr, sizeof(int32_t));
2332 if (gotref == 1)
2333 (void)vm_map_switch(oldmap);
2334
2335 } else {
2336 error = EINVAL;
2337 }
2338 out:
2339 if (gotref == 1)
2340 proc_rele(p);
2341 return(error);
2342 }
2343
2344
2345 /* Set the target concurrency of all the priority queues to the specified values */
2346 int
2347 proc_setalltargetconc(pid_t pid, int32_t * targetconcp)
2348 {
2349 proc_t p, self;
2350 uint64_t addr;
2351 int error = 0;
2352 vm_map_t oldmap = VM_MAP_NULL;
2353 int gotref = 0;
2354
2355 self = current_proc();
2356 if (self->p_pid != pid) {
2357 /* if not operating on self, hold a reference on the process */
2358
2359 if (pid == 0)
2360 return(EINVAL);
2361
2362 p = proc_find(pid);
2363
2364 if (p == PROC_NULL)
2365 return(ESRCH);
2366 gotref = 1;
2367
2368 } else
2369 p = self;
2370
2371 if ((addr = (uint64_t)p->p_targconc) == (uint64_t)0) {
2372 error = EINVAL;
2373 goto out;
2374 }
2375
2376
2377 if (gotref == 1)
2378 oldmap = vm_map_switch(get_task_map(p->task));
2379
2380 error = copyout(targetconcp, addr, WQ_PRI_NUM * sizeof(int32_t));
2381 if (gotref == 1)
2382 (void)vm_map_switch(oldmap);
2383
2384 out:
2385 if (gotref == 1)
2386 proc_rele(p);
2387 return(error);
2388 }
2389
2390 int thread_selfid(__unused struct proc *p, __unused struct thread_selfid_args *uap, uint64_t *retval)
2391 {
2392 thread_t thread = current_thread();
2393 *retval = thread_tid(thread);
2394 return KERN_SUCCESS;
2395 }
2396
2397 void
2398 pthread_init(void)
2399 {
2400 pthread_lck_grp_attr = lck_grp_attr_alloc_init();
2401 pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
2402
2403 /*
2404 * allocate the lock attribute for pthread synchronizers
2405 */
2406 pthread_lck_attr = lck_attr_alloc_init();
2407
2408 workqueue_init_lock((proc_t) get_bsdtask_info(kernel_task));
2409 #if PSYNCH
2410 pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
2411
2412 pth_global_hashinit();
2413 psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
2414 psynch_zoneinit();
2415 #endif /* PSYNCH */
2416 }