/*
 * Copyright (c) 2017 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#if DISPATCH_USE_INTERNAL_WORKQUEUE

/*
 * dispatch_workq monitors the thread pool that is executing the work
 * enqueued on libdispatch's pthread root queues and dynamically adjusts
 * the pool's size.
 *
 * The dynamic monitoring could be implemented using either
 *   (a) a low-frequency user-level approximation of the number of runnable
 *       worker threads obtained by reading the /proc file system, or
 *   (b) a Linux kernel extension that hooks the process change handler
 *       to accurately track the number of runnable normal worker threads.
 * This file provides an implementation of option (a).
 *
 * Using either form of monitoring, if (i) there appears to be
 * work available in the monitored pthread root queue, (ii) the
 * number of runnable workers is below the target size for the pool,
 * and (iii) the total number of worker threads is below an upper limit,
 * then an additional worker thread will be added to the pool.
 */
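
/*
 * Worked example (illustrative numbers, not from the source): on a machine
 * with 4 active CPUs, each pool's target size is 4 runnable workers (see
 * _dispatch_workq_init_once below) and the global soft maximum is
 * WORKQ_OVERSUBSCRIBE_FACTOR * 4 = 8. Suppose the monitor observes 2
 * runnable workers for a pool whose queue is non-empty, with 5 runnable
 * workers across all pools. Then (i) holds (work is available), (ii) holds
 * (2 < 4), and (iii) holds (5 < 8), so the monitor pokes the pool to add
 * one worker.
 */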

#pragma mark static data for monitoring subsystem

/*
 * State for the user-level monitoring of a workqueue.
 */
typedef struct dispatch_workq_monitor_s {
	/* The dispatch_queue we are monitoring */
	dispatch_queue_t dq;

	/* The observed number of runnable worker threads */
	int32_t num_runnable;

	/* The desired number of runnable worker threads */
	int32_t target_runnable;

	/*
	 * Tracking of registered workers; all accesses must hold lock.
	 * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1]
	 * contain the dispatch_tids of the worker threads we are monitoring.
	 */
	dispatch_unfair_lock_s registered_tid_lock;
	dispatch_tid *registered_tids;
	int num_registered_tids;
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;

static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];

#pragma mark Implementation of the monitoring subsystem.

#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define WORKQ_OVERSUBSCRIBE_FACTOR 2

static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;
void
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
{
	dispatch_once_f(&_dispatch_workq_init_once_pred, NULL,
			&_dispatch_workq_init_once);

#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
	dispatch_assert(mon->dq == root_q);
	dispatch_tid tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1);
	int worker_id = mon->num_registered_tids++;
	mon->registered_tids[worker_id] = tid;
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}

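/*
 * Called by a worker thread before it exits: removes the caller's tid
 * from its monitor by swapping it with the last registered tid, keeping
 * the tid array dense.
 */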
void
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
	dispatch_assert(mon->dq == root_q);
	dispatch_tid tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	for (int i = 0; i < mon->num_registered_tids; i++) {
		if (mon->registered_tids[i] == tid) {
			int last = mon->num_registered_tids - 1;
			mon->registered_tids[i] = mon->registered_tids[last];
			mon->registered_tids[last] = 0;
			mon->num_registered_tids--;
			break;
		}
	}
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}
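
/*
 * Usage sketch (illustrative; the actual call sites live in the pthread
 * root-queue worker bootstrap in queue.c, not in this file):
 *
 *	_dispatch_workq_worker_register(dq, qos_class);
 *	... drain work items from dq ...
 *	_dispatch_workq_worker_unregister(dq, qos_class);
 */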

#if HAVE_DISPATCH_WORKQ_MONITORING
#if defined(__linux__)
/*
 * For each tid that is a registered worker, read /proc/[tid]/stat to
 * count how many of the workers are actually runnable.
 * See the proc(5) man page for the format of the contents of
 * /proc/[pid]/stat.
 */
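/*
 * Illustrative /proc/[pid]/stat prefix showing the fields consumed below:
 *
 *	12345 (some-comm) R 1 ...
 *
 * Field 1 is the pid, field 2 the command name in parentheses, and field 3
 * the single-character state, where 'R' means runnable/running. Note that
 * the "%*s" conversion below assumes the command name contains no
 * whitespace; a name with spaces would cause the state to be misread, so
 * such a worker would not be counted as runnable.
 */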
static void
_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
{
	char path[128];
	char buf[4096];
	int running_count = 0;

	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);

	for (int i = 0; i < mon->num_registered_tids; i++) {
		dispatch_tid tid = mon->registered_tids[i];
		int fd;
		ssize_t bytes_read = -1;

		int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid);
		dispatch_assert(r > 0 && r < (int)sizeof(path));

		fd = open(path, O_RDONLY | O_NONBLOCK);
		if (unlikely(fd == -1)) {
			DISPATCH_CLIENT_CRASH(tid,
					"workq: registered worker exited prematurely");
		} else {
			bytes_read = read(fd, buf, sizeof(buf)-1);
			(void)close(fd);
		}

		if (bytes_read > 0) {
			buf[bytes_read] = '\0';
			char state;
			if (sscanf(buf, "%*d %*s %c", &state) == 1) {
				// _dispatch_debug("workq: Worker %d, state %c\n", tid, state);
				if (state == 'R') {
					running_count++;
				}
			} else {
				_dispatch_debug("workq: sscanf of state failed for %d", tid);
			}
		} else {
			_dispatch_debug("workq: Failed to read %s", path);
		}
	}

	mon->num_runnable = running_count;

	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
}
#else
#error must define _dispatch_workq_count_runnable_workers
#endif

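/*
 * Timer handler, run once per second on the manager queue (see
 * _dispatch_workq_init_once below). For each QoS bucket whose root queue
 * has pending work, refresh the runnable-worker count and poke the pool if
 * it is stalled (no runnable workers) or under-utilized (below target and
 * under the global soft maximum).
 */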
static void
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
{
	int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR *
			(int)dispatch_hw_config(active_cpus);
	int global_runnable = 0;
	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
		dispatch_queue_t dq = mon->dq;

		if (!_dispatch_queue_class_probe(dq)) {
			_dispatch_debug("workq: %s is empty.", dq->dq_label);
			continue;
		}

		_dispatch_workq_count_runnable_workers(mon);
		_dispatch_debug("workq: %s has %d runnable workers (target is %d)",
				dq->dq_label, mon->num_runnable, mon->target_runnable);

		global_runnable += mon->num_runnable;

		if (mon->num_runnable == 0) {
			// We have work, but no worker is runnable.
			// It is likely the program is stalled. Therefore treat
			// this as if dq were an overcommit queue and call poke
			// with the limit being the maximum number of workers for dq.
			int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
			_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
					dq->dq_label, floor);
			_dispatch_global_queue_poke(dq, 1, floor);
			global_runnable += 1; // account for poke in global estimate
		} else if (mon->num_runnable < mon->target_runnable &&
				global_runnable < global_soft_max) {
			// We are below target, but some workers are still runnable.
			// We want to oversubscribe to hit the desired load target.
			// However, this under-utilization may be transitory, so set the
			// floor as a small multiple of threads per core.
			int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
			int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
			floor = MAX(floor, floor2);
			_dispatch_debug("workq: %s is under its utilization target; poking with floor %d",
					dq->dq_label, floor);
			_dispatch_global_queue_poke(dq, 1, floor);
			global_runnable += 1; // account for poke in global estimate
		}
	}
}
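
/*
 * Worked example of the floor arithmetic above (illustrative numbers):
 * with target_runnable = 4 and WORKQ_OVERSUBSCRIBE_FACTOR = 2, the
 * under-utilized branch computes floor = (1 - 2) * 4 = -4 and
 * floor2 = 4 - WORKQ_MAX_TRACKED_TIDS, so MAX() selects -4 and the poke
 * may only modestly oversubscribe the pool. The stalled branch instead
 * passes the much lower floor 4 - WORKQ_MAX_TRACKED_TIDS, letting the
 * pool grow toward its maximum size.
 */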
#endif // HAVE_DISPATCH_WORKQ_MONITORING

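/*
 * One-time initialization: bind each monitor to its root queue, allocate
 * tid-tracking storage, set each pool's target to the number of active
 * CPUs, and arm the once-per-second monitoring timer on the manager queue.
 */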
static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	int target_runnable = (int)dispatch_hw_config(active_cpus);
	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
		mon->dq = _dispatch_get_root_queue(i, false);
		void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS,
				sizeof(dispatch_tid));
		mon->registered_tids = buf;
		mon->target_runnable = target_runnable;
	}

	// Create a monitoring timer that will periodically run on dispatch_mgr_q
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, &_dispatch_mgr_q);
	dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
			NSEC_PER_SEC, 0);
	dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools);
	dispatch_set_context(ds, ds); // avoid appearing as leaked
	dispatch_activate(ds);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}

#endif // DISPATCH_USE_INTERNAL_WORKQUEUE