]> git.saurik.com Git - apple/xnu.git/blame_incremental - tools/tests/affinity/sets.c
xnu-7195.101.1.tar.gz
[apple/xnu.git] / tools / tests / affinity / sets.c
... / ...
CommitLineData
1#include <AvailabilityMacros.h>
2#include <mach/thread_policy.h>
3#include <mach/mach.h>
4#include <mach/mach_error.h>
5#include <mach/mach_time.h>
6#include <pthread.h>
7#include <sys/queue.h>
8#include <stdio.h>
9#include <stdlib.h>
10#include <string.h>
11#include <unistd.h>
12#include <err.h>
13#include <errno.h>
14#include <sys/sysctl.h>
15
16/*
17 * Sets is a multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
19 *
20 * The picture here, for each set, is:
21 *
22 *       free                   work
23 *    -> queue --> producer --> queue --> consumer --
24 *   |                                               |
25 *    -----------------------------------------------
26 *
27 *       <------ "stage" ----->  <------ "stage" ----->
28 *
29 * We spin off sets of production line threads (2 sets by default).
30 * All threads of each line set the same affinity tag (unless disabled).
31 * By default there are 2 stage (worker) threads per production line.
32 * A worker thread removes a buffer from an input queue, processes it and
33 * queues it on an output queue. By default the initial stage (producer)
34 * writes every byte in a buffer and the other (consumer) stages read every
35 * byte. By default the buffers are 1MB (256 pages) in size but this can be
36 * overridden. By default there are 2 buffers per set (again overridable).
37 * Worker threads process (iterate over) 10000 buffers by default.
38 *
39 * With affinity enabled, each producer and consumer thread sets its affinity
40 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
41 *
42 * Buffer management uses pthread mutex/condition variables. A thread blocks
43 * when no buffer is available on a queue and it is signaled when a buffer
44 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
45 * The queue management is centralized in a single routine: what queues to
46 * use as input and output and what function to call for processing is
47 * data-driven.
48 */
49
50pthread_mutex_t funnel;
51pthread_cond_t barrier;
52
53uint64_t timer;
54int threads;
55int threads_ready = 0;
56
57int iterations = 10000;
58boolean_t affinity = FALSE;
59boolean_t halting = FALSE;
60boolean_t cache_config = FALSE;
61int verbosity = 1;
62
/*
 * A work item: one buffer travelling round a production line.
 */
typedef struct work {
	TAILQ_ENTRY(work)       link;   /* linkage on whichever queue holds it */
	int                     *data;  /* start of this item's buffer */
} work_t;
67
68/*
69 * A work queue, complete with pthread objects for its management
70 */
71typedef struct work_queue {
72 pthread_mutex_t mtx;
73 pthread_cond_t cnd;
74 TAILQ_HEAD(, work) queue;
75 boolean_t waiters;
76} work_queue_t;
77
/*
 * Signature shared by all worker functions:
 * a pointer to an integer array and the number of elements in it.
 */
typedef void (worker_fn_t)(int *, int);
80
81/* This struct controls the function of a thread */
82typedef struct {
83 int stagenum;
84 char *name;
85 worker_fn_t *fn;
86 work_queue_t *input;
87 work_queue_t *output;
88 struct line_info *set;
89 pthread_t thread;
90 work_queue_t bufq;
91} stage_info_t;
92
93/* This defines a thread set */
94#define WORKERS_MAX 10
95typedef struct line_info {
96 int setnum;
97 int *data;
98 int isize;
99 stage_info_t *stage[WORKERS_MAX];
100} line_info_t;
101
/*
 * Debug trace: printf-style output produced only at verbosity 2 and above.
 * The funnel mutex is held around the printf call.
 */
#define DBG(...) do {                                   \
	if (verbosity >= 2) {                           \
	        pthread_mutex_lock(&funnel);            \
	        printf(__VA_ARGS__);                    \
	        pthread_mutex_unlock(&funnel);          \
	}                                               \
} while (0)
109
/*
 * Informational output: printf-style, suppressed when verbosity is 0.
 * No locking is done here.
 */
#define mutter(...) do {                                \
	if (verbosity >= 1) {                           \
	        printf(__VA_ARGS__);                    \
	}                                               \
} while (0)
115
/*
 * Plural suffix for a count: "" for exactly one item, "s" otherwise,
 * so that a count of zero reads "0 buffers" rather than "0 buffer".
 */
