5 #include <darwintest.h>
7 #include <darwintest_multiprocess.h>
10 #include <dispatch/dispatch.h>
11 #include <dispatch/private.h>
17 #include <pthread/workqueue_private.h>
21 #include <sys/event.h>
22 #include <sys/socket.h>
25 #include <sys/types.h>
30 #include <System/sys/event.h> /* kevent_qos */
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT
));
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
52 #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
53 #define EXPECTED_LEN strlen(EXPECTED_STRING)
55 #define READ_SETUP_TIMEOUT_SECS 2
56 #define WRITE_TIMEOUT_SECS 4
57 #define READ_TIMEOUT_SECS 4
58 #define INCREMENTAL_WRITE_SLEEP_USECS 50
60 static mach_timespec_t READ_SETUP_timeout
= {.tv_sec
= READ_SETUP_TIMEOUT_SECS
, .tv_nsec
= 0};
61 static mach_timespec_t READ_timeout
= {.tv_sec
= READ_TIMEOUT_SECS
, .tv_nsec
= 0};
62 static mach_timespec_t WRITE_timeout
= {.tv_sec
= WRITE_TIMEOUT_SECS
, .tv_nsec
= 0};
74 KEVENT_INCREMENTAL_WRITE
,
75 KEVENT64_INCREMENTAL_WRITE
,
76 KEVENT_QOS_INCREMENTAL_WRITE
,
77 WORKQ_INCREMENTAL_WRITE
,
78 DISPATCH_INCREMENTAL_WRITE
98 enum write_mode wr_mode
;
100 enum read_mode rd_mode
;
104 THREAD_WRITER
, /* sem */
105 PROCESS_WRITER
/* fd */
114 semaphore_t wr_finished
;
115 semaphore_t rd_finished
;
118 static bool handle_reading(enum fd_pair fd_pair
, int fd
);
119 static bool handle_writing(enum fd_pair fd_pair
, int fd
);
120 static void drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
,
128 T_LOG("waking writer");
130 switch (shared
.wr_kind
) {
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared
.wr_wait
.sem
);
135 case PROCESS_WRITER
: {
137 close(shared
.wr_wait
.out_fd
);
138 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(
139 shared
.wr_wait
.in_fd
, &tmp
, 1), NULL
);
148 switch (shared
.wr_kind
) {
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret
= semaphore_timedwait(shared
.wr_wait
.sem
, READ_SETUP_timeout
);
153 if (kret
== KERN_OPERATION_TIMED_OUT
) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout
.tv_sec
);
157 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_wait.sem");
160 case PROCESS_WRITER
: {
162 close(shared
.wr_wait
.in_fd
);
163 T_QUIET
; T_ASSERT_POSIX_SUCCESS(read(
164 shared
.wr_wait
.out_fd
, &tmp
, 1), NULL
);
169 T_LOG("writer woken up, starting to write");
173 handle_writing(enum fd_pair __unused fd_pair
, int fd
)
175 static unsigned int cur_char
= 0;
176 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(fd
,
177 &(EXPECTED_STRING
[cur_char
]), 1), NULL
);
180 return (cur_char
< EXPECTED_LEN
);
183 #define EXPECTED_QOS QOS_CLASS_USER_INITIATED
186 reenable_workq(int fd
, int16_t filt
)
188 struct kevent_qos_s events
[] = {{
189 .ident
= (uint64_t)fd
,
191 .flags
= EV_ENABLE
| EV_UDATA_SPECIFIC
| EV_DISPATCH
,
192 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
194 .fflags
= NOTE_LOWAT
,
198 int kev
= kevent_qos(-1, events
, 1, events
, 1, NULL
, NULL
,
199 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
200 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "reenable workq in kevent_qos");
204 workqueue_write_fn(void ** __unused buf
, int * __unused count
)
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
208 // "writer thread should be woken up at correct QoS");
209 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
210 /* finished handling the fd, tear down the source */
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared
.wr_finished
);
216 reenable_workq(shared
.wr_fd
, EVFILT_WRITE
);
220 workqueue_fn(pthread_priority_t __unused priority
)
222 T_ASSERT_FAIL("workqueue function callback was called");
226 drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
, int fd
)
228 struct timespec timeout
= { .tv_sec
= READ_TIMEOUT_SECS
};
231 struct kevent events
;
232 EV_SET(&events
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
233 NOTE_LOWAT
, 1, NULL
);
234 struct kevent64_s events64
;
235 EV_SET64(&events64
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
236 NOTE_LOWAT
, 1, 0, 0, 0);
237 struct kevent_qos_s events_qos
[] = {{
238 .ident
= (uint64_t)fd
,
239 .filter
= reading
? EVFILT_READ
: EVFILT_WRITE
,
241 .fflags
= NOTE_LOWAT
,
245 .filter
= EVFILT_TIMER
,
247 .fflags
= NOTE_SECONDS
,
248 .data
= READ_TIMEOUT_SECS
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent
;
254 which_kevent
= mode
.rd
;
256 if (mode
.wr
== KEVENT_INCREMENTAL_WRITE
) {
257 which_kevent
= KEVENT_READ
;
258 } else if (mode
.wr
== KEVENT64_INCREMENTAL_WRITE
) {
259 which_kevent
= KEVENT64_READ
;
260 } else if (mode
.wr
== KEVENT_QOS_INCREMENTAL_WRITE
) {
261 which_kevent
= KEVENT_QOS_READ
;
263 T_ASSERT_FAIL("unexpected mode: %d", mode
.wr
);
264 __builtin_unreachable();
268 int kq_fd
= kqueue();
269 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kq_fd
, "kqueue");
271 switch (which_kevent
) {
273 kev
= kevent(kq_fd
, &events
, 1, NULL
, 0, NULL
);
276 kev
= kevent64(kq_fd
, &events64
, 1, NULL
, 0, 0, NULL
);
278 case KEVENT_QOS_READ
:
279 kev
= kevent_qos(kq_fd
, events_qos
, 2, NULL
, 0, NULL
, NULL
, 0);
281 case POLL_READ
: /* FALLTHROUGH */
282 case SELECT_READ
: /* FALLTHROUGH */
283 case DISPATCH_READ
: /* FALLTHROUGH */
284 case WORKQ_READ
: /* FALLTHROUGH */
286 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
297 switch (which_kevent
) {
299 kev
= kevent(kq_fd
, NULL
, 0, &events
, 1, &timeout
);
302 kev
= kevent64(kq_fd
, NULL
, 0, &events64
, 1, 0, &timeout
);
304 case KEVENT_QOS_READ
:
305 kev
= kevent_qos(kq_fd
, NULL
, 0, events_qos
, 2, NULL
, NULL
, 0);
307 /* check for a timeout */
308 for (int i
= 0; i
< kev
; i
++) {
309 if (events_qos
[i
].filter
== EVFILT_TIMER
) {
314 case POLL_READ
: /* FALLTHROUGH */
315 case SELECT_READ
: /* FALLTHROUGH */
316 case DISPATCH_READ
: /* FALLTHROUGH */
317 case WORKQ_READ
: /* FALLTHROUGH */
319 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
323 if (kev
== -1 && errno
== EINTR
) {
324 T_LOG("kevent was interrupted");
327 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent");
328 T_QUIET
; T_ASSERT_NE(kev
, 0, "kevent timed out");
331 if (!handle_reading(fd_pair
, fd
)) {
335 if (!handle_writing(fd_pair
, fd
)) {
345 write_to_fd(void * __unused ctx
)
347 ssize_t bytes_wr
= 0;
351 switch (shared
.wr_mode
) {
354 if (bytes_wr
== -1) {
355 T_LOG("write from child was interrupted");
357 bytes_wr
= write(shared
.wr_fd
, EXPECTED_STRING
,
359 } while (bytes_wr
== -1 && errno
== EINTR
);
360 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_wr
, "write");
361 T_QUIET
; T_ASSERT_EQ(bytes_wr
, (ssize_t
)EXPECTED_LEN
,
362 "wrote enough bytes");
365 case INCREMENTAL_WRITE
:
366 for (unsigned int i
= 0; i
< EXPECTED_LEN
; i
++) {
368 T_ASSERT_POSIX_SUCCESS(write(shared
.wr_fd
,
369 &(EXPECTED_STRING
[i
]), 1), NULL
);
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS
);
374 case KEVENT_INCREMENTAL_WRITE
: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE
: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE
: {
377 union mode mode
= { .wr
= shared
.wr_mode
};
378 drive_kq(false, mode
, shared
.fd_pair
, shared
.wr_fd
);
382 case WORKQ_INCREMENTAL_WRITE
: {
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
388 "semaphore_create shared.wr_finished");
391 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "wr_finished semaphore_create");
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn
, workqueue_write_fn
, 0, 0), NULL
);
396 struct kevent_qos_s events
[] = {{
397 .ident
= (uint64_t)shared
.wr_fd
,
398 .filter
= EVFILT_WRITE
,
399 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
400 .fflags
= NOTE_LOWAT
,
402 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
407 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
408 events
, 1, NULL
, NULL
,
409 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
410 if (kev
== -1 && errno
== EINTR
) {
412 T_LOG("kevent_qos was interrupted");
416 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
422 case DISPATCH_INCREMENTAL_WRITE
: {
423 dispatch_source_t write_src
;
425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
426 "semaphore_create shared.wr_finished");
429 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
431 write_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
432 (uintptr_t)shared
.wr_fd
, 0, NULL
);
433 T_QUIET
; T_ASSERT_NOTNULL(write_src
,
434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
436 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src
);
444 dispatch_release(write_src
);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared
.wr_finished
);
450 dispatch_source_set_event_handler(write_src
, handler
);
451 dispatch_activate(write_src
);
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared
.wr_mode
);
461 if (shared
.wr_finished
) {
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret
= semaphore_timedwait(shared
.wr_finished
, WRITE_timeout
);
464 if (kret
== KERN_OPERATION_TIMED_OUT
) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout
.tv_sec
);
468 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared
.wr_finished
);
472 T_LOG("writer finished, closing fd");
473 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(shared
.wr_fd
), NULL
);
480 static char final_string
[BUF_LEN
];
481 static size_t final_length
;
484 * Read from the master PTY descriptor.
486 * Returns false if EOF is encountered, and true otherwise.
489 handle_reading(enum fd_pair fd_pair
, int fd
)
491 char read_buf
[BUF_LEN
] = { 0 };
492 ssize_t bytes_rd
= 0;
495 if (bytes_rd
== -1) {
496 T_LOG("read was interrupted, retrying");
498 bytes_rd
= read(fd
, read_buf
, sizeof(read_buf
) - 1);
499 } while (bytes_rd
== -1 && errno
== EINTR
);
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
503 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_rd
, "reading from file");
504 T_QUIET
; T_ASSERT_LE(bytes_rd
, (ssize_t
)EXPECTED_LEN
,
505 "read too much from file");
508 T_LOG("read EOF from file");
512 read_buf
[bytes_rd
] = '\0';
513 strlcpy(&(final_string
[final_length
]), read_buf
,
514 sizeof(final_string
) - final_length
);
515 final_length
+= (size_t)bytes_rd
;
517 T_QUIET
; T_ASSERT_LE(final_length
, EXPECTED_LEN
,
518 "should not read more from file than what can be sent");
520 /* FIFOs don't (and TTYs may not) send EOF when the write side closes */
521 if (final_length
== strlen(EXPECTED_STRING
) &&
522 (fd_pair
== FIFO_PAIR
|| fd_pair
== PTY_PAIR
))
524 T_LOG("read all expected bytes from %s",
525 fd_pair
== FIFO_PAIR
? "FIFO" : "PTY");
532 workqueue_read_fn(void ** __unused buf
, int * __unused count
)
535 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
536 // "reader thread should be requested at correct QoS");
537 if (!handle_reading(shared
.fd_pair
, shared
.rd_fd
)) {
538 T_LOG("signal shared.rd_finished");
539 semaphore_signal(shared
.rd_finished
);
542 reenable_workq(shared
.rd_fd
, EVFILT_READ
);
546 read_from_fd(int fd
, enum fd_pair fd_pair
, enum read_mode mode
)
550 T_LOG("reader setting up");
552 bzero(final_string
, sizeof(final_string
));
554 fd_flags
= fcntl(fd
, F_GETFL
, 0);
555 T_QUIET
; T_ASSERT_POSIX_SUCCESS(fd_flags
, "fcntl(F_GETFL)");
557 if (!(fd_flags
& O_NONBLOCK
)) {
559 T_ASSERT_POSIX_SUCCESS(fcntl(fd
, F_SETFL
,
560 fd_flags
| O_NONBLOCK
), NULL
);
565 struct pollfd fds
[] = { { .fd
= fd
, .events
= POLLIN
} };
570 int pol
= poll(fds
, 1, READ_TIMEOUT_SECS
* 1000);
571 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pol
, "poll");
572 T_QUIET
; T_ASSERT_NE(pol
, 0,
573 "poll should not time out after %d seconds, read %zd out "
575 READ_TIMEOUT_SECS
, final_length
, strlen(EXPECTED_STRING
));
576 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLERR
,
577 "should not see an error on the device");
578 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLNVAL
,
579 "should not set up an invalid poll");
581 if (!handle_reading(fd_pair
, fd
)) {
592 struct timeval tv
= { .tv_sec
= READ_TIMEOUT_SECS
};
596 FD_SET(fd
, &read_fd
);
601 int sel
= select(fd
+ 1, &read_fd
, NULL
, NULL
/*&err_fd*/, &tv
);
602 if (sel
== -1 && errno
== EINTR
) {
603 T_LOG("select interrupted");
608 T_QUIET
; T_ASSERT_POSIX_SUCCESS(sel
, "select");
610 T_QUIET
; T_ASSERT_NE(sel
, 0,
611 "select waited for %d seconds and timed out",
614 if (fd_pair
== PTY_PAIR
) {
616 * XXX sometimes a PTY doesn't send EOF when the writer closes
620 /* didn't fail or time out, therefore data is ready */
621 T_QUIET
; T_ASSERT_NE(FD_ISSET(fd
, &read_fd
), 0,
622 "select should show reading fd as readable");
624 if (!handle_reading(fd_pair
, fd
)) {
630 case KEVENT_READ
: /* FALLTHROUGH */
631 case KEVENT64_READ
: /* FALLTHROUGH */
632 case KEVENT_QOS_READ
: {
633 union mode rd_mode
= { .rd
= shared
.rd_mode
};
634 drive_kq(true, rd_mode
, fd_pair
, shared
.rd_fd
);
639 // prohibit ourselves from going multi-threaded see:rdar://33296008
640 _dispatch_prohibit_transition_to_multithreaded(true);
641 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
642 workqueue_fn
, workqueue_read_fn
, 0, 0), NULL
);
644 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
645 "semaphore_create shared.rd_finished");
648 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
651 struct kevent_qos_s events
[] = {{
652 .ident
= (uint64_t)shared
.rd_fd
,
653 .filter
= EVFILT_READ
,
654 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
655 .fflags
= NOTE_LOWAT
,
657 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
662 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
663 events
, 1, NULL
, NULL
,
664 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
665 if (kev
== -1 && errno
== EINTR
) {
667 T_LOG("kevent_qos was interrupted");
671 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
679 case DISPATCH_READ
: {
680 dispatch_source_t read_src
;
682 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
683 "semaphore_create shared.rd_finished");
686 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
688 read_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
689 (uintptr_t)fd
, 0, NULL
);
690 T_QUIET
; T_ASSERT_NOTNULL(read_src
,
691 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
693 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
694 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
696 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
697 // "read handler block should run at correct QoS");
699 if (!handle_reading(fd_pair
, fd
)) {
700 /* finished handling the fd, tear down the source */
701 dispatch_source_cancel(read_src
);
702 dispatch_release(read_src
);
703 T_LOG("signal shared.rd_finished");
704 semaphore_signal(shared
.rd_finished
);
708 dispatch_source_set_event_handler(read_src
, handler
);
709 dispatch_activate(read_src
);
716 T_ASSERT_FAIL("unrecognized read mode: %d", mode
);
720 if (shared
.rd_finished
) {
721 T_LOG("wait shared.rd_finished");
722 kern_return_t kret
= semaphore_timedwait(shared
.rd_finished
, READ_timeout
);
723 if (kret
== KERN_OPERATION_TIMED_OUT
) {
724 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout
.tv_sec
);
727 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.rd_finished");
730 T_EXPECT_EQ_STR(final_string
, EXPECTED_STRING
,
731 "reader should receive valid string");
732 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(fd
), NULL
);
735 #pragma mark file setup
738 fd_pair_init(enum fd_pair fd_pair
, int *rd_fd
, int *wr_fd
)
742 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd
, wr_fd
, NULL
, NULL
, NULL
),
747 char fifo_path
[] = "/tmp/async-io-fifo.XXXXXX";
748 T_QUIET
; T_ASSERT_NOTNULL(mktemp(fifo_path
), NULL
);
750 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path
, 0700), "mkfifo(%s, 0700)",
753 * Opening the read side of a pipe will block until the write
754 * side opens -- use O_NONBLOCK.
756 *rd_fd
= open(fifo_path
, O_RDONLY
| O_NONBLOCK
);
757 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*rd_fd
, "open(... O_RDONLY)");
758 *wr_fd
= open(fifo_path
, O_WRONLY
| O_NONBLOCK
);
759 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*wr_fd
, "open(... O_WRONLY)");
765 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds
), NULL
);
766 *rd_fd
= pipe_fds
[0];
767 *wr_fd
= pipe_fds
[1];
773 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX
, SOCK_STREAM
, 0, sock_fds
),
775 *rd_fd
= sock_fds
[0];
776 *wr_fd
= sock_fds
[1];
781 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair
);
785 T_QUIET
; T_ASSERT_NE(*rd_fd
, -1, "reading descriptor");
786 T_QUIET
; T_ASSERT_NE(*wr_fd
, -1, "writing descriptor");
789 #pragma mark single process
792 drive_threads(enum fd_pair fd_pair
, enum read_mode rd_mode
,
793 enum write_mode wr_mode
)
797 shared
.fd_pair
= fd_pair
;
798 shared
.rd_mode
= rd_mode
;
799 shared
.wr_mode
= wr_mode
;
800 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
802 shared
.wr_kind
= THREAD_WRITER
;
803 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_wait
.sem
, SYNC_POLICY_FIFO
, 0),
804 "semaphore_create shared.wr_wait.sem");
807 T_ASSERT_POSIX_ZERO(pthread_create(&thread
, NULL
, write_to_fd
, NULL
),
809 T_LOG("created writer thread");
811 read_from_fd(shared
.rd_fd
, fd_pair
, rd_mode
);
813 T_ASSERT_POSIX_ZERO(pthread_join(thread
, NULL
), NULL
);
818 #pragma mark multiple processes
820 static void __attribute__((noreturn
))
821 drive_processes(enum fd_pair fd_pair
, enum read_mode rd_mode
, enum write_mode wr_mode
)
823 shared
.fd_pair
= fd_pair
;
824 shared
.rd_mode
= rd_mode
;
825 shared
.wr_mode
= wr_mode
;
826 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
828 shared
.wr_kind
= PROCESS_WRITER
;
830 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pipe(fds
), NULL
);
831 shared
.wr_wait
.out_fd
= fds
[0];
832 shared
.wr_wait
.in_fd
= fds
[1];
834 T_LOG("starting subprocesses");
835 dt_helper_t helpers
[2] = {
836 dt_fork_helper("reader_helper"),
837 dt_fork_helper("writer_helper")
843 dt_run_helpers(helpers
, 2, 50000);
846 T_HELPER_DECL(reader_helper
, "Read asynchronously")
849 read_from_fd(shared
.rd_fd
, shared
.fd_pair
, shared
.rd_mode
);
853 T_HELPER_DECL(writer_helper
, "Write asynchronously")
861 #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
862 write_mode, read_name, read_mode) \
863 T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
864 #desc_name " with " #read_name " and writing " #write_str \
865 " across two processes") \
867 drive_processes(fd_pair, read_mode, write_mode); \
869 #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
870 write_mode, read_name, read_mode) \
871 T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
872 #desc_name " with " #read_name " and writing " #write_str) \
874 drive_threads(fd_pair, read_mode, write_mode); \
877 #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \
878 read_name, read_mode) \
879 WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
880 write_mode, read_name, read_mode) \
881 WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
882 write_mode, read_name, read_mode)
884 #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
885 WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \
886 read_name, read_mode) \
887 WR_DECL(desc_name, fd_pair, inc, "incrementally", \
888 INCREMENTAL_WRITE, read_name, read_mode)
890 #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \
892 WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \
893 "incrementally with a dispatch source", \
894 DISPATCH_INCREMENTAL_WRITE, read_name, read_mode)
895 #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \
897 WR_DECL##suffix(desc_name, fd_pair, inc_workq, \
898 "incrementally with the workqueue", \
899 WORKQ_INCREMENTAL_WRITE, read_name, read_mode)
901 #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \
902 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
903 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode)
904 // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)
907 * dispatch_source tests cannot share the same process as other workqueue
910 #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \
911 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
912 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \
913 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
917 * Workqueue tests cannot share the same process as other workqueue or
918 * dispatch_source tests.
919 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
920 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
921 RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
923 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
927 #define PAIR_DECL(desc_name, fd_pair) \
928 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
929 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
930 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
931 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
932 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
933 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
934 // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
936 PAIR_DECL(tty
, PTY_PAIR
)
937 PAIR_DECL(pipe
, PIPE_PAIR
)
938 PAIR_DECL(fifo
, FIFO_PAIR
)
939 PAIR_DECL(socket
, SOCKET_PAIR
)