/* src/apply.c, from apple/libdispatch (release libdispatch-339.1.9) */
/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (*dispatch_apply_function_t)(void *, size_t);

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	size_t const iter = da->da_iterations;
	size_t idx, done = 0;

	idx = dispatch_atomic_inc_orig2o(da, da_index, acquire);
	if (!fastpath(idx < iter)) goto out;

	// da_dc is only safe to access once the 'index lock' has been acquired
	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
	void *const da_ctxt = da->da_dc->dc_ctxt;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items

	// Handle nested dispatch_apply rdar://problem/9294578
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)da->da_nested);

	// Striding is the responsibility of the caller.
	do {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_perfmon_workitem_inc();
		done++;
		idx = dispatch_atomic_inc_orig2o(da, da_index, relaxed);
	} while (fastpath(idx < iter));
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)nested);

	// The thread that finished the last workitem wakes up the possibly waiting
	// thread that called dispatch_apply. They could be one and the same.
	if (!dispatch_atomic_sub2o(da, da_todo, done, release)) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}
out:
	if (dispatch_atomic_dec2o(da, da_thr_cnt, release) == 0) {
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}
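
/*
 * The loop above is a claim-until-exhausted scheme: every participating
 * thread atomically takes the next unclaimed index and keeps working until
 * the shared counter passes da_iterations, so faster threads naturally
 * execute more iterations. A minimal standalone sketch of the same pattern
 * follows, written with C11 atomics instead of the dispatch_atomic macros;
 * the parallel_for names are hypothetical and not part of libdispatch.
 * (The real first claim uses acquire ordering so that da_dc can be read
 * safely; the in-loop increments are relaxed, as here.)
 */
#if 0 // illustrative sketch only
#include <stdatomic.h>
#include <stddef.h>

struct parallel_for_state {
	atomic_size_t index;            // next unclaimed iteration
	size_t iterations;              // total iteration count
	void (*func)(void *, size_t);   // per-iteration callback
	void *ctxt;                     // caller-supplied context
};

// Body run by each participating thread: claim indices one at a time
// until none remain.
static void
parallel_for_worker(struct parallel_for_state *pf)
{
	size_t idx;
	while ((idx = atomic_fetch_add_explicit(&pf->index, 1,
			memory_order_relaxed)) < pf->iterations) {
		pf->func(pf->ctxt, idx);
	}
}
#endif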

DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt);
}

DISPATCH_NOINLINE
void
_dispatch_apply_redirect_invoke(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_queue_t old_dq;
	old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);

	_dispatch_thread_setspecific(dispatch_queue_key, da->da_dc->dc_data);
	_dispatch_apply_invoke2(ctxt);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_continuation_t dc = da->da_dc;
	size_t const iter = da->da_iterations;
	size_t idx = 0;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
		_dispatch_perfmon_workitem_inc();
	} while (++idx < iter);

	_dispatch_continuation_free((dispatch_continuation_t)da);
}

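/*
 * Fan out across up to da_thr_cnt threads: the calling thread takes one
 * share of the work inline while da_thr_cnt - 1 continuations are pushed
 * onto the queue, then the caller blocks on a thread semaphore that the
 * last finishing worker signals from _dispatch_apply_invoke2.
 */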
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	uint32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	uint32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
		next->dc_func = func;
		next->dc_ctxt = da;

		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	da->da_sema = sema;

	_dispatch_queue_push_list(dq, head, tail, continuation_cnt);
	// Call the first element directly
	_dispatch_apply_invoke(da);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}
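
/*
 * The same fork/join shape can be expressed with public GCD API: queue the
 * helpers asynchronously, take one share inline, then wait. A rough sketch
 * follows, assuming a block-based worker; fan_out_join is a hypothetical
 * name, and the real code above uses a cached per-thread semaphore
 * signalled by the last finisher rather than a group.
 */
#if 0 // illustrative sketch only
#include <dispatch/dispatch.h>

static void
fan_out_join(dispatch_queue_t q, uint32_t thr_cnt, void (^worker)(void))
{
	dispatch_group_t group = dispatch_group_create();
	uint32_t i;
	for (i = 1; i < thr_cnt; i++) {
		dispatch_group_async(group, q, worker); // the helper shares
	}
	worker(); // the calling thread takes one share inline
	dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
	dispatch_release(group);
}
#endif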
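
/*
 * Walk the target-queue chain, reserving 2 * (da_thr_cnt - 1) units of
 * width on each queue along the way. If some queue is already saturated,
 * shed the excess and reduce the thread count by half the shed width
 * (each thread accounts for two units); if no width remains at all, fall
 * back to running serially. Once the root queue is reached, run the apply
 * there, then release the width that was reserved.
 */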
static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;

	do {
		uint32_t running, width = rq->dq_width;
		running = dispatch_atomic_add2o(rq, dq_running, da_width, relaxed);
		if (slowpath(running > width)) {
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess, relaxed);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width, relaxed);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}

#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (slowpath(iterations == 0)) {
		return;
	}
	uint32_t thr_cnt = _dispatch_hw_config.cc_max_active;
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	if (!slowpath(nested)) {
		nested = iterations;
	} else {
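		// Nested apply: split the thread budget across nesting levels,
		// and cap the iteration product so it cannot overflow size_t
		// (DISPATCH_APPLY_MAX is below sqrt(SIZE_MAX)).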
		thr_cnt = nested < thr_cnt ? thr_cnt / nested : 1;
		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
				? nested * iterations : DISPATCH_APPLY_MAX;
	}
	if (iterations < thr_cnt) {
		thr_cnt = (uint32_t)iterations;
	}
	struct dispatch_continuation_s dc = {
		.dc_func = (void*)func,
		.dc_ctxt = ctxt,
	};
	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
	da->da_index = 0;
	da->da_todo = iterations;
	da->da_iterations = iterations;
	da->da_nested = nested;
	da->da_thr_cnt = thr_cnt;
	da->da_dc = &dc;

	dispatch_queue_t old_dq;
	old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
		dq = old_dq ? old_dq : _dispatch_get_root_queue(0, 0);
		while (slowpath(dq->do_targetq)) {
			dq = dq->do_targetq;
		}
	}
	if (slowpath(dq->dq_width <= 2) || slowpath(thr_cnt <= 1)) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			dc.dc_data = dq;
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
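
/*
 * Example use of the function-based entry point (illustrative; the
 * square_one/square_all names are not part of libdispatch). Note that
 * dispatch_apply_f does not return until every iteration has completed.
 */
#if 0 // usage example only
#include <dispatch/dispatch.h>

static void
square_one(void *ctxt, size_t i)
{
	int *values = ctxt;
	values[i] *= values[i];
}

static void
square_all(int *values, size_t count)
{
	dispatch_queue_t q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
	dispatch_apply_f(count, q, values, square_one);
}
#endif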

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	dispatch_block_t bb = _dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb,
			(dispatch_apply_function_t)_dispatch_Block_invoke(bb));
	Block_release(bb);
}
#endif

void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	dispatch_apply_f(iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif
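
/*
 * Example use of the block-based entry point (illustrative; the names are
 * hypothetical). Because "striding is the responsibility of the caller"
 * (see _dispatch_apply_invoke2), batching several array elements into each
 * block invocation amortizes the per-iteration dispatch overhead; compare
 * the disabled dispatch_stride helpers below.
 */
#if 0 // usage example only
#include <dispatch/dispatch.h>

#define STRIDE 128

static void
scale_all(float *values, size_t count, float factor)
{
	dispatch_queue_t q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
	dispatch_apply(count / STRIDE, q, ^(size_t chunk) {
		size_t i = chunk * STRIDE, stop = i + STRIDE;
		do {
			values[i] *= factor;
		} while (++i < stop);
	});
	// Handle the remainder that did not fill a whole stride.
	for (size_t i = count - (count % STRIDE); i < count; i++) {
		values[i] *= factor;
	}
}
#endif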

#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_stride_f(offset, stride, iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif