/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#ifndef PAGE_SIZE
#define PAGE_SIZE getpagesize()
#endif
#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
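/*
 * Illustrative expansion (not part of this file): a call such as
 *
 *	int err;
 *	_dispatch_io_syscall_switch(err,
 *		fstat(fd, &st),
 *		default: fd_entry->err = err; return;
 *	);
 *
 * re-issues fstat(2) for as long as it fails with EINTR (the `continue`
 * restarts the enclosing loop), treats success as a no-op through the
 * injected `case 0: break;`, and routes any other errno value to the
 * caller-supplied case labels.
 */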
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};
#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
#pragma mark -
#pragma mark dispatch_io_debug

#if DISPATCH_IO_DEBUG
#if !DISPATCH_DEBUG
#define _dispatch_io_log(x, ...) do { \
			_dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
			(void *)_dispatch_thread_self(), ##__VA_ARGS__); \
	} while (0)
#ifdef _dispatch_object_debug
#undef _dispatch_object_debug
#define _dispatch_object_debug dispatch_debug
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
#else
#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
#endif // DISPATCH_DEBUG
#else
#define _dispatch_io_log(x, ...)
#endif // DISPATCH_IO_DEBUG

#define _dispatch_fd_debug(msg, fd, ...) \
		_dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
#define _dispatch_op_debug(msg, op, ...) \
		_dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
#define _dispatch_channel_debug(msg, channel, ...) \
		_dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
		_dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
#define _dispatch_disk_debug(msg, disk, ...) \
		_dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
#pragma mark -
#pragma mark dispatch_io_hashtables

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t  _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static char const * const _dispatch_io_key = "io";
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	uintptr_t i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	uintptr_t i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}
#pragma mark -
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_size, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_size = DIO_MAX_CHUNK_SIZE,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
		break;
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
#pragma mark -
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_size;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}
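/*
 * Note: with the defaults above, a freshly created channel delivers once
 * low_water_chunks * chunk_size bytes are buffered and has no upper bound
 * on delivery size (params.high == SIZE_MAX) until
 * dispatch_io_set_high_water() is called.
 */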
static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_channel_debug("cleanup handler invoke: err %d",
						channel, err);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}
void
_dispatch_io_dispose(dispatch_io_t channel)
{
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}
static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}
static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark -
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	_dispatch_channel_debug("create", channel);
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
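/*
 * Usage sketch (illustrative, not part of this file): creating a stream
 * channel over an existing descriptor. The channel takes over the fd's
 * lifecycle; the cleanup handler is the safe place to close it.
 *
 *	int fd = open("/tmp/input.dat", O_RDONLY); // hypothetical path
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_main_queue(), ^(int error) {
 *		(void)error;
 *		(void)close(fd);
 *	});
 */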
dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(*path == '/')) {
		return DISPATCH_BAD_INPUT;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return DISPATCH_OUT_OF_MEMORY;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	_dispatch_channel_debug("create with path %s", channel, path);
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
#ifndef __linux__
					|| (path_data->oflag & O_SYMLINK) == O_SYMLINK
#endif
					? lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			default:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
		);
		channel->err = err;
		if (err) {
			free(path_data);
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
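/*
 * Usage sketch (illustrative): a path-based channel records the path and
 * opens the file lazily, on first I/O, so creation cannot fail with ENOENT;
 * errors surface through the cleanup handler and operation handlers.
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/scratch.dat", O_RDWR | O_CREAT, 0600, // hypothetical path
 *			dispatch_get_main_queue(), ^(int error) {
 *		// Nothing to close here; the channel owns the fd it opens.
 *		(void)error;
 *	});
 */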
dispatch_io_t
dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	_dispatch_channel_debug("create with channel %p", channel, in_channel);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			channel->err = err0;
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
			_dispatch_object_debug(channel, "%s", __func__);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
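/*
 * Usage sketch (illustrative): deriving a random-access channel from an
 * existing fd-based channel; per the else-branch above, the two channels
 * share one fd_entry and therefore one barrier queue and group.
 *
 *	dispatch_io_t random_ch = dispatch_io_create_with_io(DISPATCH_IO_RANDOM,
 *			stream_ch, dispatch_get_main_queue(), // stream_ch: hypothetical
 *			^(int error) { (void)error; });
 */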
dispatch_io_t
dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_io(type, in_channel, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
#pragma mark -
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set high water: %zu", channel, high_water);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set low water: %zu", channel, low_water);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}
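/*
 * Usage sketch (illustrative): the setters clamp against each other, so
 * after the two calls below handlers fire only once at least 1KiB is
 * available and never with more than 64KiB at a time.
 *
 *	dispatch_io_set_low_water(ch, 1024);
 *	dispatch_io_set_high_water(ch, 64 * 1024);
 */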
void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set interval: %llu", channel, interval);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
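/*
 * Usage sketch (illustrative): request a handler invocation roughly every
 * 100ms, even if less than low-water data has accumulated by then.
 *
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */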
void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
	});
}
dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_thread_context_t ctxt =
				_dispatch_thread_context_find(_dispatch_io_key);
		if (ctxt && ctxt->dtc_io_in_barrier == channel) {
			(void)_dispatch_fd_entry_open(channel->fd_entry, channel);
		}
	}
	return channel->fd_actual;
}
#pragma mark -
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_channel_debug("stop", channel);
	(void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_channel_debug("stop cleanup", channel);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_object_debug(channel, "%s", __func__);
					_dispatch_channel_debug("stop cleanup after close",
							channel);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}
void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_channel_debug("close", channel);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
						relaxed);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (fd_entry) {
					if (!fd_entry->path_data) {
						channel->fd_entry = NULL;
					}
					_dispatch_fd_entry_release(fd_entry);
				}
			}
			_dispatch_release(channel);
		});
	});
}
void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				dispatch_thread_context_s io_ctxt = {
					.dtc_key = _dispatch_io_key,
					.dtc_io_in_barrier = channel,
				};

				_dispatch_object_debug(channel, "%s", __func__);
				_dispatch_thread_context_push(&io_ctxt);
				barrier();
				_dispatch_thread_context_pop(&io_ctxt);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}
void
dispatch_io_barrier_f(dispatch_io_t channel, void *context,
		dispatch_function_t barrier)
{
	return dispatch_io_barrier(channel, ^{ barrier(context); });
}
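/*
 * Usage sketch (illustrative): for a path-based channel,
 * dispatch_io_get_descriptor() only yields a usable fd from inside a
 * barrier, where the dtc_io_in_barrier thread context set up above is
 * visible and no other I/O on the channel is in flight.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd); // e.g. flush while the channel is quiescent
 *		}
 *	});
 */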
void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
void
dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_read(channel, offset, length, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}
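/*
 * Usage sketch (illustrative): the handler may run several times with
 * partial data; done == true marks the final invocation, and (per
 * _dispatch_operation_create below) data is NULL on a failed read.
 *
 *	dispatch_io_read(ch, 0, SIZE_MAX, dispatch_get_main_queue(),
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			// consume this chunk
 *		}
 *		if (done && error) {
 *			// handle terminal error
 *		}
 *	});
 */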
void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
void
dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_write(channel, offset, data, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
					^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}
void
dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
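/*
 * Usage sketch (illustrative): unlike dispatch_io_read() above, the
 * convenience API concatenates every chunk into deliver_data and invokes
 * the handler exactly once, after the fd_entry's close queue drains.
 *
 *	dispatch_read(fd, SIZE_MAX, dispatch_get_main_queue(),
 *			^(dispatch_data_t data, int error) {
 *		// data holds everything read; error is the terminal status
 *	});
 */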
void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
					^(bool done, dispatch_data_t d, int error) {
				if (done) {
					if (d) {
						_dispatch_io_data_retain(d);
						deliver_data = d;
					}
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}
void
dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
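/*
 * Usage sketch (illustrative): the handler runs once; on failure it
 * receives the unwritten remainder as deliver_data, otherwise NULL.
 *
 *	dispatch_data_t msg = dispatch_data_create(buf, len, // hypothetical buf
 *			dispatch_get_main_queue(), DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, dispatch_get_main_queue(),
 *			^(dispatch_data_t unwritten, int error) {
 *		// unwritten != NULL indicates a partial write failed with error
 *	});
 *	dispatch_release(msg);
 */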
#pragma mark -
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_channel_debug("IO handler invoke: err %d", channel,
						err);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	_dispatch_channel_debug("operation create: %p", channel, op);
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->active = false;
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);
	return op;
}
void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("dispose", op);
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
	_dispatch_op_debug("disposed", op);
}
static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}
static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_op_debug("enqueue", op);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		// Final release
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}
static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			(int64_t)op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}
#pragma mark -
#pragma mark dispatch_fd_entry_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
{
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE;
	int err, fd_flags = 0;
	_dispatch_io_syscall_switch_noerr(err,
		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
				&fd_flags),
		case 0:
			fd_entry->guard_flags = guard_flags;
			fd_entry->orig_fd_flags = fd_flags;
			break;
		case EPERM: break;
		default: (void)dispatch_assume_zero(err); break;
	);
}

static void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
{
	if (!fd_entry->guard_flags) {
		return;
	}
	guardid_t guard = (uintptr_t)fd_entry;
	int err, fd_flags = fd_entry->orig_fd_flags;
	_dispatch_io_syscall_switch(err,
		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
				&fd_flags),
		default: (void)dispatch_assume_zero(err); break;
	);
}
#else
static inline void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
static inline void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
#endif // DISPATCH_USE_GUARDED_FD
static int
_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
		int oflag, mode_t mode) {
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
			GUARD_SOCKET_IPC | GUARD_FILEPORT;
	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
			mode);
	if (fd != -1) {
		fd_entry->guard_flags = guard_flags;
		return fd;
	}
	errno = 0;
#endif
	return open(path, oflag, mode);
}

static int
_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
#if DISPATCH_USE_GUARDED_FD
	if (fd_entry->guard_flags) {
		guardid_t guard = (uintptr_t)fd_entry;
		return guarded_close_np(fd, &guard);
	} else
#endif
	{
		return close(fd);
	}
}
static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}
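/*
 * Design note: an fd_entry's "refcount" is the suspension count of its
 * close queue. Every retain suspends the queue once and every release
 * resumes it, so the cleanup blocks enqueued on the close queue at creation
 * time (stream/disk disposal, fd flag restoration, free) run only once the
 * last holder has released the entry.
 */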
static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		_dispatch_fd_entry_debug("init", fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_debug("init completion", fd_entry);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}
1317 static dispatch_fd_entry_t
 
1318 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd
, uintptr_t hash
) 
1320         // On fds lock queue 
1321         dispatch_fd_entry_t fd_entry 
= _dispatch_fd_entry_create( 
1322                         _dispatch_io_fds_lockq
); 
1323         _dispatch_fd_entry_debug("create: fd %d", fd_entry
, fd
); 
1325         TAILQ_INSERT_TAIL(&_dispatch_io_fds
[hash
], fd_entry
, fd_list
); 
1326         fd_entry
->barrier_queue 
= dispatch_queue_create( 
1327                         "com.apple.libdispatch-io.barrierq", NULL
); 
1328         fd_entry
->barrier_group 
= dispatch_group_create(); 
1329         dispatch_async(fd_entry
->barrier_queue
, ^{ 
1330                 _dispatch_fd_entry_debug("stat", fd_entry
); 
1331                 int err
, orig_flags
, orig_nosigpipe 
= -1; 
1333                 _dispatch_io_syscall_switch(err
, 
1335                         default: fd_entry
->err 
= err
; return; 
1337                 fd_entry
->stat
.dev 
= st
.st_dev
; 
1338                 fd_entry
->stat
.mode 
= st
.st_mode
; 
1339                 _dispatch_fd_entry_guard(fd_entry
); 
1340                 _dispatch_io_syscall_switch(err
, 
1341                         orig_flags 
= fcntl(fd
, F_GETFL
), 
1342                         default: (void)dispatch_assume_zero(err
); break; 
1344 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 
1345                 if (S_ISFIFO(st
.st_mode
)) { 
1346                         _dispatch_io_syscall_switch(err
, 
1347                                 orig_nosigpipe 
= fcntl(fd
, F_GETNOSIGPIPE
), 
1348                                 default: (void)dispatch_assume_zero(err
); break; 
1350                         if (orig_nosigpipe 
!= -1) { 
1351                                 _dispatch_io_syscall_switch(err
, 
1352                                         orig_nosigpipe 
= fcntl(fd
, F_SETNOSIGPIPE
, 1), 
1354                                                 orig_nosigpipe 
= -1; 
1355                                                 (void)dispatch_assume_zero(err
); 
1361                 if (S_ISREG(st
.st_mode
)) { 
1362                         if (orig_flags 
!= -1) { 
1363                                 _dispatch_io_syscall_switch(err
, 
1364                                         fcntl(fd
, F_SETFL
, orig_flags 
& ~O_NONBLOCK
), 
1367                                                 (void)dispatch_assume_zero(err
); 
1371                         int32_t dev 
= major(st
.st_dev
); 
1372                         // We have to get the disk on the global dev queue. The 
1373                         // barrier queue cannot continue until that is complete 
1374                         dispatch_suspend(fd_entry
->barrier_queue
); 
1375                         dispatch_once_f(&_dispatch_io_devs_lockq_pred
, NULL
, 
1376                                         _dispatch_io_devs_lockq_init
); 
1377                         dispatch_async(_dispatch_io_devs_lockq
, ^{ 
1378                                 _dispatch_disk_init(fd_entry
, dev
); 
1379                                 dispatch_resume(fd_entry
->barrier_queue
); 
1382                         if (orig_flags 
!= -1) { 
1383                                 _dispatch_io_syscall_switch(err
, 
1384                                         fcntl(fd
, F_SETFL
, orig_flags 
| O_NONBLOCK
), 
1387                                                 (void)dispatch_assume_zero(err
); 
1391                         _dispatch_stream_init(fd_entry
, _dispatch_get_root_queue( 
1392                                         _DISPATCH_QOS_CLASS_DEFAULT
, false)); 
1394                 fd_entry
->orig_flags 
= orig_flags
; 
1395                 fd_entry
->orig_nosigpipe 
= orig_nosigpipe
; 
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		_dispatch_fd_entry_debug("barrier queue release", fd_entry);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_fd_entry_debug("barrier group release", fd_entry);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		_dispatch_fd_entry_unguard(fd_entry);
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}
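
// For orientation, a sketch of how this code path is reached (illustrative,
// not part of this file; queue and handler bodies are placeholders):
//
//	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
//			^(int error) { /* cleanup */ });
//	dispatch_io_read(ch, 0, SIZE_MAX, q,
//			^(bool done, dispatch_data_t d, int error) { /* deliver */ });
//
// Channel creation funnels through _dispatch_fd_entry_init_async(), which
// looks the fd up in _dispatch_io_fds[] on the fds lock queue and calls
// _dispatch_fd_entry_create_with_fd() above the first time a given fd is
// seen, so every channel (and convenience read/write) on the same fd shares
// one fd_entry.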

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	_dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
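
// Path-based entries start with fd == -1; the descriptor is only opened by
// _dispatch_fd_entry_open() below when the first operation actually runs.
// A sketch of the public entry point that leads here (illustrative only):
//
//	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
//			"/tmp/example", O_RDONLY, 0, q, ^(int error) { /* cleanup */ });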

static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
			oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) goto open;
		(void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
		return err;
	}
	if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
		// Lost the race with another open
		_dispatch_fd_entry_guarded_close(fd_entry, fd);
	} else {
		channel->fd_actual = fd;
	}
	_dispatch_object_debug(channel, "%s", __func__);
	return 0;
}
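
// Multiple operations may race to open the same path-based entry: the first
// os_atomic_cmpxchg2o() publishes the winning descriptor into fd_entry->fd,
// and losers close their duplicate. The oflag choice mirrors the fd-based
// setup above: blocking for the disk path, O_NONBLOCK for the stream path.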

static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}

#pragma mark -
#pragma mark dispatch_stream_t/dispatch_disk_t

static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		dispatch_set_context(stream->dq, stream);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}
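
// One stream per direction (read/write), each with its own serial queue
// targeting tq. The queue's context points back at its stream so
// _dispatch_stream_queue_handler() can recover it; _dispatch_stream_dispose()
// NULLs the context out, which is how a queued handler detects disposal.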

static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);
	free(stream);
}

static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	char label[45];
	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
			(int)dev);
	disk->pick_queue = dispatch_queue_create(label, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}
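
// Disks are shared: all fd_entries whose device hashes to the same dev get
// the same dispatch_disk_t, so the advise/perform pipeline below serializes
// I/O per physical device rather than per channel. The reuse path only takes
// an extra retain; _dispatch_disk_dispose() undoes the list insertion.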

void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
}

#pragma mark -
#pragma mark dispatch_stream_operations/dispatch_disk_operations

static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}

static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream->dq,
				_dispatch_stream_queue_handler);
	}
}
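
// Only the transition from empty to non-empty schedules the queue handler;
// while operations remain queued, the handler reschedules itself (or resumes
// the stream's event source), so at most one handler drains a given stream.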

static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}
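
// Stream-type operations on the same fd must run in order, so only the head
// of fd_entry->stream_ops sits in disk->operations at any time;
// _dispatch_disk_complete_operation() promotes the next stream op when the
// head finishes. Random-type operations all go straight into the disk list.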

static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: stream %p", op, stream);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
	_dispatch_release(op);
}

static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: disk %p", op, disk);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
	_dispatch_release(op);
}

static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	// On pick queue
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
		} else {
			op = disk->cur_rq;
			do {
				op = TAILQ_NEXT(op, operation_list);
				if (!op) {
					op = TAILQ_FIRST(&disk->operations);
				}
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);
		}
		if (!op->active) {
			disk->cur_rq = op;
			return op;
		}
	}
	return NULL;
}
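
// Round-robin: scanning resumes after cur_rq and wraps, skipping operations
// that are already active in the advise pipeline; returning NULL when every
// queued operation is active lets _dispatch_disk_handler() stop filling the
// advise list.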

static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	// On stream queue
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}

static void
_dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
		dispatch_io_t channel, bool inactive_only)
{
	// On pick queue
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (inactive_only && op->active) continue;
		if (!channel || op->channel == channel) {
			_dispatch_op_debug("cleanup: disk %p", op, disk);
			_dispatch_disk_complete_operation(disk, op);
		}
	}
}

static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	_dispatch_disk_cleanup_specified_operations(disk, channel, false);
}

static void
_dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel)
{
	_dispatch_disk_cleanup_specified_operations(disk, channel, true);
}

#pragma mark -
#pragma mark dispatch_stream_handler/dispatch_disk_handler

static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
{
	// On stream queue
	if (stream->source) {
		return stream->source;
	}
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_op_debug("stream source create", op);
	dispatch_source_t source = NULL;
	if (op->direction == DOP_DIR_READ) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
				(uintptr_t)fd, 0, stream->dq);
	} else if (op->direction == DOP_DIR_WRITE) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
				(uintptr_t)fd, 0, stream->dq);
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return NULL;
	}
	dispatch_set_context(source, stream);
	dispatch_source_set_event_handler_f(source,
			_dispatch_stream_source_handler);
	// Close queue must not run user cleanup handlers until sources are fully
	// unregistered
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(source, ^{
		_dispatch_op_debug("stream source cancel", op);
		dispatch_resume(close_queue);
	});
	stream->source = source;
	return stream->source;
}
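
// The source is created lazily, once per stream, and left suspended between
// bursts of work. The dispatch_resume(close_queue) in the cancel handler
// balances the _dispatch_fd_entry_retain() in _dispatch_stream_dispose()
// (fd_entry retain/release map to suspend/resume of the close queue), so the
// fd_entry cannot be freed before the source is fully unregistered.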

static void
_dispatch_stream_source_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);
}

static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}

static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_op_debug("stream handler", op);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_op_debug("initial delivery", op);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
	return;
}
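
// Result handling, in brief: DELIVER keeps the op queued and hands out what
// was read or written so far; DELIVER_AND_COMPLETE and COMPLETE retire the
// op; RESUME (EAGAIN from a nonblocking fd) re-arms the event source and
// waits; ERR retires the erroring channel's operations on this stream; and
// FD_ERR (EBADF) tears down every operation on the fd via the barrier queue.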

static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_disk_debug("disk handler", disk);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			_dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_op_debug("activate: disk %p", op, disk);
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		_dispatch_op_debug("async perform: disk %p", op, disk);
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
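
// advise_list is a fixed-size ring buffer: req_idx is the slot whose
// operation performs I/O next, free_idx is the first empty slot to fill, and
// advise_idx (used in _dispatch_disk_perform below) tracks how far read-ahead
// advice has been issued. io_active guarantees a single in-flight perform per
// disk, which is what serializes I/O per device.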

static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	_dispatch_disk_debug("disk perform", disk);
	size_t chunk_size = dispatch_io_defaults.chunk_size;
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_op_debug("initial delivery", op);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	_dispatch_op_debug("async perform completion: disk %p", op, disk);
	dispatch_async(disk->pick_queue, ^{
		_dispatch_op_debug("perform completion", op);
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		_dispatch_op_debug("deactivate: disk %p", op, disk);
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_op_debug("release -> %d (disk perform complete)", op,
				op->do_ref_cnt);
		_dispatch_release(op);
	});
}

#pragma mark -
#pragma mark dispatch_operation_perform

static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	_dispatch_op_debug("advise", op);
	if (_dispatch_io_get_error(op, NULL, true)) return;
#ifdef __linux__
	// linux does not support fcntl (F_RDADVISE)
	// define necessary datastructure and use readahead
	struct radvisory {
		off_t ra_offset;
		int ra_count;
	};
#endif
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
#ifdef __linux__
	_dispatch_io_syscall_switch(err,
		readahead(op->fd_entry->fd, advise.ra_offset, advise.ra_count),
		case EINVAL: break; // fd does refer to a non-supported filetype
		default: (void)dispatch_assume_zero(err); break;
	);
#else
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
#endif
}
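
// Read-ahead policy: each perform issues advice one chunk beyond the bytes
// read so far, aligned to page boundaries on the first call. On Darwin this
// maps to fcntl(F_RDADVISE); on Linux the readahead(2) syscall is the closest
// equivalent. Benign failures are ignored on both branches.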

static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	_dispatch_op_debug("perform", op);
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_size;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_size but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_op_debug("buffer allocated", op);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_op_debug("buffer mapped", op);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_op_debug("performed: EOF", op);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN || err == EWOULDBLOCK) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	_dispatch_op_debug("performed: err %d", op, err);
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
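
// Return codes from _dispatch_operation_perform(), as consumed by the stream
// and disk handlers above: DELIVER (partial progress), DELIVER_AND_COMPLETE
// (EOF), COMPLETE (requested length reached, or unrecoverable errno),
// RESUME / COMPLETE_RESUME (EAGAIN on a nonblocking fd), ERR (canceled) and
// FD_ERR (EBADF, which also poisons the shared fd_entry via the cmpxchg
// above so later operations fail fast).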

static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("deliver data", op);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_op_debug("IO handler invoke", op);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_op_debug("IO handler invoke: err %d", op, err);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
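
// Delivery rules: data is held back until the channel's low-water mark is
// reached or the request buffer fills; DOP_NO_EMPTY suppresses zero-length
// deliveries. On the final (done) delivery, a failed read still hands out
// whatever was read before the error in a separate handler(false, d, 0)
// call, and a successful write passes d == NULL since nothing remains
// unwritten.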

#pragma mark -
#pragma mark dispatch_io_debug

static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)channel->params.interval);
}

size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, (long long)op->offset, op->length,
			op->total, op->undelivered + op->buf_len, op->flags, op->err,
			op->params.low, op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)op->params.interval);
}

size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}