1 #include <AvailabilityMacros.h>
2 #include <mach/thread_policy.h>
4 #include <mach/mach_error.h>
5 #include <mach/mach_time.h>
14 #include <sys/sysctl.h>
17 * Sets is a multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
20 * The picture here, for each set, is:
23 * -> queue --> producer --> queue --> consumer --
25 * -----------------------------------------------
27 * <------ "stage" -----> <------ "stage" ----->
29 * We spin off sets of production line threads (2 sets by default).
30 * All threads of each line set the same affinity tag (when affinity is enabled with -a).
31 * By default there are 2 stage (worker) threads per production line.
32 * A worker thread removes a buffer from an input queue, processes it and
33 * queues it on an output queue. By default the initial stage (producer)
34 * writes every byte in a buffer and the other (consumer) stages read every
35 * byte. By default the buffers are 1MB (256 pages) in size but this can be
36 * overridden. By default there are 2 buffers per set (again overridable).
37 * Worker threads process (iterate over) 10000 buffers by default.
39 * With affinity enabled, each producer and consumer thread sets its affinity
40 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
42 * Buffer management uses pthread mutex/condition variables. A thread blocks
43 * when no buffer is available on a queue and it is signaled when a buffer
44 * is placed on an empty queue. Queues are TAILQs a la <sys/queue.h>.
45 * The queue management is centralized in a single routine: what queues to
46 * use as input and output and what function to call for processing is
50 pthread_mutex_t funnel
;
51 pthread_cond_t barrier
;
55 int threads_ready
= 0;
57 int iterations
= 10000;
58 boolean_t affinity
= FALSE
;
59 boolean_t halting
= FALSE
;
60 boolean_t cache_config
= FALSE
;
64 TAILQ_ENTRY(work
) link
;
69 * A work queue, complete with pthread objects for its management
71 typedef struct work_queue
{
74 TAILQ_HEAD(, work
) queue
;
78 /* Worker functions take an integer array and size */
79 typedef void (worker_fn_t
)(int *, int);
81 /* This struct controls the function of a thread */
88 struct line_info
*set
;
93 /* This defines a thread set */
94 #define WORKERS_MAX 10
95 typedef struct line_info
{
99 stage_info_t
*stage
[WORKERS_MAX
];
102 #define DBG(x...) do { \
103 if (verbosity > 1) { \
104 pthread_mutex_lock(&funnel); \
106 pthread_mutex_unlock(&funnel); \
110 #define mutter(x...) do { \
111 if (verbosity > 0) { \
116 #define s_if_plural(x) (((x) > 1) ? "s" : "")
122 "usage: sets [-a] Turn affinity on (off)\n"
123 " [-b B] Number of buffers per set/line (2)\n"
124 " [-c] Configure for max cache performance\n"
126 " [-i I] Number of items/buffers to process (1000)\n"
127 " [-s S] Number of stages per set/line (2)\n"
128 " [-t] Halt for keyboard input to start\n"
129 " [-p P] Number of pages per buffer (256=1MB)]\n"
130 " [-w] Consumer writes data\n"
131 " [-v V] Level of verbosity 0..2 (1)\n"
132 " [N] Number of sets/lines (2)\n"
137 /* Trivial producer: write to each byte */
139 writer_fn(int *data
, int isize
)
143 for (i
= 0; i
< isize
; i
++) {
148 /* Trivial consumer: read each byte */
150 reader_fn(int *data
, int isize
)
155 for (i
= 0; i
< isize
; i
++) {
160 /* Consumer reading and writing the buffer */
162 reader_writer_fn(int *data
, int isize
)
166 for (i
= 0; i
< isize
; i
++) {
172 * This is the central function for every thread.
173 * For each invocation, its role is set by (a pointer to) a stage_info_t.
176 manager_fn(void *arg
)
178 stage_info_t
*sp
= (stage_info_t
*) arg
;
179 line_info_t
*lp
= sp
->set
;
184 * If we're using affinity sets (enabled with -a),
185 * set our tag to our thread set number.
187 thread_extended_policy_data_t epolicy
;
188 thread_affinity_policy_data_t policy
;
190 epolicy
.timeshare
= FALSE
;
191 ret
= thread_policy_set(
192 mach_thread_self(), THREAD_EXTENDED_POLICY
,
193 (thread_policy_t
) &epolicy
,
194 THREAD_EXTENDED_POLICY_COUNT
);
195 if (ret
!= KERN_SUCCESS
) {
196 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
200 policy
.affinity_tag
= lp
->setnum
;
201 ret
= thread_policy_set(
202 mach_thread_self(), THREAD_AFFINITY_POLICY
,
203 (thread_policy_t
) &policy
,
204 THREAD_AFFINITY_POLICY_COUNT
);
205 if (ret
!= KERN_SUCCESS
) {
206 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
210 DBG("Starting %s set: %d stage: %d\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
214 * The last thread to get here releases everyone and starts the timer.
216 pthread_mutex_lock(&funnel
);
218 if (threads_ready
== threads
) {
219 pthread_mutex_unlock(&funnel
);
221 printf(" all threads ready for process %d, "
222 "hit any key to start", getpid());
226 pthread_cond_broadcast(&barrier
);
227 timer
= mach_absolute_time();
229 pthread_cond_wait(&barrier
, &funnel
);
230 pthread_mutex_unlock(&funnel
);
238 * Get a buffer from the input queue.
241 pthread_mutex_lock(&sp
->input
->mtx
);
243 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
247 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
248 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
249 sp
->input
->waiters
= TRUE
;
250 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
251 sp
->input
->waiters
= FALSE
;
253 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
254 pthread_mutex_unlock(&sp
->input
->mtx
);
256 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
257 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
259 /* Do our stuff with the buffer */
260 (void) sp
->fn(workp
->data
, lp
->isize
);
263 * Place the buffer on the output queue.
264 * Signal waiters if required.
266 pthread_mutex_lock(&sp
->output
->mtx
);
267 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
268 if (sp
->output
->waiters
) {
269 DBG(" %s[%d,%d] iteration %d signaling work\n",
270 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
271 pthread_cond_signal(&sp
->output
->cnd
);
273 pthread_mutex_unlock(&sp
->output
->mtx
);
274 } while (++iteration
< iterations
);
276 DBG("Ending %s[%d,%d]\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
278 return (void *) iteration
;
281 #define MAX_CACHE_DEPTH 10
283 auto_config(int npages
, int *nbufs
, int *nsets
)
288 int64_t cacheconfig
[MAX_CACHE_DEPTH
];
289 int64_t cachesize
[MAX_CACHE_DEPTH
];
291 mutter("Autoconfiguring...\n");
293 len
= sizeof(cacheconfig
);
294 if (sysctlbyname("hw.cacheconfig",
295 &cacheconfig
[0], &len
, NULL
, 0) != 0) {
296 printf("Unable to get hw.cacheconfig, %d\n", errno
);
299 len
= sizeof(cachesize
);
300 if (sysctlbyname("hw.cachesize",
301 &cachesize
[0], &len
, NULL
, 0) != 0) {
302 printf("Unable to get hw.cachesize, %d\n", errno
);
309 for (llc
= MAX_CACHE_DEPTH
- 1; llc
> 0; llc
--) {
310 if (cacheconfig
[llc
] != 0) {
316 * Calculate how many buffers of size pages*4096 bytes
317 * fit into 90% of the last-level cache.
319 *nbufs
= cachesize
[llc
] * 9 / (npages
* 4096 * 10);
320 mutter(" L%d (LLC) cache %qd bytes: "
321 "using %d buffers of size %d bytes\n",
322 llc
, cachesize
[llc
], *nbufs
, (npages
* 4096));
325 * Calculate how many sets:
327 *nsets
= cacheconfig
[0] / cacheconfig
[llc
];
328 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
329 cacheconfig
[0], cacheconfig
[llc
], llc
, *nsets
);
332 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
333 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
336 main(int argc
, char *argv
[])
340 int pages
= 256; /* 1MB */
345 line_info_t
*line_info
;
347 stage_info_t
*stage_info
;
352 /* Do switch parsing: */
353 while ((c
= getopt(argc
, argv
, "ab:chi:p:s:twv:")) != -1) {
356 affinity
= !affinity
;
359 buffers
= atoi(optarg
);
365 iterations
= atoi(optarg
);
368 pages
= atoi(optarg
);
371 stages
= atoi(optarg
);
372 if (stages
>= WORKERS_MAX
) {
380 consumer_fnp
= &reader_writer_fn
;
383 verbosity
= atoi(optarg
);
391 argc
-= optind
; argv
+= optind
;
397 auto_config(pages
, &buffers
, &sets
);
400 pthread_mutex_init(&funnel
, NULL
);
401 pthread_cond_init(&barrier
, NULL
);
404 * Fire up the worker threads.
406 threads
= sets
* stages
;
407 mutter("Launching %d set%s of %d threads with %saffinity, "
408 "consumer reads%s data\n",
409 sets
, s_if_plural(sets
), stages
, affinity
? "": "no ",
410 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
412 mutter(" %dkB bytes per buffer, ", pages
* 4);
414 mutter(" %dMB bytes per buffer, ", pages
/ 256);
416 mutter("%d buffer%s per set ",
417 buffers
, s_if_plural(buffers
));
418 if (buffers
* pages
< 256) {
419 mutter("(total %dkB)\n", buffers
* pages
* 4);
421 mutter("(total %dMB)\n", buffers
* pages
/ 256);
423 mutter(" processing %d buffer%s...\n",
424 iterations
, s_if_plural(iterations
));
425 line_info
= (line_info_t
*) malloc(sets
* sizeof(line_info_t
));
426 stage_info
= (stage_info_t
*) malloc(sets
* stages
* sizeof(stage_info_t
));
427 for (i
= 0; i
< sets
; i
++) {
433 lp
->isize
= pages
* 4096 / sizeof(int);
434 lp
->data
= (int *) malloc(buffers
* pages
* 4096);
436 /* Set up the queue for the workers of this thread set: */
437 for (j
= 0; j
< stages
; j
++) {
438 sp
= &stage_info
[(i
* stages
) + j
];
442 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
443 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
444 TAILQ_INIT(&sp
->bufq
.queue
);
445 sp
->bufq
.waiters
= FALSE
;
449 * Take a second pass through the stages
450 * to define what the workers are and to interconnect their inputs/outputs
452 for (j
= 0; j
< stages
; j
++) {
455 sp
->fn
= producer_fnp
;
456 sp
->name
= "producer";
458 sp
->fn
= consumer_fnp
;
459 sp
->name
= "consumer";
461 sp
->input
= &lp
->stage
[j
]->bufq
;
462 sp
->output
= &lp
->stage
[(j
+ 1) % stages
]->bufq
;
465 /* Set up the buffers on the first worker of the set. */
466 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
467 for (j
= 0; j
< buffers
; j
++) {
468 work_array
[j
].data
= lp
->data
+ (lp
->isize
* j
);
469 TAILQ_INSERT_TAIL(&lp
->stage
[0]->bufq
.queue
, &work_array
[j
], link
);
470 DBG(" empty work item %p for set %d data %p\n",
471 &work_array
[j
], i
, work_array
[j
].data
);
474 /* Create this set of threads */
475 for (j
= 0; j
< stages
; j
++) {
476 if (ret
= pthread_create(&lp
->stage
[j
]->thread
, NULL
,
478 (void *) lp
->stage
[j
])) {
479 err(1, "pthread_create %d,%d", i
, j
);
485 * We sit back and wait for the worker threads to finish.
487 for (i
= 0; i
< sets
; i
++) {
489 for (j
= 0; j
< stages
; j
++) {
490 if (ret
= pthread_join(lp
->stage
[j
]->thread
, (void **)&status
)) {
491 err(1, "pthread_join %d,%d", i
, j
);
493 DBG("Thread %d,%d status %d\n", i
, j
, status
);
498 * See how long the work took.
500 timer
= mach_absolute_time() - timer
;
501 timer
= timer
/ 1000000ULL;
502 printf("%d.%03d seconds elapsed.\n",
503 (int) (timer
/ 1000ULL), (int) (timer
% 1000ULL));