5 #include <darwintest.h>
7 #include <darwintest_multiprocess.h>
10 #include <dispatch/dispatch.h>
11 #include <dispatch/private.h>
17 #include <pthread/workqueue_private.h>
21 #include <sys/event.h>
22 #include <sys/socket.h>
25 #include <sys/types.h>
30 #include <System/sys/event.h> /* kevent_qos */
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT
));
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
/*
 * Payload exchanged by the test.  The writer sends EXPECTED_STRING either in
 * one write() or one character at a time, and the reader compares what it
 * accumulated against it.  EXPECTED_LEN is a macro around strlen(), so the
 * length is recomputed at every use.
 */
52 #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
53 #define EXPECTED_LEN strlen(EXPECTED_STRING)
/* Timeouts, in whole seconds, used to initialise the mach_timespec_t values below. */
55 #define READ_SETUP_TIMEOUT_SECS 2
56 #define WRITE_TIMEOUT_SECS 4
57 #define READ_TIMEOUT_SECS 4
/* Argument to usleep() between single-character writes in INCREMENTAL_WRITE mode. */
58 #define INCREMENTAL_WRITE_SLEEP_USECS 50
/* Passed to semaphore_timedwait() on shared.wr_wait.sem (writer waiting for the reader to be set up). */
60 static mach_timespec_t READ_SETUP_timeout
= {.tv_sec
= READ_SETUP_TIMEOUT_SECS
, .tv_nsec
= 0};
/* Passed to semaphore_timedwait() on shared.rd_finished. */
61 static mach_timespec_t READ_timeout
= {.tv_sec
= READ_TIMEOUT_SECS
, .tv_nsec
= 0};
/* Passed to semaphore_timedwait() on shared.wr_finished. */
62 static mach_timespec_t WRITE_timeout
= {.tv_sec
= WRITE_TIMEOUT_SECS
, .tv_nsec
= 0};
74 KEVENT_INCREMENTAL_WRITE
,
75 KEVENT64_INCREMENTAL_WRITE
,
76 KEVENT_QOS_INCREMENTAL_WRITE
,
77 WORKQ_INCREMENTAL_WRITE
,
78 DISPATCH_INCREMENTAL_WRITE
98 enum write_mode wr_mode
;
100 enum read_mode rd_mode
;
104 THREAD_WRITER
, /* sem */
105 PROCESS_WRITER
/* fd */
114 semaphore_t wr_finished
;
115 semaphore_t rd_finished
;
118 static bool handle_reading(enum fd_pair fd_pair
, int fd
);
119 static bool handle_writing(enum fd_pair fd_pair
, int fd
);
120 static void drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
,
128 T_LOG("waking writer");
130 switch (shared
.wr_kind
) {
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared
.wr_wait
.sem
);
135 case PROCESS_WRITER
: {
137 close(shared
.wr_wait
.out_fd
);
138 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(
139 shared
.wr_wait
.in_fd
, &tmp
, 1), NULL
);
148 switch (shared
.wr_kind
) {
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret
= semaphore_timedwait(shared
.wr_wait
.sem
, READ_SETUP_timeout
);
153 if (kret
== KERN_OPERATION_TIMED_OUT
) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout
.tv_sec
);
157 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_wait.sem");
160 case PROCESS_WRITER
: {
162 close(shared
.wr_wait
.in_fd
);
163 T_QUIET
; T_ASSERT_POSIX_SUCCESS(read(
164 shared
.wr_wait
.out_fd
, &tmp
, 1), NULL
);
169 T_LOG("writer woken up, starting to write");
/*
 * Write the single character EXPECTED_STRING[cur_char] to fd and assert that
 * write() succeeded.  cur_char is function-static, so the position in the
 * string is kept between calls for the lifetime of the process.
 *
 * fd_pair is unused.  Returns true while cur_char is below EXPECTED_LEN and
 * false otherwise; callers treat false as "finished writing".
 */
173 handle_writing(enum fd_pair __unused fd_pair
, int fd
)
175 static unsigned int cur_char
= 0;
176 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(fd
,
177 &(EXPECTED_STRING
[cur_char
]), 1), NULL
);
180 return cur_char
< EXPECTED_LEN
;
/*
 * QoS class that is encoded into the .qos field of workqueue kevents and
 * requested for the dispatch source handler blocks.
 */
