]> git.saurik.com Git - apple/xnu.git/blob - tools/tests/affinity/sets.c
50eda2626ee6cd2c711b87ffae76c5db59ea4046
[apple/xnu.git] / tools / tests / affinity / sets.c
#include <AvailabilityMacros.h>
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
#endif
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <sys/sysctl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <err.h>
#include <errno.h>
16
17 /*
18 * Sets is a multithreaded test/benchmarking program to evaluate
19 * affinity set placement in Leopard.
20 *
21 * The picture here, for each set, is:
22 *
23 * free work
24 * -> queue --> producer --> queue --> consumer --
25 * | |
26 * -----------------------------------------------
27 *
28 * <------ "stage" -----> <------ "stage" ----->
29
 * We spin off sets of production line threads (2 sets by default).
 * All threads of each line set the same affinity tag (when affinity is
 * enabled with -a).
 * By default there are 2 stage (worker) threads per production line.
 * A worker thread removes a buffer from an input queue, processes it and
 * queues it on an output queue. By default the initial stage (producer)
 * writes every word in a buffer and the other (consumer) stages read every
 * word. By default the buffers are 1MB (256 pages) in size but this can be
 * overridden. By default there are 2 buffers per set (again overridable).
 * Worker threads process (iterate over) 10000 buffers by default.
39 *
40 * With affinity enabled, each producer and consumer thread sets its affinity
41 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
42 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are TAILQs a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
49 */
50
/* Serializes DBG() output and protects threads_ready at the start barrier. */
pthread_mutex_t funnel;
/* Condition that worker threads wait on until the last thread arrives. */
pthread_cond_t barrier;

/*
 * Set from mach_absolute_time() when the start barrier is released;
 * main() later overwrites it with the elapsed time.
 */
uint64_t timer;
/* Total number of worker threads (sets * stages); assigned in main(). */
int threads;
/* Number of worker threads that have reached the start barrier. */
int threads_ready = 0;

/* Number of buffers each worker thread processes (-i). */
int iterations = 10000;
/* Whether each thread sets an affinity tag; toggled by -a. */
boolean_t affinity = FALSE;
/* Whether to wait for keyboard input before releasing the barrier (-t). */
boolean_t halting = FALSE;
/* Whether main() calls auto_config() to pick buffers and sets (-c). */
boolean_t cache_config = FALSE;
/* Output level (-v): > 0 enables mutter(), > 1 enables DBG(). */
int verbosity = 1;
63
/* A work item: tail-queue linkage plus a pointer to the buffer it carries. */
typedef struct work {
	TAILQ_ENTRY(work) link;	/* linkage used by a work_queue_t's queue */
	int *data;	/* start of this item's buffer */
} work_t;
68
/*
 * A work queue, complete with the pthread objects used to manage it:
 * the mutex guards the queue and the waiters flag, and the condition
 * variable is signaled when an item is queued while a thread is waiting.
 */
typedef struct work_queue {
	pthread_mutex_t mtx;	/* protects queue and waiters */
	pthread_cond_t cnd;	/* waited on when the queue is empty */
	TAILQ_HEAD(, work) queue;	/* the queued work_t items */
	boolean_t waiters;	/* TRUE while a thread is blocked on cnd */
} work_queue_t;
78
/* Worker functions take an integer array and its size (a count of ints) */
typedef void (worker_fn_t)(int *, int);
81
/*
 * This struct controls the function of a thread: one instance exists per
 * stage of each production line and is passed to manager_fn() as its
 * argument.
 */
typedef struct {
	int stagenum;	/* index of this stage within its line; 0 is the producer */
	char *name;	/* "producer" or "consumer", used in debug output */
	worker_fn_t *fn;	/* function applied to every buffer */
	work_queue_t *input;	/* queue this stage takes buffers from */
	work_queue_t *output;	/* queue this stage places buffers on */
	struct line_info *set;	/* the production line this stage belongs to */
	pthread_t thread;	/* thread running this stage */
	work_queue_t bufq;	/* this stage's own queue; main() wires it up as its input */
} stage_info_t;
93
/* This defines a thread set (one production line) */
#define WORKERS_MAX 10	/* capacity of the per-line stage array */
typedef struct line_info {
	int setnum;	/* 1-based set number; used as the affinity tag */
	int *data;	/* single allocation holding all of this set's buffers */
	int isize;	/* number of ints in one buffer */
	stage_info_t *stage[WORKERS_MAX];	/* the stages of this line, in order */
} line_info_t;
102
/*
 * Debug trace: printf-style output emitted only when verbosity > 1.
 * The funnel mutex is held around the printf so that output from
 * different threads is not interleaved; callers must therefore not
 * already hold funnel.
 */
