/*
 * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */
23 #if DISPATCH_USE_INTERNAL_WORKQUEUE
/*
 * dispatch_workq monitors the thread pool that is
 * executing the work enqueued on libdispatch's pthread
 * root queues and dynamically adjusts its size.
 *
 * The dynamic monitoring could be implemented using either
 *   (a) low-frequency user-level approximation of the number of runnable
 *       worker threads via reading the /proc file system
 *   (b) a Linux kernel extension that hooks the process change handler
 *       to accurately track the number of runnable normal worker threads
 * This file provides an implementation of option (a).
 *
 * Using either form of monitoring, if (i) there appears to be
 * work available in the monitored pthread root queue, (ii) the
 * number of runnable workers is below the target size for the pool,
 * and (iii) the total number of worker threads is below an upper limit,
 * then an additional worker thread will be added to the pool.
 */
44 #pragma mark static data for monitoring subsystem
47 * State for the user-level monitoring of a workqueue.
49 typedef struct dispatch_workq_monitor_s
{
50 /* The dispatch_queue we are monitoring */
53 /* The observed number of runnable worker threads */
56 /* The desired number of runnable worker threads */
57 int32_t target_runnable
;
60 * Tracking of registered workers; all accesses must hold lock.
61 * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1]
62 * contain the dispatch_tids of the worker threads we are monitoring.
64 dispatch_unfair_lock_s registered_tid_lock
;
65 dispatch_tid
*registered_tids
;
66 int num_registered_tids
;
67 } dispatch_workq_monitor_s
, *dispatch_workq_monitor_t
;
69 static dispatch_workq_monitor_s _dispatch_workq_monitors
[DISPATCH_QOS_MAX
];
#pragma mark Implementation of the monitoring subsystem.

// Upper bound on worker tids tracked per pool (sizes registered_tids).
#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
// Oversubscription multiplier applied to the active CPU count when
// computing the global soft maximum of runnable workers.
#define WORKQ_OVERSUBSCRIBE_FACTOR 2

