2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
4 * @APPLE_APACHE_LICENSE_HEADER_START@
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * @APPLE_APACHE_LICENSE_HEADER_END@
// Signature of an apply work function: void (*)(void *, size_t).
// Below, a function of this type is handed to _dispatch_client_callout2
// together with a context pointer and an iteration index.
23 typedef void (*dispatch_apply_function_t
)(void *, size_t);
// Worker body shared by _dispatch_apply_invoke (redirect == false) and
// _dispatch_apply_redirect_invoke (redirect == true).
//
// ctxt is the dispatch_apply_t describing the apply operation. Iteration
// indices are claimed by atomically incrementing da_index; every claimed
// index below da_iterations is passed to the client function through
// _dispatch_client_callout2. Afterwards `done` is subtracted from da_todo
// (signalling da_sema when the result is zero) and da_thr_cnt is decremented,
// freeing `da` when that count reaches zero.
//
// NOTE(review): some lines of this function are not visible in this view
// (the declarations of `idx` and `done`, the loop header, the `out` label and
// whatever tests `redirect`) -- confirm against the full file.
25 DISPATCH_ALWAYS_INLINE
27 _dispatch_apply_invoke2(void *ctxt
, bool redirect
)
29 dispatch_apply_t da
= (dispatch_apply_t
)ctxt
;
30 size_t const iter
= da
->da_iterations
;
// Claim the first index; if it is already out of range there is no work for
// this caller, so skip ahead to `out`.
33 idx
= dispatch_atomic_inc_orig2o(da
, da_index
, acquire
);
34 if (!fastpath(idx
< iter
)) goto out
;
36 // da_dc is only safe to access once the 'index lock' has been acquired
37 dispatch_apply_function_t
const func
= (void *)da
->da_dc
->dc_func
;
38 void *const da_ctxt
= da
->da_dc
->dc_ctxt
;
39 dispatch_queue_t dq
= da
->da_dc
->dc_data
;
41 _dispatch_perfmon_workitem_dec(); // this unit executes many items
43 // Handle nested dispatch_apply rdar://problem/9294578
// The thread's previous apply-nesting value is saved in `nested` and replaced
// by da_nested for the duration of the callouts.
44 size_t nested
= (size_t)_dispatch_thread_getspecific(dispatch_apply_key
);
45 _dispatch_thread_setspecific(dispatch_apply_key
, (void*)da
->da_nested
);
47 dispatch_queue_t old_dq
;
48 pthread_priority_t old_dp
;
// Remember the thread's current queue and default priority, then install dq
// and dq's priority in their place.
50 old_dq
= _dispatch_thread_getspecific(dispatch_queue_key
);
51 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
52 old_dp
= _dispatch_set_defaultpriority(dq
->dq_priority
);
55 // Striding is the responsibility of the caller.
57 _dispatch_client_callout2(da_ctxt
, idx
, func
);
58 _dispatch_perfmon_workitem_inc();
// Claim the next index; the loop continues while it is still in range.
60 idx
= dispatch_atomic_inc_orig2o(da
, da_index
, relaxed
);
61 } while (fastpath(idx
< iter
));
// Put back the default priority and queue saved above.
64 _dispatch_reset_defaultpriority(old_dp
);
65 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
// Put back the apply-nesting value saved before the callouts.
67 _dispatch_thread_setspecific(dispatch_apply_key
, (void*)nested
);
69 // The thread that finished the last workitem wakes up the possibly waiting
70 // thread that called dispatch_apply. They could be one and the same.
71 if (!dispatch_atomic_sub2o(da
, da_todo
, done
, release
)) {
72 _dispatch_thread_semaphore_signal(da
->da_sema
);
// The caller whose decrement brings da_thr_cnt to zero frees the apply
// structure.
75 if (dispatch_atomic_dec2o(da
, da_thr_cnt
, release
) == 0) {
76 _dispatch_continuation_free((dispatch_continuation_t
)da
);
// Runs the shared worker body with redirect == false.
// Called directly by _dispatch_apply_f2 on the calling thread, and passed to
// _dispatch_apply_f2 by dispatch_apply_f as the continuation function.
82 _dispatch_apply_invoke(void *ctxt
)
84 _dispatch_apply_invoke2(ctxt
, false);
// Runs the shared worker body with redirect == true.
// _dispatch_apply_redirect passes this to _dispatch_apply_f2 as the
// continuation function.
89 _dispatch_apply_redirect_invoke(void *ctxt
)
91 _dispatch_apply_invoke2(ctxt
, true);
// Serial fallback: runs the iterations one after another on the calling
// thread and then frees the apply structure.
//
// ctxt is the dispatch_apply_t; the client function and its context are read
// from da_dc (dc_func / dc_ctxt). The loop repeats while ++idx is below
// da_iterations.
//
// NOTE(review): the declaration and initial value of `idx` and the loop
// header are not visible in this view.
95 _dispatch_apply_serial(void *ctxt
)
97 dispatch_apply_t da
= (dispatch_apply_t
)ctxt
;
98 dispatch_continuation_t dc
= da
->da_dc
;
99 size_t const iter
= da
->da_iterations
;
102 _dispatch_perfmon_workitem_dec(); // this unit executes many items
104 _dispatch_client_callout2(dc
->dc_ctxt
, idx
, (void*)dc
->dc_func
);
105 _dispatch_perfmon_workitem_inc();
106 } while (++idx
< iter
);
// `da` was obtained from _dispatch_continuation_alloc in dispatch_apply_f, so
// it is released through the continuation free routine.
108 _dispatch_continuation_free((dispatch_continuation_t
)da
);
// Fans an apply operation out over dq.
//
// Allocates da_thr_cnt - 1 continuations whose dc_func is `func`, pushes them
// onto dq as a list, runs _dispatch_apply_invoke(da) on the calling thread,
// and then waits on a thread semaphore before returning it.
//
// dq   - queue the continuations are pushed onto
// da   - apply state; da_thr_cnt must be at least 2 (asserted below)
// func - function installed in every continuation
//
// NOTE(review): the lines that set each continuation's context, maintain
// `tail`, and tie `sema` to da_sema are not visible in this view -- confirm
// against the full file.
111 DISPATCH_ALWAYS_INLINE
113 _dispatch_apply_f2(dispatch_queue_t dq
, dispatch_apply_t da
,
114 dispatch_function_t func
)
117 dispatch_continuation_t head
= NULL
, tail
= NULL
;
119 // The current thread does not need a continuation
120 uint32_t continuation_cnt
= da
->da_thr_cnt
- 1;
122 dispatch_assert(continuation_cnt
);
// Build the list by prepending: each new continuation points at the previous
// head through do_next.
124 for (i
= 0; i
< continuation_cnt
; i
++) {
125 dispatch_continuation_t next
= _dispatch_continuation_alloc();
126 next
->do_vtable
= (void *)DISPATCH_OBJ_ASYNC_BIT
;
127 next
->dc_func
= func
;
129 _dispatch_continuation_voucher_set(next
, 0);
130 _dispatch_continuation_priority_set(next
, 0, 0);
132 next
->do_next
= head
;
140 _dispatch_thread_semaphore_t sema
= _dispatch_get_thread_semaphore();
143 _dispatch_queue_push_list(dq
, head
, tail
, head
->dc_priority
,
145 // Call the first element directly
146 _dispatch_apply_invoke(da
);
147 _dispatch_perfmon_workitem_inc();
// Block until the semaphore is signalled, then hand it back.
149 _dispatch_thread_semaphore_wait(sema
);
150 _dispatch_put_thread_semaphore(sema
);
// Apply path used by dispatch_apply_f for a queue that has a target queue
// (invoked through dispatch_sync_f).
//
// ctxt is the dispatch_apply_t; the queue is read from da_dc->dc_data.
// da_width = 2 * (da_thr_cnt - 1) is added to dq_running on rq. When the
// resulting count exceeds that queue's dq_width, the excess is subtracted
// again from the queues starting at dq, da_thr_cnt is reduced by excess / 2,
// and the function falls back to _dispatch_apply_serial if da_width is zero.
// Otherwise the work is fanned out with
// _dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke), and da_width
// is subtracted from dq_running afterwards.
//
// NOTE(review): the loop headers, the statements that advance rq/dq along
// do_targetq, the loop exit of the inner `for`, and the adjustment of
// da_width are not visible in this view -- the summary above covers only the
// visible statements.
155 _dispatch_apply_redirect(void *ctxt
)
157 dispatch_apply_t da
= (dispatch_apply_t
)ctxt
;
158 uint32_t da_width
= 2 * (da
->da_thr_cnt
- 1);
159 dispatch_queue_t dq
= da
->da_dc
->dc_data
, rq
= dq
, tq
;
162 uint32_t running
, width
= rq
->dq_width
;
163 running
= dispatch_atomic_add2o(rq
, dq_running
, da_width
, relaxed
);
164 if (slowpath(running
> width
)) {
// Over-subscribed: excess is the overshoot, or all of da_width when the
// queue's width is 1 or less.
165 uint32_t excess
= width
> 1 ? running
- width
: da_width
;
166 for (tq
= dq
; 1; tq
= tq
->do_targetq
) {
167 (void)dispatch_atomic_sub2o(tq
, dq_running
, excess
, relaxed
);
173 if (slowpath(!da_width
)) {
174 return _dispatch_apply_serial(da
);
176 da
->da_thr_cnt
-= excess
/ 2;
179 } while (slowpath(rq
->do_targetq
));
180 _dispatch_apply_f2(rq
, da
, _dispatch_apply_redirect_invoke
);
// Give back the width that was added to dq_running above.
182 (void)dispatch_atomic_sub2o(dq
, dq_running
, da_width
, relaxed
);
184 } while (slowpath(dq
->do_targetq
));
// Cap used by dispatch_apply_f for the apply nesting factor: both operands of
// nested * iterations are checked against it before multiplying, and it is
// the value used when either check fails.
187 #define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)
// Invokes func(ctxt, index) for `iterations` indices on queue dq, choosing
// between a serial run, a redirected run and a direct parallel fan-out.
//
// iterations - number of indices; zero is tested first (the body of that
//              test is not visible in this view)
// dq         - target queue, or DISPATCH_APPLY_CURRENT_ROOT_QUEUE
// ctxt       - context pointer handed to func
// func       - work function
//
// NOTE(review): several lines are not visible in this view (the body of the
// `!nested` branch, the remaining initializers of `dc`, the assignments of
// da_index / da_dc, and the body of the do_targetq walk).
191 dispatch_apply_f(size_t iterations
, dispatch_queue_t dq
, void *ctxt
,
192 void (*func
)(void *, size_t))
194 if (slowpath(iterations
== 0)) {
// Start from the number of active CPUs. When this thread already carries a
// non-zero apply nesting value, divide the thread count by it (minimum 1).
197 uint32_t thr_cnt
= dispatch_hw_config(active_cpus
);
198 size_t nested
= (size_t)_dispatch_thread_getspecific(dispatch_apply_key
);
199 if (!slowpath(nested
)) {
202 thr_cnt
= nested
< thr_cnt
? thr_cnt
/ nested
: 1;
// New nesting factor: nested * iterations, capped at DISPATCH_APPLY_MAX.
203 nested
= nested
< DISPATCH_APPLY_MAX
&& iterations
< DISPATCH_APPLY_MAX
204 ? nested
* iterations
: DISPATCH_APPLY_MAX
;
// Never use more threads than there are iterations.
206 if (iterations
< thr_cnt
) {
207 thr_cnt
= (uint32_t)iterations
;
209 struct dispatch_continuation_s dc
= {
210 .dc_func
= (void*)func
,
// The apply state itself is allocated from the continuation allocator.
213 dispatch_apply_t da
= (typeof(da
))_dispatch_continuation_alloc();
215 da
->da_todo
= iterations
;
216 da
->da_iterations
= iterations
;
217 da
->da_nested
= nested
;
218 da
->da_thr_cnt
= thr_cnt
;
221 dispatch_queue_t old_dq
;
222 old_dq
= (dispatch_queue_t
)_dispatch_thread_getspecific(dispatch_queue_key
);
// DISPATCH_APPLY_CURRENT_ROOT_QUEUE: start from the thread's current queue
// or, when there is none, the default-QoS root queue.
223 if (slowpath(dq
== DISPATCH_APPLY_CURRENT_ROOT_QUEUE
)) {
224 dq
= old_dq
? old_dq
: _dispatch_get_root_queue(
225 _DISPATCH_QOS_CLASS_DEFAULT
, false);
226 while (slowpath(dq
->do_targetq
)) {
// Narrow queue (dq_width <= 2) or a single thread: run every iteration
// serially under dispatch_sync_f.
230 if (slowpath(dq
->dq_width
<= 2) || slowpath(thr_cnt
<= 1)) {
231 return dispatch_sync_f(dq
, da
, _dispatch_apply_serial
);
// Queue with a target queue: run serially when it is the thread's current
// queue, otherwise go through _dispatch_apply_redirect.
233 if (slowpath(dq
->do_targetq
)) {
234 if (slowpath(dq
== old_dq
)) {
235 return dispatch_sync_f(dq
, da
, _dispatch_apply_serial
);
238 return dispatch_sync_f(dq
, da
, _dispatch_apply_redirect
);
// Remaining case: make dq the thread's current queue while the fan-out runs,
// then restore the previous queue.
241 _dispatch_thread_setspecific(dispatch_queue_key
, dq
);
242 _dispatch_apply_f2(dq
, da
, _dispatch_apply_invoke
);
243 _dispatch_thread_setspecific(dispatch_queue_key
, old_dq
);
// Compiled only when DISPATCH_COCOA_COMPAT is set.
// Copies the work block with _dispatch_Block_copy and runs dispatch_apply_f
// with the copy as the context and the copy's invoke function as the work
// function. dispatch_apply takes this path when dispatch_begin_thread_4GC is
// set.
// NOTE(review): no release of the copied block is visible in this view --
// confirm against the full file.
247 #if DISPATCH_COCOA_COMPAT
250 _dispatch_apply_slow(size_t iterations
, dispatch_queue_t dq
,
251 void (^work
)(size_t))
253 dispatch_block_t bb
= _dispatch_Block_copy((void *)work
);
254 dispatch_apply_f(iterations
, dq
, bb
,
255 (dispatch_apply_function_t
)_dispatch_Block_invoke(bb
));
// Block-based front end to dispatch_apply_f.
//
// iterations - number of indices to run
// dq         - queue passed through to dispatch_apply_f
// work       - block taking the index; it is passed as the context and its
//              invoke function as the work function
//
// With DISPATCH_COCOA_COMPAT and dispatch_begin_thread_4GC set, the call is
// routed through _dispatch_apply_slow instead, which copies the block first.
261 dispatch_apply(size_t iterations
, dispatch_queue_t dq
, void (^work
)(size_t))
263 #if DISPATCH_COCOA_COMPAT
264 // Under GC, blocks transferred to other threads must be Block_copy()ed
265 // rdar://problem/7455071
266 if (dispatch_begin_thread_4GC
) {
267 return _dispatch_apply_slow(iterations
, dq
, work
);
270 dispatch_apply_f(iterations
, dq
, work
,
271 (dispatch_apply_function_t
)_dispatch_Block_invoke(work
));
// Block-based front end to dispatch_stride_f: forwards offset, stride,
// iterations and dq unchanged, passing the block as the context and its
// invoke function as the work function.
278 dispatch_stride(size_t offset
, size_t stride
, size_t iterations
,
279 dispatch_queue_t dq
, void (^work
)(size_t))
281 dispatch_stride_f(offset
, stride
, iterations
, dq
, work
,
282 (dispatch_apply_function_t
)_dispatch_Block_invoke(work
));
// Strided apply: runs dispatch_apply over iterations / stride chunks, each
// chunk starting at idx * stride + offset and stopping at that start plus
// stride, then handles the leftover indices (from
// iterations - iterations % stride up to iterations) in a dispatch_sync
// block that calls func(ctxt, i + offset).
//
// NOTE(review): the calls below name `queue`, but the parameter list visible
// here declares `dq` and no declaration of `queue` is visible -- confirm
// which is intended.
// NOTE(review): iterations / stride and iterations % stride divide by
// `stride`; any stride == 0 guard is not visible in this view. The body of
// the per-chunk loop and the end of this function are also not visible.
288 dispatch_stride_f(size_t offset
, size_t stride
, size_t iterations
,
289 dispatch_queue_t dq
, void *ctxt
, void (*func
)(void *, size_t))
294 dispatch_apply(iterations
/ stride
, queue
, ^(size_t idx
) {
295 size_t i
= idx
* stride
+ offset
;
296 size_t stop
= i
+ stride
;
302 dispatch_sync(queue
, ^{
// Remainder pass over the indices not covered by the whole chunks above.
304 for (i
= iterations
- (iterations
% stride
); i
< iterations
; i
++) {
305 func(ctxt
, i
+ offset
);