]> git.saurik.com Git - apple/libdispatch.git/blob - testing/fd_stress.c
libdispatch-84.5.5.tar.gz
[apple/libdispatch.git] / testing / fd_stress.c
1 /*
2 * fd_stress.c
3 *
4 * Stress test for dispatch read and write sources.
5 */
6
7 #include <dispatch/dispatch.h>
8
9 #include <assert.h>
10 #include <CommonCrypto/CommonDigest.h>
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <netdb.h>
14 #include <netinet/in.h>
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <sys/param.h>
18 #include <unistd.h>
19
/* Return the larger of two sizes; when they compare equal, b is returned. */
static inline size_t max(size_t a, size_t b) {
    if (a > b) {
        return a;
    }
    return b;
}
23
/* Return the smaller of two sizes; when they compare equal, b is returned. */
static inline size_t min(size_t a, size_t b) {
    if (a < b) {
        return a;
    }
    return b;
}
27
/* Non-zero enables the DEBUG() trace output below. */
int debug = 0;

/*
 * DEBUG(fmt, ...)
 *
 * printf-style trace written to stderr, emitted only while `debug` is
 * non-zero.
 *
 * The expansion intentionally carries no trailing semicolon: the caller's
 * own `;` completes the do/while, so `DEBUG(...);` is exactly one statement
 * and remains valid as the body of an unbraced if/else.
 */
#define DEBUG(...) do { \
    if (debug) fprintf(stderr, __VA_ARGS__); \
} while (0)
33
/*
 * assert_errno(str, expr)
 *
 * Fatal check for calls that report failure through errno. When `expr`
 * evaluates to false, prints "<str>: <strerror(errno)>" to stderr and exits
 * with status 1; otherwise does nothing.
 *
 * The expansion intentionally carries no trailing semicolon so that
 * `assert_errno(...);` is a single statement and is safe inside an unbraced
 * if/else.
 */
#define assert_errno(str, expr) do { \
    if (!(expr)) { \
        fprintf(stderr, "%s: %s\n", (str), strerror(errno)); \
        exit(1); \
    } \
} while (0)
39
/*
 * assert_gai_errno(str, expr)
 *
 * Fatal check for resolver-style calls. When `expr` evaluates to false,
 * prints "<str>: <gai_strerror(errno)>" to stderr and exits with status 1;
 * otherwise does nothing.
 *
 * NOTE(review): the error code is taken from errno, but getaddrinfo() and
 * getnameinfo() hand back their EAI_* code as the return value. A caller
 * must store that code in errno before invoking this macro for the message
 * to be meaningful -- confirm at any call site that starts using it.
 *
 * The expansion intentionally carries no trailing semicolon so that
 * `assert_gai_errno(...);` is a single statement and is safe inside an
 * unbraced if/else.
 */
