/*
 * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static void _dispatch_io_dispose(dispatch_io_t channel);
static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_dispose(dispatch_operation_t operation);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_disk_dispose(dispatch_disk_t disk);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);
// Macros to wrap syscalls which return -1 on error, and retry on EINTR
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
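/*
 * Illustrative sketch (not part of the original source): the wrappers above
 * take the raw syscall expression plus the switch cases the caller wants to
 * handle for specific errno values; EINTR is retried transparently. "fd" and
 * "flags" below are assumed locals.
 *
 *	int err, flags;
 *	_dispatch_io_syscall_switch(err,
 *		flags = fcntl(fd, F_GETFL),
 *		default: (void)dispatch_assume_zero(err); break;
 *	);
 */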
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#pragma mark dispatch_io_vtable

static const struct dispatch_io_vtable_s _dispatch_io_vtable = {
	.do_type = DISPATCH_IO_TYPE,
	.do_kind = "channel",
	.do_dispose = _dispatch_io_dispose,
	.do_invoke = NULL,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = (void *)dummy_function_r0,
};

static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = {
	.do_type = DISPATCH_OPERATION_TYPE,
	.do_kind = "operation",
	.do_dispose = _dispatch_operation_dispose,
	.do_invoke = NULL,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = (void *)dummy_function_r0,
};

static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = {
	.do_type = DISPATCH_DISK_TYPE,
	.do_kind = "disk",
	.do_dispose = _dispatch_disk_dispose,
	.do_invoke = NULL,
	.do_probe = (void *)dummy_function_r0,
	.do_debug = (void *)dummy_function_r0,
};
#pragma mark dispatch_io_hashtables

#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE 64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}

#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_pages = DIO_MAX_CHUNK_PAGES,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};
#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = calloc(1ul, sizeof(struct dispatch_io_s));
	channel->do_vtable = &_dispatch_io_vtable;
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_ref_cnt = 1;
	channel->do_xref_cnt = 1;
	channel->do_targetq = _dispatch_get_root_queue(0, true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}

static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("cleanup handler invoke", -1);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}

static void
_dispatch_io_dispose(dispatch_io_t channel)
{
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd_entry have been released
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
	_dispatch_dispose(channel);
}

static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}

static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_io_debug("io create", fd);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	return channel;
}
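/*
 * Usage sketch (not part of the original source): a client creates a channel
 * from an already-open descriptor; the cleanup handler runs once all I/O on
 * the channel has finished and is the safe place to close the fd. "fd" and
 * "q" are assumed to be provided by the caller.
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
 *			^(int error) { close(fd); });
 *	dispatch_io_set_low_water(ch, 1);
 */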
dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(path && *path == '/')) {
		return NULL;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return NULL;
	}
	_dispatch_io_debug("io create with path %s", -1, path);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
					lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			case ENOENT:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
			default:
				break;
		);
		channel->err = err;
		if (err) {
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	return channel;
}
dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_io_debug("io create with io %p", -1, in_channel);
	dispatch_io_t channel = _dispatch_io_create(type);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			channel->err = err0;
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has been
			// fully resolved and will stay valid for the duration of this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
		});
	});
	return channel;
}
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set high water", channel->fd);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set low water", channel->fd);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set interval", channel->fd);
		channel->params.interval = interval;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
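/*
 * Usage sketch (not part of the original source): the water marks and the
 * interval control how often partial results are passed to I/O handlers.
 * With a strict interval, data is delivered on every tick even if less than
 * the low-water mark has accumulated. "ch" is an assumed channel.
 *
 *	dispatch_io_set_low_water(ch, 4096);
 *	dispatch_io_set_high_water(ch, 1024 * 1024);
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */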
void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_release(channel);
	});
}

dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 &&
			_dispatch_thread_getspecific(dispatch_io_key) == channel) {
		dispatch_fd_entry_t fd_entry = channel->fd_entry;
		(void)_dispatch_fd_entry_open(fd_entry, channel);
	}
	return channel->fd_actual;
}
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_io_debug("io stop", channel->fd);
	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_io_debug("io stop cleanup", channel->fd);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_io_debug("io stop after close cleanup",
							channel->fd);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi, channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}
void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_io_debug("io close", channel->fd);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (!fd_entry->path_data) {
					channel->fd_entry = NULL;
				}
				_dispatch_fd_entry_release(fd_entry);
			}
			_dispatch_release(channel);
		});
	});
}
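/*
 * Usage sketch (not part of the original source): a plain close lets already
 * enqueued operations finish, whereas DISPATCH_IO_STOP interrupts them and
 * their handlers observe ECANCELED. "ch" is an assumed channel.
 *
 *	dispatch_io_close(ch, 0);                // graceful
 *	dispatch_io_close(ch, DISPATCH_IO_STOP); // interrupt outstanding I/O
 */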
void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				_dispatch_thread_setspecific(dispatch_io_key, channel);
				barrier();
				_dispatch_thread_setspecific(dispatch_io_key, NULL);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}
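/*
 * Usage sketch (not part of the original source): a barrier runs after all
 * currently enqueued operations on the channel have completed, and it may
 * access the descriptor directly via dispatch_io_get_descriptor(). "ch" is
 * an assumed channel.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) (void)fsync(fd);
 *	});
 */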
void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
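/*
 * Usage sketch (not part of the original source): read handlers may be
 * invoked multiple times with partial data; "done" is true on the final
 * invocation (EOF, completion or error). "ch" and "q" are assumed.
 *
 *	dispatch_io_read(ch, 0, SIZE_MAX, q,
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			// consume partial data
 *		}
 *		if (done && error) {
 *			// handle error
 *		}
 *	});
 */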
void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
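/*
 * Usage sketch (not part of the original source): write handlers report the
 * data that has not been written yet; on the final invocation with no error
 * everything was written. "ch", "q", "bytes" and "len" are assumed.
 *
 *	dispatch_data_t d = dispatch_data_create(bytes, len, q,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_io_write(ch, 0, d, q,
 *			^(bool done, dispatch_data_t remaining, int error) {
 *		if (done && !error) {
 *			// all data written
 *		}
 *	});
 *	dispatch_release(d);
 */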
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}
void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t d, int error) {
				if (done) {
					if (d) {
						_dispatch_io_data_retain(d);
						deliver_data = d;
					}
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_io_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	int fd = channel->fd;
#endif
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_io_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op;
	op = calloc(1ul, sizeof(struct dispatch_operation_s));
	op->do_vtable = &_dispatch_operation_vtable;
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_ref_cnt = 1;
	op->do_xref_cnt = 0; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->active = false;
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	return op;
}
static void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
	_dispatch_dispose(op);
}

static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}
static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_io_debug("enqueue operation", op->fd_entry->fd);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}

static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, dont deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}
#pragma mark dispatch_fd_entry_t

static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}

static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_io_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_io_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
	// On fds lock queue
	_dispatch_io_debug("fd entry create", fd);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	fd_entry->fd = fd;
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_io_debug("fd entry stat", fd);
		int err, orig_flags, orig_nosigpipe = -1;
		struct stat st;
		_dispatch_io_syscall_switch(err,
			fstat(fd, &st),
			default: fd_entry->err = err; return;
		);
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
		);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			);
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
					default:
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
		}
#endif
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			});
		} else {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
		}
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	});
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_io_debug("close queue fd_entry cleanup", fd);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", fd);
		dispatch_release(fd_entry->close_queue);
		_dispatch_io_debug("barrier queue release", fd);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_io_debug("barrier group release", fd);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	_dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			close(fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) {
			goto open;
		}
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
		return err;
	}
	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
		// Lost the race with another open
		close(fd);
	} else {
		channel->fd_actual = fd;
	}
	return 0;
}

static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}
#pragma mark dispatch_stream_t/dispatch_disk_t

static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}

static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_release(stream->dq);
	free(stream);
}
static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	char label_name[256];
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth *
			sizeof(dispatch_operation_t)));
	disk->do_vtable = &_dispatch_disk_vtable;
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_ref_cnt = 1;
	disk->do_xref_cnt = 0;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
	disk->pick_queue = dispatch_queue_create(label_name, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}

static void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i=0; i<disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
	free(disk);
}
#pragma mark dispatch_stream_operations/dispatch_disk_operations

static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}

static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
	}
}

static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}
static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}

static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	// On pick queue
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
		} else {
			op = disk->cur_rq;
			do {
				op = TAILQ_NEXT(op, operation_list);
				if (!op) {
					op = TAILQ_FIRST(&disk->operations);
				}
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);
		}
		if (!op->active) {
			disk->cur_rq = op;
			return op;
		}
	}
	return NULL;
}
static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel)
{
	// On stream queue
	dispatch_operation_t op, tmp;
	typeof(*stream->operations) *operations;
	operations = &stream->operations[DISPATCH_IO_RANDOM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	operations = &stream->operations[DISPATCH_IO_STREAM];
	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_stream_complete_operation(stream, op);
		}
	}
	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
		dispatch_suspend(stream->source);
		stream->source_running = false;
	}
}

static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
{
	// On pick queue
	dispatch_operation_t op, tmp;
	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
		if (!channel || op->channel == channel) {
			_dispatch_disk_complete_operation(disk, op);
		}
	}
}
#pragma mark dispatch_stream_handler/dispatch_disk_handler

static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
{
	// On stream queue
	if (stream->source) {
		return stream->source;
	}
	dispatch_fd_t fd = op->fd_entry->fd;
	_dispatch_io_debug("stream source create", fd);
	dispatch_source_t source = NULL;
	if (op->direction == DOP_DIR_READ) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
				stream->dq);
	} else if (op->direction == DOP_DIR_WRITE) {
		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
				stream->dq);
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return NULL;
	}
	dispatch_set_context(source, stream);
	dispatch_source_set_event_handler_f(source,
			_dispatch_stream_source_handler);
	// Close queue must not run user cleanup handlers until sources are fully
	// unregistered
	dispatch_queue_t close_queue = op->fd_entry->close_queue;
	dispatch_source_set_cancel_handler(source, ^{
		_dispatch_io_debug("stream source cancel", fd);
		dispatch_resume(close_queue);
	});
	stream->source = source;
	return stream->source;
}

static void
_dispatch_stream_source_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_suspend(stream->source);
	stream->source_running = false;
	return _dispatch_stream_handler(stream);
}
static void
_dispatch_stream_handler(void *ctx)
{
	// On stream queue
	dispatch_stream_t stream = (dispatch_stream_t)ctx;
	dispatch_operation_t op;
pick:
	op = _dispatch_stream_pick_next_operation(stream, stream->op);
	if (!op) {
		_dispatch_debug("no operation found: stream %p", stream);
		return;
	}
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		_dispatch_stream_complete_operation(stream, op);
		goto pick;
	}
	stream->op = op;
	_dispatch_io_debug("stream handler", op->fd_entry->fd);
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	// For performance analysis
	if (!op->total && dispatch_io_defaults.initial_delivery) {
		// Empty delivery to signal the start of the operation
		_dispatch_io_debug("initial delivery", op->fd_entry->fd);
		_dispatch_operation_deliver_data(op, DOP_DELIVER);
	}
	// TODO: perform on the operation target queue to get correct priority
	int result = _dispatch_operation_perform(op), flags = -1;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
}
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_io_debug("disk handler", -1);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_io_debug("disk perform", -1);
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes ? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_io_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must be
		// released at the very end, since it might hold the last reference to
		// the disk
		_dispatch_release(op);
	});
}
#pragma mark dispatch_operation_perform

static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are already
	// a chunk ahead from reading the bytes
	if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
			PAGE_SIZE)) {
		return;
	}
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
}
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_pages but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_io_debug("buffer allocated", op->fd_entry->fd);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece, if that is smaller than a
			// chunk, accumulate further data pieces until chunk size is reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_io_debug("buffer mapped", op->fd_entry->fd);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = op->offset + op->total;
	ssize_t processed = -1;
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations
	if (processed == 0) {
		_dispatch_io_debug("EOF", op->fd_entry->fd);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += processed;
	op->total += processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Either called from stream resp. pick queue or when op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_io_debug("buffer data", op->fd_entry->fd);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_io_debug("buffer data", op->fd_entry->fd);
		return;
	}
	op->undelivered = 0;
	_dispatch_io_debug("deliver data", op->fd_entry->fd);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
	int fd = op->fd_entry->fd;
#endif
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_io_debug("IO handler invoke", fd);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_io_debug("IO handler invoke", fd);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}