]> git.saurik.com Git - apple/xnu.git/blob - tests/poll_select_kevent_paired_fds.c
xnu-4903.241.1.tar.gz
[apple/xnu.git] / tests / poll_select_kevent_paired_fds.c
1 #ifdef T_NAMESPACE
2 #undef T_NAMESPACE
3 #endif
4
5 #include <darwintest.h>
6 #include <mach/mach.h>
7 #include <darwintest_multiprocess.h>
8
9 #include <assert.h>
10 #include <dispatch/dispatch.h>
11 #include <dispatch/private.h>
12 #include <err.h>
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <poll.h>
16 #include <pthread.h>
17 #include <pthread/workqueue_private.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/event.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 #include <sys/time.h>
25 #include <sys/types.h>
26 #include <sys/wait.h>
27 #include <sysexits.h>
28 #include <unistd.h>
29 #include <util.h>
30 #include <System/sys/event.h> /* kevent_qos */
31
32 T_GLOBAL_META(
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT));
36
37 /*
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
41 *
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
44 *
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
47 *
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
49 * history.
50 */
51
/* Payload the writer sends and the reader must reassemble byte-for-byte. */
#define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
/*
 * Payload length without the NUL terminator.  Computed at compile time so
 * that loop conditions using it do not re-scan the string on every iteration.
 */
#define EXPECTED_LEN (sizeof(EXPECTED_STRING) - 1)

