/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

DISPATCH_WEAK // rdar://problem/8503746
long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema);

#pragma mark -
#pragma mark dispatch_semaphore_class_t

static void
_dispatch_semaphore_class_init(long value, dispatch_semaphore_class_t dsemau)
{
	struct dispatch_semaphore_header_s *dsema = dsemau._dsema_hdr;

	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
	dsema->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
	dsema->dsema_value = value;
	_os_semaphore_init(&dsema->dsema_sema, _OS_SEM_POLICY_FIFO);
}

#pragma mark -
#pragma mark dispatch_semaphore_t

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;

	// If the internal value is negative, then the absolute value is equal
	// to the number of waiting threads. Therefore it is bogus to initialize
	// the semaphore with a negative value.
	if (value < 0) {
		return DISPATCH_BAD_INPUT;
	}

	dsema = (dispatch_semaphore_t)_dispatch_alloc(DISPATCH_VTABLE(semaphore),
			sizeof(struct dispatch_semaphore_s));
	_dispatch_semaphore_class_init(value, dsema);
	dsema->dsema_orig = value;
	return dsema;
}
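
/*
 * Illustrative usage sketch (not part of this file): because a negative
 * initial value is rejected above (DISPATCH_BAD_INPUT, i.e. no usable
 * object), a counting semaphore guarding a pool of N resources is created
 * with the public API as follows. The pool size below is an arbitrary
 * example.
 *
 *	#include <dispatch/dispatch.h>
 *
 *	dispatch_semaphore_t pool_sema = dispatch_semaphore_create(4);
 *	dispatch_semaphore_wait(pool_sema, DISPATCH_TIME_FOREVER); // acquire a slot
 *	// ... use one of the 4 pooled resources ...
 *	dispatch_semaphore_signal(pool_sema);                      // release the slot
 */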

void
_dispatch_semaphore_dispose(dispatch_object_t dou)
{
	dispatch_semaphore_t dsema = dou._dsema;

	if (dsema->dsema_value < dsema->dsema_orig) {
		DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
				"Semaphore object deallocated while in use");
	}

	_os_semaphore_dispose(&dsema->dsema_sema);
}

size_t
_dispatch_semaphore_debug(dispatch_object_t dou, char *buf, size_t bufsiz)
{
	dispatch_semaphore_t dsema = dou._dsema;

	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dsema), dsema);
	offset += _dispatch_object_debug_attr(dsema, &buf[offset], bufsiz - offset);
#if USE_MACH_SEM
	offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ",
			dsema->dsema_sema);
#endif
	offset += dsnprintf(&buf[offset], bufsiz - offset,
			"value = %ld, orig = %ld }", dsema->dsema_value, dsema->dsema_orig);
	return offset;
}

DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
	_os_semaphore_create(&dsema->dsema_sema, _OS_SEM_POLICY_FIFO);
	_os_semaphore_signal(&dsema->dsema_sema, 1);
	return 1;
}

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
	long value = os_atomic_inc2o(dsema, dsema_value, release);
	if (fastpath(value > 0)) {
		return 0;
	}
	if (slowpath(value == LONG_MIN)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_semaphore_signal()");
	}
	return _dispatch_semaphore_signal_slow(dsema);
}
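
/*
 * Illustrative usage sketch (not part of this file): the fast path above
 * returns 0 when no thread needs to be woken; otherwise the slow path posts
 * the underlying OS semaphore and returns nonzero. A common producer-side
 * use is signaling completion from an asynchronous block; the work inside
 * the block is hypothetical.
 *
 *	dispatch_semaphore_t done = dispatch_semaphore_create(0);
 *	dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^{
 *		// ... produce a result ...
 *		dispatch_semaphore_signal(done); // wakes the waiter if one is blocked
 *	});
 *	dispatch_semaphore_wait(done, DISPATCH_TIME_FOREVER);
 */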

DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	long orig;

	_os_semaphore_create(&dsema->dsema_sema, _OS_SEM_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_os_semaphore_timedwait(&dsema->dsema_sema, timeout)) {
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		orig = dsema->dsema_value;
		while (orig < 0) {
			if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
					&orig, relaxed)) {
				return _OS_SEM_TIMEOUT();
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_os_semaphore_wait(&dsema->dsema_sema);
		break;
	}
	return 0;
}

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
	if (fastpath(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}
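
/*
 * Illustrative usage sketch (not part of this file): a timed-out wait maps
 * to a nonzero return from the slow path above, and the speculative
 * decrement of dsema_value is undone. The semaphore and producer below are
 * hypothetical; something else is expected to signal `work_done`.
 *
 *	dispatch_semaphore_t work_done = dispatch_semaphore_create(0);
 *	// ... hand `work_done` to a producer that will signal it ...
 *	dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
 *	if (dispatch_semaphore_wait(work_done, deadline) != 0) {
 *		// timed out: no wakeup was consumed
 *	} else {
 *		// a matching dispatch_semaphore_signal() was consumed
 *	}
 */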

#pragma mark -
#pragma mark dispatch_group_t

DISPATCH_ALWAYS_INLINE
static inline dispatch_group_t
_dispatch_group_create_with_count(long count)
{
	dispatch_group_t dg = (dispatch_group_t)_dispatch_alloc(
			DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s));
	_dispatch_semaphore_class_init(count, dg);
	if (count) {
		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://problem/22318411>
	}
	return dg;
}

dispatch_group_t
dispatch_group_create(void)
{
	return _dispatch_group_create_with_count(0);
}

dispatch_group_t
_dispatch_group_create_and_enter(void)
{
	return _dispatch_group_create_with_count(1);
}

void
dispatch_group_enter(dispatch_group_t dg)
{
	long value = os_atomic_inc_orig2o(dg, dg_value, acquire);
	if (slowpath((unsigned long)value >= (unsigned long)LONG_MAX)) {
		DISPATCH_CLIENT_CRASH(value,
				"Too many nested calls to dispatch_group_enter()");
	}
	if (value == 0) {
		_dispatch_retain(dg); // <rdar://problem/22318411>
	}
}

DISPATCH_NOINLINE
static long
_dispatch_group_wake(dispatch_group_t dg, bool needs_release)
{
	dispatch_continuation_t next, head, tail = NULL;
	long rval;

	// cannot use os_mpsc_capture_snapshot() because we can have concurrent
	// _dispatch_group_wake() calls
	head = os_atomic_xchg2o(dg, dg_notify_head, NULL, relaxed);
	if (head) {
		// snapshot before anything is notified/woken <rdar://problem/8554546>
		tail = os_atomic_xchg2o(dg, dg_notify_tail, NULL, release);
	}
	rval = (long)os_atomic_xchg2o(dg, dg_waiters, 0, relaxed);
	if (rval) {
		// wake group waiters
		_os_semaphore_create(&dg->dg_sema, _OS_SEM_POLICY_FIFO);
		_os_semaphore_signal(&dg->dg_sema, rval);
	}
	if (head) {
		// async group notify blocks
		do {
			next = os_mpsc_pop_snapshot_head(head, tail, do_next);
			dispatch_queue_t dsn_queue = (dispatch_queue_t)head->dc_data;
			_dispatch_continuation_async(dsn_queue, head);
			_dispatch_release(dsn_queue);
		} while ((head = next));
		_dispatch_release(dg);
	}
	if (needs_release) {
		_dispatch_release(dg); // <rdar://problem/22318411>
	}
	return 0;
}

void
dispatch_group_leave(dispatch_group_t dg)
{
	long value = os_atomic_dec2o(dg, dg_value, release);
	if (slowpath(value == 0)) {
		return (void)_dispatch_group_wake(dg, true);
	}
	if (slowpath(value < 0)) {
		DISPATCH_CLIENT_CRASH(value,
				"Unbalanced call to dispatch_group_leave()");
	}
}
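
/*
 * Illustrative usage sketch (not part of this file): every
 * dispatch_group_enter() must be balanced by dispatch_group_leave(), and
 * the leave that drops dg_value back to zero runs _dispatch_group_wake()
 * above. A manual enter/leave pair brackets work that is not submitted with
 * dispatch_group_async(); start_async_request() below is hypothetical.
 *
 *	dispatch_group_t dg = dispatch_group_create();
 *	dispatch_group_enter(dg);
 *	start_async_request(^{
 *		// ... handle the reply ...
 *		dispatch_group_leave(dg); // last leave wakes waiters and notify blocks
 *	});
 */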

void
_dispatch_group_dispose(dispatch_object_t dou)
{
	dispatch_group_t dg = dou._dg;

	if (dg->dg_value) {
		DISPATCH_CLIENT_CRASH(dg->dg_value,
				"Group object deallocated while in use");
	}

	_os_semaphore_dispose(&dg->dg_sema);
}

size_t
_dispatch_group_debug(dispatch_object_t dou, char *buf, size_t bufsiz)
{
	dispatch_group_t dg = dou._dg;

	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dg), dg);
	offset += _dispatch_object_debug_attr(dg, &buf[offset], bufsiz - offset);
#if USE_MACH_SEM
	offset += dsnprintf(&buf[offset], bufsiz - offset, "port = 0x%u, ",
			dg->dg_sema);
#endif
	offset += dsnprintf(&buf[offset], bufsiz - offset,
			"count = %ld, waiters = %d }", dg->dg_value, dg->dg_waiters);
	return offset;
}

DISPATCH_NOINLINE
static long
_dispatch_group_wait_slow(dispatch_group_t dg, dispatch_time_t timeout)
{
	long value;
	int orig_waiters;

	// check before we cause another signal to be sent by incrementing
	// dg->dg_waiters
	value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
	if (value == 0) {
		return _dispatch_group_wake(dg, false);
	}

	(void)os_atomic_inc2o(dg, dg_waiters, relaxed);
	// check the values again in case we need to wake any threads
	value = os_atomic_load2o(dg, dg_value, ordered); // 19296565
	if (value == 0) {
		_dispatch_group_wake(dg, false);
		// Fall through to consume the extra signal, forcing timeout to avoid
		// useless setups as it won't block
		timeout = DISPATCH_TIME_FOREVER;
	}

	_os_semaphore_create(&dg->dg_sema, _OS_SEM_POLICY_FIFO);
	switch (timeout) {
	default:
		if (!_os_semaphore_timedwait(&dg->dg_sema, timeout)) {
			break;
		}
		// Fall through and try to undo the earlier change to
		// dg->dg_waiters
	case DISPATCH_TIME_NOW:
		orig_waiters = dg->dg_waiters;
		while (orig_waiters) {
			if (os_atomic_cmpxchgvw2o(dg, dg_waiters, orig_waiters,
					orig_waiters - 1, &orig_waiters, relaxed)) {
				return _OS_SEM_TIMEOUT();
			}
		}
		// Another thread is running _dispatch_group_wake()
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		_os_semaphore_wait(&dg->dg_sema);
		break;
	}
	return 0;
}

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
	if (dg->dg_value == 0) {
		return 0;
	}
	if (timeout == 0) {
		return _OS_SEM_TIMEOUT();
	}
	return _dispatch_group_wait_slow(dg, timeout);
}
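
/*
 * Illustrative usage sketch (not part of this file): dispatch_group_wait()
 * returns 0 once the group is empty and nonzero on timeout, mirroring
 * dispatch_semaphore_wait() above. `dg` is the hypothetical group from the
 * previous sketch.
 *
 *	dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC);
 *	if (dispatch_group_wait(dg, deadline) != 0) {
 *		// timed out: some dispatch_group_enter() has not been balanced yet
 *	}
 */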

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dsn)
{
	dsn->dc_data = dq;
	dsn->do_next = NULL;
	_dispatch_retain(dq);
	if (os_mpsc_push_update_tail(dg, dg_notify, dsn, do_next)) {
		_dispatch_retain(dg);
		os_atomic_store2o(dg, dg_notify_head, dsn, ordered);
		// seq_cst with atomic store to notify_head <rdar://problem/11750916>
		if (os_atomic_load2o(dg, dg_value, ordered) == 0) {
			_dispatch_group_wake(dg, false);
		}
	}
}

DISPATCH_NOINLINE
void
dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dsn = _dispatch_continuation_alloc();
	_dispatch_continuation_init_f(dsn, dq, ctxt, func, 0, 0,
			DISPATCH_OBJ_CONSUME_BIT);
	_dispatch_group_notify(dg, dq, dsn);
}

#ifdef __BLOCKS__
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_continuation_t dsn = _dispatch_continuation_alloc();
	_dispatch_continuation_init(dsn, dq, db, 0, 0, DISPATCH_OBJ_CONSUME_BIT);
	_dispatch_group_notify(dg, dq, dsn);
}
#endif
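
/*
 * Illustrative usage sketch (not part of this file): a notify continuation
 * is pushed onto the group's notify list and submitted to the target queue
 * once the group becomes empty, either immediately above (if dg_value is
 * already 0) or later from _dispatch_group_wake(). `dg` and
 * update_ui_with_results() below are hypothetical.
 *
 *	dispatch_group_notify(dg, dispatch_get_main_queue(), ^{
 *		// runs after every dispatch_group_enter() has been balanced
 *		update_ui_with_results();
 *	});
 */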