1 #include <AvailabilityMacros.h>
2 #include <mach/thread_policy.h>
4 #include <mach/mach_error.h>
5 #include <mach/mach_time.h>
15 * Pool is another multithreaded test/benchmarking program to evaluate
16 * affinity set placement in Leopard.
18 * The basic picture is:
20 * -> producer -- -> consumer --
22 * -> queue -- ... --> queue -- --
24 * | -> producer -- -> consumer -- |
25 * ---------------------------------------------------------------
27 * <---------- "stage" ---------> <---------- "stage" --------->
29 * There are a series of work stages. Each stage has an input and an output
30 * queue and multiple threads. The first stage is the producer and subsequent
31 * stages are consumers. By default there are 2 stages. There are N producer
32 * and M consumer threads. There are B buffers per producer thread circulating
35 * When affinity is enabled, each producer thread is tagged with an affinity tag
36 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
37 * the work queue it is tagged with this affinity. When a consumer dequeues a
38 * work item, it sets its affinity to this tag. Hence consumer threads migrate
39 * to the same affinity set where the data was produced.
41 * Buffer management uses pthread mutex/condition variables. A thread blocks
42 * when no buffer is available on a queue and it is signaled when a buffer
43 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
44 * The queue management is centralized in a single routine: what queues to
45 * use as input and output and what function to call for processing is
49 pthread_mutex_t funnel
;
50 pthread_cond_t barrier
;
54 int threads_ready
= 0;
56 int iterations
= 10000;
57 boolean_t affinity
= FALSE
;
58 boolean_t halting
= FALSE
;
62 TAILQ_ENTRY(work
) link
;
70 * A work queue, complete with pthread objects for its management
72 typedef struct work_queue
{
75 TAILQ_HEAD(, work
) queue
;
79 /* Worker functions take a integer array and size */
80 typedef void (worker_fn_t
)(int *, int);
82 /* This struct controls the function of a stage */
83 #define WORKERS_MAX 10
94 /* This defines a worker thread */
95 typedef struct worker_info
{
101 #define DBG(x...) do { \
102 if (verbosity > 1) { \
103 pthread_mutex_lock(&funnel); \
105 pthread_mutex_unlock(&funnel); \
109 #define mutter(x...) do { \
110 if (verbosity > 0) { \
115 #define s_if_plural(x) (((x) > 1) ? "s" : "")
121 "usage: pool [-a] Turn affinity on (off)\n"
122 " [-b B] Number of buffers per producer (2)\n"
123 " [-i I] Number of buffers to produce (10000)\n"
124 " [-s S] Number of stages (2)\n"
125 " [-p P] Number of pages per buffer (256=1MB)]\n"
126 " [-w] Consumer writes data\n"
127 " [-v V] Verbosity level 0..2 (1)\n"
128 " [N [M]] Number of producer and consumers (2)\n"
133 /* Trivial producer: write to each byte */
135 writer_fn(int *data
, int isize
)
139 for (i
= 0; i
< isize
; i
++) {
144 /* Trivial consumer: read each byte */
146 reader_fn(int *data
, int isize
)
151 for (i
= 0; i
< isize
; i
++) {
156 /* Consumer reading and writing the buffer */
158 reader_writer_fn(int *data
, int isize
)
162 for (i
= 0; i
< isize
; i
++) {
168 affinity_set(int tag
)
171 thread_affinity_policy_data_t policy
;
173 policy
.affinity_tag
= tag
;
174 ret
= thread_policy_set(
175 mach_thread_self(), THREAD_AFFINITY_POLICY
,
176 (thread_policy_t
) &policy
,
177 THREAD_AFFINITY_POLICY_COUNT
);
178 if (ret
!= KERN_SUCCESS
)
179 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret
);
184 * This is the central function for every thread.
185 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
188 manager_fn(void *arg
)
190 worker_info_t
*wp
= (worker_info_t
*) arg
;
191 stage_info_t
*sp
= wp
->stage
;
192 boolean_t is_producer
= (sp
->stagenum
== 0);
197 thread_extended_policy_data_t epolicy
;
198 epolicy
.timeshare
= FALSE
;
199 ret
= thread_policy_set(
200 mach_thread_self(), THREAD_EXTENDED_POLICY
,
201 (thread_policy_t
) &epolicy
,
202 THREAD_EXTENDED_POLICY_COUNT
);
203 if (ret
!= KERN_SUCCESS
)
204 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret
);
207 * If we're using affinity sets and we're a producer
208 * set our tag to by our thread set number.
210 if (affinity
&& is_producer
) {
211 affinity_set(wp
->setnum
);
212 current_tag
= wp
->setnum
;
215 DBG("Starting %s %d, stage: %d\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
219 * The tets thread to get here releases everyone and starts the timer.
221 pthread_mutex_lock(&funnel
);
223 if (threads_ready
== threads
) {
224 pthread_mutex_unlock(&funnel
);
226 printf(" all threads ready for process %d, "
227 "hit any key to start", getpid());
231 pthread_cond_broadcast(&barrier
);
232 timer
= mach_absolute_time();
234 pthread_cond_wait(&barrier
, &funnel
);
235 pthread_mutex_unlock(&funnel
);
242 * Get a buffer from the input queue.
244 * Quit if all work done.
246 pthread_mutex_lock(&sp
->input
->mtx
);
248 if (sp
->work_todo
== 0) {
249 pthread_mutex_unlock(&sp
->input
->mtx
);
252 workp
= TAILQ_FIRST(&(sp
->input
->queue
));
255 DBG(" %s[%d,%d] todo %d waiting for buffer\n",
256 sp
->name
, wp
->setnum
, sp
->stagenum
, sp
->work_todo
);
257 sp
->input
->waiters
++;
258 pthread_cond_wait(&sp
->input
->cnd
, &sp
->input
->mtx
);
259 sp
->input
->waiters
--;
261 TAILQ_REMOVE(&(sp
->input
->queue
), workp
, link
);
262 iteration
= sp
->work_todo
--;
263 pthread_mutex_unlock(&sp
->input
->mtx
);
266 workp
->number
= iteration
;
267 workp
->tag
= wp
->setnum
;
269 if (affinity
&& current_tag
!= workp
->tag
) {
270 affinity_set(workp
->tag
);
271 current_tag
= workp
->tag
;
275 DBG(" %s[%d,%d] todo %d work %p data %p\n",
276 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
, workp
, workp
->data
);
278 /* Do our stuff with the buffer */
279 (void) sp
->fn(workp
->data
, workp
->isize
);
282 * Place the buffer on the input queue of the next stage.
283 * Signal waiters if required.
285 pthread_mutex_lock(&sp
->output
->mtx
);
286 TAILQ_INSERT_TAIL(&(sp
->output
->queue
), workp
, link
);
287 if (sp
->output
->waiters
) {
288 DBG(" %s[%d,%d] todo %d signaling work\n",
289 sp
->name
, wp
->setnum
, sp
->stagenum
, iteration
);
290 pthread_cond_signal(&sp
->output
->cnd
);
292 pthread_mutex_unlock(&sp
->output
->mtx
);
297 pthread_cond_broadcast(&sp
->output
->cnd
);
299 DBG("Ending %s[%d,%d]\n", sp
->name
, wp
->setnum
, sp
->stagenum
);
301 return (void *) iteration
;
304 void (*producer_fnp
)(int *data
, int isize
) = &writer_fn
;
305 void (*consumer_fnp
)(int *data
, int isize
) = &reader_fn
;
308 main(int argc
, char *argv
[])
313 int pages
= 256; /* 1MB */
319 stage_info_t
*stage_info
;
321 worker_info_t
*worker_info
;
326 /* Do switch parsing: */
327 while ((c
= getopt (argc
, argv
, "ab:i:p:s:twv:")) != -1) {
330 affinity
= !affinity
;
333 buffers
= atoi(optarg
);
336 iterations
= atoi(optarg
);
339 pages
= atoi(optarg
);
342 stages
= atoi(optarg
);
343 if (stages
>= WORKERS_MAX
)
350 consumer_fnp
= &reader_writer_fn
;
353 verbosity
= atoi(optarg
);
361 argc
-= optind
; argv
+= optind
;
363 producers
= atoi(*argv
);
366 consumers
= atoi(*argv
);
368 pthread_mutex_init(&funnel
, NULL
);
369 pthread_cond_init(&barrier
, NULL
);
372 * Fire up the worker threads.
374 threads
= consumers
* (stages
- 1) + producers
;
375 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
376 " with %saffinity, consumer reads%s data\n",
377 producers
, s_if_plural(producers
),
378 stages
- 1, s_if_plural(stages
- 1),
379 consumers
, s_if_plural(consumers
),
381 (consumer_fnp
== &reader_writer_fn
)? " and writes" : "");
383 mutter(" %dkB bytes per buffer, ", pages
* 4);
385 mutter(" %dMB bytes per buffer, ", pages
/ 256);
386 mutter("%d buffer%s per producer ",
387 buffers
, s_if_plural(buffers
));
388 if (buffers
* pages
< 256)
389 mutter("(total %dkB)\n", buffers
* pages
* 4);
391 mutter("(total %dMB)\n", buffers
* pages
/ 256);
392 mutter(" processing %d buffer%s...\n",
393 iterations
, s_if_plural(iterations
));
395 stage_info
= (stage_info_t
*) malloc(stages
* sizeof(stage_info_t
));
396 worker_info
= (worker_info_t
*) malloc(threads
* sizeof(worker_info_t
));
398 /* Set up the queue for the workers of this thread set: */
399 for (i
= 0; i
< stages
; i
++) {
402 pthread_mutex_init(&sp
->bufq
.mtx
, NULL
);
403 pthread_cond_init(&sp
->bufq
.cnd
, NULL
);
404 TAILQ_INIT(&sp
->bufq
.queue
);
405 sp
->bufq
.waiters
= 0;
407 sp
->fn
= producer_fnp
;
408 sp
->name
= "producer";
410 sp
->fn
= consumer_fnp
;
411 sp
->name
= "consumer";
413 sp
->input
= &sp
->bufq
;
414 sp
->output
= &stage_info
[(i
+ 1) % stages
].bufq
;
415 stage_info
[i
].work_todo
= iterations
;
418 /* Create the producers */
419 for (i
= 0; i
< producers
; i
++) {
424 isize
= pages
* 4096 / sizeof(int);
425 data
= (int *) malloc(buffers
* pages
* 4096);
427 /* Set up the empty work buffers */
428 work_array
= (work_t
*) malloc(buffers
* sizeof(work_t
));
429 for (j
= 0; j
< buffers
; j
++) {
430 work_array
[j
].data
= data
+ (isize
* j
);
431 work_array
[j
].isize
= isize
;
432 work_array
[j
].tag
= 0;
433 TAILQ_INSERT_TAIL(&stage_info
[0].bufq
.queue
, &work_array
[j
], link
);
434 DBG(" empty work item %p for data %p\n",
435 &work_array
[j
], work_array
[j
].data
);
437 wp
= &worker_info
[i
];
439 wp
->stage
= &stage_info
[0];
440 if (ret
= pthread_create(&wp
->thread
,
444 err(1, "pthread_create %d,%d", 0, i
);
447 /* Create consumers */
448 for (i
= 1; i
< stages
; i
++) {
449 for (j
= 0; j
< consumers
; j
++) {
450 wp
= &worker_info
[producers
+ (consumers
*(i
-1)) + j
];
452 wp
->stage
= &stage_info
[i
];
453 if (ret
= pthread_create(&wp
->thread
,
457 err(1, "pthread_create %d,%d", i
, j
);
462 * We sit back anf wait for the slaves to finish.
464 for (k
= 0; k
< threads
; k
++) {
468 wp
= &worker_info
[k
];
473 i
= (k
- producers
) / consumers
;
474 j
= (k
- producers
) % consumers
;
476 if(ret
= pthread_join(wp
->thread
, (void **)&status
))
477 err(1, "pthread_join %d,%d", i
, j
);
478 DBG("Thread %d,%d status %d\n", i
, j
, status
);
482 * See how long the work took.
484 timer
= mach_absolute_time() - timer
;
485 timer
= timer
/ 1000000ULL;
486 printf("%d.%03d seconds elapsed.\n",
487 (int) (timer
/1000ULL), (int) (timer
% 1000ULL));