1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #ifndef PAGE_SIZE
28 #define PAGE_SIZE getpagesize()
29 #endif
30
31 #if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
32 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
33 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
34 #else
35 #define _dispatch_io_data_retain(x) dispatch_retain(x)
36 #define _dispatch_io_data_release(x) dispatch_release(x)
37 #endif
38
39 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
40
41 DISPATCH_EXPORT DISPATCH_NOTHROW
42 void _dispatch_iocntl(uint32_t param, uint64_t value);
43
44 static dispatch_operation_t _dispatch_operation_create(
45 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
46 size_t length, dispatch_data_t data, dispatch_queue_t queue,
47 dispatch_io_handler_t handler);
48 static void _dispatch_operation_enqueue(dispatch_operation_t op,
49 dispatch_op_direction_t direction, dispatch_data_t data);
50 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
51 dispatch_operation_t op);
52 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
53 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
54 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
55 dispatch_fd_entry_init_callback_t completion_callback);
56 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
57 uintptr_t hash);
58 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
59 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
60 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
61 dispatch_io_t channel);
62 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
63 dispatch_io_t channel);
64 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
65 dispatch_queue_t tq);
66 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
67 dispatch_op_direction_t direction);
68 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
69 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
70 dispatch_operation_t operation, dispatch_data_t data);
71 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
72 dispatch_operation_t operation, dispatch_data_t data);
73 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
74 dispatch_io_t channel);
75 static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
76 dispatch_io_t channel);
77 static void _dispatch_stream_source_handler(void *ctx);
78 static void _dispatch_stream_queue_handler(void *ctx);
79 static void _dispatch_stream_handler(void *ctx);
80 static void _dispatch_disk_handler(void *ctx);
81 static void _dispatch_disk_perform(void *ctxt);
82 static void _dispatch_operation_advise(dispatch_operation_t op,
83 size_t chunk_size);
84 static int _dispatch_operation_perform(dispatch_operation_t op);
85 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
86 dispatch_op_flags_t flags);
87
88 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
89 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
90 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
91 case EINTR: continue; \
92 __VA_ARGS__ \
93 } \
94 break; \
95 } while (1)
96 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
97 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
98 case 0: break; \
99 __VA_ARGS__ \
100 ); \
101 } while (0)
102 #define _dispatch_io_syscall(__syscall) do { int __err; \
103 _dispatch_io_syscall_switch(__err, __syscall); \
104 } while (0)
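// Illustrative sketch (not part of the implementation): a typical use of the
// macros above. The syscall is retried transparently on EINTR, and any other
// errno value can be handled as an extra case label; the fstat() pattern
// below mirrors _dispatch_fd_entry_create_with_fd() later in this file.
//
//	int err;
//	struct stat st;
//	_dispatch_io_syscall_switch(err,
//		fstat(fd, &st),
//		default: fd_entry->err = err; return;
//	);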
105
106 enum {
107 DISPATCH_OP_COMPLETE = 1,
108 DISPATCH_OP_DELIVER,
109 DISPATCH_OP_DELIVER_AND_COMPLETE,
110 DISPATCH_OP_COMPLETE_RESUME,
111 DISPATCH_OP_RESUME,
112 DISPATCH_OP_ERR,
113 DISPATCH_OP_FD_ERR,
114 };
115
116 #define _dispatch_io_Block_copy(x) \
117 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
118
119 #pragma mark -
120 #pragma mark dispatch_io_debug
121
122 #if DISPATCH_IO_DEBUG
123 #if !DISPATCH_DEBUG
124 #define _dispatch_io_log(x, ...) do { \
125 _dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
126 (void *)_dispatch_thread_self(), ##__VA_ARGS__); \
127 } while (0)
128 #ifdef _dispatch_object_debug
129 #undef _dispatch_object_debug
130 #define _dispatch_object_debug dispatch_debug
131 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
132 #endif
133 #else
134 #define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
135 #endif // DISPATCH_DEBUG
136 #else
137 #define _dispatch_io_log(x, ...)
138 #endif // DISPATCH_IO_DEBUG
139
140 #define _dispatch_fd_debug(msg, fd, ...) \
141 _dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
142 #define _dispatch_op_debug(msg, op, ...) \
143 _dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
144 #define _dispatch_channel_debug(msg, channel, ...) \
145 _dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
146 #define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
147 _dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
148 #define _dispatch_disk_debug(msg, disk, ...) \
149 _dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
150
151 #pragma mark -
152 #pragma mark dispatch_io_hashtables
153
154 // Global hashtable of dev_t -> disk_s mappings
155 DISPATCH_CACHELINE_ALIGN
156 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
157 // Global hashtable of fd -> fd_entry_s mappings
158 DISPATCH_CACHELINE_ALIGN
159 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
160
161 static dispatch_once_t _dispatch_io_devs_lockq_pred;
162 static dispatch_queue_t _dispatch_io_devs_lockq;
163 static dispatch_queue_t _dispatch_io_fds_lockq;
164
165 static char const * const _dispatch_io_key = "io";
166
167 static void
168 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
169 {
170 _dispatch_io_fds_lockq = dispatch_queue_create(
171 "com.apple.libdispatch-io.fd_lockq", NULL);
172 unsigned int i;
173 for (i = 0; i < DIO_HASH_SIZE; i++) {
174 TAILQ_INIT(&_dispatch_io_fds[i]);
175 }
176 }
177
178 static void
179 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
180 {
181 _dispatch_io_devs_lockq = dispatch_queue_create(
182 "com.apple.libdispatch-io.dev_lockq", NULL);
183 unsigned int i;
184 for (i = 0; i < DIO_HASH_SIZE; i++) {
185 TAILQ_INIT(&_dispatch_io_devs[i]);
186 }
187 }
188
189 #pragma mark -
190 #pragma mark dispatch_io_defaults
191
192 enum {
193 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
194 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
195 DISPATCH_IOCNTL_INITIAL_DELIVERY,
196 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
197 };
198
199 static struct dispatch_io_defaults_s {
200 size_t chunk_size, low_water_chunks, max_pending_io_reqs;
201 bool initial_delivery;
202 } dispatch_io_defaults = {
203 .chunk_size = DIO_MAX_CHUNK_SIZE,
204 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
205 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
206 };
207
208 #define _dispatch_iocntl_set_default(p, v) do { \
209 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
210 } while (0)
211
212 void
213 _dispatch_iocntl(uint32_t param, uint64_t value)
214 {
215 switch (param) {
216 case DISPATCH_IOCNTL_CHUNK_PAGES:
217 _dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
218 break;
219 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
220 _dispatch_iocntl_set_default(low_water_chunks, value);
221 break;
222 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
223 		_dispatch_iocntl_set_default(initial_delivery, value);
		break; // avoid falling through and clobbering max_pending_io_reqs
224 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
225 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
226 break;
227 }
228 }
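// Worked example (illustrative, assuming a hypothetical 4 KiB page size):
// _dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 32) stores
// 32 * 4096 = 131072 bytes (128 KiB) into dispatch_io_defaults.chunk_size;
// the remaining parameters are stored verbatim.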
229
230 #pragma mark -
231 #pragma mark dispatch_io_t
232
233 static dispatch_io_t
234 _dispatch_io_create(dispatch_io_type_t type)
235 {
236 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
237 sizeof(struct dispatch_io_s));
238 channel->do_next = DISPATCH_OBJECT_LISTLESS;
239 channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
240 true);
241 channel->params.type = type;
242 channel->params.high = SIZE_MAX;
243 channel->params.low = dispatch_io_defaults.low_water_chunks *
244 dispatch_io_defaults.chunk_size;
245 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
246 NULL);
247 return channel;
248 }
249
250 static void
251 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
252 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
253 {
254 // Enqueue the cleanup handler on the suspended close queue
255 if (cleanup_handler) {
256 _dispatch_retain(queue);
257 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
258 dispatch_async(queue, ^{
259 _dispatch_channel_debug("cleanup handler invoke: err %d",
260 channel, err);
261 cleanup_handler(err);
262 });
263 _dispatch_release(queue);
264 });
265 }
266 if (fd_entry) {
267 channel->fd_entry = fd_entry;
268 dispatch_retain(fd_entry->barrier_queue);
269 dispatch_retain(fd_entry->barrier_group);
270 channel->barrier_queue = fd_entry->barrier_queue;
271 channel->barrier_group = fd_entry->barrier_group;
272 } else {
273 // Still need to create a barrier queue, since all operations go
274 // through it
275 channel->barrier_queue = dispatch_queue_create(
276 "com.apple.libdispatch-io.barrierq", NULL);
277 channel->barrier_group = dispatch_group_create();
278 }
279 }
280
281 void
282 _dispatch_io_dispose(dispatch_io_t channel)
283 {
284 _dispatch_object_debug(channel, "%s", __func__);
285 if (channel->fd_entry &&
286 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
287 if (channel->fd_entry->path_data) {
288 // This modification is safe since path_data->channel is checked
289 // only on close_queue (which is still suspended at this point)
290 channel->fd_entry->path_data->channel = NULL;
291 }
292 // Cleanup handlers will only run when all channels related to this
293 // fd are complete
294 _dispatch_fd_entry_release(channel->fd_entry);
295 }
296 if (channel->queue) {
297 dispatch_release(channel->queue);
298 }
299 if (channel->barrier_queue) {
300 dispatch_release(channel->barrier_queue);
301 }
302 if (channel->barrier_group) {
303 dispatch_release(channel->barrier_group);
304 }
305 }
306
307 static int
308 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
309 {
310 int err = 0;
311 if (S_ISDIR(mode)) {
312 err = EISDIR;
313 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
314 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
315 err = ESPIPE;
316 }
317 return err;
318 }
319
320 static int
321 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
322 bool ignore_closed)
323 {
324 // On _any_ queue
325 int err;
326 if (op) {
327 channel = op->channel;
328 }
329 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
330 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
331 err = ECANCELED;
332 } else {
333 err = 0;
334 }
335 } else {
336 err = op ? op->fd_entry->err : channel->err;
337 }
338 return err;
339 }
340
341 #pragma mark -
342 #pragma mark dispatch_io_channels
343
344 dispatch_io_t
345 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
346 dispatch_queue_t queue, void (^cleanup_handler)(int))
347 {
348 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
349 return DISPATCH_BAD_INPUT;
350 }
351 dispatch_io_t channel = _dispatch_io_create(type);
352 channel->fd = fd;
353 _dispatch_channel_debug("create", channel);
354 channel->fd_actual = fd;
355 dispatch_suspend(channel->queue);
356 _dispatch_retain(queue);
357 _dispatch_retain(channel);
358 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
359 // On barrier queue
360 int err = fd_entry->err;
361 if (!err) {
362 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
363 }
364 if (!err && type == DISPATCH_IO_RANDOM) {
365 off_t f_ptr;
366 _dispatch_io_syscall_switch_noerr(err,
367 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
368 case 0: channel->f_ptr = f_ptr; break;
369 default: (void)dispatch_assume_zero(err); break;
370 );
371 }
372 channel->err = err;
373 _dispatch_fd_entry_retain(fd_entry);
374 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
375 dispatch_resume(channel->queue);
376 _dispatch_object_debug(channel, "%s", __func__);
377 _dispatch_release(channel);
378 _dispatch_release(queue);
379 });
380 _dispatch_object_debug(channel, "%s", __func__);
381 return channel;
382 }
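/*
 * Illustrative usage sketch (not part of the implementation): creating a
 * stream channel from an existing descriptor. The path, queue and handler
 * bodies are hypothetical; the cleanup handler runs once all I/O on the
 * channel has completed, which is the safe point to close the fd.
 *
 *	int fd = open("/tmp/example", O_RDONLY);	// hypothetical path
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_main_queue(), ^(int error) {
 *		if (error) fprintf(stderr, "channel error: %d\n", error);
 *		close(fd);
 *	});
 */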
383
384 dispatch_io_t
385 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
386 dispatch_queue_t queue, void *context,
387 void (*cleanup_handler)(void *context, int error))
388 {
389 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
390 ^(int error){ cleanup_handler(context, error); });
391 }
392
393 dispatch_io_t
394 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
395 int oflag, mode_t mode, dispatch_queue_t queue,
396 void (^cleanup_handler)(int error))
397 {
398 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
399 !(*path == '/')) {
400 return DISPATCH_BAD_INPUT;
401 }
402 size_t pathlen = strlen(path);
403 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
404 if (!path_data) {
405 return DISPATCH_OUT_OF_MEMORY;
406 }
407 dispatch_io_t channel = _dispatch_io_create(type);
408 channel->fd = -1;
409 _dispatch_channel_debug("create with path %s", channel, path);
410 channel->fd_actual = -1;
411 path_data->channel = channel;
412 path_data->oflag = oflag;
413 path_data->mode = mode;
414 path_data->pathlen = pathlen;
415 memcpy(path_data->path, path, pathlen + 1);
416 _dispatch_retain(queue);
417 _dispatch_retain(channel);
418 dispatch_async(channel->queue, ^{
419 int err = 0;
420 struct stat st;
421 _dispatch_io_syscall_switch_noerr(err,
422 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
423 #ifndef __linux__
424 || (path_data->oflag & O_SYMLINK) == O_SYMLINK
425 #endif
426 ? lstat(path_data->path, &st) : stat(path_data->path, &st),
427 case 0:
428 err = _dispatch_io_validate_type(channel, st.st_mode);
429 break;
430 default:
431 if ((path_data->oflag & O_CREAT) &&
432 (*(path_data->path + path_data->pathlen - 1) != '/')) {
433 // Check parent directory
434 char *c = strrchr(path_data->path, '/');
435 dispatch_assert(c);
436 *c = 0;
437 int perr;
438 _dispatch_io_syscall_switch_noerr(perr,
439 stat(path_data->path, &st),
440 case 0:
441 // Since the parent directory exists, open() will
442 // create a regular file after the fd_entry has
443 // been filled in
444 st.st_mode = S_IFREG;
445 err = 0;
446 break;
447 );
448 *c = '/';
449 }
450 break;
451 );
452 channel->err = err;
453 if (err) {
454 free(path_data);
455 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
456 _dispatch_release(channel);
457 _dispatch_release(queue);
458 return;
459 }
460 dispatch_suspend(channel->queue);
461 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
462 _dispatch_io_devs_lockq_init);
463 dispatch_async(_dispatch_io_devs_lockq, ^{
464 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
465 path_data, st.st_dev, st.st_mode);
466 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
467 dispatch_resume(channel->queue);
468 _dispatch_object_debug(channel, "%s", __func__);
469 _dispatch_release(channel);
470 _dispatch_release(queue);
471 });
472 });
473 _dispatch_object_debug(channel, "%s", __func__);
474 return channel;
475 }
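/*
 * Illustrative usage sketch (not part of the implementation): a path-based
 * random-access channel. The descriptor is opened lazily by
 * _dispatch_fd_entry_open() when the first operation needs it; the path and
 * flags below are hypothetical.
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/example.dat", O_RDONLY, 0, dispatch_get_main_queue(),
 *			^(int error) { (void)error; });	// hypothetical cleanup handler
 */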
476
477 dispatch_io_t
478 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
479 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
480 void (*cleanup_handler)(void *context, int error))
481 {
482 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
483 !cleanup_handler ? NULL :
484 ^(int error){ cleanup_handler(context, error); });
485 }
486
487 dispatch_io_t
488 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
489 dispatch_queue_t queue, void (^cleanup_handler)(int error))
490 {
491 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
492 return DISPATCH_BAD_INPUT;
493 }
494 dispatch_io_t channel = _dispatch_io_create(type);
495 _dispatch_channel_debug("create with channel %p", channel, in_channel);
496 dispatch_suspend(channel->queue);
497 _dispatch_retain(queue);
498 _dispatch_retain(channel);
499 _dispatch_retain(in_channel);
500 dispatch_async(in_channel->queue, ^{
501 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
502 if (err0) {
503 channel->err = err0;
504 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
505 dispatch_resume(channel->queue);
506 _dispatch_release(channel);
507 _dispatch_release(in_channel);
508 _dispatch_release(queue);
509 return;
510 }
511 dispatch_async(in_channel->barrier_queue, ^{
512 int err = _dispatch_io_get_error(NULL, in_channel, false);
513 // If there is no error, the fd_entry for the in_channel is valid.
514 // Since we are running on in_channel's queue, the fd_entry has been
515 // fully resolved and will stay valid for the duration of this block
516 if (!err) {
517 err = in_channel->err;
518 if (!err) {
519 err = in_channel->fd_entry->err;
520 }
521 }
522 if (!err) {
523 err = _dispatch_io_validate_type(channel,
524 in_channel->fd_entry->stat.mode);
525 }
526 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
527 off_t f_ptr;
528 _dispatch_io_syscall_switch_noerr(err,
529 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
530 case 0: channel->f_ptr = f_ptr; break;
531 default: (void)dispatch_assume_zero(err); break;
532 );
533 }
534 channel->err = err;
535 if (err) {
536 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
537 dispatch_resume(channel->queue);
538 _dispatch_release(channel);
539 _dispatch_release(in_channel);
540 _dispatch_release(queue);
541 return;
542 }
543 if (in_channel->fd == -1) {
544 // in_channel was created from path
545 channel->fd = -1;
546 channel->fd_actual = -1;
547 mode_t mode = in_channel->fd_entry->stat.mode;
548 dev_t dev = in_channel->fd_entry->stat.dev;
549 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
550 in_channel->fd_entry->path_data->pathlen + 1;
551 dispatch_io_path_data_t path_data = malloc(path_data_len);
552 memcpy(path_data, in_channel->fd_entry->path_data,
553 path_data_len);
554 path_data->channel = channel;
555 				// _dispatch_io_devs_lockq is known to already exist
556 dispatch_async(_dispatch_io_devs_lockq, ^{
557 dispatch_fd_entry_t fd_entry;
558 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
559 dev, mode);
560 _dispatch_io_init(channel, fd_entry, queue, 0,
561 cleanup_handler);
562 dispatch_resume(channel->queue);
563 _dispatch_release(channel);
564 _dispatch_release(queue);
565 });
566 } else {
567 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
568 channel->fd = in_channel->fd;
569 channel->fd_actual = in_channel->fd_actual;
570 _dispatch_fd_entry_retain(fd_entry);
571 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
572 dispatch_resume(channel->queue);
573 _dispatch_release(channel);
574 _dispatch_release(queue);
575 }
576 _dispatch_release(in_channel);
577 _dispatch_object_debug(channel, "%s", __func__);
578 });
579 });
580 _dispatch_object_debug(channel, "%s", __func__);
581 return channel;
582 }
583
584 dispatch_io_t
585 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
586 dispatch_queue_t queue, void *context,
587 void (*cleanup_handler)(void *context, int error))
588 {
589 return dispatch_io_create_with_io(type, in_channel, queue,
590 !cleanup_handler ? NULL :
591 ^(int error){ cleanup_handler(context, error); });
592 }
593
594 #pragma mark -
595 #pragma mark dispatch_io_accessors
596
597 void
598 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
599 {
600 _dispatch_retain(channel);
601 dispatch_async(channel->queue, ^{
602 _dispatch_channel_debug("set high water: %zu", channel, high_water);
603 if (channel->params.low > high_water) {
604 channel->params.low = high_water;
605 }
606 channel->params.high = high_water ? high_water : 1;
607 _dispatch_release(channel);
608 });
609 }
610
611 void
612 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
613 {
614 _dispatch_retain(channel);
615 dispatch_async(channel->queue, ^{
616 _dispatch_channel_debug("set low water: %zu", channel, low_water);
617 if (channel->params.high < low_water) {
618 channel->params.high = low_water ? low_water : 1;
619 }
620 channel->params.low = low_water;
621 _dispatch_release(channel);
622 });
623 }
624
625 void
626 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
627 unsigned long flags)
628 {
629 _dispatch_retain(channel);
630 dispatch_async(channel->queue, ^{
631 _dispatch_channel_debug("set interval: %llu", channel, interval);
632 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
633 channel->params.interval_flags = flags;
634 _dispatch_release(channel);
635 });
636 }
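/*
 * Illustrative sketch (not part of the implementation): tuning delivery on a
 * channel; the values are arbitrary. The setters above keep low <= high, and
 * DISPATCH_IO_STRICT_INTERVAL makes the interval timer deliver even when less
 * than the low-water mark has accumulated (see _dispatch_operation_timer()).
 *
 *	dispatch_io_set_low_water(ch, 4 * 1024);	// at least 4 KiB per handler call
 *	dispatch_io_set_high_water(ch, 1024 * 1024);	// at most 1 MiB per handler call
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */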
637
638 void
639 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
640 {
641 _dispatch_retain(dq);
642 _dispatch_retain(channel);
643 dispatch_async(channel->queue, ^{
644 dispatch_queue_t prev_dq = channel->do_targetq;
645 channel->do_targetq = dq;
646 _dispatch_release(prev_dq);
647 _dispatch_object_debug(channel, "%s", __func__);
648 _dispatch_release(channel);
649 });
650 }
651
652 dispatch_fd_t
653 dispatch_io_get_descriptor(dispatch_io_t channel)
654 {
655 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
656 return -1;
657 }
658 dispatch_fd_t fd = channel->fd_actual;
659 if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
660 dispatch_thread_context_t ctxt =
661 _dispatch_thread_context_find(_dispatch_io_key);
662 if (ctxt && ctxt->dtc_io_in_barrier == channel) {
663 (void)_dispatch_fd_entry_open(channel->fd_entry, channel);
664 }
665 }
666 return channel->fd_actual;
667 }
668
669 #pragma mark -
670 #pragma mark dispatch_io_operations
671
672 static void
673 _dispatch_io_stop(dispatch_io_t channel)
674 {
675 _dispatch_channel_debug("stop", channel);
676 (void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
677 _dispatch_retain(channel);
678 dispatch_async(channel->queue, ^{
679 dispatch_async(channel->barrier_queue, ^{
680 _dispatch_object_debug(channel, "%s", __func__);
681 dispatch_fd_entry_t fd_entry = channel->fd_entry;
682 if (fd_entry) {
683 _dispatch_channel_debug("stop cleanup", channel);
684 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
685 if (!(channel->atomic_flags & DIO_CLOSED)) {
686 channel->fd_entry = NULL;
687 _dispatch_fd_entry_release(fd_entry);
688 }
689 } else if (channel->fd != -1) {
690 // Stop after close, need to check if fd_entry still exists
691 _dispatch_retain(channel);
692 dispatch_async(_dispatch_io_fds_lockq, ^{
693 _dispatch_object_debug(channel, "%s", __func__);
694 _dispatch_channel_debug("stop cleanup after close",
695 channel);
696 dispatch_fd_entry_t fdi;
697 uintptr_t hash = DIO_HASH(channel->fd);
698 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
699 if (fdi->fd == channel->fd) {
700 _dispatch_fd_entry_cleanup_operations(fdi, channel);
701 break;
702 }
703 }
704 _dispatch_release(channel);
705 });
706 }
707 _dispatch_release(channel);
708 });
709 });
710 }
711
712 void
713 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
714 {
715 if (flags & DISPATCH_IO_STOP) {
716 // Don't stop an already stopped channel
717 if (channel->atomic_flags & DIO_STOPPED) {
718 return;
719 }
720 return _dispatch_io_stop(channel);
721 }
722 // Don't close an already closed or stopped channel
723 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
724 return;
725 }
726 _dispatch_retain(channel);
727 dispatch_async(channel->queue, ^{
728 dispatch_async(channel->barrier_queue, ^{
729 _dispatch_object_debug(channel, "%s", __func__);
730 _dispatch_channel_debug("close", channel);
731 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
732 (void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
733 relaxed);
734 dispatch_fd_entry_t fd_entry = channel->fd_entry;
735 if (fd_entry) {
736 if (!fd_entry->path_data) {
737 channel->fd_entry = NULL;
738 }
739 _dispatch_fd_entry_release(fd_entry);
740 }
741 }
742 _dispatch_release(channel);
743 });
744 });
745 }
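/*
 * Illustrative sketch (not part of the implementation): the two close modes.
 * A plain close lets operations that are already enqueued run to completion;
 * DISPATCH_IO_STOP goes through _dispatch_io_stop() above, so outstanding
 * operations fail with ECANCELED (see _dispatch_io_get_error()).
 *
 *	dispatch_io_close(ch, 0);			// drain pending I/O, then clean up
 *	dispatch_io_close(ch, DISPATCH_IO_STOP);	// cancel outstanding I/O
 */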
746
747 void
748 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
749 {
750 _dispatch_retain(channel);
751 dispatch_async(channel->queue, ^{
752 dispatch_queue_t io_q = channel->do_targetq;
753 dispatch_queue_t barrier_queue = channel->barrier_queue;
754 dispatch_group_t barrier_group = channel->barrier_group;
755 dispatch_async(barrier_queue, ^{
756 dispatch_suspend(barrier_queue);
757 dispatch_group_notify(barrier_group, io_q, ^{
758 dispatch_thread_context_s io_ctxt = {
759 .dtc_key = _dispatch_io_key,
760 .dtc_io_in_barrier = channel,
761 };
762
763 _dispatch_object_debug(channel, "%s", __func__);
764 _dispatch_thread_context_push(&io_ctxt);
765 barrier();
766 _dispatch_thread_context_pop(&io_ctxt);
767 dispatch_resume(barrier_queue);
768 _dispatch_release(channel);
769 });
770 });
771 });
772 }
773
774 void
775 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
776 dispatch_function_t barrier)
777 {
778 return dispatch_io_barrier(channel, ^{ barrier(context); });
779 }
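/*
 * Illustrative sketch (not part of the implementation): a barrier is the one
 * context in which dispatch_io_get_descriptor() (above) will force a
 * path-based channel to open its descriptor, because the thread context
 * pushed by dispatch_io_barrier() marks the block as running in this
 * channel's barrier. The fsync() call is a hypothetical example.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd);	// no channel I/O is in flight here
 *		}
 *	});
 */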
780
781 void
782 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
783 dispatch_queue_t queue, dispatch_io_handler_t handler)
784 {
785 _dispatch_retain(channel);
786 _dispatch_retain(queue);
787 dispatch_async(channel->queue, ^{
788 dispatch_operation_t op;
789 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
790 length, dispatch_data_empty, queue, handler);
791 if (op) {
792 dispatch_queue_t barrier_q = channel->barrier_queue;
793 dispatch_async(barrier_q, ^{
794 _dispatch_operation_enqueue(op, DOP_DIR_READ,
795 dispatch_data_empty);
796 });
797 }
798 _dispatch_release(channel);
799 _dispatch_release(queue);
800 });
801 }
802
803 void
804 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
805 dispatch_queue_t queue, void *context,
806 dispatch_io_handler_function_t handler)
807 {
808 return dispatch_io_read(channel, offset, length, queue,
809 ^(bool done, dispatch_data_t d, int error){
810 handler(context, done, d, error);
811 });
812 }
813
814 void
815 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
816 dispatch_queue_t queue, dispatch_io_handler_t handler)
817 {
818 _dispatch_io_data_retain(data);
819 _dispatch_retain(channel);
820 _dispatch_retain(queue);
821 dispatch_async(channel->queue, ^{
822 dispatch_operation_t op;
823 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
824 dispatch_data_get_size(data), data, queue, handler);
825 if (op) {
826 dispatch_queue_t barrier_q = channel->barrier_queue;
827 dispatch_async(barrier_q, ^{
828 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
829 _dispatch_io_data_release(data);
830 });
831 } else {
832 _dispatch_io_data_release(data);
833 }
834 _dispatch_release(channel);
835 _dispatch_release(queue);
836 });
837 }
838
839 void
840 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
841 dispatch_queue_t queue, void *context,
842 dispatch_io_handler_function_t handler)
843 {
844 return dispatch_io_write(channel, offset, data, queue,
845 ^(bool done, dispatch_data_t d, int error){
846 handler(context, done, d, error);
847 });
848 }
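/*
 * Illustrative usage sketch (not part of the implementation): the handler
 * contract used by dispatch_io_read()/dispatch_io_write(). The handler may be
 * invoked multiple times with partial results; done == true marks the final
 * invocation, with error == 0 on success. process() is a hypothetical
 * consumer.
 *
 *	dispatch_io_read(ch, 0, SIZE_MAX, dispatch_get_main_queue(),
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			process(data);		// hypothetical: consume this chunk
 *		}
 *		if (done && error == 0) {
 *			// reached EOF (stream) or read the requested length
 *		}
 *	});
 */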
849
850 void
851 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
852 void (^handler)(dispatch_data_t, int))
853 {
854 _dispatch_retain(queue);
855 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
856 // On barrier queue
857 if (fd_entry->err) {
858 int err = fd_entry->err;
859 dispatch_async(queue, ^{
860 _dispatch_fd_debug("convenience handler invoke", fd);
861 handler(dispatch_data_empty, err);
862 });
863 _dispatch_release(queue);
864 return;
865 }
866 // Safe to access fd_entry on barrier queue
867 dispatch_io_t channel = fd_entry->convenience_channel;
868 if (!channel) {
869 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
870 channel->fd = fd;
871 channel->fd_actual = fd;
872 channel->fd_entry = fd_entry;
873 dispatch_retain(fd_entry->barrier_queue);
874 dispatch_retain(fd_entry->barrier_group);
875 channel->barrier_queue = fd_entry->barrier_queue;
876 channel->barrier_group = fd_entry->barrier_group;
877 fd_entry->convenience_channel = channel;
878 }
879 __block dispatch_data_t deliver_data = dispatch_data_empty;
880 __block int err = 0;
881 dispatch_async(fd_entry->close_queue, ^{
882 dispatch_async(queue, ^{
883 _dispatch_fd_debug("convenience handler invoke", fd);
884 handler(deliver_data, err);
885 _dispatch_io_data_release(deliver_data);
886 });
887 _dispatch_release(queue);
888 });
889 dispatch_operation_t op =
890 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
891 length, dispatch_data_empty,
892 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
893 ^(bool done, dispatch_data_t data, int error) {
894 if (data) {
895 data = dispatch_data_create_concat(deliver_data, data);
896 _dispatch_io_data_release(deliver_data);
897 deliver_data = data;
898 }
899 if (done) {
900 err = error;
901 }
902 });
903 if (op) {
904 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
905 }
906 });
907 }
908
909 void
910 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
911 void *context, void (*handler)(void *, dispatch_data_t, int))
912 {
913 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
914 handler(context, d, error);
915 });
916 }
917
918 void
919 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
920 void (^handler)(dispatch_data_t, int))
921 {
922 _dispatch_io_data_retain(data);
923 _dispatch_retain(queue);
924 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
925 // On barrier queue
926 if (fd_entry->err) {
927 int err = fd_entry->err;
928 dispatch_async(queue, ^{
929 _dispatch_fd_debug("convenience handler invoke", fd);
930 handler(NULL, err);
931 });
932 _dispatch_release(queue);
933 return;
934 }
935 // Safe to access fd_entry on barrier queue
936 dispatch_io_t channel = fd_entry->convenience_channel;
937 if (!channel) {
938 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
939 channel->fd = fd;
940 channel->fd_actual = fd;
941 channel->fd_entry = fd_entry;
942 dispatch_retain(fd_entry->barrier_queue);
943 dispatch_retain(fd_entry->barrier_group);
944 channel->barrier_queue = fd_entry->barrier_queue;
945 channel->barrier_group = fd_entry->barrier_group;
946 fd_entry->convenience_channel = channel;
947 }
948 __block dispatch_data_t deliver_data = NULL;
949 __block int err = 0;
950 dispatch_async(fd_entry->close_queue, ^{
951 dispatch_async(queue, ^{
952 _dispatch_fd_debug("convenience handler invoke", fd);
953 handler(deliver_data, err);
954 if (deliver_data) {
955 _dispatch_io_data_release(deliver_data);
956 }
957 });
958 _dispatch_release(queue);
959 });
960 dispatch_operation_t op =
961 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
962 dispatch_data_get_size(data), data,
963 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
964 ^(bool done, dispatch_data_t d, int error) {
965 if (done) {
966 if (d) {
967 _dispatch_io_data_retain(d);
968 deliver_data = d;
969 }
970 err = error;
971 }
972 });
973 if (op) {
974 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
975 }
976 _dispatch_io_data_release(data);
977 });
978 }
979
980 void
981 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
982 void *context, void (*handler)(void *, dispatch_data_t, int))
983 {
984 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
985 handler(context, d, error);
986 });
987 }
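/*
 * Illustrative sketch (not part of the implementation): the fire-and-forget
 * convenience wrappers above reuse one internal stream channel per fd and
 * invoke the handler exactly once, from the fd_entry close queue. The buffer
 * contents and queue are hypothetical.
 *
 *	dispatch_read(fd, SIZE_MAX, queue, ^(dispatch_data_t data, int error) {
 *		// data holds everything read before EOF or before the error
 *	});
 *
 *	dispatch_data_t msg = dispatch_data_create("hi\n", 3, queue,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, queue, ^(dispatch_data_t unwritten, int error) {
 *		// unwritten is non-NULL only if part of the data was not written
 *	});
 */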
988
989 #pragma mark -
990 #pragma mark dispatch_operation_t
991
992 static dispatch_operation_t
993 _dispatch_operation_create(dispatch_op_direction_t direction,
994 dispatch_io_t channel, off_t offset, size_t length,
995 dispatch_data_t data, dispatch_queue_t queue,
996 dispatch_io_handler_t handler)
997 {
998 // On channel queue
999 dispatch_assert(direction < DOP_DIR_MAX);
1000 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
1001 // that can only be NULL if atomic_flags are set rdar://problem/8362514
1002 int err = _dispatch_io_get_error(NULL, channel, false);
1003 if (err || !length) {
1004 _dispatch_io_data_retain(data);
1005 _dispatch_retain(queue);
1006 dispatch_async(channel->barrier_queue, ^{
1007 dispatch_async(queue, ^{
1008 dispatch_data_t d = data;
1009 if (direction == DOP_DIR_READ && err) {
1010 d = NULL;
1011 } else if (direction == DOP_DIR_WRITE && !err) {
1012 d = NULL;
1013 }
1014 _dispatch_channel_debug("IO handler invoke: err %d", channel,
1015 err);
1016 handler(true, d, err);
1017 _dispatch_io_data_release(data);
1018 });
1019 _dispatch_release(queue);
1020 });
1021 return NULL;
1022 }
1023 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
1024 sizeof(struct dispatch_operation_s));
1025 _dispatch_channel_debug("operation create: %p", channel, op);
1026 op->do_next = DISPATCH_OBJECT_LISTLESS;
1027 op->do_xref_cnt = -1; // operation object is not exposed externally
1028 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
1029 op->op_q->do_targetq = queue;
1030 _dispatch_retain(queue);
1031 op->active = false;
1032 op->direction = direction;
1033 op->offset = offset + channel->f_ptr;
1034 op->length = length;
1035 op->handler = _dispatch_io_Block_copy(handler);
1036 _dispatch_retain(channel);
1037 op->channel = channel;
1038 op->params = channel->params;
1039 // Take a snapshot of the priority of the channel queue. The actual I/O
1040 // for this operation will be performed at this priority
1041 dispatch_queue_t targetq = op->channel->do_targetq;
1042 while (fastpath(targetq->do_targetq)) {
1043 targetq = targetq->do_targetq;
1044 }
1045 op->do_targetq = targetq;
1046 _dispatch_object_debug(op, "%s", __func__);
1047 return op;
1048 }
1049
1050 void
1051 _dispatch_operation_dispose(dispatch_operation_t op)
1052 {
1053 _dispatch_object_debug(op, "%s", __func__);
1054 _dispatch_op_debug("dispose", op);
1055 // Deliver the data if there's any
1056 if (op->fd_entry) {
1057 _dispatch_operation_deliver_data(op, DOP_DONE);
1058 dispatch_group_leave(op->fd_entry->barrier_group);
1059 _dispatch_fd_entry_release(op->fd_entry);
1060 }
1061 if (op->channel) {
1062 _dispatch_release(op->channel);
1063 }
1064 if (op->timer) {
1065 dispatch_release(op->timer);
1066 }
1067 // For write operations, op->buf is owned by op->buf_data
1068 if (op->buf && op->direction == DOP_DIR_READ) {
1069 free(op->buf);
1070 }
1071 if (op->buf_data) {
1072 _dispatch_io_data_release(op->buf_data);
1073 }
1074 if (op->data) {
1075 _dispatch_io_data_release(op->data);
1076 }
1077 if (op->op_q) {
1078 dispatch_release(op->op_q);
1079 }
1080 Block_release(op->handler);
1081 _dispatch_op_debug("disposed", op);
1082 }
1083
1084 static void
1085 _dispatch_operation_enqueue(dispatch_operation_t op,
1086 dispatch_op_direction_t direction, dispatch_data_t data)
1087 {
1088 // Called from the barrier queue
1089 _dispatch_io_data_retain(data);
1090 // If channel is closed or stopped, then call the handler immediately
1091 int err = _dispatch_io_get_error(NULL, op->channel, false);
1092 if (err) {
1093 dispatch_io_handler_t handler = op->handler;
1094 dispatch_async(op->op_q, ^{
1095 dispatch_data_t d = data;
1096 if (direction == DOP_DIR_READ && err) {
1097 d = NULL;
1098 } else if (direction == DOP_DIR_WRITE && !err) {
1099 d = NULL;
1100 }
1101 handler(true, d, err);
1102 _dispatch_io_data_release(data);
1103 });
1104 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1105 _dispatch_release(op);
1106 return;
1107 }
1108 // Finish operation init
1109 op->fd_entry = op->channel->fd_entry;
1110 _dispatch_fd_entry_retain(op->fd_entry);
1111 dispatch_group_enter(op->fd_entry->barrier_group);
1112 dispatch_disk_t disk = op->fd_entry->disk;
1113 if (!disk) {
1114 dispatch_stream_t stream = op->fd_entry->streams[direction];
1115 dispatch_async(stream->dq, ^{
1116 _dispatch_stream_enqueue_operation(stream, op, data);
1117 _dispatch_io_data_release(data);
1118 });
1119 } else {
1120 dispatch_async(disk->pick_queue, ^{
1121 _dispatch_disk_enqueue_operation(disk, op, data);
1122 _dispatch_io_data_release(data);
1123 });
1124 }
1125 }
1126
1127 static bool
1128 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1129 dispatch_queue_t tq, dispatch_data_t data)
1130 {
1131 // On stream queue or disk queue
1132 _dispatch_op_debug("enqueue", op);
1133 _dispatch_io_data_retain(data);
1134 op->data = data;
1135 int err = _dispatch_io_get_error(op, NULL, true);
1136 if (err) {
1137 op->err = err;
1138 // Final release
1139 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1140 _dispatch_release(op);
1141 return false;
1142 }
1143 if (op->params.interval) {
1144 dispatch_resume(_dispatch_operation_timer(tq, op));
1145 }
1146 return true;
1147 }
1148
1149 static dispatch_source_t
1150 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1151 {
1152 // On stream queue or pick queue
1153 if (op->timer) {
1154 return op->timer;
1155 }
1156 dispatch_source_t timer = dispatch_source_create(
1157 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1158 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1159 (int64_t)op->params.interval), op->params.interval, 0);
1160 dispatch_source_set_event_handler(timer, ^{
1161 // On stream queue or pick queue
1162 if (dispatch_source_testcancel(timer)) {
1163 // Do nothing. The operation has already completed
1164 return;
1165 }
1166 dispatch_op_flags_t flags = DOP_DEFAULT;
1167 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1168 // Deliver even if there is less data than the low-water mark
1169 flags |= DOP_DELIVER;
1170 }
1171 		// If the operation is active, don't deliver data
1172 if ((op->active) && (flags & DOP_DELIVER)) {
1173 op->flags = flags;
1174 } else {
1175 _dispatch_operation_deliver_data(op, flags);
1176 }
1177 });
1178 op->timer = timer;
1179 return op->timer;
1180 }
1181
1182 #pragma mark -
1183 #pragma mark dispatch_fd_entry_t
1184
1185 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1186 static void
1187 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1188 {
1189 guardid_t guard = fd_entry;
1190 const unsigned int guard_flags = GUARD_CLOSE;
1191 int err, fd_flags = 0;
1192 _dispatch_io_syscall_switch_noerr(err,
1193 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1194 &fd_flags),
1195 case 0:
1196 fd_entry->guard_flags = guard_flags;
1197 fd_entry->orig_fd_flags = fd_flags;
1198 break;
1199 case EPERM: break;
1200 default: (void)dispatch_assume_zero(err); break;
1201 );
1202 }
1203
1204 static void
1205 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1206 {
1207 if (!fd_entry->guard_flags) {
1208 return;
1209 }
1210 guardid_t guard = fd_entry;
1211 int err, fd_flags = fd_entry->orig_fd_flags;
1212 _dispatch_io_syscall_switch(err,
1213 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1214 &fd_flags),
1215 default: (void)dispatch_assume_zero(err); break;
1216 );
1217 }
1218 #else
1219 static inline void
1220 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1221 static inline void
1222 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1223 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1224
1225 static inline int
1226 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1227 int oflag, mode_t mode) {
1228 #if DISPATCH_USE_GUARDED_FD
1229 guardid_t guard = (uintptr_t)fd_entry;
1230 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1231 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1232 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1233 mode);
1234 if (fd != -1) {
1235 fd_entry->guard_flags = guard_flags;
1236 return fd;
1237 }
1238 errno = 0;
1239 #endif
1240 return open(path, oflag, mode);
1241 (void)fd_entry;
1242 }
1243
1244 static inline int
1245 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1246 #if DISPATCH_USE_GUARDED_FD
1247 if (fd_entry->guard_flags) {
1248 guardid_t guard = (uintptr_t)fd_entry;
1249 return guarded_close_np(fd, &guard);
1250 } else
1251 #endif
1252 {
1253 return close(fd);
1254 }
1255 (void)fd_entry;
1256 }
1257
1258 static inline void
1259 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1260 dispatch_suspend(fd_entry->close_queue);
1261 }
1262
1263 static inline void
1264 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1265 dispatch_resume(fd_entry->close_queue);
1266 }
1267
1268 static void
1269 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1270 dispatch_fd_entry_init_callback_t completion_callback)
1271 {
1272 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1273 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1274 _dispatch_io_fds_lockq_init);
1275 dispatch_async(_dispatch_io_fds_lockq, ^{
1276 dispatch_fd_entry_t fd_entry = NULL;
1277 // Check to see if there is an existing entry for the given fd
1278 uintptr_t hash = DIO_HASH(fd);
1279 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1280 if (fd_entry->fd == fd) {
1281 // Retain the fd_entry to ensure it cannot go away until the
1282 // stat() has completed
1283 _dispatch_fd_entry_retain(fd_entry);
1284 break;
1285 }
1286 }
1287 if (!fd_entry) {
1288 // If we did not find an existing entry, create one
1289 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1290 }
1291 _dispatch_fd_entry_debug("init", fd_entry);
1292 dispatch_async(fd_entry->barrier_queue, ^{
1293 _dispatch_fd_entry_debug("init completion", fd_entry);
1294 completion_callback(fd_entry);
1295 // stat() is complete, release reference to fd_entry
1296 _dispatch_fd_entry_release(fd_entry);
1297 });
1298 });
1299 }
1300
1301 static dispatch_fd_entry_t
1302 _dispatch_fd_entry_create(dispatch_queue_t q)
1303 {
1304 dispatch_fd_entry_t fd_entry;
1305 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1306 fd_entry->close_queue = dispatch_queue_create(
1307 "com.apple.libdispatch-io.closeq", NULL);
1308 // Use target queue to ensure that no concurrent lookups are going on when
1309 // the close queue is running
1310 fd_entry->close_queue->do_targetq = q;
1311 _dispatch_retain(q);
1312 // Suspend the cleanup queue until closing
1313 _dispatch_fd_entry_retain(fd_entry);
1314 return fd_entry;
1315 }
1316
1317 static dispatch_fd_entry_t
1318 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1319 {
1320 // On fds lock queue
1321 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1322 _dispatch_io_fds_lockq);
1323 _dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
1324 fd_entry->fd = fd;
1325 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1326 fd_entry->barrier_queue = dispatch_queue_create(
1327 "com.apple.libdispatch-io.barrierq", NULL);
1328 fd_entry->barrier_group = dispatch_group_create();
1329 dispatch_async(fd_entry->barrier_queue, ^{
1330 _dispatch_fd_entry_debug("stat", fd_entry);
1331 int err, orig_flags, orig_nosigpipe = -1;
1332 struct stat st;
1333 _dispatch_io_syscall_switch(err,
1334 fstat(fd, &st),
1335 default: fd_entry->err = err; return;
1336 );
1337 fd_entry->stat.dev = st.st_dev;
1338 fd_entry->stat.mode = st.st_mode;
1339 _dispatch_fd_entry_guard(fd_entry);
1340 _dispatch_io_syscall_switch(err,
1341 orig_flags = fcntl(fd, F_GETFL),
1342 default: (void)dispatch_assume_zero(err); break;
1343 );
1344 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1345 if (S_ISFIFO(st.st_mode)) {
1346 _dispatch_io_syscall_switch(err,
1347 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1348 default: (void)dispatch_assume_zero(err); break;
1349 );
1350 if (orig_nosigpipe != -1) {
1351 _dispatch_io_syscall_switch(err,
1352 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1353 default:
1354 orig_nosigpipe = -1;
1355 (void)dispatch_assume_zero(err);
1356 break;
1357 );
1358 }
1359 }
1360 #endif
1361 if (S_ISREG(st.st_mode)) {
1362 if (orig_flags != -1) {
1363 _dispatch_io_syscall_switch(err,
1364 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1365 default:
1366 orig_flags = -1;
1367 (void)dispatch_assume_zero(err);
1368 break;
1369 );
1370 }
1371 int32_t dev = major(st.st_dev);
1372 // We have to get the disk on the global dev queue. The
1373 // barrier queue cannot continue until that is complete
1374 dispatch_suspend(fd_entry->barrier_queue);
1375 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1376 _dispatch_io_devs_lockq_init);
1377 dispatch_async(_dispatch_io_devs_lockq, ^{
1378 _dispatch_disk_init(fd_entry, dev);
1379 dispatch_resume(fd_entry->barrier_queue);
1380 });
1381 } else {
1382 if (orig_flags != -1) {
1383 _dispatch_io_syscall_switch(err,
1384 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1385 default:
1386 orig_flags = -1;
1387 (void)dispatch_assume_zero(err);
1388 break;
1389 );
1390 }
1391 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1392 _DISPATCH_QOS_CLASS_DEFAULT, false));
1393 }
1394 fd_entry->orig_flags = orig_flags;
1395 fd_entry->orig_nosigpipe = orig_nosigpipe;
1396 });
1397 // This is the first item run when the close queue is resumed, indicating
1398 // that all channels associated with this entry have been closed and that
1399 // all operations associated with this entry have been freed
1400 dispatch_async(fd_entry->close_queue, ^{
1401 if (!fd_entry->disk) {
1402 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1403 dispatch_op_direction_t dir;
1404 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1405 _dispatch_stream_dispose(fd_entry, dir);
1406 }
1407 } else {
1408 dispatch_disk_t disk = fd_entry->disk;
1409 dispatch_async(_dispatch_io_devs_lockq, ^{
1410 _dispatch_release(disk);
1411 });
1412 }
1413 // Remove this entry from the global fd list
1414 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1415 });
1416 // If there was a source associated with this stream, disposing of the
1417 // source cancels it and suspends the close queue. Freeing the fd_entry
1418 // structure must happen after the source cancel handler has finished
1419 dispatch_async(fd_entry->close_queue, ^{
1420 _dispatch_fd_entry_debug("close queue release", fd_entry);
1421 dispatch_release(fd_entry->close_queue);
1422 _dispatch_fd_entry_debug("barrier queue release", fd_entry);
1423 dispatch_release(fd_entry->barrier_queue);
1424 _dispatch_fd_entry_debug("barrier group release", fd_entry);
1425 dispatch_release(fd_entry->barrier_group);
1426 if (fd_entry->orig_flags != -1) {
1427 _dispatch_io_syscall(
1428 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1429 );
1430 }
1431 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1432 if (fd_entry->orig_nosigpipe != -1) {
1433 _dispatch_io_syscall(
1434 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1435 );
1436 }
1437 #endif
1438 _dispatch_fd_entry_unguard(fd_entry);
1439 if (fd_entry->convenience_channel) {
1440 fd_entry->convenience_channel->fd_entry = NULL;
1441 dispatch_release(fd_entry->convenience_channel);
1442 }
1443 free(fd_entry);
1444 });
1445 return fd_entry;
1446 }
1447
1448 static dispatch_fd_entry_t
1449 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1450 dev_t dev, mode_t mode)
1451 {
1452 // On devs lock queue
1453 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1454 path_data->channel->queue);
1455 _dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
1456 if (S_ISREG(mode)) {
1457 _dispatch_disk_init(fd_entry, major(dev));
1458 } else {
1459 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1460 _DISPATCH_QOS_CLASS_DEFAULT, false));
1461 }
1462 fd_entry->fd = -1;
1463 fd_entry->orig_flags = -1;
1464 fd_entry->path_data = path_data;
1465 fd_entry->stat.dev = dev;
1466 fd_entry->stat.mode = mode;
1467 fd_entry->barrier_queue = dispatch_queue_create(
1468 "com.apple.libdispatch-io.barrierq", NULL);
1469 fd_entry->barrier_group = dispatch_group_create();
1470 // This is the first item run when the close queue is resumed, indicating
1471 // that the channel associated with this entry has been closed and that
1472 // all operations associated with this entry have been freed
1473 dispatch_async(fd_entry->close_queue, ^{
1474 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1475 if (!fd_entry->disk) {
1476 dispatch_op_direction_t dir;
1477 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1478 _dispatch_stream_dispose(fd_entry, dir);
1479 }
1480 }
1481 if (fd_entry->fd != -1) {
1482 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1483 }
1484 if (fd_entry->path_data->channel) {
1485 // If associated channel has not been released yet, mark it as
1486 // no longer having an fd_entry (for stop after close).
1487 // It is safe to modify channel since we are on close_queue with
1488 // target queue the channel queue
1489 fd_entry->path_data->channel->fd_entry = NULL;
1490 }
1491 });
1492 dispatch_async(fd_entry->close_queue, ^{
1493 _dispatch_fd_entry_debug("close queue release", fd_entry);
1494 dispatch_release(fd_entry->close_queue);
1495 dispatch_release(fd_entry->barrier_queue);
1496 dispatch_release(fd_entry->barrier_group);
1497 free(fd_entry->path_data);
1498 free(fd_entry);
1499 });
1500 return fd_entry;
1501 }
1502
1503 static int
1504 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1505 {
1506 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1507 return 0;
1508 }
1509 if (fd_entry->err) {
1510 return fd_entry->err;
1511 }
1512 int fd = -1;
1513 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1514 fd_entry->path_data->oflag | O_NONBLOCK;
1515 open:
1516 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1517 oflag, fd_entry->path_data->mode);
1518 if (fd == -1) {
1519 int err = errno;
1520 if (err == EINTR) {
1521 goto open;
1522 }
1523 (void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1524 return err;
1525 }
1526 if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1527 // Lost the race with another open
1528 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1529 } else {
1530 channel->fd_actual = fd;
1531 }
1532 _dispatch_object_debug(channel, "%s", __func__);
1533 return 0;
1534 }
1535
1536 static void
1537 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1538 dispatch_io_t channel)
1539 {
1540 if (fd_entry->disk) {
1541 if (channel) {
1542 _dispatch_retain(channel);
1543 }
1544 _dispatch_fd_entry_retain(fd_entry);
1545 dispatch_async(fd_entry->disk->pick_queue, ^{
1546 _dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
1547 _dispatch_fd_entry_release(fd_entry);
1548 if (channel) {
1549 _dispatch_release(channel);
1550 }
1551 });
1552 } else {
1553 dispatch_op_direction_t direction;
1554 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1555 dispatch_stream_t stream = fd_entry->streams[direction];
1556 if (!stream) {
1557 continue;
1558 }
1559 if (channel) {
1560 _dispatch_retain(channel);
1561 }
1562 _dispatch_fd_entry_retain(fd_entry);
1563 dispatch_async(stream->dq, ^{
1564 _dispatch_stream_cleanup_operations(stream, channel);
1565 _dispatch_fd_entry_release(fd_entry);
1566 if (channel) {
1567 _dispatch_release(channel);
1568 }
1569 });
1570 }
1571 }
1572 }
1573
1574 #pragma mark -
1575 #pragma mark dispatch_stream_t/dispatch_disk_t
1576
1577 static void
1578 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1579 {
1580 dispatch_op_direction_t direction;
1581 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1582 dispatch_stream_t stream;
1583 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1584 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1585 NULL);
1586 dispatch_set_context(stream->dq, stream);
1587 _dispatch_retain(tq);
1588 stream->dq->do_targetq = tq;
1589 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1590 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1591 fd_entry->streams[direction] = stream;
1592 }
1593 }
1594
1595 static void
1596 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1597 dispatch_op_direction_t direction)
1598 {
1599 // On close queue
1600 dispatch_stream_t stream = fd_entry->streams[direction];
1601 if (!stream) {
1602 return;
1603 }
1604 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1605 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1606 if (stream->source) {
1607 // Balanced by source cancel handler:
1608 _dispatch_fd_entry_retain(fd_entry);
1609 dispatch_source_cancel(stream->source);
1610 dispatch_resume(stream->source);
1611 dispatch_release(stream->source);
1612 }
1613 dispatch_set_context(stream->dq, NULL);
1614 dispatch_release(stream->dq);
1615 free(stream);
1616 }
1617
1618 static void
1619 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1620 {
1621 // On devs lock queue
1622 dispatch_disk_t disk;
1623 // Check to see if there is an existing entry for the given device
1624 uintptr_t hash = DIO_HASH(dev);
1625 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1626 if (disk->dev == dev) {
1627 _dispatch_retain(disk);
1628 goto out;
1629 }
1630 }
1631 // Otherwise create a new entry
1632 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1633 disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1634 sizeof(struct dispatch_disk_s) +
1635 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1636 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1637 disk->do_xref_cnt = -1;
1638 disk->advise_list_depth = pending_reqs_depth;
1639 disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1640 false);
1641 disk->dev = dev;
1642 TAILQ_INIT(&disk->operations);
1643 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1644 char label[45];
1645 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
1646 (int)dev);
1647 disk->pick_queue = dispatch_queue_create(label, NULL);
1648 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1649 out:
1650 fd_entry->disk = disk;
1651 TAILQ_INIT(&fd_entry->stream_ops);
1652 }
1653
1654 void
1655 _dispatch_disk_dispose(dispatch_disk_t disk)
1656 {
1657 uintptr_t hash = DIO_HASH(disk->dev);
1658 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1659 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1660 size_t i;
1661 for (i=0; i<disk->advise_list_depth; ++i) {
1662 dispatch_assert(!disk->advise_list[i]);
1663 }
1664 dispatch_release(disk->pick_queue);
1665 }
1666
1667 #pragma mark -
1668 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1669
1670 static inline bool
1671 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1672 {
1673 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1674 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1675 }
1676
1677 static void
1678 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1679 dispatch_operation_t op, dispatch_data_t data)
1680 {
1681 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1682 return;
1683 }
1684 _dispatch_object_debug(op, "%s", __func__);
1685 bool no_ops = !_dispatch_stream_operation_avail(stream);
1686 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1687 if (no_ops) {
1688 dispatch_async_f(stream->dq, stream->dq,
1689 _dispatch_stream_queue_handler);
1690 }
1691 }
1692
1693 static void
1694 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1695 dispatch_data_t data)
1696 {
1697 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1698 return;
1699 }
1700 _dispatch_object_debug(op, "%s", __func__);
1701 if (op->params.type == DISPATCH_IO_STREAM) {
1702 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1703 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1704 }
1705 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1706 } else {
1707 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1708 }
1709 _dispatch_disk_handler(disk);
1710 }
1711
1712 static void
1713 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1714 dispatch_operation_t op)
1715 {
1716 // On stream queue
1717 _dispatch_object_debug(op, "%s", __func__);
1718 _dispatch_op_debug("complete: stream %p", op, stream);
1719 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1720 if (op == stream->op) {
1721 stream->op = NULL;
1722 }
1723 if (op->timer) {
1724 dispatch_source_cancel(op->timer);
1725 }
1726 // Final release will deliver any pending data
1727 _dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
1728 _dispatch_release(op);
1729 }
1730
1731 static void
1732 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1733 {
1734 // On pick queue
1735 _dispatch_object_debug(op, "%s", __func__);
1736 _dispatch_op_debug("complete: disk %p", op, disk);
1737 // The current request is always the last op returned by the picker
1738 if (disk->cur_rq == op) {
1739 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1740 operation_list);
1741 }
1742 if (op->params.type == DISPATCH_IO_STREAM) {
1743 // Check if there are other pending stream operations behind it
1744 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1745 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1746 if (op_next) {
1747 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1748 }
1749 }
1750 TAILQ_REMOVE(&disk->operations, op, operation_list);
1751 if (op->timer) {
1752 dispatch_source_cancel(op->timer);
1753 }
1754 // Final release will deliver any pending data
1755 _dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
1756 _dispatch_release(op);
1757 }
1758
1759 static dispatch_operation_t
1760 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1761 dispatch_operation_t op)
1762 {
1763 // On stream queue
1764 if (!op) {
1765 // On the first run through, pick the first operation
1766 if (!_dispatch_stream_operation_avail(stream)) {
1767 return op;
1768 }
1769 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1770 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1771 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1772 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1773 }
1774 return op;
1775 }
1776 if (op->params.type == DISPATCH_IO_STREAM) {
1777 // Stream operations need to be serialized, so continue the current
1778 // operation until it is finished
1779 return op;
1780 }
1781 // Get the next random operation (round-robin)
1782 if (op->params.type == DISPATCH_IO_RANDOM) {
1783 op = TAILQ_NEXT(op, operation_list);
1784 if (!op) {
1785 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1786 }
1787 return op;
1788 }
1789 return NULL;
1790 }
1791
1792 static dispatch_operation_t
1793 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1794 {
1795 // On pick queue
1796 dispatch_operation_t op;
1797 if (!TAILQ_EMPTY(&disk->operations)) {
1798 if (disk->cur_rq == NULL) {
1799 op = TAILQ_FIRST(&disk->operations);
1800 } else {
1801 op = disk->cur_rq;
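// Round-robin: scan forward from the last issued request, wrapping around
// the list, until an inactive operation is found or we are back where we
// started.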
1802 do {
1803 op = TAILQ_NEXT(op, operation_list);
1804 if (!op) {
1805 op = TAILQ_FIRST(&disk->operations);
1806 }
1807 // TODO: more involved picking algorithm rdar://problem/8780312
1808 } while (op->active && op != disk->cur_rq);
1809 }
1810 if (!op->active) {
1811 disk->cur_rq = op;
1812 return op;
1813 }
1814 }
1815 return NULL;
1816 }
1817
1818 static void
1819 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1820 dispatch_io_t channel)
1821 {
1822 // On stream queue
1823 dispatch_operation_t op, tmp;
1824 typeof(*stream->operations) *operations;
1825 operations = &stream->operations[DISPATCH_IO_RANDOM];
1826 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1827 if (!channel || op->channel == channel) {
1828 _dispatch_stream_complete_operation(stream, op);
1829 }
1830 }
1831 operations = &stream->operations[DISPATCH_IO_STREAM];
1832 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1833 if (!channel || op->channel == channel) {
1834 _dispatch_stream_complete_operation(stream, op);
1835 }
1836 }
1837 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1838 dispatch_suspend(stream->source);
1839 stream->source_running = false;
1840 }
1841 }
1842
1843 static inline void
1844 _dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
1845 dispatch_io_t channel, bool inactive_only)
1846 {
1847 // On pick queue
1848 dispatch_operation_t op, tmp;
1849 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1850 if (inactive_only && op->active) continue;
1851 if (!channel || op->channel == channel) {
1852 _dispatch_op_debug("cleanup: disk %p", op, disk);
1853 _dispatch_disk_complete_operation(disk, op);
1854 }
1855 }
1856 }
1857
1858 static void
1859 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1860 {
1861 _dispatch_disk_cleanup_specified_operations(disk, channel, false);
1862 }
1863
1864 static void
1865 _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
1866 dispatch_io_t channel)
1867 {
1868 _dispatch_disk_cleanup_specified_operations(disk, channel, true);
1869 }
1870
1871 #pragma mark -
1872 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1873
1874 static dispatch_source_t
1875 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1876 {
1877 // On stream queue
1878 if (stream->source) {
1879 return stream->source;
1880 }
1881 dispatch_fd_t fd = op->fd_entry->fd;
1882 _dispatch_op_debug("stream source create", op);
1883 dispatch_source_t source = NULL;
1884 if (op->direction == DOP_DIR_READ) {
1885 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1886 (uintptr_t)fd, 0, stream->dq);
1887 } else if (op->direction == DOP_DIR_WRITE) {
1888 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1889 (uintptr_t)fd, 0, stream->dq);
1890 } else {
1891 dispatch_assert(op->direction < DOP_DIR_MAX);
1892 return NULL;
1893 }
1894 dispatch_set_context(source, stream);
1895 dispatch_source_set_event_handler_f(source,
1896 _dispatch_stream_source_handler);
1897 // Close queue must not run user cleanup handlers until sources are fully
1898 // unregistered
1899 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1900 dispatch_source_set_cancel_handler(source, ^{
1901 _dispatch_op_debug("stream source cancel", op);
1902 dispatch_resume(close_queue);
1903 });
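// The source is created suspended; it is resumed only when an operation has
// to wait for fd readiness (DISPATCH_OP_RESUME in the stream handler) and is
// suspended again by the event handler before the stream handler runs.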
1904 stream->source = source;
1905 return stream->source;
1906 }
1907
1908 static void
1909 _dispatch_stream_source_handler(void *ctx)
1910 {
1911 // On stream queue
1912 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1913 dispatch_suspend(stream->source);
1914 stream->source_running = false;
1915 return _dispatch_stream_handler(stream);
1916 }
1917
1918 static void
1919 _dispatch_stream_queue_handler(void *ctx)
1920 {
1921 // On stream queue
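// ctx is the stream queue itself; the stream is stored in the queue's
// context and cleared by _dispatch_stream_dispose, which is how a handler
// enqueued against an already-disposed stream detects the race.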
1922 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1923 if (!stream) {
1924 // _dispatch_stream_dispose has been called
1925 return;
1926 }
1927 return _dispatch_stream_handler(stream);
1928 }
1929
1930 static void
1931 _dispatch_stream_handler(void *ctx)
1932 {
1933 // On stream queue
1934 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1935 dispatch_operation_t op;
1936 pick:
1937 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1938 if (!op) {
1939 _dispatch_debug("no operation found: stream %p", stream);
1940 return;
1941 }
1942 int err = _dispatch_io_get_error(op, NULL, true);
1943 if (err) {
1944 op->err = err;
1945 _dispatch_stream_complete_operation(stream, op);
1946 goto pick;
1947 }
1948 stream->op = op;
1949 _dispatch_op_debug("stream handler", op);
1950 dispatch_fd_entry_t fd_entry = op->fd_entry;
1951 _dispatch_fd_entry_retain(fd_entry);
1952 // For performance analysis
1953 if (!op->total && dispatch_io_defaults.initial_delivery) {
1954 // Empty delivery to signal the start of the operation
1955 _dispatch_op_debug("initial delivery", op);
1956 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1957 }
1958 // TODO: perform on the operation target queue to get correct priority
1959 int result = _dispatch_operation_perform(op);
1960 dispatch_op_flags_t flags = ~0u;
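// flags starts as an invalid sentinel (~0u) so the fall-through cases below
// can tell whether they were entered directly or reached by falling through
// from a previous case.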
1961 switch (result) {
1962 case DISPATCH_OP_DELIVER:
1963 flags = DOP_DEFAULT;
1964 // Fall through
1965 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1966 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1967 DOP_DEFAULT;
1968 _dispatch_operation_deliver_data(op, flags);
1969 // Fall through
1970 case DISPATCH_OP_COMPLETE:
1971 if (flags != DOP_DEFAULT) {
1972 _dispatch_stream_complete_operation(stream, op);
1973 }
1974 if (_dispatch_stream_operation_avail(stream)) {
1975 dispatch_async_f(stream->dq, stream->dq,
1976 _dispatch_stream_queue_handler);
1977 }
1978 break;
1979 case DISPATCH_OP_COMPLETE_RESUME:
1980 _dispatch_stream_complete_operation(stream, op);
1981 // Fall through
1982 case DISPATCH_OP_RESUME:
1983 if (_dispatch_stream_operation_avail(stream)) {
1984 stream->source_running = true;
1985 dispatch_resume(_dispatch_stream_source(stream, op));
1986 }
1987 break;
1988 case DISPATCH_OP_ERR:
1989 _dispatch_stream_cleanup_operations(stream, op->channel);
1990 break;
1991 case DISPATCH_OP_FD_ERR:
1992 _dispatch_fd_entry_retain(fd_entry);
1993 dispatch_async(fd_entry->barrier_queue, ^{
1994 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1995 _dispatch_fd_entry_release(fd_entry);
1996 });
1997 break;
1998 default:
1999 break;
2000 }
2001 _dispatch_fd_entry_release(fd_entry);
2002 return;
2003 }
2004
2005 static void
2006 _dispatch_disk_handler(void *ctx)
2007 {
2008 // On pick queue
2009 dispatch_disk_t disk = (dispatch_disk_t)ctx;
2010 if (disk->io_active) {
2011 return;
2012 }
2013 _dispatch_disk_debug("disk handler", disk);
2014 dispatch_operation_t op;
2015 size_t i = disk->free_idx, j = disk->req_idx;
2016 if (j <= i) {
2017 j += disk->advise_list_depth;
2018 }
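// advise_list is used as a ring buffer: fill the free slots between free_idx
// and req_idx with the next inactive operations so they can be advised ahead
// of being performed.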
2019 while (i <= j) {
2020 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
2021 (op = _dispatch_disk_pick_next_operation(disk))) {
2022 int err = _dispatch_io_get_error(op, NULL, true);
2023 if (err) {
2024 op->err = err;
2025 _dispatch_disk_complete_operation(disk, op);
2026 continue;
2027 }
2028 _dispatch_retain(op);
2029 _dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
2030 disk->advise_list[i%disk->advise_list_depth] = op;
2031 op->active = true;
2032 _dispatch_op_debug("activate: disk %p", op, disk);
2033 _dispatch_object_debug(op, "%s", __func__);
2034 } else {
2035 // No more operations to get
2036 break;
2037 }
2038 i++;
2039 }
2040 disk->free_idx = (i%disk->advise_list_depth);
2041 op = disk->advise_list[disk->req_idx];
2042 if (op) {
2043 disk->io_active = true;
2044 _dispatch_op_debug("async perform: disk %p", op, disk);
2045 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
2046 }
2047 }
2048
2049 static void
2050 _dispatch_disk_perform(void *ctxt)
2051 {
2052 dispatch_disk_t disk = ctxt;
2053 _dispatch_disk_debug("disk perform", disk);
2054 size_t chunk_size = dispatch_io_defaults.chunk_size;
2055 dispatch_operation_t op;
2056 size_t i = disk->advise_idx, j = disk->free_idx;
2057 if (j <= i) {
2058 j += disk->advise_list_depth;
2059 }
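// Issue read-ahead advice for the operations queued behind the current
// request (advise_idx up to free_idx) before performing the operation at
// req_idx below.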
2060 do {
2061 op = disk->advise_list[i%disk->advise_list_depth];
2062 if (!op) {
2063 // Nothing more to advise, must be at free_idx
2064 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2065 break;
2066 }
2067 if (op->direction == DOP_DIR_WRITE) {
2068 // TODO: preallocate writes? rdar://problem/9032172
2069 continue;
2070 }
2071 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2072 op->channel)) {
2073 continue;
2074 }
2075 // For performance analysis
2076 if (!op->total && dispatch_io_defaults.initial_delivery) {
2077 // Empty delivery to signal the start of the operation
2078 _dispatch_op_debug("initial delivery", op);
2079 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2080 }
2081 // Advise two chunks if the list only has one element and this is the
2082 // first advise on the operation
2083 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2084 !op->advise_offset) {
2085 chunk_size *= 2;
2086 }
2087 _dispatch_operation_advise(op, chunk_size);
2088 } while (++i < j);
2089 disk->advise_idx = i%disk->advise_list_depth;
2090 op = disk->advise_list[disk->req_idx];
2091 int result = _dispatch_operation_perform(op);
2092 disk->advise_list[disk->req_idx] = NULL;
2093 disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
2094 _dispatch_op_debug("async perform completion: disk %p", op, disk);
2095 dispatch_async(disk->pick_queue, ^{
2096 _dispatch_op_debug("perform completion", op);
2097 switch (result) {
2098 case DISPATCH_OP_DELIVER:
2099 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2100 break;
2101 case DISPATCH_OP_COMPLETE:
2102 _dispatch_disk_complete_operation(disk, op);
2103 break;
2104 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2105 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2106 _dispatch_disk_complete_operation(disk, op);
2107 break;
2108 case DISPATCH_OP_ERR:
2109 _dispatch_disk_cleanup_operations(disk, op->channel);
2110 break;
2111 case DISPATCH_OP_FD_ERR:
2112 _dispatch_disk_cleanup_operations(disk, NULL);
2113 break;
2114 default:
2115 dispatch_assert(result);
2116 break;
2117 }
2118 _dispatch_op_debug("deactivate: disk %p", op, disk);
2119 op->active = false;
2120 disk->io_active = false;
2121 _dispatch_disk_handler(disk);
2122 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2123 // released at the very end, since it might hold the last reference to
2124 // the disk
2125 _dispatch_op_debug("release -> %d (disk perform complete)", op,
2126 op->do_ref_cnt);
2127 _dispatch_release(op);
2128 });
2129 }
2130
2131 #pragma mark -
2132 #pragma mark dispatch_operation_perform
2133
2134 static void
2135 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2136 {
2137 _dispatch_op_debug("advise", op);
2138 if (_dispatch_io_get_error(op, NULL, true)) return;
2139 #ifdef __linux__
2140 // Linux does not support fcntl(F_RDADVISE); declare the necessary
2141 // data structure locally and use readahead(2) instead
2142 struct radvisory {
2143 off_t ra_offset;
2144 int ra_count;
2145 };
2146 #endif
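// On Linux the struct is only declared locally so that the offset/count
// computation below can be shared with the readahead(2) call.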
2147 int err;
2148 struct radvisory advise;
2149 // No point in issuing a read advise for the next chunk if we are
2150 // already more than a chunk ahead of the bytes actually read
2151 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2152 chunk_size + PAGE_SIZE)) {
2153 return;
2154 }
2155 _dispatch_object_debug(op, "%s", __func__);
2156 advise.ra_count = (int)chunk_size;
2157 if (!op->advise_offset) {
2158 op->advise_offset = op->offset;
2159 // If this is the first time through, align the advised range to a
2160 // page boundary
2161 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2162 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2163 }
2164 advise.ra_offset = op->advise_offset;
2165 op->advise_offset += advise.ra_count;
2166 #ifdef __linux__
2167 _dispatch_io_syscall_switch(err,
2168 readahead(op->fd_entry->fd, advise.ra_offset, advise.ra_count),
2169 case EINVAL: break; // fd refers to an unsupported file type
2170 default: (void)dispatch_assume_zero(err); break;
2171 );
2172 #else
2173 _dispatch_io_syscall_switch(err,
2174 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2175 case EFBIG: break; // advised past the end of the file rdar://10415691
2176 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2177 // TODO: set disk status on error
2178 default: (void)dispatch_assume_zero(err); break;
2179 );
2180 #endif
2181 }
2182
2183 static int
2184 _dispatch_operation_perform(dispatch_operation_t op)
2185 {
2186 _dispatch_op_debug("perform", op);
2187 int err = _dispatch_io_get_error(op, NULL, true);
2188 if (err) {
2189 goto error;
2190 }
2191 _dispatch_object_debug(op, "%s", __func__);
2192 if (!op->buf) {
2193 size_t max_buf_siz = op->params.high;
2194 size_t chunk_siz = dispatch_io_defaults.chunk_size;
2195 if (op->direction == DOP_DIR_READ) {
2196 // If necessary, create a buffer for the ongoing operation, large
2197 // enough to fit chunk_size but at most the high-water mark
2198 size_t data_siz = dispatch_data_get_size(op->data);
2199 if (data_siz) {
2200 dispatch_assert(data_siz < max_buf_siz);
2201 max_buf_siz -= data_siz;
2202 }
2203 if (max_buf_siz > chunk_siz) {
2204 max_buf_siz = chunk_siz;
2205 }
2206 if (op->length < SIZE_MAX) {
2207 op->buf_siz = op->length - op->total;
2208 if (op->buf_siz > max_buf_siz) {
2209 op->buf_siz = max_buf_siz;
2210 }
2211 } else {
2212 op->buf_siz = max_buf_siz;
2213 }
2214 op->buf = valloc(op->buf_siz);
2215 _dispatch_op_debug("buffer allocated", op);
2216 } else if (op->direction == DOP_DIR_WRITE) {
2217 // Always write the first data piece; if it is smaller than a chunk,
2218 // accumulate further data pieces until the chunk size is reached
2219 if (chunk_siz > max_buf_siz) {
2220 chunk_siz = max_buf_siz;
2221 }
2222 op->buf_siz = 0;
2223 dispatch_data_apply(op->data,
2224 ^(dispatch_data_t region DISPATCH_UNUSED,
2225 size_t offset DISPATCH_UNUSED,
2226 const void* buf DISPATCH_UNUSED, size_t len) {
2227 size_t siz = op->buf_siz + len;
2228 if (!op->buf_siz || siz <= chunk_siz) {
2229 op->buf_siz = siz;
2230 }
2231 return (bool)(siz < chunk_siz);
2232 });
2233 if (op->buf_siz > max_buf_siz) {
2234 op->buf_siz = max_buf_siz;
2235 }
2236 dispatch_data_t d;
2237 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2238 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2239 NULL);
2240 _dispatch_io_data_release(d);
2241 _dispatch_op_debug("buffer mapped", op);
2242 }
2243 }
2244 if (op->fd_entry->fd == -1) {
2245 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2246 if (err) {
2247 goto error;
2248 }
2249 }
2250 void *buf = op->buf + op->buf_len;
2251 size_t len = op->buf_siz - op->buf_len;
2252 off_t off = (off_t)((size_t)op->offset + op->total);
2253 ssize_t processed = -1;
2254 syscall:
2255 if (op->direction == DOP_DIR_READ) {
2256 if (op->params.type == DISPATCH_IO_STREAM) {
2257 processed = read(op->fd_entry->fd, buf, len);
2258 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2259 processed = pread(op->fd_entry->fd, buf, len, off);
2260 }
2261 } else if (op->direction == DOP_DIR_WRITE) {
2262 if (op->params.type == DISPATCH_IO_STREAM) {
2263 processed = write(op->fd_entry->fd, buf, len);
2264 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2265 processed = pwrite(op->fd_entry->fd, buf, len, off);
2266 }
2267 }
2268 // Encountered an error on the file descriptor
2269 if (processed == -1) {
2270 err = errno;
2271 if (err == EINTR) {
2272 goto syscall;
2273 }
2274 goto error;
2275 }
2276 // EOF is indicated by two handler invocations
2277 if (processed == 0) {
2278 _dispatch_op_debug("performed: EOF", op);
2279 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2280 }
2281 op->buf_len += (size_t)processed;
2282 op->total += (size_t)processed;
2283 if (op->total == op->length) {
2284 // Finished processing all the bytes requested by the operation
2285 return DISPATCH_OP_COMPLETE;
2286 } else {
2287 // Deliver data only if we satisfy the filters
2288 return DISPATCH_OP_DELIVER;
2289 }
2290 error:
2291 if (err == EAGAIN || err == EWOULDBLOCK) {
2292 // For disk based files with blocking I/O we should never get EAGAIN
2293 dispatch_assert(!op->fd_entry->disk);
2294 _dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op);
2295 if (op->direction == DOP_DIR_READ && op->total &&
2296 op->channel == op->fd_entry->convenience_channel) {
2297 // Convenience read with available data completes on EAGAIN
2298 return DISPATCH_OP_COMPLETE_RESUME;
2299 }
2300 return DISPATCH_OP_RESUME;
2301 }
2302 _dispatch_op_debug("performed: err %d", op, err);
2303 op->err = err;
2304 switch (err) {
2305 case ECANCELED:
2306 return DISPATCH_OP_ERR;
2307 case EBADF:
2308 (void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2309 return DISPATCH_OP_FD_ERR;
2310 default:
2311 return DISPATCH_OP_COMPLETE;
2312 }
2313 }
2314
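/*
 * For reference (illustrative only, not part of this file's control flow):
 * the op->handler invoked by _dispatch_operation_deliver_data() below
 * corresponds to the io_handler block a client passes to the public
 * dispatch_io_read()/dispatch_io_write() API, roughly:
 *
 *   dispatch_io_read(channel, 0, SIZE_MAX, queue,
 *       ^(bool done, dispatch_data_t data, int error) {
 *           // partial deliveries arrive with done == false; the final
 *           // invocation has done == true (data may then be empty)
 *       });
 */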
2315 static void
2316 _dispatch_operation_deliver_data(dispatch_operation_t op,
2317 dispatch_op_flags_t flags)
2318 {
2319 // Called from either the stream or the pick queue, or when the op is finalized
2320 dispatch_data_t data = NULL;
2321 int err = 0;
2322 size_t undelivered = op->undelivered + op->buf_len;
2323 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2324 (op->flags & DOP_DELIVER);
2325 op->flags = DOP_DEFAULT;
2326 if (!deliver) {
2327 // Don't deliver data until low water mark has been reached
2328 if (undelivered >= op->params.low) {
2329 deliver = true;
2330 } else if (op->buf_len < op->buf_siz) {
2331 // Request buffer is not yet used up
2332 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2333 return;
2334 }
2335 } else {
2336 err = op->err;
2337 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2338 err = ECANCELED;
2339 op->err = err;
2340 }
2341 }
2342 // Deliver the data, or the request buffer has been used up
2343 if (op->direction == DOP_DIR_READ) {
2344 if (op->buf_len) {
2345 void *buf = op->buf;
2346 data = dispatch_data_create(buf, op->buf_len, NULL,
2347 DISPATCH_DATA_DESTRUCTOR_FREE);
2348 op->buf = NULL;
2349 op->buf_len = 0;
2350 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2351 _dispatch_io_data_release(op->data);
2352 _dispatch_io_data_release(data);
2353 data = d;
2354 } else {
2355 data = op->data;
2356 }
2357 op->data = deliver ? dispatch_data_empty : data;
2358 } else if (op->direction == DOP_DIR_WRITE) {
2359 if (deliver) {
2360 data = dispatch_data_create_subrange(op->data, op->buf_len,
2361 op->length);
2362 }
2363 if (op->buf_data && op->buf_len == op->buf_siz) {
2364 _dispatch_io_data_release(op->buf_data);
2365 op->buf_data = NULL;
2366 op->buf = NULL;
2367 op->buf_len = 0;
2368 // Trim newly written buffer from head of unwritten data
2369 dispatch_data_t d;
2370 if (deliver) {
2371 _dispatch_io_data_retain(data);
2372 d = data;
2373 } else {
2374 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2375 op->length);
2376 }
2377 _dispatch_io_data_release(op->data);
2378 op->data = d;
2379 }
2380 } else {
2381 dispatch_assert(op->direction < DOP_DIR_MAX);
2382 return;
2383 }
2384 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2385 op->undelivered = undelivered;
2386 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2387 return;
2388 }
2389 op->undelivered = 0;
2390 _dispatch_object_debug(op, "%s", __func__);
2391 _dispatch_op_debug("deliver data", op);
2392 dispatch_op_direction_t direction = op->direction;
2393 dispatch_io_handler_t handler = op->handler;
2394 dispatch_fd_entry_t fd_entry = op->fd_entry;
2395 _dispatch_fd_entry_retain(fd_entry);
2396 dispatch_io_t channel = op->channel;
2397 _dispatch_retain(channel);
2398 // Note that data delivery may occur after the operation is freed
2399 dispatch_async(op->op_q, ^{
2400 bool done = (flags & DOP_DONE);
2401 dispatch_data_t d = data;
2402 if (done) {
2403 if (direction == DOP_DIR_READ && err) {
2404 if (dispatch_data_get_size(d)) {
2405 _dispatch_op_debug("IO handler invoke", op);
2406 handler(false, d, 0);
2407 }
2408 d = NULL;
2409 } else if (direction == DOP_DIR_WRITE && !err) {
2410 d = NULL;
2411 }
2412 }
2413 _dispatch_op_debug("IO handler invoke: err %d", op, err);
2414 handler(done, d, err);
2415 _dispatch_release(channel);
2416 _dispatch_fd_entry_release(fd_entry);
2417 _dispatch_io_data_release(data);
2418 });
2419 }
2420
2421 #pragma mark -
2422 #pragma mark dispatch_io_debug
2423
2424 static size_t
2425 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2426 {
2427 dispatch_queue_t target = channel->do_targetq;
2428 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2429 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2430 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2431 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2432 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2433 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2434 channel->fd_entry, channel->queue, target && target->dq_label ?
2435 target->dq_label : "", target, channel->barrier_queue,
2436 channel->barrier_group, channel->err, channel->params.low,
2437 channel->params.high, channel->params.interval_flags &
2438 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2439 (unsigned long long) channel->params.interval);
2440 }
2441
2442 size_t
2443 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2444 {
2445 size_t offset = 0;
2446 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2447 dx_kind(channel), channel);
2448 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2449 bufsiz - offset);
2450 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2451 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2452 return offset;
2453 }
2454
2455 static size_t
2456 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2457 size_t bufsiz)
2458 {
2459 dispatch_queue_t target = op->do_targetq;
2460 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2461 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2462 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2463 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2464 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2465 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2466 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2467 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2468 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2469 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2470 target->dq_label : "", target, (long long)op->offset, op->length,
2471 op->total, op->undelivered + op->buf_len, op->flags, op->err,
2472 op->params.low, op->params.high, op->params.interval_flags &
2473 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2474 (unsigned long long)op->params.interval);
2475 }
2476
2477 size_t
2478 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2479 {
2480 size_t offset = 0;
2481 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2482 dx_kind(op), op);
2483 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2484 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2485 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2486 return offset;
2487 }