]>
Commit | Line | Data |
---|---|---|
813fb2f6 A |
1 | #ifdef T_NAMESPACE |
2 | #undef T_NAMESPACE | |
3 | #endif | |
5ba3f43e | 4 | |
813fb2f6 | 5 | #include <darwintest.h> |
5ba3f43e | 6 | #include <mach/mach.h> |
813fb2f6 A |
7 | #include <darwintest_multiprocess.h> |
8 | ||
9 | #include <assert.h> | |
10 | #include <dispatch/dispatch.h> | |
5ba3f43e | 11 | #include <dispatch/private.h> |
813fb2f6 A |
12 | #include <err.h> |
13 | #include <errno.h> | |
14 | #include <fcntl.h> | |
15 | #include <poll.h> | |
16 | #include <pthread.h> | |
17 | #include <pthread/workqueue_private.h> | |
18 | #include <stdio.h> | |
19 | #include <stdlib.h> | |
20 | #include <string.h> | |
21 | #include <sys/event.h> | |
22 | #include <sys/socket.h> | |
23 | #include <sys/stat.h> | |
24 | #include <sys/time.h> | |
25 | #include <sys/types.h> | |
26 | #include <sys/wait.h> | |
27 | #include <sysexits.h> | |
28 | #include <unistd.h> | |
29 | #include <util.h> | |
30 | #include <System/sys/event.h> /* kevent_qos */ | |
31 | ||
5ba3f43e A |
32 | T_GLOBAL_META( |
33 | T_META_NAMESPACE("xnu.kevent"), | |
34 | T_META_CHECK_LEAKS(false), | |
35 | T_META_LTEPHASE(LTE_POSTINIT)); | |
813fb2f6 A |
36 | |
37 | /* | |
38 | * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in | |
39 | * a dispatch source, kqueue, poll, or select delivers read events within and | |
40 | * between processes as expected. | |
41 | * | |
42 | * This test catches issues with watching special devices in kqueue(), | |
43 | * which has tricky special cases for character devices like PTYs. | |
44 | * | |
45 | * It also exercises the path to wake up a dispatch worker thread from the | |
46 | * special device kqueue event, which is also a special case in kqueue(). | |
47 | * | |
48 | * See rdar://problem/26240299&26220074&26226862&28625427 for examples and | |
49 | * history. | |
50 | */ | |
51 | ||
52 | #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890" | |
53 | #define EXPECTED_LEN strlen(EXPECTED_STRING) | |
54 | ||
55 | #define READ_SETUP_TIMEOUT_SECS 2 | |
56 | #define WRITE_TIMEOUT_SECS 4 | |
5ba3f43e | 57 | #define READ_TIMEOUT_SECS 4 |
813fb2f6 A |
58 | #define INCREMENTAL_WRITE_SLEEP_USECS 50 |
59 | ||
5ba3f43e A |
60 | static mach_timespec_t READ_SETUP_timeout = {.tv_sec = READ_SETUP_TIMEOUT_SECS, .tv_nsec = 0}; |
61 | static mach_timespec_t READ_timeout = {.tv_sec = READ_TIMEOUT_SECS, .tv_nsec = 0}; | |
62 | static mach_timespec_t WRITE_timeout = {.tv_sec = WRITE_TIMEOUT_SECS, .tv_nsec = 0}; | |
63 | ||
813fb2f6 A |
64 | enum fd_pair { |
65 | PTY_PAIR, | |
66 | FIFO_PAIR, | |
67 | PIPE_PAIR, | |
68 | SOCKET_PAIR | |
69 | }; | |
70 | ||
71 | enum write_mode { | |
72 | FULL_WRITE, | |
73 | INCREMENTAL_WRITE, | |
74 | KEVENT_INCREMENTAL_WRITE, | |
75 | KEVENT64_INCREMENTAL_WRITE, | |
76 | KEVENT_QOS_INCREMENTAL_WRITE, | |
77 | WORKQ_INCREMENTAL_WRITE, | |
78 | DISPATCH_INCREMENTAL_WRITE | |
79 | }; | |
80 | ||
81 | enum read_mode { | |
82 | POLL_READ, | |
83 | SELECT_READ, | |
84 | KEVENT_READ, | |
85 | KEVENT64_READ, | |
86 | KEVENT_QOS_READ, | |
87 | WORKQ_READ, | |
88 | DISPATCH_READ | |
89 | }; | |
90 | ||
91 | union mode { | |
92 | enum read_mode rd; | |
93 | enum write_mode wr; | |
94 | }; | |
95 | ||
96 | static struct { | |
97 | enum fd_pair fd_pair; | |
98 | enum write_mode wr_mode; | |
99 | int wr_fd; | |
100 | enum read_mode rd_mode; | |
101 | int rd_fd; | |
102 | ||
103 | enum writer_kind { | |
104 | THREAD_WRITER, /* sem */ | |
105 | PROCESS_WRITER /* fd */ | |
106 | } wr_kind; | |
107 | union { | |
5ba3f43e | 108 | semaphore_t sem; |
813fb2f6 A |
109 | struct { |
110 | int in_fd; | |
111 | int out_fd; | |
112 | }; | |
113 | } wr_wait; | |
5ba3f43e A |
114 | semaphore_t wr_finished; |
115 | semaphore_t rd_finished; | |
813fb2f6 A |
116 | } shared; |
117 | ||
118 | static bool handle_reading(enum fd_pair fd_pair, int fd); | |
119 | static bool handle_writing(enum fd_pair fd_pair, int fd); | |
120 | static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, | |
121 | int fd); | |
122 | ||
123 | #pragma mark writing | |
124 | ||
125 | static void | |
126 | wake_writer(void) | |
127 | { | |
128 | T_LOG("waking writer"); | |
129 | ||
130 | switch (shared.wr_kind) { | |
131 | case THREAD_WRITER: | |
5ba3f43e A |
132 | T_LOG("signal shared.wr_wait.sem"); |
133 | semaphore_signal(shared.wr_wait.sem); | |
813fb2f6 A |
134 | break; |
135 | case PROCESS_WRITER: { | |
136 | char tmp = 'a'; | |
137 | close(shared.wr_wait.out_fd); | |
138 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write( | |
139 | shared.wr_wait.in_fd, &tmp, 1), NULL); | |
140 | break; | |
141 | } | |
142 | } | |
143 | } | |
144 | ||
145 | static void | |
146 | writer_wait(void) | |
147 | { | |
148 | switch (shared.wr_kind) { | |
149 | case THREAD_WRITER: | |
5ba3f43e A |
150 | T_LOG("wait shared.wr_wait.sem"); |
151 | kern_return_t kret = semaphore_timedwait(shared.wr_wait.sem, READ_SETUP_timeout); | |
152 | ||
153 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
154 | T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout.tv_sec); | |
155 | } | |
156 | T_QUIET; | |
157 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_wait.sem"); | |
813fb2f6 | 158 | break; |
5ba3f43e | 159 | |
813fb2f6 A |
160 | case PROCESS_WRITER: { |
161 | char tmp; | |
162 | close(shared.wr_wait.in_fd); | |
163 | T_QUIET; T_ASSERT_POSIX_SUCCESS(read( | |
164 | shared.wr_wait.out_fd, &tmp, 1), NULL); | |
165 | break; | |
166 | } | |
167 | } | |
168 | ||
169 | T_LOG("writer woken up, starting to write"); | |
170 | } | |
171 | ||
172 | static bool | |
173 | handle_writing(enum fd_pair __unused fd_pair, int fd) | |
174 | { | |
175 | static unsigned int cur_char = 0; | |
176 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd, | |
177 | &(EXPECTED_STRING[cur_char]), 1), NULL); | |
178 | cur_char++; | |
179 | ||
180 | return (cur_char < EXPECTED_LEN); | |
181 | } | |
182 | ||
183 | #define EXPECTED_QOS QOS_CLASS_USER_INITIATED | |
184 | ||
185 | static void | |
186 | reenable_workq(int fd, int16_t filt) | |
187 | { | |
188 | struct kevent_qos_s events[] = {{ | |
189 | .ident = (uint64_t)fd, | |
190 | .filter = filt, | |
191 | .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH, | |
192 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
193 | 0, 0), | |
194 | .fflags = NOTE_LOWAT, | |
195 | .data = 1 | |
196 | }}; | |
197 | ||
198 | int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL, | |
199 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
200 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos"); | |
201 | } | |
202 | ||
203 | static void | |
204 | workqueue_write_fn(void ** __unused buf, int * __unused count) | |
205 | { | |
206 | // T_MAYFAIL; | |
207 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
208 | // "writer thread should be woken up at correct QoS"); | |
209 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { | |
210 | /* finished handling the fd, tear down the source */ | |
5ba3f43e A |
211 | T_LOG("signal shared.wr_finished"); |
212 | semaphore_signal(shared.wr_finished); | |
813fb2f6 A |
213 | return; |
214 | } | |
215 | ||
216 | reenable_workq(shared.wr_fd, EVFILT_WRITE); | |
217 | } | |
218 | ||
219 | static void | |
220 | workqueue_fn(pthread_priority_t __unused priority) | |
221 | { | |
222 | T_ASSERT_FAIL("workqueue function callback was called"); | |
223 | } | |
224 | ||
225 | static void | |
226 | drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd) | |
227 | { | |
228 | struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS }; | |
229 | int kev = -1; | |
230 | ||
231 | struct kevent events; | |
232 | EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
233 | NOTE_LOWAT, 1, NULL); | |
234 | struct kevent64_s events64; | |
235 | EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
236 | NOTE_LOWAT, 1, 0, 0, 0); | |
237 | struct kevent_qos_s events_qos[] = {{ | |
238 | .ident = (uint64_t)fd, | |
239 | .filter = reading ? EVFILT_READ : EVFILT_WRITE, | |
240 | .flags = EV_ADD, | |
241 | .fflags = NOTE_LOWAT, | |
242 | .data = 1 | |
243 | }, { | |
244 | .ident = 0, | |
245 | .filter = EVFILT_TIMER, | |
246 | .flags = EV_ADD, | |
247 | .fflags = NOTE_SECONDS, | |
248 | .data = READ_TIMEOUT_SECS | |
249 | }}; | |
250 | ||
251 | /* determine which variant of kevent to use */ | |
252 | enum read_mode which_kevent; | |
253 | if (reading) { | |
254 | which_kevent = mode.rd; | |
255 | } else { | |
256 | if (mode.wr == KEVENT_INCREMENTAL_WRITE) { | |
257 | which_kevent = KEVENT_READ; | |
258 | } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) { | |
259 | which_kevent = KEVENT64_READ; | |
260 | } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) { | |
261 | which_kevent = KEVENT_QOS_READ; | |
262 | } else { | |
263 | T_ASSERT_FAIL("unexpected mode: %d", mode.wr); | |
264 | __builtin_unreachable(); | |
265 | } | |
266 | } | |
267 | ||
268 | int kq_fd = kqueue(); | |
269 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue"); | |
270 | ||
271 | switch (which_kevent) { | |
272 | case KEVENT_READ: | |
273 | kev = kevent(kq_fd, &events, 1, NULL, 0, NULL); | |
274 | break; | |
275 | case KEVENT64_READ: | |
276 | kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL); | |
277 | break; | |
278 | case KEVENT_QOS_READ: | |
279 | kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0); | |
280 | break; | |
281 | case POLL_READ: /* FALLTHROUGH */ | |
282 | case SELECT_READ: /* FALLTHROUGH */ | |
283 | case DISPATCH_READ: /* FALLTHROUGH */ | |
284 | case WORKQ_READ: /* FALLTHROUGH */ | |
285 | default: | |
286 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
287 | break; | |
288 | } | |
289 | ||
290 | if (reading) { | |
291 | wake_writer(); | |
292 | } else { | |
293 | writer_wait(); | |
294 | } | |
295 | ||
296 | for (;;) { | |
297 | switch (which_kevent) { | |
298 | case KEVENT_READ: | |
299 | kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout); | |
300 | break; | |
301 | case KEVENT64_READ: | |
302 | kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout); | |
303 | break; | |
304 | case KEVENT_QOS_READ: | |
305 | kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0); | |
306 | ||
307 | /* check for a timeout */ | |
308 | for (int i = 0; i < kev; i++) { | |
309 | if (events_qos[i].filter == EVFILT_TIMER) { | |
310 | kev = 0; | |
311 | } | |
312 | } | |
313 | break; | |
314 | case POLL_READ: /* FALLTHROUGH */ | |
315 | case SELECT_READ: /* FALLTHROUGH */ | |
316 | case DISPATCH_READ: /* FALLTHROUGH */ | |
317 | case WORKQ_READ: /* FALLTHROUGH */ | |
318 | default: | |
319 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
320 | break; | |
321 | } | |
322 | ||
323 | if (kev == -1 && errno == EINTR) { | |
324 | T_LOG("kevent was interrupted"); | |
325 | continue; | |
326 | } | |
327 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent"); | |
813fb2f6 A |
328 | T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out"); |
329 | ||
330 | if (reading) { | |
331 | if (!handle_reading(fd_pair, fd)) { | |
332 | break; | |
333 | } | |
334 | } else { | |
335 | if (!handle_writing(fd_pair, fd)) { | |
336 | break; | |
337 | } | |
338 | } | |
339 | } | |
340 | ||
341 | close(kq_fd); | |
342 | } | |
343 | ||
344 | static void * | |
345 | write_to_fd(void * __unused ctx) | |
346 | { | |
347 | ssize_t bytes_wr = 0; | |
348 | ||
349 | writer_wait(); | |
350 | ||
351 | switch (shared.wr_mode) { | |
352 | case FULL_WRITE: | |
353 | do { | |
354 | if (bytes_wr == -1) { | |
355 | T_LOG("write from child was interrupted"); | |
356 | } | |
357 | bytes_wr = write(shared.wr_fd, EXPECTED_STRING, | |
358 | EXPECTED_LEN); | |
359 | } while (bytes_wr == -1 && errno == EINTR); | |
360 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write"); | |
361 | T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN, | |
362 | "wrote enough bytes"); | |
363 | break; | |
364 | ||
365 | case INCREMENTAL_WRITE: | |
366 | for (unsigned int i = 0; i < EXPECTED_LEN ; i++) { | |
367 | T_QUIET; | |
368 | T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd, | |
369 | &(EXPECTED_STRING[i]), 1), NULL); | |
370 | usleep(INCREMENTAL_WRITE_SLEEP_USECS); | |
371 | } | |
372 | break; | |
373 | ||
374 | case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
375 | case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
376 | case KEVENT_QOS_INCREMENTAL_WRITE: { | |
377 | union mode mode = { .wr = shared.wr_mode }; | |
378 | drive_kq(false, mode, shared.fd_pair, shared.wr_fd); | |
379 | break; | |
380 | } | |
381 | ||
382 | case WORKQ_INCREMENTAL_WRITE: { | |
5ba3f43e A |
383 | // prohibit ourselves from going multi-threaded see:rdar://33296008 |
384 | _dispatch_prohibit_transition_to_multithreaded(true); | |
813fb2f6 A |
385 | int changes = 1; |
386 | ||
5ba3f43e A |
387 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0), |
388 | "semaphore_create shared.wr_finished"); | |
813fb2f6 | 389 | |
5ba3f43e A |
390 | T_QUIET; |
391 | T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "wr_finished semaphore_create"); | |
392 | ||
393 | T_QUIET; | |
394 | T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn, workqueue_write_fn, 0, 0), NULL); | |
813fb2f6 A |
395 | |
396 | struct kevent_qos_s events[] = {{ | |
397 | .ident = (uint64_t)shared.wr_fd, | |
398 | .filter = EVFILT_WRITE, | |
399 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
400 | .fflags = NOTE_LOWAT, | |
401 | .data = 1, | |
402 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
403 | 0, 0) | |
404 | }}; | |
405 | ||
406 | for (;;) { | |
407 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
408 | events, 1, NULL, NULL, | |
409 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
410 | if (kev == -1 && errno == EINTR) { | |
411 | changes = 0; | |
412 | T_LOG("kevent_qos was interrupted"); | |
413 | continue; | |
414 | } | |
415 | ||
416 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
417 | break; | |
418 | } | |
419 | break; | |
420 | } | |
421 | ||
422 | case DISPATCH_INCREMENTAL_WRITE: { | |
423 | dispatch_source_t write_src; | |
424 | ||
5ba3f43e A |
425 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0), |
426 | "semaphore_create shared.wr_finished"); | |
427 | ||
428 | T_QUIET; | |
429 | T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
430 | |
431 | write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, | |
432 | (uintptr_t)shared.wr_fd, 0, NULL); | |
433 | T_QUIET; T_ASSERT_NOTNULL(write_src, | |
434 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)"); | |
435 | ||
436 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
437 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ | |
438 | // T_MAYFAIL; | |
439 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
440 | // "write handler block should run at correct QoS"); | |
441 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { | |
442 | /* finished handling the fd, tear down the source */ | |
443 | dispatch_source_cancel(write_src); | |
444 | dispatch_release(write_src); | |
5ba3f43e A |
445 | T_LOG("signal shared.wr_finished"); |
446 | semaphore_signal(shared.wr_finished); | |
813fb2f6 A |
447 | } |
448 | }); | |
449 | ||
450 | dispatch_source_set_event_handler(write_src, handler); | |
451 | dispatch_activate(write_src); | |
452 | ||
453 | break; | |
454 | } | |
455 | ||
456 | default: | |
457 | T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode); | |
458 | break; | |
459 | } | |
460 | ||
461 | if (shared.wr_finished) { | |
5ba3f43e A |
462 | T_LOG("wait shared.wr_finished"); |
463 | kern_return_t kret = semaphore_timedwait(shared.wr_finished, WRITE_timeout); | |
464 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
465 | T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout.tv_sec); | |
813fb2f6 | 466 | } |
5ba3f43e A |
467 | T_QUIET; |
468 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_finished"); | |
469 | semaphore_destroy(mach_task_self(), shared.wr_finished); | |
813fb2f6 A |
470 | } |
471 | ||
472 | T_LOG("writer finished, closing fd"); | |
473 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL); | |
474 | return NULL; | |
475 | } | |
476 | ||
477 | #pragma mark reading | |
478 | ||
479 | #define BUF_LEN 1024 | |
480 | static char final_string[BUF_LEN]; | |
481 | static size_t final_length; | |
482 | ||
483 | /* | |
484 | * Read from the master PTY descriptor. | |
485 | * | |
486 | * Returns false if EOF is encountered, and true otherwise. | |
487 | */ | |
488 | static bool | |
489 | handle_reading(enum fd_pair fd_pair, int fd) | |
490 | { | |
491 | char read_buf[BUF_LEN] = { 0 }; | |
492 | ssize_t bytes_rd = 0; | |
493 | ||
494 | do { | |
495 | if (bytes_rd == -1) { | |
496 | T_LOG("read was interrupted, retrying"); | |
497 | } | |
498 | bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1); | |
499 | } while (bytes_rd == -1 && errno == EINTR); | |
500 | ||
5ba3f43e A |
501 | // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf); |
502 | ||
813fb2f6 A |
503 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file"); |
504 | T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN, | |
505 | "read too much from file"); | |
506 | ||
507 | if (bytes_rd == 0) { | |
508 | T_LOG("read EOF from file"); | |
509 | return false; | |
510 | } | |
511 | ||
512 | read_buf[bytes_rd] = '\0'; | |
513 | strlcpy(&(final_string[final_length]), read_buf, | |
514 | sizeof(final_string) - final_length); | |
515 | final_length += (size_t)bytes_rd; | |
516 | ||
813fb2f6 A |
517 | T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN, |
518 | "should not read more from file than what can be sent"); | |
519 | ||
a39ff7e2 | 520 | /* FIFOs don't send EOF when the write side closes */ |
813fb2f6 | 521 | if (final_length == strlen(EXPECTED_STRING) && |
a39ff7e2 | 522 | (fd_pair == FIFO_PAIR)) |
813fb2f6 | 523 | { |
a39ff7e2 | 524 | T_LOG("read all expected bytes from FIFO"); |
813fb2f6 A |
525 | return false; |
526 | } | |
527 | return true; | |
528 | } | |
529 | ||
530 | static void | |
531 | workqueue_read_fn(void ** __unused buf, int * __unused count) | |
532 | { | |
533 | // T_MAYFAIL; | |
534 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
535 | // "reader thread should be requested at correct QoS"); | |
536 | if (!handle_reading(shared.fd_pair, shared.rd_fd)) { | |
5ba3f43e A |
537 | T_LOG("signal shared.rd_finished"); |
538 | semaphore_signal(shared.rd_finished); | |
813fb2f6 A |
539 | } |
540 | ||
541 | reenable_workq(shared.rd_fd, EVFILT_READ); | |
542 | } | |
543 | ||
544 | static void | |
545 | read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode) | |
546 | { | |
547 | int fd_flags; | |
548 | ||
549 | T_LOG("reader setting up"); | |
550 | ||
551 | bzero(final_string, sizeof(final_string)); | |
552 | ||
553 | fd_flags = fcntl(fd, F_GETFL, 0); | |
554 | T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)"); | |
555 | ||
556 | if (!(fd_flags & O_NONBLOCK)) { | |
557 | T_QUIET; | |
558 | T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL, | |
559 | fd_flags | O_NONBLOCK), NULL); | |
560 | } | |
561 | ||
562 | switch (mode) { | |
563 | case POLL_READ: { | |
564 | struct pollfd fds[] = { { .fd = fd, .events = POLLIN } }; | |
565 | wake_writer(); | |
5ba3f43e | 566 | |
813fb2f6 A |
567 | for (;;) { |
568 | fds[0].revents = 0; | |
569 | int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000); | |
570 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll"); | |
813fb2f6 A |
571 | T_QUIET; T_ASSERT_NE(pol, 0, |
572 | "poll should not time out after %d seconds, read %zd out " | |
573 | "of %zu bytes", | |
574 | READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING)); | |
575 | T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR, | |
576 | "should not see an error on the device"); | |
5ba3f43e A |
577 | T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLNVAL, |
578 | "should not set up an invalid poll"); | |
813fb2f6 A |
579 | |
580 | if (!handle_reading(fd_pair, fd)) { | |
581 | break; | |
582 | } | |
583 | } | |
584 | break; | |
585 | } | |
586 | ||
587 | case SELECT_READ: | |
588 | wake_writer(); | |
589 | ||
590 | for (;;) { | |
591 | struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS }; | |
592 | ||
593 | fd_set read_fd; | |
594 | FD_ZERO(&read_fd); | |
595 | FD_SET(fd, &read_fd); | |
596 | fd_set err_fd; | |
597 | FD_ZERO(&err_fd); | |
598 | FD_SET(fd, &err_fd); | |
599 | ||
600 | int sel = select(fd + 1, &read_fd, NULL, NULL/*&err_fd*/, &tv); | |
601 | if (sel == -1 && errno == EINTR) { | |
602 | T_LOG("select interrupted"); | |
603 | continue; | |
604 | } | |
605 | (void)fd_pair; | |
606 | ||
607 | T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select"); | |
608 | ||
813fb2f6 A |
609 | T_QUIET; T_ASSERT_NE(sel, 0, |
610 | "select waited for %d seconds and timed out", | |
611 | READ_TIMEOUT_SECS); | |
612 | ||
813fb2f6 A |
613 | /* didn't fail or time out, therefore data is ready */ |
614 | T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0, | |
615 | "select should show reading fd as readable"); | |
616 | ||
617 | if (!handle_reading(fd_pair, fd)) { | |
618 | break; | |
619 | } | |
620 | } | |
621 | break; | |
622 | ||
623 | case KEVENT_READ: /* FALLTHROUGH */ | |
624 | case KEVENT64_READ: /* FALLTHROUGH */ | |
625 | case KEVENT_QOS_READ: { | |
626 | union mode rd_mode = { .rd = shared.rd_mode }; | |
627 | drive_kq(true, rd_mode, fd_pair, shared.rd_fd); | |
628 | break; | |
629 | } | |
630 | ||
631 | case WORKQ_READ: { | |
5ba3f43e A |
632 | // prohibit ourselves from going multi-threaded see:rdar://33296008 |
633 | _dispatch_prohibit_transition_to_multithreaded(true); | |
634 | T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent( | |
813fb2f6 A |
635 | workqueue_fn, workqueue_read_fn, 0, 0), NULL); |
636 | ||
5ba3f43e A |
637 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0), |
638 | "semaphore_create shared.rd_finished"); | |
639 | ||
640 | T_QUIET; | |
641 | T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
642 | |
643 | int changes = 1; | |
644 | struct kevent_qos_s events[] = {{ | |
645 | .ident = (uint64_t)shared.rd_fd, | |
646 | .filter = EVFILT_READ, | |
647 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
648 | .fflags = NOTE_LOWAT, | |
649 | .data = 1, | |
650 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
651 | 0, 0) | |
652 | }}; | |
653 | ||
654 | for (;;) { | |
655 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
656 | events, 1, NULL, NULL, | |
657 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
658 | if (kev == -1 && errno == EINTR) { | |
659 | changes = 0; | |
660 | T_LOG("kevent_qos was interrupted"); | |
661 | continue; | |
662 | } | |
663 | ||
664 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
665 | break; | |
666 | } | |
667 | ||
668 | wake_writer(); | |
669 | break; | |
670 | } | |
671 | ||
672 | case DISPATCH_READ: { | |
673 | dispatch_source_t read_src; | |
674 | ||
5ba3f43e A |
675 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0), |
676 | "semaphore_create shared.rd_finished"); | |
677 | ||
678 | T_QUIET; | |
679 | T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
680 | |
681 | read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, | |
682 | (uintptr_t)fd, 0, NULL); | |
683 | T_QUIET; T_ASSERT_NOTNULL(read_src, | |
684 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)"); | |
685 | ||
686 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
687 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ | |
688 | // T_MAYFAIL; | |
689 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
690 | // "read handler block should run at correct QoS"); | |
691 | ||
692 | if (!handle_reading(fd_pair, fd)) { | |
693 | /* finished handling the fd, tear down the source */ | |
694 | dispatch_source_cancel(read_src); | |
695 | dispatch_release(read_src); | |
5ba3f43e A |
696 | T_LOG("signal shared.rd_finished"); |
697 | semaphore_signal(shared.rd_finished); | |
813fb2f6 A |
698 | } |
699 | }); | |
700 | ||
701 | dispatch_source_set_event_handler(read_src, handler); | |
702 | dispatch_activate(read_src); | |
703 | ||
704 | wake_writer(); | |
705 | break; | |
706 | } | |
707 | ||
708 | default: | |
709 | T_ASSERT_FAIL("unrecognized read mode: %d", mode); | |
710 | break; | |
711 | } | |
712 | ||
713 | if (shared.rd_finished) { | |
5ba3f43e A |
714 | T_LOG("wait shared.rd_finished"); |
715 | kern_return_t kret = semaphore_timedwait(shared.rd_finished, READ_timeout); | |
716 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
717 | T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout.tv_sec); | |
813fb2f6 | 718 | } |
5ba3f43e A |
719 | T_QUIET; |
720 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.rd_finished"); | |
813fb2f6 A |
721 | } |
722 | ||
723 | T_EXPECT_EQ_STR(final_string, EXPECTED_STRING, | |
724 | "reader should receive valid string"); | |
725 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL); | |
726 | } | |
727 | ||
728 | #pragma mark file setup | |
729 | ||
730 | static void | |
731 | fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd) | |
732 | { | |
733 | switch (fd_pair) { | |
734 | case PTY_PAIR: | |
735 | T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL), | |
736 | NULL); | |
737 | break; | |
738 | ||
739 | case FIFO_PAIR: { | |
740 | char fifo_path[] = "/tmp/async-io-fifo.XXXXXX"; | |
741 | T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL); | |
742 | ||
743 | T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)", | |
744 | fifo_path); | |
745 | /* | |
746 | * Opening the read side of a pipe will block until the write | |
747 | * side opens -- use O_NONBLOCK. | |
748 | */ | |
749 | *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK); | |
750 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)"); | |
751 | *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK); | |
752 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)"); | |
753 | break; | |
754 | } | |
755 | ||
756 | case PIPE_PAIR: { | |
757 | int pipe_fds[2]; | |
758 | T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL); | |
759 | *rd_fd = pipe_fds[0]; | |
760 | *wr_fd = pipe_fds[1]; | |
761 | break; | |
762 | } | |
763 | ||
764 | case SOCKET_PAIR: { | |
765 | int sock_fds[2]; | |
766 | T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds), | |
767 | NULL); | |
768 | *rd_fd = sock_fds[0]; | |
769 | *wr_fd = sock_fds[1]; | |
770 | break; | |
771 | } | |
772 | ||
773 | default: | |
774 | T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair); | |
775 | break; | |
776 | } | |
777 | ||
778 | T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor"); | |
779 | T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor"); | |
780 | } | |
781 | ||
782 | #pragma mark single process | |
783 | ||
784 | static void | |
785 | drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode, | |
786 | enum write_mode wr_mode) | |
787 | { | |
788 | pthread_t thread; | |
789 | ||
790 | shared.fd_pair = fd_pair; | |
791 | shared.rd_mode = rd_mode; | |
792 | shared.wr_mode = wr_mode; | |
793 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
794 | ||
795 | shared.wr_kind = THREAD_WRITER; | |
5ba3f43e A |
796 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_wait.sem, SYNC_POLICY_FIFO, 0), |
797 | "semaphore_create shared.wr_wait.sem"); | |
813fb2f6 A |
798 | |
799 | T_QUIET; | |
800 | T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL), | |
801 | NULL); | |
802 | T_LOG("created writer thread"); | |
803 | ||
804 | read_from_fd(shared.rd_fd, fd_pair, rd_mode); | |
5ba3f43e A |
805 | |
806 | T_ASSERT_POSIX_ZERO(pthread_join(thread, NULL), NULL); | |
807 | ||
813fb2f6 A |
808 | T_END; |
809 | } | |
810 | ||
811 | #pragma mark multiple processes | |
812 | ||
813 | static void __attribute__((noreturn)) | |
814 | drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode) | |
815 | { | |
816 | shared.fd_pair = fd_pair; | |
817 | shared.rd_mode = rd_mode; | |
818 | shared.wr_mode = wr_mode; | |
819 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
820 | ||
821 | shared.wr_kind = PROCESS_WRITER; | |
822 | int fds[2]; | |
823 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL); | |
824 | shared.wr_wait.out_fd = fds[0]; | |
825 | shared.wr_wait.in_fd = fds[1]; | |
826 | ||
827 | T_LOG("starting subprocesses"); | |
828 | dt_helper_t helpers[2] = { | |
829 | dt_fork_helper("reader_helper"), | |
830 | dt_fork_helper("writer_helper") | |
831 | }; | |
832 | ||
833 | close(shared.rd_fd); | |
834 | close(shared.wr_fd); | |
835 | ||
836 | dt_run_helpers(helpers, 2, 50000); | |
837 | } | |
838 | ||
839 | T_HELPER_DECL(reader_helper, "Read asynchronously") | |
840 | { | |
841 | close(shared.wr_fd); | |
842 | read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode); | |
843 | T_END; | |
844 | } | |
845 | ||
846 | T_HELPER_DECL(writer_helper, "Write asynchronously") | |
847 | { | |
848 | close(shared.rd_fd); | |
849 | write_to_fd(NULL); | |
850 | } | |
851 | ||
852 | #pragma mark tests | |
853 | ||
854 | #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
855 | write_mode, read_name, read_mode) \ | |
5ba3f43e | 856 | T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \ |
813fb2f6 A |
857 | #desc_name " with " #read_name " and writing " #write_str \ |
858 | " across two processes") \ | |
859 | { \ | |
860 | drive_processes(fd_pair, read_mode, write_mode); \ | |
861 | } | |
862 | #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ | |
863 | write_mode, read_name, read_mode) \ | |
5ba3f43e | 864 | T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \ |
813fb2f6 A |
865 | #desc_name " with " #read_name " and writing " #write_str) \ |
866 | { \ | |
867 | drive_threads(fd_pair, read_mode, write_mode); \ | |
868 | } | |
869 | ||
870 | #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \ | |
871 | read_name, read_mode) \ | |
872 | WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
873 | write_mode, read_name, read_mode) \ | |
874 | WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ | |
875 | write_mode, read_name, read_mode) | |
876 | ||
877 | #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
878 | WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \ | |
879 | read_name, read_mode) \ | |
5ba3f43e | 880 | WR_DECL(desc_name, fd_pair, inc, "incrementally", \ |
813fb2f6 A |
881 | INCREMENTAL_WRITE, read_name, read_mode) |
882 | ||
883 | #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \ | |
884 | read_mode) \ | |
5ba3f43e | 885 | WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \ |
813fb2f6 A |
886 | "incrementally with a dispatch source", \ |
887 | DISPATCH_INCREMENTAL_WRITE, read_name, read_mode) | |
888 | #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \ | |
889 | read_mode) \ | |
5ba3f43e | 890 | WR_DECL##suffix(desc_name, fd_pair, inc_workq, \ |
813fb2f6 A |
891 | "incrementally with the workqueue", \ |
892 | WORKQ_INCREMENTAL_WRITE, read_name, read_mode) | |
893 | ||
894 | #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \ | |
895 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
896 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
897 | // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
898 | ||
899 | /* | |
900 | * dispatch_source tests cannot share the same process as other workqueue | |
901 | * tests. | |
902 | */ | |
903 | #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \ | |
904 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
905 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \ | |
906 | RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
907 | read_mode) | |
908 | ||
909 | /* | |
910 | * Workqueue tests cannot share the same process as other workqueue or | |
911 | * dispatch_source tests. | |
912 | #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \ | |
913 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
914 | RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
915 | read_mode) \ | |
916 | RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
917 | read_mode) | |
918 | */ | |
919 | ||
920 | #define PAIR_DECL(desc_name, fd_pair) \ | |
921 | RD_DECL(desc_name, fd_pair, poll, POLL_READ) \ | |
922 | RD_DECL(desc_name, fd_pair, select, SELECT_READ) \ | |
923 | RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \ | |
924 | RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \ | |
925 | RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \ | |
926 | RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ) | |
927 | // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ) | |
928 | ||
929 | PAIR_DECL(tty, PTY_PAIR) | |
930 | PAIR_DECL(pipe, PIPE_PAIR) | |
931 | PAIR_DECL(fifo, FIFO_PAIR) | |
932 | PAIR_DECL(socket, SOCKET_PAIR) |