/*
 * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);

// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
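
// Illustrative usage sketch (not from the original source): the cases passed
// as __VA_ARGS__ above land inside the generated switch, so callers pair the
// macro with case/default labels. `example_fd` and `st` are hypothetical.
#if 0
static int _example_fstat_with_retry(int example_fd)
{
	struct stat st;
	int err;
	_dispatch_io_syscall_switch(err,
		fstat(example_fd, &st),
		default: break; // any errno other than EINTR ends the retry loop
	);
	return err; // 0 on success
}
#endif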
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))

#pragma mark dispatch_io_hashtables

#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE 64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;
static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}

#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_pages = DIO_MAX_CHUNK_PAGES,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};

#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
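
// Illustrative sketch (assumption, not part of the original file): the
// defaults above are only meant to be tweaked through _dispatch_iocntl(),
// e.g. by test or tuning code.
#if 0
static void _example_tune_io_defaults(void)
{
	_dispatch_iocntl(DISPATCH_IOCNTL_LOW_WATER_CHUNKS, 1);
	_dispatch_iocntl(DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, 8);
}
#endif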
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(0, true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}

static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("cleanup handler invoke", -1);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}

static void
_dispatch_io_dispose(dispatch_io_t channel)
{
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd_entry are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}

static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}

static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark dispatch_io_channels

dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
	_dispatch_io_debug("io create", fd);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		int err = fd_entry->err;
		err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		if (!err && type == DISPATCH_IO_RANDOM) {
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_release(channel);
		_dispatch_release(queue);

dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(path && *path == '/')) {
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	_dispatch_io_debug("io create with path %s", -1, path);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
					lstat(path_data->path, &st) : stat(path_data->path, &st),
			err = _dispatch_io_validate_type(channel, st.st_mode);
			if ((path_data->oflag & O_CREAT) &&
					(*(path_data->path + path_data->pathlen - 1) != '/')) {
				// Check parent directory
				char *c = strrchr(path_data->path, '/');
				_dispatch_io_syscall_switch_noerr(perr,
					stat(path_data->path, &st),
					// Since the parent directory exists, open() will
					// create a regular file after the fd_entry has
					st.st_mode = S_IFREG;
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(queue);

dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
	_dispatch_io_debug("io create with io %p", -1, in_channel);
	dispatch_io_t channel = _dispatch_io_create(type);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
				err = in_channel->err;
					err = in_channel->fd_entry->err;
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
					_dispatch_io_init(channel, fd_entry, queue, 0,
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			_dispatch_release(in_channel);
#pragma mark dispatch_io_accessors

dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set high water", channel->fd);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);

dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set low water", channel->fd);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		channel->params.low = low_water;
		_dispatch_release(channel);

dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set interval", channel->fd);
		channel->params.interval = interval;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);

_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_release(channel);

dispatch_io_get_descriptor(dispatch_io_t channel)
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
	dispatch_fd_t fd = channel->fd_actual;
			_dispatch_thread_getspecific(dispatch_io_key) == channel) {
		dispatch_fd_entry_t fd_entry = channel->fd_entry;
		(void)_dispatch_fd_entry_open(fd_entry, channel);
	return channel->fd_actual;
#pragma mark dispatch_io_operations

_dispatch_io_stop(dispatch_io_t channel)
	_dispatch_io_debug("io stop", channel->fd);
	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
				_dispatch_io_debug("io stop cleanup", channel->fd);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_io_debug("io stop after close cleanup",
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
					_dispatch_release(channel);
			_dispatch_release(channel);

dispatch_io_close(dispatch_io_t channel, unsigned long flags)
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
		return _dispatch_io_stop(channel);
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_io_debug("io close", channel->fd);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (!fd_entry->path_data) {
					channel->fd_entry = NULL;
				_dispatch_fd_entry_release(fd_entry);
			_dispatch_release(channel);

dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				_dispatch_thread_setspecific(dispatch_io_key, channel);
				_dispatch_thread_setspecific(dispatch_io_key, NULL);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);

dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
		_dispatch_release(channel);
		_dispatch_release(queue);

dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			_dispatch_io_data_release(data);
		_dispatch_release(channel);
		_dispatch_release(queue);

dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			_dispatch_release(queue);
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			_dispatch_release(queue);
		dispatch_operation_t op =
				_dispatch_operation_create(DOP_DIR_READ, channel, 0,
				length, dispatch_data_empty,
				_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
				false), ^(bool done, dispatch_data_t data, int error) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);

dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
			_dispatch_release(queue);
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		__block dispatch_data_t deliver_data = NULL;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
					_dispatch_io_data_release(deliver_data);
			_dispatch_release(queue);
		dispatch_operation_t op =
				_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
				dispatch_data_get_size(data), data,
				_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
				false), ^(bool done, dispatch_data_t d, int error) {
					_dispatch_io_data_retain(d);
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
			_dispatch_io_data_release(data);
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_io_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	int fd = channel->fd;
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
				} else if (direction == DOP_DIR_WRITE && !err) {
				_dispatch_io_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			_dispatch_release(queue);
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	op->do_targetq = targetq;

_dispatch_operation_dispose(dispatch_operation_t op)
	// Deliver the data if there's any
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
		_dispatch_release(op->channel);
		dispatch_release(op->timer);
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		_dispatch_io_data_release(op->buf_data);
		_dispatch_io_data_release(op->data);
		dispatch_release(op->op_q);
	Block_release(op->handler);

_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
			} else if (direction == DOP_DIR_WRITE && !err) {
			handler(true, d, err);
			_dispatch_io_data_release(data);
		_dispatch_release(op);
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);

_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
	// On stream queue or disk queue
	_dispatch_io_debug("enqueue operation", op->fd_entry->fd);
	_dispatch_io_data_retain(data);
	int err = _dispatch_io_get_error(op, NULL, true);
		_dispatch_release(op);
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));

static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
	// On stream queue or pick queue
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		// If the operation is active, don't deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
		_dispatch_operation_deliver_data(op, flags);
#pragma mark dispatch_fd_entry_t

_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);

_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);

_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_io_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_io_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);

static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
	dispatch_fd_entry_t fd_entry;
	fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
	// On fds lock queue
	_dispatch_io_debug("fd entry create", fd);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_io_debug("fd entry stat", fd);
		int err, orig_flags, orig_nosigpipe = -1;
		_dispatch_io_syscall_switch(err,
			default: fd_entry->err = err; return;
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
						(void)dispatch_assume_zero(err);
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
						(void)dispatch_assume_zero(err);
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_io_debug("close queue fd_entry cleanup", fd);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", fd);
		dispatch_release(fd_entry->close_queue);
		_dispatch_io_debug("barrier queue release", fd);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_io_debug("barrier group release", fd);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
	// On devs lock queue
	_dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
		if (fd_entry->fd != -1) {
			close(fd_entry->fd);
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);

_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
	if (fd_entry->err) {
		return fd_entry->err;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
	fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
		// Lost the race with another open
	channel->fd_actual = fd;

_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
	if (fd_entry->disk) {
			_dispatch_retain(channel);
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
				_dispatch_release(channel);
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
				_dispatch_retain(channel);
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
					_dispatch_release(channel);
#pragma mark dispatch_stream_t/dispatch_disk_t

_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;

_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
	dispatch_stream_t stream = fd_entry->streams[direction];
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	dispatch_release(stream->dq);

_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
	// On devs lock queue
	dispatch_disk_t disk;
	char label_name[256];
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
	disk->pick_queue = dispatch_queue_create(label_name, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);

_dispatch_disk_dispose(dispatch_disk_t disk)
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	dispatch_release(disk->pick_queue);
#pragma mark dispatch_stream_operations/dispatch_disk_operations

_dispatch_stream_operation_avail(dispatch_stream_t stream)
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));

_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
		dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);

_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	_dispatch_disk_handler(disk);

_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		dispatch_source_cancel(op->timer);
	// Final release will deliver any pending data
	_dispatch_release(op);

_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
	TAILQ_REMOVE(&disk->operations, op, operation_list);
		dispatch_source_cancel(op->timer);
	// Final release will deliver any pending data
	_dispatch_release(op);

static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
				op = TAILQ_NEXT(op, operation_list);
					op = TAILQ_FIRST(&disk->operations);
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);

_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;

_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_disk_complete_operation(disk, op);
#pragma mark dispatch_stream_handler/dispatch_disk_handler

static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
	if (stream->source) {
		return stream->source;
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_io_debug("stream source create", fd);
	dispatch_source_t source = NULL;
	if (op->direction == DOP_DIR_READ) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
	} else if (op->direction == DOP_DIR_WRITE) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
		dispatch_assert(op->direction < DOP_DIR_MAX);
	dispatch_set_context(source, stream);
	dispatch_source_set_event_handler_f(source,
			_dispatch_stream_source_handler);
	// Close queue must not run user cleanup handlers until sources are fully
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(source, ^{
		_dispatch_io_debug("stream source cancel", fd);
		dispatch_resume(close_queue);
	stream->source = source;
	return stream->source;

_dispatch_stream_source_handler(void *ctx)
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);

_dispatch_stream_handler(void *ctx)
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
		_dispatch_debug("no operation found: stream %p", stream);
	int err = _dispatch_io_get_error(op, NULL, true);
		_dispatch_stream_complete_operation(stream, op);
	_dispatch_io_debug("stream handler", op->fd_entry->fd);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_io_debug("initial delivery", op->fd_entry->fd);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op), flags = -1;
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
		_dispatch_operation_deliver_data(op, flags);
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
	_dispatch_fd_entry_release(fd_entry);

_dispatch_disk_handler(void *ctx)
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
	_dispatch_io_debug("disk handler", -1);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
		j += disk->advise_list_depth;
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
				_dispatch_disk_complete_operation(disk, op);
			_dispatch_retain(op);
			disk->advise_list[i%disk->advise_list_depth] = op;
			// No more operations to get
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
		disk->io_active = true;
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);

_dispatch_disk_perform(void *ctxt)
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_io_debug("disk perform", -1);
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
		j += disk->advise_list_depth;
		op = disk->advise_list[i%disk->advise_list_depth];
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_io_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
		_dispatch_operation_advise(op, chunk_size);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			dispatch_assert(result);
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		_dispatch_release(op);
#pragma mark dispatch_operation_perform

_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;

_dispatch_operation_perform(dispatch_operation_t op)
	int err = _dispatch_io_get_error(op, NULL, true);
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_pages but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				op->buf_siz = max_buf_siz;
			op->buf = valloc(op->buf_siz);
			_dispatch_io_debug("buffer allocated", op->fd_entry->fd);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
				return (bool)(siz < chunk_siz);
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
			_dispatch_io_data_release(d);
			_dispatch_io_debug("buffer mapped", op->fd_entry->fd);
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = op->offset + op->total;
	ssize_t processed = -1;
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
	// Encountered an error on the file descriptor
	if (processed == -1) {
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_io_debug("EOF", op->fd_entry->fd);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	op->buf_len += processed;
	op->total += processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	// Deliver data only if we satisfy the filters
	return DISPATCH_OP_DELIVER;
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		return DISPATCH_OP_RESUME;
		return DISPATCH_OP_ERR;
	(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
	return DISPATCH_OP_FD_ERR;
	return DISPATCH_OP_COMPLETE;

_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_io_debug("buffer data", op->fd_entry->fd);
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			// Trim newly written buffer from head of unwritten data
				_dispatch_io_data_retain(data);
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
			_dispatch_io_data_release(op->data);
		dispatch_assert(op->direction < DOP_DIR_MAX);
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_io_debug("buffer data", op->fd_entry->fd);
	op->undelivered = 0;
	_dispatch_io_debug("deliver data", op->fd_entry->fd);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
	int fd = op->fd_entry->fd;
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (direction == DOP_DIR_READ && err) {
			if (dispatch_data_get_size(d)) {
				_dispatch_io_debug("IO handler invoke", fd);
				handler(false, d, 0);
		} else if (direction == DOP_DIR_WRITE && !err) {
		_dispatch_io_debug("IO handler invoke", fd);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);