]> git.saurik.com Git - apple/libdispatch.git/blob - src/io.c
libdispatch-339.92.1.tar.gz
[apple/libdispatch.git] / src / io.c
1 /*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21 #include "internal.h"
22
23 #ifndef DISPATCH_IO_DEBUG
24 #define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25 #endif
26
27 #if DISPATCH_IO_DEBUG
28 #define _dispatch_fd_debug(msg, fd, args...) \
29 _dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
30 #else
31 #define _dispatch_fd_debug(msg, fd, args...)
32 #endif
33
34 #if USE_OBJC
35 #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
36 #define _dispatch_io_data_release(x) _dispatch_objc_release(x)
37 #else
38 #define _dispatch_io_data_retain(x) dispatch_retain(x)
39 #define _dispatch_io_data_release(x) dispatch_release(x)
40 #endif
41
42 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43
44 DISPATCH_EXPORT DISPATCH_NOTHROW
45 void _dispatch_iocntl(uint32_t param, uint64_t value);
46
47 static dispatch_operation_t _dispatch_operation_create(
48 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49 size_t length, dispatch_data_t data, dispatch_queue_t queue,
50 dispatch_io_handler_t handler);
51 static void _dispatch_operation_enqueue(dispatch_operation_t op,
52 dispatch_op_direction_t direction, dispatch_data_t data);
53 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54 dispatch_operation_t op);
55 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58 dispatch_fd_entry_init_callback_t completion_callback);
59 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60 uintptr_t hash);
61 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64 dispatch_io_t channel);
65 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66 dispatch_io_t channel);
67 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68 dispatch_queue_t tq);
69 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70 dispatch_op_direction_t direction);
71 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73 dispatch_operation_t operation, dispatch_data_t data);
74 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75 dispatch_operation_t operation, dispatch_data_t data);
76 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77 dispatch_io_t channel);
78 static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79 dispatch_io_t channel);
80 static void _dispatch_stream_source_handler(void *ctx);
81 static void _dispatch_stream_queue_handler(void *ctx);
82 static void _dispatch_stream_handler(void *ctx);
83 static void _dispatch_disk_handler(void *ctx);
84 static void _dispatch_disk_perform(void *ctxt);
85 static void _dispatch_operation_advise(dispatch_operation_t op,
86 size_t chunk_size);
87 static int _dispatch_operation_perform(dispatch_operation_t op);
88 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89 dispatch_op_flags_t flags);
90
91 // Macros to wrap syscalls which return -1 on error, and retry on EINTR
92 #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
93 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
94 case EINTR: continue; \
95 __VA_ARGS__ \
96 } \
97 break; \
98 } while (1)
99 #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
100 _dispatch_io_syscall_switch_noerr(__err, __syscall, \
101 case 0: break; \
102 __VA_ARGS__ \
103 ); \
104 } while (0)
105 #define _dispatch_io_syscall(__syscall) do { int __err; \
106 _dispatch_io_syscall_switch(__err, __syscall); \
107 } while (0)
108
109 enum {
110 DISPATCH_OP_COMPLETE = 1,
111 DISPATCH_OP_DELIVER,
112 DISPATCH_OP_DELIVER_AND_COMPLETE,
113 DISPATCH_OP_COMPLETE_RESUME,
114 DISPATCH_OP_RESUME,
115 DISPATCH_OP_ERR,
116 DISPATCH_OP_FD_ERR,
117 };
118
119 #define _dispatch_io_Block_copy(x) \
120 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121
122 #pragma mark -
123 #pragma mark dispatch_io_hashtables
124
125 #if TARGET_OS_EMBEDDED
126 #define DIO_HASH_SIZE 64u // must be a power of two
127 #else
128 #define DIO_HASH_SIZE 256u // must be a power of two
129 #endif
130 #define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
131
132 // Global hashtable of dev_t -> disk_s mappings
133 DISPATCH_CACHELINE_ALIGN
134 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
135 // Global hashtable of fd -> fd_entry_s mappings
136 DISPATCH_CACHELINE_ALIGN
137 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
138
139 static dispatch_once_t _dispatch_io_devs_lockq_pred;
140 static dispatch_queue_t _dispatch_io_devs_lockq;
141 static dispatch_queue_t _dispatch_io_fds_lockq;
142
143 static void
144 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
145 {
146 _dispatch_io_fds_lockq = dispatch_queue_create(
147 "com.apple.libdispatch-io.fd_lockq", NULL);
148 unsigned int i;
149 for (i = 0; i < DIO_HASH_SIZE; i++) {
150 TAILQ_INIT(&_dispatch_io_fds[i]);
151 }
152 }
153
154 static void
155 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
156 {
157 _dispatch_io_devs_lockq = dispatch_queue_create(
158 "com.apple.libdispatch-io.dev_lockq", NULL);
159 unsigned int i;
160 for (i = 0; i < DIO_HASH_SIZE; i++) {
161 TAILQ_INIT(&_dispatch_io_devs[i]);
162 }
163 }
164
165 #pragma mark -
166 #pragma mark dispatch_io_defaults
167
168 enum {
169 DISPATCH_IOCNTL_CHUNK_PAGES = 1,
170 DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
171 DISPATCH_IOCNTL_INITIAL_DELIVERY,
172 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
173 };
174
175 static struct dispatch_io_defaults_s {
176 size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
177 bool initial_delivery;
178 } dispatch_io_defaults = {
179 .chunk_pages = DIO_MAX_CHUNK_PAGES,
180 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
181 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
182 };
183
184 #define _dispatch_iocntl_set_default(p, v) do { \
185 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
186 } while (0)
187
188 void
189 _dispatch_iocntl(uint32_t param, uint64_t value)
190 {
191 switch (param) {
192 case DISPATCH_IOCNTL_CHUNK_PAGES:
193 _dispatch_iocntl_set_default(chunk_pages, value);
194 break;
195 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
196 _dispatch_iocntl_set_default(low_water_chunks, value);
197 break;
198 case DISPATCH_IOCNTL_INITIAL_DELIVERY:
199 _dispatch_iocntl_set_default(initial_delivery, value);
200 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
201 _dispatch_iocntl_set_default(max_pending_io_reqs, value);
202 break;
203 }
204 }
205
206 #pragma mark -
207 #pragma mark dispatch_io_t
208
209 static dispatch_io_t
210 _dispatch_io_create(dispatch_io_type_t type)
211 {
212 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
213 sizeof(struct dispatch_io_s));
214 channel->do_next = DISPATCH_OBJECT_LISTLESS;
215 channel->do_targetq = _dispatch_get_root_queue(0, true);
216 channel->params.type = type;
217 channel->params.high = SIZE_MAX;
218 channel->params.low = dispatch_io_defaults.low_water_chunks *
219 dispatch_io_defaults.chunk_pages * PAGE_SIZE;
220 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
221 NULL);
222 return channel;
223 }
224
225 static void
226 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
227 dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
228 {
229 // Enqueue the cleanup handler on the suspended close queue
230 if (cleanup_handler) {
231 _dispatch_retain(queue);
232 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
233 dispatch_async(queue, ^{
234 _dispatch_fd_debug("cleanup handler invoke", -1);
235 cleanup_handler(err);
236 });
237 _dispatch_release(queue);
238 });
239 }
240 if (fd_entry) {
241 channel->fd_entry = fd_entry;
242 dispatch_retain(fd_entry->barrier_queue);
243 dispatch_retain(fd_entry->barrier_group);
244 channel->barrier_queue = fd_entry->barrier_queue;
245 channel->barrier_group = fd_entry->barrier_group;
246 } else {
247 // Still need to create a barrier queue, since all operations go
248 // through it
249 channel->barrier_queue = dispatch_queue_create(
250 "com.apple.libdispatch-io.barrierq", NULL);
251 channel->barrier_group = dispatch_group_create();
252 }
253 }
254
255 void
256 _dispatch_io_dispose(dispatch_io_t channel)
257 {
258 _dispatch_object_debug(channel, "%s", __func__);
259 if (channel->fd_entry &&
260 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
261 if (channel->fd_entry->path_data) {
262 // This modification is safe since path_data->channel is checked
263 // only on close_queue (which is still suspended at this point)
264 channel->fd_entry->path_data->channel = NULL;
265 }
266 // Cleanup handlers will only run when all channels related to this
267 // fd are complete
268 _dispatch_fd_entry_release(channel->fd_entry);
269 }
270 if (channel->queue) {
271 dispatch_release(channel->queue);
272 }
273 if (channel->barrier_queue) {
274 dispatch_release(channel->barrier_queue);
275 }
276 if (channel->barrier_group) {
277 dispatch_release(channel->barrier_group);
278 }
279 }
280
281 static int
282 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
283 {
284 int err = 0;
285 if (S_ISDIR(mode)) {
286 err = EISDIR;
287 } else if (channel->params.type == DISPATCH_IO_RANDOM &&
288 (S_ISFIFO(mode) || S_ISSOCK(mode))) {
289 err = ESPIPE;
290 }
291 return err;
292 }
293
294 static int
295 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
296 bool ignore_closed)
297 {
298 // On _any_ queue
299 int err;
300 if (op) {
301 channel = op->channel;
302 }
303 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
304 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
305 err = ECANCELED;
306 } else {
307 err = 0;
308 }
309 } else {
310 err = op ? op->fd_entry->err : channel->err;
311 }
312 return err;
313 }
314
315 #pragma mark -
316 #pragma mark dispatch_io_channels
317
318 dispatch_io_t
319 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
320 dispatch_queue_t queue, void (^cleanup_handler)(int))
321 {
322 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
323 return NULL;
324 }
325 _dispatch_fd_debug("io create", fd);
326 dispatch_io_t channel = _dispatch_io_create(type);
327 channel->fd = fd;
328 channel->fd_actual = fd;
329 dispatch_suspend(channel->queue);
330 _dispatch_retain(queue);
331 _dispatch_retain(channel);
332 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
333 // On barrier queue
334 int err = fd_entry->err;
335 if (!err) {
336 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
337 }
338 if (!err && type == DISPATCH_IO_RANDOM) {
339 off_t f_ptr;
340 _dispatch_io_syscall_switch_noerr(err,
341 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
342 case 0: channel->f_ptr = f_ptr; break;
343 default: (void)dispatch_assume_zero(err); break;
344 );
345 }
346 channel->err = err;
347 _dispatch_fd_entry_retain(fd_entry);
348 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
349 dispatch_resume(channel->queue);
350 _dispatch_object_debug(channel, "%s", __func__);
351 _dispatch_release(channel);
352 _dispatch_release(queue);
353 });
354 _dispatch_object_debug(channel, "%s", __func__);
355 return channel;
356 }
357
358 dispatch_io_t
359 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
360 dispatch_queue_t queue, void *context,
361 void (*cleanup_handler)(void *context, int error))
362 {
363 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
364 ^(int error){ cleanup_handler(context, error); });
365 }
366
367 dispatch_io_t
368 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
369 int oflag, mode_t mode, dispatch_queue_t queue,
370 void (^cleanup_handler)(int error))
371 {
372 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
373 !(path && *path == '/')) {
374 return NULL;
375 }
376 size_t pathlen = strlen(path);
377 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
378 if (!path_data) {
379 return NULL;
380 }
381 _dispatch_fd_debug("io create with path %s", -1, path);
382 dispatch_io_t channel = _dispatch_io_create(type);
383 channel->fd = -1;
384 channel->fd_actual = -1;
385 path_data->channel = channel;
386 path_data->oflag = oflag;
387 path_data->mode = mode;
388 path_data->pathlen = pathlen;
389 memcpy(path_data->path, path, pathlen + 1);
390 _dispatch_retain(queue);
391 _dispatch_retain(channel);
392 dispatch_async(channel->queue, ^{
393 int err = 0;
394 struct stat st;
395 _dispatch_io_syscall_switch_noerr(err,
396 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
397 (path_data->oflag & O_SYMLINK) == O_SYMLINK ?
398 lstat(path_data->path, &st) : stat(path_data->path, &st),
399 case 0:
400 err = _dispatch_io_validate_type(channel, st.st_mode);
401 break;
402 default:
403 if ((path_data->oflag & O_CREAT) &&
404 (*(path_data->path + path_data->pathlen - 1) != '/')) {
405 // Check parent directory
406 char *c = strrchr(path_data->path, '/');
407 dispatch_assert(c);
408 *c = 0;
409 int perr;
410 _dispatch_io_syscall_switch_noerr(perr,
411 stat(path_data->path, &st),
412 case 0:
413 // Since the parent directory exists, open() will
414 // create a regular file after the fd_entry has
415 // been filled in
416 st.st_mode = S_IFREG;
417 err = 0;
418 break;
419 );
420 *c = '/';
421 }
422 break;
423 );
424 channel->err = err;
425 if (err) {
426 free(path_data);
427 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
428 _dispatch_release(channel);
429 _dispatch_release(queue);
430 return;
431 }
432 dispatch_suspend(channel->queue);
433 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
434 _dispatch_io_devs_lockq_init);
435 dispatch_async(_dispatch_io_devs_lockq, ^{
436 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
437 path_data, st.st_dev, st.st_mode);
438 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
439 dispatch_resume(channel->queue);
440 _dispatch_object_debug(channel, "%s", __func__);
441 _dispatch_release(channel);
442 _dispatch_release(queue);
443 });
444 });
445 _dispatch_object_debug(channel, "%s", __func__);
446 return channel;
447 }
448
449 dispatch_io_t
450 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
451 int oflag, mode_t mode, dispatch_queue_t queue, void *context,
452 void (*cleanup_handler)(void *context, int error))
453 {
454 return dispatch_io_create_with_path(type, path, oflag, mode, queue,
455 !cleanup_handler ? NULL :
456 ^(int error){ cleanup_handler(context, error); });
457 }
458
459 dispatch_io_t
460 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
461 dispatch_queue_t queue, void (^cleanup_handler)(int error))
462 {
463 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
464 return NULL;
465 }
466 _dispatch_fd_debug("io create with io %p", -1, in_channel);
467 dispatch_io_t channel = _dispatch_io_create(type);
468 dispatch_suspend(channel->queue);
469 _dispatch_retain(queue);
470 _dispatch_retain(channel);
471 _dispatch_retain(in_channel);
472 dispatch_async(in_channel->queue, ^{
473 int err0 = _dispatch_io_get_error(NULL, in_channel, false);
474 if (err0) {
475 channel->err = err0;
476 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
477 dispatch_resume(channel->queue);
478 _dispatch_release(channel);
479 _dispatch_release(in_channel);
480 _dispatch_release(queue);
481 return;
482 }
483 dispatch_async(in_channel->barrier_queue, ^{
484 int err = _dispatch_io_get_error(NULL, in_channel, false);
485 // If there is no error, the fd_entry for the in_channel is valid.
486 // Since we are running on in_channel's queue, the fd_entry has been
487 // fully resolved and will stay valid for the duration of this block
488 if (!err) {
489 err = in_channel->err;
490 if (!err) {
491 err = in_channel->fd_entry->err;
492 }
493 }
494 if (!err) {
495 err = _dispatch_io_validate_type(channel,
496 in_channel->fd_entry->stat.mode);
497 }
498 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
499 off_t f_ptr;
500 _dispatch_io_syscall_switch_noerr(err,
501 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
502 case 0: channel->f_ptr = f_ptr; break;
503 default: (void)dispatch_assume_zero(err); break;
504 );
505 }
506 channel->err = err;
507 if (err) {
508 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
509 dispatch_resume(channel->queue);
510 _dispatch_release(channel);
511 _dispatch_release(in_channel);
512 _dispatch_release(queue);
513 return;
514 }
515 if (in_channel->fd == -1) {
516 // in_channel was created from path
517 channel->fd = -1;
518 channel->fd_actual = -1;
519 mode_t mode = in_channel->fd_entry->stat.mode;
520 dev_t dev = in_channel->fd_entry->stat.dev;
521 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
522 in_channel->fd_entry->path_data->pathlen + 1;
523 dispatch_io_path_data_t path_data = malloc(path_data_len);
524 memcpy(path_data, in_channel->fd_entry->path_data,
525 path_data_len);
526 path_data->channel = channel;
527 // lockq_io_devs is known to already exist
528 dispatch_async(_dispatch_io_devs_lockq, ^{
529 dispatch_fd_entry_t fd_entry;
530 fd_entry = _dispatch_fd_entry_create_with_path(path_data,
531 dev, mode);
532 _dispatch_io_init(channel, fd_entry, queue, 0,
533 cleanup_handler);
534 dispatch_resume(channel->queue);
535 _dispatch_release(channel);
536 _dispatch_release(queue);
537 });
538 } else {
539 dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
540 channel->fd = in_channel->fd;
541 channel->fd_actual = in_channel->fd_actual;
542 _dispatch_fd_entry_retain(fd_entry);
543 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
544 dispatch_resume(channel->queue);
545 _dispatch_release(channel);
546 _dispatch_release(queue);
547 }
548 _dispatch_release(in_channel);
549 _dispatch_object_debug(channel, "%s", __func__);
550 });
551 });
552 _dispatch_object_debug(channel, "%s", __func__);
553 return channel;
554 }
555
556 dispatch_io_t
557 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
558 dispatch_queue_t queue, void *context,
559 void (*cleanup_handler)(void *context, int error))
560 {
561 return dispatch_io_create_with_io(type, in_channel, queue,
562 !cleanup_handler ? NULL :
563 ^(int error){ cleanup_handler(context, error); });
564 }
565
566 #pragma mark -
567 #pragma mark dispatch_io_accessors
568
569 void
570 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
571 {
572 _dispatch_retain(channel);
573 dispatch_async(channel->queue, ^{
574 _dispatch_fd_debug("io set high water", channel->fd);
575 if (channel->params.low > high_water) {
576 channel->params.low = high_water;
577 }
578 channel->params.high = high_water ? high_water : 1;
579 _dispatch_release(channel);
580 });
581 }
582
583 void
584 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
585 {
586 _dispatch_retain(channel);
587 dispatch_async(channel->queue, ^{
588 _dispatch_fd_debug("io set low water", channel->fd);
589 if (channel->params.high < low_water) {
590 channel->params.high = low_water ? low_water : 1;
591 }
592 channel->params.low = low_water;
593 _dispatch_release(channel);
594 });
595 }
596
597 void
598 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
599 unsigned long flags)
600 {
601 _dispatch_retain(channel);
602 dispatch_async(channel->queue, ^{
603 _dispatch_fd_debug("io set interval", channel->fd);
604 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
605 channel->params.interval_flags = flags;
606 _dispatch_release(channel);
607 });
608 }
609
610 void
611 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
612 {
613 _dispatch_retain(dq);
614 _dispatch_retain(channel);
615 dispatch_async(channel->queue, ^{
616 dispatch_queue_t prev_dq = channel->do_targetq;
617 channel->do_targetq = dq;
618 _dispatch_release(prev_dq);
619 _dispatch_object_debug(channel, "%s", __func__);
620 _dispatch_release(channel);
621 });
622 }
623
624 dispatch_fd_t
625 dispatch_io_get_descriptor(dispatch_io_t channel)
626 {
627 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
628 return -1;
629 }
630 dispatch_fd_t fd = channel->fd_actual;
631 if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
632 !_dispatch_io_get_error(NULL, channel, false)) {
633 dispatch_fd_entry_t fd_entry = channel->fd_entry;
634 (void)_dispatch_fd_entry_open(fd_entry, channel);
635 }
636 return channel->fd_actual;
637 }
638
639 #pragma mark -
640 #pragma mark dispatch_io_operations
641
642 static void
643 _dispatch_io_stop(dispatch_io_t channel)
644 {
645 _dispatch_fd_debug("io stop", channel->fd);
646 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
647 _dispatch_retain(channel);
648 dispatch_async(channel->queue, ^{
649 dispatch_async(channel->barrier_queue, ^{
650 _dispatch_object_debug(channel, "%s", __func__);
651 dispatch_fd_entry_t fd_entry = channel->fd_entry;
652 if (fd_entry) {
653 _dispatch_fd_debug("io stop cleanup", channel->fd);
654 _dispatch_fd_entry_cleanup_operations(fd_entry, channel);
655 if (!(channel->atomic_flags & DIO_CLOSED)) {
656 channel->fd_entry = NULL;
657 _dispatch_fd_entry_release(fd_entry);
658 }
659 } else if (channel->fd != -1) {
660 // Stop after close, need to check if fd_entry still exists
661 _dispatch_retain(channel);
662 dispatch_async(_dispatch_io_fds_lockq, ^{
663 _dispatch_object_debug(channel, "%s", __func__);
664 _dispatch_fd_debug("io stop after close cleanup",
665 channel->fd);
666 dispatch_fd_entry_t fdi;
667 uintptr_t hash = DIO_HASH(channel->fd);
668 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
669 if (fdi->fd == channel->fd) {
670 _dispatch_fd_entry_cleanup_operations(fdi, channel);
671 break;
672 }
673 }
674 _dispatch_release(channel);
675 });
676 }
677 _dispatch_release(channel);
678 });
679 });
680 }
681
682 void
683 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
684 {
685 if (flags & DISPATCH_IO_STOP) {
686 // Don't stop an already stopped channel
687 if (channel->atomic_flags & DIO_STOPPED) {
688 return;
689 }
690 return _dispatch_io_stop(channel);
691 }
692 // Don't close an already closed or stopped channel
693 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
694 return;
695 }
696 _dispatch_retain(channel);
697 dispatch_async(channel->queue, ^{
698 dispatch_async(channel->barrier_queue, ^{
699 _dispatch_object_debug(channel, "%s", __func__);
700 _dispatch_fd_debug("io close", channel->fd);
701 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
702 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
703 relaxed);
704 dispatch_fd_entry_t fd_entry = channel->fd_entry;
705 if (fd_entry) {
706 if (!fd_entry->path_data) {
707 channel->fd_entry = NULL;
708 }
709 _dispatch_fd_entry_release(fd_entry);
710 }
711 }
712 _dispatch_release(channel);
713 });
714 });
715 }
716
717 void
718 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
719 {
720 _dispatch_retain(channel);
721 dispatch_async(channel->queue, ^{
722 dispatch_queue_t io_q = channel->do_targetq;
723 dispatch_queue_t barrier_queue = channel->barrier_queue;
724 dispatch_group_t barrier_group = channel->barrier_group;
725 dispatch_async(barrier_queue, ^{
726 dispatch_suspend(barrier_queue);
727 dispatch_group_notify(barrier_group, io_q, ^{
728 _dispatch_object_debug(channel, "%s", __func__);
729 _dispatch_thread_setspecific(dispatch_io_key, channel);
730 barrier();
731 _dispatch_thread_setspecific(dispatch_io_key, NULL);
732 dispatch_resume(barrier_queue);
733 _dispatch_release(channel);
734 });
735 });
736 });
737 }
738
739 void
740 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
741 dispatch_function_t barrier)
742 {
743 return dispatch_io_barrier(channel, ^{ barrier(context); });
744 }
745
746 void
747 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
748 dispatch_queue_t queue, dispatch_io_handler_t handler)
749 {
750 _dispatch_retain(channel);
751 _dispatch_retain(queue);
752 dispatch_async(channel->queue, ^{
753 dispatch_operation_t op;
754 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
755 length, dispatch_data_empty, queue, handler);
756 if (op) {
757 dispatch_queue_t barrier_q = channel->barrier_queue;
758 dispatch_async(barrier_q, ^{
759 _dispatch_operation_enqueue(op, DOP_DIR_READ,
760 dispatch_data_empty);
761 });
762 }
763 _dispatch_release(channel);
764 _dispatch_release(queue);
765 });
766 }
767
768 void
769 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
770 dispatch_queue_t queue, void *context,
771 dispatch_io_handler_function_t handler)
772 {
773 return dispatch_io_read(channel, offset, length, queue,
774 ^(bool done, dispatch_data_t d, int error){
775 handler(context, done, d, error);
776 });
777 }
778
779 void
780 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
781 dispatch_queue_t queue, dispatch_io_handler_t handler)
782 {
783 _dispatch_io_data_retain(data);
784 _dispatch_retain(channel);
785 _dispatch_retain(queue);
786 dispatch_async(channel->queue, ^{
787 dispatch_operation_t op;
788 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
789 dispatch_data_get_size(data), data, queue, handler);
790 if (op) {
791 dispatch_queue_t barrier_q = channel->barrier_queue;
792 dispatch_async(barrier_q, ^{
793 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
794 _dispatch_io_data_release(data);
795 });
796 } else {
797 _dispatch_io_data_release(data);
798 }
799 _dispatch_release(channel);
800 _dispatch_release(queue);
801 });
802 }
803
804 void
805 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
806 dispatch_queue_t queue, void *context,
807 dispatch_io_handler_function_t handler)
808 {
809 return dispatch_io_write(channel, offset, data, queue,
810 ^(bool done, dispatch_data_t d, int error){
811 handler(context, done, d, error);
812 });
813 }
814
815 void
816 dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
817 void (^handler)(dispatch_data_t, int))
818 {
819 _dispatch_retain(queue);
820 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
821 // On barrier queue
822 if (fd_entry->err) {
823 int err = fd_entry->err;
824 dispatch_async(queue, ^{
825 _dispatch_fd_debug("convenience handler invoke", fd);
826 handler(dispatch_data_empty, err);
827 });
828 _dispatch_release(queue);
829 return;
830 }
831 // Safe to access fd_entry on barrier queue
832 dispatch_io_t channel = fd_entry->convenience_channel;
833 if (!channel) {
834 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
835 channel->fd = fd;
836 channel->fd_actual = fd;
837 channel->fd_entry = fd_entry;
838 dispatch_retain(fd_entry->barrier_queue);
839 dispatch_retain(fd_entry->barrier_group);
840 channel->barrier_queue = fd_entry->barrier_queue;
841 channel->barrier_group = fd_entry->barrier_group;
842 fd_entry->convenience_channel = channel;
843 }
844 __block dispatch_data_t deliver_data = dispatch_data_empty;
845 __block int err = 0;
846 dispatch_async(fd_entry->close_queue, ^{
847 dispatch_async(queue, ^{
848 _dispatch_fd_debug("convenience handler invoke", fd);
849 handler(deliver_data, err);
850 _dispatch_io_data_release(deliver_data);
851 });
852 _dispatch_release(queue);
853 });
854 dispatch_operation_t op =
855 _dispatch_operation_create(DOP_DIR_READ, channel, 0,
856 length, dispatch_data_empty,
857 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
858 false), ^(bool done, dispatch_data_t data, int error) {
859 if (data) {
860 data = dispatch_data_create_concat(deliver_data, data);
861 _dispatch_io_data_release(deliver_data);
862 deliver_data = data;
863 }
864 if (done) {
865 err = error;
866 }
867 });
868 if (op) {
869 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
870 }
871 });
872 }
873
874 void
875 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
876 void *context, void (*handler)(void *, dispatch_data_t, int))
877 {
878 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
879 handler(context, d, error);
880 });
881 }
882
883 void
884 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
885 void (^handler)(dispatch_data_t, int))
886 {
887 _dispatch_io_data_retain(data);
888 _dispatch_retain(queue);
889 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
890 // On barrier queue
891 if (fd_entry->err) {
892 int err = fd_entry->err;
893 dispatch_async(queue, ^{
894 _dispatch_fd_debug("convenience handler invoke", fd);
895 handler(NULL, err);
896 });
897 _dispatch_release(queue);
898 return;
899 }
900 // Safe to access fd_entry on barrier queue
901 dispatch_io_t channel = fd_entry->convenience_channel;
902 if (!channel) {
903 channel = _dispatch_io_create(DISPATCH_IO_STREAM);
904 channel->fd = fd;
905 channel->fd_actual = fd;
906 channel->fd_entry = fd_entry;
907 dispatch_retain(fd_entry->barrier_queue);
908 dispatch_retain(fd_entry->barrier_group);
909 channel->barrier_queue = fd_entry->barrier_queue;
910 channel->barrier_group = fd_entry->barrier_group;
911 fd_entry->convenience_channel = channel;
912 }
913 __block dispatch_data_t deliver_data = NULL;
914 __block int err = 0;
915 dispatch_async(fd_entry->close_queue, ^{
916 dispatch_async(queue, ^{
917 _dispatch_fd_debug("convenience handler invoke", fd);
918 handler(deliver_data, err);
919 if (deliver_data) {
920 _dispatch_io_data_release(deliver_data);
921 }
922 });
923 _dispatch_release(queue);
924 });
925 dispatch_operation_t op =
926 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
927 dispatch_data_get_size(data), data,
928 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
929 false), ^(bool done, dispatch_data_t d, int error) {
930 if (done) {
931 if (d) {
932 _dispatch_io_data_retain(d);
933 deliver_data = d;
934 }
935 err = error;
936 }
937 });
938 if (op) {
939 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
940 }
941 _dispatch_io_data_release(data);
942 });
943 }
944
945 void
946 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
947 void *context, void (*handler)(void *, dispatch_data_t, int))
948 {
949 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
950 handler(context, d, error);
951 });
952 }
953
954 #pragma mark -
955 #pragma mark dispatch_operation_t
956
957 static dispatch_operation_t
958 _dispatch_operation_create(dispatch_op_direction_t direction,
959 dispatch_io_t channel, off_t offset, size_t length,
960 dispatch_data_t data, dispatch_queue_t queue,
961 dispatch_io_handler_t handler)
962 {
963 // On channel queue
964 dispatch_assert(direction < DOP_DIR_MAX);
965 _dispatch_fd_debug("operation create", channel->fd);
966 #if DISPATCH_IO_DEBUG
967 int fd = channel->fd;
968 #endif
969 // Safe to call _dispatch_io_get_error() with channel->fd_entry since
970 // that can only be NULL if atomic_flags are set rdar://problem/8362514
971 int err = _dispatch_io_get_error(NULL, channel, false);
972 if (err || !length) {
973 _dispatch_io_data_retain(data);
974 _dispatch_retain(queue);
975 dispatch_async(channel->barrier_queue, ^{
976 dispatch_async(queue, ^{
977 dispatch_data_t d = data;
978 if (direction == DOP_DIR_READ && err) {
979 d = NULL;
980 } else if (direction == DOP_DIR_WRITE && !err) {
981 d = NULL;
982 }
983 _dispatch_fd_debug("IO handler invoke", fd);
984 handler(true, d, err);
985 _dispatch_io_data_release(data);
986 });
987 _dispatch_release(queue);
988 });
989 return NULL;
990 }
991 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
992 sizeof(struct dispatch_operation_s));
993 op->do_next = DISPATCH_OBJECT_LISTLESS;
994 op->do_xref_cnt = -1; // operation object is not exposed externally
995 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
996 op->op_q->do_targetq = queue;
997 _dispatch_retain(queue);
998 op->active = false;
999 op->direction = direction;
1000 op->offset = offset + channel->f_ptr;
1001 op->length = length;
1002 op->handler = _dispatch_io_Block_copy(handler);
1003 _dispatch_retain(channel);
1004 op->channel = channel;
1005 op->params = channel->params;
1006 // Take a snapshot of the priority of the channel queue. The actual I/O
1007 // for this operation will be performed at this priority
1008 dispatch_queue_t targetq = op->channel->do_targetq;
1009 while (fastpath(targetq->do_targetq)) {
1010 targetq = targetq->do_targetq;
1011 }
1012 op->do_targetq = targetq;
1013 _dispatch_object_debug(op, "%s", __func__);
1014 return op;
1015 }
1016
1017 void
1018 _dispatch_operation_dispose(dispatch_operation_t op)
1019 {
1020 _dispatch_object_debug(op, "%s", __func__);
1021 // Deliver the data if there's any
1022 if (op->fd_entry) {
1023 _dispatch_operation_deliver_data(op, DOP_DONE);
1024 dispatch_group_leave(op->fd_entry->barrier_group);
1025 _dispatch_fd_entry_release(op->fd_entry);
1026 }
1027 if (op->channel) {
1028 _dispatch_release(op->channel);
1029 }
1030 if (op->timer) {
1031 dispatch_release(op->timer);
1032 }
1033 // For write operations, op->buf is owned by op->buf_data
1034 if (op->buf && op->direction == DOP_DIR_READ) {
1035 free(op->buf);
1036 }
1037 if (op->buf_data) {
1038 _dispatch_io_data_release(op->buf_data);
1039 }
1040 if (op->data) {
1041 _dispatch_io_data_release(op->data);
1042 }
1043 if (op->op_q) {
1044 dispatch_release(op->op_q);
1045 }
1046 Block_release(op->handler);
1047 }
1048
1049 static void
1050 _dispatch_operation_enqueue(dispatch_operation_t op,
1051 dispatch_op_direction_t direction, dispatch_data_t data)
1052 {
1053 // Called from the barrier queue
1054 _dispatch_io_data_retain(data);
1055 // If channel is closed or stopped, then call the handler immediately
1056 int err = _dispatch_io_get_error(NULL, op->channel, false);
1057 if (err) {
1058 dispatch_io_handler_t handler = op->handler;
1059 dispatch_async(op->op_q, ^{
1060 dispatch_data_t d = data;
1061 if (direction == DOP_DIR_READ && err) {
1062 d = NULL;
1063 } else if (direction == DOP_DIR_WRITE && !err) {
1064 d = NULL;
1065 }
1066 handler(true, d, err);
1067 _dispatch_io_data_release(data);
1068 });
1069 _dispatch_release(op);
1070 return;
1071 }
1072 // Finish operation init
1073 op->fd_entry = op->channel->fd_entry;
1074 _dispatch_fd_entry_retain(op->fd_entry);
1075 dispatch_group_enter(op->fd_entry->barrier_group);
1076 dispatch_disk_t disk = op->fd_entry->disk;
1077 if (!disk) {
1078 dispatch_stream_t stream = op->fd_entry->streams[direction];
1079 dispatch_async(stream->dq, ^{
1080 _dispatch_stream_enqueue_operation(stream, op, data);
1081 _dispatch_io_data_release(data);
1082 });
1083 } else {
1084 dispatch_async(disk->pick_queue, ^{
1085 _dispatch_disk_enqueue_operation(disk, op, data);
1086 _dispatch_io_data_release(data);
1087 });
1088 }
1089 }
1090
1091 static bool
1092 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1093 dispatch_queue_t tq, dispatch_data_t data)
1094 {
1095 // On stream queue or disk queue
1096 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1097 _dispatch_io_data_retain(data);
1098 op->data = data;
1099 int err = _dispatch_io_get_error(op, NULL, true);
1100 if (err) {
1101 op->err = err;
1102 // Final release
1103 _dispatch_release(op);
1104 return false;
1105 }
1106 if (op->params.interval) {
1107 dispatch_resume(_dispatch_operation_timer(tq, op));
1108 }
1109 return true;
1110 }
1111
1112 static dispatch_source_t
1113 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1114 {
1115 // On stream queue or pick queue
1116 if (op->timer) {
1117 return op->timer;
1118 }
1119 dispatch_source_t timer = dispatch_source_create(
1120 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1121 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1122 (int64_t)op->params.interval), op->params.interval, 0);
1123 dispatch_source_set_event_handler(timer, ^{
1124 // On stream queue or pick queue
1125 if (dispatch_source_testcancel(timer)) {
1126 // Do nothing. The operation has already completed
1127 return;
1128 }
1129 dispatch_op_flags_t flags = DOP_DEFAULT;
1130 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1131 // Deliver even if there is less data than the low-water mark
1132 flags |= DOP_DELIVER;
1133 }
1134 // If the operation is active, dont deliver data
1135 if ((op->active) && (flags & DOP_DELIVER)) {
1136 op->flags = flags;
1137 } else {
1138 _dispatch_operation_deliver_data(op, flags);
1139 }
1140 });
1141 op->timer = timer;
1142 return op->timer;
1143 }
1144
1145 #pragma mark -
1146 #pragma mark dispatch_fd_entry_t
1147
1148 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1149 static void
1150 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1151 {
1152 guardid_t guard = fd_entry;
1153 const unsigned int guard_flags = GUARD_CLOSE;
1154 int err, fd_flags = 0;
1155 _dispatch_io_syscall_switch_noerr(err,
1156 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1157 &fd_flags),
1158 case 0:
1159 fd_entry->guard_flags = guard_flags;
1160 fd_entry->orig_fd_flags = fd_flags;
1161 break;
1162 case EPERM: break;
1163 default: (void)dispatch_assume_zero(err); break;
1164 );
1165 }
1166
1167 static void
1168 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1169 {
1170 if (!fd_entry->guard_flags) {
1171 return;
1172 }
1173 guardid_t guard = fd_entry;
1174 int err, fd_flags = fd_entry->orig_fd_flags;
1175 _dispatch_io_syscall_switch(err,
1176 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1177 &fd_flags),
1178 default: (void)dispatch_assume_zero(err); break;
1179 );
1180 }
1181 #else
1182 static inline void
1183 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1184 static inline void
1185 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1186 #endif // DISPATCH_USE_GUARDED_FD
1187
1188 static inline int
1189 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1190 int oflag, mode_t mode) {
1191 #if DISPATCH_USE_GUARDED_FD
1192 guardid_t guard = (uintptr_t)fd_entry;
1193 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1194 GUARD_SOCKET_IPC | GUARD_FILEPORT;
1195 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1196 mode);
1197 if (fd != -1) {
1198 fd_entry->guard_flags = guard_flags;
1199 return fd;
1200 }
1201 errno = 0;
1202 #endif
1203 return open(path, oflag, mode);
1204 (void)fd_entry;
1205 }
1206
1207 static inline int
1208 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1209 #if DISPATCH_USE_GUARDED_FD
1210 if (fd_entry->guard_flags) {
1211 guardid_t guard = (uintptr_t)fd_entry;
1212 return guarded_close_np(fd, &guard);
1213 } else
1214 #endif
1215 {
1216 return close(fd);
1217 }
1218 (void)fd_entry;
1219 }
1220
1221 static inline void
1222 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1223 dispatch_suspend(fd_entry->close_queue);
1224 }
1225
1226 static inline void
1227 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1228 dispatch_resume(fd_entry->close_queue);
1229 }
1230
1231 static void
1232 _dispatch_fd_entry_init_async(dispatch_fd_t fd,
1233 dispatch_fd_entry_init_callback_t completion_callback)
1234 {
1235 static dispatch_once_t _dispatch_io_fds_lockq_pred;
1236 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1237 _dispatch_io_fds_lockq_init);
1238 dispatch_async(_dispatch_io_fds_lockq, ^{
1239 _dispatch_fd_debug("fd entry init", fd);
1240 dispatch_fd_entry_t fd_entry = NULL;
1241 // Check to see if there is an existing entry for the given fd
1242 uintptr_t hash = DIO_HASH(fd);
1243 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1244 if (fd_entry->fd == fd) {
1245 // Retain the fd_entry to ensure it cannot go away until the
1246 // stat() has completed
1247 _dispatch_fd_entry_retain(fd_entry);
1248 break;
1249 }
1250 }
1251 if (!fd_entry) {
1252 // If we did not find an existing entry, create one
1253 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1254 }
1255 dispatch_async(fd_entry->barrier_queue, ^{
1256 _dispatch_fd_debug("fd entry init completion", fd);
1257 completion_callback(fd_entry);
1258 // stat() is complete, release reference to fd_entry
1259 _dispatch_fd_entry_release(fd_entry);
1260 });
1261 });
1262 }
1263
1264 static dispatch_fd_entry_t
1265 _dispatch_fd_entry_create(dispatch_queue_t q)
1266 {
1267 dispatch_fd_entry_t fd_entry;
1268 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1269 fd_entry->close_queue = dispatch_queue_create(
1270 "com.apple.libdispatch-io.closeq", NULL);
1271 // Use target queue to ensure that no concurrent lookups are going on when
1272 // the close queue is running
1273 fd_entry->close_queue->do_targetq = q;
1274 _dispatch_retain(q);
1275 // Suspend the cleanup queue until closing
1276 _dispatch_fd_entry_retain(fd_entry);
1277 return fd_entry;
1278 }
1279
1280 static dispatch_fd_entry_t
1281 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1282 {
1283 // On fds lock queue
1284 _dispatch_fd_debug("fd entry create", fd);
1285 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1286 _dispatch_io_fds_lockq);
1287 fd_entry->fd = fd;
1288 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1289 fd_entry->barrier_queue = dispatch_queue_create(
1290 "com.apple.libdispatch-io.barrierq", NULL);
1291 fd_entry->barrier_group = dispatch_group_create();
1292 dispatch_async(fd_entry->barrier_queue, ^{
1293 _dispatch_fd_debug("fd entry stat", fd);
1294 int err, orig_flags, orig_nosigpipe = -1;
1295 struct stat st;
1296 _dispatch_io_syscall_switch(err,
1297 fstat(fd, &st),
1298 default: fd_entry->err = err; return;
1299 );
1300 fd_entry->stat.dev = st.st_dev;
1301 fd_entry->stat.mode = st.st_mode;
1302 _dispatch_fd_entry_guard(fd_entry);
1303 _dispatch_io_syscall_switch(err,
1304 orig_flags = fcntl(fd, F_GETFL),
1305 default: (void)dispatch_assume_zero(err); break;
1306 );
1307 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1308 if (S_ISFIFO(st.st_mode)) {
1309 _dispatch_io_syscall_switch(err,
1310 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1311 default: (void)dispatch_assume_zero(err); break;
1312 );
1313 if (orig_nosigpipe != -1) {
1314 _dispatch_io_syscall_switch(err,
1315 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1316 default:
1317 orig_nosigpipe = -1;
1318 (void)dispatch_assume_zero(err);
1319 break;
1320 );
1321 }
1322 }
1323 #endif
1324 if (S_ISREG(st.st_mode)) {
1325 if (orig_flags != -1) {
1326 _dispatch_io_syscall_switch(err,
1327 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1328 default:
1329 orig_flags = -1;
1330 (void)dispatch_assume_zero(err);
1331 break;
1332 );
1333 }
1334 int32_t dev = major(st.st_dev);
1335 // We have to get the disk on the global dev queue. The
1336 // barrier queue cannot continue until that is complete
1337 dispatch_suspend(fd_entry->barrier_queue);
1338 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1339 _dispatch_io_devs_lockq_init);
1340 dispatch_async(_dispatch_io_devs_lockq, ^{
1341 _dispatch_disk_init(fd_entry, dev);
1342 dispatch_resume(fd_entry->barrier_queue);
1343 });
1344 } else {
1345 if (orig_flags != -1) {
1346 _dispatch_io_syscall_switch(err,
1347 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1348 default:
1349 orig_flags = -1;
1350 (void)dispatch_assume_zero(err);
1351 break;
1352 );
1353 }
1354 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1355 DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1356 }
1357 fd_entry->orig_flags = orig_flags;
1358 fd_entry->orig_nosigpipe = orig_nosigpipe;
1359 });
1360 // This is the first item run when the close queue is resumed, indicating
1361 // that all channels associated with this entry have been closed and that
1362 // all operations associated with this entry have been freed
1363 dispatch_async(fd_entry->close_queue, ^{
1364 if (!fd_entry->disk) {
1365 _dispatch_fd_debug("close queue fd_entry cleanup", fd);
1366 dispatch_op_direction_t dir;
1367 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1368 _dispatch_stream_dispose(fd_entry, dir);
1369 }
1370 } else {
1371 dispatch_disk_t disk = fd_entry->disk;
1372 dispatch_async(_dispatch_io_devs_lockq, ^{
1373 _dispatch_release(disk);
1374 });
1375 }
1376 // Remove this entry from the global fd list
1377 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1378 });
1379 // If there was a source associated with this stream, disposing of the
1380 // source cancels it and suspends the close queue. Freeing the fd_entry
1381 // structure must happen after the source cancel handler has finished
1382 dispatch_async(fd_entry->close_queue, ^{
1383 _dispatch_fd_debug("close queue release", fd);
1384 dispatch_release(fd_entry->close_queue);
1385 _dispatch_fd_debug("barrier queue release", fd);
1386 dispatch_release(fd_entry->barrier_queue);
1387 _dispatch_fd_debug("barrier group release", fd);
1388 dispatch_release(fd_entry->barrier_group);
1389 if (fd_entry->orig_flags != -1) {
1390 _dispatch_io_syscall(
1391 fcntl(fd, F_SETFL, fd_entry->orig_flags)
1392 );
1393 }
1394 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1395 if (fd_entry->orig_nosigpipe != -1) {
1396 _dispatch_io_syscall(
1397 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1398 );
1399 }
1400 #endif
1401 _dispatch_fd_entry_unguard(fd_entry);
1402 if (fd_entry->convenience_channel) {
1403 fd_entry->convenience_channel->fd_entry = NULL;
1404 dispatch_release(fd_entry->convenience_channel);
1405 }
1406 free(fd_entry);
1407 });
1408 return fd_entry;
1409 }
1410
1411 static dispatch_fd_entry_t
1412 _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1413 dev_t dev, mode_t mode)
1414 {
1415 // On devs lock queue
1416 _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
1417 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1418 path_data->channel->queue);
1419 if (S_ISREG(mode)) {
1420 _dispatch_disk_init(fd_entry, major(dev));
1421 } else {
1422 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1423 DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1424 }
1425 fd_entry->fd = -1;
1426 fd_entry->orig_flags = -1;
1427 fd_entry->path_data = path_data;
1428 fd_entry->stat.dev = dev;
1429 fd_entry->stat.mode = mode;
1430 fd_entry->barrier_queue = dispatch_queue_create(
1431 "com.apple.libdispatch-io.barrierq", NULL);
1432 fd_entry->barrier_group = dispatch_group_create();
1433 // This is the first item run when the close queue is resumed, indicating
1434 // that the channel associated with this entry has been closed and that
1435 // all operations associated with this entry have been freed
1436 dispatch_async(fd_entry->close_queue, ^{
1437 _dispatch_fd_debug("close queue fd_entry cleanup", -1);
1438 if (!fd_entry->disk) {
1439 dispatch_op_direction_t dir;
1440 for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1441 _dispatch_stream_dispose(fd_entry, dir);
1442 }
1443 }
1444 if (fd_entry->fd != -1) {
1445 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1446 }
1447 if (fd_entry->path_data->channel) {
1448 // If associated channel has not been released yet, mark it as
1449 // no longer having an fd_entry (for stop after close).
1450 // It is safe to modify channel since we are on close_queue with
1451 // target queue the channel queue
1452 fd_entry->path_data->channel->fd_entry = NULL;
1453 }
1454 });
1455 dispatch_async(fd_entry->close_queue, ^{
1456 _dispatch_fd_debug("close queue release", -1);
1457 dispatch_release(fd_entry->close_queue);
1458 dispatch_release(fd_entry->barrier_queue);
1459 dispatch_release(fd_entry->barrier_group);
1460 free(fd_entry->path_data);
1461 free(fd_entry);
1462 });
1463 return fd_entry;
1464 }
1465
1466 static int
1467 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1468 {
1469 if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1470 return 0;
1471 }
1472 if (fd_entry->err) {
1473 return fd_entry->err;
1474 }
1475 int fd = -1;
1476 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1477 fd_entry->path_data->oflag | O_NONBLOCK;
1478 open:
1479 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1480 oflag, fd_entry->path_data->mode);
1481 if (fd == -1) {
1482 int err = errno;
1483 if (err == EINTR) {
1484 goto open;
1485 }
1486 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1487 return err;
1488 }
1489 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1490 // Lost the race with another open
1491 _dispatch_fd_entry_guarded_close(fd_entry, fd);
1492 } else {
1493 channel->fd_actual = fd;
1494 }
1495 _dispatch_object_debug(channel, "%s", __func__);
1496 return 0;
1497 }
1498
1499 static void
1500 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1501 dispatch_io_t channel)
1502 {
1503 if (fd_entry->disk) {
1504 if (channel) {
1505 _dispatch_retain(channel);
1506 }
1507 _dispatch_fd_entry_retain(fd_entry);
1508 dispatch_async(fd_entry->disk->pick_queue, ^{
1509 _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1510 _dispatch_fd_entry_release(fd_entry);
1511 if (channel) {
1512 _dispatch_release(channel);
1513 }
1514 });
1515 } else {
1516 dispatch_op_direction_t direction;
1517 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1518 dispatch_stream_t stream = fd_entry->streams[direction];
1519 if (!stream) {
1520 continue;
1521 }
1522 if (channel) {
1523 _dispatch_retain(channel);
1524 }
1525 _dispatch_fd_entry_retain(fd_entry);
1526 dispatch_async(stream->dq, ^{
1527 _dispatch_stream_cleanup_operations(stream, channel);
1528 _dispatch_fd_entry_release(fd_entry);
1529 if (channel) {
1530 _dispatch_release(channel);
1531 }
1532 });
1533 }
1534 }
1535 }
1536
1537 #pragma mark -
1538 #pragma mark dispatch_stream_t/dispatch_disk_t
1539
1540 static void
1541 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1542 {
1543 dispatch_op_direction_t direction;
1544 for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1545 dispatch_stream_t stream;
1546 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1547 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1548 NULL);
1549 dispatch_set_context(stream->dq, stream);
1550 _dispatch_retain(tq);
1551 stream->dq->do_targetq = tq;
1552 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1553 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1554 fd_entry->streams[direction] = stream;
1555 }
1556 }
1557
1558 static void
1559 _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1560 dispatch_op_direction_t direction)
1561 {
1562 // On close queue
1563 dispatch_stream_t stream = fd_entry->streams[direction];
1564 if (!stream) {
1565 return;
1566 }
1567 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1568 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1569 if (stream->source) {
1570 // Balanced by source cancel handler:
1571 _dispatch_fd_entry_retain(fd_entry);
1572 dispatch_source_cancel(stream->source);
1573 dispatch_resume(stream->source);
1574 dispatch_release(stream->source);
1575 }
1576 dispatch_set_context(stream->dq, NULL);
1577 dispatch_release(stream->dq);
1578 free(stream);
1579 }
1580
1581 static void
1582 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1583 {
1584 // On devs lock queue
1585 dispatch_disk_t disk;
1586 // Check to see if there is an existing entry for the given device
1587 uintptr_t hash = DIO_HASH(dev);
1588 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1589 if (disk->dev == dev) {
1590 _dispatch_retain(disk);
1591 goto out;
1592 }
1593 }
1594 // Otherwise create a new entry
1595 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1596 disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1597 sizeof(struct dispatch_disk_s) +
1598 (pending_reqs_depth * sizeof(dispatch_operation_t)));
1599 disk->do_next = DISPATCH_OBJECT_LISTLESS;
1600 disk->do_xref_cnt = -1;
1601 disk->advise_list_depth = pending_reqs_depth;
1602 disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
1603 false);
1604 disk->dev = dev;
1605 TAILQ_INIT(&disk->operations);
1606 disk->cur_rq = TAILQ_FIRST(&disk->operations);
1607 char label[45];
1608 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1609 disk->pick_queue = dispatch_queue_create(label, NULL);
1610 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1611 out:
1612 fd_entry->disk = disk;
1613 TAILQ_INIT(&fd_entry->stream_ops);
1614 }
1615
1616 void
1617 _dispatch_disk_dispose(dispatch_disk_t disk)
1618 {
1619 uintptr_t hash = DIO_HASH(disk->dev);
1620 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1621 dispatch_assert(TAILQ_EMPTY(&disk->operations));
1622 size_t i;
1623 for (i=0; i<disk->advise_list_depth; ++i) {
1624 dispatch_assert(!disk->advise_list[i]);
1625 }
1626 dispatch_release(disk->pick_queue);
1627 }
1628
1629 #pragma mark -
1630 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1631
1632 static inline bool
1633 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1634 {
1635 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1636 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1637 }
1638
1639 static void
1640 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1641 dispatch_operation_t op, dispatch_data_t data)
1642 {
1643 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1644 return;
1645 }
1646 _dispatch_object_debug(op, "%s", __func__);
1647 bool no_ops = !_dispatch_stream_operation_avail(stream);
1648 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1649 if (no_ops) {
1650 dispatch_async_f(stream->dq, stream->dq,
1651 _dispatch_stream_queue_handler);
1652 }
1653 }
1654
1655 static void
1656 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1657 dispatch_data_t data)
1658 {
1659 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1660 return;
1661 }
1662 _dispatch_object_debug(op, "%s", __func__);
1663 if (op->params.type == DISPATCH_IO_STREAM) {
1664 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1665 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1666 }
1667 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1668 } else {
1669 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1670 }
1671 _dispatch_disk_handler(disk);
1672 }
1673
1674 static void
1675 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1676 dispatch_operation_t op)
1677 {
1678 // On stream queue
1679 _dispatch_object_debug(op, "%s", __func__);
1680 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1681 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1682 if (op == stream->op) {
1683 stream->op = NULL;
1684 }
1685 if (op->timer) {
1686 dispatch_source_cancel(op->timer);
1687 }
1688 // Final release will deliver any pending data
1689 _dispatch_release(op);
1690 }
1691
1692 static void
1693 _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1694 {
1695 // On pick queue
1696 _dispatch_object_debug(op, "%s", __func__);
1697 _dispatch_fd_debug("complete operation", op->fd_entry->fd);
1698 // Current request is always the last op returned
1699 if (disk->cur_rq == op) {
1700 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1701 operation_list);
1702 }
1703 if (op->params.type == DISPATCH_IO_STREAM) {
1704 // Check if there are other pending stream operations behind it
1705 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1706 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1707 if (op_next) {
1708 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1709 }
1710 }
1711 TAILQ_REMOVE(&disk->operations, op, operation_list);
1712 if (op->timer) {
1713 dispatch_source_cancel(op->timer);
1714 }
1715 // Final release will deliver any pending data
1716 _dispatch_release(op);
1717 }
1718
1719 static dispatch_operation_t
1720 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1721 dispatch_operation_t op)
1722 {
1723 // On stream queue
1724 if (!op) {
1725 // On the first run through, pick the first operation
1726 if (!_dispatch_stream_operation_avail(stream)) {
1727 return op;
1728 }
1729 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1730 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1731 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1732 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1733 }
1734 return op;
1735 }
1736 if (op->params.type == DISPATCH_IO_STREAM) {
1737 // Stream operations need to be serialized so continue the current
1738 // operation until it is finished
1739 return op;
1740 }
1741 // Get the next random operation (round-robin)
1742 if (op->params.type == DISPATCH_IO_RANDOM) {
1743 op = TAILQ_NEXT(op, operation_list);
1744 if (!op) {
1745 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1746 }
1747 return op;
1748 }
1749 return NULL;
1750 }
1751
1752 static dispatch_operation_t
1753 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1754 {
1755 // On pick queue
1756 dispatch_operation_t op;
1757 if (!TAILQ_EMPTY(&disk->operations)) {
1758 if (disk->cur_rq == NULL) {
1759 op = TAILQ_FIRST(&disk->operations);
1760 } else {
1761 op = disk->cur_rq;
1762 do {
1763 op = TAILQ_NEXT(op, operation_list);
1764 if (!op) {
1765 op = TAILQ_FIRST(&disk->operations);
1766 }
1767 // TODO: more involved picking algorithm rdar://problem/8780312
1768 } while (op->active && op != disk->cur_rq);
1769 }
1770 if (!op->active) {
1771 disk->cur_rq = op;
1772 return op;
1773 }
1774 }
1775 return NULL;
1776 }
1777
1778 static void
1779 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1780 dispatch_io_t channel)
1781 {
1782 // On stream queue
1783 dispatch_operation_t op, tmp;
1784 typeof(*stream->operations) *operations;
1785 operations = &stream->operations[DISPATCH_IO_RANDOM];
1786 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1787 if (!channel || op->channel == channel) {
1788 _dispatch_stream_complete_operation(stream, op);
1789 }
1790 }
1791 operations = &stream->operations[DISPATCH_IO_STREAM];
1792 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1793 if (!channel || op->channel == channel) {
1794 _dispatch_stream_complete_operation(stream, op);
1795 }
1796 }
1797 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1798 dispatch_suspend(stream->source);
1799 stream->source_running = false;
1800 }
1801 }
1802
1803 static void
1804 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1805 {
1806 // On pick queue
1807 dispatch_operation_t op, tmp;
1808 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1809 if (!channel || op->channel == channel) {
1810 _dispatch_disk_complete_operation(disk, op);
1811 }
1812 }
1813 }
1814
1815 #pragma mark -
1816 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1817
1818 static dispatch_source_t
1819 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1820 {
1821 // On stream queue
1822 if (stream->source) {
1823 return stream->source;
1824 }
1825 dispatch_fd_t fd = op->fd_entry->fd;
1826 _dispatch_fd_debug("stream source create", fd);
1827 dispatch_source_t source = NULL;
1828 if (op->direction == DOP_DIR_READ) {
1829 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1830 (uintptr_t)fd, 0, stream->dq);
1831 } else if (op->direction == DOP_DIR_WRITE) {
1832 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1833 (uintptr_t)fd, 0, stream->dq);
1834 } else {
1835 dispatch_assert(op->direction < DOP_DIR_MAX);
1836 return NULL;
1837 }
1838 dispatch_set_context(source, stream);
1839 dispatch_source_set_event_handler_f(source,
1840 _dispatch_stream_source_handler);
1841 // Close queue must not run user cleanup handlers until sources are fully
1842 // unregistered
1843 dispatch_queue_t close_queue = op->fd_entry->close_queue;
1844 dispatch_source_set_cancel_handler(source, ^{
1845 _dispatch_fd_debug("stream source cancel", fd);
1846 dispatch_resume(close_queue);
1847 });
1848 stream->source = source;
1849 return stream->source;
1850 }
1851
1852 static void
1853 _dispatch_stream_source_handler(void *ctx)
1854 {
1855 // On stream queue
1856 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1857 dispatch_suspend(stream->source);
1858 stream->source_running = false;
1859 return _dispatch_stream_handler(stream);
1860 }
1861
1862 static void
1863 _dispatch_stream_queue_handler(void *ctx)
1864 {
1865 // On stream queue
1866 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1867 if (!stream) {
1868 // _dispatch_stream_dispose has been called
1869 return;
1870 }
1871 return _dispatch_stream_handler(stream);
1872 }
1873
1874 static void
1875 _dispatch_stream_handler(void *ctx)
1876 {
1877 // On stream queue
1878 dispatch_stream_t stream = (dispatch_stream_t)ctx;
1879 dispatch_operation_t op;
1880 pick:
1881 op = _dispatch_stream_pick_next_operation(stream, stream->op);
1882 if (!op) {
1883 _dispatch_debug("no operation found: stream %p", stream);
1884 return;
1885 }
1886 int err = _dispatch_io_get_error(op, NULL, true);
1887 if (err) {
1888 op->err = err;
1889 _dispatch_stream_complete_operation(stream, op);
1890 goto pick;
1891 }
1892 stream->op = op;
1893 _dispatch_fd_debug("stream handler", op->fd_entry->fd);
1894 dispatch_fd_entry_t fd_entry = op->fd_entry;
1895 _dispatch_fd_entry_retain(fd_entry);
1896 // For performance analysis
1897 if (!op->total && dispatch_io_defaults.initial_delivery) {
1898 // Empty delivery to signal the start of the operation
1899 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1900 _dispatch_operation_deliver_data(op, DOP_DELIVER);
1901 }
1902 // TODO: perform on the operation target queue to get correct priority
1903 int result = _dispatch_operation_perform(op);
1904 dispatch_op_flags_t flags = ~0u;
1905 switch (result) {
1906 case DISPATCH_OP_DELIVER:
1907 flags = DOP_DEFAULT;
1908 // Fall through
1909 case DISPATCH_OP_DELIVER_AND_COMPLETE:
1910 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1911 DOP_DEFAULT;
1912 _dispatch_operation_deliver_data(op, flags);
1913 // Fall through
1914 case DISPATCH_OP_COMPLETE:
1915 if (flags != DOP_DEFAULT) {
1916 _dispatch_stream_complete_operation(stream, op);
1917 }
1918 if (_dispatch_stream_operation_avail(stream)) {
1919 dispatch_async_f(stream->dq, stream->dq,
1920 _dispatch_stream_queue_handler);
1921 }
1922 break;
1923 case DISPATCH_OP_COMPLETE_RESUME:
1924 _dispatch_stream_complete_operation(stream, op);
1925 // Fall through
1926 case DISPATCH_OP_RESUME:
1927 if (_dispatch_stream_operation_avail(stream)) {
1928 stream->source_running = true;
1929 dispatch_resume(_dispatch_stream_source(stream, op));
1930 }
1931 break;
1932 case DISPATCH_OP_ERR:
1933 _dispatch_stream_cleanup_operations(stream, op->channel);
1934 break;
1935 case DISPATCH_OP_FD_ERR:
1936 _dispatch_fd_entry_retain(fd_entry);
1937 dispatch_async(fd_entry->barrier_queue, ^{
1938 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1939 _dispatch_fd_entry_release(fd_entry);
1940 });
1941 break;
1942 default:
1943 break;
1944 }
1945 _dispatch_fd_entry_release(fd_entry);
1946 return;
1947 }
1948
1949 static void
1950 _dispatch_disk_handler(void *ctx)
1951 {
1952 // On pick queue
1953 dispatch_disk_t disk = (dispatch_disk_t)ctx;
1954 if (disk->io_active) {
1955 return;
1956 }
1957 _dispatch_fd_debug("disk handler", -1);
1958 dispatch_operation_t op;
1959 size_t i = disk->free_idx, j = disk->req_idx;
1960 if (j <= i) {
1961 j += disk->advise_list_depth;
1962 }
1963 while (i <= j) {
1964 if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1965 (op = _dispatch_disk_pick_next_operation(disk))) {
1966 int err = _dispatch_io_get_error(op, NULL, true);
1967 if (err) {
1968 op->err = err;
1969 _dispatch_disk_complete_operation(disk, op);
1970 continue;
1971 }
1972 _dispatch_retain(op);
1973 disk->advise_list[i%disk->advise_list_depth] = op;
1974 op->active = true;
1975 _dispatch_object_debug(op, "%s", __func__);
1976 } else {
1977 // No more operations to get
1978 break;
1979 }
1980 i++;
1981 }
1982 disk->free_idx = (i%disk->advise_list_depth);
1983 op = disk->advise_list[disk->req_idx];
1984 if (op) {
1985 disk->io_active = true;
1986 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1987 }
1988 }
1989
1990 static void
1991 _dispatch_disk_perform(void *ctxt)
1992 {
1993 dispatch_disk_t disk = ctxt;
1994 size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1995 _dispatch_fd_debug("disk perform", -1);
1996 dispatch_operation_t op;
1997 size_t i = disk->advise_idx, j = disk->free_idx;
1998 if (j <= i) {
1999 j += disk->advise_list_depth;
2000 }
2001 do {
2002 op = disk->advise_list[i%disk->advise_list_depth];
2003 if (!op) {
2004 // Nothing more to advise, must be at free_idx
2005 dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2006 break;
2007 }
2008 if (op->direction == DOP_DIR_WRITE) {
2009 // TODO: preallocate writes ? rdar://problem/9032172
2010 continue;
2011 }
2012 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2013 op->channel)) {
2014 continue;
2015 }
2016 // For performance analysis
2017 if (!op->total && dispatch_io_defaults.initial_delivery) {
2018 // Empty delivery to signal the start of the operation
2019 _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2020 _dispatch_operation_deliver_data(op, DOP_DELIVER);
2021 }
2022 // Advise two chunks if the list only has one element and this is the
2023 // first advise on the operation
2024 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2025 !op->advise_offset) {
2026 chunk_size *= 2;
2027 }
2028 _dispatch_operation_advise(op, chunk_size);
2029 } while (++i < j);
2030 disk->advise_idx = i%disk->advise_list_depth;
2031 op = disk->advise_list[disk->req_idx];
2032 int result = _dispatch_operation_perform(op);
2033 disk->advise_list[disk->req_idx] = NULL;
2034 disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
2035 dispatch_async(disk->pick_queue, ^{
2036 switch (result) {
2037 case DISPATCH_OP_DELIVER:
2038 _dispatch_operation_deliver_data(op, DOP_DEFAULT);
2039 break;
2040 case DISPATCH_OP_COMPLETE:
2041 _dispatch_disk_complete_operation(disk, op);
2042 break;
2043 case DISPATCH_OP_DELIVER_AND_COMPLETE:
2044 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2045 _dispatch_disk_complete_operation(disk, op);
2046 break;
2047 case DISPATCH_OP_ERR:
2048 _dispatch_disk_cleanup_operations(disk, op->channel);
2049 break;
2050 case DISPATCH_OP_FD_ERR:
2051 _dispatch_disk_cleanup_operations(disk, NULL);
2052 break;
2053 default:
2054 dispatch_assert(result);
2055 break;
2056 }
2057 op->active = false;
2058 disk->io_active = false;
2059 _dispatch_disk_handler(disk);
2060 // Balancing the retain in _dispatch_disk_handler. Note that op must be
2061 // released at the very end, since it might hold the last reference to
2062 // the disk
2063 _dispatch_release(op);
2064 });
2065 }
2066
2067 #pragma mark -
2068 #pragma mark dispatch_operation_perform
2069
2070 static void
2071 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2072 {
2073 int err;
2074 struct radvisory advise;
2075 // No point in issuing a read advise for the next chunk if we are already
2076 // a chunk ahead from reading the bytes
2077 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2078 chunk_size + PAGE_SIZE)) {
2079 return;
2080 }
2081 _dispatch_object_debug(op, "%s", __func__);
2082 advise.ra_count = (int)chunk_size;
2083 if (!op->advise_offset) {
2084 op->advise_offset = op->offset;
2085 // If this is the first time through, align the advised range to a
2086 // page boundary
2087 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2088 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2089 }
2090 advise.ra_offset = op->advise_offset;
2091 op->advise_offset += advise.ra_count;
2092 _dispatch_io_syscall_switch(err,
2093 fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2094 case EFBIG: break; // advised past the end of the file rdar://10415691
2095 case ENOTSUP: break; // not all FS support radvise rdar://13484629
2096 // TODO: set disk status on error
2097 default: (void)dispatch_assume_zero(err); break;
2098 );
2099 }
2100
2101 static int
2102 _dispatch_operation_perform(dispatch_operation_t op)
2103 {
2104 int err = _dispatch_io_get_error(op, NULL, true);
2105 if (err) {
2106 goto error;
2107 }
2108 _dispatch_object_debug(op, "%s", __func__);
2109 if (!op->buf) {
2110 size_t max_buf_siz = op->params.high;
2111 size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
2112 if (op->direction == DOP_DIR_READ) {
2113 // If necessary, create a buffer for the ongoing operation, large
2114 // enough to fit chunk_pages but at most high-water
2115 size_t data_siz = dispatch_data_get_size(op->data);
2116 if (data_siz) {
2117 dispatch_assert(data_siz < max_buf_siz);
2118 max_buf_siz -= data_siz;
2119 }
2120 if (max_buf_siz > chunk_siz) {
2121 max_buf_siz = chunk_siz;
2122 }
2123 if (op->length < SIZE_MAX) {
2124 op->buf_siz = op->length - op->total;
2125 if (op->buf_siz > max_buf_siz) {
2126 op->buf_siz = max_buf_siz;
2127 }
2128 } else {
2129 op->buf_siz = max_buf_siz;
2130 }
2131 op->buf = valloc(op->buf_siz);
2132 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2133 } else if (op->direction == DOP_DIR_WRITE) {
2134 // Always write the first data piece, if that is smaller than a
2135 // chunk, accumulate further data pieces until chunk size is reached
2136 if (chunk_siz > max_buf_siz) {
2137 chunk_siz = max_buf_siz;
2138 }
2139 op->buf_siz = 0;
2140 dispatch_data_apply(op->data,
2141 ^(dispatch_data_t region DISPATCH_UNUSED,
2142 size_t offset DISPATCH_UNUSED,
2143 const void* buf DISPATCH_UNUSED, size_t len) {
2144 size_t siz = op->buf_siz + len;
2145 if (!op->buf_siz || siz <= chunk_siz) {
2146 op->buf_siz = siz;
2147 }
2148 return (bool)(siz < chunk_siz);
2149 });
2150 if (op->buf_siz > max_buf_siz) {
2151 op->buf_siz = max_buf_siz;
2152 }
2153 dispatch_data_t d;
2154 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2155 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2156 NULL);
2157 _dispatch_io_data_release(d);
2158 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2159 }
2160 }
2161 if (op->fd_entry->fd == -1) {
2162 err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2163 if (err) {
2164 goto error;
2165 }
2166 }
2167 void *buf = op->buf + op->buf_len;
2168 size_t len = op->buf_siz - op->buf_len;
2169 off_t off = (off_t)((size_t)op->offset + op->total);
2170 ssize_t processed = -1;
2171 syscall:
2172 if (op->direction == DOP_DIR_READ) {
2173 if (op->params.type == DISPATCH_IO_STREAM) {
2174 processed = read(op->fd_entry->fd, buf, len);
2175 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2176 processed = pread(op->fd_entry->fd, buf, len, off);
2177 }
2178 } else if (op->direction == DOP_DIR_WRITE) {
2179 if (op->params.type == DISPATCH_IO_STREAM) {
2180 processed = write(op->fd_entry->fd, buf, len);
2181 } else if (op->params.type == DISPATCH_IO_RANDOM) {
2182 processed = pwrite(op->fd_entry->fd, buf, len, off);
2183 }
2184 }
2185 // Encountered an error on the file descriptor
2186 if (processed == -1) {
2187 err = errno;
2188 if (err == EINTR) {
2189 goto syscall;
2190 }
2191 goto error;
2192 }
2193 // EOF is indicated by two handler invocations
2194 if (processed == 0) {
2195 _dispatch_fd_debug("EOF", op->fd_entry->fd);
2196 return DISPATCH_OP_DELIVER_AND_COMPLETE;
2197 }
2198 op->buf_len += (size_t)processed;
2199 op->total += (size_t)processed;
2200 if (op->total == op->length) {
2201 // Finished processing all the bytes requested by the operation
2202 return DISPATCH_OP_COMPLETE;
2203 } else {
2204 // Deliver data only if we satisfy the filters
2205 return DISPATCH_OP_DELIVER;
2206 }
2207 error:
2208 if (err == EAGAIN) {
2209 // For disk based files with blocking I/O we should never get EAGAIN
2210 dispatch_assert(!op->fd_entry->disk);
2211 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2212 if (op->direction == DOP_DIR_READ && op->total &&
2213 op->channel == op->fd_entry->convenience_channel) {
2214 // Convenience read with available data completes on EAGAIN
2215 return DISPATCH_OP_COMPLETE_RESUME;
2216 }
2217 return DISPATCH_OP_RESUME;
2218 }
2219 op->err = err;
2220 switch (err) {
2221 case ECANCELED:
2222 return DISPATCH_OP_ERR;
2223 case EBADF:
2224 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2225 return DISPATCH_OP_FD_ERR;
2226 default:
2227 return DISPATCH_OP_COMPLETE;
2228 }
2229 }
2230
2231 static void
2232 _dispatch_operation_deliver_data(dispatch_operation_t op,
2233 dispatch_op_flags_t flags)
2234 {
2235 // Either called from stream resp. pick queue or when op is finalized
2236 dispatch_data_t data = NULL;
2237 int err = 0;
2238 size_t undelivered = op->undelivered + op->buf_len;
2239 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2240 (op->flags & DOP_DELIVER);
2241 op->flags = DOP_DEFAULT;
2242 if (!deliver) {
2243 // Don't deliver data until low water mark has been reached
2244 if (undelivered >= op->params.low) {
2245 deliver = true;
2246 } else if (op->buf_len < op->buf_siz) {
2247 // Request buffer is not yet used up
2248 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2249 return;
2250 }
2251 } else {
2252 err = op->err;
2253 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2254 err = ECANCELED;
2255 op->err = err;
2256 }
2257 }
2258 // Deliver data or buffer used up
2259 if (op->direction == DOP_DIR_READ) {
2260 if (op->buf_len) {
2261 void *buf = op->buf;
2262 data = dispatch_data_create(buf, op->buf_len, NULL,
2263 DISPATCH_DATA_DESTRUCTOR_FREE);
2264 op->buf = NULL;
2265 op->buf_len = 0;
2266 dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2267 _dispatch_io_data_release(op->data);
2268 _dispatch_io_data_release(data);
2269 data = d;
2270 } else {
2271 data = op->data;
2272 }
2273 op->data = deliver ? dispatch_data_empty : data;
2274 } else if (op->direction == DOP_DIR_WRITE) {
2275 if (deliver) {
2276 data = dispatch_data_create_subrange(op->data, op->buf_len,
2277 op->length);
2278 }
2279 if (op->buf_data && op->buf_len == op->buf_siz) {
2280 _dispatch_io_data_release(op->buf_data);
2281 op->buf_data = NULL;
2282 op->buf = NULL;
2283 op->buf_len = 0;
2284 // Trim newly written buffer from head of unwritten data
2285 dispatch_data_t d;
2286 if (deliver) {
2287 _dispatch_io_data_retain(data);
2288 d = data;
2289 } else {
2290 d = dispatch_data_create_subrange(op->data, op->buf_siz,
2291 op->length);
2292 }
2293 _dispatch_io_data_release(op->data);
2294 op->data = d;
2295 }
2296 } else {
2297 dispatch_assert(op->direction < DOP_DIR_MAX);
2298 return;
2299 }
2300 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2301 op->undelivered = undelivered;
2302 _dispatch_fd_debug("buffer data", op->fd_entry->fd);
2303 return;
2304 }
2305 op->undelivered = 0;
2306 _dispatch_object_debug(op, "%s", __func__);
2307 _dispatch_fd_debug("deliver data", op->fd_entry->fd);
2308 dispatch_op_direction_t direction = op->direction;
2309 dispatch_io_handler_t handler = op->handler;
2310 #if DISPATCH_IO_DEBUG
2311 int fd = op->fd_entry->fd;
2312 #endif
2313 dispatch_fd_entry_t fd_entry = op->fd_entry;
2314 _dispatch_fd_entry_retain(fd_entry);
2315 dispatch_io_t channel = op->channel;
2316 _dispatch_retain(channel);
2317 // Note that data delivery may occur after the operation is freed
2318 dispatch_async(op->op_q, ^{
2319 bool done = (flags & DOP_DONE);
2320 dispatch_data_t d = data;
2321 if (done) {
2322 if (direction == DOP_DIR_READ && err) {
2323 if (dispatch_data_get_size(d)) {
2324 _dispatch_fd_debug("IO handler invoke", fd);
2325 handler(false, d, 0);
2326 }
2327 d = NULL;
2328 } else if (direction == DOP_DIR_WRITE && !err) {
2329 d = NULL;
2330 }
2331 }
2332 _dispatch_fd_debug("IO handler invoke", fd);
2333 handler(done, d, err);
2334 _dispatch_release(channel);
2335 _dispatch_fd_entry_release(fd_entry);
2336 _dispatch_io_data_release(data);
2337 });
2338 }
2339
2340 #pragma mark -
2341 #pragma mark dispatch_io_debug
2342
2343 static size_t
2344 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2345 {
2346 dispatch_queue_t target = channel->do_targetq;
2347 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2348 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2349 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2350 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2351 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2352 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2353 channel->fd_entry, channel->queue, target && target->dq_label ?
2354 target->dq_label : "", target, channel->barrier_queue,
2355 channel->barrier_group, channel->err, channel->params.low,
2356 channel->params.high, channel->params.interval_flags &
2357 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2358 channel->params.interval);
2359 }
2360
2361 size_t
2362 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2363 {
2364 size_t offset = 0;
2365 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2366 dx_kind(channel), channel);
2367 offset += _dispatch_object_debug_attr(channel, &buf[offset],
2368 bufsiz - offset);
2369 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2370 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2371 return offset;
2372 }
2373
2374 static size_t
2375 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2376 size_t bufsiz)
2377 {
2378 dispatch_queue_t target = op->do_targetq;
2379 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2380 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2381 "channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2382 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2383 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2384 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2385 "stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2386 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2387 op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2388 oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2389 target->dq_label : "", target, op->offset, op->length, op->total,
2390 op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2391 op->params.high, op->params.interval_flags &
2392 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2393 }
2394
2395 size_t
2396 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2397 {
2398 size_t offset = 0;
2399 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2400 dx_kind(op), op);
2401 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2402 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2403 offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2404 return offset;
2405 }