[apple/libdispatch.git] / src / semaphore.c (libdispatch-84.5.5)
/*
 * Copyright (c) 2008-2009 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

// semaphores are too fundamental to use the dispatch_assume*() macros
#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
		if (x) { \
			DISPATCH_CRASH("flawed group/semaphore logic"); \
		} \
	} while (0)

struct dispatch_semaphore_vtable_s {
	DISPATCH_VTABLE_HEADER(dispatch_semaphore_s);
};

static void _dispatch_semaphore_dispose(dispatch_semaphore_t dsema);
static size_t _dispatch_semaphore_debug(dispatch_semaphore_t dsema, char *buf,
		size_t bufsiz);
static long _dispatch_group_wake(dispatch_semaphore_t dsema);

const struct dispatch_semaphore_vtable_s _dispatch_semaphore_vtable = {
	.do_type = DISPATCH_SEMAPHORE_TYPE,
	.do_kind = "semaphore",
	.do_dispose = _dispatch_semaphore_dispose,
	.do_debug = _dispatch_semaphore_debug,
};

dispatch_semaphore_t
_dispatch_get_thread_semaphore(void)
{
	dispatch_semaphore_t dsema;

	dsema = fastpath(_dispatch_thread_getspecific(dispatch_sema4_key));
	if (!dsema) {
		while (!(dsema = dispatch_semaphore_create(0))) {
			sleep(1);
		}
	}
	_dispatch_thread_setspecific(dispatch_sema4_key, NULL);
	return dsema;
}

void
_dispatch_put_thread_semaphore(dispatch_semaphore_t dsema)
{
	dispatch_semaphore_t old_sema =
			_dispatch_thread_getspecific(dispatch_sema4_key);
	_dispatch_thread_setspecific(dispatch_sema4_key, dsema);
	if (old_sema) {
		dispatch_release(old_sema);
	}
}
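
/*
 * Illustrative sketch (not part of the original file): internal callers use
 * the two functions above to borrow a zero-valued semaphore for a transient
 * wait and then return it to the per-thread cache, avoiding an allocation
 * per wait. The caller `_example_transient_wait` is hypothetical.
 */
#if 0
static void
_example_transient_wait(void)
{
	dispatch_semaphore_t s = _dispatch_get_thread_semaphore();
	// ... publish `s` somewhere another thread can signal it ...
	dispatch_semaphore_wait(s, DISPATCH_TIME_FOREVER);
	_dispatch_put_thread_semaphore(s);
}
#endif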

dispatch_group_t
dispatch_group_create(void)
{
	return (dispatch_group_t)dispatch_semaphore_create(LONG_MAX);
}

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
	dispatch_semaphore_t dsema;

	// If the internal value is negative, then its absolute value is
	// equal to the number of waiting threads. Therefore it is bogus to
	// initialize the semaphore with a negative value.
	if (value < 0) {
		return NULL;
	}

	dsema = calloc(1, sizeof(struct dispatch_semaphore_s));

	if (fastpath(dsema)) {
		dsema->do_vtable = &_dispatch_semaphore_vtable;
		dsema->do_next = DISPATCH_OBJECT_LISTLESS;
		dsema->do_ref_cnt = 1;
		dsema->do_xref_cnt = 1;
		dsema->do_targetq = dispatch_get_global_queue(0, 0);
		dsema->dsema_value = value;
		dsema->dsema_orig = value;
	}

	return dsema;
}
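
/*
 * Usage sketch (client code, not part of the original file; assumes Blocks
 * support): a semaphore created with value 0 acts as a one-shot completion
 * signal between threads.
 */
#if 0
static void
example_completion_signal(void)
{
	dispatch_semaphore_t sema = dispatch_semaphore_create(0);
	dispatch_async(dispatch_get_global_queue(0, 0), ^{
		// ... produce a result ...
		dispatch_semaphore_signal(sema);
	});
	dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
	dispatch_release(sema);
}
#endif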

static void
_dispatch_semaphore_create_port(semaphore_t *s4)
{
	kern_return_t kr;
	semaphore_t tmp;

	if (*s4) {
		return;
	}

	// lazily allocate the semaphore port

	// Someday:
	// 1) Switch to a doubly-linked FIFO in user-space.
	// 2) User-space timers for the timeout.
	// 3) Use the per-thread semaphore port.

	while (dispatch_assume_zero(kr = semaphore_create(mach_task_self(),
			&tmp, SYNC_POLICY_FIFO, 0))) {
		DISPATCH_VERIFY_MIG(kr);
		sleep(1);
	}

	if (!dispatch_atomic_cmpxchg(s4, 0, tmp)) {
		kr = semaphore_destroy(mach_task_self(), tmp);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}

	_dispatch_safe_fork = false;
}

DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
		dispatch_time_t timeout)
{
	mach_timespec_t _timeout;
	kern_return_t kr;
	uint64_t nsec;
	long orig;

again:
	// Mach semaphores appear to sometimes spuriously wake up. Therefore,
	// we keep a parallel count of the number of times a Mach semaphore is
	// signaled.
	while ((orig = dsema->dsema_sent_ksignals)) {
		if (dispatch_atomic_cmpxchg(&dsema->dsema_sent_ksignals, orig,
				orig - 1)) {
			return 0;
		}
	}

	_dispatch_semaphore_create_port(&dsema->dsema_port);

	// From xnu/osfmk/kern/sync_sema.c:
	// wait_semaphore->count = -1;  /* we don't keep an actual count */
	//
	// The code above does not match the documentation, and that fact is
	// not surprising. The documented semantics are clumsy to use in any
	// practical way. The above hack effectively tricks the rest of the
	// Mach semaphore logic to behave like the libdispatch algorithm.

	switch (timeout) {
	default:
		do {
			// _dispatch_timeout() already calculates the relative time left
			nsec = _dispatch_timeout(timeout);
			_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
			_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
			kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout));
		} while (kr == KERN_ABORTED);

		if (kr != KERN_OPERATION_TIMED_OUT) {
			DISPATCH_SEMAPHORE_VERIFY_KR(kr);
			break;
		}
		// Fall through and try to undo what the fast path did to
		// dsema->dsema_value
	case DISPATCH_TIME_NOW:
		while ((orig = dsema->dsema_value) < 0) {
			if (dispatch_atomic_cmpxchg(&dsema->dsema_value, orig, orig + 1)) {
				return KERN_OPERATION_TIMED_OUT;
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		do {
			kr = semaphore_wait(dsema->dsema_port);
		} while (kr == KERN_ABORTED);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
		break;
	}

	goto again;
}
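
/*
 * Usage sketch (client code, not part of the original file): per the slow
 * path above, dispatch_semaphore_wait() returns 0 on success and a non-zero
 * value (KERN_OPERATION_TIMED_OUT) if the timeout elapses first.
 */
#if 0
static bool
example_timed_wait(dispatch_semaphore_t sema)
{
	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC);
	return dispatch_semaphore_wait(sema, when) == 0; // false => timed out
}
#endif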

DISPATCH_NOINLINE
void
dispatch_group_enter(dispatch_group_t dg)
{
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
#if defined(__OPTIMIZE__) && defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
	// This assumes:
	// 1) Way too much about the optimizer of GCC.
	// 2) There will never be more than LONG_MAX threads.
	//    Therefore: no overflow detection
	//
	// Atomically decrement the value; if it stays non-negative, zero
	// %eax and "ret" directly out of this function (which is why it
	// must not be inlined). A negative result takes the "js" branch
	// and falls through to the slow path below.
	asm(
#ifdef __LP64__
	    "lock decq %0\n\t"
#else
	    "lock decl %0\n\t"
#endif
	    "js 1f\n\t"
	    "xor %%eax, %%eax\n\t"
	    "ret\n\t"
	    "1:"
	    : "+m" (dsema->dsema_value)
	    :
	    : "cc"
	);
	_dispatch_semaphore_wait_slow(dsema, DISPATCH_TIME_FOREVER);
#else
	dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
#endif
}

DISPATCH_NOINLINE
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
#if defined(__OPTIMIZE__) && defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
	// This assumes:
	// 1) Way too much about the optimizer of GCC.
	// 2) There will never be more than LONG_MAX threads.
	//    Therefore: no overflow detection
	//
	// Atomically decrement the value; if it stays non-negative, return
	// zero immediately via "ret" (which is why this function must not
	// be inlined). A negative result falls through to the slow path.
	asm(
#ifdef __LP64__
	    "lock decq %0\n\t"
#else
	    "lock decl %0\n\t"
#endif
	    "js 1f\n\t"
	    "xor %%eax, %%eax\n\t"
	    "ret\n\t"
	    "1:"
	    : "+m" (dsema->dsema_value)
	    :
	    : "cc"
	);
#else
	if (dispatch_atomic_dec(&dsema->dsema_value) >= 0) {
		return 0;
	}
#endif
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}

DISPATCH_NOINLINE
static long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
	kern_return_t kr;

	_dispatch_semaphore_create_port(&dsema->dsema_port);

	// Before dsema_sent_ksignals is incremented we can rely on the reference
	// held by the waiter. However, once this value is incremented the waiter
	// may return between the atomic increment and the semaphore_signal(),
	// therefore an explicit reference must be held in order to safely access
	// dsema after the atomic increment.
	_dispatch_retain(dsema);

	dispatch_atomic_inc(&dsema->dsema_sent_ksignals);

	kr = semaphore_signal(dsema->dsema_port);
	DISPATCH_SEMAPHORE_VERIFY_KR(kr);

	_dispatch_release(dsema);

	return 1;
}

void
dispatch_group_leave(dispatch_group_t dg)
{
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

	dispatch_semaphore_signal(dsema);

	if (dsema->dsema_value == dsema->dsema_orig) {
		_dispatch_group_wake(dsema);
	}
}
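
/*
 * Usage sketch (client code, not part of the original file; assumes Blocks
 * support): explicit enter/leave brackets work that is not submitted via
 * dispatch_group_async(), e.g. a callback-based API.
 * `start_async_request` is hypothetical.
 */
#if 0
static void
example_manual_group(dispatch_group_t group)
{
	dispatch_group_enter(group);
	start_async_request(^{
		// ... consume the result ...
		dispatch_group_leave(group);
	});
}
#endif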

DISPATCH_NOINLINE
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
#if defined(__OPTIMIZE__) && defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
	// overflow detection
	// this assumes way too much about the optimizer of GCC
	//
	// Atomically increment the value. On signed overflow, "int $4"
	// raises an overflow trap. If the result is positive (no waiters),
	// return zero directly via "ret"; a result <= 0 means waiters are
	// blocked, so fall through to the slow path to wake one.
	asm(
#ifdef __LP64__
	    "lock incq %0\n\t"
#else
	    "lock incl %0\n\t"
#endif
	    "jo 1f\n\t"
	    "jle 2f\n\t"
	    "xor %%eax, %%eax\n\t"
	    "ret\n\t"
	    "1:\n\t"
	    "int $4\n\t"
	    "2:"
	    : "+m" (dsema->dsema_value)
	    :
	    : "cc"
	);
#else
	if (dispatch_atomic_inc(&dsema->dsema_value) > 0) {
		return 0;
	}
#endif
	return _dispatch_semaphore_signal_slow(dsema);
}
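
/*
 * Usage sketch (client code, not part of the original file; assumes Blocks
 * support): a semaphore initialized to N bounds concurrency; each worker
 * waits for a slot before starting and signals when done, so at most four
 * blocks run at once here.
 */
#if 0
static void
example_bounded_concurrency(void)
{
	dispatch_semaphore_t slots = dispatch_semaphore_create(4);
	for (int i = 0; i < 16; i++) {
		dispatch_semaphore_wait(slots, DISPATCH_TIME_FOREVER);
		dispatch_async(dispatch_get_global_queue(0, 0), ^{
			// ... unit of work ...
			dispatch_semaphore_signal(slots);
		});
	}
}
#endif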

DISPATCH_NOINLINE
static long
_dispatch_group_wake(dispatch_semaphore_t dsema)
{
	struct dispatch_sema_notify_s *tmp,
			*head = dispatch_atomic_xchg(&dsema->dsema_notify_head, NULL);
	long rval = dispatch_atomic_xchg(&dsema->dsema_group_waiters, 0);
	bool do_rel = head;
	long kr;

	// wake any "group" waiter or notify blocks

	if (rval) {
		_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
		do {
			kr = semaphore_signal(dsema->dsema_waiter_port);
			DISPATCH_SEMAPHORE_VERIFY_KR(kr);
		} while (--rval);
	}
	while (head) {
		dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);
		_dispatch_release(head->dsn_queue);
		do {
			tmp = head->dsn_next;
		} while (!tmp && !dispatch_atomic_cmpxchg(&dsema->dsema_notify_tail,
				head, NULL));
		free(head);
		head = tmp;
	}
	if (do_rel) {
		_dispatch_release(dsema);
	}
	return 0;
}

DISPATCH_NOINLINE
static long
_dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
	mach_timespec_t _timeout;
	kern_return_t kr;
	uint64_t nsec;
	long orig;

again:
	// Check before we cause another signal to be sent by incrementing
	// dsema->dsema_group_waiters.
	if (dsema->dsema_value == dsema->dsema_orig) {
		return _dispatch_group_wake(dsema);
	}
	// Mach semaphores appear to sometimes spuriously wake up. Therefore,
	// we keep a parallel count of the number of times a Mach semaphore is
	// signaled.
	dispatch_atomic_inc(&dsema->dsema_group_waiters);
	// Check the values again in case we need to wake any threads.
	if (dsema->dsema_value == dsema->dsema_orig) {
		return _dispatch_group_wake(dsema);
	}

	_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);

	// From xnu/osfmk/kern/sync_sema.c:
	// wait_semaphore->count = -1;  /* we don't keep an actual count */
	//
	// The code above does not match the documentation, and that fact is
	// not surprising. The documented semantics are clumsy to use in any
	// practical way. The above hack effectively tricks the rest of the
	// Mach semaphore logic to behave like the libdispatch algorithm.

	switch (timeout) {
	default:
		do {
			nsec = _dispatch_timeout(timeout);
			_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
			_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
			kr = slowpath(semaphore_timedwait(dsema->dsema_waiter_port,
					_timeout));
		} while (kr == KERN_ABORTED);
		if (kr != KERN_OPERATION_TIMED_OUT) {
			DISPATCH_SEMAPHORE_VERIFY_KR(kr);
			break;
		}
		// Fall through and try to undo the earlier change to
		// dsema->dsema_group_waiters
	case DISPATCH_TIME_NOW:
		while ((orig = dsema->dsema_group_waiters)) {
			if (dispatch_atomic_cmpxchg(&dsema->dsema_group_waiters, orig,
					orig - 1)) {
				return KERN_OPERATION_TIMED_OUT;
			}
		}
		// Another thread called semaphore_signal().
		// Fall through and drain the wakeup.
	case DISPATCH_TIME_FOREVER:
		do {
			kr = semaphore_wait(dsema->dsema_waiter_port);
		} while (kr == KERN_ABORTED);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
		break;
	}

	goto again;
}

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;

	if (dsema->dsema_value == dsema->dsema_orig) {
		return 0;
	}
	if (timeout == 0) {
		return KERN_OPERATION_TIMED_OUT;
	}
	return _dispatch_group_wait_slow(dsema, timeout);
}
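
/*
 * Usage sketch (client code, not part of the original file): as above,
 * dispatch_group_wait() returns 0 once every enter has been balanced by a
 * leave, and non-zero if the timeout expires while work is still pending.
 */
#if 0
static bool
example_group_finished(dispatch_group_t group)
{
	dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC);
	return dispatch_group_wait(group, when) == 0; // false => still running
}
#endif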

#ifdef __BLOCKS__
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_notify_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
#endif

void
dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *))
{
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
	struct dispatch_sema_notify_s *dsn, *prev;

	// FIXME -- this should be updated to use the continuation cache
	while (!(dsn = malloc(sizeof(*dsn)))) {
		sleep(1);
	}

	dsn->dsn_next = NULL;
	dsn->dsn_queue = dq;
	dsn->dsn_ctxt = ctxt;
	dsn->dsn_func = func;
	_dispatch_retain(dq);

	prev = dispatch_atomic_xchg(&dsema->dsema_notify_tail, dsn);
	if (fastpath(prev)) {
		prev->dsn_next = dsn;
	} else {
		_dispatch_retain(dg);
		dsema->dsema_notify_head = dsn;
		if (dsema->dsema_value == dsema->dsema_orig) {
			_dispatch_group_wake(dsema);
		}
	}
}
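
/*
 * Usage sketch (client code, not part of the original file): the
 * function-pointer variant above mirrors dispatch_group_notify();
 * `notify_done` is an illustrative callback.
 */
#if 0
static void
notify_done(void *ctxt)
{
	printf("%s\n", (const char *)ctxt);
}

static void
example_group_notify(dispatch_group_t group)
{
	dispatch_group_notify_f(group, dispatch_get_global_queue(0, 0),
			(void *)"all done", notify_done);
}
#endif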

void
_dispatch_semaphore_dispose(dispatch_semaphore_t dsema)
{
	kern_return_t kr;

	if (dsema->dsema_value < dsema->dsema_orig) {
		DISPATCH_CLIENT_CRASH("Semaphore/group object deallocated while in use");
	}

	if (dsema->dsema_port) {
		kr = semaphore_destroy(mach_task_self(), dsema->dsema_port);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}
	if (dsema->dsema_waiter_port) {
		kr = semaphore_destroy(mach_task_self(), dsema->dsema_waiter_port);
		DISPATCH_SEMAPHORE_VERIFY_KR(kr);
	}

	_dispatch_dispose(dsema);
}

size_t
_dispatch_semaphore_debug(dispatch_semaphore_t dsema, char *buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += snprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dsema), dsema);
	offset += dispatch_object_debug_attr(dsema, &buf[offset], bufsiz - offset);
	offset += snprintf(&buf[offset], bufsiz - offset,
			"port = 0x%x, value = %ld, orig = %ld }",
			dsema->dsema_port, dsema->dsema_value, dsema->dsema_orig);
	return offset;
}

#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
#endif

DISPATCH_NOINLINE
void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *))
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = _dispatch_continuation_alloc_cacheonly() ?:
			_dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT|DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_group = dg;

	_dispatch_queue_push(dq, dc);
}
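
/*
 * Usage sketch (client code, not part of the original file): each
 * dispatch_group_async_f() pairs the dispatch_group_enter() above with a
 * dispatch_group_leave() once the submitted function returns, so the group
 * empties exactly when all eight units of work have run. `do_work` is a
 * hypothetical function of type void (*)(void *).
 */
#if 0
static void
example_fan_out(void)
{
	dispatch_group_t group = dispatch_group_create();
	dispatch_queue_t q = dispatch_get_global_queue(0, 0);
	for (long i = 0; i < 8; i++) {
		dispatch_group_async_f(group, q, (void *)i, do_work);
	}
	dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
	dispatch_release(group);
}
#endif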