]>
Commit | Line | Data |
---|---|---|
813fb2f6 A |
1 | #ifdef T_NAMESPACE |
2 | #undef T_NAMESPACE | |
3 | #endif | |
5ba3f43e | 4 | |
813fb2f6 | 5 | #include <darwintest.h> |
5ba3f43e | 6 | #include <mach/mach.h> |
813fb2f6 A |
7 | #include <darwintest_multiprocess.h> |
8 | ||
9 | #include <assert.h> | |
10 | #include <dispatch/dispatch.h> | |
5ba3f43e | 11 | #include <dispatch/private.h> |
813fb2f6 A |
12 | #include <err.h> |
13 | #include <errno.h> | |
14 | #include <fcntl.h> | |
15 | #include <poll.h> | |
16 | #include <pthread.h> | |
17 | #include <pthread/workqueue_private.h> | |
18 | #include <stdio.h> | |
19 | #include <stdlib.h> | |
20 | #include <string.h> | |
21 | #include <sys/event.h> | |
22 | #include <sys/socket.h> | |
23 | #include <sys/stat.h> | |
24 | #include <sys/time.h> | |
25 | #include <sys/types.h> | |
26 | #include <sys/wait.h> | |
27 | #include <sysexits.h> | |
28 | #include <unistd.h> | |
29 | #include <util.h> | |
30 | #include <System/sys/event.h> /* kevent_qos */ | |
31 | ||
5ba3f43e | 32 | T_GLOBAL_META( |
0a7de745 A |
33 | T_META_NAMESPACE("xnu.kevent"), |
34 | T_META_CHECK_LEAKS(false), | |
35 | T_META_LTEPHASE(LTE_POSTINIT)); | |
813fb2f6 A |
36 | |
37 | /* | |
38 | * Test to validate that monitoring a PTY device, FIFO, pipe, or socket pair in | |
39 | * a dispatch source, kqueue, poll, or select delivers read events within and | |
40 | * between processes as expected. | |
41 | * | |
42 | * This test catches issues with watching special devices in kqueue(), | |
43 | * which has tricky special cases for character devices like PTYs. | |
44 | * | |
45 | * It also exercises the path to wake up a dispatch worker thread from the | |
46 | * special device kqueue event, which is also a special case in kqueue(). | |
47 | * | |
48 | * See rdar://problem/26240299&26220074&26226862&28625427 for examples and | |
49 | * history. | |
50 | */ | |
51 | ||
52 | #define EXPECTED_STRING "abcdefghijklmnopqrstuvwxyz. ABCDEFGHIJKLMNOPQRSTUVWXYZ. 1234567890" | |
53 | #define EXPECTED_LEN strlen(EXPECTED_STRING) | |
54 | ||
55 | #define READ_SETUP_TIMEOUT_SECS 2 | |
56 | #define WRITE_TIMEOUT_SECS 4 | |
5ba3f43e | 57 | #define READ_TIMEOUT_SECS 4 |
813fb2f6 A |
58 | #define INCREMENTAL_WRITE_SLEEP_USECS 50 |
59 | ||
5ba3f43e A |
60 | static mach_timespec_t READ_SETUP_timeout = {.tv_sec = READ_SETUP_TIMEOUT_SECS, .tv_nsec = 0}; |
61 | static mach_timespec_t READ_timeout = {.tv_sec = READ_TIMEOUT_SECS, .tv_nsec = 0}; | |
62 | static mach_timespec_t WRITE_timeout = {.tv_sec = WRITE_TIMEOUT_SECS, .tv_nsec = 0}; | |
63 | ||
813fb2f6 A |
64 | enum fd_pair { |
65 | PTY_PAIR, | |
66 | FIFO_PAIR, | |
67 | PIPE_PAIR, | |
68 | SOCKET_PAIR | |
69 | }; | |
70 | ||
71 | enum write_mode { | |
72 | FULL_WRITE, | |
73 | INCREMENTAL_WRITE, | |
74 | KEVENT_INCREMENTAL_WRITE, | |
75 | KEVENT64_INCREMENTAL_WRITE, | |
76 | KEVENT_QOS_INCREMENTAL_WRITE, | |
77 | WORKQ_INCREMENTAL_WRITE, | |
78 | DISPATCH_INCREMENTAL_WRITE | |
79 | }; | |
80 | ||
81 | enum read_mode { | |
82 | POLL_READ, | |
83 | SELECT_READ, | |
84 | KEVENT_READ, | |
85 | KEVENT64_READ, | |
86 | KEVENT_QOS_READ, | |
87 | WORKQ_READ, | |
88 | DISPATCH_READ | |
89 | }; | |
90 | ||
91 | union mode { | |
92 | enum read_mode rd; | |
93 | enum write_mode wr; | |
94 | }; | |
95 | ||
96 | static struct { | |
97 | enum fd_pair fd_pair; | |
98 | enum write_mode wr_mode; | |
99 | int wr_fd; | |
100 | enum read_mode rd_mode; | |
101 | int rd_fd; | |
102 | ||
103 | enum writer_kind { | |
104 | THREAD_WRITER, /* sem */ | |
105 | PROCESS_WRITER /* fd */ | |
106 | } wr_kind; | |
107 | union { | |
5ba3f43e | 108 | semaphore_t sem; |
813fb2f6 A |
109 | struct { |
110 | int in_fd; | |
111 | int out_fd; | |
112 | }; | |
113 | } wr_wait; | |
5ba3f43e A |
114 | semaphore_t wr_finished; |
115 | semaphore_t rd_finished; | |
813fb2f6 A |
116 | } shared; |
117 | ||
118 | static bool handle_reading(enum fd_pair fd_pair, int fd); | |
119 | static bool handle_writing(enum fd_pair fd_pair, int fd); | |
120 | static void drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, | |
0a7de745 | 121 | int fd); |
813fb2f6 A |
122 | |
123 | #pragma mark writing | |
124 | ||
125 | static void | |
126 | wake_writer(void) | |
127 | { | |
128 | T_LOG("waking writer"); | |
129 | ||
130 | switch (shared.wr_kind) { | |
131 | case THREAD_WRITER: | |
5ba3f43e A |
132 | T_LOG("signal shared.wr_wait.sem"); |
133 | semaphore_signal(shared.wr_wait.sem); | |
813fb2f6 A |
134 | break; |
135 | case PROCESS_WRITER: { | |
136 | char tmp = 'a'; | |
137 | close(shared.wr_wait.out_fd); | |
138 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write( | |
0a7de745 | 139 | shared.wr_wait.in_fd, &tmp, 1), NULL); |
813fb2f6 A |
140 | break; |
141 | } | |
142 | } | |
143 | } | |
144 | ||
145 | static void | |
146 | writer_wait(void) | |
147 | { | |
148 | switch (shared.wr_kind) { | |
149 | case THREAD_WRITER: | |
5ba3f43e A |
150 | T_LOG("wait shared.wr_wait.sem"); |
151 | kern_return_t kret = semaphore_timedwait(shared.wr_wait.sem, READ_SETUP_timeout); | |
152 | ||
153 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
154 | T_ASSERT_FAIL("THREAD_WRITER semaphore timedout after %d seconds", READ_SETUP_timeout.tv_sec); | |
155 | } | |
156 | T_QUIET; | |
157 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_wait.sem"); | |
813fb2f6 | 158 | break; |
5ba3f43e | 159 | |
813fb2f6 A |
160 | case PROCESS_WRITER: { |
161 | char tmp; | |
162 | close(shared.wr_wait.in_fd); | |
163 | T_QUIET; T_ASSERT_POSIX_SUCCESS(read( | |
0a7de745 | 164 | shared.wr_wait.out_fd, &tmp, 1), NULL); |
813fb2f6 A |
165 | break; |
166 | } | |
167 | } | |
168 | ||
169 | T_LOG("writer woken up, starting to write"); | |
170 | } | |
171 | ||
172 | static bool | |
173 | handle_writing(enum fd_pair __unused fd_pair, int fd) | |
174 | { | |
175 | static unsigned int cur_char = 0; | |
176 | T_QUIET; T_ASSERT_POSIX_SUCCESS(write(fd, | |
0a7de745 | 177 | &(EXPECTED_STRING[cur_char]), 1), NULL); |
813fb2f6 A |
178 | cur_char++; |
179 | ||
0a7de745 | 180 | return cur_char < EXPECTED_LEN; |
813fb2f6 A |
181 | } |
182 | ||
183 | #define EXPECTED_QOS QOS_CLASS_USER_INITIATED | |
184 | ||
185 | static void | |
186 | reenable_workq(int fd, int16_t filt) | |
187 | { | |
188 | struct kevent_qos_s events[] = {{ | |
0a7de745 A |
189 | .ident = (uint64_t)fd, |
190 | .filter = filt, | |
191 | .flags = EV_ENABLE | EV_UDATA_SPECIFIC | EV_DISPATCH, | |
192 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
193 | 0, 0), | |
194 | .fflags = NOTE_LOWAT, | |
195 | .data = 1 | |
196 | }}; | |
813fb2f6 A |
197 | |
198 | int kev = kevent_qos(-1, events, 1, events, 1, NULL, NULL, | |
0a7de745 | 199 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); |
813fb2f6 A |
200 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "reenable workq in kevent_qos"); |
201 | } | |
202 | ||
203 | static void | |
204 | workqueue_write_fn(void ** __unused buf, int * __unused count) | |
205 | { | |
206 | // T_MAYFAIL; | |
207 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
0a7de745 | 208 | // "writer thread should be woken up at correct QoS"); |
813fb2f6 A |
209 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { |
210 | /* finished handling the fd, tear down the source */ | |
5ba3f43e A |
211 | T_LOG("signal shared.wr_finished"); |
212 | semaphore_signal(shared.wr_finished); | |
813fb2f6 A |
213 | return; |
214 | } | |
215 | ||
216 | reenable_workq(shared.wr_fd, EVFILT_WRITE); | |
217 | } | |
218 | ||
219 | static void | |
220 | workqueue_fn(pthread_priority_t __unused priority) | |
221 | { | |
222 | T_ASSERT_FAIL("workqueue function callback was called"); | |
223 | } | |
224 | ||
225 | static void | |
226 | drive_kq(bool reading, union mode mode, enum fd_pair fd_pair, int fd) | |
227 | { | |
228 | struct timespec timeout = { .tv_sec = READ_TIMEOUT_SECS }; | |
229 | int kev = -1; | |
230 | ||
231 | struct kevent events; | |
232 | EV_SET(&events, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
0a7de745 | 233 | NOTE_LOWAT, 1, NULL); |
813fb2f6 A |
234 | struct kevent64_s events64; |
235 | EV_SET64(&events64, fd, reading ? EVFILT_READ : EVFILT_WRITE, EV_ADD, | |
0a7de745 | 236 | NOTE_LOWAT, 1, 0, 0, 0); |
813fb2f6 | 237 | struct kevent_qos_s events_qos[] = {{ |
0a7de745 A |
238 | .ident = (uint64_t)fd, |
239 | .filter = reading ? EVFILT_READ : EVFILT_WRITE, | |
240 | .flags = EV_ADD, | |
241 | .fflags = NOTE_LOWAT, | |
242 | .data = 1 | |
243 | }, { | |
244 | .ident = 0, | |
245 | .filter = EVFILT_TIMER, | |
246 | .flags = EV_ADD, | |
247 | .fflags = NOTE_SECONDS, | |
248 | .data = READ_TIMEOUT_SECS | |
249 | }}; | |
813fb2f6 A |
250 | |
251 | /* determine which variant of kevent to use */ | |
252 | enum read_mode which_kevent; | |
253 | if (reading) { | |
254 | which_kevent = mode.rd; | |
255 | } else { | |
256 | if (mode.wr == KEVENT_INCREMENTAL_WRITE) { | |
257 | which_kevent = KEVENT_READ; | |
258 | } else if (mode.wr == KEVENT64_INCREMENTAL_WRITE) { | |
259 | which_kevent = KEVENT64_READ; | |
260 | } else if (mode.wr == KEVENT_QOS_INCREMENTAL_WRITE) { | |
261 | which_kevent = KEVENT_QOS_READ; | |
262 | } else { | |
263 | T_ASSERT_FAIL("unexpected mode: %d", mode.wr); | |
264 | __builtin_unreachable(); | |
265 | } | |
266 | } | |
267 | ||
268 | int kq_fd = kqueue(); | |
269 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kq_fd, "kqueue"); | |
270 | ||
271 | switch (which_kevent) { | |
272 | case KEVENT_READ: | |
273 | kev = kevent(kq_fd, &events, 1, NULL, 0, NULL); | |
274 | break; | |
275 | case KEVENT64_READ: | |
276 | kev = kevent64(kq_fd, &events64, 1, NULL, 0, 0, NULL); | |
277 | break; | |
278 | case KEVENT_QOS_READ: | |
279 | kev = kevent_qos(kq_fd, events_qos, 2, NULL, 0, NULL, NULL, 0); | |
280 | break; | |
281 | case POLL_READ: /* FALLTHROUGH */ | |
282 | case SELECT_READ: /* FALLTHROUGH */ | |
283 | case DISPATCH_READ: /* FALLTHROUGH */ | |
284 | case WORKQ_READ: /* FALLTHROUGH */ | |
285 | default: | |
286 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
287 | break; | |
288 | } | |
289 | ||
290 | if (reading) { | |
291 | wake_writer(); | |
292 | } else { | |
293 | writer_wait(); | |
294 | } | |
295 | ||
296 | for (;;) { | |
297 | switch (which_kevent) { | |
298 | case KEVENT_READ: | |
299 | kev = kevent(kq_fd, NULL, 0, &events, 1, &timeout); | |
300 | break; | |
301 | case KEVENT64_READ: | |
302 | kev = kevent64(kq_fd, NULL, 0, &events64, 1, 0, &timeout); | |
303 | break; | |
304 | case KEVENT_QOS_READ: | |
305 | kev = kevent_qos(kq_fd, NULL, 0, events_qos, 2, NULL, NULL, 0); | |
306 | ||
307 | /* check for a timeout */ | |
308 | for (int i = 0; i < kev; i++) { | |
309 | if (events_qos[i].filter == EVFILT_TIMER) { | |
310 | kev = 0; | |
311 | } | |
312 | } | |
313 | break; | |
314 | case POLL_READ: /* FALLTHROUGH */ | |
315 | case SELECT_READ: /* FALLTHROUGH */ | |
316 | case DISPATCH_READ: /* FALLTHROUGH */ | |
317 | case WORKQ_READ: /* FALLTHROUGH */ | |
318 | default: | |
319 | T_ASSERT_FAIL("unexpected mode: %d", reading ? mode.rd : mode.wr); | |
320 | break; | |
321 | } | |
322 | ||
323 | if (kev == -1 && errno == EINTR) { | |
324 | T_LOG("kevent was interrupted"); | |
325 | continue; | |
326 | } | |
327 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent"); | |
813fb2f6 A |
328 | T_QUIET; T_ASSERT_NE(kev, 0, "kevent timed out"); |
329 | ||
330 | if (reading) { | |
331 | if (!handle_reading(fd_pair, fd)) { | |
332 | break; | |
333 | } | |
334 | } else { | |
335 | if (!handle_writing(fd_pair, fd)) { | |
336 | break; | |
337 | } | |
338 | } | |
339 | } | |
340 | ||
341 | close(kq_fd); | |
342 | } | |
343 | ||
344 | static void * | |
345 | write_to_fd(void * __unused ctx) | |
346 | { | |
347 | ssize_t bytes_wr = 0; | |
348 | ||
349 | writer_wait(); | |
350 | ||
351 | switch (shared.wr_mode) { | |
352 | case FULL_WRITE: | |
353 | do { | |
354 | if (bytes_wr == -1) { | |
355 | T_LOG("write from child was interrupted"); | |
356 | } | |
357 | bytes_wr = write(shared.wr_fd, EXPECTED_STRING, | |
0a7de745 | 358 | EXPECTED_LEN); |
813fb2f6 A |
359 | } while (bytes_wr == -1 && errno == EINTR); |
360 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_wr, "write"); | |
361 | T_QUIET; T_ASSERT_EQ(bytes_wr, (ssize_t)EXPECTED_LEN, | |
0a7de745 | 362 | "wrote enough bytes"); |
813fb2f6 A |
363 | break; |
364 | ||
365 | case INCREMENTAL_WRITE: | |
0a7de745 | 366 | for (unsigned int i = 0; i < EXPECTED_LEN; i++) { |
813fb2f6 A |
367 | T_QUIET; |
368 | T_ASSERT_POSIX_SUCCESS(write(shared.wr_fd, | |
0a7de745 | 369 | &(EXPECTED_STRING[i]), 1), NULL); |
813fb2f6 A |
370 | usleep(INCREMENTAL_WRITE_SLEEP_USECS); |
371 | } | |
372 | break; | |
373 | ||
374 | case KEVENT_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
375 | case KEVENT64_INCREMENTAL_WRITE: /* FALLTHROUGH */ | |
376 | case KEVENT_QOS_INCREMENTAL_WRITE: { | |
377 | union mode mode = { .wr = shared.wr_mode }; | |
378 | drive_kq(false, mode, shared.fd_pair, shared.wr_fd); | |
379 | break; | |
380 | } | |
381 | ||
382 | case WORKQ_INCREMENTAL_WRITE: { | |
5ba3f43e A |
383 | // prohibit ourselves from going multi-threaded see:rdar://33296008 |
384 | _dispatch_prohibit_transition_to_multithreaded(true); | |
813fb2f6 A |
385 | int changes = 1; |
386 | ||
5ba3f43e | 387 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0), |
0a7de745 | 388 | "semaphore_create shared.wr_finished"); |
813fb2f6 | 389 | |
5ba3f43e A |
390 | T_QUIET; |
391 | T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "wr_finished semaphore_create"); | |
392 | ||
393 | T_QUIET; | |
394 | T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent(workqueue_fn, workqueue_write_fn, 0, 0), NULL); | |
813fb2f6 A |
395 | |
396 | struct kevent_qos_s events[] = {{ | |
0a7de745 A |
397 | .ident = (uint64_t)shared.wr_fd, |
398 | .filter = EVFILT_WRITE, | |
399 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
400 | .fflags = NOTE_LOWAT, | |
401 | .data = 1, | |
402 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
403 | 0, 0) | |
404 | }}; | |
813fb2f6 A |
405 | |
406 | for (;;) { | |
407 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
0a7de745 A |
408 | events, 1, NULL, NULL, |
409 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
813fb2f6 A |
410 | if (kev == -1 && errno == EINTR) { |
411 | changes = 0; | |
412 | T_LOG("kevent_qos was interrupted"); | |
413 | continue; | |
414 | } | |
415 | ||
416 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
417 | break; | |
418 | } | |
419 | break; | |
420 | } | |
421 | ||
422 | case DISPATCH_INCREMENTAL_WRITE: { | |
423 | dispatch_source_t write_src; | |
424 | ||
5ba3f43e | 425 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_finished, SYNC_POLICY_FIFO, 0), |
0a7de745 | 426 | "semaphore_create shared.wr_finished"); |
5ba3f43e A |
427 | |
428 | T_QUIET; | |
429 | T_ASSERT_NE_UINT(shared.wr_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
430 | |
431 | write_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, | |
0a7de745 | 432 | (uintptr_t)shared.wr_fd, 0, NULL); |
813fb2f6 | 433 | T_QUIET; T_ASSERT_NOTNULL(write_src, |
0a7de745 | 434 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE ...)"); |
813fb2f6 A |
435 | |
436 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
0a7de745 A |
437 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ |
438 | // T_MAYFAIL; | |
439 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
440 | // "write handler block should run at correct QoS"); | |
441 | if (!handle_writing(shared.fd_pair, shared.wr_fd)) { | |
442 | /* finished handling the fd, tear down the source */ | |
443 | dispatch_source_cancel(write_src); | |
444 | dispatch_release(write_src); | |
445 | T_LOG("signal shared.wr_finished"); | |
446 | semaphore_signal(shared.wr_finished); | |
447 | } | |
448 | }); | |
813fb2f6 A |
449 | |
450 | dispatch_source_set_event_handler(write_src, handler); | |
451 | dispatch_activate(write_src); | |
452 | ||
453 | break; | |
454 | } | |
455 | ||
456 | default: | |
457 | T_ASSERT_FAIL("unrecognized write mode: %d", shared.wr_mode); | |
458 | break; | |
459 | } | |
460 | ||
461 | if (shared.wr_finished) { | |
5ba3f43e A |
462 | T_LOG("wait shared.wr_finished"); |
463 | kern_return_t kret = semaphore_timedwait(shared.wr_finished, WRITE_timeout); | |
464 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
465 | T_ASSERT_FAIL("write side semaphore timedout after %d seconds", WRITE_timeout.tv_sec); | |
813fb2f6 | 466 | } |
5ba3f43e A |
467 | T_QUIET; |
468 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.wr_finished"); | |
469 | semaphore_destroy(mach_task_self(), shared.wr_finished); | |
813fb2f6 A |
470 | } |
471 | ||
472 | T_LOG("writer finished, closing fd"); | |
473 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(shared.wr_fd), NULL); | |
474 | return NULL; | |
475 | } | |
476 | ||
477 | #pragma mark reading | |
478 | ||
479 | #define BUF_LEN 1024 | |
480 | static char final_string[BUF_LEN]; | |
481 | static size_t final_length; | |
482 | ||
483 | /* | |
484 | * Read from the master PTY descriptor. | |
485 | * | |
486 | * Returns false if EOF is encountered, and true otherwise. | |
487 | */ | |
488 | static bool | |
489 | handle_reading(enum fd_pair fd_pair, int fd) | |
490 | { | |
491 | char read_buf[BUF_LEN] = { 0 }; | |
492 | ssize_t bytes_rd = 0; | |
493 | ||
494 | do { | |
495 | if (bytes_rd == -1) { | |
496 | T_LOG("read was interrupted, retrying"); | |
497 | } | |
498 | bytes_rd = read(fd, read_buf, sizeof(read_buf) - 1); | |
499 | } while (bytes_rd == -1 && errno == EINTR); | |
500 | ||
5ba3f43e A |
501 | // T_LOG("read %zd bytes: '%s'", bytes_rd, read_buf); |
502 | ||
813fb2f6 A |
503 | T_QUIET; T_ASSERT_POSIX_SUCCESS(bytes_rd, "reading from file"); |
504 | T_QUIET; T_ASSERT_LE(bytes_rd, (ssize_t)EXPECTED_LEN, | |
0a7de745 | 505 | "read too much from file"); |
813fb2f6 A |
506 | |
507 | if (bytes_rd == 0) { | |
508 | T_LOG("read EOF from file"); | |
509 | return false; | |
510 | } | |
511 | ||
512 | read_buf[bytes_rd] = '\0'; | |
513 | strlcpy(&(final_string[final_length]), read_buf, | |
0a7de745 | 514 | sizeof(final_string) - final_length); |
813fb2f6 A |
515 | final_length += (size_t)bytes_rd; |
516 | ||
813fb2f6 | 517 | T_QUIET; T_ASSERT_LE(final_length, EXPECTED_LEN, |
0a7de745 | 518 | "should not read more from file than what can be sent"); |
813fb2f6 | 519 | |
a39ff7e2 | 520 | /* FIFOs don't send EOF when the write side closes */ |
813fb2f6 | 521 | if (final_length == strlen(EXPECTED_STRING) && |
0a7de745 | 522 | (fd_pair == FIFO_PAIR)) { |
a39ff7e2 | 523 | T_LOG("read all expected bytes from FIFO"); |
813fb2f6 A |
524 | return false; |
525 | } | |
526 | return true; | |
527 | } | |
528 | ||
529 | static void | |
530 | workqueue_read_fn(void ** __unused buf, int * __unused count) | |
531 | { | |
532 | // T_MAYFAIL; | |
533 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
0a7de745 | 534 | // "reader thread should be requested at correct QoS"); |
813fb2f6 | 535 | if (!handle_reading(shared.fd_pair, shared.rd_fd)) { |
5ba3f43e A |
536 | T_LOG("signal shared.rd_finished"); |
537 | semaphore_signal(shared.rd_finished); | |
813fb2f6 A |
538 | } |
539 | ||
540 | reenable_workq(shared.rd_fd, EVFILT_READ); | |
541 | } | |
542 | ||
543 | static void | |
544 | read_from_fd(int fd, enum fd_pair fd_pair, enum read_mode mode) | |
545 | { | |
546 | int fd_flags; | |
547 | ||
548 | T_LOG("reader setting up"); | |
549 | ||
550 | bzero(final_string, sizeof(final_string)); | |
551 | ||
552 | fd_flags = fcntl(fd, F_GETFL, 0); | |
553 | T_QUIET; T_ASSERT_POSIX_SUCCESS(fd_flags, "fcntl(F_GETFL)"); | |
554 | ||
555 | if (!(fd_flags & O_NONBLOCK)) { | |
556 | T_QUIET; | |
557 | T_ASSERT_POSIX_SUCCESS(fcntl(fd, F_SETFL, | |
0a7de745 | 558 | fd_flags | O_NONBLOCK), NULL); |
813fb2f6 A |
559 | } |
560 | ||
561 | switch (mode) { | |
562 | case POLL_READ: { | |
563 | struct pollfd fds[] = { { .fd = fd, .events = POLLIN } }; | |
564 | wake_writer(); | |
5ba3f43e | 565 | |
813fb2f6 A |
566 | for (;;) { |
567 | fds[0].revents = 0; | |
568 | int pol = poll(fds, 1, READ_TIMEOUT_SECS * 1000); | |
569 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pol, "poll"); | |
813fb2f6 | 570 | T_QUIET; T_ASSERT_NE(pol, 0, |
0a7de745 A |
571 | "poll should not time out after %d seconds, read %zd out " |
572 | "of %zu bytes", | |
573 | READ_TIMEOUT_SECS, final_length, strlen(EXPECTED_STRING)); | |
813fb2f6 | 574 | T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLERR, |
0a7de745 | 575 | "should not see an error on the device"); |
5ba3f43e | 576 | T_QUIET; T_ASSERT_FALSE(fds[0].revents & POLLNVAL, |
0a7de745 | 577 | "should not set up an invalid poll"); |
813fb2f6 A |
578 | |
579 | if (!handle_reading(fd_pair, fd)) { | |
580 | break; | |
581 | } | |
582 | } | |
583 | break; | |
584 | } | |
585 | ||
586 | case SELECT_READ: | |
587 | wake_writer(); | |
588 | ||
589 | for (;;) { | |
590 | struct timeval tv = { .tv_sec = READ_TIMEOUT_SECS }; | |
591 | ||
592 | fd_set read_fd; | |
593 | FD_ZERO(&read_fd); | |
594 | FD_SET(fd, &read_fd); | |
595 | fd_set err_fd; | |
596 | FD_ZERO(&err_fd); | |
597 | FD_SET(fd, &err_fd); | |
598 | ||
0a7de745 | 599 | int sel = select(fd + 1, &read_fd, NULL, NULL /*&err_fd*/, &tv); |
813fb2f6 A |
600 | if (sel == -1 && errno == EINTR) { |
601 | T_LOG("select interrupted"); | |
602 | continue; | |
603 | } | |
604 | (void)fd_pair; | |
605 | ||
606 | T_QUIET; T_ASSERT_POSIX_SUCCESS(sel, "select"); | |
607 | ||
813fb2f6 | 608 | T_QUIET; T_ASSERT_NE(sel, 0, |
0a7de745 A |
609 | "select waited for %d seconds and timed out", |
610 | READ_TIMEOUT_SECS); | |
813fb2f6 | 611 | |
813fb2f6 A |
612 | /* didn't fail or time out, therefore data is ready */ |
613 | T_QUIET; T_ASSERT_NE(FD_ISSET(fd, &read_fd), 0, | |
0a7de745 | 614 | "select should show reading fd as readable"); |
813fb2f6 A |
615 | |
616 | if (!handle_reading(fd_pair, fd)) { | |
617 | break; | |
618 | } | |
619 | } | |
620 | break; | |
621 | ||
622 | case KEVENT_READ: /* FALLTHROUGH */ | |
623 | case KEVENT64_READ: /* FALLTHROUGH */ | |
624 | case KEVENT_QOS_READ: { | |
625 | union mode rd_mode = { .rd = shared.rd_mode }; | |
626 | drive_kq(true, rd_mode, fd_pair, shared.rd_fd); | |
627 | break; | |
628 | } | |
629 | ||
630 | case WORKQ_READ: { | |
5ba3f43e A |
631 | // prohibit ourselves from going multi-threaded see:rdar://33296008 |
632 | _dispatch_prohibit_transition_to_multithreaded(true); | |
633 | T_ASSERT_POSIX_ZERO(_pthread_workqueue_init_with_kevent( | |
0a7de745 | 634 | workqueue_fn, workqueue_read_fn, 0, 0), NULL); |
813fb2f6 | 635 | |
5ba3f43e | 636 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0), |
0a7de745 | 637 | "semaphore_create shared.rd_finished"); |
5ba3f43e A |
638 | |
639 | T_QUIET; | |
640 | T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
641 | |
642 | int changes = 1; | |
643 | struct kevent_qos_s events[] = {{ | |
0a7de745 A |
644 | .ident = (uint64_t)shared.rd_fd, |
645 | .filter = EVFILT_READ, | |
646 | .flags = EV_ADD | EV_UDATA_SPECIFIC | EV_DISPATCH | EV_VANISHED, | |
647 | .fflags = NOTE_LOWAT, | |
648 | .data = 1, | |
649 | .qos = (int32_t)_pthread_qos_class_encode(EXPECTED_QOS, | |
650 | 0, 0) | |
651 | }}; | |
813fb2f6 A |
652 | |
653 | for (;;) { | |
654 | int kev = kevent_qos(-1, changes == 0 ? NULL : events, changes, | |
0a7de745 A |
655 | events, 1, NULL, NULL, |
656 | KEVENT_FLAG_WORKQ | KEVENT_FLAG_ERROR_EVENTS); | |
813fb2f6 A |
657 | if (kev == -1 && errno == EINTR) { |
658 | changes = 0; | |
659 | T_LOG("kevent_qos was interrupted"); | |
660 | continue; | |
661 | } | |
662 | ||
663 | T_QUIET; T_ASSERT_POSIX_SUCCESS(kev, "kevent_qos"); | |
664 | break; | |
665 | } | |
666 | ||
667 | wake_writer(); | |
668 | break; | |
669 | } | |
670 | ||
671 | case DISPATCH_READ: { | |
672 | dispatch_source_t read_src; | |
673 | ||
5ba3f43e | 674 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.rd_finished, SYNC_POLICY_FIFO, 0), |
0a7de745 | 675 | "semaphore_create shared.rd_finished"); |
5ba3f43e A |
676 | |
677 | T_QUIET; | |
678 | T_ASSERT_NE_UINT(shared.rd_finished, (unsigned)MACH_PORT_NULL, "semaphore_create"); | |
813fb2f6 A |
679 | |
680 | read_src = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, | |
0a7de745 | 681 | (uintptr_t)fd, 0, NULL); |
813fb2f6 | 682 | T_QUIET; T_ASSERT_NOTNULL(read_src, |
0a7de745 | 683 | "dispatch_source_create(DISPATCH_SOURCE_TYPE_READ)"); |
813fb2f6 A |
684 | |
685 | dispatch_block_t handler = dispatch_block_create_with_qos_class( | |
0a7de745 A |
686 | DISPATCH_BLOCK_ENFORCE_QOS_CLASS, EXPECTED_QOS, 0, ^{ |
687 | // T_MAYFAIL; | |
688 | // T_QUIET; T_ASSERT_EFFECTIVE_QOS_EQ(EXPECTED_QOS, | |
689 | // "read handler block should run at correct QoS"); | |
690 | ||
691 | if (!handle_reading(fd_pair, fd)) { | |
692 | /* finished handling the fd, tear down the source */ | |
693 | dispatch_source_cancel(read_src); | |
694 | dispatch_release(read_src); | |
695 | T_LOG("signal shared.rd_finished"); | |
696 | semaphore_signal(shared.rd_finished); | |
697 | } | |
698 | }); | |
813fb2f6 A |
699 | |
700 | dispatch_source_set_event_handler(read_src, handler); | |
701 | dispatch_activate(read_src); | |
702 | ||
703 | wake_writer(); | |
704 | break; | |
705 | } | |
706 | ||
707 | default: | |
708 | T_ASSERT_FAIL("unrecognized read mode: %d", mode); | |
709 | break; | |
710 | } | |
711 | ||
712 | if (shared.rd_finished) { | |
5ba3f43e A |
713 | T_LOG("wait shared.rd_finished"); |
714 | kern_return_t kret = semaphore_timedwait(shared.rd_finished, READ_timeout); | |
715 | if (kret == KERN_OPERATION_TIMED_OUT) { | |
716 | T_ASSERT_FAIL("reading timed out after %d seconds", READ_timeout.tv_sec); | |
813fb2f6 | 717 | } |
5ba3f43e A |
718 | T_QUIET; |
719 | T_ASSERT_MACH_SUCCESS(kret, "semaphore_timedwait shared.rd_finished"); | |
813fb2f6 A |
720 | } |
721 | ||
722 | T_EXPECT_EQ_STR(final_string, EXPECTED_STRING, | |
0a7de745 | 723 | "reader should receive valid string"); |
813fb2f6 A |
724 | T_QUIET; T_ASSERT_POSIX_SUCCESS(close(fd), NULL); |
725 | } | |
726 | ||
727 | #pragma mark file setup | |
728 | ||
729 | static void | |
730 | fd_pair_init(enum fd_pair fd_pair, int *rd_fd, int *wr_fd) | |
731 | { | |
732 | switch (fd_pair) { | |
733 | case PTY_PAIR: | |
734 | T_ASSERT_POSIX_SUCCESS(openpty(rd_fd, wr_fd, NULL, NULL, NULL), | |
0a7de745 | 735 | NULL); |
813fb2f6 A |
736 | break; |
737 | ||
738 | case FIFO_PAIR: { | |
739 | char fifo_path[] = "/tmp/async-io-fifo.XXXXXX"; | |
740 | T_QUIET; T_ASSERT_NOTNULL(mktemp(fifo_path), NULL); | |
741 | ||
742 | T_ASSERT_POSIX_SUCCESS(mkfifo(fifo_path, 0700), "mkfifo(%s, 0700)", | |
0a7de745 | 743 | fifo_path); |
813fb2f6 A |
744 | /* |
745 | * Opening the read side of a pipe will block until the write | |
746 | * side opens -- use O_NONBLOCK. | |
747 | */ | |
748 | *rd_fd = open(fifo_path, O_RDONLY | O_NONBLOCK); | |
749 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*rd_fd, "open(... O_RDONLY)"); | |
750 | *wr_fd = open(fifo_path, O_WRONLY | O_NONBLOCK); | |
751 | T_QUIET; T_ASSERT_POSIX_SUCCESS(*wr_fd, "open(... O_WRONLY)"); | |
752 | break; | |
753 | } | |
754 | ||
755 | case PIPE_PAIR: { | |
756 | int pipe_fds[2]; | |
757 | T_ASSERT_POSIX_SUCCESS(pipe(pipe_fds), NULL); | |
758 | *rd_fd = pipe_fds[0]; | |
759 | *wr_fd = pipe_fds[1]; | |
760 | break; | |
761 | } | |
762 | ||
763 | case SOCKET_PAIR: { | |
764 | int sock_fds[2]; | |
765 | T_ASSERT_POSIX_SUCCESS(socketpair(AF_UNIX, SOCK_STREAM, 0, sock_fds), | |
0a7de745 | 766 | NULL); |
813fb2f6 A |
767 | *rd_fd = sock_fds[0]; |
768 | *wr_fd = sock_fds[1]; | |
769 | break; | |
770 | } | |
771 | ||
772 | default: | |
773 | T_ASSERT_FAIL("unknown descriptor pair type: %d", fd_pair); | |
774 | break; | |
775 | } | |
776 | ||
777 | T_QUIET; T_ASSERT_NE(*rd_fd, -1, "reading descriptor"); | |
778 | T_QUIET; T_ASSERT_NE(*wr_fd, -1, "writing descriptor"); | |
779 | } | |
780 | ||
781 | #pragma mark single process | |
782 | ||
783 | static void | |
784 | drive_threads(enum fd_pair fd_pair, enum read_mode rd_mode, | |
0a7de745 | 785 | enum write_mode wr_mode) |
813fb2f6 A |
786 | { |
787 | pthread_t thread; | |
788 | ||
789 | shared.fd_pair = fd_pair; | |
790 | shared.rd_mode = rd_mode; | |
791 | shared.wr_mode = wr_mode; | |
792 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
793 | ||
794 | shared.wr_kind = THREAD_WRITER; | |
5ba3f43e | 795 | T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &shared.wr_wait.sem, SYNC_POLICY_FIFO, 0), |
0a7de745 | 796 | "semaphore_create shared.wr_wait.sem"); |
813fb2f6 A |
797 | |
798 | T_QUIET; | |
799 | T_ASSERT_POSIX_ZERO(pthread_create(&thread, NULL, write_to_fd, NULL), | |
0a7de745 | 800 | NULL); |
813fb2f6 A |
801 | T_LOG("created writer thread"); |
802 | ||
803 | read_from_fd(shared.rd_fd, fd_pair, rd_mode); | |
5ba3f43e A |
804 | |
805 | T_ASSERT_POSIX_ZERO(pthread_join(thread, NULL), NULL); | |
806 | ||
813fb2f6 A |
807 | T_END; |
808 | } | |
809 | ||
810 | #pragma mark multiple processes | |
811 | ||
812 | static void __attribute__((noreturn)) | |
813 | drive_processes(enum fd_pair fd_pair, enum read_mode rd_mode, enum write_mode wr_mode) | |
814 | { | |
815 | shared.fd_pair = fd_pair; | |
816 | shared.rd_mode = rd_mode; | |
817 | shared.wr_mode = wr_mode; | |
818 | fd_pair_init(fd_pair, &(shared.rd_fd), &(shared.wr_fd)); | |
819 | ||
820 | shared.wr_kind = PROCESS_WRITER; | |
821 | int fds[2]; | |
822 | T_QUIET; T_ASSERT_POSIX_SUCCESS(pipe(fds), NULL); | |
823 | shared.wr_wait.out_fd = fds[0]; | |
824 | shared.wr_wait.in_fd = fds[1]; | |
825 | ||
826 | T_LOG("starting subprocesses"); | |
827 | dt_helper_t helpers[2] = { | |
828 | dt_fork_helper("reader_helper"), | |
829 | dt_fork_helper("writer_helper") | |
830 | }; | |
831 | ||
832 | close(shared.rd_fd); | |
833 | close(shared.wr_fd); | |
834 | ||
835 | dt_run_helpers(helpers, 2, 50000); | |
836 | } | |
837 | ||
838 | T_HELPER_DECL(reader_helper, "Read asynchronously") | |
839 | { | |
840 | close(shared.wr_fd); | |
841 | read_from_fd(shared.rd_fd, shared.fd_pair, shared.rd_mode); | |
842 | T_END; | |
843 | } | |
844 | ||
845 | T_HELPER_DECL(writer_helper, "Write asynchronously") | |
846 | { | |
847 | close(shared.rd_fd); | |
848 | write_to_fd(NULL); | |
849 | } | |
850 | ||
851 | #pragma mark tests | |
852 | ||
853 | #define WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
0a7de745 A |
854 | write_mode, read_name, read_mode) \ |
855 | T_DECL(desc_name##_r##read_name##_w##write_name##_procs, "read changes to a " \ | |
856 | #desc_name " with " #read_name " and writing " #write_str \ | |
857 | " across two processes") \ | |
858 | { \ | |
859 | drive_processes(fd_pair, read_mode, write_mode); \ | |
860 | } | |
813fb2f6 | 861 | #define WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ |
0a7de745 A |
862 | write_mode, read_name, read_mode) \ |
863 | T_DECL(desc_name##_r##read_name##_w##write_name##_thds, "read changes to a " \ | |
864 | #desc_name " with " #read_name " and writing " #write_str) \ | |
865 | { \ | |
866 | drive_threads(fd_pair, read_mode, write_mode); \ | |
867 | } | |
813fb2f6 A |
868 | |
869 | #define WR_DECL(desc_name, fd_pair, write_name, write_str, write_mode, \ | |
0a7de745 A |
870 | read_name, read_mode) \ |
871 | WR_DECL_PROCESSES(desc_name, fd_pair, write_name, write_str, \ | |
872 | write_mode, read_name, read_mode) \ | |
873 | WR_DECL_THREADS(desc_name, fd_pair, write_name, write_str, \ | |
874 | write_mode, read_name, read_mode) | |
813fb2f6 A |
875 | |
876 | #define RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
0a7de745 A |
877 | WR_DECL(desc_name, fd_pair, full, "the full string", FULL_WRITE, \ |
878 | read_name, read_mode) \ | |
879 | WR_DECL(desc_name, fd_pair, inc, "incrementally", \ | |
880 | INCREMENTAL_WRITE, read_name, read_mode) | |
813fb2f6 A |
881 | |
882 | #define RD_DECL_DISPATCH_ONLY(suffix, desc_name, fd_pair, read_name, \ | |
0a7de745 A |
883 | read_mode) \ |
884 | WR_DECL##suffix(desc_name, fd_pair, inc_dispatch, \ | |
885 | "incrementally with a dispatch source", \ | |
886 | DISPATCH_INCREMENTAL_WRITE, read_name, read_mode) | |
813fb2f6 | 887 | #define RD_DECL_WORKQ_ONLY(suffix, desc_name, fd_pair, read_name, \ |
0a7de745 A |
888 | read_mode) \ |
889 | WR_DECL##suffix(desc_name, fd_pair, inc_workq, \ | |
890 | "incrementally with the workqueue", \ | |
891 | WORKQ_INCREMENTAL_WRITE, read_name, read_mode) | |
813fb2f6 A |
892 | |
893 | #define RD_DECL(desc_name, fd_pair, read_name, read_mode) \ | |
0a7de745 A |
894 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ |
895 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
896 | // RD_DECL_WORKQ_ONLY(, desc_name, fd_pair, read_name, read_mode) | |
813fb2f6 A |
897 | |
898 | /* | |
899 | * dispatch_source tests cannot share the same process as other workqueue | |
900 | * tests. | |
901 | */ | |
902 | #define RD_DECL_DISPATCH(desc_name, fd_pair, read_name, read_mode) \ | |
0a7de745 A |
903 | RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ |
904 | RD_DECL_DISPATCH_ONLY(, desc_name, fd_pair, read_name, read_mode) \ | |
905 | RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
906 | read_mode) | |
813fb2f6 A |
907 | |
908 | /* | |
909 | * Workqueue tests cannot share the same process as other workqueue or | |
910 | * dispatch_source tests. | |
0a7de745 A |
911 | #define RD_DECL_WORKQ(desc_name, fd_pair, read_name, read_mode) \ |
912 | * RD_DECL_SAFE(desc_name, fd_pair, read_name, read_mode) \ | |
913 | * RD_DECL_DISPATCH_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
914 | * read_mode) \ | |
915 | * RD_DECL_WORKQ_ONLY(_PROCESSES, desc_name, fd_pair, read_name, \ | |
916 | * read_mode) | |
813fb2f6 A |
917 | */ |
918 | ||
919 | #define PAIR_DECL(desc_name, fd_pair) \ | |
920 | RD_DECL(desc_name, fd_pair, poll, POLL_READ) \ | |
921 | RD_DECL(desc_name, fd_pair, select, SELECT_READ) \ | |
922 | RD_DECL(desc_name, fd_pair, kevent, KEVENT_READ) \ | |
923 | RD_DECL(desc_name, fd_pair, kevent64, KEVENT64_READ) \ | |
924 | RD_DECL(desc_name, fd_pair, kevent_qos, KEVENT_QOS_READ) \ | |
925 | RD_DECL_DISPATCH(desc_name, fd_pair, dispatch_source, DISPATCH_READ) | |
0a7de745 | 926 | // RD_DECL_WORKQ(desc_name, fd_pair, workq, WORKQ_READ) |
813fb2f6 A |
927 | |
928 | PAIR_DECL(tty, PTY_PAIR) | |
929 | PAIR_DECL(pipe, PIPE_PAIR) | |
930 | PAIR_DECL(fifo, FIFO_PAIR) | |
931 | PAIR_DECL(socket, SOCKET_PAIR) |