]> git.saurik.com Git - apple/xnu.git/blame - tools/tests/affinity/pool.c
xnu-7195.101.1.tar.gz
[apple/xnu.git] / tools / tests / affinity / pool.c
CommitLineData
2d21ac55 1#include <AvailabilityMacros.h>
fe8ab488 2#include <mach/thread_policy.h>
2d21ac55
A
3#include <mach/mach.h>
4#include <mach/mach_error.h>
5#include <mach/mach_time.h>
6#include <pthread.h>
7#include <sys/queue.h>
8#include <stdio.h>
9#include <stdlib.h>
10#include <string.h>
11#include <unistd.h>
12#include <err.h>
13
14/*
15 * Pool is another multithreaded test/benchmarking program to evaluate
16 * affinity set placement in Leopard.
17 *
18 * The basic picture is:
19 *
20 * -> producer -- -> consumer --
21 * free / \ work / \
22 * -> queue -- ... --> queue -- --
23 * | \ / \ / |
24 * | -> producer -- -> consumer -- |
25 * ---------------------------------------------------------------
26 *
27 * <---------- "stage" ---------> <---------- "stage" --------->
28 *
29 * There are a series of work stages. Each stage has an input and an output
30 * queue and multiple threads. The first stage is the producer and subsequent
31 * stages are consumers. By defuaut there are 2 stages. There are N producer
32 * and M consumer threads. The are B buffers per producer threads circulating
33 * through the system.
34 *
35 * When affinity is enabled, each producer thread is tagged with an affinity tag
36 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
37 * the work queue it is tagged with this affinity. When a consumer dequeues a
38 * work item, it sets its affinity to this tag. Hence consumer threads migrate
39 * to the same affinity set where the data was produced.
40 *
41 * Buffer management uses pthread mutex/condition variables. A thread blocks
42 * when no buffer is available on a queue and it is signaled when a buffer
43 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
44 * The queue management is centralized in a single routine: what queues to
45 * use as input and output and what function to call for processing is
46 * data-driven.
47 */
0a7de745 48
2d21ac55 49pthread_mutex_t funnel;
0a7de745 50pthread_cond_t barrier;
2d21ac55 51
0a7de745
A
52uint64_t timer;
53int threads;
54int threads_ready = 0;
2d21ac55 55
0a7de745
A
56int iterations = 10000;
57boolean_t affinity = FALSE;
58boolean_t halting = FALSE;
59int verbosity = 1;
2d21ac55
A
60
61typedef struct work {
0a7de745
A
62 TAILQ_ENTRY(work) link;
63 int *data;
64 int isize;
65 int tag;
66 int number;
2d21ac55
A
67} work_t;
68
69/*
70 * A work queue, complete with pthread objects for its management
71 */
72typedef struct work_queue {
0a7de745
A
73 pthread_mutex_t mtx;
74 pthread_cond_t cnd;
75 TAILQ_HEAD(, work) queue;
76 unsigned int waiters;
2d21ac55
A
77} work_queue_t;
78
79/* Worker functions take a integer array and size */
0a7de745 80typedef void (worker_fn_t)(int *, int);
2d21ac55
A
81
82/* This struct controls the function of a stage */
83#define WORKERS_MAX 10
84typedef struct {
0a7de745
A
85 int stagenum;
86 char *name;
87 worker_fn_t *fn;
88 work_queue_t *input;
89 work_queue_t *output;
90 work_queue_t bufq;
91 int work_todo;
2d21ac55
A
92} stage_info_t;
93
94/* This defines a worker thread */
95typedef struct worker_info {
0a7de745
A
96 int setnum;
97 stage_info_t *stage;
98 pthread_t thread;
2d21ac55
A
99} worker_info_t;
100
0a7de745
A
101#define DBG(x...) do { \
102 if (verbosity > 1) { \
103 pthread_mutex_lock(&funnel); \
104 printf(x); \
105 pthread_mutex_unlock(&funnel); \
106 } \
2d21ac55
A
107} while (0)
108
0a7de745
A
109#define mutter(x...) do { \
110 if (verbosity > 0) { \
111 printf(x); \
112 } \
2d21ac55
A
113} while (0)
114
0a7de745 115#define s_if_plural(x) (((x) > 1) ? "s" : "")
2d21ac55
A
116
117static void
118usage()
119{
120 fprintf(stderr,
0a7de745
A
121 "usage: pool [-a] Turn affinity on (off)\n"
122 " [-b B] Number of buffers per producer (2)\n"
123 " [-i I] Number of buffers to produce (10000)\n"
124 " [-s S] Number of stages (2)\n"
125 " [-p P] Number of pages per buffer (256=1MB)]\n"
126 " [-w] Consumer writes data\n"
127 " [-v V] Verbosity level 0..2 (1)\n"
128 " [N [M]] Number of producer and consumers (2)\n"
129 );
2d21ac55
A
130 exit(1);
131}
132
133/* Trivial producer: write to each byte */
134void
135writer_fn(int *data, int isize)
136{
0a7de745 137 int i;
2d21ac55
A
138
139 for (i = 0; i < isize; i++) {
140 data[i] = i;
141 }
142}
143
144/* Trivial consumer: read each byte */
145void
146reader_fn(int *data, int isize)
147{
0a7de745
A
148 int i;
149 int datum;
2d21ac55
A
150
151 for (i = 0; i < isize; i++) {
152 datum = data[i];
153 }
154}
155
156/* Consumer reading and writing the buffer */
157void
158reader_writer_fn(int *data, int isize)
159{
0a7de745 160 int i;
2d21ac55
A
161
162 for (i = 0; i < isize; i++) {
163 data[i] += 1;
164 }
165}
166
167void
168affinity_set(int tag)
169{
0a7de745
A
170 kern_return_t ret;
171 thread_affinity_policy_data_t policy;
2d21ac55
A
172 if (affinity) {
173 policy.affinity_tag = tag;
174 ret = thread_policy_set(
0a7de745
A
175 mach_thread_self(), THREAD_AFFINITY_POLICY,
176 (thread_policy_t) &policy,
177 THREAD_AFFINITY_POLICY_COUNT);
178 if (ret != KERN_SUCCESS) {
2d21ac55 179 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
0a7de745 180 }
2d21ac55 181 }
2d21ac55
A
182}
183
184/*
185 * This is the central function for every thread.
186 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
187 */
188void *
189manager_fn(void *arg)
190{
0a7de745
A
191 worker_info_t *wp = (worker_info_t *) arg;
192 stage_info_t *sp = wp->stage;
193 boolean_t is_producer = (sp->stagenum == 0);
194 long iteration = 0;
195 int current_tag = 0;
196
197 kern_return_t ret;
198 thread_extended_policy_data_t epolicy;
2d21ac55
A
199 epolicy.timeshare = FALSE;
200 ret = thread_policy_set(
0a7de745
A
201 mach_thread_self(), THREAD_EXTENDED_POLICY,
202 (thread_policy_t) &epolicy,
203 THREAD_EXTENDED_POLICY_COUNT);
204 if (ret != KERN_SUCCESS) {
2d21ac55 205 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
0a7de745
A
206 }
207
2d21ac55
A
208 /*
209 * If we're using affinity sets and we're a producer
210 * set our tag to by our thread set number.
211 */
212 if (affinity && is_producer) {
213 affinity_set(wp->setnum);
214 current_tag = wp->setnum;
215 }
216
217 DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);
218
219 /*
220 * Start barrier.
221 * The tets thread to get here releases everyone and starts the timer.
222 */
223 pthread_mutex_lock(&funnel);
224 threads_ready++;
225 if (threads_ready == threads) {
226 pthread_mutex_unlock(&funnel);
227 if (halting) {
228 printf(" all threads ready for process %d, "
0a7de745 229 "hit any key to start", getpid());
2d21ac55
A
230 fflush(stdout);
231 (void) getchar();
232 }
233 pthread_cond_broadcast(&barrier);
234 timer = mach_absolute_time();
235 } else {
236 pthread_cond_wait(&barrier, &funnel);
237 pthread_mutex_unlock(&funnel);
238 }
239
240 do {
0a7de745 241 work_t *workp;
2d21ac55
A
242
243 /*
244 * Get a buffer from the input queue.
245 * Block if none.
246 * Quit if all work done.
247 */
248 pthread_mutex_lock(&sp->input->mtx);
249 while (1) {
250 if (sp->work_todo == 0) {
251 pthread_mutex_unlock(&sp->input->mtx);
252 goto out;
253 }
254 workp = TAILQ_FIRST(&(sp->input->queue));
0a7de745 255 if (workp != NULL) {
2d21ac55 256 break;
0a7de745 257 }
2d21ac55 258 DBG(" %s[%d,%d] todo %d waiting for buffer\n",
0a7de745 259 sp->name, wp->setnum, sp->stagenum, sp->work_todo);
2d21ac55
A
260 sp->input->waiters++;
261 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
262 sp->input->waiters--;
263 }
264 TAILQ_REMOVE(&(sp->input->queue), workp, link);
265 iteration = sp->work_todo--;
266 pthread_mutex_unlock(&sp->input->mtx);
267
268 if (is_producer) {
269 workp->number = iteration;
270 workp->tag = wp->setnum;
271 } else {
272 if (affinity && current_tag != workp->tag) {
273 affinity_set(workp->tag);
274 current_tag = workp->tag;
275 }
276 }
277
278 DBG(" %s[%d,%d] todo %d work %p data %p\n",
0a7de745 279 sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);
2d21ac55
A
280
281 /* Do our stuff with the buffer */
282 (void) sp->fn(workp->data, workp->isize);
283
284 /*
285 * Place the buffer on the input queue of the next stage.
286 * Signal waiters if required.
287 */
288 pthread_mutex_lock(&sp->output->mtx);
289 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
290 if (sp->output->waiters) {
291 DBG(" %s[%d,%d] todo %d signaling work\n",
0a7de745 292 sp->name, wp->setnum, sp->stagenum, iteration);
2d21ac55
A
293 pthread_cond_signal(&sp->output->cnd);
294 }
295 pthread_mutex_unlock(&sp->output->mtx);
2d21ac55
A
296 } while (1);
297
298out:
299 pthread_cond_broadcast(&sp->output->cnd);
300
301 DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);
302
303 return (void *) iteration;
304}
305
306void (*producer_fnp)(int *data, int isize) = &writer_fn;
307void (*consumer_fnp)(int *data, int isize) = &reader_fn;
308
309int
310main(int argc, char *argv[])
311{
0a7de745
A
312 int i;
313 int j;
314 int k;
315 int pages = 256; /* 1MB */
316 int buffers = 2;
317 int producers = 2;
318 int consumers = 2;
319 int stages = 2;
320 int *status;
321 stage_info_t *stage_info;
322 stage_info_t *sp;
323 worker_info_t *worker_info;
324 worker_info_t *wp;
325 kern_return_t ret;
326 int c;
2d21ac55
A
327
328 /* Do switch parsing: */
0a7de745 329 while ((c = getopt(argc, argv, "ab:i:p:s:twv:")) != -1) {
2d21ac55
A
330 switch (c) {
331 case 'a':
2d21ac55
A
332 affinity = !affinity;
333 break;
2d21ac55
A
334 case 'b':
335 buffers = atoi(optarg);
336 break;
337 case 'i':
338 iterations = atoi(optarg);
339 break;
340 case 'p':
341 pages = atoi(optarg);
342 break;
343 case 's':
344 stages = atoi(optarg);
0a7de745 345 if (stages >= WORKERS_MAX) {
2d21ac55 346 usage();
0a7de745 347 }
2d21ac55
A
348 break;
349 case 't':
350 halting = TRUE;
351 break;
352 case 'w':
353 consumer_fnp = &reader_writer_fn;
354 break;
355 case 'v':
356 verbosity = atoi(optarg);
357 break;
358 case 'h':
359 case '?':
360 default:
361 usage();
362 }
363 }
364 argc -= optind; argv += optind;
0a7de745 365 if (argc > 0) {
2d21ac55 366 producers = atoi(*argv);
0a7de745 367 }
2d21ac55 368 argc--; argv++;
0a7de745 369 if (argc > 0) {
2d21ac55 370 consumers = atoi(*argv);
0a7de745
A
371 }
372
2d21ac55
A
373 pthread_mutex_init(&funnel, NULL);
374 pthread_cond_init(&barrier, NULL);
375
376 /*
0a7de745 377 * Fire up the worker threads.
2d21ac55
A
378 */
379 threads = consumers * (stages - 1) + producers;
380 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
0a7de745
A
381 " with %saffinity, consumer reads%s data\n",
382 producers, s_if_plural(producers),
383 stages - 1, s_if_plural(stages - 1),
384 consumers, s_if_plural(consumers),
385 affinity? "": "no ",
386 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
387 if (pages < 256) {
2d21ac55 388 mutter(" %dkB bytes per buffer, ", pages * 4);
0a7de745 389 } else {
2d21ac55 390 mutter(" %dMB bytes per buffer, ", pages / 256);
0a7de745 391 }
2d21ac55 392 mutter("%d buffer%s per producer ",
0a7de745
A
393 buffers, s_if_plural(buffers));
394 if (buffers * pages < 256) {
2d21ac55 395 mutter("(total %dkB)\n", buffers * pages * 4);
0a7de745 396 } else {
2d21ac55 397 mutter("(total %dMB)\n", buffers * pages / 256);
0a7de745 398 }
2d21ac55 399 mutter(" processing %d buffer%s...\n",
0a7de745 400 iterations, s_if_plural(iterations));
2d21ac55
A
401
402 stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
403 worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
404
405 /* Set up the queue for the workers of this thread set: */
406 for (i = 0; i < stages; i++) {
407 sp = &stage_info[i];
408 sp->stagenum = i;
409 pthread_mutex_init(&sp->bufq.mtx, NULL);
410 pthread_cond_init(&sp->bufq.cnd, NULL);
411 TAILQ_INIT(&sp->bufq.queue);
412 sp->bufq.waiters = 0;
413 if (i == 0) {
414 sp->fn = producer_fnp;
415 sp->name = "producer";
416 } else {
417 sp->fn = consumer_fnp;
418 sp->name = "consumer";
419 }
420 sp->input = &sp->bufq;
421 sp->output = &stage_info[(i + 1) % stages].bufq;
422 stage_info[i].work_todo = iterations;
423 }
0a7de745 424
2d21ac55
A
425 /* Create the producers */
426 for (i = 0; i < producers; i++) {
0a7de745
A
427 work_t *work_array;
428 int *data;
429 int isize;
2d21ac55
A
430
431 isize = pages * 4096 / sizeof(int);
432 data = (int *) malloc(buffers * pages * 4096);
433
434 /* Set up the empty work buffers */
435 work_array = (work_t *) malloc(buffers * sizeof(work_t));
436 for (j = 0; j < buffers; j++) {
0a7de745 437 work_array[j].data = data + (isize * j);
2d21ac55
A
438 work_array[j].isize = isize;
439 work_array[j].tag = 0;
440 TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
441 DBG(" empty work item %p for data %p\n",
0a7de745 442 &work_array[j], work_array[j].data);
2d21ac55
A
443 }
444 wp = &worker_info[i];
445 wp->setnum = i + 1;
446 wp->stage = &stage_info[0];
447 if (ret = pthread_create(&wp->thread,
0a7de745
A
448 NULL,
449 &manager_fn,
450 (void *) wp)) {
2d21ac55 451 err(1, "pthread_create %d,%d", 0, i);
0a7de745 452 }
2d21ac55
A
453 }
454
455 /* Create consumers */
456 for (i = 1; i < stages; i++) {
457 for (j = 0; j < consumers; j++) {
0a7de745 458 wp = &worker_info[producers + (consumers * (i - 1)) + j];
2d21ac55
A
459 wp->setnum = j + 1;
460 wp->stage = &stage_info[i];
461 if (ret = pthread_create(&wp->thread,
0a7de745
A
462 NULL,
463 &manager_fn,
464 (void *) wp)) {
2d21ac55 465 err(1, "pthread_create %d,%d", i, j);
0a7de745 466 }
2d21ac55
A
467 }
468 }
469
470 /*
471 * We sit back anf wait for the slaves to finish.
472 */
473 for (k = 0; k < threads; k++) {
0a7de745
A
474 int i;
475 int j;
2d21ac55
A
476
477 wp = &worker_info[k];
478 if (k < producers) {
479 i = 0;
480 j = k;
481 } else {
482 i = (k - producers) / consumers;
483 j = (k - producers) % consumers;
484 }
0a7de745
A
485 if (ret = pthread_join(wp->thread, (void **)&status)) {
486 err(1, "pthread_join %d,%d", i, j);
487 }
2d21ac55
A
488 DBG("Thread %d,%d status %d\n", i, j, status);
489 }
490
491 /*
492 * See how long the work took.
493 */
494 timer = mach_absolute_time() - timer;
495 timer = timer / 1000000ULL;
496 printf("%d.%03d seconds elapsed.\n",
0a7de745 497 (int) (timer / 1000ULL), (int) (timer % 1000ULL));
2d21ac55
A
498
499 return 0;
500}