]> git.saurik.com Git - apple/xnu.git/blame_incremental - tools/tests/affinity/pool.c
xnu-2050.48.11.tar.gz
[apple/xnu.git] / tools / tests / affinity / pool.c
... / ...
CommitLineData
1#include <AvailabilityMacros.h>
2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
4#endif
5#include <mach/mach.h>
6#include <mach/mach_error.h>
7#include <mach/mach_time.h>
8#include <pthread.h>
9#include <sys/queue.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <unistd.h>
14#include <err.h>
15
16/*
17 * Pool is another multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
19 *
20 * The basic picture is:
21 *
22 *                  -> producer --                 -> consumer --
23 *       free      /              \      work     /              \
24 *    -> queue --       ...        --> queue --                   --
25 *   |             \              /               \              /  |
26 *   |              -> producer --                 -> consumer --   |
27 *    ---------------------------------------------------------------
28 *
29 *       <---------- "stage" --------->   <---------- "stage" --------->
30 *
31 * There are a series of work stages. Each stage has an input and an output
32 * queue and multiple threads. The first stage is the producer and subsequent
33 * stages are consumers. By default there are 2 stages. There are N producer
34 * and M consumer threads. There are B buffers per producer thread circulating
35 * through the system.
36 *
37 * When affinity is enabled, each producer thread is tagged with an affinity tag
38 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
39 * the work queue it is tagged with this affinity. When a consumer dequeues a
40 * work item, it sets its affinity to this tag. Hence consumer threads migrate
41 * to the same affinity set where the data was produced.
42 *
43 * Buffer management uses pthread mutex/condition variables. A thread blocks
44 * when no buffer is available on a queue and it is signaled when a buffer
45 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
46 * The queue management is centralized in a single routine: what queues to
47 * use as input and output and what function to call for processing is
48 * data-driven.
49 */
50
51pthread_mutex_t funnel;
52pthread_cond_t barrier;
53
54uint64_t timer;
55int threads;
56int threads_ready = 0;
57
58int iterations = 10000;
59boolean_t affinity = FALSE;
60boolean_t halting = FALSE;
61int verbosity = 1;
62
/*
 * Descriptor for one buffer circulating through the stages.
 */
struct work {
	TAILQ_ENTRY(work) link;		/* linkage on a work queue */
	int		*data;		/* start of the buffer */
	int		isize;		/* buffer length, counted in ints */
	int		tag;		/* set number of the producer that filled it */
	int		number;		/* iteration stamp assigned by the producer */
};
typedef struct work work_t;
70
/*
 * A queue of work items bundled with the pthread objects that guard it:
 * the mutex protects the list and the waiter count, the condition
 * variable is signalled when a buffer is queued while threads wait.
 */
struct work_queue {
	pthread_mutex_t	mtx;
	pthread_cond_t	cnd;
	TAILQ_HEAD(, work) queue;
	unsigned int	waiters;	/* threads currently blocked on cnd */
};
typedef struct work_queue work_queue_t;
80
/* Signature of a stage's work routine: an integer array and its length */
typedef void worker_fn_t(int *data, int isize);
83
84/* This struct controls the function of a stage */
85#define WORKERS_MAX 10
86typedef struct {
87 int stagenum;
88 char *name;
89 worker_fn_t *fn;
90 work_queue_t *input;
91 work_queue_t *output;
92 work_queue_t bufq;
93 int work_todo;
94} stage_info_t;
95
96/* This defines a worker thread */
97typedef struct worker_info {
98 int setnum;
99 stage_info_t *stage;
100 pthread_t thread;
101} worker_info_t;
102
/*
 * Debug trace: printf-style output emitted only when verbosity is
 * above 1.  The funnel mutex is held around the printf so that lines
 * from different threads do not interleave.
 */
#define DBG(...) do {					\
	if (verbosity >= 2) {				\
		pthread_mutex_lock(&funnel);		\
		printf(__VA_ARGS__);			\
		pthread_mutex_unlock(&funnel);		\
	}						\
} while (0)
110
/* Progress chatter: printf-style output suppressed when verbosity is 0 */
#define mutter(...) do {				\
	if (verbosity >= 1)				\
		printf(__VA_ARGS__);			\
} while (0)
116
/*
 * Plural suffix for a count: "" for exactly one, "s" otherwise
 * (so zero reads "0 stages", not "0 stage").
 */
