1 #include <AvailabilityMacros.h>
2 #include <mach/thread_policy.h>
4 #include <mach/mach_error.h>
5 #include <mach/mach_time.h>
15 * Pool is another multithreaded test/benchmarking program to evaluate
16 * affinity set placement in Leopard.
18 * The basic picture is:
20 * -> producer -- -> consumer --
22 * -> queue -- ... --> queue -- --
24 * | -> producer -- -> consumer -- |
25 * ---------------------------------------------------------------
27 * <---------- "stage" ---------> <---------- "stage" --------->
29 * There are a series of work stages. Each stage has an input and an output
30 * queue and multiple threads. The first stage is the producer and subsequent
31 * stages are consumers. By default there are 2 stages. There are N producer
32 * and M consumer threads. There are B buffers per producer thread circulating
35 * When affinity is enabled, each producer thread is tagged with an affinity tag
36 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
37 * the work queue it is tagged with this affinity. When a consumer dequeues a
38 * work item, it sets its affinity to this tag. Hence consumer threads migrate
39 * to the same affinity set where the data was produced.
41 * Buffer management uses pthread mutex/condition variables. A thread blocks
42 * when no buffer is available on a queue and it is signaled when a buffer
43 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
44 * The queue management is centralized in a single routine: what queues to
45 * use as input and output and what function to call for processing is
/*
 * NOTE(review): extraction residue — the leading numbers are original file
 * line numbers and statements are split across lines.  Content preserved
 * verbatim; comments only are added.
 */
