1 #include <AvailabilityMacros.h>
2 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3 #include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
6 #include <mach/mach_error.h>
7 #include <mach/mach_time.h>
17 * Pool is another multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
20 * The basic picture is:
22 * -> producer -- -> consumer --
24 * -> queue -- ... --> queue -- --
26 * | -> producer -- -> consumer -- |
27 * ---------------------------------------------------------------
29 * <---------- "stage" ---------> <---------- "stage" --------->
31 * There are a series of work stages. Each stage has an input and an output
32 * queue and multiple threads. The first stage is the producer and subsequent
33 * stages are consumers. By default there are 2 stages. There are N producer
34 * and M consumer threads. There are B buffers per producer thread circulating
37 * When affinity is enabled, each producer thread is tagged with an affinity tag
38 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
39 * the work queue it is tagged with this affinity. When a consumer dequeues a
40 * work item, it sets its affinity to this tag. Hence consumer threads migrate
41 * to the same affinity set where the data was produced.
43 * Buffer management uses pthread mutex/condition variables. A thread blocks
44 * when no buffer is available on a queue and it is signaled when a buffer
45 * is placed on an empty queue. Queues are tailqs a la <sys/queue.h>.
46 * The queue management is centralized in a single routine: what queues to
47 * use as input and output and what function to call for processing is
51 pthread_mutex_t funnel
;
52 pthread_cond_t barrier
;
56 int threads_ready
= 0;
58 int iterations
= 10000;
59 boolean_t affinity
= FALSE
;
60 boolean_t halting
= FALSE
;
64 TAILQ_ENTRY(work
) link
;
72 * A work queue, complete with pthread objects for its management
74 typedef struct work_queue
{
77 TAILQ_HEAD(, work
) queue
;
81 /* Worker functions take a integer array and size */
82 typedef void (worker_fn_t
)(int *, int);
84 /* This struct controls the function of a stage */
85 #define WORKERS_MAX 10
96 /* This defines a worker thread */
97 typedef struct worker_info
{
103 #define DBG(x...) do { \
104 if (verbosity > 1) { \
105 pthread_mutex_lock(&funnel); \
107 pthread_mutex_unlock(&funnel); \
111 #define mutter(x...) do { \
112 if (verbosity > 0) { \
117 #define s_if_plural(x) (((x) > 1) ? "s" : "")
123 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
124 "usage: pool [-a] Turn affinity on (off)\n"
125 " [-b B] Number of buffers per producer (2)\n"
127 "usage: pool [-b B] Number of buffers per producer (2)\n"
129 " [-i I] Number of buffers to produce (10000)\n"
130 " [-s S] Number of stages (2)\n"
131 " [-p P] Number of pages per buffer (256=1MB)]\n"
132 " [-w] Consumer writes data\n"
133 " [-v V] Verbosity level 0..2 (1)\n"
134 " [N [M]] Number of producer and consumers (2)\n"
139 /* Trivial producer: write to each byte */
141 writer_fn(int *data
, int isize
)
145 for (i
= 0; i
< isize
; i
++) {
150 /* Trivial consumer: read each byte */
152 reader_fn(int *data
, int isize
)
157 for (i
= 0; i
< isize
; i
++) {
162 /* Consumer reading and writing the buffer */
164 reader_writer_fn(int *data
, int isize
)
168 for (i
= 0; i
< isize
; i
++) {
174 affinity_set(int tag
)
176 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
178 thread_affinity_policy_data_t policy
;
180 policy
.affinity_tag
= tag
;
181 ret
= thread_policy_set(
182 mach_thread_self(), THREAD_AFFINITY_POLICY
,
183 (thread_policy_t
) &policy
,
184 THREAD_AFFINITY_POLICY_COUNT
);
185 if (ret
!= KERN_SUCCESS
)
186 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
192 * This is the central function for every thread.
193 * For each invocation, its role is set by (a pointer to) a stage_info_t.
196 manager_fn(void *arg
)
198 worker_info_t
*wp
= (worker_info_t
*) arg
;
199 stage_info_t
*sp
= wp
->stage
;
200 boolean_t is_producer
= (sp
->stagenum
== 0);
204 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
206 thread_extended_policy_data_t epolicy
;
207 epolicy
.timeshare
= FALSE
;
208 ret
= thread_policy_set(
209 mach_thread_self(), THREAD_EXTENDED_POLICY
,
210 (thread_policy_t
) &epolicy
,
211 THREAD_EXTENDED_POLICY_COUNT
);
212 if (ret
!= KERN_SUCCESS
)
213 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
217 * If we're using affinity sets and we're a producer
218 * set our tag to our thread set number.
220 if (affinity
&& is_producer
) {
221 affinity_set(wp
->setnum
);
222 current_tag
= wp
->setnum
;
225 DBG("Starting %s %d, stage: %d\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
229 * The last thread to get here releases everyone and starts the timer.
231 pthread_mutex_lock(&funnel
);
233 if (threads_ready
== threads
) {
234 pthread_mutex_unlock(&funnel
);
236 printf(" all threads ready for process %d, "
237 "hit any key to start", getpid());
241 pthread_cond_broadcast(&barrier
);
242 timer
= mach_absolute_time();
244 pthread_cond_wait(&barrier
, &funnel
);
245 pthread_mutex_unlock(&funnel
);
252 * Get a buffer from the input queue.
254 * Quit if all work done.
256 pthread_mutex_lock(&sp
->input
->mtx
);
258 if (sp
->work_todo
== 0) {
259 pthread_mutex_unlock(&sp
->input
->mtx
);
262 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
265 DBG(" %s[%d,%d] todo %d waiting for buffer\n",
266 sp
->name
, wp
->setnum
, sp
->stagenum
, sp
->work_todo
);
267 sp
->input
->waiters
++;
268 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
269 sp
->input
->waiters
--;
271 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
272 iteration
= sp
->work_todo
--;
273 pthread_mutex_unlock(&sp
->input
->mtx
);
276 workp
->number
= iteration
;
277 workp
->tag
= wp
->setnum
;
279 if (affinity
&& current_tag
!= workp
->tag
) {
280 affinity_set(workp
->tag
);
281 current_tag
= workp
->tag
;
285 DBG(" %s[%d,%d] todo %d work %p data %p\n",
286 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
288 /* Do our stuff with the buffer */
289 (void) sp
->fn(workp
->data
, workp
->isize
);
292 * Place the buffer on the input queue of the next stage.
293 * Signal waiters if required.
295 pthread_mutex_lock(&sp
->output
->mtx
);
296 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
297 if (sp
->output
->waiters
) {
298 DBG(" %s[%d,%d] todo %d signaling work\n",
299 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
);
300 pthread_cond_signal(&sp
->output
->cnd
);
302 pthread_mutex_unlock(&sp
->output
->mtx
);
307 pthread_cond_broadcast(&sp
->output
->cnd
);
309 DBG("Ending %s[%d,%d]\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
311 return (void *) iteration
;
314 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
315 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
318 main(int argc
, char *argv
[])
323 int pages
= 256; /* 1MB */
329 stage_info_t
*stage_info
;
331 worker_info_t
*worker_info
;
336 /* Do switch parsing: */
337 while ((c
= getopt (argc
, argv
, "ab:i:p:s:twv:")) != -1) {
340 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
341 affinity
= !affinity
;
347 buffers
= atoi(optarg
);
350 iterations
= atoi(optarg
);
353 pages
= atoi(optarg
);
356 stages
= atoi(optarg
);
357 if (stages
>= WORKERS_MAX
)
364 consumer_fnp
= &reader_writer_fn
;
367 verbosity
= atoi(optarg
);
375 argc
-= optind
; argv
+= optind
;
377 producers
= atoi(*argv
);
380 consumers
= atoi(*argv
);
382 pthread_mutex_init(&funnel
, NULL
);
383 pthread_cond_init(&barrier
, NULL
);
386 * Fire up the worker threads.
388 threads
= consumers
* (stages
- 1) + producers
;
389 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
390 " with %saffinity, consumer reads%s data\n",
391 producers
, s_if_plural(producers
),
392 stages
- 1, s_if_plural(stages
- 1),
393 consumers
, s_if_plural(consumers
),
395 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
397 mutter(" %dkB bytes per buffer, ", pages
* 4);
399 mutter(" %dMB bytes per buffer, ", pages
/ 256);
400 mutter("%d buffer%s per producer ",
401 buffers
, s_if_plural(buffers
));
402 if (buffers
* pages
< 256)
403 mutter("(total %dkB)\n", buffers
* pages
* 4);
405 mutter("(total %dMB)\n", buffers
* pages
/ 256);
406 mutter(" processing %d buffer%s...\n",
407 iterations
, s_if_plural(iterations
));
409 stage_info
= (stage_info_t
*) malloc(stages
* sizeof(stage_info_t
));
410 worker_info
= (worker_info_t
*) malloc(threads
* sizeof(worker_info_t
));
412 /* Set up the queue for the workers of this thread set: */
413 for (i
= 0; i
< stages
; i
++) {
416 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
417 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
418 TAILQ_INIT(&sp
->bufq
.queue
);
419 sp
->bufq
.waiters
= 0;
421 sp
->fn
= producer_fnp
;
422 sp
->name
= "producer";
424 sp
->fn
= consumer_fnp
;
425 sp
->name
= "consumer";
427 sp
->input
= &sp
->bufq
;
428 sp
->output
= &stage_info
[(i
+ 1) % stages
].bufq
;
429 stage_info
[i
].work_todo
= iterations
;
432 /* Create the producers */
433 for (i
= 0; i
< producers
; i
++) {
438 isize
= pages
* 4096 / sizeof(int);
439 data
= (int *) malloc(buffers
* pages
* 4096);
441 /* Set up the empty work buffers */
442 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
443 for (j
= 0; j
< buffers
; j
++) {
444 work_array
[j
].data
= data
+ (isize
* j
);
445 work_array
[j
].isize
= isize
;
446 work_array
[j
].tag
= 0;
447 TAILQ_INSERT_TAIL(&stage_info
[0].bufq
.queue
, &work_array
[j
], link
);
448 DBG(" empty work item %p for data %p\n",
449 &work_array
[j
], work_array
[j
].data
);
451 wp
= &worker_info
[i
];
453 wp
->stage
= &stage_info
[0];
454 if (ret
= pthread_create(&wp
->thread
,
458 err(1, "pthread_create %d,%d", 0, i
);
461 /* Create consumers */
462 for (i
= 1; i
< stages
; i
++) {
463 for (j
= 0; j
< consumers
; j
++) {
464 wp
= &worker_info
[producers
+ (consumers
*(i
-1)) + j
];
466 wp
->stage
= &stage_info
[i
];
467 if (ret
= pthread_create(&wp
->thread
,
471 err(1, "pthread_create %d,%d", i
, j
);
476 * We sit back and wait for the worker threads to finish.
478 for (k
= 0; k
< threads
; k
++) {
482 wp
= &worker_info
[k
];
487 i
= (k
- producers
) / consumers
;
488 j
= (k
- producers
) % consumers
;
490 if(ret
= pthread_join(wp
->thread
, (void **)&status
))
491 err(1, "pthread_join %d,%d", i
, j
);
492 DBG("Thread %d,%d status %d\n", i
, j
, status
);
496 * See how long the work took.
498 timer
= mach_absolute_time() - timer
;
499 timer
= timer
/ 1000000ULL;
500 printf("%d.%03d seconds elapsed.\n",
501 (int) (timer
/1000ULL), (int) (timer
% 1000ULL));