]> git.saurik.com Git - apple/xnu.git/blob - tests/poll_select_kevent_paired_fds.c
xnu-6153.81.5.tar.gz
[apple/xnu.git] / tests / poll_select_kevent_paired_fds.c
1 #ifdef T_NAMESPACE
2 #undef T_NAMESPACE
3 #endif
4
5 #include <darwintest.h>
6 #include <mach/mach.h>
7 #include <darwintest_multiprocess.h>
8
9 #include <assert.h>
10 #include <dispatch/dispatch.h>
11 #include <dispatch/private.h>
12 #include <err.h>
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <poll.h>
16 #include <pthread.h>
17 #include <pthread/workqueue_private.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/event.h>
22 #include <sys/socket.h>
23 #include <sys/stat.h>
24 #include <sys/time.h>
25 #include <sys/types.h>
26 #include <sys/wait.h>
27 #include <sysexits.h>
28 #include <unistd.h>
29 #include <util.h>
30 #include <System/sys/event.h> /* kevent_qos */
31
32 T_GLOBAL_META(
33 T_META_NAMESPACE("xnu.kevent"),
34 T_META_CHECK_LEAKS(false),
35 T_META_LTEPHASE(LTE_POSTINIT));
36
37 /*
38 * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in
39 * a dispatch source, kqueue, poll, or select delivers read events within and
40 * between processes as expected.
41 *
42 * This test catches issues with watching special devices in kqueue(),
43 * which has tricky special cases for character devices like PTYs.
44 *
45 * It also exercises the path to wake up a dispatch worker thread from the
46 * special device kqueue event, which is also a special case in kqueue().
47 *
48 * See rdar://problem/26240299&26220074&26226862&28625427 for examples and
49 * history.
50 */
51
52 #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890"
53 #define EXPECTED_LEN strlen(EXPECTED_STRING)
54
55 #define READ_SETUP_TIMEOUT_SECS 2
56 #define WRITE_TIMEOUT_SECS 4
57 #define READ_TIMEOUT_SECS 4
58 #define INCREMENTAL_WRITE_SLEEP_USECS 50
59
60 static mach_timespec_t READ_SETUP_timeout = {.tv_sec = READ_SETUP_TIMEOUT_SECS, .tv_nsec = 0};
61 static mach_timespec_t READ_timeout = {.tv_sec = READ_TIMEOUT_SECS, .tv_nsec = 0};
62 static mach_timespec_t WRITE_timeout = {.tv_sec = WRITE_TIMEOUT_SECS, .tv_nsec = 0};
63
64 enum fd_pair {
65 PTY_PAIR,
66 FIFO_PAIR,
67 PIPE_PAIR,
68 SOCKET_PAIR
69 };
70
71 enum write_mode {
72 FULL_WRITE,
73 INCREMENTAL_WRITE,
74 KEVENT_INCREMENTAL_WRITE,
75 KEVENT64_INCREMENTAL_WRITE,
76 KEVENT_QOS_INCREMENTAL_WRITE,
77 WORKQ_INCREMENTAL_WRITE,
78 DISPATCH_INCREMENTAL_WRITE
79 };
80
81 enum read_mode {
82 POLL_READ,
83 SELECT_READ,
84 KEVENT_READ,
85 KEVENT64_READ,
86 KEVENT_QOS_READ,
87 WORKQ_READ,
88 DISPATCH_READ
89 };
90
91 union mode {
92 enum read_mode rd;
93 enum write_mode wr;
94 };
95
96 static struct {
97 enum fd_pair fd_pair;
98 enum write_mode wr_mode;
99 int wr_fd;
100 enum read_mode rd_mode;
101 int rd_fd;
102
103 enum writer_kind {
104 THREAD_WRITER, /* sem */
105 PROCESS_WRITER /* fd */
106 } wr_kind;
107 union {
108 semaphore_t sem;
109 struct {
110 int in_fd;
111 int out_fd;
112 };
113 } wr_wait;
114 semaphore_t wr_finished;
115 semaphore_t rd_finished;
116 } shared;
117
118 static bool handle_reading(enum fd_pair fd_pair, int fd);
119 static bool handle_writing(enum fd_pair fd_pair, int fd);
120 static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair,
121 int fd);
122
123 #pragma mark writing
124
125 static void
126 wake_writer(void)
127 {
128 T_LOG("waking writer");
129
130 switch (shared.wr_kind) {
131 case THREAD_WRITER:
132 T_LOG("signal shared.wr_wait.sem");
133 semaphore_signal(shared.wr_wait.sem);
134 break;
135 case PROCESS_WRITER: {
136 char tmp = 'a';
137 close(shared.wr_wait.out_fd);
138 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(
139 shared.wr_wait.in_fd, &tmp, 1), NULL);
140 break;
141 }
142 }
143 }
144
145 static void
146 writer_wait(void)
147 {
148 switch (shared.wr_kind) {
149 case THREAD_WRITER:
150 T_LOG("wait shared.wr_wait.sem");
151 kern_return_t kret = semaphore_timedwait(shared.wr_wait.sem, READ_SETUP_timeout);
152
153 if (kret == KERN_OPERATION_TIMED_OUT) {
154 T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout.tv_sec);
155 }
156 T_QUIET;
157 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_wait.sem");
158 break;
159
160 case PROCESS_WRITER: {
161 char tmp;
162 close(shared.wr_wait.in_fd);
163 T_QUIET; T_ASSERT_POSIX_SUCCESS(read(
164 shared.wr_wait.out_fd, &tmp, 1), NULL);
165 break;
166 }
167 }
168
169 T_LOG("writer woken up, starting to write");
170 }
171
172 static bool
173 handle_writing(enum fd_pair __unused fd_pair, int fd)
174 {
175 static unsigned int cur_char = 0;
176 T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd,
177 &(EXPECTED_STRING[cur_char]), 1), NULL);
178 cur_char++;
179
180 return cur_char < EXPECTED_LEN;
181 }
182
183 #define EXPECTED_QOS QOS_CLASS_USER_INITIATED
184
185 static void
186 reenable_workq(int fd, int16_t filt)
187 {
188 struct kevent_qos_s events[] = {{
189 .ident = (uint64_t)fd,
190 .filter = filt,
191 .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH,
192 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
193 0, 0),
194 .fflags = NOTE_LOWAT,
195 .data = 1
196 }};
197
198 int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL,
199 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
200 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos");
201 }
202
203 static void
204 workqueue_write_fn(void ** __unused buf, int * __unused count)
205 {
206 // T_MAYFAIL;
207 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
208 // "writer thread should be woken up at correct QoS");
209 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
210 /* finished handling the fd, tear down the source */
211 T_LOG("signal shared.wr_finished");
212 semaphore_signal(shared.wr_finished);
213 return;
214 }
215
216 reenable_workq(shared.wr_fd, EVFILT_WRITE);
217 }
218
219 static void
220 workqueue_fn(pthread_priority_t __unused priority)
221 {
222 T_ASSERT_FAIL("workqueue function callback was called");
223 }
224
225 static void
226 drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd)
227 {
228 struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS };
229 int kev = -1;
230
231 struct kevent events;
232 EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
233 NOTE_LOWAT, 1, NULL);
234 struct kevent64_s events64;
235 EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD,
236 NOTE_LOWAT, 1, 0, 0, 0);
237 struct kevent_qos_s events_qos[] = {{
238 .ident = (uint64_t)fd,
239 .filter = reading ? EVFILT_READ : EVFILT_WRITE,
240 .flags = EV_ADD,
241 .fflags = NOTE_LOWAT,
242 .data = 1
243 }, {
244 .ident = 0,
245 .filter = EVFILT_TIMER,
246 .flags = EV_ADD,
247 .fflags = NOTE_SECONDS,
248 .data = READ_TIMEOUT_SECS
249 }};
250
251 /* determine which variant of kevent to use */
252 enum read_mode which_kevent;
253 if (reading) {
254 which_kevent = mode.rd;
255 } else {
256 if (mode.wr == KEVENT_INCREMENTAL_WRITE) {
257 which_kevent = KEVENT_READ;
258 } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) {
259 which_kevent = KEVENT64_READ;
260 } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) {
261 which_kevent = KEVENT_QOS_READ;
262 } else {
263 T_ASSERT_FAIL("unexpected mode: %d", mode.wr);
264 __builtin_unreachable();
265 }
266 }
267
268 int kq_fd = kqueue();
269 T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue");
270
271 switch (which_kevent) {
272 case KEVENT_READ:
273 kev = kevent(kq_fd, &events, 1, NULL, 0, NULL);
274 break;
275 case KEVENT64_READ:
276 kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL);
277 break;
278 case KEVENT_QOS_READ:
279 kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0);
280 break;
281 case POLL_READ: /* FALLTHROUGH */
282 case SELECT_READ: /* FALLTHROUGH */
283 case DISPATCH_READ: /* FALLTHROUGH */
284 case WORKQ_READ: /* FALLTHROUGH */
285 default:
286 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
287 break;
288 }
289
290 if (reading) {
291 wake_writer();
292 } else {
293 writer_wait();
294 }
295
296 for (;;) {
297 switch (which_kevent) {
298 case KEVENT_READ:
299 kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout);
300 break;
301 case KEVENT64_READ:
302 kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout);
303 break;
304 case KEVENT_QOS_READ:
305 kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0);
306
307 /* check for a timeout */
308 for (int i = 0; i < kev; i++) {
309 if (events_qos[i].filter == EVFILT_TIMER) {
310 kev = 0;
311 }
312 }
313 break;
314 case POLL_READ: /* FALLTHROUGH */
315 case SELECT_READ: /* FALLTHROUGH */
316 case DISPATCH_READ: /* FALLTHROUGH */
317 case WORKQ_READ: /* FALLTHROUGH */
318 default:
319 T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr);
320 break;
321 }
322
323 if (kev == -1 && errno == EINTR) {
324 T_LOG("kevent was interrupted");
325 continue;
326 }
327 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent");
328 T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out");
329
330 if (reading) {
331 if (!handle_reading(fd_pair, fd)) {
332 break;
333 }
334 } else {
335 if (!handle_writing(fd_pair, fd)) {
336 break;
337 }
338 }
339 }
340
341 close(kq_fd);
342 }
343
344 static void *
345 write_to_fd(void * __unused ctx)
346 {
347 ssize_t bytes_wr = 0;
348
349 writer_wait();
350
351 switch (shared.wr_mode) {
352 case FULL_WRITE:
353 do {
354 if (bytes_wr == -1) {
355 T_LOG("write from child was interrupted");
356 }
357 bytes_wr = write(shared.wr_fd, EXPECTED_STRING,
358 EXPECTED_LEN);
359 } while (bytes_wr == -1 && errno == EINTR);
360 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write");
361 T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN,
362 "wrote enough bytes");
363 break;
364
365 case INCREMENTAL_WRITE:
366 for (unsigned int i = 0; i < EXPECTED_LEN; i++) {
367 T_QUIET;
368 T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd,
369 &(EXPECTED_STRING[i]), 1), NULL);
370 usleep(INCREMENTAL_WRITE_SLEEP_USECS);
371 }
372 break;
373
374 case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */
375 case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */
376 case KEVENT_QOS_INCREMENTAL_WRITE: {
377 union mode mode = { .wr = shared.wr_mode };
378 drive_kq(false, mode, shared.fd_pair, shared.wr_fd);
379 break;
380 }
381
382 case WORKQ_INCREMENTAL_WRITE: {
383 // prohibit ourselves from going multi-threaded see:rdar://33296008
384 _dispatch_prohibit_transition_to_multithreaded(true);
385 int changes = 1;
386
387 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
388 "semaphore_create shared.wr_finished");
389
390 T_QUIET;
391 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "wr_finished semaphore_create");
392
393 T_QUIET;
394 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn, workqueue_write_fn, 0, 0), NULL);
395
396 struct kevent_qos_s events[] = {{
397 .ident = (uint64_t)shared.wr_fd,
398 .filter = EVFILT_WRITE,
399 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
400 .fflags = NOTE_LOWAT,
401 .data = 1,
402 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
403 0, 0)
404 }};
405
406 for (;;) {
407 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
408 events, 1, NULL, NULL,
409 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
410 if (kev == -1 && errno == EINTR) {
411 changes = 0;
412 T_LOG("kevent_qos was interrupted");
413 continue;
414 }
415
416 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
417 break;
418 }
419 break;
420 }
421
422 case DISPATCH_INCREMENTAL_WRITE: {
423 dispatch_source_t write_src;
424
425 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0),
426 "semaphore_create shared.wr_finished");
427
428 T_QUIET;
429 T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
430
431 write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
432 (uintptr_t)shared.wr_fd, 0, NULL);
433 T_QUIET; T_ASSERT_NOTNULL(write_src,
434 "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)");
435
436 dispatch_block_t handler = dispatch_block_create_with_qos_class(
437 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
438 // T_MAYFAIL;
439 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
440 // "write handler block should run at correct QoS");
441 if (!handle_writing(shared.fd_pair, shared.wr_fd)) {
442 /* finished handling the fd, tear down the source */
443 dispatch_source_cancel(write_src);
444 dispatch_release(write_src);
445 T_LOG("signal shared.wr_finished");
446 semaphore_signal(shared.wr_finished);
447 }
448 });
449
450 dispatch_source_set_event_handler(write_src, handler);
451 dispatch_activate(write_src);
452
453 break;
454 }
455
456 default:
457 T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode);
458 break;
459 }
460
461 if (shared.wr_finished) {
462 T_LOG("wait shared.wr_finished");
463 kern_return_t kret = semaphore_timedwait(shared.wr_finished, WRITE_timeout);
464 if (kret == KERN_OPERATION_TIMED_OUT) {
465 T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout.tv_sec);
466 }
467 T_QUIET;
468 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_finished");
469 semaphore_destroy(mach_task_self(), shared.wr_finished);
470 }
471
472 T_LOG("writer finished, closing fd");
473 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL);
474 return NULL;
475 }
476
477 #pragma mark reading
478
479 #define BUF_LEN 1024
480 static char final_string[BUF_LEN];
481 static size_t final_length;
482
483 /*
484 * Read from the master PTY descriptor.
485 *
486 * Returns false if EOF is encountered, and true otherwise.
487 */
488 static bool
489 handle_reading(enum fd_pair fd_pair, int fd)
490 {
491 char read_buf[BUF_LEN] = { 0 };
492 ssize_t bytes_rd = 0;
493
494 do {
495 if (bytes_rd == -1) {
496 T_LOG("read was interrupted, retrying");
497 }
498 bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1);
499 } while (bytes_rd == -1 && errno == EINTR);
500
501 // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf);
502
503 T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file");
504 T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN,
505 "read too much from file");
506
507 if (bytes_rd == 0) {
508 T_LOG("read EOF from file");
509 return false;
510 }
511
512 read_buf[bytes_rd] = '\0';
513 strlcpy(&(final_string[final_length]), read_buf,
514 sizeof(final_string) - final_length);
515 final_length += (size_t)bytes_rd;
516
517 T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN,
518 "should not read more from file than what can be sent");
519
520 /* FIFOs don't send EOF when the write side closes */
521 if (final_length == strlen(EXPECTED_STRING) &&
522 (fd_pair == FIFO_PAIR)) {
523 T_LOG("read all expected bytes from FIFO");
524 return false;
525 }
526 return true;
527 }
528
529 static void
530 workqueue_read_fn(void ** __unused buf, int * __unused count)
531 {
532 // T_MAYFAIL;
533 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
534 // "reader thread should be requested at correct QoS");
535 if (!handle_reading(shared.fd_pair, shared.rd_fd)) {
536 T_LOG("signal shared.rd_finished");
537 semaphore_signal(shared.rd_finished);
538 }
539
540 reenable_workq(shared.rd_fd, EVFILT_READ);
541 }
542
543 static void
544 read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode)
545 {
546 int fd_flags;
547
548 T_LOG("reader setting up");
549
550 bzero(final_string, sizeof(final_string));
551
552 fd_flags = fcntl(fd, F_GETFL, 0);
553 T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)");
554
555 if (!(fd_flags & O_NONBLOCK)) {
556 T_QUIET;
557 T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL,
558 fd_flags | O_NONBLOCK), NULL);
559 }
560
561 switch (mode) {
562 case POLL_READ: {
563 struct pollfd fds[] = { { .fd = fd, .events = POLLIN } };
564 wake_writer();
565
566 for (;;) {
567 fds[0].revents = 0;
568 int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000);
569 T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll");
570 T_QUIET; T_ASSERT_NE(pol, 0,
571 "poll should not time out after %d seconds, read %zd out "
572 "of %zu bytes",
573 READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING));
574 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR,
575 "should not see an error on the device");
576 T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLNVAL,
577 "should not set up an invalid poll");
578
579 if (!handle_reading(fd_pair, fd)) {
580 break;
581 }
582 }
583 break;
584 }
585
586 case SELECT_READ:
587 wake_writer();
588
589 for (;;) {
590 struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS };
591
592 fd_set read_fd;
593 FD_ZERO(&read_fd);
594 FD_SET(fd, &read_fd);
595 fd_set err_fd;
596 FD_ZERO(&err_fd);
597 FD_SET(fd, &err_fd);
598
599 int sel = select(fd + 1, &read_fd, NULL, NULL /*&err_fd*/, &tv);
600 if (sel == -1 && errno == EINTR) {
601 T_LOG("select interrupted");
602 continue;
603 }
604 (void)fd_pair;
605
606 T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select");
607
608 T_QUIET; T_ASSERT_NE(sel, 0,
609 "select waited for %d seconds and timed out",
610 READ_TIMEOUT_SECS);
611
612 /* didn't fail or time out, therefore data is ready */
613 T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0,
614 "select should show reading fd as readable");
615
616 if (!handle_reading(fd_pair, fd)) {
617 break;
618 }
619 }
620 break;
621
622 case KEVENT_READ: /* FALLTHROUGH */
623 case KEVENT64_READ: /* FALLTHROUGH */
624 case KEVENT_QOS_READ: {
625 union mode rd_mode = { .rd = shared.rd_mode };
626 drive_kq(true, rd_mode, fd_pair, shared.rd_fd);
627 break;
628 }
629
630 case WORKQ_READ: {
631 // prohibit ourselves from going multi-threaded see:rdar://33296008
632 _dispatch_prohibit_transition_to_multithreaded(true);
633 T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(
634 workqueue_fn, workqueue_read_fn, 0, 0), NULL);
635
636 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
637 "semaphore_create shared.rd_finished");
638
639 T_QUIET;
640 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
641
642 int changes = 1;
643 struct kevent_qos_s events[] = {{
644 .ident = (uint64_t)shared.rd_fd,
645 .filter = EVFILT_READ,
646 .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED,
647 .fflags = NOTE_LOWAT,
648 .data = 1,
649 .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS,
650 0, 0)
651 }};
652
653 for (;;) {
654 int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes,
655 events, 1, NULL, NULL,
656 KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS);
657 if (kev == -1 && errno == EINTR) {
658 changes = 0;
659 T_LOG("kevent_qos was interrupted");
660 continue;
661 }
662
663 T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos");
664 break;
665 }
666
667 wake_writer();
668 break;
669 }
670
671 case DISPATCH_READ: {
672 dispatch_source_t read_src;
673
674 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0),
675 "semaphore_create shared.rd_finished");
676
677 T_QUIET;
678 T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create");
679
680 read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
681 (uintptr_t)fd, 0, NULL);
682 T_QUIET; T_ASSERT_NOTNULL(read_src,
683 "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)");
684
685 dispatch_block_t handler = dispatch_block_create_with_qos_class(
686 DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{
687 // T_MAYFAIL;
688 // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS,
689 // "read handler block should run at correct QoS");
690
691 if (!handle_reading(fd_pair, fd)) {
692 /* finished handling the fd, tear down the source */
693 dispatch_source_cancel(read_src);
694 dispatch_release(read_src);
695 T_LOG("signal shared.rd_finished");
696 semaphore_signal(shared.rd_finished);
697 }
698 });
699
700 dispatch_source_set_event_handler(read_src, handler);
701 dispatch_activate(read_src);
702
703 wake_writer();
704 break;
705 }
706
707 default:
708 T_ASSERT_FAIL("unrecognized read mode: %d", mode);
709 break;
710 }
711
712 if (shared.rd_finished) {
713 T_LOG("wait shared.rd_finished");
714 kern_return_t kret = semaphore_timedwait(shared.rd_finished, READ_timeout);
715 if (kret == KERN_OPERATION_TIMED_OUT) {
716 T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout.tv_sec);
717 }
718 T_QUIET;
719 T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.rd_finished");
720 }
721
722 T_EXPECT_EQ_STR(final_string, EXPECTED_STRING,
723 "reader should receive valid string");
724 T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL);
725 }
726
727 #pragma mark file setup
728
729 static void
730 fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd)
731 {
732 switch (fd_pair) {
733 case PTY_PAIR:
734 T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL),
735 NULL);
736 break;
737
738 case FIFO_PAIR: {
739 char fifo_path[] = "/tmp/async-io-fifo.XXXXXX";
740 T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL);
741
742 T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)",
743 fifo_path);
744 /*
745 * Opening the read side of a pipe will block until the write
746 * side opens -- use O_NONBLOCK.
747 */
748 *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK);
749 T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)");
750 *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
751 T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)");
752 break;
753 }
754
755 case PIPE_PAIR: {
756 int pipe_fds[2];
757 T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL);
758 *rd_fd = pipe_fds[0];
759 *wr_fd = pipe_fds[1];
760 break;
761 }
762
763 case SOCKET_PAIR: {
764 int sock_fds[2];
765 T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds),
766 NULL);
767 *rd_fd = sock_fds[0];
768 *wr_fd = sock_fds[1];
769 break;
770 }
771
772 default:
773 T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair);
774 break;
775 }
776
777 T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor");
778 T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor");
779 }
780
781 #pragma mark single process
782
783 static void
784 drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode,
785 enum write_mode wr_mode)
786 {
787 pthread_t thread;
788
789 shared.fd_pair = fd_pair;
790 shared.rd_mode = rd_mode;
791 shared.wr_mode = wr_mode;
792 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
793
794 shared.wr_kind = THREAD_WRITER;
795 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_wait.sem, SYNC_POLICY_FIFO, 0),
796 "semaphore_create shared.wr_wait.sem");
797
798 T_QUIET;
799 T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL),
800 NULL);
801 T_LOG("created writer thread");
802
803 read_from_fd(shared.rd_fd, fd_pair, rd_mode);
804
805 T_ASSERT_POSIX_ZERO(pthread_join(thread, NULL), NULL);
806
807 T_END;
808 }
809
810 #pragma mark multiple processes
811
812 static void __attribute__((noreturn))
813 drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode)
814 {
815 shared.fd_pair = fd_pair;
816 shared.rd_mode = rd_mode;
817 shared.wr_mode = wr_mode;
818 fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd));
819
820 shared.wr_kind = PROCESS_WRITER;
821 int fds[2];
822 T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL);
823 shared.wr_wait.out_fd = fds[0];
824 shared.wr_wait.in_fd = fds[1];
825
826 T_LOG("starting subprocesses");
827 dt_helper_t helpers[2] = {
828 dt_fork_helper("reader_helper"),
829 dt_fork_helper("writer_helper")
830 };
831
832 close(shared.rd_fd);
833 close(shared.wr_fd);
834
835 dt_run_helpers(helpers, 2, 50000);
836 }
837
838 T_HELPER_DECL(reader_helper, "Read asynchronously")
839 {
840 close(shared.wr_fd);
841 read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode);
842 T_END;
843 }
844
845 T_HELPER_DECL(writer_helper, "Write asynchronously")
846 {
847 close(shared.rd_fd);
848 write_to_fd(NULL);
849 }
850
851 #pragma mark tests
852
853 #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
854 write_mode, read_name, read_mode) \
855 T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \
856 #desc_name " with " #read_name " and writing " #write_str \
857 " across two processes") \
858 { \
859 drive_processes(fd_pair, read_mode, write_mode); \
860 }
861 #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
862 write_mode, read_name, read_mode) \
863 T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \
864 #desc_name " with " #read_name " and writing " #write_str) \
865 { \
866 drive_threads(fd_pair, read_mode, write_mode); \
867 }
868
869 #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \
870 read_name, read_mode) \
871 WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \
872 write_mode, read_name, read_mode) \
873 WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \
874 write_mode, read_name, read_mode)
875
876 #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
877 WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \
878 read_name, read_mode) \
879 WR_DECL(desc_name, fd_pair, inc, "incrementally", \
880 INCREMENTAL_WRITE, read_name, read_mode)
881
882 #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \
883 read_mode) \
884 WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \
885 "incrementally with a dispatch source", \
886 DISPATCH_INCREMENTAL_WRITE, read_name, read_mode)
887 #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \
888 read_mode) \
889 WR_DECL##suffix(desc_name, fd_pair, inc_workq, \
890 "incrementally with the workqueue", \
891 WORKQ_INCREMENTAL_WRITE, read_name, read_mode)
892
893 #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \
894 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
895 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode)
896 // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode)
897
898 /*
899 * dispatch_source tests cannot share the same process as other workqueue
900 * tests.
901 */
902 #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \
903 RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
904 RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \
905 RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
906 read_mode)
907
908 /*
909 * Workqueue tests cannot share the same process as other workqueue or
910 * dispatch_source tests.
911 #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \
912 * RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \
913 * RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
914 * read_mode) \
915 * RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \
916 * read_mode)
917 */
918
919 #define PAIR_DECL(desc_name, fd_pair) \
920 RD_DECL(desc_name, fd_pair, poll, POLL_READ) \
921 RD_DECL(desc_name, fd_pair, select, SELECT_READ) \
922 RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \
923 RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \
924 RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \
925 RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ)
926 // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ)
927
928 PAIR_DECL(tty, PTY_PAIR)
929 PAIR_DECL(pipe, PIPE_PAIR)
930 PAIR_DECL(fifo, FIFO_PAIR)
931 PAIR_DECL(socket, SOCKET_PAIR)