/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/thread.h>

#pragma mark Single Consumer calls

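/*
 * Spin until a concurrent enqueuer publishes the `mpqc_next` link it is
 * in the middle of writing (the window between its tail exchange and its
 * store to the previous tail's `mpqc_next`).
 */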
__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals((void **)ptr, NULL);
}

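/*
 * Put a previously dequeued batch [first, last] back at the head of the
 * queue, ahead of any elements enqueued concurrently.
 */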
void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}

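/*
 * Atomically take ownership of every element currently enqueued.
 * Returns the head of the batch and stores its tail in *tail_out;
 * both are NULL when the queue is empty.
 */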
mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}
	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that the NULL
	 * write to head above doesn't clobber the head set by a concurrent
	 * enqueuer.
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer
	 * that contributed to an element in this list (pairs with the release
	 * fence in __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next),
	 * which clients implicitly expect.
	 *
	 * Note that this is the same number of fences that a traditional lock
	 * would have, but as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}

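/*
 * Advance to the next element of a dequeued batch, waiting for a
 * concurrent enqueuer if that element's link hasn't been published yet.
 * Returns NULL once `cur` is the tail of the batch.
 */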
mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;
	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}

#pragma mark "GCD"-like facilities

static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);

/* thread based queues */

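/*
 * Continuation for daemon queues backed by a dedicated kernel thread:
 * drain the queue, then block until the next wakeup re-runs this function.
 */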
static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}

static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}

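/*
 * Initialize a daemon queue and create its dedicated kernel thread,
 * parked in assert_wait until the first enqueue wakes it up.
 */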
static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = kind,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread, (event_t)dq, THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}

kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD);
}

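/*
 * Illustrative usage sketch (not part of the original file): the type
 * `struct my_elem`, the callback `my_invoke`, and the queue name below
 * are hypothetical.
 *
 *	struct my_elem {
 *		struct mpsc_queue_chain link;
 *	};
 *
 *	static void
 *	my_invoke(mpsc_queue_chain_t e, mpsc_daemon_queue_t dq __unused)
 *	{
 *		struct my_elem *elm;
 *		elm = mpsc_queue_element(e, struct my_elem, link);
 *		... process `elm`; it has already been unlinked ...
 *	}
 *
 *	static struct mpsc_daemon_queue my_queue;
 *
 *	mpsc_daemon_queue_init_with_thread(&my_queue, my_invoke,
 *	    MINPRI_KERNEL, "my.daemon");
 *	mpsc_daemon_enqueue(&my_queue, &elm->link, MPSC_QUEUE_NONE);
 */
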
/* thread-call based queues */

static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}

static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}

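/*
 * Initialize a daemon queue whose drain runs from a thread call rather
 * than a dedicated kernel thread.
 */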
void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}

/* nested queues */

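/*
 * Invoke function used by nested queues: the element enqueued on the
 * target queue is the nested queue's own `mpd_chain`, so draining the
 * target drains the nested queue in turn.
 */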
void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}

static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}

void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
}

/* enqueue, drain & cancelation */

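/*
 * Drain the queue until it is observed empty: dequeue batches and call
 * the invoke callback on each element. `self` is the daemon thread for
 * thread-backed queues, or NULL when running from a thread call or a
 * nested (target) queue drain.
 */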
static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty. In that case,
	 * this atomic xor sets DRAINING and clears WAKEUP in a single atomic
	 * operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced
	 * to the CANCELED bit set only, and then the xor will actually set
	 * WAKEUP. We need to correct this and clear it back to avoid looping
	 * below. This is safe to do as no one is allowed to enqueue more work
	 * after cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue(), which gives us the guarantee that the
	 * update to the tail of the MPSC queue that made it non-empty is
	 * visible to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		mpsc_queue_batch_foreach_safe(cur, head, tail) {
			os_atomic_store(&cur->mpqc_next,
			    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
			invoke(cur, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to
	 * be visible to other threads as part of the mpsc_daemon_queue
	 * machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other
	 * threads is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * If a wakeup happened while we were draining, the queue did an
	 * [ empty -> non-empty ] transition during our drain.
	 *
	 * Chances are we already observed and drained everything, but we need
	 * to be absolutely sure, so start a drain again as the enqueuer
	 * observed the DRAINING bit and has skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}

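/*
 * Kick the drain mechanism appropriate for this queue's kind.
 */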
static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}

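/*
 * Append `elm` and, if this enqueue made the queue non-empty, set the
 * WAKEUP bit; only the enqueuer that observes neither DRAINING nor
 * WAKEUP already set issues the actual wakeup.
 */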
static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}

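/*
 * Public enqueue entry point; MPSC_QUEUE_DISABLE_PREEMPTION keeps the
 * enqueuing thread from being preempted between the append and the
 * wakeup.
 */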
void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}

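/*
 * Mark the queue canceled, wake the drain mechanism one last time, and
 * wait for the final drain to complete before tearing down the backing
 * thread or thread call. Enqueuing after this point is a fatal error.
 */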
void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}

#pragma mark deferred deallocation daemon

static struct mpsc_daemon_queue thread_deferred_deallocation_queue;

void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}

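/*
 * Register a nested queue whose elements are drained from the shared
 * deferred-deallocation daemon thread.
 */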
void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue);
}