]> git.saurik.com Git - apple/xnu.git/blob - osfmk/kern/thread_call.c
xnu-2050.48.11.tar.gz
[apple/xnu.git] / osfmk / kern / thread_call.c
1 /*
2 * Copyright (c) 1993-1995, 1999-2008 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28
29 #include <mach/mach_types.h>
30 #include <mach/thread_act.h>
31
32 #include <kern/kern_types.h>
33 #include <kern/zalloc.h>
34 #include <kern/sched_prim.h>
35 #include <kern/clock.h>
36 #include <kern/task.h>
37 #include <kern/thread.h>
38 #include <kern/wait_queue.h>
39
40 #include <vm/vm_pageout.h>
41
42 #include <kern/thread_call.h>
43 #include <kern/call_entry.h>
44 #include <kern/timer_call.h>
45
46 #include <libkern/OSAtomic.h>
47
48 #include <sys/kdebug.h>
49 #if CONFIG_DTRACE
50 #include <mach/sdt.h>
51 #endif
52
53 static zone_t thread_call_zone;
54 static struct wait_queue daemon_wqueue;
55
56 struct thread_call_group {
57 queue_head_t pending_queue;
58 uint32_t pending_count;
59
60 queue_head_t delayed_queue;
61 uint32_t delayed_count;
62
63 timer_call_data_t delayed_timer;
64 timer_call_data_t dealloc_timer;
65
66 struct wait_queue idle_wqueue;
67 uint32_t idle_count, active_count;
68
69 integer_t pri;
70 uint32_t target_thread_count;
71 uint64_t idle_timestamp;
72
73 uint32_t flags;
74 sched_call_t sched_call;
75 };
76
77 typedef struct thread_call_group *thread_call_group_t;
78
79 #define TCG_PARALLEL 0x01
80 #define TCG_DEALLOC_ACTIVE 0x02
81
82 #define THREAD_CALL_GROUP_COUNT 4
83 #define THREAD_CALL_THREAD_MIN 4
84 #define INTERNAL_CALL_COUNT 768
85 #define THREAD_CALL_DEALLOC_INTERVAL_NS (5 * 1000 * 1000) /* 5 ms */
86 #define THREAD_CALL_ADD_RATIO 4
87 #define THREAD_CALL_MACH_FACTOR_CAP 3
88
89 static struct thread_call_group thread_call_groups[THREAD_CALL_GROUP_COUNT];
90 static boolean_t thread_call_daemon_awake;
91 static thread_call_data_t internal_call_storage[INTERNAL_CALL_COUNT];
92 static queue_head_t thread_call_internal_queue;
93 static uint64_t thread_call_dealloc_interval_abs;
94
95 static __inline__ thread_call_t _internal_call_allocate(void);
96 static __inline__ void _internal_call_release(thread_call_t call);
97 static __inline__ boolean_t _pending_call_enqueue(thread_call_t call, thread_call_group_t group);
98 static __inline__ boolean_t _delayed_call_enqueue(thread_call_t call, thread_call_group_t group, uint64_t deadline);
99 static __inline__ boolean_t _call_dequeue(thread_call_t call, thread_call_group_t group);
100 static __inline__ void thread_call_wake(thread_call_group_t group);
101 static __inline__ void _set_delayed_call_timer(thread_call_t call, thread_call_group_t group);
102 static boolean_t _remove_from_pending_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all);
103 static boolean_t _remove_from_delayed_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all);
104 static void thread_call_daemon(void *arg);
105 static void thread_call_thread(thread_call_group_t group, wait_result_t wres);
106 extern void thread_call_delayed_timer(timer_call_param_t p0, timer_call_param_t p1);
107 static void thread_call_dealloc_timer(timer_call_param_t p0, timer_call_param_t p1);
108 static void thread_call_group_setup(thread_call_group_t group, thread_call_priority_t pri, uint32_t target_thread_count, boolean_t parallel);
109 static void sched_call_thread(int type, thread_t thread);
110 static void thread_call_start_deallocate_timer(thread_call_group_t group);
111 static void thread_call_wait_locked(thread_call_t call);
112
113 #define qe(x) ((queue_entry_t)(x))
114 #define TC(x) ((thread_call_t)(x))
115
116
117 lck_grp_t thread_call_queues_lck_grp;
118 lck_grp_t thread_call_lck_grp;
119 lck_attr_t thread_call_lck_attr;
120 lck_grp_attr_t thread_call_lck_grp_attr;
121
122 #if defined(__i386__) || defined(__x86_64__)
123 lck_mtx_t thread_call_lock_data;
124 #else
125 lck_spin_t thread_call_lock_data;
126 #endif
127
128
129 #define thread_call_lock_spin() \
130 lck_mtx_lock_spin_always(&thread_call_lock_data)
131
132 #define thread_call_unlock() \
133 lck_mtx_unlock_always(&thread_call_lock_data)
134
135
136 static inline spl_t
137 disable_ints_and_lock(void)
138 {
139 spl_t s;
140
141 s = splsched();
142 thread_call_lock_spin();
143
144 return s;
145 }
146
147 static inline void
148 enable_ints_and_unlock(void)
149 {
150 thread_call_unlock();
151 (void)spllo();
152 }
153
154
155 static inline boolean_t
156 group_isparallel(thread_call_group_t group)
157 {
158 return ((group->flags & TCG_PARALLEL) != 0);
159 }
160
161 static boolean_t
162 thread_call_group_should_add_thread(thread_call_group_t group)
163 {
164 uint32_t thread_count;
165
166 if (!group_isparallel(group)) {
167 if (group->pending_count > 0 && group->active_count == 0) {
168 return TRUE;
169 }
170
171 return FALSE;
172 }
173
174 if (group->pending_count > 0) {
175 if (group->idle_count > 0) {
176 panic("Pending work, but threads are idle?");
177 }
178
179 thread_count = group->active_count;
180
181 /*
182 * Add a thread if either there are no threads,
183 * the group has fewer than its target number of
184 * threads, or the amount of work is large relative
185 * to the number of threads. In the last case, pay attention
186 * to the total load on the system, and back off if
187 * it's high.
188 */
189 if ((thread_count == 0) ||
190 (thread_count < group->target_thread_count) ||
191 ((group->pending_count > THREAD_CALL_ADD_RATIO * thread_count) &&
192 (sched_mach_factor < THREAD_CALL_MACH_FACTOR_CAP))) {
193 return TRUE;
194 }
195 }
196
197 return FALSE;
198 }
199
200 static inline integer_t
201 thread_call_priority_to_sched_pri(thread_call_priority_t pri)
202 {
203 switch (pri) {
204 case THREAD_CALL_PRIORITY_HIGH:
205 return BASEPRI_PREEMPT;
206 case THREAD_CALL_PRIORITY_KERNEL:
207 return BASEPRI_KERNEL;
208 case THREAD_CALL_PRIORITY_USER:
209 return BASEPRI_DEFAULT;
210 case THREAD_CALL_PRIORITY_LOW:
211 return DEPRESSPRI;
212 default:
213 panic("Invalid priority.");
214 }
215
216 return 0;
217 }
218
219 /* Lock held */
220 static inline thread_call_group_t
221 thread_call_get_group(
222 thread_call_t call)
223 {
224 thread_call_priority_t pri = call->tc_pri;
225
226 assert(pri == THREAD_CALL_PRIORITY_LOW ||
227 pri == THREAD_CALL_PRIORITY_USER ||
228 pri == THREAD_CALL_PRIORITY_KERNEL ||
229 pri == THREAD_CALL_PRIORITY_HIGH);
230
231 return &thread_call_groups[pri];
232 }
233
234 static void
235 thread_call_group_setup(
236 thread_call_group_t group,
237 thread_call_priority_t pri,
238 uint32_t target_thread_count,
239 boolean_t parallel)
240 {
241 queue_init(&group->pending_queue);
242 queue_init(&group->delayed_queue);
243
244 timer_call_setup(&group->delayed_timer, thread_call_delayed_timer, group);
245 timer_call_setup(&group->dealloc_timer, thread_call_dealloc_timer, group);
246
247 wait_queue_init(&group->idle_wqueue, SYNC_POLICY_FIFO);
248
249 group->target_thread_count = target_thread_count;
250 group->pri = thread_call_priority_to_sched_pri(pri);
251
252 group->sched_call = sched_call_thread;
253 if (parallel) {
254 group->flags |= TCG_PARALLEL;
255 group->sched_call = NULL;
256 }
257 }
258
259 /*
260 * Simple wrapper for creating threads bound to
261 * thread call groups.
262 */
263 static kern_return_t
264 thread_call_thread_create(
265 thread_call_group_t group)
266 {
267 thread_t thread;
268 kern_return_t result;
269
270 result = kernel_thread_start_priority((thread_continue_t)thread_call_thread, group, group->pri, &thread);
271 if (result != KERN_SUCCESS) {
272 return result;
273 }
274
275 if (group->pri < BASEPRI_PREEMPT) {
276 /*
277 * New style doesn't get to run to completion in
278 * kernel if there are higher priority threads
279 * available.
280 */
281 thread_set_eager_preempt(thread);
282 }
283
284 thread_deallocate(thread);
285 return KERN_SUCCESS;
286 }
287
288 /*
289 * thread_call_initialize:
290 *
291 * Initialize this module, called
292 * early during system initialization.
293 */
294 void
295 thread_call_initialize(void)
296 {
297 thread_call_t call;
298 kern_return_t result;
299 thread_t thread;
300 int i;
301
302 i = sizeof (thread_call_data_t);
303 thread_call_zone = zinit(i, 4096 * i, 16 * i, "thread_call");
304 zone_change(thread_call_zone, Z_CALLERACCT, FALSE);
305 zone_change(thread_call_zone, Z_NOENCRYPT, TRUE);
306
307 lck_attr_setdefault(&thread_call_lck_attr);
308 lck_grp_attr_setdefault(&thread_call_lck_grp_attr);
309 lck_grp_init(&thread_call_queues_lck_grp, "thread_call_queues", &thread_call_lck_grp_attr);
310 lck_grp_init(&thread_call_lck_grp, "thread_call", &thread_call_lck_grp_attr);
311
312 #if defined(__i386__) || defined(__x86_64__)
313 lck_mtx_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr);
314 #else
315 lck_spin_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr);
316 #endif
317
318 nanotime_to_absolutetime(0, THREAD_CALL_DEALLOC_INTERVAL_NS, &thread_call_dealloc_interval_abs);
319 wait_queue_init(&daemon_wqueue, SYNC_POLICY_FIFO);
320
321 thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_LOW], THREAD_CALL_PRIORITY_LOW, 0, TRUE);
322 thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_USER], THREAD_CALL_PRIORITY_USER, 0, TRUE);
323 thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_KERNEL], THREAD_CALL_PRIORITY_KERNEL, 1, TRUE);
324 thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_HIGH], THREAD_CALL_PRIORITY_HIGH, THREAD_CALL_THREAD_MIN, FALSE);
325
326 disable_ints_and_lock();
327
328 queue_init(&thread_call_internal_queue);
329 for (
330 call = internal_call_storage;
331 call < &internal_call_storage[INTERNAL_CALL_COUNT];
332 call++) {
333
334 enqueue_tail(&thread_call_internal_queue, qe(call));
335 }
336
337 thread_call_daemon_awake = TRUE;
338
339 enable_ints_and_unlock();
340
341 result = kernel_thread_start_priority((thread_continue_t)thread_call_daemon, NULL, BASEPRI_PREEMPT + 1, &thread);
342 if (result != KERN_SUCCESS)
343 panic("thread_call_initialize");
344
345 thread_deallocate(thread);
346 }
347
348 void
349 thread_call_setup(
350 thread_call_t call,
351 thread_call_func_t func,
352 thread_call_param_t param0)
353 {
354 bzero(call, sizeof(*call));
355 call_entry_setup((call_entry_t)call, func, param0);
356 call->tc_pri = THREAD_CALL_PRIORITY_HIGH; /* Default priority */
357 }
358
359 /*
360 * _internal_call_allocate:
361 *
362 * Allocate an internal callout entry.
363 *
364 * Called with thread_call_lock held.
365 */
366 static __inline__ thread_call_t
367 _internal_call_allocate(void)
368 {
369 thread_call_t call;
370
371 if (queue_empty(&thread_call_internal_queue))
372 panic("_internal_call_allocate");
373
374 call = TC(dequeue_head(&thread_call_internal_queue));
375
376 return (call);
377 }
378
379 /*
380 * _internal_call_release:
381 *
382 * Release an internal callout entry which
383 * is no longer pending (or delayed).
384 *
385 * Called with thread_call_lock held.
386 */
387 static __inline__ void
388 _internal_call_release(
389 thread_call_t call)
390 {
391 if ( call >= internal_call_storage &&
392 call < &internal_call_storage[INTERNAL_CALL_COUNT] )
393 enqueue_head(&thread_call_internal_queue, qe(call));
394 }
395
396 /*
397 * _pending_call_enqueue:
398 *
399 * Place an entry at the end of the
400 * pending queue, to be executed soon.
401 *
402 * Returns TRUE if the entry was already
403 * on a queue.
404 *
405 * Called with thread_call_lock held.
406 */
407 static __inline__ boolean_t
408 _pending_call_enqueue(
409 thread_call_t call,
410 thread_call_group_t group)
411 {
412 queue_head_t *old_queue;
413
414 old_queue = call_entry_enqueue_tail(CE(call), &group->pending_queue);
415
416 if (old_queue == NULL) {
417 call->tc_submit_count++;
418 }
419
420 group->pending_count++;
421
422 thread_call_wake(group);
423
424 return (old_queue != NULL);
425 }
426
427 /*
428 * _delayed_call_enqueue:
429 *
430 * Place an entry on the delayed queue,
431 * after existing entries with an earlier
432 * (or identical) deadline.
433 *
434 * Returns TRUE if the entry was already
435 * on a queue.
436 *
437 * Called with thread_call_lock held.
438 */
439 static __inline__ boolean_t
440 _delayed_call_enqueue(
441 thread_call_t call,
442 thread_call_group_t group,
443 uint64_t deadline)
444 {
445 queue_head_t *old_queue;
446
447 old_queue = call_entry_enqueue_deadline(CE(call), &group->delayed_queue, deadline);
448
449 if (old_queue == &group->pending_queue)
450 group->pending_count--;
451 else if (old_queue == NULL)
452 call->tc_submit_count++;
453
454 return (old_queue != NULL);
455 }
456
457 /*
458 * _call_dequeue:
459 *
460 * Remove an entry from a queue.
461 *
462 * Returns TRUE if the entry was on a queue.
463 *
464 * Called with thread_call_lock held.
465 */
466 static __inline__ boolean_t
467 _call_dequeue(
468 thread_call_t call,
469 thread_call_group_t group)
470 {
471 queue_head_t *old_queue;
472
473 old_queue = call_entry_dequeue(CE(call));
474
475 if (old_queue != NULL) {
476 call->tc_finish_count++;
477 if (old_queue == &group->pending_queue)
478 group->pending_count--;
479 }
480
481 return (old_queue != NULL);
482 }
483
484 /*
485 * _set_delayed_call_timer:
486 *
487 * Reset the timer so that it
488 * next expires when the entry is due.
489 *
490 * Called with thread_call_lock held.
491 */
492 static __inline__ void
493 _set_delayed_call_timer(
494 thread_call_t call,
495 thread_call_group_t group)
496 {
497 timer_call_enter(&group->delayed_timer, call->tc_call.deadline, 0);
498 }
499
500 /*
501 * _remove_from_pending_queue:
502 *
503 * Remove the first (or all) matching
504 * entries from the pending queue.
505 *
506 * Returns TRUE if any matching entries
507 * were found.
508 *
509 * Called with thread_call_lock held.
510 */
511 static boolean_t
512 _remove_from_pending_queue(
513 thread_call_func_t func,
514 thread_call_param_t param0,
515 boolean_t remove_all)
516 {
517 boolean_t call_removed = FALSE;
518 thread_call_t call;
519 thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
520
521 call = TC(queue_first(&group->pending_queue));
522
523 while (!queue_end(&group->pending_queue, qe(call))) {
524 if (call->tc_call.func == func &&
525 call->tc_call.param0 == param0) {
526 thread_call_t next = TC(queue_next(qe(call)));
527
528 _call_dequeue(call, group);
529
530 _internal_call_release(call);
531
532 call_removed = TRUE;
533 if (!remove_all)
534 break;
535
536 call = next;
537 }
538 else
539 call = TC(queue_next(qe(call)));
540 }
541
542 return (call_removed);
543 }
544
545 /*
546 * _remove_from_delayed_queue:
547 *
548 * Remove the first (or all) matching
549 * entries from the delayed queue.
550 *
551 * Returns TRUE if any matching entries
552 * were found.
553 *
554 * Called with thread_call_lock held.
555 */
556 static boolean_t
557 _remove_from_delayed_queue(
558 thread_call_func_t func,
559 thread_call_param_t param0,
560 boolean_t remove_all)
561 {
562 boolean_t call_removed = FALSE;
563 thread_call_t call;
564 thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
565
566 call = TC(queue_first(&group->delayed_queue));
567
568 while (!queue_end(&group->delayed_queue, qe(call))) {
569 if (call->tc_call.func == func &&
570 call->tc_call.param0 == param0) {
571 thread_call_t next = TC(queue_next(qe(call)));
572
573 _call_dequeue(call, group);
574
575 _internal_call_release(call);
576
577 call_removed = TRUE;
578 if (!remove_all)
579 break;
580
581 call = next;
582 }
583 else
584 call = TC(queue_next(qe(call)));
585 }
586
587 return (call_removed);
588 }
589
590 #ifndef __LP64__
591
592 /*
593 * thread_call_func:
594 *
595 * Enqueue a function callout.
596 *
597 * Guarantees { function, argument }
598 * uniqueness if unique_call is TRUE.
599 */
600 void
601 thread_call_func(
602 thread_call_func_t func,
603 thread_call_param_t param,
604 boolean_t unique_call)
605 {
606 thread_call_t call;
607 thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
608 spl_t s;
609
610 s = splsched();
611 thread_call_lock_spin();
612
613 call = TC(queue_first(&group->pending_queue));
614
615 while (unique_call && !queue_end(&group->pending_queue, qe(call))) {
616 if (call->tc_call.func == func && call->tc_call.param0 == param) {
617 break;
618 }
619
620 call = TC(queue_next(qe(call)));
621 }
622
623 if (!unique_call || queue_end(&group->pending_queue, qe(call))) {
624 call = _internal_call_allocate();
625 call->tc_call.func = func;
626 call->tc_call.param0 = param;
627 call->tc_call.param1 = NULL;
628
629 _pending_call_enqueue(call, group);
630 }
631
632 thread_call_unlock();
633 splx(s);
634 }
635
636 #endif /* __LP64__ */
637
638 /*
639 * thread_call_func_delayed:
640 *
641 * Enqueue a function callout to
642 * occur at the stated time.
643 */
644 void
645 thread_call_func_delayed(
646 thread_call_func_t func,
647 thread_call_param_t param,
648 uint64_t deadline)
649 {
650 thread_call_t call;
651 thread_call_group_t group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
652 spl_t s;
653
654 s = splsched();
655 thread_call_lock_spin();
656
657 call = _internal_call_allocate();
658 call->tc_call.func = func;
659 call->tc_call.param0 = param;
660 call->tc_call.param1 = 0;
661
662 _delayed_call_enqueue(call, group, deadline);
663
664 if (queue_first(&group->delayed_queue) == qe(call))
665 _set_delayed_call_timer(call, group);
666
667 thread_call_unlock();
668 splx(s);
669 }
670
671 /*
672 * thread_call_func_cancel:
673 *
674 * Dequeue a function callout.
675 *
676 * Removes one (or all) { function, argument }
677 * instance(s) from either (or both)
678 * the pending and the delayed queue,
679 * in that order.
680 *
681 * Returns TRUE if any calls were cancelled.
682 */
683 boolean_t
684 thread_call_func_cancel(
685 thread_call_func_t func,
686 thread_call_param_t param,
687 boolean_t cancel_all)
688 {
689 boolean_t result;
690 spl_t s;
691
692 s = splsched();
693 thread_call_lock_spin();
694
695 if (cancel_all)
696 result = _remove_from_pending_queue(func, param, cancel_all) |
697 _remove_from_delayed_queue(func, param, cancel_all);
698 else
699 result = _remove_from_pending_queue(func, param, cancel_all) ||
700 _remove_from_delayed_queue(func, param, cancel_all);
701
702 thread_call_unlock();
703 splx(s);
704
705 return (result);
706 }
707
708 /*
709 * Allocate a thread call with a given priority. Importances
710 * other than THREAD_CALL_PRIORITY_HIGH will be run in threads
711 * with eager preemption enabled (i.e. may be aggressively preempted
712 * by higher-priority threads which are not in the normal "urgent" bands).
713 */
714 thread_call_t
715 thread_call_allocate_with_priority(
716 thread_call_func_t func,
717 thread_call_param_t param0,
718 thread_call_priority_t pri)
719 {
720 thread_call_t call;
721
722 if (pri > THREAD_CALL_PRIORITY_LOW) {
723 panic("Invalid pri: %d\n", pri);
724 }
725
726 call = thread_call_allocate(func, param0);
727 call->tc_pri = pri;
728
729 return call;
730 }
731
732 /*
733 * thread_call_allocate:
734 *
735 * Allocate a callout entry.
736 */
737 thread_call_t
738 thread_call_allocate(
739 thread_call_func_t func,
740 thread_call_param_t param0)
741 {
742 thread_call_t call = zalloc(thread_call_zone);
743
744 thread_call_setup(call, func, param0);
745 call->tc_refs = 1;
746 call->tc_flags = THREAD_CALL_ALLOC;
747
748 return (call);
749 }
750
751 /*
752 * thread_call_free:
753 *
754 * Release a callout. If the callout is currently
755 * executing, it will be freed when all invocations
756 * finish.
757 */
758 boolean_t
759 thread_call_free(
760 thread_call_t call)
761 {
762 spl_t s;
763 int32_t refs;
764
765 s = splsched();
766 thread_call_lock_spin();
767
768 if (call->tc_call.queue != NULL) {
769 thread_call_unlock();
770 splx(s);
771
772 return (FALSE);
773 }
774
775 refs = --call->tc_refs;
776 if (refs < 0) {
777 panic("Refcount negative: %d\n", refs);
778 }
779
780 thread_call_unlock();
781 splx(s);
782
783 if (refs == 0) {
784 zfree(thread_call_zone, call);
785 }
786
787 return (TRUE);
788 }
789
790 /*
791 * thread_call_enter:
792 *
793 * Enqueue a callout entry to occur "soon".
794 *
795 * Returns TRUE if the call was
796 * already on a queue.
797 */
798 boolean_t
799 thread_call_enter(
800 thread_call_t call)
801 {
802 boolean_t result = TRUE;
803 thread_call_group_t group;
804 spl_t s;
805
806 group = thread_call_get_group(call);
807
808 s = splsched();
809 thread_call_lock_spin();
810
811 if (call->tc_call.queue != &group->pending_queue) {
812 result = _pending_call_enqueue(call, group);
813 }
814
815 call->tc_call.param1 = 0;
816
817 thread_call_unlock();
818 splx(s);
819
820 return (result);
821 }
822
823 boolean_t
824 thread_call_enter1(
825 thread_call_t call,
826 thread_call_param_t param1)
827 {
828 boolean_t result = TRUE;
829 thread_call_group_t group;
830 spl_t s;
831
832 group = thread_call_get_group(call);
833
834 s = splsched();
835 thread_call_lock_spin();
836
837 if (call->tc_call.queue != &group->pending_queue) {
838 result = _pending_call_enqueue(call, group);
839 }
840
841 call->tc_call.param1 = param1;
842
843 thread_call_unlock();
844 splx(s);
845
846 return (result);
847 }
848
849 /*
850 * thread_call_enter_delayed:
851 *
852 * Enqueue a callout entry to occur
853 * at the stated time.
854 *
855 * Returns TRUE if the call was
856 * already on a queue.
857 */
858 boolean_t
859 thread_call_enter_delayed(
860 thread_call_t call,
861 uint64_t deadline)
862 {
863 boolean_t result = TRUE;
864 thread_call_group_t group;
865 spl_t s;
866
867 group = thread_call_get_group(call);
868
869 s = splsched();
870 thread_call_lock_spin();
871
872 result = _delayed_call_enqueue(call, group, deadline);
873
874 if (queue_first(&group->delayed_queue) == qe(call))
875 _set_delayed_call_timer(call, group);
876
877 call->tc_call.param1 = 0;
878
879 thread_call_unlock();
880 splx(s);
881
882 return (result);
883 }
884
885 boolean_t
886 thread_call_enter1_delayed(
887 thread_call_t call,
888 thread_call_param_t param1,
889 uint64_t deadline)
890 {
891 boolean_t result = TRUE;
892 thread_call_group_t group;
893 spl_t s;
894 uint64_t abstime;
895
896 group = thread_call_get_group(call);
897
898 s = splsched();
899 thread_call_lock_spin();
900 abstime = mach_absolute_time();
901
902 result = _delayed_call_enqueue(call, group, deadline);
903
904 if (queue_first(&group->delayed_queue) == qe(call))
905 _set_delayed_call_timer(call, group);
906
907 call->tc_call.param1 = param1;
908
909 call->ttd = (deadline > abstime) ? (deadline - abstime) : 0;
910 #if CONFIG_DTRACE
911 DTRACE_TMR4(thread_callout__create, thread_call_func_t, call->tc_call.func, 0, (call->ttd >> 32), (unsigned) (call->ttd & 0xFFFFFFFF));
912 #endif
913 thread_call_unlock();
914 splx(s);
915
916 return (result);
917 }
918
919 /*
920 * thread_call_cancel:
921 *
922 * Dequeue a callout entry.
923 *
924 * Returns TRUE if the call was
925 * on a queue.
926 */
927 boolean_t
928 thread_call_cancel(
929 thread_call_t call)
930 {
931 boolean_t result;
932 thread_call_group_t group;
933 spl_t s;
934
935 group = thread_call_get_group(call);
936
937 s = splsched();
938 thread_call_lock_spin();
939
940 result = _call_dequeue(call, group);
941
942 thread_call_unlock();
943 splx(s);
944 #if CONFIG_DTRACE
945 DTRACE_TMR4(thread_callout__cancel, thread_call_func_t, call->tc_call.func, 0, (call->ttd >> 32), (unsigned) (call->ttd & 0xFFFFFFFF));
946 #endif
947
948 return (result);
949 }
950
951 /*
952 * Cancel a thread call. If it cannot be cancelled (i.e.
953 * is already in flight), waits for the most recent invocation
954 * to finish. Note that if clients re-submit this thread call,
955 * it may still be pending or in flight when thread_call_cancel_wait
956 * returns, but all requests to execute this work item prior
957 * to the call to thread_call_cancel_wait will have finished.
958 */
959 boolean_t
960 thread_call_cancel_wait(
961 thread_call_t call)
962 {
963 boolean_t result;
964 thread_call_group_t group;
965
966 if ((call->tc_flags & THREAD_CALL_ALLOC) == 0) {
967 panic("%s: Can't wait on thread call whose storage I don't own.", __FUNCTION__);
968 }
969
970 group = thread_call_get_group(call);
971
972 (void) splsched();
973 thread_call_lock_spin();
974
975 result = _call_dequeue(call, group);
976 if (result == FALSE) {
977 thread_call_wait_locked(call);
978 }
979
980 thread_call_unlock();
981 (void) spllo();
982
983 return result;
984 }
985
986
987 #ifndef __LP64__
988
989 /*
990 * thread_call_is_delayed:
991 *
992 * Returns TRUE if the call is
993 * currently on a delayed queue.
994 *
995 * Optionally returns the expiration time.
996 */
997 boolean_t
998 thread_call_is_delayed(
999 thread_call_t call,
1000 uint64_t *deadline)
1001 {
1002 boolean_t result = FALSE;
1003 thread_call_group_t group;
1004 spl_t s;
1005
1006 group = thread_call_get_group(call);
1007
1008 s = splsched();
1009 thread_call_lock_spin();
1010
1011 if (call->tc_call.queue == &group->delayed_queue) {
1012 if (deadline != NULL)
1013 *deadline = call->tc_call.deadline;
1014 result = TRUE;
1015 }
1016
1017 thread_call_unlock();
1018 splx(s);
1019
1020 return (result);
1021 }
1022
1023 #endif /* __LP64__ */
1024
1025 /*
1026 * thread_call_wake:
1027 *
1028 * Wake a call thread to service
1029 * pending call entries. May wake
1030 * the daemon thread in order to
1031 * create additional call threads.
1032 *
1033 * Called with thread_call_lock held.
1034 *
1035 * For high-priority group, only does wakeup/creation if there are no threads
1036 * running.
1037 */
1038 static __inline__ void
1039 thread_call_wake(
1040 thread_call_group_t group)
1041 {
1042 /*
1043 * New behavior: use threads if you've got 'em.
1044 * Traditional behavior: wake only if no threads running.
1045 */
1046 if (group_isparallel(group) || group->active_count == 0) {
1047 if (wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_AWAKENED, -1) == KERN_SUCCESS) {
1048 group->idle_count--; group->active_count++;
1049
1050 if (group->idle_count == 0) {
1051 timer_call_cancel(&group->dealloc_timer);
1052 group->flags &= TCG_DEALLOC_ACTIVE;
1053 }
1054 } else {
1055 if (!thread_call_daemon_awake && thread_call_group_should_add_thread(group)) {
1056 thread_call_daemon_awake = TRUE;
1057 wait_queue_wakeup_one(&daemon_wqueue, NO_EVENT, THREAD_AWAKENED, -1);
1058 }
1059 }
1060 }
1061 }
1062
1063 /*
1064 * sched_call_thread:
1065 *
1066 * Call out invoked by the scheduler. Used only for high-priority
1067 * thread call group.
1068 */
1069 static void
1070 sched_call_thread(
1071 int type,
1072 __unused thread_t thread)
1073 {
1074 thread_call_group_t group;
1075
1076 group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; /* XXX */
1077
1078 thread_call_lock_spin();
1079
1080 switch (type) {
1081
1082 case SCHED_CALL_BLOCK:
1083 --group->active_count;
1084 if (group->pending_count > 0)
1085 thread_call_wake(group);
1086 break;
1087
1088 case SCHED_CALL_UNBLOCK:
1089 group->active_count++;
1090 break;
1091 }
1092
1093 thread_call_unlock();
1094 }
1095
1096 /*
1097 * Interrupts disabled, lock held; returns the same way.
1098 * Only called on thread calls whose storage we own. Wakes up
1099 * anyone who might be waiting on this work item and frees it
1100 * if the client has so requested.
1101 */
1102 static void
1103 thread_call_finish(thread_call_t call)
1104 {
1105 boolean_t dowake = FALSE;
1106
1107 call->tc_finish_count++;
1108 call->tc_refs--;
1109
1110 if ((call->tc_flags & THREAD_CALL_WAIT) != 0) {
1111 dowake = TRUE;
1112 call->tc_flags &= ~THREAD_CALL_WAIT;
1113
1114 /*
1115 * Dropping lock here because the sched call for the
1116 * high-pri group can take the big lock from under
1117 * a thread lock.
1118 */
1119 thread_call_unlock();
1120 thread_wakeup((event_t)call);
1121 thread_call_lock_spin();
1122 }
1123
1124 if (call->tc_refs == 0) {
1125 if (dowake) {
1126 panic("Someone waiting on a thread call that is scheduled for free: %p\n", call->tc_call.func);
1127 }
1128
1129 enable_ints_and_unlock();
1130
1131 zfree(thread_call_zone, call);
1132
1133 (void)disable_ints_and_lock();
1134 }
1135
1136 }
1137
1138 /*
1139 * thread_call_thread:
1140 */
1141 static void
1142 thread_call_thread(
1143 thread_call_group_t group,
1144 wait_result_t wres)
1145 {
1146 thread_t self = current_thread();
1147 boolean_t canwait;
1148
1149 if ((thread_get_tag_internal(self) & THREAD_TAG_CALLOUT) == 0)
1150 (void)thread_set_tag_internal(self, THREAD_TAG_CALLOUT);
1151
1152 /*
1153 * A wakeup with THREAD_INTERRUPTED indicates that
1154 * we should terminate.
1155 */
1156 if (wres == THREAD_INTERRUPTED) {
1157 thread_terminate(self);
1158
1159 /* NOTREACHED */
1160 panic("thread_terminate() returned?");
1161 }
1162
1163 (void)disable_ints_and_lock();
1164
1165 thread_sched_call(self, group->sched_call);
1166
1167 while (group->pending_count > 0) {
1168 thread_call_t call;
1169 thread_call_func_t func;
1170 thread_call_param_t param0, param1;
1171
1172 call = TC(dequeue_head(&group->pending_queue));
1173 group->pending_count--;
1174
1175 func = call->tc_call.func;
1176 param0 = call->tc_call.param0;
1177 param1 = call->tc_call.param1;
1178
1179 call->tc_call.queue = NULL;
1180
1181 _internal_call_release(call);
1182
1183 /*
1184 * Can only do wakeups for thread calls whose storage
1185 * we control.
1186 */
1187 if ((call->tc_flags & THREAD_CALL_ALLOC) != 0) {
1188 canwait = TRUE;
1189 call->tc_refs++; /* Delay free until we're done */
1190 } else
1191 canwait = FALSE;
1192
1193 enable_ints_and_unlock();
1194
1195 KERNEL_DEBUG_CONSTANT(
1196 MACHDBG_CODE(DBG_MACH_SCHED,MACH_CALLOUT) | DBG_FUNC_NONE,
1197 VM_KERNEL_UNSLIDE(func), param0, param1, 0, 0);
1198
1199 (*func)(param0, param1);
1200
1201 if (get_preemption_level() != 0) {
1202 int pl = get_preemption_level();
1203 panic("thread_call_thread: preemption_level %d, last callout %p(%p, %p)",
1204 pl, (void *)VM_KERNEL_UNSLIDE(func), param0, param1);
1205 }
1206
1207 (void)thread_funnel_set(self->funnel_lock, FALSE); /* XXX */
1208
1209 (void) disable_ints_and_lock();
1210
1211 if (canwait) {
1212 /* Frees if so desired */
1213 thread_call_finish(call);
1214 }
1215 }
1216
1217 thread_sched_call(self, NULL);
1218 group->active_count--;
1219
1220 if (group_isparallel(group)) {
1221 /*
1222 * For new style of thread group, thread always blocks.
1223 * If we have more than the target number of threads,
1224 * and this is the first to block, and it isn't active
1225 * already, set a timer for deallocating a thread if we
1226 * continue to have a surplus.
1227 */
1228 group->idle_count++;
1229
1230 if (group->idle_count == 1) {
1231 group->idle_timestamp = mach_absolute_time();
1232 }
1233
1234 if (((group->flags & TCG_DEALLOC_ACTIVE) == 0) &&
1235 ((group->active_count + group->idle_count) > group->target_thread_count)) {
1236 group->flags |= TCG_DEALLOC_ACTIVE;
1237 thread_call_start_deallocate_timer(group);
1238 }
1239
1240 /* Wait for more work (or termination) */
1241 wres = wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTIBLE, 0);
1242 if (wres != THREAD_WAITING) {
1243 panic("kcall worker unable to assert wait?");
1244 }
1245
1246 enable_ints_and_unlock();
1247
1248 thread_block_parameter((thread_continue_t)thread_call_thread, group);
1249 } else {
1250 if (group->idle_count < group->target_thread_count) {
1251 group->idle_count++;
1252
1253 wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_UNINT, 0); /* Interrupted means to exit */
1254
1255 enable_ints_and_unlock();
1256
1257 thread_block_parameter((thread_continue_t)thread_call_thread, group);
1258 /* NOTREACHED */
1259 }
1260 }
1261
1262 enable_ints_and_unlock();
1263
1264 thread_terminate(self);
1265 /* NOTREACHED */
1266 }
1267
1268 /*
1269 * thread_call_daemon: walk list of groups, allocating
1270 * threads if appropriate (as determined by
1271 * thread_call_group_should_add_thread()).
1272 */
1273 static void
1274 thread_call_daemon_continue(__unused void *arg)
1275 {
1276 int i;
1277 kern_return_t kr;
1278 thread_call_group_t group;
1279
1280 (void)disable_ints_and_lock();
1281
1282 /* Starting at zero happens to be high-priority first. */
1283 for (i = 0; i < THREAD_CALL_GROUP_COUNT; i++) {
1284 group = &thread_call_groups[i];
1285 while (thread_call_group_should_add_thread(group)) {
1286 group->active_count++;
1287
1288 enable_ints_and_unlock();
1289
1290 kr = thread_call_thread_create(group);
1291 if (kr != KERN_SUCCESS) {
1292 /*
1293 * On failure, just pause for a moment and give up.
1294 * We can try again later.
1295 */
1296 delay(10000); /* 10 ms */
1297 (void)disable_ints_and_lock();
1298 goto out;
1299 }
1300
1301 (void)disable_ints_and_lock();
1302 }
1303 }
1304
1305 out:
1306 thread_call_daemon_awake = FALSE;
1307 wait_queue_assert_wait(&daemon_wqueue, NO_EVENT, THREAD_UNINT, 0);
1308
1309 enable_ints_and_unlock();
1310
1311 thread_block_parameter((thread_continue_t)thread_call_daemon_continue, NULL);
1312 /* NOTREACHED */
1313 }
1314
1315 static void
1316 thread_call_daemon(
1317 __unused void *arg)
1318 {
1319 thread_t self = current_thread();
1320
1321 self->options |= TH_OPT_VMPRIV;
1322 vm_page_free_reserve(2); /* XXX */
1323
1324 thread_call_daemon_continue(NULL);
1325 /* NOTREACHED */
1326 }
1327
1328 /*
1329 * Schedule timer to deallocate a worker thread if we have a surplus
1330 * of threads (in excess of the group's target) and at least one thread
1331 * is idle the whole time.
1332 */
1333 static void
1334 thread_call_start_deallocate_timer(
1335 thread_call_group_t group)
1336 {
1337 uint64_t deadline;
1338 boolean_t onqueue;
1339
1340 assert(group->idle_count > 0);
1341
1342 group->flags |= TCG_DEALLOC_ACTIVE;
1343 deadline = group->idle_timestamp + thread_call_dealloc_interval_abs;
1344 onqueue = timer_call_enter(&group->dealloc_timer, deadline, 0);
1345
1346 if (onqueue) {
1347 panic("Deallocate timer already active?");
1348 }
1349 }
1350
1351 void
1352 thread_call_delayed_timer(
1353 timer_call_param_t p0,
1354 __unused timer_call_param_t p1
1355 )
1356 {
1357 thread_call_t call;
1358 thread_call_group_t group = p0;
1359 uint64_t timestamp;
1360
1361 thread_call_lock_spin();
1362
1363 timestamp = mach_absolute_time();
1364
1365 call = TC(queue_first(&group->delayed_queue));
1366
1367 while (!queue_end(&group->delayed_queue, qe(call))) {
1368 if (call->tc_call.deadline <= timestamp) {
1369 _pending_call_enqueue(call, group);
1370 }
1371 else
1372 break;
1373
1374 call = TC(queue_first(&group->delayed_queue));
1375 }
1376
1377 if (!queue_end(&group->delayed_queue, qe(call)))
1378 _set_delayed_call_timer(call, group);
1379
1380 thread_call_unlock();
1381 }
1382
1383 /*
1384 * Timer callback to tell a thread to terminate if
1385 * we have an excess of threads and at least one has been
1386 * idle for a long time.
1387 */
1388 static void
1389 thread_call_dealloc_timer(
1390 timer_call_param_t p0,
1391 __unused timer_call_param_t p1)
1392 {
1393 thread_call_group_t group = (thread_call_group_t)p0;
1394 uint64_t now;
1395 kern_return_t res;
1396 boolean_t terminated = FALSE;
1397
1398 thread_call_lock_spin();
1399
1400 now = mach_absolute_time();
1401 if (group->idle_count > 0) {
1402 if (now > group->idle_timestamp + thread_call_dealloc_interval_abs) {
1403 terminated = TRUE;
1404 group->idle_count--;
1405 res = wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTED, -1);
1406 if (res != KERN_SUCCESS) {
1407 panic("Unable to wake up idle thread for termination?");
1408 }
1409 }
1410
1411 }
1412
1413 /*
1414 * If we still have an excess of threads, schedule another
1415 * invocation of this function.
1416 */
1417 if (group->idle_count > 0 && (group->idle_count + group->active_count > group->target_thread_count)) {
1418 /*
1419 * If we killed someone just now, push out the
1420 * next deadline.
1421 */
1422 if (terminated) {
1423 group->idle_timestamp = now;
1424 }
1425
1426 thread_call_start_deallocate_timer(group);
1427 } else {
1428 group->flags &= ~TCG_DEALLOC_ACTIVE;
1429 }
1430
1431 thread_call_unlock();
1432 }
1433
1434 /*
1435 * Wait for all requested invocations of a thread call prior to now
1436 * to finish. Can only be invoked on thread calls whose storage we manage.
1437 * Just waits for the finish count to catch up to the submit count we find
1438 * at the beginning of our wait.
1439 */
1440 static void
1441 thread_call_wait_locked(thread_call_t call)
1442 {
1443 uint64_t submit_count;
1444 wait_result_t res;
1445
1446 assert(call->tc_flags & THREAD_CALL_ALLOC);
1447
1448 submit_count = call->tc_submit_count;
1449
1450 while (call->tc_finish_count < submit_count) {
1451 call->tc_flags |= THREAD_CALL_WAIT;
1452
1453 res = assert_wait(call, THREAD_UNINT);
1454 if (res != THREAD_WAITING) {
1455 panic("Unable to assert wait?");
1456 }
1457
1458 thread_call_unlock();
1459 (void) spllo();
1460
1461 res = thread_block(NULL);
1462 if (res != THREAD_AWAKENED) {
1463 panic("Awoken with %d?", res);
1464 }
1465
1466 (void) splsched();
1467 thread_call_lock_spin();
1468 }
1469 }
1470
1471 /*
1472 * Determine whether a thread call is either on a queue or
1473 * currently being executed.
1474 */
1475 boolean_t
1476 thread_call_isactive(thread_call_t call)
1477 {
1478 boolean_t active;
1479
1480 disable_ints_and_lock();
1481 active = (call->tc_submit_count > call->tc_finish_count);
1482 enable_ints_and_unlock();
1483
1484 return active;
1485 }
1486