4 * Stress test for dispatch read and write sources.
7 #include <dispatch/dispatch.h>
10 #include <CommonCrypto/CommonDigest.h>
14 #include <netinet/in.h>
17 #include <sys/param.h>
/*
 * Return the larger of the two sizes.
 * When both are equal, the second argument is the one returned.
 */
static inline size_t max(size_t a, size_t b)
{
	if (a > b) {
		return a;
	}
	return b;
}
/*
 * Return the smaller of the two sizes.
 * When both are equal, the second argument is the one returned.
 */
static inline size_t min(size_t a, size_t b)
{
	if (a < b) {
		return a;
	}
	return b;
}
30 #define DEBUG(...) do { \
31 if (debug) fprintf(stderr, __VA_ARGS__); \
34 #define assert_errno(str, expr) do { \
36 fprintf(stderr, "%s: %s\n", (str), strerror(errno)); \
40 #define assert_gai_errno(str, expr) do { \
42 fprintf(stderr, "%s: %s\n", (str), gai_strerror(errno)); \
49 * Context structure used by the reader and writer queues.
51 * Writers begin by generating a random length and writing it to the descriptor.
52 * The write buffer is filled with a random byte value and written until empty
53 * or until the total length is reached. The write buffer is refilled with more
54 * random data when empty. Each write updates an MD5 digest which is written to
55 * the descriptor once the total length is reached.
57 * Readers begin by reading the total length of data. The read buffer is filled
58 * and an MD5 digest is computed on the bytes as they are received. Once the
59 * total length of data has been read, an MD5 digest is read from the descriptor
60 * and compared with the computed value.
75 char md5
[CC_MD5_DIGEST_LENGTH
];
79 create_writer(int wfd
, dispatch_block_t completion
)
82 struct sock_context
*ctx
= calloc(1, sizeof(struct sock_context
));
85 snprintf(ctx
->label
, sizeof(ctx
->label
), "writer.fd.%d", wfd
);
86 dispatch_queue_t queue
= dispatch_queue_create(ctx
->label
, 0);
88 ds
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
, wfd
, 0, queue
);
90 dispatch_release(queue
);
94 len
= (arc4random() & 0x7FFF);
97 CC_MD5_Init(&ctx
->md5ctx
);
99 ctx
->buflen
= sizeof(len
);
101 memcpy(ctx
->buf
, &len
, ctx
->buflen
);
102 DEBUG("%s: LENGTH %d\n", ctx
->label
, ctx
->len
);
104 dispatch_source_set_event_handler(ds
, ^{
105 DEBUG("%s: available %ld\n", ctx
->label
, dispatch_source_get_data(ds
));
107 size_t wrsz
= min(ctx
->len
, ctx
->buflen
);
108 res
= write(wfd
, &ctx
->buf
[ctx
->offset
], wrsz
);
109 DEBUG("%s: write(%d, %p, %ld): %ld\n", ctx
->label
, wfd
, &ctx
->buf
[ctx
->offset
], wrsz
, res
);
111 if (ctx
->state
== DATA
) {
112 CC_MD5_Update(&ctx
->md5ctx
, &ctx
->buf
[ctx
->offset
], res
);
117 assert(ctx
->offset
>= 0);
118 assert(ctx
->len
>= 0);
119 assert(ctx
->buflen
>= 0);
120 if (ctx
->buflen
== 0 || ctx
->len
== 0) {
121 if (ctx
->state
== LENGTH
) {
122 // finished writing length, move on to data.
124 ctx
->buflen
= sizeof(ctx
->buf
);
125 char pattern
= arc4random() & 0xFF;
126 memset(ctx
->buf
, pattern
, ctx
->buflen
);
127 } else if (ctx
->state
== DATA
&& ctx
->len
== 0) {
128 // finished writing data, move on to cksum.
130 ctx
->len
= sizeof(ctx
->md5
);
131 ctx
->buflen
= sizeof(ctx
->md5
);
132 CC_MD5_Final(ctx
->md5
, &ctx
->md5ctx
);
133 memcpy(ctx
->buf
, ctx
->md5
, ctx
->buflen
);
134 } else if (ctx
->state
== DATA
) {
135 ctx
->buflen
= sizeof(ctx
->buf
);
136 char pattern
= arc4random() & 0xFF;
137 memset(ctx
->buf
, pattern
, ctx
->buflen
);
138 } else if (ctx
->state
== CKSUM
) {
140 dispatch_source_cancel(ds
);
146 } else if (res
== 0) {
147 assert(ctx
->state
== DONE
);
149 } else if (res
== -1 && errno
== EAGAIN
) {
150 DEBUG("%s: EAGAIN\n", ctx
->label
);
152 assert_errno("write", res
>= 0);
155 dispatch_source_set_cancel_handler(ds
, ^{
156 DEBUG("%s: close(%d)\n", ctx
->label
, wfd
);
157 int res
= close(wfd
);
158 assert_errno("close", res
== 0);
160 dispatch_release(ds
);
168 create_reader(int rfd
, dispatch_block_t completion
)
170 dispatch_source_t ds
;
171 struct sock_context
*ctx
= calloc(1, sizeof(struct sock_context
));
174 snprintf(ctx
->label
, sizeof(ctx
->label
), "reader.fd.%d", rfd
);
175 dispatch_queue_t queue
= dispatch_queue_create(ctx
->label
, 0);
177 ds
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, rfd
, 0, queue
);
179 dispatch_release(queue
);
182 ctx
->len
= sizeof(ctx
->len
);
183 ctx
->buflen
= sizeof(ctx
->len
);
184 CC_MD5_Init(&ctx
->md5ctx
);
186 dispatch_source_set_event_handler(ds
, ^{
187 DEBUG("%s: available %ld\n", ctx
->label
, dispatch_source_get_data(ds
));
189 size_t rdsz
= min(ctx
->len
, ctx
->buflen
);
190 res
= read(rfd
, &ctx
->buf
[ctx
->offset
], rdsz
);
191 DEBUG("%s: read(%d,%p,%ld): %ld\n", ctx
->label
, rfd
, &ctx
->buf
[ctx
->offset
], rdsz
, res
);
193 // log unexpected data lengths...
194 long expected
= dispatch_source_get_data(ds
);
196 if (actual
>= 0 && (actual
!= expected
&& actual
!= rdsz
)) {
197 fprintf(stderr
, "%s: expected %ld, actual %ld (rdsz = %ld)\n", ctx
->label
, expected
, actual
, rdsz
);
201 if (ctx
->state
== DATA
) {
202 CC_MD5_Update(&ctx
->md5ctx
, &ctx
->buf
[ctx
->offset
], res
);
207 if (ctx
->buflen
== 0 || ctx
->len
== 0) {
208 if (ctx
->state
== LENGTH
) {
209 // buffer is full, interpret as uint32_t
210 memcpy(&ctx
->len
, ctx
->buf
, sizeof(ctx
->len
));
211 ctx
->len
= ntohl(ctx
->len
);
212 ctx
->buflen
= sizeof(ctx
->buf
);
214 } else if (ctx
->state
== DATA
&& ctx
->len
== 0) {
215 CC_MD5_Final(ctx
->md5
, &ctx
->md5ctx
);
217 ctx
->len
= CC_MD5_DIGEST_LENGTH
;
218 ctx
->buflen
= ctx
->len
;
219 } else if (ctx
->state
== DATA
) {
220 ctx
->buflen
= sizeof(ctx
->buf
);
221 } else if (ctx
->state
== CKSUM
) {
223 res
= memcmp(ctx
->buf
, ctx
->md5
, sizeof(ctx
->md5
));
225 DEBUG("%s: MD5 FAILURE\n", ctx
->label
);
231 } else if (res
== 0) {
232 assert(ctx
->state
== DONE
);
233 DEBUG("%s: EOF\n", ctx
->label
);
234 dispatch_source_cancel(ds
);
236 assert_errno("read", res
>= 0);
239 dispatch_source_set_cancel_handler(ds
, ^{
240 DEBUG("%s: close(%d)\n", ctx
->label
, rfd
);
241 int res
= close(rfd
);
242 assert_errno("close", res
== 0);
244 dispatch_release(ds
);
255 flags
= fcntl(fd
, F_GETFL
);
258 res
= fcntl(fd
, F_SETFL
, flags
);
259 assert_errno("fcntl(F_SETFL,O_NONBLOCK)", res
== 0);
263 create_fifo(int *rfd
, int *wfd
)
268 char path
[MAXPATHLEN
];
269 strlcpy(path
, "/tmp/fd_stress.fifo.XXXXXX", sizeof(path
));
274 res
= mkfifo(name
, 0700);
275 assert_errno(name
, res
== 0);
277 *rfd
= open(name
, O_RDONLY
| O_NONBLOCK
);
278 assert_errno(name
, *rfd
>= 0);
280 *wfd
= open(name
, O_WRONLY
| O_NONBLOCK
);
281 assert_errno(name
, *wfd
>= 0);
285 create_pipe(int *rfd
, int *wfd
)
291 assert_errno("pipe", res
== 0);
301 create_server_socket(int *rfd
, struct sockaddr_in
*sa
)
305 socklen_t salen
= sizeof(*sa
);
307 memset(sa
, 0, salen
);
309 sa
->sin_family
= AF_INET
;
310 sa
->sin_port
= htons(12345);
311 sa
->sin_addr
.s_addr
= htonl(INADDR_LOOPBACK
);
313 *rfd
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
314 assert_errno("socket", *rfd
>= 0);
317 res
= setsockopt(*rfd
, SOL_SOCKET
, SO_REUSEADDR
, &value
, sizeof(value
));
318 assert_errno("setsockopt(SO_REUSEADDR)", res
== 0);
321 res
= setsockopt(*rfd
, SOL_SOCKET
, SO_REUSEPORT
, &value
, sizeof(value
));
322 assert_errno("setsockopt(SO_REUSEPORT)", res
== 0);
324 res
= bind(*rfd
, (const struct sockaddr
*)sa
, salen
);
325 assert_errno("bind", res
== 0);
327 res
= listen(*rfd
, 128);
328 assert_errno("listen", res
== 0);
332 create_client_socket(int *wfd
, const struct sockaddr_in
*sa
)
336 *wfd
= socket(PF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
337 assert_errno("socket", *wfd
>= 0);
341 res
= connect(*wfd
, (const struct sockaddr
*)sa
, sa
->sin_len
);
342 assert_errno("connect", res
== 0 || errno
== EINPROGRESS
);
350 fprintf(stderr
, "usage: fd_stress [-d] iterations width\n");
355 main(int argc
, char* argv
[])
358 struct sockaddr_in sa
;
359 create_server_socket(&serverfd
, &sa
);
363 while ((ch
= getopt(argc
, argv
, "d")) != -1) {
381 size_t iterations
= strtol(argv
[0], NULL
, 10);
382 size_t width
= strtol(argv
[1], NULL
, 10);
384 if (iterations
== 0 || width
== 0) {
388 fprintf(stdout
, "pid %d\n", getpid());
390 dispatch_group_t group
;
391 group
= dispatch_group_create();
395 dispatch_queue_t queue
= dispatch_queue_create("server", NULL
);
397 dispatch_source_t ds
;
398 ds
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, serverfd
, 0, queue
);
400 dispatch_source_set_event_handler(ds
, ^{
403 struct sockaddr peer
;
406 fd
= accept(serverfd
, &peer
, &peerlen
);
407 assert_errno("accept", fd
>= 0);
411 char host
[NI_MAXHOST
], serv
[NI_MAXSERV
];
414 res
= getnameinfo(&peer
, peerlen
, host
, sizeof(host
), serv
, sizeof(serv
), NI_NUMERICHOST
|NI_NUMERICSERV
);
415 DEBUG("ACCEPTED %d (%s:%s)\n", fd
, host
, serv
);
417 create_reader(fd
, ^{ dispatch_group_leave(group
); });
423 for (i
= 1; i
< iterations
; ++i
) {
424 fprintf(stderr
, "iteration %ld\n", i
);
427 for (j
= 0; j
< width
; ++j
) {
429 dispatch_group_enter(group
);
430 create_pipe(&rfd
, &wfd
);
431 DEBUG("PIPE %d %d\n", rfd
, wfd
);
432 dispatch_source_t reader
;
433 reader
= create_reader(rfd
, ^{ dispatch_group_leave(group
); });
434 create_writer(wfd
, ^{});
439 dispatch_group_enter(group
);
440 create_client_socket(&clientfd
, &sa
);
441 DEBUG("CLIENT %d\n", clientfd
);
442 create_writer(clientfd
, ^{});
444 dispatch_group_enter(group
);
445 create_fifo(&rfd
, &wfd
);
446 DEBUG("FIFO %d %d\n", rfd
, wfd
);
447 create_writer(wfd
, ^{});
448 create_reader(rfd
, ^{ dispatch_group_leave(group
); });
451 dispatch_group_wait(group
, DISPATCH_TIME_FOREVER
);
453 fprintf(stdout
, "pid %d\n", getpid());