]> git.saurik.com Git - apple/libdispatch.git/blob - src/apply.c
libdispatch-228.23.tar.gz
[apple/libdispatch.git] / src / apply.c
1 /*
2 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 DISPATCH_ALWAYS_INLINE
24 static inline void
25 _dispatch_apply_invoke(void *ctxt)
26 {
27 dispatch_apply_t da = ctxt;
28 size_t const iter = da->da_iterations;
29 typeof(da->da_func) const func = da->da_func;
30 void *const da_ctxt = da->da_ctxt;
31 size_t idx, done = 0;
32
33 _dispatch_workitem_dec(); // this unit executes many items
34
35 // Make nested dispatch_apply fall into serial case rdar://problem/9294578
36 _dispatch_thread_setspecific(dispatch_apply_key, (void*)~0ul);
37 // Striding is the responsibility of the caller.
38 while (fastpath((idx = dispatch_atomic_inc2o(da, da_index) - 1) < iter)) {
39 _dispatch_client_callout2(da_ctxt, idx, func);
40 _dispatch_workitem_inc();
41 done++;
42 }
43 _dispatch_thread_setspecific(dispatch_apply_key, NULL);
44
45 dispatch_atomic_release_barrier();
46
47 // The thread that finished the last workitem wakes up the (possibly waiting)
48 // thread that called dispatch_apply. They could be one and the same.
49 if (done && (dispatch_atomic_add2o(da, da_done, done) == iter)) {
50 _dispatch_thread_semaphore_signal(da->da_sema);
51 }
52
53 if (dispatch_atomic_dec2o(da, da_thr_cnt) == 0) {
54 _dispatch_continuation_free((dispatch_continuation_t)da);
55 }
56 }
57
58 DISPATCH_NOINLINE
59 static void
60 _dispatch_apply2(void *ctxt)
61 {
62 _dispatch_apply_invoke(ctxt);
63 }
64
65 static void
66 _dispatch_apply3(void *ctxt)
67 {
68 dispatch_apply_t da = ctxt;
69 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
70
71 _dispatch_thread_setspecific(dispatch_queue_key, da->da_queue);
72 _dispatch_apply_invoke(ctxt);
73 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
74 }
75
76 static void
77 _dispatch_apply_serial(void *ctxt)
78 {
79 dispatch_apply_t da = ctxt;
80 size_t idx = 0;
81
82 _dispatch_workitem_dec(); // this unit executes many items
83 do {
84 _dispatch_client_callout2(da->da_ctxt, idx, da->da_func);
85 _dispatch_workitem_inc();
86 } while (++idx < da->da_iterations);
87
88 _dispatch_continuation_free((dispatch_continuation_t)da);
89 }
90
// Upper bound on the number of threads a single dispatch_apply will use;
// 64 should be good enough for the short to mid term.
#define DISPATCH_APPLY_MAX_CPUS 64
93
94 DISPATCH_ALWAYS_INLINE
95 static inline void
96 _dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
97 dispatch_function_t func)
98 {
99 uint32_t i = 0;
100 dispatch_continuation_t head = NULL, tail = NULL;
101
102 // The current thread does not need a continuation
103 uint32_t continuation_cnt = da->da_thr_cnt - 1;
104
105 dispatch_assert(continuation_cnt);
106
107 for (i = 0; i < continuation_cnt; i++) {
108 dispatch_continuation_t next = _dispatch_continuation_alloc();
109 next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
110 next->dc_func = func;
111 next->dc_ctxt = da;
112
113 next->do_next = head;
114 head = next;
115
116 if (!tail) {
117 tail = next;
118 }
119 }
120
121 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
122 da->da_sema = sema;
123
124 _dispatch_queue_push_list(dq, head, tail, continuation_cnt);
125 // Call the first element directly
126 _dispatch_apply2(da);
127 _dispatch_workitem_inc();
128
129 _dispatch_thread_semaphore_wait(sema);
130 _dispatch_put_thread_semaphore(sema);
131
132 }
133
134 static void
135 _dispatch_apply_redirect(void *ctxt)
136 {
137 dispatch_apply_t da = ctxt;
138 uint32_t da_width = 2 * (da->da_thr_cnt - 1);
139 dispatch_queue_t dq = da->da_queue, rq = dq, tq;
140
141 do {
142 uint32_t running = dispatch_atomic_add2o(rq, dq_running, da_width);
143 uint32_t width = rq->dq_width;
144 if (slowpath(running > width)) {
145 uint32_t excess = width > 1 ? running - width : da_width;
146 for (tq = dq; 1; tq = tq->do_targetq) {
147 (void)dispatch_atomic_sub2o(tq, dq_running, excess);
148 if (tq == rq) {
149 break;
150 }
151 }
152 da_width -= excess;
153 if (slowpath(!da_width)) {
154 return _dispatch_apply_serial(da);
155 }
156 da->da_thr_cnt -= excess / 2;
157 }
158 rq = rq->do_targetq;
159 } while (slowpath(rq->do_targetq));
160 _dispatch_apply_f2(rq, da, _dispatch_apply3);
161 do {
162 (void)dispatch_atomic_sub2o(dq, dq_running, da_width);
163 dq = dq->do_targetq;
164 } while (slowpath(dq->do_targetq));
165 }
166
167 DISPATCH_NOINLINE
168 void
169 dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
170 void (*func)(void *, size_t))
171 {
172 if (slowpath(iterations == 0)) {
173 return;
174 }
175
176 dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
177
178 da->da_func = func;
179 da->da_ctxt = ctxt;
180 da->da_iterations = iterations;
181 da->da_index = 0;
182 da->da_thr_cnt = _dispatch_hw_config.cc_max_active;
183 da->da_done = 0;
184 da->da_queue = NULL;
185
186 if (da->da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
187 da->da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
188 }
189 if (iterations < da->da_thr_cnt) {
190 da->da_thr_cnt = (uint32_t)iterations;
191 }
192 if (slowpath(dq->dq_width <= 2) || slowpath(da->da_thr_cnt <= 1) ||
193 slowpath(_dispatch_thread_getspecific(dispatch_apply_key))) {
194 return dispatch_sync_f(dq, da, _dispatch_apply_serial);
195 }
196 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
197 if (slowpath(dq->do_targetq)) {
198 if (slowpath(dq == old_dq)) {
199 return dispatch_sync_f(dq, da, _dispatch_apply_serial);
200 } else {
201 da->da_queue = dq;
202 return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
203 }
204 }
205 dispatch_atomic_acquire_barrier();
206 _dispatch_thread_setspecific(dispatch_queue_key, dq);
207 _dispatch_apply_f2(dq, da, _dispatch_apply2);
208 _dispatch_thread_setspecific(dispatch_queue_key, old_dq);
209 }
210
211 #ifdef __BLOCKS__
212 #if DISPATCH_COCOA_COMPAT
213 DISPATCH_NOINLINE
214 static void
215 _dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
216 void (^work)(size_t))
217 {
218 struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
219 dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
220 Block_release(bb);
221 }
222 #endif
223
224 void
225 dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
226 {
227 #if DISPATCH_COCOA_COMPAT
228 // Under GC, blocks transferred to other threads must be Block_copy()ed
229 // rdar://problem/7455071
230 if (dispatch_begin_thread_4GC) {
231 return _dispatch_apply_slow(iterations, dq, work);
232 }
233 #endif
234 struct Block_basic *bb = (void *)work;
235 dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
236 }
237 #endif
238
239 #if 0
240 #ifdef __BLOCKS__
241 void
242 dispatch_stride(size_t offset, size_t stride, size_t iterations,
243 dispatch_queue_t dq, void (^work)(size_t))
244 {
245 struct Block_basic *bb = (void *)work;
246 dispatch_stride_f(offset, stride, iterations, dq, bb,
247 (void *)bb->Block_invoke);
248 }
249 #endif
250
251 DISPATCH_NOINLINE
252 void
253 dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
254 dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
255 {
256 if (stride == 0) {
257 stride = 1;
258 }
259 dispatch_apply(iterations / stride, queue, ^(size_t idx) {
260 size_t i = idx * stride + offset;
261 size_t stop = i + stride;
262 do {
263 func(ctxt, i++);
264 } while (i < stop);
265 });
266
267 dispatch_sync(queue, ^{
268 size_t i;
269 for (i = iterations - (iterations % stride); i < iterations; i++) {
270 func(ctxt, i + offset);
271 }
272 });
273 }
274 #endif