// Forward declaration: one-time setup, run via dispatch_once_f below.
static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;
80 _dispatch_workq_worker_register(dispatch_queue_t root_q
, qos_class_t cls
)
82 dispatch_once_f(&_dispatch_workq_init_once_pred
, NULL
, &_dispatch_workq_init_once
);
84 #if HAVE_DISPATCH_WORKQ_MONITORING
85 dispatch_qos_t qos
= _dispatch_qos_from_qos_class(cls
);
86 dispatch_workq_monitor_t mon
= &_dispatch_workq_monitors
[qos
-1];
87 dispatch_assert(mon
->dq
== root_q
);
88 dispatch_tid tid
= _dispatch_tid_self();
89 _dispatch_unfair_lock_lock(&mon
->registered_tid_lock
);
90 dispatch_assert(mon
->num_registered_tids
< WORKQ_MAX_TRACKED_TIDS
-1);
91 int worker_id
= mon
->num_registered_tids
++;
92 mon
->registered_tids
[worker_id
] = tid
;
93 _dispatch_unfair_lock_unlock(&mon
->registered_tid_lock
);
94 #endif // HAVE_DISPATCH_WORKQ_MONITORING
98 _dispatch_workq_worker_unregister(dispatch_queue_t root_q
, qos_class_t cls
)
100 #if HAVE_DISPATCH_WORKQ_MONITORING
101 dispatch_qos_t qos
= _dispatch_qos_from_qos_class(cls
);
102 dispatch_workq_monitor_t mon
= &_dispatch_workq_monitors
[qos
-1];
103 dispatch_assert(mon
->dq
== root_q
);
104 dispatch_tid tid
= _dispatch_tid_self();
105 _dispatch_unfair_lock_lock(&mon
->registered_tid_lock
);
106 for (int i
= 0; i
< mon
->num_registered_tids
; i
++) {
107 if (mon
->registered_tids
[i
] == tid
) {
108 int last
= mon
->num_registered_tids
- 1;
109 mon
->registered_tids
[i
] = mon
->registered_tids
[last
];
110 mon
->registered_tids
[last
] = 0;
111 mon
->num_registered_tids
--;
115 _dispatch_unfair_lock_unlock(&mon
->registered_tid_lock
);
116 #endif // HAVE_DISPATCH_WORKQ_MONITORING
120 #if HAVE_DISPATCH_WORKQ_MONITORING
121 #if defined(__linux__)
123 * For each pid that is a registered worker, read /proc/[pid]/stat
124 * to get a count of the number of them that are actually runnable.
125 * See the proc(5) man page for the format of the contents of /proc/[pid]/stat
128 _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon
)
132 int running_count
= 0;
134 _dispatch_unfair_lock_lock(&mon
->registered_tid_lock
);
136 for (int i
= 0; i
< mon
->num_registered_tids
; i
++) {
137 dispatch_tid tid
= mon
->registered_tids
[i
];
139 ssize_t bytes_read
= -1;
141 int r
= snprintf(path
, sizeof(path
), "/proc/%d/stat", tid
);
142 dispatch_assert(r
> 0 && r
< (int)sizeof(path
));
144 fd
= open(path
, O_RDONLY
| O_NONBLOCK
);
145 if (unlikely(fd
== -1)) {
146 DISPATCH_CLIENT_CRASH(tid
,
147 "workq: registered worker exited prematurely");
149 bytes_read
= read(fd
, buf
, sizeof(buf
)-1);
153 if (bytes_read
> 0) {
154 buf
[bytes_read
] = '\0';
156 if (sscanf(buf
, "%*d %*s %c", &state
) == 1) {
157 // _dispatch_debug("workq: Worker %d, state %c\n", tid, state);
162 _dispatch_debug("workq: sscanf of state failed for %d", tid
);
165 _dispatch_debug("workq: Failed to read %s", path
);
169 mon
->num_runnable
= running_count
;
171 _dispatch_unfair_lock_unlock(&mon
->registered_tid_lock
);
174 #error must define _dispatch_workq_count_runnable_workers
178 _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED
)
180 int global_soft_max
= WORKQ_OVERSUBSCRIBE_FACTOR
* (int)dispatch_hw_config(active_cpus
);
181 int global_runnable
= 0;
182 for (dispatch_qos_t i
= DISPATCH_QOS_MAX
; i
> DISPATCH_QOS_UNSPECIFIED
; i
--) {
183 dispatch_workq_monitor_t mon
= &_dispatch_workq_monitors
[i
-1];
184 dispatch_queue_t dq
= mon
->dq
;
186 if (!_dispatch_queue_class_probe(dq
)) {
187 _dispatch_debug("workq: %s is empty.", dq
->dq_label
);
191 _dispatch_workq_count_runnable_workers(mon
);
192 _dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
193 dq
->dq_label
, mon
->num_runnable
, mon
->target_runnable
);
195 global_runnable
+= mon
->num_runnable
;
197 if (mon
->num_runnable
== 0) {
198 // We have work, but no worker is runnable.
199 // It is likely the program is stalled. Therefore treat
200 // this as if dq were an overcommit queue and call poke
201 // with the limit being the maximum number of workers for dq.
202 int32_t floor
= mon
->target_runnable
- WORKQ_MAX_TRACKED_TIDS
;
203 _dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
204 dq
->dq_label
, floor
);
205 _dispatch_global_queue_poke(dq
, 1, floor
);
206 global_runnable
+= 1; // account for poke in global estimate
207 } else if (mon
->num_runnable
< mon
->target_runnable
&&
208 global_runnable
< global_soft_max
) {
209 // We are below target, but some workers are still runnable.
210 // We want to oversubscribe to hit the desired load target.
211 // However, this under-utilization may be transitory so set the
212 // floor as a small multiple of threads per core.
213 int32_t floor
= (1 - WORKQ_OVERSUBSCRIBE_FACTOR
) * mon
->target_runnable
;
214 int32_t floor2
= mon
->target_runnable
- WORKQ_MAX_TRACKED_TIDS
;
215 floor
= MAX(floor
, floor2
);
216 _dispatch_debug("workq: %s under utilization target; poking with floor %d",
217 dq
->dq_label
, floor
);
218 _dispatch_global_queue_poke(dq
, 1, floor
);
219 global_runnable
+= 1; // account for poke in global estimate
223 #endif // HAVE_DISPATCH_WORKQ_MONITORING
226 _dispatch_workq_init_once(void *context DISPATCH_UNUSED
)
228 #if HAVE_DISPATCH_WORKQ_MONITORING
229 int target_runnable
= (int)dispatch_hw_config(active_cpus
);
230 for (dispatch_qos_t i
= DISPATCH_QOS_MAX
; i
> DISPATCH_QOS_UNSPECIFIED
; i
--) {
231 dispatch_workq_monitor_t mon
= &_dispatch_workq_monitors
[i
-1];
232 mon
->dq
= _dispatch_get_root_queue(i
, false);
233 void *buf
= _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS
, sizeof(dispatch_tid
));
234 mon
->registered_tids
= buf
;
235 mon
->target_runnable
= target_runnable
;
238 // Create monitoring timer that will periodically run on dispatch_mgr_q
239 dispatch_source_t ds
= dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER
,
240 0, 0, &_dispatch_mgr_q
);
241 dispatch_source_set_timer(ds
, dispatch_time(DISPATCH_TIME_NOW
, 0),
243 dispatch_source_set_event_handler_f(ds
, _dispatch_workq_monitor_pools
);
244 dispatch_set_context(ds
, ds
); // avoid appearing as leaked
245 dispatch_activate(ds
);
246 #endif // HAVE_DISPATCH_WORKQ_MONITORING
249 #endif // DISPATCH_USE_INTERNAL_WORKQUEUE