1 /*
2 * Copyright (c) 2000-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */
29 /*
30 * pthread_synch.c
31 */
32
33 #define _PTHREAD_CONDATTR_T
34 #define _PTHREAD_COND_T
35 #define _PTHREAD_MUTEXATTR_T
36 #define _PTHREAD_MUTEX_T
37 #define _PTHREAD_RWLOCKATTR_T
38 #define _PTHREAD_RWLOCK_T
39
40 #undef pthread_mutexattr_t
41 #undef pthread_mutex_t
42 #undef pthread_condattr_t
43 #undef pthread_cond_t
44 #undef pthread_rwlockattr_t
45 #undef pthread_rwlock_t
46
47 #include <sys/param.h>
48 #include <sys/queue.h>
49 #include <sys/resourcevar.h>
50 #include <sys/proc_internal.h>
51 #include <sys/kauth.h>
52 #include <sys/systm.h>
53 #include <sys/timeb.h>
54 #include <sys/times.h>
55 #include <sys/acct.h>
56 #include <sys/kernel.h>
57 #include <sys/wait.h>
58 #include <sys/signalvar.h>
59 #include <sys/syslog.h>
60 #include <sys/stat.h>
61 #include <sys/lock.h>
62 #include <sys/kdebug.h>
63 #include <sys/sysproto.h>
64 #include <sys/pthread_internal.h>
65 #include <sys/vm.h>
66 #include <sys/user.h> /* for coredump */
67 #include <sys/proc_info.h> /* for fill_procworkqueue */
68
69
70 #include <mach/mach_types.h>
71 #include <mach/vm_prot.h>
72 #include <mach/semaphore.h>
73 #include <mach/sync_policy.h>
74 #include <mach/task.h>
75 #include <kern/kern_types.h>
76 #include <kern/task.h>
77 #include <kern/clock.h>
78 #include <mach/kern_return.h>
79 #include <kern/thread.h>
80 #include <kern/sched_prim.h>
81 #include <kern/kalloc.h>
82 #include <kern/sched_prim.h> /* for thread_exception_return */
83 #include <kern/processor.h>
84 #include <kern/affinity.h>
85 #include <kern/assert.h>
86 #include <mach/mach_vm.h>
87 #include <mach/mach_param.h>
88 #include <mach/thread_status.h>
89 #include <mach/thread_policy.h>
90 #include <mach/message.h>
91 #include <mach/port.h>
92 #include <vm/vm_protos.h>
93 #include <vm/vm_map.h> /* for current_map() */
94 #include <vm/vm_fault.h>
95 #include <mach/thread_act.h> /* for thread_resume */
96 #include <machine/machine_routines.h>
97 #if defined(__i386__)
98 #include <i386/machine_routines.h>
99 #include <i386/eflags.h>
100 #include <i386/psl.h>
101 #include <i386/seg.h>
102 #endif
103
104 #include <libkern/OSAtomic.h>
105
106 #if 0
107 #undef KERNEL_DEBUG
108 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
109 #undef KERNEL_DEBUG1
110 #define KERNEL_DEBUG1 KERNEL_DEBUG_CONSTANT1
111 #endif
112
113 lck_grp_attr_t *pthread_lck_grp_attr;
114 lck_grp_t *pthread_lck_grp;
115 lck_attr_t *pthread_lck_attr;
116
117 extern kern_return_t thread_getstatus(register thread_t act, int flavor,
118 thread_state_t tstate, mach_msg_type_number_t *count);
119 extern kern_return_t thread_setstatus(thread_t thread, int flavor,
120 thread_state_t tstate, mach_msg_type_number_t count);
121 extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
122 extern kern_return_t mach_port_deallocate(ipc_space_t, mach_port_name_t);
123 extern kern_return_t semaphore_signal_internal_trap(mach_port_name_t);
124
125 extern void workqueue_thread_yielded(void);
126
127 static int workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity);
128 static boolean_t workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t th,
129 user_addr_t oc_item, int oc_prio, int oc_affinity);
130 static void wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
131 int reuse_thread, int wake_thread, int return_directly);
132 static void wq_unpark_continue(void);
133 static void wq_unsuspend_continue(void);
134 static int setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl);
135 static boolean_t workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread);
136 static void workqueue_removethread(struct threadlist *tl);
137 static void workqueue_lock_spin(proc_t);
138 static void workqueue_unlock(proc_t);
139 int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
140 int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
141
142 #define WQ_MAXPRI_MIN 0 /* low prio queue num */
143 #define WQ_MAXPRI_MAX 2 /* max prio queuenum */
144 #define WQ_PRI_NUM 3 /* number of prio work queues */
145
146 #define C_32_STK_ALIGN 16
147 #define C_64_STK_ALIGN 16
148 #define C_64_REDZONE_LEN 128
149 #define TRUNC_DOWN32(a,c) ((((uint32_t)a)-(c)) & ((uint32_t)(-(c))))
150 #define TRUNC_DOWN64(a,c) ((((uint64_t)a)-(c)) & ((uint64_t)(-(c))))
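/*
 * Illustrative note on the truncation macros above: they subtract the
 * alignment/redzone amount 'c' and then round the result down to a
 * multiple of 'c' (c must be a power of two for the mask to work), e.g.
 *
 *   TRUNC_DOWN32(0xB0000FF8, 16)
 *     == ((0xB0000FF8 - 16) & (uint32_t)(-16))
 *     == (0xB0000FE8 & 0xFFFFFFF0)
 *     == 0xB0000FE0
 *
 * which is the kind of computation a routine such as setup_wqthread
 * (declared above) would use to derive an aligned user stack pointer from
 * a top-of-stack address.
 */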
151
152
153 /*
154 * Flags field passed to bsdthread_create and passed back to _pthread_start
155 31 <---------------------------------> 0
156 _________________________________________
157 | flags(8) | policy(8) | importance(16) |
158 -----------------------------------------
159 */
160 void _pthread_start(pthread_t self, mach_port_t kport, void *(*fun)(void *), void * funarg, size_t stacksize, unsigned int flags);
161
162 #define PTHREAD_START_CUSTOM 0x01000000
163 #define PTHREAD_START_SETSCHED 0x02000000
164 #define PTHREAD_START_DETACHED 0x04000000
165 #define PTHREAD_START_POLICY_BITSHIFT 16
166 #define PTHREAD_START_POLICY_MASK 0xff
167 #define PTHREAD_START_IMPORTANCE_MASK 0xffff
168
169 #define SCHED_OTHER POLICY_TIMESHARE
170 #define SCHED_FIFO POLICY_FIFO
171 #define SCHED_RR POLICY_RR
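/*
 * Illustrative sketch (hypothetical values, not taken from Libc) of how
 * user space would pack the flags word laid out above when it wants the
 * kernel to set scheduling state:
 *
 *   unsigned int flags = PTHREAD_START_SETSCHED
 *       | ((SCHED_OTHER & PTHREAD_START_POLICY_MASK) << PTHREAD_START_POLICY_BITSHIFT)
 *       | (31 & PTHREAD_START_IMPORTANCE_MASK);
 *
 * bsdthread_create() below extracts policy and importance with the same
 * shift and masks, and applies the importance relative to BASEPRI_DEFAULT
 * (31), so an importance field of 31 requests the default precedence.
 */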
172
173
174
175 int
176 bsdthread_create(__unused struct proc *p, struct bsdthread_create_args *uap, user_addr_t *retval)
177 {
178 kern_return_t kret;
179 void * sright;
180 int error = 0;
181 int allocated = 0;
182 mach_vm_offset_t stackaddr;
183 mach_vm_size_t th_allocsize = 0;
184 mach_vm_size_t user_stacksize;
185 mach_vm_size_t th_stacksize;
186 mach_vm_offset_t th_stackaddr;
187 mach_vm_offset_t th_stack;
188 mach_vm_offset_t th_pthread;
189 mach_port_name_t th_thport;
190 thread_t th;
191 user_addr_t user_func = uap->func;
192 user_addr_t user_funcarg = uap->func_arg;
193 user_addr_t user_stack = uap->stack;
194 user_addr_t user_pthread = uap->pthread;
195 unsigned int flags = (unsigned int)uap->flags;
196 vm_map_t vmap = current_map();
197 task_t ctask = current_task();
198 unsigned int policy, importance;
199
200 int isLP64 = 0;
201
202
203 if ((p->p_lflag & P_LREGISTER) == 0)
204 return(EINVAL);
205 #if 0
206 KERNEL_DEBUG_CONSTANT(0x9000080 | DBG_FUNC_START, flags, 0, 0, 0, 0);
207 #endif
208
209 isLP64 = IS_64BIT_PROCESS(p);
210
211
212 #if defined(__i386__) || defined(__x86_64__)
213 stackaddr = 0xB0000000;
214 #else
215 #error Need to define a stack address hint for this architecture
216 #endif
217 kret = thread_create(ctask, &th);
218 if (kret != KERN_SUCCESS)
219 return(ENOMEM);
220 thread_reference(th);
221
222 sright = (void *) convert_thread_to_port(th);
223 th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(ctask));
224
225 if ((flags & PTHREAD_START_CUSTOM) == 0) {
226 th_stacksize = (mach_vm_size_t)user_stack; /* if not custom, user_stack holds the requested stacksize */
227 th_allocsize = th_stacksize + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
228
229 kret = mach_vm_map(vmap, &stackaddr,
230 th_allocsize,
231 page_size-1,
232 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
233 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
234 VM_INHERIT_DEFAULT);
235 if (kret != KERN_SUCCESS)
236 kret = mach_vm_allocate(vmap,
237 &stackaddr, th_allocsize,
238 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
239 if (kret != KERN_SUCCESS) {
240 error = ENOMEM;
241 goto out;
242 }
243 #if 0
244 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0);
245 #endif
246 th_stackaddr = stackaddr;
247 allocated = 1;
248 /*
249 * The guard page is at the lowest address
250 * The stack base is the highest address
251 */
252 kret = mach_vm_protect(vmap, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
253
254 if (kret != KERN_SUCCESS) {
255 error = ENOMEM;
256 goto out1;
257 }
258 th_stack = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
259 th_pthread = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
260 user_stacksize = th_stacksize;
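/*
 * Illustrative layout of the allocation built above (low address to high):
 *
 *   stackaddr
 *   [ guard (PTH_DEFAULT_GUARDSIZE) ][ stack (th_stacksize) ][ pthread_t area (p_pthsize) ]
 *                                                            ^
 *                                                            th_stack == th_pthread
 *
 * the stack grows down from th_stack, and the user-visible pthread_t
 * structure occupies the p_pthsize bytes directly above it, which is why
 * th_stack and th_pthread end up being the same address.
 */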
261
262 /*
263 * Pre-fault the first page of the new thread's stack and the page that will
264 * contain the pthread_t structure.
265 */
266 vm_fault( vmap,
267 vm_map_trunc_page(th_stack - PAGE_SIZE_64),
268 VM_PROT_READ | VM_PROT_WRITE,
269 FALSE,
270 THREAD_UNINT, NULL, 0);
271
272 vm_fault( vmap,
273 vm_map_trunc_page(th_pthread),
274 VM_PROT_READ | VM_PROT_WRITE,
275 FALSE,
276 THREAD_UNINT, NULL, 0);
277 } else {
278 th_stack = user_stack;
279 user_stacksize = user_stack;
280 th_pthread = user_pthread;
281 #if 0
282 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, 0, 0, 0, 3, 0);
283 #endif
284 }
285
286 #if defined(__i386__) || defined(__x86_64__)
287 {
288 /*
289 * Set up i386 registers & function call.
290 */
291 if (isLP64 == 0) {
292 x86_thread_state32_t state;
293 x86_thread_state32_t *ts = &state;
294
295 ts->eip = (int)p->p_threadstart;
296 ts->eax = (unsigned int)th_pthread;
297 ts->ebx = (unsigned int)th_thport;
298 ts->ecx = (unsigned int)user_func;
299 ts->edx = (unsigned int)user_funcarg;
300 ts->edi = (unsigned int)user_stacksize;
301 ts->esi = (unsigned int)uap->flags;
302 /*
303 * set stack pointer
304 */
305 ts->esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN));
306
307 thread_set_wq_state32(th, (thread_state_t)ts);
308
309 } else {
310 x86_thread_state64_t state64;
311 x86_thread_state64_t *ts64 = &state64;
312
313 ts64->rip = (uint64_t)p->p_threadstart;
314 ts64->rdi = (uint64_t)th_pthread;
315 ts64->rsi = (uint64_t)(th_thport);
316 ts64->rdx = (uint64_t)user_func;
317 ts64->rcx = (uint64_t)user_funcarg;
318 ts64->r8 = (uint64_t)user_stacksize;
319 ts64->r9 = (uint64_t)uap->flags;
320 /*
321 * set stack pointer aligned to 16 byte boundary
322 */
323 ts64->rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN);
324
325 thread_set_wq_state64(th, (thread_state_t)ts64);
326 }
327 }
328 #else
329 #error bsdthread_create not defined for this architecture
330 #endif
331 /* Set scheduling parameters if needed */
332 if ((flags & PTHREAD_START_SETSCHED) != 0) {
333 thread_extended_policy_data_t extinfo;
334 thread_precedence_policy_data_t precedinfo;
335
336 importance = (flags & PTHREAD_START_IMPORTANCE_MASK);
337 policy = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK;
338
339 if (policy == SCHED_OTHER)
340 extinfo.timeshare = 1;
341 else
342 extinfo.timeshare = 0;
343 thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
344
345 #define BASEPRI_DEFAULT 31
346 precedinfo.importance = (importance - BASEPRI_DEFAULT);
347 thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
348 }
349
350 kret = thread_resume(th);
351 if (kret != KERN_SUCCESS) {
352 error = EINVAL;
353 goto out1;
354 }
355 thread_deallocate(th); /* drop the creator reference */
356 #if 0
357 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, th_pthread, 0, 0, 0);
358 #endif
359 *retval = th_pthread;
360
361 return(0);
362
363 out1:
364 if (allocated != 0)
365 (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
366 out:
367 (void)mach_port_deallocate(get_task_ipcspace(ctask), th_thport);
368 (void)thread_terminate(th);
369 (void)thread_deallocate(th);
370 return(error);
371 }
372
373 int
374 bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused int32_t *retval)
375 {
376 mach_vm_offset_t freeaddr;
377 mach_vm_size_t freesize;
378 kern_return_t kret;
379 mach_port_name_t kthport = (mach_port_name_t)uap->port;
380 mach_port_name_t sem = (mach_port_name_t)uap->sem;
381
382 freeaddr = (mach_vm_offset_t)uap->stackaddr;
383 freesize = uap->freesize;
384
385 #if 0
386 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, freeaddr, freesize, kthport, 0xff, 0);
387 #endif
388 if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
389 kret = mach_vm_deallocate(current_map(), freeaddr, freesize);
390 if (kret != KERN_SUCCESS) {
391 return(EINVAL);
392 }
393 }
394
395 (void) thread_terminate(current_thread());
396 if (sem != MACH_PORT_NULL) {
397 kret = semaphore_signal_internal_trap(sem);
398 if (kret != KERN_SUCCESS) {
399 return(EINVAL);
400 }
401 }
402
403 if (kthport != MACH_PORT_NULL)
404 mach_port_deallocate(get_task_ipcspace(current_task()), kthport);
405 thread_exception_return();
406 panic("bsdthread_terminate: still running\n");
407 #if 0
408 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_END, 0, 0, 0, 0xff, 0);
409 #endif
410 return(0);
411 }
412
413
414 int
415 bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused int32_t *retval)
416 {
417 /* prevent multiple registrations */
418 if ((p->p_lflag & P_LREGISTER) != 0)
419 return(EINVAL);
420 /* syscall randomizer test can pass bogus values */
421 if (uap->pthsize > MAX_PTHREAD_SIZE) {
422 return(EINVAL);
423 }
424 p->p_threadstart = uap->threadstart;
425 p->p_wqthread = uap->wqthread;
426 p->p_pthsize = uap->pthsize;
427 p->p_targconc = uap->targetconc_ptr;
428 p->p_dispatchqueue_offset = uap->dispatchqueue_offset;
429 proc_setregister(p);
430
431 return(0);
432 }
433
434 uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
435 uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
436 uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
437 uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
438 uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
439 uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
440 uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
441
442
443 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW | CTLFLAG_LOCKED,
444 &wq_yielded_threshold, 0, "");
445
446 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
447 &wq_yielded_window_usecs, 0, "");
448
449 SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
450 &wq_stalled_window_usecs, 0, "");
451
452 SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
453 &wq_reduce_pool_window_usecs, 0, "");
454
455 SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW | CTLFLAG_LOCKED,
456 &wq_max_timer_interval_usecs, 0, "");
457
458 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
459 &wq_max_threads, 0, "");
460
461 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
462 &wq_max_constrained_threads, 0, "");
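/*
 * All of the knobs above are exported read-write under the kern namespace,
 * so (on builds where these sysctls are compiled in) they can be inspected
 * or tuned from user space, e.g.:
 *
 *   sysctl kern.wq_max_threads
 *   sysctl -w kern.wq_stalled_window_usecs=<usecs>
 */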
463
464
465 static uint32_t wq_init_constrained_limit = 1;
466
467
468 void
469 workqueue_init_lock(proc_t p)
470 {
471 lck_spin_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr);
472
473 p->p_wqiniting = FALSE;
474 }
475
476 void
477 workqueue_destroy_lock(proc_t p)
478 {
479 lck_spin_destroy(&p->p_wqlock, pthread_lck_grp);
480 }
481
482
483 static void
484 workqueue_lock_spin(proc_t p)
485 {
486 lck_spin_lock(&p->p_wqlock);
487 }
488
489 static void
490 workqueue_unlock(proc_t p)
491 {
492 lck_spin_unlock(&p->p_wqlock);
493 }
494
495
496 static void
497 workqueue_interval_timer_start(struct workqueue *wq)
498 {
499 uint64_t deadline;
500
501 if (wq->wq_timer_interval == 0)
502 wq->wq_timer_interval = wq_stalled_window_usecs;
503 else {
504 wq->wq_timer_interval = wq->wq_timer_interval * 2;
505
506 if (wq->wq_timer_interval > wq_max_timer_interval_usecs)
507 wq->wq_timer_interval = wq_max_timer_interval_usecs;
508 }
509 clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
510
511 thread_call_enter_delayed(wq->wq_atimer_call, deadline);
512
513 KERNEL_DEBUG(0xefffd110, wq, wq->wq_itemcount, wq->wq_flags, wq->wq_timer_interval, 0);
514 }
515
516
517 static boolean_t
518 wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
519 { clock_sec_t secs;
520 clock_usec_t usecs;
521 uint64_t lastblocked_ts;
522 uint64_t elapsed;
523
524 /*
525 * the timestamp is updated atomically w/o holding the workqueue lock
526 * so we need to do an atomic read of the 64 bits so that we don't see
527 * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
528 * independent fashion by using OSCompareAndSwap64 to write back the
529 * value we grabbed... if it succeeds, then we have a good timestamp to
530 * evaluate... if it fails, we straddled grabbing the timestamp while it
531 * was being updated... treat a failed update as a busy thread since
532 * it implies we are about to see a really fresh timestamp anyway
533 */
534 lastblocked_ts = *lastblocked_tsp;
535
536 if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
537 return (TRUE);
538
539 if (lastblocked_ts >= cur_ts) {
540 /*
541 * because the update of the timestamp when a thread blocks isn't
542 * serialized against us looking at it (i.e. we don't hold the workq lock)
543 * it's possible to have a timestamp that matches the current time or
544 * that even looks to be in the future relative to when we grabbed the current
545 * time... just treat this as a busy thread since it must have just blocked.
546 */
547 return (TRUE);
548 }
549 elapsed = cur_ts - lastblocked_ts;
550
551 absolutetime_to_microtime(elapsed, &secs, &usecs);
552
553 if (secs == 0 && usecs < wq_stalled_window_usecs)
554 return (TRUE);
555 return (FALSE);
556 }
557
558
559 #define WQ_TIMER_NEEDED(wq, start_timer) do { \
560 int oldflags = wq->wq_flags; \
561 \
562 if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \
563 if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_RUNNING, (UInt32 *)&wq->wq_flags)) \
564 start_timer = TRUE; \
565 } \
566 } while (0)
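/*
 * WQ_TIMER_NEEDED only transitions WQ_ATIMER_RUNNING from clear to set (and
 * only while the workqueue isn't exiting), so exactly one caller sees
 * 'start_timer' come back TRUE and becomes responsible for arming the
 * timer. Illustrative usage, mirroring workqueue_callback() below:
 *
 *   boolean_t start_timer = FALSE;
 *
 *   if (wq->wq_itemcount)
 *       WQ_TIMER_NEEDED(wq, start_timer);
 *   if (start_timer == TRUE)
 *       workqueue_interval_timer_start(wq);
 */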
567
568
569
570 static void
571 workqueue_add_timer(struct workqueue *wq, __unused int param1)
572 {
573 proc_t p;
574 boolean_t start_timer = FALSE;
575 boolean_t retval;
576 boolean_t add_thread;
577 uint32_t busycount;
578
579 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
580
581 p = wq->wq_proc;
582
583 workqueue_lock_spin(p);
584
585 /*
586 * because workqueue_callback now runs w/o taking the workqueue lock
587 * we are unsynchronized w/r to a change in state of the running threads...
588 * to make sure we always evaluate that change, we allow it to start up
589 * a new timer if the current one is actively evaluating the state
590 * however, we do not need more than 2 timers fired up (1 active and 1 pending)
591 * and we certainly do not want 2 active timers evaluating the state
592 * simultaneously... so use WQL_ATIMER_BUSY to serialize the timers...
593 * note that WQL_ATIMER_BUSY is in a different flag word from WQ_ATIMER_RUNNING since
594 * it is always protected by the workq lock... WQ_ATIMER_RUNNING is evaluated
595 * and set atomically since the callback function needs to manipulate it
596 * w/o holding the workq lock...
597 *
598 * !WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == no pending timer, no active timer
599 * !WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == no pending timer, 1 active timer
600 * WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == 1 pending timer, no active timer
601 * WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == 1 pending timer, 1 active timer
602 */
603 while (wq->wq_lflags & WQL_ATIMER_BUSY) {
604 wq->wq_lflags |= WQL_ATIMER_WAITING;
605
606 assert_wait((caddr_t)wq, (THREAD_UNINT));
607 workqueue_unlock(p);
608
609 thread_block(THREAD_CONTINUE_NULL);
610
611 workqueue_lock_spin(p);
612 }
613 wq->wq_lflags |= WQL_ATIMER_BUSY;
614
615 /*
616 * the workq lock will protect us from seeing WQ_EXITING change state, but we
617 * still need to update this atomically in case someone else tries to start
618 * the timer just as we're releasing it
619 */
620 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags & ~WQ_ATIMER_RUNNING), (UInt32 *)&wq->wq_flags)));
621
622 again:
623 retval = TRUE;
624 add_thread = FALSE;
625
626 if ( !(wq->wq_flags & WQ_EXITING)) {
627 /*
628 * check to see if the stall frequency was beyond our tolerance
629 * or we have work on the queue, but haven't scheduled any
630 * new work within our acceptable time interval because
631 * there were no idle threads left to schedule
632 */
633 if (wq->wq_itemcount) {
634 uint32_t priority;
635 uint32_t affinity_tag;
636 uint32_t i;
637 uint64_t curtime;
638
639 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
640 if (wq->wq_list_bitmap & (1 << priority))
641 break;
642 }
643 assert(priority < WORKQUEUE_NUMPRIOS);
644
645 curtime = mach_absolute_time();
646 busycount = 0;
647
648 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
649 /*
650 * if we have no idle threads, we can try to add them if needed
651 */
652 if (wq->wq_thidlecount == 0)
653 add_thread = TRUE;
654
655 /*
656 * look for first affinity group that is currently not active
657 * i.e. no active threads at this priority level or higher
658 * and has not been active recently at this priority level or higher
659 */
660 for (i = 0; i <= priority; i++) {
661 if (wq->wq_thactive_count[i][affinity_tag]) {
662 add_thread = FALSE;
663 break;
664 }
665 if (wq->wq_thscheduled_count[i][affinity_tag]) {
666 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
667 add_thread = FALSE;
668 busycount++;
669 break;
670 }
671 }
672 }
673 if (add_thread == TRUE) {
674 retval = workqueue_addnewthread(wq, FALSE);
675 break;
676 }
677 }
678 if (wq->wq_itemcount) {
679 /*
680 * as long as we have threads to schedule, and we successfully
681 * scheduled new work, keep trying
682 */
683 while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
684 /*
685 * workqueue_run_nextitem is responsible for
686 * dropping the workqueue lock in all cases
687 */
688 retval = workqueue_run_nextitem(p, wq, THREAD_NULL, 0, 0, 0);
689 workqueue_lock_spin(p);
690
691 if (retval == FALSE)
692 break;
693 }
694 if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_itemcount) {
695
696 if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
697 goto again;
698
699 if (wq->wq_thidlecount == 0 || busycount)
700 WQ_TIMER_NEEDED(wq, start_timer);
701
702 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_NONE, wq, wq->wq_itemcount, wq->wq_thidlecount, busycount, 0);
703 }
704 }
705 }
706 }
707 if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
708 wq->wq_timer_interval = 0;
709
710 wq->wq_lflags &= ~WQL_ATIMER_BUSY;
711
712 if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
713 /*
714 * wakeup the thread hung up in workqueue_exit or workqueue_add_timer waiting for this timer
715 * to finish getting out of the way
716 */
717 wq->wq_lflags &= ~WQL_ATIMER_WAITING;
718 wakeup(wq);
719 }
720 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
721
722 workqueue_unlock(p);
723
724 if (start_timer == TRUE)
725 workqueue_interval_timer_start(wq);
726 }
727
728
729 void
730 workqueue_thread_yielded(void)
731 {
732 struct workqueue *wq;
733 proc_t p;
734
735 p = current_proc();
736
737 if ((wq = p->p_wqptr) == NULL || wq->wq_itemcount == 0)
738 return;
739
740 workqueue_lock_spin(p);
741
742 if (wq->wq_itemcount) {
743 uint64_t curtime;
744 uint64_t elapsed;
745 clock_sec_t secs;
746 clock_usec_t usecs;
747
748 if (wq->wq_thread_yielded_count++ == 0)
749 wq->wq_thread_yielded_timestamp = mach_absolute_time();
750
751 if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
752 workqueue_unlock(p);
753 return;
754 }
755 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 0, 0);
756
757 wq->wq_thread_yielded_count = 0;
758
759 curtime = mach_absolute_time();
760 elapsed = curtime - wq->wq_thread_yielded_timestamp;
761 absolutetime_to_microtime(elapsed, &secs, &usecs);
762
763 if (secs == 0 && usecs < wq_yielded_window_usecs) {
764
765 if (wq->wq_thidlecount == 0) {
766 workqueue_addnewthread(wq, TRUE);
767 /*
768 * 'workqueue_addnewthread' drops the workqueue lock
769 * when creating the new thread and then retakes it before
770 * returning... this window allows other threads to process
771 * work on the queue, so we need to recheck for available work
772 * if none found, we just return... the newly created thread
773 * will eventually get used (if it hasn't already)...
774 */
775 if (wq->wq_itemcount == 0) {
776 workqueue_unlock(p);
777 return;
778 }
779 }
780 if (wq->wq_thidlecount) {
781 uint32_t priority;
782 uint32_t affinity = -1;
783 user_addr_t item;
784 struct workitem *witem = NULL;
785 struct workitemlist *wl = NULL;
786 struct uthread *uth;
787 struct threadlist *tl;
788
789 uth = get_bsdthread_info(current_thread());
790 if ((tl = uth->uu_threadlist))
791 affinity = tl->th_affinity_tag;
792
793 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
794 if (wq->wq_list_bitmap & (1 << priority)) {
795 wl = (struct workitemlist *)&wq->wq_list[priority];
796 break;
797 }
798 }
799 assert(wl != NULL);
800 assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
801
802 witem = TAILQ_FIRST(&wl->wl_itemlist);
803 TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
804
805 if (TAILQ_EMPTY(&wl->wl_itemlist))
806 wq->wq_list_bitmap &= ~(1 << priority);
807 wq->wq_itemcount--;
808
809 item = witem->wi_item;
810 witem->wi_item = (user_addr_t)0;
811 witem->wi_affinity = 0;
812
813 TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
814
815 (void)workqueue_run_nextitem(p, wq, THREAD_NULL, item, priority, affinity);
816 /*
817 * workqueue_run_nextitem is responsible for
818 * dropping the workqueue lock in all cases
819 */
820 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 1, 0);
821
822 return;
823 }
824 }
825 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 2, 0);
826 }
827 workqueue_unlock(p);
828 }
829
830
831
832 static void
833 workqueue_callback(int type, thread_t thread)
834 {
835 struct uthread *uth;
836 struct threadlist *tl;
837 struct workqueue *wq;
838
839 uth = get_bsdthread_info(thread);
840 tl = uth->uu_threadlist;
841 wq = tl->th_workq;
842
843 switch (type) {
844
845 case SCHED_CALL_BLOCK:
846 {
847 uint32_t old_activecount;
848
849 old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
850
851 if (old_activecount == 1) {
852 boolean_t start_timer = FALSE;
853 uint64_t curtime;
854 UInt64 *lastblocked_ptr;
855
856 /*
857 * we were the last active thread on this affinity set
858 * and we've got work to do
859 */
860 lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority][tl->th_affinity_tag];
861 curtime = mach_absolute_time();
862
863 /*
864 * if we collide with another thread trying to update the last_blocked (really unlikely
865 * since another thread would have to get scheduled and then block after we start down
866 * this path), it's not a problem. Either timestamp is adequate, so no need to retry
867 */
868
869 OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
870
871 if (wq->wq_itemcount)
872 WQ_TIMER_NEEDED(wq, start_timer);
873
874 if (start_timer == TRUE)
875 workqueue_interval_timer_start(wq);
876 }
877 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_START, wq, old_activecount, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
878 }
879 break;
880
881 case SCHED_CALL_UNBLOCK:
882 /*
883 * we cannot take the workqueue_lock here...
884 * an UNBLOCK can occur from a timer event which
885 * is run from an interrupt context... if the workqueue_lock
886 * is already held by this processor, we'll deadlock...
887 * the thread lock for the thread being UNBLOCKED
888 * is also held
889 */
890 OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
891
892 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
893
894 break;
895 }
896 }
897
898
899 static void
900 workqueue_removethread(struct threadlist *tl)
901 {
902 struct workqueue *wq;
903 struct uthread * uth;
904
905 wq = tl->th_workq;
906
907 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
908
909 wq->wq_nthreads--;
910 wq->wq_thidlecount--;
911
912 /*
913 * Clear the threadlist pointer in uthread so that a
914 * blocked thread, when woken up for termination, will
915 * not access the thread list, as it is about to be
916 * freed.
917 */
918 thread_sched_call(tl->th_thread, NULL);
919
920 uth = get_bsdthread_info(tl->th_thread);
921 if (uth != (struct uthread *)0) {
922 uth->uu_threadlist = NULL;
923 }
924 workqueue_unlock(wq->wq_proc);
925
926 if ( (tl->th_flags & TH_LIST_SUSPENDED) ) {
927 /*
928 * thread was created, but never used...
929 * need to clean up the stack and port ourselves
930 * since we're not going to spin up through the
931 * normal exit path triggered from Libc
932 */
933 (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
934 (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), tl->th_thport);
935
936 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
937 } else {
938
939 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
940 }
941 /*
942 * drop our ref on the thread
943 */
944 thread_deallocate(tl->th_thread);
945
946 kfree(tl, sizeof(struct threadlist));
947 }
948
949
950 /*
951 * called with workq lock held
952 * dropped and retaken around thread creation
953 * return with workq lock held
954 */
955 static boolean_t
956 workqueue_addnewthread(struct workqueue *wq, boolean_t oc_thread)
957 {
958 struct threadlist *tl;
959 struct uthread *uth;
960 kern_return_t kret;
961 thread_t th;
962 proc_t p;
963 void *sright;
964 mach_vm_offset_t stackaddr;
965
966 if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (CONFIG_THREAD_MAX - 20)) {
967 wq->wq_lflags |= WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
968 return (FALSE);
969 }
970 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
971
972 if (oc_thread == FALSE && wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
973 /*
974 * if we're not creating this thread to service an overcommit request,
975 * then check the size of the constrained thread pool... if we've already
976 * reached our max for threads scheduled from this pool, don't create a new
977 * one... the callers of this function are prepared for failure.
978 */
979 wq->wq_lflags |= WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
980 return (FALSE);
981 }
982 if (wq->wq_constrained_threads_scheduled < wq_max_constrained_threads)
983 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
984
985 wq->wq_nthreads++;
986
987 p = wq->wq_proc;
988 workqueue_unlock(p);
989
990 kret = thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
991
992 if (kret != KERN_SUCCESS)
993 goto failed;
994
995 tl = kalloc(sizeof(struct threadlist));
996 bzero(tl, sizeof(struct threadlist));
997
998 #if defined(__i386__) || defined(__x86_64__)
999 stackaddr = 0xB0000000;
1000 #else
1001 #error Need to define a stack address hint for this architecture
1002 #endif
1003 tl->th_allocsize = PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
1004
1005 kret = mach_vm_map(wq->wq_map, &stackaddr,
1006 tl->th_allocsize,
1007 page_size-1,
1008 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
1009 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
1010 VM_INHERIT_DEFAULT);
1011
1012 if (kret != KERN_SUCCESS) {
1013 kret = mach_vm_allocate(wq->wq_map,
1014 &stackaddr, tl->th_allocsize,
1015 VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
1016 }
1017 if (kret == KERN_SUCCESS) {
1018 /*
1019 * The guard page is at the lowest address
1020 * The stack base is the highest address
1021 */
1022 kret = mach_vm_protect(wq->wq_map, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
1023
1024 if (kret != KERN_SUCCESS)
1025 (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
1026 }
1027 if (kret != KERN_SUCCESS) {
1028 (void) thread_terminate(th);
1029 thread_deallocate(th);
1030
1031 kfree(tl, sizeof(struct threadlist));
1032 goto failed;
1033 }
1034 thread_reference(th);
1035
1036 sright = (void *) convert_thread_to_port(th);
1037 tl->th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task));
1038
1039 thread_static_param(th, TRUE);
1040
1041 tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED;
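/*
 * TH_LIST_SUSPENDED marks a freshly created worker that has never been
 * handed a work item; if it is reclaimed while still in that state,
 * workqueue_removethread() above deallocates its stack and thread port
 * directly, since such a thread never travels the normal exit path
 * triggered from Libc.
 */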
1042
1043 tl->th_thread = th;
1044 tl->th_workq = wq;
1045 tl->th_stackaddr = stackaddr;
1046 tl->th_affinity_tag = -1;
1047 tl->th_priority = WORKQUEUE_NUMPRIOS;
1048 tl->th_policy = -1;
1049
1050 uth = get_bsdthread_info(tl->th_thread);
1051 uth->uu_threadlist = (void *)tl;
1052
1053 workqueue_lock_spin(p);
1054
1055 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
1056
1057 wq->wq_thidlecount++;
1058
1059 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, wq, wq->wq_nthreads, 0, thread_tid(current_thread()), thread_tid(tl->th_thread));
1060
1061 return (TRUE);
1062
1063 failed:
1064 workqueue_lock_spin(p);
1065 wq->wq_nthreads--;
1066
1067 return (FALSE);
1068 }
1069
1070
1071 int
1072 workq_open(struct proc *p, __unused struct workq_open_args *uap, __unused int32_t *retval)
1073 {
1074 struct workqueue * wq;
1075 int wq_size;
1076 char * ptr;
1077 char * nptr;
1078 int j;
1079 uint32_t i;
1080 uint32_t num_cpus;
1081 int error = 0;
1082 boolean_t need_wakeup = FALSE;
1083 struct workitem * witem;
1084 struct workitemlist *wl;
1085
1086 if ((p->p_lflag & P_LREGISTER) == 0)
1087 return(EINVAL);
1088
1089 num_cpus = ml_get_max_cpus();
1090
1091 if (wq_init_constrained_limit) {
1092 uint32_t limit;
1093 /*
1094 * set up the limit for the constrained pool
1095 * this is a virtual pool in that we don't
1096 * maintain it on a separate idle and run list
1097 */
1098 limit = num_cpus * (WORKQUEUE_NUMPRIOS + 1);
1099
1100 if (limit > wq_max_constrained_threads)
1101 wq_max_constrained_threads = limit;
1102
1103 wq_init_constrained_limit = 0;
1104 }
1105 workqueue_lock_spin(p);
1106
1107 if (p->p_wqptr == NULL) {
1108
1109 while (p->p_wqiniting == TRUE) {
1110
1111 assert_wait((caddr_t)&p->p_wqiniting, THREAD_UNINT);
1112 workqueue_unlock(p);
1113
1114 thread_block(THREAD_CONTINUE_NULL);
1115
1116 workqueue_lock_spin(p);
1117 }
1118 if (p->p_wqptr != NULL)
1119 goto out;
1120
1121 p->p_wqiniting = TRUE;
1122
1123 workqueue_unlock(p);
1124
1125 wq_size = sizeof(struct workqueue) +
1126 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
1127 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
1128 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint64_t)) +
1129 sizeof(uint64_t);
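/*
 * Sketch of how the single allocation sized above is carved up by the
 * code that follows (all pointer math, no separate allocations):
 *
 *   [ struct workqueue ]
 *   [ wq_thactive_count    : WORKQUEUE_NUMPRIOS x num_cpus x uint32_t ]
 *   [ wq_thscheduled_count : WORKQUEUE_NUMPRIOS x num_cpus x uint32_t ]
 *   [ padding up to an 8-byte boundary ]
 *   [ wq_lastblocked_ts    : WORKQUEUE_NUMPRIOS x num_cpus x uint64_t ]
 *
 * the trailing sizeof(uint64_t) in wq_size covers the worst-case padding
 * needed to 64-bit align the timestamp array.
 */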
1130
1131 ptr = (char *)kalloc(wq_size);
1132 bzero(ptr, wq_size);
1133
1134 wq = (struct workqueue *)ptr;
1135 wq->wq_flags = WQ_LIST_INITED;
1136 wq->wq_proc = p;
1137 wq->wq_affinity_max = num_cpus;
1138 wq->wq_task = current_task();
1139 wq->wq_map = current_map();
1140
1141 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1142 wl = (struct workitemlist *)&wq->wq_list[i];
1143 TAILQ_INIT(&wl->wl_itemlist);
1144 TAILQ_INIT(&wl->wl_freelist);
1145
1146 for (j = 0; j < WORKITEM_SIZE; j++) {
1147 witem = &wq->wq_array[(i*WORKITEM_SIZE) + j];
1148 TAILQ_INSERT_TAIL(&wl->wl_freelist, witem, wi_entry);
1149 }
1150 wq->wq_reqconc[i] = wq->wq_affinity_max;
1151 }
1152 nptr = ptr + sizeof(struct workqueue);
1153
1154 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1155 wq->wq_thactive_count[i] = (uint32_t *)nptr;
1156 nptr += (num_cpus * sizeof(uint32_t));
1157 }
1158 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1159 wq->wq_thscheduled_count[i] = (uint32_t *)nptr;
1160 nptr += (num_cpus * sizeof(uint32_t));
1161 }
1162 /*
1163 * align nptr on a 64 bit boundary so that we can do nice
1164 * atomic64 operations on the timestamps...
1165 * note that we requested an extra uint64_t when calculating
1166 * the size for the allocation of the workqueue struct
1167 */
1168 nptr += (sizeof(uint64_t) - 1);
1169 nptr = (char *)((uintptr_t)nptr & ~(sizeof(uint64_t) - 1));
1170
1171 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1172 wq->wq_lastblocked_ts[i] = (uint64_t *)nptr;
1173 nptr += (num_cpus * sizeof(uint64_t));
1174 }
1175 TAILQ_INIT(&wq->wq_thrunlist);
1176 TAILQ_INIT(&wq->wq_thidlelist);
1177
1178 wq->wq_atimer_call = thread_call_allocate((thread_call_func_t)workqueue_add_timer, (thread_call_param_t)wq);
1179
1180 workqueue_lock_spin(p);
1181
1182 p->p_wqptr = (void *)wq;
1183 p->p_wqsize = wq_size;
1184
1185 p->p_wqiniting = FALSE;
1186 need_wakeup = TRUE;
1187 }
1188 out:
1189 workqueue_unlock(p);
1190
1191 if (need_wakeup == TRUE)
1192 wakeup(&p->p_wqiniting);
1193 return(error);
1194 }
1195
1196 int
1197 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, __unused int32_t *retval)
1198 {
1199 user_addr_t item = uap->item;
1200 int options = uap->options;
1201 int prio = uap->prio; /* should be used to find the right workqueue */
1202 int affinity = uap->affinity;
1203 int error = 0;
1204 thread_t th = THREAD_NULL;
1205 user_addr_t oc_item = 0;
1206 struct workqueue *wq;
1207
1208 if ((p->p_lflag & P_LREGISTER) == 0)
1209 return(EINVAL);
1210
1211 /*
1212 * affinity not yet hooked up on this path
1213 */
1214 affinity = -1;
1215
1216 switch (options) {
1217
1218 case WQOPS_QUEUE_ADD: {
1219
1220 if (prio & WORKQUEUE_OVERCOMMIT) {
1221 prio &= ~WORKQUEUE_OVERCOMMIT;
1222 oc_item = item;
1223 }
1224 if ((prio < 0) || (prio >= WORKQUEUE_NUMPRIOS))
1225 return (EINVAL);
1226
1227 workqueue_lock_spin(p);
1228
1229 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1230 workqueue_unlock(p);
1231 return (EINVAL);
1232 }
1233 if (wq->wq_thidlecount == 0 && (oc_item || (wq->wq_constrained_threads_scheduled < wq->wq_affinity_max))) {
1234
1235 workqueue_addnewthread(wq, oc_item ? TRUE : FALSE);
1236
1237 if (wq->wq_thidlecount == 0)
1238 oc_item = 0;
1239 }
1240 if (oc_item == 0)
1241 error = workqueue_additem(wq, prio, item, affinity);
1242
1243 KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, wq, prio, affinity, oc_item, 0);
1244 }
1245 break;
1246 case WQOPS_THREAD_RETURN: {
1247
1248 th = current_thread();
1249 struct uthread *uth = get_bsdthread_info(th);
1250
1251 /* reset signal mask on the workqueue thread to default state */
1252 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
1253 proc_lock(p);
1254 uth->uu_sigmask = ~workq_threadmask;
1255 proc_unlock(p);
1256 }
1257
1258 workqueue_lock_spin(p);
1259
1260 if ((wq = (struct workqueue *)p->p_wqptr) == NULL || (uth->uu_threadlist == NULL)) {
1261 workqueue_unlock(p);
1262 return (EINVAL);
1263 }
1264 KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, wq, 0, 0, 0, 0);
1265 }
1266 break;
1267 case WQOPS_THREAD_SETCONC: {
1268
1269 if ((prio < 0) || (prio > WORKQUEUE_NUMPRIOS))
1270 return (EINVAL);
1271
1272 workqueue_lock_spin(p);
1273
1274 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1275 workqueue_unlock(p);
1276 return (EINVAL);
1277 }
1278 /*
1279 * for this operation, we re-purpose the affinity
1280 * argument as the concurrency target
1281 */
1282 if (prio < WORKQUEUE_NUMPRIOS)
1283 wq->wq_reqconc[prio] = affinity;
1284 else {
1285 for (prio = 0; prio < WORKQUEUE_NUMPRIOS; prio++)
1286 wq->wq_reqconc[prio] = affinity;
1287
1288 }
1289 }
1290 break;
1291 default:
1292 return (EINVAL);
1293 }
1294 (void)workqueue_run_nextitem(p, wq, th, oc_item, prio, affinity);
1295 /*
1296 * workqueue_run_nextitem is responsible for
1297 * dropping the workqueue lock in all cases
1298 */
1299 return (error);
1300
1301 }
1302
1303 void
1304 workqueue_exit(struct proc *p)
1305 {
1306 struct workqueue * wq;
1307 struct threadlist * tl, *tlist;
1308 struct uthread *uth;
1309 int wq_size = 0;
1310
1311 if (p->p_wqptr != NULL) {
1312
1313 KERNEL_DEBUG(0x900808c | DBG_FUNC_START, p->p_wqptr, 0, 0, 0, 0);
1314
1315 workqueue_lock_spin(p);
1316
1317 wq = (struct workqueue *)p->p_wqptr;
1318
1319 if (wq == NULL) {
1320 workqueue_unlock(p);
1321
1322 KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, -1, 0);
1323 return;
1324 }
1325 wq_size = p->p_wqsize;
1326 p->p_wqptr = NULL;
1327 p->p_wqsize = 0;
1328
1329 /*
1330 * we now arm the timer in the callback function w/o holding the workq lock...
1331 * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
1332 * ensure only a single timer is running and to notice that WQ_EXITING has
1333 * been set (we don't want to start a timer once WQ_EXITING is posted)
1334 *
1335 * so once we have successfully set WQ_EXITING, we cannot fire up a new timer...
1336 * therefore there is no need to clear the timer state atomically from the flags
1337 *
1338 * since we always hold the workq lock when dropping WQ_ATIMER_RUNNING
1339 * the check for and sleep until clear is protected
1340 */
1341 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags | WQ_EXITING), (UInt32 *)&wq->wq_flags)));
1342
1343 if (wq->wq_flags & WQ_ATIMER_RUNNING) {
1344 if (thread_call_cancel(wq->wq_atimer_call) == TRUE)
1345 wq->wq_flags &= ~WQ_ATIMER_RUNNING;
1346 }
1347 while ((wq->wq_flags & WQ_ATIMER_RUNNING) || (wq->wq_lflags & WQL_ATIMER_BUSY)) {
1348
1349 assert_wait((caddr_t)wq, (THREAD_UNINT));
1350 workqueue_unlock(p);
1351
1352 thread_block(THREAD_CONTINUE_NULL);
1353
1354 workqueue_lock_spin(p);
1355 }
1356 workqueue_unlock(p);
1357
1358 TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) {
1359
1360 thread_sched_call(tl->th_thread, NULL);
1361
1362 uth = get_bsdthread_info(tl->th_thread);
1363 if (uth != (struct uthread *)0) {
1364 uth->uu_threadlist = NULL;
1365 }
1366 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1367
1368 /*
1369 * drop our last ref on the thread
1370 */
1371 thread_deallocate(tl->th_thread);
1372
1373 kfree(tl, sizeof(struct threadlist));
1374 }
1375 TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
1376
1377 thread_sched_call(tl->th_thread, NULL);
1378
1379 uth = get_bsdthread_info(tl->th_thread);
1380 if (uth != (struct uthread *)0) {
1381 uth->uu_threadlist = NULL;
1382 }
1383 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1384
1385 /*
1386 * drop our last ref on the thread
1387 */
1388 thread_deallocate(tl->th_thread);
1389
1390 kfree(tl, sizeof(struct threadlist));
1391 }
1392 thread_call_free(wq->wq_atimer_call);
1393
1394 kfree(wq, wq_size);
1395
1396 KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, 0, 0);
1397 }
1398 }
1399
1400 static int
1401 workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity)
1402 {
1403 struct workitem *witem;
1404 struct workitemlist *wl;
1405
1406 wl = (struct workitemlist *)&wq->wq_list[prio];
1407
1408 if (TAILQ_EMPTY(&wl->wl_freelist))
1409 return (ENOMEM);
1410
1411 witem = (struct workitem *)TAILQ_FIRST(&wl->wl_freelist);
1412 TAILQ_REMOVE(&wl->wl_freelist, witem, wi_entry);
1413
1414 witem->wi_item = item;
1415 witem->wi_affinity = affinity;
1416 TAILQ_INSERT_TAIL(&wl->wl_itemlist, witem, wi_entry);
1417
1418 wq->wq_list_bitmap |= (1 << prio);
1419
1420 wq->wq_itemcount++;
1421
1422 return (0);
1423 }
1424
1425 static int workqueue_importance[WORKQUEUE_NUMPRIOS] =
1426 {
1427 2, 0, -2, INT_MIN,
1428 };
1429
1430 #define WORKQ_POLICY_TIMESHARE 1
1431
1432 static int workqueue_policy[WORKQUEUE_NUMPRIOS] =
1433 {
1434 WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE, WORKQ_POLICY_TIMESHARE
1435 };
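/*
 * One entry per priority band. The importance values are handed to
 * THREAD_PRECEDENCE_POLICY in workqueue_run_nextitem() below (so they are
 * relative to the thread's default precedence); the INT_MIN entry is
 * presumably the background band (WORKQUEUE_BG_PRIOQUEUE), which also gets
 * the disk-throttling treatment seen in that routine.
 */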
1436
1437
1438 /*
1439 * workqueue_run_nextitem:
1440 * called with the workqueue lock held...
1441 * responsible for dropping it in all cases
1442 */
1443 static boolean_t
1444 workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t thread, user_addr_t oc_item, int oc_prio, int oc_affinity)
1445 {
1446 struct workitem *witem = NULL;
1447 user_addr_t item = 0;
1448 thread_t th_to_run = THREAD_NULL;
1449 thread_t th_to_park = THREAD_NULL;
1450 int wake_thread = 0;
1451 int reuse_thread = 1;
1452 uint32_t priority, orig_priority;
1453 uint32_t affinity_tag, orig_affinity_tag;
1454 uint32_t i, n;
1455 uint32_t activecount;
1456 uint32_t busycount;
1457 uint32_t us_to_wait;
1458 struct threadlist *tl = NULL;
1459 struct threadlist *ttl = NULL;
1460 struct uthread *uth = NULL;
1461 struct workitemlist *wl = NULL;
1462 boolean_t start_timer = FALSE;
1463 boolean_t adjust_counters = TRUE;
1464 uint64_t curtime;
1465
1466
1467 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_itemcount, 0);
1468
1469 /*
1470 * from here until we drop the workq lock
1471 * we can't be pre-empted since we hold
1472 * the lock in spin mode... this is important
1473 * since we have to independently update the priority
1474 * and affinity that the thread is associated with
1475 * and these values are used to index the multi-dimensional
1476 * counter arrays in 'workqueue_callback'
1477 */
1478 if (oc_item) {
1479 uint32_t min_scheduled = 0;
1480 uint32_t scheduled_count;
1481 uint32_t active_count;
1482 uint32_t t_affinity = 0;
1483
1484 priority = oc_prio;
1485 item = oc_item;
1486
1487 if ((affinity_tag = oc_affinity) == (uint32_t)-1) {
1488 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1489 /*
1490 * look for the affinity group with the least number of threads
1491 */
1492 scheduled_count = 0;
1493 active_count = 0;
1494
1495 for (i = 0; i <= priority; i++) {
1496 scheduled_count += wq->wq_thscheduled_count[i][affinity_tag];
1497 active_count += wq->wq_thactive_count[i][affinity_tag];
1498 }
1499 if (active_count == 0) {
1500 t_affinity = affinity_tag;
1501 break;
1502 }
1503 if (affinity_tag == 0 || scheduled_count < min_scheduled) {
1504 min_scheduled = scheduled_count;
1505 t_affinity = affinity_tag;
1506 }
1507 }
1508 affinity_tag = t_affinity;
1509 }
1510 goto grab_idle_thread;
1511 }
1512 /*
1513 * if we get here, the work should be handled by a constrained thread
1514 */
1515 if (wq->wq_itemcount == 0 || wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
1516 /*
1517 * no work to do, or we're already at or over the scheduling limit for
1518 * constrained threads... just return or park the thread...
1519 * do not start the timer for this condition... if we don't have any work,
1520 * we'll check again when new work arrives... if we're over the limit, we need 1 or more
1521 * constrained threads to return to the kernel before we can dispatch work from our queue
1522 */
1523 if ((th_to_park = thread) == THREAD_NULL)
1524 goto out_of_work;
1525 goto parkit;
1526 }
1527 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
1528 if (wq->wq_list_bitmap & (1 << priority)) {
1529 wl = (struct workitemlist *)&wq->wq_list[priority];
1530 break;
1531 }
1532 }
1533 assert(wl != NULL);
1534 assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
1535
1536 curtime = mach_absolute_time();
1537
1538 if (thread != THREAD_NULL) {
1539 uth = get_bsdthread_info(thread);
1540 tl = uth->uu_threadlist;
1541 affinity_tag = tl->th_affinity_tag;
1542
1543 /*
1544 * check to see if the affinity group this thread is
1545 * associated with is still within the bounds of the
1546 * specified concurrency for the priority level
1547 * we're considering running work for
1548 */
1549 if (affinity_tag < wq->wq_reqconc[priority]) {
1550 /*
1551 * we're a worker thread from the pool... currently we
1552 * are considered 'active' which means we're counted
1553 * in "wq_thactive_count"
1554 * add up the active counts of all the priority levels
1555 * up to and including the one we want to schedule
1556 */
1557 for (activecount = 0, i = 0; i <= priority; i++) {
1558 uint32_t acount;
1559
1560 acount = wq->wq_thactive_count[i][affinity_tag];
1561
1562 if (acount == 0 && wq->wq_thscheduled_count[i][affinity_tag]) {
1563 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag]))
1564 acount = 1;
1565 }
1566 activecount += acount;
1567 }
1568 if (activecount == 1) {
1569 /*
1570 * we're the only active thread associated with our
1571 * affinity group at this priority level and higher,
1572 * so pick up some work and keep going
1573 */
1574 th_to_run = thread;
1575 goto pick_up_work;
1576 }
1577 }
1578 /*
1579 * there's more than 1 thread running in this affinity group
1580 * or the concurrency level has been cut back for this priority...
1581 * lets continue on and look for an 'empty' group to run this
1582 * work item in
1583 */
1584 }
1585 busycount = 0;
1586
1587 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1588 /*
1589 * look for first affinity group that is currently not active
1590 * i.e. no active threads at this priority level or higher
1591 * and no threads that have run recently
1592 */
1593 for (activecount = 0, i = 0; i <= priority; i++) {
1594 if ((activecount = wq->wq_thactive_count[i][affinity_tag]))
1595 break;
1596
1597 if (wq->wq_thscheduled_count[i][affinity_tag]) {
1598 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
1599 busycount++;
1600 break;
1601 }
1602 }
1603 }
1604 if (activecount == 0 && busycount == 0)
1605 break;
1606 }
1607 if (affinity_tag >= wq->wq_reqconc[priority]) {
1608 /*
1609 * we've already got at least 1 thread per
1610 * affinity group in the active state...
1611 */
1612 if (busycount) {
1613 /*
1614 * we found at least 1 thread in the
1615 * 'busy' state... make sure we start
1616 * the timer because if they are the only
1617 * threads keeping us from scheduling
1618 * this workitem, we won't get a callback
1619 * to kick off the timer... we need to
1620 * start it now...
1621 */
1622 WQ_TIMER_NEEDED(wq, start_timer);
1623 }
1624 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_NONE, wq, busycount, start_timer, 0, 0);
1625
1626 if (thread != THREAD_NULL) {
1627 /*
1628 * go park this one for later
1629 */
1630 th_to_park = thread;
1631 goto parkit;
1632 }
1633 goto out_of_work;
1634 }
1635 if (thread != THREAD_NULL) {
1636 /*
1637 * we're overbooked on the affinity group this thread is
1638 * currently associated with, but we have work to do
1639 * and at least 1 idle processor, so we'll just retarget
1640 * this thread to a new affinity group
1641 */
1642 th_to_run = thread;
1643 goto pick_up_work;
1644 }
1645 if (wq->wq_thidlecount == 0) {
1646 /*
1647 * we don't have a thread to schedule, but we have
1648 * work to do and at least 1 affinity group that
1649 * doesn't currently have an active thread...
1650 */
1651 WQ_TIMER_NEEDED(wq, start_timer);
1652
1653 KERNEL_DEBUG(0xefffd118, wq, wq->wq_nthreads, start_timer, 0, 0);
1654
1655 goto no_thread_to_run;
1656 }
1657
1658 grab_idle_thread:
1659 /*
1660 * we've got a candidate (affinity group with no currently
1661 * active threads) to start a new thread on...
1662 * we already know there is both work available
1663 * and an idle thread, so activate a thread and then
1664 * fall into the code that pulls a new workitem...
1665 */
1666 TAILQ_FOREACH(ttl, &wq->wq_thidlelist, th_entry) {
1667 if (ttl->th_affinity_tag == affinity_tag || ttl->th_affinity_tag == (uint16_t)-1) {
1668
1669 TAILQ_REMOVE(&wq->wq_thidlelist, ttl, th_entry);
1670 tl = ttl;
1671
1672 break;
1673 }
1674 }
1675 if (tl == NULL) {
1676 tl = TAILQ_FIRST(&wq->wq_thidlelist);
1677 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1678 }
1679 wq->wq_thidlecount--;
1680
1681 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
1682
1683 if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
1684 tl->th_flags &= ~TH_LIST_SUSPENDED;
1685 reuse_thread = 0;
1686
1687 } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
1688 tl->th_flags &= ~TH_LIST_BLOCKED;
1689 wake_thread = 1;
1690 }
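/*
 * At this point exactly one of three hand-off styles applies: a thread
 * still marked TH_LIST_SUSPENDED has never run, so reuse_thread is cleared
 * (it presumably gets resumed rather than woken in wq_runitem); a thread
 * that was parked in TH_LIST_BLOCKED is sitting in an assert_wait and needs
 * an explicit wakeup (wake_thread); and a caller that is picking up work
 * for itself takes the return_directly path computed at the wq_runitem
 * call below.
 */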
1691 tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
1692
1693 wq->wq_threads_scheduled++;
1694 wq->wq_thscheduled_count[priority][affinity_tag]++;
1695 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1696
1697 adjust_counters = FALSE;
1698 th_to_run = tl->th_thread;
1699
1700 pick_up_work:
1701 if (item == 0) {
1702 witem = TAILQ_FIRST(&wl->wl_itemlist);
1703 TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
1704
1705 if (TAILQ_EMPTY(&wl->wl_itemlist))
1706 wq->wq_list_bitmap &= ~(1 << priority);
1707 wq->wq_itemcount--;
1708
1709 item = witem->wi_item;
1710 witem->wi_item = (user_addr_t)0;
1711 witem->wi_affinity = 0;
1712 TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
1713
1714 if ( !(tl->th_flags & TH_LIST_CONSTRAINED)) {
1715 wq->wq_constrained_threads_scheduled++;
1716 tl->th_flags |= TH_LIST_CONSTRAINED;
1717 }
1718 } else {
1719 if (tl->th_flags & TH_LIST_CONSTRAINED) {
1720 wq->wq_constrained_threads_scheduled--;
1721 tl->th_flags &= ~TH_LIST_CONSTRAINED;
1722 }
1723 }
1724 orig_priority = tl->th_priority;
1725 orig_affinity_tag = tl->th_affinity_tag;
1726
1727 tl->th_priority = priority;
1728 tl->th_affinity_tag = affinity_tag;
1729
1730 if (adjust_counters == TRUE && (orig_priority != priority || orig_affinity_tag != affinity_tag)) {
1731 /*
1732 * we need to adjust these counters based on this
1733 * thread's new disposition w/r to affinity and priority
1734 */
1735 OSAddAtomic(-1, &wq->wq_thactive_count[orig_priority][orig_affinity_tag]);
1736 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1737
1738 wq->wq_thscheduled_count[orig_priority][orig_affinity_tag]--;
1739 wq->wq_thscheduled_count[priority][affinity_tag]++;
1740 }
1741 wq->wq_thread_yielded_count = 0;
1742
1743 workqueue_unlock(p);
1744
1745 if (orig_affinity_tag != affinity_tag) {
1746 /*
1747 * this thread's affinity does not match the affinity group
1748 * it's being placed on (it's either a brand new thread or
1749 * we're retargeting an existing thread to a new group)...
1750 * affinity tag of 0 means no affinity...
1751 * but we want our tags to be 0 based because they
1752 * are used to index arrays, so...
1753 * keep it 0 based internally and bump by 1 when
1754 * calling out to set it
1755 */
1756 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_START, wq, orig_affinity_tag, 0, 0, 0);
1757
1758 (void)thread_affinity_set(th_to_run, affinity_tag + 1);
1759
1760 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_END, wq, affinity_tag, 0, 0, 0);
1761 }
1762 if (orig_priority != priority) {
1763 thread_precedence_policy_data_t precedinfo;
1764 thread_extended_policy_data_t extinfo;
1765 uint32_t policy;
1766
1767 policy = workqueue_policy[priority];
1768
1769 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_START, wq, orig_priority, tl->th_policy, 0, 0);
1770
1771 if ((orig_priority == WORKQUEUE_BG_PRIOQUEUE) || (priority == WORKQUEUE_BG_PRIOQUEUE)) {
1772 struct uthread *ut = NULL;
1773
1774 ut = get_bsdthread_info(th_to_run);
1775
1776 if (orig_priority == WORKQUEUE_BG_PRIOQUEUE) {
1777 /* remove the disk throttle; importance will be reset in any case */
1778 #if !CONFIG_EMBEDDED
1779 proc_restore_workq_bgthreadpolicy(th_to_run);
1780 #else /* !CONFIG_EMBEDDED */
1781 if ((ut->uu_flag & UT_BACKGROUND) != 0) {
1782 ut->uu_flag &= ~UT_BACKGROUND;
1783 ut->uu_iopol_disk = IOPOL_NORMAL;
1784 }
1785 #endif /* !CONFIG_EMBEDDED */
1786 }
1787
1788 if (priority == WORKQUEUE_BG_PRIOQUEUE) {
1789 #if !CONFIG_EMBEDDED
1790 proc_apply_workq_bgthreadpolicy(th_to_run);
1791 #else /* !CONFIG_EMBEDDED */
1792 if ((ut->uu_flag & UT_BACKGROUND) == 0) {
1793 /* set diskthrottling */
1794 ut->uu_flag |= UT_BACKGROUND;
1795 ut->uu_iopol_disk = IOPOL_THROTTLE;
1796 }
1797 #endif /* !CONFIG_EMBEDDED */
1798 }
1799 }
1800
1801 if (tl->th_policy != policy) {
1802 extinfo.timeshare = policy;
1803 (void)thread_policy_set_internal(th_to_run, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
1804
1805 tl->th_policy = policy;
1806 }
1807
1808 precedinfo.importance = workqueue_importance[priority];
1809 (void)thread_policy_set_internal(th_to_run, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
1810
1811
1812 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_END, wq, priority, policy, 0, 0);
1813 }
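/*
 * with kdebug tracing enabled, emit a snapshot of the non-zero
 * per-priority / per-affinity active thread counts for this workqueue
 */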
1814 if (kdebug_enable) {
1815 int lpri = -1;
1816 int laffinity = -1;
1817 int first = -1;
1818 uint32_t code = 0xefffd02c | DBG_FUNC_START;
1819
1820 for (n = 0; n < WORKQUEUE_NUMPRIOS; n++) {
1821 for (i = 0; i < wq->wq_affinity_max; i++) {
1822 if (wq->wq_thactive_count[n][i]) {
1823 if (lpri != -1) {
1824 KERNEL_DEBUG(code, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1825 code = 0xefffd02c;
1826 first = 0;
1827 }
1828 lpri = n;
1829 laffinity = i;
1830 }
1831 }
1832 }
1833 if (lpri != -1) {
1834 if (first == -1)
1835 first = 0xeeeeeeee;
1836 KERNEL_DEBUG(0xefffd02c | DBG_FUNC_END, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1837 }
1838 }
1839 /*
1840 * if the current thread is reused for the work item, this call does not return via unix_syscall
1841 */
1842 wq_runitem(p, item, th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run));
1843
1844 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(th_to_run), item, 1, 0);
1845
1846 return (TRUE);
1847
1848 out_of_work:
1849 /*
1850 * we have no work to do or we are fully booked
1851 * w/r to running threads...
1852 */
1853 no_thread_to_run:
1854 workqueue_unlock(p);
1855
1856 if (start_timer)
1857 workqueue_interval_timer_start(wq);
1858
1859 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 2, 0);
1860
1861 return (FALSE);
1862
1863 parkit:
1864 /*
1865 * this is a workqueue thread with no more
1866 * work to do... park it for now
1867 */
1868 uth = get_bsdthread_info(th_to_park);
1869 tl = uth->uu_threadlist;
1870 if (tl == NULL)
1871 panic("wq thread with no threadlist");
1872
1873 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1874 tl->th_flags &= ~TH_LIST_RUNNING;
1875
1876 tl->th_flags |= TH_LIST_BLOCKED;
1877 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
1878
1879 thread_sched_call(th_to_park, NULL);
1880
1881 OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
1882 wq->wq_thscheduled_count[tl->th_priority][tl->th_affinity_tag]--;
1883 wq->wq_threads_scheduled--;
1884
1885 if (tl->th_flags & TH_LIST_CONSTRAINED) {
1886 wq->wq_constrained_threads_scheduled--;
1887 wq->wq_lflags &= ~WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
1888 tl->th_flags &= ~TH_LIST_CONSTRAINED;
1889 }
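/*
 * scale the park timeout by the number of threads already idle...
 * the more idle threads in the pool, the sooner this one gives up
 * waiting (down to 1/100th of the reduce-pool window) so the pool
 * can be trimmed back
 */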
1890 if (wq->wq_thidlecount < 100)
1891 us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
1892 else
1893 us_to_wait = wq_reduce_pool_window_usecs / 100;
1894
1895 wq->wq_thidlecount++;
1896 wq->wq_lflags &= ~WQL_EXCEEDED_TOTAL_THREAD_LIMIT;
1897
1898 assert_wait_timeout((caddr_t)tl, (THREAD_INTERRUPTIBLE), us_to_wait, NSEC_PER_USEC);
1899
1900 workqueue_unlock(p);
1901
1902 if (start_timer)
1903 workqueue_interval_timer_start(wq);
1904
1905 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(th_to_park));
1906 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 3, 0);
1907
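/*
 * block with a continuation... if new work arrives, wq_runitem
 * wakes or resumes this thread; if only the timeout above fires,
 * it is still sitting on the idle list... either way, execution
 * resumes in wq_unpark_continue rather than here
 */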
1908 thread_block((thread_continue_t)wq_unpark_continue);
1909 /* NOT REACHED */
1910
1911 return (FALSE);
1912 }
1913
1914
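/*
 * continuation for a workqueue thread that was created suspended...
 * on a normal resume it proceeds on to user space; if the thread was
 * aborted instead (the process is crashing), it is unhooked from the
 * workqueue and sent down the abort path
 */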
1915 static void
1916 wq_unsuspend_continue(void)
1917 {
1918 struct uthread *uth = NULL;
1919 thread_t th_to_unsuspend;
1920 struct threadlist *tl;
1921 proc_t p;
1922
1923 th_to_unsuspend = current_thread();
1924 uth = get_bsdthread_info(th_to_unsuspend);
1925
1926 if (uth != NULL && (tl = uth->uu_threadlist) != NULL) {
1927
1928 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
1929 /*
1930 * most likely a normal resume of this thread occurred...
1931 * it's also possible that the thread was aborted after we
1932 * finished setting it up so that it could be dispatched... if
1933 * so, thread_bootstrap_return will notice the abort and put
1934 * the thread on the path to self-destruction
1935 */
1936 normal_resume_to_user:
1937 thread_sched_call(th_to_unsuspend, workqueue_callback);
1938
1939 thread_bootstrap_return();
1940 }
1941 /*
1942 * if we get here, it's because we've been resumed due to
1943 * an abort of this thread (process is crashing)
1944 */
1945 p = current_proc();
1946
1947 workqueue_lock_spin(p);
1948
1949 if (tl->th_flags & TH_LIST_SUSPENDED) {
1950 /*
1951 * thread has been aborted while still on our idle
1952 * queue... remove it from our domain...
1953 * workqueue_removethread consumes the lock
1954 */
1955 workqueue_removethread(tl);
1956
1957 thread_bootstrap_return();
1958 }
1959 while ((tl->th_flags & TH_LIST_BUSY)) {
1960 /*
1961 * this thread was aborted after we started making
1962 * it runnable, but before we finished dispatching it...
1963 * we need to wait for that process to finish,
1964 * and we need to ask for a wakeup instead of a
1965 * thread_resume since the abort has already resumed us
1966 */
1967 tl->th_flags |= TH_LIST_NEED_WAKEUP;
1968
1969 assert_wait((caddr_t)tl, (THREAD_UNINT));
1970
1971 workqueue_unlock(p);
1972
1973 thread_block(THREAD_CONTINUE_NULL);
1974
1975 workqueue_lock_spin(p);
1976 }
1977 workqueue_unlock(p);
1978 /*
1979 * we have finished setting up the thread's context...
1980 * thread_bootstrap_return will take us through the abort path
1981 * where the thread will self destruct
1982 */
1983 goto normal_resume_to_user;
1984 }
1985 thread_bootstrap_return();
1986 }
1987
1988
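/*
 * continuation for a parked (idle) workqueue thread... if it was made
 * runnable again it simply returns to user space; if the idle-reclaim
 * timer woke it and it is still on the idle list, it removes itself
 * from the workqueue and self-destructs
 */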
1989 static void
1990 wq_unpark_continue(void)
1991 {
1992 struct uthread *uth = NULL;
1993 struct threadlist *tl;
1994 thread_t th_to_unpark;
1995 proc_t p;
1996
1997 th_to_unpark = current_thread();
1998 uth = get_bsdthread_info(th_to_unpark);
1999
2000 if (uth != NULL) {
2001 if ((tl = uth->uu_threadlist) != NULL) {
2002
2003 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
2004 /*
2005 * a normal wakeup of this thread occurred... no need
2006 * for any synchronization with the timer and wq_runitem
2007 */
2008 normal_return_to_user:
2009 thread_sched_call(th_to_unpark, workqueue_callback);
2010
2011 KERNEL_DEBUG(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
2012
2013 thread_exception_return();
2014 }
2015 p = current_proc();
2016
2017 workqueue_lock_spin(p);
2018
2019 if ( !(tl->th_flags & TH_LIST_RUNNING)) {
2020 /*
2021 * the timer popped us out and we've not
2022 * been moved off of the idle list
2023 * so we should now self-destruct
2024 *
2025 * workqueue_removethread consumes the lock
2026 */
2027 workqueue_removethread(tl);
2028
2029 thread_exception_return();
2030 }
2031 /*
2032 * the timer woke us up, but we have already
2033 * started to make this thread runnable and
2034 * have not yet finished that process...
2035 * so wait for the normal wakeup
2036 */
2037 while ((tl->th_flags & TH_LIST_BUSY)) {
2038
2039 assert_wait((caddr_t)tl, (THREAD_UNINT));
2040
2041 workqueue_unlock(p);
2042
2043 thread_block(THREAD_CONTINUE_NULL);
2044
2045 workqueue_lock_spin(p);
2046 }
2047 /*
2048 * we have finished setting up the thread's context
2049 * now we can return as if we got a normal wakeup
2050 */
2051 workqueue_unlock(p);
2052
2053 goto normal_return_to_user;
2054 }
2055 }
2056 thread_exception_return();
2057 }
2058
2059
2060
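/*
 * hand a work item to a thread... set up its user register state via
 * setup_wqthread, then either return directly to user space (when the
 * current thread is being reused), wake a thread that is waiting in
 * the kernel, or wake/resume one that is still being set up
 */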
2061 static void
2062 wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
2063 int reuse_thread, int wake_thread, int return_directly)
2064 {
2065 int ret = 0;
2066 boolean_t need_resume = FALSE;
2067
2068 KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, tl->th_workq, tl->th_priority, tl->th_affinity_tag, thread_tid(current_thread()), thread_tid(th));
2069
2070 ret = setup_wqthread(p, th, item, reuse_thread, tl);
2071
2072 if (ret != 0)
2073 panic("setup_wqthread failed %x\n", ret);
2074
2075 if (return_directly) {
2076 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
2077
2078 thread_exception_return();
2079
2080 panic("wq_runitem: thread_exception_return returned ...\n");
2081 }
2082 if (wake_thread) {
2083 workqueue_lock_spin(p);
2084
2085 tl->th_flags &= ~TH_LIST_BUSY;
2086 wakeup(tl);
2087
2088 workqueue_unlock(p);
2089 } else {
2090 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
2091
2092 workqueue_lock_spin(p);
2093
2094 if (tl->th_flags & TH_LIST_NEED_WAKEUP)
2095 wakeup(tl);
2096 else
2097 need_resume = TRUE;
2098
2099 tl->th_flags &= ~(TH_LIST_BUSY | TH_LIST_NEED_WAKEUP);
2100
2101 workqueue_unlock(p);
2102
2103 if (need_resume) {
2104 /*
2105 * need to do this outside of the workqueue spin lock
2106 * since thread_resume locks the thread via a full mutex
2107 */
2108 thread_resume(th);
2109 }
2110 }
2111 }
2112
2113
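/*
 * build the initial user-space register state for a workqueue thread...
 * execution starts at the process's workqueue entry point (p_wqthread),
 * with the pthread structure, kernel thread port, stack address, work
 * item and reuse flag passed in the architecture's argument registers,
 * and the stack pointer placed just below the pthread structure
 */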
2114 int
2115 setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl)
2116 {
2117 #if defined(__i386__) || defined(__x86_64__)
2118 int isLP64 = 0;
2119
2120 isLP64 = IS_64BIT_PROCESS(p);
2121 /*
2122 * Set up the i386 / x86_64 registers & function call.
2123 */
2124 if (isLP64 == 0) {
2125 x86_thread_state32_t state;
2126 x86_thread_state32_t *ts = &state;
2127
2128 ts->eip = (int)p->p_wqthread;
2129 ts->eax = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2130 ts->ebx = (unsigned int)tl->th_thport;
2131 ts->ecx = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2132 ts->edx = (unsigned int)item;
2133 ts->edi = (unsigned int)reuse_thread;
2134 ts->esi = (unsigned int)0;
2135 /*
2136 * set stack pointer
2137 */
2138 ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_32_STK_ALIGN));
2139
2140 if ((reuse_thread != 0) && (ts->eax == (unsigned int)0))
2141 panic("setup_wqthread: setting reuse thread with null pthread\n");
2142 thread_set_wq_state32(th, (thread_state_t)ts);
2143
2144 } else {
2145 x86_thread_state64_t state64;
2146 x86_thread_state64_t *ts64 = &state64;
2147
2148 ts64->rip = (uint64_t)p->p_wqthread;
2149 ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2150 ts64->rsi = (uint64_t)(tl->th_thport);
2151 ts64->rdx = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2152 ts64->rcx = (uint64_t)item;
2153 ts64->r8 = (uint64_t)reuse_thread;
2154 ts64->r9 = (uint64_t)0;
2155
2156 /*
2157 * set stack pointer aligned to 16 byte boundary
2158 */
2159 ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_64_REDZONE_LEN);
2160
2161 if ((reuse_thread != 0) && (ts64->rdi == (uint64_t)0))
2162 panic("setup_wqthread: setting reuse thread with null pthread\n");
2163 thread_set_wq_state64(th, (thread_state_t)ts64);
2164 }
2165 #else
2166 #error setup_wqthread not defined for this architecture
2167 #endif
2168 return(0);
2169 }
2170
2171 int
2172 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
2173 {
2174 struct workqueue * wq;
2175 int error = 0;
2176 int activecount;
2177 uint32_t pri, affinity;
2178
2179 workqueue_lock_spin(p);
2180 if ((wq = p->p_wqptr) == NULL) {
2181 error = EINVAL;
2182 goto out;
2183 }
2184 activecount = 0;
2185
2186 for (pri = 0; pri < WORKQUEUE_NUMPRIOS; pri++) {
2187 for (affinity = 0; affinity < wq->wq_affinity_max; affinity++)
2188 activecount += wq->wq_thactive_count[pri][affinity];
2189 }
2190 pwqinfo->pwq_nthreads = wq->wq_nthreads;
2191 pwqinfo->pwq_runthreads = activecount;
2192 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
2193 pwqinfo->pwq_state = 0;
2194
2195 if (wq->wq_lflags & WQL_EXCEEDED_CONSTRAINED_THREAD_LIMIT)
2196 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
2197
2198 if (wq->wq_lflags & WQL_EXCEEDED_TOTAL_THREAD_LIMIT)
2199 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
2200
2201 out:
2202 workqueue_unlock(p);
2203 return(error);
2204 }
2205
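/*
 * the counts filled in above are visible to user space through the
 * proc_info interface... a minimal sketch of a caller, assuming the
 * PROC_PIDWORKQUEUEINFO flavor and struct proc_workqueueinfo from
 * <sys/proc_info.h> and the libproc proc_pidinfo() wrapper
 * (show_wq_counts is just an illustrative name):
 *
 *	#include <sys/types.h>
 *	#include <sys/proc_info.h>
 *	#include <libproc.h>
 *	#include <stdio.h>
 *
 *	static void
 *	show_wq_counts(pid_t pid)
 *	{
 *		struct proc_workqueueinfo pwqinfo;
 *
 *		if (proc_pidinfo(pid, PROC_PIDWORKQUEUEINFO, 0,
 *				 &pwqinfo, sizeof(pwqinfo)) == (int)sizeof(pwqinfo))
 *			printf("threads %u running %u blocked %u\n",
 *			       pwqinfo.pwq_nthreads, pwqinfo.pwq_runthreads,
 *			       pwqinfo.pwq_blockedthreads);
 *	}
 */
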
2206 /* Set the target concurrency of one of the priority queues (0, 1, 2) to the specified value */
2207 int
2208 proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc)
2209 {
2210 proc_t p, self;
2211 uint64_t addr;
2212 int32_t conc = targetconc;
2213 int error = 0;
2214 vm_map_t oldmap = VM_MAP_NULL;
2215 int gotref = 0;
2216
2217 self = current_proc();
2218 if (self->p_pid != pid) {
2219 /* if not on self, hold a reference on the process */
2220
2221 if (pid == 0)
2222 return(EINVAL);
2223
2224 p = proc_find(pid);
2225
2226 if (p == PROC_NULL)
2227 return(ESRCH);
2228 gotref = 1;
2229
2230 } else
2231 p = self;
2232
2233 if ((addr = p->p_targconc) == (uint64_t)0) {
2234 error = EINVAL;
2235 goto out;
2236 }
2237
2238
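/*
 * p_targconc is a user-space address holding one int32_t per
 * priority queue... when targeting another process, temporarily
 * switch to that process's address space so the copyout below
 * lands in the right map
 */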
2239 if ((queuenum >= WQ_MAXPRI_MIN) && (queuenum <= WQ_MAXPRI_MAX)) {
2240 addr += (queuenum * sizeof(int32_t));
2241 if (gotref == 1)
2242 oldmap = vm_map_switch(get_task_map(p->task));
2243 error = copyout(&conc, addr, sizeof(int32_t));
2244 if (gotref == 1)
2245 (void)vm_map_switch(oldmap);
2246
2247 } else {
2248 error = EINVAL;
2249 }
2250 out:
2251 if (gotref == 1)
2252 proc_rele(p);
2253 return(error);
2254 }
2255
2256
2257 /* Set the target concurrency of all the priority queues to the specified values */
2258 int
2259 proc_setalltargetconc(pid_t pid, int32_t * targetconcp)
2260 {
2261 proc_t p, self;
2262 uint64_t addr;
2263 int error = 0;
2264 vm_map_t oldmap = VM_MAP_NULL;
2265 int gotref = 0;
2266
2267 self = current_proc();
2268 if (self->p_pid != pid) {
2269 /* if not on self, hold a reference on the process */
2270
2271 if (pid == 0)
2272 return(EINVAL);
2273
2274 p = proc_find(pid);
2275
2276 if (p == PROC_NULL)
2277 return(ESRCH);
2278 gotref = 1;
2279
2280 } else
2281 p = self;
2282
2283 if ((addr = (uint64_t)p->p_targconc) == (uint64_t)0) {
2284 error = EINVAL;
2285 goto out;
2286 }
2287
2288
2289 if (gotref == 1)
2290 oldmap = vm_map_switch(get_task_map(p->task));
2291
2292 error = copyout(targetconcp, addr, WQ_PRI_NUM * sizeof(int32_t));
2293 if (gotref == 1)
2294 (void)vm_map_switch(oldmap);
2295
2296 out:
2297 if (gotref == 1)
2298 proc_rele(p);
2299 return(error);
2300 }
2301
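/*
 * return the calling thread's unique 64-bit thread ID to user space
 */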
2302 int thread_selfid(__unused struct proc *p, __unused struct thread_selfid_args *uap, uint64_t *retval)
2303 {
2304 thread_t thread = current_thread();
2305 *retval = thread_tid(thread);
2306 return KERN_SUCCESS;
2307 }
2308
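/*
 * one-time setup for the kernel's pthread support... allocate the lock
 * group and attributes used here, initialize the kernel task's
 * workqueue lock, and, when PSYNCH is configured, the psynch list
 * lock, global hash, cleanup thread call and zones
 */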
2309 void
2310 pthread_init(void)
2311 {
2312 pthread_lck_grp_attr = lck_grp_attr_alloc_init();
2313 pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
2314
2315 /*
2316 * allocate the lock attribute for pthread synchronizers
2317 */
2318 pthread_lck_attr = lck_attr_alloc_init();
2319
2320 workqueue_init_lock((proc_t) get_bsdtask_info(kernel_task));
2321 #if PSYNCH
2322 pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
2323
2324 pth_global_hashinit();
2325 psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
2326 psynch_zoneinit();
2327 #endif /* PSYNCH */
2328 }