/*
 * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);

DISPATCH_EXPORT DISPATCH_NOTHROW
void _dispatch_iocntl(uint32_t param, uint64_t value);

static dispatch_operation_t _dispatch_operation_create(
		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
		size_t length, dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler);
static void _dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data);
static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
		dispatch_operation_t op);
static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
		uintptr_t hash);
static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel);
static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
		dispatch_queue_t tq);
static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction);
static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
		dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
		dispatch_io_t channel);
static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
		dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_handler(void *ctx);
static void _dispatch_disk_handler(void *ctx);
static void _dispatch_disk_perform(void *ctxt);
static void _dispatch_operation_advise(dispatch_operation_t op,
		size_t chunk_size);
static int _dispatch_operation_perform(dispatch_operation_t op);
static void _dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags);

// Macros to wrap syscalls which return -1 on error, and retry on EINTR
// (the loop condition makes the EINTR case actually re-issue the syscall;
// a plain while (0) would let the continue fall out of the do-while)
#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
	} while ((_err) == EINTR)
#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
			case 0: break; \
			__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(__syscall) do { int __err; \
		_dispatch_io_syscall_switch(__err, __syscall); \
	} while (0)
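
/*
 * Illustrative sketch (not part of the original source): the variadic
 * arguments become extra cases of the errno switch, as in the fstat() call
 * in _dispatch_fd_entry_create_with_fd() below.
 *
 *	int err;
 *	struct stat st;
 *	_dispatch_io_syscall_switch(err,
 *		fstat(fd, &st),
 *		default: fd_entry->err = err; return;
 *	);
 */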

enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER,
	DISPATCH_OP_DELIVER_AND_COMPLETE,
	DISPATCH_OP_COMPLETE_RESUME,
	DISPATCH_OP_RESUME,
	DISPATCH_OP_ERR,
	DISPATCH_OP_FD_ERR,
};

#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))

#pragma mark -
#pragma mark dispatch_io_hashtables

#if TARGET_OS_EMBEDDED
#define DIO_HASH_SIZE 64u // must be a power of two
#else
#define DIO_HASH_SIZE 256u // must be a power of two
#endif
#define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))

// Global hashtable of dev_t -> disk_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
// Global hashtable of fd -> fd_entry_s mappings
DISPATCH_CACHELINE_ALIGN
static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];

static dispatch_once_t _dispatch_io_devs_lockq_pred;
static dispatch_queue_t _dispatch_io_devs_lockq;
static dispatch_queue_t _dispatch_io_fds_lockq;

static void
_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_fds_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.fd_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_fds[i]);
	}
}

static void
_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
{
	_dispatch_io_devs_lockq = dispatch_queue_create(
			"com.apple.libdispatch-io.dev_lockq", NULL);
	unsigned int i;
	for (i = 0; i < DIO_HASH_SIZE; i++) {
		TAILQ_INIT(&_dispatch_io_devs[i]);
	}
}

#pragma mark -
#pragma mark dispatch_io_defaults

enum {
	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
	DISPATCH_IOCNTL_INITIAL_DELIVERY,
	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
};

static struct dispatch_io_defaults_s {
	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
	bool initial_delivery;
} dispatch_io_defaults = {
	.chunk_pages = DIO_MAX_CHUNK_PAGES,
	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
};

#define _dispatch_iocntl_set_default(p, v) do { \
		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
	} while (0)

void
_dispatch_iocntl(uint32_t param, uint64_t value)
{
	switch (param) {
	case DISPATCH_IOCNTL_CHUNK_PAGES:
		_dispatch_iocntl_set_default(chunk_pages, value);
		break;
	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
		_dispatch_iocntl_set_default(low_water_chunks, value);
		break;
	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
		_dispatch_iocntl_set_default(initial_delivery, value);
		break;
	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
		break;
	}
}

#pragma mark -
#pragma mark dispatch_io_t

static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)
{
	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
			sizeof(struct dispatch_io_s));
	channel->do_next = DISPATCH_OBJECT_LISTLESS;
	channel->do_targetq = _dispatch_get_root_queue(0, true);
	channel->params.type = type;
	channel->params.high = SIZE_MAX;
	channel->params.low = dispatch_io_defaults.low_water_chunks *
			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
			NULL);
	return channel;
}

static void
_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
{
	// Enqueue the cleanup handler on the suspended close queue
	if (cleanup_handler) {
		_dispatch_retain(queue);
		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("cleanup handler invoke", -1);
				cleanup_handler(err);
			});
			_dispatch_release(queue);
		});
	}
	if (fd_entry) {
		channel->fd_entry = fd_entry;
		dispatch_retain(fd_entry->barrier_queue);
		dispatch_retain(fd_entry->barrier_group);
		channel->barrier_queue = fd_entry->barrier_queue;
		channel->barrier_group = fd_entry->barrier_group;
	} else {
		// Still need to create a barrier queue, since all operations go
		// through it
		channel->barrier_queue = dispatch_queue_create(
				"com.apple.libdispatch-io.barrierq", NULL);
		channel->barrier_group = dispatch_group_create();
	}
}

void
_dispatch_io_dispose(dispatch_io_t channel)
{
	if (channel->fd_entry &&
			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
		if (channel->fd_entry->path_data) {
			// This modification is safe since path_data->channel is checked
			// only on close_queue (which is still suspended at this point)
			channel->fd_entry->path_data->channel = NULL;
		}
		// Cleanup handlers will only run when all channels related to this
		// fd are complete
		_dispatch_fd_entry_release(channel->fd_entry);
	}
	if (channel->queue) {
		dispatch_release(channel->queue);
	}
	if (channel->barrier_queue) {
		dispatch_release(channel->barrier_queue);
	}
	if (channel->barrier_group) {
		dispatch_release(channel->barrier_group);
	}
}

static int
_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
{
	int err = 0;
	if (S_ISDIR(mode)) {
		err = EISDIR;
	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
		err = ESPIPE;
	}
	return err;
}

static int
_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
		bool ignore_closed)
{
	// On _any_ queue
	int err;
	if (op) {
		channel = op->channel;
	}
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
			err = ECANCELED;
		} else {
			err = 0;
		}
	} else {
		err = op ? op->fd_entry->err : channel->err;
	}
	return err;
}

#pragma mark -
#pragma mark dispatch_io_channels

dispatch_io_t
dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
		dispatch_queue_t queue, void (^cleanup_handler)(int))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_io_debug("io create", fd);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = fd;
	channel->fd_actual = fd;
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		int err = fd_entry->err;
		if (!err) {
			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
		}
		if (!err && type == DISPATCH_IO_RANDOM) {
			off_t f_ptr;
			_dispatch_io_syscall_switch_noerr(err,
				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
				case 0: channel->f_ptr = f_ptr; break;
				default: (void)dispatch_assume_zero(err); break;
			);
		}
		channel->err = err;
		_dispatch_fd_entry_retain(fd_entry);
		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
		dispatch_resume(channel->queue);
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
	return channel;
}
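
/*
 * Usage sketch (illustrative, not part of the library source): wrapping an
 * already-open descriptor in a stream channel. The descriptor `fd` is assumed
 * to exist; reclaiming it once the cleanup handler has run is the caller's
 * responsibility.
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			dispatch_get_main_queue(), ^(int error) {
 *		// The channel is done with fd; safe to reclaim it here
 *		close(fd);
 *	});
 */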

dispatch_io_t
dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
		int oflag, mode_t mode, dispatch_queue_t queue,
		void (^cleanup_handler)(int error))
{
	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
			!(path && *path == '/')) {
		return NULL;
	}
	size_t pathlen = strlen(path);
	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
	if (!path_data) {
		return NULL;
	}
	_dispatch_io_debug("io create with path %s", -1, path);
	dispatch_io_t channel = _dispatch_io_create(type);
	channel->fd = -1;
	channel->fd_actual = -1;
	path_data->channel = channel;
	path_data->oflag = oflag;
	path_data->mode = mode;
	path_data->pathlen = pathlen;
	memcpy(path_data->path, path, pathlen + 1);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		int err = 0;
		struct stat st;
		_dispatch_io_syscall_switch_noerr(err,
			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
					lstat(path_data->path, &st) : stat(path_data->path, &st),
			case 0:
				err = _dispatch_io_validate_type(channel, st.st_mode);
				break;
			default:
				if ((path_data->oflag & O_CREAT) &&
						(*(path_data->path + path_data->pathlen - 1) != '/')) {
					// Check parent directory
					char *c = strrchr(path_data->path, '/');
					dispatch_assert(c);
					*c = 0;
					int perr;
					_dispatch_io_syscall_switch_noerr(perr,
						stat(path_data->path, &st),
						case 0:
							// Since the parent directory exists, open() will
							// create a regular file after the fd_entry has
							// been filled in
							st.st_mode = S_IFREG;
							err = 0;
							break;
					);
					*c = '/';
				}
				break;
		);
		channel->err = err;
		if (err) {
			free(path_data);
			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
			_dispatch_release(channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_suspend(channel->queue);
		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
				_dispatch_io_devs_lockq_init);
		dispatch_async(_dispatch_io_devs_lockq, ^{
			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
					path_data, st.st_dev, st.st_mode);
			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(queue);
		});
	});
	return channel;
}
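
/*
 * Usage sketch (illustrative): a path-based channel defers the open(2) until
 * the first operation needs the descriptor, which is why only absolute paths
 * are accepted above.
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/var/log/system.log", O_RDONLY, 0, dispatch_get_main_queue(),
 *			^(int error) {
 *		// No descriptor to reclaim; the channel closes its own fd
 *	});
 */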

dispatch_io_t
dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
		dispatch_queue_t queue, void (^cleanup_handler)(int error))
{
	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
		return NULL;
	}
	_dispatch_io_debug("io create with io %p", -1, in_channel);
	dispatch_io_t channel = _dispatch_io_create(type);
	dispatch_suspend(channel->queue);
	_dispatch_retain(queue);
	_dispatch_retain(channel);
	_dispatch_retain(in_channel);
	dispatch_async(in_channel->queue, ^{
		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
		if (err0) {
			channel->err = err0;
			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
			dispatch_resume(channel->queue);
			_dispatch_release(channel);
			_dispatch_release(in_channel);
			_dispatch_release(queue);
			return;
		}
		dispatch_async(in_channel->barrier_queue, ^{
			int err = _dispatch_io_get_error(NULL, in_channel, false);
			// If there is no error, the fd_entry for the in_channel is valid.
			// Since we are running on in_channel's queue, the fd_entry has
			// been fully resolved and will stay valid for the duration of
			// this block
			if (!err) {
				err = in_channel->err;
				if (!err) {
					err = in_channel->fd_entry->err;
				}
			}
			if (!err) {
				err = _dispatch_io_validate_type(channel,
						in_channel->fd_entry->stat.mode);
			}
			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
				off_t f_ptr;
				_dispatch_io_syscall_switch_noerr(err,
					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
					case 0: channel->f_ptr = f_ptr; break;
					default: (void)dispatch_assume_zero(err); break;
				);
			}
			channel->err = err;
			if (err) {
				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(in_channel);
				_dispatch_release(queue);
				return;
			}
			if (in_channel->fd == -1) {
				// in_channel was created from path
				channel->fd = -1;
				channel->fd_actual = -1;
				mode_t mode = in_channel->fd_entry->stat.mode;
				dev_t dev = in_channel->fd_entry->stat.dev;
				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
						in_channel->fd_entry->path_data->pathlen + 1;
				dispatch_io_path_data_t path_data = malloc(path_data_len);
				memcpy(path_data, in_channel->fd_entry->path_data,
						path_data_len);
				path_data->channel = channel;
				// lockq_io_devs is known to already exist
				dispatch_async(_dispatch_io_devs_lockq, ^{
					dispatch_fd_entry_t fd_entry;
					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
							dev, mode);
					_dispatch_io_init(channel, fd_entry, queue, 0,
							cleanup_handler);
					dispatch_resume(channel->queue);
					_dispatch_release(channel);
					_dispatch_release(queue);
				});
			} else {
				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
				channel->fd = in_channel->fd;
				channel->fd_actual = in_channel->fd_actual;
				_dispatch_fd_entry_retain(fd_entry);
				_dispatch_io_init(channel, fd_entry, queue, 0,
						cleanup_handler);
				dispatch_resume(channel->queue);
				_dispatch_release(channel);
				_dispatch_release(queue);
			}
			_dispatch_release(in_channel);
		});
	});
	return channel;
}
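
/*
 * Usage sketch (illustrative): deriving a random-access channel from an
 * existing channel to the same file. Per the code above, an fd-based parent
 * shares its fd_entry (and thus barrier queue and cleanup ordering), while a
 * path-based parent has its path data copied into a fresh fd_entry.
 *
 *	dispatch_io_t rnd = dispatch_io_create_with_io(DISPATCH_IO_RANDOM,
 *			stream_ch, dispatch_get_main_queue(), ^(int error) { });
 */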

#pragma mark -
#pragma mark dispatch_io_accessors

void
dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set high water", channel->fd);
		if (channel->params.low > high_water) {
			channel->params.low = high_water;
		}
		channel->params.high = high_water ? high_water : 1;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set low water", channel->fd);
		if (channel->params.high < low_water) {
			channel->params.high = low_water ? low_water : 1;
		}
		channel->params.low = low_water;
		_dispatch_release(channel);
	});
}

void
dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
		unsigned long flags)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		_dispatch_io_debug("io set interval", channel->fd);
		channel->params.interval = interval;
		channel->params.interval_flags = flags;
		_dispatch_release(channel);
	});
}
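
/*
 * Usage sketch (illustrative): the watermarks bound how much data a channel
 * accumulates before invoking an operation's handler, and an interval forces
 * periodic delivery regardless. Note that the setters above may adjust the
 * other watermark to keep low <= high.
 *
 *	dispatch_io_set_low_water(ch, 1); // deliver as soon as any data arrives
 *	dispatch_io_set_high_water(ch, 64 * 1024);
 *	dispatch_io_set_interval(ch, 100 * NSEC_PER_MSEC,
 *			DISPATCH_IO_STRICT_INTERVAL);
 */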

void
_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
{
	_dispatch_retain(dq);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t prev_dq = channel->do_targetq;
		channel->do_targetq = dq;
		_dispatch_release(prev_dq);
		_dispatch_release(channel);
	});
}

dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)
{
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return -1;
	}
	dispatch_fd_t fd = channel->fd_actual;
	if (fd == -1 &&
			_dispatch_thread_getspecific(dispatch_io_key) == channel) {
		dispatch_fd_entry_t fd_entry = channel->fd_entry;
		(void)_dispatch_fd_entry_open(fd_entry, channel);
	}
	return channel->fd_actual;
}

#pragma mark -
#pragma mark dispatch_io_operations

static void
_dispatch_io_stop(dispatch_io_t channel)
{
	_dispatch_io_debug("io stop", channel->fd);
	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			dispatch_fd_entry_t fd_entry = channel->fd_entry;
			if (fd_entry) {
				_dispatch_io_debug("io stop cleanup", channel->fd);
				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
				if (!(channel->atomic_flags & DIO_CLOSED)) {
					channel->fd_entry = NULL;
					_dispatch_fd_entry_release(fd_entry);
				}
			} else if (channel->fd != -1) {
				// Stop after close, need to check if fd_entry still exists
				_dispatch_retain(channel);
				dispatch_async(_dispatch_io_fds_lockq, ^{
					_dispatch_io_debug("io stop after close cleanup",
							channel->fd);
					dispatch_fd_entry_t fdi;
					uintptr_t hash = DIO_HASH(channel->fd);
					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
						if (fdi->fd == channel->fd) {
							_dispatch_fd_entry_cleanup_operations(fdi,
									channel);
							break;
						}
					}
					_dispatch_release(channel);
				});
			}
			_dispatch_release(channel);
		});
	});
}

void
dispatch_io_close(dispatch_io_t channel, unsigned long flags)
{
	if (flags & DISPATCH_IO_STOP) {
		// Don't stop an already stopped channel
		if (channel->atomic_flags & DIO_STOPPED) {
			return;
		}
		return _dispatch_io_stop(channel);
	}
	// Don't close an already closed or stopped channel
	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
		return;
	}
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_async(channel->barrier_queue, ^{
			_dispatch_io_debug("io close", channel->fd);
			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
				dispatch_fd_entry_t fd_entry = channel->fd_entry;
				if (!fd_entry->path_data) {
					channel->fd_entry = NULL;
				}
				_dispatch_fd_entry_release(fd_entry);
			}
			_dispatch_release(channel);
		});
	});
}
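
/*
 * Usage sketch (illustrative): DISPATCH_IO_STOP cancels in-flight operations
 * (their handlers receive ECANCELED), while a plain close only bars new
 * operations and lets the pending ones drain.
 *
 *	dispatch_io_close(ch, DISPATCH_IO_STOP); // interrupt outstanding I/O
 *	dispatch_io_close(ch, 0);                // graceful close
 */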

void
dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
{
	_dispatch_retain(channel);
	dispatch_async(channel->queue, ^{
		dispatch_queue_t io_q = channel->do_targetq;
		dispatch_queue_t barrier_queue = channel->barrier_queue;
		dispatch_group_t barrier_group = channel->barrier_group;
		dispatch_async(barrier_queue, ^{
			dispatch_suspend(barrier_queue);
			dispatch_group_notify(barrier_group, io_q, ^{
				_dispatch_thread_setspecific(dispatch_io_key, channel);
				barrier();
				_dispatch_thread_setspecific(dispatch_io_key, NULL);
				dispatch_resume(barrier_queue);
				_dispatch_release(channel);
			});
		});
	});
}
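
/*
 * Usage sketch (illustrative): a barrier runs after all previously enqueued
 * operations on the channel's file descriptor have completed. Because the
 * barrier block runs with dispatch_io_key set to the channel (see above),
 * dispatch_io_get_descriptor() can lazily open a path-based channel's fd
 * from inside it.
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd); // no operation is using fd at this point
 *		}
 *	});
 */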

void
dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
				length, dispatch_data_empty, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_READ,
						dispatch_data_empty);
			});
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}

void
dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
		dispatch_queue_t queue, dispatch_io_handler_t handler)
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(channel);
	_dispatch_retain(queue);
	dispatch_async(channel->queue, ^{
		dispatch_operation_t op;
		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
				dispatch_data_get_size(data), data, queue, handler);
		if (op) {
			dispatch_queue_t barrier_q = channel->barrier_queue;
			dispatch_async(barrier_q, ^{
				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
				_dispatch_io_data_release(data);
			});
		} else {
			_dispatch_io_data_release(data);
		}
		_dispatch_release(channel);
		_dispatch_release(queue);
	});
}
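
/*
 * Usage sketch (illustrative): the handler may be invoked multiple times per
 * operation, each time with the next chunk; `done` marks the final
 * invocation. `total` is a hypothetical accumulator, concatenated the same
 * way dispatch_read() below accumulates its deliver_data.
 *
 *	__block dispatch_data_t total = dispatch_data_empty;
 *	dispatch_io_read(ch, 0, SIZE_MAX, dispatch_get_main_queue(),
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			dispatch_data_t concat = dispatch_data_create_concat(total, data);
 *			dispatch_release(total);
 *			total = concat;
 *		}
 *		if (done) {
 *			// total now holds everything read; error is 0 or an errno value
 *		}
 *	});
 */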

void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}
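
/*
 * Usage sketch (illustrative): the convenience API above funnels everything
 * through a per-fd "convenience channel" and delivers exactly one callback
 * with the concatenated data once the fd_entry's close queue runs.
 *
 *	dispatch_read(fd, SIZE_MAX, dispatch_get_main_queue(),
 *			^(dispatch_data_t data, int error) {
 *		// single invocation: all data read from fd (until EOF or error)
 *	});
 */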

void
dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	_dispatch_io_data_retain(data);
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(NULL, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		__block dispatch_data_t deliver_data = NULL;
		__block int err = 0;
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_io_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				if (deliver_data) {
					_dispatch_io_data_release(deliver_data);
				}
			});
			_dispatch_release(queue);
		});
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
					dispatch_data_get_size(data), data,
					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
					false), ^(bool done, dispatch_data_t d, int error) {
				if (done) {
					if (d) {
						_dispatch_io_data_retain(d);
						deliver_data = d;
					}
					err = error;
				}
			});
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
		}
		_dispatch_io_data_release(data);
	});
}
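
/*
 * Usage sketch (illustrative): on failure the handler receives the unwritten
 * remainder of `data`, matching the deliver_data capture above. `bytes` and
 * `len` are assumed to exist.
 *
 *	dispatch_data_t msg = dispatch_data_create(bytes, len,
 *			dispatch_get_main_queue(), DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, dispatch_get_main_queue(),
 *			^(dispatch_data_t unwritten, int error) {
 *		// unwritten is NULL on success
 *	});
 *	dispatch_release(msg);
 */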

#pragma mark -
#pragma mark dispatch_operation_t

static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_io_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	int fd = channel->fd;
#endif
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_io_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->active = false;
	op->direction = direction;
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	return op;
}

void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		dispatch_group_leave(op->fd_entry->barrier_group);
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	Block_release(op->handler);
}

static void
_dispatch_operation_enqueue(dispatch_operation_t op,
		dispatch_op_direction_t direction, dispatch_data_t data)
{
	// Called from the barrier queue
	_dispatch_io_data_retain(data);
	// If channel is closed or stopped, then call the handler immediately
	int err = _dispatch_io_get_error(NULL, op->channel, false);
	if (err) {
		dispatch_io_handler_t handler = op->handler;
		dispatch_async(op->op_q, ^{
			dispatch_data_t d = data;
			if (direction == DOP_DIR_READ && err) {
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
			handler(true, d, err);
			_dispatch_io_data_release(data);
		});
		_dispatch_release(op);
		return;
	}
	// Finish operation init
	op->fd_entry = op->channel->fd_entry;
	_dispatch_fd_entry_retain(op->fd_entry);
	dispatch_group_enter(op->fd_entry->barrier_group);
	dispatch_disk_t disk = op->fd_entry->disk;
	if (!disk) {
		dispatch_stream_t stream = op->fd_entry->streams[direction];
		dispatch_async(stream->dq, ^{
			_dispatch_stream_enqueue_operation(stream, op, data);
			_dispatch_io_data_release(data);
		});
	} else {
		dispatch_async(disk->pick_queue, ^{
			_dispatch_disk_enqueue_operation(disk, op, data);
			_dispatch_io_data_release(data);
		});
	}
}

static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,
		dispatch_queue_t tq, dispatch_data_t data)
{
	// On stream queue or disk queue
	_dispatch_io_debug("enqueue operation", op->fd_entry->fd);
	_dispatch_io_data_retain(data);
	op->data = data;
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		op->err = err;
		// Final release
		_dispatch_release(op);
		return false;
	}
	if (op->params.interval) {
		dispatch_resume(_dispatch_operation_timer(tq, op));
	}
	return true;
}

static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
{
	// On stream queue or pick queue
	if (op->timer) {
		return op->timer;
	}
	dispatch_source_t timer = dispatch_source_create(
			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
			op->params.interval), op->params.interval, 0);
	dispatch_source_set_event_handler(timer, ^{
		// On stream queue or pick queue
		if (dispatch_source_testcancel(timer)) {
			// Do nothing. The operation has already completed
			return;
		}
		dispatch_op_flags_t flags = DOP_DEFAULT;
		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
			// Deliver even if there is less data than the low-water mark
			flags |= DOP_DELIVER;
		}
		// If the operation is active, don't deliver data
		if ((op->active) && (flags & DOP_DELIVER)) {
			op->flags = flags;
		} else {
			_dispatch_operation_deliver_data(op, flags);
		}
	});
	op->timer = timer;
	return op->timer;
}

#pragma mark -
#pragma mark dispatch_fd_entry_t

static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
	dispatch_suspend(fd_entry->close_queue);
}

static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
	dispatch_resume(fd_entry->close_queue);
}
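
/*
 * Note (added commentary, not in the original): the fd_entry "refcount" is
 * the suspend count of its close queue. Every retain suspends the queue,
 * every release resumes it, and the cleanup blocks enqueued on that queue
 * can only run once the count drops back to zero, i.e. once the last channel
 * or operation using the entry is gone.
 */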

static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_io_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_io_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)
{
	dispatch_fd_entry_t fd_entry;
	fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
	fd_entry->close_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.closeq", NULL);
	// Use target queue to ensure that no concurrent lookups are going on when
	// the close queue is running
	fd_entry->close_queue->do_targetq = q;
	_dispatch_retain(q);
	// Suspend the cleanup queue until closing
	_dispatch_fd_entry_retain(fd_entry);
	return fd_entry;
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
	// On fds lock queue
	_dispatch_io_debug("fd entry create", fd);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			_dispatch_io_fds_lockq);
	fd_entry->fd = fd;
	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	dispatch_async(fd_entry->barrier_queue, ^{
		_dispatch_io_debug("fd entry stat", fd);
		int err, orig_flags, orig_nosigpipe = -1;
		struct stat st;
		_dispatch_io_syscall_switch(err,
			fstat(fd, &st),
			default: fd_entry->err = err; return;
		);
		fd_entry->stat.dev = st.st_dev;
		fd_entry->stat.mode = st.st_mode;
		_dispatch_io_syscall_switch(err,
			orig_flags = fcntl(fd, F_GETFL),
			default: (void)dispatch_assume_zero(err); break;
		);
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (S_ISFIFO(st.st_mode)) {
			_dispatch_io_syscall_switch(err,
				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
				default: (void)dispatch_assume_zero(err); break;
			);
			if (orig_nosigpipe != -1) {
				_dispatch_io_syscall_switch(err,
					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
					default:
						orig_nosigpipe = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
		}
#endif
		if (S_ISREG(st.st_mode)) {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			int32_t dev = major(st.st_dev);
			// We have to get the disk on the global dev queue. The
			// barrier queue cannot continue until that is complete
			dispatch_suspend(fd_entry->barrier_queue);
			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
					_dispatch_io_devs_lockq_init);
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_disk_init(fd_entry, dev);
				dispatch_resume(fd_entry->barrier_queue);
			});
		} else {
			if (orig_flags != -1) {
				_dispatch_io_syscall_switch(err,
					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
					default:
						orig_flags = -1;
						(void)dispatch_assume_zero(err);
						break;
				);
			}
			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
		}
		fd_entry->orig_flags = orig_flags;
		fd_entry->orig_nosigpipe = orig_nosigpipe;
	});
	// This is the first item run when the close queue is resumed, indicating
	// that all channels associated with this entry have been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		if (!fd_entry->disk) {
			_dispatch_io_debug("close queue fd_entry cleanup", fd);
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		} else {
			dispatch_disk_t disk = fd_entry->disk;
			dispatch_async(_dispatch_io_devs_lockq, ^{
				_dispatch_release(disk);
			});
		}
		// Remove this entry from the global fd list
		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
	});
	// If there was a source associated with this stream, disposing of the
	// source cancels it and suspends the close queue. Freeing the fd_entry
	// structure must happen after the source cancel handler has finished
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", fd);
		dispatch_release(fd_entry->close_queue);
		_dispatch_io_debug("barrier queue release", fd);
		dispatch_release(fd_entry->barrier_queue);
		_dispatch_io_debug("barrier group release", fd);
		dispatch_release(fd_entry->barrier_group);
		if (fd_entry->orig_flags != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETFL, fd_entry->orig_flags)
			);
		}
#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
		if (fd_entry->orig_nosigpipe != -1) {
			_dispatch_io_syscall(
				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
			);
		}
#endif
		if (fd_entry->convenience_channel) {
			fd_entry->convenience_channel->fd_entry = NULL;
			dispatch_release(fd_entry->convenience_channel);
		}
		free(fd_entry);
	});
	return fd_entry;
}

static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	_dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
	}
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		if (fd_entry->fd != -1) {
			close(fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_io_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}

static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
{
	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
		return 0;
	}
	if (fd_entry->err) {
		return fd_entry->err;
	}
	int fd = -1;
	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
			fd_entry->path_data->oflag | O_NONBLOCK;
open:
	fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
	if (fd == -1) {
		int err = errno;
		if (err == EINTR) {
			goto open;
		}
		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
		return err;
	}
	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
		// Lost the race with another open
		close(fd);
	} else {
		channel->fd_actual = fd;
	}
	return 0;
}

static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
		dispatch_io_t channel)
{
	if (fd_entry->disk) {
		if (channel) {
			_dispatch_retain(channel);
		}
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->disk->pick_queue, ^{
			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
			_dispatch_fd_entry_release(fd_entry);
			if (channel) {
				_dispatch_release(channel);
			}
		});
	} else {
		dispatch_op_direction_t direction;
		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
			dispatch_stream_t stream = fd_entry->streams[direction];
			if (!stream) {
				continue;
			}
			if (channel) {
				_dispatch_retain(channel);
			}
			_dispatch_fd_entry_retain(fd_entry);
			dispatch_async(stream->dq, ^{
				_dispatch_stream_cleanup_operations(stream, channel);
				_dispatch_fd_entry_release(fd_entry);
				if (channel) {
					_dispatch_release(channel);
				}
			});
		}
	}
}

#pragma mark -
#pragma mark dispatch_stream_t/dispatch_disk_t

static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
{
	dispatch_op_direction_t direction;
	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
		dispatch_stream_t stream;
		stream = calloc(1ul, sizeof(struct dispatch_stream_s));
		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
				NULL);
		_dispatch_retain(tq);
		stream->dq->do_targetq = tq;
		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
		fd_entry->streams[direction] = stream;
	}
}

static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_release(stream->dq);
	free(stream);
}

static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
{
	// On devs lock queue
	dispatch_disk_t disk;
	char label_name[256];
	// Check to see if there is an existing entry for the given device
	uintptr_t hash = DIO_HASH(dev);
	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
		if (disk->dev == dev) {
			_dispatch_retain(disk);
			goto out;
		}
	}
	// Otherwise create a new entry
	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
			sizeof(struct dispatch_disk_s) +
			(pending_reqs_depth * sizeof(dispatch_operation_t)));
	disk->do_next = DISPATCH_OBJECT_LISTLESS;
	disk->do_xref_cnt = -1;
	disk->advise_list_depth = pending_reqs_depth;
	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
			false);
	disk->dev = dev;
	TAILQ_INIT(&disk->operations);
	disk->cur_rq = TAILQ_FIRST(&disk->operations);
	sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
	disk->pick_queue = dispatch_queue_create(label_name, NULL);
	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
out:
	fd_entry->disk = disk;
	TAILQ_INIT(&fd_entry->stream_ops);
}

void
_dispatch_disk_dispose(dispatch_disk_t disk)
{
	uintptr_t hash = DIO_HASH(disk->dev);
	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
	dispatch_assert(TAILQ_EMPTY(&disk->operations));
	size_t i;
	for (i = 0; i < disk->advise_list_depth; ++i) {
		dispatch_assert(!disk->advise_list[i]);
	}
	dispatch_release(disk->pick_queue);
}

#pragma mark -
#pragma mark dispatch_stream_operations/dispatch_disk_operations

static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)
{
	return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
}

static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
		dispatch_operation_t op, dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
		return;
	}
	bool no_ops = !_dispatch_stream_operation_avail(stream);
	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op,
			operation_list);
	if (no_ops) {
		dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
	}
}

static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
		dispatch_data_t data)
{
	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
		return;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
		}
		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
	} else {
		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
	}
	_dispatch_disk_handler(disk);
}

static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
	if (op == stream->op) {
		stream->op = NULL;
	}
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}

static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_io_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}

static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
		dispatch_operation_t op)
{
	// On stream queue
	if (!op) {
		// On the first run through, pick the first operation
		if (!_dispatch_stream_operation_avail(stream)) {
			return op;
		}
		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized so continue the current
		// operation until it is finished
		return op;
	}
	// Get the next random operation (round-robin)
	if (op->params.type == DISPATCH_IO_RANDOM) {
		op = TAILQ_NEXT(op, operation_list);
		if (!op) {
			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
		}
		return op;
	}
	return NULL;
}

static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
{
	// On pick queue
	dispatch_operation_t op;
	if (!TAILQ_EMPTY(&disk->operations)) {
		if (disk->cur_rq == NULL) {
			op = TAILQ_FIRST(&disk->operations);
		} else {
			op = disk->cur_rq;
			do {
				op = TAILQ_NEXT(op, operation_list);
				if (!op) {
					op = TAILQ_FIRST(&disk->operations);
				}
				// TODO: more involved picking algorithm rdar://problem/8780312
			} while (op->active && op != disk->cur_rq);
		}
		if (!op->active) {
			disk->cur_rq = op;
			return op;
		}
	}
	return NULL;
}
1577
1578 static void
1579 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1580 dispatch_io_t channel)
1581 {
1582 // On stream queue
1583 dispatch_operation_t op, tmp;
1584 typeof(*stream->operations) *operations;
1585 operations = &stream->operations[DISPATCH_IO_RANDOM];
1586 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1587 if (!channel || op->channel == channel) {
1588 _dispatch_stream_complete_operation(stream, op);
1589 }
1590 }
1591 operations = &stream->operations[DISPATCH_IO_STREAM];
1592 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1593 if (!channel || op->channel == channel) {
1594 _dispatch_stream_complete_operation(stream, op);
1595 }
1596 }
1597 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1598 dispatch_suspend(stream->source);
1599 stream->source_running = false;
1600 }
1601 }
1602
1603 static void
1604 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1605 {
1606 // On pick queue
1607 dispatch_operation_t op, tmp;
1608 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1609 if (!channel || op->channel == channel) {
1610 _dispatch_disk_complete_operation(disk, op);
1611 }
1612 }
1613 }
1614
1615 #pragma mark -
1616 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1617
1618 static dispatch_source_t
1619 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1620 {
1621 // On stream queue
1622 if (stream->source) {
1623 return stream->source;
1624 }
1625 dispatch_fd_t fd = op->fd_entry->fd;
1626 _dispatch_io_debug("stream source create", fd);
1627 dispatch_source_t source = NULL;
1628 if (op->direction == DOP_DIR_READ) {
1629 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
1630 stream->dq);
1631 } else if (op->direction == DOP_DIR_WRITE) {
1632 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
1633 stream->dq);
1634 } else {
1635 dispatch_assert(op->direction < DOP_DIR_MAX);
1636 return NULL;
1637 }
1638 dispatch_set_context(source, stream);
1639 dispatch_source_set_event_handler_f(source,
1640 _dispatch_stream_source_handler);
1641 // Close queue must not run user cleanup handlers until sources are fully
1642 // unregistered
1643 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1644 dispatch_source_set_cancel_handler(source, ^{
1645 _dispatch_io_debug("stream source cancel", fd);
1646 dispatch_resume(close_queue);
1647 });
1648 stream->source = source;
1649 return stream->source;
1650 }
1651
1652 static void
1653 _dispatch_stream_source_handler(void *ctx)
1654 {
1655 // On stream queue
1656 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1657 dispatch_suspend(stream->source);
1658 stream->source_running = false;
1659 return _dispatch_stream_handler(stream);
1660 }
1661
1662 static void
1663 _dispatch_stream_handler(void *ctx)
1664 {
1665 // On stream queue
1666 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1667 dispatch_operation_t op;
1668 pick:
1669 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1670 if (!op) {
1671 _dispatch_debug("no operation found: stream %p", stream);
1672 return;
1673 }
1674 int err = _dispatch_io_get_error(op, NULL, true);
1675 if (err) {
1676 op->err = err;
1677 _dispatch_stream_complete_operation(stream, op);
1678 goto pick;
1679 }
1680 stream->op = op;
1681 _dispatch_io_debug("stream handler", op->fd_entry->fd);
1682 dispatch_fd_entry_t fd_entry = op->fd_entry;
1683 _dispatch_fd_entry_retain(fd_entry);
1684 // For performance analysis
1685 if (!op->total && dispatch_io_defaults.initial_delivery) {
1686 // Empty delivery to signal the start of the operation
1687 _dispatch_io_debug("initial delivery", op->fd_entry->fd);
1688 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1689 }
1690 // TODO: perform on the operation target queue to get correct priority
1691 int result = _dispatch_operation_perform(op), flags = -1;
	switch (result) {
	case DISPATCH_OP_DELIVER:
		flags = DOP_DEFAULT;
		// Fall through
	case DISPATCH_OP_DELIVER_AND_COMPLETE:
		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
				DOP_DEFAULT;
		_dispatch_operation_deliver_data(op, flags);
		// Fall through
	case DISPATCH_OP_COMPLETE:
		if (flags != DOP_DEFAULT) {
			_dispatch_stream_complete_operation(stream, op);
		}
		if (_dispatch_stream_operation_avail(stream)) {
			dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
		}
		break;
	case DISPATCH_OP_COMPLETE_RESUME:
		_dispatch_stream_complete_operation(stream, op);
		// Fall through
	case DISPATCH_OP_RESUME:
		if (_dispatch_stream_operation_avail(stream)) {
			stream->source_running = true;
			dispatch_resume(_dispatch_stream_source(stream, op));
		}
		break;
	case DISPATCH_OP_ERR:
		_dispatch_stream_cleanup_operations(stream, op->channel);
		break;
	case DISPATCH_OP_FD_ERR:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
			_dispatch_fd_entry_release(fd_entry);
		});
		break;
	default:
		break;
	}
	_dispatch_fd_entry_release(fd_entry);
	return;
}

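// The disk object serializes I/O to one device: _dispatch_disk_handler (on
// the pick queue) fills the advise_list ring buffer with pending operations,
// while io_active ensures that only one _dispatch_disk_perform is in flight
// at a time.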
static void
_dispatch_disk_handler(void *ctx)
{
	// On pick queue
	dispatch_disk_t disk = (dispatch_disk_t)ctx;
	if (disk->io_active) {
		return;
	}
	_dispatch_io_debug("disk handler", -1);
	dispatch_operation_t op;
	size_t i = disk->free_idx, j = disk->req_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	while (i <= j) {
		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
				(op = _dispatch_disk_pick_next_operation(disk))) {
			int err = _dispatch_io_get_error(op, NULL, true);
			if (err) {
				op->err = err;
				_dispatch_disk_complete_operation(disk, op);
				continue;
			}
			_dispatch_retain(op);
			disk->advise_list[i%disk->advise_list_depth] = op;
			op->active = true;
		} else {
			// No more operations to get
			break;
		}
		i++;
	}
	disk->free_idx = (i%disk->advise_list_depth);
	op = disk->advise_list[disk->req_idx];
	if (op) {
		disk->io_active = true;
		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
	}
}

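// _dispatch_disk_perform runs on the target queue of the operation at the
// head of the ring (req_idx): it issues read-ahead advisories for the queued
// operations, performs the actual I/O for the head entry, and then bounces
// back to the pick queue to process the result.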
static void
_dispatch_disk_perform(void *ctxt)
{
	dispatch_disk_t disk = ctxt;
	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
	_dispatch_io_debug("disk perform", -1);
	dispatch_operation_t op;
	size_t i = disk->advise_idx, j = disk->free_idx;
	if (j <= i) {
		j += disk->advise_list_depth;
	}
	do {
		op = disk->advise_list[i%disk->advise_list_depth];
		if (!op) {
			// Nothing more to advise, must be at free_idx
			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
			break;
		}
		if (op->direction == DOP_DIR_WRITE) {
			// TODO: preallocate writes? rdar://problem/9032172
			continue;
		}
		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
				op->channel)) {
			continue;
		}
		// For performance analysis
		if (!op->total && dispatch_io_defaults.initial_delivery) {
			// Empty delivery to signal the start of the operation
			_dispatch_io_debug("initial delivery", op->fd_entry->fd);
			_dispatch_operation_deliver_data(op, DOP_DELIVER);
		}
		// Advise two chunks if the list only has one element and this is the
		// first advise on the operation
		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
				!op->advise_offset) {
			chunk_size *= 2;
		}
		_dispatch_operation_advise(op, chunk_size);
	} while (++i < j);
	disk->advise_idx = i%disk->advise_list_depth;
	op = disk->advise_list[disk->req_idx];
	int result = _dispatch_operation_perform(op);
	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
	dispatch_async(disk->pick_queue, ^{
		switch (result) {
		case DISPATCH_OP_DELIVER:
			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
			break;
		case DISPATCH_OP_COMPLETE:
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_DELIVER_AND_COMPLETE:
			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
			_dispatch_disk_complete_operation(disk, op);
			break;
		case DISPATCH_OP_ERR:
			_dispatch_disk_cleanup_operations(disk, op->channel);
			break;
		case DISPATCH_OP_FD_ERR:
			_dispatch_disk_cleanup_operations(disk, NULL);
			break;
		default:
			dispatch_assert(result);
			break;
		}
		op->active = false;
		disk->io_active = false;
		_dispatch_disk_handler(disk);
		// Balancing the retain in _dispatch_disk_handler. Note that op must
		// be released at the very end, since it might hold the last
		// reference to the disk
		_dispatch_release(op);
	});
}

#pragma mark -
#pragma mark dispatch_operation_perform

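// Issue an F_RDADVISE read-ahead hint for the next chunk of a read
// operation; op->advise_offset tracks how far ahead of the actual reads the
// advisories have progressed, so consecutive calls advise consecutive
// ranges of the file.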
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
	int err;
	struct radvisory advise;
	// No point in issuing a read advise for the next chunk if we are
	// already more than a chunk ahead of the bytes actually read
	if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
			PAGE_SIZE)) {
		return;
	}
	advise.ra_count = (int)chunk_size;
	if (!op->advise_offset) {
		op->advise_offset = op->offset;
		// If this is the first time through, align the advised range to a
		// page boundary
		size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
	}
	advise.ra_offset = op->advise_offset;
	op->advise_offset += advise.ra_count;
	_dispatch_io_syscall_switch(err,
		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
		// TODO: set disk status on error
		default: (void)dispatch_assume_zero(err); break;
	);
}

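// Perform a single I/O pass for an operation: lazily set up the transfer
// buffer, open the fd if needed, issue the syscall (retrying on EINTR) and
// map the outcome onto a DISPATCH_OP_* code for the stream/disk handlers.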
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
	int err = _dispatch_io_get_error(op, NULL, true);
	if (err) {
		goto error;
	}
	if (!op->buf) {
		size_t max_buf_siz = op->params.high;
		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
		if (op->direction == DOP_DIR_READ) {
			// If necessary, create a buffer for the ongoing operation, large
			// enough to fit chunk_pages but at most high-water
			size_t data_siz = dispatch_data_get_size(op->data);
			if (data_siz) {
				dispatch_assert(data_siz < max_buf_siz);
				max_buf_siz -= data_siz;
			}
			if (max_buf_siz > chunk_siz) {
				max_buf_siz = chunk_siz;
			}
			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}
			op->buf = valloc(op->buf_siz);
			_dispatch_io_debug("buffer allocated", op->fd_entry->fd);
		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece; if that is smaller than a
			// chunk, accumulate further data pieces until the chunk size is
			// reached
			if (chunk_siz > max_buf_siz) {
				chunk_siz = max_buf_siz;
			}
			op->buf_siz = 0;
			dispatch_data_apply(op->data,
					^(dispatch_data_t region DISPATCH_UNUSED,
					size_t offset DISPATCH_UNUSED,
					const void* buf DISPATCH_UNUSED, size_t len) {
				size_t siz = op->buf_siz + len;
				if (!op->buf_siz || siz <= chunk_siz) {
					op->buf_siz = siz;
				}
				return (bool)(siz < chunk_siz);
			});
			if (op->buf_siz > max_buf_siz) {
				op->buf_siz = max_buf_siz;
			}
			dispatch_data_t d;
			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
					NULL);
			_dispatch_io_data_release(d);
			_dispatch_io_debug("buffer mapped", op->fd_entry->fd);
		}
	}
	if (op->fd_entry->fd == -1) {
		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
		if (err) {
			goto error;
		}
	}
	void *buf = op->buf + op->buf_len;
	size_t len = op->buf_siz - op->buf_len;
	off_t off = op->offset + op->total;
	ssize_t processed = -1;
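	// DISPATCH_IO_STREAM channels use the fd's implicit file position
	// (read/write); DISPATCH_IO_RANDOM channels address the file absolutely
	// (pread/pwrite) at the requested offset plus the bytes already
	// transferred.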
syscall:
	if (op->direction == DOP_DIR_READ) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = read(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pread(op->fd_entry->fd, buf, len, off);
		}
	} else if (op->direction == DOP_DIR_WRITE) {
		if (op->params.type == DISPATCH_IO_STREAM) {
			processed = write(op->fd_entry->fd, buf, len);
		} else if (op->params.type == DISPATCH_IO_RANDOM) {
			processed = pwrite(op->fd_entry->fd, buf, len, off);
		}
	}
	// Encountered an error on the file descriptor
	if (processed == -1) {
		err = errno;
		if (err == EINTR) {
			goto syscall;
		}
		goto error;
	}
	// EOF is indicated by two handler invocations: a final delivery of the
	// remaining data followed by the completion (done) callback
	if (processed == 0) {
		_dispatch_io_debug("EOF", op->fd_entry->fd);
		return DISPATCH_OP_DELIVER_AND_COMPLETE;
	}
	op->buf_len += processed;
	op->total += processed;
	if (op->total == op->length) {
		// Finished processing all the bytes requested by the operation
		return DISPATCH_OP_COMPLETE;
	} else {
		// Deliver data only if we satisfy the filters
		return DISPATCH_OP_DELIVER;
	}
error:
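	// EAGAIN from a nonblocking fd is not a failure: park the operation and
	// let the handler rearm the dispatch source (RESUME). A convenience-
	// channel read that already has data completes instead of waiting for
	// more.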
	if (err == EAGAIN) {
		// For disk based files with blocking I/O we should never get EAGAIN
		dispatch_assert(!op->fd_entry->disk);
		_dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
		if (op->direction == DOP_DIR_READ && op->total &&
				op->channel == op->fd_entry->convenience_channel) {
			// Convenience read with available data completes on EAGAIN
			return DISPATCH_OP_COMPLETE_RESUME;
		}
		return DISPATCH_OP_RESUME;
	}
	op->err = err;
	switch (err) {
	case ECANCELED:
		return DISPATCH_OP_ERR;
	case EBADF:
		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
		return DISPATCH_OP_FD_ERR;
	default:
		return DISPATCH_OP_COMPLETE;
	}
}

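// Hand data to the channel's I/O handler on its queue, respecting the
// low-water mark and the DOP_* delivery flags; for writes this also trims
// the just-written bytes from the head of the unwritten data.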
static void
_dispatch_operation_deliver_data(dispatch_operation_t op,
		dispatch_op_flags_t flags)
{
	// Called from the stream queue (stream I/O) or the pick queue (disk
	// I/O), or when the op is finalized
	dispatch_data_t data = NULL;
	int err = 0;
	size_t undelivered = op->undelivered + op->buf_len;
	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
			(op->flags & DOP_DELIVER);
	op->flags = DOP_DEFAULT;
	if (!deliver) {
		// Don't deliver data until low water mark has been reached
		if (undelivered >= op->params.low) {
			deliver = true;
		} else if (op->buf_len < op->buf_siz) {
			// Request buffer is not yet used up
			_dispatch_io_debug("buffer data", op->fd_entry->fd);
			return;
		}
	} else {
		err = op->err;
		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
			err = ECANCELED;
			op->err = err;
		}
	}
	// Deliver data or buffer used up
	if (op->direction == DOP_DIR_READ) {
		if (op->buf_len) {
			void *buf = op->buf;
			data = dispatch_data_create(buf, op->buf_len, NULL,
					DISPATCH_DATA_DESTRUCTOR_FREE);
			op->buf = NULL;
			op->buf_len = 0;
			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
			_dispatch_io_data_release(op->data);
			_dispatch_io_data_release(data);
			data = d;
		} else {
			data = op->data;
		}
		op->data = deliver ? dispatch_data_empty : data;
	} else if (op->direction == DOP_DIR_WRITE) {
		if (deliver) {
			data = dispatch_data_create_subrange(op->data, op->buf_len,
					op->length);
		}
		if (op->buf_data && op->buf_len == op->buf_siz) {
			_dispatch_io_data_release(op->buf_data);
			op->buf_data = NULL;
			op->buf = NULL;
			op->buf_len = 0;
			// Trim newly written buffer from head of unwritten data
			dispatch_data_t d;
			if (deliver) {
				_dispatch_io_data_retain(data);
				d = data;
			} else {
				d = dispatch_data_create_subrange(op->data, op->buf_siz,
						op->length);
			}
			_dispatch_io_data_release(op->data);
			op->data = d;
		}
	} else {
		dispatch_assert(op->direction < DOP_DIR_MAX);
		return;
	}
	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
		op->undelivered = undelivered;
		_dispatch_io_debug("buffer data", op->fd_entry->fd);
		return;
	}
	op->undelivered = 0;
	_dispatch_io_debug("deliver data", op->fd_entry->fd);
	dispatch_op_direction_t direction = op->direction;
	dispatch_io_handler_t handler = op->handler;
#if DISPATCH_IO_DEBUG
	int fd = op->fd_entry->fd;
#endif
	dispatch_fd_entry_t fd_entry = op->fd_entry;
	_dispatch_fd_entry_retain(fd_entry);
	dispatch_io_t channel = op->channel;
	_dispatch_retain(channel);
	// Note that data delivery may occur after the operation is freed
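	// On the final (done) delivery, a failed read still hands over any
	// accumulated data in a separate handler invocation before the error is
	// reported, and a completed write does not redundantly redeliver data.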
	dispatch_async(op->op_q, ^{
		bool done = (flags & DOP_DONE);
		dispatch_data_t d = data;
		if (done) {
			if (direction == DOP_DIR_READ && err) {
				if (dispatch_data_get_size(d)) {
					_dispatch_io_debug("IO handler invoke", fd);
					handler(false, d, 0);
				}
				d = NULL;
			} else if (direction == DOP_DIR_WRITE && !err) {
				d = NULL;
			}
		}
		_dispatch_io_debug("IO handler invoke", fd);
		handler(done, d, err);
		_dispatch_release(channel);
		_dispatch_fd_entry_release(fd_entry);
		_dispatch_io_data_release(data);
	});
}