/*
 * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is for the stack usage of
// the local thread to be sufficiently far away to avoid cache-line contention
// with the busy 'da_index' variable.
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void (*da_func)(void *, size_t);
	void *da_ctxt;
	size_t da_iterations;
	size_t da_index;
	uint32_t da_thr_cnt;
	_dispatch_thread_semaphore_t da_sema;
	dispatch_queue_t da_queue;
	long _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};

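// One dispatch_apply_s lives on the stack of the thread that called
// dispatch_apply_f() and is shared by every worker thread: 'da_index' is the
// shared iteration cursor that workers advance atomically, 'da_thr_cnt' counts
// the workers that have not yet finished, and the last worker to finish
// signals 'da_sema' to wake the waiting caller. 'da_queue' is only set on the
// redirect path (queues with a target queue).
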
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const da_ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Make nested dispatch_apply fall into serial case rdar://problem/9294578
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)~0ul);
	// Striding is the responsibility of the caller.
	while (fastpath((idx = dispatch_atomic_inc2o(da, da_index) - 1) < iter)) {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_workitem_inc();
	}
	_dispatch_thread_setspecific(dispatch_apply_key, NULL);

	dispatch_atomic_release_barrier();
	if (dispatch_atomic_dec2o(da, da_thr_cnt) == 0) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}
}

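// The two functions below are the per-worker entry points. _dispatch_apply2()
// simply runs the shared claiming loop above; _dispatch_apply3() additionally
// installs da_queue as the thread's current queue for the duration of the
// loop (used by the redirect path, so work running on the root queue still
// observes the originally targeted queue as current).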
static void
_dispatch_apply2(void *ctxt)
{
	_dispatch_apply_invoke(ctxt);
}

static void
_dispatch_apply3(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);

	_dispatch_thread_setspecific(dispatch_queue_key, da->da_queue);
	_dispatch_apply_invoke(ctxt);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

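// Serial fallback: run every iteration in order on the calling thread. Used
// when the target queue cannot usefully run iterations concurrently or when
// dispatch_apply is nested inside another dispatch_apply.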
static void
_dispatch_apply_serial(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(da->da_ctxt, idx, da->da_func);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}

// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256

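// Parallel driver: builds (da_thr_cnt - 1) continuations in a stack array,
// chains them together and pushes them onto 'dq' in a single operation, then
// has the calling thread act as the first worker. It finally blocks on
// 'da_sema' until the last worker signals completion, which is what makes it
// safe for 'da' (and the continuations) to live on the stack.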
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, struct dispatch_apply_s *da,
		dispatch_function_t func)
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	size_t i;

	for (i = 0; i < da->da_thr_cnt - 1; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = func;
		da_dc[i].dc_ctxt = da;
	}

	da->da_sema = _dispatch_get_thread_semaphore();

	_dispatch_queue_push_list(dq, (void *)&da_dc[0],
			(void *)&da_dc[da->da_thr_cnt - 2]);
	// Call the first element directly
	_dispatch_apply2(da);
	_dispatch_workitem_inc();

	_dispatch_thread_semaphore_wait(da->da_sema);
	_dispatch_put_thread_semaphore(da->da_sema);
}

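// Redirect driver, used when 'dq' has a target queue: pre-accounts 'da_width'
// units of concurrency in dq_running for every queue on the way to the root
// queue. If a queue in the chain is already saturated, the accounting is
// rolled back for the affected queues and the requested thread count is
// trimmed (falling back to the serial case if nothing is left). The parallel
// apply then runs on the root queue via _dispatch_apply3(), and the
// accounting is undone afterwards.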
static void
_dispatch_apply_redirect(void *ctxt)
{
	struct dispatch_apply_s *da = ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_queue, rq = dq, tq;

	do {
		uint32_t running = dispatch_atomic_add2o(rq, dq_running, da_width);
		uint32_t width = rq->dq_width;
		if (slowpath(running > width)) {
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply3);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}

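// Public entry point for the function-pointer variant. 'da' lives on this
// thread's stack, which is safe because every path below is synchronous.
// Falls back to the serial case for narrow/serial queues (dq_width <= 2),
// when only one worker thread would be used, or when called from inside
// another dispatch_apply; takes the redirect path when 'dq' has a target
// queue; otherwise runs the parallel driver directly on 'dq'.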
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	struct dispatch_apply_s da;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;
	da.da_queue = NULL;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2) || slowpath(da.da_thr_cnt <= 1) ||
			slowpath(_dispatch_thread_getspecific(dispatch_apply_key))) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
		}
		da.da_queue = dq;
		return dispatch_sync_f(dq, &da, _dispatch_apply_redirect);
	}
	dispatch_atomic_acquire_barrier();
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, &da, _dispatch_apply2);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

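// Example (caller side, illustrative only; 'process_one', 'values' and
// 'count' are hypothetical caller-provided names):
//
//	static void process_one(void *ctxt, size_t i)
//	{
//		double *values = ctxt;
//		values[i] *= 2.0;
//	}
//	...
//	dispatch_apply_f(count, dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), values, process_one);
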
#if DISPATCH_COCOA_COMPAT
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
	Block_release(bb);
}
#endif

void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	struct Block_basic *bb = (void *)work;
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}

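// Strided variants: full chunks of 'stride' iterations are distributed in
// parallel via dispatch_apply(), then the remaining (iterations % stride)
// iterations are handled serially via dispatch_sync() on the same queue.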
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb,
			(void *)bb->Block_invoke);
}

void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	// Apply each full chunk of 'stride' iterations in parallel
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Handle the remainder serially
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}