/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/thread.h>

#pragma mark Single Consumer calls
__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals((void **)ptr, NULL);
}
void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}
mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that NULL write
	 * to head above doesn't clobber the head set by concurrent enqueuer
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer that
	 * contributed to an element in this list (pairs with the release fence in
	 * __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next)
	 * which is expected by clients implicitly.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}
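
/*
 * Illustrative sketch (not part of the original file): the expected consumer
 * pattern around mpsc_queue_dequeue_batch(), which is exactly what
 * _mpsc_daemon_queue_drain() does below.  `handle_element()` is a
 * hypothetical callback and `dep` comes from os_atomic_make_dependency().
 *
 *	mpsc_queue_chain_t head, tail, cur;
 *
 *	while ((head = mpsc_queue_dequeue_batch(q, &tail, dep))) {
 *		mpsc_queue_batch_foreach_safe(cur, head, tail) {
 *			handle_element(cur);
 *		}
 *	}
 */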
mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;

	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}
#pragma mark "GCD"-like facilities

static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);

/* thread based queues */
static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, dq->mpd_thread);
	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}
static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}
static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = kind,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread, (event_t)dq, THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}
kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD);
}
/* thread-call based queues */

static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}
static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}
void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}
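
/*
 * Illustrative sketch (not part of the original file): a client embeds a
 * struct mpsc_queue_chain in its own element type, initializes a thread-call
 * backed daemon queue once, and then feeds it from any context with
 * mpsc_daemon_enqueue().  The names `my_elem`, `my_queue`, `my_invoke` and
 * the chosen priority/options are hypothetical.
 *
 *	struct my_elem {
 *		struct mpsc_queue_chain link;
 *		int                     payload;
 *	};
 *
 *	static struct mpsc_daemon_queue my_queue;
 *
 *	static void
 *	my_invoke(mpsc_queue_chain_t chain, mpsc_daemon_queue_t dq __unused)
 *	{
 *		struct my_elem *e = mpsc_queue_element(chain, struct my_elem, link);
 *		// ... consume e->payload, then free or recycle e ...
 *	}
 *
 *	// once, at initialization:
 *	mpsc_daemon_queue_init_with_thread_call(&my_queue, my_invoke,
 *	    THREAD_CALL_PRIORITY_KERNEL);
 *
 *	// producers, from any context (new_elem is a filled-in struct my_elem *):
 *	mpsc_daemon_enqueue(&my_queue, &new_elem->link, MPSC_QUEUE_NONE);
 */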
void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}
static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}
void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
}

/* enqueue, drain & cancelation */
static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

again:
	/*
	 * Most of the time we're woken up because we're dirty.
	 * This atomic xor sets DRAINING and clears WAKEUP in a single atomic
	 * operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to correct this and clear it back to avoid looping below.
	 * This is safe to do as no one is allowed to enqueue more work after
	 * cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue() which gives us the guarantee that the update
	 * to the tail of the MPSC queue that made it non empty is visible to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		mpsc_queue_batch_foreach_safe(cur, head, tail) {
			os_atomic_store(&cur->mpqc_next,
			    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
			invoke(cur, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other threads
	 * is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * A wakeup has happened while we were draining,
	 * which means that the queue did an [ empty -> non empty ]
	 * transition during our drain.
	 *
	 * Chances are we already observed and drained everything,
	 * but we need to be absolutely sure, so start a drain again
	 * as the enqueuer observed the DRAINING bit and has skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}
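
/*
 * Worked example of the state xor at the top of the drain loop (illustrative,
 * not part of the original file).  Writing D, W and C for the DRAINING,
 * WAKEUP and CANCELED bits, whatever their actual values:
 *
 *   common wakeup:  state == W,  xor (D | W)  ->  D
 *                   WAKEUP is consumed and DRAINING is set in one atomic
 *                   operation.
 *
 *   cancelation:    state == C,  xor (D | W)  ->  C | D | W
 *                   the xor spuriously sets WAKEUP, which is why the code
 *                   asserts CANCELED and clears WAKEUP again with
 *                   os_atomic_andnot() before draining.
 */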
static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}
static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}
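
/*
 * Illustrative note (not part of the original file): only the enqueue that
 * performs the [ empty -> non empty ] transition and finds neither DRAINING
 * nor WAKEUP already set issues a wakeup.  If a drainer is active, it will
 * observe the WAKEUP bit when it drops DRAINING and loop back by itself
 * (see the `goto again` path above), so a second wakeup would be redundant.
 */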
void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}
void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}

	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}
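
/*
 * Illustrative sketch (not part of the original file): typical teardown.
 * Producers must be quiesced before cancelation, since enqueuing after
 * cancelation panics in _mpsc_daemon_queue_enqueue().  `my_queue` is a
 * hypothetical queue owned by the caller.
 *
 *	// stop every producer of `my_queue` first, then:
 *	mpsc_daemon_queue_cancel_and_wait(&my_queue);
 *	// on return the backing thread or thread call is gone and the
 *	// queue kind has been reset to MPSC_QUEUE_KIND_UNKNOWN.
 */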
#pragma mark deferred deallocation daemon

static struct mpsc_daemon_queue thread_deferred_deallocation_queue;

void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}
void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue);
}