5 #include <darwintest.h>
7 #include <darwintest_multiprocess.h>
10 #include <dispatch/dispatch.h>
11 #include <dispatch/private.h>
17 #include <pthread/workqueue_private.h>
21 #include <sys/event.h>
22 #include <sys/socket.h>
25 #include <sys/types.h>
30 #include <System/sys/event.h> /* kevent_qos */
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT
));
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
52 #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
53 #define EXPECTED_LEN strlen(EXPECTED_STRING)
55 #define READ_SETUP_TIMEOUT_SECS 2
56 #define WRITE_TIMEOUT_SECS 4
57 #define READ_TIMEOUT_SECS 4
58 #define INCREMENTAL_WRITE_SLEEP_USECS 50
60 static mach_timespec_t READ_SETUP_timeout
= {.tv_sec
= READ_SETUP_TIMEOUT_SECS
, .tv_nsec
= 0};
61 static mach_timespec_t READ_timeout
= {.tv_sec
= READ_TIMEOUT_SECS
, .tv_nsec
= 0};
62 static mach_timespec_t WRITE_timeout
= {.tv_sec
= WRITE_TIMEOUT_SECS
, .tv_nsec
= 0};
74 KEVENT_INCREMENTAL_WRITE
,
75 KEVENT64_INCREMENTAL_WRITE
,
76 KEVENT_QOS_INCREMENTAL_WRITE
,
77 WORKQ_INCREMENTAL_WRITE
,
78 DISPATCH_INCREMENTAL_WRITE
98 enum write_mode wr_mode
;
100 enum read_mode rd_mode
;
104 THREAD_WRITER
, /* sem */
105 PROCESS_WRITER
/* fd */
114 semaphore_t wr_finished
;
115 semaphore_t rd_finished
;
118 static bool handle_reading(enum fd_pair fd_pair
, int fd
);
119 static bool handle_writing(enum fd_pair fd_pair
, int fd
);
120 static void drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
,
128 T_LOG("waking writer");
130 switch (shared
.wr_kind
) {
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared
.wr_wait
.sem
);
135 case PROCESS_WRITER
: {
137 close(shared
.wr_wait
.out_fd
);
138 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(
139 shared
.wr_wait
.in_fd
, &tmp
, 1), NULL
);
148 switch (shared
.wr_kind
) {
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret
= semaphore_timedwait(shared
.wr_wait
.sem
, READ_SETUP_timeout
);
153 if (kret
== KERN_OPERATION_TIMED_OUT
) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout
.tv_sec
);
157 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_wait.sem");
160 case PROCESS_WRITER
: {
162 close(shared
.wr_wait
.in_fd
);
163 T_QUIET
; T_ASSERT_POSIX_SUCCESS(read(
164 shared
.wr_wait
.out_fd
, &tmp
, 1), NULL
);
169 T_LOG("writer woken up, starting to write");
173 handle_writing(enum fd_pair __unused fd_pair
, int fd
)
175 static unsigned int cur_char
= 0;
176 T_QUIET
; T_ASSERT_POSIX_SUCCESS(write(fd
,
177 &(EXPECTED_STRING
[cur_char
]), 1), NULL
);
180 return (cur_char
< EXPECTED_LEN
);
183 #define EXPECTED_QOS QOS_CLASS_USER_INITIATED
186 reenable_workq(int fd
, int16_t filt
)
188 struct kevent_qos_s events
[] = {{
189 .ident
= (uint64_t)fd
,
191 .flags
= EV_ENABLE
| EV_UDATA_SPECIFIC
| EV_DISPATCH
,
192 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
194 .fflags
= NOTE_LOWAT
,
198 int kev
= kevent_qos(-1, events
, 1, events
, 1, NULL
, NULL
,
199 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
200 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "reenable workq in kevent_qos");
204 workqueue_write_fn(void ** __unused buf
, int * __unused count
)
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
208 // "writer thread should be woken up at correct QoS");
209 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
210 /* finished handling the fd, tear down the source */
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared
.wr_finished
);
216 reenable_workq(shared
.wr_fd
, EVFILT_WRITE
);
220 workqueue_fn(pthread_priority_t __unused priority
)
222 T_ASSERT_FAIL("workqueue function callback was called");
226 drive_kq(bool reading
, union mode mode
, enum fd_pair fd_pair
, int fd
)
228 struct timespec timeout
= { .tv_sec
= READ_TIMEOUT_SECS
};
231 struct kevent events
;
232 EV_SET(&events
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
233 NOTE_LOWAT
, 1, NULL
);
234 struct kevent64_s events64
;
235 EV_SET64(&events64
, fd
, reading
? EVFILT_READ
: EVFILT_WRITE
, EV_ADD
,
236 NOTE_LOWAT
, 1, 0, 0, 0);
237 struct kevent_qos_s events_qos
[] = {{
238 .ident
= (uint64_t)fd
,
239 .filter
= reading
? EVFILT_READ
: EVFILT_WRITE
,
241 .fflags
= NOTE_LOWAT
,
245 .filter
= EVFILT_TIMER
,
247 .fflags
= NOTE_SECONDS
,
248 .data
= READ_TIMEOUT_SECS
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent
;
254 which_kevent
= mode
.rd
;
256 if (mode
.wr
== KEVENT_INCREMENTAL_WRITE
) {
257 which_kevent
= KEVENT_READ
;
258 } else if (mode
.wr
== KEVENT64_INCREMENTAL_WRITE
) {
259 which_kevent
= KEVENT64_READ
;
260 } else if (mode
.wr
== KEVENT_QOS_INCREMENTAL_WRITE
) {
261 which_kevent
= KEVENT_QOS_READ
;
263 T_ASSERT_FAIL("unexpected mode: %d", mode
.wr
);
264 __builtin_unreachable();
268 int kq_fd
= kqueue();
269 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kq_fd
, "kqueue");
271 switch (which_kevent
) {
273 kev
= kevent(kq_fd
, &events
, 1, NULL
, 0, NULL
);
276 kev
= kevent64(kq_fd
, &events64
, 1, NULL
, 0, 0, NULL
);
278 case KEVENT_QOS_READ
:
279 kev
= kevent_qos(kq_fd
, events_qos
, 2, NULL
, 0, NULL
, NULL
, 0);
281 case POLL_READ
: /* FALLTHROUGH */
282 case SELECT_READ
: /* FALLTHROUGH */
283 case DISPATCH_READ
: /* FALLTHROUGH */
284 case WORKQ_READ
: /* FALLTHROUGH */
286 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
297 switch (which_kevent
) {
299 kev
= kevent(kq_fd
, NULL
, 0, &events
, 1, &timeout
);
302 kev
= kevent64(kq_fd
, NULL
, 0, &events64
, 1, 0, &timeout
);
304 case KEVENT_QOS_READ
:
305 kev
= kevent_qos(kq_fd
, NULL
, 0, events_qos
, 2, NULL
, NULL
, 0);
307 /* check for a timeout */
308 for (int i
= 0; i
< kev
; i
++) {
309 if (events_qos
[i
].filter
== EVFILT_TIMER
) {
314 case POLL_READ
: /* FALLTHROUGH */
315 case SELECT_READ
: /* FALLTHROUGH */
316 case DISPATCH_READ
: /* FALLTHROUGH */
317 case WORKQ_READ
: /* FALLTHROUGH */
319 T_ASSERT_FAIL("unexpected mode: %d", reading
? mode
.rd
: mode
.wr
);
323 if (kev
== -1 && errno
== EINTR
) {
324 T_LOG("kevent was interrupted");
327 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent");
328 T_QUIET
; T_ASSERT_NE(kev
, 0, "kevent timed out");
331 if (!handle_reading(fd_pair
, fd
)) {
335 if (!handle_writing(fd_pair
, fd
)) {
345 write_to_fd(void * __unused ctx
)
347 ssize_t bytes_wr
= 0;
351 switch (shared
.wr_mode
) {
354 if (bytes_wr
== -1) {
355 T_LOG("write from child was interrupted");
357 bytes_wr
= write(shared
.wr_fd
, EXPECTED_STRING
,
359 } while (bytes_wr
== -1 && errno
== EINTR
);
360 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_wr
, "write");
361 T_QUIET
; T_ASSERT_EQ(bytes_wr
, (ssize_t
)EXPECTED_LEN
,
362 "wrote enough bytes");
365 case INCREMENTAL_WRITE
:
366 for (unsigned int i
= 0; i
< EXPECTED_LEN
; i
++) {
368 T_ASSERT_POSIX_SUCCESS(write(shared
.wr_fd
,
369 &(EXPECTED_STRING
[i
]), 1), NULL
);
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS
);
374 case KEVENT_INCREMENTAL_WRITE
: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE
: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE
: {
377 union mode mode
= { .wr
= shared
.wr_mode
};
378 drive_kq(false, mode
, shared
.fd_pair
, shared
.wr_fd
);
382 case WORKQ_INCREMENTAL_WRITE
: {
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
388 "semaphore_create shared.wr_finished");
391 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "wr_finished semaphore_create");
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn
, workqueue_write_fn
, 0, 0), NULL
);
396 struct kevent_qos_s events
[] = {{
397 .ident
= (uint64_t)shared
.wr_fd
,
398 .filter
= EVFILT_WRITE
,
399 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
400 .fflags
= NOTE_LOWAT
,
402 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
407 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
408 events
, 1, NULL
, NULL
,
409 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
410 if (kev
== -1 && errno
== EINTR
) {
412 T_LOG("kevent_qos was interrupted");
416 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
422 case DISPATCH_INCREMENTAL_WRITE
: {
423 dispatch_source_t write_src
;
425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_finished
, SYNC_POLICY_FIFO
, 0),
426 "semaphore_create shared.wr_finished");
429 T_ASSERT_NE_UINT(shared
.wr_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
431 write_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
,
432 (uintptr_t)shared
.wr_fd
, 0, NULL
);
433 T_QUIET
; T_ASSERT_NOTNULL(write_src
,
434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
436 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared
.fd_pair
, shared
.wr_fd
)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src
);
444 dispatch_release(write_src
);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared
.wr_finished
);
450 dispatch_source_set_event_handler(write_src
, handler
);
451 dispatch_activate(write_src
);
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared
.wr_mode
);
461 if (shared
.wr_finished
) {
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret
= semaphore_timedwait(shared
.wr_finished
, WRITE_timeout
);
464 if (kret
== KERN_OPERATION_TIMED_OUT
) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout
.tv_sec
);
468 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared
.wr_finished
);
472 T_LOG("writer finished, closing fd");
473 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(shared
.wr_fd
), NULL
);
480 static char final_string
[BUF_LEN
];
481 static size_t final_length
;
484 * Read from the master PTY descriptor.
486 * Returns false if EOF is encountered, and true otherwise.
489 handle_reading(enum fd_pair fd_pair
, int fd
)
491 char read_buf
[BUF_LEN
] = { 0 };
492 ssize_t bytes_rd
= 0;
495 if (bytes_rd
== -1) {
496 T_LOG("read was interrupted, retrying");
498 bytes_rd
= read(fd
, read_buf
, sizeof(read_buf
) - 1);
499 } while (bytes_rd
== -1 && errno
== EINTR
);
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
503 T_QUIET
; T_ASSERT_POSIX_SUCCESS(bytes_rd
, "reading from file");
504 T_QUIET
; T_ASSERT_LE(bytes_rd
, (ssize_t
)EXPECTED_LEN
,
505 "read too much from file");
508 T_LOG("read EOF from file");
512 read_buf
[bytes_rd
] = '\0';
513 strlcpy(&(final_string
[final_length
]), read_buf
,
514 sizeof(final_string
) - final_length
);
515 final_length
+= (size_t)bytes_rd
;
517 T_QUIET
; T_ASSERT_LE(final_length
, EXPECTED_LEN
,
518 "should not read more from file than what can be sent");
520 /* FIFOs don't send EOF when the write side closes */
521 if (final_length
== strlen(EXPECTED_STRING
) &&
522 (fd_pair
== FIFO_PAIR
))
524 T_LOG("read all expected bytes from FIFO");
531 workqueue_read_fn(void ** __unused buf
, int * __unused count
)
534 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
535 // "reader thread should be requested at correct QoS");
536 if (!handle_reading(shared
.fd_pair
, shared
.rd_fd
)) {
537 T_LOG("signal shared.rd_finished");
538 semaphore_signal(shared
.rd_finished
);
541 reenable_workq(shared
.rd_fd
, EVFILT_READ
);
545 read_from_fd(int fd
, enum fd_pair fd_pair
, enum read_mode mode
)
549 T_LOG("reader setting up");
551 bzero(final_string
, sizeof(final_string
));
553 fd_flags
= fcntl(fd
, F_GETFL
, 0);
554 T_QUIET
; T_ASSERT_POSIX_SUCCESS(fd_flags
, "fcntl(F_GETFL)");
556 if (!(fd_flags
& O_NONBLOCK
)) {
558 T_ASSERT_POSIX_SUCCESS(fcntl(fd
, F_SETFL
,
559 fd_flags
| O_NONBLOCK
), NULL
);
564 struct pollfd fds
[] = { { .fd
= fd
, .events
= POLLIN
} };
569 int pol
= poll(fds
, 1, READ_TIMEOUT_SECS
* 1000);
570 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pol
, "poll");
571 T_QUIET
; T_ASSERT_NE(pol
, 0,
572 "poll should not time out after %d seconds, read %zd out "
574 READ_TIMEOUT_SECS
, final_length
, strlen(EXPECTED_STRING
));
575 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLERR
,
576 "should not see an error on the device");
577 T_QUIET
; T_ASSERT_FALSE(fds
[0].revents
& POLLNVAL
,
578 "should not set up an invalid poll");
580 if (!handle_reading(fd_pair
, fd
)) {
591 struct timeval tv
= { .tv_sec
= READ_TIMEOUT_SECS
};
595 FD_SET(fd
, &read_fd
);
600 int sel
= select(fd
+ 1, &read_fd
, NULL
, NULL
/*&err_fd*/, &tv
);
601 if (sel
== -1 && errno
== EINTR
) {
602 T_LOG("select interrupted");
607 T_QUIET
; T_ASSERT_POSIX_SUCCESS(sel
, "select");
609 T_QUIET
; T_ASSERT_NE(sel
, 0,
610 "select waited for %d seconds and timed out",
613 /* didn't fail or time out, therefore data is ready */
614 T_QUIET
; T_ASSERT_NE(FD_ISSET(fd
, &read_fd
), 0,
615 "select should show reading fd as readable");
617 if (!handle_reading(fd_pair
, fd
)) {
623 case KEVENT_READ
: /* FALLTHROUGH */
624 case KEVENT64_READ
: /* FALLTHROUGH */
625 case KEVENT_QOS_READ
: {
626 union mode rd_mode
= { .rd
= shared
.rd_mode
};
627 drive_kq(true, rd_mode
, fd_pair
, shared
.rd_fd
);
632 // prohibit ourselves from going multi-threaded see:rdar://33296008
633 _dispatch_prohibit_transition_to_multithreaded(true);
634 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
635 workqueue_fn
, workqueue_read_fn
, 0, 0), NULL
);
637 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
638 "semaphore_create shared.rd_finished");
641 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
644 struct kevent_qos_s events
[] = {{
645 .ident
= (uint64_t)shared
.rd_fd
,
646 .filter
= EVFILT_READ
,
647 .flags
= EV_ADD
| EV_UDATA_SPECIFIC
| EV_DISPATCH
| EV_VANISHED
,
648 .fflags
= NOTE_LOWAT
,
650 .qos
= (int32_t)_pthread_qos_class_encode(EXPECTED_QOS
,
655 int kev
= kevent_qos(-1, changes
== 0 ? NULL
: events
, changes
,
656 events
, 1, NULL
, NULL
,
657 KEVENT_FLAG_WORKQ
| KEVENT_FLAG_ERROR_EVENTS
);
658 if (kev
== -1 && errno
== EINTR
) {
660 T_LOG("kevent_qos was interrupted");
664 T_QUIET
; T_ASSERT_POSIX_SUCCESS(kev
, "kevent_qos");
672 case DISPATCH_READ
: {
673 dispatch_source_t read_src
;
675 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.rd_finished
, SYNC_POLICY_FIFO
, 0),
676 "semaphore_create shared.rd_finished");
679 T_ASSERT_NE_UINT(shared
.rd_finished
, (unsigned)MACH_PORT_NULL
, "semaphore_create");
681 read_src
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
,
682 (uintptr_t)fd
, 0, NULL
);
683 T_QUIET
; T_ASSERT_NOTNULL(read_src
,
684 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
686 dispatch_block_t handler
= dispatch_block_create_with_qos_class(
687 DISPATCH_BLOCK_ENFORCE_QOS_CLASS
, EXPECTED_QOS
, 0, ^{
689 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
690 // "read handler block should run at correct QoS");
692 if (!handle_reading(fd_pair
, fd
)) {
693 /* finished handling the fd, tear down the source */
694 dispatch_source_cancel(read_src
);
695 dispatch_release(read_src
);
696 T_LOG("signal shared.rd_finished");
697 semaphore_signal(shared
.rd_finished
);
701 dispatch_source_set_event_handler(read_src
, handler
);
702 dispatch_activate(read_src
);
709 T_ASSERT_FAIL("unrecognized read mode: %d", mode
);
713 if (shared
.rd_finished
) {
714 T_LOG("wait shared.rd_finished");
715 kern_return_t kret
= semaphore_timedwait(shared
.rd_finished
, READ_timeout
);
716 if (kret
== KERN_OPERATION_TIMED_OUT
) {
717 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout
.tv_sec
);
720 T_ASSERT_MACH_SUCCESS(kret
, "semaphore_timedwait shared.rd_finished");
723 T_EXPECT_EQ_STR(final_string
, EXPECTED_STRING
,
724 "reader should receive valid string");
725 T_QUIET
; T_ASSERT_POSIX_SUCCESS(close(fd
), NULL
);
728 #pragma mark file setup
731 fd_pair_init(enum fd_pair fd_pair
, int *rd_fd
, int *wr_fd
)
735 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd
, wr_fd
, NULL
, NULL
, NULL
),
740 char fifo_path
[] = "/tmp/async-io-fifo.XXXXXX";
741 T_QUIET
; T_ASSERT_NOTNULL(mktemp(fifo_path
), NULL
);
743 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path
, 0700), "mkfifo(%s, 0700)",
746 * Opening the read side of a pipe will block until the write
747 * side opens -- use O_NONBLOCK.
749 *rd_fd
= open(fifo_path
, O_RDONLY
| O_NONBLOCK
);
750 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*rd_fd
, "open(... O_RDONLY)");
751 *wr_fd
= open(fifo_path
, O_WRONLY
| O_NONBLOCK
);
752 T_QUIET
; T_ASSERT_POSIX_SUCCESS(*wr_fd
, "open(... O_WRONLY)");
758 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds
), NULL
);
759 *rd_fd
= pipe_fds
[0];
760 *wr_fd
= pipe_fds
[1];
766 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX
, SOCK_STREAM
, 0, sock_fds
),
768 *rd_fd
= sock_fds
[0];
769 *wr_fd
= sock_fds
[1];
774 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair
);
778 T_QUIET
; T_ASSERT_NE(*rd_fd
, -1, "reading descriptor");
779 T_QUIET
; T_ASSERT_NE(*wr_fd
, -1, "writing descriptor");
782 #pragma mark single process
785 drive_threads(enum fd_pair fd_pair
, enum read_mode rd_mode
,
786 enum write_mode wr_mode
)
790 shared
.fd_pair
= fd_pair
;
791 shared
.rd_mode
= rd_mode
;
792 shared
.wr_mode
= wr_mode
;
793 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
795 shared
.wr_kind
= THREAD_WRITER
;
796 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared
.wr_wait
.sem
, SYNC_POLICY_FIFO
, 0),
797 "semaphore_create shared.wr_wait.sem");
800 T_ASSERT_POSIX_ZERO(pthread_create(&thread
, NULL
, write_to_fd
, NULL
),
802 T_LOG("created writer thread");
804 read_from_fd(shared
.rd_fd
, fd_pair
, rd_mode
);
806 T_ASSERT_POSIX_ZERO(pthread_join(thread
, NULL
), NULL
);
811 #pragma mark multiple processes
813 static void __attribute__((noreturn
))
814 drive_processes(enum fd_pair fd_pair
, enum read_mode rd_mode
, enum write_mode wr_mode
)
816 shared
.fd_pair
= fd_pair
;
817 shared
.rd_mode
= rd_mode
;
818 shared
.wr_mode
= wr_mode
;
819 fd_pair_init(fd_pair
, &(shared
.rd_fd
), &(shared
.wr_fd
));
821 shared
.wr_kind
= PROCESS_WRITER
;
823 T_QUIET
; T_ASSERT_POSIX_SUCCESS(pipe(fds
), NULL
);
824 shared
.wr_wait
.out_fd
= fds
[0];
825 shared
.wr_wait
.in_fd
= fds
[1];
827 T_LOG("starting subprocesses");
828 dt_helper_t helpers
[2] = {
829 dt_fork_helper("reader_helper"),
830 dt_fork_helper("writer_helper")
836 dt_run_helpers(helpers
, 2, 50000);
839 T_HELPER_DECL(reader_helper
, "Read asynchronously")
842 read_from_fd(shared
.rd_fd
, shared
.fd_pair
, shared
.rd_mode
);
846 T_HELPER_DECL(writer_helper
, "Write asynchronously")
854 #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
855 write_mode, read_name, read_mode) \
856 T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
857 #desc_name " with " #read_name " and writing " #write_str \
858 " across two processes") \
860 drive_processes(fd_pair, read_mode, write_mode); \
862 #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
863 write_mode, read_name, read_mode) \
864 T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
865 #desc_name " with " #read_name " and writing " #write_str) \
867 drive_threads(fd_pair, read_mode, write_mode); \
870 #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \
871 read_name, read_mode) \
872 WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
873 write_mode, read_name, read_mode) \
874 WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
875 write_mode, read_name, read_mode)
877 #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
878 WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \
879 read_name, read_mode) \
880 WR_DECL(desc_name, fd_pair, inc, "incrementally", \
881 INCREMENTAL_WRITE, read_name, read_mode)
883 #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \
885 WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \
886 "incrementally with a dispatch source", \
887 DISPATCH_INCREMENTAL_WRITE, read_name, read_mode)
888 #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \
890 WR_DECL##suffix(desc_name, fd_pair, inc_workq, \
891 "incrementally with the workqueue", \
892 WORKQ_INCREMENTAL_WRITE, read_name, read_mode)
894 #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \
895 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
896 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode)
897 // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)
900 * dispatch_source tests cannot share the same process as other workqueue
903 #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \
904 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
905 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \
906 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
910 * Workqueue tests cannot share the same process as other workqueue or
911 * dispatch_source tests.
912 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
913 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
914 RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
916 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
920 #define PAIR_DECL(desc_name, fd_pair) \
921 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
922 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
923 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
924 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
925 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
926 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
927 // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
929 PAIR_DECL(tty
, PTY_PAIR
)
930 PAIR_DECL(pipe
, PIPE_PAIR
)
931 PAIR_DECL(fifo
, FIFO_PAIR
)
932 PAIR_DECL(socket
, SOCKET_PAIR
)