#define assert_gai_errno(str, expr) do { \
    if (!(expr)) { \
        fprintf(stderr, "%s: %s\n", (str), gai_strerror(errno)); \
        exit(1); \
    } \
} while (0)
45
46
47 /* sock_context
48 *
49 * Context structure used by the reader and writer queues.
50 *
51 * Writers begin by generating a random length and writing it to the descriptor.
52 * The write buffer is filled with a random byte value and written until empty
53 * or until the total length is reached. The write buffer is refilled with more
54 * random data when empty. Each write updates an MD5 digest which is written to
55 * the descriptor once the total length is reached.
56 *
57 * Readers begin by reading the total length of data. The read buffer is filled
58 * and an MD5 digest is computed on the bytes as they are received. Once the
59 * total length of data has been read, an MD5 digest is read from the descriptor
60 * and compared with the computed value.
61 */
62 struct sock_context {
63 enum {
64 LENGTH,
65 DATA,
66 CKSUM,
67 DONE,
68 } state;
69 char label[64];
70 uint32_t len;
71 off_t offset;
72 char buf[8192];
73 size_t buflen;
74 CC_MD5_CTX md5ctx;
75 char md5[CC_MD5_DIGEST_LENGTH];
76 };
77
78 dispatch_source_t
79 create_writer(int wfd, dispatch_block_t completion)
80 {
81 dispatch_source_t ds;
82 struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
83 assert(ctx);
84
85 snprintf(ctx->label, sizeof(ctx->label), "writer.fd.%d", wfd);
86 dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
87
88 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, wfd, 0, queue);
89 assert(ds);
90 dispatch_release(queue);
91
92 uint32_t len;
93 do {
94 len = (arc4random() & 0x7FFF);
95 } while (len == 0);
96 ctx->state = LENGTH;
97 CC_MD5_Init(&ctx->md5ctx);
98 ctx->len = len;
99 ctx->buflen = sizeof(len);
100 len = htonl(len);
101 memcpy(ctx->buf, &len, ctx->buflen);
102 DEBUG("%s: LENGTH %d\n", ctx->label, ctx->len);
103
104 dispatch_source_set_event_handler(ds, ^{
105 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
106 ssize_t res;
107 size_t wrsz = min(ctx->len, ctx->buflen);
108 res = write(wfd, &ctx->buf[ctx->offset], wrsz);
109 DEBUG("%s: write(%d, %p, %ld): %ld\n", ctx->label, wfd, &ctx->buf[ctx->offset], wrsz, res);
110 if (res > 0) {
111 if (ctx->state == DATA) {
112 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
113 ctx->len -= res;
114 }
115 ctx->offset += res;
116 ctx->buflen -= res;
117 assert(ctx->offset >= 0);
118 assert(ctx->len >= 0);
119 assert(ctx->buflen >= 0);
120 if (ctx->buflen == 0 || ctx->len == 0) {
121 if (ctx->state == LENGTH) {
122 // finished writing length, move on to data.
123 ctx->state = DATA;
124 ctx->buflen = sizeof(ctx->buf);
125 char pattern = arc4random() & 0xFF;
126 memset(ctx->buf, pattern, ctx->buflen);
127 } else if (ctx->state == DATA && ctx->len == 0) {
128 // finished writing data, move on to cksum.
129 ctx->state = CKSUM;
130 ctx->len = sizeof(ctx->md5);
131 ctx->buflen = sizeof(ctx->md5);
132 CC_MD5_Final(ctx->md5, &ctx->md5ctx);
133 memcpy(ctx->buf, ctx->md5, ctx->buflen);
134 } else if (ctx->state == DATA) {
135 ctx->buflen = sizeof(ctx->buf);
136 char pattern = arc4random() & 0xFF;
137 memset(ctx->buf, pattern, ctx->buflen);
138 } else if (ctx->state == CKSUM) {
139 ctx->state = DONE;
140 dispatch_source_cancel(ds);
141 } else {
142 assert(0);
143 }
144 ctx->offset = 0;
145 }
146 } else if (res == 0) {
147 assert(ctx->state == DONE);
148 assert(0);
149 } else if (res == -1 && errno == EAGAIN) {
150 DEBUG("%s: EAGAIN\n", ctx->label);
151 } else {
152 assert_errno("write", res >= 0);
153 }
154 });
155 dispatch_source_set_cancel_handler(ds, ^{
156 DEBUG("%s: close(%d)\n", ctx->label, wfd);
157 int res = close(wfd);
158 assert_errno("close", res == 0);
159 completion();
160 dispatch_release(ds);
161 free(ctx);
162 });
163 dispatch_resume(ds);
164 return ds;
165 }
166
167 dispatch_source_t
168 create_reader(int rfd, dispatch_block_t completion)
169 {
170 dispatch_source_t ds;
171 struct sock_context *ctx = calloc(1, sizeof(struct sock_context));
172 assert(ctx);
173
174 snprintf(ctx->label, sizeof(ctx->label), "reader.fd.%d", rfd);
175 dispatch_queue_t queue = dispatch_queue_create(ctx->label, 0);
176
177 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, rfd, 0, queue);
178 assert(ds);
179 dispatch_release(queue);
180
181 ctx->state = LENGTH;
182 ctx->len = sizeof(ctx->len);
183 ctx->buflen = sizeof(ctx->len);
184 CC_MD5_Init(&ctx->md5ctx);
185
186 dispatch_source_set_event_handler(ds, ^{
187 DEBUG("%s: available %ld\n", ctx->label, dispatch_source_get_data(ds));
188 ssize_t res;
189 size_t rdsz = min(ctx->len, ctx->buflen);
190 res = read(rfd, &ctx->buf[ctx->offset], rdsz);
191 DEBUG("%s: read(%d,%p,%ld): %ld\n", ctx->label, rfd, &ctx->buf[ctx->offset], rdsz, res);
192
193 // log unexpected data lengths...
194 long expected = dispatch_source_get_data(ds);
195 long actual = res;
196 if (actual >= 0 && (actual != expected && actual != rdsz)) {
197 fprintf(stderr, "%s: expected %ld, actual %ld (rdsz = %ld)\n", ctx->label, expected, actual, rdsz);
198 }
199
200 if (res > 0) {
201 if (ctx->state == DATA) {
202 CC_MD5_Update(&ctx->md5ctx, &ctx->buf[ctx->offset], res);
203 ctx->len -= res;
204 }
205 ctx->offset += res;
206 ctx->buflen -= res;
207 if (ctx->buflen == 0 || ctx->len == 0) {
208 if (ctx->state == LENGTH) {
209 // buffer is full, interpret as uint32_t
210 memcpy(&ctx->len, ctx->buf, sizeof(ctx->len));
211 ctx->len = ntohl(ctx->len);
212 ctx->buflen = sizeof(ctx->buf);
213 ctx->state = DATA;
214 } else if (ctx->state == DATA && ctx->len == 0) {
215 CC_MD5_Final(ctx->md5, &ctx->md5ctx);
216 ctx->state = CKSUM;
217 ctx->len = CC_MD5_DIGEST_LENGTH;
218 ctx->buflen = ctx->len;
219 } else if (ctx->state == DATA) {
220 ctx->buflen = sizeof(ctx->buf);
221 } else if (ctx->state == CKSUM) {
222 ctx->state = DONE;
223 res = memcmp(ctx->buf, ctx->md5, sizeof(ctx->md5));
224 if (res != 0) {
225 DEBUG("%s: MD5 FAILURE\n", ctx->label);
226 }
227 assert(res == 0);
228 }
229 ctx->offset = 0;
230 }
231 } else if (res == 0) {
232 assert(ctx->state == DONE);
233 DEBUG("%s: EOF\n", ctx->label);
234 dispatch_source_cancel(ds);
235 } else {
236 assert_errno("read", res >= 0);
237 }
238 });
239 dispatch_source_set_cancel_handler(ds, ^{
240 DEBUG("%s: close(%d)\n", ctx->label, rfd);
241 int res = close(rfd);
242 assert_errno("close", res == 0);
243 completion();
244 dispatch_release(ds);
245 free(ctx);
246 });
247 dispatch_resume(ds);
248 return ds;
249 }
250
/*
 * set_nonblock
 *
 * Adds O_NONBLOCK to the file status flags of `fd`, leaving the other flags
 * as they were. Terminates the process if either fcntl() call fails.
 */
