/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (*dispatch_apply_function_t)(void *, size_t);
static char const * const _dispatch_apply_key = "apply";

#define DISPATCH_APPLY_INVOKE_REDIRECT 0x1
#define DISPATCH_APPLY_INVOKE_WAIT 0x2
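
// _dispatch_apply_invoke2() below is the one worker routine shared by the
// three entry points that follow it; the flags select the extra behaviors:
// REDIRECT pushes a thread frame and swaps the base priority for non-root
// queues, WAIT blocks the calling thread until every iteration has run.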

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(void *ctxt, long invoke_flags)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	size_t const iter = da->da_iterations;
	size_t idx, done = 0;

	idx = os_atomic_inc_orig2o(da, da_index, acquire);
	if (unlikely(idx >= iter)) goto out;

	// da_dc is only safe to access once the 'index lock' has been acquired
	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
	void *const da_ctxt = da->da_dc->dc_ctxt;
	dispatch_queue_t dq = da->da_dc->dc_data;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items

	// Handle nested dispatch_apply rdar://problem/9294578
	dispatch_thread_context_s apply_ctxt = {
		.dtc_key = _dispatch_apply_key,
		.dtc_apply_nesting = da->da_nested,
	};
	_dispatch_thread_context_push(&apply_ctxt);

	dispatch_thread_frame_s dtf;
	dispatch_priority_t old_dbp = 0;
	if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
		_dispatch_thread_frame_push(&dtf, dq);
		old_dbp = _dispatch_set_basepri(dq->dq_priority);
	}
	dispatch_invoke_flags_t flags = da->da_flags;

	// Striding is the responsibility of the caller.
	do {
		dispatch_invoke_with_autoreleasepool(flags, {
			_dispatch_client_callout2(da_ctxt, idx, func);
			_dispatch_perfmon_workitem_inc();
			done++;
			idx = os_atomic_inc_orig2o(da, da_index, relaxed);
		});
	} while (likely(idx < iter));

	if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) {
		_dispatch_reset_basepri(old_dbp);
		_dispatch_thread_frame_pop(&dtf);
	}

	_dispatch_thread_context_pop(&apply_ctxt);

	// The thread that finished the last workitem wakes up the possibly waiting
	// thread that called dispatch_apply. They could be one and the same.
	if (!os_atomic_sub2o(da, da_todo, done, release)) {
		_dispatch_thread_event_signal(&da->da_event);
	}
out:
	if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) {
		_dispatch_thread_event_wait(&da->da_event);
		_dispatch_thread_event_destroy(&da->da_event);
	}
	if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
#if DISPATCH_INTROSPECTION
		_dispatch_continuation_free(da->da_dc);
#endif
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}
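
// Iteration protocol used above, in summary: da_index hands out iteration
// numbers by atomic fetch-and-increment, so any number of worker threads can
// pull work without further coordination; da_todo counts completions down to
// zero, and whichever thread performs the final decrement signals da_event;
// da_thr_cnt releases the dispatch_apply_s allocation once the last
// participating thread drops its reference.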

DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, 0);
}

DISPATCH_NOINLINE
static void
_dispatch_apply_invoke_and_wait(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_WAIT);
	_dispatch_perfmon_workitem_inc();
}

DISPATCH_NOINLINE
void
_dispatch_apply_redirect_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_REDIRECT);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_invoke_flags_t
_dispatch_apply_autorelease_frequency(dispatch_queue_t dq)
{
	dispatch_invoke_flags_t qaf = 0;

	while (dq && !qaf) {
		qaf = _dispatch_queue_autorelease_frequency(dq);
		dq = dq->do_targetq;
	}
	return qaf;
}

DISPATCH_NOINLINE
static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_continuation_t dc = da->da_dc;
	size_t const iter = da->da_iterations;
	dispatch_invoke_flags_t flags;
	size_t idx = 0;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items
	flags = _dispatch_apply_autorelease_frequency(dc->dc_data);
	do {
		dispatch_invoke_with_autoreleasepool(flags, {
			_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
			_dispatch_perfmon_workitem_inc();
		});
	} while (++idx < iter);

#if DISPATCH_INTROSPECTION
	_dispatch_continuation_free(da->da_dc);
#endif
	_dispatch_continuation_free((dispatch_continuation_t)da);
}
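
// _dispatch_apply_serial() is the no-parallelism fallback (serial queues,
// thr_cnt <= 1, or a redirect that could not reserve any width): it simply
// loops over all iterations on the calling thread, honoring the queue's
// autorelease frequency.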

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	int32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	int32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

		_dispatch_continuation_init_f(next, dq, da, func, 0, 0, dc_flags);
		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_event_init(&da->da_event);
	// FIXME: dq may not be the right queue for the priority of `head`
	_dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt);
	// Call the first element directly
	_dispatch_apply_invoke_and_wait(da);
}
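
// Note that _dispatch_apply_f2() pushes only da_thr_cnt - 1 continuations
// onto the root queue: the calling thread immediately runs its own share of
// the iterations via _dispatch_apply_invoke_and_wait(), then blocks on
// da_event until the other workers have drained da_todo.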

DISPATCH_NOINLINE
static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	int32_t da_width = da->da_thr_cnt - 1;
	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;

	do {
		int32_t width = _dispatch_queue_try_reserve_apply_width(rq, da_width);

		if (unlikely(da_width > width)) {
			int32_t excess = da_width - width;
			for (tq = dq; tq != rq; tq = tq->do_targetq) {
				_dispatch_queue_relinquish_width(tq, excess);
			}
			da_width -= excess;
			if (unlikely(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess;
		}
		if (!da->da_flags) {
			// find first queue in descending target queue order that has
			// an autorelease frequency set, and use that as the frequency for
			// this continuation.
			da->da_flags = _dispatch_queue_autorelease_frequency(rq);
		}
		rq = rq->do_targetq;
	} while (unlikely(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
	do {
		_dispatch_queue_relinquish_width(dq, da_width);
		dq = dq->do_targetq;
	} while (unlikely(dq->do_targetq));
}

#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_apply_root_queue(dispatch_queue_t dq)
{
	if (dq) {
		while (unlikely(dq->do_targetq)) {
			dq = dq->do_targetq;
		}
		// if the current root queue is a pthread root queue, select it
		if (!_dispatch_priority_qos(dq->dq_priority)) {
			return dq;
		}
	}

	pthread_priority_t pp = _dispatch_get_priority();
	dispatch_qos_t qos = _dispatch_qos_from_pp(pp);
	return _dispatch_get_root_queue(qos ? qos : DISPATCH_QOS_DEFAULT, false);
}
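
// The DISPATCH_APPLY_AUTO policy resolves here: walk to the root of the
// caller's target queue chain, keep that root when it is a pthread root
// queue (no QoS in dq_priority), and otherwise pick the global root queue
// matching the caller's current QoS, defaulting to DISPATCH_QOS_DEFAULT.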

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (unlikely(iterations == 0)) {
		return;
	}
	dispatch_thread_context_t dtctxt =
			_dispatch_thread_context_find(_dispatch_apply_key);
	size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0;
	dispatch_queue_t old_dq = _dispatch_queue_get_current();

	if (likely(dq == DISPATCH_APPLY_AUTO)) {
		dq = _dispatch_apply_root_queue(old_dq);
	}
	dispatch_qos_t qos = _dispatch_priority_qos(dq->dq_priority);
	if (unlikely(dq->do_targetq)) {
		// if the queue passed-in is not a root queue, use the current QoS
		// since the caller participates in the work anyway
		qos = _dispatch_qos_from_pp(_dispatch_get_priority());
	}
	int32_t thr_cnt = (int32_t)_dispatch_qos_max_parallelism(qos,
			DISPATCH_MAX_PARALLELISM_ACTIVE);

	if (likely(!nested)) {
		nested = iterations;
	} else {
		thr_cnt = nested < (size_t)thr_cnt ? thr_cnt / (int32_t)nested : 1;
		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
				? nested * iterations : DISPATCH_APPLY_MAX;
	}
	if (iterations < (size_t)thr_cnt) {
		thr_cnt = (int32_t)iterations;
	}
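	// Worked example of the nesting math above (thread counts are
	// hypothetical): a top-level dispatch_apply of 4 iterations records
	// nested = 4 for its workers. If one of those workers starts an inner
	// apply while _dispatch_qos_max_parallelism() reports 8, the inner call
	// computes thr_cnt = 8 / 4 = 2, so the two levels combined still use
	// roughly 8 threads rather than 8 * 4. DISPATCH_APPLY_MAX caps the
	// recorded nesting product so the multiplication cannot overflow.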
	struct dispatch_continuation_s dc = {
		.dc_func = (void*)func,
		.dc_ctxt = ctxt,
		.dc_data = dq,
	};
	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
	da->da_index = 0;
	da->da_todo = iterations;
	da->da_iterations = iterations;
	da->da_nested = nested;
	da->da_thr_cnt = thr_cnt;
#if DISPATCH_INTROSPECTION
	da->da_dc = _dispatch_continuation_alloc();
	*da->da_dc = dc;
#else
	da->da_dc = &dc;
#endif
	da->da_flags = 0;

	if (unlikely(dq->dq_width == 1 || thr_cnt <= 1)) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	if (unlikely(dq->do_targetq)) {
		if (unlikely(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}

	dispatch_thread_frame_s dtf;
	_dispatch_thread_frame_push(&dtf, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
	_dispatch_thread_frame_pop(&dtf);
}
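
// Usage sketch (illustrative only; square_in_place and values are
// hypothetical caller code, not part of this file):
//
//	static void square_in_place(void *ctxt, size_t i)
//	{
//		size_t *values = (size_t *)ctxt;
//		values[i] *= values[i];
//	}
//
//	dispatch_apply_f(count, DISPATCH_APPLY_AUTO, values, square_in_place);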

#ifdef __BLOCKS__
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_apply_f(iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_stride_f(offset, stride, iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride <= 1) {
		return dispatch_apply_f(iterations, dq, ctxt, func);
	}

	// Walk the long strides first
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Walk the short strides
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
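
// Worked example for dispatch_stride_f (values are hypothetical): with
// offset == 100, stride == 3 and iterations == 10, the dispatch_apply above
// runs 10 / 3 == 3 blocks covering indices 100..102, 103..105 and 106..108,
// and the trailing dispatch_sync covers the 10 % 3 == 1 leftover index 109.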