1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #if DISPATCH_IO_DEBUG
28 #define _dispatch_fd_debug(msg, fd, args...) \
29 _dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
30 #else
31 #define _dispatch_fd_debug(msg, fd, args...)
32 #endif
33
34 #if USE_OBJC
35 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
36 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
37 #else
38 #define _dispatch_io_data_retain(x) dispatch_retain(x)
39 #define _dispatch_io_data_release(x) dispatch_release(x)
40 #endif
41
42 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43
44 DISPATCH_EXPORT DISPATCH_NOTHROW
45 void _dispatch_iocntl(uint32_t param, uint64_t value);
46
47 static dispatch_operation_t _dispatch_operation_create(
48 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49 size_t length, dispatch_data_t data, dispatch_queue_t queue,
50 dispatch_io_handler_t handler);
51 static void _dispatch_operation_enqueue(dispatch_operation_t op,
52 dispatch_op_direction_t direction, dispatch_data_t data);
53 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54 dispatch_operation_t op);
55 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58 dispatch_fd_entry_init_callback_t completion_callback);
59 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60 uintptr_t hash);
61 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64 dispatch_io_t channel);
65 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66 dispatch_io_t channel);
67 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68 dispatch_queue_t tq);
69 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70 dispatch_op_direction_t direction);
71 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73 dispatch_operation_t operation, dispatch_data_t data);
74 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75 dispatch_operation_t operation, dispatch_data_t data);
76 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77 dispatch_io_t channel);
78 static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79 dispatch_io_t channel);
80 static void _dispatch_stream_source_handler(void *ctx);
81 static void _dispatch_stream_queue_handler(void *ctx);
82 static void _dispatch_stream_handler(void *ctx);
83 static void _dispatch_disk_handler(void *ctx);
84 static void _dispatch_disk_perform(void *ctxt);
85 static void _dispatch_operation_advise(dispatch_operation_t op,
86 size_t chunk_size);
87 static int _dispatch_operation_perform(dispatch_operation_t op);
88 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89 dispatch_op_flags_t flags);
90
91 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
92 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
93 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
94 case EINTR: continue; \
95 __VA_ARGS__ \
96 } \
97 break; \
98 } while (1)
99 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
100 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
101 case 0: break; \
102 __VA_ARGS__ \
103 ); \
104 } while (0)
105 #define _dispatch_io_syscall(__syscall) do { int __err; \
106 _dispatch_io_syscall_switch(__err, __syscall); \
107 } while (0)
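// Illustrative only (not part of the original source): a typical use of the
// retry wrappers, mirroring the fstat() call made below when an fd_entry is
// created. The macro re-evaluates the syscall expression while it fails with
// EINTR, stores 0 or errno into the error variable, and switches on it.
//
//	int err;
//	struct stat st;
//	_dispatch_io_syscall_switch(err,
//		fstat(fd, &st),
//		default: fd_entry->err = err; return;
//	);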
108
109 enum {
110 DISPATCH_OP_COMPLETE = 1,
111 DISPATCH_OP_DELIVER,
112 DISPATCH_OP_DELIVER_AND_COMPLETE,
113 DISPATCH_OP_COMPLETE_RESUME,
114 DISPATCH_OP_RESUME,
115 DISPATCH_OP_ERR,
116 DISPATCH_OP_FD_ERR,
117 };
118
119 #define _dispatch_io_Block_copy(x) \
120 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121
122 #pragma mark -
123 #pragma mark dispatch_io_hashtables
124
125 #if TARGET_OS_EMBEDDED
126 #define DIO_HASH_SIZE 64u // must be a power of two
127 #else
128 #define DIO_HASH_SIZE 256u // must be a power of two
129 #endif
130 #define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
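// For example, with DIO_HASH_SIZE == 256, DIO_HASH(5) == 5 and
// DIO_HASH(260) == 4: fds and devices are bucketed modulo the power-of-two
// table size.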
131
132 // Global hashtable of dev_t -> disk_s mappings
133 DISPATCH_CACHELINE_ALIGN
134 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
135 // Global hashtable of fd -> fd_entry_s mappings
136 DISPATCH_CACHELINE_ALIGN
137 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
138
139 static dispatch_once_t _dispatch_io_devs_lockq_pred;
140 static dispatch_queue_t _dispatch_io_devs_lockq;
141 static dispatch_queue_t _dispatch_io_fds_lockq;
142
143 static void
144 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
145 {
146 _dispatch_io_fds_lockq = dispatch_queue_create(
147 "com.apple.libdispatch-io.fd_lockq", NULL);
148 unsigned int i;
149 for (i = 0; i < DIO_HASH_SIZE; i++) {
150 TAILQ_INIT(&_dispatch_io_fds[i]);
151 }
152 }
153
154 static void
155 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
156 {
157 _dispatch_io_devs_lockq = dispatch_queue_create(
158 "com.apple.libdispatch-io.dev_lockq", NULL);
159 unsigned int i;
160 for (i = 0; i < DIO_HASH_SIZE; i++) {
161 TAILQ_INIT(&_dispatch_io_devs[i]);
162 }
163 }
164
165 #pragma mark -
166 #pragma mark dispatch_io_defaults
167
168 enum {
169 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
170 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
171 DISPATCH_IOCNTL_INITIAL_DELIVERY,
172 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
173 };
174
175 static struct dispatch_io_defaults_s {
176 size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
177 bool initial_delivery;
178 } dispatch_io_defaults = {
179 .chunk_pages = DIO_MAX_CHUNK_PAGES,
180 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
181 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
182 };
183
184 #define _dispatch_iocntl_set_default(p, v) do { \
185 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
186 } while (0)
187
188 void
189 _dispatch_iocntl(uint32_t param, uint64_t value)
190 {
191 switch (param) {
192 case DISPATCH_IOCNTL_CHUNK_PAGES:
193 _dispatch_iocntl_set_default(chunk_pages, value);
194 break;
195 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
196 _dispatch_iocntl_set_default(low_water_chunks, value);
197 break;
198 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
199 		_dispatch_iocntl_set_default(initial_delivery, value); break;
200 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
201 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
202 break;
203 }
204 }
205
206 #pragma mark -
207 #pragma mark dispatch_io_t
208
209 static dispatch_io_t
210 _dispatch_io_create(dispatch_io_type_t type)
211 {
212 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
213 sizeof(struct dispatch_io_s));
214 channel->do_next = DISPATCH_OBJECT_LISTLESS;
215 channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
216 true);
217 channel->params.type = type;
218 channel->params.high = SIZE_MAX;
219 channel->params.low = dispatch_io_defaults.low_water_chunks *
220 dispatch_io_defaults.chunk_pages * PAGE_SIZE;
221 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
222 NULL);
223 return channel;
224 }
225
226 static void
227 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
228 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
229 {
230 // Enqueue the cleanup handler on the suspended close queue
231 if (cleanup_handler) {
232 _dispatch_retain(queue);
233 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
234 dispatch_async(queue, ^{
235 _dispatch_fd_debug("cleanup handler invoke", -1);
236 cleanup_handler(err);
237 });
238 _dispatch_release(queue);
239 });
240 }
241 if (fd_entry) {
242 channel->fd_entry = fd_entry;
243 dispatch_retain(fd_entry->barrier_queue);
244 dispatch_retain(fd_entry->barrier_group);
245 channel->barrier_queue = fd_entry->barrier_queue;
246 channel->barrier_group = fd_entry->barrier_group;
247 } else {
248 // Still need to create a barrier queue, since all operations go
249 // through it
250 channel->barrier_queue = dispatch_queue_create(
251 "com.apple.libdispatch-io.barrierq", NULL);
252 channel->barrier_group = dispatch_group_create();
253 }
254 }
255
256 void
257 _dispatch_io_dispose(dispatch_io_t channel)
258 {
259 _dispatch_object_debug(channel, "%s", __func__);
260 if (channel->fd_entry &&
261 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
262 if (channel->fd_entry->path_data) {
263 // This modification is safe since path_data->channel is checked
264 // only on close_queue (which is still suspended at this point)
265 channel->fd_entry->path_data->channel = NULL;
266 }
267 // Cleanup handlers will only run when all channels related to this
268 // fd are complete
269 _dispatch_fd_entry_release(channel->fd_entry);
270 }
271 if (channel->queue) {
272 dispatch_release(channel->queue);
273 }
274 if (channel->barrier_queue) {
275 dispatch_release(channel->barrier_queue);
276 }
277 if (channel->barrier_group) {
278 dispatch_release(channel->barrier_group);
279 }
280 }
281
282 static int
283 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
284 {
285 int err = 0;
286 if (S_ISDIR(mode)) {
287 err = EISDIR;
288 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
289 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
290 err = ESPIPE;
291 }
292 return err;
293 }
294
295 static int
296 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
297 bool ignore_closed)
298 {
299 // On _any_ queue
300 int err;
301 if (op) {
302 channel = op->channel;
303 }
304 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
305 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
306 err = ECANCELED;
307 } else {
308 err = 0;
309 }
310 } else {
311 err = op ? op->fd_entry->err : channel->err;
312 }
313 return err;
314 }
315
316 #pragma mark -
317 #pragma mark dispatch_io_channels
318
319 dispatch_io_t
320 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
321 dispatch_queue_t queue, void (^cleanup_handler)(int))
322 {
323 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
324 return NULL;
325 }
326 _dispatch_fd_debug("io create", fd);
327 dispatch_io_t channel = _dispatch_io_create(type);
328 channel->fd = fd;
329 channel->fd_actual = fd;
330 dispatch_suspend(channel->queue);
331 _dispatch_retain(queue);
332 _dispatch_retain(channel);
333 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
334 // On barrier queue
335 int err = fd_entry->err;
336 if (!err) {
337 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
338 }
339 if (!err && type == DISPATCH_IO_RANDOM) {
340 off_t f_ptr;
341 _dispatch_io_syscall_switch_noerr(err,
342 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
343 case 0: channel->f_ptr = f_ptr; break;
344 default: (void)dispatch_assume_zero(err); break;
345 );
346 }
347 channel->err = err;
348 _dispatch_fd_entry_retain(fd_entry);
349 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
350 dispatch_resume(channel->queue);
351 _dispatch_object_debug(channel, "%s", __func__);
352 _dispatch_release(channel);
353 _dispatch_release(queue);
354 });
355 _dispatch_object_debug(channel, "%s", __func__);
356 return channel;
357 }
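// Client-side sketch (illustrative, not part of this file): create a stream
// channel over an existing descriptor. The cleanup handler is submitted to
// `queue` once the channel is closed and its I/O has completed, at which
// point control of the fd returns to the caller.
//
//	dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, fd,
//			dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0),
//			^(int error) { if (!error) close(fd); });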
358
359 dispatch_io_t
360 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
361 dispatch_queue_t queue, void *context,
362 void (*cleanup_handler)(void *context, int error))
363 {
364 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
365 ^(int error){ cleanup_handler(context, error); });
366 }
367
368 dispatch_io_t
369 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
370 int oflag, mode_t mode, dispatch_queue_t queue,
371 void (^cleanup_handler)(int error))
372 {
373 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
374 !(path && *path == '/')) {
375 return NULL;
376 }
377 size_t pathlen = strlen(path);
378 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
379 if (!path_data) {
380 return NULL;
381 }
382 _dispatch_fd_debug("io create with path %s", -1, path);
383 dispatch_io_t channel = _dispatch_io_create(type);
384 channel->fd = -1;
385 channel->fd_actual = -1;
386 path_data->channel = channel;
387 path_data->oflag = oflag;
388 path_data->mode = mode;
389 path_data->pathlen = pathlen;
390 memcpy(path_data->path, path, pathlen + 1);
391 _dispatch_retain(queue);
392 _dispatch_retain(channel);
393 dispatch_async(channel->queue, ^{
394 int err = 0;
395 struct stat st;
396 _dispatch_io_syscall_switch_noerr(err,
397 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
398 (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
399 lstat(path_data->path, &st) : stat(path_data->path, &st),
400 case 0:
401 err = _dispatch_io_validate_type(channel, st.st_mode);
402 break;
403 default:
404 if ((path_data->oflag & O_CREAT) &&
405 (*(path_data->path + path_data->pathlen - 1) != '/')) {
406 // Check parent directory
407 char *c = strrchr(path_data->path, '/');
408 dispatch_assert(c);
409 *c = 0;
410 int perr;
411 _dispatch_io_syscall_switch_noerr(perr,
412 stat(path_data->path, &st),
413 case 0:
414 // Since the parent directory exists, open() will
415 // create a regular file after the fd_entry has
416 // been filled in
417 st.st_mode = S_IFREG;
418 err = 0;
419 break;
420 );
421 *c = '/';
422 }
423 break;
424 );
425 channel->err = err;
426 if (err) {
427 free(path_data);
428 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
429 _dispatch_release(channel);
430 _dispatch_release(queue);
431 return;
432 }
433 dispatch_suspend(channel->queue);
434 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
435 _dispatch_io_devs_lockq_init);
436 dispatch_async(_dispatch_io_devs_lockq, ^{
437 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
438 path_data, st.st_dev, st.st_mode);
439 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
440 dispatch_resume(channel->queue);
441 _dispatch_object_debug(channel, "%s", __func__);
442 _dispatch_release(channel);
443 _dispatch_release(queue);
444 });
445 });
446 _dispatch_object_debug(channel, "%s", __func__);
447 return channel;
448 }
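// Illustrative path-based creation (not part of this file): no fd exists up
// front; the file is opened lazily on first I/O and closed again once the
// channel is closed and all of its operations have completed.
//
//	dispatch_io_t io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM,
//			"/tmp/example.dat", O_CREAT | O_WRONLY, 0644,
//			dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0),
//			^(int error) { /* file already closed here */ });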
449
450 dispatch_io_t
451 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
452 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
453 void (*cleanup_handler)(void *context, int error))
454 {
455 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
456 !cleanup_handler ? NULL :
457 ^(int error){ cleanup_handler(context, error); });
458 }
459
460 dispatch_io_t
461 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
462 dispatch_queue_t queue, void (^cleanup_handler)(int error))
463 {
464 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
465 return NULL;
466 }
467 _dispatch_fd_debug("io create with io %p", -1, in_channel);
468 dispatch_io_t channel = _dispatch_io_create(type);
469 dispatch_suspend(channel->queue);
470 _dispatch_retain(queue);
471 _dispatch_retain(channel);
472 _dispatch_retain(in_channel);
473 dispatch_async(in_channel->queue, ^{
474 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
475 if (err0) {
476 channel->err = err0;
477 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
478 dispatch_resume(channel->queue);
479 _dispatch_release(channel);
480 _dispatch_release(in_channel);
481 _dispatch_release(queue);
482 return;
483 }
484 dispatch_async(in_channel->barrier_queue, ^{
485 int err = _dispatch_io_get_error(NULL, in_channel, false);
486 // If there is no error, the fd_entry for the in_channel is valid.
487 // Since we are running on in_channel's queue, the fd_entry has been
488 // fully resolved and will stay valid for the duration of this block
489 if (!err) {
490 err = in_channel->err;
491 if (!err) {
492 err = in_channel->fd_entry->err;
493 }
494 }
495 if (!err) {
496 err = _dispatch_io_validate_type(channel,
497 in_channel->fd_entry->stat.mode);
498 }
499 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
500 off_t f_ptr;
501 _dispatch_io_syscall_switch_noerr(err,
502 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
503 case 0: channel->f_ptr = f_ptr; break;
504 default: (void)dispatch_assume_zero(err); break;
505 );
506 }
507 channel->err = err;
508 if (err) {
509 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
510 dispatch_resume(channel->queue);
511 _dispatch_release(channel);
512 _dispatch_release(in_channel);
513 _dispatch_release(queue);
514 return;
515 }
516 if (in_channel->fd == -1) {
517 // in_channel was created from path
518 channel->fd = -1;
519 channel->fd_actual = -1;
520 mode_t mode = in_channel->fd_entry->stat.mode;
521 dev_t dev = in_channel->fd_entry->stat.dev;
522 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
523 in_channel->fd_entry->path_data->pathlen + 1;
524 dispatch_io_path_data_t path_data = malloc(path_data_len);
525 memcpy(path_data, in_channel->fd_entry->path_data,
526 path_data_len);
527 path_data->channel = channel;
528 				// _dispatch_io_devs_lockq is known to already exist
529 dispatch_async(_dispatch_io_devs_lockq, ^{
530 dispatch_fd_entry_t fd_entry;
531 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
532 dev, mode);
533 _dispatch_io_init(channel, fd_entry, queue, 0,
534 cleanup_handler);
535 dispatch_resume(channel->queue);
536 _dispatch_release(channel);
537 _dispatch_release(queue);
538 });
539 } else {
540 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
541 channel->fd = in_channel->fd;
542 channel->fd_actual = in_channel->fd_actual;
543 _dispatch_fd_entry_retain(fd_entry);
544 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
545 dispatch_resume(channel->queue);
546 _dispatch_release(channel);
547 _dispatch_release(queue);
548 }
549 _dispatch_release(in_channel);
550 _dispatch_object_debug(channel, "%s", __func__);
551 });
552 });
553 _dispatch_object_debug(channel, "%s", __func__);
554 return channel;
555 }
556
557 dispatch_io_t
558 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
559 dispatch_queue_t queue, void *context,
560 void (*cleanup_handler)(void *context, int error))
561 {
562 return dispatch_io_create_with_io(type, in_channel, queue,
563 !cleanup_handler ? NULL :
564 ^(int error){ cleanup_handler(context, error); });
565 }
566
567 #pragma mark -
568 #pragma mark dispatch_io_accessors
569
570 void
571 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
572 {
573 _dispatch_retain(channel);
574 dispatch_async(channel->queue, ^{
575 _dispatch_fd_debug("io set high water", channel->fd);
576 if (channel->params.low > high_water) {
577 channel->params.low = high_water;
578 }
579 channel->params.high = high_water ? high_water : 1;
580 _dispatch_release(channel);
581 });
582 }
583
584 void
585 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
586 {
587 _dispatch_retain(channel);
588 dispatch_async(channel->queue, ^{
589 _dispatch_fd_debug("io set low water", channel->fd);
590 if (channel->params.high < low_water) {
591 channel->params.high = low_water ? low_water : 1;
592 }
593 channel->params.low = low_water;
594 _dispatch_release(channel);
595 });
596 }
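// Rough illustration of the watermark parameters (not from the original
// source): with the settings below, partial results handed to an I/O handler
// are, barring EOF, errors or strict intervals, at least 4 KiB and at most
// 1 MiB per invocation.
//
//	dispatch_io_set_low_water(io, 4 * 1024);
//	dispatch_io_set_high_water(io, 1024 * 1024);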
597
598 void
599 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
600 unsigned long flags)
601 {
602 _dispatch_retain(channel);
603 dispatch_async(channel->queue, ^{
604 _dispatch_fd_debug("io set interval", channel->fd);
605 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
606 channel->params.interval_flags = flags;
607 _dispatch_release(channel);
608 });
609 }
610
611 void
612 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
613 {
614 _dispatch_retain(dq);
615 _dispatch_retain(channel);
616 dispatch_async(channel->queue, ^{
617 dispatch_queue_t prev_dq = channel->do_targetq;
618 channel->do_targetq = dq;
619 _dispatch_release(prev_dq);
620 _dispatch_object_debug(channel, "%s", __func__);
621 _dispatch_release(channel);
622 });
623 }
624
625 dispatch_fd_t
626 dispatch_io_get_descriptor(dispatch_io_t channel)
627 {
628 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
629 return -1;
630 }
631 dispatch_fd_t fd = channel->fd_actual;
632 if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
633 !_dispatch_io_get_error(NULL, channel, false)) {
634 dispatch_fd_entry_t fd_entry = channel->fd_entry;
635 (void)_dispatch_fd_entry_open(fd_entry, channel);
636 }
637 return channel->fd_actual;
638 }
639
640 #pragma mark -
641 #pragma mark dispatch_io_operations
642
643 static void
644 _dispatch_io_stop(dispatch_io_t channel)
645 {
646 _dispatch_fd_debug("io stop", channel->fd);
647 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
648 _dispatch_retain(channel);
649 dispatch_async(channel->queue, ^{
650 dispatch_async(channel->barrier_queue, ^{
651 _dispatch_object_debug(channel, "%s", __func__);
652 dispatch_fd_entry_t fd_entry = channel->fd_entry;
653 if (fd_entry) {
654 _dispatch_fd_debug("io stop cleanup", channel->fd);
655 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
656 if (!(channel->atomic_flags & DIO_CLOSED)) {
657 channel->fd_entry = NULL;
658 _dispatch_fd_entry_release(fd_entry);
659 }
660 } else if (channel->fd != -1) {
661 // Stop after close, need to check if fd_entry still exists
662 _dispatch_retain(channel);
663 dispatch_async(_dispatch_io_fds_lockq, ^{
664 _dispatch_object_debug(channel, "%s", __func__);
665 _dispatch_fd_debug("io stop after close cleanup",
666 channel->fd);
667 dispatch_fd_entry_t fdi;
668 uintptr_t hash = DIO_HASH(channel->fd);
669 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
670 if (fdi->fd == channel->fd) {
671 _dispatch_fd_entry_cleanup_operations(fdi, channel);
672 break;
673 }
674 }
675 _dispatch_release(channel);
676 });
677 }
678 _dispatch_release(channel);
679 });
680 });
681 }
682
683 void
684 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
685 {
686 if (flags & DISPATCH_IO_STOP) {
687 // Don't stop an already stopped channel
688 if (channel->atomic_flags & DIO_STOPPED) {
689 return;
690 }
691 return _dispatch_io_stop(channel);
692 }
693 // Don't close an already closed or stopped channel
694 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
695 return;
696 }
697 _dispatch_retain(channel);
698 dispatch_async(channel->queue, ^{
699 dispatch_async(channel->barrier_queue, ^{
700 _dispatch_object_debug(channel, "%s", __func__);
701 _dispatch_fd_debug("io close", channel->fd);
702 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
703 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
704 relaxed);
705 dispatch_fd_entry_t fd_entry = channel->fd_entry;
706 if (fd_entry) {
707 if (!fd_entry->path_data) {
708 channel->fd_entry = NULL;
709 }
710 _dispatch_fd_entry_release(fd_entry);
711 }
712 }
713 _dispatch_release(channel);
714 });
715 });
716 }
717
718 void
719 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
720 {
721 _dispatch_retain(channel);
722 dispatch_async(channel->queue, ^{
723 dispatch_queue_t io_q = channel->do_targetq;
724 dispatch_queue_t barrier_queue = channel->barrier_queue;
725 dispatch_group_t barrier_group = channel->barrier_group;
726 dispatch_async(barrier_queue, ^{
727 dispatch_suspend(barrier_queue);
728 dispatch_group_notify(barrier_group, io_q, ^{
729 _dispatch_object_debug(channel, "%s", __func__);
730 _dispatch_thread_setspecific(dispatch_io_key, channel);
731 barrier();
732 _dispatch_thread_setspecific(dispatch_io_key, NULL);
733 dispatch_resume(barrier_queue);
734 _dispatch_release(channel);
735 });
736 });
737 });
738 }
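// Illustrative barrier usage (not part of this file): the barrier block runs
// with the channel installed as the thread's dispatch_io_key, which is what
// allows dispatch_io_get_descriptor() (above) to lazily open a path-based
// channel from inside the block.
//
//	dispatch_io_barrier(io, ^{
//		dispatch_fd_t fd = dispatch_io_get_descriptor(io);
//		if (fd != -1) {
//			(void)fcntl(fd, F_NOCACHE, 1);
//		}
//	});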
739
740 void
741 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
742 dispatch_function_t barrier)
743 {
744 return dispatch_io_barrier(channel, ^{ barrier(context); });
745 }
746
747 void
748 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
749 dispatch_queue_t queue, dispatch_io_handler_t handler)
750 {
751 _dispatch_retain(channel);
752 _dispatch_retain(queue);
753 dispatch_async(channel->queue, ^{
754 dispatch_operation_t op;
755 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
756 length, dispatch_data_empty, queue, handler);
757 if (op) {
758 dispatch_queue_t barrier_q = channel->barrier_queue;
759 dispatch_async(barrier_q, ^{
760 _dispatch_operation_enqueue(op, DOP_DIR_READ,
761 dispatch_data_empty);
762 });
763 }
764 _dispatch_release(channel);
765 _dispatch_release(queue);
766 });
767 }
768
769 void
770 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
771 dispatch_queue_t queue, void *context,
772 dispatch_io_handler_function_t handler)
773 {
774 return dispatch_io_read(channel, offset, length, queue,
775 ^(bool done, dispatch_data_t d, int error){
776 handler(context, done, d, error);
777 });
778 }
779
780 void
781 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
782 dispatch_queue_t queue, dispatch_io_handler_t handler)
783 {
784 _dispatch_io_data_retain(data);
785 _dispatch_retain(channel);
786 _dispatch_retain(queue);
787 dispatch_async(channel->queue, ^{
788 dispatch_operation_t op;
789 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
790 dispatch_data_get_size(data), data, queue, handler);
791 if (op) {
792 dispatch_queue_t barrier_q = channel->barrier_queue;
793 dispatch_async(barrier_q, ^{
794 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
795 _dispatch_io_data_release(data);
796 });
797 } else {
798 _dispatch_io_data_release(data);
799 }
800 _dispatch_release(channel);
801 _dispatch_release(queue);
802 });
803 }
804
805 void
806 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
807 dispatch_queue_t queue, void *context,
808 dispatch_io_handler_function_t handler)
809 {
810 return dispatch_io_write(channel, offset, data, queue,
811 ^(bool done, dispatch_data_t d, int error){
812 handler(context, done, d, error);
813 });
814 }
815
816 void
817 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
818 void (^handler)(dispatch_data_t, int))
819 {
820 _dispatch_retain(queue);
821 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
822 // On barrier queue
823 if (fd_entry->err) {
824 int err = fd_entry->err;
825 dispatch_async(queue, ^{
826 _dispatch_fd_debug("convenience handler invoke", fd);
827 handler(dispatch_data_empty, err);
828 });
829 _dispatch_release(queue);
830 return;
831 }
832 // Safe to access fd_entry on barrier queue
833 dispatch_io_t channel = fd_entry->convenience_channel;
834 if (!channel) {
835 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
836 channel->fd = fd;
837 channel->fd_actual = fd;
838 channel->fd_entry = fd_entry;
839 dispatch_retain(fd_entry->barrier_queue);
840 dispatch_retain(fd_entry->barrier_group);
841 channel->barrier_queue = fd_entry->barrier_queue;
842 channel->barrier_group = fd_entry->barrier_group;
843 fd_entry->convenience_channel = channel;
844 }
845 __block dispatch_data_t deliver_data = dispatch_data_empty;
846 __block int err = 0;
847 dispatch_async(fd_entry->close_queue, ^{
848 dispatch_async(queue, ^{
849 _dispatch_fd_debug("convenience handler invoke", fd);
850 handler(deliver_data, err);
851 _dispatch_io_data_release(deliver_data);
852 });
853 _dispatch_release(queue);
854 });
855 dispatch_operation_t op =
856 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
857 length, dispatch_data_empty,
858 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
859 ^(bool done, dispatch_data_t data, int error) {
860 if (data) {
861 data = dispatch_data_create_concat(deliver_data, data);
862 _dispatch_io_data_release(deliver_data);
863 deliver_data = data;
864 }
865 if (done) {
866 err = error;
867 }
868 });
869 if (op) {
870 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
871 }
872 });
873 }
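// Illustrative use of the convenience read API (not part of this file): read
// up to `length` bytes from fd and get a single callback with the
// concatenated data once the read has completed or failed.
//
//	dispatch_read(fd, SIZE_MAX, queue, ^(dispatch_data_t data, int error) {
//		if (error == 0) {
//			// reached EOF; `data` holds everything that was read
//		}
//	});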
874
875 void
876 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
877 void *context, void (*handler)(void *, dispatch_data_t, int))
878 {
879 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
880 handler(context, d, error);
881 });
882 }
883
884 void
885 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
886 void (^handler)(dispatch_data_t, int))
887 {
888 _dispatch_io_data_retain(data);
889 _dispatch_retain(queue);
890 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
891 // On barrier queue
892 if (fd_entry->err) {
893 int err = fd_entry->err;
894 dispatch_async(queue, ^{
895 _dispatch_fd_debug("convenience handler invoke", fd);
896 handler(NULL, err);
897 });
898 _dispatch_release(queue);
899 return;
900 }
901 // Safe to access fd_entry on barrier queue
902 dispatch_io_t channel = fd_entry->convenience_channel;
903 if (!channel) {
904 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
905 channel->fd = fd;
906 channel->fd_actual = fd;
907 channel->fd_entry = fd_entry;
908 dispatch_retain(fd_entry->barrier_queue);
909 dispatch_retain(fd_entry->barrier_group);
910 channel->barrier_queue = fd_entry->barrier_queue;
911 channel->barrier_group = fd_entry->barrier_group;
912 fd_entry->convenience_channel = channel;
913 }
914 __block dispatch_data_t deliver_data = NULL;
915 __block int err = 0;
916 dispatch_async(fd_entry->close_queue, ^{
917 dispatch_async(queue, ^{
918 _dispatch_fd_debug("convenience handler invoke", fd);
919 handler(deliver_data, err);
920 if (deliver_data) {
921 _dispatch_io_data_release(deliver_data);
922 }
923 });
924 _dispatch_release(queue);
925 });
926 dispatch_operation_t op =
927 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
928 dispatch_data_get_size(data), data,
929 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
930 ^(bool done, dispatch_data_t d, int error) {
931 if (done) {
932 if (d) {
933 _dispatch_io_data_retain(d);
934 deliver_data = d;
935 }
936 err = error;
937 }
938 });
939 if (op) {
940 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
941 }
942 _dispatch_io_data_release(data);
943 });
944 }
945
946 void
947 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
948 void *context, void (*handler)(void *, dispatch_data_t, int))
949 {
950 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
951 handler(context, d, error);
952 });
953 }
954
955 #pragma mark -
956 #pragma mark dispatch_operation_t
957
958 static dispatch_operation_t
959 _dispatch_operation_create(dispatch_op_direction_t direction,
960 dispatch_io_t channel, off_t offset, size_t length,
961 dispatch_data_t data, dispatch_queue_t queue,
962 dispatch_io_handler_t handler)
963 {
964 // On channel queue
965 dispatch_assert(direction < DOP_DIR_MAX);
966 _dispatch_fd_debug("operation create", channel->fd);
967 #if DISPATCH_IO_DEBUG
968 int fd = channel->fd;
969 #endif
970 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
971 // that can only be NULL if atomic_flags are set rdar://problem/8362514
972 int err = _dispatch_io_get_error(NULL, channel, false);
973 if (err || !length) {
974 _dispatch_io_data_retain(data);
975 _dispatch_retain(queue);
976 dispatch_async(channel->barrier_queue, ^{
977 dispatch_async(queue, ^{
978 dispatch_data_t d = data;
979 if (direction == DOP_DIR_READ && err) {
980 d = NULL;
981 } else if (direction == DOP_DIR_WRITE && !err) {
982 d = NULL;
983 }
984 _dispatch_fd_debug("IO handler invoke", fd);
985 handler(true, d, err);
986 _dispatch_io_data_release(data);
987 });
988 _dispatch_release(queue);
989 });
990 return NULL;
991 }
992 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
993 sizeof(struct dispatch_operation_s));
994 op->do_next = DISPATCH_OBJECT_LISTLESS;
995 op->do_xref_cnt = -1; // operation object is not exposed externally
996 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
997 op->op_q->do_targetq = queue;
998 _dispatch_retain(queue);
999 op->active = false;
1000 op->direction = direction;
1001 op->offset = offset + channel->f_ptr;
1002 op->length = length;
1003 op->handler = _dispatch_io_Block_copy(handler);
1004 _dispatch_retain(channel);
1005 op->channel = channel;
1006 op->params = channel->params;
1007 // Take a snapshot of the priority of the channel queue. The actual I/O
1008 // for this operation will be performed at this priority
1009 dispatch_queue_t targetq = op->channel->do_targetq;
1010 while (fastpath(targetq->do_targetq)) {
1011 targetq = targetq->do_targetq;
1012 }
1013 op->do_targetq = targetq;
1014 _dispatch_object_debug(op, "%s", __func__);
1015 return op;
1016 }
1017
1018 void
1019 _dispatch_operation_dispose(dispatch_operation_t op)
1020 {
1021 _dispatch_object_debug(op, "%s", __func__);
1022 // Deliver the data if there's any
1023 if (op->fd_entry) {
1024 _dispatch_operation_deliver_data(op, DOP_DONE);
1025 dispatch_group_leave(op->fd_entry->barrier_group);
1026 _dispatch_fd_entry_release(op->fd_entry);
1027 }
1028 if (op->channel) {
1029 _dispatch_release(op->channel);
1030 }
1031 if (op->timer) {
1032 dispatch_release(op->timer);
1033 }
1034 // For write operations, op->buf is owned by op->buf_data
1035 if (op->buf && op->direction == DOP_DIR_READ) {
1036 free(op->buf);
1037 }
1038 if (op->buf_data) {
1039 _dispatch_io_data_release(op->buf_data);
1040 }
1041 if (op->data) {
1042 _dispatch_io_data_release(op->data);
1043 }
1044 if (op->op_q) {
1045 dispatch_release(op->op_q);
1046 }
1047 Block_release(op->handler);
1048 }
1049
1050 static void
1051 _dispatch_operation_enqueue(dispatch_operation_t op,
1052 dispatch_op_direction_t direction, dispatch_data_t data)
1053 {
1054 // Called from the barrier queue
1055 _dispatch_io_data_retain(data);
1056 // If channel is closed or stopped, then call the handler immediately
1057 int err = _dispatch_io_get_error(NULL, op->channel, false);
1058 if (err) {
1059 dispatch_io_handler_t handler = op->handler;
1060 dispatch_async(op->op_q, ^{
1061 dispatch_data_t d = data;
1062 if (direction == DOP_DIR_READ && err) {
1063 d = NULL;
1064 } else if (direction == DOP_DIR_WRITE && !err) {
1065 d = NULL;
1066 }
1067 handler(true, d, err);
1068 _dispatch_io_data_release(data);
1069 });
1070 _dispatch_release(op);
1071 return;
1072 }
1073 // Finish operation init
1074 op->fd_entry = op->channel->fd_entry;
1075 _dispatch_fd_entry_retain(op->fd_entry);
1076 dispatch_group_enter(op->fd_entry->barrier_group);
1077 dispatch_disk_t disk = op->fd_entry->disk;
1078 if (!disk) {
1079 dispatch_stream_t stream = op->fd_entry->streams[direction];
1080 dispatch_async(stream->dq, ^{
1081 _dispatch_stream_enqueue_operation(stream, op, data);
1082 _dispatch_io_data_release(data);
1083 });
1084 } else {
1085 dispatch_async(disk->pick_queue, ^{
1086 _dispatch_disk_enqueue_operation(disk, op, data);
1087 _dispatch_io_data_release(data);
1088 });
1089 }
1090 }
1091
1092 static bool
1093 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1094 dispatch_queue_t tq, dispatch_data_t data)
1095 {
1096 // On stream queue or disk queue
1097 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1098 _dispatch_io_data_retain(data);
1099 op->data = data;
1100 int err = _dispatch_io_get_error(op, NULL, true);
1101 if (err) {
1102 op->err = err;
1103 // Final release
1104 _dispatch_release(op);
1105 return false;
1106 }
1107 if (op->params.interval) {
1108 dispatch_resume(_dispatch_operation_timer(tq, op));
1109 }
1110 return true;
1111 }
1112
1113 static dispatch_source_t
1114 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1115 {
1116 // On stream queue or pick queue
1117 if (op->timer) {
1118 return op->timer;
1119 }
1120 dispatch_source_t timer = dispatch_source_create(
1121 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1122 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1123 (int64_t)op->params.interval), op->params.interval, 0);
1124 dispatch_source_set_event_handler(timer, ^{
1125 // On stream queue or pick queue
1126 if (dispatch_source_testcancel(timer)) {
1127 // Do nothing. The operation has already completed
1128 return;
1129 }
1130 dispatch_op_flags_t flags = DOP_DEFAULT;
1131 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1132 // Deliver even if there is less data than the low-water mark
1133 flags |= DOP_DELIVER;
1134 }
1135 		// If the operation is active, don't deliver data
1136 if ((op->active) && (flags & DOP_DELIVER)) {
1137 op->flags = flags;
1138 } else {
1139 _dispatch_operation_deliver_data(op, flags);
1140 }
1141 });
1142 op->timer = timer;
1143 return op->timer;
1144 }
1145
1146 #pragma mark -
1147 #pragma mark dispatch_fd_entry_t
1148
1149 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1150 static void
1151 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1152 {
1153 	guardid_t guard = (uintptr_t)fd_entry;
1154 const unsigned int guard_flags = GUARD_CLOSE;
1155 int err, fd_flags = 0;
1156 _dispatch_io_syscall_switch_noerr(err,
1157 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1158 &fd_flags),
1159 case 0:
1160 fd_entry->guard_flags = guard_flags;
1161 fd_entry->orig_fd_flags = fd_flags;
1162 break;
1163 case EPERM: break;
1164 default: (void)dispatch_assume_zero(err); break;
1165 );
1166 }
1167
1168 static void
1169 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1170 {
1171 if (!fd_entry->guard_flags) {
1172 return;
1173 }
1174 	guardid_t guard = (uintptr_t)fd_entry;
1175 int err, fd_flags = fd_entry->orig_fd_flags;
1176 _dispatch_io_syscall_switch(err,
1177 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1178 &fd_flags),
1179 default: (void)dispatch_assume_zero(err); break;
1180 );
1181 }
1182 #else
1183 static inline void
1184 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1185 static inline void
1186 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1187 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1188
1189 static inline int
1190 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1191 int oflag, mode_t mode) {
1192 #if DISPATCH_USE_GUARDED_FD
1193 guardid_t guard = (uintptr_t)fd_entry;
1194 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1195 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1196 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1197 mode);
1198 if (fd != -1) {
1199 fd_entry->guard_flags = guard_flags;
1200 return fd;
1201 }
1202 errno = 0;
1203 #endif
1204 return open(path, oflag, mode);
1205 (void)fd_entry;
1206 }
1207
1208 static inline int
1209 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1210 #if DISPATCH_USE_GUARDED_FD
1211 if (fd_entry->guard_flags) {
1212 guardid_t guard = (uintptr_t)fd_entry;
1213 return guarded_close_np(fd, &guard);
1214 } else
1215 #endif
1216 {
1217 return close(fd);
1218 }
1219 (void)fd_entry;
1220 }
1221
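// Note: fd_entry lifetime is tracked by suspending/resuming its close queue
// rather than by a separate refcount; the close queue only runs (and tears
// the entry down) once every _dispatch_fd_entry_retain() has been balanced
// by a _dispatch_fd_entry_release().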
1222 static inline void
1223 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1224 dispatch_suspend(fd_entry->close_queue);
1225 }
1226
1227 static inline void
1228 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1229 dispatch_resume(fd_entry->close_queue);
1230 }
1231
1232 static void
1233 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1234 dispatch_fd_entry_init_callback_t completion_callback)
1235 {
1236 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1237 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1238 _dispatch_io_fds_lockq_init);
1239 dispatch_async(_dispatch_io_fds_lockq, ^{
1240 _dispatch_fd_debug("fd entry init", fd);
1241 dispatch_fd_entry_t fd_entry = NULL;
1242 // Check to see if there is an existing entry for the given fd
1243 uintptr_t hash = DIO_HASH(fd);
1244 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1245 if (fd_entry->fd == fd) {
1246 // Retain the fd_entry to ensure it cannot go away until the
1247 // stat() has completed
1248 _dispatch_fd_entry_retain(fd_entry);
1249 break;
1250 }
1251 }
1252 if (!fd_entry) {
1253 // If we did not find an existing entry, create one
1254 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1255 }
1256 dispatch_async(fd_entry->barrier_queue, ^{
1257 _dispatch_fd_debug("fd entry init completion", fd);
1258 completion_callback(fd_entry);
1259 // stat() is complete, release reference to fd_entry
1260 _dispatch_fd_entry_release(fd_entry);
1261 });
1262 });
1263 }
1264
1265 static dispatch_fd_entry_t
1266 _dispatch_fd_entry_create(dispatch_queue_t q)
1267 {
1268 dispatch_fd_entry_t fd_entry;
1269 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1270 fd_entry->close_queue = dispatch_queue_create(
1271 "com.apple.libdispatch-io.closeq", NULL);
1272 // Use target queue to ensure that no concurrent lookups are going on when
1273 // the close queue is running
1274 fd_entry->close_queue->do_targetq = q;
1275 _dispatch_retain(q);
1276 // Suspend the cleanup queue until closing
1277 _dispatch_fd_entry_retain(fd_entry);
1278 return fd_entry;
1279 }
1280
1281 static dispatch_fd_entry_t
1282 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1283 {
1284 // On fds lock queue
1285 _dispatch_fd_debug("fd entry create", fd);
1286 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1287 _dispatch_io_fds_lockq);
1288 fd_entry->fd = fd;
1289 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1290 fd_entry->barrier_queue = dispatch_queue_create(
1291 "com.apple.libdispatch-io.barrierq", NULL);
1292 fd_entry->barrier_group = dispatch_group_create();
1293 dispatch_async(fd_entry->barrier_queue, ^{
1294 _dispatch_fd_debug("fd entry stat", fd);
1295 int err, orig_flags, orig_nosigpipe = -1;
1296 struct stat st;
1297 _dispatch_io_syscall_switch(err,
1298 fstat(fd, &st),
1299 default: fd_entry->err = err; return;
1300 );
1301 fd_entry->stat.dev = st.st_dev;
1302 fd_entry->stat.mode = st.st_mode;
1303 _dispatch_fd_entry_guard(fd_entry);
1304 _dispatch_io_syscall_switch(err,
1305 orig_flags = fcntl(fd, F_GETFL),
1306 default: (void)dispatch_assume_zero(err); break;
1307 );
1308 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1309 if (S_ISFIFO(st.st_mode)) {
1310 _dispatch_io_syscall_switch(err,
1311 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1312 default: (void)dispatch_assume_zero(err); break;
1313 );
1314 if (orig_nosigpipe != -1) {
1315 _dispatch_io_syscall_switch(err,
1316 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1317 default:
1318 orig_nosigpipe = -1;
1319 (void)dispatch_assume_zero(err);
1320 break;
1321 );
1322 }
1323 }
1324 #endif
1325 if (S_ISREG(st.st_mode)) {
1326 if (orig_flags != -1) {
1327 _dispatch_io_syscall_switch(err,
1328 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1329 default:
1330 orig_flags = -1;
1331 (void)dispatch_assume_zero(err);
1332 break;
1333 );
1334 }
1335 int32_t dev = major(st.st_dev);
1336 // We have to get the disk on the global dev queue. The
1337 // barrier queue cannot continue until that is complete
1338 dispatch_suspend(fd_entry->barrier_queue);
1339 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1340 _dispatch_io_devs_lockq_init);
1341 dispatch_async(_dispatch_io_devs_lockq, ^{
1342 _dispatch_disk_init(fd_entry, dev);
1343 dispatch_resume(fd_entry->barrier_queue);
1344 });
1345 } else {
1346 if (orig_flags != -1) {
1347 _dispatch_io_syscall_switch(err,
1348 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1349 default:
1350 orig_flags = -1;
1351 (void)dispatch_assume_zero(err);
1352 break;
1353 );
1354 }
1355 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1356 _DISPATCH_QOS_CLASS_DEFAULT, false));
1357 }
1358 fd_entry->orig_flags = orig_flags;
1359 fd_entry->orig_nosigpipe = orig_nosigpipe;
1360 });
1361 // This is the first item run when the close queue is resumed, indicating
1362 // that all channels associated with this entry have been closed and that
1363 // all operations associated with this entry have been freed
1364 dispatch_async(fd_entry->close_queue, ^{
1365 if (!fd_entry->disk) {
1366 _dispatch_fd_debug("close queue fd_entry cleanup", fd);
1367 dispatch_op_direction_t dir;
1368 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1369 _dispatch_stream_dispose(fd_entry, dir);
1370 }
1371 } else {
1372 dispatch_disk_t disk = fd_entry->disk;
1373 dispatch_async(_dispatch_io_devs_lockq, ^{
1374 _dispatch_release(disk);
1375 });
1376 }
1377 // Remove this entry from the global fd list
1378 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1379 });
1380 // If there was a source associated with this stream, disposing of the
1381 // source cancels it and suspends the close queue. Freeing the fd_entry
1382 // structure must happen after the source cancel handler has finished
1383 dispatch_async(fd_entry->close_queue, ^{
1384 _dispatch_fd_debug("close queue release", fd);
1385 dispatch_release(fd_entry->close_queue);
1386 _dispatch_fd_debug("barrier queue release", fd);
1387 dispatch_release(fd_entry->barrier_queue);
1388 _dispatch_fd_debug("barrier group release", fd);
1389 dispatch_release(fd_entry->barrier_group);
1390 if (fd_entry->orig_flags != -1) {
1391 _dispatch_io_syscall(
1392 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1393 );
1394 }
1395 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1396 if (fd_entry->orig_nosigpipe != -1) {
1397 _dispatch_io_syscall(
1398 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1399 );
1400 }
1401 #endif
1402 _dispatch_fd_entry_unguard(fd_entry);
1403 if (fd_entry->convenience_channel) {
1404 fd_entry->convenience_channel->fd_entry = NULL;
1405 dispatch_release(fd_entry->convenience_channel);
1406 }
1407 free(fd_entry);
1408 });
1409 return fd_entry;
1410 }
1411
1412 static dispatch_fd_entry_t
1413 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1414 dev_t dev, mode_t mode)
1415 {
1416 // On devs lock queue
1417 _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
1418 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1419 path_data->channel->queue);
1420 if (S_ISREG(mode)) {
1421 _dispatch_disk_init(fd_entry, major(dev));
1422 } else {
1423 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1424 _DISPATCH_QOS_CLASS_DEFAULT, false));
1425 }
1426 fd_entry->fd = -1;
1427 fd_entry->orig_flags = -1;
1428 fd_entry->path_data = path_data;
1429 fd_entry->stat.dev = dev;
1430 fd_entry->stat.mode = mode;
1431 fd_entry->barrier_queue = dispatch_queue_create(
1432 "com.apple.libdispatch-io.barrierq", NULL);
1433 fd_entry->barrier_group = dispatch_group_create();
1434 // This is the first item run when the close queue is resumed, indicating
1435 // that the channel associated with this entry has been closed and that
1436 // all operations associated with this entry have been freed
1437 dispatch_async(fd_entry->close_queue, ^{
1438 _dispatch_fd_debug("close queue fd_entry cleanup", -1);
1439 if (!fd_entry->disk) {
1440 dispatch_op_direction_t dir;
1441 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1442 _dispatch_stream_dispose(fd_entry, dir);
1443 }
1444 }
1445 if (fd_entry->fd != -1) {
1446 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1447 }
1448 if (fd_entry->path_data->channel) {
1449 // If associated channel has not been released yet, mark it as
1450 // no longer having an fd_entry (for stop after close).
1451 // It is safe to modify channel since we are on close_queue with
1452 // target queue the channel queue
1453 fd_entry->path_data->channel->fd_entry = NULL;
1454 }
1455 });
1456 dispatch_async(fd_entry->close_queue, ^{
1457 _dispatch_fd_debug("close queue release", -1);
1458 dispatch_release(fd_entry->close_queue);
1459 dispatch_release(fd_entry->barrier_queue);
1460 dispatch_release(fd_entry->barrier_group);
1461 free(fd_entry->path_data);
1462 free(fd_entry);
1463 });
1464 return fd_entry;
1465 }
1466
1467 static int
1468 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1469 {
1470 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1471 return 0;
1472 }
1473 if (fd_entry->err) {
1474 return fd_entry->err;
1475 }
1476 int fd = -1;
1477 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1478 fd_entry->path_data->oflag | O_NONBLOCK;
1479 open:
1480 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1481 oflag, fd_entry->path_data->mode);
1482 if (fd == -1) {
1483 int err = errno;
1484 if (err == EINTR) {
1485 goto open;
1486 }
1487 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1488 return err;
1489 }
1490 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1491 // Lost the race with another open
1492 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1493 } else {
1494 channel->fd_actual = fd;
1495 }
1496 _dispatch_object_debug(channel, "%s", __func__);
1497 return 0;
1498 }
1499
1500 static void
1501 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1502 dispatch_io_t channel)
1503 {
1504 if (fd_entry->disk) {
1505 if (channel) {
1506 _dispatch_retain(channel);
1507 }
1508 _dispatch_fd_entry_retain(fd_entry);
1509 dispatch_async(fd_entry->disk->pick_queue, ^{
1510 _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1511 _dispatch_fd_entry_release(fd_entry);
1512 if (channel) {
1513 _dispatch_release(channel);
1514 }
1515 });
1516 } else {
1517 dispatch_op_direction_t direction;
1518 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1519 dispatch_stream_t stream = fd_entry->streams[direction];
1520 if (!stream) {
1521 continue;
1522 }
1523 if (channel) {
1524 _dispatch_retain(channel);
1525 }
1526 _dispatch_fd_entry_retain(fd_entry);
1527 dispatch_async(stream->dq, ^{
1528 _dispatch_stream_cleanup_operations(stream, channel);
1529 _dispatch_fd_entry_release(fd_entry);
1530 if (channel) {
1531 _dispatch_release(channel);
1532 }
1533 });
1534 }
1535 }
1536 }
1537
1538 #pragma mark -
1539 #pragma mark dispatch_stream_t/dispatch_disk_t
1540
1541 static void
1542 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1543 {
1544 dispatch_op_direction_t direction;
1545 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1546 dispatch_stream_t stream;
1547 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1548 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1549 NULL);
1550 dispatch_set_context(stream->dq, stream);
1551 _dispatch_retain(tq);
1552 stream->dq->do_targetq = tq;
1553 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1554 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1555 fd_entry->streams[direction] = stream;
1556 }
1557 }
1558
1559 static void
1560 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1561 dispatch_op_direction_t direction)
1562 {
1563 // On close queue
1564 dispatch_stream_t stream = fd_entry->streams[direction];
1565 if (!stream) {
1566 return;
1567 }
1568 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1569 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1570 if (stream->source) {
1571 // Balanced by source cancel handler:
1572 _dispatch_fd_entry_retain(fd_entry);
1573 dispatch_source_cancel(stream->source);
1574 dispatch_resume(stream->source);
1575 dispatch_release(stream->source);
1576 }
1577 dispatch_set_context(stream->dq, NULL);
1578 dispatch_release(stream->dq);
1579 free(stream);
1580 }
1581
1582 static void
1583 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1584 {
1585 // On devs lock queue
1586 dispatch_disk_t disk;
1587 // Check to see if there is an existing entry for the given device
1588 uintptr_t hash = DIO_HASH(dev);
1589 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1590 if (disk->dev == dev) {
1591 _dispatch_retain(disk);
1592 goto out;
1593 }
1594 }
1595 // Otherwise create a new entry
1596 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1597 disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1598 sizeof(struct dispatch_disk_s) +
1599 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1600 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1601 disk->do_xref_cnt = -1;
1602 disk->advise_list_depth = pending_reqs_depth;
1603 disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1604 false);
1605 disk->dev = dev;
1606 TAILQ_INIT(&disk->operations);
1607 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1608 char label[45];
1609 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1610 disk->pick_queue = dispatch_queue_create(label, NULL);
1611 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1612 out:
1613 fd_entry->disk = disk;
1614 TAILQ_INIT(&fd_entry->stream_ops);
1615 }
1616
1617 void
1618 _dispatch_disk_dispose(dispatch_disk_t disk)
1619 {
1620 uintptr_t hash = DIO_HASH(disk->dev);
1621 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1622 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1623 size_t i;
1624 for (i=0; i<disk->advise_list_depth; ++i) {
1625 dispatch_assert(!disk->advise_list[i]);
1626 }
1627 dispatch_release(disk->pick_queue);
1628 }
1629
1630 #pragma mark -
1631 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1632
1633 static inline bool
1634 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1635 {
1636 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1637 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1638 }
1639
1640 static void
1641 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1642 dispatch_operation_t op, dispatch_data_t data)
1643 {
1644 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1645 return;
1646 }
1647 _dispatch_object_debug(op, "%s", __func__);
1648 bool no_ops = !_dispatch_stream_operation_avail(stream);
1649 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1650 if (no_ops) {
1651 dispatch_async_f(stream->dq, stream->dq,
1652 _dispatch_stream_queue_handler);
1653 }
1654 }
1655
1656 static void
1657 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1658 dispatch_data_t data)
1659 {
1660 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1661 return;
1662 }
1663 _dispatch_object_debug(op, "%s", __func__);
1664 if (op->params.type == DISPATCH_IO_STREAM) {
1665 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1666 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1667 }
1668 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1669 } else {
1670 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1671 }
1672 _dispatch_disk_handler(disk);
1673 }
1674
1675 static void
1676 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1677 dispatch_operation_t op)
1678 {
1679 // On stream queue
1680 _dispatch_object_debug(op, "%s", __func__);
1681 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1682 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1683 if (op == stream->op) {
1684 stream->op = NULL;
1685 }
1686 if (op->timer) {
1687 dispatch_source_cancel(op->timer);
1688 }
1689 // Final release will deliver any pending data
1690 _dispatch_release(op);
1691 }
1692
1693 static void
1694 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1695 {
1696 // On pick queue
1697 _dispatch_object_debug(op, "%s", __func__);
1698 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1699 // Current request is always the last op returned
1700 if (disk->cur_rq == op) {
1701 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1702 operation_list);
1703 }
1704 if (op->params.type == DISPATCH_IO_STREAM) {
1705 // Check if there are other pending stream operations behind it
1706 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1707 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1708 if (op_next) {
1709 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1710 }
1711 }
1712 TAILQ_REMOVE(&disk->operations, op, operation_list);
1713 if (op->timer) {
1714 dispatch_source_cancel(op->timer);
1715 }
1716 // Final release will deliver any pending data
1717 _dispatch_release(op);
1718 }
1719
1720 static dispatch_operation_t
1721 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1722 dispatch_operation_t op)
1723 {
1724 // On stream queue
1725 if (!op) {
1726 // On the first run through, pick the first operation
1727 if (!_dispatch_stream_operation_avail(stream)) {
1728 return op;
1729 }
1730 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1731 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1732 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1733 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1734 }
1735 return op;
1736 }
1737 if (op->params.type == DISPATCH_IO_STREAM) {
1738 // Stream operations need to be serialized, so continue the current
1739 // operation until it is finished
1740 return op;
1741 }
1742 // Get the next random operation (round-robin)
1743 if (op->params.type == DISPATCH_IO_RANDOM) {
1744 op = TAILQ_NEXT(op, operation_list);
1745 if (!op) {
1746 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1747 }
1748 return op;
1749 }
1750 return NULL;
1751 }
1752
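/*
 * The disk picker treats the operations list as a ring: starting just after
 * cur_rq (or at the head on the first pass), it returns the first operation
 * that is not already active, which may be cur_rq itself. If every queued
 * operation is active, or the list is empty, it returns NULL.
 */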
1753 static dispatch_operation_t
1754 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1755 {
1756 // On pick queue
1757 dispatch_operation_t op;
1758 if (!TAILQ_EMPTY(&disk->operations)) {
1759 if (disk->cur_rq == NULL) {
1760 op = TAILQ_FIRST(&disk->operations);
1761 } else {
1762 op = disk->cur_rq;
1763 do {
1764 op = TAILQ_NEXT(op, operation_list);
1765 if (!op) {
1766 op = TAILQ_FIRST(&disk->operations);
1767 }
1768 // TODO: more involved picking algorithm rdar://problem/8780312
1769 } while (op->active && op != disk->cur_rq);
1770 }
1771 if (!op->active) {
1772 disk->cur_rq = op;
1773 return op;
1774 }
1775 }
1776 return NULL;
1777 }
1778
1779 static void
1780 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1781 dispatch_io_t channel)
1782 {
1783 // On stream queue
1784 dispatch_operation_t op, tmp;
1785 typeof(*stream->operations) *operations;
1786 operations = &stream->operations[DISPATCH_IO_RANDOM];
1787 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1788 if (!channel || op->channel == channel) {
1789 _dispatch_stream_complete_operation(stream, op);
1790 }
1791 }
1792 operations = &stream->operations[DISPATCH_IO_STREAM];
1793 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1794 if (!channel || op->channel == channel) {
1795 _dispatch_stream_complete_operation(stream, op);
1796 }
1797 }
1798 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1799 dispatch_suspend(stream->source);
1800 stream->source_running = false;
1801 }
1802 }
1803
1804 static void
1805 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1806 {
1807 // On pick queue
1808 dispatch_operation_t op, tmp;
1809 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1810 if (!channel || op->channel == channel) {
1811 _dispatch_disk_complete_operation(disk, op);
1812 }
1813 }
1814 }
1815
1816 #pragma mark -
1817 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1818
1819 static dispatch_source_t
1820 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1821 {
1822 // On stream queue
1823 if (stream->source) {
1824 return stream->source;
1825 }
1826 dispatch_fd_t fd = op->fd_entry->fd;
1827 _dispatch_fd_debug("stream source create", fd);
1828 dispatch_source_t source = NULL;
1829 if (op->direction == DOP_DIR_READ) {
1830 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1831 (uintptr_t)fd, 0, stream->dq);
1832 } else if (op->direction == DOP_DIR_WRITE) {
1833 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1834 (uintptr_t)fd, 0, stream->dq);
1835 } else {
1836 dispatch_assert(op->direction < DOP_DIR_MAX);
1837 return NULL;
1838 }
1839 dispatch_set_context(source, stream);
1840 dispatch_source_set_event_handler_f(source,
1841 _dispatch_stream_source_handler);
1842 // Close queue must not run user cleanup handlers until sources are fully
1843 // unregistered
1844 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1845 dispatch_source_set_cancel_handler(source, ^{
1846 _dispatch_fd_debug("stream source cancel", fd);
1847 dispatch_resume(close_queue);
1848 });
1849 stream->source = source;
1850 return stream->source;
1851 }
1852
1853 static void
1854 _dispatch_stream_source_handler(void *ctx)
1855 {
1856 // On stream queue
1857 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1858 dispatch_suspend(stream->source);
1859 stream->source_running = false;
1860 return _dispatch_stream_handler(stream);
1861 }
1862
1863 static void
1864 _dispatch_stream_queue_handler(void *ctx)
1865 {
1866 // On stream queue
1867 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1868 if (!stream) {
1869 // _dispatch_stream_dispose has been called
1870 return;
1871 }
1872 return _dispatch_stream_handler(stream);
1873 }
1874
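/*
 * Non-blocking (stream) I/O uses the source created above as a readiness
 * gate: when _dispatch_operation_perform hits EAGAIN and returns
 * DISPATCH_OP_RESUME, the handler below resumes the source and waits for
 * the next readiness event; _dispatch_stream_source_handler re-suspends it
 * before re-entering this handler.
 */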
1875 static void
1876 _dispatch_stream_handler(void *ctx)
1877 {
1878 // On stream queue
1879 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1880 dispatch_operation_t op;
1881 pick:
1882 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1883 if (!op) {
1884 _dispatch_debug("no operation found: stream %p", stream);
1885 return;
1886 }
1887 int err = _dispatch_io_get_error(op, NULL, true);
1888 if (err) {
1889 op->err = err;
1890 _dispatch_stream_complete_operation(stream, op);
1891 goto pick;
1892 }
1893 stream->op = op;
1894 _dispatch_fd_debug("stream handler", op->fd_entry->fd);
1895 dispatch_fd_entry_t fd_entry = op->fd_entry;
1896 _dispatch_fd_entry_retain(fd_entry);
1897 // For performance analysis
1898 if (!op->total && dispatch_io_defaults.initial_delivery) {
1899 // Empty delivery to signal the start of the operation
1900 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1901 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1902 }
1903 // TODO: perform on the operation target queue to get correct priority
1904 int result = _dispatch_operation_perform(op);
1905 dispatch_op_flags_t flags = ~0u;
1906 switch (result) {
1907 case DISPATCH_OP_DELIVER:
1908 flags = DOP_DEFAULT;
1909 // Fall through
1910 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1911 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1912 DOP_DEFAULT;
1913 _dispatch_operation_deliver_data(op, flags);
1914 // Fall through
1915 case DISPATCH_OP_COMPLETE:
1916 if (flags != DOP_DEFAULT) {
1917 _dispatch_stream_complete_operation(stream, op);
1918 }
1919 if (_dispatch_stream_operation_avail(stream)) {
1920 dispatch_async_f(stream->dq, stream->dq,
1921 _dispatch_stream_queue_handler);
1922 }
1923 break;
1924 case DISPATCH_OP_COMPLETE_RESUME:
1925 _dispatch_stream_complete_operation(stream, op);
1926 // Fall through
1927 case DISPATCH_OP_RESUME:
1928 if (_dispatch_stream_operation_avail(stream)) {
1929 stream->source_running = true;
1930 dispatch_resume(_dispatch_stream_source(stream, op));
1931 }
1932 break;
1933 case DISPATCH_OP_ERR:
1934 _dispatch_stream_cleanup_operations(stream, op->channel);
1935 break;
1936 case DISPATCH_OP_FD_ERR:
1937 _dispatch_fd_entry_retain(fd_entry);
1938 dispatch_async(fd_entry->barrier_queue, ^{
1939 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1940 _dispatch_fd_entry_release(fd_entry);
1941 });
1942 break;
1943 default:
1944 break;
1945 }
1946 _dispatch_fd_entry_release(fd_entry);
1947 return;
1948 }
1949
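/*
 * The advise_list is a small ring buffer of in-flight operations:
 *   req_idx    - slot of the operation to be performed next
 *   free_idx   - first free slot, where newly picked operations are parked
 *   advise_idx - next slot whose operation still needs a read-ahead advise
 * The handler below fills the ring from the pick queue and dispatches
 * _dispatch_disk_perform for the operation at req_idx, keeping at most one
 * I/O in flight per disk (io_active).
 */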
1950 static void
1951 _dispatch_disk_handler(void *ctx)
1952 {
1953 // On pick queue
1954 dispatch_disk_t disk = (dispatch_disk_t)ctx;
1955 if (disk->io_active) {
1956 return;
1957 }
1958 _dispatch_fd_debug("disk handler", -1);
1959 dispatch_operation_t op;
1960 size_t i = disk->free_idx, j = disk->req_idx;
1961 if (j <= i) {
1962 j += disk->advise_list_depth;
1963 }
1964 while (i <= j) {
1965 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1966 (op = _dispatch_disk_pick_next_operation(disk))) {
1967 int err = _dispatch_io_get_error(op, NULL, true);
1968 if (err) {
1969 op->err = err;
1970 _dispatch_disk_complete_operation(disk, op);
1971 continue;
1972 }
1973 _dispatch_retain(op);
1974 disk->advise_list[i%disk->advise_list_depth] = op;
1975 op->active = true;
1976 _dispatch_object_debug(op, "%s", __func__);
1977 } else {
1978 // No more operations to get
1979 break;
1980 }
1981 i++;
1982 }
1983 disk->free_idx = (i%disk->advise_list_depth);
1984 op = disk->advise_list[disk->req_idx];
1985 if (op) {
1986 disk->io_active = true;
1987 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1988 }
1989 }
1990
1991 static void
1992 _dispatch_disk_perform(void *ctxt)
1993 {
1994 dispatch_disk_t disk = ctxt;
1995 size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1996 _dispatch_fd_debug("disk perform", -1);
1997 dispatch_operation_t op;
1998 size_t i = disk->advise_idx, j = disk->free_idx;
1999 if (j <= i) {
2000 j += disk->advise_list_depth;
2001 }
2002 do {
2003 op = disk->advise_list[i%disk->advise_list_depth];
2004 if (!op) {
2005 // Nothing more to advise, must be at free_idx
2006 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2007 break;
2008 }
2009 if (op->direction == DOP_DIR_WRITE) {
2010 // TODO: preallocate writes ? rdar://problem/9032172
2011 continue;
2012 }
2013 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2014 op->channel)) {
2015 continue;
2016 }
2017 // For performance analysis
2018 if (!op->total && dispatch_io_defaults.initial_delivery) {
2019 // Empty delivery to signal the start of the operation
2020 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2021 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2022 }
2023 // Advise two chunks if the list only has one element and this is the
2024 // first advise on the operation
2025 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2026 !op->advise_offset) {
2027 chunk_size *= 2;
2028 }
2029 _dispatch_operation_advise(op, chunk_size);
2030 } while (++i < j);
2031 disk->advise_idx = i%disk->advise_list_depth;
2032 op = disk->advise_list[disk->req_idx];
2033 int result = _dispatch_operation_perform(op);
2034 disk->advise_list[disk->req_idx] = NULL;
2035 disk->req_idx = (disk->req_idx + 1) % disk->advise_list_depth;
2036 dispatch_async(disk->pick_queue, ^{
2037 switch (result) {
2038 case DISPATCH_OP_DELIVER:
2039 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2040 break;
2041 case DISPATCH_OP_COMPLETE:
2042 _dispatch_disk_complete_operation(disk, op);
2043 break;
2044 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2045 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2046 _dispatch_disk_complete_operation(disk, op);
2047 break;
2048 case DISPATCH_OP_ERR:
2049 _dispatch_disk_cleanup_operations(disk, op->channel);
2050 break;
2051 case DISPATCH_OP_FD_ERR:
2052 _dispatch_disk_cleanup_operations(disk, NULL);
2053 break;
2054 default:
2055 dispatch_assert(result);
2056 break;
2057 }
2058 op->active = false;
2059 disk->io_active = false;
2060 _dispatch_disk_handler(disk);
2061 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2062 // released at the very end, since it might hold the last reference to
2063 // the disk
2064 _dispatch_release(op);
2065 });
2066 }
2067
2068 #pragma mark -
2069 #pragma mark dispatch_operation_perform
2070
2071 static void
2072 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2073 {
2074 int err;
2075 struct radvisory advise;
2076 // No point in issuing a read advise for the next chunk if we are already
2077 // more than a chunk ahead of the bytes actually read
2078 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2079 chunk_size + PAGE_SIZE)) {
2080 return;
2081 }
2082 _dispatch_object_debug(op, "%s", __func__);
2083 advise.ra_count = (int)chunk_size;
2084 if (!op->advise_offset) {
2085 op->advise_offset = op->offset;
2086 // If this is the first time through, align the advised range to a
2087 // page boundary
2088 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2089 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2090 }
2091 advise.ra_offset = op->advise_offset;
2092 op->advise_offset += advise.ra_count;
2093 _dispatch_io_syscall_switch(err,
2094 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2095 case EFBIG: break; // advised past the end of the file rdar://10415691
2096 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2097 // TODO: set disk status on error
2098 default: (void)dispatch_assume_zero(err); break;
2099 );
2100 }
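/*
 * F_RDADVISE is Darwin-specific. On platforms without it, a roughly
 * equivalent read-ahead hint would be (a sketch, not used by this file):
 *
 *	(void)posix_fadvise(op->fd_entry->fd, advise.ra_offset,
 *			advise.ra_count, POSIX_FADV_WILLNEED);
 */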
2101
2102 static int
2103 _dispatch_operation_perform(dispatch_operation_t op)
2104 {
2105 int err = _dispatch_io_get_error(op, NULL, true);
2106 if (err) {
2107 goto error;
2108 }
2109 _dispatch_object_debug(op, "%s", __func__);
2110 if (!op->buf) {
2111 size_t max_buf_siz = op->params.high;
2112 size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
2113 if (op->direction == DOP_DIR_READ) {
2114 // If necessary, create a buffer for the ongoing operation, large
2115 // enough to fit chunk_pages but at most high-water
2116 size_t data_siz = dispatch_data_get_size(op->data);
2117 if (data_siz) {
2118 dispatch_assert(data_siz < max_buf_siz);
2119 max_buf_siz -= data_siz;
2120 }
2121 if (max_buf_siz > chunk_siz) {
2122 max_buf_siz = chunk_siz;
2123 }
2124 if (op->length < SIZE_MAX) {
2125 op->buf_siz = op->length - op->total;
2126 if (op->buf_siz > max_buf_siz) {
2127 op->buf_siz = max_buf_siz;
2128 }
2129 } else {
2130 op->buf_siz = max_buf_siz;
2131 }
2132 op->buf = valloc(op->buf_siz);
2133 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2134 } else if (op->direction == DOP_DIR_WRITE) {
2135 // Always write the first data piece; if it is smaller than a
2136 // chunk, accumulate further data pieces until the chunk size is reached
2137 if (chunk_siz > max_buf_siz) {
2138 chunk_siz = max_buf_siz;
2139 }
2140 op->buf_siz = 0;
2141 dispatch_data_apply(op->data,
2142 ^(dispatch_data_t region DISPATCH_UNUSED,
2143 size_t offset DISPATCH_UNUSED,
2144 const void* buf DISPATCH_UNUSED, size_t len) {
2145 size_t siz = op->buf_siz + len;
2146 if (!op->buf_siz || siz <= chunk_siz) {
2147 op->buf_siz = siz;
2148 }
2149 return (bool)(siz < chunk_siz);
2150 });
2151 if (op->buf_siz > max_buf_siz) {
2152 op->buf_siz = max_buf_siz;
2153 }
2154 dispatch_data_t d;
2155 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2156 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2157 NULL);
2158 _dispatch_io_data_release(d);
2159 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2160 }
2161 }
2162 if (op->fd_entry->fd == -1) {
2163 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2164 if (err) {
2165 goto error;
2166 }
2167 }
2168 void *buf = op->buf + op->buf_len;
2169 size_t len = op->buf_siz - op->buf_len;
2170 off_t off = (off_t)((size_t)op->offset + op->total);
2171 ssize_t processed = -1;
2172 syscall:
2173 if (op->direction == DOP_DIR_READ) {
2174 if (op->params.type == DISPATCH_IO_STREAM) {
2175 processed = read(op->fd_entry->fd, buf, len);
2176 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2177 processed = pread(op->fd_entry->fd, buf, len, off);
2178 }
2179 } else if (op->direction == DOP_DIR_WRITE) {
2180 if (op->params.type == DISPATCH_IO_STREAM) {
2181 processed = write(op->fd_entry->fd, buf, len);
2182 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2183 processed = pwrite(op->fd_entry->fd, buf, len, off);
2184 }
2185 }
2186 // Encountered an error on the file descriptor
2187 if (processed == -1) {
2188 err = errno;
2189 if (err == EINTR) {
2190 goto syscall;
2191 }
2192 goto error;
2193 }
2194 // EOF is indicated by two handler invocations
2195 if (processed == 0) {
2196 _dispatch_fd_debug("EOF", op->fd_entry->fd);
2197 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2198 }
2199 op->buf_len += (size_t)processed;
2200 op->total += (size_t)processed;
2201 if (op->total == op->length) {
2202 // Finished processing all the bytes requested by the operation
2203 return DISPATCH_OP_COMPLETE;
2204 } else {
2205 // Deliver data only if we satisfy the filters
2206 return DISPATCH_OP_DELIVER;
2207 }
2208 error:
2209 if (err == EAGAIN) {
2210 // For disk based files with blocking I/O we should never get EAGAIN
2211 dispatch_assert(!op->fd_entry->disk);
2212 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2213 if (op->direction == DOP_DIR_READ && op->total &&
2214 op->channel == op->fd_entry->convenience_channel) {
2215 // Convenience read with available data completes on EAGAIN
2216 return DISPATCH_OP_COMPLETE_RESUME;
2217 }
2218 return DISPATCH_OP_RESUME;
2219 }
2220 op->err = err;
2221 switch (err) {
2222 case ECANCELED:
2223 return DISPATCH_OP_ERR;
2224 case EBADF:
2225 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2226 return DISPATCH_OP_FD_ERR;
2227 default:
2228 return DISPATCH_OP_COMPLETE;
2229 }
2230 }
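/*
 * The DISPATCH_OP_* results above ultimately surface through the public
 * channel API. A minimal usage sketch (illustrative only; "fd" and "q" are
 * assumed to exist):
 *
 *	dispatch_io_t ch = dispatch_io_create(DISPATCH_IO_STREAM, fd, q,
 *			^(int error) { (void)error; close(fd); });
 *	dispatch_io_read(ch, 0, SIZE_MAX, q,
 *			^(bool done, dispatch_data_t data, int error) {
 *		// data arrives per DISPATCH_OP_DELIVER*, done/error reflect
 *		// DISPATCH_OP_COMPLETE and any op->err recorded above
 *	});
 */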
2231
2232 static void
2233 _dispatch_operation_deliver_data(dispatch_operation_t op,
2234 dispatch_op_flags_t flags)
2235 {
2236 // Called from the stream or pick queue, or when the op is finalized
2237 dispatch_data_t data = NULL;
2238 int err = 0;
2239 size_t undelivered = op->undelivered + op->buf_len;
2240 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2241 (op->flags & DOP_DELIVER);
2242 op->flags = DOP_DEFAULT;
2243 if (!deliver) {
2244 // Don't deliver data until low water mark has been reached
2245 if (undelivered >= op->params.low) {
2246 deliver = true;
2247 } else if (op->buf_len < op->buf_siz) {
2248 // Request buffer is not yet used up
2249 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2250 return;
2251 }
2252 } else {
2253 err = op->err;
2254 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2255 err = ECANCELED;
2256 op->err = err;
2257 }
2258 }
2259 // Deliver data or buffer used up
2260 if (op->direction == DOP_DIR_READ) {
2261 if (op->buf_len) {
2262 void *buf = op->buf;
2263 data = dispatch_data_create(buf, op->buf_len, NULL,
2264 DISPATCH_DATA_DESTRUCTOR_FREE);
2265 op->buf = NULL;
2266 op->buf_len = 0;
2267 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2268 _dispatch_io_data_release(op->data);
2269 _dispatch_io_data_release(data);
2270 data = d;
2271 } else {
2272 data = op->data;
2273 }
2274 op->data = deliver ? dispatch_data_empty : data;
2275 } else if (op->direction == DOP_DIR_WRITE) {
2276 if (deliver) {
2277 data = dispatch_data_create_subrange(op->data, op->buf_len,
2278 op->length);
2279 }
2280 if (op->buf_data && op->buf_len == op->buf_siz) {
2281 _dispatch_io_data_release(op->buf_data);
2282 op->buf_data = NULL;
2283 op->buf = NULL;
2284 op->buf_len = 0;
2285 // Trim newly written buffer from head of unwritten data
2286 dispatch_data_t d;
2287 if (deliver) {
2288 _dispatch_io_data_retain(data);
2289 d = data;
2290 } else {
2291 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2292 op->length);
2293 }
2294 _dispatch_io_data_release(op->data);
2295 op->data = d;
2296 }
2297 } else {
2298 dispatch_assert(op->direction < DOP_DIR_MAX);
2299 return;
2300 }
2301 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2302 op->undelivered = undelivered;
2303 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2304 return;
2305 }
2306 op->undelivered = 0;
2307 _dispatch_object_debug(op, "%s", __func__);
2308 _dispatch_fd_debug("deliver data", op->fd_entry->fd);
2309 dispatch_op_direction_t direction = op->direction;
2310 dispatch_io_handler_t handler = op->handler;
2311 #if DISPATCH_IO_DEBUG
2312 int fd = op->fd_entry->fd;
2313 #endif
2314 dispatch_fd_entry_t fd_entry = op->fd_entry;
2315 _dispatch_fd_entry_retain(fd_entry);
2316 dispatch_io_t channel = op->channel;
2317 _dispatch_retain(channel);
2318 // Note that data delivery may occur after the operation is freed
2319 dispatch_async(op->op_q, ^{
2320 bool done = (flags & DOP_DONE);
2321 dispatch_data_t d = data;
2322 if (done) {
2323 if (direction == DOP_DIR_READ && err) {
2324 if (dispatch_data_get_size(d)) {
2325 _dispatch_fd_debug("IO handler invoke", fd);
2326 handler(false, d, 0);
2327 }
2328 d = NULL;
2329 } else if (direction == DOP_DIR_WRITE && !err) {
2330 d = NULL;
2331 }
2332 }
2333 _dispatch_fd_debug("IO handler invoke", fd);
2334 handler(done, d, err);
2335 _dispatch_release(channel);
2336 _dispatch_fd_entry_release(fd_entry);
2337 _dispatch_io_data_release(data);
2338 });
2339 }
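/*
 * Delivery frequency is governed by the channel's water marks: data is
 * buffered until at least params.low undelivered bytes have accumulated (or
 * the operation finishes), and each read is sized to at most params.high.
 * These correspond to the public dispatch_io_set_low_water() and
 * dispatch_io_set_high_water() settings.
 */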
2340
2341 #pragma mark -
2342 #pragma mark dispatch_io_debug
2343
2344 static size_t
2345 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2346 {
2347 dispatch_queue_t target = channel->do_targetq;
2348 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2349 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2350 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2351 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2352 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2353 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2354 channel->fd_entry, channel->queue, target && target->dq_label ?
2355 target->dq_label : "", target, channel->barrier_queue,
2356 channel->barrier_group, channel->err, channel->params.low,
2357 channel->params.high, channel->params.interval_flags &
2358 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2359 channel->params.interval);
2360 }
2361
2362 size_t
2363 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2364 {
2365 size_t offset = 0;
2366 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2367 dx_kind(channel), channel);
2368 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2369 bufsiz - offset);
2370 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2371 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2372 return offset;
2373 }
2374
2375 static size_t
2376 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2377 size_t bufsiz)
2378 {
2379 dispatch_queue_t target = op->do_targetq;
2380 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2381 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2382 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2383 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2384 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2385 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2386 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2387 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2388 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2389 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2390 target->dq_label : "", target, op->offset, op->length, op->total,
2391 op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2392 op->params.high, op->params.interval_flags &
2393 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2394 }
2395
2396 size_t
2397 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2398 {
2399 size_t offset = 0;
2400 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2401 dx_kind(op), op);
2402 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2403 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2404 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2405 return offset;
2406 }