[apple/libdispatch.git] / libdispatch-913.1.6 / src / io.c
1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #ifndef PAGE_SIZE
28 #define PAGE_SIZE ((size_t)getpagesize())
29 #endif
30
31 #if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
32 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
33 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
34 #else
35 #define _dispatch_io_data_retain(x) dispatch_retain(x)
36 #define _dispatch_io_data_release(x) dispatch_release(x)
37 #endif
38
39 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
40
41 DISPATCH_EXPORT DISPATCH_NOTHROW
42 void _dispatch_iocntl(uint32_t param, uint64_t value);
43
44 static dispatch_operation_t _dispatch_operation_create(
45 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
46 size_t length, dispatch_data_t data, dispatch_queue_t queue,
47 dispatch_io_handler_t handler);
48 static void _dispatch_operation_enqueue(dispatch_operation_t op,
49 dispatch_op_direction_t direction, dispatch_data_t data);
50 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
51 dispatch_operation_t op);
52 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
53 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
54 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
55 dispatch_fd_entry_init_callback_t completion_callback);
56 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
57 uintptr_t hash);
58 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
59 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
60 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
61 dispatch_io_t channel);
62 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
63 dispatch_io_t channel);
64 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
65 dispatch_queue_t tq);
66 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
67 dispatch_op_direction_t direction);
68 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
69 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
70 dispatch_operation_t operation, dispatch_data_t data);
71 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
72 dispatch_operation_t operation, dispatch_data_t data);
73 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
74 dispatch_io_t channel);
75 static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
76 dispatch_io_t channel);
77 static void _dispatch_stream_source_handler(void *ctx);
78 static void _dispatch_stream_queue_handler(void *ctx);
79 static void _dispatch_stream_handler(void *ctx);
80 static void _dispatch_disk_handler(void *ctx);
81 static void _dispatch_disk_perform(void *ctxt);
82 static void _dispatch_operation_advise(dispatch_operation_t op,
83 size_t chunk_size);
84 static int _dispatch_operation_perform(dispatch_operation_t op);
85 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
86 dispatch_op_flags_t flags);
87
88 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
89 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
90 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
91 case EINTR: continue; \
92 __VA_ARGS__ \
93 } \
94 break; \
95 } while (1)
96 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
97 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
98 case 0: break; \
99 __VA_ARGS__ \
100 ); \
101 } while (0)
102 #define _dispatch_io_syscall(__syscall) do { int __err; \
103 _dispatch_io_syscall_switch(__err, __syscall); \
104 } while (0)
105
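/*
 * Illustrative sketch (not part of the original file): how the macros above
 * are typically used. The fd, buf and len names are hypothetical; the pattern
 * mirrors the fstat()/fcntl() call sites later in this file.
 *
 *	int err;
 *	ssize_t n;
 *	_dispatch_io_syscall_switch(err,
 *		n = read(fd, buf, len),
 *		case EAGAIN: n = 0; break;
 *		default: (void)dispatch_assume_zero(err); break;
 *	);
 *	// EINTR is retried by the enclosing loop; err is 0 on success
 */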
106 enum {
107 DISPATCH_OP_COMPLETE = 1,
108 DISPATCH_OP_DELIVER,
109 DISPATCH_OP_DELIVER_AND_COMPLETE,
110 DISPATCH_OP_COMPLETE_RESUME,
111 DISPATCH_OP_RESUME,
112 DISPATCH_OP_ERR,
113 DISPATCH_OP_FD_ERR,
114 };
115
116 #define _dispatch_io_Block_copy(x) \
117 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
118
119 #pragma mark -
120 #pragma mark dispatch_io_debug
121
122 #if DISPATCH_IO_DEBUG
123 #if !DISPATCH_DEBUG
124 #define _dispatch_io_log(x, ...) do { \
125 _dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
126 (void *)_dispatch_thread_self(), ##__VA_ARGS__); \
127 } while (0)
128 #ifdef _dispatch_object_debug
129 #undef _dispatch_object_debug
130 #define _dispatch_object_debug dispatch_debug
131 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
132 #endif
133 #else
134 #define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
135 #endif // DISPATCH_DEBUG
136 #else
137 #define _dispatch_io_log(x, ...)
138 #endif // DISPATCH_IO_DEBUG
139
140 #define _dispatch_fd_debug(msg, fd, ...) \
141 _dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
142 #define _dispatch_op_debug(msg, op, ...) \
143 _dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
144 #define _dispatch_channel_debug(msg, channel, ...) \
145 _dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
146 #define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
147 _dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
148 #define _dispatch_disk_debug(msg, disk, ...) \
149 _dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
150
151 #pragma mark -
152 #pragma mark dispatch_io_hashtables
153
154 // Global hashtable of dev_t -> disk_s mappings
155 DISPATCH_CACHELINE_ALIGN
156 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
157 // Global hashtable of fd -> fd_entry_s mappings
158 DISPATCH_CACHELINE_ALIGN
159 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
160
161 static dispatch_once_t _dispatch_io_devs_lockq_pred;
162 static dispatch_queue_t _dispatch_io_devs_lockq;
163 static dispatch_queue_t _dispatch_io_fds_lockq;
164
165 static char const * const _dispatch_io_key = "io";
166
167 static void
168 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
169 {
170 _dispatch_io_fds_lockq = dispatch_queue_create(
171 "com.apple.libdispatch-io.fd_lockq", NULL);
172 unsigned int i;
173 for (i = 0; i < DIO_HASH_SIZE; i++) {
174 TAILQ_INIT(&_dispatch_io_fds[i]);
175 }
176 }
177
178 static void
179 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
180 {
181 _dispatch_io_devs_lockq = dispatch_queue_create(
182 "com.apple.libdispatch-io.dev_lockq", NULL);
183 unsigned int i;
184 for (i = 0; i < DIO_HASH_SIZE; i++) {
185 TAILQ_INIT(&_dispatch_io_devs[i]);
186 }
187 }
188
189 #pragma mark -
190 #pragma mark dispatch_io_defaults
191
192 enum {
193 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
194 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
195 DISPATCH_IOCNTL_INITIAL_DELIVERY,
196 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
197 };
198
199 static struct dispatch_io_defaults_s {
200 size_t chunk_size, low_water_chunks, max_pending_io_reqs;
201 bool initial_delivery;
202 } dispatch_io_defaults = {
203 .chunk_size = DIO_MAX_CHUNK_SIZE,
204 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
205 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
206 };
207
208 #define _dispatch_iocntl_set_default(p, v) do { \
209 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
210 } while (0)
211
212 void
213 _dispatch_iocntl(uint32_t param, uint64_t value)
214 {
215 switch (param) {
216 case DISPATCH_IOCNTL_CHUNK_PAGES:
217 _dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
218 break;
219 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
220 _dispatch_iocntl_set_default(low_water_chunks, value);
221 break;
222 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
223 _dispatch_iocntl_set_default(initial_delivery, value);
break;
224 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
225 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
226 break;
227 }
228 }
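/*
 * Illustrative note: these knobs are internal SPI. For example, a
 * hypothetical call such as
 *
 *	_dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, 8);
 *
 * would set dispatch_io_defaults.chunk_size to 8 * PAGE_SIZE, the value used
 * below to size per-pass I/O and the default channel low-water mark.
 */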
229
230 #pragma mark -
231 #pragma mark dispatch_io_t
232
233 static dispatch_io_t
234 _dispatch_io_create(dispatch_io_type_t type)
235 {
236 dispatch_io_t channel = _dispatch_object_alloc(DISPATCH_VTABLE(io),
237 sizeof(struct dispatch_io_s));
238 channel->do_next = DISPATCH_OBJECT_LISTLESS;
239 channel->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
240 channel->params.type = type;
241 channel->params.high = SIZE_MAX;
242 channel->params.low = dispatch_io_defaults.low_water_chunks *
243 dispatch_io_defaults.chunk_size;
244 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
245 NULL);
246 return channel;
247 }
248
249 static void
250 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
251 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
252 {
253 // Enqueue the cleanup handler on the suspended close queue
254 if (cleanup_handler) {
255 _dispatch_retain(queue);
256 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
257 dispatch_async(queue, ^{
258 _dispatch_channel_debug("cleanup handler invoke: err %d",
259 channel, err);
260 cleanup_handler(err);
261 });
262 _dispatch_release(queue);
263 });
264 }
265 if (fd_entry) {
266 channel->fd_entry = fd_entry;
267 dispatch_retain(fd_entry->barrier_queue);
268 dispatch_retain(fd_entry->barrier_group);
269 channel->barrier_queue = fd_entry->barrier_queue;
270 channel->barrier_group = fd_entry->barrier_group;
271 } else {
272 // Still need to create a barrier queue, since all operations go
273 // through it
274 channel->barrier_queue = dispatch_queue_create(
275 "com.apple.libdispatch-io.barrierq", NULL);
276 channel->barrier_group = dispatch_group_create();
277 }
278 }
279
280 void
281 _dispatch_io_dispose(dispatch_io_t channel, DISPATCH_UNUSED bool *allow_free)
282 {
283 _dispatch_object_debug(channel, "%s", __func__);
284 if (channel->fd_entry &&
285 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
286 if (channel->fd_entry->path_data) {
287 // This modification is safe since path_data->channel is checked
288 // only on close_queue (which is still suspended at this point)
289 channel->fd_entry->path_data->channel = NULL;
290 }
291 // Cleanup handlers will only run when all channels related to this
292 // fd are complete
293 _dispatch_fd_entry_release(channel->fd_entry);
294 }
295 if (channel->queue) {
296 dispatch_release(channel->queue);
297 }
298 if (channel->barrier_queue) {
299 dispatch_release(channel->barrier_queue);
300 }
301 if (channel->barrier_group) {
302 dispatch_release(channel->barrier_group);
303 }
304 }
305
306 static int
307 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
308 {
309 int err = 0;
310 if (S_ISDIR(mode)) {
311 err = EISDIR;
312 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
313 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
314 err = ESPIPE;
315 }
316 return err;
317 }
318
319 static int
320 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
321 bool ignore_closed)
322 {
323 // On _any_ queue
324 int err;
325 if (op) {
326 channel = op->channel;
327 }
328 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
329 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
330 err = ECANCELED;
331 } else {
332 err = 0;
333 }
334 } else {
335 err = op ? op->fd_entry->err : channel->err;
336 }
337 return err;
338 }
339
340 #pragma mark -
341 #pragma mark dispatch_io_channels
342
343 dispatch_io_t
344 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
345 dispatch_queue_t queue, void (^cleanup_handler)(int))
346 {
347 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
348 return DISPATCH_BAD_INPUT;
349 }
350 dispatch_io_t channel = _dispatch_io_create(type);
351 channel->fd = fd;
352 _dispatch_channel_debug("create", channel);
353 channel->fd_actual = fd;
354 dispatch_suspend(channel->queue);
355 _dispatch_retain(queue);
356 _dispatch_retain(channel);
357 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
358 // On barrier queue
359 int err = fd_entry->err;
360 if (!err) {
361 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
362 }
363 if (!err && type == DISPATCH_IO_RANDOM) {
364 off_t f_ptr;
365 _dispatch_io_syscall_switch_noerr(err,
366 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
367 case 0: channel->f_ptr = f_ptr; break;
368 default: (void)dispatch_assume_zero(err); break;
369 );
370 }
371 channel->err = err;
372 _dispatch_fd_entry_retain(fd_entry);
373 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
374 dispatch_resume(channel->queue);
375 _dispatch_object_debug(channel, "%s", __func__);
376 _dispatch_release(channel);
377 _dispatch_release(queue);
378 });
379 _dispatch_object_debug(channel, "%s", __func__);
380 return channel;
381 }
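/*
 * Usage sketch (illustrative, with a hypothetical fd and the main queue):
 * create a stream channel over an already-open descriptor. The cleanup
 * handler runs once the channel relinquishes control of the fd.
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_main_queue(), ^(int error) {
 *		// error != 0 if channel setup or validation failed;
 *		// the application may close(fd) from this point on
 *	});
 */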
382
383 dispatch_io_t
384 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
385 dispatch_queue_t queue, void *context,
386 void (*cleanup_handler)(void *context, int error))
387 {
388 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
389 ^(int error){ cleanup_handler(context, error); });
390 }
391
392 dispatch_io_t
393 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
394 int oflag, mode_t mode, dispatch_queue_t queue,
395 void (^cleanup_handler)(int error))
396 {
397 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
398 !(*path == '/')) {
399 return DISPATCH_BAD_INPUT;
400 }
401 size_t pathlen = strlen(path);
402 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
403 if (!path_data) {
404 return DISPATCH_OUT_OF_MEMORY;
405 }
406 dispatch_io_t channel = _dispatch_io_create(type);
407 channel->fd = -1;
408 _dispatch_channel_debug("create with path %s", channel, path);
409 channel->fd_actual = -1;
410 path_data->channel = channel;
411 path_data->oflag = oflag;
412 path_data->mode = mode;
413 path_data->pathlen = pathlen;
414 memcpy(path_data->path, path, pathlen + 1);
415 _dispatch_retain(queue);
416 _dispatch_retain(channel);
417 dispatch_async(channel->queue, ^{
418 int err = 0;
419 struct stat st;
420 _dispatch_io_syscall_switch_noerr(err,
421 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW
422 #ifndef __linux__
423 || (path_data->oflag & O_SYMLINK) == O_SYMLINK
424 #endif
425 ? lstat(path_data->path, &st) : stat(path_data->path, &st),
426 case 0:
427 err = _dispatch_io_validate_type(channel, st.st_mode);
428 break;
429 default:
430 if ((path_data->oflag & O_CREAT) &&
431 (*(path_data->path + path_data->pathlen - 1) != '/')) {
432 // Check parent directory
433 char *c = strrchr(path_data->path, '/');
434 dispatch_assert(c);
435 *c = 0;
436 int perr;
437 _dispatch_io_syscall_switch_noerr(perr,
438 stat(path_data->path, &st),
439 case 0:
440 // Since the parent directory exists, open() will
441 // create a regular file after the fd_entry has
442 // been filled in
443 st.st_mode = S_IFREG;
444 err = 0;
445 break;
446 );
447 *c = '/';
448 }
449 break;
450 );
451 channel->err = err;
452 if (err) {
453 free(path_data);
454 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
455 _dispatch_release(channel);
456 _dispatch_release(queue);
457 return;
458 }
459 dispatch_suspend(channel->queue);
460 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
461 _dispatch_io_devs_lockq_init);
462 dispatch_async(_dispatch_io_devs_lockq, ^{
463 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
464 path_data, st.st_dev, st.st_mode);
465 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
466 dispatch_resume(channel->queue);
467 _dispatch_object_debug(channel, "%s", __func__);
468 _dispatch_release(channel);
469 _dispatch_release(queue);
470 });
471 });
472 _dispatch_object_debug(channel, "%s", __func__);
473 return channel;
474 }
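/*
 * Usage sketch (illustrative; the path is hypothetical): a random-access
 * channel created from an absolute path. Note that open(2) is deferred to
 * the first operation, via _dispatch_fd_entry_open().
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/scratch.dat", O_RDWR | O_CREAT, 0600,
 *			dispatch_get_main_queue(), ^(int error) {
 *		// invoked after the underlying fd (if one was opened) is closed
 *	});
 */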
475
476 dispatch_io_t
477 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
478 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
479 void (*cleanup_handler)(void *context, int error))
480 {
481 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
482 !cleanup_handler ? NULL :
483 ^(int error){ cleanup_handler(context, error); });
484 }
485
486 dispatch_io_t
487 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
488 dispatch_queue_t queue, void (^cleanup_handler)(int error))
489 {
490 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
491 return DISPATCH_BAD_INPUT;
492 }
493 dispatch_io_t channel = _dispatch_io_create(type);
494 _dispatch_channel_debug("create with channel %p", channel, in_channel);
495 dispatch_suspend(channel->queue);
496 _dispatch_retain(queue);
497 _dispatch_retain(channel);
498 _dispatch_retain(in_channel);
499 dispatch_async(in_channel->queue, ^{
500 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
501 if (err0) {
502 channel->err = err0;
503 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
504 dispatch_resume(channel->queue);
505 _dispatch_release(channel);
506 _dispatch_release(in_channel);
507 _dispatch_release(queue);
508 return;
509 }
510 dispatch_async(in_channel->barrier_queue, ^{
511 int err = _dispatch_io_get_error(NULL, in_channel, false);
512 // If there is no error, the fd_entry for the in_channel is valid.
513 // Since we are running on in_channel's queue, the fd_entry has been
514 // fully resolved and will stay valid for the duration of this block
515 if (!err) {
516 err = in_channel->err;
517 if (!err) {
518 err = in_channel->fd_entry->err;
519 }
520 }
521 if (!err) {
522 err = _dispatch_io_validate_type(channel,
523 in_channel->fd_entry->stat.mode);
524 }
525 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
526 off_t f_ptr;
527 _dispatch_io_syscall_switch_noerr(err,
528 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
529 case 0: channel->f_ptr = f_ptr; break;
530 default: (void)dispatch_assume_zero(err); break;
531 );
532 }
533 channel->err = err;
534 if (err) {
535 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
536 dispatch_resume(channel->queue);
537 _dispatch_release(channel);
538 _dispatch_release(in_channel);
539 _dispatch_release(queue);
540 return;
541 }
542 if (in_channel->fd == -1) {
543 // in_channel was created from path
544 channel->fd = -1;
545 channel->fd_actual = -1;
546 mode_t mode = in_channel->fd_entry->stat.mode;
547 dev_t dev = in_channel->fd_entry->stat.dev;
548 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
549 in_channel->fd_entry->path_data->pathlen + 1;
550 dispatch_io_path_data_t path_data = malloc(path_data_len);
551 memcpy(path_data, in_channel->fd_entry->path_data,
552 path_data_len);
553 path_data->channel = channel;
554 // _dispatch_io_devs_lockq is known to already exist
555 dispatch_async(_dispatch_io_devs_lockq, ^{
556 dispatch_fd_entry_t fd_entry;
557 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
558 dev, mode);
559 _dispatch_io_init(channel, fd_entry, queue, 0,
560 cleanup_handler);
561 dispatch_resume(channel->queue);
562 _dispatch_release(channel);
563 _dispatch_release(queue);
564 });
565 } else {
566 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
567 channel->fd = in_channel->fd;
568 channel->fd_actual = in_channel->fd_actual;
569 _dispatch_fd_entry_retain(fd_entry);
570 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
571 dispatch_resume(channel->queue);
572 _dispatch_release(channel);
573 _dispatch_release(queue);
574 }
575 _dispatch_release(in_channel);
576 _dispatch_object_debug(channel, "%s", __func__);
577 });
578 });
579 _dispatch_object_debug(channel, "%s", __func__);
580 return channel;
581 }
582
583 dispatch_io_t
584 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
585 dispatch_queue_t queue, void *context,
586 void (*cleanup_handler)(void *context, int error))
587 {
588 return dispatch_io_create_with_io(type, in_channel, queue,
589 !cleanup_handler ? NULL :
590 ^(int error){ cleanup_handler(context, error); });
591 }
592
593 #pragma mark -
594 #pragma mark dispatch_io_accessors
595
596 void
597 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
598 {
599 _dispatch_retain(channel);
600 dispatch_async(channel->queue, ^{
601 _dispatch_channel_debug("set high water: %zu", channel, high_water);
602 if (channel->params.low > high_water) {
603 channel->params.low = high_water;
604 }
605 channel->params.high = high_water ? high_water : 1;
606 _dispatch_release(channel);
607 });
608 }
609
610 void
611 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
612 {
613 _dispatch_retain(channel);
614 dispatch_async(channel->queue, ^{
615 _dispatch_channel_debug("set low water: %zu", channel, low_water);
616 if (channel->params.high < low_water) {
617 channel->params.high = low_water ? low_water : 1;
618 }
619 channel->params.low = low_water;
620 _dispatch_release(channel);
621 });
622 }
623
624 void
625 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
626 unsigned long flags)
627 {
628 _dispatch_retain(channel);
629 dispatch_async(channel->queue, ^{
630 _dispatch_channel_debug("set interval: %llu", channel, interval);
631 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
632 channel->params.interval_flags = flags;
633 _dispatch_release(channel);
634 });
635 }
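/*
 * Usage sketch (illustrative, ch is a hypothetical channel): request frequent
 * partial delivery by combining a small low-water mark with a strict interval.
 *
 *	dispatch_io_set_low_water(ch, 1);
 *	dispatch_io_set_interval(ch, 50 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */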
636
637 void
638 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
639 {
640 _dispatch_retain(dq);
641 _dispatch_retain(channel);
642 dispatch_async(channel->queue, ^{
643 dispatch_queue_t prev_dq = channel->do_targetq;
644 channel->do_targetq = dq;
645 _dispatch_release(prev_dq);
646 _dispatch_object_debug(channel, "%s", __func__);
647 _dispatch_release(channel);
648 });
649 }
650
651 dispatch_fd_t
652 dispatch_io_get_descriptor(dispatch_io_t channel)
653 {
654 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
655 return -1;
656 }
657 dispatch_fd_t fd = channel->fd_actual;
658 if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) {
659 dispatch_thread_context_t ctxt =
660 _dispatch_thread_context_find(_dispatch_io_key);
661 if (ctxt && ctxt->dtc_io_in_barrier == channel) {
662 (void)_dispatch_fd_entry_open(channel->fd_entry, channel);
663 }
664 }
665 return channel->fd_actual;
666 }
667
668 #pragma mark -
669 #pragma mark dispatch_io_operations
670
671 static void
672 _dispatch_io_stop(dispatch_io_t channel)
673 {
674 _dispatch_channel_debug("stop", channel);
675 (void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
676 _dispatch_retain(channel);
677 dispatch_async(channel->queue, ^{
678 dispatch_async(channel->barrier_queue, ^{
679 _dispatch_object_debug(channel, "%s", __func__);
680 dispatch_fd_entry_t fd_entry = channel->fd_entry;
681 if (fd_entry) {
682 _dispatch_channel_debug("stop cleanup", channel);
683 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
684 if (!(channel->atomic_flags & DIO_CLOSED)) {
685 if (fd_entry->path_data) {
686 fd_entry->path_data->channel = NULL;
687 }
688 channel->fd_entry = NULL;
689 _dispatch_fd_entry_release(fd_entry);
690 }
691 } else if (channel->fd != -1) {
692 // Stop after close, need to check if fd_entry still exists
693 _dispatch_retain(channel);
694 dispatch_async(_dispatch_io_fds_lockq, ^{
695 _dispatch_object_debug(channel, "%s", __func__);
696 _dispatch_channel_debug("stop cleanup after close",
697 channel);
698 dispatch_fd_entry_t fdi;
699 uintptr_t hash = DIO_HASH(channel->fd);
700 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
701 if (fdi->fd == channel->fd) {
702 _dispatch_fd_entry_cleanup_operations(fdi, channel);
703 break;
704 }
705 }
706 _dispatch_release(channel);
707 });
708 }
709 _dispatch_release(channel);
710 });
711 });
712 }
713
714 void
715 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
716 {
717 if (flags & DISPATCH_IO_STOP) {
718 // Don't stop an already stopped channel
719 if (channel->atomic_flags & DIO_STOPPED) {
720 return;
721 }
722 return _dispatch_io_stop(channel);
723 }
724 // Don't close an already closed or stopped channel
725 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
726 return;
727 }
728 _dispatch_retain(channel);
729 dispatch_async(channel->queue, ^{
730 dispatch_async(channel->barrier_queue, ^{
731 _dispatch_object_debug(channel, "%s", __func__);
732 _dispatch_channel_debug("close", channel);
733 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
734 (void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
735 relaxed);
736 dispatch_fd_entry_t fd_entry = channel->fd_entry;
737 if (fd_entry) {
738 if (fd_entry->path_data) {
739 fd_entry->path_data->channel = NULL;
740 }
741 channel->fd_entry = NULL;
742 _dispatch_fd_entry_release(fd_entry);
743 }
744 }
745 _dispatch_release(channel);
746 });
747 });
748 }
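/*
 * Usage sketch (illustrative): dispatch_io_close(ch, 0) lets operations that
 * are already enqueued run to completion, whereas
 *
 *	dispatch_io_close(ch, DISPATCH_IO_STOP);
 *
 * cancels them, so their handlers fail with ECANCELED via
 * _dispatch_io_get_error() above.
 */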
749
750 void
751 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
752 {
753 _dispatch_retain(channel);
754 dispatch_async(channel->queue, ^{
755 dispatch_queue_t io_q = channel->do_targetq;
756 dispatch_queue_t barrier_queue = channel->barrier_queue;
757 dispatch_group_t barrier_group = channel->barrier_group;
758 dispatch_async(barrier_queue, ^{
759 dispatch_suspend(barrier_queue);
760 dispatch_group_notify(barrier_group, io_q, ^{
761 dispatch_thread_context_s io_ctxt = {
762 .dtc_key = _dispatch_io_key,
763 .dtc_io_in_barrier = channel,
764 };
765
766 _dispatch_object_debug(channel, "%s", __func__);
767 _dispatch_thread_context_push(&io_ctxt);
768 barrier();
769 _dispatch_thread_context_pop(&io_ctxt);
770 dispatch_resume(barrier_queue);
771 _dispatch_release(channel);
772 });
773 });
774 });
775 }
776
777 void
778 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
779 dispatch_function_t barrier)
780 {
781 return dispatch_io_barrier(channel, ^{ barrier(context); });
782 }
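/*
 * Usage sketch (illustrative): a barrier is also the only context in which
 * dispatch_io_get_descriptor() (defined above) will lazily open a path-based
 * channel's fd; see the dtc_io_in_barrier check in that function.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd); // hypothetical per-fd work
 *		}
 *	});
 */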
783
784 void
785 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
786 dispatch_queue_t queue, dispatch_io_handler_t handler)
787 {
788 _dispatch_retain(channel);
789 _dispatch_retain(queue);
790 dispatch_async(channel->queue, ^{
791 dispatch_operation_t op;
792 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
793 length, dispatch_data_empty, queue, handler);
794 if (op) {
795 dispatch_queue_t barrier_q = channel->barrier_queue;
796 dispatch_async(barrier_q, ^{
797 _dispatch_operation_enqueue(op, DOP_DIR_READ,
798 dispatch_data_empty);
799 });
800 }
801 _dispatch_release(channel);
802 _dispatch_release(queue);
803 });
804 }
805
806 void
807 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
808 dispatch_queue_t queue, void *context,
809 dispatch_io_handler_function_t handler)
810 {
811 return dispatch_io_read(channel, offset, length, queue,
812 ^(bool done, dispatch_data_t d, int error){
813 handler(context, done, d, error);
814 });
815 }
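/*
 * Usage sketch (illustrative): accumulate the partial results a read handler
 * receives into one dispatch_data_t. ch and queue are assumed to exist.
 *
 *	__block dispatch_data_t all = dispatch_data_empty;
 *	dispatch_io_read(ch, 0, SIZE_MAX, queue,
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			dispatch_data_t c = dispatch_data_create_concat(all, data);
 *			dispatch_release(all);
 *			all = c;
 *		}
 *		if (done) {
 *			// consume all, then dispatch_release(all); error != 0 on failure
 *		}
 *	});
 */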
816
817 void
818 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
819 dispatch_queue_t queue, dispatch_io_handler_t handler)
820 {
821 _dispatch_io_data_retain(data);
822 _dispatch_retain(channel);
823 _dispatch_retain(queue);
824 dispatch_async(channel->queue, ^{
825 dispatch_operation_t op;
826 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
827 dispatch_data_get_size(data), data, queue, handler);
828 if (op) {
829 dispatch_queue_t barrier_q = channel->barrier_queue;
830 dispatch_async(barrier_q, ^{
831 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
832 _dispatch_io_data_release(data);
833 });
834 } else {
835 _dispatch_io_data_release(data);
836 }
837 _dispatch_release(channel);
838 _dispatch_release(queue);
839 });
840 }
841
842 void
843 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
844 dispatch_queue_t queue, void *context,
845 dispatch_io_handler_function_t handler)
846 {
847 return dispatch_io_write(channel, offset, data, queue,
848 ^(bool done, dispatch_data_t d, int error){
849 handler(context, done, d, error);
850 });
851 }
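/*
 * Usage sketch (illustrative; buf and len are hypothetical): wrap a buffer in
 * a dispatch_data_t (copied by the default destructor) and write it at
 * offset 0 of a channel.
 *
 *	dispatch_data_t d = dispatch_data_create(buf, len, queue,
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_io_write(ch, 0, d, queue,
 *			^(bool done, dispatch_data_t remaining, int error) {
 *		// on failure, remaining holds the data that was not written
 *	});
 *	dispatch_release(d);
 */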
852
853 void
854 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
855 void (^handler)(dispatch_data_t, int))
856 {
857 _dispatch_retain(queue);
858 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
859 // On barrier queue
860 if (fd_entry->err) {
861 int err = fd_entry->err;
862 dispatch_async(queue, ^{
863 _dispatch_fd_debug("convenience handler invoke", fd);
864 handler(dispatch_data_empty, err);
865 });
866 _dispatch_release(queue);
867 return;
868 }
869 // Safe to access fd_entry on barrier queue
870 dispatch_io_t channel = fd_entry->convenience_channel;
871 if (!channel) {
872 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
873 channel->fd = fd;
874 channel->fd_actual = fd;
875 channel->fd_entry = fd_entry;
876 dispatch_retain(fd_entry->barrier_queue);
877 dispatch_retain(fd_entry->barrier_group);
878 channel->barrier_queue = fd_entry->barrier_queue;
879 channel->barrier_group = fd_entry->barrier_group;
880 fd_entry->convenience_channel = channel;
881 }
882 __block dispatch_data_t deliver_data = dispatch_data_empty;
883 __block int err = 0;
884 dispatch_async(fd_entry->close_queue, ^{
885 dispatch_async(queue, ^{
886 _dispatch_fd_debug("convenience handler invoke", fd);
887 handler(deliver_data, err);
888 _dispatch_io_data_release(deliver_data);
889 });
890 _dispatch_release(queue);
891 });
892 dispatch_operation_t op =
893 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
894 length, dispatch_data_empty,
895 _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false),
896 ^(bool done, dispatch_data_t data, int error) {
897 if (data) {
898 data = dispatch_data_create_concat(deliver_data, data);
899 _dispatch_io_data_release(deliver_data);
900 deliver_data = data;
901 }
902 if (done) {
903 err = error;
904 }
905 });
906 if (op) {
907 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
908 }
909 });
910 }
911
912 void
913 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
914 void *context, void (*handler)(void *, dispatch_data_t, int))
915 {
916 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
917 handler(context, d, error);
918 });
919 }
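/*
 * Usage sketch (illustrative): the fd-based convenience read. The fd must not
 * be read from, written to, or closed until the handler has run.
 *
 *	dispatch_read(fd, SIZE_MAX, dispatch_get_main_queue(),
 *			^(dispatch_data_t data, int error) {
 *		// data holds everything read up to EOF or the requested length
 *	});
 */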
920
921 void
922 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
923 void (^handler)(dispatch_data_t, int))
924 {
925 _dispatch_io_data_retain(data);
926 _dispatch_retain(queue);
927 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
928 // On barrier queue
929 if (fd_entry->err) {
930 int err = fd_entry->err;
931 dispatch_async(queue, ^{
932 _dispatch_fd_debug("convenience handler invoke", fd);
933 handler(NULL, err);
934 });
935 _dispatch_release(queue);
936 return;
937 }
938 // Safe to access fd_entry on barrier queue
939 dispatch_io_t channel = fd_entry->convenience_channel;
940 if (!channel) {
941 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
942 channel->fd = fd;
943 channel->fd_actual = fd;
944 channel->fd_entry = fd_entry;
945 dispatch_retain(fd_entry->barrier_queue);
946 dispatch_retain(fd_entry->barrier_group);
947 channel->barrier_queue = fd_entry->barrier_queue;
948 channel->barrier_group = fd_entry->barrier_group;
949 fd_entry->convenience_channel = channel;
950 }
951 __block dispatch_data_t deliver_data = NULL;
952 __block int err = 0;
953 dispatch_async(fd_entry->close_queue, ^{
954 dispatch_async(queue, ^{
955 _dispatch_fd_debug("convenience handler invoke", fd);
956 handler(deliver_data, err);
957 if (deliver_data) {
958 _dispatch_io_data_release(deliver_data);
959 }
960 });
961 _dispatch_release(queue);
962 });
963 dispatch_operation_t op =
964 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
965 dispatch_data_get_size(data), data,
966 _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false),
967 ^(bool done, dispatch_data_t d, int error) {
968 if (done) {
969 if (d) {
970 _dispatch_io_data_retain(d);
971 deliver_data = d;
972 }
973 err = error;
974 }
975 });
976 if (op) {
977 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
978 }
979 _dispatch_io_data_release(data);
980 });
981 }
982
983 void
984 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
985 void *context, void (*handler)(void *, dispatch_data_t, int))
986 {
987 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
988 handler(context, d, error);
989 });
990 }
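/*
 * Usage sketch (illustrative): the matching fd-based convenience write, with
 * d a hypothetical dispatch_data_t.
 *
 *	dispatch_write(fd, d, dispatch_get_main_queue(),
 *			^(dispatch_data_t unwritten, int error) {
 *		// unwritten is non-NULL only if part of d could not be written
 *	});
 */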
991
992 #pragma mark -
993 #pragma mark dispatch_operation_t
994
995 static dispatch_operation_t
996 _dispatch_operation_create(dispatch_op_direction_t direction,
997 dispatch_io_t channel, off_t offset, size_t length,
998 dispatch_data_t data, dispatch_queue_t queue,
999 dispatch_io_handler_t handler)
1000 {
1001 // On channel queue
1002 dispatch_assert(direction < DOP_DIR_MAX);
1003 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
1004 // that can only be NULL if atomic_flags are set rdar://problem/8362514
1005 int err = _dispatch_io_get_error(NULL, channel, false);
1006 if (err || !length) {
1007 _dispatch_io_data_retain(data);
1008 _dispatch_retain(queue);
1009 dispatch_async(channel->barrier_queue, ^{
1010 dispatch_async(queue, ^{
1011 dispatch_data_t d = data;
1012 if (direction == DOP_DIR_READ && err) {
1013 d = NULL;
1014 } else if (direction == DOP_DIR_WRITE && !err) {
1015 d = NULL;
1016 }
1017 _dispatch_channel_debug("IO handler invoke: err %d", channel,
1018 err);
1019 handler(true, d, err);
1020 _dispatch_io_data_release(data);
1021 });
1022 _dispatch_release(queue);
1023 });
1024 return NULL;
1025 }
1026 dispatch_operation_t op = _dispatch_object_alloc(DISPATCH_VTABLE(operation),
1027 sizeof(struct dispatch_operation_s));
1028 _dispatch_channel_debug("operation create: %p", channel, op);
1029 op->do_next = DISPATCH_OBJECT_LISTLESS;
1030 op->do_xref_cnt = -1; // operation object is not exposed externally
1031 op->op_q = dispatch_queue_create_with_target("com.apple.libdispatch-io.opq",
1032 NULL, queue);
1033 op->active = false;
1034 op->direction = direction;
1035 op->offset = offset + channel->f_ptr;
1036 op->length = length;
1037 op->handler = _dispatch_io_Block_copy(handler);
1038 _dispatch_retain(channel);
1039 op->channel = channel;
1040 op->params = channel->params;
1041 // Take a snapshot of the priority of the channel queue. The actual I/O
1042 // for this operation will be performed at this priority
1043 dispatch_queue_t targetq = op->channel->do_targetq;
1044 while (fastpath(targetq->do_targetq)) {
1045 targetq = targetq->do_targetq;
1046 }
1047 op->do_targetq = targetq;
1048 _dispatch_object_debug(op, "%s", __func__);
1049 return op;
1050 }
1051
1052 void
1053 _dispatch_operation_dispose(dispatch_operation_t op,
1054 DISPATCH_UNUSED bool *allow_free)
1055 {
1056 _dispatch_object_debug(op, "%s", __func__);
1057 _dispatch_op_debug("dispose", op);
1058 // Deliver the data if there's any
1059 if (op->fd_entry) {
1060 _dispatch_operation_deliver_data(op, DOP_DONE);
1061 dispatch_group_leave(op->fd_entry->barrier_group);
1062 _dispatch_fd_entry_release(op->fd_entry);
1063 }
1064 if (op->channel) {
1065 _dispatch_release(op->channel);
1066 }
1067 if (op->timer) {
1068 dispatch_release(op->timer);
1069 }
1070 // For write operations, op->buf is owned by op->buf_data
1071 if (op->buf && op->direction == DOP_DIR_READ) {
1072 free(op->buf);
1073 }
1074 if (op->buf_data) {
1075 _dispatch_io_data_release(op->buf_data);
1076 }
1077 if (op->data) {
1078 _dispatch_io_data_release(op->data);
1079 }
1080 if (op->op_q) {
1081 dispatch_release(op->op_q);
1082 }
1083 Block_release(op->handler);
1084 _dispatch_op_debug("disposed", op);
1085 }
1086
1087 static void
1088 _dispatch_operation_enqueue(dispatch_operation_t op,
1089 dispatch_op_direction_t direction, dispatch_data_t data)
1090 {
1091 // Called from the barrier queue
1092 _dispatch_io_data_retain(data);
1093 // If channel is closed or stopped, then call the handler immediately
1094 int err = _dispatch_io_get_error(NULL, op->channel, false);
1095 if (err) {
1096 dispatch_io_handler_t handler = op->handler;
1097 dispatch_async(op->op_q, ^{
1098 dispatch_data_t d = data;
1099 if (direction == DOP_DIR_READ && err) {
1100 d = NULL;
1101 } else if (direction == DOP_DIR_WRITE && !err) {
1102 d = NULL;
1103 }
1104 handler(true, d, err);
1105 _dispatch_io_data_release(data);
1106 });
1107 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1108 _dispatch_release(op);
1109 return;
1110 }
1111 // Finish operation init
1112 op->fd_entry = op->channel->fd_entry;
1113 _dispatch_fd_entry_retain(op->fd_entry);
1114 dispatch_group_enter(op->fd_entry->barrier_group);
1115 dispatch_disk_t disk = op->fd_entry->disk;
1116 if (!disk) {
1117 dispatch_stream_t stream = op->fd_entry->streams[direction];
1118 dispatch_async(stream->dq, ^{
1119 _dispatch_stream_enqueue_operation(stream, op, data);
1120 _dispatch_io_data_release(data);
1121 });
1122 } else {
1123 dispatch_async(disk->pick_queue, ^{
1124 _dispatch_disk_enqueue_operation(disk, op, data);
1125 _dispatch_io_data_release(data);
1126 });
1127 }
1128 }
1129
1130 static bool
1131 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1132 dispatch_queue_t tq, dispatch_data_t data)
1133 {
1134 // On stream queue or disk queue
1135 _dispatch_op_debug("enqueue", op);
1136 _dispatch_io_data_retain(data);
1137 op->data = data;
1138 int err = _dispatch_io_get_error(op, NULL, true);
1139 if (err) {
1140 op->err = err;
1141 // Final release
1142 _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
1143 _dispatch_release(op);
1144 return false;
1145 }
1146 if (op->params.interval) {
1147 dispatch_resume(_dispatch_operation_timer(tq, op));
1148 }
1149 return true;
1150 }
1151
1152 static dispatch_source_t
1153 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1154 {
1155 // On stream queue or pick queue
1156 if (op->timer) {
1157 return op->timer;
1158 }
1159 dispatch_source_t timer = dispatch_source_create(
1160 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1161 dispatch_source_set_timer(timer,
1162 dispatch_time(DISPATCH_TIME_NOW, (int64_t)op->params.interval),
1163 op->params.interval, 0);
1164 dispatch_source_set_event_handler(timer, ^{
1165 // On stream queue or pick queue
1166 if (dispatch_source_testcancel(timer)) {
1167 // Do nothing. The operation has already completed
1168 return;
1169 }
1170 dispatch_op_flags_t flags = DOP_DEFAULT;
1171 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1172 // Deliver even if there is less data than the low-water mark
1173 flags |= DOP_DELIVER;
1174 }
1175 // If the operation is active, don't deliver data
1176 if ((op->active) && (flags & DOP_DELIVER)) {
1177 op->flags = flags;
1178 } else {
1179 _dispatch_operation_deliver_data(op, flags);
1180 }
1181 });
1182 op->timer = timer;
1183 return op->timer;
1184 }
1185
1186 #pragma mark -
1187 #pragma mark dispatch_fd_entry_t
1188
1189 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1190 static void
1191 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1192 {
1193 guardid_t guard = (uintptr_t)fd_entry;
1194 const unsigned int guard_flags = GUARD_CLOSE;
1195 int err, fd_flags = 0;
1196 _dispatch_io_syscall_switch_noerr(err,
1197 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1198 &fd_flags),
1199 case 0:
1200 fd_entry->guard_flags = guard_flags;
1201 fd_entry->orig_fd_flags = fd_flags;
1202 break;
1203 case EPERM: break;
1204 default: (void)dispatch_assume_zero(err); break;
1205 );
1206 }
1207
1208 static void
1209 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1210 {
1211 if (!fd_entry->guard_flags) {
1212 return;
1213 }
1214 guardid_t guard = (uintptr_t)fd_entry;
1215 int err, fd_flags = fd_entry->orig_fd_flags;
1216 _dispatch_io_syscall_switch(err,
1217 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1218 &fd_flags),
1219 default: (void)dispatch_assume_zero(err); break;
1220 );
1221 }
1222 #else
1223 static inline void
1224 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1225 static inline void
1226 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1227 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1228
1229 static inline int
1230 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1231 int oflag, mode_t mode) {
1232 #if DISPATCH_USE_GUARDED_FD
1233 guardid_t guard = (uintptr_t)fd_entry;
1234 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1235 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1236 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1237 mode);
1238 if (fd != -1) {
1239 fd_entry->guard_flags = guard_flags;
1240 return fd;
1241 }
1242 errno = 0;
1243 #else
1244 (void)fd_entry;
1245 #endif
1246 return open(path, oflag, mode);
1247 }
1248
1249 static inline int
1250 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1251 #if DISPATCH_USE_GUARDED_FD
1252 if (fd_entry->guard_flags) {
1253 guardid_t guard = (uintptr_t)fd_entry;
1254 return guarded_close_np(fd, &guard);
1255 } else
1256 #else
1257 (void)fd_entry;
1258 #endif
1259 {
1260 return close(fd);
1261 }
1262 }
1263
1264 static inline void
1265 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1266 dispatch_suspend(fd_entry->close_queue);
1267 }
1268
1269 static inline void
1270 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1271 dispatch_resume(fd_entry->close_queue);
1272 }
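/*
 * Note (added for clarity): the fd_entry "retain"/"release" above are
 * implemented as suspend/resume of close_queue, so the cleanup blocks
 * enqueued on close_queue only run once every channel and operation holding
 * the entry has released it.
 */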
1273
1274 static void
1275 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1276 dispatch_fd_entry_init_callback_t completion_callback)
1277 {
1278 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1279 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1280 _dispatch_io_fds_lockq_init);
1281 dispatch_async(_dispatch_io_fds_lockq, ^{
1282 dispatch_fd_entry_t fd_entry = NULL;
1283 // Check to see if there is an existing entry for the given fd
1284 uintptr_t hash = DIO_HASH(fd);
1285 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1286 if (fd_entry->fd == fd) {
1287 // Retain the fd_entry to ensure it cannot go away until the
1288 // stat() has completed
1289 _dispatch_fd_entry_retain(fd_entry);
1290 break;
1291 }
1292 }
1293 if (!fd_entry) {
1294 // If we did not find an existing entry, create one
1295 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1296 }
1297 _dispatch_fd_entry_debug("init", fd_entry);
1298 dispatch_async(fd_entry->barrier_queue, ^{
1299 _dispatch_fd_entry_debug("init completion", fd_entry);
1300 completion_callback(fd_entry);
1301 // stat() is complete, release reference to fd_entry
1302 _dispatch_fd_entry_release(fd_entry);
1303 });
1304 });
1305 }
1306
1307 static dispatch_fd_entry_t
1308 _dispatch_fd_entry_create(dispatch_queue_t q)
1309 {
1310 dispatch_fd_entry_t fd_entry;
1311 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1312 // Use target queue to ensure that no concurrent lookups are going on when
1313 // the close queue is running
1314 fd_entry->close_queue = dispatch_queue_create_with_target(
1315 "com.apple.libdispatch-io.closeq", NULL, q);
1316 // Suspend the cleanup queue until closing
1317 _dispatch_fd_entry_retain(fd_entry);
1318 return fd_entry;
1319 }
1320
1321 static dispatch_fd_entry_t
1322 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1323 {
1324 // On fds lock queue
1325 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1326 _dispatch_io_fds_lockq);
1327 _dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
1328 fd_entry->fd = fd;
1329 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1330 fd_entry->barrier_queue = dispatch_queue_create(
1331 "com.apple.libdispatch-io.barrierq", NULL);
1332 fd_entry->barrier_group = dispatch_group_create();
1333 dispatch_async(fd_entry->barrier_queue, ^{
1334 _dispatch_fd_entry_debug("stat", fd_entry);
1335 int err, orig_flags, orig_nosigpipe = -1;
1336 struct stat st;
1337 _dispatch_io_syscall_switch(err,
1338 fstat(fd, &st),
1339 default: fd_entry->err = err; return;
1340 );
1341 fd_entry->stat.dev = st.st_dev;
1342 fd_entry->stat.mode = st.st_mode;
1343 _dispatch_fd_entry_guard(fd_entry);
1344 _dispatch_io_syscall_switch(err,
1345 orig_flags = fcntl(fd, F_GETFL),
1346 default: (void)dispatch_assume_zero(err); break;
1347 );
1348 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1349 if (S_ISFIFO(st.st_mode)) {
1350 _dispatch_io_syscall_switch(err,
1351 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1352 default: (void)dispatch_assume_zero(err); break;
1353 );
1354 if (orig_nosigpipe != -1) {
1355 _dispatch_io_syscall_switch(err,
1356 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1357 default:
1358 orig_nosigpipe = -1;
1359 (void)dispatch_assume_zero(err);
1360 break;
1361 );
1362 }
1363 }
1364 #endif
1365 if (S_ISREG(st.st_mode)) {
1366 if (orig_flags != -1) {
1367 _dispatch_io_syscall_switch(err,
1368 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1369 default:
1370 orig_flags = -1;
1371 (void)dispatch_assume_zero(err);
1372 break;
1373 );
1374 }
1375 dev_t dev = major(st.st_dev);
1376 // We have to get the disk on the global dev queue. The
1377 // barrier queue cannot continue until that is complete
1378 dispatch_suspend(fd_entry->barrier_queue);
1379 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1380 _dispatch_io_devs_lockq_init);
1381 dispatch_async(_dispatch_io_devs_lockq, ^{
1382 _dispatch_disk_init(fd_entry, dev);
1383 dispatch_resume(fd_entry->barrier_queue);
1384 });
1385 } else {
1386 if (orig_flags != -1) {
1387 _dispatch_io_syscall_switch(err,
1388 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1389 default:
1390 orig_flags = -1;
1391 (void)dispatch_assume_zero(err);
1392 break;
1393 );
1394 }
1395
1396 _dispatch_stream_init(fd_entry,
1397 _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false));
1398 }
1399 fd_entry->orig_flags = orig_flags;
1400 fd_entry->orig_nosigpipe = orig_nosigpipe;
1401 });
1402 // This is the first item run when the close queue is resumed, indicating
1403 // that all channels associated with this entry have been closed and that
1404 // all operations associated with this entry have been freed
1405 dispatch_async(fd_entry->close_queue, ^{
1406 if (!fd_entry->disk) {
1407 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1408 dispatch_op_direction_t dir;
1409 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1410 _dispatch_stream_dispose(fd_entry, dir);
1411 }
1412 } else {
1413 dispatch_disk_t disk = fd_entry->disk;
1414 dispatch_async(_dispatch_io_devs_lockq, ^{
1415 _dispatch_release(disk);
1416 });
1417 }
1418 // Remove this entry from the global fd list
1419 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1420 });
1421 // If there was a source associated with this stream, disposing of the
1422 // source cancels it and suspends the close queue. Freeing the fd_entry
1423 // structure must happen after the source cancel handler has finished
1424 dispatch_async(fd_entry->close_queue, ^{
1425 _dispatch_fd_entry_debug("close queue release", fd_entry);
1426 dispatch_release(fd_entry->close_queue);
1427 _dispatch_fd_entry_debug("barrier queue release", fd_entry);
1428 dispatch_release(fd_entry->barrier_queue);
1429 _dispatch_fd_entry_debug("barrier group release", fd_entry);
1430 dispatch_release(fd_entry->barrier_group);
1431 if (fd_entry->orig_flags != -1) {
1432 _dispatch_io_syscall(
1433 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1434 );
1435 }
1436 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1437 if (fd_entry->orig_nosigpipe != -1) {
1438 _dispatch_io_syscall(
1439 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1440 );
1441 }
1442 #endif
1443 _dispatch_fd_entry_unguard(fd_entry);
1444 if (fd_entry->convenience_channel) {
1445 fd_entry->convenience_channel->fd_entry = NULL;
1446 dispatch_release(fd_entry->convenience_channel);
1447 }
1448 free(fd_entry);
1449 });
1450 return fd_entry;
1451 }
1452
1453 static dispatch_fd_entry_t
1454 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1455 dev_t dev, mode_t mode)
1456 {
1457 // On devs lock queue
1458 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1459 path_data->channel->queue);
1460 _dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
1461 if (S_ISREG(mode)) {
1462 _dispatch_disk_init(fd_entry, major(dev));
1463 } else {
1464 _dispatch_stream_init(fd_entry,
1465 _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false));
1466 }
1467 fd_entry->fd = -1;
1468 fd_entry->orig_flags = -1;
1469 fd_entry->path_data = path_data;
1470 fd_entry->stat.dev = dev;
1471 fd_entry->stat.mode = mode;
1472 fd_entry->barrier_queue = dispatch_queue_create(
1473 "com.apple.libdispatch-io.barrierq", NULL);
1474 fd_entry->barrier_group = dispatch_group_create();
1475 // This is the first item run when the close queue is resumed, indicating
1476 // that the channel associated with this entry has been closed and that
1477 // all operations associated with this entry have been freed
1478 dispatch_async(fd_entry->close_queue, ^{
1479 _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
1480 if (!fd_entry->disk) {
1481 dispatch_op_direction_t dir;
1482 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1483 _dispatch_stream_dispose(fd_entry, dir);
1484 }
1485 }
1486 if (fd_entry->fd != -1) {
1487 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1488 }
1489 if (fd_entry->path_data->channel) {
1490 // If associated channel has not been released yet, mark it as
1491 // no longer having an fd_entry (for stop after close).
1492 // It is safe to modify channel since we are on close_queue with
1493 // target queue the channel queue
1494 fd_entry->path_data->channel->fd_entry = NULL;
1495 }
1496 });
1497 dispatch_async(fd_entry->close_queue, ^{
1498 _dispatch_fd_entry_debug("close queue release", fd_entry);
1499 dispatch_release(fd_entry->close_queue);
1500 dispatch_release(fd_entry->barrier_queue);
1501 dispatch_release(fd_entry->barrier_group);
1502 free(fd_entry->path_data);
1503 free(fd_entry);
1504 });
1505 return fd_entry;
1506 }
1507
1508 static int
1509 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1510 {
1511 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1512 return 0;
1513 }
1514 if (fd_entry->err) {
1515 return fd_entry->err;
1516 }
1517 int fd = -1;
1518 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1519 fd_entry->path_data->oflag | O_NONBLOCK;
1520 open:
1521 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1522 oflag, fd_entry->path_data->mode);
1523 if (fd == -1) {
1524 int err = errno;
1525 if (err == EINTR) {
1526 goto open;
1527 }
1528 (void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1529 return err;
1530 }
1531 if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1532 // Lost the race with another open
1533 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1534 } else {
1535 channel->fd_actual = fd;
1536 }
1537 _dispatch_object_debug(channel, "%s", __func__);
1538 return 0;
1539 }
1540
1541 static void
1542 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1543 dispatch_io_t channel)
1544 {
1545 if (fd_entry->disk) {
1546 if (channel) {
1547 _dispatch_retain(channel);
1548 }
1549 _dispatch_fd_entry_retain(fd_entry);
1550 dispatch_async(fd_entry->disk->pick_queue, ^{
1551 _dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
1552 _dispatch_fd_entry_release(fd_entry);
1553 if (channel) {
1554 _dispatch_release(channel);
1555 }
1556 });
1557 } else {
1558 dispatch_op_direction_t direction;
1559 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1560 dispatch_stream_t stream = fd_entry->streams[direction];
1561 if (!stream) {
1562 continue;
1563 }
1564 if (channel) {
1565 _dispatch_retain(channel);
1566 }
1567 _dispatch_fd_entry_retain(fd_entry);
1568 dispatch_async(stream->dq, ^{
1569 _dispatch_stream_cleanup_operations(stream, channel);
1570 _dispatch_fd_entry_release(fd_entry);
1571 if (channel) {
1572 _dispatch_release(channel);
1573 }
1574 });
1575 }
1576 }
1577 }
1578
1579 #pragma mark -
1580 #pragma mark dispatch_stream_t/dispatch_disk_t
1581
1582 static void
1583 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1584 {
1585 dispatch_op_direction_t direction;
1586 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1587 dispatch_stream_t stream;
1588 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1589 stream->dq = dispatch_queue_create_with_target(
1590 "com.apple.libdispatch-io.streamq", NULL, tq);
1591 dispatch_set_context(stream->dq, stream);
1592 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1593 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1594 fd_entry->streams[direction] = stream;
1595 }
1596 }
1597
1598 static void
1599 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1600 dispatch_op_direction_t direction)
1601 {
1602 // On close queue
1603 dispatch_stream_t stream = fd_entry->streams[direction];
1604 if (!stream) {
1605 return;
1606 }
1607 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1608 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1609 if (stream->source) {
1610 // Balanced by source cancel handler:
1611 _dispatch_fd_entry_retain(fd_entry);
1612 dispatch_source_cancel(stream->source);
1613 dispatch_resume(stream->source);
1614 dispatch_release(stream->source);
1615 }
1616 dispatch_set_context(stream->dq, NULL);
1617 dispatch_release(stream->dq);
1618 free(stream);
1619 }
1620
1621 static void
1622 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1623 {
1624 // On devs lock queue
1625 dispatch_disk_t disk;
1626 // Check to see if there is an existing entry for the given device
1627 uintptr_t hash = DIO_HASH(dev);
1628 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1629 if (disk->dev == dev) {
1630 _dispatch_retain(disk);
1631 goto out;
1632 }
1633 }
1634 // Otherwise create a new entry
1635 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1636 disk = _dispatch_object_alloc(DISPATCH_VTABLE(disk),
1637 sizeof(struct dispatch_disk_s) +
1638 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1639 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1640 disk->do_xref_cnt = -1;
1641 disk->advise_list_depth = pending_reqs_depth;
1642 disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
1643 disk->dev = dev;
1644 TAILQ_INIT(&disk->operations);
1645 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1646 char label[45];
1647 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d",
1648 (int)dev);
1649 disk->pick_queue = dispatch_queue_create(label, NULL);
1650 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1651 out:
1652 fd_entry->disk = disk;
1653 TAILQ_INIT(&fd_entry->stream_ops);
1654 }
1655
1656 void
1657 _dispatch_disk_dispose(dispatch_disk_t disk, DISPATCH_UNUSED bool *allow_free)
1658 {
1659 uintptr_t hash = DIO_HASH(disk->dev);
1660 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1661 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1662 size_t i;
1663 for (i=0; i<disk->advise_list_depth; ++i) {
1664 dispatch_assert(!disk->advise_list[i]);
1665 }
1666 dispatch_release(disk->pick_queue);
1667 }
1668
1669 #pragma mark -
1670 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1671
1672 static inline bool
1673 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1674 {
1675 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1676 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1677 }
1678
1679 static void
1680 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1681 dispatch_operation_t op, dispatch_data_t data)
1682 {
1683 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1684 return;
1685 }
1686 _dispatch_object_debug(op, "%s", __func__);
1687 bool no_ops = !_dispatch_stream_operation_avail(stream);
1688 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1689 if (no_ops) {
1690 dispatch_async_f(stream->dq, stream->dq,
1691 _dispatch_stream_queue_handler);
1692 }
1693 }
1694
1695 static void
1696 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1697 dispatch_data_t data)
1698 {
1699 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1700 return;
1701 }
1702 _dispatch_object_debug(op, "%s", __func__);
1703 if (op->params.type == DISPATCH_IO_STREAM) {
1704 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1705 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1706 }
1707 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1708 } else {
1709 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1710 }
1711 _dispatch_disk_handler(disk);
1712 }
1713
1714 static void
1715 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1716 dispatch_operation_t op)
1717 {
1718 // On stream queue
1719 _dispatch_object_debug(op, "%s", __func__);
1720 _dispatch_op_debug("complete: stream %p", op, stream);
1721 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1722 if (op == stream->op) {
1723 stream->op = NULL;
1724 }
1725 if (op->timer) {
1726 dispatch_source_cancel(op->timer);
1727 }
1728 // Final release will deliver any pending data
1729 _dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
1730 _dispatch_release(op);
1731 }
1732
1733 static void
1734 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1735 {
1736 // On pick queue
1737 _dispatch_object_debug(op, "%s", __func__);
1738 _dispatch_op_debug("complete: disk %p", op, disk);
1739 // Current request is always the last op returned
1740 if (disk->cur_rq == op) {
1741 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1742 operation_list);
1743 }
1744 if (op->params.type == DISPATCH_IO_STREAM) {
1745 // Check if there are other pending stream operations behind it
1746 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1747 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1748 if (op_next) {
1749 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1750 }
1751 }
1752 TAILQ_REMOVE(&disk->operations, op, operation_list);
1753 if (op->timer) {
1754 dispatch_source_cancel(op->timer);
1755 }
1756 // Final release will deliver any pending data
1757 _dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
1758 _dispatch_release(op);
1759 }
1760
1761 static dispatch_operation_t
1762 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1763 dispatch_operation_t op)
1764 {
1765 // On stream queue
1766 if (!op) {
1767 // On the first run through, pick the first operation
1768 if (!_dispatch_stream_operation_avail(stream)) {
1769 return op;
1770 }
1771 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1772 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1773 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1774 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1775 }
1776 return op;
1777 }
1778 if (op->params.type == DISPATCH_IO_STREAM) {
1779 // Stream operations need to be serialized, so continue the current
1780 // operation until it is finished
1781 return op;
1782 }
1783 // Get the next random operation (round-robin)
1784 if (op->params.type == DISPATCH_IO_RANDOM) {
1785 op = TAILQ_NEXT(op, operation_list);
1786 if (!op) {
1787 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1788 }
1789 return op;
1790 }
1791 return NULL;
1792 }
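/*
 * Editorial note: the picker above encodes the per-stream scheduling policy.
 * DISPATCH_IO_STREAM operations are strictly serialized (the current one runs
 * to completion before another is picked), while DISPATCH_IO_RANDOM operations
 * rotate round-robin so concurrent reads at different offsets all make
 * progress; e.g. with random operations A, B and C queued, successive picks
 * yield A, B, C, A, B, C, ... until each completes and is removed.
 */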
1793
1794 static dispatch_operation_t
1795 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1796 {
1797 // On pick queue
1798 dispatch_operation_t op;
1799 if (!TAILQ_EMPTY(&disk->operations)) {
1800 if (disk->cur_rq == NULL) {
1801 op = TAILQ_FIRST(&disk->operations);
1802 } else {
1803 op = disk->cur_rq;
1804 do {
1805 op = TAILQ_NEXT(op, operation_list);
1806 if (!op) {
1807 op = TAILQ_FIRST(&disk->operations);
1808 }
1809 // TODO: more involved picking algorithm rdar://problem/8780312
1810 } while (op->active && op != disk->cur_rq);
1811 }
1812 if (!op->active) {
1813 disk->cur_rq = op;
1814 return op;
1815 }
1816 }
1817 return NULL;
1818 }
1819
1820 static void
1821 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1822 dispatch_io_t channel)
1823 {
1824 // On stream queue
1825 dispatch_operation_t op, tmp;
1826 typeof(*stream->operations) *operations;
1827 operations = &stream->operations[DISPATCH_IO_RANDOM];
1828 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1829 if (!channel || op->channel == channel) {
1830 _dispatch_stream_complete_operation(stream, op);
1831 }
1832 }
1833 operations = &stream->operations[DISPATCH_IO_STREAM];
1834 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1835 if (!channel || op->channel == channel) {
1836 _dispatch_stream_complete_operation(stream, op);
1837 }
1838 }
1839 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1840 dispatch_suspend(stream->source);
1841 stream->source_running = false;
1842 }
1843 }
1844
1845 static inline void
1846 _dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
1847 dispatch_io_t channel, bool inactive_only)
1848 {
1849 // On pick queue
1850 dispatch_operation_t op, tmp;
1851 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1852 if (inactive_only && op->active) continue;
1853 if (!channel || op->channel == channel) {
1854 _dispatch_op_debug("cleanup: disk %p", op, disk);
1855 _dispatch_disk_complete_operation(disk, op);
1856 }
1857 }
1858 }
1859
1860 static void
1861 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1862 {
1863 _dispatch_disk_cleanup_specified_operations(disk, channel, false);
1864 }
1865
1866 static void
1867 _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
1868 dispatch_io_t channel)
1869 {
1870 _dispatch_disk_cleanup_specified_operations(disk, channel, true);
1871 }
1872
1873 #pragma mark -
1874 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1875
1876 static dispatch_source_t
1877 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1878 {
1879 // On stream queue
1880 if (stream->source) {
1881 return stream->source;
1882 }
1883 dispatch_fd_t fd = op->fd_entry->fd;
1884 _dispatch_op_debug("stream source create", op);
1885 dispatch_source_t source = NULL;
1886 if (op->direction == DOP_DIR_READ) {
1887 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1888 (uintptr_t)fd, 0, stream->dq);
1889 } else if (op->direction == DOP_DIR_WRITE) {
1890 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1891 (uintptr_t)fd, 0, stream->dq);
1892 } else {
1893 dispatch_assert(op->direction < DOP_DIR_MAX);
1894 return NULL;
1895 }
1896 dispatch_set_context(source, stream);
1897 dispatch_source_set_event_handler_f(source,
1898 _dispatch_stream_source_handler);
1899 // Close queue must not run user cleanup handlers until sources are fully
1900 // unregistered
1901 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1902 dispatch_source_set_mandatory_cancel_handler(source, ^{
1903 _dispatch_op_debug("stream source cancel", op);
1904 dispatch_resume(close_queue);
1905 });
1906 stream->source = source;
1907 return stream->source;
1908 }
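/*
 * For reference (illustrative sketch, not from this file): outside of the
 * channel machinery, the same fd readiness notification uses the public source
 * API, assuming a non-blocking descriptor fd and a serial queue q:
 *
 *	dispatch_source_t s = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
 *			(uintptr_t)fd, 0, q);
 *	dispatch_source_set_event_handler(s, ^{
 *		size_t estimated = dispatch_source_get_data(s);	// bytes available
 *		// read(fd, ...) here; dispatch_source_cancel(s) when finished
 *	});
 *	dispatch_resume(s);	// sources are created suspended
 */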
1909
1910 static void
1911 _dispatch_stream_source_handler(void *ctx)
1912 {
1913 // On stream queue
1914 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1915 dispatch_suspend(stream->source);
1916 stream->source_running = false;
1917 return _dispatch_stream_handler(stream);
1918 }
1919
1920 static void
1921 _dispatch_stream_queue_handler(void *ctx)
1922 {
1923 // On stream queue
1924 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1925 if (!stream) {
1926 // _dispatch_stream_dispose has been called
1927 return;
1928 }
1929 return _dispatch_stream_handler(stream);
1930 }
1931
1932 static void
1933 _dispatch_stream_handler(void *ctx)
1934 {
1935 // On stream queue
1936 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1937 dispatch_operation_t op;
1938 pick:
1939 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1940 if (!op) {
1941 _dispatch_debug("no operation found: stream %p", stream);
1942 return;
1943 }
1944 int err = _dispatch_io_get_error(op, NULL, true);
1945 if (err) {
1946 op->err = err;
1947 _dispatch_stream_complete_operation(stream, op);
1948 goto pick;
1949 }
1950 stream->op = op;
1951 _dispatch_op_debug("stream handler", op);
1952 dispatch_fd_entry_t fd_entry = op->fd_entry;
1953 _dispatch_fd_entry_retain(fd_entry);
1954 // For performance analysis
1955 if (!op->total && dispatch_io_defaults.initial_delivery) {
1956 // Empty delivery to signal the start of the operation
1957 _dispatch_op_debug("initial delivery", op);
1958 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1959 }
1960 // TODO: perform on the operation target queue to get correct priority
1961 int result = _dispatch_operation_perform(op);
1962 dispatch_op_flags_t flags = ~0u;
1963 switch (result) {
1964 case DISPATCH_OP_DELIVER:
1965 flags = DOP_DEFAULT;
1966 // Fall through
1967 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1968 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1969 DOP_DEFAULT;
1970 _dispatch_operation_deliver_data(op, flags);
1971 // Fall through
1972 case DISPATCH_OP_COMPLETE:
1973 if (flags != DOP_DEFAULT) {
1974 _dispatch_stream_complete_operation(stream, op);
1975 }
1976 if (_dispatch_stream_operation_avail(stream)) {
1977 dispatch_async_f(stream->dq, stream->dq,
1978 _dispatch_stream_queue_handler);
1979 }
1980 break;
1981 case DISPATCH_OP_COMPLETE_RESUME:
1982 _dispatch_stream_complete_operation(stream, op);
1983 // Fall through
1984 case DISPATCH_OP_RESUME:
1985 if (_dispatch_stream_operation_avail(stream)) {
1986 stream->source_running = true;
1987 dispatch_resume(_dispatch_stream_source(stream, op));
1988 }
1989 break;
1990 case DISPATCH_OP_ERR:
1991 _dispatch_stream_cleanup_operations(stream, op->channel);
1992 break;
1993 case DISPATCH_OP_FD_ERR:
1994 _dispatch_fd_entry_retain(fd_entry);
1995 dispatch_async(fd_entry->barrier_queue, ^{
1996 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1997 _dispatch_fd_entry_release(fd_entry);
1998 });
1999 break;
2000 default:
2001 break;
2002 }
2003 _dispatch_fd_entry_release(fd_entry);
2004 return;
2005 }
2006
2007 static void
2008 _dispatch_disk_handler(void *ctx)
2009 {
2010 // On pick queue
2011 dispatch_disk_t disk = (dispatch_disk_t)ctx;
2012 if (disk->io_active) {
2013 return;
2014 }
2015 _dispatch_disk_debug("disk handler", disk);
2016 dispatch_operation_t op;
2017 size_t i = disk->free_idx, j = disk->req_idx;
2018 if (j <= i) {
2019 j += disk->advise_list_depth;
2020 }
2021 while (i <= j) {
2022 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
2023 (op = _dispatch_disk_pick_next_operation(disk))) {
2024 int err = _dispatch_io_get_error(op, NULL, true);
2025 if (err) {
2026 op->err = err;
2027 _dispatch_disk_complete_operation(disk, op);
2028 continue;
2029 }
2030 _dispatch_retain(op);
2031 _dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
2032 disk->advise_list[i%disk->advise_list_depth] = op;
2033 op->active = true;
2034 _dispatch_op_debug("activate: disk %p", op, disk);
2035 _dispatch_object_debug(op, "%s", __func__);
2036 } else {
2037 // No more operations to get
2038 break;
2039 }
2040 i++;
2041 }
2042 disk->free_idx = (i%disk->advise_list_depth);
2043 op = disk->advise_list[disk->req_idx];
2044 if (op) {
2045 disk->io_active = true;
2046 _dispatch_op_debug("async perform: disk %p", op, disk);
2047 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
2048 }
2049 }
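/*
 * Editorial note on the advise_list ring buffer used here and in
 * _dispatch_disk_perform below: advise_list is a fixed-size circular array of
 * advise_list_depth slots. req_idx is the slot whose operation is performed
 * next, free_idx is the first empty slot that newly picked operations fill,
 * and advise_idx tracks how far read-ahead advice has been issued. The
 * "j += depth" adjustment lets the loop run over i..j without wrapping the
 * bounds themselves; the slot index is always taken modulo the depth, e.g.
 * with advise_list_depth == 4, free_idx == 3 and req_idx == 1:
 *
 *	i = 3, j = 1 + 4 = 5  ->  slots visited: 3 % 4, 4 % 4, 5 % 4 == 3, 0, 1
 */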
2050
2051 static void
2052 _dispatch_disk_perform(void *ctxt)
2053 {
2054 dispatch_disk_t disk = ctxt;
2055 _dispatch_disk_debug("disk perform", disk);
2056 size_t chunk_size = dispatch_io_defaults.chunk_size;
2057 dispatch_operation_t op;
2058 size_t i = disk->advise_idx, j = disk->free_idx;
2059 if (j <= i) {
2060 j += disk->advise_list_depth;
2061 }
2062 do {
2063 op = disk->advise_list[i%disk->advise_list_depth];
2064 if (!op) {
2065 // Nothing more to advise, must be at free_idx
2066 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2067 break;
2068 }
2069 if (op->direction == DOP_DIR_WRITE) {
2070 // TODO: preallocate writes ? rdar://problem/9032172
2071 continue;
2072 }
2073 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2074 op->channel)) {
2075 continue;
2076 }
2077 // For performance analysis
2078 if (!op->total && dispatch_io_defaults.initial_delivery) {
2079 // Empty delivery to signal the start of the operation
2080 _dispatch_op_debug("initial delivery", op);
2081 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2082 }
2083 // Advise two chunks if the list only has one element and this is the
2084 // first advise on the operation
2085 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2086 !op->advise_offset) {
2087 chunk_size *= 2;
2088 }
2089 _dispatch_operation_advise(op, chunk_size);
2090 } while (++i < j);
2091 disk->advise_idx = i%disk->advise_list_depth;
2092 op = disk->advise_list[disk->req_idx];
2093 int result = _dispatch_operation_perform(op);
2094 disk->advise_list[disk->req_idx] = NULL;
2095 disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth; // advance to the next slot in the ring
2096 _dispatch_op_debug("async perform completion: disk %p", op, disk);
2097 dispatch_async(disk->pick_queue, ^{
2098 _dispatch_op_debug("perform completion", op);
2099 switch (result) {
2100 case DISPATCH_OP_DELIVER:
2101 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2102 break;
2103 case DISPATCH_OP_COMPLETE:
2104 _dispatch_disk_complete_operation(disk, op);
2105 break;
2106 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2107 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2108 _dispatch_disk_complete_operation(disk, op);
2109 break;
2110 case DISPATCH_OP_ERR:
2111 _dispatch_disk_cleanup_operations(disk, op->channel);
2112 break;
2113 case DISPATCH_OP_FD_ERR:
2114 _dispatch_disk_cleanup_operations(disk, NULL);
2115 break;
2116 default:
2117 dispatch_assert(result);
2118 break;
2119 }
2120 _dispatch_op_debug("deactivate: disk %p", op, disk);
2121 op->active = false;
2122 disk->io_active = false;
2123 _dispatch_disk_handler(disk);
2124 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2125 // released at the very end, since it might hold the last reference to
2126 // the disk
2127 _dispatch_op_debug("release -> %d (disk perform complete)", op,
2128 op->do_ref_cnt);
2129 _dispatch_release(op);
2130 });
2131 }
2132
2133 #pragma mark -
2134 #pragma mark dispatch_operation_perform
2135
2136 static void
2137 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2138 {
2139 _dispatch_op_debug("advise", op);
2140 if (_dispatch_io_get_error(op, NULL, true)) return;
2141 #ifdef __linux__
2142 // Linux does not support fcntl(F_RDADVISE);
2143 // define the necessary data structure here and use readahead(2) instead
2144 struct radvisory {
2145 off_t ra_offset;
2146 int ra_count;
2147 };
2148 #endif
2149 int err;
2150 struct radvisory advise;
2151 // No point in issuing a read advise for the next chunk if we are already
2152 // advised more than a chunk ahead of the bytes read so far
2153 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2154 chunk_size + PAGE_SIZE)) {
2155 return;
2156 }
2157 _dispatch_object_debug(op, "%s", __func__);
2158 advise.ra_count = (int)chunk_size;
2159 if (!op->advise_offset) {
2160 op->advise_offset = op->offset;
2161 // If this is the first time through, align the advised range to a
2162 // page boundary
2163 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2164 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2165 }
2166 advise.ra_offset = op->advise_offset;
2167 op->advise_offset += advise.ra_count;
2168 #ifdef __linux__
2169 _dispatch_io_syscall_switch(err,
2170 readahead(op->fd_entry->fd, advise.ra_offset, (size_t)advise.ra_count),
2171 case EINVAL: break; // fd refers to an unsupported file type
2172 default: (void)dispatch_assume_zero(err); break;
2173 );
2174 #else
2175 _dispatch_io_syscall_switch(err,
2176 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2177 case EFBIG: break; // advised past the end of the file rdar://10415691
2178 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2179 // TODO: set disk status on error
2180 default: (void)dispatch_assume_zero(err); break;
2181 );
2182 #endif
2183 }
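/*
 * Illustrative sketch (not part of the original source): the read-ahead hint
 * issued above is purely advisory. On platforms that provide posix_fadvise(2),
 * a comparable standalone hint would be:
 *
 *	// hint that [off, off + len) will be read soon; failure only loses the
 *	// prefetch, never the data, so the result can be ignored
 *	(void)posix_fadvise(fd, off, (off_t)len, POSIX_FADV_WILLNEED);
 *
 * which mirrors the error handling here: EFBIG/ENOTSUP (or EINVAL for
 * readahead on Linux) are silently tolerated.
 */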
2184
2185 static int
2186 _dispatch_operation_perform(dispatch_operation_t op)
2187 {
2188 _dispatch_op_debug("perform", op);
2189 int err = _dispatch_io_get_error(op, NULL, true);
2190 if (err) {
2191 goto error;
2192 }
2193 _dispatch_object_debug(op, "%s", __func__);
2194 if (!op->buf) {
2195 size_t max_buf_siz = op->params.high;
2196 size_t chunk_siz = dispatch_io_defaults.chunk_size;
2197 if (op->direction == DOP_DIR_READ) {
2198 // If necessary, create a buffer for the ongoing operation, large
2199 // enough to fit chunk_size but at most high-water
2200 size_t data_siz = dispatch_data_get_size(op->data);
2201 if (data_siz) {
2202 dispatch_assert(data_siz < max_buf_siz);
2203 max_buf_siz -= data_siz;
2204 }
2205 if (max_buf_siz > chunk_siz) {
2206 max_buf_siz = chunk_siz;
2207 }
2208 if (op->length < SIZE_MAX) {
2209 op->buf_siz = op->length - op->total;
2210 if (op->buf_siz > max_buf_siz) {
2211 op->buf_siz = max_buf_siz;
2212 }
2213 } else {
2214 op->buf_siz = max_buf_siz;
2215 }
2216 op->buf = valloc(op->buf_siz);
2217 _dispatch_op_debug("buffer allocated", op);
2218 } else if (op->direction == DOP_DIR_WRITE) {
2219 // Always write the first data piece; if it is smaller than a
2220 // chunk, accumulate further data pieces until the chunk size is reached
2221 if (chunk_siz > max_buf_siz) {
2222 chunk_siz = max_buf_siz;
2223 }
2224 op->buf_siz = 0;
2225 dispatch_data_apply(op->data,
2226 ^(dispatch_data_t region DISPATCH_UNUSED,
2227 size_t offset DISPATCH_UNUSED,
2228 const void* buf DISPATCH_UNUSED, size_t len) {
2229 size_t siz = op->buf_siz + len;
2230 if (!op->buf_siz || siz <= chunk_siz) {
2231 op->buf_siz = siz;
2232 }
2233 return (bool)(siz < chunk_siz);
2234 });
2235 if (op->buf_siz > max_buf_siz) {
2236 op->buf_siz = max_buf_siz;
2237 }
2238 dispatch_data_t d;
2239 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2240 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2241 NULL);
2242 _dispatch_io_data_release(d);
2243 _dispatch_op_debug("buffer mapped", op);
2244 }
2245 }
2246 if (op->fd_entry->fd == -1) {
2247 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2248 if (err) {
2249 goto error;
2250 }
2251 }
2252 void *buf = op->buf + op->buf_len;
2253 size_t len = op->buf_siz - op->buf_len;
2254 off_t off = (off_t)((size_t)op->offset + op->total);
2255 ssize_t processed = -1;
2256 syscall:
2257 if (op->direction == DOP_DIR_READ) {
2258 if (op->params.type == DISPATCH_IO_STREAM) {
2259 processed = read(op->fd_entry->fd, buf, len);
2260 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2261 processed = pread(op->fd_entry->fd, buf, len, off);
2262 }
2263 } else if (op->direction == DOP_DIR_WRITE) {
2264 if (op->params.type == DISPATCH_IO_STREAM) {
2265 processed = write(op->fd_entry->fd, buf, len);
2266 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2267 processed = pwrite(op->fd_entry->fd, buf, len, off);
2268 }
2269 }
2270 // Encountered an error on the file descriptor
2271 if (processed == -1) {
2272 err = errno;
2273 if (err == EINTR) {
2274 goto syscall;
2275 }
2276 goto error;
2277 }
2278 // EOF is indicated by two handler invocations
2279 if (processed == 0) {
2280 _dispatch_op_debug("performed: EOF", op);
2281 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2282 }
2283 op->buf_len += (size_t)processed;
2284 op->total += (size_t)processed;
2285 if (op->total == op->length) {
2286 // Finished processing all the bytes requested by the operation
2287 return DISPATCH_OP_COMPLETE;
2288 } else {
2289 // Deliver data only if we satisfy the filters
2290 return DISPATCH_OP_DELIVER;
2291 }
2292 error:
2293 if (err == EAGAIN || err == EWOULDBLOCK) {
2294 // For disk-based files with blocking I/O we should never get EAGAIN
2295 dispatch_assert(!op->fd_entry->disk);
2296 _dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op);
2297 if (op->direction == DOP_DIR_READ && op->total &&
2298 op->channel == op->fd_entry->convenience_channel) {
2299 // Convenience read with available data completes on EAGAIN
2300 return DISPATCH_OP_COMPLETE_RESUME;
2301 }
2302 return DISPATCH_OP_RESUME;
2303 }
2304 _dispatch_op_debug("performed: err %d", op, err);
2305 op->err = err;
2306 switch (err) {
2307 case ECANCELED:
2308 return DISPATCH_OP_ERR;
2309 case EBADF:
2310 (void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2311 return DISPATCH_OP_FD_ERR;
2312 default:
2313 return DISPATCH_OP_COMPLETE;
2314 }
2315 }
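/*
 * Caller-level view (illustrative sketch, not from this file) of the RANDOM
 * read branch above, assuming a seekable descriptor fd for a regular file and
 * a queue q; each delivery corresponds to one or more pread() calls here:
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_RANDOM, fd, q,
 *			^(int error) { close(fd); });
 *	dispatch_io_read(ch, 4096, 8192, q,	// 8 KiB starting at offset 4096
 *			^(bool done, dispatch_data_t data, int error) {
 *		// consume data; done && !error marks the end of the request
 *	});
 *
 * A DISPATCH_IO_STREAM channel takes the plain read()/write() branches instead
 * and operates at the current file position.
 */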
2316
2317 static void
2318 _dispatch_operation_deliver_data(dispatch_operation_t op,
2319 dispatch_op_flags_t flags)
2320 {
2321 // Called from the stream queue or the pick queue, or when the op is finalized
2322 dispatch_data_t data = NULL;
2323 int err = 0;
2324 size_t undelivered = op->undelivered + op->buf_len;
2325 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2326 (op->flags & DOP_DELIVER);
2327 op->flags = DOP_DEFAULT;
2328 if (!deliver) {
2329 // Don't deliver data until low water mark has been reached
2330 if (undelivered >= op->params.low) {
2331 deliver = true;
2332 } else if (op->buf_len < op->buf_siz) {
2333 // Request buffer is not yet used up
2334 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2335 return;
2336 }
2337 } else {
2338 err = op->err;
2339 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2340 err = ECANCELED;
2341 op->err = err;
2342 }
2343 }
2344 // At this point we either deliver the data or the request buffer is used up
2345 if (op->direction == DOP_DIR_READ) {
2346 if (op->buf_len) {
2347 void *buf = op->buf;
2348 data = dispatch_data_create(buf, op->buf_len, NULL,
2349 DISPATCH_DATA_DESTRUCTOR_FREE);
2350 op->buf = NULL;
2351 op->buf_len = 0;
2352 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2353 _dispatch_io_data_release(op->data);
2354 _dispatch_io_data_release(data);
2355 data = d;
2356 } else {
2357 data = op->data;
2358 }
2359 op->data = deliver ? dispatch_data_empty : data;
2360 } else if (op->direction == DOP_DIR_WRITE) {
2361 if (deliver) {
2362 data = dispatch_data_create_subrange(op->data, op->buf_len,
2363 op->length);
2364 }
2365 if (op->buf_data && op->buf_len == op->buf_siz) {
2366 _dispatch_io_data_release(op->buf_data);
2367 op->buf_data = NULL;
2368 op->buf = NULL;
2369 op->buf_len = 0;
2370 // Trim newly written buffer from head of unwritten data
2371 dispatch_data_t d;
2372 if (deliver) {
2373 _dispatch_io_data_retain(data);
2374 d = data;
2375 } else {
2376 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2377 op->length);
2378 }
2379 _dispatch_io_data_release(op->data);
2380 op->data = d;
2381 }
2382 } else {
2383 dispatch_assert(op->direction < DOP_DIR_MAX);
2384 return;
2385 }
2386 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2387 op->undelivered = undelivered;
2388 _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
2389 return;
2390 }
2391 op->undelivered = 0;
2392 _dispatch_object_debug(op, "%s", __func__);
2393 _dispatch_op_debug("deliver data", op);
2394 dispatch_op_direction_t direction = op->direction;
2395 dispatch_io_handler_t handler = op->handler;
2396 dispatch_fd_entry_t fd_entry = op->fd_entry;
2397 _dispatch_fd_entry_retain(fd_entry);
2398 dispatch_io_t channel = op->channel;
2399 _dispatch_retain(channel);
2400 // Note that data delivery may occur after the operation is freed
2401 dispatch_async(op->op_q, ^{
2402 bool done = (flags & DOP_DONE);
2403 dispatch_data_t d = data;
2404 if (done) {
2405 if (direction == DOP_DIR_READ && err) {
2406 if (dispatch_data_get_size(d)) {
2407 _dispatch_op_debug("IO handler invoke", op);
2408 handler(false, d, 0);
2409 }
2410 d = NULL;
2411 } else if (direction == DOP_DIR_WRITE && !err) {
2412 d = NULL;
2413 }
2414 }
2415 _dispatch_op_debug("IO handler invoke: err %d", op, err);
2416 handler(done, d, err);
2417 _dispatch_release(channel);
2418 _dispatch_fd_entry_release(fd_entry);
2419 _dispatch_io_data_release(data);
2420 });
2421 }
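/*
 * Editorial sketch (not from this file): the low and high water marks
 * consulted above come from the channel configuration, e.g.
 *
 *	dispatch_io_set_low_water(ch, 4 * 1024);	// deliver once >= 4 KiB is ready
 *	dispatch_io_set_high_water(ch, 64 * 1024);	// cap a single delivery
 *
 * Until the low water mark is reached, partial results accumulate in the
 * "undelivered" branch above and are handed out only when the mark is met,
 * the request buffer fills up, or the operation finishes.
 */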
2422
2423 #pragma mark -
2424 #pragma mark dispatch_io_debug
2425
2426 static size_t
2427 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2428 {
2429 dispatch_queue_t target = channel->do_targetq;
2430 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2431 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2432 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2433 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2434 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2435 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2436 channel->fd_entry, channel->queue, target && target->dq_label ?
2437 target->dq_label : "", target, channel->barrier_queue,
2438 channel->barrier_group, channel->err, channel->params.low,
2439 channel->params.high, channel->params.interval_flags &
2440 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2441 (unsigned long long) channel->params.interval);
2442 }
2443
2444 size_t
2445 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2446 {
2447 size_t offset = 0;
2448 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2449 dx_kind(channel), channel);
2450 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2451 bufsiz - offset);
2452 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2453 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2454 return offset;
2455 }
2456
2457 static size_t
2458 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2459 size_t bufsiz)
2460 {
2461 dispatch_queue_t target = op->do_targetq;
2462 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2463 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2464 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2465 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2466 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2467 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2468 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2469 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2470 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2471 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2472 target->dq_label : "", target, (long long)op->offset, op->length,
2473 op->total, op->undelivered + op->buf_len, op->flags, op->err,
2474 op->params.low, op->params.high, op->params.interval_flags &
2475 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2476 (unsigned long long)op->params.interval);
2477 }
2478
2479 size_t
2480 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2481 {
2482 size_t offset = 0;
2483 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2484 dx_kind(op), op);
2485 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2486 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2487 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2488 return offset;
2489 }