/*
 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke(void *ctxt)
{
	dispatch_apply_t da = ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const da_ctxt = da->da_ctxt;
	size_t idx, done = 0;

	_dispatch_workitem_dec(); // this unit executes many items

	// Make nested dispatch_apply fall into serial case rdar://problem/9294578
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)~0ul);
	// Striding is the responsibility of the caller.
	// Grab iteration indices with an atomic increment until they run out.
	while (fastpath((idx = dispatch_atomic_inc2o(da, da_index) - 1) < iter)) {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_workitem_inc();
		done++;
	}
	_dispatch_thread_setspecific(dispatch_apply_key, NULL);

	dispatch_atomic_release_barrier();

	// The thread that finished the last workitem wakes up the (possibly waiting)
	// thread that called dispatch_apply. They could be one and the same.
	if (done && (dispatch_atomic_add2o(da, da_done, done) == iter)) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}

	// The last worker to drop its reference frees the shared descriptor.
	if (dispatch_atomic_dec2o(da, da_thr_cnt) == 0) {
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}

DISPATCH_NOINLINE
void
_dispatch_apply2(void *ctxt)
{
	_dispatch_apply_invoke(ctxt);
}

static void
_dispatch_apply3(void *ctxt)
{
	dispatch_apply_t da = ctxt;
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

	// Run the apply with the original queue installed as the current queue.
	_dispatch_thread_setspecific(dispatch_queue_key, da->da_queue);
	_dispatch_apply_invoke(ctxt);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = ctxt;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(da->da_ctxt, idx, da->da_func);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);

	_dispatch_continuation_free((dispatch_continuation_t)da);
}

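// Note: the serial path above is the fallback used when no parallelism is
// available or safe. It is conceptually equivalent to the hypothetical loop
// below, run synchronously on the queue dispatch_apply_f() was called with:
//
//	for (size_t i = 0; i < iterations; i++) {
//		func(ctxt, i);
//	}
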
// 64 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 64

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	uint32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	uint32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	// Build a list of continuations, one per additional worker thread.
	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
		next->dc_func = func;
		next->dc_ctxt = da;

		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	da->da_sema = sema;

	_dispatch_queue_push_list(dq, head, tail, continuation_cnt);
	// Call the first element directly
	_dispatch_apply2(da);
	_dispatch_workitem_inc();

	// Wait for the worker that accounts for the last iteration to signal.
	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}

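// Illustrative sketch (not part of this file) of the fan-out performed by
// _dispatch_apply_f2 for a hypothetical call with da->da_thr_cnt == 4 and
// da->da_iterations == 8: three continuations are pushed onto dq and the
// calling thread becomes the fourth worker. All four race on the shared
// da_index counter, so one possible split of the 8 indices is:
//
//	caller:   0, 4, 7
//	worker 1: 1, 5
//	worker 2: 2, 6
//	worker 3: 3
//
// The last worker to add its completed count brings da_done up to
// da_iterations and signals da_sema, which is what the caller is blocked on
// in _dispatch_thread_semaphore_wait().
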
// Redirect the apply to the base (root) target queue of da->da_queue,
// reserving width on every queue along the target chain.
static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_queue, rq = dq, tq;

	do {
		uint32_t running = dispatch_atomic_add2o(rq, dq_running, da_width);
		uint32_t width = rq->dq_width;
		if (slowpath(running > width)) {
			// Not enough width left: give back the excess and shrink the apply.
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply3);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (slowpath(iterations == 0)) {
		return;
	}

	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();

	da->da_func = func;
	da->da_ctxt = ctxt;
	da->da_iterations = iterations;
	da->da_index = 0;
	da->da_thr_cnt = _dispatch_hw_config.cc_max_active;
	da->da_done = 0;
	da->da_queue = NULL;

	if (da->da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da->da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (iterations < da->da_thr_cnt) {
		da->da_thr_cnt = (uint32_t)iterations;
	}
	// Narrow queues, single-item applies and nested applies run serially.
	if (slowpath(dq->dq_width <= 2) || slowpath(da->da_thr_cnt <= 1) ||
			slowpath(_dispatch_thread_getspecific(dispatch_apply_key))) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			da->da_queue = dq;
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}
	dispatch_atomic_acquire_barrier();
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply2);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

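// Illustrative caller (not part of this file): dispatch_apply_f with a plain
// C function. The helper name square_at, the values buffer and count are
// hypothetical; the queue lookup and dispatch_apply_f are the public API.
//
//	static void square_at(void *ctxt, size_t i)
//	{
//		long *values = ctxt;
//		values[i] *= values[i];
//	}
//
//	dispatch_queue_t q = dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	dispatch_apply_f(count, q, values, square_at);
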
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
	Block_release(bb);
}
#endif

void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}

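// Illustrative caller (not part of this file): the Block_basic cast above is
// what lets the block-based entry point forward straight to dispatch_apply_f,
// with the block's invoke function as the callback and the block itself as
// the context. A typical (hypothetical) use:
//
//	dispatch_apply(count, dispatch_get_global_queue(0, 0), ^(size_t i) {
//		results[i] = compute(i); // hypothetical per-iteration work
//	});
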
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb,
			(void *)bb->Block_invoke);
}

void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}

	// Parallelize the whole strides; each apply index covers one stride.
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Handle the tail of the striding serially.
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
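
// Worked example (illustrative only) of the striding arithmetic above, for a
// hypothetical call dispatch_stride_f(0, 4, 10, dq, ctxt, func):
// dispatch_apply runs iterations / stride == 2 blocks, covering indices
// 0..3 and 4..7; the trailing dispatch_sync loop then starts at
// iterations - (iterations % stride) == 8 and invokes func for indices 8
// and 9, so every index in [offset, offset + iterations) is visited exactly
// once.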