]>
Commit | Line | Data |
---|---|---|
2d21ac55 | 1 | #include <AvailabilityMacros.h> |
fe8ab488 | 2 | #include <mach/thread_policy.h> |
2d21ac55 A |
3 | #include <mach/mach.h> |
4 | #include <mach/mach_error.h> | |
5 | #include <mach/mach_time.h> | |
6 | #include <pthread.h> | |
7 | #include <sys/queue.h> | |
8 | #include <stdio.h> | |
9 | #include <stdlib.h> | |
10 | #include <string.h> | |
11 | #include <unistd.h> | |
12 | #include <err.h> | |
13 | #include <errno.h> | |
39236c6e | 14 | #include <sys/sysctl.h> |
2d21ac55 A |
15 | |
16 | /* | |
17 | * Sets is a multithreaded test/benchmarking program to evaluate | |
18 | * affinity set placement in Leopard. | |
19 | * | |
20 | * The picture here, for each set, is: | |
0a7de745 | 21 | * |
2d21ac55 A |
22 | * free work |
23 | * -> queue --> producer --> queue --> consumer -- | |
24 | * | | | |
25 | * ----------------------------------------------- | |
26 | * | |
27 | * <------ "stage" -----> <------ "stage" -----> | |
0a7de745 | 28 | * |
2d21ac55 A |
29 | * We spin off sets of production line threads (2 sets by default). |
30 | * All threads of each line set the same affinity tag (if affinity is enabled). | |
31 | * By default there are 2 stage (worker) threads per production line. | |
32 | * A worker thread removes a buffer from an input queue, processes it and | |
33 | * queues it on an output queue. By default the initial stage (producer) | |
34 | * writes every byte in a buffer and the other (consumer) stages read every | |
35 | * byte. By default the buffers are 1MB (256 pages) in size but this can be | |
36 | * overridden. By default there are 2 buffers per set (again overridable). | |
37 | * Worker threads process (iterate over) 10000 buffers by default. | |
38 | * | |
39 | * With affinity enabled, each producer and consumer thread sets its affinity | |
40 | * to the set number, 1 .. N. So the threads of each set share an L2 cache. | |
41 | * | |
42 | * Buffer management uses pthread mutex/condition variables. A thread blocks | |
43 | * when no buffer is available on a queue and it is signaled when a buffer | |
44 | * is placed on an empty queue. Queues are tailqs a la <sys/queue.h>. | |
45 | * The queue management is centralized in a single routine: what queues to | |
46 | * use as input and output and what function to call for processing is | |
47 | * data-driven. | |
48 | */ | |
0a7de745 | 49 | |
/* Serializes DBG() output and protects the start-barrier counter. */
pthread_mutex_t funnel;
/* Worker threads block on this until every thread has reached the barrier. */
pthread_cond_t  barrier;

/*
 * Set from mach_absolute_time() when the start barrier is released;
 * main() later overwrites it with the elapsed time.
 */
uint64_t        timer;
int             threads;                /* total worker threads (sets * stages) */
int             threads_ready = 0;      /* threads that have reached the barrier */

int             iterations = 10000;     /* buffers processed per worker (-i) */
boolean_t       affinity = FALSE;       /* set affinity tags (-a) */
boolean_t       halting = FALSE;        /* wait for a key press before starting (-t) */
boolean_t       cache_config = FALSE;   /* derive buffers/sets from the cache layout (-c) */
int             verbosity = 1;          /* 0 quiet, 1 mutter(), 2 DBG() (-v) */
2d21ac55 A |
62 | |
/* One unit of work: a tail-queue link plus the data buffer that travels with it. */
typedef struct work {
	TAILQ_ENTRY(work)       link;
	int                     *data;
} work_t;

/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t         mtx;            /* held while touching queue/waiters */
	pthread_cond_t          cnd;            /* signaled when work is queued for a waiter */
	TAILQ_HEAD(, work)      queue;
	boolean_t               waiters;        /* TRUE while a thread is blocked on cnd */
} work_queue_t;

/* Worker functions take an integer array and its size (in ints) */
typedef void (worker_fn_t)(int *, int);

/* This struct controls the function of a thread */
typedef struct {
	int                     stagenum;       /* index of this stage within its set */
	char                    *name;          /* "producer" or "consumer" */
	worker_fn_t             *fn;            /* processing applied to each buffer */
	work_queue_t            *input;         /* queue buffers are taken from */
	work_queue_t            *output;        /* queue buffers are handed to */
	struct line_info        *set;           /* owning thread set */
	pthread_t               thread;
	work_queue_t            bufq;           /* this stage's own queue */
} stage_info_t;

