src/apply.c (apple/libdispatch, libdispatch-187.10)
/*
 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"

// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is for the local thread's
// stack usage to be far enough away to avoid cache-line contention with the
// busy 'da_index' variable.
//
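// For example, assuming DISPATCH_CACHELINE_SIZE is 64 bytes, each pad array
// below works out to 64 bytes (8 longs on LP64, 16 on ILP32), keeping the hot
// fields a full cache line away from neighboring stack data.
//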
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void (*da_func)(void *, size_t);
	void *da_ctxt;
	size_t da_iterations;
	size_t da_index;
	uint32_t da_thr_cnt;
	_dispatch_thread_semaphore_t da_sema;
	dispatch_queue_t da_queue;
	long _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};

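// Worker body shared by every thread participating in an apply. Roughly: each
// worker atomically claims the next unclaimed index from da_index and invokes
// the client function for it, repeating until all da_iterations indices have
// been claimed; the last worker to finish (da_thr_cnt reaching zero) signals
// the semaphore the submitting thread is waiting on in _dispatch_apply_f2().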
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const da_ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Make nested dispatch_apply fall into serial case rdar://problem/9294578
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)~0ul);
	// Striding is the responsibility of the caller.
	while (fastpath((idx = dispatch_atomic_inc2o(da, da_index) - 1) < iter)) {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_workitem_inc();
	}
	_dispatch_thread_setspecific(dispatch_apply_key, NULL);

	dispatch_atomic_release_barrier();
	if (dispatch_atomic_dec2o(da, da_thr_cnt) == 0) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_apply2(void *ctxt)
{
	_dispatch_apply_invoke(ctxt);
}

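// Variant used on the redirect path: the continuations are pushed onto the
// bottom (root) queue, so before invoking the worker body this swaps the
// thread's current-queue TSD to the apply's original queue (da_queue) and
// restores it afterwards, so client callouts observe the expected queue.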
static void
_dispatch_apply3(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

	_dispatch_thread_setspecific(dispatch_queue_key, da->da_queue);
	_dispatch_apply_invoke(ctxt);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

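// Serial fallback: runs every iteration in order on the calling thread. Used
// for narrow queues, single-thread configurations, nested applies, and when
// the redirect path cannot claim any additional queue width.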
static void
_dispatch_apply_serial(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(da->da_ctxt, idx, da->da_func);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}

// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256

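// Fans the apply out across the queue: builds a stack-allocated list of
// da_thr_cnt - 1 continuations (all pointing at the shared dispatch_apply_s),
// pushes them onto dq in a single operation, then runs one worker inline on
// the calling thread and blocks on a cached thread semaphore until the last
// worker signals completion in _dispatch_apply_invoke().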
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, struct dispatch_apply_s *da,
		dispatch_function_t func)
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	size_t i;

	for (i = 0; i < da->da_thr_cnt - 1; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = func;
		da_dc[i].dc_ctxt = da;
	}

	da->da_sema = _dispatch_get_thread_semaphore();

	_dispatch_queue_push_list(dq, (void *)&da_dc[0],
			(void *)&da_dc[da->da_thr_cnt - 2]);
	// Call the first element directly
	_dispatch_apply2(da);
	_dispatch_workitem_inc();

	_dispatch_thread_semaphore_wait(da->da_sema);
	_dispatch_put_thread_semaphore(da->da_sema);
}

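// Used when the apply targets a queue that itself has a target queue: walks
// down the target-queue chain, pre-accounting 2 * (da_thr_cnt - 1) units of
// dq_running at each level so that concurrent applies don't oversubscribe a
// narrower queue. If a level is already saturated, the claimed width (and
// thread count) is trimmed, possibly all the way down to the serial fallback.
// The work is then fanned out on the bottom (root) queue via _dispatch_apply3,
// and the accounting is undone afterwards.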
static void
_dispatch_apply_redirect(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_queue, rq = dq, tq;

	do {
		uint32_t running = dispatch_atomic_add2o(rq, dq_running, da_width);
		uint32_t width = rq->dq_width;
		if (slowpath(running > width)) {
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply3);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}

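// Public entry point. The apply descriptor lives on the caller's stack; the
// thread count starts at the active CPU count (capped at
// DISPATCH_APPLY_MAX_CPUS) and is clamped to the iteration count, and the
// serial, redirected, and direct (root queue) cases are dispatched from here.
//
// A minimal usage sketch (hypothetical caller code, not part of this file):
//
//	static void
//	square_one(void *ctxt, size_t i)
//	{
//		((int *)ctxt)[i] = (int)(i * i);
//	}
//	...
//	int results[1024];
//	dispatch_apply_f(1024,
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
//			results, square_one);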
DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	struct dispatch_apply_s da;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;
	da.da_queue = NULL;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2) || slowpath(da.da_thr_cnt <= 1) ||
			slowpath(_dispatch_thread_getspecific(dispatch_apply_key))) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
		} else {
			da.da_queue = dq;
			return dispatch_sync_f(dq, &da, _dispatch_apply_redirect);
		}
	}
	dispatch_atomic_acquire_barrier();
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, &da, _dispatch_apply2);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

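// Block-based wrappers: dispatch_apply() bridges the block to the
// function-pointer entry point by passing the block object itself as the
// context and its Block_invoke pointer as the function. When built with
// DISPATCH_COCOA_COMPAT and running under GC, the block is first
// Block_copy()ed in _dispatch_apply_slow() because it will be executed on
// other threads.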
#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
	Block_release(bb);
}
#endif

void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif

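// Disabled (#if 0) sketch of a strided variant: each dispatched block handles
// 'stride' consecutive indices starting at 'idx * stride + offset', and any
// remainder (iterations % stride) is finished by a trailing dispatch_sync.
// Kept for reference only; it is not compiled.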
#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb,
			(void *)bb->Block_invoke);
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif