]> git.saurik.com Git - apple/xnu.git/blob - tools/tests/MPMMTest/MPMMtest.c
xnu-3248.30.4.tar.gz
[apple/xnu.git] / tools / tests / MPMMTest / MPMMtest.c
1 #include <AvailabilityMacros.h>
2 #include <mach/thread_policy.h>
3
4 #include <pthread.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <err.h>
9 #include <unistd.h>
10
11 #include <pthread.h>
12 #include <mach/mach.h>
13 #include <mach/mach_error.h>
14 #include <mach/mach_time.h>
15 #include <mach/notify.h>
16 #include <servers/bootstrap.h>
17 #include <sys/types.h>
18 #include <sys/time.h>
19 #include <sys/signal.h>
20 #include <errno.h>
21 #include "../unit_tests/tests_common.h" /* for record_perf_data() */
22
23 #include <libkern/OSAtomic.h>
24
/* Larger of two values. Each argument may be evaluated twice: no side effects. */
#define MAX(A, B) (((B) > (A)) ? (B) : (A))
26
27
28 typedef struct {
29 mach_msg_header_t header;
30 mach_msg_trailer_t trailer; // subtract this when sending
31 } ipc_trivial_message;
32
33 typedef struct {
34 mach_msg_header_t header;
35 u_int32_t numbers[0];
36 mach_msg_trailer_t trailer; // subtract this when sending
37 } ipc_inline_message;
38
39 typedef struct {
40 mach_msg_header_t header;
41 mach_msg_body_t body;
42 mach_msg_ool_descriptor_t descriptor;
43 mach_msg_trailer_t trailer; // subtract this when sending
44 } ipc_complex_message;
45
/* Kinds of message a client can send (selected with -type). */
enum {
	msg_type_trivial = 0,	/* header only */
	msg_type_inline,	/* header + inline integers */
	msg_type_complex	/* header + out-of-line descriptor */
};
51
52 struct port_args {
53 int server_num;
54 int req_size;
55 mach_msg_header_t *req_msg;
56 int reply_size;
57 mach_msg_header_t *reply_msg;
58 mach_port_t port;
59 mach_port_t rcv_set;
60
61 mach_port_t *set;
62 mach_port_t *port_list;
63 };
64
/* Handle for a spawned worker: a pid when forked, a pthread when threaded. */
typedef union thread_id {
	pid_t		pid;
	pthread_t	tid;
} thread_id_t;
69
70 /* Global options */
71 static int verbose = 0;
72 static boolean_t affinity = FALSE;
73 static boolean_t timeshare = FALSE;
74 static boolean_t threaded = FALSE;
75 static boolean_t oneway = FALSE;
76 static boolean_t useset = FALSE;
77 static boolean_t save_perfdata = FALSE;
78 int msg_type;
79 int num_ints;
80 int num_msgs;
81 int num_clients;
82 int num_servers;
83 int client_delay;
84 int client_spin;
85 int client_pages;
86 int portcount = 1;
87 int setcount = 0;
88 boolean_t stress_prepost = FALSE;
89 char **server_port_name;
90
91 struct port_args *server_port_args;
92
93 /* global data */
94 mach_timebase_info_data_t g_timebase;
95 int64_t g_client_send_time = 0;
96
97 static inline uint64_t ns_to_abs(uint64_t ns)
98 {
99 return ns * g_timebase.denom / g_timebase.numer;
100 }
101
102 static inline uint64_t abs_to_ns(uint64_t abs)
103 {
104 return abs * g_timebase.numer / g_timebase.denom;
105 }
106
107
/* Signal handler that deliberately does nothing with the delivered signal. */
void signal_handler(int sig)
{
	(void)sig;
}
110
/*
 * Print the command-line help to stderr and terminate with exit status 1.
 *
 * The text is kept in step with parse_args(): the spin option is "-spin",
 * the default message type is trivial, and the default server count is
 * half the available CPUs (at least one).
 */
