diff --git a/src/io.c b/src/io.c
new file mode 100644 (file)
index 0000000..b306054
--- /dev/null
+++ b/src/io.c
@@ -0,0 +1,2155 @@
+/*
+ * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_START@
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_END@
+ */
+
+#include "internal.h"
+
+typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
+
+DISPATCH_EXPORT DISPATCH_NOTHROW
+void _dispatch_iocntl(uint32_t param, uint64_t value);
+
+static void _dispatch_io_dispose(dispatch_io_t channel);
+static dispatch_operation_t _dispatch_operation_create(
+               dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
+               size_t length, dispatch_data_t data, dispatch_queue_t queue,
+               dispatch_io_handler_t handler);
+static void _dispatch_operation_dispose(dispatch_operation_t operation);
+static void _dispatch_operation_enqueue(dispatch_operation_t op,
+               dispatch_op_direction_t direction, dispatch_data_t data);
+static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
+               dispatch_operation_t op);
+static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
+static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
+static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
+               dispatch_fd_entry_init_callback_t completion_callback);
+static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
+               uintptr_t hash);
+static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
+               dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
+static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
+               dispatch_io_t channel);
+static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
+               dispatch_io_t channel);
+static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
+               dispatch_queue_t tq);
+static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
+               dispatch_op_direction_t direction);
+static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
+static void _dispatch_disk_dispose(dispatch_disk_t disk);
+static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
+               dispatch_operation_t operation, dispatch_data_t data);
+static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
+               dispatch_operation_t operation, dispatch_data_t data);
+static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
+               dispatch_io_t channel);
+static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
+               dispatch_io_t channel);
+static void _dispatch_stream_source_handler(void *ctx);
+static void _dispatch_stream_handler(void *ctx);
+static void _dispatch_disk_handler(void *ctx);
+static void _dispatch_disk_perform(void *ctxt);
+static void _dispatch_operation_advise(dispatch_operation_t op,
+               size_t chunk_size);
+static int _dispatch_operation_perform(dispatch_operation_t op);
+static void _dispatch_operation_deliver_data(dispatch_operation_t op,
+               dispatch_op_flags_t flags);
+
+// Macros to wrap syscalls which return -1 on error and set errno, retrying
+// on EINTR. (The loop condition must re-test for EINTR: a bare `continue`
+// inside do { ... } while (0) would fall out of the loop without retrying.)
+#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
+               switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
+               case EINTR: continue; \
+               __VA_ARGS__ \
+               } \
+       } while ((_err) == EINTR)
+#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
+               _dispatch_io_syscall_switch_noerr(__err, __syscall, \
+               case 0: break; \
+               __VA_ARGS__ \
+               ); \
+       } while (0)
+#define _dispatch_io_syscall(__syscall) do { int __err; \
+               _dispatch_io_syscall_switch(__err, __syscall); \
+       } while (0)
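+
+// Illustrative use of the switch macros (this pattern appears verbatim in
+// _dispatch_fd_entry_create_with_fd() below): the macro assigns the error
+// variable, retries on EINTR, and the caller supplies the remaining cases:
+//
+//     int err;
+//     struct stat st;
+//     _dispatch_io_syscall_switch(err,
+//             fstat(fd, &st),
+//             default: fd_entry->err = err; return;
+//     );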
+
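+// Result codes describing how far an operation got; produced by
+// _dispatch_operation_perform() and acted on by the stream/disk handlers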
+enum {
+       DISPATCH_OP_COMPLETE = 1,
+       DISPATCH_OP_DELIVER,
+       DISPATCH_OP_DELIVER_AND_COMPLETE,
+       DISPATCH_OP_COMPLETE_RESUME,
+       DISPATCH_OP_RESUME,
+       DISPATCH_OP_ERR,
+       DISPATCH_OP_FD_ERR,
+};
+
+#pragma mark -
+#pragma mark dispatch_io_vtable
+
+static const struct dispatch_io_vtable_s _dispatch_io_vtable = {
+       .do_type = DISPATCH_IO_TYPE,
+       .do_kind = "channel",
+       .do_dispose = _dispatch_io_dispose,
+       .do_invoke = NULL,
+       .do_probe = (void *)dummy_function_r0,
+       .do_debug = (void *)dummy_function_r0,
+};
+
+static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = {
+       .do_type = DISPATCH_OPERATION_TYPE,
+       .do_kind = "operation",
+       .do_dispose = _dispatch_operation_dispose,
+       .do_invoke = NULL,
+       .do_probe = (void *)dummy_function_r0,
+       .do_debug = (void *)dummy_function_r0,
+};
+
+static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = {
+       .do_type = DISPATCH_DISK_TYPE,
+       .do_kind = "disk",
+       .do_dispose = _dispatch_disk_dispose,
+       .do_invoke = NULL,
+       .do_probe = (void *)dummy_function_r0,
+       .do_debug = (void *)dummy_function_r0,
+};
+
+#pragma mark -
+#pragma mark dispatch_io_hashtables
+
+#if TARGET_OS_EMBEDDED
+#define DIO_HASH_SIZE  64u // must be a power of two
+#else
+#define DIO_HASH_SIZE 256u // must be a power of two
+#endif
+#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))
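+// (the mask above is a cheap modulo precisely because DIO_HASH_SIZE is a
+// power of two)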
+
+// Global hashtable of dev_t -> disk_s mappings
+DISPATCH_CACHELINE_ALIGN
+static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
+// Global hashtable of fd -> fd_entry_s mappings
+DISPATCH_CACHELINE_ALIGN
+static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
+
+static dispatch_once_t  _dispatch_io_devs_lockq_pred;
+static dispatch_queue_t _dispatch_io_devs_lockq;
+static dispatch_queue_t _dispatch_io_fds_lockq;
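+// Both tables are only accessed on their respective lockq; fd-based
+// fd_entries additionally target their close queue at the fds lockq so
+// entry teardown cannot race with lookups (see
+// _dispatch_fd_entry_create_with_fd())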
+
+static void
+_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
+{
+       _dispatch_io_fds_lockq = dispatch_queue_create(
+                       "com.apple.libdispatch-io.fd_lockq", NULL);
+       unsigned int i;
+       for (i = 0; i < DIO_HASH_SIZE; i++) {
+               TAILQ_INIT(&_dispatch_io_fds[i]);
+       }
+}
+
+static void
+_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
+{
+       _dispatch_io_devs_lockq = dispatch_queue_create(
+                       "com.apple.libdispatch-io.dev_lockq", NULL);
+       unsigned int i;
+       for (i = 0; i < DIO_HASH_SIZE; i++) {
+               TAILQ_INIT(&_dispatch_io_devs[i]);
+       }
+}
+
+#pragma mark -
+#pragma mark dispatch_io_defaults
+
+enum {
+       DISPATCH_IOCNTL_CHUNK_PAGES = 1,
+       DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
+       DISPATCH_IOCNTL_INITIAL_DELIVERY,
+       DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
+};
+
+static struct dispatch_io_defaults_s {
+       size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
+       bool initial_delivery;
+} dispatch_io_defaults = {
+       .chunk_pages = DIO_MAX_CHUNK_PAGES,
+       .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
+       .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
+};
+
+#define _dispatch_iocntl_set_default(p, v) do { \
+               dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
+       } while (0)
+
+void
+_dispatch_iocntl(uint32_t param, uint64_t value)
+{
+       switch (param) {
+       case DISPATCH_IOCNTL_CHUNK_PAGES:
+               _dispatch_iocntl_set_default(chunk_pages, value);
+               break;
+       case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
+               _dispatch_iocntl_set_default(low_water_chunks, value);
+               break;
+       case DISPATCH_IOCNTL_INITIAL_DELIVERY:
+               _dispatch_iocntl_set_default(initial_delivery, value);
+               break;
+       case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
+               _dispatch_iocntl_set_default(max_pending_io_reqs, value);
+               break;
+       }
+}
+
+#pragma mark -
+#pragma mark dispatch_io_t
+
+static dispatch_io_t
+_dispatch_io_create(dispatch_io_type_t type)
+{
+       dispatch_io_t channel = calloc(1ul, sizeof(struct dispatch_io_s));
+       channel->do_vtable = &_dispatch_io_vtable;
+       channel->do_next = DISPATCH_OBJECT_LISTLESS;
+       channel->do_ref_cnt = 1;
+       channel->do_xref_cnt = 1;
+       channel->do_targetq = _dispatch_get_root_queue(0, true);
+       channel->params.type = type;
+       channel->params.high = SIZE_MAX;
+       channel->params.low = dispatch_io_defaults.low_water_chunks *
+                       dispatch_io_defaults.chunk_pages * PAGE_SIZE;
+       channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
+                       NULL);
+       return channel;
+}
+
+static void
+_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
+               dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
+{
+       // Enqueue the cleanup handler on the suspended close queue
+       if (cleanup_handler) {
+               _dispatch_retain(queue);
+               dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
+                       dispatch_async(queue, ^{
+                               _dispatch_io_debug("cleanup handler invoke", -1);
+                               cleanup_handler(err);
+                       });
+                       _dispatch_release(queue);
+               });
+       }
+       if (fd_entry) {
+               channel->fd_entry = fd_entry;
+               dispatch_retain(fd_entry->barrier_queue);
+               dispatch_retain(fd_entry->barrier_group);
+               channel->barrier_queue = fd_entry->barrier_queue;
+               channel->barrier_group = fd_entry->barrier_group;
+       } else {
+               // Still need to create a barrier queue, since all operations go
+               // through it
+               channel->barrier_queue = dispatch_queue_create(
+                               "com.apple.libdispatch-io.barrierq", NULL);
+               channel->barrier_group = dispatch_group_create();
+       }
+}
+
+static void
+_dispatch_io_dispose(dispatch_io_t channel)
+{
+       if (channel->fd_entry) {
+               if (channel->fd_entry->path_data) {
+                       // This modification is safe since path_data->channel is checked
+                       // only on close_queue (which is still suspended at this point)
+                       channel->fd_entry->path_data->channel = NULL;
+               }
+               // Cleanup handlers will only run when all channels related to this
+               // fd are complete
+               _dispatch_fd_entry_release(channel->fd_entry);
+       }
+       if (channel->queue) {
+               dispatch_release(channel->queue);
+       }
+       if (channel->barrier_queue) {
+               dispatch_release(channel->barrier_queue);
+       }
+       if (channel->barrier_group) {
+               dispatch_release(channel->barrier_group);
+       }
+       _dispatch_dispose(channel);
+}
+
+static int
+_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
+{
+       int err = 0;
+       if (S_ISDIR(mode)) {
+               err = EISDIR;
+       } else if (channel->params.type == DISPATCH_IO_RANDOM &&
+                       (S_ISFIFO(mode) || S_ISSOCK(mode))) {
+               err = ESPIPE;
+       }
+       return err;
+}
+
+static int
+_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
+               bool ignore_closed)
+{
+       // On _any_ queue
+       int err;
+       if (op) {
+               channel = op->channel;
+       }
+       if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
+               if (!ignore_closed || (channel->atomic_flags & DIO_STOPPED)) {
+                       err = ECANCELED;
+               } else {
+                       err = 0;
+               }
+       } else {
+               err = op ? op->fd_entry->err : channel->err;
+       }
+       return err;
+}
+
+#pragma mark -
+#pragma mark dispatch_io_channels
+
+dispatch_io_t
+dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
+               dispatch_queue_t queue, void (^cleanup_handler)(int))
+{
+       if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
+               return NULL;
+       }
+       _dispatch_io_debug("io create", fd);
+       dispatch_io_t channel = _dispatch_io_create(type);
+       channel->fd = fd;
+       channel->fd_actual = fd;
+       dispatch_suspend(channel->queue);
+       _dispatch_retain(queue);
+       _dispatch_retain(channel);
+       _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
+               // On barrier queue
+               int err = fd_entry->err;
+               if (!err) {
+                       err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
+               }
+               if (!err && type == DISPATCH_IO_RANDOM) {
+                       off_t f_ptr;
+                       _dispatch_io_syscall_switch_noerr(err,
+                               f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
+                               case 0: channel->f_ptr = f_ptr; break;
+                               default: (void)dispatch_assume_zero(err); break;
+                       );
+               }
+               channel->err = err;
+               _dispatch_fd_entry_retain(fd_entry);
+               _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
+               dispatch_resume(channel->queue);
+               _dispatch_release(channel);
+               _dispatch_release(queue);
+       });
+       return channel;
+}
+
+dispatch_io_t
+dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
+               int oflag, mode_t mode, dispatch_queue_t queue,
+               void (^cleanup_handler)(int error))
+{
+       if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
+                       !(path && *path == '/')) {
+               return NULL;
+       }
+       size_t pathlen = strlen(path);
+       dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
+       if (!path_data) {
+               return NULL;
+       }
+       _dispatch_io_debug("io create with path %s", -1, path);
+       dispatch_io_t channel = _dispatch_io_create(type);
+       channel->fd = -1;
+       channel->fd_actual = -1;
+       path_data->channel = channel;
+       path_data->oflag = oflag;
+       path_data->mode = mode;
+       path_data->pathlen = pathlen;
+       memcpy(path_data->path, path, pathlen + 1);
+       _dispatch_retain(queue);
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               int err = 0;
+               struct stat st;
+               _dispatch_io_syscall_switch_noerr(err,
+                       (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
+                                       (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
+                                       lstat(path_data->path, &st) : stat(path_data->path, &st),
+                       case 0:
+                               err = _dispatch_io_validate_type(channel, st.st_mode);
+                               break;
+                       default:
+                               if ((path_data->oflag & O_CREAT) &&
+                                               (*(path_data->path + path_data->pathlen - 1) != '/')) {
+                                       // Check parent directory
+                                       char *c = strrchr(path_data->path, '/');
+                                       dispatch_assert(c);
+                                       *c = 0;
+                                       int perr;
+                                       _dispatch_io_syscall_switch_noerr(perr,
+                                               stat(path_data->path, &st),
+                                               case 0:
+                                                       // Since the parent directory exists, open() will
+                                                       // create a regular file after the fd_entry has
+                                                       // been filled in
+                                                       st.st_mode = S_IFREG;
+                                                       err = 0;
+                                                       break;
+                                       );
+                                       *c = '/';
+                               }
+                               break;
+               );
+               channel->err = err;
+               if (err) {
+                       free(path_data);
+                       _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
+                       _dispatch_release(channel);
+                       _dispatch_release(queue);
+                       return;
+               }
+               dispatch_suspend(channel->queue);
+               dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
+                               _dispatch_io_devs_lockq_init);
+               dispatch_async(_dispatch_io_devs_lockq, ^{
+                       dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
+                                       path_data, st.st_dev, st.st_mode);
+                       _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
+                       dispatch_resume(channel->queue);
+                       _dispatch_release(channel);
+                       _dispatch_release(queue);
+               });
+       });
+       return channel;
+}
+
+dispatch_io_t
+dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
+               dispatch_queue_t queue, void (^cleanup_handler)(int error))
+{
+       if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
+               return NULL;
+       }
+       _dispatch_io_debug("io create with io %p", -1, in_channel);
+       dispatch_io_t channel = _dispatch_io_create(type);
+       dispatch_suspend(channel->queue);
+       _dispatch_retain(queue);
+       _dispatch_retain(channel);
+       _dispatch_retain(in_channel);
+       dispatch_async(in_channel->queue, ^{
+               int err0 = _dispatch_io_get_error(NULL, in_channel, false);
+               if (err0) {
+                       channel->err = err0;
+                       _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
+                       dispatch_resume(channel->queue);
+                       _dispatch_release(channel);
+                       _dispatch_release(in_channel);
+                       _dispatch_release(queue);
+                       return;
+               }
+               dispatch_async(in_channel->barrier_queue, ^{
+                       int err = _dispatch_io_get_error(NULL, in_channel, false);
+                       // If there is no error, the fd_entry for the in_channel is valid.
+                       // Since we are running on in_channel's barrier queue, the fd_entry
+                       // has been fully resolved and will stay valid for the duration of
+                       // this block
+                       if (!err) {
+                               err = in_channel->err;
+                               if (!err) {
+                                       err = in_channel->fd_entry->err;
+                               }
+                       }
+                       if (!err) {
+                               err = _dispatch_io_validate_type(channel,
+                                               in_channel->fd_entry->stat.mode);
+                       }
+                       if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
+                               off_t f_ptr;
+                               _dispatch_io_syscall_switch_noerr(err,
+                                       f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
+                                       case 0: channel->f_ptr = f_ptr; break;
+                                       default: (void)dispatch_assume_zero(err); break;
+                               );
+                       }
+                       channel->err = err;
+                       if (err) {
+                               _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
+                               dispatch_resume(channel->queue);
+                               _dispatch_release(channel);
+                               _dispatch_release(in_channel);
+                               _dispatch_release(queue);
+                               return;
+                       }
+                       if (in_channel->fd == -1) {
+                               // in_channel was created from path
+                               channel->fd = -1;
+                               channel->fd_actual = -1;
+                               mode_t mode = in_channel->fd_entry->stat.mode;
+                               dev_t dev = in_channel->fd_entry->stat.dev;
+                               size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
+                                               in_channel->fd_entry->path_data->pathlen + 1;
+                               dispatch_io_path_data_t path_data = malloc(path_data_len);
+                               memcpy(path_data, in_channel->fd_entry->path_data,
+                                               path_data_len);
+                               path_data->channel = channel;
+                               // _dispatch_io_devs_lockq already exists here,
+                               // since in_channel was created from a path
+                               dispatch_async(_dispatch_io_devs_lockq, ^{
+                                       dispatch_fd_entry_t fd_entry;
+                                       fd_entry = _dispatch_fd_entry_create_with_path(path_data,
+                                                       dev, mode);
+                                       _dispatch_io_init(channel, fd_entry, queue, 0,
+                                                       cleanup_handler);
+                                       dispatch_resume(channel->queue);
+                                       _dispatch_release(channel);
+                                       _dispatch_release(queue);
+                               });
+                       } else {
+                               dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
+                               channel->fd = in_channel->fd;
+                               channel->fd_actual = in_channel->fd_actual;
+                               _dispatch_fd_entry_retain(fd_entry);
+                               _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
+                               dispatch_resume(channel->queue);
+                               _dispatch_release(channel);
+                               _dispatch_release(queue);
+                       }
+                       _dispatch_release(in_channel);
+               });
+       });
+       return channel;
+}
+
+#pragma mark -
+#pragma mark dispatch_io_accessors
+
+void
+dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
+{
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               _dispatch_io_debug("io set high water", channel->fd);
+               if (channel->params.low > high_water) {
+                       channel->params.low = high_water;
+               }
+               channel->params.high = high_water ? high_water : 1;
+               _dispatch_release(channel);
+       });
+}
+
+void
+dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
+{
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               _dispatch_io_debug("io set low water", channel->fd);
+               if (channel->params.high < low_water) {
+                       channel->params.high = low_water ? low_water : 1;
+               }
+               channel->params.low = low_water;
+               _dispatch_release(channel);
+       });
+}
+
+void
+dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
+               unsigned long flags)
+{
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               _dispatch_io_debug("io set interval", channel->fd);
+               channel->params.interval = interval;
+               channel->params.interval_flags = flags;
+               _dispatch_release(channel);
+       });
+}
+
+void
+_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
+{
+       _dispatch_retain(dq);
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               dispatch_queue_t prev_dq = channel->do_targetq;
+               channel->do_targetq = dq;
+               _dispatch_release(prev_dq);
+               _dispatch_release(channel);
+       });
+}
+
+dispatch_fd_t
+dispatch_io_get_descriptor(dispatch_io_t channel)
+{
+       if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
+               return -1;
+       }
+       dispatch_fd_t fd = channel->fd_actual;
+       if (fd == -1 &&
+                       _dispatch_thread_getspecific(dispatch_io_key) == channel) {
+               dispatch_fd_entry_t fd_entry = channel->fd_entry;
+               (void)_dispatch_fd_entry_open(fd_entry, channel);
+       }
+       return channel->fd_actual;
+}
+
+#pragma mark -
+#pragma mark dispatch_io_operations
+
+static void
+_dispatch_io_stop(dispatch_io_t channel)
+{
+       _dispatch_io_debug("io stop", channel->fd);
+       (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               dispatch_async(channel->barrier_queue, ^{
+                       dispatch_fd_entry_t fd_entry = channel->fd_entry;
+                       if (fd_entry) {
+                               _dispatch_io_debug("io stop cleanup", channel->fd);
+                               _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
+                               channel->fd_entry = NULL;
+                               _dispatch_fd_entry_release(fd_entry);
+                       } else if (channel->fd != -1) {
+                               // Stop after close, need to check if fd_entry still exists
+                               _dispatch_retain(channel);
+                               dispatch_async(_dispatch_io_fds_lockq, ^{
+                                       _dispatch_io_debug("io stop after close cleanup",
+                                                       channel->fd);
+                                       dispatch_fd_entry_t fdi;
+                                       uintptr_t hash = DIO_HASH(channel->fd);
+                                       TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
+                                               if (fdi->fd == channel->fd) {
+                                                       _dispatch_fd_entry_cleanup_operations(fdi, channel);
+                                                       break;
+                                               }
+                                       }
+                                       _dispatch_release(channel);
+                               });
+                       }
+                       _dispatch_release(channel);
+               });
+       });
+}
+
+void
+dispatch_io_close(dispatch_io_t channel, unsigned long flags)
+{
+       if (flags & DISPATCH_IO_STOP) {
+               // Don't stop an already stopped channel
+               if (channel->atomic_flags & DIO_STOPPED) {
+                       return;
+               }
+               return _dispatch_io_stop(channel);
+       }
+       // Don't close an already closed or stopped channel
+       if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
+               return;
+       }
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               dispatch_async(channel->barrier_queue, ^{
+                       _dispatch_io_debug("io close", channel->fd);
+                       (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
+                       dispatch_fd_entry_t fd_entry = channel->fd_entry;
+                       if (fd_entry) {
+                               if (!fd_entry->path_data) {
+                                       channel->fd_entry = NULL;
+                               }
+                               _dispatch_fd_entry_release(fd_entry);
+                       }
+                       _dispatch_release(channel);
+               });
+       });
+}
+
+void
+dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
+{
+       _dispatch_retain(channel);
+       dispatch_async(channel->queue, ^{
+               dispatch_queue_t io_q = channel->do_targetq;
+               dispatch_queue_t barrier_queue = channel->barrier_queue;
+               dispatch_group_t barrier_group = channel->barrier_group;
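+               // Queue behind already enqueued operations, suspend the
+               // barrier queue so no new operations start, then run the
+               // barrier once in-flight operations (tracked by the group)
+               // have drained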
+               dispatch_async(barrier_queue, ^{
+                       dispatch_suspend(barrier_queue);
+                       dispatch_group_notify(barrier_group, io_q, ^{
+                               _dispatch_thread_setspecific(dispatch_io_key, channel);
+                               barrier();
+                               _dispatch_thread_setspecific(dispatch_io_key, NULL);
+                               dispatch_resume(barrier_queue);
+                               _dispatch_release(channel);
+                       });
+               });
+       });
+}
+
+void
+dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
+               dispatch_queue_t queue, dispatch_io_handler_t handler)
+{
+       _dispatch_retain(channel);
+       _dispatch_retain(queue);
+       dispatch_async(channel->queue, ^{
+               dispatch_operation_t op;
+               op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
+                               length, dispatch_data_empty, queue, handler);
+               if (op) {
+                       dispatch_queue_t barrier_q = channel->barrier_queue;
+                       dispatch_async(barrier_q, ^{
+                               _dispatch_operation_enqueue(op, DOP_DIR_READ,
+                                               dispatch_data_empty);
+                       });
+               }
+               _dispatch_release(channel);
+               _dispatch_release(queue);
+       });
+}
+
+void
+dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
+               dispatch_queue_t queue, dispatch_io_handler_t handler)
+{
+       _dispatch_io_data_retain(data);
+       _dispatch_retain(channel);
+       _dispatch_retain(queue);
+       dispatch_async(channel->queue, ^{
+               dispatch_operation_t op;
+               op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
+                               dispatch_data_get_size(data), data, queue, handler);
+               if (op) {
+                       dispatch_queue_t barrier_q = channel->barrier_queue;
+                       dispatch_async(barrier_q, ^{
+                               _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
+                               _dispatch_io_data_release(data);
+                       });
+               } else {
+                       _dispatch_io_data_release(data);
+               }
+               _dispatch_release(channel);
+               _dispatch_release(queue);
+       });
+}
+
+void
+dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
+               void (^handler)(dispatch_data_t, int))
+{
+       _dispatch_retain(queue);
+       _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
+               // On barrier queue
+               if (fd_entry->err) {
+                       int err = fd_entry->err;
+                       dispatch_async(queue, ^{
+                               _dispatch_io_debug("convenience handler invoke", fd);
+                               handler(dispatch_data_empty, err);
+                       });
+                       _dispatch_release(queue);
+                       return;
+               }
+               // Safe to access fd_entry on barrier queue
+               dispatch_io_t channel = fd_entry->convenience_channel;
+               if (!channel) {
+                       channel = _dispatch_io_create(DISPATCH_IO_STREAM);
+                       channel->fd = fd;
+                       channel->fd_actual = fd;
+                       channel->fd_entry = fd_entry;
+                       dispatch_retain(fd_entry->barrier_queue);
+                       dispatch_retain(fd_entry->barrier_group);
+                       channel->barrier_queue = fd_entry->barrier_queue;
+                       channel->barrier_group = fd_entry->barrier_group;
+                       fd_entry->convenience_channel = channel;
+               }
+               __block dispatch_data_t deliver_data = dispatch_data_empty;
+               __block int err = 0;
+               dispatch_async(fd_entry->close_queue, ^{
+                       dispatch_async(queue, ^{
+                               _dispatch_io_debug("convenience handler invoke", fd);
+                               handler(deliver_data, err);
+                               _dispatch_io_data_release(deliver_data);
+                       });
+                       _dispatch_release(queue);
+               });
+               dispatch_operation_t op =
+                       _dispatch_operation_create(DOP_DIR_READ, channel, 0,
+                                       length, dispatch_data_empty,
+                                       _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
+                                       false), ^(bool done, dispatch_data_t data, int error) {
+                               if (data) {
+                                       data = dispatch_data_create_concat(deliver_data, data);
+                                       _dispatch_io_data_release(deliver_data);
+                                       deliver_data = data;
+                               }
+                               if (done) {
+                                       err = error;
+                               }
+                       });
+               if (op) {
+                       _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
+               }
+       });
+}
+
+void
+dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
+               void (^handler)(dispatch_data_t, int))
+{
+       _dispatch_io_data_retain(data);
+       _dispatch_retain(queue);
+       _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
+               // On barrier queue
+               if (fd_entry->err) {
+                       int err = fd_entry->err;
+                       dispatch_async(queue, ^{
+                               _dispatch_io_debug("convenience handler invoke", fd);
+                               handler(NULL, err);
+                       });
+                       _dispatch_release(queue);
+                       return;
+               }
+               // Safe to access fd_entry on barrier queue
+               dispatch_io_t channel = fd_entry->convenience_channel;
+               if (!channel) {
+                       channel = _dispatch_io_create(DISPATCH_IO_STREAM);
+                       channel->fd = fd;
+                       channel->fd_actual = fd;
+                       channel->fd_entry = fd_entry;
+                       dispatch_retain(fd_entry->barrier_queue);
+                       dispatch_retain(fd_entry->barrier_group);
+                       channel->barrier_queue = fd_entry->barrier_queue;
+                       channel->barrier_group = fd_entry->barrier_group;
+                       fd_entry->convenience_channel = channel;
+               }
+               __block dispatch_data_t deliver_data = NULL;
+               __block int err = 0;
+               dispatch_async(fd_entry->close_queue, ^{
+                       dispatch_async(queue, ^{
+                               _dispatch_io_debug("convenience handler invoke", fd);
+                               handler(deliver_data, err);
+                               if (deliver_data) {
+                                       _dispatch_io_data_release(deliver_data);
+                               }
+                       });
+                       _dispatch_release(queue);
+               });
+               dispatch_operation_t op =
+                       _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
+                                       dispatch_data_get_size(data), data,
+                                       _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
+                                       false), ^(bool done, dispatch_data_t d, int error) {
+                               if (done) {
+                                       if (d) {
+                                               _dispatch_io_data_retain(d);
+                                               deliver_data = d;
+                                       }
+                                       err = error;
+                               }
+                       });
+               if (op) {
+                       _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
+               }
+               _dispatch_io_data_release(data);
+       });
+}
+
+#pragma mark -
+#pragma mark dispatch_operation_t
+
+static dispatch_operation_t
+_dispatch_operation_create(dispatch_op_direction_t direction,
+               dispatch_io_t channel, off_t offset, size_t length,
+               dispatch_data_t data, dispatch_queue_t queue,
+               dispatch_io_handler_t handler)
+{
+       // On channel queue
+       dispatch_assert(direction < DOP_DIR_MAX);
+       _dispatch_io_debug("operation create", channel->fd);
+#if DISPATCH_IO_DEBUG
+       int fd = channel->fd;
+#endif
+       // Safe to call _dispatch_io_get_error() with channel->fd_entry since
+       // that can only be NULL if atomic_flags are set rdar://problem/8362514
+       int err = _dispatch_io_get_error(NULL, channel, false);
+       if (err || !length) {
+               _dispatch_io_data_retain(data);
+               _dispatch_retain(queue);
+               dispatch_async(channel->barrier_queue, ^{
+                       dispatch_async(queue, ^{
+                               dispatch_data_t d = data;
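+                               // Handler data conventions: reads deliver no
+                               // data on error; writes deliver data only on
+                               // error (the portion that was not written)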
+                               if (direction == DOP_DIR_READ && err) {
+                                       d = NULL;
+                               } else if (direction == DOP_DIR_WRITE && !err) {
+                                       d = NULL;
+                               }
+                               _dispatch_io_debug("IO handler invoke", fd);
+                               handler(true, d, err);
+                               _dispatch_io_data_release(data);
+                       });
+                       _dispatch_release(queue);
+               });
+               return NULL;
+       }
+       dispatch_operation_t op;
+       op = calloc(1ul, sizeof(struct dispatch_operation_s));
+       op->do_vtable = &_dispatch_operation_vtable;
+       op->do_next = DISPATCH_OBJECT_LISTLESS;
+       op->do_ref_cnt = 1;
+       op->do_xref_cnt = 0; // operation object is not exposed externally
+       op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
+       op->op_q->do_targetq = queue;
+       _dispatch_retain(queue);
+       op->active = false;
+       op->direction = direction;
+       op->offset = offset + channel->f_ptr;
+       op->length = length;
+       op->handler = Block_copy(handler);
+       _dispatch_retain(channel);
+       op->channel = channel;
+       op->params = channel->params;
+       // Take a snapshot of the priority of the channel queue. The actual I/O
+       // for this operation will be performed at this priority
+       dispatch_queue_t targetq = op->channel->do_targetq;
+       while (fastpath(targetq->do_targetq)) {
+               targetq = targetq->do_targetq;
+       }
+       op->do_targetq = targetq;
+       return op;
+}
+
+static void
+_dispatch_operation_dispose(dispatch_operation_t op)
+{
+       // Deliver the data if there's any
+       if (op->fd_entry) {
+               _dispatch_operation_deliver_data(op, DOP_DONE);
+               dispatch_group_leave(op->fd_entry->barrier_group);
+               _dispatch_fd_entry_release(op->fd_entry);
+       }
+       if (op->channel) {
+               _dispatch_release(op->channel);
+       }
+       if (op->timer) {
+               dispatch_release(op->timer);
+       }
+       // For write operations, op->buf is owned by op->buf_data
+       if (op->buf && op->direction == DOP_DIR_READ) {
+               free(op->buf);
+       }
+       if (op->buf_data) {
+               _dispatch_io_data_release(op->buf_data);
+       }
+       if (op->data) {
+               _dispatch_io_data_release(op->data);
+       }
+       if (op->op_q) {
+               dispatch_release(op->op_q);
+       }
+       Block_release(op->handler);
+       _dispatch_dispose(op);
+}
+
+static void
+_dispatch_operation_enqueue(dispatch_operation_t op,
+               dispatch_op_direction_t direction, dispatch_data_t data)
+{
+       // Called from the barrier queue
+       _dispatch_io_data_retain(data);
+       // If channel is closed or stopped, then call the handler immediately
+       int err = _dispatch_io_get_error(NULL, op->channel, false);
+       if (err) {
+               dispatch_io_handler_t handler = op->handler;
+               dispatch_async(op->op_q, ^{
+                       dispatch_data_t d = data;
+                       if (direction == DOP_DIR_READ && err) {
+                               d = NULL;
+                       } else if (direction == DOP_DIR_WRITE && !err) {
+                               d = NULL;
+                       }
+                       handler(true, d, err);
+                       _dispatch_io_data_release(data);
+               });
+               _dispatch_release(op);
+               return;
+       }
+       // Finish operation init
+       op->fd_entry = op->channel->fd_entry;
+       _dispatch_fd_entry_retain(op->fd_entry);
+       dispatch_group_enter(op->fd_entry->barrier_group);
+       dispatch_disk_t disk = op->fd_entry->disk;
+       if (!disk) {
+               dispatch_stream_t stream = op->fd_entry->streams[direction];
+               dispatch_async(stream->dq, ^{
+                       _dispatch_stream_enqueue_operation(stream, op, data);
+                       _dispatch_io_data_release(data);
+               });
+       } else {
+               dispatch_async(disk->pick_queue, ^{
+                       _dispatch_disk_enqueue_operation(disk, op, data);
+                       _dispatch_io_data_release(data);
+               });
+       }
+}
+
+static bool
+_dispatch_operation_should_enqueue(dispatch_operation_t op,
+               dispatch_queue_t tq, dispatch_data_t data)
+{
+       // On stream queue or disk queue
+       _dispatch_io_debug("enqueue operation", op->fd_entry->fd);
+       _dispatch_io_data_retain(data);
+       op->data = data;
+       int err = _dispatch_io_get_error(op, NULL, true);
+       if (err) {
+               op->err = err;
+               // Final release
+               _dispatch_release(op);
+               return false;
+       }
+       if (op->params.interval) {
+               dispatch_resume(_dispatch_operation_timer(tq, op));
+       }
+       return true;
+}
+
+static dispatch_source_t
+_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
+{
+       // On stream queue or pick queue
+       if (op->timer) {
+               return op->timer;
+       }
+       dispatch_source_t timer = dispatch_source_create(
+                       DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
+       dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
+                       op->params.interval), op->params.interval, 0);
+       dispatch_source_set_event_handler(timer, ^{
+               // On stream queue or pick queue
+               if (dispatch_source_testcancel(timer)) {
+                       // Do nothing. The operation has already completed
+                       return;
+               }
+               dispatch_op_flags_t flags = DOP_DEFAULT;
+               if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
+                       // Deliver even if there is less data than the low-water mark
+                       flags |= DOP_DELIVER;
+               }
+               // If the operation is active, don't deliver data
+               if ((op->active) && (flags & DOP_DELIVER)) {
+                       op->flags = flags;
+               } else {
+                       _dispatch_operation_deliver_data(op, flags);
+               }
+       });
+       op->timer = timer;
+       return op->timer;
+}
+
+#pragma mark -
+#pragma mark dispatch_fd_entry_t
+
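+// An fd_entry is "refcounted" via the suspension count of its close queue:
+// each retain suspends the queue and each release resumes it, so the cleanup
+// blocks enqueued on close_queue only run once every reference is dropped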
+static inline void
+_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
+       dispatch_suspend(fd_entry->close_queue);
+}
+
+static inline void
+_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
+       dispatch_resume(fd_entry->close_queue);
+}
+
+static void
+_dispatch_fd_entry_init_async(dispatch_fd_t fd,
+               dispatch_fd_entry_init_callback_t completion_callback)
+{
+       static dispatch_once_t _dispatch_io_fds_lockq_pred;
+       dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
+                       _dispatch_io_fds_lockq_init);
+       dispatch_async(_dispatch_io_fds_lockq, ^{
+               _dispatch_io_debug("fd entry init", fd);
+               dispatch_fd_entry_t fd_entry = NULL;
+               // Check to see if there is an existing entry for the given fd
+               uintptr_t hash = DIO_HASH(fd);
+               TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
+                       if (fd_entry->fd == fd) {
+                               // Retain the fd_entry to ensure it cannot go away until the
+                               // stat() has completed
+                               _dispatch_fd_entry_retain(fd_entry);
+                               break;
+                       }
+               }
+               if (!fd_entry) {
+                       // If we did not find an existing entry, create one
+                       fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
+               }
+               dispatch_async(fd_entry->barrier_queue, ^{
+                       _dispatch_io_debug("fd entry init completion", fd);
+                       completion_callback(fd_entry);
+                       // stat() is complete, release reference to fd_entry
+                       _dispatch_fd_entry_release(fd_entry);
+               });
+       });
+}
+
+static dispatch_fd_entry_t
+_dispatch_fd_entry_create(dispatch_queue_t q)
+{
+       dispatch_fd_entry_t fd_entry;
+       fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
+       fd_entry->close_queue = dispatch_queue_create(
+                       "com.apple.libdispatch-io.closeq", NULL);
+       // Use target queue to ensure that no concurrent lookups are going on when
+       // the close queue is running
+       fd_entry->close_queue->do_targetq = q;
+       _dispatch_retain(q);
+       // Suspend the cleanup queue until closing
+       _dispatch_fd_entry_retain(fd_entry);
+       return fd_entry;
+}
+
+static dispatch_fd_entry_t
+_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
+{
+       // On fds lock queue
+       _dispatch_io_debug("fd entry create", fd);
+       dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
+                       _dispatch_io_fds_lockq);
+       fd_entry->fd = fd;
+       TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
+       fd_entry->barrier_queue = dispatch_queue_create(
+                       "com.apple.libdispatch-io.barrierq", NULL);
+       fd_entry->barrier_group = dispatch_group_create();
+       dispatch_async(fd_entry->barrier_queue, ^{
+               _dispatch_io_debug("fd entry stat", fd);
+               int err, orig_flags, orig_nosigpipe = -1;
+               struct stat st;
+               _dispatch_io_syscall_switch(err,
+                       fstat(fd, &st),
+                       default: fd_entry->err = err; return;
+               );
+               fd_entry->stat.dev = st.st_dev;
+               fd_entry->stat.mode = st.st_mode;
+               _dispatch_io_syscall_switch(err,
+                       orig_flags = fcntl(fd, F_GETFL),
+                       default: (void)dispatch_assume_zero(err); break;
+               );
+#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
+               if (S_ISFIFO(st.st_mode)) {
+                       _dispatch_io_syscall_switch(err,
+                               orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
+                               default: (void)dispatch_assume_zero(err); break;
+                       );
+                       if (orig_nosigpipe != -1) {
+                               _dispatch_io_syscall_switch(err,
+                                       orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
+                                       default:
+                                               orig_nosigpipe = -1;
+                                               (void)dispatch_assume_zero(err);
+                                               break;
+                               );
+                       }
+               }
+#endif
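+               // Regular files are handed to the disk subsystem and use
+               // blocking I/O; all other fd types are made non-blocking and
+               // serviced by per-direction streams driven by dispatch sources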
+               if (S_ISREG(st.st_mode)) {
+                       if (orig_flags != -1) {
+                               _dispatch_io_syscall_switch(err,
+                                       fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
+                                       default:
+                                               orig_flags = -1;
+                                               (void)dispatch_assume_zero(err);
+                                               break;
+                               );
+                       }
+                       int32_t dev = major(st.st_dev);
+                       // We have to get the disk on the global dev queue. The
+                       // barrier queue cannot continue until that is complete
+                       dispatch_suspend(fd_entry->barrier_queue);
+                       dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
+                                       _dispatch_io_devs_lockq_init);
+                       dispatch_async(_dispatch_io_devs_lockq, ^{
+                               _dispatch_disk_init(fd_entry, dev);
+                               dispatch_resume(fd_entry->barrier_queue);
+                       });
+               } else {
+                       if (orig_flags != -1) {
+                               _dispatch_io_syscall_switch(err,
+                                       fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
+                                       default:
+                                               orig_flags = -1;
+                                               (void)dispatch_assume_zero(err);
+                                               break;
+                               );
+                       }
+                       _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
+                                       DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
+               }
+               fd_entry->orig_flags = orig_flags;
+               fd_entry->orig_nosigpipe = orig_nosigpipe;
+       });
+       // This is the first item run when the close queue is resumed, indicating
+       // that all channels associated with this entry have been closed and that
+       // all operations associated with this entry have been freed
+       dispatch_async(fd_entry->close_queue, ^{
+               if (!fd_entry->disk) {
+                       _dispatch_io_debug("close queue fd_entry cleanup", fd);
+                       dispatch_op_direction_t dir;
+                       for (dir = 0; dir < DOP_DIR_MAX; dir++) {
+                               _dispatch_stream_dispose(fd_entry, dir);
+                       }
+               } else {
+                       dispatch_disk_t disk = fd_entry->disk;
+                       dispatch_async(_dispatch_io_devs_lockq, ^{
+                               _dispatch_release(disk);
+                       });
+               }
+               // Remove this entry from the global fd list
+               TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
+       });
+       // If there was a source associated with this stream, disposing of the
+       // source cancels it and suspends the close queue. Freeing the fd_entry
+       // structure must happen after the source cancel handler has finished
+       dispatch_async(fd_entry->close_queue, ^{
+               _dispatch_io_debug("close queue release", fd);
+               dispatch_release(fd_entry->close_queue);
+               _dispatch_io_debug("barrier queue release", fd);
+               dispatch_release(fd_entry->barrier_queue);
+               _dispatch_io_debug("barrier group release", fd);
+               dispatch_release(fd_entry->barrier_group);
+               if (fd_entry->orig_flags != -1) {
+                       _dispatch_io_syscall(
+                               fcntl(fd, F_SETFL, fd_entry->orig_flags)
+                       );
+               }
+#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
+               if (fd_entry->orig_nosigpipe != -1) {
+                       _dispatch_io_syscall(
+                               fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
+                       );
+               }
+#endif
+               if (fd_entry->convenience_channel) {
+                       fd_entry->convenience_channel->fd_entry = NULL;
+                       dispatch_release(fd_entry->convenience_channel);
+               }
+               free(fd_entry);
+       });
+       return fd_entry;
+}
+
+static dispatch_fd_entry_t
+_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
+               dev_t dev, mode_t mode)
+{
+       // On devs lock queue
+       _dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
+       dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
+                       path_data->channel->queue);
+       if (S_ISREG(mode)) {
+               _dispatch_disk_init(fd_entry, major(dev));
+       } else {
+               _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
+                               DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
+       }
+       fd_entry->fd = -1;
+       fd_entry->orig_flags = -1;
+       fd_entry->path_data = path_data;
+       fd_entry->stat.dev = dev;
+       fd_entry->stat.mode = mode;
+       fd_entry->barrier_queue = dispatch_queue_create(
+                       "com.apple.libdispatch-io.barrierq", NULL);
+       fd_entry->barrier_group = dispatch_group_create();
+       // This is the first item run when the close queue is resumed, indicating
+       // that the channel associated with this entry has been closed and that
+       // all operations associated with this entry have been freed
+       dispatch_async(fd_entry->close_queue, ^{
+               _dispatch_io_debug("close queue fd_entry cleanup", -1);
+               if (!fd_entry->disk) {
+                       dispatch_op_direction_t dir;
+                       for (dir = 0; dir < DOP_DIR_MAX; dir++) {
+                               _dispatch_stream_dispose(fd_entry, dir);
+                       }
+               }
+               if (fd_entry->fd != -1) {
+                       close(fd_entry->fd);
+               }
+               if (fd_entry->path_data->channel) {
+                       // If the associated channel has not been released yet, mark
+                       // it as no longer having an fd_entry (for stop after close).
+                       // It is safe to modify the channel because we are on the
+                       // close_queue, whose target queue is the channel queue
+                       fd_entry->path_data->channel->fd_entry = NULL;
+               }
+       });
+       dispatch_async(fd_entry->close_queue, ^{
+               _dispatch_io_debug("close queue release", -1);
+               dispatch_release(fd_entry->close_queue);
+               dispatch_release(fd_entry->barrier_queue);
+               dispatch_release(fd_entry->barrier_group);
+               free(fd_entry->path_data);
+               free(fd_entry);
+       });
+       return fd_entry;
+}
+
+static int
+_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
+{
+       if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
+               return 0;
+       }
+       if (fd_entry->err) {
+               return fd_entry->err;
+       }
+       int fd = -1;
+       int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
+                       fd_entry->path_data->oflag | O_NONBLOCK;
+open:
+       fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
+       if (fd == -1) {
+               int err = errno;
+               if (err == EINTR) {
+                       goto open;
+               }
+               (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
+               return err;
+       }
+       if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
+               // Lost the race with another open
+               close(fd);
+       } else {
+               channel->fd_actual = fd;
+       }
+       return 0;
+}
+
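+/*
+ * Illustrative sketch (not part of the implementation): the lazy open above
+ * is what path-based channels rely on. Assuming a user queue "q", e.g.:
+ *
+ *     dispatch_io_t io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
+ *                     "/tmp/example", O_RDONLY, 0, q, ^(int error) {
+ *             // cleanup handler: the descriptor has been closed
+ *     });
+ *     dispatch_io_read(io, 0, SIZE_MAX, q,
+ *                     ^(bool done, dispatch_data_t data, int error) {
+ *             // an open(2) failure above is reported here as 'error'
+ *     });
+ *
+ * The open is retried on EINTR, a failure errno is published once with a
+ * compare-and-swap, and when two operations race to open, the loser closes
+ * its duplicate descriptor.
+ */
+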
+static void
+_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
+               dispatch_io_t channel)
+{
+       if (fd_entry->disk) {
+               if (channel) {
+                       _dispatch_retain(channel);
+               }
+               _dispatch_fd_entry_retain(fd_entry);
+               dispatch_async(fd_entry->disk->pick_queue, ^{
+                       _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
+                       _dispatch_fd_entry_release(fd_entry);
+                       if (channel) {
+                               _dispatch_release(channel);
+                       }
+               });
+       } else {
+               dispatch_op_direction_t direction;
+               for (direction = 0; direction < DOP_DIR_MAX; direction++) {
+                       dispatch_stream_t stream = fd_entry->streams[direction];
+                       if (!stream) {
+                               continue;
+                       }
+                       if (channel) {
+                               _dispatch_retain(channel);
+                       }
+                       _dispatch_fd_entry_retain(fd_entry);
+                       dispatch_async(stream->dq, ^{
+                               _dispatch_stream_cleanup_operations(stream, channel);
+                               _dispatch_fd_entry_release(fd_entry);
+                               if (channel) {
+                                       _dispatch_release(channel);
+                               }
+                       });
+               }
+       }
+}
+
+#pragma mark -
+#pragma mark dispatch_stream_t/dispatch_disk_t
+
+static void
+_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
+{
+       dispatch_op_direction_t direction;
+       for (direction = 0; direction < DOP_DIR_MAX; direction++) {
+               dispatch_stream_t stream;
+               stream = calloc(1ul, sizeof(struct dispatch_stream_s));
+               stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
+                               NULL);
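+               // The new queue is not yet visible to other threads, so it is
+               // safe to set its target queue directly instead of going
+               // through dispatch_set_target_queue()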
+               _dispatch_retain(tq);
+               stream->dq->do_targetq = tq;
+               TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
+               TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
+               fd_entry->streams[direction] = stream;
+       }
+}
+
+static void
+_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
+               dispatch_op_direction_t direction)
+{
+       // On close queue
+       dispatch_stream_t stream = fd_entry->streams[direction];
+       if (!stream) {
+               return;
+       }
+       dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
+       dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
+       if (stream->source) {
+               // Balanced by source cancel handler:
+               _dispatch_fd_entry_retain(fd_entry);
+               dispatch_source_cancel(stream->source);
+               dispatch_resume(stream->source);
+               dispatch_release(stream->source);
+       }
+       dispatch_release(stream->dq);
+       free(stream);
+}
+
+static void
+_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
+{
+       // On devs lock queue
+       dispatch_disk_t disk;
+       char label_name[256];
+       // Check to see if there is an existing entry for the given device
+       uintptr_t hash = DIO_HASH(dev);
+       TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
+               if (disk->dev == dev) {
+                       _dispatch_retain(disk);
+                       goto out;
+               }
+       }
+       // Otherwise create a new entry
+       size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
+       disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth *
+                       sizeof(dispatch_operation_t)));
+       disk->do_vtable = &_dispatch_disk_vtable;
+       disk->do_next = DISPATCH_OBJECT_LISTLESS;
+       disk->do_ref_cnt = 1;
+       disk->do_xref_cnt = 0;
+       disk->advise_list_depth = pending_reqs_depth;
+       disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
+                       false);
+       disk->dev = dev;
+       TAILQ_INIT(&disk->operations);
+       disk->cur_rq = TAILQ_FIRST(&disk->operations);
+       snprintf(label_name, sizeof(label_name),
+                       "com.apple.libdispatch-io.deviceq.%d", (int)dev);
+       disk->pick_queue = dispatch_queue_create(label_name, NULL);
+       TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
+out:
+       fd_entry->disk = disk;
+       TAILQ_INIT(&fd_entry->stream_ops);
+}
+
+static void
+_dispatch_disk_dispose(dispatch_disk_t disk)
+{
+       uintptr_t hash = DIO_HASH(disk->dev);
+       TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
+       dispatch_assert(TAILQ_EMPTY(&disk->operations));
+       size_t i;
+       for (i=0; i<disk->advise_list_depth; ++i) {
+               dispatch_assert(!disk->advise_list[i]);
+       }
+       dispatch_release(disk->pick_queue);
+       free(disk);
+}
+
+#pragma mark -
+#pragma mark dispatch_stream_operations/dispatch_disk_operations
+
+static inline bool
+_dispatch_stream_operation_avail(dispatch_stream_t stream)
+{
+       return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
+                       !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
+}
+
+static void
+_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
+               dispatch_operation_t op, dispatch_data_t data)
+{
+       if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
+               return;
+       }
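+       // Only kick the handler on the empty -> non-empty transition; when
+       // operations are already queued, a handler pass is pending or the
+       // stream source will drive one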
+       bool no_ops = !_dispatch_stream_operation_avail(stream);
+       TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
+       if (no_ops) {
+               dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
+       }
+}
+
+static void
+_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
+               dispatch_data_t data)
+{
+       if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
+               return;
+       }
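+       // Stream-type operations on the same fd must stay ordered: only the
+       // head of fd_entry->stream_ops is linked into disk->operations, and
+       // its completion promotes the next one (see
+       // _dispatch_disk_complete_operation)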
+       if (op->params.type == DISPATCH_IO_STREAM) {
+               if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
+                       TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
+               }
+               TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
+       } else {
+               TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
+       }
+       _dispatch_disk_handler(disk);
+}
+
+static void
+_dispatch_stream_complete_operation(dispatch_stream_t stream,
+               dispatch_operation_t op)
+{
+       // On stream queue
+       _dispatch_io_debug("complete operation", op->fd_entry->fd);
+       TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
+       if (op == stream->op) {
+               stream->op = NULL;
+       }
+       if (op->timer) {
+               dispatch_source_cancel(op->timer);
+       }
+       // Final release will deliver any pending data
+       _dispatch_release(op);
+}
+
+static void
+_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
+{
+       // On pick queue
+       _dispatch_io_debug("complete operation", op->fd_entry->fd);
+       // Current request is always the last op returned
+       if (disk->cur_rq == op) {
+               disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
+                               operation_list);
+       }
+       if (op->params.type == DISPATCH_IO_STREAM) {
+               // Check if there are other pending stream operations behind it
+               dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
+               TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
+               if (op_next) {
+                       TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
+               }
+       }
+       TAILQ_REMOVE(&disk->operations, op, operation_list);
+       if (op->timer) {
+               dispatch_source_cancel(op->timer);
+       }
+       // Final release will deliver any pending data
+       _dispatch_release(op);
+}
+
+static dispatch_operation_t
+_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
+               dispatch_operation_t op)
+{
+       // On stream queue
+       if (!op) {
+               // On the first run through, pick the first operation
+               if (!_dispatch_stream_operation_avail(stream)) {
+                       return op;
+               }
+               if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
+                       op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
+               } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
+                       op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
+               }
+               return op;
+       }
+       if (op->params.type == DISPATCH_IO_STREAM) {
+               // Stream operations need to be serialized, so continue the
+               // current operation until it is finished
+               return op;
+       }
+       // Get the next random operation (round-robin)
+       if (op->params.type == DISPATCH_IO_RANDOM) {
+               op = TAILQ_NEXT(op, operation_list);
+               if (!op) {
+                       op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
+               }
+               return op;
+       }
+       return NULL;
+}
+
+static dispatch_operation_t
+_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
+{
+       // On pick queue
+       dispatch_operation_t op;
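+       // Round-robin: scan forward from cur_rq (wrapping around) for the
+       // first operation that is not already active, giving up after one
+       // full circle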
+       if (!TAILQ_EMPTY(&disk->operations)) {
+               if (disk->cur_rq == NULL) {
+                       op = TAILQ_FIRST(&disk->operations);
+               } else {
+                       op = disk->cur_rq;
+                       do {
+                               op = TAILQ_NEXT(op, operation_list);
+                               if (!op) {
+                                       op = TAILQ_FIRST(&disk->operations);
+                               }
+                               // TODO: more involved picking algorithm rdar://problem/8780312
+                       } while (op->active && op != disk->cur_rq);
+               }
+               if (!op->active) {
+                       disk->cur_rq = op;
+                       return op;
+               }
+       }
+       return NULL;
+}
+
+static void
+_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
+               dispatch_io_t channel)
+{
+       // On stream queue
+       dispatch_operation_t op, tmp;
+       typeof(*stream->operations) *operations;
+       operations = &stream->operations[DISPATCH_IO_RANDOM];
+       TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
+               if (!channel || op->channel == channel) {
+                       _dispatch_stream_complete_operation(stream, op);
+               }
+       }
+       operations = &stream->operations[DISPATCH_IO_STREAM];
+       TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
+               if (!channel || op->channel == channel) {
+                       _dispatch_stream_complete_operation(stream, op);
+               }
+       }
+       if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
+               dispatch_suspend(stream->source);
+               stream->source_running = false;
+       }
+}
+
+static void
+_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
+{
+       // On pick queue
+       dispatch_operation_t op, tmp;
+       TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
+               if (!channel || op->channel == channel) {
+                       _dispatch_disk_complete_operation(disk, op);
+               }
+       }
+}
+
+#pragma mark -
+#pragma mark dispatch_stream_handler/dispatch_disk_handler
+
+static dispatch_source_t
+_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
+{
+       // On stream queue
+       if (stream->source) {
+               return stream->source;
+       }
+       dispatch_fd_t fd = op->fd_entry->fd;
+       _dispatch_io_debug("stream source create", fd);
+       dispatch_source_t source = NULL;
+       if (op->direction == DOP_DIR_READ) {
+               source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
+                               stream->dq);
+       } else if (op->direction == DOP_DIR_WRITE) {
+               source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
+                               stream->dq);
+       } else {
+               dispatch_assert(op->direction < DOP_DIR_MAX);
+               return NULL;
+       }
+       dispatch_set_context(source, stream);
+       dispatch_source_set_event_handler_f(source,
+                       _dispatch_stream_source_handler);
+       // Close queue must not run user cleanup handlers until sources are fully
+       // unregistered
+       dispatch_queue_t close_queue = op->fd_entry->close_queue;
+       dispatch_source_set_cancel_handler(source, ^{
+               _dispatch_io_debug("stream source cancel", fd);
+               dispatch_resume(close_queue);
+       });
+       stream->source = source;
+       return stream->source;
+}
+
+static void
+_dispatch_stream_source_handler(void *ctx)
+{
+       // On stream queue
+       dispatch_stream_t stream = (dispatch_stream_t)ctx;
+       dispatch_suspend(stream->source);
+       stream->source_running = false;
+       return _dispatch_stream_handler(stream);
+}
+
+static void
+_dispatch_stream_handler(void *ctx)
+{
+       // On stream queue
+       dispatch_stream_t stream = (dispatch_stream_t)ctx;
+       dispatch_operation_t op;
+pick:
+       op = _dispatch_stream_pick_next_operation(stream, stream->op);
+       if (!op) {
+               _dispatch_debug("no operation found: stream %p", stream);
+               return;
+       }
+       int err = _dispatch_io_get_error(op, NULL, true);
+       if (err) {
+               op->err = err;
+               _dispatch_stream_complete_operation(stream, op);
+               goto pick;
+       }
+       stream->op = op;
+       _dispatch_io_debug("stream handler", op->fd_entry->fd);
+       dispatch_fd_entry_t fd_entry = op->fd_entry;
+       _dispatch_fd_entry_retain(fd_entry);
+       // For performance analysis
+       if (!op->total && dispatch_io_defaults.initial_delivery) {
+               // Empty delivery to signal the start of the operation
+               _dispatch_io_debug("initial delivery", op->fd_entry->fd);
+               _dispatch_operation_deliver_data(op, DOP_DELIVER);
+       }
+       // TODO: perform on the operation target queue to get correct priority
+       int result = _dispatch_operation_perform(op), flags = -1;
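+       // The switch below relies on deliberate fall-through; flags doubles as
+       // a record of the entry point: it stays -1 when a case is entered
+       // directly and is DOP_DEFAULT when reached from DISPATCH_OP_DELIVER,
+       // so the COMPLETE case only completes the operation when completion
+       // was actually requested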
+       switch (result) {
+       case DISPATCH_OP_DELIVER:
+               flags = DOP_DEFAULT;
+               // Fall through
+       case DISPATCH_OP_DELIVER_AND_COMPLETE:
+               flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
+                               DOP_DEFAULT;
+               _dispatch_operation_deliver_data(op, flags);
+               // Fall through
+       case DISPATCH_OP_COMPLETE:
+               if (flags != DOP_DEFAULT) {
+                       _dispatch_stream_complete_operation(stream, op);
+               }
+               if (_dispatch_stream_operation_avail(stream)) {
+                       dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
+               }
+               break;
+       case DISPATCH_OP_COMPLETE_RESUME:
+               _dispatch_stream_complete_operation(stream, op);
+               // Fall through
+       case DISPATCH_OP_RESUME:
+               if (_dispatch_stream_operation_avail(stream)) {
+                       stream->source_running = true;
+                       dispatch_resume(_dispatch_stream_source(stream, op));
+               }
+               break;
+       case DISPATCH_OP_ERR:
+               _dispatch_stream_cleanup_operations(stream, op->channel);
+               break;
+       case DISPATCH_OP_FD_ERR:
+               _dispatch_fd_entry_retain(fd_entry);
+               dispatch_async(fd_entry->barrier_queue, ^{
+                       _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
+                       _dispatch_fd_entry_release(fd_entry);
+               });
+               break;
+       default:
+               break;
+       }
+       _dispatch_fd_entry_release(fd_entry);
+       return;
+}
+
+static void
+_dispatch_disk_handler(void *ctx)
+{
+       // On pick queue
+       dispatch_disk_t disk = (dispatch_disk_t)ctx;
+       if (disk->io_active) {
+               return;
+       }
+       _dispatch_io_debug("disk handler", -1);
+       dispatch_operation_t op;
+       size_t i = disk->free_idx, j = disk->req_idx;
+       if (j <= i) {
+               j += disk->advise_list_depth;
+       }
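+       // advise_list is used as a ring buffer: new operations are filled in
+       // starting at free_idx, while req_idx is the next slot to be performed.
+       // Comparing indices on an unrolled circle (j bumped by the depth) lets
+       // a single i <= j test handle wraparound; e.g. with a hypothetical
+       // depth of 4, free_idx 3 and req_idx 1, the slots visited are 3, 0, 1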
+       while (i <= j) {
+               if ((!disk->advise_list[i%disk->advise_list_depth]) &&
+                               (op = _dispatch_disk_pick_next_operation(disk))) {
+                       int err = _dispatch_io_get_error(op, NULL, true);
+                       if (err) {
+                               op->err = err;
+                               _dispatch_disk_complete_operation(disk, op);
+                               continue;
+                       }
+                       _dispatch_retain(op);
+                       disk->advise_list[i%disk->advise_list_depth] = op;
+                       op->active = true;
+               } else {
+                       // No more operations to get
+                       break;
+               }
+               i++;
+       }
+       disk->free_idx = (i%disk->advise_list_depth);
+       op = disk->advise_list[disk->req_idx];
+       if (op) {
+               disk->io_active = true;
+               dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
+       }
+}
+
+static void
+_dispatch_disk_perform(void *ctxt)
+{
+       dispatch_disk_t disk = ctxt;
+       size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
+       _dispatch_io_debug("disk perform", -1);
+       dispatch_operation_t op;
+       size_t i = disk->advise_idx, j = disk->free_idx;
+       if (j <= i) {
+               j += disk->advise_list_depth;
+       }
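+       // Issue read-ahead for the not-yet-advised span of the ring buffer,
+       // from advise_idx up to free_idx, before performing the request
+       // waiting at req_idx below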
+       do {
+               op = disk->advise_list[i%disk->advise_list_depth];
+               if (!op) {
+                       // Nothing more to advise; we must be at free_idx
+                       dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
+                       break;
+               }
+               if (op->direction == DOP_DIR_WRITE) {
+                       // TODO: preallocate writes ? rdar://problem/9032172
+                       continue;
+               }
+               if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
+                               op->channel)) {
+                       continue;
+               }
+               // For performance analysis
+               if (!op->total && dispatch_io_defaults.initial_delivery) {
+                       // Empty delivery to signal the start of the operation
+                       _dispatch_io_debug("initial delivery", op->fd_entry->fd);
+                       _dispatch_operation_deliver_data(op, DOP_DELIVER);
+               }
+               // Advise two chunks if the list only has one element and this is the
+               // first advise on the operation
+               if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
+                               !op->advise_offset) {
+                       chunk_size *= 2;
+               }
+               _dispatch_operation_advise(op, chunk_size);
+       } while (++i < j);
+       disk->advise_idx = i%disk->advise_list_depth;
+       op = disk->advise_list[disk->req_idx];
+       int result = _dispatch_operation_perform(op);
+       disk->advise_list[disk->req_idx] = NULL;
+       disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
+       dispatch_async(disk->pick_queue, ^{
+               switch (result) {
+               case DISPATCH_OP_DELIVER:
+                       _dispatch_operation_deliver_data(op, DOP_DELIVER);
+                       break;
+               case DISPATCH_OP_COMPLETE:
+                       _dispatch_disk_complete_operation(disk, op);
+                       break;
+               case DISPATCH_OP_DELIVER_AND_COMPLETE:
+                       _dispatch_operation_deliver_data(op, DOP_DELIVER);
+                       _dispatch_disk_complete_operation(disk, op);
+                       break;
+               case DISPATCH_OP_ERR:
+                       _dispatch_disk_cleanup_operations(disk, op->channel);
+                       break;
+               case DISPATCH_OP_FD_ERR:
+                       _dispatch_disk_cleanup_operations(disk, NULL);
+                       break;
+               default:
+                       dispatch_assert(result);
+                       break;
+               }
+               op->active = false;
+               disk->io_active = false;
+               _dispatch_disk_handler(disk);
+               // Balancing the retain in _dispatch_disk_handler. Note that op must be
+               // released at the very end, since it might hold the last reference to
+               // the disk
+               _dispatch_release(op);
+       });
+}
+
+#pragma mark -
+#pragma mark dispatch_operation_perform
+
+static void
+_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
+{
+       int err;
+       struct radvisory advise;
+       // No point in issuing a read advise for the next chunk if we are
+       // already more than a chunk ahead of the bytes actually read
+       if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
+                       PAGE_SIZE)) {
+               return;
+       }
+       advise.ra_count = (int)chunk_size;
+       if (!op->advise_offset) {
+               op->advise_offset = op->offset;
+               // If this is the first time through, align the advised range to a
+               // page boundary
+               size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
+               advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
+       }
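+       // Worked example (hypothetical numbers): offset 1000, chunk_size 32768
+       // and 4 KiB pages give pg_fraction = (1000 + 32768) % 4096 = 1000, so
+       // ra_count grows by 3096 and the advised range [1000, 36864) ends on a
+       // page boundary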
+       advise.ra_offset = op->advise_offset;
+       op->advise_offset += advise.ra_count;
+       _dispatch_io_syscall_switch(err,
+               fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
+               // TODO: set disk status on error
+               default: (void)dispatch_assume_zero(err); break;
+       );
+}
+
+static int
+_dispatch_operation_perform(dispatch_operation_t op)
+{
+       int err = _dispatch_io_get_error(op, NULL, true);
+       if (err) {
+               goto error;
+       }
+       if (!op->buf) {
+               size_t max_buf_siz = op->params.high;
+               size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
+               if (op->direction == DOP_DIR_READ) {
+                       // If necessary, create a buffer for the ongoing operation,
+                       // large enough to fit chunk_pages, but at most the
+                       // high-water mark
+                       size_t data_siz = dispatch_data_get_size(op->data);
+                       if (data_siz) {
+                               dispatch_assert(data_siz < max_buf_siz);
+                               max_buf_siz -= data_siz;
+                       }
+                       if (max_buf_siz > chunk_siz) {
+                               max_buf_siz = chunk_siz;
+                       }
+                       if (op->length < SIZE_MAX) {
+                               op->buf_siz = op->length - op->total;
+                               if (op->buf_siz > max_buf_siz) {
+                                       op->buf_siz = max_buf_siz;
+                               }
+                       } else {
+                               op->buf_siz = max_buf_siz;
+                       }
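+                       // Illustrative sizing (hypothetical numbers): with 4 KiB
+                       // pages, chunk_pages of 32 and an unbounded high-water
+                       // mark, a 1 MiB read proceeds in 128 KiB buffers, while a
+                       // length of SIZE_MAX (read to EOF) always uses max_buf_siz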
+                       op->buf = valloc(op->buf_siz);
+                       _dispatch_io_debug("buffer allocated", op->fd_entry->fd);
+               } else if (op->direction == DOP_DIR_WRITE) {
+                       // Always write the first data piece; if it is smaller than
+                       // a chunk, accumulate further data pieces until the chunk
+                       // size is reached
+                       if (chunk_siz > max_buf_siz) {
+                               chunk_siz = max_buf_siz;
+                       }
+                       op->buf_siz = 0;
+                       dispatch_data_apply(op->data,
+                                       ^(dispatch_data_t region DISPATCH_UNUSED,
+                                       size_t offset DISPATCH_UNUSED,
+                                       const void* buf DISPATCH_UNUSED, size_t len) {
+                               size_t siz = op->buf_siz + len;
+                               if (!op->buf_siz || siz <= chunk_siz) {
+                                       op->buf_siz = siz;
+                               }
+                               return (bool)(siz < chunk_siz);
+                       });
+                       if (op->buf_siz > max_buf_siz) {
+                               op->buf_siz = max_buf_siz;
+                       }
+                       dispatch_data_t d;
+                       d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
+                       op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
+                                       NULL);
+                       _dispatch_io_data_release(d);
+                       _dispatch_io_debug("buffer mapped", op->fd_entry->fd);
+               }
+       }
+       if (op->fd_entry->fd == -1) {
+               err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
+               if (err) {
+                       goto error;
+               }
+       }
+       void *buf = op->buf + op->buf_len;
+       size_t len = op->buf_siz - op->buf_len;
+       off_t off = op->offset + op->total;
+       ssize_t processed = -1;
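+       // Stream-type channels use the descriptor's implicit file position
+       // (read/write); random-access channels address each request explicitly
+       // via pread/pwrite at offset + total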
+syscall:
+       if (op->direction == DOP_DIR_READ) {
+               if (op->params.type == DISPATCH_IO_STREAM) {
+                       processed = read(op->fd_entry->fd, buf, len);
+               } else if (op->params.type == DISPATCH_IO_RANDOM) {
+                       processed = pread(op->fd_entry->fd, buf, len, off);
+               }
+       } else if (op->direction == DOP_DIR_WRITE) {
+               if (op->params.type == DISPATCH_IO_STREAM) {
+                       processed = write(op->fd_entry->fd, buf, len);
+               } else if (op->params.type == DISPATCH_IO_RANDOM) {
+                       processed = pwrite(op->fd_entry->fd, buf, len, off);
+               }
+       }
+       // Encountered an error on the file descriptor
+       if (processed == -1) {
+               err = errno;
+               if (err == EINTR) {
+                       goto syscall;
+               }
+               goto error;
+       }
+       // EOF is indicated by two handler invocations
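+       // (the first delivering any remaining buffered data, the second with
+       // done == true, empty data and zero error)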
+       if (processed == 0) {
+               _dispatch_io_debug("EOF", op->fd_entry->fd);
+               return DISPATCH_OP_DELIVER_AND_COMPLETE;
+       }
+       op->buf_len += processed;
+       op->total += processed;
+       if (op->total == op->length) {
+               // Finished processing all the bytes requested by the operation
+               return DISPATCH_OP_COMPLETE;
+       } else {
+               // Deliver data only if we satisfy the filters
+               return DISPATCH_OP_DELIVER;
+       }
+error:
+       if (err == EAGAIN) {
+               // For disk-based files with blocking I/O we should never get EAGAIN
+               dispatch_assert(!op->fd_entry->disk);
+               _dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
+               if (op->direction == DOP_DIR_READ && op->total &&
+                               op->channel == op->fd_entry->convenience_channel) {
+                       // Convenience read with available data completes on EAGAIN
+                       return DISPATCH_OP_COMPLETE_RESUME;
+               }
+               return DISPATCH_OP_RESUME;
+       }
+       op->err = err;
+       switch (err) {
+       case ECANCELED:
+               return DISPATCH_OP_ERR;
+       case EBADF:
+               (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
+               return DISPATCH_OP_FD_ERR;
+       default:
+               return DISPATCH_OP_COMPLETE;
+       }
+}
+
+static void
+_dispatch_operation_deliver_data(dispatch_operation_t op,
+               dispatch_op_flags_t flags)
+{
+       // Called from the stream queue or the disk pick queue (as
+       // appropriate), or when the operation is finalized
+       dispatch_data_t data = NULL;
+       int err = 0;
+       size_t undelivered = op->undelivered + op->buf_len;
+       bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
+                       (op->flags & DOP_DELIVER);
+       op->flags = DOP_DEFAULT;
+       if (!deliver) {
+               // Don't deliver data until the low-water mark has been reached
+               if (undelivered >= op->params.low) {
+                       deliver = true;
+               } else if (op->buf_len < op->buf_siz) {
+                       // Request buffer is not yet used up
+                       _dispatch_io_debug("buffer data", op->fd_entry->fd);
+                       return;
+               }
+       } else {
+               err = op->err;
+               if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
+                       err = ECANCELED;
+                       op->err = err;
+               }
+       }
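+       // Illustrative (hypothetical numbers): with a low-water mark of 4096
+       // and successive 1500-byte reads accumulating in one request buffer,
+       // the first two passes buffer (1500, then 3000 bytes undelivered) and
+       // the third, at 4500 >= 4096, delivers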
+       // Deliver data or buffer used up
+       if (op->direction == DOP_DIR_READ) {
+               if (op->buf_len) {
+                       void *buf = op->buf;
+                       data = dispatch_data_create(buf, op->buf_len, NULL,
+                                       DISPATCH_DATA_DESTRUCTOR_FREE);
+                       op->buf = NULL;
+                       op->buf_len = 0;
+                       dispatch_data_t d = dispatch_data_create_concat(op->data, data);
+                       _dispatch_io_data_release(op->data);
+                       _dispatch_io_data_release(data);
+                       data = d;
+               } else {
+                       data = op->data;
+               }
+               op->data = deliver ? dispatch_data_empty : data;
+       } else if (op->direction == DOP_DIR_WRITE) {
+               if (deliver) {
+                       data = dispatch_data_create_subrange(op->data, op->buf_len,
+                                       op->length);
+               }
+               if (op->buf_len == op->buf_siz) {
+                       _dispatch_io_data_release(op->buf_data);
+                       op->buf_data = NULL;
+                       op->buf = NULL;
+                       op->buf_len = 0;
+                       // Trim newly written buffer from head of unwritten data
+                       dispatch_data_t d;
+                       if (deliver) {
+                               _dispatch_io_data_retain(data);
+                               d = data;
+                       } else {
+                               // the buffer just written out had size op->buf_siz
+                               // (op->buf_len was reset above)
+                               d = dispatch_data_create_subrange(op->data, op->buf_siz,
+                                               op->length);
+                       }
+                       _dispatch_io_data_release(op->data);
+                       op->data = d;
+               }
+       } else {
+               dispatch_assert(op->direction < DOP_DIR_MAX);
+               return;
+       }
+       if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
+               op->undelivered = undelivered;
+               _dispatch_io_debug("buffer data", op->fd_entry->fd);
+               return;
+       }
+       op->undelivered = 0;
+       _dispatch_io_debug("deliver data", op->fd_entry->fd);
+       dispatch_op_direction_t direction = op->direction;
+       __block dispatch_data_t d = data;
+       dispatch_io_handler_t handler = op->handler;
+#if DISPATCH_IO_DEBUG
+       int fd = op->fd_entry->fd;
+#endif
+       dispatch_fd_entry_t fd_entry = op->fd_entry;
+       _dispatch_fd_entry_retain(fd_entry);
+       dispatch_io_t channel = op->channel;
+       _dispatch_retain(channel);
+       // Note that data delivery may occur after the operation is freed
+       dispatch_async(op->op_q, ^{
+               bool done = (flags & DOP_DONE);
+               if (done) {
+                       if (direction == DOP_DIR_READ && err) {
+                               if (dispatch_data_get_size(d)) {
+                                       _dispatch_io_debug("IO handler invoke", fd);
+                                       handler(false, d, 0);
+                               }
+                               d = NULL;
+                       } else if (direction == DOP_DIR_WRITE && !err) {
+                               d = NULL;
+                       }
+               }
+               _dispatch_io_debug("IO handler invoke", fd);
+               handler(done, d, err);
+               _dispatch_release(channel);
+               _dispatch_fd_entry_release(fd_entry);
+               _dispatch_io_data_release(data);
+       });
+}