/* How long the writer waits for the reader to finish setting up. */
#define READ_SETUP_TIMEOUT_SECS 2
/* How long to wait for an asynchronous writer to report completion. */
#define WRITE_TIMEOUT_SECS 4
/* How long the reader waits for data (poll/select/kevent/semaphore). */
#define READ_TIMEOUT_SECS 4
/* Pause between single-byte writes in INCREMENTAL_WRITE mode. */
#define INCREMENTAL_WRITE_SLEEP_USECS 50
59
60 static mach_timespec_t READ_SETUP_timeout = {.tv_sec = READ_SETUP_TIMEOUT_SECS, .tv_nsec = 0};
61 static mach_timespec_t READ_timeout = {.tv_sec = READ_TIMEOUT_SECS, .tv_nsec = 0};
62 static mach_timespec_t WRITE_timeout = {.tv_sec = WRITE_TIMEOUT_SECS, .tv_nsec = 0};
63
/* Kind of connected descriptor pair a test is run against. */
enum fd_pair {
	PTY_PAIR,       /* created with openpty() */
	FIFO_PAIR,      /* created with mkfifo() and opened twice */
	PIPE_PAIR,      /* created with pipe() */
	SOCKET_PAIR     /* created with socketpair(AF_UNIX, SOCK_STREAM) */
};
70
/* Strategy the writer uses to send EXPECTED_STRING. */
enum write_mode {
	FULL_WRITE,                     /* whole string in a single write() */
	INCREMENTAL_WRITE,              /* one byte per write(), sleeping in between */
	KEVENT_INCREMENTAL_WRITE,       /* one byte per EVFILT_WRITE event, via kevent() */
	KEVENT64_INCREMENTAL_WRITE,     /* same, via kevent64() */
	KEVENT_QOS_INCREMENTAL_WRITE,   /* same, via kevent_qos() */
	WORKQ_INCREMENTAL_WRITE,        /* one byte per workqueue kevent callback */
	DISPATCH_INCREMENTAL_WRITE      /* one byte per dispatch write-source event */
};
80
/* Mechanism the reader uses to learn that data is available. */
enum read_mode {
	POLL_READ,          /* poll() */
	SELECT_READ,        /* select() */
	KEVENT_READ,        /* kqueue driven by kevent() */
	KEVENT64_READ,      /* kqueue driven by kevent64() */
	KEVENT_QOS_READ,    /* kqueue driven by kevent_qos() */
	WORKQ_READ,         /* workqueue kevent callback */
	DISPATCH_READ       /* dispatch read source */
};
90
91 union mode {
92 enum read_mode rd;
93 enum write_mode wr;
94 };
95
96 static struct {
97 enum fd_pair fd_pair;
98 enum write_mode wr_mode;
99 int wr_fd;
100 enum read_mode rd_mode;
101 int rd_fd;
102
103 enum writer_kind {
104 THREAD_WRITER, /* sem */
105 PROCESS_WRITER /* fd */
106 } wr_kind;
107 union {
108 semaphore_t sem;
109 struct {
110 int in_fd;
111 int out_fd;
112 };
113 } wr_wait;
114 semaphore_t wr_finished;
115 semaphore_t rd_finished;
116 } shared;
117
/* Forward declarations for helpers that are used before they are defined. */
static bool handle_reading(enum fd_pair fd_pair, int fd);
static bool handle_writing(enum fd_pair fd_pair, int fd);
static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd);
122
123 #pragma mark writing
124
125 static void
126 wake_writer(void)
127 {
128 T_LOG("waking writer");
129
130 switch (shared.wr_kind) {
131 case THREAD_WRITER:
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared.wr_wait.sem);
134 break;
135 case PROCESS_WRITER: {
136 char tmp = 'a';
137 close(shared.wr_wait.out_fd);
138 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(
139 shared.wr_wait.in_fd, &tmp, 1), NULL);
140 break;
141 }
142 }
143 }
144
145 static void
146 writer_wait(void)
147 {
148 switch (shared.wr_kind) {
149 case THREAD_WRITER:
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret = semaphore_timedwait(shared.wr_wait.sem, READ_SETUP_timeout);
152
153 if (kret == KERN_OPERATION_TIMED_OUT) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout.tv_sec);
155 }
156 T_QUIET;
157 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_wait.sem");
158 break;
159
160 case PROCESS_WRITER: {
161 char tmp;
162 close(shared.wr_wait.in_fd);
163 T_QUIET; T_ASSERT_POSIX_SUCCESS(read(
164 shared.wr_wait.out_fd, &tmp, 1), NULL);
165 break;
166 }
167 }
168
169 T_LOG("writer woken up, starting to write");
170 }
171
172 static bool
173 handle_writing(enum fd_pair __unused fd_pair, int fd)
174 {
175 static unsigned int cur_char = 0;
176 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd,
177 &(EXPECTED_STRING[cur_char]), 1), NULL);
178 cur_char++;
179
180 return (cur_char < EXPECTED_LEN);
181 }
182
183 #define EXPECTED_QOS QOS_CLASS_USER_INITIATED
184
185 static void
186 reenable_workq(int fd, int16_t filt)
187 {
188 struct kevent_qos_s events[] = {{
189 .ident = (uint64_t)fd,
190 .filter = filt,
191 .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH,
192 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
193 0, 0),
194 .fflags = NOTE_LOWAT,
195 .data = 1
196 }};
197
198 int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL,
199 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
200 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos");
201 }
202
203 static void
204 workqueue_write_fn(void ** __unused buf, int * __unused count)
205 {
206 // T_MAYFAIL;
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
208 // "writer thread should be woken up at correct QoS");
209 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
210 /* finished handling the fd, tear down the source */
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared.wr_finished);
213 return;
214 }
215
216 reenable_workq(shared.wr_fd, EVFILT_WRITE);
217 }
218
219 static void
220 workqueue_fn(pthread_priority_t __unused priority)
221 {
222 T_ASSERT_FAIL("workqueue function callback was called");
223 }
224
225 static void
226 drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd)
227 {
228 struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS };
229 int kev = -1;
230
231 struct kevent events;
232 EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
233 NOTE_LOWAT, 1, NULL);
234 struct kevent64_s events64;
235 EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
236 NOTE_LOWAT, 1, 0, 0, 0);
237 struct kevent_qos_s events_qos[] = {{
238 .ident = (uint64_t)fd,
239 .filter = reading ? EVFILT_READ : EVFILT_WRITE,
240 .flags = EV_ADD,
241 .fflags = NOTE_LOWAT,
242 .data = 1
243 }, {
244 .ident = 0,
245 .filter = EVFILT_TIMER,
246 .flags = EV_ADD,
247 .fflags = NOTE_SECONDS,
248 .data = READ_TIMEOUT_SECS
249 }};
250
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent;
253 if (reading) {
254 which_kevent = mode.rd;
255 } else {
256 if (mode.wr == KEVENT_INCREMENTAL_WRITE) {
257 which_kevent = KEVENT_READ;
258 } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) {
259 which_kevent = KEVENT64_READ;
260 } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) {
261 which_kevent = KEVENT_QOS_READ;
262 } else {
263 T_ASSERT_FAIL("unexpected mode: %d", mode.wr);
264 __builtin_unreachable();
265 }
266 }
267
268 int kq_fd = kqueue();
269 T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue");
270
271 switch (which_kevent) {
272 case KEVENT_READ:
273 kev = kevent(kq_fd, &events, 1, NULL, 0, NULL);
274 break;
275 case KEVENT64_READ:
276 kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL);
277 break;
278 case KEVENT_QOS_READ:
279 kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0);
280 break;
281 case POLL_READ: /* FALLTHROUGH */
282 case SELECT_READ: /* FALLTHROUGH */
283 case DISPATCH_READ: /* FALLTHROUGH */
284 case WORKQ_READ: /* FALLTHROUGH */
285 default:
286 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
287 break;
288 }
289
290 if (reading) {
291 wake_writer();
292 } else {
293 writer_wait();
294 }
295
296 for (;;) {
297 switch (which_kevent) {
298 case KEVENT_READ:
299 kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout);
300 break;
301 case KEVENT64_READ:
302 kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout);
303 break;
304 case KEVENT_QOS_READ:
305 kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0);
306
307 /* check for a timeout */
308 for (int i = 0; i < kev; i++) {
309 if (events_qos[i].filter == EVFILT_TIMER) {
310 kev = 0;
311 }
312 }
313 break;
314 case POLL_READ: /* FALLTHROUGH */
315 case SELECT_READ: /* FALLTHROUGH */
316 case DISPATCH_READ: /* FALLTHROUGH */
317 case WORKQ_READ: /* FALLTHROUGH */
318 default:
319 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
320 break;
321 }
322
323 if (kev == -1 && errno == EINTR) {
324 T_LOG("kevent was interrupted");
325 continue;
326 }
327 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent");
328 T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out");
329
330 if (reading) {
331 if (!handle_reading(fd_pair, fd)) {
332 break;
333 }
334 } else {
335 if (!handle_writing(fd_pair, fd)) {
336 break;
337 }
338 }
339 }
340
341 close(kq_fd);
342 }
343
344 static void *
345 write_to_fd(void * __unused ctx)
346 {
347 ssize_t bytes_wr = 0;
348
349 writer_wait();
350
351 switch (shared.wr_mode) {
352 case FULL_WRITE:
353 do {
354 if (bytes_wr == -1) {
355 T_LOG("write from child was interrupted");
356 }
357 bytes_wr = write(shared.wr_fd, EXPECTED_STRING,
358 EXPECTED_LEN);
359 } while (bytes_wr == -1 && errno == EINTR);
360 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write");
361 T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN,
362 "wrote enough bytes");
363 break;
364
365 case INCREMENTAL_WRITE:
366 for (unsigned int i = 0; i < EXPECTED_LEN ; i++) {
367 T_QUIET;
368 T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd,
369 &(EXPECTED_STRING[i]), 1), NULL);
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS);
371 }
372 break;
373
374 case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE: {
377 union mode mode = { .wr = shared.wr_mode };
378 drive_kq(false, mode, shared.fd_pair, shared.wr_fd);
379 break;
380 }
381
382 case WORKQ_INCREMENTAL_WRITE: {
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
385 int changes = 1;
386
387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
388 "semaphore_create shared.wr_finished");
389
390 T_QUIET;
391 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "wr_finished semaphore_create");
392
393 T_QUIET;
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn, workqueue_write_fn, 0, 0), NULL);
395
396 struct kevent_qos_s events[] = {{
397 .ident = (uint64_t)shared.wr_fd,
398 .filter = EVFILT_WRITE,
399 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
400 .fflags = NOTE_LOWAT,
401 .data = 1,
402 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
403 0, 0)
404 }};
405
406 for (;;) {
407 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
408 events, 1, NULL, NULL,
409 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
410 if (kev == -1 && errno == EINTR) {
411 changes = 0;
412 T_LOG("kevent_qos was interrupted");
413 continue;
414 }
415
416 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
417 break;
418 }
419 break;
420 }
421
422 case DISPATCH_INCREMENTAL_WRITE: {
423 dispatch_source_t write_src;
424
425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
426 "semaphore_create shared.wr_finished");
427
428 T_QUIET;
429 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
430
431 write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
432 (uintptr_t)shared.wr_fd, 0, NULL);
433 T_QUIET; T_ASSERT_NOTNULL(write_src,
434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
435
436 dispatch_block_t handler = dispatch_block_create_with_qos_class(
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
438 // T_MAYFAIL;
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src);
444 dispatch_release(write_src);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared.wr_finished);
447 }
448 });
449
450 dispatch_source_set_event_handler(write_src, handler);
451 dispatch_activate(write_src);
452
453 break;
454 }
455
456 default:
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode);
458 break;
459 }
460
461 if (shared.wr_finished) {
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret = semaphore_timedwait(shared.wr_finished, WRITE_timeout);
464 if (kret == KERN_OPERATION_TIMED_OUT) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout.tv_sec);
466 }
467 T_QUIET;
468 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared.wr_finished);
470 }
471
472 T_LOG("writer finished, closing fd");
473 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL);
474 return NULL;
475 }
476
477 #pragma mark reading
478
479 #define BUF_LEN 1024
480 static char final_string[BUF_LEN];
481 static size_t final_length;
482
483 /*
484 * Read from the master PTY descriptor.
485 *
486 * Returns false if EOF is encountered, and true otherwise.
487 */
488 static bool
489 handle_reading(enum fd_pair fd_pair, int fd)
490 {
491 char read_buf[BUF_LEN] = { 0 };
492 ssize_t bytes_rd = 0;
493
494 do {
495 if (bytes_rd == -1) {
496 T_LOG("read was interrupted, retrying");
497 }
498 bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1);
499 } while (bytes_rd == -1 && errno == EINTR);
500
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
502
503 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file");
504 T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN,
505 "read too much from file");
506
507 if (bytes_rd == 0) {
508 T_LOG("read EOF from file");
509 return false;
510 }
511
512 read_buf[bytes_rd] = '\0';
513 strlcpy(&(final_string[final_length]), read_buf,
514 sizeof(final_string) - final_length);
515 final_length += (size_t)bytes_rd;
516
517 T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN,
518 "should not read more from file than what can be sent");
519
520 /* FIFOs don't send EOF when the write side closes */
521 if (final_length == strlen(EXPECTED_STRING) &&
522 (fd_pair == FIFO_PAIR))
523 {
524 T_LOG("read all expected bytes from FIFO");
525 return false;
526 }
527 return true;
528 }
529
530 static void
531 workqueue_read_fn(void ** __unused buf, int * __unused count)
532 {
533 // T_MAYFAIL;
534 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
535 // "reader thread should be requested at correct QoS");
536 if (!handle_reading(shared.fd_pair, shared.rd_fd)) {
537 T_LOG("signal shared.rd_finished");
538 semaphore_signal(shared.rd_finished);
539 }
540
541 reenable_workq(shared.rd_fd, EVFILT_READ);
542 }
543
544 static void
545 read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode)
546 {
547 int fd_flags;
548
549 T_LOG("reader setting up");
550
551 bzero(final_string, sizeof(final_string));
552
553 fd_flags = fcntl(fd, F_GETFL, 0);
554 T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)");
555
556 if (!(fd_flags & O_NONBLOCK)) {
557 T_QUIET;
558 T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL,
559 fd_flags | O_NONBLOCK), NULL);
560 }
561
562 switch (mode) {
563 case POLL_READ: {
564 struct pollfd fds[] = { { .fd = fd, .events = POLLIN } };
565 wake_writer();
566
567 for (;;) {
568 fds[0].revents = 0;
569 int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000);
570 T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll");
571 T_QUIET; T_ASSERT_NE(pol, 0,
572 "poll should not time out after %d seconds, read %zd out "
573 "of %zu bytes",
574 READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING));
575 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR,
576 "should not see an error on the device");
577 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLNVAL,
578 "should not set up an invalid poll");
579
580 if (!handle_reading(fd_pair, fd)) {
581 break;
582 }
583 }
584 break;
585 }
586
587 case SELECT_READ:
588 wake_writer();
589
590 for (;;) {
591 struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS };
592
593 fd_set read_fd;
594 FD_ZERO(&read_fd);
595 FD_SET(fd, &read_fd);
596 fd_set err_fd;
597 FD_ZERO(&err_fd);
598 FD_SET(fd, &err_fd);
599
600 int sel = select(fd + 1, &read_fd, NULL, NULL/*&err_fd*/, &tv);
601 if (sel == -1 && errno == EINTR) {
602 T_LOG("select interrupted");
603 continue;
604 }
605 (void)fd_pair;
606
607 T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select");
608
609 T_QUIET; T_ASSERT_NE(sel, 0,
610 "select waited for %d seconds and timed out",
611 READ_TIMEOUT_SECS);
612
613 /* didn't fail or time out, therefore data is ready */
614 T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0,
615 "select should show reading fd as readable");
616
617 if (!handle_reading(fd_pair, fd)) {
618 break;
619 }
620 }
621 break;
622
623 case KEVENT_READ: /* FALLTHROUGH */
624 case KEVENT64_READ: /* FALLTHROUGH */
625 case KEVENT_QOS_READ: {
626 union mode rd_mode = { .rd = shared.rd_mode };
627 drive_kq(true, rd_mode, fd_pair, shared.rd_fd);
628 break;
629 }
630
631 case WORKQ_READ: {
632 // prohibit ourselves from going multi-threaded see:rdar://33296008
633 _dispatch_prohibit_transition_to_multithreaded(true);
634 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
635 workqueue_fn, workqueue_read_fn, 0, 0), NULL);
636
637 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
638 "semaphore_create shared.rd_finished");
639
640 T_QUIET;
641 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
642
643 int changes = 1;
644 struct kevent_qos_s events[] = {{
645 .ident = (uint64_t)shared.rd_fd,
646 .filter = EVFILT_READ,
647 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
648 .fflags = NOTE_LOWAT,
649 .data = 1,
650 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
651 0, 0)
652 }};
653
654 for (;;) {
655 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
656 events, 1, NULL, NULL,
657 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
658 if (kev == -1 && errno == EINTR) {
659 changes = 0;
660 T_LOG("kevent_qos was interrupted");
661 continue;
662 }
663
664 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
665 break;
666 }
667
668 wake_writer();
669 break;
670 }
671
672 case DISPATCH_READ: {
673 dispatch_source_t read_src;
674
675 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
676 "semaphore_create shared.rd_finished");
677
678 T_QUIET;
679 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
680
681 read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
682 (uintptr_t)fd, 0, NULL);
683 T_QUIET; T_ASSERT_NOTNULL(read_src,
684 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
685
686 dispatch_block_t handler = dispatch_block_create_with_qos_class(
687 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
688 // T_MAYFAIL;
689 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
690 // "read handler block should run at correct QoS");
691
692 if (!handle_reading(fd_pair, fd)) {
693 /* finished handling the fd, tear down the source */
694 dispatch_source_cancel(read_src);
695 dispatch_release(read_src);
696 T_LOG("signal shared.rd_finished");
697 semaphore_signal(shared.rd_finished);
698 }
699 });
700
701 dispatch_source_set_event_handler(read_src, handler);
702 dispatch_activate(read_src);
703
704 wake_writer();
705 break;
706 }
707
708 default:
709 T_ASSERT_FAIL("unrecognized read mode: %d", mode);
710 break;
711 }
712
713 if (shared.rd_finished) {
714 T_LOG("wait shared.rd_finished");
715 kern_return_t kret = semaphore_timedwait(shared.rd_finished, READ_timeout);
716 if (kret == KERN_OPERATION_TIMED_OUT) {
717 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout.tv_sec);
718 }
719 T_QUIET;
720 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.rd_finished");
721 }
722
723 T_EXPECT_EQ_STR(final_string, EXPECTED_STRING,
724 "reader should receive valid string");
725 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL);
726 }
727
728 #pragma mark file setup
729
730 static void
731 fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd)
732 {
733 switch (fd_pair) {
734 case PTY_PAIR:
735 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL),
736 NULL);
737 break;
738
739 case FIFO_PAIR: {
740 char fifo_path[] = "/tmp/async-io-fifo.XXXXXX";
741 T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL);
742
743 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)",
744 fifo_path);
745 /*
746 * Opening the read side of a pipe will block until the write
747 * side opens -- use O_NONBLOCK.
748 */
749 *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
750 T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)");
751 *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
752 T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)");
753 break;
754 }
755
756 case PIPE_PAIR: {
757 int pipe_fds[2];
758 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL);
759 *rd_fd = pipe_fds[0];
760 *wr_fd = pipe_fds[1];
761 break;
762 }
763
764 case SOCKET_PAIR: {
765 int sock_fds[2];
766 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds),
767 NULL);
768 *rd_fd = sock_fds[0];
769 *wr_fd = sock_fds[1];
770 break;
771 }
772
773 default:
774 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair);
775 break;
776 }
777
778 T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor");
779 T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor");
780 }
781
782 #pragma mark single process
783
784 static void
785 drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode,
786 enum write_mode wr_mode)
787 {
788 pthread_t thread;
789
790 shared.fd_pair = fd_pair;
791 shared.rd_mode = rd_mode;
792 shared.wr_mode = wr_mode;
793 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
794
795 shared.wr_kind = THREAD_WRITER;
796 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_wait.sem, SYNC_POLICY_FIFO, 0),
797 "semaphore_create shared.wr_wait.sem");
798
799 T_QUIET;
800 T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL),
801 NULL);
802 T_LOG("created writer thread");
803
804 read_from_fd(shared.rd_fd, fd_pair, rd_mode);
805
806 T_ASSERT_POSIX_ZERO(pthread_join(thread, NULL), NULL);
807
808 T_END;
809 }
810
811 #pragma mark multiple processes
812
813 static void __attribute__((noreturn))
814 drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode)
815 {
816 shared.fd_pair = fd_pair;
817 shared.rd_mode = rd_mode;
818 shared.wr_mode = wr_mode;
819 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
820
821 shared.wr_kind = PROCESS_WRITER;
822 int fds[2];
823 T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL);
824 shared.wr_wait.out_fd = fds[0];
825 shared.wr_wait.in_fd = fds[1];
826
827 T_LOG("starting subprocesses");
828 dt_helper_t helpers[2] = {
829 dt_fork_helper("reader_helper"),
830 dt_fork_helper("writer_helper")
831 };
832
833 close(shared.rd_fd);
834 close(shared.wr_fd);
835
836 dt_run_helpers(helpers, 2, 50000);
837 }
838
839 T_HELPER_DECL(reader_helper, "Read asynchronously")
840 {
841 close(shared.wr_fd);
842 read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode);
843 T_END;
844 }
845
846 T_HELPER_DECL(writer_helper, "Write asynchronously")
847 {
848 close(shared.rd_fd);
849 write_to_fd(NULL);
850 }
851
852 #pragma mark tests
853
/*
 * Declare one test that runs the reader and the writer in two helper
 * processes.  The test name is <desc>_r<read>_w<write>_procs.
 */
