/* src/io.c from apple/libdispatch (release libdispatch-187.9) */
1 /*
2 * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
24
25 DISPATCH_EXPORT DISPATCH_NOTHROW
26 void _dispatch_iocntl(uint32_t param, uint64_t value);
27
28 static void _dispatch_io_dispose(dispatch_io_t channel);
29 static dispatch_operation_t _dispatch_operation_create(
30 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
31 size_t length, dispatch_data_t data, dispatch_queue_t queue,
32 dispatch_io_handler_t handler);
33 static void _dispatch_operation_dispose(dispatch_operation_t operation);
34 static void _dispatch_operation_enqueue(dispatch_operation_t op,
35 dispatch_op_direction_t direction, dispatch_data_t data);
36 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
37 dispatch_operation_t op);
38 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
39 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
40 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
41 dispatch_fd_entry_init_callback_t completion_callback);
42 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
43 uintptr_t hash);
44 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
45 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
46 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
47 dispatch_io_t channel);
48 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
49 dispatch_io_t channel);
50 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
51 dispatch_queue_t tq);
52 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
53 dispatch_op_direction_t direction);
54 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
55 static void _dispatch_disk_dispose(dispatch_disk_t disk);
56 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
57 dispatch_operation_t operation, dispatch_data_t data);
58 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
59 dispatch_operation_t operation, dispatch_data_t data);
60 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
61 dispatch_io_t channel);
62 static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
63 dispatch_io_t channel);
64 static void _dispatch_stream_source_handler(void *ctx);
65 static void _dispatch_stream_handler(void *ctx);
66 static void _dispatch_disk_handler(void *ctx);
67 static void _dispatch_disk_perform(void *ctxt);
68 static void _dispatch_operation_advise(dispatch_operation_t op,
69 size_t chunk_size);
70 static int _dispatch_operation_perform(dispatch_operation_t op);
71 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
72 dispatch_op_flags_t flags);
73
74 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
75 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
76 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
77 case EINTR: continue; \
78 __VA_ARGS__ \
79 } break; \
80 } while (1)
81 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
82 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
83 case 0: break; \
84 __VA_ARGS__ \
85 ); \
86 } while (0)
87 #define _dispatch_io_syscall(__syscall) do { int __err; \
88 _dispatch_io_syscall_switch(__err, __syscall); \
89 } while (0)
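
/*
 * Usage sketch (editorial note, not part of the original source): the
 * switch-style wrappers above let a call site pattern-match on errno while
 * EINTR is retried transparently. A hypothetical caller might look like:
 *
 *	int err;
 *	struct stat st;
 *	_dispatch_io_syscall_switch(err,
 *		fstat(fd, &st),
 *		default: fd_entry->err = err; return;
 *	);
 *	// err is 0 here; any other errno took the default: case
 */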
90
91 enum {
92 DISPATCH_OP_COMPLETE = 1,
93 DISPATCH_OP_DELIVER,
94 DISPATCH_OP_DELIVER_AND_COMPLETE,
95 DISPATCH_OP_COMPLETE_RESUME,
96 DISPATCH_OP_RESUME,
97 DISPATCH_OP_ERR,
98 DISPATCH_OP_FD_ERR,
99 };
100
101 #pragma mark -
102 #pragma mark dispatch_io_vtable
103
104 static const struct dispatch_io_vtable_s _dispatch_io_vtable = {
105 .do_type = DISPATCH_IO_TYPE,
106 .do_kind = "channel",
107 .do_dispose = _dispatch_io_dispose,
108 .do_invoke = NULL,
109 .do_probe = (void *)dummy_function_r0,
110 .do_debug = (void *)dummy_function_r0,
111 };
112
113 static const struct dispatch_operation_vtable_s _dispatch_operation_vtable = {
114 .do_type = DISPATCH_OPERATION_TYPE,
115 .do_kind = "operation",
116 .do_dispose = _dispatch_operation_dispose,
117 .do_invoke = NULL,
118 .do_probe = (void *)dummy_function_r0,
119 .do_debug = (void *)dummy_function_r0,
120 };
121
122 static const struct dispatch_disk_vtable_s _dispatch_disk_vtable = {
123 .do_type = DISPATCH_DISK_TYPE,
124 .do_kind = "disk",
125 .do_dispose = _dispatch_disk_dispose,
126 .do_invoke = NULL,
127 .do_probe = (void *)dummy_function_r0,
128 .do_debug = (void *)dummy_function_r0,
129 };
130
131 #pragma mark -
132 #pragma mark dispatch_io_hashtables
133
134 #if TARGET_OS_EMBEDDED
135 #define DIO_HASH_SIZE 64u // must be a power of two
136 #else
137 #define DIO_HASH_SIZE 256u // must be a power of two
138 #endif
139 #define DIO_HASH(x) ((uintptr_t)((x) & (DIO_HASH_SIZE - 1)))
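
/*
 * Worked example (editorial): DIO_HASH just masks the key into one of
 * DIO_HASH_SIZE buckets, which is why the table size must be a power of two.
 * With DIO_HASH_SIZE == 256:
 *
 *	DIO_HASH(5)   == 5
 *	DIO_HASH(260) == 4	// 260 & 255
 */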
140
141 // Global hashtable of dev_t -> disk_s mappings
142 DISPATCH_CACHELINE_ALIGN
143 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
144 // Global hashtable of fd -> fd_entry_s mappings
145 DISPATCH_CACHELINE_ALIGN
146 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
147
148 static dispatch_once_t _dispatch_io_devs_lockq_pred;
149 static dispatch_queue_t _dispatch_io_devs_lockq;
150 static dispatch_queue_t _dispatch_io_fds_lockq;
151
152 static void
153 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
154 {
155 _dispatch_io_fds_lockq = dispatch_queue_create(
156 "com.apple.libdispatch-io.fd_lockq", NULL);
157 unsigned int i;
158 for (i = 0; i < DIO_HASH_SIZE; i++) {
159 TAILQ_INIT(&_dispatch_io_fds[i]);
160 }
161 }
162
163 static void
164 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
165 {
166 _dispatch_io_devs_lockq = dispatch_queue_create(
167 "com.apple.libdispatch-io.dev_lockq", NULL);
168 unsigned int i;
169 for (i = 0; i < DIO_HASH_SIZE; i++) {
170 TAILQ_INIT(&_dispatch_io_devs[i]);
171 }
172 }
173
174 #pragma mark -
175 #pragma mark dispatch_io_defaults
176
177 enum {
178 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
179 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
180 DISPATCH_IOCNTL_INITIAL_DELIVERY,
181 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
182 };
183
184 static struct dispatch_io_defaults_s {
185 size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
186 bool initial_delivery;
187 } dispatch_io_defaults = {
188 .chunk_pages = DIO_MAX_CHUNK_PAGES,
189 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
190 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
191 };
192
193 #define _dispatch_iocntl_set_default(p, v) do { \
194 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
195 } while (0)
196
197 void
198 _dispatch_iocntl(uint32_t param, uint64_t value)
199 {
200 switch (param) {
201 case DISPATCH_IOCNTL_CHUNK_PAGES:
202 _dispatch_iocntl_set_default(chunk_pages, value);
203 break;
204 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
205 _dispatch_iocntl_set_default(low_water_chunks, value);
206 break;
207 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
208 _dispatch_iocntl_set_default(initial_delivery, value);
break;
209 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
210 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
211 break;
212 }
213 }
214
215 #pragma mark -
216 #pragma mark dispatch_io_t
217
218 static dispatch_io_t
219 _dispatch_io_create(dispatch_io_type_t type)
220 {
221 dispatch_io_t channel = calloc(1ul, sizeof(struct dispatch_io_s));
222 channel->do_vtable = &_dispatch_io_vtable;
223 channel->do_next = DISPATCH_OBJECT_LISTLESS;
224 channel->do_ref_cnt = 1;
225 channel->do_xref_cnt = 1;
226 channel->do_targetq = _dispatch_get_root_queue(0, true);
227 channel->params.type = type;
228 channel->params.high = SIZE_MAX;
229 channel->params.low = dispatch_io_defaults.low_water_chunks *
230 dispatch_io_defaults.chunk_pages * PAGE_SIZE;
231 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
232 NULL);
233 return channel;
234 }
235
236 static void
237 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
238 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
239 {
240 // Enqueue the cleanup handler on the suspended close queue
241 if (cleanup_handler) {
242 _dispatch_retain(queue);
243 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
244 dispatch_async(queue, ^{
245 _dispatch_io_debug("cleanup handler invoke", -1);
246 cleanup_handler(err);
247 });
248 _dispatch_release(queue);
249 });
250 }
251 if (fd_entry) {
252 channel->fd_entry = fd_entry;
253 dispatch_retain(fd_entry->barrier_queue);
254 dispatch_retain(fd_entry->barrier_group);
255 channel->barrier_queue = fd_entry->barrier_queue;
256 channel->barrier_group = fd_entry->barrier_group;
257 } else {
258 // Still need to create a barrier queue, since all operations go
259 // through it
260 channel->barrier_queue = dispatch_queue_create(
261 "com.apple.libdispatch-io.barrierq", NULL);
262 channel->barrier_group = dispatch_group_create();
263 }
264 }
265
266 static void
267 _dispatch_io_dispose(dispatch_io_t channel)
268 {
269 if (channel->fd_entry && !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
270 if (channel->fd_entry->path_data) {
271 // This modification is safe since path_data->channel is checked
272 // only on close_queue (which is still suspended at this point)
273 channel->fd_entry->path_data->channel = NULL;
274 }
275 // Cleanup handlers will only run when all channels related to this
276 // fd are complete
277 _dispatch_fd_entry_release(channel->fd_entry);
278 }
279 if (channel->queue) {
280 dispatch_release(channel->queue);
281 }
282 if (channel->barrier_queue) {
283 dispatch_release(channel->barrier_queue);
284 }
285 if (channel->barrier_group) {
286 dispatch_release(channel->barrier_group);
287 }
288 _dispatch_dispose(channel);
289 }
290
291 static int
292 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
293 {
294 int err = 0;
295 if (S_ISDIR(mode)) {
296 err = EISDIR;
297 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
298 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
299 err = ESPIPE;
300 }
301 return err;
302 }
303
304 static int
305 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
306 bool ignore_closed)
307 {
308 // On _any_ queue
309 int err;
310 if (op) {
311 channel = op->channel;
312 }
313 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
314 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
315 err = ECANCELED;
316 } else {
317 err = 0;
318 }
319 } else {
320 err = op ? op->fd_entry->err : channel->err;
321 }
322 return err;
323 }
324
325 #pragma mark -
326 #pragma mark dispatch_io_channels
327
328 dispatch_io_t
329 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
330 dispatch_queue_t queue, void (^cleanup_handler)(int))
331 {
332 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
333 return NULL;
334 }
335 _dispatch_io_debug("io create", fd);
336 dispatch_io_t channel = _dispatch_io_create(type);
337 channel->fd = fd;
338 channel->fd_actual = fd;
339 dispatch_suspend(channel->queue);
340 _dispatch_retain(queue);
341 _dispatch_retain(channel);
342 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
343 // On barrier queue
344 int err = fd_entry->err;
345 if (!err) {
346 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
347 }
348 if (!err && type == DISPATCH_IO_RANDOM) {
349 off_t f_ptr;
350 _dispatch_io_syscall_switch_noerr(err,
351 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
352 case 0: channel->f_ptr = f_ptr; break;
353 default: (void)dispatch_assume_zero(err); break;
354 );
355 }
356 channel->err = err;
357 _dispatch_fd_entry_retain(fd_entry);
358 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
359 dispatch_resume(channel->queue);
360 _dispatch_release(channel);
361 _dispatch_release(queue);
362 });
363 return channel;
364 }
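
/*
 * Usage sketch (editorial, not part of the original source): creating a
 * stream channel from an existing descriptor. "fd" and "my_queue" are
 * caller-supplied placeholders; the cleanup handler runs once dispatch I/O
 * is finished with the descriptor, so that is the safe place to close it.
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			my_queue, ^(int error) {
 *		if (error) fprintf(stderr, "channel error: %d\n", error);
 *		close(fd);
 *	});
 */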
365
366 dispatch_io_t
367 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
368 int oflag, mode_t mode, dispatch_queue_t queue,
369 void (^cleanup_handler)(int error))
370 {
371 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
372 !(path && *path == '/')) {
373 return NULL;
374 }
375 size_t pathlen = strlen(path);
376 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
377 if (!path_data) {
378 return NULL;
379 }
380 _dispatch_io_debug("io create with path %s", -1, path);
381 dispatch_io_t channel = _dispatch_io_create(type);
382 channel->fd = -1;
383 channel->fd_actual = -1;
384 path_data->channel = channel;
385 path_data->oflag = oflag;
386 path_data->mode = mode;
387 path_data->pathlen = pathlen;
388 memcpy(path_data->path, path, pathlen + 1);
389 _dispatch_retain(queue);
390 _dispatch_retain(channel);
391 dispatch_async(channel->queue, ^{
392 int err = 0;
393 struct stat st;
394 _dispatch_io_syscall_switch_noerr(err,
395 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
396 (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
397 lstat(path_data->path, &st) : stat(path_data->path, &st),
398 case 0:
399 err = _dispatch_io_validate_type(channel, st.st_mode);
400 break;
401 default:
402 if ((path_data->oflag & O_CREAT) &&
403 (*(path_data->path + path_data->pathlen - 1) != '/')) {
404 // Check parent directory
405 char *c = strrchr(path_data->path, '/');
406 dispatch_assert(c);
407 *c = 0;
408 int perr;
409 _dispatch_io_syscall_switch_noerr(perr,
410 stat(path_data->path, &st),
411 case 0:
412 // Since the parent directory exists, open() will
413 // create a regular file after the fd_entry has
414 // been filled in
415 st.st_mode = S_IFREG;
416 err = 0;
417 break;
418 );
419 *c = '/';
420 }
421 break;
422 );
423 channel->err = err;
424 if (err) {
425 free(path_data);
426 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
427 _dispatch_release(channel);
428 _dispatch_release(queue);
429 return;
430 }
431 dispatch_suspend(channel->queue);
432 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
433 _dispatch_io_devs_lockq_init);
434 dispatch_async(_dispatch_io_devs_lockq, ^{
435 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
436 path_data, st.st_dev, st.st_mode);
437 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
438 dispatch_resume(channel->queue);
439 _dispatch_release(channel);
440 _dispatch_release(queue);
441 });
442 });
443 return channel;
444 }
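
/*
 * Usage sketch (editorial): a path-based channel requires an absolute path
 * and opens the file lazily, when the first operation runs. "my_queue" is a
 * caller-supplied placeholder.
 *
 *	dispatch_io_t ch = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
 *			"/tmp/example.dat", O_RDONLY, 0, my_queue, ^(int error) {
 *		// nothing to close here; the fd_entry owns the descriptor
 *	});
 */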
445
446 dispatch_io_t
447 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
448 dispatch_queue_t queue, void (^cleanup_handler)(int error))
449 {
450 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
451 return NULL;
452 }
453 _dispatch_io_debug("io create with io %p", -1, in_channel);
454 dispatch_io_t channel = _dispatch_io_create(type);
455 dispatch_suspend(channel->queue);
456 _dispatch_retain(queue);
457 _dispatch_retain(channel);
458 _dispatch_retain(in_channel);
459 dispatch_async(in_channel->queue, ^{
460 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
461 if (err0) {
462 channel->err = err0;
463 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
464 dispatch_resume(channel->queue);
465 _dispatch_release(channel);
466 _dispatch_release(in_channel);
467 _dispatch_release(queue);
468 return;
469 }
470 dispatch_async(in_channel->barrier_queue, ^{
471 int err = _dispatch_io_get_error(NULL, in_channel, false);
472 // If there is no error, the fd_entry for the in_channel is valid.
473 // Since we are running on in_channel's queue, the fd_entry has been
474 // fully resolved and will stay valid for the duration of this block
475 if (!err) {
476 err = in_channel->err;
477 if (!err) {
478 err = in_channel->fd_entry->err;
479 }
480 }
481 if (!err) {
482 err = _dispatch_io_validate_type(channel,
483 in_channel->fd_entry->stat.mode);
484 }
485 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
486 off_t f_ptr;
487 _dispatch_io_syscall_switch_noerr(err,
488 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
489 case 0: channel->f_ptr = f_ptr; break;
490 default: (void)dispatch_assume_zero(err); break;
491 );
492 }
493 channel->err = err;
494 if (err) {
495 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
496 dispatch_resume(channel->queue);
497 _dispatch_release(channel);
498 _dispatch_release(in_channel);
499 _dispatch_release(queue);
500 return;
501 }
502 if (in_channel->fd == -1) {
503 // in_channel was created from path
504 channel->fd = -1;
505 channel->fd_actual = -1;
506 mode_t mode = in_channel->fd_entry->stat.mode;
507 dev_t dev = in_channel->fd_entry->stat.dev;
508 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
509 in_channel->fd_entry->path_data->pathlen + 1;
510 dispatch_io_path_data_t path_data = malloc(path_data_len);
511 memcpy(path_data, in_channel->fd_entry->path_data,
512 path_data_len);
513 path_data->channel = channel;
514 // _dispatch_io_devs_lockq is known to already exist
515 dispatch_async(_dispatch_io_devs_lockq, ^{
516 dispatch_fd_entry_t fd_entry;
517 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
518 dev, mode);
519 _dispatch_io_init(channel, fd_entry, queue, 0,
520 cleanup_handler);
521 dispatch_resume(channel->queue);
522 _dispatch_release(channel);
523 _dispatch_release(queue);
524 });
525 } else {
526 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
527 channel->fd = in_channel->fd;
528 channel->fd_actual = in_channel->fd_actual;
529 _dispatch_fd_entry_retain(fd_entry);
530 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
531 dispatch_resume(channel->queue);
532 _dispatch_release(channel);
533 _dispatch_release(queue);
534 }
535 _dispatch_release(in_channel);
536 });
537 });
538 return channel;
539 }
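
/*
 * Usage sketch (editorial): deriving a new channel from an existing one. When
 * the original channel is fd-based, both channels share the same fd_entry and
 * therefore the same barrier queue and group. "stream_ch" and "my_queue" are
 * placeholders.
 *
 *	dispatch_io_t rnd = dispatch_io_create_with_io(DISPATCH_IO_RANDOM,
 *			stream_ch, my_queue, ^(int error) { (void)error; });
 */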
540
541 #pragma mark -
542 #pragma mark dispatch_io_accessors
543
544 void
545 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
546 {
547 _dispatch_retain(channel);
548 dispatch_async(channel->queue, ^{
549 _dispatch_io_debug("io set high water", channel->fd);
550 if (channel->params.low > high_water) {
551 channel->params.low = high_water;
552 }
553 channel->params.high = high_water ? high_water : 1;
554 _dispatch_release(channel);
555 });
556 }
557
558 void
559 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
560 {
561 _dispatch_retain(channel);
562 dispatch_async(channel->queue, ^{
563 _dispatch_io_debug("io set low water", channel->fd);
564 if (channel->params.high < low_water) {
565 channel->params.high = low_water ? low_water : 1;
566 }
567 channel->params.low = low_water;
568 _dispatch_release(channel);
569 });
570 }
571
572 void
573 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
574 unsigned long flags)
575 {
576 _dispatch_retain(channel);
577 dispatch_async(channel->queue, ^{
578 _dispatch_io_debug("io set interval", channel->fd);
579 channel->params.interval = interval;
580 channel->params.interval_flags = flags;
581 _dispatch_release(channel);
582 });
583 }
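
/*
 * Configuration sketch (editorial): the accessors above control how much data
 * accumulates before the I/O handler fires. For example, to see partial
 * results at least once per second regardless of the low-water mark:
 *
 *	dispatch_io_set_low_water(ch, 1);
 *	dispatch_io_set_high_water(ch, 1024 * 1024);
 *	dispatch_io_set_interval(ch, NSEC_PER_SEC, DISPATCH_IO_STRICT_INTERVAL);
 */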
584
585 void
586 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
587 {
588 _dispatch_retain(dq);
589 _dispatch_retain(channel);
590 dispatch_async(channel->queue, ^{
591 dispatch_queue_t prev_dq = channel->do_targetq;
592 channel->do_targetq = dq;
593 _dispatch_release(prev_dq);
594 _dispatch_release(channel);
595 });
596 }
597
598 dispatch_fd_t
599 dispatch_io_get_descriptor(dispatch_io_t channel)
600 {
601 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
602 return -1;
603 }
604 dispatch_fd_t fd = channel->fd_actual;
605 if (fd == -1 &&
606 _dispatch_thread_getspecific(dispatch_io_key) == channel) {
607 dispatch_fd_entry_t fd_entry = channel->fd_entry;
608 (void)_dispatch_fd_entry_open(fd_entry, channel);
609 }
610 return channel->fd_actual;
611 }
612
613 #pragma mark -
614 #pragma mark dispatch_io_operations
615
616 static void
617 _dispatch_io_stop(dispatch_io_t channel)
618 {
619 _dispatch_io_debug("io stop", channel->fd);
620 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED);
621 _dispatch_retain(channel);
622 dispatch_async(channel->queue, ^{
623 dispatch_async(channel->barrier_queue, ^{
624 dispatch_fd_entry_t fd_entry = channel->fd_entry;
625 if (fd_entry) {
626 _dispatch_io_debug("io stop cleanup", channel->fd);
627 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
628 if (!(channel->atomic_flags & DIO_CLOSED)) {
629 channel->fd_entry = NULL;
630 _dispatch_fd_entry_release(fd_entry);
631 }
632 } else if (channel->fd != -1) {
633 // Stop after close, need to check if fd_entry still exists
634 _dispatch_retain(channel);
635 dispatch_async(_dispatch_io_fds_lockq, ^{
636 _dispatch_io_debug("io stop after close cleanup",
637 channel->fd);
638 dispatch_fd_entry_t fdi;
639 uintptr_t hash = DIO_HASH(channel->fd);
640 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
641 if (fdi->fd == channel->fd) {
642 _dispatch_fd_entry_cleanup_operations(fdi, channel);
643 break;
644 }
645 }
646 _dispatch_release(channel);
647 });
648 }
649 _dispatch_release(channel);
650 });
651 });
652 }
653
654 void
655 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
656 {
657 if (flags & DISPATCH_IO_STOP) {
658 // Don't stop an already stopped channel
659 if (channel->atomic_flags & DIO_STOPPED) {
660 return;
661 }
662 return _dispatch_io_stop(channel);
663 }
664 // Don't close an already closed or stopped channel
665 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
666 return;
667 }
668 _dispatch_retain(channel);
669 dispatch_async(channel->queue, ^{
670 dispatch_async(channel->barrier_queue, ^{
671 _dispatch_io_debug("io close", channel->fd);
672 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
673 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED);
674 dispatch_fd_entry_t fd_entry = channel->fd_entry;
675 if (!fd_entry->path_data) {
676 channel->fd_entry = NULL;
677 }
678 _dispatch_fd_entry_release(fd_entry);
679 }
680 _dispatch_release(channel);
681 });
682 });
683 }
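
/*
 * Usage sketch (editorial): closing with DISPATCH_IO_STOP interrupts
 * outstanding operations (their handlers fire with ECANCELED); a plain close
 * only prevents new operations and lets queued ones drain first.
 *
 *	dispatch_io_close(ch, DISPATCH_IO_STOP);	// cancel in-flight I/O
 *	dispatch_io_close(ch, 0);			// or: orderly close
 */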
684
685 void
686 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
687 {
688 _dispatch_retain(channel);
689 dispatch_async(channel->queue, ^{
690 dispatch_queue_t io_q = channel->do_targetq;
691 dispatch_queue_t barrier_queue = channel->barrier_queue;
692 dispatch_group_t barrier_group = channel->barrier_group;
693 dispatch_async(barrier_queue, ^{
694 dispatch_suspend(barrier_queue);
695 dispatch_group_notify(barrier_group, io_q, ^{
696 _dispatch_thread_setspecific(dispatch_io_key, channel);
697 barrier();
698 _dispatch_thread_setspecific(dispatch_io_key, NULL);
699 dispatch_resume(barrier_queue);
700 _dispatch_release(channel);
701 });
702 });
703 });
704 }
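
/*
 * Usage sketch (editorial): a barrier runs after the outstanding operations
 * on every channel sharing the fd have drained, and is the intended place to
 * touch the descriptor directly via dispatch_io_get_descriptor():
 *
 *	dispatch_io_barrier(ch, ^{
 *		dispatch_fd_t fd = dispatch_io_get_descriptor(ch);
 *		if (fd != -1) {
 *			(void)fsync(fd);	// hypothetical direct use of the fd
 *		}
 *	});
 */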
705
706 void
707 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
708 dispatch_queue_t queue, dispatch_io_handler_t handler)
709 {
710 _dispatch_retain(channel);
711 _dispatch_retain(queue);
712 dispatch_async(channel->queue, ^{
713 dispatch_operation_t op;
714 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
715 length, dispatch_data_empty, queue, handler);
716 if (op) {
717 dispatch_queue_t barrier_q = channel->barrier_queue;
718 dispatch_async(barrier_q, ^{
719 _dispatch_operation_enqueue(op, DOP_DIR_READ,
720 dispatch_data_empty);
721 });
722 }
723 _dispatch_release(channel);
724 _dispatch_release(queue);
725 });
726 }
727
728 void
729 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
730 dispatch_queue_t queue, dispatch_io_handler_t handler)
731 {
732 _dispatch_io_data_retain(data);
733 _dispatch_retain(channel);
734 _dispatch_retain(queue);
735 dispatch_async(channel->queue, ^{
736 dispatch_operation_t op;
737 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
738 dispatch_data_get_size(data), data, queue, handler);
739 if (op) {
740 dispatch_queue_t barrier_q = channel->barrier_queue;
741 dispatch_async(barrier_q, ^{
742 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
743 _dispatch_io_data_release(data);
744 });
745 } else {
746 _dispatch_io_data_release(data);
747 }
748 _dispatch_release(channel);
749 _dispatch_release(queue);
750 });
751 }
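
/*
 * Usage sketch (editorial): reads on a channel deliver data incrementally;
 * the handler may run several times before "done" is true. "ch", "my_queue"
 * and process() are placeholders.
 *
 *	dispatch_io_read(ch, 0, SIZE_MAX, my_queue,
 *			^(bool done, dispatch_data_t data, int error) {
 *		if (data) {
 *			process(data);
 *		}
 *		if (done && error) {
 *			fprintf(stderr, "read failed: %d\n", error);
 *		}
 *	});
 */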
752
753 void
754 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
755 void (^handler)(dispatch_data_t, int))
756 {
757 _dispatch_retain(queue);
758 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
759 // On barrier queue
760 if (fd_entry->err) {
761 int err = fd_entry->err;
762 dispatch_async(queue, ^{
763 _dispatch_io_debug("convenience handler invoke", fd);
764 handler(dispatch_data_empty, err);
765 });
766 _dispatch_release(queue);
767 return;
768 }
769 // Safe to access fd_entry on barrier queue
770 dispatch_io_t channel = fd_entry->convenience_channel;
771 if (!channel) {
772 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
773 channel->fd = fd;
774 channel->fd_actual = fd;
775 channel->fd_entry = fd_entry;
776 dispatch_retain(fd_entry->barrier_queue);
777 dispatch_retain(fd_entry->barrier_group);
778 channel->barrier_queue = fd_entry->barrier_queue;
779 channel->barrier_group = fd_entry->barrier_group;
780 fd_entry->convenience_channel = channel;
781 }
782 __block dispatch_data_t deliver_data = dispatch_data_empty;
783 __block int err = 0;
784 dispatch_async(fd_entry->close_queue, ^{
785 dispatch_async(queue, ^{
786 _dispatch_io_debug("convenience handler invoke", fd);
787 handler(deliver_data, err);
788 _dispatch_io_data_release(deliver_data);
789 });
790 _dispatch_release(queue);
791 });
792 dispatch_operation_t op =
793 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
794 length, dispatch_data_empty,
795 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
796 false), ^(bool done, dispatch_data_t data, int error) {
797 if (data) {
798 data = dispatch_data_create_concat(deliver_data, data);
799 _dispatch_io_data_release(deliver_data);
800 deliver_data = data;
801 }
802 if (done) {
803 err = error;
804 }
805 });
806 if (op) {
807 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
808 }
809 });
810 }
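
/*
 * Usage sketch (editorial): unlike dispatch_io_read() above, the convenience
 * wrapper concatenates every delivered chunk and calls its handler exactly
 * once. "my_queue" is a placeholder.
 *
 *	dispatch_read(fd, SIZE_MAX, my_queue, ^(dispatch_data_t data, int error) {
 *		if (!error) {
 *			printf("read %zu bytes\n", dispatch_data_get_size(data));
 *		}
 *	});
 */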
811
812 void
813 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
814 void (^handler)(dispatch_data_t, int))
815 {
816 _dispatch_io_data_retain(data);
817 _dispatch_retain(queue);
818 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
819 // On barrier queue
820 if (fd_entry->err) {
821 int err = fd_entry->err;
822 dispatch_async(queue, ^{
823 _dispatch_io_debug("convenience handler invoke", fd);
824 handler(NULL, err);
825 });
826 _dispatch_release(queue);
827 return;
828 }
829 // Safe to access fd_entry on barrier queue
830 dispatch_io_t channel = fd_entry->convenience_channel;
831 if (!channel) {
832 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
833 channel->fd = fd;
834 channel->fd_actual = fd;
835 channel->fd_entry = fd_entry;
836 dispatch_retain(fd_entry->barrier_queue);
837 dispatch_retain(fd_entry->barrier_group);
838 channel->barrier_queue = fd_entry->barrier_queue;
839 channel->barrier_group = fd_entry->barrier_group;
840 fd_entry->convenience_channel = channel;
841 }
842 __block dispatch_data_t deliver_data = NULL;
843 __block int err = 0;
844 dispatch_async(fd_entry->close_queue, ^{
845 dispatch_async(queue, ^{
846 _dispatch_io_debug("convenience handler invoke", fd);
847 handler(deliver_data, err);
848 if (deliver_data) {
849 _dispatch_io_data_release(deliver_data);
850 }
851 });
852 _dispatch_release(queue);
853 });
854 dispatch_operation_t op =
855 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
856 dispatch_data_get_size(data), data,
857 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
858 false), ^(bool done, dispatch_data_t d, int error) {
859 if (done) {
860 if (d) {
861 _dispatch_io_data_retain(d);
862 deliver_data = d;
863 }
864 err = error;
865 }
866 });
867 if (op) {
868 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
869 }
870 _dispatch_io_data_release(data);
871 });
872 }
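
/*
 * Usage sketch (editorial): for the write convenience wrapper the data passed
 * to the handler is whatever could not be written (NULL on full success).
 * "buf", "len" and "my_queue" are placeholders.
 *
 *	dispatch_data_t msg = dispatch_data_create(buf, len,
 *			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
 *			DISPATCH_DATA_DESTRUCTOR_DEFAULT);
 *	dispatch_write(fd, msg, my_queue, ^(dispatch_data_t unwritten, int error) {
 *		if (error) fprintf(stderr, "write failed: %d\n", error);
 *	});
 *	dispatch_release(msg);
 */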
873
874 #pragma mark -
875 #pragma mark dispatch_operation_t
876
877 static dispatch_operation_t
878 _dispatch_operation_create(dispatch_op_direction_t direction,
879 dispatch_io_t channel, off_t offset, size_t length,
880 dispatch_data_t data, dispatch_queue_t queue,
881 dispatch_io_handler_t handler)
882 {
883 // On channel queue
884 dispatch_assert(direction < DOP_DIR_MAX);
885 _dispatch_io_debug("operation create", channel->fd);
886 #if DISPATCH_IO_DEBUG
887 int fd = channel->fd;
888 #endif
889 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
890 // that can only be NULL if atomic_flags are set rdar://problem/8362514
891 int err = _dispatch_io_get_error(NULL, channel, false);
892 if (err || !length) {
893 _dispatch_io_data_retain(data);
894 _dispatch_retain(queue);
895 dispatch_async(channel->barrier_queue, ^{
896 dispatch_async(queue, ^{
897 dispatch_data_t d = data;
898 if (direction == DOP_DIR_READ && err) {
899 d = NULL;
900 } else if (direction == DOP_DIR_WRITE && !err) {
901 d = NULL;
902 }
903 _dispatch_io_debug("IO handler invoke", fd);
904 handler(true, d, err);
905 _dispatch_io_data_release(data);
906 });
907 _dispatch_release(queue);
908 });
909 return NULL;
910 }
911 dispatch_operation_t op;
912 op = calloc(1ul, sizeof(struct dispatch_operation_s));
913 op->do_vtable = &_dispatch_operation_vtable;
914 op->do_next = DISPATCH_OBJECT_LISTLESS;
915 op->do_ref_cnt = 1;
916 op->do_xref_cnt = 0; // operation object is not exposed externally
917 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
918 op->op_q->do_targetq = queue;
919 _dispatch_retain(queue);
920 op->active = false;
921 op->direction = direction;
922 op->offset = offset + channel->f_ptr;
923 op->length = length;
924 op->handler = Block_copy(handler);
925 _dispatch_retain(channel);
926 op->channel = channel;
927 op->params = channel->params;
928 // Take a snapshot of the priority of the channel queue. The actual I/O
929 // for this operation will be performed at this priority
930 dispatch_queue_t targetq = op->channel->do_targetq;
931 while (fastpath(targetq->do_targetq)) {
932 targetq = targetq->do_targetq;
933 }
934 op->do_targetq = targetq;
935 return op;
936 }
937
938 static void
939 _dispatch_operation_dispose(dispatch_operation_t op)
940 {
941 // Deliver the data if there's any
942 if (op->fd_entry) {
943 _dispatch_operation_deliver_data(op, DOP_DONE);
944 dispatch_group_leave(op->fd_entry->barrier_group);
945 _dispatch_fd_entry_release(op->fd_entry);
946 }
947 if (op->channel) {
948 _dispatch_release(op->channel);
949 }
950 if (op->timer) {
951 dispatch_release(op->timer);
952 }
953 // For write operations, op->buf is owned by op->buf_data
954 if (op->buf && op->direction == DOP_DIR_READ) {
955 free(op->buf);
956 }
957 if (op->buf_data) {
958 _dispatch_io_data_release(op->buf_data);
959 }
960 if (op->data) {
961 _dispatch_io_data_release(op->data);
962 }
963 if (op->op_q) {
964 dispatch_release(op->op_q);
965 }
966 Block_release(op->handler);
967 _dispatch_dispose(op);
968 }
969
970 static void
971 _dispatch_operation_enqueue(dispatch_operation_t op,
972 dispatch_op_direction_t direction, dispatch_data_t data)
973 {
974 // Called from the barrier queue
975 _dispatch_io_data_retain(data);
976 // If channel is closed or stopped, then call the handler immediately
977 int err = _dispatch_io_get_error(NULL, op->channel, false);
978 if (err) {
979 dispatch_io_handler_t handler = op->handler;
980 dispatch_async(op->op_q, ^{
981 dispatch_data_t d = data;
982 if (direction == DOP_DIR_READ && err) {
983 d = NULL;
984 } else if (direction == DOP_DIR_WRITE && !err) {
985 d = NULL;
986 }
987 handler(true, d, err);
988 _dispatch_io_data_release(data);
989 });
990 _dispatch_release(op);
991 return;
992 }
993 // Finish operation init
994 op->fd_entry = op->channel->fd_entry;
995 _dispatch_fd_entry_retain(op->fd_entry);
996 dispatch_group_enter(op->fd_entry->barrier_group);
997 dispatch_disk_t disk = op->fd_entry->disk;
998 if (!disk) {
999 dispatch_stream_t stream = op->fd_entry->streams[direction];
1000 dispatch_async(stream->dq, ^{
1001 _dispatch_stream_enqueue_operation(stream, op, data);
1002 _dispatch_io_data_release(data);
1003 });
1004 } else {
1005 dispatch_async(disk->pick_queue, ^{
1006 _dispatch_disk_enqueue_operation(disk, op, data);
1007 _dispatch_io_data_release(data);
1008 });
1009 }
1010 }
1011
1012 static bool
1013 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1014 dispatch_queue_t tq, dispatch_data_t data)
1015 {
1016 // On stream queue or disk queue
1017 _dispatch_io_debug("enqueue operation", op->fd_entry->fd);
1018 _dispatch_io_data_retain(data);
1019 op->data = data;
1020 int err = _dispatch_io_get_error(op, NULL, true);
1021 if (err) {
1022 op->err = err;
1023 // Final release
1024 _dispatch_release(op);
1025 return false;
1026 }
1027 if (op->params.interval) {
1028 dispatch_resume(_dispatch_operation_timer(tq, op));
1029 }
1030 return true;
1031 }
1032
1033 static dispatch_source_t
1034 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1035 {
1036 // On stream queue or pick queue
1037 if (op->timer) {
1038 return op->timer;
1039 }
1040 dispatch_source_t timer = dispatch_source_create(
1041 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1042 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1043 op->params.interval), op->params.interval, 0);
1044 dispatch_source_set_event_handler(timer, ^{
1045 // On stream queue or pick queue
1046 if (dispatch_source_testcancel(timer)) {
1047 // Do nothing. The operation has already completed
1048 return;
1049 }
1050 dispatch_op_flags_t flags = DOP_DEFAULT;
1051 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1052 // Deliver even if there is less data than the low-water mark
1053 flags |= DOP_DELIVER;
1054 }
1055 // If the operation is active, don't deliver data
1056 if ((op->active) && (flags & DOP_DELIVER)) {
1057 op->flags = flags;
1058 } else {
1059 _dispatch_operation_deliver_data(op, flags);
1060 }
1061 });
1062 op->timer = timer;
1063 return op->timer;
1064 }
1065
1066 #pragma mark -
1067 #pragma mark dispatch_fd_entry_t
1068
1069 static inline void
1070 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1071 dispatch_suspend(fd_entry->close_queue);
1072 }
1073
1074 static inline void
1075 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1076 dispatch_resume(fd_entry->close_queue);
1077 }
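
/*
 * Note (editorial): an fd_entry "retain" is a suspension of its close queue,
 * so the cleanup blocks enqueued on that queue only run once every retain has
 * been balanced by a release, e.g.:
 *
 *	_dispatch_fd_entry_retain(fd_entry);	// suspend close_queue
 *	dispatch_async(q, ^{
 *		// ... use fd_entry safely ...
 *		_dispatch_fd_entry_release(fd_entry);	// last resume runs cleanup
 *	});
 */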
1078
1079 static void
1080 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1081 dispatch_fd_entry_init_callback_t completion_callback)
1082 {
1083 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1084 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1085 _dispatch_io_fds_lockq_init);
1086 dispatch_async(_dispatch_io_fds_lockq, ^{
1087 _dispatch_io_debug("fd entry init", fd);
1088 dispatch_fd_entry_t fd_entry = NULL;
1089 // Check to see if there is an existing entry for the given fd
1090 uintptr_t hash = DIO_HASH(fd);
1091 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1092 if (fd_entry->fd == fd) {
1093 // Retain the fd_entry to ensure it cannot go away until the
1094 // stat() has completed
1095 _dispatch_fd_entry_retain(fd_entry);
1096 break;
1097 }
1098 }
1099 if (!fd_entry) {
1100 // If we did not find an existing entry, create one
1101 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1102 }
1103 dispatch_async(fd_entry->barrier_queue, ^{
1104 _dispatch_io_debug("fd entry init completion", fd);
1105 completion_callback(fd_entry);
1106 // stat() is complete, release reference to fd_entry
1107 _dispatch_fd_entry_release(fd_entry);
1108 });
1109 });
1110 }
1111
1112 static dispatch_fd_entry_t
1113 _dispatch_fd_entry_create(dispatch_queue_t q)
1114 {
1115 dispatch_fd_entry_t fd_entry;
1116 fd_entry = calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1117 fd_entry->close_queue = dispatch_queue_create(
1118 "com.apple.libdispatch-io.closeq", NULL);
1119 // Use target queue to ensure that no concurrent lookups are going on when
1120 // the close queue is running
1121 fd_entry->close_queue->do_targetq = q;
1122 _dispatch_retain(q);
1123 // Suspend the cleanup queue until closing
1124 _dispatch_fd_entry_retain(fd_entry);
1125 return fd_entry;
1126 }
1127
1128 static dispatch_fd_entry_t
1129 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1130 {
1131 // On fds lock queue
1132 _dispatch_io_debug("fd entry create", fd);
1133 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1134 _dispatch_io_fds_lockq);
1135 fd_entry->fd = fd;
1136 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1137 fd_entry->barrier_queue = dispatch_queue_create(
1138 "com.apple.libdispatch-io.barrierq", NULL);
1139 fd_entry->barrier_group = dispatch_group_create();
1140 dispatch_async(fd_entry->barrier_queue, ^{
1141 _dispatch_io_debug("fd entry stat", fd);
1142 int err, orig_flags, orig_nosigpipe = -1;
1143 struct stat st;
1144 _dispatch_io_syscall_switch(err,
1145 fstat(fd, &st),
1146 default: fd_entry->err = err; return;
1147 );
1148 fd_entry->stat.dev = st.st_dev;
1149 fd_entry->stat.mode = st.st_mode;
1150 _dispatch_io_syscall_switch(err,
1151 orig_flags = fcntl(fd, F_GETFL),
1152 default: (void)dispatch_assume_zero(err); break;
1153 );
1154 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1155 if (S_ISFIFO(st.st_mode)) {
1156 _dispatch_io_syscall_switch(err,
1157 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1158 default: (void)dispatch_assume_zero(err); break;
1159 );
1160 if (orig_nosigpipe != -1) {
1161 _dispatch_io_syscall_switch(err,
1162 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1163 default:
1164 orig_nosigpipe = -1;
1165 (void)dispatch_assume_zero(err);
1166 break;
1167 );
1168 }
1169 }
1170 #endif
1171 if (S_ISREG(st.st_mode)) {
1172 if (orig_flags != -1) {
1173 _dispatch_io_syscall_switch(err,
1174 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1175 default:
1176 orig_flags = -1;
1177 (void)dispatch_assume_zero(err);
1178 break;
1179 );
1180 }
1181 int32_t dev = major(st.st_dev);
1182 // We have to get the disk on the global dev queue. The
1183 // barrier queue cannot continue until that is complete
1184 dispatch_suspend(fd_entry->barrier_queue);
1185 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1186 _dispatch_io_devs_lockq_init);
1187 dispatch_async(_dispatch_io_devs_lockq, ^{
1188 _dispatch_disk_init(fd_entry, dev);
1189 dispatch_resume(fd_entry->barrier_queue);
1190 });
1191 } else {
1192 if (orig_flags != -1) {
1193 _dispatch_io_syscall_switch(err,
1194 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1195 default:
1196 orig_flags = -1;
1197 (void)dispatch_assume_zero(err);
1198 break;
1199 );
1200 }
1201 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1202 DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1203 }
1204 fd_entry->orig_flags = orig_flags;
1205 fd_entry->orig_nosigpipe = orig_nosigpipe;
1206 });
1207 // This is the first item run when the close queue is resumed, indicating
1208 // that all channels associated with this entry have been closed and that
1209 // all operations associated with this entry have been freed
1210 dispatch_async(fd_entry->close_queue, ^{
1211 if (!fd_entry->disk) {
1212 _dispatch_io_debug("close queue fd_entry cleanup", fd);
1213 dispatch_op_direction_t dir;
1214 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1215 _dispatch_stream_dispose(fd_entry, dir);
1216 }
1217 } else {
1218 dispatch_disk_t disk = fd_entry->disk;
1219 dispatch_async(_dispatch_io_devs_lockq, ^{
1220 _dispatch_release(disk);
1221 });
1222 }
1223 // Remove this entry from the global fd list
1224 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1225 });
1226 // If there was a source associated with this stream, disposing of the
1227 // source cancels it and suspends the close queue. Freeing the fd_entry
1228 // structure must happen after the source cancel handler has finished
1229 dispatch_async(fd_entry->close_queue, ^{
1230 _dispatch_io_debug("close queue release", fd);
1231 dispatch_release(fd_entry->close_queue);
1232 _dispatch_io_debug("barrier queue release", fd);
1233 dispatch_release(fd_entry->barrier_queue);
1234 _dispatch_io_debug("barrier group release", fd);
1235 dispatch_release(fd_entry->barrier_group);
1236 if (fd_entry->orig_flags != -1) {
1237 _dispatch_io_syscall(
1238 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1239 );
1240 }
1241 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1242 if (fd_entry->orig_nosigpipe != -1) {
1243 _dispatch_io_syscall(
1244 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1245 );
1246 }
1247 #endif
1248 if (fd_entry->convenience_channel) {
1249 fd_entry->convenience_channel->fd_entry = NULL;
1250 dispatch_release(fd_entry->convenience_channel);
1251 }
1252 free(fd_entry);
1253 });
1254 return fd_entry;
1255 }
1256
1257 static dispatch_fd_entry_t
1258 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1259 dev_t dev, mode_t mode)
1260 {
1261 // On devs lock queue
1262 _dispatch_io_debug("fd entry create with path %s", -1, path_data->path);
1263 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1264 path_data->channel->queue);
1265 if (S_ISREG(mode)) {
1266 _dispatch_disk_init(fd_entry, major(dev));
1267 } else {
1268 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1269 DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1270 }
1271 fd_entry->fd = -1;
1272 fd_entry->orig_flags = -1;
1273 fd_entry->path_data = path_data;
1274 fd_entry->stat.dev = dev;
1275 fd_entry->stat.mode = mode;
1276 fd_entry->barrier_queue = dispatch_queue_create(
1277 "com.apple.libdispatch-io.barrierq", NULL);
1278 fd_entry->barrier_group = dispatch_group_create();
1279 // This is the first item run when the close queue is resumed, indicating
1280 // that the channel associated with this entry has been closed and that
1281 // all operations associated with this entry have been freed
1282 dispatch_async(fd_entry->close_queue, ^{
1283 _dispatch_io_debug("close queue fd_entry cleanup", -1);
1284 if (!fd_entry->disk) {
1285 dispatch_op_direction_t dir;
1286 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1287 _dispatch_stream_dispose(fd_entry, dir);
1288 }
1289 }
1290 if (fd_entry->fd != -1) {
1291 close(fd_entry->fd);
1292 }
1293 if (fd_entry->path_data->channel) {
1294 // If associated channel has not been released yet, mark it as
1295 // no longer having an fd_entry (for stop after close).
1296 // It is safe to modify channel since we are on close_queue with
1297 // target queue the channel queue
1298 fd_entry->path_data->channel->fd_entry = NULL;
1299 }
1300 });
1301 dispatch_async(fd_entry->close_queue, ^{
1302 _dispatch_io_debug("close queue release", -1);
1303 dispatch_release(fd_entry->close_queue);
1304 dispatch_release(fd_entry->barrier_queue);
1305 dispatch_release(fd_entry->barrier_group);
1306 free(fd_entry->path_data);
1307 free(fd_entry);
1308 });
1309 return fd_entry;
1310 }
1311
1312 static int
1313 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1314 {
1315 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1316 return 0;
1317 }
1318 if (fd_entry->err) {
1319 return fd_entry->err;
1320 }
1321 int fd = -1;
1322 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1323 fd_entry->path_data->oflag | O_NONBLOCK;
1324 open:
1325 fd = open(fd_entry->path_data->path, oflag, fd_entry->path_data->mode);
1326 if (fd == -1) {
1327 int err = errno;
1328 if (err == EINTR) {
1329 goto open;
1330 }
1331 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err);
1332 return err;
1333 }
1334 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd)) {
1335 // Lost the race with another open
1336 close(fd);
1337 } else {
1338 channel->fd_actual = fd;
1339 }
1340 return 0;
1341 }
1342
1343 static void
1344 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1345 dispatch_io_t channel)
1346 {
1347 if (fd_entry->disk) {
1348 if (channel) {
1349 _dispatch_retain(channel);
1350 }
1351 _dispatch_fd_entry_retain(fd_entry);
1352 dispatch_async(fd_entry->disk->pick_queue, ^{
1353 _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1354 _dispatch_fd_entry_release(fd_entry);
1355 if (channel) {
1356 _dispatch_release(channel);
1357 }
1358 });
1359 } else {
1360 dispatch_op_direction_t direction;
1361 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1362 dispatch_stream_t stream = fd_entry->streams[direction];
1363 if (!stream) {
1364 continue;
1365 }
1366 if (channel) {
1367 _dispatch_retain(channel);
1368 }
1369 _dispatch_fd_entry_retain(fd_entry);
1370 dispatch_async(stream->dq, ^{
1371 _dispatch_stream_cleanup_operations(stream, channel);
1372 _dispatch_fd_entry_release(fd_entry);
1373 if (channel) {
1374 _dispatch_release(channel);
1375 }
1376 });
1377 }
1378 }
1379 }
1380
1381 #pragma mark -
1382 #pragma mark dispatch_stream_t/dispatch_disk_t
1383
1384 static void
1385 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1386 {
1387 dispatch_op_direction_t direction;
1388 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1389 dispatch_stream_t stream;
1390 stream = calloc(1ul, sizeof(struct dispatch_stream_s));
1391 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1392 NULL);
1393 _dispatch_retain(tq);
1394 stream->dq->do_targetq = tq;
1395 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1396 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1397 fd_entry->streams[direction] = stream;
1398 }
1399 }
1400
1401 static void
1402 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1403 dispatch_op_direction_t direction)
1404 {
1405 // On close queue
1406 dispatch_stream_t stream = fd_entry->streams[direction];
1407 if (!stream) {
1408 return;
1409 }
1410 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1411 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1412 if (stream->source) {
1413 // Balanced by source cancel handler:
1414 _dispatch_fd_entry_retain(fd_entry);
1415 dispatch_source_cancel(stream->source);
1416 dispatch_resume(stream->source);
1417 dispatch_release(stream->source);
1418 }
1419 dispatch_release(stream->dq);
1420 free(stream);
1421 }
1422
1423 static void
1424 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1425 {
1426 // On devs lock queue
1427 dispatch_disk_t disk;
1428 char label_name[256];
1429 // Check to see if there is an existing entry for the given device
1430 uintptr_t hash = DIO_HASH(dev);
1431 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1432 if (disk->dev == dev) {
1433 _dispatch_retain(disk);
1434 goto out;
1435 }
1436 }
1437 // Otherwise create a new entry
1438 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1439 disk = calloc(1ul, sizeof(struct dispatch_disk_s) + (pending_reqs_depth *
1440 sizeof(dispatch_operation_t)));
1441 disk->do_vtable = &_dispatch_disk_vtable;
1442 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1443 disk->do_ref_cnt = 1;
1444 disk->do_xref_cnt = 0;
1445 disk->advise_list_depth = pending_reqs_depth;
1446 disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
1447 false);
1448 disk->dev = dev;
1449 TAILQ_INIT(&disk->operations);
1450 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1451 sprintf(label_name, "com.apple.libdispatch-io.deviceq.%d", dev);
1452 disk->pick_queue = dispatch_queue_create(label_name, NULL);
1453 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1454 out:
1455 fd_entry->disk = disk;
1456 TAILQ_INIT(&fd_entry->stream_ops);
1457 }
1458
1459 static void
1460 _dispatch_disk_dispose(dispatch_disk_t disk)
1461 {
1462 uintptr_t hash = DIO_HASH(disk->dev);
1463 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1464 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1465 size_t i;
1466 for (i=0; i<disk->advise_list_depth; ++i) {
1467 dispatch_assert(!disk->advise_list[i]);
1468 }
1469 dispatch_release(disk->pick_queue);
1470 free(disk);
1471 }
1472
1473 #pragma mark -
1474 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1475
1476 static inline bool
1477 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1478 {
1479 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1480 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1481 }
1482
1483 static void
1484 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1485 dispatch_operation_t op, dispatch_data_t data)
1486 {
1487 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1488 return;
1489 }
1490 bool no_ops = !_dispatch_stream_operation_avail(stream);
1491 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1492 if (no_ops) {
1493 dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
1494 }
1495 }
1496
1497 static void
1498 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1499 dispatch_data_t data)
1500 {
1501 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1502 return;
1503 }
1504 if (op->params.type == DISPATCH_IO_STREAM) {
1505 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1506 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1507 }
1508 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1509 } else {
1510 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1511 }
1512 _dispatch_disk_handler(disk);
1513 }
1514
1515 static void
1516 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1517 dispatch_operation_t op)
1518 {
1519 // On stream queue
1520 _dispatch_io_debug("complete operation", op->fd_entry->fd);
1521 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1522 if (op == stream->op) {
1523 stream->op = NULL;
1524 }
1525 if (op->timer) {
1526 dispatch_source_cancel(op->timer);
1527 }
1528 // Final release will deliver any pending data
1529 _dispatch_release(op);
1530 }
1531
1532 static void
1533 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1534 {
1535 // On pick queue
1536 _dispatch_io_debug("complete operation", op->fd_entry->fd);
1537 // Current request is always the last op returned
1538 if (disk->cur_rq == op) {
1539 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1540 operation_list);
1541 }
1542 if (op->params.type == DISPATCH_IO_STREAM) {
1543 // Check if there are other pending stream operations behind it
1544 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1545 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1546 if (op_next) {
1547 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1548 }
1549 }
1550 TAILQ_REMOVE(&disk->operations, op, operation_list);
1551 if (op->timer) {
1552 dispatch_source_cancel(op->timer);
1553 }
1554 // Final release will deliver any pending data
1555 _dispatch_release(op);
1556 }
1557
1558 static dispatch_operation_t
1559 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1560 dispatch_operation_t op)
1561 {
1562 // On stream queue
1563 if (!op) {
1564 // On the first run through, pick the first operation
1565 if (!_dispatch_stream_operation_avail(stream)) {
1566 return op;
1567 }
1568 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1569 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1570 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1571 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1572 }
1573 return op;
1574 }
1575 if (op->params.type == DISPATCH_IO_STREAM) {
1576 // Stream operations need to be serialized so continue the current
1577 // operation until it is finished
1578 return op;
1579 }
1580 // Get the next random operation (round-robin)
1581 if (op->params.type == DISPATCH_IO_RANDOM) {
1582 op = TAILQ_NEXT(op, operation_list);
1583 if (!op) {
1584 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1585 }
1586 return op;
1587 }
1588 return NULL;
1589 }
1590
1591 static dispatch_operation_t
1592 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1593 {
1594 // On pick queue
1595 dispatch_operation_t op;
1596 if (!TAILQ_EMPTY(&disk->operations)) {
1597 if (disk->cur_rq == NULL) {
1598 op = TAILQ_FIRST(&disk->operations);
1599 } else {
1600 op = disk->cur_rq;
1601 do {
1602 op = TAILQ_NEXT(op, operation_list);
1603 if (!op) {
1604 op = TAILQ_FIRST(&disk->operations);
1605 }
1606 // TODO: more involved picking algorithm rdar://problem/8780312
1607 } while (op->active && op != disk->cur_rq);
1608 }
1609 if (!op->active) {
1610 disk->cur_rq = op;
1611 return op;
1612 }
1613 }
1614 return NULL;
1615 }
1616
1617 static void
1618 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1619 dispatch_io_t channel)
1620 {
1621 // On stream queue
1622 dispatch_operation_t op, tmp;
1623 typeof(*stream->operations) *operations;
1624 operations = &stream->operations[DISPATCH_IO_RANDOM];
1625 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1626 if (!channel || op->channel == channel) {
1627 _dispatch_stream_complete_operation(stream, op);
1628 }
1629 }
1630 operations = &stream->operations[DISPATCH_IO_STREAM];
1631 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1632 if (!channel || op->channel == channel) {
1633 _dispatch_stream_complete_operation(stream, op);
1634 }
1635 }
1636 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1637 dispatch_suspend(stream->source);
1638 stream->source_running = false;
1639 }
1640 }
1641
1642 static void
1643 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1644 {
1645 // On pick queue
1646 dispatch_operation_t op, tmp;
1647 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1648 if (!channel || op->channel == channel) {
1649 _dispatch_disk_complete_operation(disk, op);
1650 }
1651 }
1652 }
1653
1654 #pragma mark -
1655 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1656
1657 static dispatch_source_t
1658 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1659 {
1660 // On stream queue
1661 if (stream->source) {
1662 return stream->source;
1663 }
1664 dispatch_fd_t fd = op->fd_entry->fd;
1665 _dispatch_io_debug("stream source create", fd);
1666 dispatch_source_t source = NULL;
1667 if (op->direction == DOP_DIR_READ) {
1668 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0,
1669 stream->dq);
1670 } else if (op->direction == DOP_DIR_WRITE) {
1671 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0,
1672 stream->dq);
1673 } else {
1674 dispatch_assert(op->direction < DOP_DIR_MAX);
1675 return NULL;
1676 }
1677 dispatch_set_context(source, stream);
1678 dispatch_source_set_event_handler_f(source,
1679 _dispatch_stream_source_handler);
1680 // Close queue must not run user cleanup handlers until sources are fully
1681 // unregistered
1682 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1683 dispatch_source_set_cancel_handler(source, ^{
1684 _dispatch_io_debug("stream source cancel", fd);
1685 dispatch_resume(close_queue);
1686 });
1687 stream->source = source;
1688 return stream->source;
1689 }
1690
1691 static void
1692 _dispatch_stream_source_handler(void *ctx)
1693 {
1694 // On stream queue
1695 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1696 dispatch_suspend(stream->source);
1697 stream->source_running = false;
1698 return _dispatch_stream_handler(stream);
1699 }
1700
1701 static void
1702 _dispatch_stream_handler(void *ctx)
1703 {
1704 // On stream queue
1705 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1706 dispatch_operation_t op;
1707 pick:
1708 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1709 if (!op) {
1710 _dispatch_debug("no operation found: stream %p", stream);
1711 return;
1712 }
1713 int err = _dispatch_io_get_error(op, NULL, true);
1714 if (err) {
1715 op->err = err;
1716 _dispatch_stream_complete_operation(stream, op);
1717 goto pick;
1718 }
1719 stream->op = op;
1720 _dispatch_io_debug("stream handler", op->fd_entry->fd);
1721 dispatch_fd_entry_t fd_entry = op->fd_entry;
1722 _dispatch_fd_entry_retain(fd_entry);
1723 // For performance analysis
1724 if (!op->total && dispatch_io_defaults.initial_delivery) {
1725 // Empty delivery to signal the start of the operation
1726 _dispatch_io_debug("initial delivery", op->fd_entry->fd);
1727 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1728 }
1729 // TODO: perform on the operation target queue to get correct priority
1730 int result = _dispatch_operation_perform(op), flags = -1;
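// flags also acts as a sentinel for the fall-throughs below: it ends up as
// DOP_DEFAULT only for a plain DISPATCH_OP_DELIVER, so the
// DISPATCH_OP_COMPLETE case knows not to complete an operation that only
// delivered data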
1731 switch (result) {
1732 case DISPATCH_OP_DELIVER:
1733 flags = DOP_DEFAULT;
1734 // Fall through
1735 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1736 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1737 DOP_DEFAULT;
1738 _dispatch_operation_deliver_data(op, flags);
1739 // Fall through
1740 case DISPATCH_OP_COMPLETE:
1741 if (flags != DOP_DEFAULT) {
1742 _dispatch_stream_complete_operation(stream, op);
1743 }
1744 if (_dispatch_stream_operation_avail(stream)) {
1745 dispatch_async_f(stream->dq, stream, _dispatch_stream_handler);
1746 }
1747 break;
1748 case DISPATCH_OP_COMPLETE_RESUME:
1749 _dispatch_stream_complete_operation(stream, op);
1750 // Fall through
1751 case DISPATCH_OP_RESUME:
1752 if (_dispatch_stream_operation_avail(stream)) {
1753 stream->source_running = true;
1754 dispatch_resume(_dispatch_stream_source(stream, op));
1755 }
1756 break;
1757 case DISPATCH_OP_ERR:
1758 _dispatch_stream_cleanup_operations(stream, op->channel);
1759 break;
1760 case DISPATCH_OP_FD_ERR:
1761 _dispatch_fd_entry_retain(fd_entry);
1762 dispatch_async(fd_entry->barrier_queue, ^{
1763 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1764 _dispatch_fd_entry_release(fd_entry);
1765 });
1766 break;
1767 default:
1768 break;
1769 }
1770 _dispatch_fd_entry_release(fd_entry);
1771 return;
1772 }
1773
1774 static void
1775 _dispatch_disk_handler(void *ctx)
1776 {
1777 // On pick queue
1778 dispatch_disk_t disk = (dispatch_disk_t)ctx;
1779 if (disk->io_active) {
1780 return;
1781 }
1782 _dispatch_io_debug("disk handler", -1);
1783 dispatch_operation_t op;
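// advise_list is a fixed-size ring buffer: req_idx is the slot whose
// operation will be performed next, free_idx the first empty slot. Refill
// the empty slots (wrapping modulo advise_list_depth) with newly picked
// operations so read-ahead advice can be issued for them in
// _dispatch_disk_perform() before they reach req_idx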
1784 size_t i = disk->free_idx, j = disk->req_idx;
1785 if (j <= i) {
1786 j += disk->advise_list_depth;
1787 }
1788 while (i <= j) {
1789 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1790 (op = _dispatch_disk_pick_next_operation(disk))) {
1791 int err = _dispatch_io_get_error(op, NULL, true);
1792 if (err) {
1793 op->err = err;
1794 _dispatch_disk_complete_operation(disk, op);
1795 continue;
1796 }
1797 _dispatch_retain(op);
1798 disk->advise_list[i%disk->advise_list_depth] = op;
1799 op->active = true;
1800 } else {
1801 // No more operations to get
1802 break;
1803 }
1804 i++;
1805 }
1806 disk->free_idx = (i%disk->advise_list_depth);
1807 op = disk->advise_list[disk->req_idx];
1808 if (op) {
1809 disk->io_active = true;
1810 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1811 }
1812 }
1813
1814 static void
1815 _dispatch_disk_perform(void *ctxt)
1816 {
1817 dispatch_disk_t disk = ctxt;
1818 size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1819 _dispatch_io_debug("disk perform", -1);
1820 dispatch_operation_t op;
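// First pass over the ring: issue read-ahead advice for the queued
// operations between advise_idx and free_idx, then perform the single
// operation at req_idx and post the result back to the pick queue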
1821 size_t i = disk->advise_idx, j = disk->free_idx;
1822 if (j <= i) {
1823 j += disk->advise_list_depth;
1824 }
1825 do {
1826 op = disk->advise_list[i%disk->advise_list_depth];
1827 if (!op) {
1828 // Nothing more to advise, must be at free_idx
1829 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
1830 break;
1831 }
1832 if (op->direction == DOP_DIR_WRITE) {
1833 // TODO: preallocate writes? rdar://problem/9032172
1834 continue;
1835 }
1836 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
1837 op->channel)) {
1838 continue;
1839 }
1840 // For performance analysis
1841 if (!op->total && dispatch_io_defaults.initial_delivery) {
1842 // Empty delivery to signal the start of the operation
1843 _dispatch_io_debug("initial delivery", op->fd_entry->fd);
1844 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1845 }
1846 // Advise two chunks if the list only has one element and this is the
1847 // first advise on the operation
1848 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
1849 !op->advise_offset) {
1850 chunk_size *= 2;
1851 }
1852 _dispatch_operation_advise(op, chunk_size);
1853 } while (++i < j);
1854 disk->advise_idx = i%disk->advise_list_depth;
1855 op = disk->advise_list[disk->req_idx];
1856 int result = _dispatch_operation_perform(op);
1857 disk->advise_list[disk->req_idx] = NULL;
1858 disk->req_idx = (disk->req_idx + 1)%disk->advise_list_depth;
1859 dispatch_async(disk->pick_queue, ^{
1860 switch (result) {
1861 case DISPATCH_OP_DELIVER:
1862 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
1863 break;
1864 case DISPATCH_OP_COMPLETE:
1865 _dispatch_disk_complete_operation(disk, op);
1866 break;
1867 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1868 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
1869 _dispatch_disk_complete_operation(disk, op);
1870 break;
1871 case DISPATCH_OP_ERR:
1872 _dispatch_disk_cleanup_operations(disk, op->channel);
1873 break;
1874 case DISPATCH_OP_FD_ERR:
1875 _dispatch_disk_cleanup_operations(disk, NULL);
1876 break;
1877 default:
1878 dispatch_assert(result);
1879 break;
1880 }
1881 op->active = false;
1882 disk->io_active = false;
1883 _dispatch_disk_handler(disk);
1884 // Balancing the retain in _dispatch_disk_handler. Note that op must be
1885 // released at the very end, since it might hold the last reference to
1886 // the disk
1887 _dispatch_release(op);
1888 });
1889 }
1890
1891 #pragma mark -
1892 #pragma mark dispatch_operation_perform
1893
1894 static void
1895 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
1896 {
1897 int err;
1898 struct radvisory advise;
1899 // No point in issuing a read advise for the next chunk if the advised
1900 // range is already more than a chunk ahead of the current read position
1901 if (op->advise_offset > (off_t)((op->offset+op->total) + chunk_size +
1902 PAGE_SIZE)) {
1903 return;
1904 }
1905 advise.ra_count = (int)chunk_size;
1906 if (!op->advise_offset) {
1907 op->advise_offset = op->offset;
1908 // If this is the first time through, align the advised range to a
1909 // page boundary
1910 size_t pg_fraction = (size_t)((op->offset + chunk_size) % PAGE_SIZE);
1911 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
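// E.g. (assuming PAGE_SIZE == 4096): offset 100 with a 128KiB chunk gives
// pg_fraction == 100, so ra_count grows by 3996 bytes and the advised
// range [100, 135168) ends exactly on a page boundary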
1912 }
1913 advise.ra_offset = op->advise_offset;
1914 op->advise_offset += advise.ra_count;
1915 _dispatch_io_syscall_switch(err,
1916 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
1917 // TODO: set disk status on error
1918 default: (void)dispatch_assume_zero(err); break;
1919 );
1920 }
1921
1922 static int
1923 _dispatch_operation_perform(dispatch_operation_t op)
1924 {
1925 int err = _dispatch_io_get_error(op, NULL, true);
1926 if (err) {
1927 goto error;
1928 }
1929 if (!op->buf) {
1930 size_t max_buf_siz = op->params.high;
1931 size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1932 if (op->direction == DOP_DIR_READ) {
1933 // If necessary, create a buffer for the ongoing operation, large
1934 // enough to fit chunk_pages but at most the high-water mark
1935 size_t data_siz = dispatch_data_get_size(op->data);
1936 if (data_siz) {
1937 dispatch_assert(data_siz < max_buf_siz);
1938 max_buf_siz -= data_siz;
1939 }
1940 if (max_buf_siz > chunk_siz) {
1941 max_buf_siz = chunk_siz;
1942 }
1943 if (op->length < SIZE_MAX) {
1944 op->buf_siz = op->length - op->total;
1945 if (op->buf_siz > max_buf_siz) {
1946 op->buf_siz = max_buf_siz;
1947 }
1948 } else {
1949 op->buf_siz = max_buf_siz;
1950 }
1951 op->buf = valloc(op->buf_siz);
1952 _dispatch_io_debug("buffer allocated", op->fd_entry->fd);
1953 } else if (op->direction == DOP_DIR_WRITE) {
1954 // Always write the first data piece; if it is smaller than a chunk,
1955 // accumulate further data pieces until the chunk size is reached
1956 if (chunk_siz > max_buf_siz) {
1957 chunk_siz = max_buf_siz;
1958 }
1959 op->buf_siz = 0;
1960 dispatch_data_apply(op->data,
1961 ^(dispatch_data_t region DISPATCH_UNUSED,
1962 size_t offset DISPATCH_UNUSED,
1963 const void* buf DISPATCH_UNUSED, size_t len) {
1964 size_t siz = op->buf_siz + len;
1965 if (!op->buf_siz || siz <= chunk_siz) {
1966 op->buf_siz = siz;
1967 }
1968 return (bool)(siz < chunk_siz);
1969 });
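// dispatch_data_apply() stops traversing once the block returns false,
// i.e. as soon as the accumulated size reaches chunk_siz; the first
// (possibly larger) region is always included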
1970 if (op->buf_siz > max_buf_siz) {
1971 op->buf_siz = max_buf_siz;
1972 }
1973 dispatch_data_t d;
1974 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
1975 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
1976 NULL);
1977 _dispatch_io_data_release(d);
1978 _dispatch_io_debug("buffer mapped", op->fd_entry->fd);
1979 }
1980 }
1981 if (op->fd_entry->fd == -1) {
1982 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
1983 if (err) {
1984 goto error;
1985 }
1986 }
1987 void *buf = op->buf + op->buf_len;
1988 size_t len = op->buf_siz - op->buf_len;
1989 off_t off = op->offset + op->total;
1990 ssize_t processed = -1;
1991 syscall:
1992 if (op->direction == DOP_DIR_READ) {
1993 if (op->params.type == DISPATCH_IO_STREAM) {
1994 processed = read(op->fd_entry->fd, buf, len);
1995 } else if (op->params.type == DISPATCH_IO_RANDOM) {
1996 processed = pread(op->fd_entry->fd, buf, len, off);
1997 }
1998 } else if (op->direction == DOP_DIR_WRITE) {
1999 if (op->params.type == DISPATCH_IO_STREAM) {
2000 processed = write(op->fd_entry->fd, buf, len);
2001 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2002 processed = pwrite(op->fd_entry->fd, buf, len, off);
2003 }
2004 }
2005 // Encountered an error on the file descriptor
2006 if (processed == -1) {
2007 err = errno;
2008 if (err == EINTR) {
2009 goto syscall;
2010 }
2011 goto error;
2012 }
2013 // EOF is indicated by two handler invocations
2014 if (processed == 0) {
2015 _dispatch_io_debug("EOF", op->fd_entry->fd);
2016 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2017 }
2018 op->buf_len += processed;
2019 op->total += processed;
2020 if (op->total == op->length) {
2021 // Finished processing all the bytes requested by the operation
2022 return DISPATCH_OP_COMPLETE;
2023 } else {
2024 // Deliver data only if the delivery filters (low-water mark etc.) are satisfied
2025 return DISPATCH_OP_DELIVER;
2026 }
2027 error:
2028 if (err == EAGAIN) {
2029 // For disk-based files with blocking I/O we should never get EAGAIN
2030 dispatch_assert(!op->fd_entry->disk);
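// The fd would block: park the operation and let the stream's read/write
// source drive it again once the fd is ready (DISPATCH_OP_RESUME causes
// _dispatch_stream_handler() to resume the source)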
2031 _dispatch_io_debug("EAGAIN %d", op->fd_entry->fd, err);
2032 if (op->direction == DOP_DIR_READ && op->total &&
2033 op->channel == op->fd_entry->convenience_channel) {
2034 // Convenience read with available data completes on EAGAIN
2035 return DISPATCH_OP_COMPLETE_RESUME;
2036 }
2037 return DISPATCH_OP_RESUME;
2038 }
2039 op->err = err;
2040 switch (err) {
2041 case ECANCELED:
2042 return DISPATCH_OP_ERR;
2043 case EBADF:
2044 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err);
2045 return DISPATCH_OP_FD_ERR;
2046 default:
2047 return DISPATCH_OP_COMPLETE;
2048 }
2049 }
2050
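/*
 * Minimal usage sketch of the public dispatch I/O API that this machinery
 * serves; `fd' and `queue' stand for a caller-provided descriptor and target
 * queue. The block passed to dispatch_io_read() is the io_handler that
 * _dispatch_operation_deliver_data() below invokes with (done, data, error).
 *
 *	dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, fd,
 *			queue, ^(int error) { close(fd); });
 *	dispatch_io_set_low_water(channel, 1); // deliver as soon as data arrives
 *	dispatch_io_read(channel, 0, SIZE_MAX, queue,
 *			^(bool done, dispatch_data_t data, int error) {
 *		// partial results arrive here as they become available; done with
 *		// error == 0 indicates EOF or that the requested length was read
 *	});
 */
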
2051 static void
2052 _dispatch_operation_deliver_data(dispatch_operation_t op,
2053 dispatch_op_flags_t flags)
2054 {
2055 // Called either from the stream or pick queue, or when the op is finalized
2056 dispatch_data_t data = NULL;
2057 int err = 0;
2058 size_t undelivered = op->undelivered + op->buf_len;
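// Data is handed to the I/O handler only once the bytes accumulated since
// the last delivery cross the channel's low-water mark (see
// dispatch_io_set_low_water()), the request buffer fills up, or delivery
// is forced by DOP_DELIVER/DOP_DONE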
2059 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2060 (op->flags & DOP_DELIVER);
2061 op->flags = DOP_DEFAULT;
2062 if (!deliver) {
2063 // Don't deliver data until the low-water mark has been reached
2064 if (undelivered >= op->params.low) {
2065 deliver = true;
2066 } else if (op->buf_len < op->buf_siz) {
2067 // Request buffer is not yet used up
2068 _dispatch_io_debug("buffer data", op->fd_entry->fd);
2069 return;
2070 }
2071 } else {
2072 err = op->err;
2073 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2074 err = ECANCELED;
2075 op->err = err;
2076 }
2077 }
2078 // Deliver data or buffer used up
2079 if (op->direction == DOP_DIR_READ) {
2080 if (op->buf_len) {
2081 void *buf = op->buf;
2082 data = dispatch_data_create(buf, op->buf_len, NULL,
2083 DISPATCH_DATA_DESTRUCTOR_FREE);
2084 op->buf = NULL;
2085 op->buf_len = 0;
2086 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2087 _dispatch_io_data_release(op->data);
2088 _dispatch_io_data_release(data);
2089 data = d;
2090 } else {
2091 data = op->data;
2092 }
2093 op->data = deliver ? dispatch_data_empty : data;
2094 } else if (op->direction == DOP_DIR_WRITE) {
2095 if (deliver) {
2096 data = dispatch_data_create_subrange(op->data, op->buf_len,
2097 op->length);
2098 }
2099 if (op->buf_data && op->buf_len == op->buf_siz) {
2100 _dispatch_io_data_release(op->buf_data);
2101 op->buf_data = NULL;
2102 op->buf = NULL;
2103 op->buf_len = 0;
2104 // Trim newly written buffer from head of unwritten data
2105 dispatch_data_t d;
2106 if (deliver) {
2107 _dispatch_io_data_retain(data);
2108 d = data;
2109 } else {
2110 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2111 op->length);
2112 }
2113 _dispatch_io_data_release(op->data);
2114 op->data = d;
2115 }
2116 } else {
2117 dispatch_assert(op->direction < DOP_DIR_MAX);
2118 return;
2119 }
2120 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2121 op->undelivered = undelivered;
2122 _dispatch_io_debug("buffer data", op->fd_entry->fd);
2123 return;
2124 }
2125 op->undelivered = 0;
2126 _dispatch_io_debug("deliver data", op->fd_entry->fd);
2127 dispatch_op_direction_t direction = op->direction;
2128 dispatch_io_handler_t handler = op->handler;
2129 #if DISPATCH_IO_DEBUG
2130 int fd = op->fd_entry->fd;
2131 #endif
2132 dispatch_fd_entry_t fd_entry = op->fd_entry;
2133 _dispatch_fd_entry_retain(fd_entry);
2134 dispatch_io_t channel = op->channel;
2135 _dispatch_retain(channel);
2136 // Note that data delivery may occur after the operation is freed
2137 dispatch_async(op->op_q, ^{
2138 bool done = (flags & DOP_DONE);
2139 dispatch_data_t d = data;
2140 if (done) {
2141 if (direction == DOP_DIR_READ && err) {
2142 if (dispatch_data_get_size(d)) {
2143 _dispatch_io_debug("IO handler invoke", fd);
2144 handler(false, d, 0);
2145 }
2146 d = NULL;
2147 } else if (direction == DOP_DIR_WRITE && !err) {
2148 d = NULL;
2149 }
2150 }
2151 _dispatch_io_debug("IO handler invoke", fd);
2152 handler(done, d, err);
2153 _dispatch_release(channel);
2154 _dispatch_fd_entry_release(fd_entry);
2155 _dispatch_io_data_release(data);
2156 });
2157 }