#include <AvailabilityMacros.h>
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <sys/sysctl.h>
 * Sets is a multithreaded test/benchmarking program to evaluate
 * affinity set placement in Leopard.
 *
 * The picture here, for each set, is:
 *
 *    -> queue --> producer --> queue --> consumer --
 *    -----------------------------------------------
 *      <------ "stage" ----->  <------ "stage" ----->
 *
 * We spin off sets of production line threads (2 sets by default).
 * All threads of each line set the same affinity tag (unless disabled).
 * By default there are 2 stage (worker) threads per production line.
 * A worker thread removes a buffer from an input queue, processes it and
 * queues it on an output queue. By default the initial stage (producer)
 * writes every byte in a buffer and the other (consumer) stages read every
 * byte. By default the buffers are 1MB (256 pages) in size but this can be
 * overridden. By default there are 2 buffers per set (again overridable).
 * Worker threads process (iterate over) 10000 buffers by default.
 *
 * With affinity enabled, each producer and consumer thread sets its affinity
 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
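 *
 * In code terms: each stage is described by a stage_info_t naming its input
 * and output work queues and its worker function, a line_info_t ties the
 * stages of one set together, and manager_fn() runs the same
 * dequeue/process/enqueue loop in every thread.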
pthread_mutex_t	funnel;
pthread_cond_t	barrier;

int		threads_ready = 0;

int		iterations = 10000;
boolean_t	affinity = FALSE;
boolean_t	halting = FALSE;
boolean_t	cache_config = FALSE;
	TAILQ_ENTRY(work)	link;
 * A work queue, complete with pthread objects for its management
typedef struct work_queue {
	TAILQ_HEAD(, work)	queue;
/* Worker functions take an integer array and size */
typedef void (worker_fn_t)(int *, int);
/* This struct controls the function of a thread */
	struct line_info	*set;
/* This defines a thread set */
#define WORKERS_MAX 10
typedef struct line_info {
	stage_info_t	*stage[WORKERS_MAX];
#define DBG(x...) do {				\
	if (verbosity > 1) {			\
		pthread_mutex_lock(&funnel);	\
		pthread_mutex_unlock(&funnel);	\

#define mutter(x...) do {			\
	if (verbosity > 0) {			\

#define s_if_plural(x)	(((x) > 1) ? "s" : "")
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
"usage: sets [-a]   Turn affinity on (off)\n"
"            [-b B] Number of buffers per set/line (2)\n"
#else
"usage: sets [-b B] Number of buffers per set/line (2)\n"
#endif
"            [-c]   Configure for max cache performance\n"
"            [-i I] Number of items/buffers to process (10000)\n"
"            [-s S] Number of stages per set/line (2)\n"
"            [-t]   Halt for keyboard input to start\n"
"            [-p P] Number of pages per buffer (256=1MB)\n"
"            [-w]   Consumer writes data\n"
"            [-v V] Level of verbosity 0..2 (1)\n"
"            [N]    Number of sets/lines (2)\n"
/* Trivial producer: write to each byte */
writer_fn(int *data, int isize)
	for (i = 0; i < isize; i++) {
/* Trivial consumer: read each byte */
reader_fn(int *data, int isize)
	for (i = 0; i < isize; i++) {
/* Consumer reading and writing the buffer */
reader_writer_fn(int *data, int isize)
	for (i = 0; i < isize; i++) {
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
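 * The per-iteration flow: take a buffer from the stage's input queue (waiting
 * on its condition variable if the queue is empty), run the stage's worker
 * function on it, then append it to the output queue and signal any waiter.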
manager_fn(void *arg)
	stage_info_t	*sp = (stage_info_t *) arg;
	line_info_t	*lp = sp->set;
	 * If we're using affinity sets (enabled with -a),
	 * set our tag to be our thread set number.
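	 * (Threads that share a non-zero affinity tag are hinted to the
	 * scheduler as wanting to share a cache hierarchy; that is how each
	 * set ends up sharing an L2 cache, as described in the header comment.)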
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
	thread_extended_policy_data_t	epolicy;
	thread_affinity_policy_data_t	policy;

	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
			mach_thread_self(), THREAD_EXTENDED_POLICY,
			(thread_policy_t) &epolicy,
			THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);

	policy.affinity_tag = lp->setnum;
	ret = thread_policy_set(
			mach_thread_self(), THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
	 * The last thread to get here releases everyone and starts the timer.
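	 * (Each thread increments threads_ready under the funnel lock; all but
	 * the last wait on the barrier condition variable, and the last one
	 * broadcasts and samples mach_absolute_time() as the start time.)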
	pthread_mutex_lock(&funnel);
	if (threads_ready == threads) {
		pthread_mutex_unlock(&funnel);
			printf(" all threads ready for process %d, "
				"hit any key to start", getpid());
		pthread_cond_broadcast(&barrier);
		timer = mach_absolute_time();
		pthread_cond_wait(&barrier, &funnel);
	pthread_mutex_unlock(&funnel);
		 * Get a buffer from the input queue.
		pthread_mutex_lock(&sp->input->mtx);
		workp = TAILQ_FIRST(&(sp->input->queue));
			DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			sp->input->waiters = TRUE;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters = FALSE;
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		pthread_mutex_unlock(&sp->input->mtx);
		DBG(" %s[%d,%d] iteration %d work %p data %p\n",
			sp->name, lp->setnum, sp->stagenum, iteration,
			workp, workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, lp->isize);
		 * Place the buffer on the output queue.
		 * Signal waiters if required.
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG(" %s[%d,%d] iteration %d signaling work\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (++iteration < iterations);
	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);

	return (void *) iteration;
#define MAX_CACHE_DEPTH 10
auto_config(int npages, int *nbufs, int *nsets)
	int64_t	cacheconfig[MAX_CACHE_DEPTH];
	int64_t	cachesize[MAX_CACHE_DEPTH];

	mutter("Autoconfiguring...\n");
	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
			&cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);

	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
			&cachesize[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
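	/*
	 * Find the last-level cache: the deepest level with a non-zero
	 * hw.cacheconfig entry.
	 */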
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
		if (cacheconfig[llc] != 0)
	 * Calculate how many buffers of size pages*4096 bytes
	 * fit into 90% of an L2 cache.
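	 * (Worked example with illustrative numbers: a 4MB LLC and 1MB buffers,
	 * i.e. npages = 256, give 4194304 * 9 / (1048576 * 10) = 3 buffers.)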
	*nbufs = cachesize[llc] * 9 / (npages * 4096 * 10);
	mutter(" L%d (LLC) cache %qd bytes: "
		"using %d buffers of size %d bytes\n",
		llc, cachesize[llc], *nbufs, (npages * 4096));
	 * Calculate how many sets:
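	 * the number of logical CPUs divided by the CPUs sharing each LLC
	 * (e.g., with illustrative numbers, 4 CPUs and 2 CPUs per L2 give 2 sets).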
	*nsets = cacheconfig[0]/cacheconfig[llc];
	mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
		cacheconfig[0], cacheconfig[llc], llc, *nsets);
void (*producer_fnp)(int *data, int isize) = &writer_fn;
void (*consumer_fnp)(int *data, int isize) = &reader_fn;
main(int argc, char *argv[])
	int		pages = 256;	/* 1MB */
	line_info_t	*line_info;
	stage_info_t	*stage_info;
	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:chi:p:s:twv:")) != -1) {
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
			affinity = !affinity;
			buffers = atoi(optarg);
			iterations = atoi(optarg);
			pages = atoi(optarg);
			stages = atoi(optarg);
			if (stages >= WORKERS_MAX)
			consumer_fnp = &reader_writer_fn;
			verbosity = atoi(optarg);
	argc -= optind; argv += optind;
	auto_config(pages, &buffers, &sets);

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);
	 * Fire up the worker threads.
	threads = sets * stages;
	mutter("Launching %d set%s of %d threads with %saffinity, "
		"consumer reads%s data\n",
		sets, s_if_plural(sets), stages, affinity ? "" : "no ",
		(consumer_fnp == &reader_writer_fn) ? " and writes" : "");
	mutter(" %dkB bytes per buffer, ", pages * 4);
	mutter(" %dMB bytes per buffer, ", pages / 256);
	mutter("%d buffer%s per set ",
		buffers, s_if_plural(buffers));
	if (buffers * pages < 256)
		mutter("(total %dkB)\n", buffers * pages * 4);
	mutter("(total %dMB)\n", buffers * pages / 256);
	mutter(" processing %d buffer%s...\n",
		iterations, s_if_plural(iterations));
	line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
	stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
	for (i = 0; i < sets; i++) {
		lp->isize = pages * 4096 / sizeof(int);
		lp->data = (int *) malloc(buffers * pages * 4096);
		/* Set up the queue for the workers of this thread set: */
		for (j = 0; j < stages; j++) {
			sp = &stage_info[(i*stages) + j];
			pthread_mutex_init(&sp->bufq.mtx, NULL);
			pthread_cond_init(&sp->bufq.cnd, NULL);
			TAILQ_INIT(&sp->bufq.queue);
			sp->bufq.waiters = FALSE;
		 * Take a second pass through the stages
		 * to define what the workers are and to interconnect their input/outputs.
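		 * (Stage j's output queue is stage (j + 1) % stages' input queue,
		 * so buffers circulate around a ring: producer -> consumer(s) ->
		 * back to the producer, matching the diagram in the header.)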
		for (j = 0; j < stages; j++) {
				sp->fn = producer_fnp;
				sp->name = "producer";
				sp->fn = consumer_fnp;
				sp->name = "consumer";
			sp->input = &lp->stage[j]->bufq;
			sp->output = &lp->stage[(j + 1) % stages]->bufq;
		/* Set up the buffers on the first worker of the set. */
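		/*
		 * All buffers start out on stage 0's queue, i.e. as free
		 * buffers in the producer's input queue.
		 */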
		work_array = (work_t *) malloc(buffers * sizeof(work_t));
		for (j = 0; j < buffers; j++) {
			work_array[j].data = lp->data + (lp->isize * j);
			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue,
				&work_array[j], link);
			DBG(" empty work item %p for set %d data %p\n",
				&work_array[j], i, work_array[j].data);
		/* Create this set of threads */
		for (j = 0; j < stages; j++) {
			if (ret = pthread_create(&lp->stage[j]->thread, NULL,
					(void *) lp->stage[j]))
				err(1, "pthread_create %d,%d", i, j);
	 * We sit back and wait for the worker threads to finish.
	for (i = 0; i < sets; i++) {
		for (j = 0; j < stages; j++) {
			if (ret = pthread_join(lp->stage[j]->thread,
					(void **) &status))
				err(1, "pthread_join %d,%d", i, j);
			DBG("Thread %d,%d status %d\n", i, j, status);
	 * See how long the work took.
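	 * (The conversion below assumes one mach_absolute_time() tick per
	 * nanosecond, which holds on typical Intel Macs but not in general;
	 * a fully portable version would scale by mach_timebase_info().)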
	timer = mach_absolute_time() - timer;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
		(int) (timer/1000ULL), (int) (timer % 1000ULL));