]> git.saurik.com Git - apple/libdispatch.git/blob - src/io.c
libdispatch-501.20.1.tar.gz
[apple/libdispatch.git] / src / io.c
1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #if DISPATCH_IO_DEBUG
28 #define _dispatch_fd_debug(msg, fd, args...) \
29 _dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
30 #else
31 #define _dispatch_fd_debug(msg, fd, args...)
32 #endif
33
34 #if USE_OBJC
35 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
36 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
37 #else
38 #define _dispatch_io_data_retain(x) dispatch_retain(x)
39 #define _dispatch_io_data_release(x) dispatch_release(x)
40 #endif
41
42 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43
44 DISPATCH_EXPORT DISPATCH_NOTHROW
45 void _dispatch_iocntl(uint32_t param, uint64_t value);
46
47 static dispatch_operation_t _dispatch_operation_create(
48 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49 size_t length, dispatch_data_t data, dispatch_queue_t queue,
50 dispatch_io_handler_t handler);
51 static void _dispatch_operation_enqueue(dispatch_operation_t op,
52 dispatch_op_direction_t direction, dispatch_data_t data);
53 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54 dispatch_operation_t op);
55 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58 dispatch_fd_entry_init_callback_t completion_callback);
59 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60 uintptr_t hash);
61 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64 dispatch_io_t channel);
65 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66 dispatch_io_t channel);
67 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68 dispatch_queue_t tq);
69 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70 dispatch_op_direction_t direction);
71 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73 dispatch_operation_t operation, dispatch_data_t data);
74 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75 dispatch_operation_t operation, dispatch_data_t data);
76 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77 dispatch_io_t channel);
78 static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79 dispatch_io_t channel);
80 static void _dispatch_stream_source_handler(void *ctx);
81 static void _dispatch_stream_queue_handler(void *ctx);
82 static void _dispatch_stream_handler(void *ctx);
83 static void _dispatch_disk_handler(void *ctx);
84 static void _dispatch_disk_perform(void *ctxt);
85 static void _dispatch_operation_advise(dispatch_operation_t op,
86 size_t chunk_size);
87 static int _dispatch_operation_perform(dispatch_operation_t op);
88 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89 dispatch_op_flags_t flags);
90
91 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
92 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
93 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
94 case EINTR: continue; \
95 __VA_ARGS__ \
96 } \
97 break; \
98 } while (1)
99 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
100 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
101 case 0: break; \
102 __VA_ARGS__ \
103 ); \
104 } while (0)
105 #define _dispatch_io_syscall(__syscall) do { int __err; \
106 _dispatch_io_syscall_switch(__err, __syscall); \
107 } while (0)
108
109 enum {
110 DISPATCH_OP_COMPLETE = 1,
111 DISPATCH_OP_DELIVER,
112 DISPATCH_OP_DELIVER_AND_COMPLETE,
113 DISPATCH_OP_COMPLETE_RESUME,
114 DISPATCH_OP_RESUME,
115 DISPATCH_OP_ERR,
116 DISPATCH_OP_FD_ERR,
117 };
118
119 #define _dispatch_io_Block_copy(x) \
120 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121
122 #pragma mark -
123 #pragma mark dispatch_io_hashtables
124
125 // Global hashtable of dev_t -> disk_s mappings
126 DISPATCH_CACHELINE_ALIGN
127 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
128 // Global hashtable of fd -> fd_entry_s mappings
129 DISPATCH_CACHELINE_ALIGN
130 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
131
132 static dispatch_once_t _dispatch_io_devs_lockq_pred;
133 static dispatch_queue_t _dispatch_io_devs_lockq;
134 static dispatch_queue_t _dispatch_io_fds_lockq;
135
136 static void
137 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
138 {
139 _dispatch_io_fds_lockq = dispatch_queue_create(
140 "com.apple.libdispatch-io.fd_lockq", NULL);
141 unsigned int i;
142 for (i = 0; i < DIO_HASH_SIZE; i++) {
143 TAILQ_INIT(&_dispatch_io_fds[i]);
144 }
145 }
146
147 static void
148 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
149 {
150 _dispatch_io_devs_lockq = dispatch_queue_create(
151 "com.apple.libdispatch-io.dev_lockq", NULL);
152 unsigned int i;
153 for (i = 0; i < DIO_HASH_SIZE; i++) {
154 TAILQ_INIT(&_dispatch_io_devs[i]);
155 }
156 }
157
158 #pragma mark -
159 #pragma mark dispatch_io_defaults
160
161 enum {
162 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
163 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
164 DISPATCH_IOCNTL_INITIAL_DELIVERY,
165 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
166 };
167
168 static struct dispatch_io_defaults_s {
169 size_t chunk_size, low_water_chunks, max_pending_io_reqs;
170 bool initial_delivery;
171 } dispatch_io_defaults = {
172 .chunk_size = DIO_MAX_CHUNK_SIZE,
173 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
174 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
175 };
176
177 #define _dispatch_iocntl_set_default(p, v) do { \
178 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
179 } while (0)
180
181 void
182 _dispatch_iocntl(uint32_t param, uint64_t value)
183 {
184 switch (param) {
185 case DISPATCH_IOCNTL_CHUNK_PAGES:
186 _dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE);
187 break;
188 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
189 _dispatch_iocntl_set_default(low_water_chunks, value);
190 break;
191 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
192 _dispatch_iocntl_set_default(initial_delivery, value);
193 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
194 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
195 break;
196 }
197 }
198
199 #pragma mark -
200 #pragma mark dispatch_io_t
201
202 static dispatch_io_t
203 _dispatch_io_create(dispatch_io_type_t type)
204 {
205 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
206 sizeof(struct dispatch_io_s));
207 channel->do_next = DISPATCH_OBJECT_LISTLESS;
208 channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
209 true);
210 channel->params.type = type;
211 channel->params.high = SIZE_MAX;
212 channel->params.low = dispatch_io_defaults.low_water_chunks *
213 dispatch_io_defaults.chunk_size;
214 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
215 NULL);
216 return channel;
217 }
218
219 static void
220 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
221 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
222 {
223 // Enqueue the cleanup handler on the suspended close queue
224 if (cleanup_handler) {
225 _dispatch_retain(queue);
226 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
227 dispatch_async(queue, ^{
228 _dispatch_fd_debug("cleanup handler invoke", -1);
229 cleanup_handler(err);
230 });
231 _dispatch_release(queue);
232 });
233 }
234 if (fd_entry) {
235 channel->fd_entry = fd_entry;
236 dispatch_retain(fd_entry->barrier_queue);
237 dispatch_retain(fd_entry->barrier_group);
238 channel->barrier_queue = fd_entry->barrier_queue;
239 channel->barrier_group = fd_entry->barrier_group;
240 } else {
241 // Still need to create a barrier queue, since all operations go
242 // through it
243 channel->barrier_queue = dispatch_queue_create(
244 "com.apple.libdispatch-io.barrierq", NULL);
245 channel->barrier_group = dispatch_group_create();
246 }
247 }
248
249 void
250 _dispatch_io_dispose(dispatch_io_t channel)
251 {
252 _dispatch_object_debug(channel, "%s", __func__);
253 if (channel->fd_entry &&
254 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
255 if (channel->fd_entry->path_data) {
256 // This modification is safe since path_data->channel is checked
257 // only on close_queue (which is still suspended at this point)
258 channel->fd_entry->path_data->channel = NULL;
259 }
260 // Cleanup handlers will only run when all channels related to this
261 // fd are complete
262 _dispatch_fd_entry_release(channel->fd_entry);
263 }
264 if (channel->queue) {
265 dispatch_release(channel->queue);
266 }
267 if (channel->barrier_queue) {
268 dispatch_release(channel->barrier_queue);
269 }
270 if (channel->barrier_group) {
271 dispatch_release(channel->barrier_group);
272 }
273 }
274
275 static int
276 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
277 {
278 int err = 0;
279 if (S_ISDIR(mode)) {
280 err = EISDIR;
281 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
282 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
283 err = ESPIPE;
284 }
285 return err;
286 }
287
288 static int
289 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
290 bool ignore_closed)
291 {
292 // On _any_ queue
293 int err;
294 if (op) {
295 channel = op->channel;
296 }
297 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
298 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
299 err = ECANCELED;
300 } else {
301 err = 0;
302 }
303 } else {
304 err = op ? op->fd_entry->err : channel->err;
305 }
306 return err;
307 }
308
309 #pragma mark -
310 #pragma mark dispatch_io_channels
311
312 dispatch_io_t
313 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
314 dispatch_queue_t queue, void (^cleanup_handler)(int))
315 {
316 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
317 return NULL;
318 }
319 _dispatch_fd_debug("io create", fd);
320 dispatch_io_t channel = _dispatch_io_create(type);
321 channel->fd = fd;
322 channel->fd_actual = fd;
323 dispatch_suspend(channel->queue);
324 _dispatch_retain(queue);
325 _dispatch_retain(channel);
326 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
327 // On barrier queue
328 int err = fd_entry->err;
329 if (!err) {
330 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
331 }
332 if (!err && type == DISPATCH_IO_RANDOM) {
333 off_t f_ptr;
334 _dispatch_io_syscall_switch_noerr(err,
335 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
336 case 0: channel->f_ptr = f_ptr; break;
337 default: (void)dispatch_assume_zero(err); break;
338 );
339 }
340 channel->err = err;
341 _dispatch_fd_entry_retain(fd_entry);
342 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
343 dispatch_resume(channel->queue);
344 _dispatch_object_debug(channel, "%s", __func__);
345 _dispatch_release(channel);
346 _dispatch_release(queue);
347 });
348 _dispatch_object_debug(channel, "%s", __func__);
349 return channel;
350 }
351
352 dispatch_io_t
353 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
354 dispatch_queue_t queue, void *context,
355 void (*cleanup_handler)(void *context, int error))
356 {
357 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
358 ^(int error){ cleanup_handler(context, error); });
359 }
360
361 dispatch_io_t
362 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
363 int oflag, mode_t mode, dispatch_queue_t queue,
364 void (^cleanup_handler)(int error))
365 {
366 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
367 !(*path == '/')) {
368 return NULL;
369 }
370 size_t pathlen = strlen(path);
371 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
372 if (!path_data) {
373 return NULL;
374 }
375 _dispatch_fd_debug("io create with path %s", -1, path);
376 dispatch_io_t channel = _dispatch_io_create(type);
377 channel->fd = -1;
378 channel->fd_actual = -1;
379 path_data->channel = channel;
380 path_data->oflag = oflag;
381 path_data->mode = mode;
382 path_data->pathlen = pathlen;
383 memcpy(path_data->path, path, pathlen + 1);
384 _dispatch_retain(queue);
385 _dispatch_retain(channel);
386 dispatch_async(channel->queue, ^{
387 int err = 0;
388 struct stat st;
389 _dispatch_io_syscall_switch_noerr(err,
390 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
391 (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
392 lstat(path_data->path, &st) : stat(path_data->path, &st),
393 case 0:
394 err = _dispatch_io_validate_type(channel, st.st_mode);
395 break;
396 default:
397 if ((path_data->oflag & O_CREAT) &&
398 (*(path_data->path + path_data->pathlen - 1) != '/')) {
399 // Check parent directory
400 char *c = strrchr(path_data->path, '/');
401 dispatch_assert(c);
402 *c = 0;
403 int perr;
404 _dispatch_io_syscall_switch_noerr(perr,
405 stat(path_data->path, &st),
406 case 0:
407 // Since the parent directory exists, open() will
408 // create a regular file after the fd_entry has
409 // been filled in
410 st.st_mode = S_IFREG;
411 err = 0;
412 break;
413 );
414 *c = '/';
415 }
416 break;
417 );
418 channel->err = err;
419 if (err) {
420 free(path_data);
421 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
422 _dispatch_release(channel);
423 _dispatch_release(queue);
424 return;
425 }
426 dispatch_suspend(channel->queue);
427 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
428 _dispatch_io_devs_lockq_init);
429 dispatch_async(_dispatch_io_devs_lockq, ^{
430 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
431 path_data, st.st_dev, st.st_mode);
432 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
433 dispatch_resume(channel->queue);
434 _dispatch_object_debug(channel, "%s", __func__);
435 _dispatch_release(channel);
436 _dispatch_release(queue);
437 });
438 });
439 _dispatch_object_debug(channel, "%s", __func__);
440 return channel;
441 }
442
443 dispatch_io_t
444 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
445 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
446 void (*cleanup_handler)(void *context, int error))
447 {
448 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
449 !cleanup_handler ? NULL :
450 ^(int error){ cleanup_handler(context, error); });
451 }
452
453 dispatch_io_t
454 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
455 dispatch_queue_t queue, void (^cleanup_handler)(int error))
456 {
457 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
458 return NULL;
459 }
460 _dispatch_fd_debug("io create with io %p", -1, in_channel);
461 dispatch_io_t channel = _dispatch_io_create(type);
462 dispatch_suspend(channel->queue);
463 _dispatch_retain(queue);
464 _dispatch_retain(channel);
465 _dispatch_retain(in_channel);
466 dispatch_async(in_channel->queue, ^{
467 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
468 if (err0) {
469 channel->err = err0;
470 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
471 dispatch_resume(channel->queue);
472 _dispatch_release(channel);
473 _dispatch_release(in_channel);
474 _dispatch_release(queue);
475 return;
476 }
477 dispatch_async(in_channel->barrier_queue, ^{
478 int err = _dispatch_io_get_error(NULL, in_channel, false);
479 // If there is no error, the fd_entry for the in_channel is valid.
480 // Since we are running on in_channel's queue, the fd_entry has been
481 // fully resolved and will stay valid for the duration of this block
482 if (!err) {
483 err = in_channel->err;
484 if (!err) {
485 err = in_channel->fd_entry->err;
486 }
487 }
488 if (!err) {
489 err = _dispatch_io_validate_type(channel,
490 in_channel->fd_entry->stat.mode);
491 }
492 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
493 off_t f_ptr;
494 _dispatch_io_syscall_switch_noerr(err,
495 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
496 case 0: channel->f_ptr = f_ptr; break;
497 default: (void)dispatch_assume_zero(err); break;
498 );
499 }
500 channel->err = err;
501 if (err) {
502 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
503 dispatch_resume(channel->queue);
504 _dispatch_release(channel);
505 _dispatch_release(in_channel);
506 _dispatch_release(queue);
507 return;
508 }
509 if (in_channel->fd == -1) {
510 // in_channel was created from path
511 channel->fd = -1;
512 channel->fd_actual = -1;
513 mode_t mode = in_channel->fd_entry->stat.mode;
514 dev_t dev = in_channel->fd_entry->stat.dev;
515 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
516 in_channel->fd_entry->path_data->pathlen + 1;
517 dispatch_io_path_data_t path_data = malloc(path_data_len);
518 memcpy(path_data, in_channel->fd_entry->path_data,
519 path_data_len);
520 path_data->channel = channel;
521 // lockq_io_devs is known to already exist
522 dispatch_async(_dispatch_io_devs_lockq, ^{
523 dispatch_fd_entry_t fd_entry;
524 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
525 dev, mode);
526 _dispatch_io_init(channel, fd_entry, queue, 0,
527 cleanup_handler);
528 dispatch_resume(channel->queue);
529 _dispatch_release(channel);
530 _dispatch_release(queue);
531 });
532 } else {
533 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
534 channel->fd = in_channel->fd;
535 channel->fd_actual = in_channel->fd_actual;
536 _dispatch_fd_entry_retain(fd_entry);
537 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
538 dispatch_resume(channel->queue);
539 _dispatch_release(channel);
540 _dispatch_release(queue);
541 }
542 _dispatch_release(in_channel);
543 _dispatch_object_debug(channel, "%s", __func__);
544 });
545 });
546 _dispatch_object_debug(channel, "%s", __func__);
547 return channel;
548 }
549
550 dispatch_io_t
551 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
552 dispatch_queue_t queue, void *context,
553 void (*cleanup_handler)(void *context, int error))
554 {
555 return dispatch_io_create_with_io(type, in_channel, queue,
556 !cleanup_handler ? NULL :
557 ^(int error){ cleanup_handler(context, error); });
558 }
559
560 #pragma mark -
561 #pragma mark dispatch_io_accessors
562
563 void
564 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
565 {
566 _dispatch_retain(channel);
567 dispatch_async(channel->queue, ^{
568 _dispatch_fd_debug("io set high water", channel->fd);
569 if (channel->params.low > high_water) {
570 channel->params.low = high_water;
571 }
572 channel->params.high = high_water ? high_water : 1;
573 _dispatch_release(channel);
574 });
575 }
576
577 void
578 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
579 {
580 _dispatch_retain(channel);
581 dispatch_async(channel->queue, ^{
582 _dispatch_fd_debug("io set low water", channel->fd);
583 if (channel->params.high < low_water) {
584 channel->params.high = low_water ? low_water : 1;
585 }
586 channel->params.low = low_water;
587 _dispatch_release(channel);
588 });
589 }
590
591 void
592 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
593 unsigned long flags)
594 {
595 _dispatch_retain(channel);
596 dispatch_async(channel->queue, ^{
597 _dispatch_fd_debug("io set interval", channel->fd);
598 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
599 channel->params.interval_flags = flags;
600 _dispatch_release(channel);
601 });
602 }
603
604 void
605 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
606 {
607 _dispatch_retain(dq);
608 _dispatch_retain(channel);
609 dispatch_async(channel->queue, ^{
610 dispatch_queue_t prev_dq = channel->do_targetq;
611 channel->do_targetq = dq;
612 _dispatch_release(prev_dq);
613 _dispatch_object_debug(channel, "%s", __func__);
614 _dispatch_release(channel);
615 });
616 }
617
618 dispatch_fd_t
619 dispatch_io_get_descriptor(dispatch_io_t channel)
620 {
621 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
622 return -1;
623 }
624 dispatch_fd_t fd = channel->fd_actual;
625 if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
626 !_dispatch_io_get_error(NULL, channel, false)) {
627 dispatch_fd_entry_t fd_entry = channel->fd_entry;
628 (void)_dispatch_fd_entry_open(fd_entry, channel);
629 }
630 return channel->fd_actual;
631 }
632
633 #pragma mark -
634 #pragma mark dispatch_io_operations
635
636 static void
637 _dispatch_io_stop(dispatch_io_t channel)
638 {
639 _dispatch_fd_debug("io stop", channel->fd);
640 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
641 _dispatch_retain(channel);
642 dispatch_async(channel->queue, ^{
643 dispatch_async(channel->barrier_queue, ^{
644 _dispatch_object_debug(channel, "%s", __func__);
645 dispatch_fd_entry_t fd_entry = channel->fd_entry;
646 if (fd_entry) {
647 _dispatch_fd_debug("io stop cleanup", channel->fd);
648 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
649 if (!(channel->atomic_flags & DIO_CLOSED)) {
650 channel->fd_entry = NULL;
651 _dispatch_fd_entry_release(fd_entry);
652 }
653 } else if (channel->fd != -1) {
654 // Stop after close, need to check if fd_entry still exists
655 _dispatch_retain(channel);
656 dispatch_async(_dispatch_io_fds_lockq, ^{
657 _dispatch_object_debug(channel, "%s", __func__);
658 _dispatch_fd_debug("io stop after close cleanup",
659 channel->fd);
660 dispatch_fd_entry_t fdi;
661 uintptr_t hash = DIO_HASH(channel->fd);
662 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
663 if (fdi->fd == channel->fd) {
664 _dispatch_fd_entry_cleanup_operations(fdi, channel);
665 break;
666 }
667 }
668 _dispatch_release(channel);
669 });
670 }
671 _dispatch_release(channel);
672 });
673 });
674 }
675
676 void
677 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
678 {
679 if (flags & DISPATCH_IO_STOP) {
680 // Don't stop an already stopped channel
681 if (channel->atomic_flags & DIO_STOPPED) {
682 return;
683 }
684 return _dispatch_io_stop(channel);
685 }
686 // Don't close an already closed or stopped channel
687 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
688 return;
689 }
690 _dispatch_retain(channel);
691 dispatch_async(channel->queue, ^{
692 dispatch_async(channel->barrier_queue, ^{
693 _dispatch_object_debug(channel, "%s", __func__);
694 _dispatch_fd_debug("io close", channel->fd);
695 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
696 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
697 relaxed);
698 dispatch_fd_entry_t fd_entry = channel->fd_entry;
699 if (fd_entry) {
700 if (!fd_entry->path_data) {
701 channel->fd_entry = NULL;
702 }
703 _dispatch_fd_entry_release(fd_entry);
704 }
705 }
706 _dispatch_release(channel);
707 });
708 });
709 }
710
711 void
712 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
713 {
714 _dispatch_retain(channel);
715 dispatch_async(channel->queue, ^{
716 dispatch_queue_t io_q = channel->do_targetq;
717 dispatch_queue_t barrier_queue = channel->barrier_queue;
718 dispatch_group_t barrier_group = channel->barrier_group;
719 dispatch_async(barrier_queue, ^{
720 dispatch_suspend(barrier_queue);
721 dispatch_group_notify(barrier_group, io_q, ^{
722 _dispatch_object_debug(channel, "%s", __func__);
723 _dispatch_thread_setspecific(dispatch_io_key, channel);
724 barrier();
725 _dispatch_thread_setspecific(dispatch_io_key, NULL);
726 dispatch_resume(barrier_queue);
727 _dispatch_release(channel);
728 });
729 });
730 });
731 }
732
733 void
734 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
735 dispatch_function_t barrier)
736 {
737 return dispatch_io_barrier(channel, ^{ barrier(context); });
738 }
739
740 void
741 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
742 dispatch_queue_t queue, dispatch_io_handler_t handler)
743 {
744 _dispatch_retain(channel);
745 _dispatch_retain(queue);
746 dispatch_async(channel->queue, ^{
747 dispatch_operation_t op;
748 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
749 length, dispatch_data_empty, queue, handler);
750 if (op) {
751 dispatch_queue_t barrier_q = channel->barrier_queue;
752 dispatch_async(barrier_q, ^{
753 _dispatch_operation_enqueue(op, DOP_DIR_READ,
754 dispatch_data_empty);
755 });
756 }
757 _dispatch_release(channel);
758 _dispatch_release(queue);
759 });
760 }
761
762 void
763 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
764 dispatch_queue_t queue, void *context,
765 dispatch_io_handler_function_t handler)
766 {
767 return dispatch_io_read(channel, offset, length, queue,
768 ^(bool done, dispatch_data_t d, int error){
769 handler(context, done, d, error);
770 });
771 }
772
773 void
774 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
775 dispatch_queue_t queue, dispatch_io_handler_t handler)
776 {
777 _dispatch_io_data_retain(data);
778 _dispatch_retain(channel);
779 _dispatch_retain(queue);
780 dispatch_async(channel->queue, ^{
781 dispatch_operation_t op;
782 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
783 dispatch_data_get_size(data), data, queue, handler);
784 if (op) {
785 dispatch_queue_t barrier_q = channel->barrier_queue;
786 dispatch_async(barrier_q, ^{
787 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
788 _dispatch_io_data_release(data);
789 });
790 } else {
791 _dispatch_io_data_release(data);
792 }
793 _dispatch_release(channel);
794 _dispatch_release(queue);
795 });
796 }
797
798 void
799 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
800 dispatch_queue_t queue, void *context,
801 dispatch_io_handler_function_t handler)
802 {
803 return dispatch_io_write(channel, offset, data, queue,
804 ^(bool done, dispatch_data_t d, int error){
805 handler(context, done, d, error);
806 });
807 }
808
809 void
810 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
811 void (^handler)(dispatch_data_t, int))
812 {
813 _dispatch_retain(queue);
814 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
815 // On barrier queue
816 if (fd_entry->err) {
817 int err = fd_entry->err;
818 dispatch_async(queue, ^{
819 _dispatch_fd_debug("convenience handler invoke", fd);
820 handler(dispatch_data_empty, err);
821 });
822 _dispatch_release(queue);
823 return;
824 }
825 // Safe to access fd_entry on barrier queue
826 dispatch_io_t channel = fd_entry->convenience_channel;
827 if (!channel) {
828 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
829 channel->fd = fd;
830 channel->fd_actual = fd;
831 channel->fd_entry = fd_entry;
832 dispatch_retain(fd_entry->barrier_queue);
833 dispatch_retain(fd_entry->barrier_group);
834 channel->barrier_queue = fd_entry->barrier_queue;
835 channel->barrier_group = fd_entry->barrier_group;
836 fd_entry->convenience_channel = channel;
837 }
838 __block dispatch_data_t deliver_data = dispatch_data_empty;
839 __block int err = 0;
840 dispatch_async(fd_entry->close_queue, ^{
841 dispatch_async(queue, ^{
842 _dispatch_fd_debug("convenience handler invoke", fd);
843 handler(deliver_data, err);
844 _dispatch_io_data_release(deliver_data);
845 });
846 _dispatch_release(queue);
847 });
848 dispatch_operation_t op =
849 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
850 length, dispatch_data_empty,
851 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
852 ^(bool done, dispatch_data_t data, int error) {
853 if (data) {
854 data = dispatch_data_create_concat(deliver_data, data);
855 _dispatch_io_data_release(deliver_data);
856 deliver_data = data;
857 }
858 if (done) {
859 err = error;
860 }
861 });
862 if (op) {
863 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
864 }
865 });
866 }
867
868 void
869 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
870 void *context, void (*handler)(void *, dispatch_data_t, int))
871 {
872 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
873 handler(context, d, error);
874 });
875 }
876
877 void
878 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
879 void (^handler)(dispatch_data_t, int))
880 {
881 _dispatch_io_data_retain(data);
882 _dispatch_retain(queue);
883 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
884 // On barrier queue
885 if (fd_entry->err) {
886 int err = fd_entry->err;
887 dispatch_async(queue, ^{
888 _dispatch_fd_debug("convenience handler invoke", fd);
889 handler(NULL, err);
890 });
891 _dispatch_release(queue);
892 return;
893 }
894 // Safe to access fd_entry on barrier queue
895 dispatch_io_t channel = fd_entry->convenience_channel;
896 if (!channel) {
897 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
898 channel->fd = fd;
899 channel->fd_actual = fd;
900 channel->fd_entry = fd_entry;
901 dispatch_retain(fd_entry->barrier_queue);
902 dispatch_retain(fd_entry->barrier_group);
903 channel->barrier_queue = fd_entry->barrier_queue;
904 channel->barrier_group = fd_entry->barrier_group;
905 fd_entry->convenience_channel = channel;
906 }
907 __block dispatch_data_t deliver_data = NULL;
908 __block int err = 0;
909 dispatch_async(fd_entry->close_queue, ^{
910 dispatch_async(queue, ^{
911 _dispatch_fd_debug("convenience handler invoke", fd);
912 handler(deliver_data, err);
913 if (deliver_data) {
914 _dispatch_io_data_release(deliver_data);
915 }
916 });
917 _dispatch_release(queue);
918 });
919 dispatch_operation_t op =
920 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
921 dispatch_data_get_size(data), data,
922 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
923 ^(bool done, dispatch_data_t d, int error) {
924 if (done) {
925 if (d) {
926 _dispatch_io_data_retain(d);
927 deliver_data = d;
928 }
929 err = error;
930 }
931 });
932 if (op) {
933 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
934 }
935 _dispatch_io_data_release(data);
936 });
937 }
938
939 void
940 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
941 void *context, void (*handler)(void *, dispatch_data_t, int))
942 {
943 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
944 handler(context, d, error);
945 });
946 }
947
948 #pragma mark -
949 #pragma mark dispatch_operation_t
950
951 static dispatch_operation_t
952 _dispatch_operation_create(dispatch_op_direction_t direction,
953 dispatch_io_t channel, off_t offset, size_t length,
954 dispatch_data_t data, dispatch_queue_t queue,
955 dispatch_io_handler_t handler)
956 {
957 // On channel queue
958 dispatch_assert(direction < DOP_DIR_MAX);
959 _dispatch_fd_debug("operation create", channel->fd);
960 #if DISPATCH_IO_DEBUG
961 int fd = channel->fd;
962 #endif
963 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
964 // that can only be NULL if atomic_flags are set rdar://problem/8362514
965 int err = _dispatch_io_get_error(NULL, channel, false);
966 if (err || !length) {
967 _dispatch_io_data_retain(data);
968 _dispatch_retain(queue);
969 dispatch_async(channel->barrier_queue, ^{
970 dispatch_async(queue, ^{
971 dispatch_data_t d = data;
972 if (direction == DOP_DIR_READ && err) {
973 d = NULL;
974 } else if (direction == DOP_DIR_WRITE && !err) {
975 d = NULL;
976 }
977 _dispatch_fd_debug("IO handler invoke", fd);
978 handler(true, d, err);
979 _dispatch_io_data_release(data);
980 });
981 _dispatch_release(queue);
982 });
983 return NULL;
984 }
985 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
986 sizeof(struct dispatch_operation_s));
987 op->do_next = DISPATCH_OBJECT_LISTLESS;
988 op->do_xref_cnt = -1; // operation object is not exposed externally
989 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
990 op->op_q->do_targetq = queue;
991 _dispatch_retain(queue);
992 op->active = false;
993 op->direction = direction;
994 op->offset = offset + channel->f_ptr;
995 op->length = length;
996 op->handler = _dispatch_io_Block_copy(handler);
997 _dispatch_retain(channel);
998 op->channel = channel;
999 op->params = channel->params;
1000 // Take a snapshot of the priority of the channel queue. The actual I/O
1001 // for this operation will be performed at this priority
1002 dispatch_queue_t targetq = op->channel->do_targetq;
1003 while (fastpath(targetq->do_targetq)) {
1004 targetq = targetq->do_targetq;
1005 }
1006 op->do_targetq = targetq;
1007 _dispatch_object_debug(op, "%s", __func__);
1008 return op;
1009 }
1010
1011 void
1012 _dispatch_operation_dispose(dispatch_operation_t op)
1013 {
1014 _dispatch_object_debug(op, "%s", __func__);
1015 // Deliver the data if there's any
1016 if (op->fd_entry) {
1017 _dispatch_operation_deliver_data(op, DOP_DONE);
1018 dispatch_group_leave(op->fd_entry->barrier_group);
1019 _dispatch_fd_entry_release(op->fd_entry);
1020 }
1021 if (op->channel) {
1022 _dispatch_release(op->channel);
1023 }
1024 if (op->timer) {
1025 dispatch_release(op->timer);
1026 }
1027 // For write operations, op->buf is owned by op->buf_data
1028 if (op->buf && op->direction == DOP_DIR_READ) {
1029 free(op->buf);
1030 }
1031 if (op->buf_data) {
1032 _dispatch_io_data_release(op->buf_data);
1033 }
1034 if (op->data) {
1035 _dispatch_io_data_release(op->data);
1036 }
1037 if (op->op_q) {
1038 dispatch_release(op->op_q);
1039 }
1040 Block_release(op->handler);
1041 }
1042
1043 static void
1044 _dispatch_operation_enqueue(dispatch_operation_t op,
1045 dispatch_op_direction_t direction, dispatch_data_t data)
1046 {
1047 // Called from the barrier queue
1048 _dispatch_io_data_retain(data);
1049 // If channel is closed or stopped, then call the handler immediately
1050 int err = _dispatch_io_get_error(NULL, op->channel, false);
1051 if (err) {
1052 dispatch_io_handler_t handler = op->handler;
1053 dispatch_async(op->op_q, ^{
1054 dispatch_data_t d = data;
1055 if (direction == DOP_DIR_READ && err) {
1056 d = NULL;
1057 } else if (direction == DOP_DIR_WRITE && !err) {
1058 d = NULL;
1059 }
1060 handler(true, d, err);
1061 _dispatch_io_data_release(data);
1062 });
1063 _dispatch_release(op);
1064 return;
1065 }
1066 // Finish operation init
1067 op->fd_entry = op->channel->fd_entry;
1068 _dispatch_fd_entry_retain(op->fd_entry);
1069 dispatch_group_enter(op->fd_entry->barrier_group);
1070 dispatch_disk_t disk = op->fd_entry->disk;
1071 if (!disk) {
1072 dispatch_stream_t stream = op->fd_entry->streams[direction];
1073 dispatch_async(stream->dq, ^{
1074 _dispatch_stream_enqueue_operation(stream, op, data);
1075 _dispatch_io_data_release(data);
1076 });
1077 } else {
1078 dispatch_async(disk->pick_queue, ^{
1079 _dispatch_disk_enqueue_operation(disk, op, data);
1080 _dispatch_io_data_release(data);
1081 });
1082 }
1083 }
1084
1085 static bool
1086 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1087 dispatch_queue_t tq, dispatch_data_t data)
1088 {
1089 // On stream queue or disk queue
1090 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1091 _dispatch_io_data_retain(data);
1092 op->data = data;
1093 int err = _dispatch_io_get_error(op, NULL, true);
1094 if (err) {
1095 op->err = err;
1096 // Final release
1097 _dispatch_release(op);
1098 return false;
1099 }
1100 if (op->params.interval) {
1101 dispatch_resume(_dispatch_operation_timer(tq, op));
1102 }
1103 return true;
1104 }
1105
1106 static dispatch_source_t
1107 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1108 {
1109 // On stream queue or pick queue
1110 if (op->timer) {
1111 return op->timer;
1112 }
1113 dispatch_source_t timer = dispatch_source_create(
1114 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1115 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1116 (int64_t)op->params.interval), op->params.interval, 0);
1117 dispatch_source_set_event_handler(timer, ^{
1118 // On stream queue or pick queue
1119 if (dispatch_source_testcancel(timer)) {
1120 // Do nothing. The operation has already completed
1121 return;
1122 }
1123 dispatch_op_flags_t flags = DOP_DEFAULT;
1124 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1125 // Deliver even if there is less data than the low-water mark
1126 flags |= DOP_DELIVER;
1127 }
1128 // If the operation is active, dont deliver data
1129 if ((op->active) && (flags & DOP_DELIVER)) {
1130 op->flags = flags;
1131 } else {
1132 _dispatch_operation_deliver_data(op, flags);
1133 }
1134 });
1135 op->timer = timer;
1136 return op->timer;
1137 }
1138
1139 #pragma mark -
1140 #pragma mark dispatch_fd_entry_t
1141
1142 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1143 static void
1144 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1145 {
1146 guardid_t guard = fd_entry;
1147 const unsigned int guard_flags = GUARD_CLOSE;
1148 int err, fd_flags = 0;
1149 _dispatch_io_syscall_switch_noerr(err,
1150 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1151 &fd_flags),
1152 case 0:
1153 fd_entry->guard_flags = guard_flags;
1154 fd_entry->orig_fd_flags = fd_flags;
1155 break;
1156 case EPERM: break;
1157 default: (void)dispatch_assume_zero(err); break;
1158 );
1159 }
1160
1161 static void
1162 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1163 {
1164 if (!fd_entry->guard_flags) {
1165 return;
1166 }
1167 guardid_t guard = fd_entry;
1168 int err, fd_flags = fd_entry->orig_fd_flags;
1169 _dispatch_io_syscall_switch(err,
1170 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1171 &fd_flags),
1172 default: (void)dispatch_assume_zero(err); break;
1173 );
1174 }
1175 #else
1176 static inline void
1177 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1178 static inline void
1179 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1180 #endif // DISPATCH_USE_GUARDED_FD
1181
1182 static inline int
1183 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1184 int oflag, mode_t mode) {
1185 #if DISPATCH_USE_GUARDED_FD
1186 guardid_t guard = (uintptr_t)fd_entry;
1187 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1188 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1189 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1190 mode);
1191 if (fd != -1) {
1192 fd_entry->guard_flags = guard_flags;
1193 return fd;
1194 }
1195 errno = 0;
1196 #endif
1197 return open(path, oflag, mode);
1198 (void)fd_entry;
1199 }
1200
1201 static inline int
1202 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1203 #if DISPATCH_USE_GUARDED_FD
1204 if (fd_entry->guard_flags) {
1205 guardid_t guard = (uintptr_t)fd_entry;
1206 return guarded_close_np(fd, &guard);
1207 } else
1208 #endif
1209 {
1210 return close(fd);
1211 }
1212 (void)fd_entry;
1213 }
1214
1215 static inline void
1216 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1217 dispatch_suspend(fd_entry->close_queue);
1218 }
1219
1220 static inline void
1221 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1222 dispatch_resume(fd_entry->close_queue);
1223 }
1224
1225 static void
1226 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1227 dispatch_fd_entry_init_callback_t completion_callback)
1228 {
1229 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1230 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1231 _dispatch_io_fds_lockq_init);
1232 dispatch_async(_dispatch_io_fds_lockq, ^{
1233 _dispatch_fd_debug("fd entry init", fd);
1234 dispatch_fd_entry_t fd_entry = NULL;
1235 // Check to see if there is an existing entry for the given fd
1236 uintptr_t hash = DIO_HASH(fd);
1237 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1238 if (fd_entry->fd == fd) {
1239 // Retain the fd_entry to ensure it cannot go away until the
1240 // stat() has completed
1241 _dispatch_fd_entry_retain(fd_entry);
1242 break;
1243 }
1244 }
1245 if (!fd_entry) {
1246 // If we did not find an existing entry, create one
1247 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1248 }
1249 dispatch_async(fd_entry->barrier_queue, ^{
1250 _dispatch_fd_debug("fd entry init completion", fd);
1251 completion_callback(fd_entry);
1252 // stat() is complete, release reference to fd_entry
1253 _dispatch_fd_entry_release(fd_entry);
1254 });
1255 });
1256 }
1257
1258 static dispatch_fd_entry_t
1259 _dispatch_fd_entry_create(dispatch_queue_t q)
1260 {
1261 dispatch_fd_entry_t fd_entry;
1262 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1263 fd_entry->close_queue = dispatch_queue_create(
1264 "com.apple.libdispatch-io.closeq", NULL);
1265 // Use target queue to ensure that no concurrent lookups are going on when
1266 // the close queue is running
1267 fd_entry->close_queue->do_targetq = q;
1268 _dispatch_retain(q);
1269 // Suspend the cleanup queue until closing
1270 _dispatch_fd_entry_retain(fd_entry);
1271 return fd_entry;
1272 }
1273
1274 static dispatch_fd_entry_t
1275 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1276 {
1277 // On fds lock queue
1278 _dispatch_fd_debug("fd entry create", fd);
1279 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1280 _dispatch_io_fds_lockq);
1281 fd_entry->fd = fd;
1282 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1283 fd_entry->barrier_queue = dispatch_queue_create(
1284 "com.apple.libdispatch-io.barrierq", NULL);
1285 fd_entry->barrier_group = dispatch_group_create();
1286 dispatch_async(fd_entry->barrier_queue, ^{
1287 _dispatch_fd_debug("fd entry stat", fd);
1288 int err, orig_flags, orig_nosigpipe = -1;
1289 struct stat st;
1290 _dispatch_io_syscall_switch(err,
1291 fstat(fd, &st),
1292 default: fd_entry->err = err; return;
1293 );
1294 fd_entry->stat.dev = st.st_dev;
1295 fd_entry->stat.mode = st.st_mode;
1296 _dispatch_fd_entry_guard(fd_entry);
1297 _dispatch_io_syscall_switch(err,
1298 orig_flags = fcntl(fd, F_GETFL),
1299 default: (void)dispatch_assume_zero(err); break;
1300 );
1301 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1302 if (S_ISFIFO(st.st_mode)) {
1303 _dispatch_io_syscall_switch(err,
1304 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1305 default: (void)dispatch_assume_zero(err); break;
1306 );
1307 if (orig_nosigpipe != -1) {
1308 _dispatch_io_syscall_switch(err,
1309 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1310 default:
1311 orig_nosigpipe = -1;
1312 (void)dispatch_assume_zero(err);
1313 break;
1314 );
1315 }
1316 }
1317 #endif
1318 if (S_ISREG(st.st_mode)) {
1319 if (orig_flags != -1) {
1320 _dispatch_io_syscall_switch(err,
1321 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1322 default:
1323 orig_flags = -1;
1324 (void)dispatch_assume_zero(err);
1325 break;
1326 );
1327 }
1328 int32_t dev = major(st.st_dev);
1329 // We have to get the disk on the global dev queue. The
1330 // barrier queue cannot continue until that is complete
1331 dispatch_suspend(fd_entry->barrier_queue);
1332 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1333 _dispatch_io_devs_lockq_init);
1334 dispatch_async(_dispatch_io_devs_lockq, ^{
1335 _dispatch_disk_init(fd_entry, dev);
1336 dispatch_resume(fd_entry->barrier_queue);
1337 });
1338 } else {
1339 if (orig_flags != -1) {
1340 _dispatch_io_syscall_switch(err,
1341 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1342 default:
1343 orig_flags = -1;
1344 (void)dispatch_assume_zero(err);
1345 break;
1346 );
1347 }
1348 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1349 _DISPATCH_QOS_CLASS_DEFAULT, false));
1350 }
1351 fd_entry->orig_flags = orig_flags;
1352 fd_entry->orig_nosigpipe = orig_nosigpipe;
1353 });
1354 // This is the first item run when the close queue is resumed, indicating
1355 // that all channels associated with this entry have been closed and that
1356 // all operations associated with this entry have been freed
1357 dispatch_async(fd_entry->close_queue, ^{
1358 if (!fd_entry->disk) {
1359 _dispatch_fd_debug("close queue fd_entry cleanup", fd);
1360 dispatch_op_direction_t dir;
1361 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1362 _dispatch_stream_dispose(fd_entry, dir);
1363 }
1364 } else {
1365 dispatch_disk_t disk = fd_entry->disk;
1366 dispatch_async(_dispatch_io_devs_lockq, ^{
1367 _dispatch_release(disk);
1368 });
1369 }
1370 // Remove this entry from the global fd list
1371 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1372 });
1373 // If there was a source associated with this stream, disposing of the
1374 // source cancels it and suspends the close queue. Freeing the fd_entry
1375 // structure must happen after the source cancel handler has finished
1376 dispatch_async(fd_entry->close_queue, ^{
1377 _dispatch_fd_debug("close queue release", fd);
1378 dispatch_release(fd_entry->close_queue);
1379 _dispatch_fd_debug("barrier queue release", fd);
1380 dispatch_release(fd_entry->barrier_queue);
1381 _dispatch_fd_debug("barrier group release", fd);
1382 dispatch_release(fd_entry->barrier_group);
1383 if (fd_entry->orig_flags != -1) {
1384 _dispatch_io_syscall(
1385 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1386 );
1387 }
1388 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1389 if (fd_entry->orig_nosigpipe != -1) {
1390 _dispatch_io_syscall(
1391 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1392 );
1393 }
1394 #endif
1395 _dispatch_fd_entry_unguard(fd_entry);
1396 if (fd_entry->convenience_channel) {
1397 fd_entry->convenience_channel->fd_entry = NULL;
1398 dispatch_release(fd_entry->convenience_channel);
1399 }
1400 free(fd_entry);
1401 });
1402 return fd_entry;
1403 }
1404
1405 static dispatch_fd_entry_t
1406 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1407 dev_t dev, mode_t mode)
1408 {
1409 // On devs lock queue
1410 _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
1411 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1412 path_data->channel->queue);
1413 if (S_ISREG(mode)) {
1414 _dispatch_disk_init(fd_entry, major(dev));
1415 } else {
1416 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1417 _DISPATCH_QOS_CLASS_DEFAULT, false));
1418 }
1419 fd_entry->fd = -1;
1420 fd_entry->orig_flags = -1;
1421 fd_entry->path_data = path_data;
1422 fd_entry->stat.dev = dev;
1423 fd_entry->stat.mode = mode;
1424 fd_entry->barrier_queue = dispatch_queue_create(
1425 "com.apple.libdispatch-io.barrierq", NULL);
1426 fd_entry->barrier_group = dispatch_group_create();
1427 // This is the first item run when the close queue is resumed, indicating
1428 // that the channel associated with this entry has been closed and that
1429 // all operations associated with this entry have been freed
1430 dispatch_async(fd_entry->close_queue, ^{
1431 _dispatch_fd_debug("close queue fd_entry cleanup", -1);
1432 if (!fd_entry->disk) {
1433 dispatch_op_direction_t dir;
1434 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1435 _dispatch_stream_dispose(fd_entry, dir);
1436 }
1437 }
1438 if (fd_entry->fd != -1) {
1439 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1440 }
1441 if (fd_entry->path_data->channel) {
1442 // If associated channel has not been released yet, mark it as
1443 // no longer having an fd_entry (for stop after close).
1444 // It is safe to modify channel since we are on close_queue with
1445 // target queue the channel queue
1446 fd_entry->path_data->channel->fd_entry = NULL;
1447 }
1448 });
1449 dispatch_async(fd_entry->close_queue, ^{
1450 _dispatch_fd_debug("close queue release", -1);
1451 dispatch_release(fd_entry->close_queue);
1452 dispatch_release(fd_entry->barrier_queue);
1453 dispatch_release(fd_entry->barrier_group);
1454 free(fd_entry->path_data);
1455 free(fd_entry);
1456 });
1457 return fd_entry;
1458 }
1459
1460 static int
1461 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1462 {
1463 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1464 return 0;
1465 }
1466 if (fd_entry->err) {
1467 return fd_entry->err;
1468 }
1469 int fd = -1;
1470 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1471 fd_entry->path_data->oflag | O_NONBLOCK;
1472 open:
1473 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1474 oflag, fd_entry->path_data->mode);
1475 if (fd == -1) {
1476 int err = errno;
1477 if (err == EINTR) {
1478 goto open;
1479 }
1480 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1481 return err;
1482 }
1483 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1484 // Lost the race with another open
1485 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1486 } else {
1487 channel->fd_actual = fd;
1488 }
1489 _dispatch_object_debug(channel, "%s", __func__);
1490 return 0;
1491 }
1492
1493 static void
1494 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1495 dispatch_io_t channel)
1496 {
1497 if (fd_entry->disk) {
1498 if (channel) {
1499 _dispatch_retain(channel);
1500 }
1501 _dispatch_fd_entry_retain(fd_entry);
1502 dispatch_async(fd_entry->disk->pick_queue, ^{
1503 _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1504 _dispatch_fd_entry_release(fd_entry);
1505 if (channel) {
1506 _dispatch_release(channel);
1507 }
1508 });
1509 } else {
1510 dispatch_op_direction_t direction;
1511 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1512 dispatch_stream_t stream = fd_entry->streams[direction];
1513 if (!stream) {
1514 continue;
1515 }
1516 if (channel) {
1517 _dispatch_retain(channel);
1518 }
1519 _dispatch_fd_entry_retain(fd_entry);
1520 dispatch_async(stream->dq, ^{
1521 _dispatch_stream_cleanup_operations(stream, channel);
1522 _dispatch_fd_entry_release(fd_entry);
1523 if (channel) {
1524 _dispatch_release(channel);
1525 }
1526 });
1527 }
1528 }
1529 }
1530
1531 #pragma mark -
1532 #pragma mark dispatch_stream_t/dispatch_disk_t
1533
1534 static void
1535 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1536 {
1537 dispatch_op_direction_t direction;
1538 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1539 dispatch_stream_t stream;
1540 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1541 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1542 NULL);
1543 dispatch_set_context(stream->dq, stream);
1544 _dispatch_retain(tq);
1545 stream->dq->do_targetq = tq;
1546 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1547 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1548 fd_entry->streams[direction] = stream;
1549 }
1550 }
1551
1552 static void
1553 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1554 dispatch_op_direction_t direction)
1555 {
1556 // On close queue
1557 dispatch_stream_t stream = fd_entry->streams[direction];
1558 if (!stream) {
1559 return;
1560 }
1561 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1562 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1563 if (stream->source) {
1564 // Balanced by source cancel handler:
1565 _dispatch_fd_entry_retain(fd_entry);
1566 dispatch_source_cancel(stream->source);
1567 dispatch_resume(stream->source);
1568 dispatch_release(stream->source);
1569 }
1570 dispatch_set_context(stream->dq, NULL);
1571 dispatch_release(stream->dq);
1572 free(stream);
1573 }
1574
1575 static void
1576 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1577 {
1578 // On devs lock queue
1579 dispatch_disk_t disk;
1580 // Check to see if there is an existing entry for the given device
1581 uintptr_t hash = DIO_HASH(dev);
1582 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1583 if (disk->dev == dev) {
1584 _dispatch_retain(disk);
1585 goto out;
1586 }
1587 }
1588 // Otherwise create a new entry
1589 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1590 disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1591 sizeof(struct dispatch_disk_s) +
1592 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1593 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1594 disk->do_xref_cnt = -1;
1595 disk->advise_list_depth = pending_reqs_depth;
1596 disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1597 false);
1598 disk->dev = dev;
1599 TAILQ_INIT(&disk->operations);
1600 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1601 char label[45];
1602 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1603 disk->pick_queue = dispatch_queue_create(label, NULL);
1604 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1605 out:
1606 fd_entry->disk = disk;
1607 TAILQ_INIT(&fd_entry->stream_ops);
1608 }
1609
1610 void
1611 _dispatch_disk_dispose(dispatch_disk_t disk)
1612 {
1613 uintptr_t hash = DIO_HASH(disk->dev);
1614 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1615 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1616 size_t i;
1617 for (i=0; i<disk->advise_list_depth; ++i) {
1618 dispatch_assert(!disk->advise_list[i]);
1619 }
1620 dispatch_release(disk->pick_queue);
1621 }
1622
1623 #pragma mark -
1624 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1625
1626 static inline bool
1627 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1628 {
1629 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1630 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1631 }
1632
1633 static void
1634 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1635 dispatch_operation_t op, dispatch_data_t data)
1636 {
1637 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1638 return;
1639 }
1640 _dispatch_object_debug(op, "%s", __func__);
1641 bool no_ops = !_dispatch_stream_operation_avail(stream);
1642 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1643 if (no_ops) {
1644 dispatch_async_f(stream->dq, stream->dq,
1645 _dispatch_stream_queue_handler);
1646 }
1647 }
1648
1649 static void
1650 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1651 dispatch_data_t data)
1652 {
1653 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1654 return;
1655 }
1656 _dispatch_object_debug(op, "%s", __func__);
1657 if (op->params.type == DISPATCH_IO_STREAM) {
1658 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1659 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1660 }
1661 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1662 } else {
1663 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1664 }
1665 _dispatch_disk_handler(disk);
1666 }
1667
1668 static void
1669 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1670 dispatch_operation_t op)
1671 {
1672 // On stream queue
1673 _dispatch_object_debug(op, "%s", __func__);
1674 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1675 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1676 if (op == stream->op) {
1677 stream->op = NULL;
1678 }
1679 if (op->timer) {
1680 dispatch_source_cancel(op->timer);
1681 }
1682 // Final release will deliver any pending data
1683 _dispatch_release(op);
1684 }
1685
1686 static void
1687 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1688 {
1689 // On pick queue
1690 _dispatch_object_debug(op, "%s", __func__);
1691 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1692 // Current request is always the last op returned
1693 if (disk->cur_rq == op) {
1694 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1695 operation_list);
1696 }
1697 if (op->params.type == DISPATCH_IO_STREAM) {
1698 // Check if there are other pending stream operations behind it
1699 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1700 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1701 if (op_next) {
1702 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1703 }
1704 }
1705 TAILQ_REMOVE(&disk->operations, op, operation_list);
1706 if (op->timer) {
1707 dispatch_source_cancel(op->timer);
1708 }
1709 // Final release will deliver any pending data
1710 _dispatch_release(op);
1711 }
1712
1713 static dispatch_operation_t
1714 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1715 dispatch_operation_t op)
1716 {
1717 // On stream queue
1718 if (!op) {
1719 // On the first run through, pick the first operation
1720 if (!_dispatch_stream_operation_avail(stream)) {
1721 return op;
1722 }
1723 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1724 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1725 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1726 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1727 }
1728 return op;
1729 }
1730 if (op->params.type == DISPATCH_IO_STREAM) {
1731 // Stream operations need to be serialized so continue the current
1732 // operation until it is finished
1733 return op;
1734 }
1735 // Get the next random operation (round-robin)
1736 if (op->params.type == DISPATCH_IO_RANDOM) {
1737 op = TAILQ_NEXT(op, operation_list);
1738 if (!op) {
1739 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1740 }
1741 return op;
1742 }
1743 return NULL;
1744 }
1745
1746 static dispatch_operation_t
1747 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1748 {
1749 // On pick queue
1750 dispatch_operation_t op;
1751 if (!TAILQ_EMPTY(&disk->operations)) {
1752 if (disk->cur_rq == NULL) {
1753 op = TAILQ_FIRST(&disk->operations);
1754 } else {
1755 op = disk->cur_rq;
1756 do {
1757 op = TAILQ_NEXT(op, operation_list);
1758 if (!op) {
1759 op = TAILQ_FIRST(&disk->operations);
1760 }
1761 // TODO: more involved picking algorithm rdar://problem/8780312
1762 } while (op->active && op != disk->cur_rq);
1763 }
1764 if (!op->active) {
1765 disk->cur_rq = op;
1766 return op;
1767 }
1768 }
1769 return NULL;
1770 }
1771
1772 static void
1773 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1774 dispatch_io_t channel)
1775 {
1776 // On stream queue
1777 dispatch_operation_t op, tmp;
1778 typeof(*stream->operations) *operations;
1779 operations = &stream->operations[DISPATCH_IO_RANDOM];
1780 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1781 if (!channel || op->channel == channel) {
1782 _dispatch_stream_complete_operation(stream, op);
1783 }
1784 }
1785 operations = &stream->operations[DISPATCH_IO_STREAM];
1786 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1787 if (!channel || op->channel == channel) {
1788 _dispatch_stream_complete_operation(stream, op);
1789 }
1790 }
1791 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1792 dispatch_suspend(stream->source);
1793 stream->source_running = false;
1794 }
1795 }
1796
1797 static void
1798 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1799 {
1800 // On pick queue
1801 dispatch_operation_t op, tmp;
1802 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1803 if (!channel || op->channel == channel) {
1804 _dispatch_disk_complete_operation(disk, op);
1805 }
1806 }
1807 }
1808
1809 #pragma mark -
1810 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1811
1812 static dispatch_source_t
1813 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1814 {
1815 // On stream queue
1816 if (stream->source) {
1817 return stream->source;
1818 }
1819 dispatch_fd_t fd = op->fd_entry->fd;
1820 _dispatch_fd_debug("stream source create", fd);
1821 dispatch_source_t source = NULL;
1822 if (op->direction == DOP_DIR_READ) {
1823 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1824 (uintptr_t)fd, 0, stream->dq);
1825 } else if (op->direction == DOP_DIR_WRITE) {
1826 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1827 (uintptr_t)fd, 0, stream->dq);
1828 } else {
1829 dispatch_assert(op->direction < DOP_DIR_MAX);
1830 return NULL;
1831 }
1832 dispatch_set_context(source, stream);
1833 dispatch_source_set_event_handler_f(source,
1834 _dispatch_stream_source_handler);
1835 // Close queue must not run user cleanup handlers until sources are fully
1836 // unregistered
1837 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1838 dispatch_source_set_cancel_handler(source, ^{
1839 _dispatch_fd_debug("stream source cancel", fd);
1840 dispatch_resume(close_queue);
1841 });
1842 stream->source = source;
1843 return stream->source;
1844 }
1845
1846 static void
1847 _dispatch_stream_source_handler(void *ctx)
1848 {
1849 // On stream queue
1850 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1851 dispatch_suspend(stream->source);
1852 stream->source_running = false;
1853 return _dispatch_stream_handler(stream);
1854 }
1855
1856 static void
1857 _dispatch_stream_queue_handler(void *ctx)
1858 {
1859 // On stream queue
1860 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1861 if (!stream) {
1862 // _dispatch_stream_dispose has been called
1863 return;
1864 }
1865 return _dispatch_stream_handler(stream);
1866 }
1867
1868 static void
1869 _dispatch_stream_handler(void *ctx)
1870 {
1871 // On stream queue
1872 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1873 dispatch_operation_t op;
1874 pick:
1875 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1876 if (!op) {
1877 _dispatch_debug("no operation found: stream %p", stream);
1878 return;
1879 }
1880 int err = _dispatch_io_get_error(op, NULL, true);
1881 if (err) {
1882 op->err = err;
1883 _dispatch_stream_complete_operation(stream, op);
1884 goto pick;
1885 }
1886 stream->op = op;
1887 _dispatch_fd_debug("stream handler", op->fd_entry->fd);
1888 dispatch_fd_entry_t fd_entry = op->fd_entry;
1889 _dispatch_fd_entry_retain(fd_entry);
1890 // For performance analysis
1891 if (!op->total && dispatch_io_defaults.initial_delivery) {
1892 // Empty delivery to signal the start of the operation
1893 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1894 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1895 }
1896 // TODO: perform on the operation target queue to get correct priority
1897 int result = _dispatch_operation_perform(op);
1898 dispatch_op_flags_t flags = ~0u;
1899 switch (result) {
1900 case DISPATCH_OP_DELIVER:
1901 flags = DOP_DEFAULT;
1902 // Fall through
1903 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1904 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1905 DOP_DEFAULT;
1906 _dispatch_operation_deliver_data(op, flags);
1907 // Fall through
1908 case DISPATCH_OP_COMPLETE:
1909 if (flags != DOP_DEFAULT) {
1910 _dispatch_stream_complete_operation(stream, op);
1911 }
1912 if (_dispatch_stream_operation_avail(stream)) {
1913 dispatch_async_f(stream->dq, stream->dq,
1914 _dispatch_stream_queue_handler);
1915 }
1916 break;
1917 case DISPATCH_OP_COMPLETE_RESUME:
1918 _dispatch_stream_complete_operation(stream, op);
1919 // Fall through
1920 case DISPATCH_OP_RESUME:
1921 if (_dispatch_stream_operation_avail(stream)) {
1922 stream->source_running = true;
1923 dispatch_resume(_dispatch_stream_source(stream, op));
1924 }
1925 break;
1926 case DISPATCH_OP_ERR:
1927 _dispatch_stream_cleanup_operations(stream, op->channel);
1928 break;
1929 case DISPATCH_OP_FD_ERR:
1930 _dispatch_fd_entry_retain(fd_entry);
1931 dispatch_async(fd_entry->barrier_queue, ^{
1932 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1933 _dispatch_fd_entry_release(fd_entry);
1934 });
1935 break;
1936 default:
1937 break;
1938 }
1939 _dispatch_fd_entry_release(fd_entry);
1940 return;
1941 }
1942
1943 static void
1944 _dispatch_disk_handler(void *ctx)
1945 {
1946 // On pick queue
1947 dispatch_disk_t disk = (dispatch_disk_t)ctx;
1948 if (disk->io_active) {
1949 return;
1950 }
1951 _dispatch_fd_debug("disk handler", -1);
1952 dispatch_operation_t op;
1953 size_t i = disk->free_idx, j = disk->req_idx;
1954 if (j <= i) {
1955 j += disk->advise_list_depth;
1956 }
1957 while (i <= j) {
1958 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1959 (op = _dispatch_disk_pick_next_operation(disk))) {
1960 int err = _dispatch_io_get_error(op, NULL, true);
1961 if (err) {
1962 op->err = err;
1963 _dispatch_disk_complete_operation(disk, op);
1964 continue;
1965 }
1966 _dispatch_retain(op);
1967 disk->advise_list[i%disk->advise_list_depth] = op;
1968 op->active = true;
1969 _dispatch_object_debug(op, "%s", __func__);
1970 } else {
1971 // No more operations to get
1972 break;
1973 }
1974 i++;
1975 }
1976 disk->free_idx = (i%disk->advise_list_depth);
1977 op = disk->advise_list[disk->req_idx];
1978 if (op) {
1979 disk->io_active = true;
1980 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1981 }
1982 }
1983
1984 static void
1985 _dispatch_disk_perform(void *ctxt)
1986 {
1987 dispatch_disk_t disk = ctxt;
1988 size_t chunk_size = dispatch_io_defaults.chunk_size;
1989 _dispatch_fd_debug("disk perform", -1);
1990 dispatch_operation_t op;
1991 size_t i = disk->advise_idx, j = disk->free_idx;
1992 if (j <= i) {
1993 j += disk->advise_list_depth;
1994 }
1995 do {
1996 op = disk->advise_list[i%disk->advise_list_depth];
1997 if (!op) {
1998 // Nothing more to advise, must be at free_idx
1999 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2000 break;
2001 }
2002 if (op->direction == DOP_DIR_WRITE) {
2003 // TODO: preallocate writes ? rdar://problem/9032172
2004 continue;
2005 }
2006 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2007 op->channel)) {
2008 continue;
2009 }
2010 // For performance analysis
2011 if (!op->total && dispatch_io_defaults.initial_delivery) {
2012 // Empty delivery to signal the start of the operation
2013 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2014 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2015 }
2016 // Advise two chunks if the list only has one element and this is the
2017 // first advise on the operation
2018 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2019 !op->advise_offset) {
2020 chunk_size *= 2;
2021 }
2022 _dispatch_operation_advise(op, chunk_size);
2023 } while (++i < j);
2024 disk->advise_idx = i%disk->advise_list_depth;
2025 op = disk->advise_list[disk->req_idx];
2026 int result = _dispatch_operation_perform(op);
2027 disk->advise_list[disk->req_idx] = NULL;
2028 disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
2029 dispatch_async(disk->pick_queue, ^{
2030 switch (result) {
2031 case DISPATCH_OP_DELIVER:
2032 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2033 break;
2034 case DISPATCH_OP_COMPLETE:
2035 _dispatch_disk_complete_operation(disk, op);
2036 break;
2037 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2038 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2039 _dispatch_disk_complete_operation(disk, op);
2040 break;
2041 case DISPATCH_OP_ERR:
2042 _dispatch_disk_cleanup_operations(disk, op->channel);
2043 break;
2044 case DISPATCH_OP_FD_ERR:
2045 _dispatch_disk_cleanup_operations(disk, NULL);
2046 break;
2047 default:
2048 dispatch_assert(result);
2049 break;
2050 }
2051 op->active = false;
2052 disk->io_active = false;
2053 _dispatch_disk_handler(disk);
2054 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2055 // released at the very end, since it might hold the last reference to
2056 // the disk
2057 _dispatch_release(op);
2058 });
2059 }
2060
2061 #pragma mark -
2062 #pragma mark dispatch_operation_perform
2063
2064 static void
2065 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2066 {
2067 int err;
2068 struct radvisory advise;
2069 // No point in issuing a read advise for the next chunk if we are already
2070 // a chunk ahead from reading the bytes
2071 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2072 chunk_size + PAGE_SIZE)) {
2073 return;
2074 }
2075 _dispatch_object_debug(op, "%s", __func__);
2076 advise.ra_count = (int)chunk_size;
2077 if (!op->advise_offset) {
2078 op->advise_offset = op->offset;
2079 // If this is the first time through, align the advised range to a
2080 // page boundary
2081 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2082 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2083 }
2084 advise.ra_offset = op->advise_offset;
2085 op->advise_offset += advise.ra_count;
2086 _dispatch_io_syscall_switch(err,
2087 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2088 case EFBIG: break; // advised past the end of the file rdar://10415691
2089 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2090 // TODO: set disk status on error
2091 default: (void)dispatch_assume_zero(err); break;
2092 );
2093 }
2094
2095 static int
2096 _dispatch_operation_perform(dispatch_operation_t op)
2097 {
2098 int err = _dispatch_io_get_error(op, NULL, true);
2099 if (err) {
2100 goto error;
2101 }
2102 _dispatch_object_debug(op, "%s", __func__);
2103 if (!op->buf) {
2104 size_t max_buf_siz = op->params.high;
2105 size_t chunk_siz = dispatch_io_defaults.chunk_size;
2106 if (op->direction == DOP_DIR_READ) {
2107 // If necessary, create a buffer for the ongoing operation, large
2108 // enough to fit chunk_size but at most high-water
2109 size_t data_siz = dispatch_data_get_size(op->data);
2110 if (data_siz) {
2111 dispatch_assert(data_siz < max_buf_siz);
2112 max_buf_siz -= data_siz;
2113 }
2114 if (max_buf_siz > chunk_siz) {
2115 max_buf_siz = chunk_siz;
2116 }
2117 if (op->length < SIZE_MAX) {
2118 op->buf_siz = op->length - op->total;
2119 if (op->buf_siz > max_buf_siz) {
2120 op->buf_siz = max_buf_siz;
2121 }
2122 } else {
2123 op->buf_siz = max_buf_siz;
2124 }
2125 op->buf = valloc(op->buf_siz);
2126 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2127 } else if (op->direction == DOP_DIR_WRITE) {
2128 // Always write the first data piece, if that is smaller than a
2129 // chunk, accumulate further data pieces until chunk size is reached
2130 if (chunk_siz > max_buf_siz) {
2131 chunk_siz = max_buf_siz;
2132 }
2133 op->buf_siz = 0;
2134 dispatch_data_apply(op->data,
2135 ^(dispatch_data_t region DISPATCH_UNUSED,
2136 size_t offset DISPATCH_UNUSED,
2137 const void* buf DISPATCH_UNUSED, size_t len) {
2138 size_t siz = op->buf_siz + len;
2139 if (!op->buf_siz || siz <= chunk_siz) {
2140 op->buf_siz = siz;
2141 }
2142 return (bool)(siz < chunk_siz);
2143 });
2144 if (op->buf_siz > max_buf_siz) {
2145 op->buf_siz = max_buf_siz;
2146 }
2147 dispatch_data_t d;
2148 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2149 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2150 NULL);
2151 _dispatch_io_data_release(d);
2152 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2153 }
2154 }
2155 if (op->fd_entry->fd == -1) {
2156 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2157 if (err) {
2158 goto error;
2159 }
2160 }
2161 void *buf = op->buf + op->buf_len;
2162 size_t len = op->buf_siz - op->buf_len;
2163 off_t off = (off_t)((size_t)op->offset + op->total);
2164 ssize_t processed = -1;
2165 syscall:
2166 if (op->direction == DOP_DIR_READ) {
2167 if (op->params.type == DISPATCH_IO_STREAM) {
2168 processed = read(op->fd_entry->fd, buf, len);
2169 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2170 processed = pread(op->fd_entry->fd, buf, len, off);
2171 }
2172 } else if (op->direction == DOP_DIR_WRITE) {
2173 if (op->params.type == DISPATCH_IO_STREAM) {
2174 processed = write(op->fd_entry->fd, buf, len);
2175 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2176 processed = pwrite(op->fd_entry->fd, buf, len, off);
2177 }
2178 }
2179 // Encountered an error on the file descriptor
2180 if (processed == -1) {
2181 err = errno;
2182 if (err == EINTR) {
2183 goto syscall;
2184 }
2185 goto error;
2186 }
2187 // EOF is indicated by two handler invocations
2188 if (processed == 0) {
2189 _dispatch_fd_debug("EOF", op->fd_entry->fd);
2190 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2191 }
2192 op->buf_len += (size_t)processed;
2193 op->total += (size_t)processed;
2194 if (op->total == op->length) {
2195 // Finished processing all the bytes requested by the operation
2196 return DISPATCH_OP_COMPLETE;
2197 } else {
2198 // Deliver data only if we satisfy the filters
2199 return DISPATCH_OP_DELIVER;
2200 }
2201 error:
2202 if (err == EAGAIN) {
2203 // For disk based files with blocking I/O we should never get EAGAIN
2204 dispatch_assert(!op->fd_entry->disk);
2205 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2206 if (op->direction == DOP_DIR_READ && op->total &&
2207 op->channel == op->fd_entry->convenience_channel) {
2208 // Convenience read with available data completes on EAGAIN
2209 return DISPATCH_OP_COMPLETE_RESUME;
2210 }
2211 return DISPATCH_OP_RESUME;
2212 }
2213 op->err = err;
2214 switch (err) {
2215 case ECANCELED:
2216 return DISPATCH_OP_ERR;
2217 case EBADF:
2218 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2219 return DISPATCH_OP_FD_ERR;
2220 default:
2221 return DISPATCH_OP_COMPLETE;
2222 }
2223 }
2224
2225 static void
2226 _dispatch_operation_deliver_data(dispatch_operation_t op,
2227 dispatch_op_flags_t flags)
2228 {
2229 // Either called from stream resp. pick queue or when op is finalized
2230 dispatch_data_t data = NULL;
2231 int err = 0;
2232 size_t undelivered = op->undelivered + op->buf_len;
2233 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2234 (op->flags & DOP_DELIVER);
2235 op->flags = DOP_DEFAULT;
2236 if (!deliver) {
2237 // Don't deliver data until low water mark has been reached
2238 if (undelivered >= op->params.low) {
2239 deliver = true;
2240 } else if (op->buf_len < op->buf_siz) {
2241 // Request buffer is not yet used up
2242 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2243 return;
2244 }
2245 } else {
2246 err = op->err;
2247 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2248 err = ECANCELED;
2249 op->err = err;
2250 }
2251 }
2252 // Deliver data or buffer used up
2253 if (op->direction == DOP_DIR_READ) {
2254 if (op->buf_len) {
2255 void *buf = op->buf;
2256 data = dispatch_data_create(buf, op->buf_len, NULL,
2257 DISPATCH_DATA_DESTRUCTOR_FREE);
2258 op->buf = NULL;
2259 op->buf_len = 0;
2260 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2261 _dispatch_io_data_release(op->data);
2262 _dispatch_io_data_release(data);
2263 data = d;
2264 } else {
2265 data = op->data;
2266 }
2267 op->data = deliver ? dispatch_data_empty : data;
2268 } else if (op->direction == DOP_DIR_WRITE) {
2269 if (deliver) {
2270 data = dispatch_data_create_subrange(op->data, op->buf_len,
2271 op->length);
2272 }
2273 if (op->buf_data && op->buf_len == op->buf_siz) {
2274 _dispatch_io_data_release(op->buf_data);
2275 op->buf_data = NULL;
2276 op->buf = NULL;
2277 op->buf_len = 0;
2278 // Trim newly written buffer from head of unwritten data
2279 dispatch_data_t d;
2280 if (deliver) {
2281 _dispatch_io_data_retain(data);
2282 d = data;
2283 } else {
2284 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2285 op->length);
2286 }
2287 _dispatch_io_data_release(op->data);
2288 op->data = d;
2289 }
2290 } else {
2291 dispatch_assert(op->direction < DOP_DIR_MAX);
2292 return;
2293 }
2294 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2295 op->undelivered = undelivered;
2296 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2297 return;
2298 }
2299 op->undelivered = 0;
2300 _dispatch_object_debug(op, "%s", __func__);
2301 _dispatch_fd_debug("deliver data", op->fd_entry->fd);
2302 dispatch_op_direction_t direction = op->direction;
2303 dispatch_io_handler_t handler = op->handler;
2304 #if DISPATCH_IO_DEBUG
2305 int fd = op->fd_entry->fd;
2306 #endif
2307 dispatch_fd_entry_t fd_entry = op->fd_entry;
2308 _dispatch_fd_entry_retain(fd_entry);
2309 dispatch_io_t channel = op->channel;
2310 _dispatch_retain(channel);
2311 // Note that data delivery may occur after the operation is freed
2312 dispatch_async(op->op_q, ^{
2313 bool done = (flags & DOP_DONE);
2314 dispatch_data_t d = data;
2315 if (done) {
2316 if (direction == DOP_DIR_READ && err) {
2317 if (dispatch_data_get_size(d)) {
2318 _dispatch_fd_debug("IO handler invoke", fd);
2319 handler(false, d, 0);
2320 }
2321 d = NULL;
2322 } else if (direction == DOP_DIR_WRITE && !err) {
2323 d = NULL;
2324 }
2325 }
2326 _dispatch_fd_debug("IO handler invoke", fd);
2327 handler(done, d, err);
2328 _dispatch_release(channel);
2329 _dispatch_fd_entry_release(fd_entry);
2330 _dispatch_io_data_release(data);
2331 });
2332 }
2333
2334 #pragma mark -
2335 #pragma mark dispatch_io_debug
2336
2337 static size_t
2338 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2339 {
2340 dispatch_queue_t target = channel->do_targetq;
2341 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2342 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2343 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2344 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2345 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2346 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2347 channel->fd_entry, channel->queue, target && target->dq_label ?
2348 target->dq_label : "", target, channel->barrier_queue,
2349 channel->barrier_group, channel->err, channel->params.low,
2350 channel->params.high, channel->params.interval_flags &
2351 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2352 channel->params.interval);
2353 }
2354
2355 size_t
2356 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2357 {
2358 size_t offset = 0;
2359 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2360 dx_kind(channel), channel);
2361 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2362 bufsiz - offset);
2363 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2364 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2365 return offset;
2366 }
2367
2368 static size_t
2369 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2370 size_t bufsiz)
2371 {
2372 dispatch_queue_t target = op->do_targetq;
2373 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2374 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2375 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2376 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2377 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2378 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2379 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2380 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2381 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2382 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2383 target->dq_label : "", target, op->offset, op->length, op->total,
2384 op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2385 op->params.high, op->params.interval_flags &
2386 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2387 }
2388
2389 size_t
2390 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2391 {
2392 size_t offset = 0;
2393 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2394 dx_kind(op), op);
2395 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2396 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2397 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2398 return offset;
2399 }