--- /dev/null
+/*
+ * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_START@
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @APPLE_APACHE_LICENSE_HEADER_END@
+ */
+
+#include "internal.h"
+
+#if DISPATCH_USE_INTERNAL_WORKQUEUE
+
+/*
+ * dispatch_workq monitors the thread pool that is
+ * executing the work enqueued on libdispatch's pthread
+ * root queues and dynamically adjusts its size.
+ *
+ * The dynamic monitoring could be implemented using either
+ * (a) low-frequency user-level approximation of the number of runnable
+ * worker threads via reading the /proc file system
+ * (b) a Linux kernel extension that hooks the process change handler
+ * to accurately track the number of runnable worker threads
+ * This file provides an implementation of option (a).
+ *
+ * Using either form of monitoring, if (i) there appears to be
+ * work available in the monitored pthread root queue, (ii) the
+ * number of runnable workers is below the target size for the pool,
+ * and (iii) the total number of worker threads is below an upper limit,
+ * then an additional worker thread will be added to the pool.
+ */
+
+#pragma mark static data for monitoring subsystem
+
+/*
+ * State for the user-level monitoring of a workqueue.
+ */
+typedef struct dispatch_workq_monitor_s {
+ /* The dispatch_queue we are monitoring */
+ dispatch_queue_t dq;
+
+ /* The observed number of runnable worker threads */
+ int32_t num_runnable;
+
+ /* The desired number of runnable worker threads */
+ int32_t target_runnable;
+
+ /*
+ * Tracking of registered workers; all accesses must hold lock.
+ * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1]
+ * contain the dispatch_tids of the worker threads we are monitoring.
+ */
+ dispatch_unfair_lock_s registered_tid_lock;
+ dispatch_tid *registered_tids;
+ int num_registered_tids;
+} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
+
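+// One monitor per QOS class, indexed by (qos - 1);
+// DISPATCH_QOS_UNSPECIFIED is not monitored.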
+static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
+
+#pragma mark Implementation of the monitoring subsystem.
+
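+// WORKQ_OVERSUBSCRIBE_FACTOR scales the number of active CPUs to form a
+// soft cap on the total number of runnable workers across all pools;
+// _dispatch_workq_monitor_pools stops oversubscribing once it is reached.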
+#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
+#define WORKQ_OVERSUBSCRIBE_FACTOR 2
+
+static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
+static dispatch_once_t _dispatch_workq_init_once_pred;
+
+void
+_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
+{
+ dispatch_once_f(&_dispatch_workq_init_once_pred, NULL,
+ &_dispatch_workq_init_once);
+
+#if HAVE_DISPATCH_WORKQ_MONITORING
+ dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
+ dispatch_assert(mon->dq == root_q);
+ dispatch_tid tid = _dispatch_tid_self();
+ _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+ dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1);
+ int worker_id = mon->num_registered_tids++;
+ mon->registered_tids[worker_id] = tid;
+ _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+void
+_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
+{
+#if HAVE_DISPATCH_WORKQ_MONITORING
+ dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
+ dispatch_assert(mon->dq == root_q);
+ dispatch_tid tid = _dispatch_tid_self();
+ _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+ for (int i = 0; i < mon->num_registered_tids; i++) {
+ if (mon->registered_tids[i] == tid) {
+ int last = mon->num_registered_tids - 1;
+ mon->registered_tids[i] = mon->registered_tids[last];
+ mon->registered_tids[last] = 0;
+ mon->num_registered_tids--;
+ break;
+ }
+ }
+ _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+
+#if HAVE_DISPATCH_WORKQ_MONITORING
+#if defined(__linux__)
+/*
+ * For each tid that is a registered worker, read /proc/[tid]/stat
+ * to count how many of them are actually runnable.
+ * See the proc(5) man page for the format of the contents of /proc/[tid]/stat.
+ */
+static void
+_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
+{
+ char path[128];
+ char buf[4096];
+ int running_count = 0;
+
+ _dispatch_unfair_lock_lock(&mon->registered_tid_lock);
+
+ for (int i = 0; i < mon->num_registered_tids; i++) {
+ dispatch_tid tid = mon->registered_tids[i];
+ int fd;
+ ssize_t bytes_read = -1;
+
+ int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid);
+ dispatch_assert(r > 0 && r < (int)sizeof(path));
+
+ fd = open(path, O_RDONLY | O_NONBLOCK);
+ if (unlikely(fd == -1)) {
+ DISPATCH_CLIENT_CRASH(tid,
+ "workq: registered worker exited prematurely");
+ } else {
+ bytes_read = read(fd, buf, sizeof(buf)-1);
+ (void)close(fd);
+ }
+
+ if (bytes_read > 0) {
+ buf[bytes_read] = '\0';
+ char state;
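+ // Per proc(5), the stat line begins "pid (comm) state ...";
+ // skip the first two fields and read the one-character state.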
+ if (sscanf(buf, "%*d %*s %c", &state) == 1) {
+ // _dispatch_debug("workq: Worker %d, state %c\n", tid, state);
+ if (state == 'R') {
+ running_count++;
+ }
+ } else {
+ _dispatch_debug("workq: sscanf of state failed for %d", tid);
+ }
+ } else {
+ _dispatch_debug("workq: Failed to read %s", path);
+ }
+ }
+
+ mon->num_runnable = running_count;
+
+ _dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
+}
+#else
+#error must define _dispatch_workq_count_runnable_workers
+#endif
+
+static void
+_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
+{
+ int global_soft_max =
+ WORKQ_OVERSUBSCRIBE_FACTOR * (int)dispatch_hw_config(active_cpus);
+ int global_runnable = 0;
+ for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
+ dispatch_queue_t dq = mon->dq;
+
+ if (!_dispatch_queue_class_probe(dq)) {
+ _dispatch_debug("workq: %s is empty.", dq->dq_label);
+ continue;
+ }
+
+ _dispatch_workq_count_runnable_workers(mon);
+ _dispatch_debug("workq: %s has %d runnable workers (target is %d)",
+ dq->dq_label, mon->num_runnable, mon->target_runnable);
+
+ global_runnable += mon->num_runnable;
+
+ if (mon->num_runnable == 0) {
+ // We have work, but no worker is runnable.
+ // It is likely the program is stalled. Therefore treat
+ // this as if dq were an overcommit queue and call poke
+ // with the limit being the maximum number of workers for dq.
+ int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
+ _dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
+ dq->dq_label, floor);
+ _dispatch_global_queue_poke(dq, 1, floor);
+ global_runnable += 1; // account for poke in global estimate
+ } else if (mon->num_runnable < mon->target_runnable &&
+ global_runnable < global_soft_max) {
+ // We are below target, but some workers are still runnable.
+ // We want to oversubscribe to hit the desired load target.
+ // However, this under-utilization may be transitory so set the
+ // floor as a small multiple of threads per core.
+ int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
+ int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
+ floor = MAX(floor, floor2);
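+ // With WORKQ_OVERSUBSCRIBE_FACTOR == 2 this typically evaluates to
+ // -target_runnable, since floor2 is the more negative bound whenever
+ // WORKQ_MAX_TRACKED_TIDS exceeds 2 * target_runnable.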
+ _dispatch_debug("workq: %s under utilization target; poking with floor %d",
+ dq->dq_label, floor);
+ _dispatch_global_queue_poke(dq, 1, floor);
+ global_runnable += 1; // account for poke in global estimate
+ }
+ }
+}
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+
+static void
+_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
+{
+#if HAVE_DISPATCH_WORKQ_MONITORING
+ int target_runnable = (int)dispatch_hw_config(active_cpus);
+ for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
+ mon->dq = _dispatch_get_root_queue(i, false);
+ void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
+ mon->registered_tids = buf;
+ mon->target_runnable = target_runnable;
+ }
+
+ // Create a monitoring timer that will periodically run on dispatch_mgr_q,
+ // polling the worker pools roughly once a second.
+ dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
+ 0, 0, &_dispatch_mgr_q);
+ dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
+ NSEC_PER_SEC, 0);
+ dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools);
+ dispatch_set_context(ds, ds); // avoid appearing as leaked
+ dispatch_activate(ds);
+#endif // HAVE_DISPATCH_WORKQ_MONITORING
+}
+
+#endif // DISPATCH_USE_INTERNAL_WORKQUEUE