#define s_if_plural(x) (((x) != 1) ? "s" : "")
118
/*
 * Print the command-line synopsis on stderr and exit with status 1.
 * The -a line is only shown when affinity support is compiled in.
 * Does not return.
 */
static void
usage(void)
{
	fprintf(stderr,
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
	    "usage: pool [-a]    Turn affinity on (off)\n"
	    "            [-b B]  Number of buffers per producer (2)\n"
#else
	    "usage: pool [-b B]  Number of buffers per producer (2)\n"
#endif
	    "            [-i I]  Number of buffers to produce (10000)\n"
	    "            [-s S]  Number of stages (2)\n"
	    "            [-p P]  Number of pages per buffer (256=1MB)\n"
	    "            [-t]    Halt for a keypress before starting\n"
	    "            [-w]    Consumer writes data\n"
	    "            [-v V]  Verbosity level 0..2 (1)\n"
	    "            [N [M]] Number of producer and consumers (2)\n"
	);
	exit(1);
}
138
/* Trivial producer: store each element's own index into the buffer */
void
writer_fn(int *data, int isize)
{
	int idx = 0;

	while (idx < isize) {
		data[idx] = idx;
		idx++;
	}
}
149
/*
 * Trivial consumer: read every element of the buffer.
 *
 * The value read is stored into a volatile sink.  Without it the result
 * is dead and an optimizing compiler is free to delete the whole loop,
 * leaving the consumer touching no memory at all.
 */
void
reader_fn(int *data, int isize)
{
	volatile int	datum;
	int		i;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
	(void) datum;
}
161
/* Read-modify-write consumer: bump every element of the buffer by one */
void
reader_writer_fn(int *data, int isize)
{
	int	*slot = data;
	int	remaining;

	for (remaining = isize; remaining > 0; remaining--) {
		*slot += 1;
		slot++;
	}
}
172
/*
 * Set the calling thread's affinity tag.
 *
 * Does nothing unless affinity was enabled on the command line (and
 * affinity support is compiled in).  A failure of thread_policy_set()
 * is reported on stdout but is otherwise ignored.
 */
