/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#if DISPATCH_IO_DEBUG
#define _dispatch_fd_debug(msg, fd, args...) \
	_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
#else
#define _dispatch_fd_debug(msg, fd, args...)
#endif

#if USE_OBJC
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
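// Usage sketch (illustrative only, not an additional API): the wrappers above
// retry the wrapped syscall while it fails with EINTR and route any other
// errno value to the supplied case labels, so call sites typically look like
//
//	int err;
//	_dispatch_io_syscall_switch(err,
//		fstat(fd, &st),
//		default: fd_entry->err = err; break;
//	);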
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
#pragma mark -
#pragma mark dispatch_io_hashtables

#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE 64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
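// Both global tables below are indexed by masking the low bits of the key
// (an fd or a dev_t), which is why DIO_HASH_SIZE must remain a power of two:
// (DIO_HASH_SIZE - 1) is then a valid bucket mask.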
// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}
static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}
#pragma mark -
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};
static struct dispatch_io_defaults_s {
	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_pages = DIO_MAX_CHUNK_PAGES,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
#pragma mark -
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}
static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("cleanup handler invoke", -1);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}
void
_dispatch_io_dispose(dispatch_io_t channel)
{
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}
static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}
static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark -
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_fd_debug("io create", fd);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
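// Caller-side sketch (illustrative only; `q` is any queue the caller owns):
//
//	dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0);
//	dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
//			^(int error) { close(fd); });
//	dispatch_io_read(io, 0, SIZE_MAX, q,
//			^(bool done, dispatch_data_t data, int error) { /* consume */ });
//	dispatch_io_close(io, 0);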
dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(path && *path == '/')) {
		return NULL;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return NULL;
	}
	_dispatch_fd_debug("io create with path %s", -1, path);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
					lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			case ENOENT:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
		);
		channel->err = err;
		if (err) {
			free(path_data);
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
dispatch_io_t
dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_fd_debug("io create with io %p", -1, in_channel);
	dispatch_io_t channel = _dispatch_io_create(type);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			channel->err = err0;
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
			_dispatch_object_debug(channel, "%s", __func__);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
dispatch_io_t
dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_io(type, in_channel, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
#pragma mark -
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set high water", channel->fd);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}
void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set low water", channel->fd);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}
void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set interval", channel->fd);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
	});
}
dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
			!_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_fd_entry_t fd_entry = channel->fd_entry;
		(void)_dispatch_fd_entry_open(fd_entry, channel);
	}
	return channel->fd_actual;
}
#pragma mark -
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_fd_debug("io stop", channel->fd);
	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_fd_debug("io stop cleanup", channel->fd);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_object_debug(channel, "%s", __func__);
					_dispatch_fd_debug("io stop after close cleanup",
							channel->fd);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}
void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_fd_debug("io close", channel->fd);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
						relaxed);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (fd_entry) {
					if (!fd_entry->path_data) {
						channel->fd_entry = NULL;
					}
					_dispatch_fd_entry_release(fd_entry);
				}
			}
			_dispatch_release(channel);
		});
	});
}
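// dispatch_io_barrier (below) works by suspending the channel's barrier queue
// so that no new operations are submitted, waiting for in-flight operations to
// drain via the barrier group, and then running the barrier block with
// dispatch_io_key set so that dispatch_io_get_descriptor() can resolve the fd
// from within the block. The barrier queue is resumed once the block returns.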
void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				_dispatch_object_debug(channel, "%s", __func__);
				_dispatch_thread_setspecific(dispatch_io_key, channel);
				barrier();
				_dispatch_thread_setspecific(dispatch_io_key, NULL);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}
void
dispatch_io_barrier_f(dispatch_io_t channel, void *context,
		dispatch_function_t barrier)
{
	return dispatch_io_barrier(channel, ^{ barrier(context); });
}
void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
void
dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_read(channel, offset, length, queue,
			^(bool done, dispatch_data_t d, int error){
				handler(context, done, d, error);
			});
}
void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
void
dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_write(channel, offset, data, queue,
			^(bool done, dispatch_data_t d, int error){
				handler(context, done, d, error);
			});
}
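// Convenience-API sketch (illustrative only): dispatch_read()/dispatch_write()
// below reuse one hidden DISPATCH_IO_STREAM channel per fd_entry and deliver a
// single result when the operation finishes, e.g.
//
//	dispatch_read(fd, SIZE_MAX, queue, ^(dispatch_data_t data, int error) {
//		// data accumulates everything read before EOF or the first error
//	});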
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
					^(bool done, dispatch_data_t data, int error) {
						if (data) {
							data = dispatch_data_create_concat(deliver_data, data);
							_dispatch_io_data_release(deliver_data);
							deliver_data = data;
						}
						if (done) {
							err = error;
						}
					});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}
void
dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
					^(bool done, dispatch_data_t d, int error) {
						if (done) {
							if (d) {
								_dispatch_io_data_retain(d);
								deliver_data = d;
							}
							err = error;
						}
					});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}
void
dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
#pragma mark -
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_fd_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	int fd = channel->fd;
#endif
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_fd_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);
	return op;
}
void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	_dispatch_object_debug(op, "%s", __func__);
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
}
static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}
static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		// Final release
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}
static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			(int64_t)op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}
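// Note on intervals: when dispatch_io_set_interval() is in effect, the timer
// created above periodically delivers whatever data an operation has
// accumulated so far; with DISPATCH_IO_STRICT_INTERVAL the delivery happens
// even if the amount is below the low-water mark, and delivery is deferred
// (the flags are stashed on the operation) while an I/O is actively in
// progress.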
#pragma mark -
#pragma mark dispatch_fd_entry_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
{
	guardid_t guard = fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE;
	int err, fd_flags = 0;
	_dispatch_io_syscall_switch_noerr(err,
		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
				&fd_flags),
		case 0:
			fd_entry->guard_flags = guard_flags;
			fd_entry->orig_fd_flags = fd_flags;
			break;
		case EPERM: break;
		default: (void)dispatch_assume_zero(err); break;
	);
}

static void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
{
	if (!fd_entry->guard_flags) {
		return;
	}
	guardid_t guard = fd_entry;
	int err, fd_flags = fd_entry->orig_fd_flags;
	_dispatch_io_syscall_switch(err,
		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
				&fd_flags),
		default: (void)dispatch_assume_zero(err); break;
	);
}
#else
static inline void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
static inline void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
#endif // DISPATCH_USE_GUARDED_FD
static int
_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
		int oflag, mode_t mode) {
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
			GUARD_SOCKET_IPC | GUARD_FILEPORT;
	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
			mode);
	if (fd != -1) {
		fd_entry->guard_flags = guard_flags;
		return fd;
	}
#endif
	return open(path, oflag, mode);
}
static int
_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
#if DISPATCH_USE_GUARDED_FD
	if (fd_entry->guard_flags) {
		guardid_t guard = (uintptr_t)fd_entry;
		return guarded_close_np(fd, &guard);
	} else
#endif
	{
		return close(fd);
	}
}
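// The fd_entry "refcount" below is implemented with queue suspension: every
// retain suspends the close queue and every release resumes it, so the cleanup
// blocks enqueued on the close queue only run once all outstanding references
// (channels, in-flight operations, pending lookups) have been released.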
static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}
static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_fd_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
	// On fds lock queue
	_dispatch_fd_debug("fd entry create", fd);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	fd_entry->fd = fd;
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_fd_debug("fd entry stat", fd);
		int err, orig_flags, orig_nosigpipe = -1;
		struct stat st;
		_dispatch_io_syscall_switch(err,
			fstat(fd, &st),
			default: fd_entry->err = err; return;
		);
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_fd_entry_guard(fd_entry);
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
		);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			);
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
					default:
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
		}
#endif
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			});
		} else {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					_DISPATCH_QOS_CLASS_DEFAULT, false));
		}
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	});
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue release", fd);
		dispatch_release(fd_entry->close_queue);
		_dispatch_fd_debug("barrier queue release", fd);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_fd_debug("barrier group release", fd);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		_dispatch_fd_entry_unguard(fd_entry);
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}
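// Summary of the fd setup above: regular files are routed to a dispatch_disk_t
// (blocking I/O with an advise window) and have O_NONBLOCK cleared, whereas
// pipes, sockets and devices get O_NONBLOCK set and are driven by per-direction
// streams with a dispatch source. The original flags (and F_SETNOSIGPIPE state)
// are restored by the close-queue block when the entry is torn down.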
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
			oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) {
			goto open;
		}
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
		return err;
	}
	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
		// Lost the race with another open
		_dispatch_fd_entry_guarded_close(fd_entry, fd);
	} else {
		channel->fd_actual = fd;
	}
	_dispatch_object_debug(channel, "%s", __func__);
	return 0;
}
static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}
#pragma mark -
#pragma mark dispatch_stream_t/dispatch_disk_t

static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		dispatch_set_context(stream->dq, stream);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}
static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);
	free(stream);
}
static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	char label[45];
	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
	disk->pick_queue = dispatch_queue_create(label, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}
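// A dispatch_disk_t is shared by every fd_entry that resolves to the same
// device (keyed by major(st_dev) via DIO_HASH), so requests destined for one
// physical disk are serialized on a single pick queue instead of competing
// from multiple channels.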
void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
}
#pragma mark -
#pragma mark dispatch_stream_operations/dispatch_disk_operations

static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}
static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream->dq,
				_dispatch_stream_queue_handler);
	}
}
static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}
static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}
1754 _dispatch_disk_pick_next_operation(dispatch_disk_t disk
)
1757 dispatch_operation_t op
;
1758 if (!TAILQ_EMPTY(&disk
->operations
)) {
1759 if (disk
->cur_rq
== NULL
) {
1760 op
= TAILQ_FIRST(&disk
->operations
);
1764 op
= TAILQ_NEXT(op
, operation_list
);
1766 op
= TAILQ_FIRST(&disk
->operations
);
1768 // TODO: more involved picking algorithm rdar://problem/8780312
1769 } while (op
->active
&& op
!= disk
->cur_rq
);
static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	// On stream queue
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}
static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	// On pick queue
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_disk_complete_operation(disk, op);
		}
	}
}
#pragma mark -
#pragma mark dispatch_stream_handler/dispatch_disk_handler

static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
{
	// On stream queue
	if (stream->source) {
		return stream->source;
	}
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_fd_debug("stream source create", fd);
	dispatch_source_t source = NULL;
	if (op->direction == DOP_DIR_READ) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
				(uintptr_t)fd, 0, stream->dq);
	} else if (op->direction == DOP_DIR_WRITE) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
				(uintptr_t)fd, 0, stream->dq);
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return NULL;
	}
	dispatch_set_context(source, stream);
	dispatch_source_set_event_handler_f(source,
			_dispatch_stream_source_handler);
	// Close queue must not run user cleanup handlers until sources are fully
	// unregistered
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(source, ^{
		_dispatch_fd_debug("stream source cancel", fd);
		dispatch_resume(close_queue);
	});
	stream->source = source;
	return stream->source;
}
static void
_dispatch_stream_source_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}

static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
	return;
}
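
// Illustrative sketch (not part of the library): the state machine above is
// what ultimately services a stream-type channel created over a non-seekable
// fd. Assuming `fd` is the read end of a pipe, a caller would drive it as:
//
//	dispatch_queue_t q = dispatch_get_global_queue(
//			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
//			^(int error) { /* channel cleanup */ });
//	dispatch_io_read(channel, 0, SIZE_MAX, q,
//			^(bool done, dispatch_data_t data, int error) {
//		// Each DISPATCH_OP_DELIVER above surfaces as an invocation with
//		// done == false; DISPATCH_OP_(DELIVER_AND_)COMPLETE ends with a
//		// final invocation where done == true.
//	});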

static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_fd_debug("disk handler", -1);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}

static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_fd_debug("disk perform", -1);
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_release(op);
	});
}

#pragma mark dispatch_operation_perform

static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
}
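
// Worked example of the first-advise alignment above (illustrative only,
// assuming PAGE_SIZE == 4096 and chunk_size == 32 pages == 131072 bytes):
// with op->offset == 1000, pg_fraction = (1000 + 131072) % 4096 = 1000, so
// ra_count grows by 4096 - 1000 = 3096 to 134168, and the advised range
// [1000, 1000 + 134168) ends on the page boundary 135168 == 33 * 4096.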

static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_pages but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_fd_debug("EOF", op->fd_entry->fd);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
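
// Illustrative sketch (not part of the library): the convenience_channel
// EAGAIN case above is what lets the dispatch_read() convenience API complete
// with whatever bytes are currently available on a non-blocking fd instead of
// waiting for the full requested length. Assuming `fd` is a pipe or socket:
//
//	dispatch_read(fd, SIZE_MAX,
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
//			^(dispatch_data_t data, int error) {
//		// Invoked once with the data read so far when a further read(2)
//		// would block (or at EOF), or with an error code.
//	});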

static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = op->buf_siz = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
	int fd = op->fd_entry->fd;
#endif
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_fd_debug("IO handler invoke", fd);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_fd_debug("IO handler invoke", fd);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
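
// Illustrative sketch (not part of the library): the low-water check above is
// what dispatch_io_set_low_water() tunes. Assuming partial results are wanted
// from a slow producer as soon as one page has accumulated:
//
//	dispatch_io_set_low_water(channel, 4096);
//	dispatch_io_read(channel, 0, SIZE_MAX, queue,
//			^(bool done, dispatch_data_t data, int error) {
//		// Invoked whenever at least 4096 undelivered bytes are available
//		// (or on completion/error), rather than once per underlying read(2).
//	});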

#pragma mark dispatch_io_debug

static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			channel->params.interval);
}

size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, op->offset, op->length, op->total,
			op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
			op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			op->params.interval);
}

size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset],
			bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}