183 #define EXPECTED_QOS QOS_CLASS_USER_INITIATED
/*
 * Re-arm the workqueue kevent for fd after it has been delivered.
 *
 * Builds one kevent_qos_s identified by fd with
 * EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH and NOTE_LOWAT, and submits it
 * through kevent_qos() using -1 as the kqueue descriptor together with
 * KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS.  The same array is used for
 * the change list and the (error) event list.  The call is asserted to
 * succeed.
 *
 * filt is the filter the event was registered with; callers pass EVFILT_READ
 * or EVFILT_WRITE.  NOTE(review): the initializer line that stores filt is
 * not visible here -- confirm against the full source.
 */
186 reenable_workq(int fd
, int16_t filt
)
188 struct kevent_qos_s events
[] = {{
189 .ident
= (uint64_t)fd
,
191 .flags
= EV_ENABLE
| EV_UDATA_SPECIFIC
| EV_DISPATCH
,
192 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
194 .fflags
= NOTE_LOWAT
,
198 int kev
= kevent_qos(-1, events
, 1, events
, 1, NULL
, NULL
,
199 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
200 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "reenable workq in kevent_qos");
/*
 * Kevent callback registered with _pthread_workqueue_init_with_kevent() for
 * the WORKQ_INCREMENTAL_WRITE mode.  Both parameters are unused.
 *
 * Each invocation calls handle_writing() on shared.wr_fd.  When that returns
 * false the writer is done and shared.wr_finished is signalled; the event is
 * re-armed with reenable_workq(shared.wr_fd, EVFILT_WRITE).
 * NOTE(review): the control flow between the signal and the re-enable is not
 * visible here -- confirm whether the re-enable is skipped after finishing.
 */
204 workqueue_write_fn(void ** __unused buf
, int * __unused count
)
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
208 // "writer thread should be woken up at correct QoS");
209 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
210 /* finished handling the fd, tear down the source */
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared
.wr_finished
);
216 reenable_workq(shared
.wr_fd
, EVFILT_WRITE
);
/*
 * Plain workqueue callback handed to _pthread_workqueue_init_with_kevent().
 * The test only expects kevent callbacks, so reaching this function fails
 * the test.  priority is unused.
 */
220 workqueue_fn(pthread_priority_t __unused priority
)
222 T_ASSERT_FAIL("workqueue function callback was called");
/*
 * Drive one end of the descriptor pair through a private kqueue.
 *
 * reading selects EVFILT_READ and handle_reading() versus EVFILT_WRITE and
 * handle_writing().  mode selects the syscall flavour: kevent(), kevent64()
 * or kevent_qos().  Write modes are translated to the corresponding
 * read_mode value so one switch serves both directions; any other mode
 * fails the test.
 *
 * For kevent() and kevent64() the fd is registered with EV_ADD and
 * NOTE_LOWAT with a data value of 1, and each wait is bounded by a
 * READ_TIMEOUT_SECS timespec.  The kevent_qos() variant passes no timespec;
 * it registers a second event, an EVFILT_TIMER with NOTE_SECONDS and
 * READ_TIMEOUT_SECS, and scans the returned events for that timer.
 *
 * An EINTR result from the wait is logged.  Any other failure, or a wait
 * that returns zero events, fails the test.
 */
226 drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
, int fd
)
228 struct timespec timeout
= { .tv_sec
= READ_TIMEOUT_SECS
};
/* one change/event structure per kevent flavour; only the selected one is used */
231 struct kevent events
;
232 EV_SET(&events
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
233 NOTE_LOWAT
, 1, NULL
);
234 struct kevent64_s events64
;
235 EV_SET64(&events64
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
236 NOTE_LOWAT
, 1, 0, 0, 0);
237 struct kevent_qos_s events_qos
[] = {{
238 .ident
= (uint64_t)fd
,
239 .filter
= reading
? EVFILT_READ
: EVFILT_WRITE
,
241 .fflags
= NOTE_LOWAT
,
245 .filter
= EVFILT_TIMER
,
247 .fflags
= NOTE_SECONDS
,
248 .data
= READ_TIMEOUT_SECS
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent
;
254 which_kevent
= mode
.rd
;
256 if (mode
.wr
== KEVENT_INCREMENTAL_WRITE
) {
257 which_kevent
= KEVENT_READ
;
258 } else if (mode
.wr
== KEVENT64_INCREMENTAL_WRITE
) {
259 which_kevent
= KEVENT64_READ
;
260 } else if (mode
.wr
== KEVENT_QOS_INCREMENTAL_WRITE
) {
261 which_kevent
= KEVENT_QOS_READ
;
263 T_ASSERT_FAIL("unexpected mode: %d", mode
.wr
);
264 __builtin_unreachable();
268 int kq_fd
= kqueue();
269 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kq_fd
, "kqueue");
/* registration: submit the change list, request no events back */
271 switch (which_kevent
) {
273 kev
= kevent(kq_fd
, &events
, 1, NULL
, 0, NULL
);
276 kev
= kevent64(kq_fd
, &events64
, 1, NULL
, 0, 0, NULL
);
278 case KEVENT_QOS_READ
:
279 kev
= kevent_qos(kq_fd
, events_qos
, 2, NULL
, 0, NULL
, NULL
, 0);
281 case POLL_READ
: /* FALLTHROUGH */
282 case SELECT_READ
: /* FALLTHROUGH */
283 case DISPATCH_READ
: /* FALLTHROUGH */
284 case WORKQ_READ
: /* FALLTHROUGH */
286 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
/* wait: empty change list, collect events into the same structures */
297 switch (which_kevent
) {
299 kev
= kevent(kq_fd
, NULL
, 0, &events
, 1, &timeout
);
302 kev
= kevent64(kq_fd
, NULL
, 0, &events64
, 1, 0, &timeout
);
304 case KEVENT_QOS_READ
:
305 kev
= kevent_qos(kq_fd
, NULL
, 0, events_qos
, 2, NULL
, NULL
, 0);
307 /* check for a timeout */
308 for (int i
= 0; i
< kev
; i
++) {
309 if (events_qos
[i
].filter
== EVFILT_TIMER
) {
314 case POLL_READ
: /* FALLTHROUGH */
315 case SELECT_READ
: /* FALLTHROUGH */
316 case DISPATCH_READ
: /* FALLTHROUGH */
317 case WORKQ_READ
: /* FALLTHROUGH */
319 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
323 if (kev
== -1 && errno
== EINTR
) {
324 T_LOG("kevent was interrupted");
327 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent");
328 T_QUIET
; T_ASSERT_NE(kev
, 0, "kevent timed out");
/* a false return from the handler means this side is finished */
331 if (!handle_reading(fd_pair
, fd
)) {
335 if (!handle_writing(fd_pair
, fd
)) {
/*
 * Writer entry point; it is the start routine given to pthread_create() in
 * drive_threads().  ctx is unused.  Sends EXPECTED_STRING to shared.wr_fd
 * using the strategy selected by shared.wr_mode:
 *
 *  - a single write() of the whole string, retried on EINTR and asserted to
 *    have written EXPECTED_LEN bytes;
 *  - INCREMENTAL_WRITE: one character per write() with a
 *    usleep(INCREMENTAL_WRITE_SLEEP_USECS) after each;
 *  - KEVENT_/KEVENT64_/KEVENT_QOS_INCREMENTAL_WRITE: drive_kq() in write
 *    mode;
 *  - WORKQ_INCREMENTAL_WRITE: registers an EVFILT_WRITE workqueue kevent
 *    whose callback is workqueue_write_fn();
 *  - DISPATCH_INCREMENTAL_WRITE: a DISPATCH_SOURCE_TYPE_WRITE source whose
 *    handler calls handle_writing().
 *
 * Any other mode fails the test.  The workqueue and dispatch variants create
 * shared.wr_finished; when it is set, this function waits on it for up to
 * WRITE_timeout and then destroys it.  Finally shared.wr_fd is closed.
 */
345 write_to_fd(void * __unused ctx
)
347 ssize_t bytes_wr
= 0;
351 switch (shared
.wr_mode
) {
354 if (bytes_wr
== -1) {
355 T_LOG("write from child was interrupted");
357 bytes_wr
= write(shared
.wr_fd
, EXPECTED_STRING
,
359 } while (bytes_wr
== -1 && errno
== EINTR
);
360 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_wr
, "write");
361 T_QUIET
; T_ASSERT_EQ(bytes_wr
, (ssize_t
)EXPECTED_LEN
,
362 "wrote enough bytes");
365 case INCREMENTAL_WRITE
:
366 for (unsigned int i
= 0; i
< EXPECTED_LEN
; i
++) {
368 T_ASSERT_POSIX_SUCCESS(write(shared
.wr_fd
,
369 &(EXPECTED_STRING
[i
]), 1), NULL
);
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS
);
374 case KEVENT_INCREMENTAL_WRITE
: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE
: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE
: {
377 union mode mode
= { .wr
= shared
.wr_mode
};
378 drive_kq(false, mode
, shared
.fd_pair
, shared
.wr_fd
);
382 case WORKQ_INCREMENTAL_WRITE
: {
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
388 "semaphore_create shared.wr_finished");
391 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "wr_finished semaphore_create");
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn
, workqueue_write_fn
, 0, 0), NULL
);
396 struct kevent_qos_s events
[] = {{
397 .ident
= (uint64_t)shared
.wr_fd
,
398 .filter
= EVFILT_WRITE
,
399 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
400 .fflags
= NOTE_LOWAT
,
402 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
/* kqueue fd -1 with KEVENT_FLAG_WORKQ; no change list is passed when changes is 0 */
407 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
408 events
, 1, NULL
, NULL
,
409 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
410 if (kev
== -1 && errno
== EINTR
) {
412 T_LOG("kevent_qos was interrupted");
416 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
422 case DISPATCH_INCREMENTAL_WRITE
: {
423 dispatch_source_t write_src
;
425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
426 "semaphore_create shared.wr_finished");
429 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
431 write_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
432 (uintptr_t)shared
.wr_fd
, 0, NULL
);
433 T_QUIET
; T_ASSERT_NOTNULL(write_src
,
434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
436 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src
);
444 dispatch_release(write_src
);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared
.wr_finished
);
450 dispatch_source_set_event_handler(write_src
, handler
);
451 dispatch_activate(write_src
);
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared
.wr_mode
);
/* only the workqueue and dispatch variants create wr_finished */
461 if (shared
.wr_finished
) {
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret
= semaphore_timedwait(shared
.wr_finished
, WRITE_timeout
);
464 if (kret
== KERN_OPERATION_TIMED_OUT
) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout
.tv_sec
);
468 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared
.wr_finished
);
472 T_LOG("writer finished, closing fd");
473 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(shared
.wr_fd
), NULL
);
/*
 * Reader-side accumulator.  handle_reading() appends every chunk it reads to
 * final_string at offset final_length; read_from_fd() zeroes the buffer
 * before reading and compares it with EXPECTED_STRING at the end.
 */
480 static char final_string
[BUF_LEN
];
481 static size_t final_length
;
484 * Read from the reading descriptor of the pair (the master side for a PTY).
486 * Returns false if EOF is encountered, and true otherwise.
/*
 * Read one chunk from fd and append it to final_string.
 *
 * The read() asks for at most sizeof(read_buf) - 1 bytes and is retried
 * while it fails with EINTR.  The result is asserted to be a success and to
 * be no longer than EXPECTED_LEN.  The chunk is NUL-terminated, copied to
 * final_string at offset final_length with strlcpy() bounded by the space
 * left in the buffer, and final_length is advanced by the byte count; the
 * running total is asserted never to exceed EXPECTED_LEN.
 *
 * fd_pair is only consulted for FIFO_PAIR: since a FIFO delivers no EOF when
 * the writer closes, receiving the full expected length is treated as
 * completion for that pair type.
 */
489 handle_reading(enum fd_pair fd_pair
, int fd
)
491 char read_buf
[BUF_LEN
] = { 0 };
492 ssize_t bytes_rd
= 0;
495 if (bytes_rd
== -1) {
496 T_LOG("read was interrupted, retrying");
498 bytes_rd
= read(fd
, read_buf
, sizeof(read_buf
) - 1);
499 } while (bytes_rd
== -1 && errno
== EINTR
);
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
503 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_rd
, "reading from file");
504 T_QUIET
; T_ASSERT_LE(bytes_rd
, (ssize_t
)EXPECTED_LEN
,
505 "read too much from file");
508 T_LOG("read EOF from file");
512 read_buf
[bytes_rd
] = '\0';
513 strlcpy(&(final_string
[final_length
]), read_buf
,
514 sizeof(final_string
) - final_length
);
515 final_length
+= (size_t)bytes_rd
;
517 T_QUIET
; T_ASSERT_LE(final_length
, EXPECTED_LEN
,
518 "should not read more from file than what can be sent");
520 /* FIFOs don't send EOF when the write side closes */
521 if (final_length
== strlen(EXPECTED_STRING
) &&
522 (fd_pair
== FIFO_PAIR
)) {
523 T_LOG("read all expected bytes from FIFO");
/*
 * Kevent callback registered with _pthread_workqueue_init_with_kevent() for
 * the WORKQ_READ mode.  Both parameters are unused.
 *
 * Each invocation calls handle_reading() on shared.rd_fd.  When that returns
 * false the reader is done and shared.rd_finished is signalled; the event is
 * re-armed with reenable_workq(shared.rd_fd, EVFILT_READ).
 * NOTE(review): the control flow between the signal and the re-enable is not
 * visible here -- confirm whether the re-enable is skipped after finishing.
 */
530 workqueue_read_fn(void ** __unused buf
, int * __unused count
)
533 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
534 // "reader thread should be requested at correct QoS");
535 if (!handle_reading(shared
.fd_pair
, shared
.rd_fd
)) {
536 T_LOG("signal shared.rd_finished");
537 semaphore_signal(shared
.rd_finished
);
540 reenable_workq(shared
.rd_fd
, EVFILT_READ
);
/*
 * Reader entry point: collect everything the writer sends on fd and check it
 * against EXPECTED_STRING.
 *
 * Setup clears final_string and turns on O_NONBLOCK for fd if it is not
 * already set.  Readiness is then awaited with the mechanism selected by
 * the read mode, and handle_reading() is called for each wake-up until it
 * returns false:
 *
 *  - poll() on POLLIN with a READ_TIMEOUT_SECS timeout; a timeout, POLLERR
 *    or POLLNVAL fails the test;
 *  - select() with a READ_TIMEOUT_SECS timeval; EINTR is logged, a timeout
 *    or an unset fd bit fails the test;
 *  - KEVENT_READ / KEVENT64_READ / KEVENT_QOS_READ: drive_kq() in read mode;
 *  - a workqueue kevent (EVFILT_READ) whose callback is workqueue_read_fn();
 *  - DISPATCH_READ: a DISPATCH_SOURCE_TYPE_READ source.
 *
 * Any other mode fails the test.  The workqueue and dispatch variants create
 * shared.rd_finished; when it is set, this function waits on it for up to
 * READ_timeout.  Afterwards final_string is compared with EXPECTED_STRING
 * (as an expectation, not an assertion) and fd is closed.
 *
 * NOTE(review): the kevent and workqueue paths use shared.rd_fd and
 * shared.rd_mode rather than the fd and mode parameters.  The callers
 * visible in this file pass the shared values, so the two agree today.
 */
544 read_from_fd(int fd
, enum fd_pair fd_pair
, enum read_mode mode
)
548 T_LOG("reader setting up");
550 bzero(final_string
, sizeof(final_string
));
552 fd_flags
= fcntl(fd
, F_GETFL
, 0);
553 T_QUIET
; T_ASSERT_POSIX_SUCCESS(fd_flags
, "fcntl(F_GETFL)");
555 if (!(fd_flags
& O_NONBLOCK
)) {
557 T_ASSERT_POSIX_SUCCESS(fcntl(fd
, F_SETFL
,
558 fd_flags
| O_NONBLOCK
), NULL
);
563 struct pollfd fds
[] = { { .fd
= fd
, .events
= POLLIN
} };
/* poll() takes its timeout in milliseconds */
568 int pol
= poll(fds
, 1, READ_TIMEOUT_SECS
* 1000);
569 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pol
, "poll");
570 T_QUIET
; T_ASSERT_NE(pol
, 0,
571 "poll should not time out after %d seconds, read %zd out "
573 READ_TIMEOUT_SECS
, final_length
, strlen(EXPECTED_STRING
));
574 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLERR
,
575 "should not see an error on the device");
576 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLNVAL
,
577 "should not set up an invalid poll");
579 if (!handle_reading(fd_pair
, fd
)) {
590 struct timeval tv
= { .tv_sec
= READ_TIMEOUT_SECS
};
594 FD_SET(fd
, &read_fd
);
599 int sel
= select(fd
+ 1, &read_fd
, NULL
, NULL
/*&err_fd*/, &tv
);
600 if (sel
== -1 && errno
== EINTR
) {
601 T_LOG("select interrupted");
606 T_QUIET
; T_ASSERT_POSIX_SUCCESS(sel
, "select");
608 T_QUIET
; T_ASSERT_NE(sel
, 0,
609 "select waited for %d seconds and timed out",
612 /* didn't fail or time out, therefore data is ready */
613 T_QUIET
; T_ASSERT_NE(FD_ISSET(fd
, &read_fd
), 0,
614 "select should show reading fd as readable");
616 if (!handle_reading(fd_pair
, fd
)) {
622 case KEVENT_READ
: /* FALLTHROUGH */
623 case KEVENT64_READ
: /* FALLTHROUGH */
624 case KEVENT_QOS_READ
: {
625 union mode rd_mode
= { .rd
= shared
.rd_mode
};
626 drive_kq(true, rd_mode
, fd_pair
, shared
.rd_fd
);
631 // prohibit ourselves from going multi-threaded see:rdar://33296008
632 _dispatch_prohibit_transition_to_multithreaded(true);
633 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
634 workqueue_fn
, workqueue_read_fn
, 0, 0), NULL
);
636 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
637 "semaphore_create shared.rd_finished");
640 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
643 struct kevent_qos_s events
[] = {{
644 .ident
= (uint64_t)shared
.rd_fd
,
645 .filter
= EVFILT_READ
,
646 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
647 .fflags
= NOTE_LOWAT
,
649 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
/* kqueue fd -1 with KEVENT_FLAG_WORKQ; no change list is passed when changes is 0 */
654 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
655 events
, 1, NULL
, NULL
,
656 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
657 if (kev
== -1 && errno
== EINTR
) {
659 T_LOG("kevent_qos was interrupted");
663 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
671 case DISPATCH_READ
: {
672 dispatch_source_t read_src
;
674 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
675 "semaphore_create shared.rd_finished");
678 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
680 read_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
681 (uintptr_t)fd
, 0, NULL
);
682 T_QUIET
; T_ASSERT_NOTNULL(read_src
,
683 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
685 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
686 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
688 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
689 // "read handler block should run at correct QoS");
691 if (!handle_reading(fd_pair
, fd
)) {
692 /* finished handling the fd, tear down the source */
693 dispatch_source_cancel(read_src
);
694 dispatch_release(read_src
);
695 T_LOG("signal shared.rd_finished");
696 semaphore_signal(shared
.rd_finished
);
700 dispatch_source_set_event_handler(read_src
, handler
);
701 dispatch_activate(read_src
);
708 T_ASSERT_FAIL("unrecognized read mode: %d", mode
);
/* only the workqueue and dispatch variants create rd_finished */
712 if (shared
.rd_finished
) {
713 T_LOG("wait shared.rd_finished");
714 kern_return_t kret
= semaphore_timedwait(shared
.rd_finished
, READ_timeout
);
715 if (kret
== KERN_OPERATION_TIMED_OUT
) {
716 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout
.tv_sec
);
719 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.rd_finished");
722 T_EXPECT_EQ_STR(final_string
, EXPECTED_STRING
,
723 "reader should receive valid string");
724 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(fd
), NULL
);
727 #pragma mark file setup
/*
 * Create the descriptor pair under test and store its two ends in *rd_fd
 * and *wr_fd.  The kind of pair is chosen by fd_pair:
 *
 *  - PTY: openpty(), with rd_fd passed as the first (master) argument and
 *    wr_fd as the second;
 *  - FIFO: a path generated by mktemp() from "/tmp/async-io-fifo.XXXXXX",
 *    created with mkfifo(..., 0700) and opened twice, O_RDONLY | O_NONBLOCK
 *    and O_WRONLY | O_NONBLOCK;
 *  - pipe: pipe(), read end first;
 *  - socket: socketpair(AF_UNIX, SOCK_STREAM), element 0 for reading and
 *    element 1 for writing.
 *
 * An unknown fd_pair value fails the test.  Every step is asserted, and both
 * descriptors are finally asserted to differ from -1.
 */
730 fd_pair_init(enum fd_pair fd_pair
, int *rd_fd
, int *wr_fd
)
734 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd
, wr_fd
, NULL
, NULL
, NULL
),
739 char fifo_path
[] = "/tmp/async-io-fifo.XXXXXX";
740 T_QUIET
; T_ASSERT_NOTNULL(mktemp(fifo_path
), NULL
);
742 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path
, 0700), "mkfifo(%s, 0700)",
745 * Opening the read side of a pipe will block until the write
746 * side opens -- use O_NONBLOCK.
748 *rd_fd
= open(fifo_path
, O_RDONLY
| O_NONBLOCK
);
749 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*rd_fd
, "open(... O_RDONLY)");
750 *wr_fd
= open(fifo_path
, O_WRONLY
| O_NONBLOCK
);
751 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*wr_fd
, "open(... O_WRONLY)");
757 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds
), NULL
);
758 *rd_fd
= pipe_fds
[0];
759 *wr_fd
= pipe_fds
[1];
765 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX
, SOCK_STREAM
, 0, sock_fds
),
767 *rd_fd
= sock_fds
[0];
768 *wr_fd
= sock_fds
[1];
773 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair
);
777 T_QUIET
; T_ASSERT_NE(*rd_fd
, -1, "reading descriptor");
778 T_QUIET
; T_ASSERT_NE(*wr_fd
, -1, "writing descriptor");
781 #pragma mark single process
/*
 * Run reader and writer as two threads of the current process.
 *
 * Records fd_pair, rd_mode and wr_mode in the shared state, creates the
 * descriptor pair with fd_pair_init(), marks the writer as THREAD_WRITER and
 * creates the shared.wr_wait.sem semaphore used to hold the writer back.
 * write_to_fd() is started on a new pthread, read_from_fd() runs on the
 * calling thread, and the writer thread is joined afterwards.
 */
784 drive_threads(enum fd_pair fd_pair
, enum read_mode rd_mode
,
785 enum write_mode wr_mode
)
789 shared
.fd_pair
= fd_pair
;
790 shared
.rd_mode
= rd_mode
;
791 shared
.wr_mode
= wr_mode
;
792 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
794 shared
.wr_kind
= THREAD_WRITER
;
795 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_wait
.sem
, SYNC_POLICY_FIFO
, 0),
796 "semaphore_create shared.wr_wait.sem");
799 T_ASSERT_POSIX_ZERO(pthread_create(&thread
, NULL
, write_to_fd
, NULL
),
801 T_LOG("created writer thread");
803 read_from_fd(shared
.rd_fd
, fd_pair
, rd_mode
);
805 T_ASSERT_POSIX_ZERO(pthread_join(thread
, NULL
), NULL
);
810 #pragma mark multiple processes
/*
 * Run reader and writer as two forked helper processes.  Declared noreturn.
 *
 * Records fd_pair, rd_mode and wr_mode in the shared state, creates the
 * descriptor pair with fd_pair_init() and marks the writer as
 * PROCESS_WRITER.  A pipe stands in for the semaphore used in the threaded
 * variant: its read end is stored in shared.wr_wait.out_fd and its write end
 * in shared.wr_wait.in_fd.  The "reader_helper" and "writer_helper" helpers
 * are then forked with dt_fork_helper() and run with dt_run_helpers().
 */
812 static void __attribute__((noreturn
))
813 drive_processes(enum fd_pair fd_pair
, enum read_mode rd_mode
, enum write_mode wr_mode
)
815 shared
.fd_pair
= fd_pair
;
816 shared
.rd_mode
= rd_mode
;
817 shared
.wr_mode
= wr_mode
;
818 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
820 shared
.wr_kind
= PROCESS_WRITER
;
822 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pipe(fds
), NULL
);
823 shared
.wr_wait
.out_fd
= fds
[0];
824 shared
.wr_wait
.in_fd
= fds
[1];
826 T_LOG("starting subprocesses");
827 dt_helper_t helpers
[2] = {
828 dt_fork_helper("reader_helper"),
829 dt_fork_helper("writer_helper")
835 dt_run_helpers(helpers
, 2, 50000);
/*
 * Helper forked by drive_processes() under the name "reader_helper": runs
 * the reader on the descriptor, pair type and read mode recorded in the
 * shared state.
 */
838 T_HELPER_DECL(reader_helper
, "Read asynchronously")
841 read_from_fd(shared
.rd_fd
, shared
.fd_pair
, shared
.rd_mode
);
845 T_HELPER_DECL(writer_helper
, "Write asynchronously")
/*
 * Macro scaffolding that stamps out one T_DECL per combination of descriptor
 * pair, read mechanism and write mechanism.
 *
 *  - WR_DECL_PROCESSES declares <desc>_r<read>_w<write>_procs, which calls
 *    drive_processes(); WR_DECL_THREADS declares <desc>_r<read>_w<write>_thds,
 *    which calls drive_threads(); WR_DECL emits both.
 *  - RD_DECL_SAFE emits the FULL_WRITE and INCREMENTAL_WRITE writers for one
 *    read mechanism.  RD_DECL_DISPATCH_ONLY and RD_DECL_WORKQ_ONLY add the
 *    DISPATCH_INCREMENTAL_WRITE and WORKQ_INCREMENTAL_WRITE writers; their
 *    suffix argument is pasted onto WR_DECL, so "_PROCESSES" restricts the
 *    test to the two-process form.
 *  - PAIR_DECL instantiates the poll, select, kevent, kevent64, kevent_qos
 *    and dispatch_source readers for one pair type.  The workqueue reader
 *    and the workqueue writer inside RD_DECL are commented out.
 *
 * The four PAIR_DECL lines at the end instantiate everything for PTY, pipe,
 * FIFO and socket pairs.
 */
853 #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
854 write_mode, read_name, read_mode) \
855 T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
856 #desc_name " with " #read_name " and writing " #write_str \
857 " across two processes") \
859 drive_processes(fd_pair, read_mode, write_mode); \
861 #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
862 write_mode, read_name, read_mode) \
863 T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
864 #desc_name " with " #read_name " and writing " #write_str) \
866 drive_threads(fd_pair, read_mode, write_mode); \
869 #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \
870 read_name, read_mode) \
871 WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
872 write_mode, read_name, read_mode) \
873 WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
874 write_mode, read_name, read_mode)
876 #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
877 WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \
878 read_name, read_mode) \
879 WR_DECL(desc_name, fd_pair, inc, "incrementally", \
880 INCREMENTAL_WRITE, read_name, read_mode)
882 #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \
884 WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \
885 "incrementally with a dispatch source", \
886 DISPATCH_INCREMENTAL_WRITE, read_name, read_mode)
887 #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \
889 WR_DECL##suffix(desc_name, fd_pair, inc_workq, \
890 "incrementally with the workqueue", \
891 WORKQ_INCREMENTAL_WRITE, read_name, read_mode)
893 #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \
894 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
895 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode)
896 // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)
899 * dispatch_source tests cannot share the same process as other workqueue
902 #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \
903 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
904 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \
905 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
909 * Workqueue tests cannot share the same process as other workqueue or
910 * dispatch_source tests.
911 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
912 * RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
913 * RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
915 * RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
919 #define PAIR_DECL(desc_name, fd_pair) \
920 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
921 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
922 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
923 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
924 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
925 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
926 // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
928 PAIR_DECL(tty
, PTY_PAIR
)
929 PAIR_DECL(pipe
, PIPE_PAIR
)
930 PAIR_DECL(fifo
, FIFO_PAIR
)
931 PAIR_DECL(socket
, SOCKET_PAIR
)