/* Serializes the start-up handshake and DBG/mutter output (see DBG macro) */
49 pthread_mutex_t funnel
;
/* Threads block on this until all are ready (see manager_fn) */
50 pthread_cond_t barrier
;
/* Count of threads that have reached the start barrier */
54 int threads_ready
= 0;
/* Number of buffers to produce (-i option, default 10000) */
56 int iterations
= 10000;
/* Affinity-set placement enabled? (toggled by -a, default off) */
57 boolean_t affinity
= FALSE
;
/* NOTE(review): presumably pauses for a keypress before starting (-t?) — the using lines are missing from this extraction; confirm */
58 boolean_t halting
= FALSE
;
62 TAILQ_ENTRY(work
) link
;
70 * A work queue, complete with pthread objects for its management
72 typedef struct work_queue
{
75 TAILQ_HEAD(, work
) queue
;
79 /* Worker functions take an integer array and size */
80 typedef void (worker_fn_t
)(int *, int);
82 /* This struct controls the function of a stage */
83 #define WORKERS_MAX 10
/* NOTE(review): WORKERS_MAX bounds the stage count — main() rejects -s values >= WORKERS_MAX */
94 /* This defines a worker thread */
95 typedef struct worker_info
{
101 #define DBG(x...) do { \
102 if (verbosity > 1) { \
103 pthread_mutex_lock(&funnel); \
105 pthread_mutex_unlock(&funnel); \
109 #define mutter(x...) do { \
110 if (verbosity > 0) { \
/* Pluralizing suffix for report messages: "s" when x > 1, "" otherwise */
115 #define s_if_plural(x) (((x) > 1) ? "s" : "")
121 "usage: pool [-a] Turn affinity on (off)\n"
122 " [-b B] Number of buffers per producer (2)\n"
123 " [-i I] Number of buffers to produce (10000)\n"
124 " [-s S] Number of stages (2)\n"
125 " [-p P] Number of pages per buffer (256=1MB)]\n"
126 " [-w] Consumer writes data\n"
127 " [-v V] Verbosity level 0..2 (1)\n"
128 " [N [M]] Number of producer and consumers (2)\n"
/*
 * Trivial producer: write to each element of the buffer.
 *
 * NOTE(review): reconstructed from a garbled extraction — the loop body,
 * return type and locals were missing.  The signature is fixed by the
 * worker_fn_t typedef (void (int *, int)); storing the index matches the
 * stated intent ("write to each byte") but the exact value written should
 * be confirmed against the original source.
 */
void
writer_fn(int *data, int isize)
{
	int	i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}
/*
 * Trivial consumer: read each element of the buffer without modifying it
 * (touches every element so the data is actually pulled into this thread's
 * cache).
 *
 * NOTE(review): reconstructed from a garbled extraction — the loop body,
 * return type and locals were missing; only the read-only contract
 * ("read each byte") is guaranteed here.
 */
void
reader_fn(int *data, int isize)
{
	int	i;
	int	datum = 0;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
	(void) datum;	/* value intentionally unused */
}
/*
 * Consumer that both reads and writes the buffer (selected with the -w
 * switch in main()).
 *
 * NOTE(review): reconstructed from a garbled extraction — the loop body was
 * missing.  An in-place increment is the canonical read-modify-write for
 * this benchmark, but confirm against the original source.
 */
void
reader_writer_fn(int *data, int isize)
{
	int	i;

	for (i = 0; i < isize; i++) {
		data[i]++;
	}
}
168 affinity_set(int tag
)
171 thread_affinity_policy_data_t policy
;
173 policy
.affinity_tag
= tag
;
174 ret
= thread_policy_set(
175 mach_thread_self(), THREAD_AFFINITY_POLICY
,
176 (thread_policy_t
) &policy
,
177 THREAD_AFFINITY_POLICY_COUNT
);
178 if (ret
!= KERN_SUCCESS
) {
179 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
/*
 * NOTE(review): this span is a corrupted extraction of manager_fn() — the
 * local declarations (ret, workp, iteration, current_tag), the while-loop
 * structure, several closing braces and the final '}' were dropped, and the
 * surviving statements are split across lines with original line numbers
 * embedded.  Text is preserved verbatim below; only comments are added or
 * corrected.  Recover the full body from the original source before
 * attempting to compile.
 */
185 * This is the central function for every thread.
186 * For each invocation, its role is set by (a pointer to) a stage_info_t.
189 manager_fn(void *arg
)
/* Per-thread argument: which worker we are and which stage we belong to */
191 worker_info_t
*wp
= (worker_info_t
*) arg
;
192 stage_info_t
*sp
= wp
->stage
;
193 boolean_t is_producer
= (sp
->stagenum
== 0);
/* Disable timesharing for this thread via THREAD_EXTENDED_POLICY */
198 thread_extended_policy_data_t epolicy
;
199 epolicy
.timeshare
= FALSE
;
200 ret
= thread_policy_set(
201 mach_thread_self(), THREAD_EXTENDED_POLICY
,
202 (thread_policy_t
) &epolicy
,
203 THREAD_EXTENDED_POLICY_COUNT
);
204 if (ret
!= KERN_SUCCESS
) {
205 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
209 * If we're using affinity sets and we're a producer
210 * set our tag to be our thread set number.
212 if (affinity
&& is_producer
) {
213 affinity_set(wp
->setnum
);
214 current_tag
= wp
->setnum
;
217 DBG("Starting %s %d, stage: %d\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
221 * The last thread to get here releases everyone and starts the timer.
223 pthread_mutex_lock(&funnel
);
225 if (threads_ready
== threads
) {
226 pthread_mutex_unlock(&funnel
);
/* NOTE(review): presumably gated by `halting` in the original — the surrounding lines are missing; confirm */
228 printf(" all threads ready for process %d, "
229 "hit any key to start", getpid());
233 pthread_cond_broadcast(&barrier
);
234 timer
= mach_absolute_time();
236 pthread_cond_wait(&barrier
, &funnel
);
237 pthread_mutex_unlock(&funnel
);
244 * Get a buffer from the input queue.
246 * Quit if all work done.
248 pthread_mutex_lock(&sp
->input
->mtx
);
250 if (sp
->work_todo
== 0) {
251 pthread_mutex_unlock(&sp
->input
->mtx
);
/* Take the first queued buffer; if the queue is empty, wait to be signaled */
254 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
258 DBG(" %s[%d,%d] todo %d waiting for buffer\n",
259 sp
->name
, wp
->setnum
, sp
->stagenum
, sp
->work_todo
);
260 sp
->input
->waiters
++;
261 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
262 sp
->input
->waiters
--;
264 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
/* Claim one unit of work; work_todo counts down to zero */
265 iteration
= sp
->work_todo
--;
266 pthread_mutex_unlock(&sp
->input
->mtx
);
/* NOTE(review): in the original this stamping is likely producer-only (an is_producer branch) — the enclosing lines are missing; confirm */
269 workp
->number
= iteration
;
270 workp
->tag
= wp
->setnum
;
/* Consumers migrate to the producer's affinity set so they share its cache */
272 if (affinity
&& current_tag
!= workp
->tag
) {
273 affinity_set(workp
->tag
);
274 current_tag
= workp
->tag
;
278 DBG(" %s[%d,%d] todo %d work %p data %p\n",
279 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
281 /* Do our stuff with the buffer */
282 (void) sp
->fn(workp
->data
, workp
->isize
);
285 * Place the buffer on the input queue of the next stage.
286 * Signal waiters if required.
288 pthread_mutex_lock(&sp
->output
->mtx
);
289 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
290 if (sp
->output
->waiters
) {
291 DBG(" %s[%d,%d] todo %d signaling work\n",
292 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
);
293 pthread_cond_signal(&sp
->output
->cnd
);
295 pthread_mutex_unlock(&sp
->output
->mtx
);
/* NOTE(review): after the work loop exits (loop-exit lines missing) — broadcast wakes any threads still blocked on our output queue */
299 pthread_cond_broadcast(&sp
->output
->cnd
);
301 DBG("Ending %s[%d,%d]\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
/* Thread result: the last iteration handled; main() collects it via pthread_join */
303 return (void *) iteration
;
306 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
307 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
310 main(int argc
, char *argv
[])
315 int pages
= 256; /* 1MB */
321 stage_info_t
*stage_info
;
323 worker_info_t
*worker_info
;
328 /* Do switch parsing: */
329 while ((c
= getopt(argc
, argv
, "ab:i:p:s:twv:")) != -1) {
332 affinity
= !affinity
;
335 buffers
= atoi(optarg
);
338 iterations
= atoi(optarg
);
341 pages
= atoi(optarg
);
344 stages
= atoi(optarg
);
345 if (stages
>= WORKERS_MAX
) {
353 consumer_fnp
= &reader_writer_fn
;
356 verbosity
= atoi(optarg
);
364 argc
-= optind
; argv
+= optind
;
366 producers
= atoi(*argv
);
370 consumers
= atoi(*argv
);
373 pthread_mutex_init(&funnel
, NULL
);
374 pthread_cond_init(&barrier
, NULL
);
377 * Fire up the worker threads.
379 threads
= consumers
* (stages
- 1) + producers
;
380 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
381 " with %saffinity, consumer reads%s data\n",
382 producers
, s_if_plural(producers
),
383 stages
- 1, s_if_plural(stages
- 1),
384 consumers
, s_if_plural(consumers
),
386 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
388 mutter(" %dkB bytes per buffer, ", pages
* 4);
390 mutter(" %dMB bytes per buffer, ", pages
/ 256);
392 mutter("%d buffer%s per producer ",
393 buffers
, s_if_plural(buffers
));
394 if (buffers
* pages
< 256) {
395 mutter("(total %dkB)\n", buffers
* pages
* 4);
397 mutter("(total %dMB)\n", buffers
* pages
/ 256);
399 mutter(" processing %d buffer%s...\n",
400 iterations
, s_if_plural(iterations
));
402 stage_info
= (stage_info_t
*) malloc(stages
* sizeof(stage_info_t
));
403 worker_info
= (worker_info_t
*) malloc(threads
* sizeof(worker_info_t
));
405 /* Set up the queue for the workers of this thread set: */
406 for (i
= 0; i
< stages
; i
++) {
409 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
410 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
411 TAILQ_INIT(&sp
->bufq
.queue
);
412 sp
->bufq
.waiters
= 0;
414 sp
->fn
= producer_fnp
;
415 sp
->name
= "producer";
417 sp
->fn
= consumer_fnp
;
418 sp
->name
= "consumer";
420 sp
->input
= &sp
->bufq
;
421 sp
->output
= &stage_info
[(i
+ 1) % stages
].bufq
;
422 stage_info
[i
].work_todo
= iterations
;
425 /* Create the producers */
426 for (i
= 0; i
< producers
; i
++) {
431 isize
= pages
* 4096 / sizeof(int);
432 data
= (int *) malloc(buffers
* pages
* 4096);
434 /* Set up the empty work buffers */
435 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
436 for (j
= 0; j
< buffers
; j
++) {
437 work_array
[j
].data
= data
+ (isize
* j
);
438 work_array
[j
].isize
= isize
;
439 work_array
[j
].tag
= 0;
440 TAILQ_INSERT_TAIL(&stage_info
[0].bufq
.queue
, &work_array
[j
], link
);
441 DBG(" empty work item %p for data %p\n",
442 &work_array
[j
], work_array
[j
].data
);
444 wp
= &worker_info
[i
];
446 wp
->stage
= &stage_info
[0];
447 if (ret
= pthread_create(&wp
->thread
,
451 err(1, "pthread_create %d,%d", 0, i
);
455 /* Create consumers */
456 for (i
= 1; i
< stages
; i
++) {
457 for (j
= 0; j
< consumers
; j
++) {
458 wp
= &worker_info
[producers
+ (consumers
* (i
- 1)) + j
];
460 wp
->stage
= &stage_info
[i
];
461 if (ret
= pthread_create(&wp
->thread
,
465 err(1, "pthread_create %d,%d", i
, j
);
471 * We sit back and wait for the worker threads to finish.
473 for (k
= 0; k
< threads
; k
++) {
477 wp
= &worker_info
[k
];
482 i
= (k
- producers
) / consumers
;
483 j
= (k
- producers
) % consumers
;
485 if (ret
= pthread_join(wp
->thread
, (void **)&status
)) {
486 err(1, "pthread_join %d,%d", i
, j
);
488 DBG("Thread %d,%d status %d\n", i
, j
, status
);
492 * See how long the work took.
494 timer
= mach_absolute_time() - timer
;
495 timer
= timer
/ 1000000ULL;
496 printf("%d.%03d seconds elapsed.\n",
497 (int) (timer
/ 1000ULL), (int) (timer
% 1000ULL));