libdispatch-84.5.5 — src/apply.c (apple/libdispatch, mirrored via git.saurik.com)
1 /*
2 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20 #include "internal.h"
21
// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is the stack usage of the
// local thread to be sufficiently far away to avoid cache-line contention with
// the busy 'da_index' variable.
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
// Shared state for one dispatch_apply() invocation.  It lives on the issuing
// thread's stack and is referenced concurrently by every worker thread, so
// the hot fields are padded onto their own cache line(s).
struct dispatch_apply_s {
	long _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void (*da_func)(void *, size_t);	// per-iteration callback
	void *da_ctxt;				// caller context passed to da_func
	size_t da_iterations;			// total number of iterations requested
	size_t da_index;			// next unclaimed iteration (atomically incremented)
	uint32_t da_thr_cnt;			// worker threads still running (atomically decremented)
	dispatch_semaphore_t da_sema;		// signaled by the last worker to finish
	long _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};
38
39 static void
40 _dispatch_apply2(void *_ctxt)
41 {
42 struct dispatch_apply_s *da = _ctxt;
43 size_t const iter = da->da_iterations;
44 typeof(da->da_func) const func = da->da_func;
45 void *const ctxt = da->da_ctxt;
46 size_t idx;
47
48 _dispatch_workitem_dec(); // this unit executes many items
49
50 // Striding is the responsibility of the caller.
51 while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
52 func(ctxt, idx);
53 _dispatch_workitem_inc();
54 }
55
56 if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
57 dispatch_semaphore_signal(da->da_sema);
58 }
59 }
60
61 static void
62 _dispatch_apply_serial(void *context)
63 {
64 struct dispatch_apply_s *da = context;
65 size_t idx = 0;
66
67 _dispatch_workitem_dec(); // this unit executes many items
68 do {
69 da->da_func(da->da_ctxt, idx);
70 _dispatch_workitem_inc();
71 } while (++idx < da->da_iterations);
72 }
73
74 #ifdef __BLOCKS__
75 #if DISPATCH_COCOA_COMPAT
// GC-safe path for dispatch_apply(): heap-copy the block before it is shared
// with other threads (rdar://problem/7455071), then drop our reference once
// the synchronous apply has completed.
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)_dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
	Block_release(bb);
}
84 #endif
85
// Block-based public entry point: peel open the block layout to obtain its
// invoke pointer and funnel into the function-based implementation.
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	// Non-GC: dispatch_apply_f is synchronous, so the stack block stays
	// valid for the whole call and can be invoked in place without a copy.
	struct Block_basic *bb = (void *)work;
	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
99 #endif
100
101 // 256 threads should be good enough for the short to mid term
102 #define DISPATCH_APPLY_MAX_CPUS 256
103
// Synchronously invokes func(ctxt, i) for i in [0, iterations), distributing
// the work across up to one worker per active CPU on concurrent queues.
// Returns only after every iteration has run.
DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	// One continuation per worker, allocated on this stack; all of them
	// point at the same shared dispatch_apply_s state below.
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	struct dispatch_apply_s da;
	size_t i;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	// Start with one worker per active CPU, then clamp below.
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		// Nothing to do — also guarantees workers always find work.
		return;
	}
	if (iterations < da.da_thr_cnt) {
		// Never spin up more workers than there are iterations.
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
		// Queue too narrow for useful parallelism, or only one worker:
		// run every iteration inline on the target queue.
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}

	// Chain the continuations into a singly linked list, each invoking
	// _dispatch_apply2 on the shared state.  (The last element's do_next
	// points one past the used slice; only [0, da_thr_cnt) is pushed.)
	for (i = 0; i < da.da_thr_cnt; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = _dispatch_apply2;
		da_dc[i].dc_ctxt = &da;
	}

	da.da_sema = _dispatch_get_thread_semaphore();

	// some queues are easy to borrow and some are not
	if (slowpath(dq->do_targetq)) {
		// Queue has a target queue: push all workers; this thread only waits.
		_dispatch_queue_push_list(dq, (void *)&da_dc[0], (void *)&da_dc[da.da_thr_cnt - 1]);
	} else {
		dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		// root queues are always concurrent and safe to borrow
		_dispatch_queue_push_list(dq, (void *)&da_dc[1], (void *)&da_dc[da.da_thr_cnt - 1]);
		// Temporarily masquerade as the target queue while this thread
		// acts as worker #0, then restore the previous TSD value.
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		// The first da_dc[] element was explicitly not pushed on to the queue.
		// We need to either call it like so:
		// da_dc[0].dc_func(da_dc[0].dc_ctxt);
		// Or, given that we know the 'func' and 'ctxt', we can call it directly:
		_dispatch_apply2(&da);
		_dispatch_workitem_inc();
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}
	// Block until the last worker signals, then recycle the cached
	// per-thread semaphore.
	dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(da.da_sema);
}
161
162 #if 0
163 #ifdef __BLOCKS__
// Block-based strided apply.  NOTE: this entire section is compiled out by
// the surrounding '#if 0' — an unshipped prototype kept for reference.
void
dispatch_stride(size_t offset, size_t stride, size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	// Extract the block's invoke pointer and forward to the f-variant.
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb, (void *)bb->Block_invoke);
}
170 #endif
171
172 DISPATCH_NOINLINE
173 void
174 dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
175 dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
176 {
177 if (stride == 0) {
178 stride = 1;
179 }
180 dispatch_apply(iterations / stride, queue, ^(size_t idx) {
181 size_t i = idx * stride + offset;
182 size_t stop = i + stride;
183 do {
184 func(ctxt, i++);
185 } while (i < stop);
186 });
187
188 dispatch_sync(queue, ^{
189 size_t i;
190 for (i = iterations - (iterations % stride); i < iterations; i++) {
191 func(ctxt, i + offset);
192 }
193 });
194 }
195 #endif