]>
Commit | Line | Data |
---|---|---|
813fb2f6 A |
1 | #ifdef T_NAMESPACE |
2 | #undef T_NAMESPACE | |
3 | #endif | |
4 | #include <darwintest.h> | |
5 | #include <darwintest_multiprocess.h> | |
6 | ||
7 | #include <assert.h> | |
8 | #include <dispatch/dispatch.h> | |
9 | #include <err.h> | |
10 | #include <errno.h> | |
11 | #include <fcntl.h> | |
12 | #include <poll.h> | |
13 | #include <pthread.h> | |
14 | #include <pthread/workqueue_private.h> | |
15 | #include <stdio.h> | |
16 | #include <stdlib.h> | |
17 | #include <string.h> | |
18 | #include <sys/event.h> | |
19 | #include <sys/socket.h> | |
20 | #include <sys/stat.h> | |
21 | #include <sys/time.h> | |
22 | #include <sys/types.h> | |
23 | #include <sys/wait.h> | |
24 | #include <sysexits.h> | |
25 | #include <unistd.h> | |
26 | #include <util.h> | |
27 | #include <System/sys/event.h> /* kevent_qos */ | |
28 | ||
29 | T_GLOBAL_META(T_META_NAMESPACE("xnu.poll_select_kevent_paired_fds")); | |
30 | ||
31 | /* | |
32 | * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in | |
33 | * a dispatch source, kqueue, poll, or select delivers read events within and | |
34 | * between processes as expected. | |
35 | * | |
36 | * This test catches issues with watching special devices in kqueue(), | |
37 | * which has tricky special cases for character devices like PTYs. | |
38 | * | |
39 | * It also exercises the path to wake up a dispatch worker thread from the | |
40 | * special device kqueue event, which is also a special case in kqueue(). | |
41 | * | |
42 | * See rdar://problem/26240299&26220074&26226862&28625427 for examples and | |
43 | * history. | |
44 | */ | |
45 | ||
46 | #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890" | |
47 | #define EXPECTED_LEN strlen(EXPECTED_STRING) | |
48 | ||
49 | #define READ_SETUP_TIMEOUT_SECS 2 | |
50 | #define WRITE_TIMEOUT_SECS 4 | |
51 | #define READ_TIMEOUT_SECS 2 | |
52 | #define INCREMENTAL_WRITE_SLEEP_USECS 50 | |
53 | ||
54 | enum fd_pair { | |
55 | PTY_PAIR, | |
56 | FIFO_PAIR, | |
57 | PIPE_PAIR, | |
58 | SOCKET_PAIR | |
59 | }; | |
60 | ||
61 | enum write_mode { | |
62 | FULL_WRITE, | |
63 | INCREMENTAL_WRITE, | |
64 | KEVENT_INCREMENTAL_WRITE, | |
65 | KEVENT64_INCREMENTAL_WRITE, | |
66 | KEVENT_QOS_INCREMENTAL_WRITE, | |
67 | WORKQ_INCREMENTAL_WRITE, | |
68 | DISPATCH_INCREMENTAL_WRITE | |
69 | }; | |
70 | ||
71 | enum read_mode { | |
72 | POLL_READ, | |
73 | SELECT_READ, | |
74 | KEVENT_READ, | |
75 | KEVENT64_READ, | |
76 | KEVENT_QOS_READ, | |
77 | WORKQ_READ, | |
78 | DISPATCH_READ | |
79 | }; | |
80 | ||
81 | union mode { | |
82 | enum read_mode rd; | |
83 | enum write_mode wr; | |
84 | }; | |
85 | ||
86 | static struct { | |
87 | enum fd_pair fd_pair; | |
88 | enum write_mode wr_mode; | |
89 | int wr_fd; | |
90 | enum read_mode rd_mode; | |
91 | int rd_fd; | |
92 | ||
93 | enum writer_kind { | |
94 | THREAD_WRITER, /* sem */ | |
95 | PROCESS_WRITER /* fd */ | |
96 | } wr_kind; | |
97 | union { | |
98 | dispatch_semaphore_t sem; | |
99 | struct { | |
100 | int in_fd; | |
101 | int out_fd; | |
102 | }; | |
103 | } wr_wait; | |
104 | dispatch_semaphore_t wr_finished; | |
105 | dispatch_semaphore_t rd_finished; | |
106 | } shared; | |
107 | ||
108 | static bool handle_reading(enum fd_pair fd_pair, int fd); | |
109 | static bool handle_writing(enum fd_pair fd_pair, int fd); | |
110 | static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, | |
111 | int fd); | |
112 | ||
113 | #pragma mark writing | |
114 | ||
115 | static void | |
116 | wake_writer(void) | |
117 | { | |
118 | T_LOG("waking writer"); | |
119 | ||
120 | switch (shared.wr_kind) { | |
121 | case THREAD_WRITER: | |
122 | dispatch_semaphore_signal(shared.wr_wait.sem); | |
123 | break; | |
124 | case PROCESS_WRITER: { | |
125 | char tmp = 'a'; | |
126 | close(shared.wr_wait.out_fd); | |
127 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write( | |
128 | shared.wr_wait.in_fd, &tmp, 1), NULL); | |
129 | break; | |
130 | } | |
131 | } | |
132 | } | |
133 | ||
134 | static void | |
135 | writer_wait(void) | |
136 | { | |
137 | switch (shared.wr_kind) { | |
138 | case THREAD_WRITER: | |
139 | T_QUIET; T_ASSERT_EQ(dispatch_semaphore_wait( | |
140 | shared.wr_wait.sem, | |
141 | dispatch_time(DISPATCH_TIME_NOW, | |
142 | READ_SETUP_TIMEOUT_SECS * NSEC_PER_SEC)), 0L, | |
143 | NULL); | |
144 | break; | |
145 | case PROCESS_WRITER: { | |
146 | char tmp; | |
147 | close(shared.wr_wait.in_fd); | |
148 | T_QUIET; T_ASSERT_POSIX_SUCCESS(read( | |
149 | shared.wr_wait.out_fd, &tmp, 1), NULL); | |
150 | break; | |
151 | } | |
152 | } | |
153 | ||
154 | T_LOG("writer woken up, starting to write"); | |
155 | } | |
156 | ||
157 | static bool | |
158 | handle_writing(enum fd_pair __unused fd_pair, int fd) | |
159 | { | |
160 | static unsigned int cur_char = 0; | |
161 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd, | |
162 | &(EXPECTED_STRING[cur_char]), 1), NULL); | |
163 | cur_char++; | |
164 | ||
165 | return (cur_char < EXPECTED_LEN); | |
166 | } | |
167 | ||
168 | #define EXPECTED_QOS QOS_CLASS_USER_INITIATED | |
169 | ||
170 | static void | |
171 | reenable_workq(int fd, int16_t filt) | |
172 | { | |
173 | struct kevent_qos_s events[] = {{ | |
174 | .ident = (uint64_t)fd, | |
175 | .filter = filt, | |
176 | .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH, | |
177 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
178 | 0, 0), | |
179 | .fflags = NOTE_LOWAT, | |
180 | .data = 1 | |
181 | }}; | |
182 | ||
183 | int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL, | |
184 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
185 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos"); | |
186 | } | |
187 | ||
188 | static void | |
189 | workqueue_write_fn(void ** __unused buf, int * __unused count) | |
190 | { | |
191 | // T_MAYFAIL; | |
192 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
193 | // "writer thread should be woken up at correct QoS"); | |
194 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { | |
195 | /* finished handling the fd, tear down the source */ | |
196 | dispatch_semaphore_signal(shared.wr_finished); | |
197 | return; | |
198 | } | |
199 | ||
200 | reenable_workq(shared.wr_fd, EVFILT_WRITE); | |
201 | } | |
202 | ||
203 | static void | |
204 | workqueue_fn(pthread_priority_t __unused priority) | |
205 | { | |
206 | T_ASSERT_FAIL("workqueue function callback was called"); | |
207 | } | |
208 | ||
209 | static void | |
210 | drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd) | |
211 | { | |
212 | struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS }; | |
213 | int kev = -1; | |
214 | ||
215 | struct kevent events; | |
216 | EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
217 | NOTE_LOWAT, 1, NULL); | |
218 | struct kevent64_s events64; | |
219 | EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
220 | NOTE_LOWAT, 1, 0, 0, 0); | |
221 | struct kevent_qos_s events_qos[] = {{ | |
222 | .ident = (uint64_t)fd, | |
223 | .filter = reading ? EVFILT_READ : EVFILT_WRITE, | |
224 | .flags = EV_ADD, | |
225 | .fflags = NOTE_LOWAT, | |
226 | .data = 1 | |
227 | }, { | |
228 | .ident = 0, | |
229 | .filter = EVFILT_TIMER, | |
230 | .flags = EV_ADD, | |
231 | .fflags = NOTE_SECONDS, | |
232 | .data = READ_TIMEOUT_SECS | |
233 | }}; | |
234 | ||
235 | /* determine which variant of kevent to use */ | |
236 | enum read_mode which_kevent; | |
237 | if (reading) { | |
238 | which_kevent = mode.rd; | |
239 | } else { | |
240 | if (mode.wr == KEVENT_INCREMENTAL_WRITE) { | |
241 | which_kevent = KEVENT_READ; | |
242 | } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) { | |
243 | which_kevent = KEVENT64_READ; | |
244 | } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) { | |
245 | which_kevent = KEVENT_QOS_READ; | |
246 | } else { | |
247 | T_ASSERT_FAIL("unexpected mode: %d", mode.wr); | |
248 | __builtin_unreachable(); | |
249 | } | |
250 | } | |
251 | ||
252 | int kq_fd = kqueue(); | |
253 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue"); | |
254 | ||
255 | switch (which_kevent) { | |
256 | case KEVENT_READ: | |
257 | kev = kevent(kq_fd, &events, 1, NULL, 0, NULL); | |
258 | break; | |
259 | case KEVENT64_READ: | |
260 | kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL); | |
261 | break; | |
262 | case KEVENT_QOS_READ: | |
263 | kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0); | |
264 | break; | |
265 | case POLL_READ: /* FALLTHROUGH */ | |
266 | case SELECT_READ: /* FALLTHROUGH */ | |
267 | case DISPATCH_READ: /* FALLTHROUGH */ | |
268 | case WORKQ_READ: /* FALLTHROUGH */ | |
269 | default: | |
270 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
271 | break; | |
272 | } | |
273 | ||
274 | if (reading) { | |
275 | wake_writer(); | |
276 | } else { | |
277 | writer_wait(); | |
278 | } | |
279 | ||
280 | for (;;) { | |
281 | switch (which_kevent) { | |
282 | case KEVENT_READ: | |
283 | kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout); | |
284 | break; | |
285 | case KEVENT64_READ: | |
286 | kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout); | |
287 | break; | |
288 | case KEVENT_QOS_READ: | |
289 | kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0); | |
290 | ||
291 | /* check for a timeout */ | |
292 | for (int i = 0; i < kev; i++) { | |
293 | if (events_qos[i].filter == EVFILT_TIMER) { | |
294 | kev = 0; | |
295 | } | |
296 | } | |
297 | break; | |
298 | case POLL_READ: /* FALLTHROUGH */ | |
299 | case SELECT_READ: /* FALLTHROUGH */ | |
300 | case DISPATCH_READ: /* FALLTHROUGH */ | |
301 | case WORKQ_READ: /* FALLTHROUGH */ | |
302 | default: | |
303 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
304 | break; | |
305 | } | |
306 | ||
307 | if (kev == -1 && errno == EINTR) { | |
308 | T_LOG("kevent was interrupted"); | |
309 | continue; | |
310 | } | |
311 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent"); | |
312 | /* <rdar://problem/28747760> */ | |
313 | if (shared.fd_pair == PTY_PAIR) { | |
314 | T_MAYFAIL; | |
315 | } | |
316 | T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out"); | |
317 | ||
318 | if (reading) { | |
319 | if (!handle_reading(fd_pair, fd)) { | |
320 | break; | |
321 | } | |
322 | } else { | |
323 | if (!handle_writing(fd_pair, fd)) { | |
324 | break; | |
325 | } | |
326 | } | |
327 | } | |
328 | ||
329 | close(kq_fd); | |
330 | } | |
331 | ||
332 | static void * | |
333 | write_to_fd(void * __unused ctx) | |
334 | { | |
335 | ssize_t bytes_wr = 0; | |
336 | ||
337 | writer_wait(); | |
338 | ||
339 | switch (shared.wr_mode) { | |
340 | case FULL_WRITE: | |
341 | do { | |
342 | if (bytes_wr == -1) { | |
343 | T_LOG("write from child was interrupted"); | |
344 | } | |
345 | bytes_wr = write(shared.wr_fd, EXPECTED_STRING, | |
346 | EXPECTED_LEN); | |
347 | } while (bytes_wr == -1 && errno == EINTR); | |
348 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write"); | |
349 | T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN, | |
350 | "wrote enough bytes"); | |
351 | break; | |
352 | ||
353 | case INCREMENTAL_WRITE: | |
354 | for (unsigned int i = 0; i < EXPECTED_LEN ; i++) { | |
355 | T_QUIET; | |
356 | T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd, | |
357 | &(EXPECTED_STRING[i]), 1), NULL); | |
358 | usleep(INCREMENTAL_WRITE_SLEEP_USECS); | |
359 | } | |
360 | break; | |
361 | ||
362 | case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
363 | case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
364 | case KEVENT_QOS_INCREMENTAL_WRITE: { | |
365 | union mode mode = { .wr = shared.wr_mode }; | |
366 | drive_kq(false, mode, shared.fd_pair, shared.wr_fd); | |
367 | break; | |
368 | } | |
369 | ||
370 | case WORKQ_INCREMENTAL_WRITE: { | |
371 | int changes = 1; | |
372 | ||
373 | shared.wr_finished = dispatch_semaphore_create(0); | |
374 | T_QUIET; T_ASSERT_NOTNULL(shared.wr_finished, | |
375 | "dispatch_semaphore_create"); | |
376 | ||
377 | T_QUIET; T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent( | |
378 | workqueue_fn, workqueue_write_fn, 0, 0), NULL); | |
379 | ||
380 | struct kevent_qos_s events[] = {{ | |
381 | .ident = (uint64_t)shared.wr_fd, | |
382 | .filter = EVFILT_WRITE, | |
383 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
384 | .fflags = NOTE_LOWAT, | |
385 | .data = 1, | |
386 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
387 | 0, 0) | |
388 | }}; | |
389 | ||
390 | for (;;) { | |
391 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
392 | events, 1, NULL, NULL, | |
393 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
394 | if (kev == -1 && errno == EINTR) { | |
395 | changes = 0; | |
396 | T_LOG("kevent_qos was interrupted"); | |
397 | continue; | |
398 | } | |
399 | ||
400 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
401 | break; | |
402 | } | |
403 | break; | |
404 | } | |
405 | ||
406 | case DISPATCH_INCREMENTAL_WRITE: { | |
407 | dispatch_source_t write_src; | |
408 | ||
409 | shared.wr_finished = dispatch_semaphore_create(0); | |
410 | T_QUIET; T_ASSERT_NOTNULL(shared.wr_finished, | |
411 | "dispatch_semaphore_create"); | |
412 | ||
413 | write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, | |
414 | (uintptr_t)shared.wr_fd, 0, NULL); | |
415 | T_QUIET; T_ASSERT_NOTNULL(write_src, | |
416 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)"); | |
417 | ||
418 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
419 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ | |
420 | // T_MAYFAIL; | |
421 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
422 | // "write handler block should run at correct QoS"); | |
423 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { | |
424 | /* finished handling the fd, tear down the source */ | |
425 | dispatch_source_cancel(write_src); | |
426 | dispatch_release(write_src); | |
427 | dispatch_semaphore_signal(shared.wr_finished); | |
428 | } | |
429 | }); | |
430 | ||
431 | dispatch_source_set_event_handler(write_src, handler); | |
432 | dispatch_activate(write_src); | |
433 | ||
434 | break; | |
435 | } | |
436 | ||
437 | default: | |
438 | T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode); | |
439 | break; | |
440 | } | |
441 | ||
442 | if (shared.wr_finished) { | |
443 | long sem_timed_out = dispatch_semaphore_wait(shared.wr_finished, | |
444 | dispatch_time(DISPATCH_TIME_NOW, | |
445 | WRITE_TIMEOUT_SECS * NSEC_PER_SEC)); | |
446 | dispatch_release(shared.wr_finished); | |
447 | /* <rdar://problem/28747760> */ | |
448 | if (shared.fd_pair == PTY_PAIR) { | |
449 | T_MAYFAIL; | |
450 | } | |
451 | T_QUIET; T_ASSERT_EQ(sem_timed_out, 0L, | |
452 | "write side semaphore timed out after %d seconds", | |
453 | WRITE_TIMEOUT_SECS); | |
454 | } | |
455 | ||
456 | T_LOG("writer finished, closing fd"); | |
457 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL); | |
458 | return NULL; | |
459 | } | |
460 | ||
461 | #pragma mark reading | |
462 | ||
463 | #define BUF_LEN 1024 | |
464 | static char final_string[BUF_LEN]; | |
465 | static size_t final_length; | |
466 | ||
467 | /* | |
468 | * Read from the master PTY descriptor. | |
469 | * | |
470 | * Returns false if EOF is encountered, and true otherwise. | |
471 | */ | |
472 | static bool | |
473 | handle_reading(enum fd_pair fd_pair, int fd) | |
474 | { | |
475 | char read_buf[BUF_LEN] = { 0 }; | |
476 | ssize_t bytes_rd = 0; | |
477 | ||
478 | do { | |
479 | if (bytes_rd == -1) { | |
480 | T_LOG("read was interrupted, retrying"); | |
481 | } | |
482 | bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1); | |
483 | } while (bytes_rd == -1 && errno == EINTR); | |
484 | ||
485 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file"); | |
486 | T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN, | |
487 | "read too much from file"); | |
488 | ||
489 | if (bytes_rd == 0) { | |
490 | T_LOG("read EOF from file"); | |
491 | return false; | |
492 | } | |
493 | ||
494 | read_buf[bytes_rd] = '\0'; | |
495 | strlcpy(&(final_string[final_length]), read_buf, | |
496 | sizeof(final_string) - final_length); | |
497 | final_length += (size_t)bytes_rd; | |
498 | ||
499 | // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf); | |
500 | ||
501 | T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN, | |
502 | "should not read more from file than what can be sent"); | |
503 | ||
504 | /* FIFOs don't (and TTYs may not) send EOF when the write side closes */ | |
505 | if (final_length == strlen(EXPECTED_STRING) && | |
506 | (fd_pair == FIFO_PAIR || fd_pair == PTY_PAIR)) | |
507 | { | |
508 | T_LOG("read all expected bytes from %s", | |
509 | fd_pair == FIFO_PAIR ? "FIFO" : "PTY"); | |
510 | return false; | |
511 | } | |
512 | return true; | |
513 | } | |
514 | ||
515 | static void | |
516 | workqueue_read_fn(void ** __unused buf, int * __unused count) | |
517 | { | |
518 | // T_MAYFAIL; | |
519 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
520 | // "reader thread should be requested at correct QoS"); | |
521 | if (!handle_reading(shared.fd_pair, shared.rd_fd)) { | |
522 | dispatch_semaphore_signal(shared.rd_finished); | |
523 | } | |
524 | ||
525 | reenable_workq(shared.rd_fd, EVFILT_READ); | |
526 | } | |
527 | ||
528 | static void | |
529 | read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode) | |
530 | { | |
531 | int fd_flags; | |
532 | ||
533 | T_LOG("reader setting up"); | |
534 | ||
535 | bzero(final_string, sizeof(final_string)); | |
536 | ||
537 | fd_flags = fcntl(fd, F_GETFL, 0); | |
538 | T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)"); | |
539 | ||
540 | if (!(fd_flags & O_NONBLOCK)) { | |
541 | T_QUIET; | |
542 | T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL, | |
543 | fd_flags | O_NONBLOCK), NULL); | |
544 | } | |
545 | ||
546 | switch (mode) { | |
547 | case POLL_READ: { | |
548 | struct pollfd fds[] = { { .fd = fd, .events = POLLIN } }; | |
549 | wake_writer(); | |
550 | for (;;) { | |
551 | fds[0].revents = 0; | |
552 | int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000); | |
553 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll"); | |
554 | /* <rdar://problem/28747760> */ | |
555 | if (shared.fd_pair == PTY_PAIR) { | |
556 | T_MAYFAIL; | |
557 | } | |
558 | T_QUIET; T_ASSERT_NE(pol, 0, | |
559 | "poll should not time out after %d seconds, read %zd out " | |
560 | "of %zu bytes", | |
561 | READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING)); | |
562 | T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR, | |
563 | "should not see an error on the device"); | |
564 | ||
565 | if (!handle_reading(fd_pair, fd)) { | |
566 | break; | |
567 | } | |
568 | } | |
569 | break; | |
570 | } | |
571 | ||
572 | case SELECT_READ: | |
573 | wake_writer(); | |
574 | ||
575 | for (;;) { | |
576 | struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS }; | |
577 | ||
578 | fd_set read_fd; | |
579 | FD_ZERO(&read_fd); | |
580 | FD_SET(fd, &read_fd); | |
581 | fd_set err_fd; | |
582 | FD_ZERO(&err_fd); | |
583 | FD_SET(fd, &err_fd); | |
584 | ||
585 | int sel = select(fd + 1, &read_fd, NULL, NULL/*&err_fd*/, &tv); | |
586 | if (sel == -1 && errno == EINTR) { | |
587 | T_LOG("select interrupted"); | |
588 | continue; | |
589 | } | |
590 | (void)fd_pair; | |
591 | ||
592 | T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select"); | |
593 | ||
594 | /* <rdar://problem/28747760> */ | |
595 | if (shared.fd_pair == PTY_PAIR) { | |
596 | T_MAYFAIL; | |
597 | } | |
598 | T_QUIET; T_ASSERT_NE(sel, 0, | |
599 | "select waited for %d seconds and timed out", | |
600 | READ_TIMEOUT_SECS); | |
601 | ||
602 | if (fd_pair == PTY_PAIR) { | |
603 | /* | |
604 | * XXX sometimes a PTY doesn't send EOF when the writer closes | |
605 | */ | |
606 | T_MAYFAIL; | |
607 | } | |
608 | /* didn't fail or time out, therefore data is ready */ | |
609 | T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0, | |
610 | "select should show reading fd as readable"); | |
611 | ||
612 | if (!handle_reading(fd_pair, fd)) { | |
613 | break; | |
614 | } | |
615 | } | |
616 | break; | |
617 | ||
618 | case KEVENT_READ: /* FALLTHROUGH */ | |
619 | case KEVENT64_READ: /* FALLTHROUGH */ | |
620 | case KEVENT_QOS_READ: { | |
621 | union mode rd_mode = { .rd = shared.rd_mode }; | |
622 | drive_kq(true, rd_mode, fd_pair, shared.rd_fd); | |
623 | break; | |
624 | } | |
625 | ||
626 | case WORKQ_READ: { | |
627 | T_QUIET; T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent( | |
628 | workqueue_fn, workqueue_read_fn, 0, 0), NULL); | |
629 | ||
630 | shared.rd_finished = dispatch_semaphore_create(0); | |
631 | T_QUIET; T_ASSERT_NOTNULL(shared.rd_finished, | |
632 | "dispatch_semaphore_create"); | |
633 | ||
634 | int changes = 1; | |
635 | struct kevent_qos_s events[] = {{ | |
636 | .ident = (uint64_t)shared.rd_fd, | |
637 | .filter = EVFILT_READ, | |
638 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
639 | .fflags = NOTE_LOWAT, | |
640 | .data = 1, | |
641 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
642 | 0, 0) | |
643 | }}; | |
644 | ||
645 | for (;;) { | |
646 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
647 | events, 1, NULL, NULL, | |
648 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
649 | if (kev == -1 && errno == EINTR) { | |
650 | changes = 0; | |
651 | T_LOG("kevent_qos was interrupted"); | |
652 | continue; | |
653 | } | |
654 | ||
655 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
656 | break; | |
657 | } | |
658 | ||
659 | wake_writer(); | |
660 | break; | |
661 | } | |
662 | ||
663 | case DISPATCH_READ: { | |
664 | dispatch_source_t read_src; | |
665 | ||
666 | shared.rd_finished = dispatch_semaphore_create(0); | |
667 | T_QUIET; T_ASSERT_NOTNULL(shared.rd_finished, | |
668 | "dispatch_semaphore_create"); | |
669 | ||
670 | read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, | |
671 | (uintptr_t)fd, 0, NULL); | |
672 | T_QUIET; T_ASSERT_NOTNULL(read_src, | |
673 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)"); | |
674 | ||
675 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
676 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ | |
677 | // T_MAYFAIL; | |
678 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
679 | // "read handler block should run at correct QoS"); | |
680 | ||
681 | if (!handle_reading(fd_pair, fd)) { | |
682 | /* finished handling the fd, tear down the source */ | |
683 | dispatch_source_cancel(read_src); | |
684 | dispatch_release(read_src); | |
685 | dispatch_semaphore_signal(shared.rd_finished); | |
686 | } | |
687 | }); | |
688 | ||
689 | dispatch_source_set_event_handler(read_src, handler); | |
690 | dispatch_activate(read_src); | |
691 | ||
692 | wake_writer(); | |
693 | break; | |
694 | } | |
695 | ||
696 | default: | |
697 | T_ASSERT_FAIL("unrecognized read mode: %d", mode); | |
698 | break; | |
699 | } | |
700 | ||
701 | if (shared.rd_finished) { | |
702 | long timed_out = dispatch_semaphore_wait(shared.rd_finished, | |
703 | dispatch_time(DISPATCH_TIME_NOW, | |
704 | READ_TIMEOUT_SECS * NSEC_PER_SEC)); | |
705 | /* <rdar://problem/28747760> */ | |
706 | if (shared.fd_pair == PTY_PAIR) { | |
707 | T_MAYFAIL; | |
708 | } | |
709 | T_QUIET; T_ASSERT_EQ(timed_out, 0L, | |
710 | "reading timed out after %d seconds", READ_TIMEOUT_SECS); | |
711 | ||
712 | } | |
713 | ||
714 | T_EXPECT_EQ_STR(final_string, EXPECTED_STRING, | |
715 | "reader should receive valid string"); | |
716 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL); | |
717 | } | |
718 | ||
719 | #pragma mark file setup | |
720 | ||
721 | static void | |
722 | fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd) | |
723 | { | |
724 | switch (fd_pair) { | |
725 | case PTY_PAIR: | |
726 | T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL), | |
727 | NULL); | |
728 | break; | |
729 | ||
730 | case FIFO_PAIR: { | |
731 | char fifo_path[] = "/tmp/async-io-fifo.XXXXXX"; | |
732 | T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL); | |
733 | ||
734 | T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)", | |
735 | fifo_path); | |
736 | /* | |
737 | * Opening the read side of a pipe will block until the write | |
738 | * side opens -- use O_NONBLOCK. | |
739 | */ | |
740 | *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK); | |
741 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)"); | |
742 | *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK); | |
743 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)"); | |
744 | break; | |
745 | } | |
746 | ||
747 | case PIPE_PAIR: { | |
748 | int pipe_fds[2]; | |
749 | T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL); | |
750 | *rd_fd = pipe_fds[0]; | |
751 | *wr_fd = pipe_fds[1]; | |
752 | break; | |
753 | } | |
754 | ||
755 | case SOCKET_PAIR: { | |
756 | int sock_fds[2]; | |
757 | T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds), | |
758 | NULL); | |
759 | *rd_fd = sock_fds[0]; | |
760 | *wr_fd = sock_fds[1]; | |
761 | break; | |
762 | } | |
763 | ||
764 | default: | |
765 | T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair); | |
766 | break; | |
767 | } | |
768 | ||
769 | T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor"); | |
770 | T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor"); | |
771 | } | |
772 | ||
773 | #pragma mark single process | |
774 | ||
775 | static void | |
776 | drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode, | |
777 | enum write_mode wr_mode) | |
778 | { | |
779 | pthread_t thread; | |
780 | ||
781 | shared.fd_pair = fd_pair; | |
782 | shared.rd_mode = rd_mode; | |
783 | shared.wr_mode = wr_mode; | |
784 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
785 | ||
786 | shared.wr_kind = THREAD_WRITER; | |
787 | shared.wr_wait.sem = dispatch_semaphore_create(0); | |
788 | ||
789 | T_QUIET; | |
790 | T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL), | |
791 | NULL); | |
792 | T_LOG("created writer thread"); | |
793 | ||
794 | read_from_fd(shared.rd_fd, fd_pair, rd_mode); | |
795 | T_END; | |
796 | } | |
797 | ||
798 | #pragma mark multiple processes | |
799 | ||
800 | static void __attribute__((noreturn)) | |
801 | drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode) | |
802 | { | |
803 | shared.fd_pair = fd_pair; | |
804 | shared.rd_mode = rd_mode; | |
805 | shared.wr_mode = wr_mode; | |
806 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
807 | ||
808 | shared.wr_kind = PROCESS_WRITER; | |
809 | int fds[2]; | |
810 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL); | |
811 | shared.wr_wait.out_fd = fds[0]; | |
812 | shared.wr_wait.in_fd = fds[1]; | |
813 | ||
814 | T_LOG("starting subprocesses"); | |
815 | dt_helper_t helpers[2] = { | |
816 | dt_fork_helper("reader_helper"), | |
817 | dt_fork_helper("writer_helper") | |
818 | }; | |
819 | ||
820 | close(shared.rd_fd); | |
821 | close(shared.wr_fd); | |
822 | ||
823 | dt_run_helpers(helpers, 2, 50000); | |
824 | } | |
825 | ||
826 | T_HELPER_DECL(reader_helper, "Read asynchronously") | |
827 | { | |
828 | close(shared.wr_fd); | |
829 | read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode); | |
830 | T_END; | |
831 | } | |
832 | ||
833 | T_HELPER_DECL(writer_helper, "Write asynchronously") | |
834 | { | |
835 | close(shared.rd_fd); | |
836 | write_to_fd(NULL); | |
837 | } | |
838 | ||
839 | #pragma mark tests | |
840 | ||
841 | #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
842 | write_mode, read_name, read_mode) \ | |
843 | T_DECL(processes_##desc_name##_##read_name##_##write_name, "read changes to a " \ | |
844 | #desc_name " with " #read_name " and writing " #write_str \ | |
845 | " across two processes") \ | |
846 | { \ | |
847 | drive_processes(fd_pair, read_mode, write_mode); \ | |
848 | } | |
849 | #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ | |
850 | write_mode, read_name, read_mode) \ | |
851 | T_DECL(threads_##desc_name##_##read_name##_##write_name, "read changes to a " \ | |
852 | #desc_name " with " #read_name " and writing " #write_str) \ | |
853 | { \ | |
854 | drive_threads(fd_pair, read_mode, write_mode); \ | |
855 | } | |
856 | ||
857 | #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \ | |
858 | read_name, read_mode) \ | |
859 | WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
860 | write_mode, read_name, read_mode) \ | |
861 | WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ | |
862 | write_mode, read_name, read_mode) | |
863 | ||
864 | #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
865 | WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \ | |
866 | read_name, read_mode) \ | |
867 | WR_DECL(desc_name, fd_pair, incremental, "incrementally", \ | |
868 | INCREMENTAL_WRITE, read_name, read_mode) | |
869 | ||
870 | #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \ | |
871 | read_mode) \ | |
872 | WR_DECL##suffix(desc_name, fd_pair, incremental_dispatch, \ | |
873 | "incrementally with a dispatch source", \ | |
874 | DISPATCH_INCREMENTAL_WRITE, read_name, read_mode) | |
875 | #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \ | |
876 | read_mode) \ | |
877 | WR_DECL##suffix(desc_name, fd_pair, incremental_workq, \ | |
878 | "incrementally with the workqueue", \ | |
879 | WORKQ_INCREMENTAL_WRITE, read_name, read_mode) | |
880 | ||
881 | #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \ | |
882 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
883 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
884 | // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
885 | ||
886 | /* | |
887 | * dispatch_source tests cannot share the same process as other workqueue | |
888 | * tests. | |
889 | */ | |
890 | #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \ | |
891 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
892 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \ | |
893 | RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
894 | read_mode) | |
895 | ||
896 | /* | |
897 | * Workqueue tests cannot share the same process as other workqueue or | |
898 | * dispatch_source tests. | |
899 | #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \ | |
900 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
901 | RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
902 | read_mode) \ | |
903 | RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
904 | read_mode) | |
905 | */ | |
906 | ||
907 | #define PAIR_DECL(desc_name, fd_pair) \ | |
908 | RD_DECL(desc_name, fd_pair, poll, POLL_READ) \ | |
909 | RD_DECL(desc_name, fd_pair, select, SELECT_READ) \ | |
910 | RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \ | |
911 | RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \ | |
912 | RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \ | |
913 | RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ) | |
914 | // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ) | |
915 | ||
916 | PAIR_DECL(tty, PTY_PAIR) | |
917 | PAIR_DECL(pipe, PIPE_PAIR) | |
918 | PAIR_DECL(fifo, FIFO_PAIR) | |
919 | PAIR_DECL(socket, SOCKET_PAIR) |