#define WR_DECL_PROCESSES(desc_name, pair, write_name, write_str, \
	    wr_mode, read_name, rd_mode) \
	T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
	    #desc_name " with " #read_name " and writing " #write_str \
	    " across two processes") \
	{ \
	        drive_processes(pair, rd_mode, wr_mode); \
	}

/*
 * Declare one test that runs the reader and the writer as two threads of the
 * same process.  The test name is <desc>_r<read>_w<write>_thds.
 */
#define WR_DECL_THREADS(desc_name, pair, write_name, write_str, \
	    wr_mode, read_name, rd_mode) \
	T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
	    #desc_name " with " #read_name " and writing " #write_str) \
	{ \
	        drive_threads(pair, rd_mode, wr_mode); \
	}

/* Declare both the two-process and the two-thread variant of a test. */
#define WR_DECL(desc_name, pair, write_name, write_str, wr_mode, \
	    read_name, rd_mode) \
	WR_DECL_PROCESSES(desc_name, pair, write_name, write_str, \
	    wr_mode, read_name, rd_mode) \
	WR_DECL_THREADS(desc_name, pair, write_name, write_str, \
	    wr_mode, read_name, rd_mode)
876
/*
 * For one read mode, declare tests against the two plain writers: the whole
 * string at once, and one byte at a time.
 */
