[apple/xnu.git] xnu-1504.3.12 / bsd / kern / pthread_synch.c
1 /*
2 * Copyright (c) 2000-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2005 Apple Computer, Inc. All Rights Reserved */
29 /*
30 * pthread_synch.c
31 */
32
33 #define _PTHREAD_CONDATTR_T
34 #define _PTHREAD_COND_T
35 #define _PTHREAD_MUTEXATTR_T
36 #define _PTHREAD_MUTEX_T
37 #define _PTHREAD_RWLOCKATTR_T
38 #define _PTHREAD_RWLOCK_T
39
40 #undef pthread_mutexattr_t
41 #undef pthread_mutex_t
42 #undef pthread_condattr_t
43 #undef pthread_cond_t
44 #undef pthread_rwlockattr_t
45 #undef pthread_rwlock_t
46
47 #include <sys/param.h>
48 #include <sys/queue.h>
49 #include <sys/resourcevar.h>
50 #include <sys/proc_internal.h>
51 #include <sys/kauth.h>
52 #include <sys/systm.h>
53 #include <sys/timeb.h>
54 #include <sys/times.h>
55 #include <sys/acct.h>
56 #include <sys/kernel.h>
57 #include <sys/wait.h>
58 #include <sys/signalvar.h>
59 #include <sys/syslog.h>
60 #include <sys/stat.h>
61 #include <sys/lock.h>
62 #include <sys/kdebug.h>
63 #include <sys/sysproto.h>
64 #include <sys/pthread_internal.h>
65 #include <sys/vm.h>
66 #include <sys/user.h> /* for coredump */
67 #include <sys/proc_info.h> /* for fill_procworkqueue */
68
69
70 #include <mach/mach_types.h>
71 #include <mach/vm_prot.h>
72 #include <mach/semaphore.h>
73 #include <mach/sync_policy.h>
74 #include <mach/task.h>
75 #include <kern/kern_types.h>
76 #include <kern/task.h>
77 #include <kern/clock.h>
78 #include <mach/kern_return.h>
79 #include <kern/thread.h>
80 #include <kern/sched_prim.h>
81 #include <kern/kalloc.h>
82 #include <kern/sched_prim.h> /* for thread_exception_return */
83 #include <kern/processor.h>
84 #include <kern/affinity.h>
85 #include <kern/assert.h>
86 #include <mach/mach_vm.h>
87 #include <mach/mach_param.h>
88 #include <mach/thread_status.h>
89 #include <mach/thread_policy.h>
90 #include <mach/message.h>
91 #include <mach/port.h>
92 #include <vm/vm_protos.h>
93 #include <vm/vm_map.h> /* for current_map() */
94 #include <mach/thread_act.h> /* for thread_resume */
95 #include <machine/machine_routines.h>
96 #if defined(__i386__)
97 #include <i386/machine_routines.h>
98 #include <i386/eflags.h>
99 #include <i386/psl.h>
100 #include <i386/seg.h>
101 #endif
102
103 #include <libkern/OSAtomic.h>
104
105 #if 0
106 #undef KERNEL_DEBUG
107 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
108 #undef KERNEL_DEBUG1
109 #define KERNEL_DEBUG1 KERNEL_DEBUG_CONSTANT1
110 #endif
111
112
113 #if defined(__ppc__) || defined(__ppc64__)
114 #include <architecture/ppc/cframe.h>
115 #endif
116
117
118 lck_grp_attr_t *pthread_lck_grp_attr;
119 lck_grp_t *pthread_lck_grp;
120 lck_attr_t *pthread_lck_attr;
121
122 extern kern_return_t thread_getstatus(register thread_t act, int flavor,
123 thread_state_t tstate, mach_msg_type_number_t *count);
124 extern kern_return_t thread_setstatus(thread_t thread, int flavor,
125 thread_state_t tstate, mach_msg_type_number_t count);
126 extern void thread_set_cthreadself(thread_t thread, uint64_t pself, int isLP64);
127 extern kern_return_t mach_port_deallocate(ipc_space_t, mach_port_name_t);
128 extern kern_return_t semaphore_signal_internal_trap(mach_port_name_t);
129
130 extern void workqueue_thread_yielded(void);
131
132 static int workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity);
133 static int workqueue_removeitem(struct workqueue *wq, int prio, user_addr_t item);
134 static boolean_t workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t th,
135 user_addr_t oc_item, int oc_prio, int oc_affinity);
136 static void wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
137 int reuse_thread, int wake_thread, int return_directly);
138 static void wq_unpark_continue(void);
139 static void wq_unsuspend_continue(void);
140 static int setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl);
141 static boolean_t workqueue_addnewthread(struct workqueue *wq);
142 static void workqueue_removethread(struct threadlist *tl);
143 static void workqueue_lock_spin(proc_t);
144 static void workqueue_unlock(proc_t);
145 int proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc);
146 int proc_setalltargetconc(pid_t pid, int32_t * targetconcp);
147
148 #define WQ_MAXPRI_MIN 0 /* low prio queue num */
149 #define WQ_MAXPRI_MAX 2 /* max prio queuenum */
150 #define WQ_PRI_NUM 3 /* number of prio work queues */
151
152 #define C_32_STK_ALIGN 16
153 #define C_64_STK_ALIGN 16
154 #define C_64_REDZONE_LEN 128
155 #define TRUNC_DOWN32(a,c) ((((uint32_t)a)-(c)) & ((uint32_t)(-(c))))
156 #define TRUNC_DOWN64(a,c) ((((uint64_t)a)-(c)) & ((uint64_t)(-(c))))
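
/*
 * For reference: C_64_REDZONE_LEN matches the 128-byte red zone the x86-64
 * ABI guarantees below the stack pointer, and TRUNC_DOWN32/64(a,c) subtract
 * c from a and round the result down to a multiple of c (c is assumed to be
 * a power of two), i.e. they step below an address while preserving c-byte
 * alignment.
 */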
157
158
159 /*
160 * Flags field passed to bsdthread_create and back in pthread_start
161 *   31 <---------------------------------> 0
162 *   _________________________________________
163 *  | flags(8) | policy(8) | importance(16) |
164 *   -----------------------------------------
165 */
166 void _pthread_start(pthread_t self, mach_port_t kport, void *(*fun)(void *), void * funarg, size_t stacksize, unsigned int flags);
167
168 #define PTHREAD_START_CUSTOM 0x01000000
169 #define PTHREAD_START_SETSCHED 0x02000000
170 #define PTHREAD_START_DETACHED 0x04000000
171 #define PTHREAD_START_POLICY_BITSHIFT 16
172 #define PTHREAD_START_POLICY_MASK 0xff
173 #define PTHREAD_START_IMPORTANCE_MASK 0xffff
174
175 #define SCHED_OTHER POLICY_TIMESHARE
176 #define SCHED_FIFO POLICY_FIFO
177 #define SCHED_RR POLICY_RR
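
/*
 * Illustrative sketch only (kept under #if 0, not compiled): how a caller
 * might pack the flags word diagrammed above, and how bsdthread_create()
 * below unpacks it.  The importance value used here is a made-up example.
 */
#if 0
	unsigned int flags, policy, importance;

	importance = 10;	/* hypothetical example value */
	flags = PTHREAD_START_SETSCHED |
	    ((SCHED_RR & PTHREAD_START_POLICY_MASK) << PTHREAD_START_POLICY_BITSHIFT) |
	    (importance & PTHREAD_START_IMPORTANCE_MASK);

	/* unpacked exactly as bsdthread_create() does */
	policy     = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK;
	importance = (flags & PTHREAD_START_IMPORTANCE_MASK);
#endif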
178
179
180
181 int
182 bsdthread_create(__unused struct proc *p, struct bsdthread_create_args *uap, user_addr_t *retval)
183 {
184 kern_return_t kret;
185 void * sright;
186 int error = 0;
187 int allocated = 0;
188 mach_vm_offset_t stackaddr;
189 mach_vm_size_t th_allocsize = 0;
190 mach_vm_size_t user_stacksize;
191 mach_vm_size_t th_stacksize;
192 mach_vm_offset_t th_stackaddr;
193 mach_vm_offset_t th_stack;
194 mach_vm_offset_t th_pthread;
195 mach_port_name_t th_thport;
196 thread_t th;
197 user_addr_t user_func = uap->func;
198 user_addr_t user_funcarg = uap->func_arg;
199 user_addr_t user_stack = uap->stack;
200 user_addr_t user_pthread = uap->pthread;
201 unsigned int flags = (unsigned int)uap->flags;
202 vm_map_t vmap = current_map();
203 task_t ctask = current_task();
204 unsigned int policy, importance;
205
206 int isLP64 = 0;
207
208
209 if ((p->p_lflag & P_LREGISTER) == 0)
210 return(EINVAL);
211 #if 0
212 KERNEL_DEBUG_CONSTANT(0x9000080 | DBG_FUNC_START, flags, 0, 0, 0, 0);
213 #endif
214
215 isLP64 = IS_64BIT_PROCESS(p);
216
217
218 #if defined(__ppc__)
219 stackaddr = 0xF0000000;
220 #elif defined(__i386__) || defined(__x86_64__)
221 stackaddr = 0xB0000000;
222 #else
223 #error Need to define a stack address hint for this architecture
224 #endif
225 kret = thread_create(ctask, &th);
226 if (kret != KERN_SUCCESS)
227 return(ENOMEM);
228 thread_reference(th);
229
230 sright = (void *) convert_thread_to_port(th);
231 th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(ctask));
232
233 if ((flags & PTHREAD_START_CUSTOM) == 0) {
234 th_stacksize = (mach_vm_size_t)user_stack; /* when PTHREAD_START_CUSTOM is not set, user_stack carries the requested stack size */
235 th_allocsize = th_stacksize + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
236
237 kret = mach_vm_map(vmap, &stackaddr,
238 th_allocsize,
239 page_size-1,
240 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
241 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
242 VM_INHERIT_DEFAULT);
243 if (kret != KERN_SUCCESS)
244 kret = mach_vm_allocate(vmap,
245 &stackaddr, th_allocsize,
246 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE);
247 if (kret != KERN_SUCCESS) {
248 error = ENOMEM;
249 goto out;
250 }
251 #if 0
252 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, th_allocsize, stackaddr, 0, 2, 0);
253 #endif
254 th_stackaddr = stackaddr;
255 allocated = 1;
256 /*
257 * The guard page is at the lowest address
258 * The stack base is the highest address
259 */
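		/*
		 * Resulting layout of the th_allocsize allocation (derived from
		 * the computations in this block; the stack grows downward):
		 *
		 *	stackaddr                            start of guard region (VM_PROT_NONE)
		 *	stackaddr + PTH_DEFAULT_GUARDSIZE    lowest usable stack address
		 *	th_stack == th_pthread               initial stack top; the p->p_pthsize
		 *	                                     bytes above it hold the user pthread_t
		 */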
260 kret = mach_vm_protect(vmap, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
261
262 if (kret != KERN_SUCCESS) {
263 error = ENOMEM;
264 goto out1;
265 }
266 th_stack = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
267 th_pthread = (stackaddr + th_stacksize + PTH_DEFAULT_GUARDSIZE);
268 user_stacksize = th_stacksize;
269 } else {
270 th_stack = user_stack;
271 user_stacksize = user_stack;
272 th_pthread = user_pthread;
273 #if 0
274 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_NONE, 0, 0, 0, 3, 0);
275 #endif
276 }
277
278 #if defined(__ppc__)
279 /*
280 * Set up PowerPC registers...
281 * internally they are always kept as 64 bit and,
282 * since the register set is the same between 32- and 64-bit modes,
283 * we don't need two different methods for setting the state
284 */
285 {
286 ppc_thread_state64_t state64;
287 ppc_thread_state64_t *ts64 = &state64;
288
289 ts64->srr0 = (uint64_t)p->p_threadstart;
290 ts64->r1 = (uint64_t)(th_stack - C_ARGSAVE_LEN - C_RED_ZONE);
291 ts64->r3 = (uint64_t)th_pthread;
292 ts64->r4 = (uint64_t)(th_thport);
293 ts64->r5 = (uint64_t)user_func;
294 ts64->r6 = (uint64_t)user_funcarg;
295 ts64->r7 = (uint64_t)user_stacksize;
296 ts64->r8 = (uint64_t)uap->flags;
297
298 thread_set_wq_state64(th, (thread_state_t)ts64);
299
300 thread_set_cthreadself(th, (uint64_t)th_pthread, isLP64);
301 }
302 #elif defined(__i386__) || defined(__x86_64__)
303 {
304 /*
305 * Set up i386 registers & function call.
306 */
307 if (isLP64 == 0) {
308 x86_thread_state32_t state;
309 x86_thread_state32_t *ts = &state;
310
311 ts->eip = (int)p->p_threadstart;
312 ts->eax = (unsigned int)th_pthread;
313 ts->ebx = (unsigned int)th_thport;
314 ts->ecx = (unsigned int)user_func;
315 ts->edx = (unsigned int)user_funcarg;
316 ts->edi = (unsigned int)user_stacksize;
317 ts->esi = (unsigned int)uap->flags;
318 /*
319 * set stack pointer
320 */
321 ts->esp = (int)((vm_offset_t)(th_stack-C_32_STK_ALIGN));
322
323 thread_set_wq_state32(th, (thread_state_t)ts);
324
325 } else {
326 x86_thread_state64_t state64;
327 x86_thread_state64_t *ts64 = &state64;
328
329 ts64->rip = (uint64_t)p->p_threadstart;
330 ts64->rdi = (uint64_t)th_pthread;
331 ts64->rsi = (uint64_t)(th_thport);
332 ts64->rdx = (uint64_t)user_func;
333 ts64->rcx = (uint64_t)user_funcarg;
334 ts64->r8 = (uint64_t)user_stacksize;
335 ts64->r9 = (uint64_t)uap->flags;
336 /*
337 * set stack pointer aligned to 16 byte boundary
338 */
339 ts64->rsp = (uint64_t)(th_stack - C_64_REDZONE_LEN);
340
341 thread_set_wq_state64(th, (thread_state_t)ts64);
342 }
343 }
344 #else
345 #error bsdthread_create not defined for this architecture
346 #endif
347 /* Set scheduling parameters if needed */
348 if ((flags & PTHREAD_START_SETSCHED) != 0) {
349 thread_extended_policy_data_t extinfo;
350 thread_precedence_policy_data_t precedinfo;
351
352 importance = (flags & PTHREAD_START_IMPORTANCE_MASK);
353 policy = (flags >> PTHREAD_START_POLICY_BITSHIFT) & PTHREAD_START_POLICY_MASK;
354
355 if (policy == SCHED_OTHER)
356 extinfo.timeshare = 1;
357 else
358 extinfo.timeshare = 0;
359 thread_policy_set(th, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
360
361 #define BASEPRI_DEFAULT 31
362 precedinfo.importance = (importance - BASEPRI_DEFAULT);
363 thread_policy_set(th, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
364 }
365
366 kret = thread_resume(th);
367 if (kret != KERN_SUCCESS) {
368 error = EINVAL;
369 goto out1;
370 }
371 thread_deallocate(th); /* drop the creator reference */
372 #if 0
373 KERNEL_DEBUG_CONSTANT(0x9000080 |DBG_FUNC_END, error, th_pthread, 0, 0, 0);
374 #endif
375 *retval = th_pthread;
376
377 return(0);
378
379 out1:
380 if (allocated != 0)
381 (void)mach_vm_deallocate(vmap, stackaddr, th_allocsize);
382 out:
383 (void)mach_port_deallocate(get_task_ipcspace(ctask), th_thport);
384 (void)thread_terminate(th);
385 (void)thread_deallocate(th);
386 return(error);
387 }
388
389 int
390 bsdthread_terminate(__unused struct proc *p, struct bsdthread_terminate_args *uap, __unused int32_t *retval)
391 {
392 mach_vm_offset_t freeaddr;
393 mach_vm_size_t freesize;
394 kern_return_t kret;
395 mach_port_name_t kthport = (mach_port_name_t)uap->port;
396 mach_port_name_t sem = (mach_port_name_t)uap->sem;
397
398 freeaddr = (mach_vm_offset_t)uap->stackaddr;
399 freesize = uap->freesize;
400
401 #if 0
402 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_START, freeaddr, freesize, kthport, 0xff, 0);
403 #endif
404 if ((freesize != (mach_vm_size_t)0) && (freeaddr != (mach_vm_offset_t)0)) {
405 kret = mach_vm_deallocate(current_map(), freeaddr, freesize);
406 if (kret != KERN_SUCCESS) {
407 return(EINVAL);
408 }
409 }
410
411 (void) thread_terminate(current_thread());
412 if (sem != MACH_PORT_NULL) {
413 kret = semaphore_signal_internal_trap(sem);
414 if (kret != KERN_SUCCESS) {
415 return(EINVAL);
416 }
417 }
418
419 if (kthport != MACH_PORT_NULL)
420 mach_port_deallocate(get_task_ipcspace(current_task()), kthport);
421 thread_exception_return();
422 panic("bsdthread_terminate: still running\n");
423 #if 0
424 KERNEL_DEBUG_CONSTANT(0x9000084 |DBG_FUNC_END, 0, 0, 0, 0xff, 0);
425 #endif
426 return(0);
427 }
428
429
430 int
431 bsdthread_register(struct proc *p, struct bsdthread_register_args *uap, __unused int32_t *retval)
432 {
433 /* prevent multiple registrations */
434 if ((p->p_lflag & P_LREGISTER) != 0)
435 return(EINVAL);
436 /* syscall randomizer test can pass bogus values */
437 if (uap->pthsize > MAX_PTHREAD_SIZE) {
438 return(EINVAL);
439 }
440 p->p_threadstart = uap->threadstart;
441 p->p_wqthread = uap->wqthread;
442 p->p_pthsize = uap->pthsize;
443 p->p_targconc = uap->targetconc_ptr;
444 p->p_dispatchqueue_offset = uap->dispatchqueue_offset;
445 proc_setregister(p);
446
447 return(0);
448 }
449
450 uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD;
451 uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS;
452 uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS;
453 uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS;
454 uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS;
455 uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
456
457
458 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW,
459 &wq_yielded_threshold, 0, "");
460
461 SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW,
462 &wq_yielded_window_usecs, 0, "");
463
464 SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW,
465 &wq_stalled_window_usecs, 0, "");
466
467 SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW,
468 &wq_reduce_pool_window_usecs, 0, "");
469
470 SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW,
471 &wq_max_timer_interval_usecs, 0, "");
472
473 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW,
474 &wq_max_threads, 0, "");
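
/*
 * These tunables are published read/write under the kern MIB, so they can be
 * inspected or adjusted from user space at runtime, for example (illustrative):
 *
 *	sysctl kern.wq_max_threads
 *	sysctl -w kern.wq_stalled_window_usecs=<usecs>
 */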
475
476
477 void
478 workqueue_init_lock(proc_t p)
479 {
480 lck_spin_init(&p->p_wqlock, pthread_lck_grp, pthread_lck_attr);
481
482 p->p_wqiniting = FALSE;
483 }
484
485 void
486 workqueue_destroy_lock(proc_t p)
487 {
488 lck_spin_destroy(&p->p_wqlock, pthread_lck_grp);
489 }
490
491
492 static void
493 workqueue_lock_spin(proc_t p)
494 {
495 lck_spin_lock(&p->p_wqlock);
496 }
497
498 static void
499 workqueue_unlock(proc_t p)
500 {
501 lck_spin_unlock(&p->p_wqlock);
502 }
503
504
505 static void
506 workqueue_interval_timer_start(struct workqueue *wq)
507 {
508 uint64_t deadline;
509
510 if (wq->wq_timer_interval == 0)
511 wq->wq_timer_interval = wq_stalled_window_usecs;
512 else {
513 wq->wq_timer_interval = wq->wq_timer_interval * 2;
514
515 if (wq->wq_timer_interval > wq_max_timer_interval_usecs)
516 wq->wq_timer_interval = wq_max_timer_interval_usecs;
517 }
518 clock_interval_to_deadline(wq->wq_timer_interval, 1000, &deadline);
519
520 thread_call_enter_delayed(wq->wq_atimer_call, deadline);
521
522 KERNEL_DEBUG(0xefffd110, wq, wq->wq_itemcount, wq->wq_flags, wq->wq_timer_interval, 0);
523 }
524
525
526 static boolean_t
527 wq_thread_is_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp)
528 { clock_sec_t secs;
529 clock_usec_t usecs;
530 uint64_t lastblocked_ts;
531 uint64_t elapsed;
532
533 /*
534 * the timestamp is updated atomically w/o holding the workqueue lock
535 * so we need to do an atomic read of the 64 bits so that we don't see
536 * a mismatched pair of 32 bit reads... we accomplish this in an architecturally
537 * independent fashion by using OSCompareAndSwap64 to write back the
538 * value we grabbed... if it succeeds, then we have a good timestamp to
539 * evaluate... if it fails, we straddled grabbing the timestamp while it
540 * was being updated... treat a failed update as a busy thread since
541 * it implies we are about to see a really fresh timestamp anyway
542 */
543 lastblocked_ts = *lastblocked_tsp;
544
545 #if defined(__ppc__)
546 #else
547 if ( !OSCompareAndSwap64((UInt64)lastblocked_ts, (UInt64)lastblocked_ts, lastblocked_tsp))
548 return (TRUE);
549 #endif
550 if (lastblocked_ts >= cur_ts) {
551 /*
552 * because the update of the timestamp when a thread blocks isn't
553 * serialized against us looking at it (i.e. we don't hold the workq lock)
554 * it's possible to have a timestamp that matches the current time or
555 * that even looks to be in the future relative to when we grabbed the current
556 * time... just treat this as a busy thread since it must have just blocked.
557 */
558 return (TRUE);
559 }
560 elapsed = cur_ts - lastblocked_ts;
561
562 absolutetime_to_microtime(elapsed, &secs, &usecs);
563
564 if (secs == 0 && usecs < wq_stalled_window_usecs)
565 return (TRUE);
566 return (FALSE);
567 }
568
569
570 #define WQ_TIMER_NEEDED(wq, start_timer) do { \
571 int oldflags = wq->wq_flags; \
572 \
573 if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \
574 if (OSCompareAndSwap(oldflags, oldflags | WQ_ATIMER_RUNNING, (UInt32 *)&wq->wq_flags)) \
575 start_timer = TRUE; \
576 } \
577 } while (0)
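
/*
 * WQ_TIMER_NEEDED atomically sets WQ_ATIMER_RUNNING (unless the workqueue is
 * exiting or a timer is already pending) and, via 'start_timer', tells the
 * caller that it is now responsible for arming the delayed call with
 * workqueue_interval_timer_start(), as the call sites below do.
 */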
578
579
580
581 static void
582 workqueue_add_timer(struct workqueue *wq, __unused int param1)
583 {
584 proc_t p;
585 boolean_t start_timer = FALSE;
586 boolean_t retval;
587 boolean_t add_thread;
588 uint32_t busycount;
589
590 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_START, wq, wq->wq_flags, wq->wq_nthreads, wq->wq_thidlecount, 0);
591
592 p = wq->wq_proc;
593
594 workqueue_lock_spin(p);
595
596 /*
597 * because workqueue_callback now runs w/o taking the workqueue lock
598 * we are unsynchronized w/r to a change in state of the running threads...
599 * to make sure we always evaluate that change, we allow it to start up
600 * a new timer if the current one is actively evaluating the state
601 * however, we do not need more than 2 timers fired up (1 active and 1 pending)
602 * and we certainly do not want 2 active timers evaluating the state
603 * simultaneously... so use WQL_ATIMER_BUSY to serialize the timers...
604 * note that WQL_ATIMER_BUSY is in a different flag word from WQ_ATIMER_RUNNING since
605 * it is always protected by the workq lock... WQ_ATIMER_RUNNING is evaluated
606 * and set atomically since the callback function needs to manipulate it
607 * w/o holding the workq lock...
608 *
609 * !WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == no pending timer, no active timer
610 * !WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == no pending timer, 1 active timer
611 * WQ_ATIMER_RUNNING && !WQL_ATIMER_BUSY == 1 pending timer, no active timer
612 * WQ_ATIMER_RUNNING && WQL_ATIMER_BUSY == 1 pending timer, 1 active timer
613 */
614 while (wq->wq_lflags & WQL_ATIMER_BUSY) {
615 wq->wq_lflags |= WQL_ATIMER_WAITING;
616
617 assert_wait((caddr_t)wq, (THREAD_UNINT));
618 workqueue_unlock(p);
619
620 thread_block(THREAD_CONTINUE_NULL);
621
622 workqueue_lock_spin(p);
623 }
624 wq->wq_lflags |= WQL_ATIMER_BUSY;
625
626 /*
627 * the workq lock will protect us from seeing WQ_EXITING change state, but we
628 * still need to update this atomically in case someone else tries to start
629 * the timer just as we're releasing it
630 */
631 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags & ~WQ_ATIMER_RUNNING), (UInt32 *)&wq->wq_flags)));
632
633 again:
634 retval = TRUE;
635 add_thread = FALSE;
636
637 if ( !(wq->wq_flags & WQ_EXITING)) {
638 /*
639 * check to see if the stall frequency was beyond our tolerance
640 * or we have work on the queue, but haven't scheduled any
641 * new work within our acceptable time interval because
642 * there were no idle threads left to schedule
643 */
644 if (wq->wq_itemcount) {
645 uint32_t priority;
646 uint32_t affinity_tag;
647 uint32_t i;
648 uint64_t curtime;
649
650 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
651 if (wq->wq_list_bitmap & (1 << priority))
652 break;
653 }
654 assert(priority < WORKQUEUE_NUMPRIOS);
655
656 curtime = mach_absolute_time();
657 busycount = 0;
658
659 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
660 /*
661 * if we have no idle threads, we can try to add them if needed
662 */
663 if (wq->wq_thidlecount == 0)
664 add_thread = TRUE;
665
666 /*
667 * look for first affinity group that is currently not active
668 * i.e. no active threads at this priority level or higher
669 * and has not been active recently at this priority level or higher
670 */
671 for (i = 0; i <= priority; i++) {
672 if (wq->wq_thactive_count[i][affinity_tag]) {
673 add_thread = FALSE;
674 break;
675 }
676 if (wq->wq_thscheduled_count[i][affinity_tag]) {
677 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
678 add_thread = FALSE;
679 busycount++;
680 break;
681 }
682 }
683 }
684 if (add_thread == TRUE) {
685 retval = workqueue_addnewthread(wq);
686 break;
687 }
688 }
689 if (wq->wq_itemcount) {
690 /*
691 * as long as we have threads to schedule, and we successfully
692 * scheduled new work, keep trying
693 */
694 while (wq->wq_thidlecount && !(wq->wq_flags & WQ_EXITING)) {
695 /*
696 * workqueue_run_nextitem is responsible for
697 * dropping the workqueue lock in all cases
698 */
699 retval = workqueue_run_nextitem(p, wq, THREAD_NULL, 0, 0, 0);
700 workqueue_lock_spin(p);
701
702 if (retval == FALSE)
703 break;
704 }
705 if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_itemcount) {
706
707 if (wq->wq_thidlecount == 0 && retval == TRUE && add_thread == TRUE)
708 goto again;
709
710 if (wq->wq_thidlecount == 0 || busycount)
711 WQ_TIMER_NEEDED(wq, start_timer);
712
713 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_NONE, wq, wq->wq_itemcount, wq->wq_thidlecount, busycount, 0);
714 }
715 }
716 }
717 }
718 if ( !(wq->wq_flags & WQ_ATIMER_RUNNING))
719 wq->wq_timer_interval = 0;
720
721 wq->wq_lflags &= ~WQL_ATIMER_BUSY;
722
723 if ((wq->wq_flags & WQ_EXITING) || (wq->wq_lflags & WQL_ATIMER_WAITING)) {
724 /*
725 * wakeup the thread hung up in workqueue_exit or workqueue_add_timer waiting for this timer
726 * to finish getting out of the way
727 */
728 wq->wq_lflags &= ~WQL_ATIMER_WAITING;
729 wakeup(wq);
730 }
731 KERNEL_DEBUG(0xefffd108 | DBG_FUNC_END, wq, start_timer, wq->wq_nthreads, wq->wq_thidlecount, 0);
732
733 workqueue_unlock(p);
734
735 if (start_timer == TRUE)
736 workqueue_interval_timer_start(wq);
737 }
738
739
740 void
741 workqueue_thread_yielded(void)
742 {
743 struct workqueue *wq;
744 proc_t p;
745
746 p = current_proc();
747
748 if ((wq = p->p_wqptr) == NULL || wq->wq_itemcount == 0)
749 return;
750
751 workqueue_lock_spin(p);
752
753 if (wq->wq_itemcount) {
754 uint64_t curtime;
755 uint64_t elapsed;
756 clock_sec_t secs;
757 clock_usec_t usecs;
758
759 if (wq->wq_thread_yielded_count++ == 0)
760 wq->wq_thread_yielded_timestamp = mach_absolute_time();
761
762 if (wq->wq_thread_yielded_count < wq_yielded_threshold) {
763 workqueue_unlock(p);
764 return;
765 }
766 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_START, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 0, 0);
767
768 wq->wq_thread_yielded_count = 0;
769
770 curtime = mach_absolute_time();
771 elapsed = curtime - wq->wq_thread_yielded_timestamp;
772 absolutetime_to_microtime(elapsed, &secs, &usecs);
773
774 if (secs == 0 && usecs < wq_yielded_window_usecs) {
775
776 if (wq->wq_thidlecount == 0) {
777 workqueue_addnewthread(wq);
778 /*
779 * 'workqueue_addnewthread' drops the workqueue lock
780 * when creating the new thread and then retakes it before
781 * returning... this window allows other threads to process
782 * work on the queue, so we need to recheck for available work
783 * if none found, we just return... the newly created thread
784 * will eventually get used (if it hasn't already)...
785 */
786 if (wq->wq_itemcount == 0) {
787 workqueue_unlock(p);
788 return;
789 }
790 }
791 if (wq->wq_thidlecount) {
792 uint32_t priority;
793 uint32_t affinity = -1;
794 user_addr_t item;
795 struct workitem *witem = NULL;
796 struct workitemlist *wl = NULL;
797 struct uthread *uth;
798 struct threadlist *tl;
799
800 uth = get_bsdthread_info(current_thread());
801 if ((tl = uth->uu_threadlist))
802 affinity = tl->th_affinity_tag;
803
804 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
805 if (wq->wq_list_bitmap & (1 << priority)) {
806 wl = (struct workitemlist *)&wq->wq_list[priority];
807 break;
808 }
809 }
810 assert(wl != NULL);
811 assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
812
813 witem = TAILQ_FIRST(&wl->wl_itemlist);
814 TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
815
816 if (TAILQ_EMPTY(&wl->wl_itemlist))
817 wq->wq_list_bitmap &= ~(1 << priority);
818 wq->wq_itemcount--;
819
820 item = witem->wi_item;
821 witem->wi_item = (user_addr_t)0;
822 witem->wi_affinity = 0;
823
824 TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
825
826 (void)workqueue_run_nextitem(p, wq, THREAD_NULL, item, priority, affinity);
827 /*
828 * workqueue_run_nextitem is responsible for
829 * dropping the workqueue lock in all cases
830 */
831 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 1, 0);
832
833 return;
834 }
835 }
836 KERNEL_DEBUG(0xefffd138 | DBG_FUNC_END, wq, wq->wq_thread_yielded_count, wq->wq_itemcount, 2, 0);
837 }
838 workqueue_unlock(p);
839 }
840
841
842
843 static void
844 workqueue_callback(int type, thread_t thread)
845 {
846 struct uthread *uth;
847 struct threadlist *tl;
848 struct workqueue *wq;
849
850 uth = get_bsdthread_info(thread);
851 tl = uth->uu_threadlist;
852 wq = tl->th_workq;
853
854 switch (type) {
855
856 case SCHED_CALL_BLOCK:
857 {
858 uint32_t old_activecount;
859
860 old_activecount = OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
861
862 if (old_activecount == 1) {
863 boolean_t start_timer = FALSE;
864 uint64_t curtime;
865 UInt64 *lastblocked_ptr;
866
867 /*
868 * we were the last active thread on this affinity set
869 * and we've got work to do
870 */
871 lastblocked_ptr = (UInt64 *)&wq->wq_lastblocked_ts[tl->th_priority][tl->th_affinity_tag];
872 curtime = mach_absolute_time();
873
874 /*
875 * if we collide with another thread trying to update the last_blocked (really unlikely
876 * since another thread would have to get scheduled and then block after we start down
877 * this path), it's not a problem. Either timestamp is adequate, so no need to retry
878 */
879 #if defined(__ppc__)
880 /*
881 * this doesn't have to actually work reliably for PPC, it just has to compile/link
882 */
883 *lastblocked_ptr = (UInt64)curtime;
884 #else
885 OSCompareAndSwap64(*lastblocked_ptr, (UInt64)curtime, lastblocked_ptr);
886 #endif
887 if (wq->wq_itemcount)
888 WQ_TIMER_NEEDED(wq, start_timer);
889
890 if (start_timer == TRUE)
891 workqueue_interval_timer_start(wq);
892 }
893 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_START, wq, old_activecount, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
894 }
895 break;
896
897 case SCHED_CALL_UNBLOCK:
898 /*
899 * we cannot take the workqueue_lock here...
900 * an UNBLOCK can occur from a timer event which
901 * is run from an interrupt context... if the workqueue_lock
902 * is already held by this processor, we'll deadlock...
903 * the thread lock for the thread being UNBLOCKED
904 * is also held
905 */
906 OSAddAtomic(1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
907
908 KERNEL_DEBUG1(0xefffd020 | DBG_FUNC_END, wq, wq->wq_threads_scheduled, tl->th_priority, tl->th_affinity_tag, thread_tid(thread));
909
910 break;
911 }
912 }
913
914
915 static void
916 workqueue_removethread(struct threadlist *tl)
917 {
918 struct workqueue *wq;
919 struct uthread * uth;
920
921 wq = tl->th_workq;
922
923 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
924
925 wq->wq_nthreads--;
926 wq->wq_thidlecount--;
927
928 /*
929 * Clear the threadlist pointer in the uthread so that
930 * a thread blocked here, once woken for termination,
931 * will not access the threadlist after it has been
932 * freed.
933 */
934 thread_sched_call(tl->th_thread, NULL);
935
936 uth = get_bsdthread_info(tl->th_thread);
937 if (uth != (struct uthread *)0) {
938 uth->uu_threadlist = NULL;
939 }
940 workqueue_unlock(wq->wq_proc);
941
942 if ( (tl->th_flags & TH_LIST_SUSPENDED) ) {
943 /*
944 * thread was created, but never used...
945 * need to clean up the stack and port ourselves
946 * since we're not going to spin up through the
947 * normal exit path triggered from Libc
948 */
949 (void)mach_vm_deallocate(wq->wq_map, tl->th_stackaddr, tl->th_allocsize);
950 (void)mach_port_deallocate(get_task_ipcspace(wq->wq_task), tl->th_thport);
951
952 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
953 } else {
954
955 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_END, wq, (uintptr_t)thread_tid(current_thread()), wq->wq_nthreads, 0xdead, thread_tid(tl->th_thread));
956 }
957 /*
958 * drop our ref on the thread
959 */
960 thread_deallocate(tl->th_thread);
961
962 kfree(tl, sizeof(struct threadlist));
963 }
964
965
966
967 static boolean_t
968 workqueue_addnewthread(struct workqueue *wq)
969 {
970 struct threadlist *tl;
971 struct uthread *uth;
972 kern_return_t kret;
973 thread_t th;
974 proc_t p;
975 void *sright;
976 mach_vm_offset_t stackaddr;
977
978 if (wq->wq_nthreads >= wq_max_threads || wq->wq_nthreads >= (CONFIG_THREAD_MAX - 20))
979 return (FALSE);
980 wq->wq_nthreads++;
981
982 p = wq->wq_proc;
983 workqueue_unlock(p);
984
985 kret = thread_create_workq(wq->wq_task, (thread_continue_t)wq_unsuspend_continue, &th);
986
987 if (kret != KERN_SUCCESS)
988 goto failed;
989
990 tl = kalloc(sizeof(struct threadlist));
991 bzero(tl, sizeof(struct threadlist));
992
993 #if defined(__ppc__)
994 stackaddr = 0xF0000000;
995 #elif defined(__i386__) || defined(__x86_64__)
996 stackaddr = 0xB0000000;
997 #else
998 #error Need to define a stack address hint for this architecture
999 #endif
1000 tl->th_allocsize = PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE + p->p_pthsize;
1001
1002 kret = mach_vm_map(wq->wq_map, &stackaddr,
1003 tl->th_allocsize,
1004 page_size-1,
1005 VM_MAKE_TAG(VM_MEMORY_STACK)| VM_FLAGS_ANYWHERE , NULL,
1006 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
1007 VM_INHERIT_DEFAULT);
1008
1009 if (kret != KERN_SUCCESS) {
1010 kret = mach_vm_allocate(wq->wq_map,
1011 &stackaddr, tl->th_allocsize,
1012 VM_MAKE_TAG(VM_MEMORY_STACK) | VM_FLAGS_ANYWHERE);
1013 }
1014 if (kret == KERN_SUCCESS) {
1015 /*
1016 * The guard page is at the lowest address
1017 * The stack base is the highest address
1018 */
1019 kret = mach_vm_protect(wq->wq_map, stackaddr, PTH_DEFAULT_GUARDSIZE, FALSE, VM_PROT_NONE);
1020
1021 if (kret != KERN_SUCCESS)
1022 (void) mach_vm_deallocate(wq->wq_map, stackaddr, tl->th_allocsize);
1023 }
1024 if (kret != KERN_SUCCESS) {
1025 (void) thread_terminate(th);
1026
1027 kfree(tl, sizeof(struct threadlist));
1028 goto failed;
1029 }
1030 thread_reference(th);
1031
1032 sright = (void *) convert_thread_to_port(th);
1033 tl->th_thport = ipc_port_copyout_send(sright, get_task_ipcspace(wq->wq_task));
1034
1035 thread_static_param(th, TRUE);
1036
1037 tl->th_flags = TH_LIST_INITED | TH_LIST_SUSPENDED;
1038
1039 tl->th_thread = th;
1040 tl->th_workq = wq;
1041 tl->th_stackaddr = stackaddr;
1042 tl->th_affinity_tag = -1;
1043 tl->th_priority = WORKQUEUE_NUMPRIOS;
1044 tl->th_policy = -1;
1045
1046 #if defined(__ppc__)
1047 //ml_fp_setvalid(FALSE);
1048 thread_set_cthreadself(th, (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE), IS_64BIT_PROCESS(p));
1049 #endif /* __ppc__ */
1050
1051 uth = get_bsdthread_info(tl->th_thread);
1052 uth->uu_threadlist = (void *)tl;
1053
1054 workqueue_lock_spin(p);
1055
1056 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry);
1057
1058 wq->wq_thidlecount++;
1059
1060 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_START, wq, wq->wq_nthreads, 0, thread_tid(current_thread()), thread_tid(tl->th_thread));
1061
1062 return (TRUE);
1063
1064 failed:
1065 workqueue_lock_spin(p);
1066 wq->wq_nthreads--;
1067
1068 return (FALSE);
1069 }
1070
1071
1072 int
1073 workq_open(struct proc *p, __unused struct workq_open_args *uap, __unused int32_t *retval)
1074 {
1075 struct workqueue * wq;
1076 int wq_size;
1077 char * ptr;
1078 char * nptr;
1079 int j;
1080 uint32_t i;
1081 uint32_t num_cpus;
1082 int error = 0;
1083 boolean_t need_wakeup = FALSE;
1084 struct workitem * witem;
1085 struct workitemlist *wl;
1086
1087 if ((p->p_lflag & P_LREGISTER) == 0)
1088 return(EINVAL);
1089
1090 workqueue_lock_spin(p);
1091
1092 if (p->p_wqptr == NULL) {
1093
1094 while (p->p_wqiniting == TRUE) {
1095
1096 assert_wait((caddr_t)&p->p_wqiniting, THREAD_UNINT);
1097 workqueue_unlock(p);
1098
1099 thread_block(THREAD_CONTINUE_NULL);
1100
1101 workqueue_lock_spin(p);
1102 }
1103 if (p->p_wqptr != NULL)
1104 goto out;
1105
1106 p->p_wqiniting = TRUE;
1107
1108 workqueue_unlock(p);
1109
1110 num_cpus = ml_get_max_cpus();
1111
1112 wq_size = sizeof(struct workqueue) +
1113 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
1114 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint32_t)) +
1115 (num_cpus * WORKQUEUE_NUMPRIOS * sizeof(uint64_t)) +
1116 sizeof(uint64_t);
1117
1118 ptr = (char *)kalloc(wq_size);
1119 bzero(ptr, wq_size);
1120
1121 wq = (struct workqueue *)ptr;
1122 wq->wq_flags = WQ_LIST_INITED;
1123 wq->wq_proc = p;
1124 wq->wq_affinity_max = num_cpus;
1125 wq->wq_task = current_task();
1126 wq->wq_map = current_map();
1127
1128 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1129 wl = (struct workitemlist *)&wq->wq_list[i];
1130 TAILQ_INIT(&wl->wl_itemlist);
1131 TAILQ_INIT(&wl->wl_freelist);
1132
1133 for (j = 0; j < WORKITEM_SIZE; j++) {
1134 witem = &wq->wq_array[(i*WORKITEM_SIZE) + j];
1135 TAILQ_INSERT_TAIL(&wl->wl_freelist, witem, wi_entry);
1136 }
1137 wq->wq_reqconc[i] = wq->wq_affinity_max;
1138 }
1139 nptr = ptr + sizeof(struct workqueue);
1140
1141 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1142 wq->wq_thactive_count[i] = (uint32_t *)nptr;
1143 nptr += (num_cpus * sizeof(uint32_t));
1144 }
1145 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1146 wq->wq_thscheduled_count[i] = (uint32_t *)nptr;
1147 nptr += (num_cpus * sizeof(uint32_t));
1148 }
1149 /*
1150 * align nptr on a 64 bit boundary so that we can do nice
1151 * atomic64 operations on the timestamps...
1152 * note that we requested an extra uint64_t when calculating
1153 * the size for the allocation of the workqueue struct
1154 */
1155 nptr += (sizeof(uint64_t) - 1);
1156 nptr = (char *)((long)nptr & ~(sizeof(uint64_t) - 1));
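		/*
		 * The two statements above round nptr up to the next multiple of
		 * 8: e.g. an address ending in 0x5 moves up to 0x8, while one
		 * that is already 8-byte aligned is left unchanged.
		 */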
1157
1158 for (i = 0; i < WORKQUEUE_NUMPRIOS; i++) {
1159 wq->wq_lastblocked_ts[i] = (uint64_t *)nptr;
1160 nptr += (num_cpus * sizeof(uint64_t));
1161 }
1162 TAILQ_INIT(&wq->wq_thrunlist);
1163 TAILQ_INIT(&wq->wq_thidlelist);
1164
1165 wq->wq_atimer_call = thread_call_allocate((thread_call_func_t)workqueue_add_timer, (thread_call_param_t)wq);
1166
1167 workqueue_lock_spin(p);
1168
1169 p->p_wqptr = (void *)wq;
1170 p->p_wqsize = wq_size;
1171
1172 p->p_wqiniting = FALSE;
1173 need_wakeup = TRUE;
1174 }
1175 out:
1176 workqueue_unlock(p);
1177
1178 if (need_wakeup == TRUE)
1179 wakeup(&p->p_wqiniting);
1180 return(error);
1181 }
1182
1183 int
1184 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, __unused int32_t *retval)
1185 {
1186 user_addr_t item = uap->item;
1187 int options = uap->options;
1188 int prio = uap->prio; /* should be used to find the right workqueue */
1189 int affinity = uap->affinity;
1190 int error = 0;
1191 thread_t th = THREAD_NULL;
1192 user_addr_t oc_item = 0;
1193 struct workqueue *wq;
1194
1195 if ((p->p_lflag & P_LREGISTER) == 0)
1196 return(EINVAL);
1197
1198 /*
1199 * affinity not yet hooked up on this path
1200 */
1201 affinity = -1;
1202
1203 switch (options) {
1204
1205 case WQOPS_QUEUE_ADD: {
1206
1207 if (prio & WORKQUEUE_OVERCOMMIT) {
1208 prio &= ~WORKQUEUE_OVERCOMMIT;
1209 oc_item = item;
1210 }
1211 if ((prio < 0) || (prio >= WORKQUEUE_NUMPRIOS))
1212 return (EINVAL);
1213
1214 workqueue_lock_spin(p);
1215
1216 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1217 workqueue_unlock(p);
1218 return (EINVAL);
1219 }
1220 if (wq->wq_thidlecount == 0 && (oc_item || (wq->wq_nthreads < wq->wq_affinity_max))) {
1221
1222 workqueue_addnewthread(wq);
1223
1224 if (wq->wq_thidlecount == 0)
1225 oc_item = 0;
1226 }
1227 if (oc_item == 0)
1228 error = workqueue_additem(wq, prio, item, affinity);
1229
1230 KERNEL_DEBUG(0xefffd008 | DBG_FUNC_NONE, wq, prio, affinity, oc_item, 0);
1231 }
1232 break;
1233 case WQOPS_QUEUE_REMOVE: {
1234
1235 if ((prio < 0) || (prio >= WORKQUEUE_NUMPRIOS))
1236 return (EINVAL);
1237
1238 workqueue_lock_spin(p);
1239
1240 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1241 workqueue_unlock(p);
1242 return (EINVAL);
1243 }
1244 error = workqueue_removeitem(wq, prio, item);
1245 }
1246 break;
1247 case WQOPS_THREAD_RETURN: {
1248
1249 th = current_thread();
1250 struct uthread *uth = get_bsdthread_info(th);
1251
1252 /* reset signal mask on the workqueue thread to default state */
1253 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
1254 proc_lock(p);
1255 uth->uu_sigmask = ~workq_threadmask;
1256 proc_unlock(p);
1257 }
1258
1259 workqueue_lock_spin(p);
1260
1261 if ((wq = (struct workqueue *)p->p_wqptr) == NULL || (uth->uu_threadlist == NULL)) {
1262 workqueue_unlock(p);
1263 return (EINVAL);
1264 }
1265 KERNEL_DEBUG(0xefffd004 | DBG_FUNC_END, wq, 0, 0, 0, 0);
1266 }
1267 break;
1268 case WQOPS_THREAD_SETCONC: {
1269
1270 if ((prio < 0) || (prio > WORKQUEUE_NUMPRIOS))
1271 return (EINVAL);
1272
1273 workqueue_lock_spin(p);
1274
1275 if ((wq = (struct workqueue *)p->p_wqptr) == NULL) {
1276 workqueue_unlock(p);
1277 return (EINVAL);
1278 }
1279 /*
1280 * for this operation, we re-purpose the affinity
1281 * argument as the concurrency target
1282 */
1283 if (prio < WORKQUEUE_NUMPRIOS)
1284 wq->wq_reqconc[prio] = affinity;
1285 else {
1286 for (prio = 0; prio < WORKQUEUE_NUMPRIOS; prio++)
1287 wq->wq_reqconc[prio] = affinity;
1288
1289 }
1290 }
1291 break;
1292 default:
1293 return (EINVAL);
1294 }
1295 (void)workqueue_run_nextitem(p, wq, th, oc_item, prio, affinity);
1296 /*
1297 * workqueue_run_nextitem is responsible for
1298 * dropping the workqueue lock in all cases
1299 */
1300 return (error);
1301
1302 }
1303
1304 void
1305 workqueue_exit(struct proc *p)
1306 {
1307 struct workqueue * wq;
1308 struct threadlist * tl, *tlist;
1309 struct uthread *uth;
1310 int wq_size = 0;
1311
1312 if (p->p_wqptr != NULL) {
1313
1314 KERNEL_DEBUG(0x900808c | DBG_FUNC_START, p->p_wqptr, 0, 0, 0, 0);
1315
1316 workqueue_lock_spin(p);
1317
1318 wq = (struct workqueue *)p->p_wqptr;
1319
1320 if (wq == NULL) {
1321 workqueue_unlock(p);
1322
1323 KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, -1, 0);
1324 return;
1325 }
1326 wq_size = p->p_wqsize;
1327 p->p_wqptr = NULL;
1328 p->p_wqsize = 0;
1329
1330 /*
1331 * we now arm the timer in the callback function w/o holding the workq lock...
1332 * we do this by setting WQ_ATIMER_RUNNING via OSCompareAndSwap in order to
1333 * ensure that only a single timer is running and to notice that WQ_EXITING has
1334 * been set (we don't want to start a timer once WQ_EXITING is posted)
1335 *
1336 * so once we have successfully set WQ_EXITING, we cannot fire up a new timer...
1337 * therefore there is no need to clear the timer state atomically from the flags
1338 *
1339 * since we always hold the workq lock when dropping WQ_ATIMER_RUNNING
1340 * the check for and sleep until clear is protected
1341 */
1342 while ( !(OSCompareAndSwap(wq->wq_flags, (wq->wq_flags | WQ_EXITING), (UInt32 *)&wq->wq_flags)));
1343
1344 if (wq->wq_flags & WQ_ATIMER_RUNNING) {
1345 if (thread_call_cancel(wq->wq_atimer_call) == TRUE)
1346 wq->wq_flags &= ~WQ_ATIMER_RUNNING;
1347 }
1348 while ((wq->wq_flags & WQ_ATIMER_RUNNING) || (wq->wq_lflags & WQL_ATIMER_BUSY)) {
1349
1350 assert_wait((caddr_t)wq, (THREAD_UNINT));
1351 workqueue_unlock(p);
1352
1353 thread_block(THREAD_CONTINUE_NULL);
1354
1355 workqueue_lock_spin(p);
1356 }
1357 workqueue_unlock(p);
1358
1359 TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) {
1360
1361 thread_sched_call(tl->th_thread, NULL);
1362
1363 uth = get_bsdthread_info(tl->th_thread);
1364 if (uth != (struct uthread *)0) {
1365 uth->uu_threadlist = NULL;
1366 }
1367 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1368
1369 /*
1370 * drop our last ref on the thread
1371 */
1372 thread_deallocate(tl->th_thread);
1373
1374 kfree(tl, sizeof(struct threadlist));
1375 }
1376 TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) {
1377
1378 thread_sched_call(tl->th_thread, NULL);
1379
1380 uth = get_bsdthread_info(tl->th_thread);
1381 if (uth != (struct uthread *)0) {
1382 uth->uu_threadlist = NULL;
1383 }
1384 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1385
1386 /*
1387 * drop our last ref on the thread
1388 */
1389 thread_deallocate(tl->th_thread);
1390
1391 kfree(tl, sizeof(struct threadlist));
1392 }
1393 thread_call_free(wq->wq_atimer_call);
1394
1395 kfree(wq, wq_size);
1396
1397 KERNEL_DEBUG(0x900808c | DBG_FUNC_END, 0, 0, 0, 0, 0);
1398 }
1399 }
1400
1401 static int
1402 workqueue_additem(struct workqueue *wq, int prio, user_addr_t item, int affinity)
1403 {
1404 struct workitem *witem;
1405 struct workitemlist *wl;
1406
1407 wl = (struct workitemlist *)&wq->wq_list[prio];
1408
1409 if (TAILQ_EMPTY(&wl->wl_freelist))
1410 return (ENOMEM);
1411
1412 witem = (struct workitem *)TAILQ_FIRST(&wl->wl_freelist);
1413 TAILQ_REMOVE(&wl->wl_freelist, witem, wi_entry);
1414
1415 witem->wi_item = item;
1416 witem->wi_affinity = affinity;
1417 TAILQ_INSERT_TAIL(&wl->wl_itemlist, witem, wi_entry);
1418
1419 wq->wq_list_bitmap |= (1 << prio);
1420
1421 wq->wq_itemcount++;
1422
1423 return (0);
1424 }
1425
1426 static int
1427 workqueue_removeitem(struct workqueue *wq, int prio, user_addr_t item)
1428 {
1429 struct workitem *witem;
1430 struct workitemlist *wl;
1431 int error = ESRCH;
1432
1433 wl = (struct workitemlist *)&wq->wq_list[prio];
1434
1435 TAILQ_FOREACH(witem, &wl->wl_itemlist, wi_entry) {
1436 if (witem->wi_item == item) {
1437 TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
1438
1439 if (TAILQ_EMPTY(&wl->wl_itemlist))
1440 wq->wq_list_bitmap &= ~(1 << prio);
1441 wq->wq_itemcount--;
1442
1443 witem->wi_item = (user_addr_t)0;
1444 witem->wi_affinity = 0;
1445 TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
1446
1447 error = 0;
1448 break;
1449 }
1450 }
1451 return (error);
1452 }
1453
1454 static int workqueue_importance[WORKQUEUE_NUMPRIOS] =
1455 {
1456 2, 0, -2,
1457 };
1458
1459 static int workqueue_policy[WORKQUEUE_NUMPRIOS] =
1460 {
1461 1, 1, 1,
1462 };
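
/*
 * Both arrays are indexed by priority level: workqueue_importance[] is fed to
 * THREAD_PRECEDENCE_POLICY and workqueue_policy[] to THREAD_EXTENDED_POLICY
 * (1 == timeshare) when workqueue_run_nextitem() retargets a thread to a new
 * priority.
 */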
1463
1464
1465 /*
1466 * workqueue_run_nextitem:
1467 * called with the workqueue lock held...
1468 * responsible for dropping it in all cases
1469 */
1470 static boolean_t
1471 workqueue_run_nextitem(proc_t p, struct workqueue *wq, thread_t thread, user_addr_t oc_item, int oc_prio, int oc_affinity)
1472 {
1473 struct workitem *witem = NULL;
1474 user_addr_t item = 0;
1475 thread_t th_to_run = THREAD_NULL;
1476 thread_t th_to_park = THREAD_NULL;
1477 int wake_thread = 0;
1478 int reuse_thread = 1;
1479 uint32_t priority, orig_priority;
1480 uint32_t affinity_tag, orig_affinity_tag;
1481 uint32_t i, n;
1482 uint32_t activecount;
1483 uint32_t busycount;
1484 uint32_t us_to_wait;
1485 struct threadlist *tl = NULL;
1486 struct threadlist *ttl = NULL;
1487 struct uthread *uth = NULL;
1488 struct workitemlist *wl = NULL;
1489 boolean_t start_timer = FALSE;
1490 boolean_t adjust_counters = TRUE;
1491 uint64_t curtime;
1492
1493
1494 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_START, wq, thread, wq->wq_thidlecount, wq->wq_itemcount, 0);
1495
1496 /*
1497 * from here until we drop the workq lock
1498 * we can't be pre-empted since we hold
1499 * the lock in spin mode... this is important
1500 * since we have to independently update the priority
1501 * and affinity that the thread is associated with
1502 * and these values are used to index the multi-dimensional
1503 * counter arrays in 'workqueue_callback'
1504 */
1505 if (oc_item) {
1506 uint32_t min_scheduled = 0;
1507 uint32_t scheduled_count;
1508 uint32_t active_count;
1509 uint32_t t_affinity = 0;
1510
1511 priority = oc_prio;
1512 item = oc_item;
1513
1514 if ((affinity_tag = oc_affinity) == (uint32_t)-1) {
1515 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1516 /*
1517 * look for the affinity group with the least number of threads
1518 */
1519 scheduled_count = 0;
1520 active_count = 0;
1521
1522 for (i = 0; i <= priority; i++) {
1523 scheduled_count += wq->wq_thscheduled_count[i][affinity_tag];
1524 active_count += wq->wq_thactive_count[i][affinity_tag];
1525 }
1526 if (active_count == 0) {
1527 t_affinity = affinity_tag;
1528 break;
1529 }
1530 if (affinity_tag == 0 || scheduled_count < min_scheduled) {
1531 min_scheduled = scheduled_count;
1532 t_affinity = affinity_tag;
1533 }
1534 }
1535 affinity_tag = t_affinity;
1536 }
1537 goto grab_idle_thread;
1538 }
1539 if (wq->wq_itemcount == 0) {
1540 if ((th_to_park = thread) == THREAD_NULL)
1541 goto out_of_work;
1542 goto parkit;
1543 }
1544 for (priority = 0; priority < WORKQUEUE_NUMPRIOS; priority++) {
1545 if (wq->wq_list_bitmap & (1 << priority)) {
1546 wl = (struct workitemlist *)&wq->wq_list[priority];
1547 break;
1548 }
1549 }
1550 assert(wl != NULL);
1551 assert(!(TAILQ_EMPTY(&wl->wl_itemlist)));
1552
1553 curtime = mach_absolute_time();
1554
1555 if (thread != THREAD_NULL) {
1556 uth = get_bsdthread_info(thread);
1557 tl = uth->uu_threadlist;
1558 affinity_tag = tl->th_affinity_tag;
1559
1560 /*
1561 * check to see if the affinity group this thread is
1562 * associated with is still within the bounds of the
1563 * specified concurrency for the priority level
1564 * we're considering running work for
1565 */
1566 if (affinity_tag < wq->wq_reqconc[priority]) {
1567 /*
1568 * we're a worker thread from the pool... currently we
1569 * are considered 'active' which means we're counted
1570 * in "wq_thactive_count"
1571 * add up the active counts of all the priority levels
1572 * up to and including the one we want to schedule
1573 */
1574 for (activecount = 0, i = 0; i <= priority; i++) {
1575 uint32_t acount;
1576
1577 acount = wq->wq_thactive_count[i][affinity_tag];
1578
1579 if (acount == 0 && wq->wq_thscheduled_count[i][affinity_tag]) {
1580 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag]))
1581 acount = 1;
1582 }
1583 activecount += acount;
1584 }
1585 if (activecount == 1) {
1586 /*
1587 * we're the only active thread associated with our
1588 * affinity group at this priority level and higher,
1589 * so pick up some work and keep going
1590 */
1591 th_to_run = thread;
1592 goto pick_up_work;
1593 }
1594 }
1595 /*
1596 * there's more than 1 thread running in this affinity group
1597 * or the concurrency level has been cut back for this priority...
1598 * let's continue on and look for an 'empty' group to run this
1599 * work item in
1600 */
1601 }
1602 busycount = 0;
1603
1604 for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; affinity_tag++) {
1605 /*
1606 * look for first affinity group that is currently not active
1607 * i.e. no active threads at this priority level or higher
1608 * and no threads that have run recently
1609 */
1610 for (activecount = 0, i = 0; i <= priority; i++) {
1611 if ((activecount = wq->wq_thactive_count[i][affinity_tag]))
1612 break;
1613
1614 if (wq->wq_thscheduled_count[i][affinity_tag]) {
1615 if (wq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i][affinity_tag])) {
1616 busycount++;
1617 break;
1618 }
1619 }
1620 }
1621 if (activecount == 0 && busycount == 0)
1622 break;
1623 }
1624 if (affinity_tag >= wq->wq_reqconc[priority]) {
1625 /*
1626 * we've already got at least 1 thread per
1627 * affinity group in the active state...
1628 */
1629 if (busycount) {
1630 /*
1631 * we found at least 1 thread in the
1632 * 'busy' state... make sure we start
1633 * the timer because if they are the only
1634 * threads keeping us from scheduling
1635 * this workitem, we won't get a callback
1636 * to kick off the timer... we need to
1637 * start it now...
1638 */
1639 WQ_TIMER_NEEDED(wq, start_timer);
1640 }
1641 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_NONE, wq, busycount, start_timer, 0, 0);
1642
1643 if (thread != THREAD_NULL) {
1644 /*
1645 * go park this one for later
1646 */
1647 th_to_park = thread;
1648 goto parkit;
1649 }
1650 goto out_of_work;
1651 }
1652 if (thread != THREAD_NULL) {
1653 /*
1654 * we're overbooked on the affinity group this thread is
1655 * currently associated with, but we have work to do
1656 * and at least 1 idle processor, so we'll just retarget
1657 * this thread to a new affinity group
1658 */
1659 th_to_run = thread;
1660 goto pick_up_work;
1661 }
1662 if (wq->wq_thidlecount == 0) {
1663 /*
1664 * we don't have a thread to schedule, but we have
1665 * work to do and at least 1 affinity group that
1666 * doesn't currently have an active thread...
1667 */
1668 WQ_TIMER_NEEDED(wq, start_timer);
1669
1670 KERNEL_DEBUG(0xefffd118, wq, wq->wq_nthreads, start_timer, 0, 0);
1671
1672 goto no_thread_to_run;
1673 }
1674
1675 grab_idle_thread:
1676 /*
1677 * we've got a candidate (affinity group with no currently
1678 * active threads) to start a new thread on...
1679 * we already know there is both work available
1680 * and an idle thread, so activate a thread and then
1681 * fall into the code that pulls a new workitem...
1682 */
1683 TAILQ_FOREACH(ttl, &wq->wq_thidlelist, th_entry) {
1684 if (ttl->th_affinity_tag == affinity_tag || ttl->th_affinity_tag == (uint16_t)-1) {
1685
1686 TAILQ_REMOVE(&wq->wq_thidlelist, ttl, th_entry);
1687 tl = ttl;
1688
1689 break;
1690 }
1691 }
1692 if (tl == NULL) {
1693 tl = TAILQ_FIRST(&wq->wq_thidlelist);
1694 TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry);
1695 }
1696 wq->wq_thidlecount--;
1697
1698 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry);
1699
1700 if ((tl->th_flags & TH_LIST_SUSPENDED) == TH_LIST_SUSPENDED) {
1701 tl->th_flags &= ~TH_LIST_SUSPENDED;
1702 reuse_thread = 0;
1703
1704 } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) {
1705 tl->th_flags &= ~TH_LIST_BLOCKED;
1706 wake_thread = 1;
1707 }
1708 tl->th_flags |= TH_LIST_RUNNING | TH_LIST_BUSY;
1709
1710 wq->wq_threads_scheduled++;
1711 wq->wq_thscheduled_count[priority][affinity_tag]++;
1712 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1713
1714 adjust_counters = FALSE;
1715 th_to_run = tl->th_thread;
1716
1717 pick_up_work:
1718 if (item == 0) {
1719 witem = TAILQ_FIRST(&wl->wl_itemlist);
1720 TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry);
1721
1722 if (TAILQ_EMPTY(&wl->wl_itemlist))
1723 wq->wq_list_bitmap &= ~(1 << priority);
1724 wq->wq_itemcount--;
1725
1726 item = witem->wi_item;
1727 witem->wi_item = (user_addr_t)0;
1728 witem->wi_affinity = 0;
1729 TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry);
1730 }
1731 orig_priority = tl->th_priority;
1732 orig_affinity_tag = tl->th_affinity_tag;
1733
1734 tl->th_priority = priority;
1735 tl->th_affinity_tag = affinity_tag;
1736
1737 if (adjust_counters == TRUE && (orig_priority != priority || orig_affinity_tag != affinity_tag)) {
1738 /*
1739 * we need to adjust these counters based on this
1740 * thread's new disposition w/r to affinity and priority
1741 */
1742 OSAddAtomic(-1, &wq->wq_thactive_count[orig_priority][orig_affinity_tag]);
1743 OSAddAtomic(1, &wq->wq_thactive_count[priority][affinity_tag]);
1744
1745 wq->wq_thscheduled_count[orig_priority][orig_affinity_tag]--;
1746 wq->wq_thscheduled_count[priority][affinity_tag]++;
1747 }
1748 wq->wq_thread_yielded_count = 0;
1749
1750 workqueue_unlock(p);
1751
1752 if (orig_affinity_tag != affinity_tag) {
1753 /*
1754 * this thread's affinity does not match the affinity group
1755 * it's being placed on (it's either a brand new thread or
1756 * we're retargeting an existing thread to a new group)...
1757 * affinity tag of 0 means no affinity...
1758 * but we want our tags to be 0 based because they
1759 * are used to index arrays, so...
1760 * keep it 0 based internally and bump by 1 when
1761 * calling out to set it
1762 */
1763 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_START, wq, orig_affinity_tag, 0, 0, 0);
1764
1765 (void)thread_affinity_set(th_to_run, affinity_tag + 1);
1766
1767 KERNEL_DEBUG(0xefffd114 | DBG_FUNC_END, wq, affinity_tag, 0, 0, 0);
1768 }
1769 if (orig_priority != priority) {
1770 thread_precedence_policy_data_t precedinfo;
1771 thread_extended_policy_data_t extinfo;
1772 uint32_t policy;
1773
1774 policy = workqueue_policy[priority];
1775
1776 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_START, wq, orig_priority, tl->th_policy, 0, 0);
1777
1778 if (tl->th_policy != policy) {
1779
1780 extinfo.timeshare = policy;
1781 (void)thread_policy_set_internal(th_to_run, THREAD_EXTENDED_POLICY, (thread_policy_t)&extinfo, THREAD_EXTENDED_POLICY_COUNT);
1782
1783 tl->th_policy = policy;
1784 }
1785 precedinfo.importance = workqueue_importance[priority];
1786 (void)thread_policy_set_internal(th_to_run, THREAD_PRECEDENCE_POLICY, (thread_policy_t)&precedinfo, THREAD_PRECEDENCE_POLICY_COUNT);
1787
1788 KERNEL_DEBUG(0xefffd120 | DBG_FUNC_END, wq, priority, policy, 0, 0);
1789 }
1790 if (kdebug_enable) {
1791 int lpri = -1;
1792 int laffinity = -1;
1793 int first = -1;
1794 uint32_t code = 0xefffd02c | DBG_FUNC_START;
1795
1796 for (n = 0; n < WORKQUEUE_NUMPRIOS; n++) {
1797 for (i = 0; i < wq->wq_affinity_max; i++) {
1798 if (wq->wq_thactive_count[n][i]) {
1799 if (lpri != -1) {
1800 KERNEL_DEBUG(code, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1801 code = 0xefffd02c;
1802 first = 0;
1803 }
1804 lpri = n;
1805 laffinity = i;
1806 }
1807 }
1808 }
1809 if (lpri != -1) {
1810 if (first == -1)
1811 first = 0xeeeeeeee;
1812 KERNEL_DEBUG(0xefffd02c | DBG_FUNC_END, lpri, laffinity, wq->wq_thactive_count[lpri][laffinity], first, 0);
1813 }
1814 }
1815 /*
1816 * if the current thread is reused for the workitem, it does not return via unix_syscall
1817 */
1818 wq_runitem(p, item, th_to_run, tl, reuse_thread, wake_thread, (thread == th_to_run));
1819
1820 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(th_to_run), item, 1, 0);
1821
1822 return (TRUE);
1823
1824 out_of_work:
1825 /*
1826 * we have no work to do or we are fully booked
1827 * w/r to running threads...
1828 */
1829 no_thread_to_run:
1830 workqueue_unlock(p);
1831
1832 if (start_timer)
1833 workqueue_interval_timer_start(wq);
1834
1835 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 2, 0);
1836
1837 return (FALSE);
1838
1839 parkit:
1840 /*
1841 * this is a workqueue thread with no more
1842 * work to do... park it for now
1843 */
1844 uth = get_bsdthread_info(th_to_park);
1845 tl = uth->uu_threadlist;
1846 if (tl == 0)
1847 panic("wq thread with no threadlist ");
1848
1849 TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry);
1850 tl->th_flags &= ~TH_LIST_RUNNING;
1851
1852 tl->th_flags |= TH_LIST_BLOCKED;
1853 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry);
1854
1855 thread_sched_call(th_to_park, NULL);
1856
1857 OSAddAtomic(-1, &wq->wq_thactive_count[tl->th_priority][tl->th_affinity_tag]);
1858 wq->wq_thscheduled_count[tl->th_priority][tl->th_affinity_tag]--;
1859 wq->wq_threads_scheduled--;
1860
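/*
 * scale how long this thread lingers on the idle list by how many idle
 * threads already exist: an empty pool waits the full
 * wq_reduce_pool_window_usecs, and the wait shrinks toward 1/100th of
 * that window as the idle count approaches 100, so a large idle pool
 * drains back to the system more quickly
 */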
1861 if (wq->wq_thidlecount < 100)
1862 us_to_wait = wq_reduce_pool_window_usecs - (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100));
1863 else
1864 us_to_wait = wq_reduce_pool_window_usecs / 100;
1865
1866 wq->wq_thidlecount++;
1867
1868 assert_wait_timeout((caddr_t)tl, (THREAD_INTERRUPTIBLE), us_to_wait, NSEC_PER_USEC);
1869
1870 workqueue_unlock(p);
1871
1872 if (start_timer)
1873 workqueue_interval_timer_start(wq);
1874
1875 KERNEL_DEBUG1(0xefffd018 | DBG_FUNC_START, wq, wq->wq_threads_scheduled, wq->wq_thidlecount, us_to_wait, thread_tid(th_to_park));
1876 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, wq, thread_tid(thread), 0, 3, 0);
1877
1878 thread_block((thread_continue_t)wq_unpark_continue);
1879 /* NOT REACHED */
1880
1881 return (FALSE);
1882 }
1883
1884
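/*
 * continuation entered when a suspended workqueue thread is resumed:
 * either proceed to user space normally, or unwind if the thread was
 * aborted (e.g. the process is crashing) before it ever ran
 */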
1885 static void
1886 wq_unsuspend_continue(void)
1887 {
1888 struct uthread *uth = NULL;
1889 thread_t th_to_unsuspend;
1890 struct threadlist *tl;
1891 proc_t p;
1892
1893 th_to_unsuspend = current_thread();
1894 uth = get_bsdthread_info(th_to_unsuspend);
1895
1896 if (uth != NULL && (tl = uth->uu_threadlist) != NULL) {
1897
1898 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
1899 /*
1900 * most likely a normal resume of this thread occurred...
1901 * it's also possible that the thread was aborted after we
1902 * finished setting it up so that it could be dispatched... if
1903 * so, thread_bootstrap_return will notice the abort and put
1904 * the thread on the path to self-destruction
1905 */
1906 normal_resume_to_user:
1907 thread_sched_call(th_to_unsuspend, workqueue_callback);
1908
1909 thread_bootstrap_return();
1910 }
1911 /*
1912 * if we get here, it's because we've been resumed due to
1913 * an abort of this thread (process is crashing)
1914 */
1915 p = current_proc();
1916
1917 workqueue_lock_spin(p);
1918
1919 if (tl->th_flags & TH_LIST_SUSPENDED) {
1920 /*
1921 * thread has been aborted while still on our idle
1922 * queue... remove it from our domain...
1923 * workqueue_removethread consumes the lock
1924 */
1925 workqueue_removethread(tl);
1926
1927 thread_bootstrap_return();
1928 }
1929 while ((tl->th_flags & TH_LIST_BUSY)) {
1930 /*
1931 * this thread was aborted after we started making
1932 * it runnable, but before we finished dispatching it...
1933 * we need to wait for that process to finish,
1934 * and we need to ask for a wakeup instead of a
1935 * thread_resume since the abort has already resumed us
1936 */
1937 tl->th_flags |= TH_LIST_NEED_WAKEUP;
1938
1939 assert_wait((caddr_t)tl, (THREAD_UNINT));
1940
1941 workqueue_unlock(p);
1942
1943 thread_block(THREAD_CONTINUE_NULL);
1944
1945 workqueue_lock_spin(p);
1946 }
1947 workqueue_unlock(p);
1948 /*
1949 * we have finished setting up the thread's context...
1950 * thread_bootstrap_return will take us through the abort path
1951 * where the thread will self destruct
1952 */
1953 goto normal_resume_to_user;
1954 }
1955 thread_bootstrap_return();
1956 }
1957
1958
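/*
 * continuation entered when a parked idle workqueue thread wakes up,
 * either because new work arrived (normal wakeup) or because its idle
 * timeout popped, in which case the thread removes itself from the pool
 */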
1959 static void
1960 wq_unpark_continue(void)
1961 {
1962 struct uthread *uth = NULL;
1963 struct threadlist *tl;
1964 thread_t th_to_unpark;
1965 proc_t p;
1966
1967 th_to_unpark = current_thread();
1968 uth = get_bsdthread_info(th_to_unpark);
1969
1970 if (uth != NULL) {
1971 if ((tl = uth->uu_threadlist) != NULL) {
1972
1973 if ((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == TH_LIST_RUNNING) {
1974 /*
1975 * a normal wakeup of this thread occurred... no need
1976 * for any synchronization with the timer and wq_runitem
1977 */
1978 normal_return_to_user:
1979 thread_sched_call(th_to_unpark, workqueue_callback);
1980
1981 KERNEL_DEBUG(0xefffd018 | DBG_FUNC_END, tl->th_workq, 0, 0, 0, 0);
1982
1983 thread_exception_return();
1984 }
1985 p = current_proc();
1986
1987 workqueue_lock_spin(p);
1988
1989 if ( !(tl->th_flags & TH_LIST_RUNNING)) {
1990 /*
1991 * the timer popped us out and we've not
1992 * been moved off of the idle list
1993 * so we should now self-destruct
1994 *
1995 * workqueue_removethread consumes the lock
1996 */
1997 workqueue_removethread(tl);
1998
1999 thread_exception_return();
2000 }
2001 /*
2002 * the timer woke us up, but we have already
2003 * started to make this a runnable thread,
2004 * but have not yet finished that process...
2005 * so wait for the normal wakeup
2006 */
2007 while ((tl->th_flags & TH_LIST_BUSY)) {
2008
2009 assert_wait((caddr_t)tl, (THREAD_UNINT));
2010
2011 workqueue_unlock(p);
2012
2013 thread_block(THREAD_CONTINUE_NULL);
2014
2015 workqueue_lock_spin(p);
2016 }
2017 /*
2018 * we have finished setting up the thread's context
2019 * now we can return as if we got a normal wakeup
2020 */
2021 workqueue_unlock(p);
2022
2023 goto normal_return_to_user;
2024 }
2025 }
2026 thread_exception_return();
2027 }
2028
2029
2030
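/*
 * hand a work item to the chosen thread: set up its user thread state via
 * setup_wqthread, then either return directly to user space (when the
 * calling thread is the one being reused) or wake/resume the target thread
 */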
2031 static void
2032 wq_runitem(proc_t p, user_addr_t item, thread_t th, struct threadlist *tl,
2033 int reuse_thread, int wake_thread, int return_directly)
2034 {
2035 int ret = 0;
2036
2037 KERNEL_DEBUG1(0xefffd004 | DBG_FUNC_START, tl->th_workq, tl->th_priority, tl->th_affinity_tag, thread_tid(current_thread()), thread_tid(th));
2038
2039 ret = setup_wqthread(p, th, item, reuse_thread, tl);
2040
2041 if (ret != 0)
2042 panic("setup_wqthread failed %x\n", ret);
2043
2044 if (return_directly) {
2045 KERNEL_DEBUG(0xefffd000 | DBG_FUNC_END, tl->th_workq, 0, 0, 4, 0);
2046
2047 thread_exception_return();
2048
2049 panic("wq_runitem: thread_exception_return returned ...\n");
2050 }
2051 if (wake_thread) {
2052 workqueue_lock_spin(p);
2053
2054 tl->th_flags &= ~TH_LIST_BUSY;
2055 wakeup(tl);
2056
2057 workqueue_unlock(p);
2058 } else {
2059 KERNEL_DEBUG1(0xefffd014 | DBG_FUNC_END, tl->th_workq, 0, 0, thread_tid(current_thread()), thread_tid(th));
2060
2061 workqueue_lock_spin(p);
2062
2063 if (tl->th_flags & TH_LIST_NEED_WAKEUP)
2064 wakeup(tl);
2065 else
2066 thread_resume(th);
2067
2068 tl->th_flags &= ~(TH_LIST_BUSY | TH_LIST_NEED_WAKEUP);
2069
2070 workqueue_unlock(p);
2071 }
2072 }
2073
2074 int
2075 setup_wqthread(proc_t p, thread_t th, user_addr_t item, int reuse_thread, struct threadlist *tl)
2076 {
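/*
 * the register assignments below load the arguments for the user-space
 * workqueue start routine at p->p_wqthread: the pthread (which sits at
 * the top of the stack allocation), its mach thread port, the usable
 * stack base past the guard area, the work item, and the reuse flag;
 * the initial stack pointer is dropped below the stack top far enough
 * to satisfy the ABI's alignment / red zone requirements
 */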
2077 #if defined(__ppc__)
2078 /*
2079 * Set up PowerPC registers...
2080 * internally they are always kept as 64 bit and
2081 * since the register set is the same between 32 and 64bit modes
2082 * we don't need 2 different methods for setting the state
2083 */
2084 {
2085 ppc_thread_state64_t state64;
2086 ppc_thread_state64_t *ts64 = &state64;
2087
2088 ts64->srr0 = (uint64_t)p->p_wqthread;
2089 ts64->r1 = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_ARGSAVE_LEN - C_RED_ZONE);
2090 ts64->r3 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2091 ts64->r4 = (uint64_t)(tl->th_thport);
2092 ts64->r5 = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2093 ts64->r6 = (uint64_t)item;
2094 ts64->r7 = (uint64_t)reuse_thread;
2095 ts64->r8 = (uint64_t)0;
2096
2097 if ((reuse_thread != 0) && (ts64->r3 == (uint64_t)0))
2098 panic("setup_wqthread: setting reuse thread with null pthread\n");
2099 thread_set_wq_state64(th, (thread_state_t)ts64);
2100 }
2101 #elif defined(__i386__) || defined(__x86_64__)
2102 int isLP64 = 0;
2103
2104 isLP64 = IS_64BIT_PROCESS(p);
2105 /*
2106 * Set up i386/x86_64 registers & function call.
2107 */
2108 if (isLP64 == 0) {
2109 x86_thread_state32_t state;
2110 x86_thread_state32_t *ts = &state;
2111
2112 ts->eip = (int)p->p_wqthread;
2113 ts->eax = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2114 ts->ebx = (unsigned int)tl->th_thport;
2115 ts->ecx = (unsigned int)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2116 ts->edx = (unsigned int)item;
2117 ts->edi = (unsigned int)reuse_thread;
2118 ts->esi = (unsigned int)0;
2119 /*
2120 * set stack pointer
2121 */
2122 ts->esp = (int)((vm_offset_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_32_STK_ALIGN));
2123
2124 if ((reuse_thread != 0) && (ts->eax == (unsigned int)0))
2125 panic("setup_wqthread: setting reuse thread with null pthread\n");
2126 thread_set_wq_state32(th, (thread_state_t)ts);
2127
2128 } else {
2129 x86_thread_state64_t state64;
2130 x86_thread_state64_t *ts64 = &state64;
2131
2132 ts64->rip = (uint64_t)p->p_wqthread;
2133 ts64->rdi = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE);
2134 ts64->rsi = (uint64_t)(tl->th_thport);
2135 ts64->rdx = (uint64_t)(tl->th_stackaddr + PTH_DEFAULT_GUARDSIZE);
2136 ts64->rcx = (uint64_t)item;
2137 ts64->r8 = (uint64_t)reuse_thread;
2138 ts64->r9 = (uint64_t)0;
2139
2140 /*
2141 * set stack pointer aligned to 16 byte boundary
2142 */
2143 ts64->rsp = (uint64_t)((tl->th_stackaddr + PTH_DEFAULT_STACKSIZE + PTH_DEFAULT_GUARDSIZE) - C_64_REDZONE_LEN);
2144
2145 if ((reuse_thread != 0) && (ts64->rdi == (uint64_t)0))
2146 panic("setup_wqthread: setting reuse thread with null pthread\n");
2147 thread_set_wq_state64(th, (thread_state_t)ts64);
2148 }
2149 #else
2150 #error setup_wqthread not defined for this architecture
2151 #endif
2152 return(0);
2153 }
2154
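/*
 * report workqueue statistics for proc_info: total workqueue threads,
 * how many are currently running work items (active), and how many are
 * scheduled but blocked (wq_threads_scheduled minus the active count)
 */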
2155 int
2156 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
2157 {
2158 struct workqueue * wq;
2159 int error = 0;
2160 int activecount;
2161 uint32_t pri, affinity;
2162
2163 workqueue_lock_spin(p);
2164 if ((wq = p->p_wqptr) == NULL) {
2165 error = EINVAL;
2166 goto out;
2167 }
2168 activecount = 0;
2169
2170 for (pri = 0; pri < WORKQUEUE_NUMPRIOS; pri++) {
2171 for (affinity = 0; affinity < wq->wq_affinity_max; affinity++)
2172 activecount += wq->wq_thactive_count[pri][affinity];
2173 }
2174 pwqinfo->pwq_nthreads = wq->wq_nthreads;
2175 pwqinfo->pwq_runthreads = activecount;
2176 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
2177 out:
2178 workqueue_unlock(p);
2179 return(error);
2180 }
2181
2182 /* Set the target concurrency of one of the priority queues (0, 1, 2) to the specified value */
2183 int
2184 proc_settargetconc(pid_t pid, int queuenum, int32_t targetconc)
2185 {
2186 proc_t p, self;
2187 uint64_t addr;
2188 int32_t conc = targetconc;
2189 int error = 0;
2190 vm_map_t oldmap = VM_MAP_NULL;
2191 int gotref = 0;
2192
2193 self = current_proc();
2194 if (self->p_pid != pid) {
2195 /* if not operating on self, hold a reference on the target process */
2196
2197 if (pid == 0)
2198 return(EINVAL);
2199
2200 p = proc_find(pid);
2201
2202 if (p == PROC_NULL)
2203 return(ESRCH);
2204 gotref = 1;
2205
2206 } else
2207 p = self;
2208
2209 if ((addr = p->p_targconc) == (uint64_t)0) {
2210 error = EINVAL;
2211 goto out;
2212 }
2213
2214
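/*
 * the target concurrency array lives in the target process's address
 * space (p_targconc); when acting on another process, temporarily
 * switch to its vm map so the copyout lands in the right task
 */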
2215 if ((queuenum >= WQ_MAXPRI_MIN) && (queuenum <= WQ_MAXPRI_MAX)) {
2216 addr += (queuenum * sizeof(int32_t));
2217 if (gotref == 1)
2218 oldmap = vm_map_switch(get_task_map(p->task));
2219 error = copyout(&conc, addr, sizeof(int32_t));
2220 if (gotref == 1)
2221 (void)vm_map_switch(oldmap);
2222
2223 } else {
2224 error = EINVAL;
2225 }
2226 out:
2227 if (gotref == 1)
2228 proc_rele(p);
2229 return(error);
2230 }
2231
2232
2233 /* Set the target concurrency of all the priority queues to the specified values */
2234 int
2235 proc_setalltargetconc(pid_t pid, int32_t * targetconcp)
2236 {
2237 proc_t p, self;
2238 uint64_t addr;
2239 int error = 0;
2240 vm_map_t oldmap = VM_MAP_NULL;
2241 int gotref = 0;
2242
2243 self = current_proc();
2244 if (self->p_pid != pid) {
2245 /* if not operating on self, hold a reference on the target process */
2246
2247 if (pid == 0)
2248 return(EINVAL);
2249
2250 p = proc_find(pid);
2251
2252 if (p == PROC_NULL)
2253 return(ESRCH);
2254 gotref = 1;
2255
2256 } else
2257 p = self;
2258
2259 if ((addr = (uint64_t)p->p_targconc) == (uint64_t)0) {
2260 error = EINVAL;
2261 goto out;
2262 }
2263
2264
2265 if (gotref == 1)
2266 oldmap = vm_map_switch(get_task_map(p->task));
2267
2268 error = copyout(targetconcp, addr, WQ_PRI_NUM * sizeof(int32_t));
2269 if (gotref == 1)
2270 (void)vm_map_switch(oldmap);
2271
2272 out:
2273 if (gotref == 1)
2274 proc_rele(p);
2275 return(error);
2276 }
2277
2278 int thread_selfid(__unused struct proc *p, __unused struct thread_selfid_args *uap, user_addr_t *retval)
2279 {
2280 thread_t thread = current_thread();
2281 uint64_t thread_id = thread_tid(thread);
2282 *retval = thread_id;
2283 return KERN_SUCCESS;
2284 }
2285
2286 void
2287 pthread_init(void)
2288 {
2289 pthread_lck_grp_attr = lck_grp_attr_alloc_init();
2290 pthread_lck_grp = lck_grp_alloc_init("pthread", pthread_lck_grp_attr);
2291
2292 /*
2293 * allocate the lock attribute for pthread synchronizers
2294 */
2295 pthread_lck_attr = lck_attr_alloc_init();
2296
2297 workqueue_init_lock((proc_t) get_bsdtask_info(kernel_task));
2298 #if PSYNCH
2299 pthread_list_mlock = lck_mtx_alloc_init(pthread_lck_grp, pthread_lck_attr);
2300
2301 pth_global_hashinit();
2302 psynch_thcall = thread_call_allocate(psynch_wq_cleanup, NULL);
2303 #endif /* PSYNCH */
2304 }