void usage(const char *progname) {
	fprintf(stderr, "usage: %s [options]\n", progname);
	fprintf(stderr,
	    "where options are:\n"
	    " -affinity\t\tthreads use affinity\n"
	    " -timeshare\t\tthreads use timeshare\n"
	    " -threaded\t\tuse (p)threads\n"
	    " -verbose\t\tbe verbose (use multiple times to increase verbosity)\n"
	    " -oneway\t\tdo not request return reply\n"
	    " -count num\t\tnumber of messages to send\n"
	    " -perf \t\tCreate perfdata files for metrics.\n"
	    " -type trivial|inline|complex\ttype of messages to send\n"
	    " -numints num\tnumber of 32-bit ints to send in messages\n"
	    " -servers num\tnumber of server threads to run\n"
	    " -clients num\tnumber of clients per server\n"
	    " -delay num\t\tmicroseconds to sleep clients between messages\n"
	    " -spin num\t\tmicroseconds of client work\n"
	    " -pages num\t\tpages of memory touched by client work\n"
	    " -set nset num\tcreate [nset] portsets and [num] ports in each server.\n"
	    " \tEach port is connected to each set.\n"
	    " -prepost\t\tstress the prepost system (implies -threaded, requires -set X Y)\n");
	fprintf(stderr,
	    "default values are:\n"
	    " . no affinity\n"
	    " . not timeshare\n"
	    " . not threaded\n"
	    " . not verbose\n"
	    " . not oneway\n"
	    " . client sends 100000 messages\n"
	    " . trivial message type\n"
	    " . 64 32-bit integers in inline/complex messages\n"
	    " . num_available_processors/2 servers (at least 1)\n"
	    " . 4 clients per server\n"
	    " . no delay\n"
	    " . no sets / extra ports\n"
	    " . no prepost stress\n");
	exit(1);
}
147
148 void parse_args(int argc, char *argv[]) {
149 host_basic_info_data_t info;
150 mach_msg_type_number_t count;
151 kern_return_t result;
152
153 /* Initialize defaults */
154 msg_type = msg_type_trivial;
155 num_ints = 64;
156 num_msgs = 100000;
157 client_delay = 0;
158 num_clients = 4;
159
160 count = HOST_BASIC_INFO_COUNT;
161 result = host_info(mach_host_self(), HOST_BASIC_INFO,
162 (host_info_t)&info, &count);
163 if (result == KERN_SUCCESS && info.avail_cpus > 1)
164 num_servers = info.avail_cpus / 2;
165 else
166 num_servers = 1;
167
168 const char *progname = argv[0];
169 argc--; argv++;
170 while (0 < argc) {
171 if (0 == strcmp("-verbose", argv[0])) {
172 verbose++;
173 argc--; argv++;
174 } else if (0 == strcmp("-affinity", argv[0])) {
175 affinity = TRUE;
176 argc--; argv++;
177 } else if (0 == strcmp("-timeshare", argv[0])) {
178 timeshare = TRUE;
179 argc--; argv++;
180 } else if (0 == strcmp("-threaded", argv[0])) {
181 threaded = TRUE;
182 argc--; argv++;
183 } else if (0 == strcmp("-oneway", argv[0])) {
184 oneway = TRUE;
185 argc--; argv++;
186 } else if (0 == strcmp("-perf", argv[0])) {
187 save_perfdata = TRUE;
188 argc--; argv++;
189 } else if (0 == strcmp("-type", argv[0])) {
190 if (argc < 2)
191 usage(progname);
192 if (0 == strcmp("trivial", argv[1])) {
193 msg_type = msg_type_trivial;
194 } else if (0 == strcmp("inline", argv[1])) {
195 msg_type = msg_type_inline;
196 } else if (0 == strcmp("complex", argv[1])) {
197 msg_type = msg_type_complex;
198 } else
199 usage(progname);
200 argc -= 2; argv += 2;
201 } else if (0 == strcmp("-numints", argv[0])) {
202 if (argc < 2)
203 usage(progname);
204 num_ints = strtoul(argv[1], NULL, 0);
205 argc -= 2; argv += 2;
206 } else if (0 == strcmp("-count", argv[0])) {
207 if (argc < 2)
208 usage(progname);
209 num_msgs = strtoul(argv[1], NULL, 0);
210 argc -= 2; argv += 2;
211 } else if (0 == strcmp("-clients", argv[0])) {
212 if (argc < 2)
213 usage(progname);
214 num_clients = strtoul(argv[1], NULL, 0);
215 argc -= 2; argv += 2;
216 } else if (0 == strcmp("-servers", argv[0])) {
217 if (argc < 2)
218 usage(progname);
219 num_servers = strtoul(argv[1], NULL, 0);
220 argc -= 2; argv += 2;
221 } else if (0 == strcmp("-delay", argv[0])) {
222 if (argc < 2)
223 usage(progname);
224 client_delay = strtoul(argv[1], NULL, 0);
225 argc -= 2; argv += 2;
226 } else if (0 == strcmp("-spin", argv[0])) {
227 if (argc < 2)
228 usage(progname);
229 client_spin = strtoul(argv[1], NULL, 0);
230 argc -= 2; argv += 2;
231 } else if (0 == strcmp("-pages", argv[0])) {
232 if (argc < 2)
233 usage(progname);
234 client_pages = strtoul(argv[1], NULL, 0);
235 argc -= 2; argv += 2;
236 } else if (0 == strcmp("-set", argv[0])) {
237 if (argc < 3)
238 usage(progname);
239 setcount = strtoul(argv[1], NULL, 0);
240 portcount = strtoul(argv[2], NULL, 0);
241 if (setcount <= 0 || portcount <= 0)
242 usage(progname);
243 useset = TRUE;
244 argc -= 3; argv += 3;
245 } else if (0 == strcmp("-prepost", argv[0])) {
246 stress_prepost = TRUE;
247 threaded = TRUE;
248 argc--; argv++;
249 } else {
250 fprintf(stderr, "unknown option '%s'\n", argv[0]);
251 usage(progname);
252 }
253 }
254
255 if (stress_prepost) {
256 if (!threaded) {
257 fprintf(stderr, "Prepost stress test _must_ be threaded\n");
258 exit(1);
259 }
260 if (portcount < 1 || setcount < 1) {
261 fprintf(stderr, "Prepost stress test requires >= 1 port in >= 1 set.\n");
262 exit(1);
263 }
264 }
265 }
266
267 void setup_server_ports(struct port_args *ports)
268 {
269 kern_return_t ret = 0;
270 mach_port_t bsport;
271 mach_port_t port;
272
273 ports->req_size = MAX(sizeof(ipc_inline_message) +
274 sizeof(u_int32_t) * num_ints,
275 sizeof(ipc_complex_message));
276 ports->reply_size = sizeof(ipc_trivial_message) -
277 sizeof(mach_msg_trailer_t);
278 ports->req_msg = malloc(ports->req_size);
279 ports->reply_msg = malloc(ports->reply_size);
280 if (setcount > 0) {
281 ports->set = (mach_port_t *)calloc(sizeof(mach_port_t), setcount);
282 if (!ports->set) {
283 fprintf(stderr, "calloc(%lu, %d) failed!\n", sizeof(mach_port_t), setcount);
284 exit(1);
285 }
286 }
287 if (stress_prepost) {
288 ports->port_list = (mach_port_t *)calloc(sizeof(mach_port_t), portcount);
289 if (!ports->port_list) {
290 fprintf(stderr, "calloc(%lu, %d) failed!\n", sizeof(mach_port_t), portcount);
291 exit(1);
292 }
293 }
294
295 if (useset) {
296 mach_port_t set;
297 if (setcount < 1) {
298 fprintf(stderr, "Can't use sets with a setcount of %d\n", setcount);
299 exit(1);
300 }
301
302 for (int ns = 0; ns < setcount; ns++) {
303 ret = mach_port_allocate(mach_task_self(),
304 MACH_PORT_RIGHT_PORT_SET,
305 &ports->set[ns]);
306 if (KERN_SUCCESS != ret) {
307 mach_error("mach_port_allocate(SET): ", ret);
308 exit(1);
309 }
310 if (verbose > 1)
311 printf("SVR[%d] allocated set[%d] %#x\n",
312 ports->server_num, ns, ports->set[ns]);
313
314 set = ports->set[ns];
315 }
316
317 /* receive on a port set (always use the first in the chain) */
318 ports->rcv_set = ports->set[0];
319 }
320
321 /* stuff the portset(s) with ports */
322 for (int i = 0; i < portcount; i++) {
323 ret = mach_port_allocate(mach_task_self(),
324 MACH_PORT_RIGHT_RECEIVE,
325 &port);
326 if (KERN_SUCCESS != ret) {
327 mach_error("mach_port_allocate(PORT): ", ret);
328 exit(1);
329 }
330
331 if (stress_prepost)
332 ports->port_list[i] = port;
333
334 if (useset) {
335 /* insert the port into _all_ allocated lowest-level sets */
336 for (int ns = 0; ns < setcount; ns++) {
337 if (verbose > 1)
338 printf("SVR[%d] moving port %#x into set %#x...\n",
339 ports->server_num, port, ports->set[ns]);
340 ret = mach_port_insert_member(mach_task_self(),
341 port, ports->set[ns]);
342 if (KERN_SUCCESS != ret) {
343 mach_error("mach_port_insert_member(): ", ret);
344 exit(1);
345 }
346 }
347 }
348 }
349
350 /* use the last one as the server's bootstrap port */
351 ports->port = port;
352
353 if (stress_prepost) {
354 /* insert a send right for _each_ port */
355 for (int i = 0; i < portcount; i++) {
356 ret = mach_port_insert_right(mach_task_self(),
357 ports->port_list[i],
358 ports->port_list[i],
359 MACH_MSG_TYPE_MAKE_SEND);
360 if (KERN_SUCCESS != ret) {
361 mach_error("mach_port_insert_right(): ", ret);
362 exit(1);
363 }
364 }
365 } else {
366 ret = mach_port_insert_right(mach_task_self(),
367 ports->port,
368 ports->port,
369 MACH_MSG_TYPE_MAKE_SEND);
370 if (KERN_SUCCESS != ret) {
371 mach_error("mach_port_insert_right(): ", ret);
372 exit(1);
373 }
374 }
375
376 ret = task_get_bootstrap_port(mach_task_self(), &bsport);
377 if (KERN_SUCCESS != ret) {
378 mach_error("task_get_bootstrap_port(): ", ret);
379 exit(1);
380 }
381
382 if (verbose) {
383 printf("server waiting for IPC messages from client on port '%s' (%#x).\n",
384 server_port_name[ports->server_num], ports->port);
385 }
386 ret = bootstrap_register(bsport,
387 server_port_name[ports->server_num],
388 ports->port);
389 if (KERN_SUCCESS != ret) {
390 mach_error("bootstrap_register(): ", ret);
391 exit(1);
392 }
393 }
394
395 void setup_client_ports(struct port_args *ports)
396 {
397 kern_return_t ret = 0;
398 switch(msg_type) {
399 case msg_type_trivial:
400 ports->req_size = sizeof(ipc_trivial_message);
401 break;
402 case msg_type_inline:
403 ports->req_size = sizeof(ipc_inline_message) +
404 sizeof(u_int32_t) * num_ints;
405 break;
406 case msg_type_complex:
407 ports->req_size = sizeof(ipc_complex_message);
408 break;
409 }
410 ports->req_size -= sizeof(mach_msg_trailer_t);
411 ports->reply_size = sizeof(ipc_trivial_message);
412 ports->req_msg = malloc(ports->req_size);
413 ports->reply_msg = malloc(ports->reply_size);
414
415 ret = mach_port_allocate(mach_task_self(),
416 MACH_PORT_RIGHT_RECEIVE,
417 &(ports->port));
418 if (KERN_SUCCESS != ret) {
419 mach_error("mach_port_allocate(): ", ret);
420 exit(1);
421 }
422 if (verbose) {
423 printf("Client sending %d %s IPC messages to port '%s' in %s mode\n",
424 num_msgs, (msg_type == msg_type_inline) ?
425 "inline" : ((msg_type == msg_type_complex) ?
426 "complex" : "trivial"),
427 server_port_name[ports->server_num],
428 (oneway ? "oneway" : "rpc"));
429 }
430 }
431
432
433 static void
434 thread_setup(int tag) {
435 kern_return_t ret;
436 thread_extended_policy_data_t epolicy;
437 thread_affinity_policy_data_t policy;
438
439 if (!timeshare) {
440 epolicy.timeshare = FALSE;
441 ret = thread_policy_set(
442 mach_thread_self(), THREAD_EXTENDED_POLICY,
443 (thread_policy_t) &epolicy,
444 THREAD_EXTENDED_POLICY_COUNT);
445 if (ret != KERN_SUCCESS)
446 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
447 }
448
449 if (affinity) {
450 policy.affinity_tag = tag;
451 ret = thread_policy_set(
452 mach_thread_self(), THREAD_AFFINITY_POLICY,
453 (thread_policy_t) &policy,
454 THREAD_AFFINITY_POLICY_COUNT);
455 if (ret != KERN_SUCCESS)
456 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
457 }
458 }
459
460 void *
461 server(void *serverarg)
462 {
463 int idx;
464 kern_return_t ret;
465 int totalmsg = num_msgs * num_clients;
466 mach_port_t recv_port;
467 uint64_t starttm, endtm;
468
469 int svr_num = (int)(uintptr_t)serverarg;
470 struct port_args *args = &server_port_args[svr_num];
471
472 args->server_num = svr_num;
473 setup_server_ports(args);
474
475 thread_setup(args->server_num + 1);
476
477 recv_port = (useset) ? args->rcv_set : args->port;
478
479 for (idx = 0; idx < totalmsg; idx++) {
480 if (verbose > 2)
481 printf("server awaiting message %d\n", idx);
482 ret = mach_msg(args->req_msg,
483 MACH_RCV_MSG|MACH_RCV_INTERRUPT|MACH_RCV_LARGE,
484 0,
485 args->req_size,
486 recv_port,
487 MACH_MSG_TIMEOUT_NONE,
488 MACH_PORT_NULL);
489 if (MACH_RCV_INTERRUPTED == ret)
490 break;
491 if (MACH_MSG_SUCCESS != ret) {
492 if (verbose)
493 printf("mach_msg() ret=%d", ret);
494 mach_error("mach_msg (receive): ", ret);
495 exit(1);
496 }
497 if (verbose > 2)
498 printf("server received message %d\n", idx);
499 if (args->req_msg->msgh_bits & MACH_MSGH_BITS_COMPLEX) {
500 ret = vm_deallocate(mach_task_self(),
501 (vm_address_t)((ipc_complex_message *)args->req_msg)->descriptor.address,
502 ((ipc_complex_message *)args->req_msg)->descriptor.size);
503 }
504
505 if (1 == args->req_msg->msgh_id) {
506 if (verbose > 2)
507 printf("server sending reply %d\n", idx);
508 args->reply_msg->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_MOVE_SEND_ONCE, 0);
509 args->reply_msg->msgh_size = args->reply_size;
510 args->reply_msg->msgh_remote_port = args->req_msg->msgh_remote_port;
511 args->reply_msg->msgh_local_port = MACH_PORT_NULL;
512 args->reply_msg->msgh_id = 2;
513 ret = mach_msg(args->reply_msg,
514 MACH_SEND_MSG,
515 args->reply_size,
516 0,
517 MACH_PORT_NULL,
518 MACH_MSG_TIMEOUT_NONE,
519 MACH_PORT_NULL);
520 if (MACH_MSG_SUCCESS != ret) {
521 mach_error("mach_msg (send): ", ret);
522 exit(1);
523 }
524 }
525 }
526
527 if (!useset)
528 return NULL;
529
530 if (verbose < 1)
531 return NULL;
532
533 uint64_t deltans = 0;
534 /*
535 * If we're using multiple sets, explicitly tear them all down
536 * and measure the time.
537 */
538 for (int ns = 0; ns < setcount; ns++) {
539 if (verbose > 1)
540 printf("\tTearing down set[%d] %#x...\n", ns, args->set[ns]);
541 starttm = mach_absolute_time();
542 ret = mach_port_mod_refs(mach_task_self(), args->set[ns], MACH_PORT_RIGHT_PORT_SET, -1);
543 endtm = mach_absolute_time();
544 deltans += abs_to_ns(endtm - starttm);
545 if (ret != KERN_SUCCESS) {
546 mach_error("mach_port_mod_refs(): ", ret);
547 exit(1);
548 }
549 }
550
551 uint64_t nlinks = (uint64_t)setcount * (uint64_t)portcount;
552
553 printf("\tteardown of %llu links took %llu ns\n", nlinks, deltans);
554 printf("\t%lluns per set\n", deltans / (uint64_t)setcount);
555
556 return NULL;
557 }
558
/* Invoke fn exactly `count` times (not at all when count is zero). */
static inline void
client_spin_loop(unsigned count, void (fn)(void))
{
	for (unsigned remaining = count; remaining != 0; remaining--) {
		fn();
	}
}
565
566 static long dummy_memory;
567 static long *client_memory = &dummy_memory;
568 static void
569 client_work_atom(void)
570 {
571 static int i;
572
573 if (++i > client_pages * PAGE_SIZE / sizeof(long))
574 i = 0;
575 client_memory[i] = 0;
576 }
577
578 static int calibration_count = 10000;
579 static int calibration_usec;
580 static void *
581 calibrate_client_work(void)
582 {
583 long dummy;
584 struct timeval nowtv;
585 struct timeval warmuptv = { 0, 100 * 1000 }; /* 100ms */
586 struct timeval starttv;
587 struct timeval endtv;
588
589 if (client_spin) {
590 /* Warm-up the stepper first... */
591 gettimeofday(&nowtv, NULL);
592 timeradd(&nowtv, &warmuptv, &endtv);
593 do {
594 client_spin_loop(calibration_count, client_work_atom);
595 gettimeofday(&nowtv, NULL);
596 } while (timercmp(&nowtv, &endtv, < ));
597
598 /* Now do the calibration */
599 while (TRUE) {
600 gettimeofday(&starttv, NULL);
601 client_spin_loop(calibration_count, client_work_atom);
602 gettimeofday(&endtv, NULL);
603 if (endtv.tv_sec - starttv.tv_sec > 1) {
604 calibration_count /= 10;
605 continue;
606 }
607 calibration_usec = endtv.tv_usec - starttv.tv_usec;
608 if (endtv.tv_usec < starttv.tv_usec) {
609 calibration_usec += 1000000;
610 }
611 if (calibration_usec < 1000) {
612 calibration_count *= 10;
613 continue;
614 }
615 calibration_count /= calibration_usec;
616 break;
617 }
618 if (verbose > 1)
619 printf("calibration_count=%d calibration_usec=%d\n",
620 calibration_count, calibration_usec);
621 }
622 return NULL;
623 }
624
625 static void *
626 client_work(void)
627 {
628
629 if (client_spin) {
630 client_spin_loop(calibration_count*client_spin,
631 client_work_atom);
632 }
633
634 if (client_delay) {
635 usleep(client_delay);
636 }
637 return NULL;
638 }
639
640 void *client(void *threadarg)
641 {
642 struct port_args args;
643 struct port_args *svr_args = NULL;
644 int idx;
645 mach_msg_header_t *req, *reply;
646 mach_port_t bsport, servport;
647 kern_return_t ret;
648 int server_num = (int)(uintptr_t)threadarg;
649 void *ints = malloc(sizeof(u_int32_t) * num_ints);
650
651 if (verbose)
652 printf("client(%d) started, server port name %s\n",
653 server_num, server_port_name[server_num]);
654
655 args.server_num = server_num;
656 thread_setup(server_num + 1);
657
658 if (stress_prepost)
659 svr_args = &server_port_args[server_num];
660
661 /* find server port */
662 ret = task_get_bootstrap_port(mach_task_self(), &bsport);
663 if (KERN_SUCCESS != ret) {
664 mach_error("task_get_bootstrap_port(): ", ret);
665 exit(1);
666 }
667 ret = bootstrap_look_up(bsport,
668 server_port_name[server_num],
669 &servport);
670 if (KERN_SUCCESS != ret) {
671 mach_error("bootstrap_look_up(): ", ret);
672 exit(1);
673 }
674
675 setup_client_ports(&args);
676
677 /* Allocate and touch memory */
678 if (client_pages) {
679 unsigned i;
680 client_memory = (long *) malloc(client_pages * PAGE_SIZE);
681 for (i = 0; i < client_pages; i++)
682 client_memory[i * PAGE_SIZE / sizeof(long)] = 0;
683 }
684
685 uint64_t starttm, endtm;
686
687 /* start message loop */
688 for (idx = 0; idx < num_msgs; idx++) {
689 req = args.req_msg;
690 reply = args.reply_msg;
691
692 req->msgh_size = args.req_size;
693 if (stress_prepost) {
694 req->msgh_remote_port = svr_args->port_list[idx % portcount];
695 } else {
696 req->msgh_remote_port = servport;
697 }
698 if (oneway) {
699 req->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
700 req->msgh_local_port = MACH_PORT_NULL;
701 } else {
702 req->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND,
703 MACH_MSG_TYPE_MAKE_SEND_ONCE);
704 req->msgh_local_port = args.port;
705 }
706 req->msgh_id = oneway ? 0 : 1;
707 if (msg_type == msg_type_complex) {
708 (req)->msgh_bits |= MACH_MSGH_BITS_COMPLEX;
709 ((ipc_complex_message *)req)->body.msgh_descriptor_count = 1;
710 ((ipc_complex_message *)req)->descriptor.address = ints;
711 ((ipc_complex_message *)req)->descriptor.size =
712 num_ints * sizeof(u_int32_t);
713 ((ipc_complex_message *)req)->descriptor.deallocate = FALSE;
714 ((ipc_complex_message *)req)->descriptor.copy = MACH_MSG_VIRTUAL_COPY;
715 ((ipc_complex_message *)req)->descriptor.type = MACH_MSG_OOL_DESCRIPTOR;
716 }
717 if (verbose > 2)
718 printf("client sending message %d to port %#x\n",
719 idx, req->msgh_remote_port);
720 starttm = mach_absolute_time();
721 ret = mach_msg(req,
722 MACH_SEND_MSG,
723 args.req_size,
724 0,
725 MACH_PORT_NULL,
726 MACH_MSG_TIMEOUT_NONE,
727 MACH_PORT_NULL);
728 endtm = mach_absolute_time();
729 if (MACH_MSG_SUCCESS != ret) {
730 mach_error("mach_msg (send): ", ret);
731 fprintf(stderr, "bailing after %u iterations\n", idx);
732 exit(1);
733 break;
734 }
735 if (stress_prepost)
736 OSAtomicAdd64(endtm - starttm, &g_client_send_time);
737
738 if (!oneway) {
739 if (verbose > 2)
740 printf("client awaiting reply %d\n", idx);
741 reply->msgh_bits = 0;
742 reply->msgh_size = args.reply_size;
743 reply->msgh_local_port = args.port;
744 ret = mach_msg(args.reply_msg,
745 MACH_RCV_MSG|MACH_RCV_INTERRUPT,
746 0,
747 args.reply_size,
748 args.port,
749 MACH_MSG_TIMEOUT_NONE,
750 MACH_PORT_NULL);
751 if (MACH_MSG_SUCCESS != ret) {
752 mach_error("mach_msg (receive): ", ret);
753 fprintf(stderr, "bailing after %u iterations\n",
754 idx);
755 exit(1);
756 }
757 if (verbose > 2)
758 printf("client received reply %d\n", idx);
759 }
760
761 client_work();
762 }
763
764 free(ints);
765 return NULL;
766 }
767
768 static void
769 thread_spawn(thread_id_t *thread, void *(fn)(void *), void *arg) {
770 if (threaded) {
771 kern_return_t ret;
772 ret = pthread_create(
773 &thread->tid,
774 NULL,
775 fn,
776 arg);
777 if (ret != 0)
778 err(1, "pthread_create()");
779 if (verbose > 1)
780 printf("created pthread %p\n", thread->tid);
781 } else {
782 thread->pid = fork();
783 if (thread->pid == 0) {
784 if (verbose > 1)
785 printf("calling %p(%p)\n", fn, arg);
786 fn(arg);
787 exit(0);
788 }
789 if (verbose > 1)
790 printf("forked pid %d\n", thread->pid);
791 }
792 }
793
794 static void
795 thread_join(thread_id_t *thread) {
796 if (threaded) {
797 kern_return_t ret;
798 if (verbose > 1)
799 printf("joining thread %p\n", thread->tid);
800 ret = pthread_join(thread->tid, NULL);
801 if (ret != KERN_SUCCESS)
802 err(1, "pthread_join(%p)", thread->tid);
803 } else {
804 int stat;
805 if (verbose > 1)
806 printf("waiting for pid %d\n", thread->pid);
807 waitpid(thread->pid, &stat, 0);
808 }
809 }
810
811 static void
812 wait_for_servers(void)
813 {
814 int i;
815 int retry_count = 10;
816 mach_port_t bsport, servport;
817 kern_return_t ret;
818
819 /* find server port */
820 ret = task_get_bootstrap_port(mach_task_self(), &bsport);
821 if (KERN_SUCCESS != ret) {
822 mach_error("task_get_bootstrap_port(): ", ret);
823 exit(1);
824 }
825
826 while (retry_count-- > 0) {
827 for (i = 0; i < num_servers; i++) {
828 ret = bootstrap_look_up(bsport,
829 server_port_name[i],
830 &servport);
831 if (ret != KERN_SUCCESS) {
832 break;
833 }
834 }
835 if (ret == KERN_SUCCESS)
836 return;
837 usleep(100 * 1000); /* 100ms */
838 }
839 fprintf(stderr, "Server(s) failed to register\n");
840 exit(1);
841 }
842
843 int main(int argc, char *argv[])
844 {
845 int i;
846 int j;
847 thread_id_t *client_id;
848 thread_id_t *server_id;
849
850 signal(SIGINT, signal_handler);
851 parse_args(argc, argv);
852
853 if (mach_timebase_info(&g_timebase) != KERN_SUCCESS) {
854 fprintf(stderr, "Can't get mach_timebase_info!\n");
855 exit(1);
856 }
857
858 calibrate_client_work();
859
860 /*
861 * If we're using affinity create an empty namespace now
862 * so this is shared by all our offspring.
863 */
864 if (affinity)
865 thread_setup(0);
866
867 server_id = (thread_id_t *) malloc(num_servers * sizeof(thread_id_t));
868 server_port_name = (char **) malloc(num_servers * sizeof(char *));
869 server_port_args = (struct port_args *)calloc(sizeof(struct port_args), num_servers);
870 if (!server_id || !server_port_name || !server_port_args) {
871 fprintf(stderr, "malloc/calloc of %d server book keeping structs failed\n", num_servers);
872 exit(1);
873 }
874
875 if (verbose)
876 printf("creating %d servers\n", num_servers);
877 for (i = 0; i < num_servers; i++) {
878 server_port_name[i] = (char *) malloc(sizeof("PORT.pppppp.xx"));
879 /* PORT names include pid of main process for disambiguation */
880 sprintf(server_port_name[i], "PORT.%06d.%02d", getpid(), i);
881 thread_spawn(&server_id[i], server, (void *) (long) i);
882 }
883
884 int totalclients = num_servers * num_clients;
885 int totalmsg = num_msgs * totalclients;
886 struct timeval starttv, endtv, deltatv;
887
888 /*
889 * Wait for all servers to have registered all ports before starting
890 * the clients and the clock.
891 */
892 wait_for_servers();
893
894 printf("%d server%s, %d client%s per server (%d total) %u messages...",
895 num_servers, (num_servers > 1)? "s" : "",
896 num_clients, (num_clients > 1)? "s" : "",
897 totalclients,
898 totalmsg);
899 fflush(stdout);
900
901 /* Call gettimeofday() once and throw away result; some implementations
902 * (like Mach's) cache some time zone info on first call.
903 */
904 gettimeofday(&starttv, NULL);
905 gettimeofday(&starttv, NULL);
906
907 client_id = (thread_id_t *) malloc(totalclients * sizeof(thread_id_t));
908 if (verbose)
909 printf("creating %d clients\n", totalclients);
910 for (i = 0; i < num_servers; i++) {
911 for (j = 0; j < num_clients; j++) {
912 thread_spawn(
913 &client_id[(i*num_clients) + j],
914 client,
915 (void *) (long) i);
916 }
917 }
918
919 /* Wait for servers to complete */
920 for (i = 0; i < num_servers; i++) {
921 thread_join(&server_id[i]);
922 }
923
924 gettimeofday(&endtv, NULL);
925 if (verbose)
926 printf("all servers complete: waiting for clients...\n");
927
928 for (i = 0; i < totalclients; i++) {
929 thread_join(&client_id[i]);
930 }
931
932 /* report results */
933 deltatv.tv_sec = endtv.tv_sec - starttv.tv_sec;
934 deltatv.tv_usec = endtv.tv_usec - starttv.tv_usec;
935 if (endtv.tv_usec < starttv.tv_usec) {
936 deltatv.tv_sec--;
937 deltatv.tv_usec += 1000000;
938 }
939
940 double dsecs = (double) deltatv.tv_sec +
941 1.0E-6 * (double) deltatv.tv_usec;
942
943 printf(" in %lu.%03u seconds\n",
944 deltatv.tv_sec, deltatv.tv_usec/1000);
945 printf(" throughput in messages/sec: %g\n",
946 (double)totalmsg / dsecs);
947 printf(" average message latency (usec): %2.3g\n",
948 dsecs * 1.0E6 / (double) totalmsg);
949
950 double time_in_sec = (double)deltatv.tv_sec + (double)deltatv.tv_usec/1000.0;
951 double throughput_msg_p_sec = (double) totalmsg/dsecs;
952 double avg_msg_latency = dsecs*1.0E6 / (double)totalmsg;
953
954 if (save_perfdata == TRUE) {
955 record_perf_data("mpmm_avg_msg_latency", "usec", avg_msg_latency, "Message latency measured in microseconds. Lower is better", stderr);
956 }
957
958 if (stress_prepost) {
959 int64_t sendns = abs_to_ns(g_client_send_time);
960 dsecs = (double)sendns / (double)NSEC_PER_SEC;
961 printf(" total send time: %2.3gs\n", dsecs);
962 printf(" average send time (usec): %2.3g\n",
963 dsecs * 1.0E6 / (double)totalmsg);
964 }
965
966 return (0);
967
968 }