/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (*dispatch_apply_function_t)(void *, size_t);

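// Shared worker body for the direct and redirected invoke paths. Each
// participating thread atomically claims indexes from da_index until all
// da_iterations have been handed out; the thread that retires the last
// work item signals the waiter in _dispatch_apply_f2, and the last thread
// to leave frees the dispatch_apply_t.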
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(void *ctxt, bool redirect)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	size_t const iter = da->da_iterations;
	size_t idx, done = 0;

	idx = dispatch_atomic_inc_orig2o(da, da_index, acquire);
	if (!fastpath(idx < iter)) goto out;

	// da_dc is only safe to access once the 'index lock' has been acquired
	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
	void *const da_ctxt = da->da_dc->dc_ctxt;
	dispatch_queue_t dq = da->da_dc->dc_data;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items

	// Handle nested dispatch_apply rdar://problem/9294578
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)da->da_nested);

	dispatch_queue_t old_dq;
	pthread_priority_t old_dp;
	if (redirect) {
		old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
	}

	// Striding is the responsibility of the caller.
	do {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_perfmon_workitem_inc();
		done++;
		idx = dispatch_atomic_inc_orig2o(da, da_index, relaxed);
	} while (fastpath(idx < iter));

	if (redirect) {
		_dispatch_reset_defaultpriority(old_dp);
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)nested);

	// The thread that finished the last workitem wakes up the possibly waiting
	// thread that called dispatch_apply. They could be one and the same.
	if (!dispatch_atomic_sub2o(da, da_todo, done, release)) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}
out:
	if (dispatch_atomic_dec2o(da, da_thr_cnt, release) == 0) {
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}

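// Out-of-line entry points: the plain variant runs on the queue that was
// targeted directly, while the redirect variant also installs the
// redirected queue and its default priority for the duration of the loop.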
DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, false);
}

DISPATCH_NOINLINE
void
_dispatch_apply_redirect_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, true);
}

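// Fallback used when no (further) parallelism is available: run every
// iteration in order on the calling thread.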
static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_continuation_t dc = da->da_dc;
	size_t const iter = da->da_iterations;
	size_t idx = 0;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
		_dispatch_perfmon_workitem_inc();
	} while (++idx < iter);

	_dispatch_continuation_free((dispatch_continuation_t)da);
}

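// Fan-out helper: allocate one continuation per additional worker thread,
// push them onto dq as a batch, run the first slice of iterations on the
// calling thread, and then wait on a thread semaphore until the last work
// item has been retired.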
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	uint32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	uint32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
		next->dc_func = func;
		next->dc_ctxt = da;
		_dispatch_continuation_voucher_set(next, 0);
		_dispatch_continuation_priority_set(next, 0, 0);

		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	da->da_sema = sema;

	_dispatch_queue_push_list(dq, head, tail, head->dc_priority,
			continuation_cnt);
	// Call the first element directly
	_dispatch_apply_invoke(da);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}

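// Used when dq has a target-queue chain rather than being a root queue:
// reserve 2 * (da_thr_cnt - 1) units of dq_running width on every queue
// below the root, shedding workers (or falling back to the serial path)
// where a queue is already saturated, fan out on the base queue, and then
// release the reserved width again.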
static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;

	do {
		uint32_t running, width = rq->dq_width;
		running = dispatch_atomic_add2o(rq, dq_running, da_width, relaxed);
		if (slowpath(running > width)) {
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess, relaxed);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width, relaxed);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}

#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)

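// Public worker-function entry point: picks a thread count from the number
// of active CPUs, clamps it for nested dispatch_apply calls, and then either
// runs serially, redirects through a queue hierarchy, or fans out directly
// on a root queue.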
DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (slowpath(iterations == 0)) {
		return;
	}
	uint32_t thr_cnt = dispatch_hw_config(active_cpus);
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	if (!slowpath(nested)) {
		nested = iterations;
	} else {
		thr_cnt = nested < thr_cnt ? thr_cnt / nested : 1;
		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
				? nested * iterations : DISPATCH_APPLY_MAX;
	}
	if (iterations < thr_cnt) {
		thr_cnt = (uint32_t)iterations;
	}
	struct dispatch_continuation_s dc = {
		.dc_func = (void*)func,
		.dc_ctxt = ctxt,
	};
	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
	da->da_index = 0;
	da->da_todo = iterations;
	da->da_iterations = iterations;
	da->da_nested = nested;
	da->da_thr_cnt = thr_cnt;
	da->da_dc = &dc;

	dispatch_queue_t old_dq;
	old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
		dq = old_dq ? old_dq : _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false);
		while (slowpath(dq->do_targetq)) {
			dq = dq->do_targetq;
		}
	}
	if (slowpath(dq->dq_width <= 2) || slowpath(thr_cnt <= 1)) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			dc.dc_data = dq;
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	dispatch_block_t bb = _dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb,
			(dispatch_apply_function_t)_dispatch_Block_invoke(bb));
	Block_release(bb);
}
#endif

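/*
 * Caller-side sketch (illustrative only, not part of this file): a typical
 * use of the block-based API below, assuming hypothetical `input`/`output`
 * buffers of length `count` and a `transform` function, with iterations
 * independent of one another.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_apply(count, q, ^(size_t i) {
 *		output[i] = transform(input[i]);
 *	});
 */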
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	dispatch_apply_f(iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

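// Disabled strided variant: each dispatch_apply iteration would process
// `stride` consecutive indexes, with the remainder handled serially at the
// end. Kept for reference only (#if 0).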
#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_stride_f(offset, stride, iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif