1 #include <AvailabilityMacros.h>
2 #include <mach/thread_policy.h>
4 #include <mach/mach_error.h>
5 #include <mach/mach_time.h>
14 #include <sys/sysctl.h>
17 * Sets is a multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
20 * The picture here, for each set, is:
23 * -> queue --> producer --> queue --> consumer --
25 * -----------------------------------------------
27 * <------ "stage" -----> <------ "stage" ----->
29 * We spin off sets of production line threads (2 sets by default).
30 * All threads of each line set the same affinity tag (unless disabled).
31 * By default there are 2 stage (worker) threads per production line.
32 * A worker thread removes a buffer from an input queue, processes it and
33 * queues it on an output queue. By default the initial stage (producer)
34 * writes every byte in a buffer and the other (consumer) stages read every
35 * byte. By default the buffers are 1MB (256 pages) in size but this can be
36 * overridden. By default there are 2 buffers per set (again overridable).
37 * Worker threads process (iterate over) 10000 buffers by default.
39 * With affinity enabled, each producer and consumer thread sets its affinity
40 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
42 * Buffer management uses pthread mutex/condition variables. A thread blocks
43 * when no buffer is available on a queue and it is signaled when a buffer
44 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
45 * The queue management is centralized in a single routine: what queues to
46 * use as input and output and what function to call for processing is
50 pthread_mutex_t funnel
;
51 pthread_cond_t barrier
;
55 int threads_ready
= 0;
57 int iterations
= 10000;
58 boolean_t affinity
= FALSE
;
59 boolean_t halting
= FALSE
;
60 boolean_t cache_config
= FALSE
;
64 TAILQ_ENTRY(work
) link
;
69 * A work queue, complete with pthread objects for its management
71 typedef struct work_queue
{
74 TAILQ_HEAD(, work
) queue
;
78 /* Worker functions take an integer array and size */
79 typedef void (worker_fn_t
)(int *, int);
81 /* This struct controls the function of a thread */
88 struct line_info
*set
;
93 /* This defines a thread set */
94 #define WORKERS_MAX 10
95 typedef struct line_info
{
99 stage_info_t
*stage
[WORKERS_MAX
];
102 #define DBG(x...) do { \
103 if (verbosity > 1) { \
104 pthread_mutex_lock(&funnel); \
106 pthread_mutex_unlock(&funnel); \
110 #define mutter(x...) do { \
111 if (verbosity > 0) { \
/*
 * Plural suffix for a count used in a printf-style message: expands to ""
 * when x is exactly 1 and to "s" otherwise, so that a count of zero reads
 * "0 buffers" rather than "0 buffer". The argument is evaluated once.
 */
#define s_if_plural(x) (((x) != 1) ? "s" : "")
122 "usage: sets [-a] Turn affinity on (off)\n"
123 " [-b B] Number of buffers per set/line (2)\n"
124 " [-c] Configure for max cache performance\n"
126 " [-i I] Number of items/buffers to process (1000)\n"
127 " [-s S] Number of stages per set/line (2)\n"
128 " [-t] Halt for keyboard input to start\n"
129 " [-p P] Number of pages per buffer (256=1MB)]\n"
130 " [-w] Consumer writes data\n"
131 " [-v V] Level of verbosity 0..2 (1)\n"
132 " [N] Number of sets/lines (2)\n"
137 /* Trivial producer: write to each byte */
139 writer_fn(int *data
, int isize
)
143 for (i
= 0; i
< isize
; i
++) {
148 /* Trivial consumer: read each byte */
150 reader_fn(int *data
, int isize
)
155 for (i
= 0; i
< isize
; i
++) {
160 /* Consumer reading and writing the buffer */
162 reader_writer_fn(int *data
, int isize
)
166 for (i
= 0; i
< isize
; i
++) {
172 * This is the central function for every thread.
173 * For each invocation, its role is set by (a pointer to) a stage_info_t.
176 manager_fn(void *arg
)
178 stage_info_t
*sp
= (stage_info_t
*) arg
;
179 line_info_t
*lp
= sp
->set
;
184 * If we're using affinity sets (we are by default)
185 * set our tag to be our thread set number.
187 thread_extended_policy_data_t epolicy
;
188 thread_affinity_policy_data_t policy
;
190 epolicy
.timeshare
= FALSE
;
191 ret
= thread_policy_set(
192 mach_thread_self(), THREAD_EXTENDED_POLICY
,
193 (thread_policy_t
) &epolicy
,
194 THREAD_EXTENDED_POLICY_COUNT
);
195 if (ret
!= KERN_SUCCESS
)
196 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
199 policy
.affinity_tag
= lp
->setnum
;
200 ret
= thread_policy_set(
201 mach_thread_self(), THREAD_AFFINITY_POLICY
,
202 (thread_policy_t
) &policy
,
203 THREAD_AFFINITY_POLICY_COUNT
);
204 if (ret
!= KERN_SUCCESS
)
205 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
208 DBG("Starting %s set: %d stage: %d\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
212 * The last thread to get here releases everyone and starts the timer.
214 pthread_mutex_lock(&funnel
);
216 if (threads_ready
== threads
) {
217 pthread_mutex_unlock(&funnel
);
219 printf(" all threads ready for process %d, "
220 "hit any key to start", getpid());
224 pthread_cond_broadcast(&barrier
);
225 timer
= mach_absolute_time();
227 pthread_cond_wait(&barrier
, &funnel
);
228 pthread_mutex_unlock(&funnel
);
236 * Get a buffer from the input queue.
239 pthread_mutex_lock(&sp
->input
->mtx
);
241 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
244 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
245 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
246 sp
->input
->waiters
= TRUE
;
247 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
248 sp
->input
->waiters
= FALSE
;
250 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
251 pthread_mutex_unlock(&sp
->input
->mtx
);
253 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
254 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
256 /* Do our stuff with the buffer */
257 (void) sp
->fn(workp
->data
, lp
->isize
);
260 * Place the buffer on the output queue.
261 * Signal waiters if required.
263 pthread_mutex_lock(&sp
->output
->mtx
);
264 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
265 if (sp
->output
->waiters
) {
266 DBG(" %s[%d,%d] iteration %d signaling work\n",
267 sp
->name
, lp
->setnum
, sp
->stagenum
, iteration
);
268 pthread_cond_signal(&sp
->output
->cnd
);
270 pthread_mutex_unlock(&sp
->output
->mtx
);
271 } while (++iteration
< iterations
);
273 DBG("Ending %s[%d,%d]\n", sp
->name
, lp
->setnum
, sp
->stagenum
);
275 return (void *) iteration
;
278 #define MAX_CACHE_DEPTH 10
280 auto_config(int npages
, int *nbufs
, int *nsets
)
285 int64_t cacheconfig
[MAX_CACHE_DEPTH
];
286 int64_t cachesize
[MAX_CACHE_DEPTH
];
288 mutter("Autoconfiguring...\n");
290 len
= sizeof(cacheconfig
);
291 if (sysctlbyname("hw.cacheconfig",
292 &cacheconfig
[0], &len
, NULL
, 0) != 0) {
293 printf("Unable to get hw.cacheconfig, %d\n", errno
);
296 len
= sizeof(cachesize
);
297 if (sysctlbyname("hw.cachesize",
298 &cachesize
[0], &len
, NULL
, 0) != 0) {
299 printf("Unable to get hw.cachesize, %d\n", errno
);
306 for (llc
= MAX_CACHE_DEPTH
- 1; llc
> 0; llc
--)
307 if (cacheconfig
[llc
] != 0)
311 * Calculate how many buffers of size pages*4096 bytes
312 * fit into 90% of the last-level cache (LLC).
314 *nbufs
= cachesize
[llc
] * 9 / (npages
* 4096 * 10);
315 mutter(" L%d (LLC) cache %qd bytes: "
316 "using %d buffers of size %d bytes\n",
317 llc
, cachesize
[llc
], *nbufs
, (npages
* 4096));
320 * Calculate how many sets:
322 *nsets
= cacheconfig
[0]/cacheconfig
[llc
];
323 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
324 cacheconfig
[0], cacheconfig
[llc
], llc
, *nsets
);
327 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
328 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
331 main(int argc
, char *argv
[])
335 int pages
= 256; /* 1MB */
340 line_info_t
*line_info
;
342 stage_info_t
*stage_info
;
347 /* Do switch parsing: */
348 while ((c
= getopt (argc
, argv
, "ab:chi:p:s:twv:")) != -1) {
351 affinity
= !affinity
;
354 buffers
= atoi(optarg
);
360 iterations
= atoi(optarg
);
363 pages
= atoi(optarg
);
366 stages
= atoi(optarg
);
367 if (stages
>= WORKERS_MAX
)
374 consumer_fnp
= &reader_writer_fn
;
377 verbosity
= atoi(optarg
);
385 argc
-= optind
; argv
+= optind
;
390 auto_config(pages
, &buffers
, &sets
);
392 pthread_mutex_init(&funnel
, NULL
);
393 pthread_cond_init(&barrier
, NULL
);
396 * Fire up the worker threads.
398 threads
= sets
* stages
;
399 mutter("Launching %d set%s of %d threads with %saffinity, "
400 "consumer reads%s data\n",
401 sets
, s_if_plural(sets
), stages
, affinity
? "": "no ",
402 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
404 mutter(" %dkB bytes per buffer, ", pages
* 4);
406 mutter(" %dMB bytes per buffer, ", pages
/ 256);
407 mutter("%d buffer%s per set ",
408 buffers
, s_if_plural(buffers
));
409 if (buffers
* pages
< 256)
410 mutter("(total %dkB)\n", buffers
* pages
* 4);
412 mutter("(total %dMB)\n", buffers
* pages
/ 256);
413 mutter(" processing %d buffer%s...\n",
414 iterations
, s_if_plural(iterations
));
415 line_info
= (line_info_t
*) malloc(sets
* sizeof(line_info_t
));
416 stage_info
= (stage_info_t
*) malloc(sets
* stages
* sizeof(stage_info_t
));
417 for (i
= 0; i
< sets
; i
++) {
423 lp
->isize
= pages
* 4096 / sizeof(int);
424 lp
->data
= (int *) malloc(buffers
* pages
* 4096);
426 /* Set up the queue for the workers of this thread set: */
427 for (j
= 0; j
< stages
; j
++) {
428 sp
= &stage_info
[(i
*stages
) + j
];
432 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
433 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
434 TAILQ_INIT(&sp
->bufq
.queue
);
435 sp
->bufq
.waiters
= FALSE
;
439 * Take a second pass through the stages
440 * to define what the workers are and to interconnect their input/outputs
442 for (j
= 0; j
< stages
; j
++) {
445 sp
->fn
= producer_fnp
;
446 sp
->name
= "producer";
448 sp
->fn
= consumer_fnp
;
449 sp
->name
= "consumer";
451 sp
->input
= &lp
->stage
[j
]->bufq
;
452 sp
->output
= &lp
->stage
[(j
+ 1) % stages
]->bufq
;
455 /* Set up the buffers on the first worker of the set. */
456 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
457 for (j
= 0; j
< buffers
; j
++) {
458 work_array
[j
].data
= lp
->data
+ (lp
->isize
* j
);
459 TAILQ_INSERT_TAIL(&lp
->stage
[0]->bufq
.queue
, &work_array
[j
], link
);
460 DBG(" empty work item %p for set %d data %p\n",
461 &work_array
[j
], i
, work_array
[j
].data
);
464 /* Create this set of threads */
465 for (j
= 0; j
< stages
; j
++) {
466 if (ret
= pthread_create(&lp
->stage
[j
]->thread
, NULL
,
468 (void *) lp
->stage
[j
]))
469 err(1, "pthread_create %d,%d", i
, j
);
474 * We sit back and wait for the worker threads to finish.
476 for (i
= 0; i
< sets
; i
++) {
478 for (j
= 0; j
< stages
; j
++) {
479 if(ret
= pthread_join(lp
->stage
[j
]->thread
, (void **)&status
))
480 err(1, "pthread_join %d,%d", i
, j
);
481 DBG("Thread %d,%d status %d\n", i
, j
, status
);
486 * See how long the work took.
488 timer
= mach_absolute_time() - timer
;
489 timer
= timer
/ 1000000ULL;
490 printf("%d.%03d seconds elapsed.\n",
491 (int) (timer
/1000ULL), (int) (timer
% 1000ULL));