// src/apply.c (libdispatch-84.5)
/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

// We'd use __attribute__((aligned(x))), but it does not actually increase the
// alignment of stack variables. All we really need is the stack usage of the
// local thread to be sufficiently far away to avoid cache-line contention with
// the busy 'da_index' variable.
//
// NOTE: 'char' arrays cause GCC to insert buffer overflow detection logic
struct dispatch_apply_s {
	long _da_pad0[DISPATCH_CACHELINE_SIZE / sizeof(long)];
	void (*da_func)(void *, size_t);
	void *da_ctxt;
	size_t da_iterations;
	size_t da_index;
	uint32_t da_thr_cnt;
	dispatch_semaphore_t da_sema;
	long _da_pad1[DISPATCH_CACHELINE_SIZE / sizeof(long)];
};
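
// Illustrative sketch (not part of the original source): the padding arrays
// above are meant to keep the hot 'da_index' word off the cache lines used by
// the caller's stack frame.  One way to sanity-check that the leading pad
// really pushes the payload past a cache line is a negative-array-size
// assertion; the name '_dispatch_apply_pad_check' is hypothetical.
#if 0
#include <stddef.h>
typedef char _dispatch_apply_pad_check[
		(offsetof(struct dispatch_apply_s, da_func) >=
		DISPATCH_CACHELINE_SIZE) ? 1 : -1];
#endif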

static void
_dispatch_apply2(void *_ctxt)
{
	struct dispatch_apply_s *da = _ctxt;
	size_t const iter = da->da_iterations;
	typeof(da->da_func) const func = da->da_func;
	void *const ctxt = da->da_ctxt;
	size_t idx;

	_dispatch_workitem_dec(); // this unit executes many items

	// Striding is the responsibility of the caller.
	while (fastpath((idx = dispatch_atomic_inc(&da->da_index) - 1) < iter)) {
		func(ctxt, idx);
		_dispatch_workitem_inc();
	}

	if (dispatch_atomic_dec(&da->da_thr_cnt) == 0) {
		dispatch_semaphore_signal(da->da_sema);
	}
}
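
// Minimal standalone analogue of the claiming loop above (illustrative only,
// not part of this file): each worker atomically takes the next unclaimed
// index until all iterations have been handed out.  It uses C11 atomics
// instead of the dispatch_atomic_* wrappers; note that atomic_fetch_add()
// returns the old value, so no '- 1' adjustment is needed.
#if 0
#include <stdatomic.h>

static void
example_worker(atomic_size_t *next, size_t iterations,
		void (*func)(void *, size_t), void *ctxt)
{
	size_t idx;

	while ((idx = atomic_fetch_add(next, 1)) < iterations) {
		func(ctxt, idx);
	}
}
#endif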

static void
_dispatch_apply_serial(void *context)
{
	struct dispatch_apply_s *da = context;
	size_t idx = 0;

	_dispatch_workitem_dec(); // this unit executes many items
	do {
		da->da_func(da->da_ctxt, idx);
		_dispatch_workitem_inc();
	} while (++idx < da->da_iterations);
}

#ifdef __BLOCKS__
void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;

	dispatch_apply_f(iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif
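
// Illustrative client-side usage of dispatch_apply() (not part of this file):
// filling an array in parallel, one slot per iteration index.  The function
// name 'example_fill' and its arguments are hypothetical.
#if 0
void
example_fill(long *results, size_t count)
{
	dispatch_queue_t q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

	// Each index is visited exactly once; blocks may run concurrently.
	dispatch_apply(count, q, ^(size_t idx) {
		results[idx] = (long)(idx * idx);
	});
}
#endif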

// 256 threads should be good enough for the short to mid term
#define DISPATCH_APPLY_MAX_CPUS 256

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	struct dispatch_apply_dc_s {
		DISPATCH_CONTINUATION_HEADER(dispatch_apply_dc_s);
	} da_dc[DISPATCH_APPLY_MAX_CPUS];
	struct dispatch_apply_s da;
	size_t i;

	da.da_func = func;
	da.da_ctxt = ctxt;
	da.da_iterations = iterations;
	da.da_index = 0;
	da.da_thr_cnt = _dispatch_hw_config.cc_max_active;

	if (da.da_thr_cnt > DISPATCH_APPLY_MAX_CPUS) {
		da.da_thr_cnt = DISPATCH_APPLY_MAX_CPUS;
	}
	if (slowpath(iterations == 0)) {
		return;
	}
	if (iterations < da.da_thr_cnt) {
		da.da_thr_cnt = (uint32_t)iterations;
	}
	if (slowpath(dq->dq_width <= 2 || da.da_thr_cnt <= 1)) {
		return dispatch_sync_f(dq, &da, _dispatch_apply_serial);
	}

	for (i = 0; i < da.da_thr_cnt; i++) {
		da_dc[i].do_vtable = NULL;
		da_dc[i].do_next = &da_dc[i + 1];
		da_dc[i].dc_func = _dispatch_apply2;
		da_dc[i].dc_ctxt = &da;
	}

	da.da_sema = _dispatch_get_thread_semaphore();

	// some queues are easy to borrow and some are not
	if (slowpath(dq->do_targetq)) {
		_dispatch_queue_push_list(dq, (void *)&da_dc[0], (void *)&da_dc[da.da_thr_cnt - 1]);
	} else {
		dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		// root queues are always concurrent and safe to borrow
		_dispatch_queue_push_list(dq, (void *)&da_dc[1], (void *)&da_dc[da.da_thr_cnt - 1]);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		// The first da_dc[] element was explicitly not pushed on to the queue.
		// We need to either call it like so:
		// da_dc[0].dc_func(da_dc[0].dc_ctxt);
		// Or, given that we know the 'func' and 'ctxt', we can call it directly:
		_dispatch_apply2(&da);
		_dispatch_workitem_inc();
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}
	dispatch_semaphore_wait(da.da_sema, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(da.da_sema);
}
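
// Illustrative client-side usage of dispatch_apply_f() (not part of this
// file): the same parallel loop as the Blocks example above, expressed with a
// plain function pointer and a context struct.  'example_ctxt_s',
// 'example_double_one', and 'example_double_all' are hypothetical names.
#if 0
struct example_ctxt_s {
	const long *input;
	long *output;
};

static void
example_double_one(void *ctxt, size_t idx)
{
	struct example_ctxt_s *ec = ctxt;

	ec->output[idx] = ec->input[idx] * 2;
}

void
example_double_all(struct example_ctxt_s *ec, size_t count)
{
	dispatch_queue_t q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);

	dispatch_apply_f(count, q, ec, example_double_one);
}
#endif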

#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
	struct Block_basic *bb = (void *)work;
	dispatch_stride_f(offset, stride, iterations, dq, bb, (void *)bb->Block_invoke);
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif
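
// Since the striding entry points above are compiled out, a caller wanting
// coarser-grained work items can stride inside the dispatch_apply() block
// itself.  Illustrative sketch only (not part of this file); 'example_strided'
// and its arguments are hypothetical, stride is assumed to be non-zero, and
// the trailing loop handles the case where iterations is not a multiple of
// stride.
#if 0
void
example_strided(long *data, size_t iterations, size_t stride, dispatch_queue_t q)
{
	size_t i;

	// Each block invocation handles one chunk of 'stride' consecutive indices.
	dispatch_apply(iterations / stride, q, ^(size_t chunk) {
		size_t j = chunk * stride;
		size_t stop = j + stride;
		do {
			data[j] = (long)j;
		} while (++j < stop);
	});

	// Finish the leftover iterations (iterations % stride) serially.
	for (i = iterations - (iterations % stride); i < iterations; i++) {
		data[i] = (long)i;
	}
}
#endif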