/* This defines a thread set */
#define WORKERS_MAX     10
typedef struct line_info {
	int                     setnum;         /* 1-based set number; also the affinity tag */
	int                     *data;          /* backing store for all buffers of the set */
	int                     isize;          /* size of one buffer, in ints */
	stage_info_t            *stage[WORKERS_MAX];
} line_info_t;
101 | ||
0a7de745 A |
/*
 * Debug output, emitted only when verbosity > 1.
 * The funnel mutex keeps lines from different threads from interleaving,
 * so DBG() must not be used while funnel is already held.
 */
#define DBG(...) do {                                   \
	if (verbosity > 1) {                            \
	        pthread_mutex_lock(&funnel);            \
	        printf(__VA_ARGS__);                    \
	        pthread_mutex_unlock(&funnel);          \
	}                                               \
} while (0)

/* Informational output, emitted when verbosity > 0 (no locking). */
#define mutter(...) do {                                \
	if (verbosity > 0) {                            \
	        printf(__VA_ARGS__);                    \
	}                                               \
} while (0)

/* Plural suffix for a count: "" for exactly one, "s" otherwise (including 0). */
#define s_if_plural(x)  (((x) == 1) ? "" : "s")
2d21ac55 A |
117 | |
/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * The values in parentheses are the defaults.
 */
static void
usage(void)
{
	fprintf(stderr,
	    "usage: sets [-a] Turn affinity on (off)\n"
	    " [-b B] Number of buffers per set/line (2)\n"
	    " [-c] Configure for max cache performance\n"
	    " [-h] Print this\n"
	    " [-i I] Number of items/buffers to process (10000)\n"
	    " [-s S] Number of stages per set/line (2)\n"
	    " [-t] Halt for keyboard input to start\n"
	    " [-p P] Number of pages per buffer (256=1MB)\n"
	    " [-w] Consumer writes data\n"
	    " [-v V] Level of verbosity 0..2 (1)\n"
	    " [N] Number of sets/lines (2)\n"
	    );
	exit(1);
}
136 | ||
/*
 * Trivial producer: write to every element of the buffer.
 * Element i is set to i.
 *
 * data:  buffer of at least isize ints
 * isize: number of ints to write (nothing is written if <= 0)
 */
void
writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}
147 | ||
/*
 * Trivial consumer: read every element of the buffer.
 *
 * The loads go through a volatile-qualified pointer so the compiler
 * cannot discard them; a plain read into an unused local lets the
 * optimizer remove the whole loop, leaving the consumer with no work.
 *
 * data:  buffer of at least isize ints (not modified)
 * isize: number of ints to read (nothing is read if <= 0)
 */
void
reader_fn(int *data, int isize)
{
	const volatile int      *src = data;
	int                     i;

	for (i = 0; i < isize; i++) {
		(void) src[i];
	}
}
159 | ||
/*
 * Consumer reading and writing the buffer: increments every element by 1.
 *
 * data:  buffer of at least isize ints
 * isize: number of ints to update (nothing is touched if <= 0)
 */
void
reader_writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] += 1;
	}
}
170 | ||
/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 *
 * arg:     the stage_info_t for this thread; it names the input and output
 *          queues and the worker function applied to each buffer.
 * returns: the number of buffers processed, cast to void *.
 */
