/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
	} while (0)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))

#pragma mark dispatch_io_debug

#if DISPATCH_IO_DEBUG
#define _dispatch_io_log(x, ...) do { \
		_dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
				(void *)_dispatch_thread_self(), ##__VA_ARGS__); \
#ifdef _dispatch_object_debug
#undef _dispatch_object_debug
#define _dispatch_object_debug dispatch_debug
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
#endif // DISPATCH_DEBUG
#define _dispatch_io_log(x, ...)
#endif // DISPATCH_IO_DEBUG

#define _dispatch_fd_debug(msg, fd, ...) \
		_dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
#define _dispatch_op_debug(msg, op, ...) \
		_dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
#define _dispatch_channel_debug(msg, channel, ...) \
		_dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
		_dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
#define _dispatch_disk_debug(msg, disk, ...) \
		_dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)

#pragma mark dispatch_io_hashtables

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static char const * const _dispatch_io_key = "io";

_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);

_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);

#pragma mark dispatch_io_defaults

	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,

static struct dispatch_io_defaults_s {
	size_t chunk_size, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_size = DIO_MAX_CHUNK_SIZE,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,

#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \

_dispatch_iocntl(uint32_t param, uint64_t value)
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
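/*
 * Illustrative sketch (not part of the build): the defaults above can be
 * adjusted at run time through the private _dispatch_iocntl() knob. The
 * values shown are hypothetical.
 *
 *	_dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 32);
 *	_dispatch_iocntl(DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, 8);
 */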
#pragma mark dispatch_io_t

_dispatch_io_create(dispatch_io_type_t type)
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_size;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",

_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_channel_debug("cleanup handler invoke: err %d",
				cleanup_handler(err);
			_dispatch_release(queue);
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
		// Still need to create a barrier queue, since all operations go
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();

_dispatch_io_dispose(dispatch_io_t channel)
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		// Cleanup handlers will only run when all channels related to this
		_dispatch_fd_entry_release(channel->fd_entry);
	if (channel->queue) {
		dispatch_release(channel->queue);
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);

_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {

_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		channel = op->channel;
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
		err = op ? op->fd_entry->err : channel->err;

#pragma mark dispatch_io_channels

dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	dispatch_io_t channel = _dispatch_io_create(type);
	_dispatch_channel_debug("create", channel);
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		int err = fd_entry->err;
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		if (!err && type == DISPATCH_IO_RANDOM) {
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	_dispatch_object_debug(channel, "%s", __func__);
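/*
 * Illustrative sketch (not part of the build): creating a stream channel
 * from an existing descriptor. The fd and queue here are hypothetical; the
 * caller still owns fd and may close it once the cleanup handler has run.
 *
 *	dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			^(int error) {
 *		if (error) fprintf(stderr, "channel error: %d\n", error);
 *		close(fd);
 *	});
 */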
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });

dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
		return DISPATCH_BAD_INPUT;
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen + 1);
		return DISPATCH_OUT_OF_MEMORY;
	dispatch_io_t channel = _dispatch_io_create(type);
	_dispatch_channel_debug("create with path %s", channel, path);
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
					|| (path_data->oflag & O_SYMLINK) == O_SYMLINK
					? lstat(path_data->path, &st) : stat(path_data->path, &st),
			err = _dispatch_io_validate_type(channel, st.st_mode);
			if ((path_data->oflag & O_CREAT) &&
					(*(path_data->path + path_data->pathlen - 1) != '/')) {
				// Check parent directory
				char *c = strrchr(path_data->path, '/');
				_dispatch_io_syscall_switch_noerr(perr,
					stat(path_data->path, &st),
						// Since the parent directory exists, open() will
						// create a regular file after the fd_entry has
						st.st_mode = S_IFREG;
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_release(channel);
			_dispatch_release(queue);
	_dispatch_object_debug(channel, "%s", __func__);
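/*
 * Illustrative sketch (not part of the build): opening a channel by path.
 * The path and queue are hypothetical; the underlying fd is opened lazily,
 * when the first operation on the channel runs.
 *
 *	dispatch_io_t io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/example.dat", O_RDWR | O_CREAT, 0644,
 *			dispatch_get_main_queue(), ^(int error) {
 *		// Called once the channel is closed and the fd (if opened) released.
 *	});
 */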
dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });

dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	dispatch_io_t channel = _dispatch_io_create(type);
	_dispatch_channel_debug("create with channel %p", channel, in_channel);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
				err = in_channel->err;
					err = in_channel->fd_entry->err;
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
					_dispatch_io_init(channel, fd_entry, queue, 0,
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			_dispatch_release(in_channel);
			_dispatch_object_debug(channel, "%s", __func__);
	_dispatch_object_debug(channel, "%s", __func__);

dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
	return dispatch_io_create_with_io(type, in_channel, queue,
			!cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });

#pragma mark dispatch_io_accessors

dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set high water: %zu", channel, high_water);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);

dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set low water: %zu", channel, low_water);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		channel->params.low = low_water;
		_dispatch_release(channel);

dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set interval: %llu", channel, interval);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
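/*
 * Illustrative sketch (not part of the build): tuning delivery on a channel.
 * With these (hypothetical) settings the read handler fires once at least
 * 64KB has accumulated, and at 100ms intervals with whatever is available
 * when DISPATCH_IO_STRICT_INTERVAL is set.
 *
 *	dispatch_io_set_low_water(io, 64 * 1024);
 *	dispatch_io_set_high_water(io, 1024 * 1024);
 *	dispatch_io_set_interval(io, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */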
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);

dispatch_io_get_descriptor(dispatch_io_t channel)
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_thread_context_t ctxt =
				_dispatch_thread_context_find(_dispatch_io_key);
		if (ctxt && ctxt->dtc_io_in_barrier == channel) {
			(void)_dispatch_fd_entry_open(channel->fd_entry, channel);
	return channel->fd_actual;

#pragma mark dispatch_io_operations

_dispatch_io_stop(dispatch_io_t channel)
	_dispatch_channel_debug("stop", channel);
	(void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
				_dispatch_channel_debug("stop cleanup", channel);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_object_debug(channel, "%s", __func__);
					_dispatch_channel_debug("stop cleanup after close",
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
					_dispatch_release(channel);
			_dispatch_release(channel);

dispatch_io_close(dispatch_io_t channel, unsigned long flags)
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
		return _dispatch_io_stop(channel);
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_object_debug(channel, "%s", __func__);
			_dispatch_channel_debug("close", channel);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
					if (!fd_entry->path_data) {
						channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
			_dispatch_release(channel);
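/*
 * Illustrative sketch (not part of the build): closing a channel. A plain
 * close lets outstanding operations drain; passing DISPATCH_IO_STOP also
 * interrupts them, and their handlers fire with ECANCELED.
 *
 *	dispatch_io_close(io, 0);                 // graceful
 *	dispatch_io_close(io, DISPATCH_IO_STOP);  // interrupt outstanding I/O
 */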
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				dispatch_thread_context_s io_ctxt = {
					.dtc_key = _dispatch_io_key,
					.dtc_io_in_barrier = channel,
				_dispatch_object_debug(channel, "%s", __func__);
				_dispatch_thread_context_push(&io_ctxt);
				_dispatch_thread_context_pop(&io_ctxt);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);

dispatch_io_barrier_f(dispatch_io_t channel, void *context,
		dispatch_function_t barrier)
	return dispatch_io_barrier(channel, ^{ barrier(context); });
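/*
 * Illustrative sketch (not part of the build): a barrier is the place where
 * dispatch_io_get_descriptor() is guaranteed to return a usable fd, e.g. to
 * apply an fcntl() that the channel API does not expose.
 *
 *	dispatch_io_barrier(io, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(io);
 *		if (fd != -1) {
 *			(void)fcntl(fd, F_NOCACHE, 1);
 *		}
 *	});
 */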
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
		_dispatch_release(channel);
		_dispatch_release(queue);

dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
	return dispatch_io_read(channel, offset, length, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);

dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			_dispatch_io_data_release(data);
		_dispatch_release(channel);
		_dispatch_release(queue);

dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, void *context,
		dispatch_io_handler_function_t handler)
	return dispatch_io_write(channel, offset, data, queue,
			^(bool done, dispatch_data_t d, int error){
		handler(context, done, d, error);
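/*
 * Illustrative sketch (not part of the build): reading 1MB starting at
 * offset 0 from a random-access channel. The handler may fire several
 * times; `done` with error == 0 marks the end of the request.
 *
 *	dispatch_io_read(io, 0, 1024 * 1024, dispatch_get_main_queue(),
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			// consume `data`, e.g. with dispatch_data_apply()
 *		}
 *		if (done && error) fprintf(stderr, "read failed: %d\n", error);
 *	});
 */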
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			_dispatch_release(queue);
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			_dispatch_release(queue);
		dispatch_operation_t op =
				_dispatch_operation_create(DOP_DIR_READ, channel, 0,
				length, dispatch_data_empty,
				_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, false),
				^(bool done, dispatch_data_t data, int error) {
				data = dispatch_data_create_concat(deliver_data, data);
				_dispatch_io_data_release(deliver_data);
		_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);

dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);

dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
			_dispatch_release(queue);
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		__block dispatch_data_t deliver_data = NULL;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
					_dispatch_io_data_release(deliver_data);
			_dispatch_release(queue);
		dispatch_operation_t op =
				_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
				dispatch_data_get_size(data), data,
				_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, false),
				^(bool done, dispatch_data_t d, int error) {
					_dispatch_io_data_retain(d);
		_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
	_dispatch_io_data_release(data);

dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
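/*
 * Illustrative sketch (not part of the build): the fd-based convenience
 * wrappers above. Exactly one handler invocation is made per call; the fd
 * must remain open until the handler has run.
 *
 *	dispatch_read(fd, SIZE_MAX, queue, ^(dispatch_data_t data, int error) {
 *		// `data` holds everything read before EOF or `error`.
 *	});
 *
 *	dispatch_data_t msg = dispatch_data_create(buf, len, queue,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, queue, ^(dispatch_data_t unwritten, int error) {
 *		// `unwritten` is non-NULL only if the write failed partway.
 *	});
 *	dispatch_release(msg);
 */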
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
	dispatch_assert(direction < DOP_DIR_MAX);
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
				} else if (direction == DOP_DIR_WRITE && !err) {
				_dispatch_channel_debug("IO handler invoke: err %d", channel,
				handler(true, d, err);
				_dispatch_io_data_release(data);
			_dispatch_release(queue);
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	_dispatch_channel_debug("operation create: %p", channel, op);
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);

_dispatch_operation_dispose(dispatch_operation_t op)
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("dispose", op);
	// Deliver the data if there's any
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
		_dispatch_release(op->channel);
		dispatch_release(op->timer);
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		_dispatch_io_data_release(op->buf_data);
		_dispatch_io_data_release(op->data);
		dispatch_release(op->op_q);
	Block_release(op->handler);
	_dispatch_op_debug("disposed", op);

_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
			} else if (direction == DOP_DIR_WRITE && !err) {
			handler(true, d, err);
			_dispatch_io_data_release(data);
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);

_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
	// On stream queue or disk queue
	_dispatch_op_debug("enqueue", op);
	_dispatch_io_data_retain(data);
	int err = _dispatch_io_get_error(op, NULL, true);
		_dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
		_dispatch_release(op);
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));

static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
	// On stream queue or pick queue
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			(int64_t)op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			_dispatch_operation_deliver_data(op, flags);
#pragma mark dispatch_fd_entry_t

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
	guardid_t guard = fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE;
	int err, fd_flags = 0;
	_dispatch_io_syscall_switch_noerr(err,
		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
			fd_entry->guard_flags = guard_flags;
			fd_entry->orig_fd_flags = fd_flags;
		default: (void)dispatch_assume_zero(err); break;

_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
	if (!fd_entry->guard_flags) {
	guardid_t guard = fd_entry;
	int err, fd_flags = fd_entry->orig_fd_flags;
	_dispatch_io_syscall_switch(err,
		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
		default: (void)dispatch_assume_zero(err); break;
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
#endif // DISPATCH_USE_GUARDED_FD

_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
		int oflag, mode_t mode) {
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)fd_entry;
	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
			GUARD_SOCKET_IPC | GUARD_FILEPORT;
	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
		fd_entry->guard_flags = guard_flags;
	return open(path, oflag, mode);

_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
#if DISPATCH_USE_GUARDED_FD
	if (fd_entry->guard_flags) {
		guardid_t guard = (uintptr_t)fd_entry;
		return guarded_close_np(fd, &guard);

_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);

_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);

_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		_dispatch_fd_entry_debug("init", fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_debug("init completion", fd_entry);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);

static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
	dispatch_fd_entry_t fd_entry;
	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
	// On fds lock queue
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	_dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_fd_entry_debug("stat", fd_entry);
		int err, orig_flags, orig_nosigpipe = -1;
		_dispatch_io_syscall_switch(err,
			default: fd_entry->err = err; return;
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_fd_entry_guard(fd_entry);
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
						(void)dispatch_assume_zero(err);
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
						(void)dispatch_assume_zero(err);
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					_DISPATCH_QOS_CLASS_DEFAULT, false));
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		_dispatch_fd_entry_debug("barrier queue release", fd_entry);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_fd_entry_debug("barrier group release", fd_entry);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
		_dispatch_fd_entry_unguard(fd_entry);
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
	// On devs lock queue
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	_dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false));
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue cleanup", fd_entry);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_entry_debug("close queue release", fd_entry);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);

_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
	if (fd_entry->err) {
		return fd_entry->err;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
			oflag, fd_entry->path_data->mode);
		(void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
	if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
		// Lost the race with another open
		_dispatch_fd_entry_guarded_close(fd_entry, fd);
		channel->fd_actual = fd;
	_dispatch_object_debug(channel, "%s", __func__);

_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
	if (fd_entry->disk) {
		_dispatch_retain(channel);
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
				_dispatch_release(channel);
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
				_dispatch_retain(channel);
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
					_dispatch_release(channel);
#pragma mark dispatch_stream_t/dispatch_disk_t

_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
		dispatch_set_context(stream->dq, stream);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;

_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
	dispatch_stream_t stream = fd_entry->streams[direction];
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);

_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
	// On devs lock queue
	dispatch_disk_t disk;
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
	disk->pick_queue = dispatch_queue_create(label, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);

_dispatch_disk_dispose(dispatch_disk_t disk)
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	for (i = 0; i < disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	dispatch_release(disk->pick_queue);
#pragma mark dispatch_stream_operations/dispatch_disk_operations

_dispatch_stream_operation_avail(dispatch_stream_t stream)
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));

_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
	_dispatch_object_debug(op, "%s", __func__);
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
		dispatch_async_f(stream->dq, stream->dq,
				_dispatch_stream_queue_handler);

_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
	_dispatch_object_debug(op, "%s", __func__);
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	_dispatch_disk_handler(disk);

_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: stream %p", op, stream);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		dispatch_source_cancel(op->timer);
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
	_dispatch_release(op);

_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("complete: disk %p", op, disk);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
	TAILQ_REMOVE(&disk->operations, op, operation_list);
		dispatch_source_cancel(op->timer);
	// Final release will deliver any pending data
	_dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
	_dispatch_release(op);

static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
				op = TAILQ_NEXT(op, operation_list);
					op = TAILQ_FIRST(&disk->operations);
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);

_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;

_dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
		dispatch_io_t channel, bool inactive_only)
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (inactive_only && op->active) continue;
		if (!channel || op->channel == channel) {
			_dispatch_op_debug("cleanup: disk %p", op, disk);
			_dispatch_disk_complete_operation(disk, op);

_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
	_dispatch_disk_cleanup_specified_operations(disk, channel, false);

_dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel)
	_dispatch_disk_cleanup_specified_operations(disk, channel, true);
1868 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1870 static dispatch_source_t
1871 _dispatch_stream_source(dispatch_stream_t stream
, dispatch_operation_t op
)
1874 if (stream
->source
) {
1875 return stream
->source
;
1877 dispatch_fd_t fd
= op
->fd_entry
->fd
;
1878 _dispatch_op_debug("stream source create", op
);
1879 dispatch_source_t source
= NULL
;
1880 if (op
->direction
== DOP_DIR_READ
) {
1881 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
1882 (uintptr_t)fd
, 0, stream
->dq
);
1883 } else if (op
->direction
== DOP_DIR_WRITE
) {
1884 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
1885 (uintptr_t)fd
, 0, stream
->dq
);
1887 dispatch_assert(op
->direction
< DOP_DIR_MAX
);
1890 dispatch_set_context(source
, stream
);
1891 dispatch_source_set_event_handler_f(source
,
1892 _dispatch_stream_source_handler
);
1893 // Close queue must not run user cleanup handlers until sources are fully
1895 dispatch_queue_t close_queue
= op
->fd_entry
->close_queue
;
1896 dispatch_source_set_cancel_handler(source
, ^{
1897 _dispatch_op_debug("stream source cancel", op
);
1898 dispatch_resume(close_queue
);
1900 stream
->source
= source
;
1901 return stream
->source
;
1905 _dispatch_stream_source_handler(void *ctx
)
1908 dispatch_stream_t stream
= (dispatch_stream_t
)ctx
;
1909 dispatch_suspend(stream
->source
);
1910 stream
->source_running
= false;
1911 return _dispatch_stream_handler(stream
);
static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_op_debug("stream handler", op);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_op_debug("initial delivery", op);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
}
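
// Fills the disk's advise list (a small ring buffer indexed by free_idx and
// req_idx) with pending operations picked from the disk queue and, when an
// operation is ready at req_idx, kicks off _dispatch_disk_perform on its
// target queue. Only one perform is in flight at a time (io_active).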
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_disk_debug("disk handler", disk);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			_dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_op_debug("activate: disk %p", op, disk);
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		_dispatch_op_debug("async perform: disk %p", op, disk);
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
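
// Issues read-ahead advice for the operations queued in the advise list, then
// performs the operation at req_idx and reports its result back on the pick
// queue, where delivery, completion, cleanup and the handler re-arm occur.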
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	_dispatch_disk_debug("disk perform", disk);
	size_t chunk_size = dispatch_io_defaults.chunk_size;
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_op_debug("initial delivery", op);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	_dispatch_op_debug("async perform completion: disk %p", op, disk);
	dispatch_async(disk->pick_queue, ^{
		_dispatch_op_debug("perform completion", op);
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		_dispatch_op_debug("deactivate: disk %p", op, disk);
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_op_debug("release -> %d (disk perform complete)", op,
				op->do_ref_cnt);
		_dispatch_release(op);
	});
}
#pragma mark -
#pragma mark dispatch_operation_perform
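
// Hints the kernel to pre-fetch the next chunk of a read operation
// (F_RDADVISE on Darwin, readahead(2) on Linux), keeping the advised range
// roughly one chunk ahead of the bytes already read.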
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	_dispatch_op_debug("advise", op);
	if (_dispatch_io_get_error(op, NULL, true)) return;
#ifdef __linux__
	// linux does not support fcntl (F_RDADVISE)
	// define necessary datastructure and use readahead
	struct radvisory {
		off_t ra_offset;
		int ra_count;
	};
#endif
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
#ifdef __linux__
	_dispatch_io_syscall_switch(err,
		readahead(op->fd_entry->fd, advise.ra_offset, advise.ra_count),
		case EINVAL: break; // fd does refer to a non-supported filetype
		default: (void)dispatch_assume_zero(err); break;
	);
#else
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
#endif
}
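
// Executes a single read/write (or pread/pwrite for random-access channels)
// against the operation's fd, allocating or mapping the operation buffer as
// needed, and translates the outcome into a DISPATCH_OP_* result code for the
// stream and disk handlers above.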
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	_dispatch_op_debug("perform", op);
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_size;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_size but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_op_debug("buffer allocated", op);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_op_debug("buffer mapped", op);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_op_debug("performed: EOF", op);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_op_debug("performed: EAGAIN", op);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	_dispatch_op_debug("performed: err %d", op, err);
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
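
// Packages the bytes processed so far into a dispatch_data_t and invokes the
// operation's I/O handler on its queue, honoring the low-water mark and the
// DOP_DELIVER/DOP_DONE/DOP_NO_EMPTY flags; otherwise the data is held back as
// undelivered for a later call.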
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("deliver data", op);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_op_debug("IO handler invoke", op);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_op_debug("IO handler invoke: err %d", op, err);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
#pragma mark -
#pragma mark dispatch_io_debug
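
// The attr helpers below format channel and operation state for debugging.
// Illustrative sketch of how such state typically comes about via the public
// API (for reference only; not used by this file):
//
//   dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
//           dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
//           ^(int error) { (void)error; /* cleanup */ });
//   dispatch_io_read(ch, 0, SIZE_MAX, queue,
//           ^(bool done, dispatch_data_t data, int error) { /* consume */ });
//
// Each such read or write is represented internally by a dispatch_operation_t
// whose fields are printed by _dispatch_operation_debug_attr().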
static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long) channel->params.interval);
}
size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, (long long)op->offset, op->length,
			op->total, op->undelivered + op->buf_len, op->flags, op->err,
			op->params.low, op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)op->params.interval);
}
size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");