#define s_if_plural(x) (((x) == 1) ? "" : "s")
117
/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * The values in parentheses are the built-in defaults.
 */
static void
usage(void)
{
	fprintf(stderr,
	    "usage: sets [-a]   Turn affinity on (off)\n"
	    "            [-b B] Number of buffers per set/line (2)\n"
	    "            [-c]   Configure for max cache performance\n"
	    "            [-h]   Print this\n"
	    "            [-i I] Number of items/buffers to process (10000)\n"
	    "            [-s S] Number of stages per set/line (2)\n"
	    "            [-t]   Halt for keyboard input to start\n"
	    "            [-p P] Number of pages per buffer (256=1MB)\n"
	    "            [-w]   Consumer writes data\n"
	    "            [-v V] Level of verbosity 0..2 (1)\n"
	    "            [N]    Number of sets/lines (2)\n"
	    );
	exit(1);
}
136
/*
 * Trivial producer: store into every element of the buffer.
 * Element k receives the value k.
 */
void
writer_fn(int *data, int isize)
{
	int *slot = data;
	int value = 0;

	while (value < isize) {
		*slot = value;
		slot++;
		value++;
	}
}
147
/*
 * Trivial consumer: load every element of the buffer and discard it.
 *
 * The loads go through a volatile-qualified pointer so that the compiler
 * cannot delete the loop as dead code; without that, an optimized build
 * may leave the consumer touching no memory at all.
 * The buffer contents are not modified.
 */
void
reader_fn(int *data, int isize)
{
	const volatile int *src = data;
	int i;

	for (i = 0; i < isize; i++) {
		int datum = src[i];

		(void) datum;
	}
}
159
/*
 * Consumer that both reads and writes:
 * every element of the buffer is incremented by one.
 */
