static void
_dispatch_io_dispose(dispatch_io_t channel)
{
- if (channel->fd_entry) {
+ if (channel->fd_entry && !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
if (channel->fd_entry->path_data) {
// This modification is safe since path_data->channel is checked
// only on close_queue (which is still suspended at this point)
if (fd_entry) {
_dispatch_io_debug("io stop cleanup", channel->fd);
_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
- channel->fd_entry = NULL;
- _dispatch_fd_entry_release(fd_entry);
+ if (!(channel->atomic_flags & DIO_CLOSED)) {
+ channel->fd_entry = NULL;
+ _dispatch_fd_entry_release(fd_entry);
+ }
} else if (channel->fd != -1) {
// Stop after close, need to check if fd_entry still exists
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
dispatch_async(channel->barrier_queue, ^{
_dispatch_io_debug("io close", channel->fd);
- (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
- dispatch_fd_entry_t fd_entry = channel->fd_entry;
- if (fd_entry) {
+ if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
+ (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
+ dispatch_fd_entry_t fd_entry = channel->fd_entry;
if (!fd_entry->path_data) {
channel->fd_entry = NULL;
}
dispatch_async(disk->pick_queue, ^{
switch (result) {
case DISPATCH_OP_DELIVER:
- _dispatch_operation_deliver_data(op, DOP_DELIVER);
+ _dispatch_operation_deliver_data(op, DOP_DEFAULT);
break;
case DISPATCH_OP_COMPLETE:
_dispatch_disk_complete_operation(disk, op);
break;
case DISPATCH_OP_DELIVER_AND_COMPLETE:
- _dispatch_operation_deliver_data(op, DOP_DELIVER);
+ _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
_dispatch_disk_complete_operation(disk, op);
break;
case DISPATCH_OP_ERR:
data = dispatch_data_create_subrange(op->data, op->buf_len,
op->length);
}
- if (op->buf_len == op->buf_siz) {
+ if (op->buf_data && op->buf_len == op->buf_siz) {
_dispatch_io_data_release(op->buf_data);
op->buf_data = NULL;
op->buf = NULL;
_dispatch_io_data_retain(data);
d = data;
} else {
- d = dispatch_data_create_subrange(op->data, op->buf_len,
+ d = dispatch_data_create_subrange(op->data, op->buf_siz,
op->length);
}
_dispatch_io_data_release(op->data);
op->undelivered = 0;
_dispatch_io_debug("deliver data", op->fd_entry->fd);
dispatch_op_direction_t direction = op->direction;
- __block dispatch_data_t d = data;
dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
int fd = op->fd_entry->fd;
// Note that data delivery may occur after the operation is freed
dispatch_async(op->op_q, ^{
bool done = (flags & DOP_DONE);
+ dispatch_data_t d = data;
if (done) {
if (direction == DOP_DIR_READ && err) {
if (dispatch_data_get_size(d)) {
return &_dispatch_mgr_q;
}
_dispatch_kevent_unregister(ds);
- return ds->do_targetq;
- } else if (dr->ds_cancel_handler) {
+ }
+ if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
+ ds->ds_registration_is_block) {
if (dq != ds->do_targetq) {
return ds->do_targetq;
}
// The source needs to be uninstalled from the manager queue, or the
// cancellation handler needs to be delivered to the target queue.
// Note: cancellation assumes installation.
- if (ds->ds_dkev || dr->ds_cancel_handler) {
+ if (ds->ds_dkev || dr->ds_cancel_handler
+#ifdef __BLOCKS__
+ || ds->ds_handler_is_block || ds->ds_registration_is_block
+#endif
+ ) {
return true;
}
} else if (ds->ds_pending_data) {