/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/thread.h>
#pragma mark Single Consumer calls
__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals((void **)ptr, NULL);
}
void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}
mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that the NULL write
	 * to head above doesn't clobber the head set by a concurrent enqueuer.
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer that
	 * contributed to an element in this list (pairs with the release fence in
	 * __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next),
	 * which clients implicitly expect.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but paid as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}
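
/*
 * A minimal consumer-side sketch, mirroring _mpsc_daemon_queue_drain() below;
 * `my_queue`, `my_dep` (an os_atomic_dependency_t obtained by the caller) and
 * `consume()` are hypothetical:
 *
 *	mpsc_queue_chain_t head, cur, tail;
 *
 *	while ((head = mpsc_queue_dequeue_batch(&my_queue, &tail, my_dep))) {
 *		mpsc_queue_batch_foreach_safe(cur, head, tail) {
 *			consume(cur);
 *		}
 *	}
 */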
mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;

	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}
#pragma mark "GCD"-like facilities
static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);
/* thread based queues */
static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}
static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}
static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = kind,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread, (event_t)dq, THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}
kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD);
}
/* thread-call based queues */
static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}
static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}
void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}
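
/*
 * A short sketch for the thread-call flavor; `my_tc_queue`, `my_tc_invoke()`
 * and the enqueued element `elm` are hypothetical:
 *
 *	mpsc_daemon_queue_init_with_thread_call(&my_tc_queue, my_tc_invoke,
 *	    THREAD_CALL_PRIORITY_KERNEL);
 *
 *	mpsc_daemon_enqueue(&my_tc_queue, &elm->link, MPSC_QUEUE_NONE);
 */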
/* nested queues */

void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}
static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}
void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
}
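
/*
 * A sketch of a nested queue whose drain is funneled through a parent queue;
 * `my_nested_queue`, `parent_queue`, `my_invoke()` and `elm` are hypothetical.
 * The parent is typically initialized with mpsc_daemon_queue_nested_invoke as
 * its invoke function (see thread_deallocate_daemon_init() below):
 *
 *	mpsc_daemon_queue_init_with_target(&my_nested_queue, my_invoke,
 *	    &parent_queue);
 *
 *	mpsc_daemon_enqueue(&my_nested_queue, &elm->link, MPSC_QUEUE_NONE);
 */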
/* enqueue, drain & cancelation */
static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty.
	 * This atomic xor sets DRAINING and clears WAKEUP in a single atomic
	 * operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to correct this and clear it back to avoid looping below.
	 * This is safe to do as no one is allowed to enqueue more work after
	 * cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue(), which gives us the guarantee that the
	 * update to the tail of the MPSC queue that made it non-empty is visible
	 * to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		mpsc_queue_batch_foreach_safe(cur, head, tail) {
			os_atomic_store(&cur->mpqc_next,
			    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
			invoke(cur, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD, no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other
	 * threads is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * A wakeup has happened while we were draining,
	 * which means that the queue did an [ empty -> non empty ]
	 * transition during our drain.
	 *
	 * Chances are we already observed and drained everything,
	 * but we need to be absolutely sure, so start a drain again
	 * as the enqueuer observed the DRAINING bit and has skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}
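
/*
 * Worked example for the state xor at the top of _mpsc_daemon_queue_drain(),
 * written symbolically with the MPSC_QUEUE_STATE_* bits (any other bits pass
 * through the xor unchanged):
 *
 *	common case:      WAKEUP   ^ (DRAINING | WAKEUP) == DRAINING
 *	cancelation case: CANCELED ^ (DRAINING | WAKEUP)
 *	                           == CANCELED | DRAINING | WAKEUP
 *
 * In the second case the xor spuriously sets WAKEUP, which is why the drain
 * clears it again with os_atomic_andnot() before entering the dequeue loop.
 */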
static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}
static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}
void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}
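
/*
 * A hypothetical call site opting into the preemption-disabled enqueue path,
 * which simply wraps the append and wakeup in
 * disable_preemption()/enable_preemption():
 *
 *	mpsc_daemon_enqueue(&my_work_queue, &w->mw_link,
 *	    MPSC_QUEUE_DISABLE_PREEMPTION);
 */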
void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}

	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}
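
/*
 * A teardown sketch for the hypothetical `my_work_queue` above; once this
 * returns, enqueuing on the queue again would panic, and the structure can
 * be reinitialized or reclaimed:
 *
 *	mpsc_daemon_queue_cancel_and_wait(&my_work_queue);
 */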
#pragma mark deferred deallocation daemon
static struct mpsc_daemon_queue thread_deferred_deallocation_queue;
void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}
void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue);
}
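
/*
 * A sketch of a client registering its own deferred-deallocation queue;
 * `my_dealloc_queue` and `my_dealloc_invoke()` are hypothetical:
 *
 *	thread_deallocate_daemon_register_queue(&my_dealloc_queue,
 *	    my_dealloc_invoke);
 *
 * Elements later pushed with mpsc_daemon_enqueue() on `my_dealloc_queue` are
 * drained on the "daemon.deferred-deallocation" thread created above.
 */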