/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#ifndef PAGE_SIZE
#define PAGE_SIZE ((size_t)getpagesize())
#endif

#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
	} while (0)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
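
/*
 * Usage sketch (illustrative only, mirrors call sites later in this file):
 * callers pass extra `case`/`default` labels through __VA_ARGS__, e.g.
 *
 *	int err;
 *	_dispatch_io_syscall_switch(err,
 *		fcntl(fd, F_SETFL, flags),
 *		default: (void)dispatch_assume_zero(err); break;
 *	);
 */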
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))

#pragma mark dispatch_io_debug

#if DISPATCH_IO_DEBUG
#if !DISPATCH_DEBUG
#define _dispatch_io_log(x, ...) do { \
		_dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
				(void *)_dispatch_thread_self(), ##__VA_ARGS__); \
	} while (0)
#ifdef _dispatch_object_debug
#undef _dispatch_object_debug
#define _dispatch_object_debug dispatch_debug
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
#else
#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
#endif // DISPATCH_DEBUG
#else
#define _dispatch_io_log(x, ...)
#endif // DISPATCH_IO_DEBUG

#define _dispatch_fd_debug(msg, fd, ...) \
		_dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
#define _dispatch_op_debug(msg, op, ...) \
		_dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
#define _dispatch_channel_debug(msg, channel, ...) \
		_dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
		_dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
#define _dispatch_disk_debug(msg, disk, ...) \
		_dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
#pragma mark dispatch_io_hashtables

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static char const * const _dispatch_io_key = "io";
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_size, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_size = DIO_MAX_CHUNK_SIZE,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};

#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
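
/*
 * Usage sketch (illustrative only; the values shown are hypothetical):
 * _dispatch_iocntl() adjusts the defaults above at runtime, e.g.
 *
 *	_dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 32);      // 32-page chunks
 *	_dispatch_iocntl(DISPATCH_IOCNTL_LOW_WATER_CHUNKS, 1);
 */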
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_object_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_size;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}

static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_channel_debug("cleanup handler invoke: err %d",
						channel, err);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}

void
_dispatch_io_dispose(dispatch_io_t channel, DISPATCH_UNUSED bool *allow_free)
{
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}

static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}

static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	_dispatch_channel_debug("create", channel);
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
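
/*
 * Usage sketch (illustrative only, not part of this file): wrap an existing
 * descriptor in a stream channel and read from it.
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_main_queue(), ^(int error) {
 *		// Cleanup handler: the channel is done with fd here.
 *	});
 *	dispatch_io_read(ch, 0, SIZE_MAX, dispatch_get_main_queue(),
 *			^(bool done, dispatch_data_t data, int error) {
 *		// Partial results arrive here; done is true on EOF, error or close.
 *	});
 *	dispatch_release(ch);
 */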
dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(*path == '/')) {
		return DISPATCH_BAD_INPUT;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return DISPATCH_OUT_OF_MEMORY;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	_dispatch_channel_debug("create with path %s", channel, path);
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
					|| (path_data->oflag & O_SYMLINK) == O_SYMLINK
					? lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			default:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
		);
		channel->err = err;
		if (err) {
			free(path_data);
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
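
/*
 * Usage sketch (illustrative only; the path is hypothetical): a path-based
 * channel defers open(2) until the first operation actually runs (see
 * _dispatch_fd_entry_open below).
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/example.dat", O_RDONLY, 0, queue, ^(int error) {
 *		// Called once the channel is closed and the fd has been released.
 *	});
 */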
dispatch_io_t
dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	_dispatch_channel_debug("create with channel %p", channel, in_channel);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
			_dispatch_object_debug(channel, "%s", __func__);
		});
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
dispatch_io_t
dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create_with_io(type, in_channel, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
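
/*
 * Usage sketch (illustrative only): derive a second channel that targets the
 * same file as an existing channel, e.g. a random-access view of a stream
 * channel.
 *
 *	dispatch_io_t random_ch = dispatch_io_create_with_io(DISPATCH_IO_RANDOM,
 *			stream_ch, queue, ^(int error) { (void)error; });
 */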
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set high water: %zu", channel, high_water);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set low water: %zu", channel, low_water);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set interval: %llu", channel, interval);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
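
/*
 * Usage sketch (illustrative only): tune delivery granularity on a channel.
 *
 *	dispatch_io_set_low_water(ch, 4 * 1024);          // deliver at >= 4KB
 *	dispatch_io_set_high_water(ch, 1024 * 1024);      // and at <= 1MB
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);             // or every 100ms
 */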
void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
	});
}

dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_thread_context_t ctxt =
				_dispatch_thread_context_find(_dispatch_io_key);
		if (ctxt && ctxt->dtc_io_in_barrier == channel) {
			(void)_dispatch_fd_entry_open(channel->fd_entry, channel);
		}
	}
	return channel->fd_actual;
}
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_channel_debug("stop", channel);
	(void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_channel_debug("stop cleanup", channel);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					if (fd_entry->path_data) {
						fd_entry->path_data->channel = NULL;
					}
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_object_debug(channel, "%s", __func__);
					_dispatch_channel_debug("stop cleanup after close",
							channel);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}

void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_channel_debug("close", channel);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
						relaxed);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (fd_entry) {
					if (fd_entry->path_data) {
						fd_entry->path_data->channel = NULL;
					}
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			}
			_dispatch_release(channel);
		});
	});
}
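
/*
 * Usage sketch (illustrative only): close a channel, optionally interrupting
 * outstanding operations instead of letting them drain.
 *
 *	dispatch_io_close(ch, 0);                 // no new operations accepted
 *	dispatch_io_close(ch, DISPATCH_IO_STOP);  // also cancel in-flight I/O;
 *	                                          // their handlers get ECANCELED
 */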
void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				dispatch_thread_context_s io_ctxt = {
					.dtc_key = _dispatch_io_key,
					.dtc_io_in_barrier = channel,
				};
				_dispatch_object_debug(channel, "%s", __func__);
				_dispatch_thread_context_push(&io_ctxt);
				barrier();
				_dispatch_thread_context_pop(&io_ctxt);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}

void
dispatch_io_barrier_f(dispatch_io_t channel, void *context,
		dispatch_function_t barrier)
{
	return dispatch_io_barrier(channel, ^{ barrier(context); });
}
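
/*
 * Usage sketch (illustrative only): dispatch_io_get_descriptor() only yields
 * a usable fd for path-based channels from inside a barrier, where this file
 * opens the descriptor on demand and no channel I/O is in flight.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd);
 *		}
 *	});
 */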
void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}

void
dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_read(channel, offset, length, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}

void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}

void
dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
{
	return dispatch_io_write(channel, offset, data, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
	});
}
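
/*
 * Usage sketch (illustrative only): write a buffer through a channel.
 *
 *	dispatch_data_t msg = dispatch_data_create(buf, len, queue,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_io_write(ch, 0, msg, queue,
 *			^(bool done, dispatch_data_t remaining, int error) {
 *		// `remaining` is the data not yet written at this point; on the
 *		// final invocation `done` is true and error is 0 on success.
 *	});
 *	dispatch_release(msg);
 */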
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false),
					^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}

void
dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
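
/*
 * Usage sketch (illustrative only): one-shot convenience read; everything
 * read before EOF (or an error) is concatenated and delivered to a single
 * handler invocation.
 *
 *	dispatch_read(fd, SIZE_MAX, dispatch_get_main_queue(),
 *			^(dispatch_data_t data, int error) {
 *		// data holds the full contents read from fd
 *	});
 */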
void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false),
					^(bool done, dispatch_data_t d, int error) {
				if (done) {
					if (d) {
						_dispatch_io_data_retain(d);
						deliver_data = d;
					}
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}

void
dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
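
/*
 * Usage sketch (illustrative only): one-shot convenience write.
 *
 *	dispatch_data_t msg = dispatch_data_create(buf, len, queue,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, queue, ^(dispatch_data_t unwritten, int error) {
 *		// unwritten is NULL when everything was written successfully
 *	});
 *	dispatch_release(msg);
 */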
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_channel_debug("IO handler invoke: err %d", channel,
						err);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_object_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	_dispatch_channel_debug("operation create: %p", channel, op);
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create_with_target("com.apple.libdispatch-io.opq",
			NULL, queue);
	op->active = false;
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);
	return op;
}

void
_dispatch_operation_dispose(dispatch_operation_t op,
		DISPATCH_UNUSED bool *allow_free)
{
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("dispose", op);
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
	_dispatch_op_debug("disposed", op);
}
static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}

static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_op_debug("enqueue", op);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		// Final release
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}
static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer,
			dispatch_time(DISPATCH_TIME_NOW, (int64_t)op->params.interval),
			op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}
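
/*
 * The timer above backs dispatch_io_set_interval(): it fires every
 * op->params.interval nanoseconds, and with DISPATCH_IO_STRICT_INTERVAL the
 * accumulated data is delivered even if it is below the channel's low-water
 * mark. Data is not delivered while the operation is actively performing I/O.
 */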
#pragma mark dispatch_fd_entry_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
{
	guardid_t guard = fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE;
	int err, fd_flags = 0;
	_dispatch_io_syscall_switch_noerr(err,
		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
				&fd_flags),
		case 0:
			fd_entry->guard_flags = guard_flags;
			fd_entry->orig_fd_flags = fd_flags;
			break;
		case EPERM: break;
		default: (void)dispatch_assume_zero(err); break;
	);
}

static void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
{
	if (!fd_entry->guard_flags) {
		return;
	}
	guardid_t guard = fd_entry;
	int err, fd_flags = fd_entry->orig_fd_flags;
	_dispatch_io_syscall_switch(err,
		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
				&fd_flags),
		default: (void)dispatch_assume_zero(err); break;
	);
}
#else
static inline void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
static inline void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
#endif // DISPATCH_USE_GUARDED_FD

static inline int
_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
		int oflag, mode_t mode) {
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
			GUARD_SOCKET_IPC | GUARD_FILEPORT;
	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
			mode);
	if (fd != -1) {
		fd_entry->guard_flags = guard_flags;
		return fd;
	}
	errno = 0;
#endif
	return open(path, oflag, mode);
}

static inline int
_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
#if DISPATCH_USE_GUARDED_FD
	if (fd_entry->guard_flags) {
		guardid_t guard = (uintptr_t)fd_entry;
		return guarded_close_np(fd, &guard);
	} else
#else
	(void)fd_entry;
#endif
	{
		return close(fd);
	}
}
static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}
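
/*
 * The fd_entry "reference count" is the suspension count of its close queue:
 * every retain suspends the queue and every release resumes it. The blocks
 * enqueued on close_queue (cleanup handlers, fd state restoration, the final
 * free) therefore only run once the last channel or operation referencing
 * this entry has released it.
 */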
static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		_dispatch_fd_entry_debug("init", fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_debug("init completion", fd_entry);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue = dispatch_queue_create_with_target(
			"com.apple.libdispatch-io.closeq", NULL, q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
	// On fds lock queue
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	_dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
	fd_entry->fd = fd;
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_fd_entry_debug("stat", fd_entry);
		int err, orig_flags, orig_nosigpipe = -1;
		struct stat st;
		_dispatch_io_syscall_switch(err,
			fstat(fd, &st),
			default: fd_entry->err = err; return;
		);
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_fd_entry_guard(fd_entry);
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
		);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			);
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
					default:
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
		}
#endif
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			dev_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			});
		} else {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			_dispatch_stream_init(fd_entry,
					_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false));
		}
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	});
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		_dispatch_fd_entry_debug("barrier queue release", fd_entry);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_fd_entry_debug("barrier group release", fd_entry);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		_dispatch_fd_entry_unguard(fd_entry);
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	_dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry,
				_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
			oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) goto open;
		(void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
		return err;
	}
	if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
		// Lost the race with another open
		_dispatch_fd_entry_guarded_close(fd_entry, fd);
	} else {
		channel->fd_actual = fd;
	}
	_dispatch_object_debug(channel, "%s", __func__);
	return 0;
}

static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}
#pragma mark dispatch_stream_t/dispatch_disk_t

static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create_with_target(
				"com.apple.libdispatch-io.streamq", NULL, tq);
		dispatch_set_context(stream->dq, stream);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}

static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);
	free(stream);
}
static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_object_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	char label[45];
	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
			dev);
	disk->pick_queue = dispatch_queue_create(label, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}

void
_dispatch_disk_dispose(dispatch_disk_t disk, DISPATCH_UNUSED bool *allow_free)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
}
#pragma mark dispatch_stream_operations/dispatch_disk_operations

static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}

static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream->dq,
				_dispatch_stream_queue_handler);
	}
}

static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}
static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: stream %p", op, stream);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
	_dispatch_release(op);
}

static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: disk %p", op, disk);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
	_dispatch_release(op);
}
static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	// On pick queue
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
		} else {
			op = disk->cur_rq;
			do {
				op = TAILQ_NEXT(op, operation_list);
				if (!op) {
					op = TAILQ_FIRST(&disk->operations);
				}
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);
		}
		if (!op->active) {
			disk->cur_rq = op;
			return op;
		}
	}
	return NULL;
}
static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	// On stream queue
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}

static inline void
_dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
		dispatch_io_t channel, bool inactive_only)
{
	// On pick queue
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (inactive_only && op->active) continue;
		if (!channel || op->channel == channel) {
			_dispatch_op_debug("cleanup: disk %p", op, disk);
			_dispatch_disk_complete_operation(disk, op);
		}
	}
}

static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	_dispatch_disk_cleanup_specified_operations(disk, channel, false);
}

static void
_dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel)
{
	_dispatch_disk_cleanup_specified_operations(disk, channel, true);
}
1874 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1876 static dispatch_source_t
1877 _dispatch_stream_source(dispatch_stream_t stream
, dispatch_operation_t op
)
1880 if (stream
->source
) {
1881 return stream
->source
;
1883 dispatch_fd_t fd
= op
->fd_entry
->fd
;
1884 _dispatch_op_debug("stream source create", op
);
1885 dispatch_source_t source
= NULL
;
1886 if (op
->direction
== DOP_DIR_READ
) {
1887 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
1888 (uintptr_t)fd
, 0, stream
->dq
);
1889 } else if (op
->direction
== DOP_DIR_WRITE
) {
1890 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
1891 (uintptr_t)fd
, 0, stream
->dq
);
1893 dispatch_assert(op
->direction
< DOP_DIR_MAX
);
1896 dispatch_set_context(source
, stream
);
1897 dispatch_source_set_event_handler_f(source
,
1898 _dispatch_stream_source_handler
);
1899 // Close queue must not run user cleanup handlers until sources are fully
1901 dispatch_queue_t close_queue
= op
->fd_entry
->close_queue
;
1902 dispatch_source_set_mandatory_cancel_handler(source
, ^{
1903 _dispatch_op_debug("stream source cancel", op
);
1904 dispatch_resume(close_queue
);
1906 stream
->source
= source
;
1907 return stream
->source
;
static void
_dispatch_stream_source_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_op_debug("stream handler", op);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_op_debug("initial delivery", op);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
}
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_disk_debug("disk handler", disk);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	// Fill the free advise list slots with newly picked operations
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			_dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_op_debug("activate: disk %p", op, disk);
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		_dispatch_op_debug("async perform: disk %p", op, disk);
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
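
/*
 * Illustrative note (not from this file): advise_list acts as a small ring
 * buffer indexed modulo advise_list_depth; free_idx is the next slot to fill
 * with a picked operation and req_idx the next slot to perform. The index
 * arithmetic used above, on assumed values:
 *
 *	size_t depth = 4;           // advise_list_depth
 *	size_t idx = 3;
 *	idx = (idx + 1) % depth;    // 3 -> 0: wraps back to the first slot
 *	size_t i = 2, j = 1;        // j has already wrapped past the end
 *	if (j <= i) j += depth;     // j = 5, so j - i = 3 slots still pending
 */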
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	_dispatch_disk_debug("disk perform", disk);
	size_t chunk_size = dispatch_io_defaults.chunk_size;
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_op_debug("initial delivery", op);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	_dispatch_op_debug("async perform completion: disk %p", op, disk);
	dispatch_async(disk->pick_queue, ^{
		_dispatch_op_debug("perform completion", op);
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		_dispatch_op_debug("deactivate: disk %p", op, disk);
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_op_debug("release -> %d (disk perform complete)", op,
				op->do_ref_cnt);
		_dispatch_release(op);
	});
}
#pragma mark dispatch_operation_perform
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	_dispatch_op_debug("advise", op);
	if (_dispatch_io_get_error(op, NULL, true)) return;
#if defined(__linux__)
	// linux does not support fcntl(F_RDADVISE)
	// define the necessary data structure and use readahead
	struct radvisory {
		off_t ra_offset;
		int ra_count;
	};
#endif
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
#if defined(__linux__)
	_dispatch_io_syscall_switch(err,
		readahead(op->fd_entry->fd, advise.ra_offset, (size_t)advise.ra_count),
		case EINVAL: break; // fd refers to a non-supported filetype
		default: (void)dispatch_assume_zero(err); break;
	);
#else
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
#endif
}
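
/*
 * Illustrative sketch (not from this file): the advisory read-ahead above
 * maps onto fcntl(F_RDADVISE) with a struct radvisory on Darwin and onto
 * readahead(2) on Linux. In isolation, on an assumed open fd:
 *
 *	// Darwin
 *	struct radvisory ra = { .ra_offset = 0, .ra_count = 128 * 1024 };
 *	if (fcntl(fd, F_RDADVISE, &ra) == -1) perror("F_RDADVISE");
 *
 *	// Linux (readahead is a GNU extension declared in <fcntl.h>)
 *	if (readahead(fd, 0, 128 * 1024) == -1) perror("readahead");
 */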
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	_dispatch_op_debug("perform", op);
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_size;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_size but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_op_debug("buffer allocated", op);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_op_debug("buffer mapped", op);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_op_debug("performed: EOF", op);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN || err == EWOULDBLOCK) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	_dispatch_op_debug("performed: err %d", op, err);
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
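
/*
 * Illustrative sketch (not from this file): random-access operations above
 * read/write at (channel offset + bytes already transferred) and follow the
 * POSIX conventions that a return of 0 from read/pread signals EOF and that
 * EINTR means the call should simply be retried. With placeholder names
 * (fd, buf, buflen, base_offset, total_done):
 *
 *	off_t off = base_offset + (off_t)total_done; // next byte to transfer
 *	ssize_t n;
 *	do {
 *		n = pread(fd, buf, buflen, off);
 *	} while (n == -1 && errno == EINTR);         // retry interrupted calls
 *	if (n == 0) {
 *		// EOF: deliver whatever is buffered, then complete the operation
 *	}
 */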
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("deliver data", op);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_op_debug("IO handler invoke", op);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_op_debug("IO handler invoke: err %d", op, err);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
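
/*
 * Illustrative sketch (not from this file): the deliveries produced above
 * reach API clients through the dispatch_io_handler_t signature
 * (bool done, dispatch_data_t data, int error), e.g. for a stream read on an
 * assumed open fd:
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
 *			^(int error) { if (error) fprintf(stderr, "cleanup: %d\n", error); });
 *	dispatch_io_read(ch, 0, SIZE_MAX, q,
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			size_t n = dispatch_data_get_size(data); // partial delivery
 *			(void)n;
 *		}
 *		if (done) dispatch_io_close(ch, 0);
 *	});
 */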
#pragma mark dispatch_io_debug
static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)channel->params.interval);
}
size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, (long long)op->offset, op->length,
			op->total, op->undelivered + op->buf_len, op->flags, op->err,
			op->params.low, op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)op->params.interval);
}
size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}