void
set_nonblock(int fd)
{
    /* A failed F_GETFL yields -1; OR-ing O_NONBLOCK into that would hand
       garbage to F_SETFL, so check it before use. */
    int flags = fcntl(fd, F_GETFL);
    assert_errno("fcntl(F_GETFL)", flags != -1);

    /* F_SETFL signals failure with -1; success is any other value. */
    int res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    assert_errno("fcntl(F_SETFL,O_NONBLOCK)", res != -1);
}
261
/*
 * create_fifo
 *
 * Creates a FIFO under /tmp with a unique name and opens both ends in
 * non-blocking mode.
 *
 *   rfd - receives the read-side descriptor
 *   wfd - receives the write-side descriptor
 *
 * The FIFO's name is removed again once both ends are open, so no file is
 * left behind; the two descriptors keep the FIFO alive until they are closed.
 * Terminates the process if any step fails.
 */
void
create_fifo(int *rfd, int *wfd)
{
    int res;
    char *name;

    char path[MAXPATHLEN];
    strlcpy(path, "/tmp/fd_stress.fifo.XXXXXX", sizeof(path));
    name = mktemp(path);
    /* Depending on the libc, mktemp() reports failure either with NULL or
       with an empty string; accept neither. */
    assert_errno("mktemp", name != NULL && name[0] != '\0');

    /* Best effort: make sure the name is free. Failure (normally ENOENT) is
       expected and deliberately ignored. */
    (void)unlink(name);

    res = mkfifo(name, 0700);
    assert_errno(name, res == 0);

    /* The read end is opened first so that the non-blocking write-only open
       below finds a reader already present. */
    *rfd = open(name, O_RDONLY | O_NONBLOCK);
    assert_errno(name, *rfd >= 0);

    *wfd = open(name, O_WRONLY | O_NONBLOCK);
    assert_errno(name, *wfd >= 0);

    /* Both ends are open, so the name is no longer needed; removing it keeps
       every call from leaving a stale node in /tmp. */
    res = unlink(name);
    assert_errno(name, res == 0);
}
283
/*
 * create_pipe
 *
 * Opens an anonymous pipe and switches both ends to non-blocking mode.
 *
 *   rfd - receives the read-side descriptor
 *   wfd - receives the write-side descriptor
 *
 * Terminates the process if pipe() fails.
 */
