]> git.saurik.com Git - apple/xnu.git/blame - tests/poll_select_kevent_paired_fds.c
xnu-7195.101.1.tar.gz
[apple/xnu.git] / tests / poll_select_kevent_paired_fds.c
CommitLineData
813fb2f6
A
1#ifdef T_NAMESPACE
2#undef T_NAMESPACE
3#endif
5ba3f43e 4
813fb2f6 5#include <darwintest.h>
5ba3f43e 6#include <mach/mach.h>
813fb2f6
A
7#include <darwintest_multiprocess.h>
8
9#include <assert.h>
10#include <dispatch/dispatch.h>
5ba3f43e 11#include <dispatch/private.h>
813fb2f6
A
12#include <err.h>
13#include <errno.h>
14#include <fcntl.h>
15#include <poll.h>
16#include <pthread.h>
17#include <pthread/workqueue_private.h>
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <sys/event.h>
22#include <sys/socket.h>
23#include <sys/stat.h>
24#include <sys/time.h>
25#include <sys/types.h>
26#include <sys/wait.h>
27#include <sysexits.h>
28#include <unistd.h>
29#include <util.h>
30#include <System/sys/event.h> /* kevent_qos */
31
5ba3f43e 32T_GLOBAL_META(
0a7de745
A
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT));
813fb2f6
A
36
37/*
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
41 *
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
44 *
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
47 *
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
49 * history.
50 */
51
52#define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
53#define EXPECTED_LEN strlen(EXPECTED_STRING)
54
55#define READ_SETUP_TIMEOUT_SECS 2
56#define WRITE_TIMEOUT_SECS 4
5ba3f43e 57#define READ_TIMEOUT_SECS 4
813fb2f6
A
58#define INCREMENTAL_WRITE_SLEEP_USECS 50
59
5ba3f43e
A
60static mach_timespec_t READ_SETUP_timeout = {.tv_sec = READ_SETUP_TIMEOUT_SECS, .tv_nsec = 0};
61static mach_timespec_t READ_timeout = {.tv_sec = READ_TIMEOUT_SECS, .tv_nsec = 0};
62static mach_timespec_t WRITE_timeout = {.tv_sec = WRITE_TIMEOUT_SECS, .tv_nsec = 0};
63
813fb2f6
A
64enum fd_pair {
65 PTY_PAIR,
66 FIFO_PAIR,
67 PIPE_PAIR,
68 SOCKET_PAIR
69};
70
71enum write_mode {
72 FULL_WRITE,
73 INCREMENTAL_WRITE,
74 KEVENT_INCREMENTAL_WRITE,
75 KEVENT64_INCREMENTAL_WRITE,
76 KEVENT_QOS_INCREMENTAL_WRITE,
77 WORKQ_INCREMENTAL_WRITE,
78 DISPATCH_INCREMENTAL_WRITE
79};
80
81enum read_mode {
82 POLL_READ,
83 SELECT_READ,
84 KEVENT_READ,
85 KEVENT64_READ,
86 KEVENT_QOS_READ,
87 WORKQ_READ,
88 DISPATCH_READ
89};
90
91union mode {
92 enum read_mode rd;
93 enum write_mode wr;
94};
95
96static struct {
97 enum fd_pair fd_pair;
98 enum write_mode wr_mode;
99 int wr_fd;
100 enum read_mode rd_mode;
101 int rd_fd;
102
103 enum writer_kind {
104 THREAD_WRITER, /* sem */
105 PROCESS_WRITER /* fd */
106 } wr_kind;
107 union {
5ba3f43e 108 semaphore_t sem;
813fb2f6
A
109 struct {
110 int in_fd;
111 int out_fd;
112 };
113 } wr_wait;
5ba3f43e
A
114 semaphore_t wr_finished;
115 semaphore_t rd_finished;
813fb2f6
A
116} shared;
117
118static bool handle_reading(enum fd_pair fd_pair, int fd);
119static bool handle_writing(enum fd_pair fd_pair, int fd);
120static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair,
0a7de745 121 int fd);
813fb2f6
A
122
123#pragma mark writing
124
125static void
126wake_writer(void)
127{
128 T_LOG("waking writer");
129
130 switch (shared.wr_kind) {
131 case THREAD_WRITER:
5ba3f43e
A
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared.wr_wait.sem);
813fb2f6
A
134 break;
135 case PROCESS_WRITER: {
136 char tmp = 'a';
137 close(shared.wr_wait.out_fd);
138 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(
0a7de745 139 shared.wr_wait.in_fd, &tmp, 1), NULL);
813fb2f6
A
140 break;
141 }
142 }
143}
144
145static void
146writer_wait(void)
147{
148 switch (shared.wr_kind) {
149 case THREAD_WRITER:
5ba3f43e
A
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret = semaphore_timedwait(shared.wr_wait.sem, READ_SETUP_timeout);
152
153 if (kret == KERN_OPERATION_TIMED_OUT) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout.tv_sec);
155 }
156 T_QUIET;
157 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_wait.sem");
813fb2f6 158 break;
5ba3f43e 159
813fb2f6
A
160 case PROCESS_WRITER: {
161 char tmp;
162 close(shared.wr_wait.in_fd);
163 T_QUIET; T_ASSERT_POSIX_SUCCESS(read(
0a7de745 164 shared.wr_wait.out_fd, &tmp, 1), NULL);
813fb2f6
A
165 break;
166 }
167 }
168
169 T_LOG("writer woken up, starting to write");
170}
171
172static bool
173handle_writing(enum fd_pair __unused fd_pair, int fd)
174{
175 static unsigned int cur_char = 0;
176 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd,
0a7de745 177 &(EXPECTED_STRING[cur_char]), 1), NULL);
813fb2f6
A
178 cur_char++;
179
0a7de745 180 return cur_char < EXPECTED_LEN;
813fb2f6
A
181}
182
183#define EXPECTED_QOS QOS_CLASS_USER_INITIATED
184
185static void
186reenable_workq(int fd, int16_t filt)
187{
188 struct kevent_qos_s events[] = {{
0a7de745
A
189 .ident = (uint64_t)fd,
190 .filter = filt,
191 .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH,
192 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
193 0, 0),
194 .fflags = NOTE_LOWAT,
195 .data = 1
196 }};
813fb2f6
A
197
198 int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL,
0a7de745 199 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
813fb2f6
A
200 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos");
201}
202
203static void
204workqueue_write_fn(void ** __unused buf, int * __unused count)
205{
206 // T_MAYFAIL;
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
0a7de745 208 // "writer thread should be woken up at correct QoS");
813fb2f6
A
209 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
210 /* finished handling the fd, tear down the source */
5ba3f43e
A
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared.wr_finished);
813fb2f6
A
213 return;
214 }
215
216 reenable_workq(shared.wr_fd, EVFILT_WRITE);
217}
218
219static void
220workqueue_fn(pthread_priority_t __unused priority)
221{
222 T_ASSERT_FAIL("workqueue function callback was called");
223}
224
225static void
226drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd)
227{
228 struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS };
229 int kev = -1;
230
231 struct kevent events;
232 EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
0a7de745 233 NOTE_LOWAT, 1, NULL);
813fb2f6
A
234 struct kevent64_s events64;
235 EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
0a7de745 236 NOTE_LOWAT, 1, 0, 0, 0);
813fb2f6 237 struct kevent_qos_s events_qos[] = {{
0a7de745
A
238 .ident = (uint64_t)fd,
239 .filter = reading ? EVFILT_READ : EVFILT_WRITE,
240 .flags = EV_ADD,
241 .fflags = NOTE_LOWAT,
242 .data = 1
243 }, {
244 .ident = 0,
245 .filter = EVFILT_TIMER,
246 .flags = EV_ADD,
247 .fflags = NOTE_SECONDS,
248 .data = READ_TIMEOUT_SECS
249 }};
813fb2f6
A
250
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent;
253 if (reading) {
254 which_kevent = mode.rd;
255 } else {
256 if (mode.wr == KEVENT_INCREMENTAL_WRITE) {
257 which_kevent = KEVENT_READ;
258 } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) {
259 which_kevent = KEVENT64_READ;
260 } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) {
261 which_kevent = KEVENT_QOS_READ;
262 } else {
263 T_ASSERT_FAIL("unexpected mode: %d", mode.wr);
264 __builtin_unreachable();
265 }
266 }
267
268 int kq_fd = kqueue();
269 T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue");
270
271 switch (which_kevent) {
272 case KEVENT_READ:
273 kev = kevent(kq_fd, &events, 1, NULL, 0, NULL);
274 break;
275 case KEVENT64_READ:
276 kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL);
277 break;
278 case KEVENT_QOS_READ:
279 kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0);
280 break;
281 case POLL_READ: /* FALLTHROUGH */
282 case SELECT_READ: /* FALLTHROUGH */
283 case DISPATCH_READ: /* FALLTHROUGH */
284 case WORKQ_READ: /* FALLTHROUGH */
285 default:
286 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
287 break;
288 }
289
290 if (reading) {
291 wake_writer();
292 } else {
293 writer_wait();
294 }
295
296 for (;;) {
297 switch (which_kevent) {
298 case KEVENT_READ:
299 kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout);
300 break;
301 case KEVENT64_READ:
302 kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout);
303 break;
304 case KEVENT_QOS_READ:
305 kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0);
306
307 /* check for a timeout */
308 for (int i = 0; i < kev; i++) {
309 if (events_qos[i].filter == EVFILT_TIMER) {
310 kev = 0;
311 }
312 }
313 break;
314 case POLL_READ: /* FALLTHROUGH */
315 case SELECT_READ: /* FALLTHROUGH */
316 case DISPATCH_READ: /* FALLTHROUGH */
317 case WORKQ_READ: /* FALLTHROUGH */
318 default:
319 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
320 break;
321 }
322
323 if (kev == -1 && errno == EINTR) {
324 T_LOG("kevent was interrupted");
325 continue;
326 }
327 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent");
813fb2f6
A
328 T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out");
329
330 if (reading) {
331 if (!handle_reading(fd_pair, fd)) {
332 break;
333 }
334 } else {
335 if (!handle_writing(fd_pair, fd)) {
336 break;
337 }
338 }
339 }
340
341 close(kq_fd);
342}
343
344static void *
345write_to_fd(void * __unused ctx)
346{
347 ssize_t bytes_wr = 0;
348
349 writer_wait();
350
351 switch (shared.wr_mode) {
352 case FULL_WRITE:
353 do {
354 if (bytes_wr == -1) {
355 T_LOG("write from child was interrupted");
356 }
357 bytes_wr = write(shared.wr_fd, EXPECTED_STRING,
0a7de745 358 EXPECTED_LEN);
813fb2f6
A
359 } while (bytes_wr == -1 && errno == EINTR);
360 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write");
361 T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN,
0a7de745 362 "wrote enough bytes");
813fb2f6
A
363 break;
364
365 case INCREMENTAL_WRITE:
0a7de745 366 for (unsigned int i = 0; i < EXPECTED_LEN; i++) {
813fb2f6
A
367 T_QUIET;
368 T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd,
0a7de745 369 &(EXPECTED_STRING[i]), 1), NULL);
813fb2f6
A
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS);
371 }
372 break;
373
374 case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE: {
377 union mode mode = { .wr = shared.wr_mode };
378 drive_kq(false, mode, shared.fd_pair, shared.wr_fd);
379 break;
380 }
381
382 case WORKQ_INCREMENTAL_WRITE: {
5ba3f43e
A
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
813fb2f6
A
385 int changes = 1;
386
5ba3f43e 387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
0a7de745 388 "semaphore_create shared.wr_finished");
813fb2f6 389
5ba3f43e
A
390 T_QUIET;
391 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "wr_finished semaphore_create");
392
393 T_QUIET;
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn, workqueue_write_fn, 0, 0), NULL);
813fb2f6
A
395
396 struct kevent_qos_s events[] = {{
0a7de745
A
397 .ident = (uint64_t)shared.wr_fd,
398 .filter = EVFILT_WRITE,
399 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
400 .fflags = NOTE_LOWAT,
401 .data = 1,
402 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
403 0, 0)
404 }};
813fb2f6
A
405
406 for (;;) {
407 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
0a7de745
A
408 events, 1, NULL, NULL,
409 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
813fb2f6
A
410 if (kev == -1 && errno == EINTR) {
411 changes = 0;
412 T_LOG("kevent_qos was interrupted");
413 continue;
414 }
415
416 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
417 break;
418 }
419 break;
420 }
421
422 case DISPATCH_INCREMENTAL_WRITE: {
423 dispatch_source_t write_src;
424
5ba3f43e 425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
0a7de745 426 "semaphore_create shared.wr_finished");
5ba3f43e
A
427
428 T_QUIET;
429 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
813fb2f6
A
430
431 write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
0a7de745 432 (uintptr_t)shared.wr_fd, 0, NULL);
813fb2f6 433 T_QUIET; T_ASSERT_NOTNULL(write_src,
0a7de745 434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
813fb2f6
A
435
436 dispatch_block_t handler = dispatch_block_create_with_qos_class(
0a7de745
A
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
438 // T_MAYFAIL;
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src);
444 dispatch_release(write_src);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared.wr_finished);
447 }
448 });
813fb2f6
A
449
450 dispatch_source_set_event_handler(write_src, handler);
451 dispatch_activate(write_src);
452
453 break;
454 }
455
456 default:
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode);
458 break;
459 }
460
461 if (shared.wr_finished) {
5ba3f43e
A
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret = semaphore_timedwait(shared.wr_finished, WRITE_timeout);
464 if (kret == KERN_OPERATION_TIMED_OUT) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout.tv_sec);
813fb2f6 466 }
5ba3f43e
A
467 T_QUIET;
468 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared.wr_finished);
813fb2f6
A
470 }
471
472 T_LOG("writer finished, closing fd");
473 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL);
474 return NULL;
475}
476
477#pragma mark reading
478
479#define BUF_LEN 1024
480static char final_string[BUF_LEN];
481static size_t final_length;
482
483/*
484 * Read from the master PTY descriptor.
485 *
486 * Returns false if EOF is encountered, and true otherwise.
487 */
488static bool
489handle_reading(enum fd_pair fd_pair, int fd)
490{
491 char read_buf[BUF_LEN] = { 0 };
492 ssize_t bytes_rd = 0;
493
494 do {
495 if (bytes_rd == -1) {
496 T_LOG("read was interrupted, retrying");
497 }
498 bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1);
499 } while (bytes_rd == -1 && errno == EINTR);
500
5ba3f43e
A
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
502
813fb2f6
A
503 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file");
504 T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN,
0a7de745 505 "read too much from file");
813fb2f6
A
506
507 if (bytes_rd == 0) {
508 T_LOG("read EOF from file");
509 return false;
510 }
511
512 read_buf[bytes_rd] = '\0';
513 strlcpy(&(final_string[final_length]), read_buf,
0a7de745 514 sizeof(final_string) - final_length);
813fb2f6
A
515 final_length += (size_t)bytes_rd;
516
813fb2f6 517 T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN,
0a7de745 518 "should not read more from file than what can be sent");
813fb2f6 519
a39ff7e2 520 /* FIFOs don't send EOF when the write side closes */
813fb2f6 521 if (final_length == strlen(EXPECTED_STRING) &&
0a7de745 522 (fd_pair == FIFO_PAIR)) {
a39ff7e2 523 T_LOG("read all expected bytes from FIFO");
813fb2f6
A
524 return false;
525 }
526 return true;
527}
528
529static void
530workqueue_read_fn(void ** __unused buf, int * __unused count)
531{
532 // T_MAYFAIL;
533 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
0a7de745 534 // "reader thread should be requested at correct QoS");
813fb2f6 535 if (!handle_reading(shared.fd_pair, shared.rd_fd)) {
5ba3f43e
A
536 T_LOG("signal shared.rd_finished");
537 semaphore_signal(shared.rd_finished);
813fb2f6
A
538 }
539
540 reenable_workq(shared.rd_fd, EVFILT_READ);
541}
542
543static void
544read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode)
545{
546 int fd_flags;
547
548 T_LOG("reader setting up");
549
550 bzero(final_string, sizeof(final_string));
551
552 fd_flags = fcntl(fd, F_GETFL, 0);
553 T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)");
554
555 if (!(fd_flags & O_NONBLOCK)) {
556 T_QUIET;
557 T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL,
0a7de745 558 fd_flags | O_NONBLOCK), NULL);
813fb2f6
A
559 }
560
561 switch (mode) {
562 case POLL_READ: {
563 struct pollfd fds[] = { { .fd = fd, .events = POLLIN } };
564 wake_writer();
5ba3f43e 565
813fb2f6
A
566 for (;;) {
567 fds[0].revents = 0;
568 int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000);
569 T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll");
813fb2f6 570 T_QUIET; T_ASSERT_NE(pol, 0,
0a7de745
A
571 "poll should not time out after %d seconds, read %zd out "
572 "of %zu bytes",
573 READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING));
813fb2f6 574 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR,
0a7de745 575 "should not see an error on the device");
5ba3f43e 576 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLNVAL,
0a7de745 577 "should not set up an invalid poll");
813fb2f6
A
578
579 if (!handle_reading(fd_pair, fd)) {
580 break;
581 }
582 }
583 break;
584 }
585
586 case SELECT_READ:
587 wake_writer();
588
589 for (;;) {
590 struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS };
591
592 fd_set read_fd;
593 FD_ZERO(&read_fd);
594 FD_SET(fd, &read_fd);
595 fd_set err_fd;
596 FD_ZERO(&err_fd);
597 FD_SET(fd, &err_fd);
598
0a7de745 599 int sel = select(fd + 1, &read_fd, NULL, NULL /*&err_fd*/, &tv);
813fb2f6
A
600 if (sel == -1 && errno == EINTR) {
601 T_LOG("select interrupted");
602 continue;
603 }
604 (void)fd_pair;
605
606 T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select");
607
813fb2f6 608 T_QUIET; T_ASSERT_NE(sel, 0,
0a7de745
A
609 "select waited for %d seconds and timed out",
610 READ_TIMEOUT_SECS);
813fb2f6 611
813fb2f6
A
612 /* didn't fail or time out, therefore data is ready */
613 T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0,
0a7de745 614 "select should show reading fd as readable");
813fb2f6
A
615
616 if (!handle_reading(fd_pair, fd)) {
617 break;
618 }
619 }
620 break;
621
622 case KEVENT_READ: /* FALLTHROUGH */
623 case KEVENT64_READ: /* FALLTHROUGH */
624 case KEVENT_QOS_READ: {
625 union mode rd_mode = { .rd = shared.rd_mode };
626 drive_kq(true, rd_mode, fd_pair, shared.rd_fd);
627 break;
628 }
629
630 case WORKQ_READ: {
5ba3f43e
A
631 // prohibit ourselves from going multi-threaded see:rdar://33296008
632 _dispatch_prohibit_transition_to_multithreaded(true);
633 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
0a7de745 634 workqueue_fn, workqueue_read_fn, 0, 0), NULL);
813fb2f6 635
5ba3f43e 636 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
0a7de745 637 "semaphore_create shared.rd_finished");
5ba3f43e
A
638
639 T_QUIET;
640 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
813fb2f6
A
641
642 int changes = 1;
643 struct kevent_qos_s events[] = {{
0a7de745
A
644 .ident = (uint64_t)shared.rd_fd,
645 .filter = EVFILT_READ,
646 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
647 .fflags = NOTE_LOWAT,
648 .data = 1,
649 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
650 0, 0)
651 }};
813fb2f6
A
652
653 for (;;) {
654 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
0a7de745
A
655 events, 1, NULL, NULL,
656 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
813fb2f6
A
657 if (kev == -1 && errno == EINTR) {
658 changes = 0;
659 T_LOG("kevent_qos was interrupted");
660 continue;
661 }
662
663 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
664 break;
665 }
666
667 wake_writer();
668 break;
669 }
670
671 case DISPATCH_READ: {
672 dispatch_source_t read_src;
673
5ba3f43e 674 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
0a7de745 675 "semaphore_create shared.rd_finished");
5ba3f43e
A
676
677 T_QUIET;
678 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
813fb2f6
A
679
680 read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
0a7de745 681 (uintptr_t)fd, 0, NULL);
813fb2f6 682 T_QUIET; T_ASSERT_NOTNULL(read_src,
0a7de745 683 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
813fb2f6
A
684
685 dispatch_block_t handler = dispatch_block_create_with_qos_class(
0a7de745
A
686 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
687 // T_MAYFAIL;
688 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
689 // "read handler block should run at correct QoS");
690
691 if (!handle_reading(fd_pair, fd)) {
692 /* finished handling the fd, tear down the source */
693 dispatch_source_cancel(read_src);
694 dispatch_release(read_src);
695 T_LOG("signal shared.rd_finished");
696 semaphore_signal(shared.rd_finished);
697 }
698 });
813fb2f6
A
699
700 dispatch_source_set_event_handler(read_src, handler);
701 dispatch_activate(read_src);
702
703 wake_writer();
704 break;
705 }
706
707 default:
708 T_ASSERT_FAIL("unrecognized read mode: %d", mode);
709 break;
710 }
711
712 if (shared.rd_finished) {
5ba3f43e
A
713 T_LOG("wait shared.rd_finished");
714 kern_return_t kret = semaphore_timedwait(shared.rd_finished, READ_timeout);
715 if (kret == KERN_OPERATION_TIMED_OUT) {
716 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout.tv_sec);
813fb2f6 717 }
5ba3f43e
A
718 T_QUIET;
719 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.rd_finished");
813fb2f6
A
720 }
721
722 T_EXPECT_EQ_STR(final_string, EXPECTED_STRING,
0a7de745 723 "reader should receive valid string");
813fb2f6
A
724 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL);
725}
726
727#pragma mark file setup
728
729static void
730fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd)
731{
732 switch (fd_pair) {
733 case PTY_PAIR:
734 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL),
0a7de745 735 NULL);
813fb2f6
A
736 break;
737
738 case FIFO_PAIR: {
739 char fifo_path[] = "/tmp/async-io-fifo.XXXXXX";
740 T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL);
741
742 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)",
0a7de745 743 fifo_path);
813fb2f6
A
744 /*
745 * Opening the read side of a pipe will block until the write
746 * side opens -- use O_NONBLOCK.
747 */
748 *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
749 T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)");
750 *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
751 T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)");
752 break;
753 }
754
755 case PIPE_PAIR: {
756 int pipe_fds[2];
757 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL);
758 *rd_fd = pipe_fds[0];
759 *wr_fd = pipe_fds[1];
760 break;
761 }
762
763 case SOCKET_PAIR: {
764 int sock_fds[2];
765 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds),
0a7de745 766 NULL);
813fb2f6
A
767 *rd_fd = sock_fds[0];
768 *wr_fd = sock_fds[1];
769 break;
770 }
771
772 default:
773 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair);
774 break;
775 }
776
777 T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor");
778 T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor");
779}
780
781#pragma mark single process
782
783static void
784drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode,
0a7de745 785 enum write_mode wr_mode)
813fb2f6
A
786{
787 pthread_t thread;
788
789 shared.fd_pair = fd_pair;
790 shared.rd_mode = rd_mode;
791 shared.wr_mode = wr_mode;
792 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
793
794 shared.wr_kind = THREAD_WRITER;
5ba3f43e 795 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_wait.sem, SYNC_POLICY_FIFO, 0),
0a7de745 796 "semaphore_create shared.wr_wait.sem");
813fb2f6
A
797
798 T_QUIET;
799 T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL),
0a7de745 800 NULL);
813fb2f6
A
801 T_LOG("created writer thread");
802
803 read_from_fd(shared.rd_fd, fd_pair, rd_mode);
5ba3f43e
A
804
805 T_ASSERT_POSIX_ZERO(pthread_join(thread, NULL), NULL);
806
813fb2f6
A
807 T_END;
808}
809
810#pragma mark multiple processes
811
812static void __attribute__((noreturn))
813drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode)
814{
815 shared.fd_pair = fd_pair;
816 shared.rd_mode = rd_mode;
817 shared.wr_mode = wr_mode;
818 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
819
820 shared.wr_kind = PROCESS_WRITER;
821 int fds[2];
822 T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL);
823 shared.wr_wait.out_fd = fds[0];
824 shared.wr_wait.in_fd = fds[1];
825
826 T_LOG("starting subprocesses");
827 dt_helper_t helpers[2] = {
828 dt_fork_helper("reader_helper"),
829 dt_fork_helper("writer_helper")
830 };
831
832 close(shared.rd_fd);
833 close(shared.wr_fd);
834
835 dt_run_helpers(helpers, 2, 50000);
836}
837
838T_HELPER_DECL(reader_helper, "Read asynchronously")
839{
840 close(shared.wr_fd);
841 read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode);
842 T_END;
843}
844
845T_HELPER_DECL(writer_helper, "Write asynchronously")
846{
847 close(shared.rd_fd);
848 write_to_fd(NULL);
849}
850
851#pragma mark tests
852
853#define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
0a7de745
A
854 write_mode, read_name, read_mode) \
855 T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
856 #desc_name " with " #read_name " and writing " #write_str \
857 " across two processes") \
858 { \
859 drive_processes(fd_pair, read_mode, write_mode); \
860 }
813fb2f6 861#define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
0a7de745
A
862 write_mode, read_name, read_mode) \
863 T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
864 #desc_name " with " #read_name " and writing " #write_str) \
865 { \
866 drive_threads(fd_pair, read_mode, write_mode); \
867 }
813fb2f6
A
868
869#define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \
0a7de745
A
870 read_name, read_mode) \
871 WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
872 write_mode, read_name, read_mode) \
873 WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
874 write_mode, read_name, read_mode)
813fb2f6
A
875
876#define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
0a7de745
A
877 WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \
878 read_name, read_mode) \
879 WR_DECL(desc_name, fd_pair, inc, "incrementally", \
880 INCREMENTAL_WRITE, read_name, read_mode)
813fb2f6
A
881
882#define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \
0a7de745
A
883 read_mode) \
884 WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \
885 "incrementally with a dispatch source", \
886 DISPATCH_INCREMENTAL_WRITE, read_name, read_mode)
813fb2f6 887#define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \
0a7de745
A
888 read_mode) \
889 WR_DECL##suffix(desc_name, fd_pair, inc_workq, \
890 "incrementally with the workqueue", \
891 WORKQ_INCREMENTAL_WRITE, read_name, read_mode)
813fb2f6
A
892
893#define RD_DECL(desc_name, fd_pair, read_name, read_mode) \
0a7de745
A
894 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
895 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode)
896// RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)
813fb2f6
A
897
898/*
899 * dispatch_source tests cannot share the same process as other workqueue
900 * tests.
901 */
902#define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \
0a7de745
A
903 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
904 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \
905 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
906 read_mode)
813fb2f6
A
907
908/*
909 * Workqueue tests cannot share the same process as other workqueue or
910 * dispatch_source tests.
0a7de745
A
911 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
912 * RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
913 * RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
914 * read_mode) \
915 * RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
916 * read_mode)
813fb2f6
A
917 */
918
919#define PAIR_DECL(desc_name, fd_pair) \
920 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
921 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
922 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
923 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
924 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
925 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
0a7de745 926// RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
813fb2f6
A
927
928PAIR_DECL(tty, PTY_PAIR)
929PAIR_DECL(pipe, PIPE_PAIR)
930PAIR_DECL(fifo, FIFO_PAIR)
931PAIR_DECL(socket, SOCKET_PAIR)