X-Git-Url: https://git.saurik.com/apple/libdispatch.git/blobdiff_plain/00abc8588ccfc6d910a4ae103bbb45112e917d4c..e85f44377864e428703fb21503e29f422c11288f:/src/io.c?ds=inline

diff --git a/src/io.c b/src/io.c
new file mode 100644
index 0000000..b306054
--- /dev/null
+++ b/src/io.c
@@ -0,0 +1,2155 @@
+/*
+ * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_START@
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_END@
+ */
+
+#include "internal.h"
+
+typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
+
+DISPATCH_EXPORT DISPATCH_NOTHROW
+void _dispatch_iocntl(uint32_t param, uint64_t value);
+
+static void _dispatch_io_dispose(dispatch_io_t channel);
+static dispatch_operation_t _dispatch_operation_create(
+		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
+		size_t length, dispatch_data_t data, dispatch_queue_t queue,
+		dispatch_io_handler_t handler);
+static void _dispatch_operation_dispose(dispatch_operation_t operation);
+static void _dispatch_operation_enqueue(dispatch_operation_t op,
+		dispatch_op_direction_t direction, dispatch_data_t data);
+static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
+		dispatch_operation_t op);
+static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
+static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
+static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
+		dispatch_fd_entry_init_callback_t completion_callback);
+static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
+		uintptr_t hash);
+static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
+		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
+static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
+		dispatch_io_t channel);
+static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
+		dispatch_io_t channel);
+static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
+		dispatch_queue_t tq);
+static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
+		dispatch_op_direction_t direction);
+static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
+static void _dispatch_disk_dispose(dispatch_disk_t disk);
+static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
+		dispatch_operation_t operation, dispatch_data_t data);
+static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
+		dispatch_operation_t operation, dispatch_data_t data);
+static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
+		dispatch_io_t channel);
+static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
+		dispatch_io_t channel);
+static void _dispatch_stream_source_handler(void *ctx);
+static void _dispatch_stream_handler(void *ctx);
+static void _dispatch_disk_handler(void *ctx);
+static void _dispatch_disk_perform(void *ctxt);
+static void _dispatch_operation_advise(dispatch_operation_t
op, + size_t chunk_size); +static int _dispatch_operation_perform(dispatch_operation_t op); +static void _dispatch_operation_deliver_data(dispatch_operation_t op, + dispatch_op_flags_t flags); + +// Macros to wrap syscalls which return -1 on error, and retry on EINTR +#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \ + switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \ + case EINTR: continue; \ + __VA_ARGS__ \ + } \ + } while (0) +#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \ + _dispatch_io_syscall_switch_noerr(__err, __syscall, \ + case 0: break; \ + __VA_ARGS__ \ + ); \ + } while (0) +#define _dispatch_io_syscall(__syscall) do { int __err; \ + _dispatch_io_syscall_switch(__err, __syscall); \ + } while (0) + +enum { + DISPATCH_OP_COMPLETE = 1, + DISPATCH_OP_DELIVER, + DISPATCH_OP_DELIVER_AND_COMPLETE, + DISPATCH_OP_COMPLETE_RESUME, + DISPATCH_OP_RESUME, + DISPATCH_OP_ERR, + DISPATCH_OP_FD_ERR, +}; + +#pragma mark - +#pragma mark dispatch_io_vtable + +static const struct dispatch_io_vtable_s _dispatch_io_vtable = { + .do_type = DISPATCH_IO_TYPE, + .do_kind = "channel", + .do_dispose = _dispatch_io_dispose, + .do_invoke = NULL, + .do_probe = (void *)dummy_function_r0, + .do_debug = (void *)dummy_function_r0, +}; + +static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = { + .do_type = DISPATCH_OPERATION_TYPE, + .do_kind = "operation", + .do_dispose = _dispatch_operation_dispose, + .do_invoke = NULL, + .do_probe = (void *)dummy_function_r0, + .do_debug = (void *)dummy_function_r0, +}; + +static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = { + .do_type = DISPATCH_DISK_TYPE, + .do_kind = "disk", + .do_dispose = _dispatch_disk_dispose, + .do_invoke = NULL, + .do_probe = (void *)dummy_function_r0, + .do_debug = (void *)dummy_function_r0, +}; + +#pragma mark - +#pragma mark dispatch_io_hashtables + +#if TARGET_OS_EMBEDDED +#define DIO_HASH_SIZE 64u // must be a power of two +#else +#define DIO_HASH_SIZE 256u // must be a power of two +#endif +#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1))) + +// Global hashtable of dev_t -> disk_s mappings +DISPATCH_CACHELINE_ALIGN +static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE]; +// Global hashtable of fd -> fd_entry_s mappings +DISPATCH_CACHELINE_ALIGN +static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE]; + +static dispatch_once_t _dispatch_io_devs_lockq_pred; +static dispatch_queue_t _dispatch_io_devs_lockq; +static dispatch_queue_t _dispatch_io_fds_lockq; + +static void +_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED) +{ + _dispatch_io_fds_lockq = dispatch_queue_create( + "com.apple.libdispatch-io.fd_lockq", NULL); + unsigned int i; + for (i = 0; i < DIO_HASH_SIZE; i++) { + TAILQ_INIT(&_dispatch_io_fds[i]); + } +} + +static void +_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED) +{ + _dispatch_io_devs_lockq = dispatch_queue_create( + "com.apple.libdispatch-io.dev_lockq", NULL); + unsigned int i; + for (i = 0; i < DIO_HASH_SIZE; i++) { + TAILQ_INIT(&_dispatch_io_devs[i]); + } +} + +#pragma mark - +#pragma mark dispatch_io_defaults + +enum { + DISPATCH_IOCNTL_CHUNK_PAGES = 1, + DISPATCH_IOCNTL_LOW_WATER_CHUNKS, + DISPATCH_IOCNTL_INITIAL_DELIVERY, + DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, +}; + +static struct dispatch_io_defaults_s { + size_t chunk_pages, low_water_chunks, max_pending_io_reqs; + bool initial_delivery; +} dispatch_io_defaults = { + .chunk_pages = DIO_MAX_CHUNK_PAGES, + .low_water_chunks = 
DIO_DEFAULT_LOW_WATER_CHUNKS, + .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS, +}; + +#define _dispatch_iocntl_set_default(p, v) do { \ + dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \ + } while (0) + +void +_dispatch_iocntl(uint32_t param, uint64_t value) +{ + switch (param) { + case DISPATCH_IOCNTL_CHUNK_PAGES: + _dispatch_iocntl_set_default(chunk_pages, value); + break; + case DISPATCH_IOCNTL_LOW_WATER_CHUNKS: + _dispatch_iocntl_set_default(low_water_chunks, value); + break; + case DISPATCH_IOCNTL_INITIAL_DELIVERY: + _dispatch_iocntl_set_default(initial_delivery, value); + case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS: + _dispatch_iocntl_set_default(max_pending_io_reqs, value); + break; + } +} + +#pragma mark - +#pragma mark dispatch_io_t + +static dispatch_io_t +_dispatch_io_create(dispatch_io_type_t type) +{ + dispatch_io_t channel = calloc(1ul, sizeof(struct dispatch_io_s)); + channel->do_vtable = &_dispatch_io_vtable; + channel->do_next = DISPATCH_OBJECT_LISTLESS; + channel->do_ref_cnt = 1; + channel->do_xref_cnt = 1; + channel->do_targetq = _dispatch_get_root_queue(0, true); + channel->params.type = type; + channel->params.high = SIZE_MAX; + channel->params.low = dispatch_io_defaults.low_water_chunks * + dispatch_io_defaults.chunk_pages * PAGE_SIZE; + channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq", + NULL); + return channel; +} + +static void +_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry, + dispatch_queue_t queue, int err, void (^cleanup_handler)(int)) +{ + // Enqueue the cleanup handler on the suspended close queue + if (cleanup_handler) { + _dispatch_retain(queue); + dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{ + dispatch_async(queue, ^{ + _dispatch_io_debug("cleanup handler invoke", -1); + cleanup_handler(err); + }); + _dispatch_release(queue); + }); + } + if (fd_entry) { + channel->fd_entry = fd_entry; + dispatch_retain(fd_entry->barrier_queue); + dispatch_retain(fd_entry->barrier_group); + channel->barrier_queue = fd_entry->barrier_queue; + channel->barrier_group = fd_entry->barrier_group; + } else { + // Still need to create a barrier queue, since all operations go + // through it + channel->barrier_queue = dispatch_queue_create( + "com.apple.libdispatch-io.barrierq", NULL); + channel->barrier_group = dispatch_group_create(); + } +} + +static void +_dispatch_io_dispose(dispatch_io_t channel) +{ + if (channel->fd_entry) { + if (channel->fd_entry->path_data) { + // This modification is safe since path_data->channel is checked + // only on close_queue (which is still suspended at this point) + channel->fd_entry->path_data->channel = NULL; + } + // Cleanup handlers will only run when all channels related to this + // fd are complete + _dispatch_fd_entry_release(channel->fd_entry); + } + if (channel->queue) { + dispatch_release(channel->queue); + } + if (channel->barrier_queue) { + dispatch_release(channel->barrier_queue); + } + if (channel->barrier_group) { + dispatch_release(channel->barrier_group); + } + _dispatch_dispose(channel); +} + +static int +_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode) +{ + int err = 0; + if (S_ISDIR(mode)) { + err = EISDIR; + } else if (channel->params.type == DISPATCH_IO_RANDOM && + (S_ISFIFO(mode) || S_ISSOCK(mode))) { + err = ESPIPE; + } + return err; +} + +static int +_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel, + bool ignore_closed) +{ + // On _any_ queue + int err; + if (op) { + channel = op->channel; + } + if 
(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { + if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) { + err = ECANCELED; + } else { + err = 0; + } + } else { + err = op ? op->fd_entry->err : channel->err; + } + return err; +} + +#pragma mark - +#pragma mark dispatch_io_channels + +dispatch_io_t +dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd, + dispatch_queue_t queue, void (^cleanup_handler)(int)) +{ + if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { + return NULL; + } + _dispatch_io_debug("io create", fd); + dispatch_io_t channel = _dispatch_io_create(type); + channel->fd = fd; + channel->fd_actual = fd; + dispatch_suspend(channel->queue); + _dispatch_retain(queue); + _dispatch_retain(channel); + _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { + // On barrier queue + int err = fd_entry->err; + if (!err) { + err = _dispatch_io_validate_type(channel, fd_entry->stat.mode); + } + if (!err && type == DISPATCH_IO_RANDOM) { + off_t f_ptr; + _dispatch_io_syscall_switch_noerr(err, + f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR), + case 0: channel->f_ptr = f_ptr; break; + default: (void)dispatch_assume_zero(err); break; + ); + } + channel->err = err; + _dispatch_fd_entry_retain(fd_entry); + _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(queue); + }); + return channel; +} + +dispatch_io_t +dispatch_io_create_with_path(dispatch_io_type_t type, const char *path, + int oflag, mode_t mode, dispatch_queue_t queue, + void (^cleanup_handler)(int error)) +{ + if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) || + !(path && *path == '/')) { + return NULL; + } + size_t pathlen = strlen(path); + dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1); + if (!path_data) { + return NULL; + } + _dispatch_io_debug("io create with path %s", -1, path); + dispatch_io_t channel = _dispatch_io_create(type); + channel->fd = -1; + channel->fd_actual = -1; + path_data->channel = channel; + path_data->oflag = oflag; + path_data->mode = mode; + path_data->pathlen = pathlen; + memcpy(path_data->path, path, pathlen + 1); + _dispatch_retain(queue); + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + int err = 0; + struct stat st; + _dispatch_io_syscall_switch_noerr(err, + (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW || + (path_data->oflag & O_SYMLINK) == O_SYMLINK ? 
+ lstat(path_data->path, &st) : stat(path_data->path, &st), + case 0: + err = _dispatch_io_validate_type(channel, st.st_mode); + break; + default: + if ((path_data->oflag & O_CREAT) && + (*(path_data->path + path_data->pathlen - 1) != '/')) { + // Check parent directory + char *c = strrchr(path_data->path, '/'); + dispatch_assert(c); + *c = 0; + int perr; + _dispatch_io_syscall_switch_noerr(perr, + stat(path_data->path, &st), + case 0: + // Since the parent directory exists, open() will + // create a regular file after the fd_entry has + // been filled in + st.st_mode = S_IFREG; + err = 0; + break; + ); + *c = '/'; + } + break; + ); + channel->err = err; + if (err) { + free(path_data); + _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); + _dispatch_release(channel); + _dispatch_release(queue); + return; + } + dispatch_suspend(channel->queue); + dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, + _dispatch_io_devs_lockq_init); + dispatch_async(_dispatch_io_devs_lockq, ^{ + dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path( + path_data, st.st_dev, st.st_mode); + _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(queue); + }); + }); + return channel; +} + +dispatch_io_t +dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel, + dispatch_queue_t queue, void (^cleanup_handler)(int error)) +{ + if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { + return NULL; + } + _dispatch_io_debug("io create with io %p", -1, in_channel); + dispatch_io_t channel = _dispatch_io_create(type); + dispatch_suspend(channel->queue); + _dispatch_retain(queue); + _dispatch_retain(channel); + _dispatch_retain(in_channel); + dispatch_async(in_channel->queue, ^{ + int err0 = _dispatch_io_get_error(NULL, in_channel, false); + if (err0) { + channel->err = err0; + _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(in_channel); + _dispatch_release(queue); + return; + } + dispatch_async(in_channel->barrier_queue, ^{ + int err = _dispatch_io_get_error(NULL, in_channel, false); + // If there is no error, the fd_entry for the in_channel is valid. 
+ // Since we are running on in_channel's queue, the fd_entry has been + // fully resolved and will stay valid for the duration of this block + if (!err) { + err = in_channel->err; + if (!err) { + err = in_channel->fd_entry->err; + } + } + if (!err) { + err = _dispatch_io_validate_type(channel, + in_channel->fd_entry->stat.mode); + } + if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) { + off_t f_ptr; + _dispatch_io_syscall_switch_noerr(err, + f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR), + case 0: channel->f_ptr = f_ptr; break; + default: (void)dispatch_assume_zero(err); break; + ); + } + channel->err = err; + if (err) { + _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(in_channel); + _dispatch_release(queue); + return; + } + if (in_channel->fd == -1) { + // in_channel was created from path + channel->fd = -1; + channel->fd_actual = -1; + mode_t mode = in_channel->fd_entry->stat.mode; + dev_t dev = in_channel->fd_entry->stat.dev; + size_t path_data_len = sizeof(struct dispatch_io_path_data_s) + + in_channel->fd_entry->path_data->pathlen + 1; + dispatch_io_path_data_t path_data = malloc(path_data_len); + memcpy(path_data, in_channel->fd_entry->path_data, + path_data_len); + path_data->channel = channel; + // lockq_io_devs is known to already exist + dispatch_async(_dispatch_io_devs_lockq, ^{ + dispatch_fd_entry_t fd_entry; + fd_entry = _dispatch_fd_entry_create_with_path(path_data, + dev, mode); + _dispatch_io_init(channel, fd_entry, queue, 0, + cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(queue); + }); + } else { + dispatch_fd_entry_t fd_entry = in_channel->fd_entry; + channel->fd = in_channel->fd; + channel->fd_actual = in_channel->fd_actual; + _dispatch_fd_entry_retain(fd_entry); + _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); + dispatch_resume(channel->queue); + _dispatch_release(channel); + _dispatch_release(queue); + } + _dispatch_release(in_channel); + }); + }); + return channel; +} + +#pragma mark - +#pragma mark dispatch_io_accessors + +void +dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water) +{ + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + _dispatch_io_debug("io set high water", channel->fd); + if (channel->params.low > high_water) { + channel->params.low = high_water; + } + channel->params.high = high_water ? high_water : 1; + _dispatch_release(channel); + }); +} + +void +dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water) +{ + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + _dispatch_io_debug("io set low water", channel->fd); + if (channel->params.high < low_water) { + channel->params.high = low_water ? 
low_water : 1; + } + channel->params.low = low_water; + _dispatch_release(channel); + }); +} + +void +dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval, + unsigned long flags) +{ + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + _dispatch_io_debug("io set interval", channel->fd); + channel->params.interval = interval; + channel->params.interval_flags = flags; + _dispatch_release(channel); + }); +} + +void +_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq) +{ + _dispatch_retain(dq); + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + dispatch_queue_t prev_dq = channel->do_targetq; + channel->do_targetq = dq; + _dispatch_release(prev_dq); + _dispatch_release(channel); + }); +} + +dispatch_fd_t +dispatch_io_get_descriptor(dispatch_io_t channel) +{ + if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { + return -1; + } + dispatch_fd_t fd = channel->fd_actual; + if (fd == -1 && + _dispatch_thread_getspecific(dispatch_io_key) == channel) { + dispatch_fd_entry_t fd_entry = channel->fd_entry; + (void)_dispatch_fd_entry_open(fd_entry, channel); + } + return channel->fd_actual; +} + +#pragma mark - +#pragma mark dispatch_io_operations + +static void +_dispatch_io_stop(dispatch_io_t channel) +{ + _dispatch_io_debug("io stop", channel->fd); + (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED); + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + dispatch_async(channel->barrier_queue, ^{ + dispatch_fd_entry_t fd_entry = channel->fd_entry; + if (fd_entry) { + _dispatch_io_debug("io stop cleanup", channel->fd); + _dispatch_fd_entry_cleanup_operations(fd_entry, channel); + channel->fd_entry = NULL; + _dispatch_fd_entry_release(fd_entry); + } else if (channel->fd != -1) { + // Stop after close, need to check if fd_entry still exists + _dispatch_retain(channel); + dispatch_async(_dispatch_io_fds_lockq, ^{ + _dispatch_io_debug("io stop after close cleanup", + channel->fd); + dispatch_fd_entry_t fdi; + uintptr_t hash = DIO_HASH(channel->fd); + TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) { + if (fdi->fd == channel->fd) { + _dispatch_fd_entry_cleanup_operations(fdi, channel); + break; + } + } + _dispatch_release(channel); + }); + } + _dispatch_release(channel); + }); + }); +} + +void +dispatch_io_close(dispatch_io_t channel, unsigned long flags) +{ + if (flags & DISPATCH_IO_STOP) { + // Don't stop an already stopped channel + if (channel->atomic_flags & DIO_STOPPED) { + return; + } + return _dispatch_io_stop(channel); + } + // Don't close an already closed or stopped channel + if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { + return; + } + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + dispatch_async(channel->barrier_queue, ^{ + _dispatch_io_debug("io close", channel->fd); + (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED); + dispatch_fd_entry_t fd_entry = channel->fd_entry; + if (fd_entry) { + if (!fd_entry->path_data) { + channel->fd_entry = NULL; + } + _dispatch_fd_entry_release(fd_entry); + } + _dispatch_release(channel); + }); + }); +} + +void +dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier) +{ + _dispatch_retain(channel); + dispatch_async(channel->queue, ^{ + dispatch_queue_t io_q = channel->do_targetq; + dispatch_queue_t barrier_queue = channel->barrier_queue; + dispatch_group_t barrier_group = channel->barrier_group; + dispatch_async(barrier_queue, ^{ + dispatch_suspend(barrier_queue); + dispatch_group_notify(barrier_group, io_q, ^{ + 
_dispatch_thread_setspecific(dispatch_io_key, channel); + barrier(); + _dispatch_thread_setspecific(dispatch_io_key, NULL); + dispatch_resume(barrier_queue); + _dispatch_release(channel); + }); + }); + }); +} + +void +dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length, + dispatch_queue_t queue, dispatch_io_handler_t handler) +{ + _dispatch_retain(channel); + _dispatch_retain(queue); + dispatch_async(channel->queue, ^{ + dispatch_operation_t op; + op = _dispatch_operation_create(DOP_DIR_READ, channel, offset, + length, dispatch_data_empty, queue, handler); + if (op) { + dispatch_queue_t barrier_q = channel->barrier_queue; + dispatch_async(barrier_q, ^{ + _dispatch_operation_enqueue(op, DOP_DIR_READ, + dispatch_data_empty); + }); + } + _dispatch_release(channel); + _dispatch_release(queue); + }); +} + +void +dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data, + dispatch_queue_t queue, dispatch_io_handler_t handler) +{ + _dispatch_io_data_retain(data); + _dispatch_retain(channel); + _dispatch_retain(queue); + dispatch_async(channel->queue, ^{ + dispatch_operation_t op; + op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset, + dispatch_data_get_size(data), data, queue, handler); + if (op) { + dispatch_queue_t barrier_q = channel->barrier_queue; + dispatch_async(barrier_q, ^{ + _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); + _dispatch_io_data_release(data); + }); + } else { + _dispatch_io_data_release(data); + } + _dispatch_release(channel); + _dispatch_release(queue); + }); +} + +void +dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, + void (^handler)(dispatch_data_t, int)) +{ + _dispatch_retain(queue); + _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { + // On barrier queue + if (fd_entry->err) { + int err = fd_entry->err; + dispatch_async(queue, ^{ + _dispatch_io_debug("convenience handler invoke", fd); + handler(dispatch_data_empty, err); + }); + _dispatch_release(queue); + return; + } + // Safe to access fd_entry on barrier queue + dispatch_io_t channel = fd_entry->convenience_channel; + if (!channel) { + channel = _dispatch_io_create(DISPATCH_IO_STREAM); + channel->fd = fd; + channel->fd_actual = fd; + channel->fd_entry = fd_entry; + dispatch_retain(fd_entry->barrier_queue); + dispatch_retain(fd_entry->barrier_group); + channel->barrier_queue = fd_entry->barrier_queue; + channel->barrier_group = fd_entry->barrier_group; + fd_entry->convenience_channel = channel; + } + __block dispatch_data_t deliver_data = dispatch_data_empty; + __block int err = 0; + dispatch_async(fd_entry->close_queue, ^{ + dispatch_async(queue, ^{ + _dispatch_io_debug("convenience handler invoke", fd); + handler(deliver_data, err); + _dispatch_io_data_release(deliver_data); + }); + _dispatch_release(queue); + }); + dispatch_operation_t op = + _dispatch_operation_create(DOP_DIR_READ, channel, 0, + length, dispatch_data_empty, + _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, + false), ^(bool done, dispatch_data_t data, int error) { + if (data) { + data = dispatch_data_create_concat(deliver_data, data); + _dispatch_io_data_release(deliver_data); + deliver_data = data; + } + if (done) { + err = error; + } + }); + if (op) { + _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty); + } + }); +} + +void +dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, + void (^handler)(dispatch_data_t, int)) +{ + _dispatch_io_data_retain(data); + _dispatch_retain(queue); + 
_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { + // On barrier queue + if (fd_entry->err) { + int err = fd_entry->err; + dispatch_async(queue, ^{ + _dispatch_io_debug("convenience handler invoke", fd); + handler(NULL, err); + }); + _dispatch_release(queue); + return; + } + // Safe to access fd_entry on barrier queue + dispatch_io_t channel = fd_entry->convenience_channel; + if (!channel) { + channel = _dispatch_io_create(DISPATCH_IO_STREAM); + channel->fd = fd; + channel->fd_actual = fd; + channel->fd_entry = fd_entry; + dispatch_retain(fd_entry->barrier_queue); + dispatch_retain(fd_entry->barrier_group); + channel->barrier_queue = fd_entry->barrier_queue; + channel->barrier_group = fd_entry->barrier_group; + fd_entry->convenience_channel = channel; + } + __block dispatch_data_t deliver_data = NULL; + __block int err = 0; + dispatch_async(fd_entry->close_queue, ^{ + dispatch_async(queue, ^{ + _dispatch_io_debug("convenience handler invoke", fd); + handler(deliver_data, err); + if (deliver_data) { + _dispatch_io_data_release(deliver_data); + } + }); + _dispatch_release(queue); + }); + dispatch_operation_t op = + _dispatch_operation_create(DOP_DIR_WRITE, channel, 0, + dispatch_data_get_size(data), data, + _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, + false), ^(bool done, dispatch_data_t d, int error) { + if (done) { + if (d) { + _dispatch_io_data_retain(d); + deliver_data = d; + } + err = error; + } + }); + if (op) { + _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); + } + _dispatch_io_data_release(data); + }); +} + +#pragma mark - +#pragma mark dispatch_operation_t + +static dispatch_operation_t +_dispatch_operation_create(dispatch_op_direction_t direction, + dispatch_io_t channel, off_t offset, size_t length, + dispatch_data_t data, dispatch_queue_t queue, + dispatch_io_handler_t handler) +{ + // On channel queue + dispatch_assert(direction < DOP_DIR_MAX); + _dispatch_io_debug("operation create", channel->fd); +#if DISPATCH_IO_DEBUG + int fd = channel->fd; +#endif + // Safe to call _dispatch_io_get_error() with channel->fd_entry since + // that can only be NULL if atomic_flags are set rdar://problem/8362514 + int err = _dispatch_io_get_error(NULL, channel, false); + if (err || !length) { + _dispatch_io_data_retain(data); + _dispatch_retain(queue); + dispatch_async(channel->barrier_queue, ^{ + dispatch_async(queue, ^{ + dispatch_data_t d = data; + if (direction == DOP_DIR_READ && err) { + d = NULL; + } else if (direction == DOP_DIR_WRITE && !err) { + d = NULL; + } + _dispatch_io_debug("IO handler invoke", fd); + handler(true, d, err); + _dispatch_io_data_release(data); + }); + _dispatch_release(queue); + }); + return NULL; + } + dispatch_operation_t op; + op = calloc(1ul, sizeof(struct dispatch_operation_s)); + op->do_vtable = &_dispatch_operation_vtable; + op->do_next = DISPATCH_OBJECT_LISTLESS; + op->do_ref_cnt = 1; + op->do_xref_cnt = 0; // operation object is not exposed externally + op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL); + op->op_q->do_targetq = queue; + _dispatch_retain(queue); + op->active = false; + op->direction = direction; + op->offset = offset + channel->f_ptr; + op->length = length; + op->handler = Block_copy(handler); + _dispatch_retain(channel); + op->channel = channel; + op->params = channel->params; + // Take a snapshot of the priority of the channel queue. 
The actual I/O + // for this operation will be performed at this priority + dispatch_queue_t targetq = op->channel->do_targetq; + while (fastpath(targetq->do_targetq)) { + targetq = targetq->do_targetq; + } + op->do_targetq = targetq; + return op; +} + +static void +_dispatch_operation_dispose(dispatch_operation_t op) +{ + // Deliver the data if there's any + if (op->fd_entry) { + _dispatch_operation_deliver_data(op, DOP_DONE); + dispatch_group_leave(op->fd_entry->barrier_group); + _dispatch_fd_entry_release(op->fd_entry); + } + if (op->channel) { + _dispatch_release(op->channel); + } + if (op->timer) { + dispatch_release(op->timer); + } + // For write operations, op->buf is owned by op->buf_data + if (op->buf && op->direction == DOP_DIR_READ) { + free(op->buf); + } + if (op->buf_data) { + _dispatch_io_data_release(op->buf_data); + } + if (op->data) { + _dispatch_io_data_release(op->data); + } + if (op->op_q) { + dispatch_release(op->op_q); + } + Block_release(op->handler); + _dispatch_dispose(op); +} + +static void +_dispatch_operation_enqueue(dispatch_operation_t op, + dispatch_op_direction_t direction, dispatch_data_t data) +{ + // Called from the barrier queue + _dispatch_io_data_retain(data); + // If channel is closed or stopped, then call the handler immediately + int err = _dispatch_io_get_error(NULL, op->channel, false); + if (err) { + dispatch_io_handler_t handler = op->handler; + dispatch_async(op->op_q, ^{ + dispatch_data_t d = data; + if (direction == DOP_DIR_READ && err) { + d = NULL; + } else if (direction == DOP_DIR_WRITE && !err) { + d = NULL; + } + handler(true, d, err); + _dispatch_io_data_release(data); + }); + _dispatch_release(op); + return; + } + // Finish operation init + op->fd_entry = op->channel->fd_entry; + _dispatch_fd_entry_retain(op->fd_entry); + dispatch_group_enter(op->fd_entry->barrier_group); + dispatch_disk_t disk = op->fd_entry->disk; + if (!disk) { + dispatch_stream_t stream = op->fd_entry->streams[direction]; + dispatch_async(stream->dq, ^{ + _dispatch_stream_enqueue_operation(stream, op, data); + _dispatch_io_data_release(data); + }); + } else { + dispatch_async(disk->pick_queue, ^{ + _dispatch_disk_enqueue_operation(disk, op, data); + _dispatch_io_data_release(data); + }); + } +} + +static bool +_dispatch_operation_should_enqueue(dispatch_operation_t op, + dispatch_queue_t tq, dispatch_data_t data) +{ + // On stream queue or disk queue + _dispatch_io_debug("enqueue operation", op->fd_entry->fd); + _dispatch_io_data_retain(data); + op->data = data; + int err = _dispatch_io_get_error(op, NULL, true); + if (err) { + op->err = err; + // Final release + _dispatch_release(op); + return false; + } + if (op->params.interval) { + dispatch_resume(_dispatch_operation_timer(tq, op)); + } + return true; +} + +static dispatch_source_t +_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op) +{ + // On stream queue or pick queue + if (op->timer) { + return op->timer; + } + dispatch_source_t timer = dispatch_source_create( + DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq); + dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW, + op->params.interval), op->params.interval, 0); + dispatch_source_set_event_handler(timer, ^{ + // On stream queue or pick queue + if (dispatch_source_testcancel(timer)) { + // Do nothing. 
The operation has already completed + return; + } + dispatch_op_flags_t flags = DOP_DEFAULT; + if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) { + // Deliver even if there is less data than the low-water mark + flags |= DOP_DELIVER; + } + // If the operation is active, dont deliver data + if ((op->active) && (flags & DOP_DELIVER)) { + op->flags = flags; + } else { + _dispatch_operation_deliver_data(op, flags); + } + }); + op->timer = timer; + return op->timer; +} + +#pragma mark - +#pragma mark dispatch_fd_entry_t + +static inline void +_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) { + dispatch_suspend(fd_entry->close_queue); +} + +static inline void +_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) { + dispatch_resume(fd_entry->close_queue); +} + +static void +_dispatch_fd_entry_init_async(dispatch_fd_t fd, + dispatch_fd_entry_init_callback_t completion_callback) +{ + static dispatch_once_t _dispatch_io_fds_lockq_pred; + dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL, + _dispatch_io_fds_lockq_init); + dispatch_async(_dispatch_io_fds_lockq, ^{ + _dispatch_io_debug("fd entry init", fd); + dispatch_fd_entry_t fd_entry = NULL; + // Check to see if there is an existing entry for the given fd + uintptr_t hash = DIO_HASH(fd); + TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) { + if (fd_entry->fd == fd) { + // Retain the fd_entry to ensure it cannot go away until the + // stat() has completed + _dispatch_fd_entry_retain(fd_entry); + break; + } + } + if (!fd_entry) { + // If we did not find an existing entry, create one + fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash); + } + dispatch_async(fd_entry->barrier_queue, ^{ + _dispatch_io_debug("fd entry init completion", fd); + completion_callback(fd_entry); + // stat() is complete, release reference to fd_entry + _dispatch_fd_entry_release(fd_entry); + }); + }); +} + +static dispatch_fd_entry_t +_dispatch_fd_entry_create(dispatch_queue_t q) +{ + dispatch_fd_entry_t fd_entry; + fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s)); + fd_entry->close_queue = dispatch_queue_create( + "com.apple.libdispatch-io.closeq", NULL); + // Use target queue to ensure that no concurrent lookups are going on when + // the close queue is running + fd_entry->close_queue->do_targetq = q; + _dispatch_retain(q); + // Suspend the cleanup queue until closing + _dispatch_fd_entry_retain(fd_entry); + return fd_entry; +} + +static dispatch_fd_entry_t +_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash) +{ + // On fds lock queue + _dispatch_io_debug("fd entry create", fd); + dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( + _dispatch_io_fds_lockq); + fd_entry->fd = fd; + TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list); + fd_entry->barrier_queue = dispatch_queue_create( + "com.apple.libdispatch-io.barrierq", NULL); + fd_entry->barrier_group = dispatch_group_create(); + dispatch_async(fd_entry->barrier_queue, ^{ + _dispatch_io_debug("fd entry stat", fd); + int err, orig_flags, orig_nosigpipe = -1; + struct stat st; + _dispatch_io_syscall_switch(err, + fstat(fd, &st), + default: fd_entry->err = err; return; + ); + fd_entry->stat.dev = st.st_dev; + fd_entry->stat.mode = st.st_mode; + _dispatch_io_syscall_switch(err, + orig_flags = fcntl(fd, F_GETFL), + default: (void)dispatch_assume_zero(err); break; + ); +#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 + if (S_ISFIFO(st.st_mode)) { + _dispatch_io_syscall_switch(err, + orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE), + default: 
(void)dispatch_assume_zero(err); break; + ); + if (orig_nosigpipe != -1) { + _dispatch_io_syscall_switch(err, + orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1), + default: + orig_nosigpipe = -1; + (void)dispatch_assume_zero(err); + break; + ); + } + } +#endif + if (S_ISREG(st.st_mode)) { + if (orig_flags != -1) { + _dispatch_io_syscall_switch(err, + fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK), + default: + orig_flags = -1; + (void)dispatch_assume_zero(err); + break; + ); + } + int32_t dev = major(st.st_dev); + // We have to get the disk on the global dev queue. The + // barrier queue cannot continue until that is complete + dispatch_suspend(fd_entry->barrier_queue); + dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, + _dispatch_io_devs_lockq_init); + dispatch_async(_dispatch_io_devs_lockq, ^{ + _dispatch_disk_init(fd_entry, dev); + dispatch_resume(fd_entry->barrier_queue); + }); + } else { + if (orig_flags != -1) { + _dispatch_io_syscall_switch(err, + fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK), + default: + orig_flags = -1; + (void)dispatch_assume_zero(err); + break; + ); + } + _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( + DISPATCH_QUEUE_PRIORITY_DEFAULT, false)); + } + fd_entry->orig_flags = orig_flags; + fd_entry->orig_nosigpipe = orig_nosigpipe; + }); + // This is the first item run when the close queue is resumed, indicating + // that all channels associated with this entry have been closed and that + // all operations associated with this entry have been freed + dispatch_async(fd_entry->close_queue, ^{ + if (!fd_entry->disk) { + _dispatch_io_debug("close queue fd_entry cleanup", fd); + dispatch_op_direction_t dir; + for (dir = 0; dir < DOP_DIR_MAX; dir++) { + _dispatch_stream_dispose(fd_entry, dir); + } + } else { + dispatch_disk_t disk = fd_entry->disk; + dispatch_async(_dispatch_io_devs_lockq, ^{ + _dispatch_release(disk); + }); + } + // Remove this entry from the global fd list + TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list); + }); + // If there was a source associated with this stream, disposing of the + // source cancels it and suspends the close queue. 
Freeing the fd_entry + // structure must happen after the source cancel handler has finished + dispatch_async(fd_entry->close_queue, ^{ + _dispatch_io_debug("close queue release", fd); + dispatch_release(fd_entry->close_queue); + _dispatch_io_debug("barrier queue release", fd); + dispatch_release(fd_entry->barrier_queue); + _dispatch_io_debug("barrier group release", fd); + dispatch_release(fd_entry->barrier_group); + if (fd_entry->orig_flags != -1) { + _dispatch_io_syscall( + fcntl(fd, F_SETFL, fd_entry->orig_flags) + ); + } +#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 + if (fd_entry->orig_nosigpipe != -1) { + _dispatch_io_syscall( + fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe) + ); + } +#endif + if (fd_entry->convenience_channel) { + fd_entry->convenience_channel->fd_entry = NULL; + dispatch_release(fd_entry->convenience_channel); + } + free(fd_entry); + }); + return fd_entry; +} + +static dispatch_fd_entry_t +_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data, + dev_t dev, mode_t mode) +{ + // On devs lock queue + _dispatch_io_debug("fd entry create with path %s", -1, path_data->path); + dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( + path_data->channel->queue); + if (S_ISREG(mode)) { + _dispatch_disk_init(fd_entry, major(dev)); + } else { + _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( + DISPATCH_QUEUE_PRIORITY_DEFAULT, false)); + } + fd_entry->fd = -1; + fd_entry->orig_flags = -1; + fd_entry->path_data = path_data; + fd_entry->stat.dev = dev; + fd_entry->stat.mode = mode; + fd_entry->barrier_queue = dispatch_queue_create( + "com.apple.libdispatch-io.barrierq", NULL); + fd_entry->barrier_group = dispatch_group_create(); + // This is the first item run when the close queue is resumed, indicating + // that the channel associated with this entry has been closed and that + // all operations associated with this entry have been freed + dispatch_async(fd_entry->close_queue, ^{ + _dispatch_io_debug("close queue fd_entry cleanup", -1); + if (!fd_entry->disk) { + dispatch_op_direction_t dir; + for (dir = 0; dir < DOP_DIR_MAX; dir++) { + _dispatch_stream_dispose(fd_entry, dir); + } + } + if (fd_entry->fd != -1) { + close(fd_entry->fd); + } + if (fd_entry->path_data->channel) { + // If associated channel has not been released yet, mark it as + // no longer having an fd_entry (for stop after close). + // It is safe to modify channel since we are on close_queue with + // target queue the channel queue + fd_entry->path_data->channel->fd_entry = NULL; + } + }); + dispatch_async(fd_entry->close_queue, ^{ + _dispatch_io_debug("close queue release", -1); + dispatch_release(fd_entry->close_queue); + dispatch_release(fd_entry->barrier_queue); + dispatch_release(fd_entry->barrier_group); + free(fd_entry->path_data); + free(fd_entry); + }); + return fd_entry; +} + +static int +_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel) +{ + if (!(fd_entry->fd == -1 && fd_entry->path_data)) { + return 0; + } + if (fd_entry->err) { + return fd_entry->err; + } + int fd = -1; + int oflag = fd_entry->disk ? 
fd_entry->path_data->oflag & ~O_NONBLOCK : + fd_entry->path_data->oflag | O_NONBLOCK; +open: + fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode); + if (fd == -1) { + int err = errno; + if (err == EINTR) { + goto open; + } + (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err); + return err; + } + if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) { + // Lost the race with another open + close(fd); + } else { + channel->fd_actual = fd; + } + return 0; +} + +static void +_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, + dispatch_io_t channel) +{ + if (fd_entry->disk) { + if (channel) { + _dispatch_retain(channel); + } + _dispatch_fd_entry_retain(fd_entry); + dispatch_async(fd_entry->disk->pick_queue, ^{ + _dispatch_disk_cleanup_operations(fd_entry->disk, channel); + _dispatch_fd_entry_release(fd_entry); + if (channel) { + _dispatch_release(channel); + } + }); + } else { + dispatch_op_direction_t direction; + for (direction = 0; direction < DOP_DIR_MAX; direction++) { + dispatch_stream_t stream = fd_entry->streams[direction]; + if (!stream) { + continue; + } + if (channel) { + _dispatch_retain(channel); + } + _dispatch_fd_entry_retain(fd_entry); + dispatch_async(stream->dq, ^{ + _dispatch_stream_cleanup_operations(stream, channel); + _dispatch_fd_entry_release(fd_entry); + if (channel) { + _dispatch_release(channel); + } + }); + } + } +} + +#pragma mark - +#pragma mark dispatch_stream_t/dispatch_disk_t + +static void +_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq) +{ + dispatch_op_direction_t direction; + for (direction = 0; direction < DOP_DIR_MAX; direction++) { + dispatch_stream_t stream; + stream = calloc(1ul, sizeof(struct dispatch_stream_s)); + stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq", + NULL); + _dispatch_retain(tq); + stream->dq->do_targetq = tq; + TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]); + TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]); + fd_entry->streams[direction] = stream; + } +} + +static void +_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, + dispatch_op_direction_t direction) +{ + // On close queue + dispatch_stream_t stream = fd_entry->streams[direction]; + if (!stream) { + return; + } + dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); + dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])); + if (stream->source) { + // Balanced by source cancel handler: + _dispatch_fd_entry_retain(fd_entry); + dispatch_source_cancel(stream->source); + dispatch_resume(stream->source); + dispatch_release(stream->source); + } + dispatch_release(stream->dq); + free(stream); +} + +static void +_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev) +{ + // On devs lock queue + dispatch_disk_t disk; + char label_name[256]; + // Check to see if there is an existing entry for the given device + uintptr_t hash = DIO_HASH(dev); + TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) { + if (disk->dev == dev) { + _dispatch_retain(disk); + goto out; + } + } + // Otherwise create a new entry + size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs; + disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth * + sizeof(dispatch_operation_t))); + disk->do_vtable = &_dispatch_disk_vtable; + disk->do_next = DISPATCH_OBJECT_LISTLESS; + disk->do_ref_cnt = 1; + disk->do_xref_cnt = 0; + disk->advise_list_depth = pending_reqs_depth; + disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, + 
		false);
+	disk->dev = dev;
+	TAILQ_INIT(&disk->operations);
+	disk->cur_rq = TAILQ_FIRST(&disk->operations);
+	sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
+	disk->pick_queue = dispatch_queue_create(label_name, NULL);
+	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
+out:
+	fd_entry->disk = disk;
+	TAILQ_INIT(&fd_entry->stream_ops);
+}
+
+static void
+_dispatch_disk_dispose(dispatch_disk_t disk)
+{
+	uintptr_t hash = DIO_HASH(disk->dev);
+	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
+	dispatch_assert(TAILQ_EMPTY(&disk->operations));
+	size_t i;
+	for (i=0; i<disk->advise_list_depth; ++i) {
+		dispatch_assert(!disk->advise_list[i]);
+	}
+	dispatch_release(disk->pick_queue);
+	free(disk);
+}
+
+#pragma mark -
+#pragma mark dispatch_stream_operations/dispatch_disk_operations
+
+static inline bool
+_dispatch_stream_operation_avail(dispatch_stream_t stream)
+{
+	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
+			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
+}
+
+static void
+_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
+		dispatch_operation_t op, dispatch_data_t data)
+{
+	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
+		return;
+	}
+	bool no_ops = !_dispatch_stream_operation_avail(stream);
+	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
+	if (no_ops) {
+		dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
+	}
+}
+
+static void
+_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
+		dispatch_data_t data)
+{
+	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
+		return;
+	}
+	if (op->params.type == DISPATCH_IO_STREAM) {
+		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
+			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
+		}
+		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
+	} else {
+		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
+	}
+	_dispatch_disk_handler(disk);
+}
+
+static void
+_dispatch_stream_complete_operation(dispatch_stream_t stream,
+		dispatch_operation_t op)
+{
+	// On stream queue
+	_dispatch_io_debug("complete operation", op->fd_entry->fd);
+	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
+	if (op == stream->op) {
+		stream->op = NULL;
+	}
+	if (op->timer) {
+		dispatch_source_cancel(op->timer);
+	}
+	// Final release will deliver any pending data
+	_dispatch_release(op);
+}
+
+static void
+_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
+{
+	// On pick queue
+	_dispatch_io_debug("complete operation", op->fd_entry->fd);
+	// Current request is always the last op returned
+	if (disk->cur_rq == op) {
+		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
+				operation_list);
+	}
+	if (op->params.type == DISPATCH_IO_STREAM) {
+		// Check if there are other pending stream operations behind it
+		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
+		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
+		if (op_next) {
+			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
+		}
+	}
+	TAILQ_REMOVE(&disk->operations, op, operation_list);
+	if (op->timer) {
+		dispatch_source_cancel(op->timer);
+	}
+	// Final release will deliver any pending data
+	_dispatch_release(op);
+}
+
+static dispatch_operation_t
+_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
+		dispatch_operation_t op)
+{
+	// On stream queue
+	if (!op) {
+		// On the first run through, pick the first operation
+		if
(!_dispatch_stream_operation_avail(stream)) { + return op; + } + if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) { + op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]); + } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) { + op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); + } + return op; + } + if (op->params.type == DISPATCH_IO_STREAM) { + // Stream operations need to be serialized so continue the current + // operation until it is finished + return op; + } + // Get the next random operation (round-robin) + if (op->params.type == DISPATCH_IO_RANDOM) { + op = TAILQ_NEXT(op, operation_list); + if (!op) { + op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); + } + return op; + } + return NULL; +} + +static dispatch_operation_t +_dispatch_disk_pick_next_operation(dispatch_disk_t disk) +{ + // On pick queue + dispatch_operation_t op; + if (!TAILQ_EMPTY(&disk->operations)) { + if (disk->cur_rq == NULL) { + op = TAILQ_FIRST(&disk->operations); + } else { + op = disk->cur_rq; + do { + op = TAILQ_NEXT(op, operation_list); + if (!op) { + op = TAILQ_FIRST(&disk->operations); + } + // TODO: more involved picking algorithm rdar://problem/8780312 + } while (op->active && op != disk->cur_rq); + } + if (!op->active) { + disk->cur_rq = op; + return op; + } + } + return NULL; +} + +static void +_dispatch_stream_cleanup_operations(dispatch_stream_t stream, + dispatch_io_t channel) +{ + // On stream queue + dispatch_operation_t op, tmp; + typeof(*stream->operations) *operations; + operations = &stream->operations[DISPATCH_IO_RANDOM]; + TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { + if (!channel || op->channel == channel) { + _dispatch_stream_complete_operation(stream, op); + } + } + operations = &stream->operations[DISPATCH_IO_STREAM]; + TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { + if (!channel || op->channel == channel) { + _dispatch_stream_complete_operation(stream, op); + } + } + if (stream->source_running && !_dispatch_stream_operation_avail(stream)) { + dispatch_suspend(stream->source); + stream->source_running = false; + } +} + +static void +_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel) +{ + // On pick queue + dispatch_operation_t op, tmp; + TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) { + if (!channel || op->channel == channel) { + _dispatch_disk_complete_operation(disk, op); + } + } +} + +#pragma mark - +#pragma mark dispatch_stream_handler/dispatch_disk_handler + +static dispatch_source_t +_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op) +{ + // On stream queue + if (stream->source) { + return stream->source; + } + dispatch_fd_t fd = op->fd_entry->fd; + _dispatch_io_debug("stream source create", fd); + dispatch_source_t source = NULL; + if (op->direction == DOP_DIR_READ) { + source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, + stream->dq); + } else if (op->direction == DOP_DIR_WRITE) { + source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, + stream->dq); + } else { + dispatch_assert(op->direction < DOP_DIR_MAX); + return NULL; + } + dispatch_set_context(source, stream); + dispatch_source_set_event_handler_f(source, + _dispatch_stream_source_handler); + // Close queue must not run user cleanup handlers until sources are fully + // unregistered + dispatch_queue_t close_queue = op->fd_entry->close_queue; + dispatch_source_set_cancel_handler(source, ^{ + _dispatch_io_debug("stream source cancel", fd); + 
dispatch_resume(close_queue); + }); + stream->source = source; + return stream->source; +} + +static void +_dispatch_stream_source_handler(void *ctx) +{ + // On stream queue + dispatch_stream_t stream = (dispatch_stream_t)ctx; + dispatch_suspend(stream->source); + stream->source_running = false; + return _dispatch_stream_handler(stream); +} + +static void +_dispatch_stream_handler(void *ctx) +{ + // On stream queue + dispatch_stream_t stream = (dispatch_stream_t)ctx; + dispatch_operation_t op; +pick: + op = _dispatch_stream_pick_next_operation(stream, stream->op); + if (!op) { + _dispatch_debug("no operation found: stream %p", stream); + return; + } + int err = _dispatch_io_get_error(op, NULL, true); + if (err) { + op->err = err; + _dispatch_stream_complete_operation(stream, op); + goto pick; + } + stream->op = op; + _dispatch_io_debug("stream handler", op->fd_entry->fd); + dispatch_fd_entry_t fd_entry = op->fd_entry; + _dispatch_fd_entry_retain(fd_entry); + // For performance analysis + if (!op->total && dispatch_io_defaults.initial_delivery) { + // Empty delivery to signal the start of the operation + _dispatch_io_debug("initial delivery", op->fd_entry->fd); + _dispatch_operation_deliver_data(op, DOP_DELIVER); + } + // TODO: perform on the operation target queue to get correct priority + int result = _dispatch_operation_perform(op), flags = -1; + switch (result) { + case DISPATCH_OP_DELIVER: + flags = DOP_DEFAULT; + // Fall through + case DISPATCH_OP_DELIVER_AND_COMPLETE: + flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY : + DOP_DEFAULT; + _dispatch_operation_deliver_data(op, flags); + // Fall through + case DISPATCH_OP_COMPLETE: + if (flags != DOP_DEFAULT) { + _dispatch_stream_complete_operation(stream, op); + } + if (_dispatch_stream_operation_avail(stream)) { + dispatch_async_f(stream->dq, stream, _dispatch_stream_handler); + } + break; + case DISPATCH_OP_COMPLETE_RESUME: + _dispatch_stream_complete_operation(stream, op); + // Fall through + case DISPATCH_OP_RESUME: + if (_dispatch_stream_operation_avail(stream)) { + stream->source_running = true; + dispatch_resume(_dispatch_stream_source(stream, op)); + } + break; + case DISPATCH_OP_ERR: + _dispatch_stream_cleanup_operations(stream, op->channel); + break; + case DISPATCH_OP_FD_ERR: + _dispatch_fd_entry_retain(fd_entry); + dispatch_async(fd_entry->barrier_queue, ^{ + _dispatch_fd_entry_cleanup_operations(fd_entry, NULL); + _dispatch_fd_entry_release(fd_entry); + }); + break; + default: + break; + } + _dispatch_fd_entry_release(fd_entry); + return; +} + +static void +_dispatch_disk_handler(void *ctx) +{ + // On pick queue + dispatch_disk_t disk = (dispatch_disk_t)ctx; + if (disk->io_active) { + return; + } + _dispatch_io_debug("disk handler", -1); + dispatch_operation_t op; + size_t i = disk->free_idx, j = disk->req_idx; + if (j <= i) { + j += disk->advise_list_depth; + } + while (i <= j) { + if ((!disk->advise_list[i%disk->advise_list_depth]) && + (op = _dispatch_disk_pick_next_operation(disk))) { + int err = _dispatch_io_get_error(op, NULL, true); + if (err) { + op->err = err; + _dispatch_disk_complete_operation(disk, op); + continue; + } + _dispatch_retain(op); + disk->advise_list[i%disk->advise_list_depth] = op; + op->active = true; + } else { + // No more operations to get + break; + } + i++; + } + disk->free_idx = (i%disk->advise_list_depth); + op = disk->advise_list[disk->req_idx]; + if (op) { + disk->io_active = true; + dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform); + } +} + +static void 
+_dispatch_disk_perform(void *ctxt) +{ + dispatch_disk_t disk = ctxt; + size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE; + _dispatch_io_debug("disk perform", -1); + dispatch_operation_t op; + size_t i = disk->advise_idx, j = disk->free_idx; + if (j <= i) { + j += disk->advise_list_depth; + } + do { + op = disk->advise_list[i%disk->advise_list_depth]; + if (!op) { + // Nothing more to advise, must be at free_idx + dispatch_assert(i%disk->advise_list_depth == disk->free_idx); + break; + } + if (op->direction == DOP_DIR_WRITE) { + // TODO: preallocate writes ? rdar://problem/9032172 + continue; + } + if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry, + op->channel)) { + continue; + } + // For performance analysis + if (!op->total && dispatch_io_defaults.initial_delivery) { + // Empty delivery to signal the start of the operation + _dispatch_io_debug("initial delivery", op->fd_entry->fd); + _dispatch_operation_deliver_data(op, DOP_DELIVER); + } + // Advise two chunks if the list only has one element and this is the + // first advise on the operation + if ((j-i) == 1 && !disk->advise_list[disk->free_idx] && + !op->advise_offset) { + chunk_size *= 2; + } + _dispatch_operation_advise(op, chunk_size); + } while (++i < j); + disk->advise_idx = i%disk->advise_list_depth; + op = disk->advise_list[disk->req_idx]; + int result = _dispatch_operation_perform(op); + disk->advise_list[disk->req_idx] = NULL; + disk->req_idx = (++disk->req_idx)%disk->advise_list_depth; + dispatch_async(disk->pick_queue, ^{ + switch (result) { + case DISPATCH_OP_DELIVER: + _dispatch_operation_deliver_data(op, DOP_DELIVER); + break; + case DISPATCH_OP_COMPLETE: + _dispatch_disk_complete_operation(disk, op); + break; + case DISPATCH_OP_DELIVER_AND_COMPLETE: + _dispatch_operation_deliver_data(op, DOP_DELIVER); + _dispatch_disk_complete_operation(disk, op); + break; + case DISPATCH_OP_ERR: + _dispatch_disk_cleanup_operations(disk, op->channel); + break; + case DISPATCH_OP_FD_ERR: + _dispatch_disk_cleanup_operations(disk, NULL); + break; + default: + dispatch_assert(result); + break; + } + op->active = false; + disk->io_active = false; + _dispatch_disk_handler(disk); + // Balancing the retain in _dispatch_disk_handler. Note that op must be + // released at the very end, since it might hold the last reference to + // the disk + _dispatch_release(op); + }); +} + +#pragma mark - +#pragma mark dispatch_operation_perform + +static void +_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size) +{ + int err; + struct radvisory advise; + // No point in issuing a read advise for the next chunk if we are already + // a chunk ahead from reading the bytes + if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size + + PAGE_SIZE)) { + return; + } + advise.ra_count = (int)chunk_size; + if (!op->advise_offset) { + op->advise_offset = op->offset; + // If this is the first time through, align the advised range to a + // page boundary + size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE); + advise.ra_count += (int)(pg_fraction ? 
+				PAGE_SIZE - pg_fraction : 0);
+	}
+	advise.ra_offset = op->advise_offset;
+	op->advise_offset += advise.ra_count;
+	_dispatch_io_syscall_switch(err,
+		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
+		// TODO: set disk status on error
+		default: (void)dispatch_assume_zero(err); break;
+	);
+}
+
+static int
+_dispatch_operation_perform(dispatch_operation_t op)
+{
+	int err = _dispatch_io_get_error(op, NULL, true);
+	if (err) {
+		goto error;
+	}
+	if (!op->buf) {
+		size_t max_buf_siz = op->params.high;
+		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
+		if (op->direction == DOP_DIR_READ) {
+			// If necessary, create a buffer for the ongoing operation, large
+			// enough to fit chunk_pages but at most high-water
+			size_t data_siz = dispatch_data_get_size(op->data);
+			if (data_siz) {
+				dispatch_assert(data_siz < max_buf_siz);
+				max_buf_siz -= data_siz;
+			}
+			if (max_buf_siz > chunk_siz) {
+				max_buf_siz = chunk_siz;
+			}
+			if (op->length < SIZE_MAX) {
+				op->buf_siz = op->length - op->total;
+				if (op->buf_siz > max_buf_siz) {
+					op->buf_siz = max_buf_siz;
+				}
+			} else {
+				op->buf_siz = max_buf_siz;
+			}
+			op->buf = valloc(op->buf_siz);
+			_dispatch_io_debug("buffer allocated", op->fd_entry->fd);
+		} else if (op->direction == DOP_DIR_WRITE) {
+			// Always write the first data piece, if that is smaller than a
+			// chunk, accumulate further data pieces until chunk size is reached
+			if (chunk_siz > max_buf_siz) {
+				chunk_siz = max_buf_siz;
+			}
+			op->buf_siz = 0;
+			dispatch_data_apply(op->data,
+					^(dispatch_data_t region DISPATCH_UNUSED,
+					size_t offset DISPATCH_UNUSED,
+					const void* buf DISPATCH_UNUSED, size_t len) {
+				size_t siz = op->buf_siz + len;
+				if (!op->buf_siz || siz <= chunk_siz) {
+					op->buf_siz = siz;
+				}
+				return (bool)(siz < chunk_siz);
+			});
+			if (op->buf_siz > max_buf_siz) {
+				op->buf_siz = max_buf_siz;
+			}
+			dispatch_data_t d;
+			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
+			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
+					NULL);
+			_dispatch_io_data_release(d);
+			_dispatch_io_debug("buffer mapped", op->fd_entry->fd);
+		}
+	}
+	if (op->fd_entry->fd == -1) {
+		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
+		if (err) {
+			goto error;
+		}
+	}
+	void *buf = op->buf + op->buf_len;
+	size_t len = op->buf_siz - op->buf_len;
+	off_t off = op->offset + op->total;
+	ssize_t processed = -1;
+syscall:
+	if (op->direction == DOP_DIR_READ) {
+		if (op->params.type == DISPATCH_IO_STREAM) {
+			processed = read(op->fd_entry->fd, buf, len);
+		} else if (op->params.type == DISPATCH_IO_RANDOM) {
+			processed = pread(op->fd_entry->fd, buf, len, off);
+		}
+	} else if (op->direction == DOP_DIR_WRITE) {
+		if (op->params.type == DISPATCH_IO_STREAM) {
+			processed = write(op->fd_entry->fd, buf, len);
+		} else if (op->params.type == DISPATCH_IO_RANDOM) {
+			processed = pwrite(op->fd_entry->fd, buf, len, off);
+		}
+	}
+	// Encountered an error on the file descriptor
+	if (processed == -1) {
+		err = errno;
+		if (err == EINTR) {
+			goto syscall;
+		}
+		goto error;
+	}
+	// EOF is indicated by two handler invocations
+	if (processed == 0) {
+		_dispatch_io_debug("EOF", op->fd_entry->fd);
+		return DISPATCH_OP_DELIVER_AND_COMPLETE;
+	}
+	op->buf_len += processed;
+	op->total += processed;
+	if (op->total == op->length) {
+		// Finished processing all the bytes requested by the operation
+		return DISPATCH_OP_COMPLETE;
+	} else {
+		// Deliver data only if we satisfy the filters
+		return DISPATCH_OP_DELIVER;
+	}
+error:
+	if (err == EAGAIN) {
+		// For disk based files with
+		// blocking I/O we should never get EAGAIN
+		dispatch_assert(!op->fd_entry->disk);
+		_dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
+		if (op->direction == DOP_DIR_READ && op->total &&
+				op->channel == op->fd_entry->convenience_channel) {
+			// Convenience read with available data completes on EAGAIN
+			return DISPATCH_OP_COMPLETE_RESUME;
+		}
+		return DISPATCH_OP_RESUME;
+	}
+	op->err = err;
+	switch (err) {
+	case ECANCELED:
+		return DISPATCH_OP_ERR;
+	case EBADF:
+		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
+		return DISPATCH_OP_FD_ERR;
+	default:
+		return DISPATCH_OP_COMPLETE;
+	}
+}
+
+static void
+_dispatch_operation_deliver_data(dispatch_operation_t op,
+		dispatch_op_flags_t flags)
+{
+	// Either called from stream resp. pick queue or when op is finalized
+	dispatch_data_t data = NULL;
+	int err = 0;
+	size_t undelivered = op->undelivered + op->buf_len;
+	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
+			(op->flags & DOP_DELIVER);
+	op->flags = DOP_DEFAULT;
+	if (!deliver) {
+		// Don't deliver data until low water mark has been reached
+		if (undelivered >= op->params.low) {
+			deliver = true;
+		} else if (op->buf_len < op->buf_siz) {
+			// Request buffer is not yet used up
+			_dispatch_io_debug("buffer data", op->fd_entry->fd);
+			return;
+		}
+	} else {
+		err = op->err;
+		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
+			err = ECANCELED;
+			op->err = err;
+		}
+	}
+	// Deliver data or buffer used up
+	if (op->direction == DOP_DIR_READ) {
+		if (op->buf_len) {
+			void *buf = op->buf;
+			data = dispatch_data_create(buf, op->buf_len, NULL,
+					DISPATCH_DATA_DESTRUCTOR_FREE);
+			op->buf = NULL;
+			op->buf_len = 0;
+			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
+			_dispatch_io_data_release(op->data);
+			_dispatch_io_data_release(data);
+			data = d;
+		} else {
+			data = op->data;
+		}
+		op->data = deliver ?
+				dispatch_data_empty : data;
+	} else if (op->direction == DOP_DIR_WRITE) {
+		if (deliver) {
+			data = dispatch_data_create_subrange(op->data, op->buf_len,
+					op->length);
+		}
+		if (op->buf_len == op->buf_siz) {
+			_dispatch_io_data_release(op->buf_data);
+			op->buf_data = NULL;
+			op->buf = NULL;
+			op->buf_len = 0;
+			// Trim newly written buffer from head of unwritten data
+			dispatch_data_t d;
+			if (deliver) {
+				_dispatch_io_data_retain(data);
+				d = data;
+			} else {
+				d = dispatch_data_create_subrange(op->data, op->buf_len,
+						op->length);
+			}
+			_dispatch_io_data_release(op->data);
+			op->data = d;
+		}
+	} else {
+		dispatch_assert(op->direction < DOP_DIR_MAX);
+		return;
+	}
+	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
+		op->undelivered = undelivered;
+		_dispatch_io_debug("buffer data", op->fd_entry->fd);
+		return;
+	}
+	op->undelivered = 0;
+	_dispatch_io_debug("deliver data", op->fd_entry->fd);
+	dispatch_op_direction_t direction = op->direction;
+	__block dispatch_data_t d = data;
+	dispatch_io_handler_t handler = op->handler;
+#if DISPATCH_IO_DEBUG
+	int fd = op->fd_entry->fd;
+#endif
+	dispatch_fd_entry_t fd_entry = op->fd_entry;
+	_dispatch_fd_entry_retain(fd_entry);
+	dispatch_io_t channel = op->channel;
+	_dispatch_retain(channel);
+	// Note that data delivery may occur after the operation is freed
+	dispatch_async(op->op_q, ^{
+		bool done = (flags & DOP_DONE);
+		if (done) {
+			if (direction == DOP_DIR_READ && err) {
+				if (dispatch_data_get_size(d)) {
+					_dispatch_io_debug("IO handler invoke", fd);
+					handler(false, d, 0);
+				}
+				d = NULL;
+			} else if (direction == DOP_DIR_WRITE && !err) {
+				d = NULL;
+			}
+		}
+		_dispatch_io_debug("IO handler invoke", fd);
+		handler(done, d, err);
+		_dispatch_release(channel);
+		_dispatch_fd_entry_release(fd_entry);
+		_dispatch_io_data_release(data);
+	});
+}
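
For context only, outside the diff above: a minimal sketch of how the read path implemented by _dispatch_operation_perform() and _dispatch_operation_deliver_data() is typically driven from the public dispatch I/O API. The path "/tmp/example" and the water-mark and length values are illustrative assumptions, and channel reference-count management (dispatch_release under manual retain/release) is elided.

#include <dispatch/dispatch.h>
#include <fcntl.h>
#include <stdio.h>

static void
read_example(void)
{
	dispatch_queue_t q = dispatch_get_global_queue(
			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
	// DISPATCH_IO_RANDOM requests pread()-style access, the branch taken in
	// _dispatch_operation_perform() when op->params.type == DISPATCH_IO_RANDOM
	dispatch_io_t channel = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
			"/tmp/example", O_RDONLY, 0, q, ^(int error) {
		// Cleanup handler: runs once the underlying fd entry has been closed
		if (error) fprintf(stderr, "channel cleanup error %d\n", error);
	});
	if (!channel) return;
	// Low-water mark: partial results are delivered once at least this much
	// has accumulated (the undelivered >= op->params.low check above)
	dispatch_io_set_low_water(channel, 64 * 1024);
	dispatch_io_read(channel, 0 /* offset */, 1024 * 1024 /* length */, q,
			^(bool done, dispatch_data_t data, int error) {
		// This block is the op->handler that _dispatch_operation_deliver_data()
		// invokes on the handler queue with (done, data, error)
		if (data) {
			printf("delivered %zu bytes\n", dispatch_data_get_size(data));
		}
		if (done) {
			printf("read finished, error = %d\n", error);
			dispatch_io_close(channel, 0);
		}
	});
}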