void
affinity_set(int tag)
{
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
	kern_return_t			ret;
	thread_affinity_policy_data_t	policy;
	mach_port_t			self;

	if (!affinity)
		return;

	policy.affinity_tag = tag;

	/*
	 * mach_thread_self() hands back a new reference on the thread
	 * port each time it is called; release it once we are done or
	 * the references pile up (consumers may get here once per buffer).
	 */
	self = mach_thread_self();
	ret = thread_policy_set(
			self, THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
	(void) mach_port_deallocate(mach_task_self(), self);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
#else
	(void) tag;
#endif
}
190
191/*
192 * This is the central function for every thread.
193 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
194 */
195void *
196manager_fn(void *arg)
197{
198 worker_info_t *wp = (worker_info_t *) arg;
199 stage_info_t *sp = wp->stage;
200 boolean_t is_producer = (sp->stagenum == 0);
201 long iteration = 0;
202 int current_tag = 0;
203
204#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
205 kern_return_t ret;
206 thread_extended_policy_data_t epolicy;
207 epolicy.timeshare = FALSE;
208 ret = thread_policy_set(
209 mach_thread_self(), THREAD_EXTENDED_POLICY,
210 (thread_policy_t) &epolicy,
211 THREAD_EXTENDED_POLICY_COUNT);
212 if (ret != KERN_SUCCESS)
213 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
214
215#endif
216 /*
217 * If we're using affinity sets and we're a producer
218 * set our tag to by our thread set number.
219 */
220 if (affinity && is_producer) {
221 affinity_set(wp->setnum);
222 current_tag = wp->setnum;
223 }
224
225 DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);
226
227 /*
228 * Start barrier.
229 * The tets thread to get here releases everyone and starts the timer.
230 */
231 pthread_mutex_lock(&funnel);
232 threads_ready++;
233 if (threads_ready == threads) {
234 pthread_mutex_unlock(&funnel);
235 if (halting) {
236 printf(" all threads ready for process %d, "
237 "hit any key to start", getpid());
238 fflush(stdout);
239 (void) getchar();
240 }
241 pthread_cond_broadcast(&barrier);
242 timer = mach_absolute_time();
243 } else {
244 pthread_cond_wait(&barrier, &funnel);
245 pthread_mutex_unlock(&funnel);
246 }
247
248 do {
249 work_t *workp;
250
251 /*
252 * Get a buffer from the input queue.
253 * Block if none.
254 * Quit if all work done.
255 */
256 pthread_mutex_lock(&sp->input->mtx);
257 while (1) {
258 if (sp->work_todo == 0) {
259 pthread_mutex_unlock(&sp->input->mtx);
260 goto out;
261 }
262 workp = TAILQ_FIRST(&(sp->input->queue));
263 if (workp != NULL)
264 break;
265 DBG(" %s[%d,%d] todo %d waiting for buffer\n",
266 sp->name, wp->setnum, sp->stagenum, sp->work_todo);
267 sp->input->waiters++;
268 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
269 sp->input->waiters--;
270 }
271 TAILQ_REMOVE(&(sp->input->queue), workp, link);
272 iteration = sp->work_todo--;
273 pthread_mutex_unlock(&sp->input->mtx);
274
275 if (is_producer) {
276 workp->number = iteration;
277 workp->tag = wp->setnum;
278 } else {
279 if (affinity && current_tag != workp->tag) {
280 affinity_set(workp->tag);
281 current_tag = workp->tag;
282 }
283 }
284
285 DBG(" %s[%d,%d] todo %d work %p data %p\n",
286 sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);
287
288 /* Do our stuff with the buffer */
289 (void) sp->fn(workp->data, workp->isize);
290
291 /*
292 * Place the buffer on the input queue of the next stage.
293 * Signal waiters if required.
294 */
295 pthread_mutex_lock(&sp->output->mtx);
296 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
297 if (sp->output->waiters) {
298 DBG(" %s[%d,%d] todo %d signaling work\n",
299 sp->name, wp->setnum, sp->stagenum, iteration);
300 pthread_cond_signal(&sp->output->cnd);
301 }
302 pthread_mutex_unlock(&sp->output->mtx);
303
304 } while (1);
305
306out:
307 pthread_cond_broadcast(&sp->output->cnd);
308
309 DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);
310
311 return (void *) iteration;
312}
313
314void (*producer_fnp)(int *data, int isize) = &writer_fn;
315void (*consumer_fnp)(int *data, int isize) = &reader_fn;
316
317int
318main(int argc, char *argv[])
319{
320 int i;
321 int j;
322 int k;
323 int pages = 256; /* 1MB */
324 int buffers = 2;
325 int producers = 2;
326 int consumers = 2;
327 int stages = 2;
328 int *status;
329 stage_info_t *stage_info;
330 stage_info_t *sp;
331 worker_info_t *worker_info;
332 worker_info_t *wp;
333 kern_return_t ret;
334 int c;
335
336 /* Do switch parsing: */
337 while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) {
338 switch (c) {
339 case 'a':
340#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
341 affinity = !affinity;
342 break;
343#else
344 usage();
345#endif
346 case 'b':
347 buffers = atoi(optarg);
348 break;
349 case 'i':
350 iterations = atoi(optarg);
351 break;
352 case 'p':
353 pages = atoi(optarg);
354 break;
355 case 's':
356 stages = atoi(optarg);
357 if (stages >= WORKERS_MAX)
358 usage();
359 break;
360 case 't':
361 halting = TRUE;
362 break;
363 case 'w':
364 consumer_fnp = &reader_writer_fn;
365 break;
366 case 'v':
367 verbosity = atoi(optarg);
368 break;
369 case 'h':
370 case '?':
371 default:
372 usage();
373 }
374 }
375 argc -= optind; argv += optind;
376 if (argc > 0)
377 producers = atoi(*argv);
378 argc--; argv++;
379 if (argc > 0)
380 consumers = atoi(*argv);
381
382 pthread_mutex_init(&funnel, NULL);
383 pthread_cond_init(&barrier, NULL);
384
385 /*
386 * Fire up the worker threads.
387 */
388 threads = consumers * (stages - 1) + producers;
389 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
390 " with %saffinity, consumer reads%s data\n",
391 producers, s_if_plural(producers),
392 stages - 1, s_if_plural(stages - 1),
393 consumers, s_if_plural(consumers),
394 affinity? "": "no ",
395 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
396 if (pages < 256)
397 mutter(" %dkB bytes per buffer, ", pages * 4);
398 else
399 mutter(" %dMB bytes per buffer, ", pages / 256);
400 mutter("%d buffer%s per producer ",
401 buffers, s_if_plural(buffers));
402 if (buffers * pages < 256)
403 mutter("(total %dkB)\n", buffers * pages * 4);
404 else
405 mutter("(total %dMB)\n", buffers * pages / 256);
406 mutter(" processing %d buffer%s...\n",
407 iterations, s_if_plural(iterations));
408
409 stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
410 worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
411
412 /* Set up the queue for the workers of this thread set: */
413 for (i = 0; i < stages; i++) {
414 sp = &stage_info[i];
415 sp->stagenum = i;
416 pthread_mutex_init(&sp->bufq.mtx, NULL);
417 pthread_cond_init(&sp->bufq.cnd, NULL);
418 TAILQ_INIT(&sp->bufq.queue);
419 sp->bufq.waiters = 0;
420 if (i == 0) {
421 sp->fn = producer_fnp;
422 sp->name = "producer";
423 } else {
424 sp->fn = consumer_fnp;
425 sp->name = "consumer";
426 }
427 sp->input = &sp->bufq;
428 sp->output = &stage_info[(i + 1) % stages].bufq;
429 stage_info[i].work_todo = iterations;
430 }
431
432 /* Create the producers */
433 for (i = 0; i < producers; i++) {
434 work_t *work_array;
435 int *data;
436 int isize;
437
438 isize = pages * 4096 / sizeof(int);
439 data = (int *) malloc(buffers * pages * 4096);
440
441 /* Set up the empty work buffers */
442 work_array = (work_t *) malloc(buffers * sizeof(work_t));
443 for (j = 0; j < buffers; j++) {
444 work_array[j].data = data + (isize * j);
445 work_array[j].isize = isize;
446 work_array[j].tag = 0;
447 TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
448 DBG(" empty work item %p for data %p\n",
449 &work_array[j], work_array[j].data);
450 }
451 wp = &worker_info[i];
452 wp->setnum = i + 1;
453 wp->stage = &stage_info[0];
454 if (ret = pthread_create(&wp->thread,
455 NULL,
456 &manager_fn,
457 (void *) wp))
458 err(1, "pthread_create %d,%d", 0, i);
459 }
460
461 /* Create consumers */
462 for (i = 1; i < stages; i++) {
463 for (j = 0; j < consumers; j++) {
464 wp = &worker_info[producers + (consumers*(i-1)) + j];
465 wp->setnum = j + 1;
466 wp->stage = &stage_info[i];
467 if (ret = pthread_create(&wp->thread,
468 NULL,
469 &manager_fn,
470 (void *) wp))
471 err(1, "pthread_create %d,%d", i, j);
472 }
473 }
474
475 /*
476 * We sit back anf wait for the slaves to finish.
477 */
478 for (k = 0; k < threads; k++) {
479 int i;
480 int j;
481
482 wp = &worker_info[k];
483 if (k < producers) {
484 i = 0;
485 j = k;
486 } else {
487 i = (k - producers) / consumers;
488 j = (k - producers) % consumers;
489 }
490 if(ret = pthread_join(wp->thread, (void **)&status))
491 err(1, "pthread_join %d,%d", i, j);
492 DBG("Thread %d,%d status %d\n", i, j, status);
493 }
494
495 /*
496 * See how long the work took.
497 */
498 timer = mach_absolute_time() - timer;
499 timer = timer / 1000000ULL;
500 printf("%d.%03d seconds elapsed.\n",
501 (int) (timer/1000ULL), (int) (timer % 1000ULL));
502
503 return 0;
504}