1 #include <AvailabilityMacros.h>
2 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3 #include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
6 #include <mach/mach_error.h>
7 #include <mach/mach_time.h>
18 * Sets is a multithreaded test/benchmarking program to evaluate
19 * affinity set placement in Leopard.
21 * The picture here, for each set, is:
24 * -> queue --> producer --> queue --> consumer --
26 * -----------------------------------------------
28 * <------ "stage" -----> <------ "stage" ----->
30 * We spin off sets of production line threads (2 sets by default).
31 * All threads of each line set the same affinity tag (if enabled with -a).
32 * By default there are 2 stage (worker) threads per production line.
33 * A worker thread removes a buffer from an input queue, processes it and
34 * queues it on an output queue. By default the initial stage (producer)
35 * writes every byte in a buffer and the other (consumer) stages read every
36 * byte. By default the buffers are 1MB (256 pages) in size but this can be
37 * overridden. By default there are 2 buffers per set (again overridable).
38 * Worker threads process (iterate over) 10000 buffers by default.
40 * With affinity enabled, each producer and consumer thread sets its affinity
41 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
43 * Buffer management uses pthread mutex/condition variables. A thread blocks
44 * when no buffer is available on a queue and it is signaled when a buffer
45 * is placed on an empty queue. Queues are tailqs a la <sys/queue.h>.
46 * The queue management is centralized in a single routine: what queues to
47 * use as input and output and what function to call for processing is
51 pthread_mutex_t funnel
;
52 pthread_cond_t barrier
;
56 int threads_ready
= 0;
58 int iterations
= 10000;
59 boolean_t affinity
= FALSE
;
60 boolean_t halting
= FALSE
;
61 boolean_t cache_config
= FALSE
;
65 TAILQ_ENTRY(work
) link
;
70 * A work queue, complete with pthread objects for its management
72 typedef struct work_queue
{
75 TAILQ_HEAD(, work
) queue
;
79 /* Worker functions take an integer array and its element count */
80 typedef void (worker_fn_t
)(int *, int);
82 /* This struct controls the function of a thread */
89 struct line_info
*set
;
94 /* This defines a thread set */
95 #define WORKERS_MAX 10
96 typedef struct line_info
{
100 stage_info_t
*stage
[WORKERS_MAX
];
103 #define DBG(x...) do { \
104 if (verbosity > 1) { \
105 pthread_mutex_lock(&funnel); \
107 pthread_mutex_unlock(&funnel); \
111 #define mutter(x...) do { \
112 if (verbosity > 0) { \
117 #define s_if_plural(x) (((x) > 1) ? "s" : "")
123 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
124 "usage: sets [-a] Turn affinity on (off)\n"
125 " [-b B] Number of buffers per set/line (2)\n"
127 "usage: sets [-b B] Number of buffers per set/line (2)\n"
129 " [-c] Configure for max cache performance\n"
131 " [-i I] Number of items/buffers to process (1000)\n"
132 " [-s S] Number of stages per set/line (2)\n"
133 " [-t] Halt for keyboard input to start\n"
134 " [-p P] Number of pages per buffer (256=1MB)]\n"
135 " [-w] Consumer writes data\n"
136 " [-v V] Level of verbosity 0..2 (1)\n"
137 " [N] Number of sets/lines (2)\n"
142 /* Trivial producer: write to each byte */
144 writer_fn(int *data
, int isize
)
148 for (i
= 0; i
< isize
; i
++) {
153 /* Trivial consumer: read each byte */
155 reader_fn(int *data
, int isize
)
160 for (i
= 0; i
< isize
; i
++) {
165 /* Consumer reading and writing the buffer */
167 reader_writer_fn(int *data
, int isize
)
171 for (i
= 0; i
< isize
; i
++) {
177 * This is the central function for every thread.
178 * For each invocation, its role is set by (a pointer to) a stage_info_t.
181 manager_fn(void *arg
)
183 stage_info_t
*sp
= (stage_info_t
*) arg
;
184 line_info_t
*lp
= sp
->set
;
189 * If we're using affinity sets (enabled with -a; off by default),
190 * set our tag to our thread set number.
192 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
193 thread_extended_policy_data_t epolicy
;
194 thread_affinity_policy_data_t policy
;
196 epolicy
.timeshare
= FALSE
;
197 ret
= thread_policy_set(
198 mach_thread_self(), THREAD_EXTENDED_POLICY
,
199 (thread_policy_t
) &epolicy
,
200 THREAD_EXTENDED_POLICY_COUNT
);
201 if (ret
!= KERN_SUCCESS
)
202 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
205 policy
.affinity_tag
= lp
->setnum
;
206 ret
= thread_policy_set(
207 mach_thread_self(), THREAD_AFFINITY_POLICY
,
208 (thread_policy_t
) &policy
,
209 THREAD_AFFINITY_POLICY_COUNT
);
210 if (ret
!= KERN_SUCCESS
)
211 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
215 DBG("Starting %s set: %d stage: %d\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
219 * The last thread to get here releases everyone and starts the timer.
221 pthread_mutex_lock(&funnel
);
223 if (threads_ready
== threads
) {
224 pthread_mutex_unlock(&funnel
);
226 printf(" all threads ready for process %d, "
227 "hit any key to start", getpid());
231 pthread_cond_broadcast(&barrier
);
232 timer
= mach_absolute_time();
234 pthread_cond_wait(&barrier
, &funnel
);
235 pthread_mutex_unlock(&funnel
);
243 * Get a buffer from the input queue.
246 pthread_mutex_lock(&sp
->input
->mtx
);
248 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
251 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
252 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
253 sp
->input
->waiters
= TRUE
;
254 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
255 sp
->input
->waiters
= FALSE
;
257 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
258 pthread_mutex_unlock(&sp
->input
->mtx
);
260 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
261 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
263 /* Do our stuff with the buffer */
264 (void) sp
->fn(workp
->data
, lp
->isize
);
267 * Place the buffer on the output queue.
268 * Signal waiters if required.
270 pthread_mutex_lock(&sp
->output
->mtx
);
271 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
272 if (sp
->output
->waiters
) {
273 DBG(" %s[%d,%d] iteration %d signaling work\n",
274 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
275 pthread_cond_signal(&sp
->output
->cnd
);
277 pthread_mutex_unlock(&sp
->output
->mtx
);
278 } while (++iteration
< iterations
);
280 DBG("Ending %s[%d,%d]\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
282 return (void *) iteration
;
285 #define MAX_CACHE_DEPTH 10
287 auto_config(int npages
, int *nbufs
, int *nsets
)
292 int64_t cacheconfig
[MAX_CACHE_DEPTH
];
293 int64_t cachesize
[MAX_CACHE_DEPTH
];
295 mutter("Autoconfiguring...\n");
297 len
= sizeof(cacheconfig
);
298 if (sysctlbyname("hw.cacheconfig",
299 &cacheconfig
[0], &len
, NULL
, 0) != 0) {
300 printf("Unable to get hw.cacheconfig, %d\n", errno
);
303 len
= sizeof(cachesize
);
304 if (sysctlbyname("hw.cachesize",
305 &cachesize
[0], &len
, NULL
, 0) != 0) {
306 printf("Unable to get hw.cachesize, %d\n", errno
);
313 for (llc
= MAX_CACHE_DEPTH
- 1; llc
> 0; llc
--)
314 if (cacheconfig
[llc
] != 0)
318 * Calculate number of buffers of size pages*4096 bytes
319 * fit into 90% of an L2 cache.
321 *nbufs
= cachesize
[llc
] * 9 / (npages
* 4096 * 10);
322 mutter(" L%d (LLC) cache %qd bytes: "
323 "using %d buffers of size %d bytes\n",
324 llc
, cachesize
[llc
], *nbufs
, (npages
* 4096));
327 * Calcalute how many sets:
329 *nsets
= cacheconfig
[0]/cacheconfig
[llc
];
330 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
331 cacheconfig
[0], cacheconfig
[llc
], llc
, *nsets
);
334 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
335 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
338 main(int argc
, char *argv
[])
342 int pages
= 256; /* 1MB */
347 line_info_t
*line_info
;
349 stage_info_t
*stage_info
;
354 /* Do switch parsing: */
355 while ((c
= getopt (argc
, argv
, "ab:chi:p:s:twv:")) != -1) {
358 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
359 affinity
= !affinity
;
365 buffers
= atoi(optarg
);
371 iterations
= atoi(optarg
);
374 pages
= atoi(optarg
);
377 stages
= atoi(optarg
);
378 if (stages
>= WORKERS_MAX
)
385 consumer_fnp
= &reader_writer_fn
;
388 verbosity
= atoi(optarg
);
396 argc
-= optind
; argv
+= optind
;
401 auto_config(pages
, &buffers
, &sets
);
403 pthread_mutex_init(&funnel
, NULL
);
404 pthread_cond_init(&barrier
, NULL
);
407 * Fire up the worker threads.
409 threads
= sets
* stages
;
410 mutter("Launching %d set%s of %d threads with %saffinity, "
411 "consumer reads%s data\n",
412 sets
, s_if_plural(sets
), stages
, affinity
? "": "no ",
413 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
415 mutter(" %dkB bytes per buffer, ", pages
* 4);
417 mutter(" %dMB bytes per buffer, ", pages
/ 256);
418 mutter("%d buffer%s per set ",
419 buffers
, s_if_plural(buffers
));
420 if (buffers
* pages
< 256)
421 mutter("(total %dkB)\n", buffers
* pages
* 4);
423 mutter("(total %dMB)\n", buffers
* pages
/ 256);
424 mutter(" processing %d buffer%s...\n",
425 iterations
, s_if_plural(iterations
));
426 line_info
= (line_info_t
*) malloc(sets
* sizeof(line_info_t
));
427 stage_info
= (stage_info_t
*) malloc(sets
* stages
* sizeof(stage_info_t
));
428 for (i
= 0; i
< sets
; i
++) {
434 lp
->isize
= pages
* 4096 / sizeof(int);
435 lp
->data
= (int *) malloc(buffers
* pages
* 4096);
437 /* Set up the queue for the workers of this thread set: */
438 for (j
= 0; j
< stages
; j
++) {
439 sp
= &stage_info
[(i
*stages
) + j
];
443 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
444 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
445 TAILQ_INIT(&sp
->bufq
.queue
);
446 sp
->bufq
.waiters
= FALSE
;
450 * Take a second pass through the stages
451 * to define what the workers are and to interconnect their input/outputs
453 for (j
= 0; j
< stages
; j
++) {
456 sp
->fn
= producer_fnp
;
457 sp
->name
= "producer";
459 sp
->fn
= consumer_fnp
;
460 sp
->name
= "consumer";
462 sp
->input
= &lp
->stage
[j
]->bufq
;
463 sp
->output
= &lp
->stage
[(j
+ 1) % stages
]->bufq
;
466 /* Set up the buffers on the first worker of the set. */
467 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
468 for (j
= 0; j
< buffers
; j
++) {
469 work_array
[j
].data
= lp
->data
+ (lp
->isize
* j
);
470 TAILQ_INSERT_TAIL(&lp
->stage
[0]->bufq
.queue
, &work_array
[j
], link
);
471 DBG(" empty work item %p for set %d data %p\n",
472 &work_array
[j
], i
, work_array
[j
].data
);
475 /* Create this set of threads */
476 for (j
= 0; j
< stages
; j
++) {
477 if (ret
= pthread_create(&lp
->stage
[j
]->thread
, NULL
,
479 (void *) lp
->stage
[j
]))
480 err(1, "pthread_create %d,%d", i
, j
);
485 * We sit back anf wait for the slave to finish.
487 for (i
= 0; i
< sets
; i
++) {
489 for (j
= 0; j
< stages
; j
++) {
490 if(ret
= pthread_join(lp
->stage
[j
]->thread
, (void **)&status
))
491 err(1, "pthread_join %d,%d", i
, j
);
492 DBG("Thread %d,%d status %d\n", i
, j
, status
);
497 * See how long the work took.
499 timer
= mach_absolute_time() - timer
;
500 timer
= timer
/ 1000000ULL;
501 printf("%d.%03d seconds elapsed.\n",
502 (int) (timer
/1000ULL), (int) (timer
% 1000ULL));