1 #include <darwintest.h>
10 #include <mach/mach_time.h>
11 #include <mach/semaphore.h>
12 #include <sys/select.h>
14 /* Select parameters */
15 #define TIMEOUT_CHANCE 17 /* one in this many times, timeout */
16 #define TIMEOUT_POLLCHANCE 11 /* one in this many is a poll */
17 #define TIMEOUT_SCALE 5 /* microseconds multiplier */
19 static semaphore_t g_thread_sem
;
20 static semaphore_t g_sync_sem
;
27 typedef void * (*thread_func
)(struct endpoint
*ep
);
28 typedef void (*setup_func
)(struct endpoint
*ep
);
30 struct thread_sync_arg
{
36 static mach_timebase_info_data_t g_timebase
;
38 static int g_sleep_iterations
= 150000;
39 static int g_sleep_usecs
= 30;
40 static int g_stress_nthreads
= 100;
41 static uint64_t g_stress_duration
= 60;
43 static inline uint64_t
44 ns_to_abs(uint64_t ns
)
46 return ns
* g_timebase
.denom
/ g_timebase
.numer
;
49 static inline uint64_t
50 abs_to_ns(uint64_t abs
)
52 return abs
* g_timebase
.numer
/ g_timebase
.denom
;
58 * Synchronize the startup / initialization of a set of threads
61 thread_sync(void *ctx
)
63 struct thread_sync_arg
*a
= (struct thread_sync_arg
*)ctx
;
65 T_ASSERT_TRUE(((a
!= NULL
) && (a
->work
!= NULL
)), "thread setup error");
71 semaphore_wait_signal(g_thread_sem
, g_sync_sem
);
72 return (a
->work
)(&a
->ep
);
75 struct select_stress_args
{
81 setup_stress_event(struct endpoint
*ep
)
85 T_ASSERT_POSIX_SUCCESS(pipe(&ep
->fd
[0]), "pipe()");
87 T_LOG("th[0x%lx]: fd:{%d,%d}, ep@%p",
88 (uintptr_t)pthread_self(), ep
->fd
[0], ep
->fd
[1], (void *)ep
);
92 * Cause file descriptors to be reused/replaced. We expect that it will at
93 * least take the lowest fd as part of the descriptor list. This may be
94 * optimistic, but it shows replacing an fd out from under a select() if it
97 * We potentially delay the open for a random amount of time so that another
98 * thread can come in and wake up the fd_set with a bad (closed) fd in the set.
101 recycle_fds(struct endpoint
*ep
)
103 /* close endpoint descriptors in random order */
112 /* randomize a delay */
113 if ((random() % ep
->fd
[0]) == 0) {
114 usleep(((random() % ep
->fd
[1]) + 1) * ep
->fd
[1]);
117 /* reopen the FDs, hopefully in the middle of select() */
120 T_ASSERT_POSIX_SUCCESS(pipe(&ep
->fd
[0]), "pipe");
125 * Send a byte of data down the thread end of a pipe to wake up the select
126 * on the other end of it. Select will wake up normally because of this,
127 * and read the byte out. Hopefully, another thread has closed/reopened its FDs.
130 write_data(struct endpoint
*ep
)
134 T_ASSERT_POSIX_SUCCESS(write(ep
->fd
[1], "X", 1), "th[0x%lx] write_data(fd=%d)",
135 (uintptr_t)pthread_self(), ep
->fd
[1]);
139 do_stress_events(struct endpoint
*ep
)
141 unsigned write_freq
= (unsigned)(((uintptr_t)pthread_self() & 0xff0000) >> 16);
144 if (write_freq
== 0) {
148 T_LOG("th[0x%lx] write_freq:%d", (uintptr_t)pthread_self(), write_freq
);
151 /* randomized delay between events */
152 usleep(((random() % ep
->fd
[1]) + 1) * ep
->fd
[1]);
154 if ((random() % write_freq
) == 0) {
163 struct thread_sync_arg
*th
;
173 * Put the actual call to select in its own thread so we can catch errors that
174 * occur only the first time a thread calls select.
179 struct selarg
*sarg
= (struct selarg
*)arg
;
180 struct timeval timeout
;
181 struct timeval
*tp
= NULL
;
187 FD_COPY(&sarg
->def_readfds
, &readfds
);
189 /* Add a timeout probablistically */
190 if ((random() % TIMEOUT_CHANCE
) == 0) {
191 timeout
.tv_sec
= random() % 1;
192 timeout
.tv_usec
= ((random() % TIMEOUT_POLLCHANCE
) * TIMEOUT_SCALE
);
197 nfd
= select(sarg
->max_fd
+ 1, &readfds
, 0, 0, tp
);
199 /* EBADF: fd_set has changed */
200 if (errno
== EBADF
) {
205 /* Other errors are fatal */
208 T_ASSERT_POSIX_SUCCESS(nfd
, "select:stress");
211 /* Fast: handle timeouts */
216 /* Slower: discard read input thrown at us from threads */
217 for (int i
= 0; i
< sarg
->nthreads
; i
++) {
218 struct endpoint
*ep
= &sarg
->th
[i
].ep
;
220 if (FD_ISSET(ep
->fd
[0], &readfds
)) {
222 (void)read(ep
->fd
[0], &c
, 1);
231 test_select_stress(int nthreads
, uint64_t duration_seconds
)
234 uint64_t seconds_remain
, last_print_time
;
238 int started_threads
= 0;
239 struct thread_sync_arg
*th
;
242 T_LOG("forcing a minimum of 2 threads");
247 * Allocate memory for endpoint data
249 th
= calloc(nthreads
, sizeof(*th
));
251 T_ASSERT_NOTNULL(th
, "select_stress: No memory for thread endpoints");
253 T_LOG("Select stress test: %d threads, for %lld seconds", nthreads
, duration_seconds
);
256 * Startup all the threads
258 T_LOG("\tcreating threads...");
259 for (int i
= 0; i
< nthreads
; i
++) {
260 struct endpoint
*e
= &th
[i
].ep
;
261 th
[i
].setup
= setup_stress_event
;
262 th
[i
].work
= do_stress_events
;
265 T_ASSERT_POSIX_ZERO(pthread_create(&e
->pth
, 0, thread_sync
, &th
[i
]),
266 "pthread_create:do_stress_events");
270 * Wait for all the threads to start up
272 while (started_threads
< nthreads
) {
273 if (semaphore_wait(g_sync_sem
) == KERN_SUCCESS
) {
281 semaphore_signal_all(g_thread_sem
);
284 * Calculate a stop time
286 deadline
= mach_absolute_time() + ns_to_abs(duration_seconds
* NSEC_PER_SEC
);
287 seconds_remain
= duration_seconds
;
288 last_print_time
= seconds_remain
+ 1;
291 * Perform the select and read any data that comes from the
292 * constituent thread FDs.
295 T_LOG("\ttest running!");
297 /* (re) set up the select fd set */
299 FD_ZERO(&sarg
.def_readfds
);
300 for (int i
= 0; i
< nthreads
; i
++) {
301 struct endpoint
*ep
= &th
[i
].ep
;
303 FD_SET(ep
->fd
[0], &sarg
.def_readfds
);
304 if (ep
->fd
[0] > sarg
.max_fd
) {
305 sarg
.max_fd
= ep
->fd
[0];
310 sarg
.nthreads
= nthreads
;
312 while (mach_absolute_time() < deadline
) {
315 seconds_remain
= abs_to_ns(deadline
- mach_absolute_time()) / NSEC_PER_SEC
;
316 if (last_print_time
> seconds_remain
) {
317 T_LOG(" %6lld...", seconds_remain
);
318 last_print_time
= seconds_remain
;
324 T_ASSERT_POSIX_ZERO(pthread_create(&sarg
.pth
, 0, do_select
, &sarg
),
325 "pthread_create:do_select");
329 T_ASSERT_POSIX_ZERO(pthread_cancel(sarg
.pth
), "pthread_cancel");
332 T_ASSERT_POSIX_ZERO(pthread_join(sarg
.pth
, &thret
), "pthread_join");
334 if (sarg
.ret
== EBADF
) {
338 T_ASSERT_GE(sarg
.ret
, 0, "threaded do_select returned an \
339 error: %d!", sarg
.ret
);
342 T_PASS("select stress test passed");
347 * TEST: use select as sleep()
350 test_select_sleep(uint32_t niterations
, unsigned long usecs
)
358 T_FAIL("select sleep test skipped");
362 T_LOG("Testing select as sleep (n=%d, us=%ld)...", niterations
, usecs
);
364 while (niterations
--) {
365 ret
= select(0, NULL
, NULL
, NULL
, &tv
);
366 if (ret
< 0 && errno
!= EINTR
) {
369 T_ASSERT_POSIX_SUCCESS(ret
, "select:sleep");
373 T_PASS("select sleep test passed");
376 #define get_env_arg(NM, sval, val) \
378 sval = getenv(#NM); \
380 long v = atol(sval); \
383 val = (typeof(val))v; \
387 T_DECL(select_sleep
, "select sleep test for rdar://problem/20804876 Gala: select with no FDs leaks waitq table objects (causes asserts/panics)")
389 char *env_sval
= NULL
;
391 get_env_arg(SELSLEEP_ITERATIONS
, env_sval
, g_sleep_iterations
);
392 get_env_arg(SELSLEEP_INTERVAL
, env_sval
, g_sleep_usecs
);
394 test_select_sleep((uint32_t)g_sleep_iterations
, (unsigned long)g_sleep_usecs
);
397 T_DECL(select_stress
, "select stress test for rdar://problem/20804876 Gala: select with no FDs leaks waitq table objects (causes asserts/panics)")
399 char *env_sval
= NULL
;
402 T_ASSERT_MACH_SUCCESS(mach_timebase_info(&g_timebase
),
403 "Can't get mach_timebase_info!");
405 get_env_arg(SELSTRESS_THREADS
, env_sval
, g_stress_nthreads
);
406 get_env_arg(SELSTRESS_DURATION
, env_sval
, g_stress_duration
);
409 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &g_sync_sem
, SYNC_POLICY_FIFO
, 0),
410 "semaphore_create(g_sync_sem)");
412 T_ASSERT_MACH_SUCCESS(semaphore_create(mach_task_self(), &g_thread_sem
, SYNC_POLICY_FIFO
, 0),
413 "semaphore_create(g_thread_sem)");
415 test_select_stress(g_stress_nthreads
, g_stress_duration
);