]> git.saurik.com Git - apple/xnu.git/blame - osfmk/kern/mpsc_queue.c
xnu-7195.101.1.tar.gz
[apple/xnu.git] / osfmk / kern / mpsc_queue.c
CommitLineData
cb323159
A
1/*
2 * Copyright (c) 2018 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28
29#include <machine/machine_cpu.h>
30#include <kern/locks.h>
31#include <kern/mpsc_queue.h>
32#include <kern/thread.h>
33
34#pragma mark Single Consumer calls
35
36__attribute__((noinline))
37static mpsc_queue_chain_t
38_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
39{
40 return hw_wait_while_equals((void **)ptr, NULL);
41}
42
/*
 * Re-insert the chain [first .. last] at the HEAD of the queue,
 * e.g. so a consumer can put back a batch it dequeued but did not
 * fully process.  Single-consumer call: only the owner of the
 * dequeue side may use this, concurrently with enqueuers.
 */
void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	/* provisionally splice the current head behind our batch */
	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		/*
		 * The queue looked empty but the tail is not &q->mpqh_head:
		 * an enqueuer swapped the tail concurrently and may not have
		 * published its head link yet.  Reload it (spinning through
		 * the transient NULL window if needed) and splice our batch
		 * in front of that element instead.
		 */
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	/* publish the restored batch as the new head */
	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}
62
/*
 * Dequeue the entire current contents of the queue as one batch.
 *
 * Returns the head of the batch (or NULL if the queue is empty) and
 * stores the batch tail through `tail_out`.  `dependency` lets the
 * caller order this load after a prior atomic it depends on (see
 * _mpsc_daemon_queue_drain()).  Single-consumer call.
 */
mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	/* tail == &q->mpqh_head is the canonical empty-queue state */
	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		/* enqueuer swapped the tail but hasn't linked the head yet */
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}
	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that NULL write
	 * to head above doesn't clobber the head set by concurrent enqueuer
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer that
	 * contributed to an element in this list (pairs with the release fence in
	 * __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next)
	 * which is expected by clients implicitly.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}
101
102mpsc_queue_chain_t
103mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
104{
105 mpsc_queue_chain_t elm = NULL;
106 if (cur == tail || cur == NULL) {
107 return elm;
108 }
109
110 elm = os_atomic_load(&cur->mpqc_next, relaxed);
111 if (__improbable(elm == NULL)) {
112 elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
113 }
114 return elm;
115}
116
117#pragma mark "GCD"-like facilities
118
119static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
120static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);
121
122/* thread based queues */
123
/*
 * Continuation body for a daemon queue's dedicated kernel thread:
 * drain the queue, then block in this same continuation awaiting
 * the next wakeup on (event_t)dq.
 */
static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		/* mark the thread system-critical for the duration of the drain */
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	/*
	 * _mpsc_daemon_queue_drain() already did assert_wait() on (event_t)dq
	 * for us; block and resume in this continuation on the next wakeup.
	 */
	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}
146
/* Wake the daemon queue's dedicated thread, waiting on (event_t)dq. */
static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}
152
153static kern_return_t
154_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
155 mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
156 mpsc_daemon_queue_kind_t kind)
157{
158 kern_return_t kr;
159
160 *dq = (struct mpsc_daemon_queue){
161 .mpd_kind = kind,
162 .mpd_invoke = invoke,
163 .mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
164 .mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
165 };
166
167 kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
168 &dq->mpd_thread);
169 if (kr == KERN_SUCCESS) {
170 thread_set_thread_name(dq->mpd_thread, name);
171 thread_start_in_assert_wait(dq->mpd_thread, (event_t)dq, THREAD_UNINT);
172 thread_deallocate(dq->mpd_thread);
173 }
174 return kr;
175}
176
/*
 * Public wrapper: initialize a plain (non-critical) thread-backed
 * daemon queue.  See _mpsc_daemon_queue_init_with_thread().
 */
kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD);
}
184
185/* thread-call based queues */
186
/* Thread-call trampoline: arg0 is the daemon queue to drain. */
static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}
193
/* Wake a thread-call-backed queue by arming its (ONCE) thread call. */
static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}
199
200void
201mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
202 mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri)
203{
204 *dq = (struct mpsc_daemon_queue){
205 .mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
206 .mpd_invoke = invoke,
207 .mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
208 .mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
209 };
210 dq->mpd_call = thread_call_allocate_with_options(
211 _mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
212}
213
214/* nested queues */
215
/*
 * Invoke function used when a daemon queue is itself an element of a
 * target queue: recover the nested queue from its mpd_chain linkage
 * and drain it on the target queue's context.
 */
void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}
224
/* Wake a nested queue by enqueuing it as an element on its target queue. */
static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}
230
231void
232mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
233 mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target)
234{
235 *dq = (struct mpsc_daemon_queue){
236 .mpd_kind = MPSC_QUEUE_KIND_NESTED,
237 .mpd_invoke = invoke,
238 .mpd_target = target,
239 .mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
240 .mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
241 };
242}
243
244/* enqueue, drain & cancelation */
245
/*
 * Drain all pending elements of `dq`, invoking mpd_invoke on each.
 *
 * `self` is the queue's dedicated thread for thread-backed queues (in
 * which case this function also re-arms the thread's wait), or NULL for
 * thread-call and nested queues.
 *
 * Drives the DRAINING/WAKEUP/CANCELED state machine in dq->mpd_state;
 * see the inline comments for the ordering rationale.
 */
static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty,
	 * This atomic xor sets DRAINING and clears WAKEUP in a single atomic
	 * in that case.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to correct this and clear it back to avoid looping below.
	 * This is safe to do as no one is allowed to enqueue more work after
	 * cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue() which gives us the guarantee that the update
	 * to the tail of the MPSC queue that made it non empty is visible to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		/* cancelation wakeup: undo the WAKEUP bit the xor just set */
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	/* drain in batches until the queue is observed empty */
	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		mpsc_queue_batch_foreach_safe(cur, head, tail) {
			/* mark unqueued before invoke: invoke may re-enqueue/free cur */
			os_atomic_store(&cur->mpqc_next,
			    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
			invoke(cur, dq);
		}
	}

	if (self) {
		/* pre-arm the wait before dropping DRAINING to avoid a lost wakeup */
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other threads
	 * is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * A wakeup has happened while we were draining,
	 * which means that the queue did an [ empty -> non empty ]
	 * transition during our drain.
	 *
	 * Chances are we already observed and drained everything,
	 * but we need to be absolutely sure, so start a drain again
	 * as the enqueuer observed the DRAINING bit and has skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		/* unblock mpsc_daemon_queue_cancel_and_wait() */
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}
328
/* Dispatch a wakeup to the mechanism backing this queue's kind. */
static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}
347
/*
 * Append `elm` to the daemon queue and, on an [ empty -> non empty ]
 * transition, wake the drainer — unless a drain or wakeup is already
 * in flight, in which case the drainer will observe WAKEUP and loop.
 */
static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	/* mpsc_queue_append() returns true on [ empty -> non empty ] */
	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		/* only the first setter of WAKEUP outside a drain issues the wakeup */
		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}
367
368void
369mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
370 mpsc_queue_options_t options)
371{
372 if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
373 disable_preemption();
374 }
375
376 _mpsc_daemon_queue_enqueue(dq, elm);
377
378 if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
379 enable_preemption();
380 }
381}
382
/*
 * Cancel `dq` and wait until any in-flight drain has finished, then
 * tear down the kind-specific backing resource.  After this returns,
 * enqueuing to `dq` panics.  Must be called at most once per queue.
 */
void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	/* arm the wait before setting CANCELED to avoid a lost wakeup */
	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		/* idle nested queue: nothing will drain it, no need to wait */
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		/* kick a drain so it observes CANCELED and wakes us via mpd_state */
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		/* the thread terminated itself in _mpsc_daemon_queue_drain() */
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}
422
423#pragma mark deferred deallocation daemon
424
425static struct mpsc_daemon_queue thread_deferred_deallocation_queue;
426
427void
428thread_deallocate_daemon_init(void)
429{
430 kern_return_t kr;
431
432 kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
433 mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
434 "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL);
435 if (kr != KERN_SUCCESS) {
436 panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
437 }
438}
439
/*
 * Register `dq` as a nested queue drained by the deferred-deallocation
 * daemon thread; its wakeups enqueue it onto that daemon's queue.
 */
void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue);
}