]> git.saurik.com Git - apple/xnu.git/blame - tools/tests/affinity/sets.c
xnu-2422.1.72.tar.gz
[apple/xnu.git] / tools / tests / affinity / sets.c
CommitLineData
2d21ac55
A
1#include <AvailabilityMacros.h>
2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
4#endif
5#include <mach/mach.h>
6#include <mach/mach_error.h>
7#include <mach/mach_time.h>
8#include <pthread.h>
9#include <sys/queue.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <unistd.h>
14#include <err.h>
15#include <errno.h>
39236c6e 16#include <sys/sysctl.h>
2d21ac55
A
17
18/*
19 * Sets is a multithreaded test/benchmarking program to evaluate
20 * affinity set placement in Leopard.
21 *
22 * The picture here, for each set, is:
23 *
24 * free work
25 * -> queue --> producer --> queue --> consumer --
26 * | |
27 * -----------------------------------------------
28 *
29 * <------ "stage" -----> <------ "stage" ----->
30
31 * We spin off sets of production line threads (2 sets by default).
32 * All threads of each line sets the same affinity tag (unless disabled).
33 * By default there are 2 stage (worker) threads per production line.
34 * A worker thread removes a buffer from an input queue, processses it and
35 * queues it on an output queue. By default the initial stage (producer)
36 * writes every byte in a buffer and the other (consumer) stages read every
37 * byte. By default the buffers are 1MB (256 pages) in size but this can be
38 * overidden. By default there are 2 buffers per set (again overridable).
39 * Worker threads process (iterate over) 10000 buffers by default.
40 *
41 * With affinity enabled, each producer and consumer thread sets its affinity
42 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
43 *
44 * Buffer management uses pthread mutex/condition variables. A thread blocks
45 * when no buffer is available on a queue and it is signaled when a buffer
46 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
47 * The queue management is centralized in a single routine: what queues to
48 * use as input and output and what function to call for processing is
49 * data-driven.
50 */
51
52pthread_mutex_t funnel;
53pthread_cond_t barrier;
54
55uint64_t timer;
56int threads;
57int threads_ready = 0;
58
59int iterations = 10000;
60boolean_t affinity = FALSE;
61boolean_t halting = FALSE;
62boolean_t cache_config = FALSE;
63int verbosity = 1;
64
65typedef struct work {
66 TAILQ_ENTRY(work) link;
67 int *data;
68} work_t;
69
70/*
71 * A work queue, complete with pthread objects for its management
72 */
73typedef struct work_queue {
74 pthread_mutex_t mtx;
75 pthread_cond_t cnd;
76 TAILQ_HEAD(, work) queue;
77 boolean_t waiters;
78} work_queue_t;
79
80/* Worker functions take a integer array and size */
81typedef void (worker_fn_t)(int *, int);
82
83/* This struct controls the function of a thread */
84typedef struct {
85 int stagenum;
86 char *name;
87 worker_fn_t *fn;
88 work_queue_t *input;
89 work_queue_t *output;
90 struct line_info *set;
91 pthread_t thread;
92 work_queue_t bufq;
93} stage_info_t;
94
95/* This defines a thread set */
96#define WORKERS_MAX 10
97typedef struct line_info {
98 int setnum;
99 int *data;
100 int isize;
101 stage_info_t *stage[WORKERS_MAX];
102} line_info_t;
103
104#define DBG(x...) do { \
105 if (verbosity > 1) { \
106 pthread_mutex_lock(&funnel); \
107 printf(x); \
108 pthread_mutex_unlock(&funnel); \
109 } \
110} while (0)
111
112#define mutter(x...) do { \
113 if (verbosity > 0) { \
114 printf(x); \
115 } \
116} while (0)
117
118#define s_if_plural(x) (((x) > 1) ? "s" : "")
119
120static void
121usage()
122{
123 fprintf(stderr,
124#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
125 "usage: sets [-a] Turn affinity on (off)\n"
126 " [-b B] Number of buffers per set/line (2)\n"
127#else
128 "usage: sets [-b B] Number of buffers per set/line (2)\n"
129#endif
130 " [-c] Configure for max cache performance\n"
131 " [-h] Print this\n"
132 " [-i I] Number of items/buffers to process (1000)\n"
133 " [-s S] Number of stages per set/line (2)\n"
134 " [-t] Halt for keyboard input to start\n"
135 " [-p P] Number of pages per buffer (256=1MB)]\n"
136 " [-w] Consumer writes data\n"
137 " [-v V] Level of verbosity 0..2 (1)\n"
138 " [N] Number of sets/lines (2)\n"
139 );
140 exit(1);
141}
142
143/* Trivial producer: write to each byte */
144void
145writer_fn(int *data, int isize)
146{
147 int i;
148
149 for (i = 0; i < isize; i++) {
150 data[i] = i;
151 }
152}
153
154/* Trivial consumer: read each byte */
155void
156reader_fn(int *data, int isize)
157{
158 int i;
159 int datum;
160
161 for (i = 0; i < isize; i++) {
162 datum = data[i];
163 }
164}
165
166/* Consumer reading and writing the buffer */
167void
168reader_writer_fn(int *data, int isize)
169{
170 int i;
171
172 for (i = 0; i < isize; i++) {
173 data[i] += 1;
174 }
175}
176
177/*
178 * This is the central function for every thread.
179 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
180 */
181void *
182manager_fn(void *arg)
183{
184 stage_info_t *sp = (stage_info_t *) arg;
185 line_info_t *lp = sp->set;
186 kern_return_t ret;
187 long iteration = 0;
188
189 /*
190 * If we're using affinity sets (we are by default)
191 * set our tag to by our thread set number.
192 */
193#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
194 thread_extended_policy_data_t epolicy;
195 thread_affinity_policy_data_t policy;
196
197 epolicy.timeshare = FALSE;
198 ret = thread_policy_set(
199 mach_thread_self(), THREAD_EXTENDED_POLICY,
200 (thread_policy_t) &epolicy,
201 THREAD_EXTENDED_POLICY_COUNT);
202 if (ret != KERN_SUCCESS)
203 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
204
205 if (affinity) {
206 policy.affinity_tag = lp->setnum;
207 ret = thread_policy_set(
208 mach_thread_self(), THREAD_AFFINITY_POLICY,
209 (thread_policy_t) &policy,
210 THREAD_AFFINITY_POLICY_COUNT);
211 if (ret != KERN_SUCCESS)
212 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
213 }
214#endif
215
216 DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
217
218 /*
219 * Start barrier.
220 * The tets thread to get here releases everyone and starts the timer.
221 */
222 pthread_mutex_lock(&funnel);
223 threads_ready++;
224 if (threads_ready == threads) {
225 pthread_mutex_unlock(&funnel);
226 if (halting) {
227 printf(" all threads ready for process %d, "
228 "hit any key to start", getpid());
229 fflush(stdout);
230 (void) getchar();
231 }
232 pthread_cond_broadcast(&barrier);
233 timer = mach_absolute_time();
234 } else {
235 pthread_cond_wait(&barrier, &funnel);
236 pthread_mutex_unlock(&funnel);
237 }
238
239 do {
240 int i;
241 work_t *workp;
242
243 /*
244 * Get a buffer from the input queue.
245 * Block if none.
246 */
247 pthread_mutex_lock(&sp->input->mtx);
248 while (1) {
249 workp = TAILQ_FIRST(&(sp->input->queue));
250 if (workp != NULL)
251 break;
252 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
253 sp->name, lp->setnum, sp->stagenum, iteration);
254 sp->input->waiters = TRUE;
255 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
256 sp->input->waiters = FALSE;
257 }
258 TAILQ_REMOVE(&(sp->input->queue), workp, link);
259 pthread_mutex_unlock(&sp->input->mtx);
260
261 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
262 sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
263
264 /* Do our stuff with the buffer */
265 (void) sp->fn(workp->data, lp->isize);
266
267 /*
268 * Place the buffer on the input queue.
269 * Signal waiters if required.
270 */
271 pthread_mutex_lock(&sp->output->mtx);
272 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
273 if (sp->output->waiters) {
274 DBG(" %s[%d,%d] iteration %d signaling work\n",
275 sp->name, lp->setnum, sp->stagenum, iteration);
276 pthread_cond_signal(&sp->output->cnd);
277 }
278 pthread_mutex_unlock(&sp->output->mtx);
279 } while (++iteration < iterations);
280
281 DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
282
283 return (void *) iteration;
284}
285
593a1d5f 286#define MAX_CACHE_DEPTH 10
2d21ac55
A
287static void
288auto_config(int npages, int *nbufs, int *nsets)
289{
39236c6e 290 size_t len;
2d21ac55 291 int ncpu;
593a1d5f
A
292 int llc;
293 int64_t cacheconfig[MAX_CACHE_DEPTH];
294 int64_t cachesize[MAX_CACHE_DEPTH];
2d21ac55
A
295
296 mutter("Autoconfiguring...\n");
297
298 len = sizeof(cacheconfig);
299 if (sysctlbyname("hw.cacheconfig",
300 &cacheconfig[0], &len, NULL, 0) != 0) {
301 printf("Unable to get hw.cacheconfig, %d\n", errno);
302 exit(1);
303 }
304 len = sizeof(cachesize);
305 if (sysctlbyname("hw.cachesize",
306 &cachesize[0], &len, NULL, 0) != 0) {
307 printf("Unable to get hw.cachesize, %d\n", errno);
308 exit(1);
309 }
310
593a1d5f
A
311 /*
312 * Find LLC
313 */
314 for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
315 if (cacheconfig[llc] != 0)
316 break;
317
2d21ac55
A
318 /*
319 * Calculate number of buffers of size pages*4096 bytes
320 * fit into 90% of an L2 cache.
321 */
593a1d5f
A
322 *nbufs = cachesize[llc] * 9 / (npages * 4096 * 10);
323 mutter(" L%d (LLC) cache %qd bytes: "
2d21ac55 324 "using %d buffers of size %d bytes\n",
593a1d5f 325 llc, cachesize[llc], *nbufs, (npages * 4096));
2d21ac55
A
326
327 /*
328 * Calcalute how many sets:
329 */
593a1d5f
A
330 *nsets = cacheconfig[0]/cacheconfig[llc];
331 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
332 cacheconfig[0], cacheconfig[llc], llc, *nsets);
2d21ac55
A
333}
334
335void (*producer_fnp)(int *data, int isize) = &writer_fn;
336void (*consumer_fnp)(int *data, int isize) = &reader_fn;
337
338int
339main(int argc, char *argv[])
340{
341 int i;
342 int j;
343 int pages = 256; /* 1MB */
344 int buffers = 2;
345 int sets = 2;
346 int stages = 2;
347 int *status;
348 line_info_t *line_info;
349 line_info_t *lp;
350 stage_info_t *stage_info;
351 stage_info_t *sp;
352 kern_return_t ret;
353 int c;
354
355 /* Do switch parsing: */
356 while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
357 switch (c) {
358 case 'a':
359#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
360 affinity = !affinity;
361 break;
362#else
363 usage();
364#endif
365 case 'b':
366 buffers = atoi(optarg);
367 break;
368 case 'c':
369 cache_config = TRUE;
370 break;
371 case 'i':
372 iterations = atoi(optarg);
373 break;
374 case 'p':
375 pages = atoi(optarg);
376 break;
377 case 's':
378 stages = atoi(optarg);
379 if (stages >= WORKERS_MAX)
380 usage();
381 break;
382 case 't':
383 halting = TRUE;
384 break;
385 case 'w':
386 consumer_fnp = &reader_writer_fn;
387 break;
388 case 'v':
389 verbosity = atoi(optarg);
390 break;
391 case '?':
392 case 'h':
393 default:
394 usage();
395 }
396 }
397 argc -= optind; argv += optind;
398 if (argc > 0)
399 sets = atoi(*argv);
400
401 if (cache_config)
402 auto_config(pages, &buffers, &sets);
403
404 pthread_mutex_init(&funnel, NULL);
405 pthread_cond_init(&barrier, NULL);
406
407 /*
408 * Fire up the worker threads.
409 */
410 threads = sets * stages;
411 mutter("Launching %d set%s of %d threads with %saffinity, "
412 "consumer reads%s data\n",
413 sets, s_if_plural(sets), stages, affinity? "": "no ",
414 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
415 if (pages < 256)
416 mutter(" %dkB bytes per buffer, ", pages * 4);
417 else
418 mutter(" %dMB bytes per buffer, ", pages / 256);
419 mutter("%d buffer%s per set ",
420 buffers, s_if_plural(buffers));
421 if (buffers * pages < 256)
422 mutter("(total %dkB)\n", buffers * pages * 4);
423 else
424 mutter("(total %dMB)\n", buffers * pages / 256);
425 mutter(" processing %d buffer%s...\n",
426 iterations, s_if_plural(iterations));
427 line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
428 stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
429 for (i = 0; i < sets; i++) {
430 work_t *work_array;
431
432 lp = &line_info[i];
433
434 lp->setnum = i + 1;
435 lp->isize = pages * 4096 / sizeof(int);
436 lp->data = (int *) malloc(buffers * pages * 4096);
437
438 /* Set up the queue for the workers of this thread set: */
439 for (j = 0; j < stages; j++) {
440 sp = &stage_info[(i*stages) + j];
441 sp->stagenum = j;
442 sp->set = lp;
443 lp->stage[j] = sp;
444 pthread_mutex_init(&sp->bufq.mtx, NULL);
445 pthread_cond_init(&sp->bufq.cnd, NULL);
446 TAILQ_INIT(&sp->bufq.queue);
447 sp->bufq.waiters = FALSE;
448 }
449
450 /*
451 * Take a second pass through the stages
452 * to define what the workers are and to interconnect their input/outputs
453 */
454 for (j = 0; j < stages; j++) {
455 sp = lp->stage[j];
456 if (j == 0) {
457 sp->fn = producer_fnp;
458 sp->name = "producer";
459 } else {
460 sp->fn = consumer_fnp;
461 sp->name = "consumer";
462 }
463 sp->input = &lp->stage[j]->bufq;
464 sp->output = &lp->stage[(j + 1) % stages]->bufq;
465 }
466
467 /* Set up the buffers on the first worker of the set. */
468 work_array = (work_t *) malloc(buffers * sizeof(work_t));
469 for (j = 0; j < buffers; j++) {
470 work_array[j].data = lp->data + (lp->isize * j);
471 TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
472 DBG(" empty work item %p for set %d data %p\n",
473 &work_array[j], i, work_array[j].data);
474 }
475
476 /* Create this set of threads */
477 for (j = 0; j < stages; j++) {
478 if (ret = pthread_create(&lp->stage[j]->thread, NULL,
479 &manager_fn,
480 (void *) lp->stage[j]))
481 err(1, "pthread_create %d,%d", i, j);
482 }
483 }
484
485 /*
486 * We sit back anf wait for the slave to finish.
487 */
488 for (i = 0; i < sets; i++) {
489 lp = &line_info[i];
490 for (j = 0; j < stages; j++) {
491 if(ret = pthread_join(lp->stage[j]->thread, (void **)&status))
492 err(1, "pthread_join %d,%d", i, j);
493 DBG("Thread %d,%d status %d\n", i, j, status);
494 }
495 }
496
497 /*
498 * See how long the work took.
499 */
500 timer = mach_absolute_time() - timer;
501 timer = timer / 1000000ULL;
502 printf("%d.%03d seconds elapsed.\n",
503 (int) (timer/1000ULL), (int) (timer % 1000ULL));
504
505 return 0;
506}