[apple/libdispatch.git] / src/apply.c (libdispatch-913.1.6)
/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (*dispatch_apply_function_t)(void *, size_t);
static char const * const _dispatch_apply_key = "apply";

#define DISPATCH_APPLY_INVOKE_REDIRECT 0x1
#define DISPATCH_APPLY_INVOKE_WAIT 0x2

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(void *ctxt, long invoke_flags)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	size_t const iter = da->da_iterations;
	size_t idx, done = 0;

	idx = os_atomic_inc_orig2o(da, da_index, acquire);
	if (unlikely(idx >= iter)) goto out;

	// da_dc is only safe to access once the 'index lock' has been acquired
	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
	void *const da_ctxt = da->da_dc->dc_ctxt;
	dispatch_queue_t dq = da->da_dc->dc_data;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items

	// Handle nested dispatch_apply rdar://problem/9294578
	dispatch_thread_context_s apply_ctxt = {
		.dtc_key = _dispatch_apply_key,
		.dtc_apply_nesting = da->da_nested,
	};
	_dispatch_thread_context_push(&apply_ctxt);

	dispatch_thread_frame_s dtf;
	dispatch_priority_t old_dbp = 0;
	if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
		_dispatch_thread_frame_push(&dtf, dq);
		old_dbp = _dispatch_set_basepri(dq->dq_priority);
	}
	dispatch_invoke_flags_t flags = da->da_flags;

	// Striding is the responsibility of the caller.
	do {
		dispatch_invoke_with_autoreleasepool(flags, {
			_dispatch_client_callout2(da_ctxt, idx, func);
			_dispatch_perfmon_workitem_inc();
			done++;
			idx = os_atomic_inc_orig2o(da, da_index, relaxed);
		});
	} while (likely(idx < iter));

	if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
		_dispatch_reset_basepri(old_dbp);
		_dispatch_thread_frame_pop(&dtf);
	}

	_dispatch_thread_context_pop(&apply_ctxt);

	// The thread that finished the last workitem wakes up the possibly waiting
	// thread that called dispatch_apply. They could be one and the same.
	if (!os_atomic_sub2o(da, da_todo, done, release)) {
		_dispatch_thread_event_signal(&da->da_event);
	}
out:
	if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) {
		_dispatch_thread_event_wait(&da->da_event);
		_dispatch_thread_event_destroy(&da->da_event);
	}
	if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
#if DISPATCH_INTROSPECTION
		_dispatch_continuation_free(da->da_dc);
#endif
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}
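
/*
 * The claim/retire scheme above, reduced to a plain C11 atomics sketch.
 * This is illustrative only: the names below are made up, and the real code
 * uses the os_atomic_*2o wrappers, acquire ordering on the first claim
 * (the 'index lock'), and a dispatch thread event for the wakeup.
 *
 *	#include <stdatomic.h>
 *	#include <stddef.h>
 *
 *	struct apply_state {
 *		atomic_size_t index;	// next unclaimed iteration
 *		atomic_size_t todo;	// iterations not yet retired
 *		size_t iterations;
 *	};
 *
 *	static void
 *	apply_worker(struct apply_state *s, void (*f)(void *, size_t), void *ctxt)
 *	{
 *		size_t done = 0;
 *		size_t idx = atomic_fetch_add(&s->index, 1);
 *		while (idx < s->iterations) {
 *			f(ctxt, idx);
 *			done++;
 *			idx = atomic_fetch_add(&s->index, 1);
 *		}
 *		// whoever retires the last iteration wakes the waiting caller
 *		if (done && atomic_fetch_sub(&s->todo, done) == done) {
 *			// signal the thread blocked in dispatch_apply()
 *		}
 *	}
 */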

DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, 0);
}

DISPATCH_NOINLINE
static void
_dispatch_apply_invoke_and_wait(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_WAIT);
	_dispatch_perfmon_workitem_inc();
}

DISPATCH_NOINLINE
void
_dispatch_apply_redirect_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_REDIRECT);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_invoke_flags_t
_dispatch_apply_autorelease_frequency(dispatch_queue_t dq)
{
	dispatch_invoke_flags_t qaf = 0;

	while (dq && !qaf) {
		qaf = _dispatch_queue_autorelease_frequency(dq);
		dq = dq->do_targetq;
	}
	return qaf;
}

DISPATCH_NOINLINE
static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_continuation_t dc = da->da_dc;
	size_t const iter = da->da_iterations;
	dispatch_invoke_flags_t flags;
	size_t idx = 0;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items
	flags = _dispatch_apply_autorelease_frequency(dc->dc_data);
	do {
		dispatch_invoke_with_autoreleasepool(flags, {
			_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
			_dispatch_perfmon_workitem_inc();
		});
	} while (++idx < iter);

#if DISPATCH_INTROSPECTION
	_dispatch_continuation_free(da->da_dc);
#endif
	_dispatch_continuation_free((dispatch_continuation_t)da);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	int32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	int32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

		_dispatch_continuation_init_f(next, dq, da, func, 0, 0, dc_flags);
		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_event_init(&da->da_event);
	// FIXME: dq may not be the right queue for the priority of `head`
	_dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt);
	// Call the first element directly
	_dispatch_apply_invoke_and_wait(da);
}
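
/*
 * Concretely (a worked example with an assumed width): if da_thr_cnt is 4,
 * three continuations are chained through do_next and pushed to the root
 * queue in a single batch, and the calling thread becomes the fourth worker
 * by running _dispatch_apply_invoke_and_wait() itself, blocking on da_event
 * until the last iteration has been retired.
 */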

DISPATCH_NOINLINE
static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	int32_t da_width = da->da_thr_cnt - 1;
	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;

	do {
		int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);

		if (unlikely(da_width > width)) {
			int32_t excess = da_width - width;
			for (tq = dq; tq != rq; tq = tq->do_targetq) {
				_dispatch_queue_relinquish_width(tq, excess);
			}
			da_width -= excess;
			if (unlikely(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess;
		}
		if (!da->da_flags) {
			// find first queue in descending target queue order that has
			// an autorelease frequency set, and use that as the frequency for
			// this continuation.
			da->da_flags = _dispatch_queue_autorelease_frequency(dq);
		}
		rq = rq->do_targetq;
	} while (unlikely(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
	do {
		_dispatch_queue_relinquish_width(dq, da_width);
		dq = dq->do_targetq;
	} while (unlikely(dq->do_targetq));
}
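
/*
 * A worked example of the width negotiation above (queue widths assumed):
 * suppose da_thr_cnt is 9, so da_width starts at 8, and the apply targets a
 * concurrent queue Q whose target queue T can only spare 4 slots. Reserving
 * on Q succeeds for all 8; reserving on T returns 4, so the excess of 4 is
 * relinquished back to Q, da_width drops to 4 and da_thr_cnt to 5. If any
 * level ever returns 0, the whole apply falls back to
 * _dispatch_apply_serial(). Once _dispatch_apply_f2() returns, the second
 * loop walks the chain again and gives the remaining width back.
 */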

#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_apply_root_queue(dispatch_queue_t dq)
{
	if (dq) {
		while (unlikely(dq->do_targetq)) {
			dq = dq->do_targetq;
		}
		// if the current root queue is a pthread root queue, select it
		if (!_dispatch_priority_qos(dq->dq_priority)) {
			return dq;
		}
	}

	pthread_priority_t pp = _dispatch_get_priority();
	dispatch_qos_t qos = _dispatch_qos_from_pp(pp);
	return _dispatch_get_root_queue(qos ? qos : DISPATCH_QOS_DEFAULT, false);
}
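
/*
 * For example, as implemented above: a dispatch_apply() issued with
 * DISPATCH_APPLY_AUTO from a thread already running at QOS_CLASS_UTILITY
 * lands on the utility root queue, while one issued from work based on a
 * pthread root queue stays on that root queue; only when no QoS can be
 * derived does it fall back to DISPATCH_QOS_DEFAULT.
 */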

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (unlikely(iterations == 0)) {
		return;
	}
	dispatch_thread_context_t dtctxt =
			_dispatch_thread_context_find(_dispatch_apply_key);
	size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
	dispatch_queue_t old_dq = _dispatch_queue_get_current();

	if (likely(dq == DISPATCH_APPLY_AUTO)) {
		dq = _dispatch_apply_root_queue(old_dq);
	}
	dispatch_qos_t qos = _dispatch_priority_qos(dq->dq_priority);
	if (unlikely(dq->do_targetq)) {
		// if the queue passed-in is not a root queue, use the current QoS
		// since the caller participates in the work anyway
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
	}
	int32_t thr_cnt = (int32_t)_dispatch_qos_max_parallelism(qos,
			DISPATCH_MAX_PARALLELISM_ACTIVE);

	if (likely(!nested)) {
		nested = iterations;
	} else {
		thr_cnt = nested < (size_t)thr_cnt ? thr_cnt / (int32_t)nested : 1;
		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
				? nested * iterations : DISPATCH_APPLY_MAX;
	}
	if (iterations < (size_t)thr_cnt) {
		thr_cnt = (int32_t)iterations;
	}
	struct dispatch_continuation_s dc = {
		.dc_func = (void*)func,
		.dc_ctxt = ctxt,
		.dc_data = dq,
	};
	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
	da->da_index = 0;
	da->da_todo = iterations;
	da->da_iterations = iterations;
	da->da_nested = nested;
	da->da_thr_cnt = thr_cnt;
#if DISPATCH_INTROSPECTION
	da->da_dc = _dispatch_continuation_alloc();
	*da->da_dc = dc;
#else
	da->da_dc = &dc;
#endif
	da->da_flags = 0;

	if (unlikely(dq->dq_width == 1 || thr_cnt <= 1)) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	if (unlikely(dq->do_targetq)) {
		if (unlikely(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}

	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
	_dispatch_thread_frame_pop(&dtf);
}
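
/*
 * Worked example of the nesting clamp above (core count assumed): on a
 * machine where _dispatch_qos_max_parallelism() reports 8, a top-level
 * dispatch_apply_f() of 100 iterations records nested = 100 and uses up to
 * 8 worker threads. A dispatch_apply_f() issued from inside one of those
 * iterations finds nested = 100 in the thread context; since 100 >= 8 its
 * thr_cnt collapses to 1 and it runs serially instead of multiplying the
 * thread count. Had the outer apply run only 2 iterations, the inner one
 * would get 8 / 2 = 4 threads.
 */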

#ifdef __BLOCKS__
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_apply_f(iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif
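
/*
 * Caller-side usage, for reference (a minimal sketch; `results` and `count`
 * are hypothetical caller data, not part of this file):
 *
 *	#include <dispatch/dispatch.h>
 *
 *	void
 *	square_all(double *results, size_t count)
 *	{
 *		dispatch_apply(count, DISPATCH_APPLY_AUTO, ^(size_t i) {
 *			results[i] *= results[i];
 *		});
 *		// dispatch_apply() returns only once every iteration has run
 *	}
 */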

#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_stride_f(offset, stride, iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif
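
/*
 * Since the striding entry points above are compiled out, callers that want
 * coarser work units can stride by hand on top of dispatch_apply(). A rough
 * sketch (the chunk size and the `func`, `ctxt` and `iterations` names are
 * illustrative):
 *
 *	size_t stride = 1024;
 *	size_t chunks = (iterations + stride - 1) / stride;
 *	dispatch_apply(chunks, DISPATCH_APPLY_AUTO, ^(size_t c) {
 *		size_t i = c * stride;
 *		size_t stop = i + stride;
 *		if (stop > iterations) stop = iterations;
 *		for (; i < stop; i++) {
 *			func(ctxt, i);
 *		}
 *	});
 */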