/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#if DISPATCH_IO_DEBUG
#define _dispatch_fd_debug(msg, fd, args...) \
		_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
#else
#define _dispatch_fd_debug(msg, fd, args...)
#endif

#if USE_OBJC
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif

typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);

// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
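
/*
 * Illustrative sketch (not in the original source): a typical call site.
 * The cases passed via __VA_ARGS__ consume the errno value computed by the
 * wrapper, and EINTR restarts the enclosing retry loop:
 *
 *	int err;
 *	struct stat st;
 *	_dispatch_io_syscall_switch(err,
 *		fstat(fd, &st),
 *		default: fd_entry->err = err; return;
 *	);
 */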

enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))

#pragma mark -
#pragma mark dispatch_io_hashtables

#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE  64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
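
/*
 * Example (illustrative): DIO_HASH reduces a key to a bucket index with a
 * mask, which is equivalent to a modulo because DIO_HASH_SIZE is a power of
 * two; e.g. with DIO_HASH_SIZE == 256u, DIO_HASH(260) == 4.
 */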

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t  _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}

#pragma mark -
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_pages = DIO_MAX_CHUNK_PAGES,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
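
/*
 * Worked example (illustrative; assumes the defaults in io_internal.h are
 * DIO_DEFAULT_LOW_WATER_CHUNKS == 1 and DIO_MAX_CHUNK_PAGES == 128): on a
 * 4kB-page system a new channel gets params.low == 1 * 128 * 4096 == 512kB
 * in _dispatch_io_create() below.
 */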

#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
		break;
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
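
/*
 * Illustrative call (hypothetical value): tuning applies process-wide to
 * channels created afterwards, e.g.
 *
 *	_dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 64);
 */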

#pragma mark -
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(0, true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}

static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("cleanup handler invoke", -1);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}

void
_dispatch_io_dispose(dispatch_io_t channel)
{
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}

static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}

static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}

#pragma mark -
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_fd_debug("io create", fd);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
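
/*
 * Usage sketch for the public API (illustrative; fd and path are
 * hypothetical). The cleanup handler is the point where the application
 * regains control of the file descriptor:
 *
 *	int fd = open("/tmp/input", O_RDONLY);
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(int error) {
 *				(void)error;
 *				close(fd); // channel has relinquished the fd
 *			});
 */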

dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}

dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(path && *path == '/')) {
		return NULL;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return NULL;
	}
	_dispatch_fd_debug("io create with path %s", -1, path);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
					lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			case ENOENT:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
			default:
				break;
		);
		channel->err = err;
		if (err) {
			free(path_data);
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
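
/*
 * Usage sketch (illustrative; path is hypothetical): with a path-based
 * channel the file is opened lazily by _dispatch_fd_entry_open() when the
 * first operation needs the descriptor:
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/output", O_WRONLY | O_CREAT, 0644,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(int error) { (void)error; });
 */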

dispatch_io_t
dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}

dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_fd_debug("io create with io %p", -1, in_channel);
	dispatch_io_t channel = _dispatch_io_create(type);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			channel->err = err0;
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
			_dispatch_object_debug(channel, "%s", __func__);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}

dispatch_io_t
dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_io(type, in_channel, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}

#pragma mark -
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set high water", channel->fd);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set low water", channel->fd);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_fd_debug("io set interval", channel->fd);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
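
/*
 * Usage sketch (illustrative values): deliver to read handlers once at
 * least 4kB is buffered, never more than 1MB at once, and on 100ms
 * boundaries even below the low-water mark (strict interval):
 *
 *	dispatch_io_set_low_water(ch, 4096);
 *	dispatch_io_set_high_water(ch, 1024 * 1024);
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */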

void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
	});
}

dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
			!_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_fd_entry_t fd_entry = channel->fd_entry;
		(void)_dispatch_fd_entry_open(fd_entry, channel);
	}
	return channel->fd_actual;
}

#pragma mark -
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_fd_debug("io stop", channel->fd);
	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_fd_debug("io stop cleanup", channel->fd);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_object_debug(channel, "%s", __func__);
					_dispatch_fd_debug("io stop after close cleanup",
							channel->fd);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}

void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_fd_debug("io close", channel->fd);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
						relaxed);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (fd_entry) {
					if (!fd_entry->path_data) {
						channel->fd_entry = NULL;
					}
					_dispatch_fd_entry_release(fd_entry);
				}
			}
			_dispatch_release(channel);
		});
	});
}

void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				_dispatch_object_debug(channel, "%s", __func__);
				_dispatch_thread_setspecific(dispatch_io_key, channel);
				barrier();
				_dispatch_thread_setspecific(dispatch_io_key, NULL);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}

void
dispatch_io_barrier_f(dispatch_io_t channel, void *context,
		dispatch_function_t barrier)
{
	return dispatch_io_barrier(channel, ^{ barrier(context); });
}

void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}

void
dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_read(channel, offset, length, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}
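
/*
 * Usage sketch (illustrative): the handler may fire many times; chunks are
 * concatenated the same way the dispatch_read() convenience wrapper below
 * accumulates its result:
 *
 *	__block dispatch_data_t all = dispatch_data_empty;
 *	dispatch_io_read(ch, 0, SIZE_MAX,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			dispatch_data_t c = dispatch_data_create_concat(all, data);
 *			dispatch_release(all);
 *			all = c;
 *		}
 *		if (done) {
 *			// all and error hold the final result
 *		}
 *	});
 */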

void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}

void
dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_write(channel, offset, data, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}

void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}

void
dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
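
/*
 * Usage sketch (illustrative): the convenience API reads up to `length`
 * bytes from fd and invokes the handler exactly once with the accumulated
 * data:
 *
 *	dispatch_read(fd, 64 * 1024,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(dispatch_data_t data, int error) {
 *		// data holds what was read before EOF or error
 *		(void)data; (void)error;
 *	});
 */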

void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t d, int error) {
				if (done) {
					if (d) {
						_dispatch_io_data_retain(d);
						deliver_data = d;
					}
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}

void
dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}

#pragma mark -
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_fd_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	int fd = channel->fd;
#endif
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_fd_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->active = false;
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);
	return op;
}

void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	_dispatch_object_debug(op, "%s", __func__);
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
}

static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}

static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		// Final release
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}

static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			(int64_t)op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}

#pragma mark -
#pragma mark dispatch_fd_entry_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
{
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE;
	int err, fd_flags = 0;
	_dispatch_io_syscall_switch_noerr(err,
		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
				&fd_flags),
		case 0:
			fd_entry->guard_flags = guard_flags;
			fd_entry->orig_fd_flags = fd_flags;
			break;
		case EPERM: break;
		default: (void)dispatch_assume_zero(err); break;
	);
}

static void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
{
	if (!fd_entry->guard_flags) {
		return;
	}
	guardid_t guard = (uintptr_t)fd_entry;
	int err, fd_flags = fd_entry->orig_fd_flags;
	_dispatch_io_syscall_switch(err,
		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
				&fd_flags),
		default: (void)dispatch_assume_zero(err); break;
	);
}
#else
static inline void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
static inline void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
#endif // DISPATCH_USE_GUARDED_FD

static int
_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
		int oflag, mode_t mode) {
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
			GUARD_SOCKET_IPC | GUARD_FILEPORT;
	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
			mode);
	if (fd != -1) {
		fd_entry->guard_flags = guard_flags;
		return fd;
	}
	errno = 0;
#else
	(void)fd_entry;
#endif
	return open(path, oflag, mode);
}

static int
_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
#if DISPATCH_USE_GUARDED_FD
	if (fd_entry->guard_flags) {
		guardid_t guard = (uintptr_t)fd_entry;
		return guarded_close_np(fd, &guard);
	} else
#else
	(void)fd_entry;
#endif
	{
		return close(fd);
	}
}

static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}

static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_fd_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
	// On fds lock queue
	_dispatch_fd_debug("fd entry create", fd);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	fd_entry->fd = fd;
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_fd_debug("fd entry stat", fd);
		int err, orig_flags, orig_nosigpipe = -1;
		struct stat st;
		_dispatch_io_syscall_switch(err,
			fstat(fd, &st),
			default: fd_entry->err = err; return;
		);
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_fd_entry_guard(fd_entry);
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
		);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			);
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
					default:
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
		}
#endif
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			});
		} else {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
		}
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	});
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue release", fd);
		dispatch_release(fd_entry->close_queue);
		_dispatch_fd_debug("barrier queue release", fd);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_fd_debug("barrier group release", fd);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		_dispatch_fd_entry_unguard(fd_entry);
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
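
/*
 * Illustrative sketch (not compiled): the path-based entry above keeps
 * fd == -1 until the first operation runs, when _dispatch_fd_entry_open()
 * opens the path lazily. Public API equivalent; the function name and the
 * path below are hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <fcntl.h>

static void
example_channel_from_path(void)
{
	dispatch_queue_t q =
			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
	dispatch_io_t channel = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
			"/tmp/example.dat", O_RDONLY, 0, q, ^(int error) {
		// Nothing to close here: the fd_entry owns the descriptor it opened
		// and closes it on its own close queue (see above)
		(void)error;
	});
	dispatch_io_read(channel, 0, 4096, q,
			^(bool done, dispatch_data_t data, int error) {
		(void)done; (void)data; (void)error;
	});
	dispatch_release(channel);
}
#endif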
static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
			oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) {
			goto open;
		}
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
		return err;
	}
	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
		// Lost the race with another open
		_dispatch_fd_entry_guarded_close(fd_entry, fd);
	} else {
		channel->fd_actual = fd;
	}
	_dispatch_object_debug(channel, "%s", __func__);
	return 0;
}
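
/*
 * The open above retries on EINTR via the goto; a minimal standalone sketch
 * of the same idiom (hypothetical helper, not part of this file):
 */
#if 0
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>

static int
example_open_retry_eintr(const char *path, int oflag, mode_t mode)
{
	int fd;
	do {
		fd = open(path, oflag, mode);
	} while (fd == -1 && errno == EINTR); // retry if interrupted by a signal
	return fd;
}
#endif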
static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}
#pragma mark dispatch_stream_t/dispatch_disk_t
static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		dispatch_set_context(stream->dq, stream);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}
static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);
	free(stream);
}
static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	char label[45];
	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
	disk->pick_queue = dispatch_queue_create(label, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}
void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
}
#pragma mark dispatch_stream_operations/dispatch_disk_operations
static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}
static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream->dq,
				_dispatch_stream_queue_handler);
	}
}
static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}
static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}
static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	// On pick queue
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
		} else {
			op = disk->cur_rq;
			do {
				op = TAILQ_NEXT(op, operation_list);
				if (!op) {
					op = TAILQ_FIRST(&disk->operations);
				}
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);
		}
		if (!op->active) {
			disk->cur_rq = op;
			return op;
		}
	}
	return NULL;
}
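
/*
 * A minimal standalone sketch (not compiled) of the round-robin advance used
 * by the pickers above: step to the next list element, wrap to the head, and
 * stop once the scan returns to the starting point. All names here (item,
 * example_pick_round_robin) are hypothetical.
 */
#if 0
#include <sys/queue.h>
#include <stdbool.h>
#include <stddef.h>

struct item {
	bool active;
	TAILQ_ENTRY(item) list;
};
TAILQ_HEAD(item_list, item);

static struct item *
example_pick_round_robin(struct item_list *head, struct item *cur)
{
	if (TAILQ_EMPTY(head)) {
		return NULL;
	}
	struct item *it;
	if (!cur) {
		it = TAILQ_FIRST(head);
	} else {
		it = cur;
		do {
			it = TAILQ_NEXT(it, list);
			if (!it) {
				it = TAILQ_FIRST(head); // wrap around to the head
			}
		} while (it->active && it != cur); // skip busy items, at most one lap
	}
	return it->active ? NULL : it;
}
#endif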
static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	// On stream queue
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}
static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	// On pick queue
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_disk_complete_operation(disk, op);
		}
	}
}
#pragma mark dispatch_stream_handler/dispatch_disk_handler
static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
{
	// On stream queue
	if (stream->source) {
		return stream->source;
	}
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_fd_debug("stream source create", fd);
	dispatch_source_t source = NULL;
	if (op->direction == DOP_DIR_READ) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
				(uintptr_t)fd, 0, stream->dq);
	} else if (op->direction == DOP_DIR_WRITE) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
				(uintptr_t)fd, 0, stream->dq);
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return NULL;
	}
	dispatch_set_context(source, stream);
	dispatch_source_set_event_handler_f(source,
			_dispatch_stream_source_handler);
	// Close queue must not run user cleanup handlers until sources are fully
	// unregistered
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(source, ^{
		_dispatch_fd_debug("stream source cancel", fd);
		dispatch_resume(close_queue);
	});
	stream->source = source;
	return stream->source;
}
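
/*
 * Illustrative sketch (not compiled): the same source-plus-cancel-handler
 * pattern with the public API. Cancellation is asynchronous; resources tied
 * to the source (above, the suspended close queue) may only be touched once
 * the cancel handler has run. The example function name is hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>

static dispatch_source_t
example_read_source(int fd, dispatch_queue_t q)
{
	dispatch_source_t source = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0, q);
	dispatch_source_set_event_handler(source, ^{
		// fd is readable; dispatch_source_get_data() estimates bytes available
		(void)dispatch_source_get_data(source);
	});
	dispatch_source_set_cancel_handler(source, ^{
		// Safe point to release fd-related resources
	});
	dispatch_resume(source);
	return source;
}
#endif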
static void
_dispatch_stream_source_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
	return;
}
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_fd_debug("disk handler", -1);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_fd_debug("disk perform", -1);
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_release(op);
	});
}
#pragma mark dispatch_operation_perform
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
}
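
/*
 * Illustrative sketch (not compiled): issuing the same Darwin-only read-ahead
 * advisory directly. struct radvisory and F_RDADVISE are the primitives used
 * above; as the syscall switch shows, EFBIG (advised past EOF) and ENOTSUP
 * (filesystem without read-ahead) must be tolerated. The helper name is
 * hypothetical.
 */
#if 0
#include <fcntl.h>
#include <sys/types.h>

static void
example_radvise(int fd, off_t offset, int count)
{
	struct radvisory advise = { .ra_offset = offset, .ra_count = count };
	if (fcntl(fd, F_RDADVISE, &advise) == -1) {
		// Ignore EFBIG/ENOTSUP; the advisory is only a hint
	}
}
#endif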
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_pages but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_fd_debug("EOF", op->fd_entry->fd);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
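
/*
 * Illustrative sketch (not compiled): what DISPATCH_OP_DELIVER_AND_COMPLETE on
 * a zero-byte read looks like from the public API. The handler may be invoked
 * with partial data one or more times, then once more with done == true and
 * empty data at EOF. The example function name is hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdint.h>

static void
example_read_to_eof(dispatch_io_t channel, dispatch_queue_t q)
{
	dispatch_io_read(channel, 0, SIZE_MAX, q,
			^(bool done, dispatch_data_t data, int error) {
		if (data && dispatch_data_get_size(data)) {
			// Partial delivery
		}
		if (done && !error) {
			// EOF: no further deliveries for this operation
		}
	});
}
#endif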
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = op->buf_siz = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
	int fd = op->fd_entry->fd;
#endif
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_fd_debug("IO handler invoke", fd);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_fd_debug("IO handler invoke", fd);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
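
/*
 * Illustrative sketch (not compiled): the low-water check above is what
 * dispatch_io_set_low_water() controls; until `undelivered` reaches the low
 * water mark, partial results are buffered instead of delivered, and the high
 * water mark bounds the size of each delivery. The example function name is
 * hypothetical.
 */
#if 0
#include <dispatch/dispatch.h>
#include <stdint.h>

static void
example_watermarks(dispatch_io_t channel, dispatch_queue_t q)
{
	dispatch_io_set_low_water(channel, 4096);   // buffer at least 4 KiB
	dispatch_io_set_high_water(channel, 65536); // deliver at most 64 KiB
	dispatch_io_read(channel, 0, SIZE_MAX, q,
			^(bool done, dispatch_data_t data, int error) {
		// Delivered sizes now fall between the two marks, except at EOF or
		// on error
		(void)done; (void)data; (void)error;
	});
}
#endif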
#pragma mark dispatch_io_debug
static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			channel->params.interval);
}
size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, op->offset, op->length, op->total,
			op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
			op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
}
size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op->channel);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}