#define DBG(x...) do { \
	if (verbosity > 1) { \
		pthread_mutex_lock(&funnel); \
		printf(x); \
		pthread_mutex_unlock(&funnel); \
	} \
} while (0)
110
/*
 * Informational output: printf-style, emitted only when verbosity > 0.
 * Unlike DBG() this takes no lock.
 */
#define mutter(x...) do { \
	if (verbosity > 0) { \
		printf(x); \
	} \
} while (0)
116
/*
 * Plural suffix for a count: "" for exactly one item, "s" otherwise,
 * so that a count of zero reads "0 buffers" rather than "0 buffer".
 */
#define s_if_plural(x) (((x) == 1) ? "" : "s")
118
/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * Does not return.  The defaults shown in parentheses mirror the
 * initial values of the corresponding variables.
 */
static void
usage(void)
{
	fprintf(stderr,
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
	    "usage: sets [-a] Turn affinity on (off)\n"
	    " [-b B] Number of buffers per set/line (2)\n"
#else
	    "usage: sets [-b B] Number of buffers per set/line (2)\n"
#endif
	    " [-c] Configure for max cache performance\n"
	    " [-h] Print this\n"
	    " [-i I] Number of items/buffers to process (10000)\n"
	    " [-s S] Number of stages per set/line (2)\n"
	    " [-t] Halt for keyboard input to start\n"
	    " [-p P] Number of pages per buffer (256=1MB)\n"
	    " [-w] Consumer writes data\n"
	    " [-v V] Level of verbosity 0..2 (1)\n"
	    " [N] Number of sets/lines (2)\n"
	    );
	exit(1);
}
141
/*
 * Trivial producer: store to every int of the buffer.
 * Element k receives the value k; nothing is written when isize <= 0.
 */
void
writer_fn(int *data, int isize)
{
	int idx = 0;

	while (idx < isize) {
		data[idx] = idx;
		idx++;
	}
}
152
/*
 * Trivial consumer: load every int of the buffer and discard it.
 *
 * The loads go through a volatile-qualified pointer so the compiler must
 * actually perform them; with a plain pointer the values are dead and an
 * optimizing build may delete the loop, leaving nothing to measure.
 * The buffer is not modified.
 */
void
reader_fn(int *data, int isize)
{
	const volatile int *src = data;
	int i;

	for (i = 0; i < isize; i++) {
		int datum = src[i];

		(void) datum;
	}
}
164
/*
 * Consumer that both reads and writes the buffer:
 * every one of the isize ints is incremented by one in place.
 */
