/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
#include "internal.h"

// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is the stack usage of the
// local thread to be sufficiently far away to avoid cache-line contention with
// the busy 'da_index' variable.
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long	_da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void	(*da_func)(void *, size_t);
	void	*da_ctxt;
	size_t	da_iterations;
	size_t	da_index;
	uint32_t	da_thr_cnt;
	dispatch_semaphore_t	da_sema;
	long	_da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};
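// A minimal sketch of the intent above (illustrative, not part of the
// original file): the pads keep 'da_index', which every worker hammers with
// atomic increments, at least one cache line away from whatever the caller
// has on its stack around this struct. Assuming <stddef.h>, the layout could
// be checked at compile time like so:
//
//	_Static_assert(offsetof(struct dispatch_apply_s, da_func) >=
//			DISPATCH_CACHELINE_SIZE, "hot fields begin past one cache line");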
static void
_dispatch_apply2(void *_ctxt)
{
	struct dispatch_apply_s *da = _ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Striding is the responsibility of the caller.
	while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
		func(ctxt, idx);
		_dispatch_workitem_inc();
	}

	if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
		dispatch_semaphore_signal(da->da_sema);
	}
}
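// The loop above is a self-balancing drain: each worker claims the next
// unclaimed index and runs it, so faster threads naturally absorb more
// iterations. A minimal standalone sketch in C11 atomics (illustrative;
// 'work' and 'n' are hypothetical):
//
//	_Atomic size_t next = 0;
//	// body run concurrently by each worker thread:
//	for (size_t i; (i = atomic_fetch_add(&next, 1)) < n;) {
//		work(i);
//	}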
static void
_dispatch_apply_serial(void *context)
{
	struct dispatch_apply_s *da = context;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		da->da_func(da->da_ctxt, idx);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}
#ifdef __BLOCKS__
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;

	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif
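// Example usage (illustrative, not part of the original file): run an
// independent per-index computation on a global concurrent queue; the call
// does not return until every iteration has executed.
//
//	dispatch_apply(count, dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(size_t i) {
//		results[i] = compute(i); // 'results' and 'compute' are hypothetical
//	});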
// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256
DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	struct dispatch_apply_s da;
	size_t i;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}

	for (i = 0; i < da.da_thr_cnt; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = _dispatch_apply2;
		da_dc[i].dc_ctxt = &da;
	}

	da.da_sema = _dispatch_get_thread_semaphore();

	// some queues are easy to borrow and some are not
	if (slowpath(dq->do_targetq)) {
		_dispatch_queue_push_list(dq, (void *)&da_dc[0],
				(void *)&da_dc[da.da_thr_cnt - 1]);
	} else {
		dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		// root queues are always concurrent and safe to borrow
		_dispatch_queue_push_list(dq, (void *)&da_dc[1],
				(void *)&da_dc[da.da_thr_cnt - 1]);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		// The first da_dc[] element was explicitly not pushed onto the queue.
		// We need to either call it like so:
		//     da_dc[0].dc_func(da_dc[0].dc_ctxt);
		// Or, given that we know the 'func' and 'ctxt', we can call it directly:
		_dispatch_apply2(&da);
		_dispatch_workitem_inc();
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}

	dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(da.da_sema);
}
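// Example usage of the function-pointer variant above (illustrative;
// 'fill_row', 'rows' and 'n' are hypothetical):
//
//	static void fill_row(void *ctxt, size_t i)
//	{
//		((double *)ctxt)[i] = (double)i;
//	}
//	...
//	dispatch_apply_f(n, dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), rows, fill_row);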
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;

	dispatch_stride_f(offset, stride, iterations, dq, bb, (void *)bb->Block_invoke);
}
DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1; // avoid division by zero below
	}

	// distribute the full strides across the queue...
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// ...then handle the remainder (iterations % stride) serially
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif
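// Worked example of the decomposition above (illustrative): with offset 10,
// stride 4 and iterations 10, dispatch_apply() runs 10/4 == 2 blocks that
// cover indices 10..13 and 14..17, and the dispatch_sync() remainder loop
// starts at i == 10 - (10 % 4) == 8, covering indices 18 and 19. All ten
// indices beginning at 'offset' are visited exactly once.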