void
create_pipe(int *rfd, int *wfd)
{
    int ends[2];

    int rc = pipe(ends);
    assert_errno("pipe", rc == 0);

    /* pipe() stores the read side in slot 0 and the write side in slot 1. */
    *rfd = ends[0];
    *wfd = ends[1];

    set_nonblock(*rfd);
    set_nonblock(*wfd);
}
299
300 void
301 create_server_socket(int *rfd, struct sockaddr_in *sa)
302 {
303 int res;
304 int value;
305 socklen_t salen = sizeof(*sa);
306
307 memset(sa, 0, salen);
308 sa->sin_len = salen;
309 sa->sin_family = AF_INET;
310 sa->sin_port = htons(12345);
311 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
312
313 *rfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
314 assert_errno("socket", *rfd >= 0);
315
316 value = 1;
317 res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
318 assert_errno("setsockopt(SO_REUSEADDR)", res == 0);
319
320 value = 1;
321 res = setsockopt(*rfd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(value));
322 assert_errno("setsockopt(SO_REUSEPORT)", res == 0);
323
324 res = bind(*rfd, (const struct sockaddr *)sa, salen);
325 assert_errno("bind", res == 0);
326
327 res = listen(*rfd, 128);
328 assert_errno("listen", res == 0);
329 }
330
331 void
332 create_client_socket(int *wfd, const struct sockaddr_in *sa)
333 {
334 int res;
335
336 *wfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
337 assert_errno("socket", *wfd >= 0);
338
339 set_nonblock(*wfd);
340
341 res = connect(*wfd, (const struct sockaddr *)sa, sa->sin_len);
342 assert_errno("connect", res == 0 || errno == EINPROGRESS);
343 }
344
345 extern int optind;
346
/* Print the command-line synopsis to stderr and exit with status 1. */
void
usage(void)
{
    fputs("usage: fd_stress [-d] iterations width\n", stderr);
    exit(1);
}
353
354 int
355 main(int argc, char* argv[])
356 {
357 int serverfd;
358 struct sockaddr_in sa;
359 create_server_socket(&serverfd, &sa);
360
361 int ch;
362
363 while ((ch = getopt(argc, argv, "d")) != -1) {
364 switch (ch) {
365 case 'd':
366 debug = 1;
367 break;
368 case '?':
369 default:
370 usage();
371 break;
372 }
373 }
374 argc -= optind;
375 argv += optind;
376
377 if (argc != 2) {
378 usage();
379 }
380
381 size_t iterations = strtol(argv[0], NULL, 10);
382 size_t width = strtol(argv[1], NULL, 10);
383
384 if (iterations == 0 || width == 0) {
385 usage();
386 }
387
388 fprintf(stdout, "pid %d\n", getpid());
389
390 dispatch_group_t group;
391 group = dispatch_group_create();
392 assert(group);
393
394 #if 0
395 dispatch_queue_t queue = dispatch_queue_create("server", NULL);
396
397 dispatch_source_t ds;
398 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, serverfd, 0, queue);
399 assert(ds);
400 dispatch_source_set_event_handler(ds, ^{
401 int res;
402 int fd;
403 struct sockaddr peer;
404 socklen_t peerlen;
405
406 fd = accept(serverfd, &peer, &peerlen);
407 assert_errno("accept", fd >= 0);
408
409 set_nonblock(fd);
410
411 char host[NI_MAXHOST], serv[NI_MAXSERV];
412 host[0] = 0;
413 serv[0] = 0;
414 res = getnameinfo(&peer, peerlen, host, sizeof(host), serv, sizeof(serv), NI_NUMERICHOST|NI_NUMERICSERV);
415 DEBUG("ACCEPTED %d (%s:%s)\n", fd, host, serv);
416
417 create_reader(fd, ^{ dispatch_group_leave(group); });
418 });
419 dispatch_resume(ds);
420 #endif
421
422 size_t i;
423 for (i = 1; i < iterations; ++i) {
424 fprintf(stderr, "iteration %ld\n", i);
425
426 size_t j;
427 for (j = 0; j < width; ++j) {
428 int rfd, wfd;
429 dispatch_group_enter(group);
430 create_pipe(&rfd, &wfd);
431 DEBUG("PIPE %d %d\n", rfd, wfd);
432 dispatch_source_t reader;
433 reader = create_reader(rfd, ^{ dispatch_group_leave(group); });
434 create_writer(wfd, ^{});
435 }
436
437 #if 0
438 int clientfd;
439 dispatch_group_enter(group);
440 create_client_socket(&clientfd, &sa);
441 DEBUG("CLIENT %d\n", clientfd);
442 create_writer(clientfd, ^{});
443
444 dispatch_group_enter(group);
445 create_fifo(&rfd, &wfd);
446 DEBUG("FIFO %d %d\n", rfd, wfd);
447 create_writer(wfd, ^{});
448 create_reader(rfd, ^{ dispatch_group_leave(group); });
449 #endif
450
451 dispatch_group_wait(group, DISPATCH_TIME_FOREVER);
452 }
453 fprintf(stdout, "pid %d\n", getpid());
454 dispatch_main();
455
456 return 0;
457 }