void
reader_writer_fn(int *data, int isize)
{
	int *slot = data;
	int remaining;

	for (remaining = isize; remaining > 0; remaining--) {
		*slot = *slot + 1;
		slot++;
	}
}
170
171/*
172 * This is the central function for every thread.
173 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
174 */
175void *
176manager_fn(void *arg)
177{
178 stage_info_t *sp = (stage_info_t *) arg;
179 line_info_t *lp = sp->set;
180 kern_return_t ret;
181 long iteration = 0;
182
183 /*
184 * If we're using affinity sets (we are by default)
185 * set our tag to by our thread set number.
186 */
187 thread_extended_policy_data_t epolicy;
188 thread_affinity_policy_data_t policy;
189
190 epolicy.timeshare = FALSE;
191 ret = thread_policy_set(
192 mach_thread_self(), THREAD_EXTENDED_POLICY,
193 (thread_policy_t) &epolicy,
194 THREAD_EXTENDED_POLICY_COUNT);
195 if (ret != KERN_SUCCESS) {
196 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
197 }
198
199 if (affinity) {
200 policy.affinity_tag = lp->setnum;
201 ret = thread_policy_set(
202 mach_thread_self(), THREAD_AFFINITY_POLICY,
203 (thread_policy_t) &policy,
204 THREAD_AFFINITY_POLICY_COUNT);
205 if (ret != KERN_SUCCESS) {
206 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
207 }
208 }
209
210 DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
211
212 /*
213 * Start barrier.
214 * The tets thread to get here releases everyone and starts the timer.
215 */
216 pthread_mutex_lock(&funnel);
217 threads_ready++;
218 if (threads_ready == threads) {
219 pthread_mutex_unlock(&funnel);
220 if (halting) {
221 printf(" all threads ready for process %d, "
222 "hit any key to start", getpid());
223 fflush(stdout);
224 (void) getchar();
225 }
226 pthread_cond_broadcast(&barrier);
227 timer = mach_absolute_time();
228 } else {
229 pthread_cond_wait(&barrier, &funnel);
230 pthread_mutex_unlock(&funnel);
231 }
232
233 do {
234 int i;
235 work_t *workp;
236
237 /*
238 * Get a buffer from the input queue.
239 * Block if none.
240 */
241 pthread_mutex_lock(&sp->input->mtx);
242 while (1) {
243 workp = TAILQ_FIRST(&(sp->input->queue));
244 if (workp != NULL) {
245 break;
246 }
247 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
248 sp->name, lp->setnum, sp->stagenum, iteration);
249 sp->input->waiters = TRUE;
250 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
251 sp->input->waiters = FALSE;
252 }
253 TAILQ_REMOVE(&(sp->input->queue), workp, link);
254 pthread_mutex_unlock(&sp->input->mtx);
255
256 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
257 sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
258
259 /* Do our stuff with the buffer */
260 (void) sp->fn(workp->data, lp->isize);
261
262 /*
263 * Place the buffer on the input queue.
264 * Signal waiters if required.
265 */
266 pthread_mutex_lock(&sp->output->mtx);
267 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
268 if (sp->output->waiters) {
269 DBG(" %s[%d,%d] iteration %d signaling work\n",
270 sp->name, lp->setnum, sp->stagenum, iteration);
271 pthread_cond_signal(&sp->output->cnd);
272 }
273 pthread_mutex_unlock(&sp->output->mtx);
274 } while (++iteration < iterations);
275
276 DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
277
278 return (void *) iteration;
279}
280
#define MAX_CACHE_DEPTH 10
/*
 * Choose the number of buffers per set and the number of sets from the
 * hw.cacheconfig and hw.cachesize sysctls.
 *
 * npages: pages per buffer (a page is taken to be 4096 bytes).
 * nbufs:  out, number of buffers of that size fitting in 90% of the
 *         last-level cache, but never fewer than 1.
 * nsets:  out, cacheconfig[0] divided by the last-level cache's entry.
 *
 * Prints a message and exits with status 1 if the sysctls cannot be read
 * or the values cannot be used.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t  len;
	int     llc;
	int64_t bufbytes;
	/*
	 * Zero-filled so that any entries sysctl does not write read as
	 * "no cache at this level" rather than as stack garbage.
	 */
	int64_t cacheconfig[MAX_CACHE_DEPTH] = {0};
	int64_t cachesize[MAX_CACHE_DEPTH] = {0};

	mutter("Autoconfiguring...\n");

	if (npages <= 0) {
		printf("Invalid number of pages per buffer, %d\n", npages);
		exit(1);
	}

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
	    &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
	    &cachesize[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find LLC: the highest level with a non-zero cacheconfig entry.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--) {
		if (cacheconfig[llc] != 0) {
			break;
		}
	}
	if (cacheconfig[llc] == 0) {
		/* Would otherwise divide by zero below. */
		printf("No usable entries in hw.cacheconfig\n");
		exit(1);
	}

	/*
	 * Calculate how many buffers of size npages*4096 bytes
	 * fit into 90% of the last-level cache.
	 * Done in 64 bits so large page counts cannot overflow.
	 */
	bufbytes = (int64_t) npages * 4096;
	*nbufs = (int) (cachesize[llc] * 9 / (bufbytes * 10));
	if (*nbufs < 1) {
		/* A line with no buffers would leave its workers blocked. */
		*nbufs = 1;
	}
	mutter("  L%d (LLC) cache %lld bytes: "
	    "using %d buffers of size %lld bytes\n",
	    llc, (long long) cachesize[llc], *nbufs, (long long) bufbytes);

	/*
	 * Calculate how many sets:
	 */
	*nsets = (int) (cacheconfig[0] / cacheconfig[llc]);
	mutter("  %lld cpus; %lld cpus per L%d cache: using %d sets\n",
	    (long long) cacheconfig[0], (long long) cacheconfig[llc],
	    llc, *nsets);
}
331
332void (*producer_fnp)(int *data, int isize) = &writer_fn;
333void (*consumer_fnp)(int *data, int isize) = &reader_fn;
334
335int
336main(int argc, char *argv[])
337{
338 int i;
339 int j;
340 int pages = 256; /* 1MB */
341 int buffers = 2;
342 int sets = 2;
343 int stages = 2;
344 int *status;
345 line_info_t *line_info;
346 line_info_t *lp;
347 stage_info_t *stage_info;
348 stage_info_t *sp;
349 kern_return_t ret;
350 int c;
351
352 /* Do switch parsing: */
353 while ((c = getopt(argc, argv, "ab:chi:p:s:twv:")) != -1) {
354 switch (c) {
355 case 'a':
356 affinity = !affinity;
357 break;
358 case 'b':
359 buffers = atoi(optarg);
360 break;
361 case 'c':
362 cache_config = TRUE;
363 break;
364 case 'i':
365 iterations = atoi(optarg);
366 break;
367 case 'p':
368 pages = atoi(optarg);
369 break;
370 case 's':
371 stages = atoi(optarg);
372 if (stages >= WORKERS_MAX) {
373 usage();
374 }
375 break;
376 case 't':
377 halting = TRUE;
378 break;
379 case 'w':
380 consumer_fnp = &reader_writer_fn;
381 break;
382 case 'v':
383 verbosity = atoi(optarg);
384 break;
385 case '?':
386 case 'h':
387 default:
388 usage();
389 }
390 }
391 argc -= optind; argv += optind;
392 if (argc > 0) {
393 sets = atoi(*argv);
394 }
395
396 if (cache_config) {
397 auto_config(pages, &buffers, &sets);
398 }
399
400 pthread_mutex_init(&funnel, NULL);
401 pthread_cond_init(&barrier, NULL);
402
403 /*
404 * Fire up the worker threads.
405 */
406 threads = sets * stages;
407 mutter("Launching %d set%s of %d threads with %saffinity, "
408 "consumer reads%s data\n",
409 sets, s_if_plural(sets), stages, affinity? "": "no ",
410 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
411 if (pages < 256) {
412 mutter(" %dkB bytes per buffer, ", pages * 4);
413 } else {
414 mutter(" %dMB bytes per buffer, ", pages / 256);
415 }
416 mutter("%d buffer%s per set ",
417 buffers, s_if_plural(buffers));
418 if (buffers * pages < 256) {
419 mutter("(total %dkB)\n", buffers * pages * 4);
420 } else {
421 mutter("(total %dMB)\n", buffers * pages / 256);
422 }
423 mutter(" processing %d buffer%s...\n",
424 iterations, s_if_plural(iterations));
425 line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
426 stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
427 for (i = 0; i < sets; i++) {
428 work_t *work_array;
429
430 lp = &line_info[i];
431
432 lp->setnum = i + 1;
433 lp->isize = pages * 4096 / sizeof(int);
434 lp->data = (int *) malloc(buffers * pages * 4096);
435
436 /* Set up the queue for the workers of this thread set: */
437 for (j = 0; j < stages; j++) {
438 sp = &stage_info[(i * stages) + j];
439 sp->stagenum = j;
440 sp->set = lp;
441 lp->stage[j] = sp;
442 pthread_mutex_init(&sp->bufq.mtx, NULL);
443 pthread_cond_init(&sp->bufq.cnd, NULL);
444 TAILQ_INIT(&sp->bufq.queue);
445 sp->bufq.waiters = FALSE;
446 }
447
448 /*
449 * Take a second pass through the stages
450 * to define what the workers are and to interconnect their input/outputs
451 */
452 for (j = 0; j < stages; j++) {
453 sp = lp->stage[j];
454 if (j == 0) {
455 sp->fn = producer_fnp;
456 sp->name = "producer";
457 } else {
458 sp->fn = consumer_fnp;
459 sp->name = "consumer";
460 }
461 sp->input = &lp->stage[j]->bufq;
462 sp->output = &lp->stage[(j + 1) % stages]->bufq;
463 }
464
465 /* Set up the buffers on the first worker of the set. */
466 work_array = (work_t *) malloc(buffers * sizeof(work_t));
467 for (j = 0; j < buffers; j++) {
468 work_array[j].data = lp->data + (lp->isize * j);
469 TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
470 DBG(" empty work item %p for set %d data %p\n",
471 &work_array[j], i, work_array[j].data);
472 }
473
474 /* Create this set of threads */
475 for (j = 0; j < stages; j++) {
476 if (ret = pthread_create(&lp->stage[j]->thread, NULL,
477 &manager_fn,
478 (void *) lp->stage[j])) {
479 err(1, "pthread_create %d,%d", i, j);
480 }
481 }
482 }
483
484 /*
485 * We sit back anf wait for the slave to finish.
486 */
487 for (i = 0; i < sets; i++) {
488 lp = &line_info[i];
489 for (j = 0; j < stages; j++) {
490 if (ret = pthread_join(lp->stage[j]->thread, (void **)&status)) {
491 err(1, "pthread_join %d,%d", i, j);
492 }
493 DBG("Thread %d,%d status %d\n", i, j, status);
494 }
495 }
496
497 /*
498 * See how long the work took.
499 */
500 timer = mach_absolute_time() - timer;
501 timer = timer / 1000000ULL;
502 printf("%d.%03d seconds elapsed.\n",
503 (int) (timer / 1000ULL), (int) (timer % 1000ULL));
504
505 return 0;
506}