void *
manager_fn(void *arg)
{
	stage_info_t                    *sp = (stage_info_t *) arg;
	line_info_t                     *lp = sp->set;
	kern_return_t                   ret;
	long                            iteration = 0;
	mach_port_t                     self;
	thread_extended_policy_data_t   epolicy;
	thread_affinity_policy_data_t   policy;

	/*
	 * mach_thread_self() hands back a new port right on each call, so
	 * take it once and release it after the policy calls.
	 */
	self = mach_thread_self();

	/* Turn timesharing off for this thread. */
	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
		self, THREAD_EXTENDED_POLICY,
		(thread_policy_t) &epolicy,
		THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS) {
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
	}

	/*
	 * If affinity sets are enabled (-a),
	 * set our tag to be our thread set number.
	 */
	if (affinity) {
		policy.affinity_tag = lp->setnum;
		ret = thread_policy_set(
			self, THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
		if (ret != KERN_SUCCESS) {
			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
		}
	}

	(void) mach_port_deallocate(mach_task_self(), self);

	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here releases everyone and starts the timer.
	 * funnel stays held until after the broadcast, so a waiter that wakes
	 * spuriously cannot observe the final count (and start working) before
	 * the optional key press and the timer start have happened.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		if (halting) {
			printf(" all threads ready for process %d, "
			    "hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		timer = mach_absolute_time();
		pthread_cond_broadcast(&barrier);
	} else {
		/* Re-check the predicate: condition waits may wake spuriously. */
		while (threads_ready < threads) {
			pthread_cond_wait(&barrier, &funnel);
		}
	}
	pthread_mutex_unlock(&funnel);

	do {
		work_t          *workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL) {
				break;
			}
			DBG(" %s[%d,%d] iteration %ld waiting for buffer\n",
			    sp->name, lp->setnum, sp->stagenum, iteration);
			sp->input->waiters = TRUE;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters = FALSE;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		pthread_mutex_unlock(&sp->input->mtx);

		DBG(" %s[%d,%d] iteration %ld work %p data %p\n",
		    sp->name, lp->setnum, sp->stagenum, iteration,
		    (void *) workp, (void *) workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, lp->isize);

		/*
		 * Place the buffer on the output queue.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG(" %s[%d,%d] iteration %ld signaling work\n",
			    sp->name, lp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (++iteration < iterations);

	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);

	return (void *) iteration;
}
280 | ||
#define MAX_CACHE_DEPTH 10

/*
 * Derive the buffer and set counts from the machine's cache layout
 * (the hw.cacheconfig and hw.cachesize sysctls).
 *
 * npages: pages (of 4096 bytes) per buffer; must be positive
 * nbufs:  out - buffers per set, sized so they fill about 90% of the
 *         last-level cache (never fewer than 1)
 * nsets:  out - cacheconfig[0] divided by the last-level cache's
 *         cacheconfig entry
 *
 * Exits with status 1 if a sysctl fails or the inputs cannot be used.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t  len;
	int     llc;
	int64_t bufsize;
	/*
	 * Zero-filled so that entries the kernel does not report read as
	 * "no such cache level" in the scan below.
	 */
	int64_t cacheconfig[MAX_CACHE_DEPTH] = { 0 };
	int64_t cachesize[MAX_CACHE_DEPTH] = { 0 };

	mutter("Autoconfiguring...\n");

	if (npages <= 0) {
		printf("Invalid number of pages per buffer, %d\n", npages);
		exit(1);
	}

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
	    &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
	    &cachesize[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find LLC: the highest level with a non-zero cacheconfig entry.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--) {
		if (cacheconfig[llc] != 0) {
			break;
		}
	}
	if (cacheconfig[llc] <= 0) {
		printf("Unusable hw.cacheconfig, no cache levels reported\n");
		exit(1);
	}

	/*
	 * Calculate number of buffers of size pages*4096 bytes
	 * fit into 90% of the last-level cache. The arithmetic is done
	 * in 64 bits so large page counts cannot overflow.
	 */
	bufsize = (int64_t) npages * 4096;
	*nbufs = (int) (cachesize[llc] * 9 / (bufsize * 10));
	if (*nbufs < 1) {
		/* With no buffers at all the pipelines would never make progress. */
		*nbufs = 1;
	}
	mutter(" L%d (LLC) cache %lld bytes: "
	    "using %d buffers of size %d bytes\n",
	    llc, (long long) cachesize[llc], *nbufs, (npages * 4096));

	/*
	 * Calculate how many sets:
	 */
	*nsets = (int) (cacheconfig[0] / cacheconfig[llc]);
	mutter(" %lld cpus; %lld cpus per L%d cache: using %d sets\n",
	    (long long) cacheconfig[0], (long long) cacheconfig[llc], llc, *nsets);
}
331 | ||
/* Worker functions for stage 0 and for all later stages; -w swaps the consumer. */
void (*producer_fnp)(int *data, int isize) = &writer_fn;
void (*consumer_fnp)(int *data, int isize) = &reader_fn;

/*
 * Parse the options, build the per-set stage pipelines, launch one thread
 * per stage, wait for them all to finish and report the elapsed time.
 *
 * Returns 0 on success; invalid arguments end in usage() and allocation
 * or pthread failures end in err(), both of which exit with status 1.
 * The buffers and bookkeeping arrays are not freed; they live until exit.
 */
int
main(int argc, char *argv[])
{
	int                             i;
	int                             j;
	int                             pages = 256; /* 1MB */
	int                             buffers = 2;
	int                             sets = 2;
	int                             stages = 2;
	void                            *status;
	line_info_t                     *line_info;
	line_info_t                     *lp;
	stage_info_t                    *stage_info;
	stage_info_t                    *sp;
	int                             ret;
	int                             c;
	mach_timebase_info_data_t       tb;

	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:chi:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'c':
			cache_config = TRUE;
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case '?':
		case 'h':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0) {
		sets = atoi(*argv);
	}

	/*
	 * Reject values the setup below cannot work with: the stage array
	 * holds WORKERS_MAX entries, stage wiring divides by the stage count,
	 * and a line with no buffers would never make progress.
	 */
	if (stages < 1 || stages > WORKERS_MAX ||
	    sets < 1 || buffers < 1 || pages < 1) {
		usage();
	}

	if (cache_config) {
		auto_config(pages, &buffers, &sets);
		if (sets < 1 || buffers < 1) {
			errx(1, "auto-configuration produced %d sets, %d buffers",
			    sets, buffers);
		}
	}

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
	threads = sets * stages;
	mutter("Launching %d set%s of %d threads with %saffinity, "
	    "consumer reads%s data\n",
	    sets, s_if_plural(sets), stages, affinity? "": "no ",
	    (consumer_fnp == &reader_writer_fn)? " and writes" : "");
	if (pages < 256) {
		mutter(" %dkB bytes per buffer, ", pages * 4);
	} else {
		mutter(" %dMB bytes per buffer, ", pages / 256);
	}
	mutter("%d buffer%s per set ",
	    buffers, s_if_plural(buffers));
	if (buffers * pages < 256) {
		mutter("(total %dkB)\n", buffers * pages * 4);
	} else {
		mutter("(total %dMB)\n", buffers * pages / 256);
	}
	mutter(" processing %d buffer%s...\n",
	    iterations, s_if_plural(iterations));

	line_info = malloc((size_t) sets * sizeof(*line_info));
	stage_info = malloc((size_t) sets * (size_t) stages * sizeof(*stage_info));
	if (line_info == NULL || stage_info == NULL) {
		err(1, "malloc");
	}

	for (i = 0; i < sets; i++) {
		work_t  *work_array;

		lp = &line_info[i];

		lp->setnum = i + 1;
		lp->isize = (int) ((size_t) pages * 4096 / sizeof(int));
		/* Sized in size_t so large buffer counts cannot overflow int. */
		lp->data = malloc((size_t) buffers * (size_t) pages * 4096);
		if (lp->data == NULL) {
			err(1, "malloc buffers for set %d", lp->setnum);
		}

		/* Set up the queue for the workers of this thread set: */
		for (j = 0; j < stages; j++) {
			sp = &stage_info[(i * stages) + j];
			sp->stagenum = j;
			sp->set = lp;
			lp->stage[j] = sp;
			pthread_mutex_init(&sp->bufq.mtx, NULL);
			pthread_cond_init(&sp->bufq.cnd, NULL);
			TAILQ_INIT(&sp->bufq.queue);
			sp->bufq.waiters = FALSE;
		}

		/*
		 * Take a second pass through the stages
		 * to define what the workers are and to interconnect their input/outputs.
		 * Each stage reads its own queue and feeds the next stage's queue;
		 * the last stage feeds stage 0 again, closing the ring.
		 */
		for (j = 0; j < stages; j++) {
			sp = lp->stage[j];
			if (j == 0) {
				sp->fn = producer_fnp;
				sp->name = "producer";
			} else {
				sp->fn = consumer_fnp;
				sp->name = "consumer";
			}
			sp->input = &lp->stage[j]->bufq;
			sp->output = &lp->stage[(j + 1) % stages]->bufq;
		}

		/* Set up the buffers on the first worker of the set. */
		work_array = malloc((size_t) buffers * sizeof(*work_array));
		if (work_array == NULL) {
			err(1, "malloc work items for set %d", lp->setnum);
		}
		for (j = 0; j < buffers; j++) {
			work_array[j].data = lp->data + ((size_t) lp->isize * j);
			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
			DBG(" empty work item %p for set %d data %p\n",
			    (void *) &work_array[j], i, (void *) work_array[j].data);
		}

		/* Create this set of threads */
		for (j = 0; j < stages; j++) {
			ret = pthread_create(&lp->stage[j]->thread, NULL,
			    &manager_fn,
			    (void *) lp->stage[j]);
			if (ret != 0) {
				/* pthread calls return the error; they do not set errno. */
				errno = ret;
				err(1, "pthread_create %d,%d", i, j);
			}
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (i = 0; i < sets; i++) {
		lp = &line_info[i];
		for (j = 0; j < stages; j++) {
			ret = pthread_join(lp->stage[j]->thread, &status);
			if (ret != 0) {
				errno = ret;
				err(1, "pthread_join %d,%d", i, j);
			}
			/* The thread's exit value is its iteration count. */
			DBG("Thread %d,%d status %ld\n", i, j, (long) status);
		}
	}

	/*
	 * See how long the work took.
	 * mach_absolute_time() counts in timebase units, which need not be
	 * nanoseconds: scale by numer/denom before converting to ms.
	 */
	timer = mach_absolute_time() - timer;
	if (mach_timebase_info(&tb) != KERN_SUCCESS || tb.denom == 0) {
		tb.numer = 1;
		tb.denom = 1;
	}
	timer = timer * tb.numer / tb.denom;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
	    (int) (timer / 1000ULL), (int) (timer % 1000ULL));

	return 0;
}