#include <AvailabilityMacros.h>
#include <mach/thread_policy.h>
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <sys/queue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <err.h>

/*
 * Pool is another multithreaded test/benchmarking program to evaluate
 * affinity set placement in Leopard.
 *
 * The basic picture is:
 *
 *                -> producer --               -> consumer --
 *       free    /              \    work     /              \
 *    -> queue --      ...       --> queue --      ...        --
 *   |           \              /             \              /  |
 *   |            -> producer --               -> consumer --   |
 *    ------------------------------------------------------------
 *
 *    <----------- "stage" ---------->  <---------- "stage" ---------->
 *
 * There is a series of work stages. Each stage has an input and an output
 * queue and multiple threads. The first stage is the producer and subsequent
 * stages are consumers. By default there are 2 stages. There are N producer
 * and M consumer threads. There are B buffers per producer thread circulating
 * through the system.
 *
 * When affinity is enabled, each producer thread is tagged with an affinity tag
 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
 * the work queue it is tagged with this affinity. When a consumer dequeues a
 * work item, it sets its affinity to this tag. Hence consumer threads migrate
 * to the same affinity set where the data was produced.
 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
 */
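
/*
 * Example usage (assuming a simple standalone build with the system compiler;
 * the actual build harness for this test may differ):
 *
 *	cc -o pool pool.c
 *	./pool -a -i 20000 4 4    # affinity on, 20000 buffers, 4 producers, 4 consumers
 */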

pthread_mutex_t	funnel;
pthread_cond_t	barrier;

uint64_t	timer;
int		threads;
int		threads_ready = 0;

int		iterations = 10000;
boolean_t	affinity = FALSE;
boolean_t	halting = FALSE;
int		verbosity = 1;

typedef struct work {
	TAILQ_ENTRY(work)	link;
	int			*data;
	int			isize;
	int			tag;
	int			number;
} work_t;

/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t		mtx;
	pthread_cond_t		cnd;
	TAILQ_HEAD(, work)	queue;
	unsigned int		waiters;
} work_queue_t;

/* Worker functions take an integer array and size */
typedef void (worker_fn_t)(int *, int);

/* This struct controls the function of a stage */
#define WORKERS_MAX 10
typedef struct {
	int		stagenum;
	char		*name;
	worker_fn_t	*fn;
	work_queue_t	*input;
	work_queue_t	*output;
	work_queue_t	bufq;
	int		work_todo;
} stage_info_t;

/* This defines a worker thread */
typedef struct worker_info {
	int		setnum;
	stage_info_t	*stage;
	pthread_t	thread;
} worker_info_t;

#define DBG(x...) do {				\
	if (verbosity > 1) {			\
		pthread_mutex_lock(&funnel);	\
		printf(x);			\
		pthread_mutex_unlock(&funnel);	\
	}					\
} while (0)

#define mutter(x...) do {	\
	if (verbosity > 0) {	\
		printf(x);	\
	}			\
} while (0)

#define s_if_plural(x)	(((x) > 1) ? "s" : "")

static void
usage()
{
	fprintf(stderr,
	    "usage: pool [-a]    Turn affinity on (off)\n"
	    "            [-b B]  Number of buffers per producer (2)\n"
	    "            [-i I]  Number of buffers to produce (10000)\n"
	    "            [-s S]  Number of stages (2)\n"
	    "            [-p P]  Number of pages per buffer (256=1MB)\n"
	    "            [-t]    Halt for a keypress before starting\n"
	    "            [-w]    Consumer writes data\n"
	    "            [-v V]  Verbosity level 0..2 (1)\n"
	    "            [N [M]] Number of producers and consumers (2)\n"
	    );
	exit(1);
}

/* Trivial producer: write to each byte */
void
writer_fn(int *data, int isize)
{
	int	i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}

/* Trivial consumer: read each byte */
void
reader_fn(int *data, int isize)
{
	int	i;
	int	datum;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
}

/* Consumer reading and writing the buffer */
void
reader_writer_fn(int *data, int isize)
{
	int	i;

	for (i = 0; i < isize; i++) {
		data[i] += 1;
	}
}

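/*
 * Tag the calling thread with the given affinity tag (a no-op unless -a was
 * given). Threads sharing the same non-zero tag are scheduled onto the same
 * affinity set (i.e. the same L2 cache, per the header comment), which is how
 * consumers below follow the producers that filled their buffers.
 */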
void
affinity_set(int tag)
{
	kern_return_t			ret;
	thread_affinity_policy_data_t	policy;
	if (affinity) {
		policy.affinity_tag = tag;
		ret = thread_policy_set(
			mach_thread_self(), THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
		if (ret != KERN_SUCCESS) {
			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
		}
	}
}

/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 */
void *
manager_fn(void *arg)
{
	worker_info_t	*wp = (worker_info_t *) arg;
	stage_info_t	*sp = wp->stage;
	boolean_t	is_producer = (sp->stagenum == 0);
	long		iteration = 0;
	int		current_tag = 0;

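	/*
	 * Designate this thread as non-timeshare (THREAD_EXTENDED_POLICY with
	 * timeshare == FALSE gives it a fixed priority), presumably so that
	 * usage-based priority decay does not skew the benchmark.
	 */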
	kern_return_t	ret;
	thread_extended_policy_data_t	epolicy;
	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
		mach_thread_self(), THREAD_EXTENDED_POLICY,
		(thread_policy_t) &epolicy,
		THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS) {
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
	}

	/*
	 * If we're using affinity sets and we're a producer,
	 * set our tag to be our thread set number.
	 */
	if (affinity && is_producer) {
		affinity_set(wp->setnum);
		current_tag = wp->setnum;
	}

	DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here releases everyone and starts the timer.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		pthread_mutex_unlock(&funnel);
		if (halting) {
			printf("  all threads ready for process %d, "
			    "hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		pthread_cond_broadcast(&barrier);
		timer = mach_absolute_time();
	} else {
		pthread_cond_wait(&barrier, &funnel);
		pthread_mutex_unlock(&funnel);
	}

	do {
		work_t	*workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 * Quit if all work done.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			if (sp->work_todo == 0) {
				pthread_mutex_unlock(&sp->input->mtx);
				goto out;
			}
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL) {
				break;
			}
			DBG("    %s[%d,%d] todo %d waiting for buffer\n",
			    sp->name, wp->setnum, sp->stagenum, sp->work_todo);
			sp->input->waiters++;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters--;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
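		/*
		 * Note: the pre-decrement value of work_todo doubles as this
		 * buffer's sequence number (producers stamp it into the work
		 * item below), counting down from `iterations' to 1.
		 */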
		iteration = sp->work_todo--;
		pthread_mutex_unlock(&sp->input->mtx);

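		/*
		 * Producers stamp the buffer with their set number; consumers
		 * instead adopt the buffer's affinity tag, migrating onto the
		 * affinity set where the buffer was produced.
		 */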
		if (is_producer) {
			workp->number = iteration;
			workp->tag = wp->setnum;
		} else {
			if (affinity && current_tag != workp->tag) {
				affinity_set(workp->tag);
				current_tag = workp->tag;
			}
		}

		DBG("    %s[%d,%d] todo %d work %p data %p\n",
		    sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, workp->isize);

		/*
		 * Place the buffer on the input queue of the next stage.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("    %s[%d,%d] todo %d signaling work\n",
			    sp->name, wp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (1);

out:
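	/*
	 * Wake any threads still blocked on our output queue so they re-check
	 * for remaining work instead of sleeping forever.
	 */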
	pthread_cond_broadcast(&sp->output->cnd);

	DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);

	return (void *) iteration;
}

void (*producer_fnp)(int *data, int isize) = &writer_fn;
void (*consumer_fnp)(int *data, int isize) = &reader_fn;

int
main(int argc, char *argv[])
{
	int		i;
	int		j;
	int		k;
	int		pages = 256;	/* 1MB */
	int		buffers = 2;
	int		producers = 2;
	int		consumers = 2;
	int		stages = 2;
	int		*status;
	stage_info_t	*stage_info;
	stage_info_t	*sp;
	worker_info_t	*worker_info;
	worker_info_t	*wp;
	kern_return_t	ret;
	int		c;

	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:i:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			if (stages >= WORKERS_MAX) {
				usage();
			}
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0) {
		producers = atoi(*argv);
	}
	argc--; argv++;
	if (argc > 0) {
		consumers = atoi(*argv);
	}

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
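	/* Total: one thread per producer, plus `consumers' threads for each of the (stages - 1) consumer stages. */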
	threads = consumers * (stages - 1) + producers;
	mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
	    "  with %saffinity, consumer reads%s data\n",
	    producers, s_if_plural(producers),
	    stages - 1, s_if_plural(stages - 1),
	    consumers, s_if_plural(consumers),
	    affinity ? "" : "no ",
	    (consumer_fnp == &reader_writer_fn) ? " and writes" : "");
	if (pages < 256) {
		mutter("  %dkB per buffer, ", pages * 4);
	} else {
		mutter("  %dMB per buffer, ", pages / 256);
	}
	mutter("%d buffer%s per producer ",
	    buffers, s_if_plural(buffers));
	if (buffers * pages < 256) {
		mutter("(total %dkB)\n", buffers * pages * 4);
	} else {
		mutter("(total %dMB)\n", buffers * pages / 256);
	}
	mutter("  processing %d buffer%s...\n",
	    iterations, s_if_plural(iterations));

	stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
	worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));

	/* Set up the queue and descriptor for each stage: */
	for (i = 0; i < stages; i++) {
		sp = &stage_info[i];
		sp->stagenum = i;
		pthread_mutex_init(&sp->bufq.mtx, NULL);
		pthread_cond_init(&sp->bufq.cnd, NULL);
		TAILQ_INIT(&sp->bufq.queue);
		sp->bufq.waiters = 0;
		if (i == 0) {
			sp->fn = producer_fnp;
			sp->name = "producer";
		} else {
			sp->fn = consumer_fnp;
			sp->name = "consumer";
		}
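		/*
		 * Each stage consumes from its own queue and feeds the next
		 * stage's queue; the last stage wraps around to stage 0, so
		 * buffers are recycled back to the producers as free buffers.
		 */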
		sp->input = &sp->bufq;
		sp->output = &stage_info[(i + 1) % stages].bufq;
		stage_info[i].work_todo = iterations;
	}

	/* Create the producers */
	for (i = 0; i < producers; i++) {
		work_t	*work_array;
		int	*data;
		int	isize;

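		/*
		 * Each producer gets one contiguous allocation of `buffers'
		 * buffers of `pages' 4kB pages, carved below into per-work-item
		 * slices of isize ints.
		 */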
		isize = pages * 4096 / sizeof(int);
		data = (int *) malloc(buffers * pages * 4096);

		/* Set up the empty work buffers */
		work_array = (work_t *) malloc(buffers * sizeof(work_t));
		for (j = 0; j < buffers; j++) {
			work_array[j].data = data + (isize * j);
			work_array[j].isize = isize;
			work_array[j].tag = 0;
			TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for data %p\n",
			    &work_array[j], work_array[j].data);
		}
		wp = &worker_info[i];
		wp->setnum = i + 1;
		wp->stage = &stage_info[0];
		if ((ret = pthread_create(&wp->thread,
		    NULL,
		    &manager_fn,
		    (void *) wp)) != 0) {
			err(1, "pthread_create %d,%d", 0, i);
		}
	}

	/* Create consumers */
	for (i = 1; i < stages; i++) {
		for (j = 0; j < consumers; j++) {
			wp = &worker_info[producers + (consumers * (i - 1)) + j];
			wp->setnum = j + 1;
			wp->stage = &stage_info[i];
			if ((ret = pthread_create(&wp->thread,
			    NULL,
			    &manager_fn,
			    (void *) wp)) != 0) {
				err(1, "pthread_create %d,%d", i, j);
			}
		}
	}

	/*
	 * We sit back and wait for the worker threads to finish.
	 */
	for (k = 0; k < threads; k++) {
		int	i;
		int	j;

		wp = &worker_info[k];
		if (k < producers) {
			i = 0;
			j = k;
		} else {
			i = (k - producers) / consumers;
			j = (k - producers) % consumers;
		}
		if ((ret = pthread_join(wp->thread, (void **)&status)) != 0) {
			err(1, "pthread_join %d,%d", i, j);
		}
		DBG("Thread %d,%d status %ld\n", i, j, (long) status);
	}

	/*
	 * See how long the work took.
	 */
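	/*
	 * Note: this treats mach_absolute_time() ticks as nanoseconds, which
	 * only holds where the timebase is 1:1; a fully general version would
	 * scale the delta by the mach_timebase_info() numer/denom first.
	 */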
	timer = mach_absolute_time() - timer;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
	    (int) (timer / 1000ULL), (int) (timer % 1000ULL));

	return 0;
}