/*
 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#ifndef DISPATCH_IO_DEBUG
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#ifndef PAGE_SIZE
#define PAGE_SIZE getpagesize()
#endif

#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#else
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#endif
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
	} while (0)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
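
// Added usage note (not part of the original source): call sites pass the
// syscall expression plus extra switch cases for the errno values they care
// about, mirroring later uses in this file. A minimal sketch, assuming an
// open descriptor `fd` and locals `err` and `flags`:
//
//	int err, flags;
//	_dispatch_io_syscall_switch(err,
//		flags = fcntl(fd, F_GETFL),
//		default: (void)dispatch_assume_zero(err); break;
//	);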
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
#pragma mark dispatch_io_debug

#if DISPATCH_IO_DEBUG
#if !DISPATCH_DEBUG
#define _dispatch_io_log(x, ...) do { \
		_dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
		(void *)_dispatch_thread_self(), ##__VA_ARGS__); \
	} while (0)
#ifdef _dispatch_object_debug
#undef _dispatch_object_debug
#define _dispatch_object_debug dispatch_debug
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#endif
#else
#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
#endif // DISPATCH_DEBUG
#else
#define _dispatch_io_log(x, ...)
#endif // DISPATCH_IO_DEBUG

#define _dispatch_fd_debug(msg, fd, ...) \
		_dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
#define _dispatch_op_debug(msg, op, ...) \
		_dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
#define _dispatch_channel_debug(msg, channel, ...) \
		_dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
		_dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
#define _dispatch_disk_debug(msg, disk, ...) \
		_dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
#pragma mark dispatch_io_hashtables

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static char const * const _dispatch_io_key = "io";
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_size, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_size = DIO_MAX_CHUNK_SIZE,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
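
// Added usage note (not part of the original source): _dispatch_iocntl is an
// internal tuning hook; a test harness might shrink the I/O chunk size with
// something like:
//
//	_dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 1); // one page per chunk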
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_size;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}
static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_channel_debug("cleanup handler invoke: err %d",
						channel, err);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}
void
_dispatch_io_dispose(dispatch_io_t channel)
{
	_dispatch_object_debug(channel, "%s", __func__);
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd_entry have been released
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}
static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}
static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	_dispatch_channel_debug("create", channel);
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	_dispatch_object_debug(channel, "%s", __func__);
	return channel;
}
dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void *context,
		void (*cleanup_handler)(void *context, int error))
{
	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
			^(int error){ cleanup_handler(context, error); });
}
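
// Added usage note (not part of the original source): a typical caller wraps
// an existing descriptor and schedules a read, e.g. (queue names assumed,
// error handling elided):
//
//	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
//			dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^(int error) {
//		// channel no longer uses fd; safe to close it here
//	});
//	dispatch_io_read(ch, 0, SIZE_MAX, handler_queue,
//			^(bool done, dispatch_data_t data, int error) { /* consume */ });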
394 dispatch_io_create_with_path(dispatch_io_type_t type
, const char *path
,
395 int oflag
, mode_t mode
, dispatch_queue_t queue
,
396 void (^cleanup_handler
)(int error
))
398 if ((type
!= DISPATCH_IO_STREAM
&& type
!= DISPATCH_IO_RANDOM
) ||
400 return DISPATCH_BAD_INPUT
;
402 size_t pathlen
= strlen(path
);
403 dispatch_io_path_data_t path_data
= malloc(sizeof(*path_data
) + pathlen
+1);
405 return DISPATCH_OUT_OF_MEMORY
;
407 dispatch_io_t channel
= _dispatch_io_create(type
);
409 _dispatch_channel_debug("create with path %s", channel
, path
);
410 channel
->fd_actual
= -1;
411 path_data
->channel
= channel
;
412 path_data
->oflag
= oflag
;
413 path_data
->mode
= mode
;
414 path_data
->pathlen
= pathlen
;
415 memcpy(path_data
->path
, path
, pathlen
+ 1);
416 _dispatch_retain(queue
);
417 _dispatch_retain(channel
);
418 dispatch_async(channel
->queue
, ^{
421 _dispatch_io_syscall_switch_noerr(err
,
422 (path_data
->oflag
& O_NOFOLLOW
) == O_NOFOLLOW
424 || (path_data
->oflag
& O_SYMLINK
) == O_SYMLINK
426 ? lstat(path_data
->path
, &st
) : stat(path_data
->path
, &st
),
428 err
= _dispatch_io_validate_type(channel
, st
.st_mode
);
431 if ((path_data
->oflag
& O_CREAT
) &&
432 (*(path_data
->path
+ path_data
->pathlen
- 1) != '/')) {
433 // Check parent directory
434 char *c
= strrchr(path_data
->path
, '/');
438 _dispatch_io_syscall_switch_noerr(perr
,
439 stat(path_data
->path
, &st
),
441 // Since the parent directory exists, open() will
442 // create a regular file after the fd_entry has
444 st
.st_mode
= S_IFREG
;
455 _dispatch_io_init(channel
, NULL
, queue
, err
, cleanup_handler
);
456 _dispatch_release(channel
);
457 _dispatch_release(queue
);
460 dispatch_suspend(channel
->queue
);
461 dispatch_once_f(&_dispatch_io_devs_lockq_pred
, NULL
,
462 _dispatch_io_devs_lockq_init
);
463 dispatch_async(_dispatch_io_devs_lockq
, ^{
464 dispatch_fd_entry_t fd_entry
= _dispatch_fd_entry_create_with_path(
465 path_data
, st
.st_dev
, st
.st_mode
);
466 _dispatch_io_init(channel
, fd_entry
, queue
, 0, cleanup_handler
);
467 dispatch_resume(channel
->queue
);
468 _dispatch_object_debug(channel
, "%s", __func__
);
469 _dispatch_release(channel
);
470 _dispatch_release(queue
);
473 _dispatch_object_debug(channel
, "%s", __func__
);
478 dispatch_io_create_with_path_f(dispatch_io_type_t type
, const char *path
,
479 int oflag
, mode_t mode
, dispatch_queue_t queue
, void *context
,
480 void (*cleanup_handler
)(void *context
, int error
))
482 return dispatch_io_create_with_path(type
, path
, oflag
, mode
, queue
,
483 !cleanup_handler
? NULL
:
484 ^(int error
){ cleanup_handler(context
, error
); });
488 dispatch_io_create_with_io(dispatch_io_type_t type
, dispatch_io_t in_channel
,
489 dispatch_queue_t queue
, void (^cleanup_handler
)(int error
))
491 if (type
!= DISPATCH_IO_STREAM
&& type
!= DISPATCH_IO_RANDOM
) {
492 return DISPATCH_BAD_INPUT
;
494 dispatch_io_t channel
= _dispatch_io_create(type
);
495 _dispatch_channel_debug("create with channel %p", channel
, in_channel
);
496 dispatch_suspend(channel
->queue
);
497 _dispatch_retain(queue
);
498 _dispatch_retain(channel
);
499 _dispatch_retain(in_channel
);
500 dispatch_async(in_channel
->queue
, ^{
501 int err0
= _dispatch_io_get_error(NULL
, in_channel
, false);
504 _dispatch_io_init(channel
, NULL
, queue
, err0
, cleanup_handler
);
505 dispatch_resume(channel
->queue
);
506 _dispatch_release(channel
);
507 _dispatch_release(in_channel
);
508 _dispatch_release(queue
);
511 dispatch_async(in_channel
->barrier_queue
, ^{
512 int err
= _dispatch_io_get_error(NULL
, in_channel
, false);
513 // If there is no error, the fd_entry for the in_channel is valid.
514 // Since we are running on in_channel's queue, the fd_entry has been
515 // fully resolved and will stay valid for the duration of this block
517 err
= in_channel
->err
;
519 err
= in_channel
->fd_entry
->err
;
523 err
= _dispatch_io_validate_type(channel
,
524 in_channel
->fd_entry
->stat
.mode
);
526 if (!err
&& type
== DISPATCH_IO_RANDOM
&& in_channel
->fd
!= -1) {
528 _dispatch_io_syscall_switch_noerr(err
,
529 f_ptr
= lseek(in_channel
->fd_entry
->fd
, 0, SEEK_CUR
),
530 case 0: channel
->f_ptr
= f_ptr
; break;
531 default: (void)dispatch_assume_zero(err
); break;
536 _dispatch_io_init(channel
, NULL
, queue
, err
, cleanup_handler
);
537 dispatch_resume(channel
->queue
);
538 _dispatch_release(channel
);
539 _dispatch_release(in_channel
);
540 _dispatch_release(queue
);
543 if (in_channel
->fd
== -1) {
544 // in_channel was created from path
546 channel
->fd_actual
= -1;
547 mode_t mode
= in_channel
->fd_entry
->stat
.mode
;
548 dev_t dev
= in_channel
->fd_entry
->stat
.dev
;
549 size_t path_data_len
= sizeof(struct dispatch_io_path_data_s
) +
550 in_channel
->fd_entry
->path_data
->pathlen
+ 1;
551 dispatch_io_path_data_t path_data
= malloc(path_data_len
);
552 memcpy(path_data
, in_channel
->fd_entry
->path_data
,
554 path_data
->channel
= channel
;
555 // lockq_io_devs is known to already exist
556 dispatch_async(_dispatch_io_devs_lockq
, ^{
557 dispatch_fd_entry_t fd_entry
;
558 fd_entry
= _dispatch_fd_entry_create_with_path(path_data
,
560 _dispatch_io_init(channel
, fd_entry
, queue
, 0,
562 dispatch_resume(channel
->queue
);
563 _dispatch_release(channel
);
564 _dispatch_release(queue
);
567 dispatch_fd_entry_t fd_entry
= in_channel
->fd_entry
;
568 channel
->fd
= in_channel
->fd
;
569 channel
->fd_actual
= in_channel
->fd_actual
;
570 _dispatch_fd_entry_retain(fd_entry
);
571 _dispatch_io_init(channel
, fd_entry
, queue
, 0, cleanup_handler
);
572 dispatch_resume(channel
->queue
);
573 _dispatch_release(channel
);
574 _dispatch_release(queue
);
576 _dispatch_release(in_channel
);
577 _dispatch_object_debug(channel
, "%s", __func__
);
580 _dispatch_object_debug(channel
, "%s", __func__
);
585 dispatch_io_create_with_io_f(dispatch_io_type_t type
, dispatch_io_t in_channel
,
586 dispatch_queue_t queue
, void *context
,
587 void (*cleanup_handler
)(void *context
, int error
))
589 return dispatch_io_create_with_io(type
, in_channel
, queue
,
590 !cleanup_handler
? NULL
:
591 ^(int error
){ cleanup_handler(context
, error
); });
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set high water: %zu", channel, high_water);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set low water: %zu", channel, low_water);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}
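
// Added commentary (not in the original source): both setters run on the
// channel queue and preserve the invariant low <= high; a zero high-water
// mark is clamped to 1 so the delivery threshold never becomes zero.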
void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_channel_debug("set interval: %llu", channel, interval);
		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}

void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_object_debug(channel, "%s", __func__);
		_dispatch_release(channel);
	});
}

dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
		dispatch_thread_context_t ctxt =
				_dispatch_thread_context_find(_dispatch_io_key);
		if (ctxt && ctxt->dtc_io_in_barrier == channel) {
			(void)_dispatch_fd_entry_open(channel->fd_entry, channel);
		}
	}
	return channel->fd_actual;
}
670 #pragma mark dispatch_io_operations
673 _dispatch_io_stop(dispatch_io_t channel
)
675 _dispatch_channel_debug("stop", channel
);
676 (void)os_atomic_or2o(channel
, atomic_flags
, DIO_STOPPED
, relaxed
);
677 _dispatch_retain(channel
);
678 dispatch_async(channel
->queue
, ^{
679 dispatch_async(channel
->barrier_queue
, ^{
680 _dispatch_object_debug(channel
, "%s", __func__
);
681 dispatch_fd_entry_t fd_entry
= channel
->fd_entry
;
683 _dispatch_channel_debug("stop cleanup", channel
);
684 _dispatch_fd_entry_cleanup_operations(fd_entry
, channel
);
685 if (!(channel
->atomic_flags
& DIO_CLOSED
)) {
686 channel
->fd_entry
= NULL
;
687 _dispatch_fd_entry_release(fd_entry
);
689 } else if (channel
->fd
!= -1) {
690 // Stop after close, need to check if fd_entry still exists
691 _dispatch_retain(channel
);
692 dispatch_async(_dispatch_io_fds_lockq
, ^{
693 _dispatch_object_debug(channel
, "%s", __func__
);
694 _dispatch_channel_debug("stop cleanup after close",
696 dispatch_fd_entry_t fdi
;
697 uintptr_t hash
= DIO_HASH(channel
->fd
);
698 TAILQ_FOREACH(fdi
, &_dispatch_io_fds
[hash
], fd_list
) {
699 if (fdi
->fd
== channel
->fd
) {
700 _dispatch_fd_entry_cleanup_operations(fdi
, channel
);
704 _dispatch_release(channel
);
707 _dispatch_release(channel
);
713 dispatch_io_close(dispatch_io_t channel
, unsigned long flags
)
715 if (flags
& DISPATCH_IO_STOP
) {
716 // Don't stop an already stopped channel
717 if (channel
->atomic_flags
& DIO_STOPPED
) {
720 return _dispatch_io_stop(channel
);
722 // Don't close an already closed or stopped channel
723 if (channel
->atomic_flags
& (DIO_CLOSED
|DIO_STOPPED
)) {
726 _dispatch_retain(channel
);
727 dispatch_async(channel
->queue
, ^{
728 dispatch_async(channel
->barrier_queue
, ^{
729 _dispatch_object_debug(channel
, "%s", __func__
);
730 _dispatch_channel_debug("close", channel
);
731 if (!(channel
->atomic_flags
& (DIO_CLOSED
|DIO_STOPPED
))) {
732 (void)os_atomic_or2o(channel
, atomic_flags
, DIO_CLOSED
,
734 dispatch_fd_entry_t fd_entry
= channel
->fd_entry
;
736 if (!fd_entry
->path_data
) {
737 channel
->fd_entry
= NULL
;
739 _dispatch_fd_entry_release(fd_entry
);
742 _dispatch_release(channel
);
748 dispatch_io_barrier(dispatch_io_t channel
, dispatch_block_t barrier
)
750 _dispatch_retain(channel
);
751 dispatch_async(channel
->queue
, ^{
752 dispatch_queue_t io_q
= channel
->do_targetq
;
753 dispatch_queue_t barrier_queue
= channel
->barrier_queue
;
754 dispatch_group_t barrier_group
= channel
->barrier_group
;
755 dispatch_async(barrier_queue
, ^{
756 dispatch_suspend(barrier_queue
);
757 dispatch_group_notify(barrier_group
, io_q
, ^{
758 dispatch_thread_context_s io_ctxt
= {
759 .dtc_key
= _dispatch_io_key
,
760 .dtc_io_in_barrier
= channel
,
763 _dispatch_object_debug(channel
, "%s", __func__
);
764 _dispatch_thread_context_push(&io_ctxt
);
766 _dispatch_thread_context_pop(&io_ctxt
);
767 dispatch_resume(barrier_queue
);
768 _dispatch_release(channel
);
775 dispatch_io_barrier_f(dispatch_io_t channel
, void *context
,
776 dispatch_function_t barrier
)
778 return dispatch_io_barrier(channel
, ^{ barrier(context
); });
782 dispatch_io_read(dispatch_io_t channel
, off_t offset
, size_t length
,
783 dispatch_queue_t queue
, dispatch_io_handler_t handler
)
785 _dispatch_retain(channel
);
786 _dispatch_retain(queue
);
787 dispatch_async(channel
->queue
, ^{
788 dispatch_operation_t op
;
789 op
= _dispatch_operation_create(DOP_DIR_READ
, channel
, offset
,
790 length
, dispatch_data_empty
, queue
, handler
);
792 dispatch_queue_t barrier_q
= channel
->barrier_queue
;
793 dispatch_async(barrier_q
, ^{
794 _dispatch_operation_enqueue(op
, DOP_DIR_READ
,
795 dispatch_data_empty
);
798 _dispatch_release(channel
);
799 _dispatch_release(queue
);
804 dispatch_io_read_f(dispatch_io_t channel
, off_t offset
, size_t length
,
805 dispatch_queue_t queue
, void *context
,
806 dispatch_io_handler_function_t handler
)
808 return dispatch_io_read(channel
, offset
, length
, queue
,
809 ^(bool done
, dispatch_data_t d
, int error
){
810 handler(context
, done
, d
, error
);
815 dispatch_io_write(dispatch_io_t channel
, off_t offset
, dispatch_data_t data
,
816 dispatch_queue_t queue
, dispatch_io_handler_t handler
)
818 _dispatch_io_data_retain(data
);
819 _dispatch_retain(channel
);
820 _dispatch_retain(queue
);
821 dispatch_async(channel
->queue
, ^{
822 dispatch_operation_t op
;
823 op
= _dispatch_operation_create(DOP_DIR_WRITE
, channel
, offset
,
824 dispatch_data_get_size(data
), data
, queue
, handler
);
826 dispatch_queue_t barrier_q
= channel
->barrier_queue
;
827 dispatch_async(barrier_q
, ^{
828 _dispatch_operation_enqueue(op
, DOP_DIR_WRITE
, data
);
829 _dispatch_io_data_release(data
);
832 _dispatch_io_data_release(data
);
834 _dispatch_release(channel
);
835 _dispatch_release(queue
);
840 dispatch_io_write_f(dispatch_io_t channel
, off_t offset
, dispatch_data_t data
,
841 dispatch_queue_t queue
, void *context
,
842 dispatch_io_handler_function_t handler
)
844 return dispatch_io_write(channel
, offset
, data
, queue
,
845 ^(bool done
, dispatch_data_t d
, int error
){
846 handler(context
, done
, d
, error
);
851 dispatch_read(dispatch_fd_t fd
, size_t length
, dispatch_queue_t queue
,
852 void (^handler
)(dispatch_data_t
, int))
854 _dispatch_retain(queue
);
855 _dispatch_fd_entry_init_async(fd
, ^(dispatch_fd_entry_t fd_entry
) {
858 int err
= fd_entry
->err
;
859 dispatch_async(queue
, ^{
860 _dispatch_fd_debug("convenience handler invoke", fd
);
861 handler(dispatch_data_empty
, err
);
863 _dispatch_release(queue
);
866 // Safe to access fd_entry on barrier queue
867 dispatch_io_t channel
= fd_entry
->convenience_channel
;
869 channel
= _dispatch_io_create(DISPATCH_IO_STREAM
);
871 channel
->fd_actual
= fd
;
872 channel
->fd_entry
= fd_entry
;
873 dispatch_retain(fd_entry
->barrier_queue
);
874 dispatch_retain(fd_entry
->barrier_group
);
875 channel
->barrier_queue
= fd_entry
->barrier_queue
;
876 channel
->barrier_group
= fd_entry
->barrier_group
;
877 fd_entry
->convenience_channel
= channel
;
879 __block dispatch_data_t deliver_data
= dispatch_data_empty
;
881 dispatch_async(fd_entry
->close_queue
, ^{
882 dispatch_async(queue
, ^{
883 _dispatch_fd_debug("convenience handler invoke", fd
);
884 handler(deliver_data
, err
);
885 _dispatch_io_data_release(deliver_data
);
887 _dispatch_release(queue
);
889 dispatch_operation_t op
=
890 _dispatch_operation_create(DOP_DIR_READ
, channel
, 0,
891 length
, dispatch_data_empty
,
892 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,false),
893 ^(bool done
, dispatch_data_t data
, int error
) {
895 data
= dispatch_data_create_concat(deliver_data
, data
);
896 _dispatch_io_data_release(deliver_data
);
904 _dispatch_operation_enqueue(op
, DOP_DIR_READ
, dispatch_data_empty
);
910 dispatch_read_f(dispatch_fd_t fd
, size_t length
, dispatch_queue_t queue
,
911 void *context
, void (*handler
)(void *, dispatch_data_t
, int))
913 return dispatch_read(fd
, length
, queue
, ^(dispatch_data_t d
, int error
){
914 handler(context
, d
, error
);
919 dispatch_write(dispatch_fd_t fd
, dispatch_data_t data
, dispatch_queue_t queue
,
920 void (^handler
)(dispatch_data_t
, int))
922 _dispatch_io_data_retain(data
);
923 _dispatch_retain(queue
);
924 _dispatch_fd_entry_init_async(fd
, ^(dispatch_fd_entry_t fd_entry
) {
927 int err
= fd_entry
->err
;
928 dispatch_async(queue
, ^{
929 _dispatch_fd_debug("convenience handler invoke", fd
);
932 _dispatch_release(queue
);
935 // Safe to access fd_entry on barrier queue
936 dispatch_io_t channel
= fd_entry
->convenience_channel
;
938 channel
= _dispatch_io_create(DISPATCH_IO_STREAM
);
940 channel
->fd_actual
= fd
;
941 channel
->fd_entry
= fd_entry
;
942 dispatch_retain(fd_entry
->barrier_queue
);
943 dispatch_retain(fd_entry
->barrier_group
);
944 channel
->barrier_queue
= fd_entry
->barrier_queue
;
945 channel
->barrier_group
= fd_entry
->barrier_group
;
946 fd_entry
->convenience_channel
= channel
;
948 __block dispatch_data_t deliver_data
= NULL
;
950 dispatch_async(fd_entry
->close_queue
, ^{
951 dispatch_async(queue
, ^{
952 _dispatch_fd_debug("convenience handler invoke", fd
);
953 handler(deliver_data
, err
);
955 _dispatch_io_data_release(deliver_data
);
958 _dispatch_release(queue
);
960 dispatch_operation_t op
=
961 _dispatch_operation_create(DOP_DIR_WRITE
, channel
, 0,
962 dispatch_data_get_size(data
), data
,
963 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,false),
964 ^(bool done
, dispatch_data_t d
, int error
) {
967 _dispatch_io_data_retain(d
);
974 _dispatch_operation_enqueue(op
, DOP_DIR_WRITE
, data
);
976 _dispatch_io_data_release(data
);
void
dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void *context, void (*handler)(void *, dispatch_data_t, int))
{
	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
		handler(context, d, error);
	});
}
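
// Added usage note (not part of the original source): the dispatch_read /
// dispatch_write convenience calls above wrap the channel machinery for
// one-shot I/O on a descriptor, e.g.:
//
//	dispatch_read(fd, SIZE_MAX, queue, ^(dispatch_data_t data, int error) {
//		// data holds whatever was read before EOF or error
//	});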
990 #pragma mark dispatch_operation_t
992 static dispatch_operation_t
993 _dispatch_operation_create(dispatch_op_direction_t direction
,
994 dispatch_io_t channel
, off_t offset
, size_t length
,
995 dispatch_data_t data
, dispatch_queue_t queue
,
996 dispatch_io_handler_t handler
)
999 dispatch_assert(direction
< DOP_DIR_MAX
);
1000 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
1001 // that can only be NULL if atomic_flags are set rdar://problem/8362514
1002 int err
= _dispatch_io_get_error(NULL
, channel
, false);
1003 if (err
|| !length
) {
1004 _dispatch_io_data_retain(data
);
1005 _dispatch_retain(queue
);
1006 dispatch_async(channel
->barrier_queue
, ^{
1007 dispatch_async(queue
, ^{
1008 dispatch_data_t d
= data
;
1009 if (direction
== DOP_DIR_READ
&& err
) {
1011 } else if (direction
== DOP_DIR_WRITE
&& !err
) {
1014 _dispatch_channel_debug("IO handler invoke: err %d", channel
,
1016 handler(true, d
, err
);
1017 _dispatch_io_data_release(data
);
1019 _dispatch_release(queue
);
1023 dispatch_operation_t op
= _dispatch_alloc(DISPATCH_VTABLE(operation
),
1024 sizeof(struct dispatch_operation_s
));
1025 _dispatch_channel_debug("operation create: %p", channel
, op
);
1026 op
->do_next
= DISPATCH_OBJECT_LISTLESS
;
1027 op
->do_xref_cnt
= -1; // operation object is not exposed externally
1028 op
->op_q
= dispatch_queue_create("com.apple.libdispatch-io.opq", NULL
);
1029 op
->op_q
->do_targetq
= queue
;
1030 _dispatch_retain(queue
);
1032 op
->direction
= direction
;
1033 op
->offset
= offset
+ channel
->f_ptr
;
1034 op
->length
= length
;
1035 op
->handler
= _dispatch_io_Block_copy(handler
);
1036 _dispatch_retain(channel
);
1037 op
->channel
= channel
;
1038 op
->params
= channel
->params
;
1039 // Take a snapshot of the priority of the channel queue. The actual I/O
1040 // for this operation will be performed at this priority
1041 dispatch_queue_t targetq
= op
->channel
->do_targetq
;
1042 while (fastpath(targetq
->do_targetq
)) {
1043 targetq
= targetq
->do_targetq
;
1045 op
->do_targetq
= targetq
;
1046 _dispatch_object_debug(op
, "%s", __func__
);
1051 _dispatch_operation_dispose(dispatch_operation_t op
)
1053 _dispatch_object_debug(op
, "%s", __func__
);
1054 _dispatch_op_debug("dispose", op
);
1055 // Deliver the data if there's any
1057 _dispatch_operation_deliver_data(op
, DOP_DONE
);
1058 dispatch_group_leave(op
->fd_entry
->barrier_group
);
1059 _dispatch_fd_entry_release(op
->fd_entry
);
1062 _dispatch_release(op
->channel
);
1065 dispatch_release(op
->timer
);
1067 // For write operations, op->buf is owned by op->buf_data
1068 if (op
->buf
&& op
->direction
== DOP_DIR_READ
) {
1072 _dispatch_io_data_release(op
->buf_data
);
1075 _dispatch_io_data_release(op
->data
);
1078 dispatch_release(op
->op_q
);
1080 Block_release(op
->handler
);
1081 _dispatch_op_debug("disposed", op
);
1085 _dispatch_operation_enqueue(dispatch_operation_t op
,
1086 dispatch_op_direction_t direction
, dispatch_data_t data
)
1088 // Called from the barrier queue
1089 _dispatch_io_data_retain(data
);
1090 // If channel is closed or stopped, then call the handler immediately
1091 int err
= _dispatch_io_get_error(NULL
, op
->channel
, false);
1093 dispatch_io_handler_t handler
= op
->handler
;
1094 dispatch_async(op
->op_q
, ^{
1095 dispatch_data_t d
= data
;
1096 if (direction
== DOP_DIR_READ
&& err
) {
1098 } else if (direction
== DOP_DIR_WRITE
&& !err
) {
1101 handler(true, d
, err
);
1102 _dispatch_io_data_release(data
);
1104 _dispatch_op_debug("release -> %d, err %d", op
, op
->do_ref_cnt
, err
);
1105 _dispatch_release(op
);
1108 // Finish operation init
1109 op
->fd_entry
= op
->channel
->fd_entry
;
1110 _dispatch_fd_entry_retain(op
->fd_entry
);
1111 dispatch_group_enter(op
->fd_entry
->barrier_group
);
1112 dispatch_disk_t disk
= op
->fd_entry
->disk
;
1114 dispatch_stream_t stream
= op
->fd_entry
->streams
[direction
];
1115 dispatch_async(stream
->dq
, ^{
1116 _dispatch_stream_enqueue_operation(stream
, op
, data
);
1117 _dispatch_io_data_release(data
);
1120 dispatch_async(disk
->pick_queue
, ^{
1121 _dispatch_disk_enqueue_operation(disk
, op
, data
);
1122 _dispatch_io_data_release(data
);
1128 _dispatch_operation_should_enqueue(dispatch_operation_t op
,
1129 dispatch_queue_t tq
, dispatch_data_t data
)
1131 // On stream queue or disk queue
1132 _dispatch_op_debug("enqueue", op
);
1133 _dispatch_io_data_retain(data
);
1135 int err
= _dispatch_io_get_error(op
, NULL
, true);
1139 _dispatch_op_debug("release -> %d, err %d", op
, op
->do_ref_cnt
, err
);
1140 _dispatch_release(op
);
1143 if (op
->params
.interval
) {
1144 dispatch_resume(_dispatch_operation_timer(tq
, op
));
1149 static dispatch_source_t
1150 _dispatch_operation_timer(dispatch_queue_t tq
, dispatch_operation_t op
)
1152 // On stream queue or pick queue
1156 dispatch_source_t timer
= dispatch_source_create(
1157 DISPATCH_SOURCE_TYPE_TIMER
, 0, 0, tq
);
1158 dispatch_source_set_timer(timer
, dispatch_time(DISPATCH_TIME_NOW
,
1159 (int64_t)op
->params
.interval
), op
->params
.interval
, 0);
1160 dispatch_source_set_event_handler(timer
, ^{
1161 // On stream queue or pick queue
1162 if (dispatch_source_testcancel(timer
)) {
1163 // Do nothing. The operation has already completed
1166 dispatch_op_flags_t flags
= DOP_DEFAULT
;
1167 if (op
->params
.interval_flags
& DISPATCH_IO_STRICT_INTERVAL
) {
1168 // Deliver even if there is less data than the low-water mark
1169 flags
|= DOP_DELIVER
;
1171 // If the operation is active, dont deliver data
1172 if ((op
->active
) && (flags
& DOP_DELIVER
)) {
1175 _dispatch_operation_deliver_data(op
, flags
);
1183 #pragma mark dispatch_fd_entry_t
1185 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1187 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry
)
1189 guardid_t guard
= fd_entry
;
1190 const unsigned int guard_flags
= GUARD_CLOSE
;
1191 int err
, fd_flags
= 0;
1192 _dispatch_io_syscall_switch_noerr(err
,
1193 change_fdguard_np(fd_entry
->fd
, NULL
, 0, &guard
, guard_flags
,
1196 fd_entry
->guard_flags
= guard_flags
;
1197 fd_entry
->orig_fd_flags
= fd_flags
;
1200 default: (void)dispatch_assume_zero(err
); break;
1205 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry
)
1207 if (!fd_entry
->guard_flags
) {
1210 guardid_t guard
= fd_entry
;
1211 int err
, fd_flags
= fd_entry
->orig_fd_flags
;
1212 _dispatch_io_syscall_switch(err
,
1213 change_fdguard_np(fd_entry
->fd
, &guard
, fd_entry
->guard_flags
, NULL
, 0,
1215 default: (void)dispatch_assume_zero(err
); break;
1220 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry
) { (void)fd_entry
; }
1222 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry
) { (void)fd_entry
; }
1223 #endif // DISPATCH_USE_GUARDED_FD
1226 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry
, const char *path
,
1227 int oflag
, mode_t mode
) {
1228 #if DISPATCH_USE_GUARDED_FD
1229 guardid_t guard
= (uintptr_t)fd_entry
;
1230 const unsigned int guard_flags
= GUARD_CLOSE
| GUARD_DUP
|
1231 GUARD_SOCKET_IPC
| GUARD_FILEPORT
;
1232 int fd
= guarded_open_np(path
, &guard
, guard_flags
, oflag
| O_CLOEXEC
,
1235 fd_entry
->guard_flags
= guard_flags
;
1240 return open(path
, oflag
, mode
);
1245 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry
, int fd
) {
1246 #if DISPATCH_USE_GUARDED_FD
1247 if (fd_entry
->guard_flags
) {
1248 guardid_t guard
= (uintptr_t)fd_entry
;
1249 return guarded_close_np(fd
, &guard
);
static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}
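
// Added commentary (not in the original source): an fd_entry's lifetime is
// tracked by the suspension count of its close_queue rather than a separate
// refcount. Every retain suspends the queue once, every release resumes it,
// and only after the last release does the close_queue drain the cleanup
// blocks that were enqueued on it when the entry was created.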
1269 _dispatch_fd_entry_init_async(dispatch_fd_t fd
,
1270 dispatch_fd_entry_init_callback_t completion_callback
)
1272 static dispatch_once_t _dispatch_io_fds_lockq_pred
;
1273 dispatch_once_f(&_dispatch_io_fds_lockq_pred
, NULL
,
1274 _dispatch_io_fds_lockq_init
);
1275 dispatch_async(_dispatch_io_fds_lockq
, ^{
1276 dispatch_fd_entry_t fd_entry
= NULL
;
1277 // Check to see if there is an existing entry for the given fd
1278 uintptr_t hash
= DIO_HASH(fd
);
1279 TAILQ_FOREACH(fd_entry
, &_dispatch_io_fds
[hash
], fd_list
) {
1280 if (fd_entry
->fd
== fd
) {
1281 // Retain the fd_entry to ensure it cannot go away until the
1282 // stat() has completed
1283 _dispatch_fd_entry_retain(fd_entry
);
1288 // If we did not find an existing entry, create one
1289 fd_entry
= _dispatch_fd_entry_create_with_fd(fd
, hash
);
1291 _dispatch_fd_entry_debug("init", fd_entry
);
1292 dispatch_async(fd_entry
->barrier_queue
, ^{
1293 _dispatch_fd_entry_debug("init completion", fd_entry
);
1294 completion_callback(fd_entry
);
1295 // stat() is complete, release reference to fd_entry
1296 _dispatch_fd_entry_release(fd_entry
);
1301 static dispatch_fd_entry_t
1302 _dispatch_fd_entry_create(dispatch_queue_t q
)
1304 dispatch_fd_entry_t fd_entry
;
1305 fd_entry
= _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s
));
1306 fd_entry
->close_queue
= dispatch_queue_create(
1307 "com.apple.libdispatch-io.closeq", NULL
);
1308 // Use target queue to ensure that no concurrent lookups are going on when
1309 // the close queue is running
1310 fd_entry
->close_queue
->do_targetq
= q
;
1311 _dispatch_retain(q
);
1312 // Suspend the cleanup queue until closing
1313 _dispatch_fd_entry_retain(fd_entry
);
1317 static dispatch_fd_entry_t
1318 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd
, uintptr_t hash
)
1320 // On fds lock queue
1321 dispatch_fd_entry_t fd_entry
= _dispatch_fd_entry_create(
1322 _dispatch_io_fds_lockq
);
1323 _dispatch_fd_entry_debug("create: fd %d", fd_entry
, fd
);
1325 TAILQ_INSERT_TAIL(&_dispatch_io_fds
[hash
], fd_entry
, fd_list
);
1326 fd_entry
->barrier_queue
= dispatch_queue_create(
1327 "com.apple.libdispatch-io.barrierq", NULL
);
1328 fd_entry
->barrier_group
= dispatch_group_create();
1329 dispatch_async(fd_entry
->barrier_queue
, ^{
1330 _dispatch_fd_entry_debug("stat", fd_entry
);
1331 int err
, orig_flags
, orig_nosigpipe
= -1;
1333 _dispatch_io_syscall_switch(err
,
1335 default: fd_entry
->err
= err
; return;
1337 fd_entry
->stat
.dev
= st
.st_dev
;
1338 fd_entry
->stat
.mode
= st
.st_mode
;
1339 _dispatch_fd_entry_guard(fd_entry
);
1340 _dispatch_io_syscall_switch(err
,
1341 orig_flags
= fcntl(fd
, F_GETFL
),
1342 default: (void)dispatch_assume_zero(err
); break;
1344 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1345 if (S_ISFIFO(st
.st_mode
)) {
1346 _dispatch_io_syscall_switch(err
,
1347 orig_nosigpipe
= fcntl(fd
, F_GETNOSIGPIPE
),
1348 default: (void)dispatch_assume_zero(err
); break;
1350 if (orig_nosigpipe
!= -1) {
1351 _dispatch_io_syscall_switch(err
,
1352 orig_nosigpipe
= fcntl(fd
, F_SETNOSIGPIPE
, 1),
1354 orig_nosigpipe
= -1;
1355 (void)dispatch_assume_zero(err
);
1361 if (S_ISREG(st
.st_mode
)) {
1362 if (orig_flags
!= -1) {
1363 _dispatch_io_syscall_switch(err
,
1364 fcntl(fd
, F_SETFL
, orig_flags
& ~O_NONBLOCK
),
1367 (void)dispatch_assume_zero(err
);
1371 int32_t dev
= major(st
.st_dev
);
1372 // We have to get the disk on the global dev queue. The
1373 // barrier queue cannot continue until that is complete
1374 dispatch_suspend(fd_entry
->barrier_queue
);
1375 dispatch_once_f(&_dispatch_io_devs_lockq_pred
, NULL
,
1376 _dispatch_io_devs_lockq_init
);
1377 dispatch_async(_dispatch_io_devs_lockq
, ^{
1378 _dispatch_disk_init(fd_entry
, dev
);
1379 dispatch_resume(fd_entry
->barrier_queue
);
1382 if (orig_flags
!= -1) {
1383 _dispatch_io_syscall_switch(err
,
1384 fcntl(fd
, F_SETFL
, orig_flags
| O_NONBLOCK
),
1387 (void)dispatch_assume_zero(err
);
1391 _dispatch_stream_init(fd_entry
, _dispatch_get_root_queue(
1392 _DISPATCH_QOS_CLASS_DEFAULT
, false));
1394 fd_entry
->orig_flags
= orig_flags
;
1395 fd_entry
->orig_nosigpipe
= orig_nosigpipe
;
1397 // This is the first item run when the close queue is resumed, indicating
1398 // that all channels associated with this entry have been closed and that
1399 // all operations associated with this entry have been freed
1400 dispatch_async(fd_entry
->close_queue
, ^{
1401 if (!fd_entry
->disk
) {
1402 _dispatch_fd_entry_debug("close queue cleanup", fd_entry
);
1403 dispatch_op_direction_t dir
;
1404 for (dir
= 0; dir
< DOP_DIR_MAX
; dir
++) {
1405 _dispatch_stream_dispose(fd_entry
, dir
);
1408 dispatch_disk_t disk
= fd_entry
->disk
;
1409 dispatch_async(_dispatch_io_devs_lockq
, ^{
1410 _dispatch_release(disk
);
1413 // Remove this entry from the global fd list
1414 TAILQ_REMOVE(&_dispatch_io_fds
[hash
], fd_entry
, fd_list
);
1416 // If there was a source associated with this stream, disposing of the
1417 // source cancels it and suspends the close queue. Freeing the fd_entry
1418 // structure must happen after the source cancel handler has finished
1419 dispatch_async(fd_entry
->close_queue
, ^{
1420 _dispatch_fd_entry_debug("close queue release", fd_entry
);
1421 dispatch_release(fd_entry
->close_queue
);
1422 _dispatch_fd_entry_debug("barrier queue release", fd_entry
);
1423 dispatch_release(fd_entry
->barrier_queue
);
1424 _dispatch_fd_entry_debug("barrier group release", fd_entry
);
1425 dispatch_release(fd_entry
->barrier_group
);
1426 if (fd_entry
->orig_flags
!= -1) {
1427 _dispatch_io_syscall(
1428 fcntl(fd
, F_SETFL
, fd_entry
->orig_flags
)
1431 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1432 if (fd_entry
->orig_nosigpipe
!= -1) {
1433 _dispatch_io_syscall(
1434 fcntl(fd
, F_SETNOSIGPIPE
, fd_entry
->orig_nosigpipe
)
1438 _dispatch_fd_entry_unguard(fd_entry
);
1439 if (fd_entry
->convenience_channel
) {
1440 fd_entry
->convenience_channel
->fd_entry
= NULL
;
1441 dispatch_release(fd_entry
->convenience_channel
);
1448 static dispatch_fd_entry_t
1449 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data
,
1450 dev_t dev
, mode_t mode
)
1452 // On devs lock queue
1453 dispatch_fd_entry_t fd_entry
= _dispatch_fd_entry_create(
1454 path_data
->channel
->queue
);
1455 _dispatch_fd_entry_debug("create: path %s", fd_entry
, path_data
->path
);
1456 if (S_ISREG(mode
)) {
1457 _dispatch_disk_init(fd_entry
, major(dev
));
1459 _dispatch_stream_init(fd_entry
, _dispatch_get_root_queue(
1460 _DISPATCH_QOS_CLASS_DEFAULT
, false));
1463 fd_entry
->orig_flags
= -1;
1464 fd_entry
->path_data
= path_data
;
1465 fd_entry
->stat
.dev
= dev
;
1466 fd_entry
->stat
.mode
= mode
;
1467 fd_entry
->barrier_queue
= dispatch_queue_create(
1468 "com.apple.libdispatch-io.barrierq", NULL
);
1469 fd_entry
->barrier_group
= dispatch_group_create();
1470 // This is the first item run when the close queue is resumed, indicating
1471 // that the channel associated with this entry has been closed and that
1472 // all operations associated with this entry have been freed
1473 dispatch_async(fd_entry
->close_queue
, ^{
1474 _dispatch_fd_entry_debug("close queue cleanup", fd_entry
);
1475 if (!fd_entry
->disk
) {
1476 dispatch_op_direction_t dir
;
1477 for (dir
= 0; dir
< DOP_DIR_MAX
; dir
++) {
1478 _dispatch_stream_dispose(fd_entry
, dir
);
1481 if (fd_entry
->fd
!= -1) {
1482 _dispatch_fd_entry_guarded_close(fd_entry
, fd_entry
->fd
);
1484 if (fd_entry
->path_data
->channel
) {
1485 // If associated channel has not been released yet, mark it as
1486 // no longer having an fd_entry (for stop after close).
1487 // It is safe to modify channel since we are on close_queue with
1488 // target queue the channel queue
1489 fd_entry
->path_data
->channel
->fd_entry
= NULL
;
1492 dispatch_async(fd_entry
->close_queue
, ^{
1493 _dispatch_fd_entry_debug("close queue release", fd_entry
);
1494 dispatch_release(fd_entry
->close_queue
);
1495 dispatch_release(fd_entry
->barrier_queue
);
1496 dispatch_release(fd_entry
->barrier_group
);
1497 free(fd_entry
->path_data
);
1504 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry
, dispatch_io_t channel
)
1506 if (!(fd_entry
->fd
== -1 && fd_entry
->path_data
)) {
1509 if (fd_entry
->err
) {
1510 return fd_entry
->err
;
1513 int oflag
= fd_entry
->disk
? fd_entry
->path_data
->oflag
& ~O_NONBLOCK
:
1514 fd_entry
->path_data
->oflag
| O_NONBLOCK
;
1516 fd
= _dispatch_fd_entry_guarded_open(fd_entry
, fd_entry
->path_data
->path
,
1517 oflag
, fd_entry
->path_data
->mode
);
1523 (void)os_atomic_cmpxchg2o(fd_entry
, err
, 0, err
, relaxed
);
1526 if (!os_atomic_cmpxchg2o(fd_entry
, fd
, -1, fd
, relaxed
)) {
1527 // Lost the race with another open
1528 _dispatch_fd_entry_guarded_close(fd_entry
, fd
);
1530 channel
->fd_actual
= fd
;
1532 _dispatch_object_debug(channel
, "%s", __func__
);
1537 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry
,
1538 dispatch_io_t channel
)
1540 if (fd_entry
->disk
) {
1542 _dispatch_retain(channel
);
1544 _dispatch_fd_entry_retain(fd_entry
);
1545 dispatch_async(fd_entry
->disk
->pick_queue
, ^{
1546 _dispatch_disk_cleanup_inactive_operations(fd_entry
->disk
, channel
);
1547 _dispatch_fd_entry_release(fd_entry
);
1549 _dispatch_release(channel
);
1553 dispatch_op_direction_t direction
;
1554 for (direction
= 0; direction
< DOP_DIR_MAX
; direction
++) {
1555 dispatch_stream_t stream
= fd_entry
->streams
[direction
];
1560 _dispatch_retain(channel
);
1562 _dispatch_fd_entry_retain(fd_entry
);
1563 dispatch_async(stream
->dq
, ^{
1564 _dispatch_stream_cleanup_operations(stream
, channel
);
1565 _dispatch_fd_entry_release(fd_entry
);
1567 _dispatch_release(channel
);
1575 #pragma mark dispatch_stream_t/dispatch_disk_t
1578 _dispatch_stream_init(dispatch_fd_entry_t fd_entry
, dispatch_queue_t tq
)
1580 dispatch_op_direction_t direction
;
1581 for (direction
= 0; direction
< DOP_DIR_MAX
; direction
++) {
1582 dispatch_stream_t stream
;
1583 stream
= _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s
));
1584 stream
->dq
= dispatch_queue_create("com.apple.libdispatch-io.streamq",
1586 dispatch_set_context(stream
->dq
, stream
);
1587 _dispatch_retain(tq
);
1588 stream
->dq
->do_targetq
= tq
;
1589 TAILQ_INIT(&stream
->operations
[DISPATCH_IO_RANDOM
]);
1590 TAILQ_INIT(&stream
->operations
[DISPATCH_IO_STREAM
]);
1591 fd_entry
->streams
[direction
] = stream
;
1596 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry
,
1597 dispatch_op_direction_t direction
)
1600 dispatch_stream_t stream
= fd_entry
->streams
[direction
];
1604 dispatch_assert(TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_STREAM
]));
1605 dispatch_assert(TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_RANDOM
]));
1606 if (stream
->source
) {
1607 // Balanced by source cancel handler:
1608 _dispatch_fd_entry_retain(fd_entry
);
1609 dispatch_source_cancel(stream
->source
);
1610 dispatch_resume(stream
->source
);
1611 dispatch_release(stream
->source
);
1613 dispatch_set_context(stream
->dq
, NULL
);
1614 dispatch_release(stream
->dq
);
1619 _dispatch_disk_init(dispatch_fd_entry_t fd_entry
, dev_t dev
)
1621 // On devs lock queue
1622 dispatch_disk_t disk
;
1623 // Check to see if there is an existing entry for the given device
1624 uintptr_t hash
= DIO_HASH(dev
);
1625 TAILQ_FOREACH(disk
, &_dispatch_io_devs
[hash
], disk_list
) {
1626 if (disk
->dev
== dev
) {
1627 _dispatch_retain(disk
);
1631 // Otherwise create a new entry
1632 size_t pending_reqs_depth
= dispatch_io_defaults
.max_pending_io_reqs
;
1633 disk
= _dispatch_alloc(DISPATCH_VTABLE(disk
),
1634 sizeof(struct dispatch_disk_s
) +
1635 (pending_reqs_depth
* sizeof(dispatch_operation_t
)));
1636 disk
->do_next
= DISPATCH_OBJECT_LISTLESS
;
1637 disk
->do_xref_cnt
= -1;
1638 disk
->advise_list_depth
= pending_reqs_depth
;
1639 disk
->do_targetq
= _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT
,
1642 TAILQ_INIT(&disk
->operations
);
1643 disk
->cur_rq
= TAILQ_FIRST(&disk
->operations
);
1645 snprintf(label
, sizeof(label
), "com.apple.libdispatch-io.deviceq.%d",
1647 disk
->pick_queue
= dispatch_queue_create(label
, NULL
);
1648 TAILQ_INSERT_TAIL(&_dispatch_io_devs
[hash
], disk
, disk_list
);
1650 fd_entry
->disk
= disk
;
1651 TAILQ_INIT(&fd_entry
->stream_ops
);
1655 _dispatch_disk_dispose(dispatch_disk_t disk
)
1657 uintptr_t hash
= DIO_HASH(disk
->dev
);
1658 TAILQ_REMOVE(&_dispatch_io_devs
[hash
], disk
, disk_list
);
1659 dispatch_assert(TAILQ_EMPTY(&disk
->operations
));
1661 for (i
=0; i
<disk
->advise_list_depth
; ++i
) {
1662 dispatch_assert(!disk
->advise_list
[i
]);
1664 dispatch_release(disk
->pick_queue
);
1668 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1671 _dispatch_stream_operation_avail(dispatch_stream_t stream
)
1673 return !(TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_RANDOM
])) ||
1674 !(TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_STREAM
]));
1678 _dispatch_stream_enqueue_operation(dispatch_stream_t stream
,
1679 dispatch_operation_t op
, dispatch_data_t data
)
1681 if (!_dispatch_operation_should_enqueue(op
, stream
->dq
, data
)) {
1684 _dispatch_object_debug(op
, "%s", __func__
);
1685 bool no_ops
= !_dispatch_stream_operation_avail(stream
);
1686 TAILQ_INSERT_TAIL(&stream
->operations
[op
->params
.type
], op
, operation_list
);
1688 dispatch_async_f(stream
->dq
, stream
->dq
,
1689 _dispatch_stream_queue_handler
);
1694 _dispatch_disk_enqueue_operation(dispatch_disk_t disk
, dispatch_operation_t op
,
1695 dispatch_data_t data
)
1697 if (!_dispatch_operation_should_enqueue(op
, disk
->pick_queue
, data
)) {
1700 _dispatch_object_debug(op
, "%s", __func__
);
1701 if (op
->params
.type
== DISPATCH_IO_STREAM
) {
1702 if (TAILQ_EMPTY(&op
->fd_entry
->stream_ops
)) {
1703 TAILQ_INSERT_TAIL(&disk
->operations
, op
, operation_list
);
1705 TAILQ_INSERT_TAIL(&op
->fd_entry
->stream_ops
, op
, stream_list
);
1707 TAILQ_INSERT_TAIL(&disk
->operations
, op
, operation_list
);
1709 _dispatch_disk_handler(disk
);
1713 _dispatch_stream_complete_operation(dispatch_stream_t stream
,
1714 dispatch_operation_t op
)
1717 _dispatch_object_debug(op
, "%s", __func__
);
1718 _dispatch_op_debug("complete: stream %p", op
, stream
);
1719 TAILQ_REMOVE(&stream
->operations
[op
->params
.type
], op
, operation_list
);
1720 if (op
== stream
->op
) {
1724 dispatch_source_cancel(op
->timer
);
1726 // Final release will deliver any pending data
1727 _dispatch_op_debug("release -> %d (stream complete)", op
, op
->do_ref_cnt
);
1728 _dispatch_release(op
);
1732 _dispatch_disk_complete_operation(dispatch_disk_t disk
, dispatch_operation_t op
)
1735 _dispatch_object_debug(op
, "%s", __func__
);
1736 _dispatch_op_debug("complete: disk %p", op
, disk
);
1737 // Current request is always the last op returned
1738 if (disk
->cur_rq
== op
) {
1739 disk
->cur_rq
= TAILQ_PREV(op
, dispatch_disk_operations_s
,
1742 if (op
->params
.type
== DISPATCH_IO_STREAM
) {
1743 // Check if there are other pending stream operations behind it
1744 dispatch_operation_t op_next
= TAILQ_NEXT(op
, stream_list
);
1745 TAILQ_REMOVE(&op
->fd_entry
->stream_ops
, op
, stream_list
);
1747 TAILQ_INSERT_TAIL(&disk
->operations
, op_next
, operation_list
);
1750 TAILQ_REMOVE(&disk
->operations
, op
, operation_list
);
1752 dispatch_source_cancel(op
->timer
);
1754 // Final release will deliver any pending data
1755 _dispatch_op_debug("release -> %d (disk complete)", op
, op
->do_ref_cnt
);
1756 _dispatch_release(op
);
1759 static dispatch_operation_t
1760 _dispatch_stream_pick_next_operation(dispatch_stream_t stream
,
1761 dispatch_operation_t op
)
1765 // On the first run through, pick the first operation
1766 if (!_dispatch_stream_operation_avail(stream
)) {
1769 if (!TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_STREAM
])) {
1770 op
= TAILQ_FIRST(&stream
->operations
[DISPATCH_IO_STREAM
]);
1771 } else if (!TAILQ_EMPTY(&stream
->operations
[DISPATCH_IO_RANDOM
])) {
1772 op
= TAILQ_FIRST(&stream
->operations
[DISPATCH_IO_RANDOM
]);
1776 if (op
->params
.type
== DISPATCH_IO_STREAM
) {
1777 // Stream operations need to be serialized so continue the current
1778 // operation until it is finished
1781 // Get the next random operation (round-robin)
1782 if (op
->params
.type
== DISPATCH_IO_RANDOM
) {
1783 op
= TAILQ_NEXT(op
, operation_list
);
1785 op
= TAILQ_FIRST(&stream
->operations
[DISPATCH_IO_RANDOM
]);
1792 static dispatch_operation_t
1793 _dispatch_disk_pick_next_operation(dispatch_disk_t disk
)
1796 dispatch_operation_t op
;
1797 if (!TAILQ_EMPTY(&disk
->operations
)) {
1798 if (disk
->cur_rq
== NULL
) {
1799 op
= TAILQ_FIRST(&disk
->operations
);
1803 op
= TAILQ_NEXT(op
, operation_list
);
1805 op
= TAILQ_FIRST(&disk
->operations
);
1807 // TODO: more involved picking algorithm rdar://problem/8780312
1808 } while (op
->active
&& op
!= disk
->cur_rq
);
1819 _dispatch_stream_cleanup_operations(dispatch_stream_t stream
,
1820 dispatch_io_t channel
)
1823 dispatch_operation_t op
, tmp
;
1824 typeof(*stream
->operations
) *operations
;
1825 operations
= &stream
->operations
[DISPATCH_IO_RANDOM
];
1826 TAILQ_FOREACH_SAFE(op
, operations
, operation_list
, tmp
) {
1827 if (!channel
|| op
->channel
== channel
) {
1828 _dispatch_stream_complete_operation(stream
, op
);
1831 operations
= &stream
->operations
[DISPATCH_IO_STREAM
];
1832 TAILQ_FOREACH_SAFE(op
, operations
, operation_list
, tmp
) {
1833 if (!channel
|| op
->channel
== channel
) {
1834 _dispatch_stream_complete_operation(stream
, op
);
1837 if (stream
->source_running
&& !_dispatch_stream_operation_avail(stream
)) {
1838 dispatch_suspend(stream
->source
);
1839 stream
->source_running
= false;
1844 _dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk
,
1845 dispatch_io_t channel
, bool inactive_only
)
1848 dispatch_operation_t op
, tmp
;
1849 TAILQ_FOREACH_SAFE(op
, &disk
->operations
, operation_list
, tmp
) {
1850 if (inactive_only
&& op
->active
) continue;
1851 if (!channel
|| op
->channel
== channel
) {
1852 _dispatch_op_debug("cleanup: disk %p", op
, disk
);
1853 _dispatch_disk_complete_operation(disk
, op
);
1859 _dispatch_disk_cleanup_operations(dispatch_disk_t disk
, dispatch_io_t channel
)
1861 _dispatch_disk_cleanup_specified_operations(disk
, channel
, false);
1865 _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk
,
1866 dispatch_io_t channel
)
1868 _dispatch_disk_cleanup_specified_operations(disk
, channel
, true);
1872 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1874 static dispatch_source_t
1875 _dispatch_stream_source(dispatch_stream_t stream
, dispatch_operation_t op
)
1878 if (stream
->source
) {
1879 return stream
->source
;
1881 dispatch_fd_t fd
= op
->fd_entry
->fd
;
1882 _dispatch_op_debug("stream source create", op
);
1883 dispatch_source_t source
= NULL
;
1884 if (op
->direction
== DOP_DIR_READ
) {
1885 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
1886 (uintptr_t)fd
, 0, stream
->dq
);
1887 } else if (op
->direction
== DOP_DIR_WRITE
) {
1888 source
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
1889 (uintptr_t)fd
, 0, stream
->dq
);
1891 dispatch_assert(op
->direction
< DOP_DIR_MAX
);
1894 dispatch_set_context(source
, stream
);
1895 dispatch_source_set_event_handler_f(source
,
1896 _dispatch_stream_source_handler
);
1897 // Close queue must not run user cleanup handlers until sources are fully
1899 dispatch_queue_t close_queue
= op
->fd_entry
->close_queue
;
1900 dispatch_source_set_cancel_handler(source
, ^{
1901 _dispatch_op_debug("stream source cancel", op
);
1902 dispatch_resume(close_queue
);
1904 stream
->source
= source
;
1905 return stream
->source
;
1909 _dispatch_stream_source_handler(void *ctx
)
1912 dispatch_stream_t stream
= (dispatch_stream_t
)ctx
;
1913 dispatch_suspend(stream
->source
);
1914 stream
->source_running
= false;
1915 return _dispatch_stream_handler(stream
);
static void
_dispatch_stream_queue_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
	if (!stream) {
		// _dispatch_stream_dispose has been called
		return;
	}
	return _dispatch_stream_handler(stream);
}
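// Stream state machine: pick the next operation on the stream, perform one
// chunk of I/O on it, then deliver data, complete the operation, reschedule
// the handler, or resume the fd source depending on the result.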
static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_op_debug("stream handler", op);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_op_debug("initial delivery", op);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op);
	dispatch_op_flags_t flags = ~0u;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream->dq,
					_dispatch_stream_queue_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
}
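// Disk state machine: fill the advise list with newly picked operations
// (retaining and activating each) and, if no I/O is currently in flight, kick
// off _dispatch_disk_perform on the target queue of the operation at req_idx.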
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_disk_debug("disk handler", disk);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			_dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
			_dispatch_op_debug("activate: disk %p", op, disk);
			_dispatch_object_debug(op, "%s", __func__);
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		_dispatch_op_debug("async perform: disk %p", op, disk);
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	_dispatch_disk_debug("disk perform", disk);
	size_t chunk_size = dispatch_io_defaults.chunk_size;
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_op_debug("initial delivery", op);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	_dispatch_op_debug("async perform completion: disk %p", op, disk);
	dispatch_async(disk->pick_queue, ^{
		_dispatch_op_debug("perform completion", op);
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		_dispatch_op_debug("deactivate: disk %p", op, disk);
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_op_debug("release -> %d (disk perform complete)", op,
				op->do_ref_cnt);
		_dispatch_release(op);
	});
}
#pragma mark dispatch_operation_perform
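// Issue a read-ahead advisory for the operation's next chunk, aligning the
// first advisory to a page boundary; fcntl(F_RDADVISE) is used where
// available and readahead(2) on Linux.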
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	_dispatch_op_debug("advise", op);
	if (_dispatch_io_get_error(op, NULL, true)) return;
#ifdef __linux__
	// linux does not support fcntl (F_RDADVISE)
	// define necessary datastructure and use readahead
	struct radvisory {
		off_t ra_offset;
		int ra_count;
	};
#endif
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
			chunk_size + PAGE_SIZE)) {
		return;
	}
	_dispatch_object_debug(op, "%s", __func__);
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
#ifdef __linux__
	_dispatch_io_syscall_switch(err,
		readahead(op->fd_entry->fd, advise.ra_offset, advise.ra_count),
		case EINVAL: break; // fd does refer to a non-supported filetype
		default: (void)dispatch_assume_zero(err); break;
	);
#else
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		case EFBIG: break; // advised past the end of the file rdar://10415691
		case ENOTSUP: break; // not all FS support radvise rdar://13484629
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
#endif
}
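// Perform a single read/write of at most one chunk (bounded by the channel's
// high-water mark), allocating or mapping the operation buffer on first use,
// and translate the syscall result into a DISPATCH_OP_* disposition.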
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	_dispatch_op_debug("perform", op);
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	_dispatch_object_debug(op, "%s", __func__);
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_size;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_size but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_op_debug("buffer allocated", op);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_op_debug("buffer mapped", op);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
	}
	if (err) {
		goto error;
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = (off_t)((size_t)op->offset + op->total);
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_op_debug("performed: EOF", op);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += (size_t)processed;
	op->total += (size_t)processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN || err == EWOULDBLOCK) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	_dispatch_op_debug("performed: err %d", op, err);
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
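// Hand accumulated data to the channel's I/O handler on the operation queue,
// honoring the low-water mark and DOP_* flags; for writes, trim the newly
// written bytes from the head of the remaining data.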
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
		return;
	}
	op->undelivered = 0;
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_op_debug("deliver data", op);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_op_debug("IO handler invoke", op);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_op_debug("IO handler invoke: err %d", op, err);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}
#pragma mark dispatch_io_debug
static size_t
_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	dispatch_queue_t target = channel->do_targetq;
	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
			channel->fd_entry, channel->queue, target && target->dq_label ?
			target->dq_label : "", target, channel->barrier_queue,
			channel->barrier_group, channel->err, channel->params.low,
			channel->params.high, channel->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)channel->params.interval);
}
size_t
_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(channel), channel);
	offset += _dispatch_object_debug_attr(channel, &buf[offset],
			bufsiz - offset);
	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}
static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
		size_t bufsiz)
{
	dispatch_queue_t target = op->do_targetq;
	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
			target->dq_label : "", target, (long long)op->offset, op->length,
			op->total, op->undelivered + op->buf_len, op->flags, op->err,
			op->params.low, op->params.high, op->params.interval_flags &
			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
			(unsigned long long)op->params.interval);
}
size_t
_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(op), op);
	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}