#define RD_DECL_SAFE(desc_name, pair, read_name, rd_mode) \
	WR_DECL(desc_name, pair, full, "the full string", FULL_WRITE, \
	    read_name, rd_mode) \
	WR_DECL(desc_name, pair, inc, "incrementally", \
	    INCREMENTAL_WRITE, read_name, rd_mode)

/*
 * For one read mode, declare tests against the dispatch-source writer.
 * `suffix` is pasted onto WR_DECL: empty for both variants, or _PROCESSES /
 * _THREADS for just one of them.
 */
#define RD_DECL_DISPATCH_ONLY(suffix, desc_name, pair, read_name, \
	    rd_mode) \
	WR_DECL##suffix(desc_name, pair, inc_dispatch, \
	    "incrementally with a dispatch source", \
	    DISPATCH_INCREMENTAL_WRITE, read_name, rd_mode)

/* Same as RD_DECL_DISPATCH_ONLY, but against the workqueue writer. */
#define RD_DECL_WORKQ_ONLY(suffix, desc_name, pair, read_name, \
	    rd_mode) \
	WR_DECL##suffix(desc_name, pair, inc_workq, \
	    "incrementally with the workqueue", \
	    WORKQ_INCREMENTAL_WRITE, read_name, rd_mode)

/* All enabled writer combinations for an ordinary read mode. */
#define RD_DECL(desc_name, pair, read_name, rd_mode) \
	RD_DECL_SAFE(desc_name, pair, read_name, rd_mode) \
	RD_DECL_DISPATCH_ONLY(, desc_name, pair, read_name, rd_mode)
// RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)

/*
 * dispatch_source tests cannot share the same process as other workqueue
 * tests.
 */
#define RD_DECL_DISPATCH(desc_name, pair, read_name, rd_mode) \
	RD_DECL_SAFE(desc_name, pair, read_name, rd_mode) \
	RD_DECL_DISPATCH_ONLY(, desc_name, pair, read_name, rd_mode) \
	RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, pair, read_name, \
	    rd_mode)

/*
 * Workqueue tests cannot share the same process as other workqueue or
 * dispatch_source tests.
 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
 *       RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
 *       RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
 *               read_mode) \
 *       RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
 *               read_mode)
 */
919
920 #define PAIR_DECL(desc_name, fd_pair) \
921 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
922 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
923 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
924 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
925 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
926 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
927 // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
928
929 PAIR_DECL(tty, PTY_PAIR)
930 PAIR_DECL(pipe, PIPE_PAIR)
931 PAIR_DECL(fifo, FIFO_PAIR)
932 PAIR_DECL(socket, SOCKET_PAIR)