void
reader_writer_fn(int *data, int isize)
{
	int *cursor = data;
	int remaining;

	for (remaining = isize; remaining > 0; remaining--) {
		*cursor += 1;
		cursor++;
	}
}
175
176 /*
177 * This is the central function for every thread.
178 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
179 */
180 void *
181 manager_fn(void *arg)
182 {
183 stage_info_t *sp = (stage_info_t *) arg;
184 line_info_t *lp = sp->set;
185 kern_return_t ret;
186 long iteration = 0;
187
188 /*
189 * If we're using affinity sets (we are by default)
190 * set our tag to by our thread set number.
191 */
192 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
193 thread_extended_policy_data_t epolicy;
194 thread_affinity_policy_data_t policy;
195
196 epolicy.timeshare = FALSE;
197 ret = thread_policy_set(
198 mach_thread_self(), THREAD_EXTENDED_POLICY,
199 (thread_policy_t) &epolicy,
200 THREAD_EXTENDED_POLICY_COUNT);
201 if (ret != KERN_SUCCESS)
202 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
203
204 if (affinity) {
205 policy.affinity_tag = lp->setnum;
206 ret = thread_policy_set(
207 mach_thread_self(), THREAD_AFFINITY_POLICY,
208 (thread_policy_t) &policy,
209 THREAD_AFFINITY_POLICY_COUNT);
210 if (ret != KERN_SUCCESS)
211 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
212 }
213 #endif
214
215 DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
216
217 /*
218 * Start barrier.
219 * The tets thread to get here releases everyone and starts the timer.
220 */
221 pthread_mutex_lock(&funnel);
222 threads_ready++;
223 if (threads_ready == threads) {
224 pthread_mutex_unlock(&funnel);
225 if (halting) {
226 printf(" all threads ready for process %d, "
227 "hit any key to start", getpid());
228 fflush(stdout);
229 (void) getchar();
230 }
231 pthread_cond_broadcast(&barrier);
232 timer = mach_absolute_time();
233 } else {
234 pthread_cond_wait(&barrier, &funnel);
235 pthread_mutex_unlock(&funnel);
236 }
237
238 do {
239 int i;
240 work_t *workp;
241
242 /*
243 * Get a buffer from the input queue.
244 * Block if none.
245 */
246 pthread_mutex_lock(&sp->input->mtx);
247 while (1) {
248 workp = TAILQ_FIRST(&(sp->input->queue));
249 if (workp != NULL)
250 break;
251 DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
252 sp->name, lp->setnum, sp->stagenum, iteration);
253 sp->input->waiters = TRUE;
254 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
255 sp->input->waiters = FALSE;
256 }
257 TAILQ_REMOVE(&(sp->input->queue), workp, link);
258 pthread_mutex_unlock(&sp->input->mtx);
259
260 DBG(" %s[%d,%d] iteration %d work %p data %p\n",
261 sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
262
263 /* Do our stuff with the buffer */
264 (void) sp->fn(workp->data, lp->isize);
265
266 /*
267 * Place the buffer on the input queue.
268 * Signal waiters if required.
269 */
270 pthread_mutex_lock(&sp->output->mtx);
271 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
272 if (sp->output->waiters) {
273 DBG(" %s[%d,%d] iteration %d signaling work\n",
274 sp->name, lp->setnum, sp->stagenum, iteration);
275 pthread_cond_signal(&sp->output->cnd);
276 }
277 pthread_mutex_unlock(&sp->output->mtx);
278 } while (++iteration < iterations);
279
280 DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
281
282 return (void *) iteration;
283 }
284
#define MAX_CACHE_DEPTH 10
/*
 * Choose the number of buffers per set and the number of sets from the
 * machine's cache topology, as reported by the hw.cacheconfig and
 * hw.cachesize sysctls.
 *
 * npages: pages (of 4096 bytes) per buffer; must be positive.
 * nbufs:  out - number of buffers that fit in 90% of the last-level
 *         cache, but never fewer than 1.
 * nsets:  out - cacheconfig[0] divided by cacheconfig[llc] (printed below
 *         as cpus / cpus per last-level cache), never fewer than 1.
 *
 * Prints a message and exits with status 1 if a sysctl fails or the
 * inputs cannot be used.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t len;	/* sysctlbyname() takes a size_t length, not an int */
	int llc;
	int64_t buffer_bytes;
	/*
	 * Zero-filled so that levels the kernel does not report read as
	 * "absent" in the LLC scan below rather than as stack garbage.
	 */
	int64_t cacheconfig[MAX_CACHE_DEPTH] = { 0 };
	int64_t cachesize[MAX_CACHE_DEPTH] = { 0 };

	mutter("Autoconfiguring...\n");

	if (npages <= 0) {
		printf("Invalid number of pages per buffer, %d\n", npages);
		exit(1);
	}
	/* 64-bit arithmetic: npages * 4096 * 10 can overflow an int. */
	buffer_bytes = (int64_t) npages * 4096;

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
			 &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
			 &cachesize[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find LLC: the highest level with a non-zero sharing count.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
		if (cacheconfig[llc] != 0)
			break;
	if (cacheconfig[llc] == 0) {
		printf("Unable to interpret hw.cacheconfig\n");
		exit(1);
	}

	/*
	 * Calculate how many buffers of size npages*4096 bytes
	 * fit into 90% of the last-level cache.  At least one buffer is
	 * always used: with none the worker threads would block forever.
	 */
	*nbufs = (int) (cachesize[llc] * 9 / (buffer_bytes * 10));
	if (*nbufs < 1)
		*nbufs = 1;
	mutter(" L%d (LLC) cache %lld bytes: "
		"using %d buffers of size %lld bytes\n",
		llc, (long long) cachesize[llc], *nbufs, (long long) buffer_bytes);

	/*
	 * Calculate how many sets: one per last-level cache.
	 */
	*nsets = (int) (cacheconfig[0] / cacheconfig[llc]);
	if (*nsets < 1)
		*nsets = 1;
	mutter(" %lld cpus; %lld cpus per L%d cache: using %d sets\n",
		(long long) cacheconfig[0], (long long) cacheconfig[llc],
		llc, *nsets);
}
333
/* Worker function run by stage 0 of every line. */
void (*producer_fnp)(int *data, int isize) = &writer_fn;
/* Worker function run by all later stages; -w switches it to reader_writer_fn. */
void (*consumer_fnp)(int *data, int isize) = &reader_fn;
336
337 int
338 main(int argc, char *argv[])
339 {
340 int i;
341 int j;
342 int pages = 256; /* 1MB */
343 int buffers = 2;
344 int sets = 2;
345 int stages = 2;
346 int *status;
347 line_info_t *line_info;
348 line_info_t *lp;
349 stage_info_t *stage_info;
350 stage_info_t *sp;
351 kern_return_t ret;
352 int c;
353
354 /* Do switch parsing: */
355 while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
356 switch (c) {
357 case 'a':
358 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
359 affinity = !affinity;
360 break;
361 #else
362 usage();
363 #endif
364 case 'b':
365 buffers = atoi(optarg);
366 break;
367 case 'c':
368 cache_config = TRUE;
369 break;
370 case 'i':
371 iterations = atoi(optarg);
372 break;
373 case 'p':
374 pages = atoi(optarg);
375 break;
376 case 's':
377 stages = atoi(optarg);
378 if (stages >= WORKERS_MAX)
379 usage();
380 break;
381 case 't':
382 halting = TRUE;
383 break;
384 case 'w':
385 consumer_fnp = &reader_writer_fn;
386 break;
387 case 'v':
388 verbosity = atoi(optarg);
389 break;
390 case '?':
391 case 'h':
392 default:
393 usage();
394 }
395 }
396 argc -= optind; argv += optind;
397 if (argc > 0)
398 sets = atoi(*argv);
399
400 if (cache_config)
401 auto_config(pages, &buffers, &sets);
402
403 pthread_mutex_init(&funnel, NULL);
404 pthread_cond_init(&barrier, NULL);
405
406 /*
407 * Fire up the worker threads.
408 */
409 threads = sets * stages;
410 mutter("Launching %d set%s of %d threads with %saffinity, "
411 "consumer reads%s data\n",
412 sets, s_if_plural(sets), stages, affinity? "": "no ",
413 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
414 if (pages < 256)
415 mutter(" %dkB bytes per buffer, ", pages * 4);
416 else
417 mutter(" %dMB bytes per buffer, ", pages / 256);
418 mutter("%d buffer%s per set ",
419 buffers, s_if_plural(buffers));
420 if (buffers * pages < 256)
421 mutter("(total %dkB)\n", buffers * pages * 4);
422 else
423 mutter("(total %dMB)\n", buffers * pages / 256);
424 mutter(" processing %d buffer%s...\n",
425 iterations, s_if_plural(iterations));
426 line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
427 stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
428 for (i = 0; i < sets; i++) {
429 work_t *work_array;
430
431 lp = &line_info[i];
432
433 lp->setnum = i + 1;
434 lp->isize = pages * 4096 / sizeof(int);
435 lp->data = (int *) malloc(buffers * pages * 4096);
436
437 /* Set up the queue for the workers of this thread set: */
438 for (j = 0; j < stages; j++) {
439 sp = &stage_info[(i*stages) + j];
440 sp->stagenum = j;
441 sp->set = lp;
442 lp->stage[j] = sp;
443 pthread_mutex_init(&sp->bufq.mtx, NULL);
444 pthread_cond_init(&sp->bufq.cnd, NULL);
445 TAILQ_INIT(&sp->bufq.queue);
446 sp->bufq.waiters = FALSE;
447 }
448
449 /*
450 * Take a second pass through the stages
451 * to define what the workers are and to interconnect their input/outputs
452 */
453 for (j = 0; j < stages; j++) {
454 sp = lp->stage[j];
455 if (j == 0) {
456 sp->fn = producer_fnp;
457 sp->name = "producer";
458 } else {
459 sp->fn = consumer_fnp;
460 sp->name = "consumer";
461 }
462 sp->input = &lp->stage[j]->bufq;
463 sp->output = &lp->stage[(j + 1) % stages]->bufq;
464 }
465
466 /* Set up the buffers on the first worker of the set. */
467 work_array = (work_t *) malloc(buffers * sizeof(work_t));
468 for (j = 0; j < buffers; j++) {
469 work_array[j].data = lp->data + (lp->isize * j);
470 TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
471 DBG(" empty work item %p for set %d data %p\n",
472 &work_array[j], i, work_array[j].data);
473 }
474
475 /* Create this set of threads */
476 for (j = 0; j < stages; j++) {
477 if (ret = pthread_create(&lp->stage[j]->thread, NULL,
478 &manager_fn,
479 (void *) lp->stage[j]))
480 err(1, "pthread_create %d,%d", i, j);
481 }
482 }
483
484 /*
485 * We sit back anf wait for the slave to finish.
486 */
487 for (i = 0; i < sets; i++) {
488 lp = &line_info[i];
489 for (j = 0; j < stages; j++) {
490 if(ret = pthread_join(lp->stage[j]->thread, (void **)&status))
491 err(1, "pthread_join %d,%d", i, j);
492 DBG("Thread %d,%d status %d\n", i, j, status);
493 }
494 }
495
496 /*
497 * See how long the work took.
498 */
499 timer = mach_absolute_time() - timer;
500 timer = timer / 1000000ULL;
501 printf("%d.%03d seconds elapsed.\n",
502 (int) (timer/1000ULL), (int) (timer % 1000ULL));
503
504 return 0;
505 }