[apple/libdispatch.git] / src / io.c (libdispatch-703.1.4)
1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
28 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
29 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
30 #else
31 #define _dispatch_io_data_retain(x) dispatch_retain(x)
32 #define _dispatch_io_data_release(x) dispatch_release(x)
33 #endif
34
35 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
36
37 DISPATCH_EXPORT DISPATCH_NOTHROW
38 void _dispatch_iocntl(uint32_t param, uint64_t value);
39
40 static dispatch_operation_t _dispatch_operation_create(
41 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
42 size_t length, dispatch_data_t data, dispatch_queue_t queue,
43 dispatch_io_handler_t handler);
44 static void _dispatch_operation_enqueue(dispatch_operation_t op,
45 dispatch_op_direction_t direction, dispatch_data_t data);
46 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
47 dispatch_operation_t op);
48 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
49 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
50 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
51 dispatch_fd_entry_init_callback_t completion_callback);
52 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
53 uintptr_t hash);
54 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
55 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
56 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
57 dispatch_io_t channel);
58 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
59 dispatch_io_t channel);
60 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
61 dispatch_queue_t tq);
62 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
63 dispatch_op_direction_t direction);
64 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
65 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
66 dispatch_operation_t operation, dispatch_data_t data);
67 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
68 dispatch_operation_t operation, dispatch_data_t data);
69 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
70 dispatch_io_t channel);
71 static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
72 dispatch_io_t channel);
73 static void _dispatch_stream_source_handler(void *ctx);
74 static void _dispatch_stream_queue_handler(void *ctx);
75 static void _dispatch_stream_handler(void *ctx);
76 static void _dispatch_disk_handler(void *ctx);
77 static void _dispatch_disk_perform(void *ctxt);
78 static void _dispatch_operation_advise(dispatch_operation_t op,
79 size_t chunk_size);
80 static int _dispatch_operation_perform(dispatch_operation_t op);
81 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
82 dispatch_op_flags_t flags);
83
84 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
85 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
86 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
87 case EINTR: continue; \
88 __VA_ARGS__ \
89 } \
90 break; \
91 } while (1)
92 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
93 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
94 case 0: break; \
95 __VA_ARGS__ \
96 ); \
97 } while (0)
98 #define _dispatch_io_syscall(__syscall) do { int __err; \
99 _dispatch_io_syscall_switch(__err, __syscall); \
100 } while (0)
101
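/*
 * Illustrative usage of the syscall wrapper macros above (a sketch, not part
 * of this file). The switch form retries the call while it fails with EINTR,
 * sets the error variable to 0 or to errno, and lets the caller supply extra
 * cases; the names fd, st and handle_bad_fd below are hypothetical.
 *
 *     int err;
 *     struct stat st;
 *     _dispatch_io_syscall_switch(err,
 *         fstat(fd, &st),
 *         case EBADF: handle_bad_fd(); break;
 *         default: (void)dispatch_assume_zero(err); break;
 *     );
 *     _dispatch_io_syscall(close(fd)); // retry close() on EINTR, ignore result
 */
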
102 enum {
103 DISPATCH_OP_COMPLETE = 1,
104 DISPATCH_OP_DELIVER,
105 DISPATCH_OP_DELIVER_AND_COMPLETE,
106 DISPATCH_OP_COMPLETE_RESUME,
107 DISPATCH_OP_RESUME,
108 DISPATCH_OP_ERR,
109 DISPATCH_OP_FD_ERR,
110 };
111
112 #define _dispatch_io_Block_copy(x) \
113 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
114
115 #pragma mark -
116 #pragma mark dispatch_io_debug
117
118 #if DISPATCH_IO_DEBUG
119 #if !DISPATCH_DEBUG
120 #define _dispatch_io_log(x, ...) do { \
121 _dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
122 (void *)_dispatch_thread_self(), ##__VA_ARGS__); \
123 } while (0)
124 #ifdef _dispatch_object_debug
125 #undef _dispatch_object_debug
126 #define _dispatch_object_debug dispatch_debug
127 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
128 #endif
129 #else
130 #define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
131 #endif // DISPATCH_DEBUG
132 #else
133 #define _dispatch_io_log(x, ...)
134 #endif // DISPATCH_IO_DEBUG
135
136 #define _dispatch_fd_debug(msg, fd, ...) \
137 _dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
138 #define _dispatch_op_debug(msg, op, ...) \
139 _dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
140 #define _dispatch_channel_debug(msg, channel, ...) \
141 _dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
142 #define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
143 _dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
144 #define _dispatch_disk_debug(msg, disk, ...) \
145 _dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
146
147 #pragma mark -
148 #pragma mark dispatch_io_hashtables
149
150 // Global hashtable of dev_t -> disk_s mappings
151 DISPATCH_CACHELINE_ALIGN
152 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
153 // Global hashtable of fd -> fd_entry_s mappings
154 DISPATCH_CACHELINE_ALIGN
155 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
156
157 static dispatch_once_t _dispatch_io_devs_lockq_pred;
158 static dispatch_queue_t _dispatch_io_devs_lockq;
159 static dispatch_queue_t _dispatch_io_fds_lockq;
160
161 static char const * const _dispatch_io_key = "io";
162
163 static void
164 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
165 {
166 _dispatch_io_fds_lockq = dispatch_queue_create(
167 "com.apple.libdispatch-io.fd_lockq", NULL);
168 unsigned int i;
169 for (i = 0; i < DIO_HASH_SIZE; i++) {
170 TAILQ_INIT(&_dispatch_io_fds[i]);
171 }
172 }
173
174 static void
175 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
176 {
177 _dispatch_io_devs_lockq = dispatch_queue_create(
178 "com.apple.libdispatch-io.dev_lockq", NULL);
179 unsigned int i;
180 for (i = 0; i < DIO_HASH_SIZE; i++) {
181 TAILQ_INIT(&_dispatch_io_devs[i]);
182 }
183 }
184
185 #pragma mark -
186 #pragma mark dispatch_io_defaults
187
188 enum {
189 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
190 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
191 DISPATCH_IOCNTL_INITIAL_DELIVERY,
192 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
193 };
194
195 static struct dispatch_io_defaults_s {
196 size_t chunk_size, low_water_chunks, max_pending_io_reqs;
197 bool initial_delivery;
198 } dispatch_io_defaults = {
199 .chunk_size = DIO_MAX_CHUNK_SIZE,
200 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
201 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
202 };
203
204 #define _dispatch_iocntl_set_default(p, v) do { \
205 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
206 } while (0)
207
208 void
209 _dispatch_iocntl(uint32_t param, uint64_t value)
210 {
211 switch (param) {
212 case DISPATCH_IOCNTL_CHUNK_PAGES:
213 _dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
214 break;
215 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
216 _dispatch_iocntl_set_default(low_water_chunks, value);
217 break;
218 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
219 		_dispatch_iocntl_set_default(initial_delivery, value); break;
220 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
221 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
222 break;
223 }
224 }
225
226 #pragma mark -
227 #pragma mark dispatch_io_t
228
229 static dispatch_io_t
230 _dispatch_io_create(dispatch_io_type_t type)
231 {
232 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
233 sizeof(struct dispatch_io_s));
234 channel->do_next = DISPATCH_OBJECT_LISTLESS;
235 channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
236 true);
237 channel->params.type = type;
238 channel->params.high = SIZE_MAX;
239 channel->params.low = dispatch_io_defaults.low_water_chunks *
240 dispatch_io_defaults.chunk_size;
241 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
242 NULL);
243 return channel;
244 }
245
246 static void
247 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
248 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
249 {
250 // Enqueue the cleanup handler on the suspended close queue
251 if (cleanup_handler) {
252 _dispatch_retain(queue);
253 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
254 dispatch_async(queue, ^{
255 _dispatch_channel_debug("cleanup handler invoke: err %d",
256 channel, err);
257 cleanup_handler(err);
258 });
259 _dispatch_release(queue);
260 });
261 }
262 if (fd_entry) {
263 channel->fd_entry = fd_entry;
264 dispatch_retain(fd_entry->barrier_queue);
265 dispatch_retain(fd_entry->barrier_group);
266 channel->barrier_queue = fd_entry->barrier_queue;
267 channel->barrier_group = fd_entry->barrier_group;
268 } else {
269 // Still need to create a barrier queue, since all operations go
270 // through it
271 channel->barrier_queue = dispatch_queue_create(
272 "com.apple.libdispatch-io.barrierq", NULL);
273 channel->barrier_group = dispatch_group_create();
274 }
275 }
276
277 void
278 _dispatch_io_dispose(dispatch_io_t channel)
279 {
280 _dispatch_object_debug(channel, "%s", __func__);
281 if (channel->fd_entry &&
282 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
283 if (channel->fd_entry->path_data) {
284 // This modification is safe since path_data->channel is checked
285 // only on close_queue (which is still suspended at this point)
286 channel->fd_entry->path_data->channel = NULL;
287 }
288 // Cleanup handlers will only run when all channels related to this
289 // fd are complete
290 _dispatch_fd_entry_release(channel->fd_entry);
291 }
292 if (channel->queue) {
293 dispatch_release(channel->queue);
294 }
295 if (channel->barrier_queue) {
296 dispatch_release(channel->barrier_queue);
297 }
298 if (channel->barrier_group) {
299 dispatch_release(channel->barrier_group);
300 }
301 }
302
303 static int
304 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
305 {
306 int err = 0;
307 if (S_ISDIR(mode)) {
308 err = EISDIR;
309 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
310 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
311 err = ESPIPE;
312 }
313 return err;
314 }
315
316 static int
317 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
318 bool ignore_closed)
319 {
320 // On _any_ queue
321 int err;
322 if (op) {
323 channel = op->channel;
324 }
325 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
326 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
327 err = ECANCELED;
328 } else {
329 err = 0;
330 }
331 } else {
332 err = op ? op->fd_entry->err : channel->err;
333 }
334 return err;
335 }
336
337 #pragma mark -
338 #pragma mark dispatch_io_channels
339
340 dispatch_io_t
341 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
342 dispatch_queue_t queue, void (^cleanup_handler)(int))
343 {
344 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
345 return DISPATCH_BAD_INPUT;
346 }
347 dispatch_io_t channel = _dispatch_io_create(type);
348 channel->fd = fd;
349 _dispatch_channel_debug("create", channel);
350 channel->fd_actual = fd;
351 dispatch_suspend(channel->queue);
352 _dispatch_retain(queue);
353 _dispatch_retain(channel);
354 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
355 // On barrier queue
356 int err = fd_entry->err;
357 if (!err) {
358 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
359 }
360 if (!err && type == DISPATCH_IO_RANDOM) {
361 off_t f_ptr;
362 _dispatch_io_syscall_switch_noerr(err,
363 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
364 case 0: channel->f_ptr = f_ptr; break;
365 default: (void)dispatch_assume_zero(err); break;
366 );
367 }
368 channel->err = err;
369 _dispatch_fd_entry_retain(fd_entry);
370 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
371 dispatch_resume(channel->queue);
372 _dispatch_object_debug(channel, "%s", __func__);
373 _dispatch_release(channel);
374 _dispatch_release(queue);
375 });
376 _dispatch_object_debug(channel, "%s", __func__);
377 return channel;
378 }
379
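/*
 * Usage sketch (illustrative only, not part of this file): creating a stream
 * channel from an existing descriptor. The channel does not own the fd; the
 * cleanup handler runs once all I/O on the channel has completed, at which
 * point the caller may close it. pipe_fd and result_q are hypothetical names.
 *
 *     dispatch_queue_t result_q = dispatch_queue_create("example.result", NULL);
 *     dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, pipe_fd,
 *             result_q, ^(int error) {
 *         // safe to close pipe_fd here; the channel is done with it
 *         close(pipe_fd);
 *         if (error) fprintf(stderr, "channel error: %d\n", error);
 *     });
 */
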
380 dispatch_io_t
381 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
382 dispatch_queue_t queue, void *context,
383 void (*cleanup_handler)(void *context, int error))
384 {
385 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
386 ^(int error){ cleanup_handler(context, error); });
387 }
388
389 dispatch_io_t
390 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
391 int oflag, mode_t mode, dispatch_queue_t queue,
392 void (^cleanup_handler)(int error))
393 {
394 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
395 !(*path == '/')) {
396 return DISPATCH_BAD_INPUT;
397 }
398 size_t pathlen = strlen(path);
399 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
400 if (!path_data) {
401 return DISPATCH_OUT_OF_MEMORY;
402 }
403 dispatch_io_t channel = _dispatch_io_create(type);
404 channel->fd = -1;
405 _dispatch_channel_debug("create with path %s", channel, path);
406 channel->fd_actual = -1;
407 path_data->channel = channel;
408 path_data->oflag = oflag;
409 path_data->mode = mode;
410 path_data->pathlen = pathlen;
411 memcpy(path_data->path, path, pathlen + 1);
412 _dispatch_retain(queue);
413 _dispatch_retain(channel);
414 dispatch_async(channel->queue, ^{
415 int err = 0;
416 struct stat st;
417 _dispatch_io_syscall_switch_noerr(err,
418 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
419 #ifndef __linux__
420 || (path_data->oflag & O_SYMLINK) == O_SYMLINK
421 #endif
422 ? lstat(path_data->path, &st) : stat(path_data->path, &st),
423 case 0:
424 err = _dispatch_io_validate_type(channel, st.st_mode);
425 break;
426 default:
427 if ((path_data->oflag & O_CREAT) &&
428 (*(path_data->path + path_data->pathlen - 1) != '/')) {
429 // Check parent directory
430 char *c = strrchr(path_data->path, '/');
431 dispatch_assert(c);
432 *c = 0;
433 int perr;
434 _dispatch_io_syscall_switch_noerr(perr,
435 stat(path_data->path, &st),
436 case 0:
437 // Since the parent directory exists, open() will
438 // create a regular file after the fd_entry has
439 // been filled in
440 st.st_mode = S_IFREG;
441 err = 0;
442 break;
443 );
444 *c = '/';
445 }
446 break;
447 );
448 channel->err = err;
449 if (err) {
450 free(path_data);
451 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
452 _dispatch_release(channel);
453 _dispatch_release(queue);
454 return;
455 }
456 dispatch_suspend(channel->queue);
457 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
458 _dispatch_io_devs_lockq_init);
459 dispatch_async(_dispatch_io_devs_lockq, ^{
460 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
461 path_data, st.st_dev, st.st_mode);
462 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
463 dispatch_resume(channel->queue);
464 _dispatch_object_debug(channel, "%s", __func__);
465 _dispatch_release(channel);
466 _dispatch_release(queue);
467 });
468 });
469 _dispatch_object_debug(channel, "%s", __func__);
470 return channel;
471 }
472
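/*
 * Usage sketch (illustrative only): a random-access channel created from an
 * absolute path. The channel opens the file lazily and closes the descriptor
 * itself when the last channel using it goes away, so there is no fd for the
 * caller to manage. "/tmp/example.dat" and result_q are hypothetical.
 *
 *     dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *             "/tmp/example.dat", O_RDONLY, 0, result_q, ^(int error) {
 *         if (error) fprintf(stderr, "open/cleanup error: %d\n", error);
 *     });
 */
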
473 dispatch_io_t
474 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
475 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
476 void (*cleanup_handler)(void *context, int error))
477 {
478 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
479 !cleanup_handler ? NULL :
480 ^(int error){ cleanup_handler(context, error); });
481 }
482
483 dispatch_io_t
484 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
485 dispatch_queue_t queue, void (^cleanup_handler)(int error))
486 {
487 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
488 return DISPATCH_BAD_INPUT;
489 }
490 dispatch_io_t channel = _dispatch_io_create(type);
491 _dispatch_channel_debug("create with channel %p", channel, in_channel);
492 dispatch_suspend(channel->queue);
493 _dispatch_retain(queue);
494 _dispatch_retain(channel);
495 _dispatch_retain(in_channel);
496 dispatch_async(in_channel->queue, ^{
497 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
498 if (err0) {
499 channel->err = err0;
500 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
501 dispatch_resume(channel->queue);
502 _dispatch_release(channel);
503 _dispatch_release(in_channel);
504 _dispatch_release(queue);
505 return;
506 }
507 dispatch_async(in_channel->barrier_queue, ^{
508 int err = _dispatch_io_get_error(NULL, in_channel, false);
509 // If there is no error, the fd_entry for the in_channel is valid.
510 // Since we are running on in_channel's queue, the fd_entry has been
511 // fully resolved and will stay valid for the duration of this block
512 if (!err) {
513 err = in_channel->err;
514 if (!err) {
515 err = in_channel->fd_entry->err;
516 }
517 }
518 if (!err) {
519 err = _dispatch_io_validate_type(channel,
520 in_channel->fd_entry->stat.mode);
521 }
522 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
523 off_t f_ptr;
524 _dispatch_io_syscall_switch_noerr(err,
525 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
526 case 0: channel->f_ptr = f_ptr; break;
527 default: (void)dispatch_assume_zero(err); break;
528 );
529 }
530 channel->err = err;
531 if (err) {
532 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
533 dispatch_resume(channel->queue);
534 _dispatch_release(channel);
535 _dispatch_release(in_channel);
536 _dispatch_release(queue);
537 return;
538 }
539 if (in_channel->fd == -1) {
540 // in_channel was created from path
541 channel->fd = -1;
542 channel->fd_actual = -1;
543 mode_t mode = in_channel->fd_entry->stat.mode;
544 dev_t dev = in_channel->fd_entry->stat.dev;
545 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
546 in_channel->fd_entry->path_data->pathlen + 1;
547 dispatch_io_path_data_t path_data = malloc(path_data_len);
548 memcpy(path_data, in_channel->fd_entry->path_data,
549 path_data_len);
550 path_data->channel = channel;
551 				// _dispatch_io_devs_lockq is known to already exist
552 dispatch_async(_dispatch_io_devs_lockq, ^{
553 dispatch_fd_entry_t fd_entry;
554 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
555 dev, mode);
556 _dispatch_io_init(channel, fd_entry, queue, 0,
557 cleanup_handler);
558 dispatch_resume(channel->queue);
559 _dispatch_release(channel);
560 _dispatch_release(queue);
561 });
562 } else {
563 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
564 channel->fd = in_channel->fd;
565 channel->fd_actual = in_channel->fd_actual;
566 _dispatch_fd_entry_retain(fd_entry);
567 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
568 dispatch_resume(channel->queue);
569 _dispatch_release(channel);
570 _dispatch_release(queue);
571 }
572 _dispatch_release(in_channel);
573 _dispatch_object_debug(channel, "%s", __func__);
574 });
575 });
576 _dispatch_object_debug(channel, "%s", __func__);
577 return channel;
578 }
579
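/*
 * Usage sketch (illustrative only): deriving a channel with a different
 * access type from an existing one. If the source channel wraps a file
 * descriptor, the derived channel shares its fd_entry and barrier queue; a
 * path-based source gets a fresh entry for the same path (as implemented
 * above). stream_ch and result_q are hypothetical.
 *
 *     dispatch_io_t random_ch = dispatch_io_create_with_io(DISPATCH_IO_RANDOM,
 *             stream_ch, result_q, ^(int error) {
 *         if (error) fprintf(stderr, "derived channel error: %d\n", error);
 *     });
 */
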
580 dispatch_io_t
581 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
582 dispatch_queue_t queue, void *context,
583 void (*cleanup_handler)(void *context, int error))
584 {
585 return dispatch_io_create_with_io(type, in_channel, queue,
586 !cleanup_handler ? NULL :
587 ^(int error){ cleanup_handler(context, error); });
588 }
589
590 #pragma mark -
591 #pragma mark dispatch_io_accessors
592
593 void
594 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
595 {
596 _dispatch_retain(channel);
597 dispatch_async(channel->queue, ^{
598 _dispatch_channel_debug("set high water: %zu", channel, high_water);
599 if (channel->params.low > high_water) {
600 channel->params.low = high_water;
601 }
602 channel->params.high = high_water ? high_water : 1;
603 _dispatch_release(channel);
604 });
605 }
606
607 void
608 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
609 {
610 _dispatch_retain(channel);
611 dispatch_async(channel->queue, ^{
612 _dispatch_channel_debug("set low water: %zu", channel, low_water);
613 if (channel->params.high < low_water) {
614 channel->params.high = low_water ? low_water : 1;
615 }
616 channel->params.low = low_water;
617 _dispatch_release(channel);
618 });
619 }
620
621 void
622 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
623 unsigned long flags)
624 {
625 _dispatch_retain(channel);
626 dispatch_async(channel->queue, ^{
627 _dispatch_channel_debug("set interval: %llu", channel, interval);
628 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
629 channel->params.interval_flags = flags;
630 _dispatch_release(channel);
631 });
632 }
633
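/*
 * Usage sketch (illustrative only): tuning delivery. With the settings below,
 * read handlers are not invoked until roughly 4 KiB is buffered (except at
 * EOF or on the interval), are handed at most about 1 MiB per invocation, and
 * fire at least every 100 ms even below the low-water mark because of the
 * strict-interval flag. ch is a hypothetical channel.
 *
 *     dispatch_io_set_low_water(ch, 4 * 1024);
 *     dispatch_io_set_high_water(ch, 1024 * 1024);
 *     dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *             DISPATCH_IO_STRICT_INTERVAL);
 */
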
634 void
635 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
636 {
637 _dispatch_retain(dq);
638 _dispatch_retain(channel);
639 dispatch_async(channel->queue, ^{
640 dispatch_queue_t prev_dq = channel->do_targetq;
641 channel->do_targetq = dq;
642 _dispatch_release(prev_dq);
643 _dispatch_object_debug(channel, "%s", __func__);
644 _dispatch_release(channel);
645 });
646 }
647
648 dispatch_fd_t
649 dispatch_io_get_descriptor(dispatch_io_t channel)
650 {
651 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
652 return -1;
653 }
654 dispatch_fd_t fd = channel->fd_actual;
655 if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
656 dispatch_thread_context_t ctxt =
657 _dispatch_thread_context_find(_dispatch_io_key);
658 if (ctxt && ctxt->dtc_io_in_barrier == channel) {
659 (void)_dispatch_fd_entry_open(channel->fd_entry, channel);
660 }
661 }
662 return channel->fd_actual;
663 }
664
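/*
 * Usage sketch (illustrative only): dispatch_io_get_descriptor() only
 * materializes the fd of a path-based channel when called from inside a
 * barrier on that same channel (see the dtc_io_in_barrier check above), so
 * descriptor-level work is typically wrapped in dispatch_io_barrier().
 * ch is a hypothetical channel.
 *
 *     dispatch_io_barrier(ch, ^{
 *         dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *         if (fd != -1) {
 *             (void)fsync(fd); // no channel I/O is in flight here
 *         }
 *     });
 */
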
665 #pragma mark -
666 #pragma mark dispatch_io_operations
667
668 static void
669 _dispatch_io_stop(dispatch_io_t channel)
670 {
671 _dispatch_channel_debug("stop", channel);
672 (void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
673 _dispatch_retain(channel);
674 dispatch_async(channel->queue, ^{
675 dispatch_async(channel->barrier_queue, ^{
676 _dispatch_object_debug(channel, "%s", __func__);
677 dispatch_fd_entry_t fd_entry = channel->fd_entry;
678 if (fd_entry) {
679 _dispatch_channel_debug("stop cleanup", channel);
680 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
681 if (!(channel->atomic_flags & DIO_CLOSED)) {
682 channel->fd_entry = NULL;
683 _dispatch_fd_entry_release(fd_entry);
684 }
685 } else if (channel->fd != -1) {
686 // Stop after close, need to check if fd_entry still exists
687 _dispatch_retain(channel);
688 dispatch_async(_dispatch_io_fds_lockq, ^{
689 _dispatch_object_debug(channel, "%s", __func__);
690 _dispatch_channel_debug("stop cleanup after close",
691 channel);
692 dispatch_fd_entry_t fdi;
693 uintptr_t hash = DIO_HASH(channel->fd);
694 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
695 if (fdi->fd == channel->fd) {
696 _dispatch_fd_entry_cleanup_operations(fdi, channel);
697 break;
698 }
699 }
700 _dispatch_release(channel);
701 });
702 }
703 _dispatch_release(channel);
704 });
705 });
706 }
707
708 void
709 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
710 {
711 if (flags & DISPATCH_IO_STOP) {
712 // Don't stop an already stopped channel
713 if (channel->atomic_flags & DIO_STOPPED) {
714 return;
715 }
716 return _dispatch_io_stop(channel);
717 }
718 // Don't close an already closed or stopped channel
719 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
720 return;
721 }
722 _dispatch_retain(channel);
723 dispatch_async(channel->queue, ^{
724 dispatch_async(channel->barrier_queue, ^{
725 _dispatch_object_debug(channel, "%s", __func__);
726 _dispatch_channel_debug("close", channel);
727 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
728 (void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
729 relaxed);
730 dispatch_fd_entry_t fd_entry = channel->fd_entry;
731 if (fd_entry) {
732 if (!fd_entry->path_data) {
733 channel->fd_entry = NULL;
734 }
735 _dispatch_fd_entry_release(fd_entry);
736 }
737 }
738 _dispatch_release(channel);
739 });
740 });
741 }
742
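/*
 * Usage sketch (illustrative only): closing a channel. A plain close lets
 * already-enqueued operations finish; DISPATCH_IO_STOP additionally
 * interrupts outstanding operations, whose handlers then receive ECANCELED.
 * ch is a hypothetical channel.
 *
 *     dispatch_io_close(ch, 0);                // graceful: drain pending I/O
 *     dispatch_io_close(ch, DISPATCH_IO_STOP); // abort outstanding operations
 */
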
743 void
744 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
745 {
746 _dispatch_retain(channel);
747 dispatch_async(channel->queue, ^{
748 dispatch_queue_t io_q = channel->do_targetq;
749 dispatch_queue_t barrier_queue = channel->barrier_queue;
750 dispatch_group_t barrier_group = channel->barrier_group;
751 dispatch_async(barrier_queue, ^{
752 dispatch_suspend(barrier_queue);
753 dispatch_group_notify(barrier_group, io_q, ^{
754 dispatch_thread_context_s io_ctxt = {
755 .dtc_key = _dispatch_io_key,
756 .dtc_io_in_barrier = channel,
757 };
758
759 _dispatch_object_debug(channel, "%s", __func__);
760 _dispatch_thread_context_push(&io_ctxt);
761 barrier();
762 _dispatch_thread_context_pop(&io_ctxt);
763 dispatch_resume(barrier_queue);
764 _dispatch_release(channel);
765 });
766 });
767 });
768 }
769
770 void
771 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
772 dispatch_function_t barrier)
773 {
774 return dispatch_io_barrier(channel, ^{ barrier(context); });
775 }
776
777 void
778 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
779 dispatch_queue_t queue, dispatch_io_handler_t handler)
780 {
781 _dispatch_retain(channel);
782 _dispatch_retain(queue);
783 dispatch_async(channel->queue, ^{
784 dispatch_operation_t op;
785 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
786 length, dispatch_data_empty, queue, handler);
787 if (op) {
788 dispatch_queue_t barrier_q = channel->barrier_queue;
789 dispatch_async(barrier_q, ^{
790 _dispatch_operation_enqueue(op, DOP_DIR_READ,
791 dispatch_data_empty);
792 });
793 }
794 _dispatch_release(channel);
795 _dispatch_release(queue);
796 });
797 }
798
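/*
 * Usage sketch (illustrative only): reading until EOF. The handler may be
 * invoked many times with partial data; done is true on the final delivery,
 * where a nonzero error indicates failure. ch, handler_q and accumulated are
 * hypothetical; includes and error handling are elided.
 *
 *     __block dispatch_data_t accumulated = dispatch_data_empty;
 *     dispatch_io_read(ch, 0, SIZE_MAX, handler_q,
 *             ^(bool done, dispatch_data_t data, int error) {
 *         if (data) {
 *             dispatch_data_t t = dispatch_data_create_concat(accumulated, data);
 *             dispatch_release(accumulated);
 *             accumulated = t;
 *         }
 *         if (done) {
 *             if (error) fprintf(stderr, "read error: %d\n", error);
 *             // consume accumulated, then dispatch_release(accumulated)
 *         }
 *     });
 */
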
799 void
800 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
801 dispatch_queue_t queue, void *context,
802 dispatch_io_handler_function_t handler)
803 {
804 return dispatch_io_read(channel, offset, length, queue,
805 ^(bool done, dispatch_data_t d, int error){
806 handler(context, done, d, error);
807 });
808 }
809
810 void
811 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
812 dispatch_queue_t queue, dispatch_io_handler_t handler)
813 {
814 _dispatch_io_data_retain(data);
815 _dispatch_retain(channel);
816 _dispatch_retain(queue);
817 dispatch_async(channel->queue, ^{
818 dispatch_operation_t op;
819 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
820 dispatch_data_get_size(data), data, queue, handler);
821 if (op) {
822 dispatch_queue_t barrier_q = channel->barrier_queue;
823 dispatch_async(barrier_q, ^{
824 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
825 _dispatch_io_data_release(data);
826 });
827 } else {
828 _dispatch_io_data_release(data);
829 }
830 _dispatch_release(channel);
831 _dispatch_release(queue);
832 });
833 }
834
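/*
 * Usage sketch (illustrative only): writing a buffer. The data argument
 * passed back to the handler holds the not-yet-written remainder, so on a
 * failed final delivery it can be used to retry. ch, handler_q, buf and
 * buf_len are hypothetical.
 *
 *     dispatch_data_t payload = dispatch_data_create(buf, buf_len,
 *             handler_q, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *     dispatch_io_write(ch, 0, payload, handler_q,
 *             ^(bool done, dispatch_data_t remaining, int error) {
 *         if (done && error) {
 *             size_t left = remaining ? dispatch_data_get_size(remaining) : 0;
 *             fprintf(stderr, "write error %d, %zu bytes unwritten\n",
 *                     error, left);
 *         }
 *     });
 *     dispatch_release(payload); // the operation holds its own reference
 */
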
835 void
836 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
837 dispatch_queue_t queue, void *context,
838 dispatch_io_handler_function_t handler)
839 {
840 return dispatch_io_write(channel, offset, data, queue,
841 ^(bool done, dispatch_data_t d, int error){
842 handler(context, done, d, error);
843 });
844 }
845
846 void
847 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
848 void (^handler)(dispatch_data_t, int))
849 {
850 _dispatch_retain(queue);
851 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
852 // On barrier queue
853 if (fd_entry->err) {
854 int err = fd_entry->err;
855 dispatch_async(queue, ^{
856 _dispatch_fd_debug("convenience handler invoke", fd);
857 handler(dispatch_data_empty, err);
858 });
859 _dispatch_release(queue);
860 return;
861 }
862 // Safe to access fd_entry on barrier queue
863 dispatch_io_t channel = fd_entry->convenience_channel;
864 if (!channel) {
865 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
866 channel->fd = fd;
867 channel->fd_actual = fd;
868 channel->fd_entry = fd_entry;
869 dispatch_retain(fd_entry->barrier_queue);
870 dispatch_retain(fd_entry->barrier_group);
871 channel->barrier_queue = fd_entry->barrier_queue;
872 channel->barrier_group = fd_entry->barrier_group;
873 fd_entry->convenience_channel = channel;
874 }
875 __block dispatch_data_t deliver_data = dispatch_data_empty;
876 __block int err = 0;
877 dispatch_async(fd_entry->close_queue, ^{
878 dispatch_async(queue, ^{
879 _dispatch_fd_debug("convenience handler invoke", fd);
880 handler(deliver_data, err);
881 _dispatch_io_data_release(deliver_data);
882 });
883 _dispatch_release(queue);
884 });
885 dispatch_operation_t op =
886 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
887 length, dispatch_data_empty,
888 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
889 ^(bool done, dispatch_data_t data, int error) {
890 if (data) {
891 data = dispatch_data_create_concat(deliver_data, data);
892 _dispatch_io_data_release(deliver_data);
893 deliver_data = data;
894 }
895 if (done) {
896 err = error;
897 }
898 });
899 if (op) {
900 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
901 }
902 });
903 }
904
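/*
 * Usage sketch (illustrative only): the fd-based convenience read. The
 * handler is called exactly once with everything read (up to length bytes or
 * EOF); the descriptor must be left alone until then and remains owned by
 * the caller. read_fd and handler_q are hypothetical.
 *
 *     dispatch_read(read_fd, SIZE_MAX, handler_q,
 *             ^(dispatch_data_t data, int error) {
 *         if (error) {
 *             fprintf(stderr, "read failed: %d\n", error);
 *             return;
 *         }
 *         dispatch_data_apply(data, ^bool(dispatch_data_t region,
 *                 size_t offset, const void *buffer, size_t size) {
 *             // process each contiguous region of the result
 *             fwrite(buffer, 1, size, stdout);
 *             return true;
 *         });
 *     });
 */
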
905 void
906 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
907 void *context, void (*handler)(void *, dispatch_data_t, int))
908 {
909 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
910 handler(context, d, error);
911 });
912 }
913
914 void
915 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
916 void (^handler)(dispatch_data_t, int))
917 {
918 _dispatch_io_data_retain(data);
919 _dispatch_retain(queue);
920 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
921 // On barrier queue
922 if (fd_entry->err) {
923 int err = fd_entry->err;
924 dispatch_async(queue, ^{
925 _dispatch_fd_debug("convenience handler invoke", fd);
926 handler(NULL, err);
927 });
928 _dispatch_release(queue);
929 return;
930 }
931 // Safe to access fd_entry on barrier queue
932 dispatch_io_t channel = fd_entry->convenience_channel;
933 if (!channel) {
934 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
935 channel->fd = fd;
936 channel->fd_actual = fd;
937 channel->fd_entry = fd_entry;
938 dispatch_retain(fd_entry->barrier_queue);
939 dispatch_retain(fd_entry->barrier_group);
940 channel->barrier_queue = fd_entry->barrier_queue;
941 channel->barrier_group = fd_entry->barrier_group;
942 fd_entry->convenience_channel = channel;
943 }
944 __block dispatch_data_t deliver_data = NULL;
945 __block int err = 0;
946 dispatch_async(fd_entry->close_queue, ^{
947 dispatch_async(queue, ^{
948 _dispatch_fd_debug("convenience handler invoke", fd);
949 handler(deliver_data, err);
950 if (deliver_data) {
951 _dispatch_io_data_release(deliver_data);
952 }
953 });
954 _dispatch_release(queue);
955 });
956 dispatch_operation_t op =
957 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
958 dispatch_data_get_size(data), data,
959 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
960 ^(bool done, dispatch_data_t d, int error) {
961 if (done) {
962 if (d) {
963 _dispatch_io_data_retain(d);
964 deliver_data = d;
965 }
966 err = error;
967 }
968 });
969 if (op) {
970 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
971 }
972 _dispatch_io_data_release(data);
973 });
974 }
975
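/*
 * Usage sketch (illustrative only): the fd-based convenience write. On
 * success the handler's data argument is NULL; on failure it holds the
 * unwritten remainder. write_fd, handler_q and msg are hypothetical.
 *
 *     const char *msg = "hello\n";
 *     dispatch_data_t payload = dispatch_data_create(msg, strlen(msg),
 *             handler_q, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *     dispatch_write(write_fd, payload, handler_q,
 *             ^(dispatch_data_t unwritten, int error) {
 *         if (error) {
 *             fprintf(stderr, "write failed: %d (%zu bytes left)\n", error,
 *                     unwritten ? dispatch_data_get_size(unwritten) : 0);
 *         }
 *     });
 *     dispatch_release(payload);
 */
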
976 void
977 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
978 void *context, void (*handler)(void *, dispatch_data_t, int))
979 {
980 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
981 handler(context, d, error);
982 });
983 }
984
985 #pragma mark -
986 #pragma mark dispatch_operation_t
987
988 static dispatch_operation_t
989 _dispatch_operation_create(dispatch_op_direction_t direction,
990 dispatch_io_t channel, off_t offset, size_t length,
991 dispatch_data_t data, dispatch_queue_t queue,
992 dispatch_io_handler_t handler)
993 {
994 // On channel queue
995 dispatch_assert(direction < DOP_DIR_MAX);
996 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
997 // that can only be NULL if atomic_flags are set rdar://problem/8362514
998 int err = _dispatch_io_get_error(NULL, channel, false);
999 if (err || !length) {
1000 _dispatch_io_data_retain(data);
1001 _dispatch_retain(queue);
1002 dispatch_async(channel->barrier_queue, ^{
1003 dispatch_async(queue, ^{
1004 dispatch_data_t d = data;
1005 if (direction == DOP_DIR_READ && err) {
1006 d = NULL;
1007 } else if (direction == DOP_DIR_WRITE && !err) {
1008 d = NULL;
1009 }
1010 _dispatch_channel_debug("IO handler invoke: err %d", channel,
1011 err);
1012 handler(true, d, err);
1013 _dispatch_io_data_release(data);
1014 });
1015 _dispatch_release(queue);
1016 });
1017 return NULL;
1018 }
1019 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
1020 sizeof(struct dispatch_operation_s));
1021 _dispatch_channel_debug("operation create: %p", channel, op);
1022 op->do_next = DISPATCH_OBJECT_LISTLESS;
1023 op->do_xref_cnt = -1; // operation object is not exposed externally
1024 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
1025 op->op_q->do_targetq = queue;
1026 _dispatch_retain(queue);
1027 op->active = false;
1028 op->direction = direction;
1029 op->offset = offset + channel->f_ptr;
1030 op->length = length;
1031 op->handler = _dispatch_io_Block_copy(handler);
1032 _dispatch_retain(channel);
1033 op->channel = channel;
1034 op->params = channel->params;
1035 // Take a snapshot of the priority of the channel queue. The actual I/O
1036 // for this operation will be performed at this priority
1037 dispatch_queue_t targetq = op->channel->do_targetq;
1038 while (fastpath(targetq->do_targetq)) {
1039 targetq = targetq->do_targetq;
1040 }
1041 op->do_targetq = targetq;
1042 _dispatch_object_debug(op, "%s", __func__);
1043 return op;
1044 }
1045
1046 void
1047 _dispatch_operation_dispose(dispatch_operation_t op)
1048 {
1049 _dispatch_object_debug(op, "%s", __func__);
1050 _dispatch_op_debug("dispose", op);
1051 // Deliver the data if there's any
1052 if (op->fd_entry) {
1053 _dispatch_operation_deliver_data(op, DOP_DONE);
1054 dispatch_group_leave(op->fd_entry->barrier_group);
1055 _dispatch_fd_entry_release(op->fd_entry);
1056 }
1057 if (op->channel) {
1058 _dispatch_release(op->channel);
1059 }
1060 if (op->timer) {
1061 dispatch_release(op->timer);
1062 }
1063 // For write operations, op->buf is owned by op->buf_data
1064 if (op->buf && op->direction == DOP_DIR_READ) {
1065 free(op->buf);
1066 }
1067 if (op->buf_data) {
1068 _dispatch_io_data_release(op->buf_data);
1069 }
1070 if (op->data) {
1071 _dispatch_io_data_release(op->data);
1072 }
1073 if (op->op_q) {
1074 dispatch_release(op->op_q);
1075 }
1076 Block_release(op->handler);
1077 _dispatch_op_debug("disposed", op);
1078 }
1079
1080 static void
1081 _dispatch_operation_enqueue(dispatch_operation_t op,
1082 dispatch_op_direction_t direction, dispatch_data_t data)
1083 {
1084 // Called from the barrier queue
1085 _dispatch_io_data_retain(data);
1086 // If channel is closed or stopped, then call the handler immediately
1087 int err = _dispatch_io_get_error(NULL, op->channel, false);
1088 if (err) {
1089 dispatch_io_handler_t handler = op->handler;
1090 dispatch_async(op->op_q, ^{
1091 dispatch_data_t d = data;
1092 if (direction == DOP_DIR_READ && err) {
1093 d = NULL;
1094 } else if (direction == DOP_DIR_WRITE && !err) {
1095 d = NULL;
1096 }
1097 handler(true, d, err);
1098 _dispatch_io_data_release(data);
1099 });
1100 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1101 _dispatch_release(op);
1102 return;
1103 }
1104 // Finish operation init
1105 op->fd_entry = op->channel->fd_entry;
1106 _dispatch_fd_entry_retain(op->fd_entry);
1107 dispatch_group_enter(op->fd_entry->barrier_group);
1108 dispatch_disk_t disk = op->fd_entry->disk;
1109 if (!disk) {
1110 dispatch_stream_t stream = op->fd_entry->streams[direction];
1111 dispatch_async(stream->dq, ^{
1112 _dispatch_stream_enqueue_operation(stream, op, data);
1113 _dispatch_io_data_release(data);
1114 });
1115 } else {
1116 dispatch_async(disk->pick_queue, ^{
1117 _dispatch_disk_enqueue_operation(disk, op, data);
1118 _dispatch_io_data_release(data);
1119 });
1120 }
1121 }
1122
1123 static bool
1124 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1125 dispatch_queue_t tq, dispatch_data_t data)
1126 {
1127 // On stream queue or disk queue
1128 _dispatch_op_debug("enqueue", op);
1129 _dispatch_io_data_retain(data);
1130 op->data = data;
1131 int err = _dispatch_io_get_error(op, NULL, true);
1132 if (err) {
1133 op->err = err;
1134 // Final release
1135 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1136 _dispatch_release(op);
1137 return false;
1138 }
1139 if (op->params.interval) {
1140 dispatch_resume(_dispatch_operation_timer(tq, op));
1141 }
1142 return true;
1143 }
1144
1145 static dispatch_source_t
1146 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1147 {
1148 // On stream queue or pick queue
1149 if (op->timer) {
1150 return op->timer;
1151 }
1152 dispatch_source_t timer = dispatch_source_create(
1153 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1154 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1155 (int64_t)op->params.interval), op->params.interval, 0);
1156 dispatch_source_set_event_handler(timer, ^{
1157 // On stream queue or pick queue
1158 if (dispatch_source_testcancel(timer)) {
1159 // Do nothing. The operation has already completed
1160 return;
1161 }
1162 dispatch_op_flags_t flags = DOP_DEFAULT;
1163 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1164 // Deliver even if there is less data than the low-water mark
1165 flags |= DOP_DELIVER;
1166 }
1167 		// If the operation is active, don't deliver data
1168 if ((op->active) && (flags & DOP_DELIVER)) {
1169 op->flags = flags;
1170 } else {
1171 _dispatch_operation_deliver_data(op, flags);
1172 }
1173 });
1174 op->timer = timer;
1175 return op->timer;
1176 }
1177
1178 #pragma mark -
1179 #pragma mark dispatch_fd_entry_t
1180
1181 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1182 static void
1183 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1184 {
1185 	guardid_t guard = (uintptr_t)fd_entry;
1186 const unsigned int guard_flags = GUARD_CLOSE;
1187 int err, fd_flags = 0;
1188 _dispatch_io_syscall_switch_noerr(err,
1189 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1190 &fd_flags),
1191 case 0:
1192 fd_entry->guard_flags = guard_flags;
1193 fd_entry->orig_fd_flags = fd_flags;
1194 break;
1195 case EPERM: break;
1196 default: (void)dispatch_assume_zero(err); break;
1197 );
1198 }
1199
1200 static void
1201 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1202 {
1203 if (!fd_entry->guard_flags) {
1204 return;
1205 }
1206 	guardid_t guard = (uintptr_t)fd_entry;
1207 int err, fd_flags = fd_entry->orig_fd_flags;
1208 _dispatch_io_syscall_switch(err,
1209 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1210 &fd_flags),
1211 default: (void)dispatch_assume_zero(err); break;
1212 );
1213 }
1214 #else
1215 static inline void
1216 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1217 static inline void
1218 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1219 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1220
1221 static inline int
1222 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1223 int oflag, mode_t mode) {
1224 #if DISPATCH_USE_GUARDED_FD
1225 guardid_t guard = (uintptr_t)fd_entry;
1226 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1227 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1228 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1229 mode);
1230 if (fd != -1) {
1231 fd_entry->guard_flags = guard_flags;
1232 return fd;
1233 }
1234 errno = 0;
1235 #endif
1236 return open(path, oflag, mode);
1237 (void)fd_entry;
1238 }
1239
1240 static inline int
1241 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1242 #if DISPATCH_USE_GUARDED_FD
1243 if (fd_entry->guard_flags) {
1244 guardid_t guard = (uintptr_t)fd_entry;
1245 return guarded_close_np(fd, &guard);
1246 } else
1247 #endif
1248 {
1249 return close(fd);
1250 }
1251 (void)fd_entry;
1252 }
1253
1254 static inline void
1255 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1256 dispatch_suspend(fd_entry->close_queue);
1257 }
1258
1259 static inline void
1260 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1261 dispatch_resume(fd_entry->close_queue);
1262 }
1263
1264 static void
1265 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1266 dispatch_fd_entry_init_callback_t completion_callback)
1267 {
1268 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1269 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1270 _dispatch_io_fds_lockq_init);
1271 dispatch_async(_dispatch_io_fds_lockq, ^{
1272 dispatch_fd_entry_t fd_entry = NULL;
1273 // Check to see if there is an existing entry for the given fd
1274 uintptr_t hash = DIO_HASH(fd);
1275 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1276 if (fd_entry->fd == fd) {
1277 // Retain the fd_entry to ensure it cannot go away until the
1278 // stat() has completed
1279 _dispatch_fd_entry_retain(fd_entry);
1280 break;
1281 }
1282 }
1283 if (!fd_entry) {
1284 // If we did not find an existing entry, create one
1285 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1286 }
1287 _dispatch_fd_entry_debug("init", fd_entry);
1288 dispatch_async(fd_entry->barrier_queue, ^{
1289 _dispatch_fd_entry_debug("init completion", fd_entry);
1290 completion_callback(fd_entry);
1291 // stat() is complete, release reference to fd_entry
1292 _dispatch_fd_entry_release(fd_entry);
1293 });
1294 });
1295 }
1296
1297 static dispatch_fd_entry_t
1298 _dispatch_fd_entry_create(dispatch_queue_t q)
1299 {
1300 dispatch_fd_entry_t fd_entry;
1301 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1302 fd_entry->close_queue = dispatch_queue_create(
1303 "com.apple.libdispatch-io.closeq", NULL);
1304 // Use target queue to ensure that no concurrent lookups are going on when
1305 // the close queue is running
1306 fd_entry->close_queue->do_targetq = q;
1307 _dispatch_retain(q);
1308 // Suspend the cleanup queue until closing
1309 _dispatch_fd_entry_retain(fd_entry);
1310 return fd_entry;
1311 }
1312
1313 static dispatch_fd_entry_t
1314 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1315 {
1316 // On fds lock queue
1317 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1318 _dispatch_io_fds_lockq);
1319 _dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
1320 fd_entry->fd = fd;
1321 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1322 fd_entry->barrier_queue = dispatch_queue_create(
1323 "com.apple.libdispatch-io.barrierq", NULL);
1324 fd_entry->barrier_group = dispatch_group_create();
1325 dispatch_async(fd_entry->barrier_queue, ^{
1326 _dispatch_fd_entry_debug("stat", fd_entry);
1327 int err, orig_flags, orig_nosigpipe = -1;
1328 struct stat st;
1329 _dispatch_io_syscall_switch(err,
1330 fstat(fd, &st),
1331 default: fd_entry->err = err; return;
1332 );
1333 fd_entry->stat.dev = st.st_dev;
1334 fd_entry->stat.mode = st.st_mode;
1335 _dispatch_fd_entry_guard(fd_entry);
1336 _dispatch_io_syscall_switch(err,
1337 orig_flags = fcntl(fd, F_GETFL),
1338 default: (void)dispatch_assume_zero(err); break;
1339 );
1340 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1341 if (S_ISFIFO(st.st_mode)) {
1342 _dispatch_io_syscall_switch(err,
1343 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1344 default: (void)dispatch_assume_zero(err); break;
1345 );
1346 if (orig_nosigpipe != -1) {
1347 _dispatch_io_syscall_switch(err,
1348 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1349 default:
1350 orig_nosigpipe = -1;
1351 (void)dispatch_assume_zero(err);
1352 break;
1353 );
1354 }
1355 }
1356 #endif
1357 if (S_ISREG(st.st_mode)) {
1358 if (orig_flags != -1) {
1359 _dispatch_io_syscall_switch(err,
1360 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1361 default:
1362 orig_flags = -1;
1363 (void)dispatch_assume_zero(err);
1364 break;
1365 );
1366 }
1367 int32_t dev = major(st.st_dev);
1368 // We have to get the disk on the global dev queue. The
1369 // barrier queue cannot continue until that is complete
1370 dispatch_suspend(fd_entry->barrier_queue);
1371 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1372 _dispatch_io_devs_lockq_init);
1373 dispatch_async(_dispatch_io_devs_lockq, ^{
1374 _dispatch_disk_init(fd_entry, dev);
1375 dispatch_resume(fd_entry->barrier_queue);
1376 });
1377 } else {
1378 if (orig_flags != -1) {
1379 _dispatch_io_syscall_switch(err,
1380 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1381 default:
1382 orig_flags = -1;
1383 (void)dispatch_assume_zero(err);
1384 break;
1385 );
1386 }
1387 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1388 _DISPATCH_QOS_CLASS_DEFAULT, false));
1389 }
1390 fd_entry->orig_flags = orig_flags;
1391 fd_entry->orig_nosigpipe = orig_nosigpipe;
1392 });
1393 // This is the first item run when the close queue is resumed, indicating
1394 // that all channels associated with this entry have been closed and that
1395 // all operations associated with this entry have been freed
1396 dispatch_async(fd_entry->close_queue, ^{
1397 if (!fd_entry->disk) {
1398 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1399 dispatch_op_direction_t dir;
1400 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1401 _dispatch_stream_dispose(fd_entry, dir);
1402 }
1403 } else {
1404 dispatch_disk_t disk = fd_entry->disk;
1405 dispatch_async(_dispatch_io_devs_lockq, ^{
1406 _dispatch_release(disk);
1407 });
1408 }
1409 // Remove this entry from the global fd list
1410 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1411 });
1412 // If there was a source associated with this stream, disposing of the
1413 // source cancels it and suspends the close queue. Freeing the fd_entry
1414 // structure must happen after the source cancel handler has finished
1415 dispatch_async(fd_entry->close_queue, ^{
1416 _dispatch_fd_entry_debug("close queue release", fd_entry);
1417 dispatch_release(fd_entry->close_queue);
1418 _dispatch_fd_entry_debug("barrier queue release", fd_entry);
1419 dispatch_release(fd_entry->barrier_queue);
1420 _dispatch_fd_entry_debug("barrier group release", fd_entry);
1421 dispatch_release(fd_entry->barrier_group);
1422 if (fd_entry->orig_flags != -1) {
1423 _dispatch_io_syscall(
1424 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1425 );
1426 }
1427 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1428 if (fd_entry->orig_nosigpipe != -1) {
1429 _dispatch_io_syscall(
1430 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1431 );
1432 }
1433 #endif
1434 _dispatch_fd_entry_unguard(fd_entry);
1435 if (fd_entry->convenience_channel) {
1436 fd_entry->convenience_channel->fd_entry = NULL;
1437 dispatch_release(fd_entry->convenience_channel);
1438 }
1439 free(fd_entry);
1440 });
1441 return fd_entry;
1442 }
1443
1444 static dispatch_fd_entry_t
1445 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1446 dev_t dev, mode_t mode)
1447 {
1448 // On devs lock queue
1449 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1450 path_data->channel->queue);
1451 _dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
1452 if (S_ISREG(mode)) {
1453 _dispatch_disk_init(fd_entry, major(dev));
1454 } else {
1455 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1456 _DISPATCH_QOS_CLASS_DEFAULT, false));
1457 }
1458 fd_entry->fd = -1;
1459 fd_entry->orig_flags = -1;
1460 fd_entry->path_data = path_data;
1461 fd_entry->stat.dev = dev;
1462 fd_entry->stat.mode = mode;
1463 fd_entry->barrier_queue = dispatch_queue_create(
1464 "com.apple.libdispatch-io.barrierq", NULL);
1465 fd_entry->barrier_group = dispatch_group_create();
1466 // This is the first item run when the close queue is resumed, indicating
1467 // that the channel associated with this entry has been closed and that
1468 // all operations associated with this entry have been freed
1469 dispatch_async(fd_entry->close_queue, ^{
1470 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1471 if (!fd_entry->disk) {
1472 dispatch_op_direction_t dir;
1473 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1474 _dispatch_stream_dispose(fd_entry, dir);
1475 }
1476 }
1477 if (fd_entry->fd != -1) {
1478 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1479 }
1480 if (fd_entry->path_data->channel) {
1481 // If associated channel has not been released yet, mark it as
1482 // no longer having an fd_entry (for stop after close).
1483 // It is safe to modify channel since we are on close_queue with
1484 // target queue the channel queue
1485 fd_entry->path_data->channel->fd_entry = NULL;
1486 }
1487 });
1488 dispatch_async(fd_entry->close_queue, ^{
1489 _dispatch_fd_entry_debug("close queue release", fd_entry);
1490 dispatch_release(fd_entry->close_queue);
1491 dispatch_release(fd_entry->barrier_queue);
1492 dispatch_release(fd_entry->barrier_group);
1493 free(fd_entry->path_data);
1494 free(fd_entry);
1495 });
1496 return fd_entry;
1497 }
1498
1499 static int
1500 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1501 {
1502 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1503 return 0;
1504 }
1505 if (fd_entry->err) {
1506 return fd_entry->err;
1507 }
1508 int fd = -1;
1509 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1510 fd_entry->path_data->oflag | O_NONBLOCK;
1511 open:
1512 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1513 oflag, fd_entry->path_data->mode);
1514 if (fd == -1) {
1515 int err = errno;
1516 if (err == EINTR) {
1517 goto open;
1518 }
1519 (void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1520 return err;
1521 }
1522 if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1523 // Lost the race with another open
1524 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1525 } else {
1526 channel->fd_actual = fd;
1527 }
1528 _dispatch_object_debug(channel, "%s", __func__);
1529 return 0;
1530 }
1531
1532 static void
1533 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1534 dispatch_io_t channel)
1535 {
1536 if (fd_entry->disk) {
1537 if (channel) {
1538 _dispatch_retain(channel);
1539 }
1540 _dispatch_fd_entry_retain(fd_entry);
1541 dispatch_async(fd_entry->disk->pick_queue, ^{
1542 _dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
1543 _dispatch_fd_entry_release(fd_entry);
1544 if (channel) {
1545 _dispatch_release(channel);
1546 }
1547 });
1548 } else {
1549 dispatch_op_direction_t direction;
1550 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1551 dispatch_stream_t stream = fd_entry->streams[direction];
1552 if (!stream) {
1553 continue;
1554 }
1555 if (channel) {
1556 _dispatch_retain(channel);
1557 }
1558 _dispatch_fd_entry_retain(fd_entry);
1559 dispatch_async(stream->dq, ^{
1560 _dispatch_stream_cleanup_operations(stream, channel);
1561 _dispatch_fd_entry_release(fd_entry);
1562 if (channel) {
1563 _dispatch_release(channel);
1564 }
1565 });
1566 }
1567 }
1568 }
1569
1570 #pragma mark -
1571 #pragma mark dispatch_stream_t/dispatch_disk_t
1572
1573 static void
1574 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1575 {
1576 dispatch_op_direction_t direction;
1577 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1578 dispatch_stream_t stream;
1579 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1580 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1581 NULL);
1582 dispatch_set_context(stream->dq, stream);
1583 _dispatch_retain(tq);
1584 stream->dq->do_targetq = tq;
1585 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1586 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1587 fd_entry->streams[direction] = stream;
1588 }
1589 }
1590
1591 static void
1592 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1593 dispatch_op_direction_t direction)
1594 {
1595 // On close queue
1596 dispatch_stream_t stream = fd_entry->streams[direction];
1597 if (!stream) {
1598 return;
1599 }
1600 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1601 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1602 if (stream->source) {
1603 // Balanced by source cancel handler:
1604 _dispatch_fd_entry_retain(fd_entry);
1605 dispatch_source_cancel(stream->source);
1606 dispatch_resume(stream->source);
1607 dispatch_release(stream->source);
1608 }
1609 dispatch_set_context(stream->dq, NULL);
1610 dispatch_release(stream->dq);
1611 free(stream);
1612 }
1613
1614 static void
1615 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1616 {
1617 // On devs lock queue
1618 dispatch_disk_t disk;
1619 // Check to see if there is an existing entry for the given device
1620 uintptr_t hash = DIO_HASH(dev);
1621 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1622 if (disk->dev == dev) {
1623 _dispatch_retain(disk);
1624 goto out;
1625 }
1626 }
1627 // Otherwise create a new entry
1628 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1629 disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1630 sizeof(struct dispatch_disk_s) +
1631 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1632 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1633 disk->do_xref_cnt = -1;
1634 disk->advise_list_depth = pending_reqs_depth;
1635 disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1636 false);
1637 disk->dev = dev;
1638 TAILQ_INIT(&disk->operations);
1639 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1640 char label[45];
1641 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
1642 (int)dev);
1643 disk->pick_queue = dispatch_queue_create(label, NULL);
1644 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1645 out:
1646 fd_entry->disk = disk;
1647 TAILQ_INIT(&fd_entry->stream_ops);
1648 }
1649
1650 void
1651 _dispatch_disk_dispose(dispatch_disk_t disk)
1652 {
1653 uintptr_t hash = DIO_HASH(disk->dev);
1654 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1655 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1656 size_t i;
1657 for (i=0; i<disk->advise_list_depth; ++i) {
1658 dispatch_assert(!disk->advise_list[i]);
1659 }
1660 dispatch_release(disk->pick_queue);
1661 }
1662
1663 #pragma mark -
1664 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1665
1666 static inline bool
1667 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1668 {
1669 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1670 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1671 }
1672
1673 static void
1674 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1675 dispatch_operation_t op, dispatch_data_t data)
1676 {
1677 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1678 return;
1679 }
1680 _dispatch_object_debug(op, "%s", __func__);
1681 bool no_ops = !_dispatch_stream_operation_avail(stream);
1682 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1683 if (no_ops) {
1684 dispatch_async_f(stream->dq, stream->dq,
1685 _dispatch_stream_queue_handler);
1686 }
1687 }
1688
1689 static void
1690 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1691 dispatch_data_t data)
1692 {
1693 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1694 return;
1695 }
1696 _dispatch_object_debug(op, "%s", __func__);
1697 if (op->params.type == DISPATCH_IO_STREAM) {
1698 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1699 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1700 }
1701 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1702 } else {
1703 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1704 }
1705 _dispatch_disk_handler(disk);
1706 }
1707
1708 static void
1709 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1710 dispatch_operation_t op)
1711 {
1712 // On stream queue
1713 _dispatch_object_debug(op, "%s", __func__);
1714 _dispatch_op_debug("complete: stream %p", op, stream);
1715 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1716 if (op == stream->op) {
1717 stream->op = NULL;
1718 }
1719 if (op->timer) {
1720 dispatch_source_cancel(op->timer);
1721 }
1722 // Final release will deliver any pending data
1723 _dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
1724 _dispatch_release(op);
1725 }
1726
1727 static void
1728 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1729 {
1730 // On pick queue
1731 _dispatch_object_debug(op, "%s", __func__);
1732 _dispatch_op_debug("complete: disk %p", op, disk);
1733 // Current request is always the last op returned
1734 if (disk->cur_rq == op) {
1735 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1736 operation_list);
1737 }
1738 if (op->params.type == DISPATCH_IO_STREAM) {
1739 // Check if there are other pending stream operations behind it
1740 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1741 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1742 if (op_next) {
1743 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1744 }
1745 }
1746 TAILQ_REMOVE(&disk->operations, op, operation_list);
1747 if (op->timer) {
1748 dispatch_source_cancel(op->timer);
1749 }
1750 // Final release will deliver any pending data
1751 _dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
1752 _dispatch_release(op);
1753 }
1754
1755 static dispatch_operation_t
1756 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1757 dispatch_operation_t op)
1758 {
1759 // On stream queue
1760 if (!op) {
1761 // On the first run through, pick the first operation
1762 if (!_dispatch_stream_operation_avail(stream)) {
1763 return op;
1764 }
1765 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1766 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1767 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1768 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1769 }
1770 return op;
1771 }
1772 if (op->params.type == DISPATCH_IO_STREAM) {
1773 // Stream operations need to be serialized, so continue the current
1774 // operation until it is finished
1775 return op;
1776 }
1777 // Get the next random operation (round-robin)
1778 if (op->params.type == DISPATCH_IO_RANDOM) {
1779 op = TAILQ_NEXT(op, operation_list);
1780 if (!op) {
1781 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1782 }
1783 return op;
1784 }
1785 return NULL;
1786 }
1787
1788 static dispatch_operation_t
1789 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1790 {
1791 // On pick queue
1792 dispatch_operation_t op;
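// Round-robin scan, starting just after the current request, for the next
// operation that is not already active.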
1793 if (!TAILQ_EMPTY(&disk->operations)) {
1794 if (disk->cur_rq == NULL) {
1795 op = TAILQ_FIRST(&disk->operations);
1796 } else {
1797 op = disk->cur_rq;
1798 do {
1799 op = TAILQ_NEXT(op, operation_list);
1800 if (!op) {
1801 op = TAILQ_FIRST(&disk->operations);
1802 }
1803 // TODO: more involved picking algorithm rdar://problem/8780312
1804 } while (op->active && op != disk->cur_rq);
1805 }
1806 if (!op->active) {
1807 disk->cur_rq = op;
1808 return op;
1809 }
1810 }
1811 return NULL;
1812 }
1813
1814 static void
1815 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1816 dispatch_io_t channel)
1817 {
1818 // On stream queue
1819 dispatch_operation_t op, tmp;
1820 typeof(*stream->operations) *operations;
1821 operations = &stream->operations[DISPATCH_IO_RANDOM];
1822 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1823 if (!channel || op->channel == channel) {
1824 _dispatch_stream_complete_operation(stream, op);
1825 }
1826 }
1827 operations = &stream->operations[DISPATCH_IO_STREAM];
1828 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1829 if (!channel || op->channel == channel) {
1830 _dispatch_stream_complete_operation(stream, op);
1831 }
1832 }
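// If the source is still armed but no operations remain, suspend it so a
// later enqueue can resume it again.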
1833 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1834 dispatch_suspend(stream->source);
1835 stream->source_running = false;
1836 }
1837 }
1838
1839 static inline void
1840 _dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
1841 dispatch_io_t channel, bool inactive_only)
1842 {
1843 // On pick queue
1844 dispatch_operation_t op, tmp;
1845 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1846 if (inactive_only && op->active) continue;
1847 if (!channel || op->channel == channel) {
1848 _dispatch_op_debug("cleanup: disk %p", op, disk);
1849 _dispatch_disk_complete_operation(disk, op);
1850 }
1851 }
1852 }
1853
1854 static void
1855 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1856 {
1857 _dispatch_disk_cleanup_specified_operations(disk, channel, false);
1858 }
1859
1860 static void
1861 _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
1862 dispatch_io_t channel)
1863 {
1864 _dispatch_disk_cleanup_specified_operations(disk, channel, true);
1865 }
1866
1867 #pragma mark -
1868 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1869
1870 static dispatch_source_t
1871 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1872 {
1873 // On stream queue
1874 if (stream->source) {
1875 return stream->source;
1876 }
1877 dispatch_fd_t fd = op->fd_entry->fd;
1878 _dispatch_op_debug("stream source create", op);
1879 dispatch_source_t source = NULL;
1880 if (op->direction == DOP_DIR_READ) {
1881 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1882 (uintptr_t)fd, 0, stream->dq);
1883 } else if (op->direction == DOP_DIR_WRITE) {
1884 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1885 (uintptr_t)fd, 0, stream->dq);
1886 } else {
1887 dispatch_assert(op->direction < DOP_DIR_MAX);
1888 return NULL;
1889 }
1890 dispatch_set_context(source, stream);
1891 dispatch_source_set_event_handler_f(source,
1892 _dispatch_stream_source_handler);
1893 // Close queue must not run user cleanup handlers until sources are fully
1894 // unregistered
1895 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1896 dispatch_source_set_cancel_handler(source, ^{
1897 _dispatch_op_debug("stream source cancel", op);
1898 dispatch_resume(close_queue);
1899 });
1900 stream->source = source;
1901 return stream->source;
1902 }
1903
1904 static void
1905 _dispatch_stream_source_handler(void *ctx)
1906 {
1907 // On stream queue
1908 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1909 dispatch_suspend(stream->source);
1910 stream->source_running = false;
1911 return _dispatch_stream_handler(stream);
1912 }
1913
1914 static void
1915 _dispatch_stream_queue_handler(void *ctx)
1916 {
1917 // On stream queue
1918 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1919 if (!stream) {
1920 // _dispatch_stream_dispose has been called
1921 return;
1922 }
1923 return _dispatch_stream_handler(stream);
1924 }
1925
1926 static void
1927 _dispatch_stream_handler(void *ctx)
1928 {
1929 // On stream queue
1930 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1931 dispatch_operation_t op;
1932 pick:
1933 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1934 if (!op) {
1935 _dispatch_debug("no operation found: stream %p", stream);
1936 return;
1937 }
1938 int err = _dispatch_io_get_error(op, NULL, true);
1939 if (err) {
1940 op->err = err;
1941 _dispatch_stream_complete_operation(stream, op);
1942 goto pick;
1943 }
1944 stream->op = op;
1945 _dispatch_op_debug("stream handler", op);
1946 dispatch_fd_entry_t fd_entry = op->fd_entry;
1947 _dispatch_fd_entry_retain(fd_entry);
1948 // For performance analysis
1949 if (!op->total && dispatch_io_defaults.initial_delivery) {
1950 // Empty delivery to signal the start of the operation
1951 _dispatch_op_debug("initial delivery", op);
1952 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1953 }
1954 // TODO: perform on the operation target queue to get correct priority
1955 int result = _dispatch_operation_perform(op);
1956 dispatch_op_flags_t flags = ~0u;
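// flags starts as ~0u so the fall-through cases below can distinguish
// direct entry from entry via an earlier case.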
1957 switch (result) {
1958 case DISPATCH_OP_DELIVER:
1959 flags = DOP_DEFAULT;
1960 // Fall through
1961 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1962 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1963 DOP_DEFAULT;
1964 _dispatch_operation_deliver_data(op, flags);
1965 // Fall through
1966 case DISPATCH_OP_COMPLETE:
1967 if (flags != DOP_DEFAULT) {
1968 _dispatch_stream_complete_operation(stream, op);
1969 }
1970 if (_dispatch_stream_operation_avail(stream)) {
1971 dispatch_async_f(stream->dq, stream->dq,
1972 _dispatch_stream_queue_handler);
1973 }
1974 break;
1975 case DISPATCH_OP_COMPLETE_RESUME:
1976 _dispatch_stream_complete_operation(stream, op);
1977 // Fall through
1978 case DISPATCH_OP_RESUME:
1979 if (_dispatch_stream_operation_avail(stream)) {
1980 stream->source_running = true;
1981 dispatch_resume(_dispatch_stream_source(stream, op));
1982 }
1983 break;
1984 case DISPATCH_OP_ERR:
1985 _dispatch_stream_cleanup_operations(stream, op->channel);
1986 break;
1987 case DISPATCH_OP_FD_ERR:
1988 _dispatch_fd_entry_retain(fd_entry);
1989 dispatch_async(fd_entry->barrier_queue, ^{
1990 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1991 _dispatch_fd_entry_release(fd_entry);
1992 });
1993 break;
1994 default:
1995 break;
1996 }
1997 _dispatch_fd_entry_release(fd_entry);
1998 return;
1999 }
2000
2001 static void
2002 _dispatch_disk_handler(void *ctx)
2003 {
2004 // On pick queue
2005 dispatch_disk_t disk = (dispatch_disk_t)ctx;
2006 if (disk->io_active) {
2007 return;
2008 }
2009 _dispatch_disk_debug("disk handler", disk);
2010 dispatch_operation_t op;
2011 size_t i = disk->free_idx, j = disk->req_idx;
2012 if (j <= i) {
2013 j += disk->advise_list_depth;
2014 }
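// advise_list is a circular buffer: free_idx is the next empty slot to
// fill with a picked operation, req_idx the next request to perform.
// Unwrapping j lets the loop below walk that window with modular indexing.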
2015 while (i <= j) {
2016 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
2017 (op = _dispatch_disk_pick_next_operation(disk))) {
2018 int err = _dispatch_io_get_error(op, NULL, true);
2019 if (err) {
2020 op->err = err;
2021 _dispatch_disk_complete_operation(disk, op);
2022 continue;
2023 }
2024 _dispatch_retain(op);
2025 _dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
2026 disk->advise_list[i%disk->advise_list_depth] = op;
2027 op->active = true;
2028 _dispatch_op_debug("activate: disk %p", op, disk);
2029 _dispatch_object_debug(op, "%s", __func__);
2030 } else {
2031 // No more operations to get
2032 break;
2033 }
2034 i++;
2035 }
2036 disk->free_idx = (i%disk->advise_list_depth);
2037 op = disk->advise_list[disk->req_idx];
2038 if (op) {
2039 disk->io_active = true;
2040 _dispatch_op_debug("async perform: disk %p", op, disk);
2041 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
2042 }
2043 }
2044
2045 static void
2046 _dispatch_disk_perform(void *ctxt)
2047 {
2048 dispatch_disk_t disk = ctxt;
2049 _dispatch_disk_debug("disk perform", disk);
2050 size_t chunk_size = dispatch_io_defaults.chunk_size;
2051 dispatch_operation_t op;
2052 size_t i = disk->advise_idx, j = disk->free_idx;
2053 if (j <= i) {
2054 j += disk->advise_list_depth;
2055 }
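// Issue read advisories for the operations queued between advise_idx and
// free_idx before performing the request at req_idx below.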
2056 do {
2057 op = disk->advise_list[i%disk->advise_list_depth];
2058 if (!op) {
2059 // Nothing more to advise, must be at free_idx
2060 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2061 break;
2062 }
2063 if (op->direction == DOP_DIR_WRITE) {
2064 // TODO: preallocate writes ? rdar://problem/9032172
2065 continue;
2066 }
2067 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2068 op->channel)) {
2069 continue;
2070 }
2071 // For performance analysis
2072 if (!op->total && dispatch_io_defaults.initial_delivery) {
2073 // Empty delivery to signal the start of the operation
2074 _dispatch_op_debug("initial delivery", op);
2075 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2076 }
2077 // Advise two chunks if the list only has one element and this is the
2078 // first advise on the operation
2079 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2080 !op->advise_offset) {
2081 chunk_size *= 2;
2082 }
2083 _dispatch_operation_advise(op, chunk_size);
2084 } while (++i < j);
2085 disk->advise_idx = i%disk->advise_list_depth;
2086 op = disk->advise_list[disk->req_idx];
2087 int result = _dispatch_operation_perform(op);
2088 disk->advise_list[disk->req_idx] = NULL;
2089 disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
2090 _dispatch_op_debug("async perform completion: disk %p", op, disk);
2091 dispatch_async(disk->pick_queue, ^{
2092 _dispatch_op_debug("perform completion", op);
2093 switch (result) {
2094 case DISPATCH_OP_DELIVER:
2095 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2096 break;
2097 case DISPATCH_OP_COMPLETE:
2098 _dispatch_disk_complete_operation(disk, op);
2099 break;
2100 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2101 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2102 _dispatch_disk_complete_operation(disk, op);
2103 break;
2104 case DISPATCH_OP_ERR:
2105 _dispatch_disk_cleanup_operations(disk, op->channel);
2106 break;
2107 case DISPATCH_OP_FD_ERR:
2108 _dispatch_disk_cleanup_operations(disk, NULL);
2109 break;
2110 default:
2111 dispatch_assert(result);
2112 break;
2113 }
2114 _dispatch_op_debug("deactivate: disk %p", op, disk);
2115 op->active = false;
2116 disk->io_active = false;
2117 _dispatch_disk_handler(disk);
2118 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2119 // released at the very end, since it might hold the last reference to
2120 // the disk
2121 _dispatch_op_debug("release -> %d (disk perform complete)", op,
2122 op->do_ref_cnt);
2123 _dispatch_release(op);
2124 });
2125 }
2126
2127 #pragma mark -
2128 #pragma mark dispatch_operation_perform
2129
2130 static void
2131 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2132 {
2133 _dispatch_op_debug("advise", op);
2134 if (_dispatch_io_get_error(op, NULL, true)) return;
2135 #ifdef __linux__
2136 // Linux does not support fcntl(F_RDADVISE);
2137 // define the necessary data structure here and use readahead(2) instead
2138 struct radvisory {
2139 off_t ra_offset;
2140 int ra_count;
2141 };
2142 #endif
2143 int err;
2144 struct radvisory advise;
2145 // No point in issuing a read advise for the next chunk if we are
2146 // already a chunk ahead of the bytes read so far
2147 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2148 chunk_size + PAGE_SIZE)) {
2149 return;
2150 }
2151 _dispatch_object_debug(op, "%s", __func__);
2152 advise.ra_count = (int)chunk_size;
2153 if (!op->advise_offset) {
2154 op->advise_offset = op->offset;
2155 // If this is the first time through, align the advised range to a
2156 // page boundary
2157 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2158 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2159 }
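// For illustration (assuming a hypothetical 4 KiB PAGE_SIZE): with
// offset 0x1100 and chunk_size 0x8000, pg_fraction is 0x100, ra_count
// grows to 0x8F00, and the advised range [0x1100, 0xA000) ends on a
// page boundary.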
2160 advise.ra_offset = op->advise_offset;
2161 op->advise_offset += advise.ra_count;
2162 #ifdef __linux__
2163 _dispatch_io_syscall_switch(err,
2164 readahead(op->fd_entry->fd, advise.ra_offset, advise.ra_count),
2165 case EINVAL: break; // fd refers to an unsupported file type
2166 default: (void)dispatch_assume_zero(err); break;
2167 );
2168 #else
2169 _dispatch_io_syscall_switch(err,
2170 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2171 case EFBIG: break; // advised past the end of the file rdar://10415691
2172 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2173 // TODO: set disk status on error
2174 default: (void)dispatch_assume_zero(err); break;
2175 );
2176 #endif
2177 }
2178
2179 static int
2180 _dispatch_operation_perform(dispatch_operation_t op)
2181 {
2182 _dispatch_op_debug("perform", op);
2183 int err = _dispatch_io_get_error(op, NULL, true);
2184 if (err) {
2185 goto error;
2186 }
2187 _dispatch_object_debug(op, "%s", __func__);
2188 if (!op->buf) {
2189 size_t max_buf_siz = op->params.high;
2190 size_t chunk_siz = dispatch_io_defaults.chunk_size;
2191 if (op->direction == DOP_DIR_READ) {
2192 // If necessary, create a buffer for the ongoing operation, large
2193 // enough to fit chunk_size but no larger than the high-water mark
2194 size_t data_siz = dispatch_data_get_size(op->data);
2195 if (data_siz) {
2196 dispatch_assert(data_siz < max_buf_siz);
2197 max_buf_siz -= data_siz;
2198 }
2199 if (max_buf_siz > chunk_siz) {
2200 max_buf_siz = chunk_siz;
2201 }
2202 if (op->length < SIZE_MAX) {
2203 op->buf_siz = op->length - op->total;
2204 if (op->buf_siz > max_buf_siz) {
2205 op->buf_siz = max_buf_siz;
2206 }
2207 } else {
2208 op->buf_siz = max_buf_siz;
2209 }
2210 op->buf = valloc(op->buf_siz);
2211 _dispatch_op_debug("buffer allocated", op);
2212 } else if (op->direction == DOP_DIR_WRITE) {
2213 // Always write the first data piece; if that is smaller than a
2214 // chunk, accumulate further data pieces until the chunk size is reached
2215 if (chunk_siz > max_buf_siz) {
2216 chunk_siz = max_buf_siz;
2217 }
2218 op->buf_siz = 0;
2219 dispatch_data_apply(op->data,
2220 ^(dispatch_data_t region DISPATCH_UNUSED,
2221 size_t offset DISPATCH_UNUSED,
2222 const void* buf DISPATCH_UNUSED, size_t len) {
2223 size_t siz = op->buf_siz + len;
2224 if (!op->buf_siz || siz <= chunk_siz) {
2225 op->buf_siz = siz;
2226 }
2227 return (bool)(siz < chunk_siz);
2228 });
2229 if (op->buf_siz > max_buf_siz) {
2230 op->buf_siz = max_buf_siz;
2231 }
2232 dispatch_data_t d;
2233 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2234 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2235 NULL);
2236 _dispatch_io_data_release(d);
2237 _dispatch_op_debug("buffer mapped", op);
2238 }
2239 }
2240 if (op->fd_entry->fd == -1) {
2241 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2242 if (err) {
2243 goto error;
2244 }
2245 }
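// Perform the I/O into the unused tail of the operation buffer, at the
// file offset the operation has reached so far.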
2246 void *buf = op->buf + op->buf_len;
2247 size_t len = op->buf_siz - op->buf_len;
2248 off_t off = (off_t)((size_t)op->offset + op->total);
2249 ssize_t processed = -1;
2250 syscall:
2251 if (op->direction == DOP_DIR_READ) {
2252 if (op->params.type == DISPATCH_IO_STREAM) {
2253 processed = read(op->fd_entry->fd, buf, len);
2254 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2255 processed = pread(op->fd_entry->fd, buf, len, off);
2256 }
2257 } else if (op->direction == DOP_DIR_WRITE) {
2258 if (op->params.type == DISPATCH_IO_STREAM) {
2259 processed = write(op->fd_entry->fd, buf, len);
2260 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2261 processed = pwrite(op->fd_entry->fd, buf, len, off);
2262 }
2263 }
2264 // Encountered an error on the file descriptor
2265 if (processed == -1) {
2266 err = errno;
2267 if (err == EINTR) {
2268 goto syscall;
2269 }
2270 goto error;
2271 }
2272 // EOF is indicated by two handler invocations
2273 if (processed == 0) {
2274 _dispatch_op_debug("performed: EOF", op);
2275 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2276 }
2277 op->buf_len += (size_t)processed;
2278 op->total += (size_t)processed;
2279 if (op->total == op->length) {
2280 // Finished processing all the bytes requested by the operation
2281 return DISPATCH_OP_COMPLETE;
2282 } else {
2283 // Deliver data only if we satisfy the filters
2284 return DISPATCH_OP_DELIVER;
2285 }
2286 error:
2287 if (err == EAGAIN) {
2288 // For disk-based files with blocking I/O we should never get EAGAIN
2289 dispatch_assert(!op->fd_entry->disk);
2290 _dispatch_op_debug("performed: EAGAIN", op);
2291 if (op->direction == DOP_DIR_READ && op->total &&
2292 op->channel == op->fd_entry->convenience_channel) {
2293 // Convenience read with available data completes on EAGAIN
2294 return DISPATCH_OP_COMPLETE_RESUME;
2295 }
2296 return DISPATCH_OP_RESUME;
2297 }
2298 _dispatch_op_debug("performed: err %d", op, err);
2299 op->err = err;
2300 switch (err) {
2301 case ECANCELED:
2302 return DISPATCH_OP_ERR;
2303 case EBADF:
2304 (void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2305 return DISPATCH_OP_FD_ERR;
2306 default:
2307 return DISPATCH_OP_COMPLETE;
2308 }
2309 }
2310
2311 static void
2312 _dispatch_operation_deliver_data(dispatch_operation_t op,
2313 dispatch_op_flags_t flags)
2314 {
2315 // Called either from the stream or pick queue, or when the op is finalized
2316 dispatch_data_t data = NULL;
2317 int err = 0;
2318 size_t undelivered = op->undelivered + op->buf_len;
2319 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2320 (op->flags & DOP_DELIVER);
2321 op->flags = DOP_DEFAULT;
2322 if (!deliver) {
2323 // Don't deliver data until low water mark has been reached
2324 if (undelivered >= op->params.low) {
2325 deliver = true;
2326 } else if (op->buf_len < op->buf_siz) {
2327 // Request buffer is not yet used up
2328 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2329 return;
2330 }
2331 } else {
2332 err = op->err;
2333 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2334 err = ECANCELED;
2335 op->err = err;
2336 }
2337 }
2338 // Deliver data or buffer used up
2339 if (op->direction == DOP_DIR_READ) {
2340 if (op->buf_len) {
2341 void *buf = op->buf;
2342 data = dispatch_data_create(buf, op->buf_len, NULL,
2343 DISPATCH_DATA_DESTRUCTOR_FREE);
2344 op->buf = NULL;
2345 op->buf_len = 0;
2346 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2347 _dispatch_io_data_release(op->data);
2348 _dispatch_io_data_release(data);
2349 data = d;
2350 } else {
2351 data = op->data;
2352 }
2353 op->data = deliver ? dispatch_data_empty : data;
2354 } else if (op->direction == DOP_DIR_WRITE) {
2355 if (deliver) {
2356 data = dispatch_data_create_subrange(op->data, op->buf_len,
2357 op->length);
2358 }
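// The mapped write buffer has been fully flushed; release it and trim the
// written prefix from the data still owned by the operation.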
2359 if (op->buf_data && op->buf_len == op->buf_siz) {
2360 _dispatch_io_data_release(op->buf_data);
2361 op->buf_data = NULL;
2362 op->buf = NULL;
2363 op->buf_len = 0;
2364 // Trim newly written buffer from head of unwritten data
2365 dispatch_data_t d;
2366 if (deliver) {
2367 _dispatch_io_data_retain(data);
2368 d = data;
2369 } else {
2370 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2371 op->length);
2372 }
2373 _dispatch_io_data_release(op->data);
2374 op->data = d;
2375 }
2376 } else {
2377 dispatch_assert(op->direction < DOP_DIR_MAX);
2378 return;
2379 }
2380 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2381 op->undelivered = undelivered;
2382 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2383 return;
2384 }
2385 op->undelivered = 0;
2386 _dispatch_object_debug(op, "%s", __func__);
2387 _dispatch_op_debug("deliver data", op);
2388 dispatch_op_direction_t direction = op->direction;
2389 dispatch_io_handler_t handler = op->handler;
2390 dispatch_fd_entry_t fd_entry = op->fd_entry;
2391 _dispatch_fd_entry_retain(fd_entry);
2392 dispatch_io_t channel = op->channel;
2393 _dispatch_retain(channel);
2394 // Note that data delivery may occur after the operation is freed
2395 dispatch_async(op->op_q, ^{
2396 bool done = (flags & DOP_DONE);
2397 dispatch_data_t d = data;
2398 if (done) {
2399 if (direction == DOP_DIR_READ && err) {
2400 if (dispatch_data_get_size(d)) {
2401 _dispatch_op_debug("IO handler invoke", op);
2402 handler(false, d, 0);
2403 }
2404 d = NULL;
2405 } else if (direction == DOP_DIR_WRITE && !err) {
2406 d = NULL;
2407 }
2408 }
2409 _dispatch_op_debug("IO handler invoke: err %d", op, err);
2410 handler(done, d, err);
2411 _dispatch_release(channel);
2412 _dispatch_fd_entry_release(fd_entry);
2413 _dispatch_io_data_release(data);
2414 });
2415 }
2416
2417 #pragma mark -
2418 #pragma mark dispatch_io_debug
2419
2420 static size_t
2421 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2422 {
2423 dispatch_queue_t target = channel->do_targetq;
2424 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2425 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2426 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2427 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2428 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2429 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2430 channel->fd_entry, channel->queue, target && target->dq_label ?
2431 target->dq_label : "", target, channel->barrier_queue,
2432 channel->barrier_group, channel->err, channel->params.low,
2433 channel->params.high, channel->params.interval_flags &
2434 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2435 (unsigned long long) channel->params.interval);
2436 }
2437
2438 size_t
2439 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2440 {
2441 size_t offset = 0;
2442 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2443 dx_kind(channel), channel);
2444 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2445 bufsiz - offset);
2446 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2447 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2448 return offset;
2449 }
2450
2451 static size_t
2452 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2453 size_t bufsiz)
2454 {
2455 dispatch_queue_t target = op->do_targetq;
2456 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2457 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2458 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2459 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2460 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2461 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2462 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2463 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2464 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2465 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2466 target->dq_label : "", target, (long long)op->offset, op->length,
2467 op->total, op->undelivered + op->buf_len, op->flags, op->err,
2468 op->params.low, op->params.high, op->params.interval_flags &
2469 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2470 (unsigned long long)op->params.interval);
2471 }
2472
2473 size_t
2474 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2475 {
2476 size_t offset = 0;
2477 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2478 dx_kind(op), op);
2479 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2480 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2481 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2482 return offset;
2483 }