+ struct stat *sb = (struct stat *)ub;
+
+ bzero((void *)sb, sizeof(*sb));
+ sb->st_size = kq->kq_count;
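+ /* advertise the size of the kevent structure in use as the block size */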
+ if (kq->kq_state & KQ_KEV_QOS)
+ sb->st_blksize = sizeof(struct kevent_qos_s);
+ else if (kq->kq_state & KQ_KEV64)
+ sb->st_blksize = sizeof(struct kevent64_s);
+ else if (IS_64BIT_PROCESS(p))
+ sb->st_blksize = sizeof(struct user64_kevent);
+ else
+ sb->st_blksize = sizeof(struct user32_kevent);
+ sb->st_mode = S_IFIFO;
+ }
+ kqunlock(kq);
+ return (0);
+}
+
+
+/*
+ * Interact with the pthread kext to request a servicing thread there.
+ * Eventually, this will request threads at specific QoS levels.
+ * For now, it only requests a dispatch-manager-QoS thread, and
+ * only one-at-a-time.
+ *
+ * - Caller holds the workq request lock
+ *
+ * - May be called with the kqueue's wait queue set locked,
+ * so cannot do anything that could recurse on that.
+ */
+static void
+kqworkq_request_thread(
+ struct kqworkq *kqwq,
+ kq_index_t qos_index)
+{
+ struct kqrequest *kqr;
+
+ assert(kqwq->kqwq_state & KQ_WORKQ);
+ assert(qos_index < KQWQ_NQOS);
+
+ kqr = kqworkq_get_request(kqwq, qos_index);
+
+ /*
+ * If we have already requested a thread, and it hasn't
+ * started processing yet, there's no use hammering away
+ * on the pthread kext.
+ */
+ if (kqr->kqr_state & KQWQ_THREQUESTED)
+ return;
+
+ assert(kqr->kqr_thread == THREAD_NULL);
+
+ /* request additional workq threads if appropriate */
+ if (pthread_functions != NULL &&
+ pthread_functions->workq_reqthreads != NULL) {
+ unsigned int flags = KEVENT_FLAG_WORKQ;
+
+ /* Compute a priority based on qos_index. */
+ struct workq_reqthreads_req_s request = {
+ .priority = qos_from_qos_index(qos_index),
+ .count = 1
+ };
+
+ thread_t wqthread;
+ wqthread = (*pthread_functions->workq_reqthreads)(kqwq->kqwq_p, 1, &request);
+ kqr->kqr_state |= KQWQ_THREQUESTED;
+
+ /* Have we been switched to the emergency/manager thread? */
+ if (wqthread == (thread_t)-1) {
+ flags |= KEVENT_FLAG_WORKQ_MANAGER;
+ wqthread = THREAD_NULL;
+ } else if (qos_index == KQWQ_QOS_MANAGER)
+ flags |= KEVENT_FLAG_WORKQ_MANAGER;
+
+ /* bind the thread */
+ kqworkq_bind_thread(kqwq, qos_index, wqthread, flags);
+ }
+}
+
+/*
+ * If we aren't already busy processing events [for this QoS],
+ * request workq thread support as appropriate.
+ *
+ * TBD - for now, we don't segregate processing by QoS.
+ *
+ * - May be called with the kqueue's wait queue set locked,
+ * so cannot do anything that could recurse on that.
+ */
+static void
+kqworkq_request_help(
+ struct kqworkq *kqwq,
+ kq_index_t qos_index,
+ uint32_t type)
+{
+ struct kqrequest *kqr;
+
+ /* qos_index must be a valid workq QoS band */
+ assert(qos_index < KQWQ_NQOS);
+
+ kqwq_req_lock(kqwq);
+ kqr = kqworkq_get_request(kqwq, qos_index);
+
+ /*
+ * If someone is processing the queue, just mark what type
+ * of attempt this was (from a kq wakeup or from a waitq hook).
+ * The mark will be noticed at the end of servicing and a new thread
+ * will be requested at that point.
+ */
+ if (kqr->kqr_state & KQWQ_PROCESSING) {
+ kqr->kqr_state |= type;
+ kqwq_req_unlock(kqwq);
+ return;
+ }
+
+ kqworkq_request_thread(kqwq, qos_index);
+ kqwq_req_unlock(kqwq);
+}
+
+/*
+ * These arrays describe the low and high qindexes for a given qos_index.
+ * The values come from the chart in <sys/eventvar.h> (must stay in sync).
+ */
+static kq_index_t _kq_base_index[KQWQ_NQOS] = {0, 0, 6, 11, 15, 18, 20, 21};
+static kq_index_t _kq_high_index[KQWQ_NQOS] = {0, 5, 10, 14, 17, 19, 20, 21};
+
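+/*
+ * Map a qos_index to the base (lowest) and high (most-overridden)
+ * queues of its band within kq_queue[].
+ */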
+static struct kqtailq *
+kqueue_get_base_queue(struct kqueue *kq, kq_index_t qos_index)
+{
+ assert(qos_index < KQWQ_NQOS);
+ return &kq->kq_queue[_kq_base_index[qos_index]];
+}
+
+static struct kqtailq *
+kqueue_get_high_queue(struct kqueue *kq, kq_index_t qos_index)
+{
+ assert(qos_index < KQWQ_NQOS);
+ return &kq->kq_queue[_kq_high_index[qos_index]];
+}
+
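+/*
+ * Return nonzero only if every queue in the band for this qos_index,
+ * from the high (override) queue down to the base queue, is empty.
+ */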
+static int
+kqueue_queue_empty(struct kqueue *kq, kq_index_t qos_index)
+{
+ struct kqtailq *base_queue = kqueue_get_base_queue(kq, qos_index);
+ struct kqtailq *queue = kqueue_get_high_queue(kq, qos_index);
+
+ do {
+ if (!TAILQ_EMPTY(queue))
+ return 0;
+ } while (queue-- > base_queue);
+ return 1;
+}
+
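+/*
+ * Workq kqueues keep a suppression queue per QoS request;
+ * file kqueues keep a single one.
+ */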
+static struct kqtailq *
+kqueue_get_suppressed_queue(struct kqueue *kq, kq_index_t qos_index)
+{
+ if (kq->kq_state & KQ_WORKQ) {
+ struct kqworkq *kqwq = (struct kqworkq *)kq;
+ struct kqrequest *kqr;
+
+ kqr = kqworkq_get_request(kqwq, qos_index);
+ return &kqr->kqr_suppressed;
+ } else {
+ struct kqfile *kqf = (struct kqfile *)kq;
+ return &kqf->kqf_suppressed;
+ }
+}
+
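+/*
+ * The queue index for a knote is the base index of its QoS band,
+ * bumped by however much the override index exceeds the QoS index,
+ * and never past the band's high index.
+ */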
+static kq_index_t
+knote_get_queue_index(struct knote *kn)
+{
+ kq_index_t override_index = knote_get_qos_override_index(kn);
+ kq_index_t qos_index = knote_get_qos_index(kn);
+ struct kqueue *kq = knote_get_kq(kn);
+ kq_index_t res;
+
+ if ((kq->kq_state & KQ_WORKQ) == 0) {
+ assert(qos_index == 0);
+ assert(override_index == 0);
+ }
+ res = _kq_base_index[qos_index];
+ if (override_index > qos_index)
+ res += override_index - qos_index;
+
+ assert(res <= _kq_high_index[qos_index]);
+ return res;
+}
+
+static struct kqtailq *
+knote_get_queue(struct knote *kn)
+{
+ kq_index_t qindex = knote_get_queue_index(kn);
+
+ return &(knote_get_kq(kn))->kq_queue[qindex];
+}
+
+static struct kqtailq *
+knote_get_suppressed_queue(struct knote *kn)
+{
+ kq_index_t qos_index = knote_get_qos_index(kn);
+ struct kqueue *kq = knote_get_kq(kn);
+
+ return kqueue_get_suppressed_queue(kq, qos_index);
+}
+
+static kq_index_t
+knote_get_req_index(struct knote *kn)
+{
+ return kn->kn_req_index;
+}
+
+static kq_index_t
+knote_get_qos_index(struct knote *kn)
+{
+ return kn->kn_qos_index;
+}
+
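+/*
+ * Record the requested QoS for a knote. The in-use QoS index is
+ * only updated while the knote is not suppressed.
+ */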
+static void
+knote_set_qos_index(struct knote *kn, kq_index_t qos_index)
+{
+ struct kqueue *kq = knote_get_kq(kn);
+
+ assert(qos_index < KQWQ_NQOS);
+ assert((kn->kn_status & KN_QUEUED) == 0);
+
+ if (kq->kq_state & KQ_WORKQ)
+ assert(qos_index > QOS_INDEX_KQFILE);
+ else
+ assert(qos_index == QOS_INDEX_KQFILE);
+
+ /* always set requested */
+ kn->kn_req_index = qos_index;
+
+ /* only adjust in-use qos index when not suppressed */
+ if ((kn->kn_status & KN_SUPPRESSED) == 0)
+ kn->kn_qos_index = qos_index;
+}
+
+static kq_index_t
+knote_get_qos_override_index(struct knote *kn)
+{
+ return kn->kn_qos_override;
+}
+
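+/*
+ * Record a new override index for a knote; only manager knotes
+ * may be overridden to the manager level.
+ */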
+static void
+knote_set_qos_override_index(struct knote *kn, kq_index_t override_index)
+{
+ struct kqueue *kq = knote_get_kq(kn);
+ kq_index_t qos_index = knote_get_qos_index(kn);
+
+ assert((kn->kn_status & KN_QUEUED) == 0);
+
+ if (override_index == KQWQ_QOS_MANAGER)
+ assert(qos_index == KQWQ_QOS_MANAGER);
+ else
+ assert(override_index < KQWQ_QOS_MANAGER);
+
+ kn->kn_qos_override = override_index;
+
+ /*
+ * If this is a workq kqueue, apply the override to the
+ * workq servicing thread.
+ */
+ if (kq->kq_state & KQ_WORKQ) {
+ struct kqworkq *kqwq = (struct kqworkq *)kq;
+
+ assert(qos_index > QOS_INDEX_KQFILE);
+ kqworkq_update_override(kqwq, qos_index, override_index);
+ }
+}
+
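+/*
+ * Raise the override delta recorded for this QoS request when the
+ * new override exceeds it (the delta is never lowered here). If a
+ * non-manager servicing thread is already bound, add or update its
+ * ipc override to match.
+ */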
+static void
+kqworkq_update_override(struct kqworkq *kqwq, kq_index_t qos_index, kq_index_t override_index)
+{
+ struct kqrequest *kqr;
+ kq_index_t new_delta;
+ kq_index_t old_delta;
+
+ new_delta = (override_index > qos_index) ?
+ override_index - qos_index : 0;
+
+ kqr = kqworkq_get_request(kqwq, qos_index);
+
+ kqwq_req_lock(kqwq);
+ old_delta = kqr->kqr_override_delta;
+
+ if (new_delta > old_delta) {
+ thread_t wqthread = kqr->kqr_thread;
+
+ /* store the new override delta */
+ kqr->kqr_override_delta = new_delta;
+
+ /* apply the override to the currently bound servicing thread */
+ if (wqthread) {
+ /* only apply if non-manager */
+ if ((kqr->kqr_state & KQWQ_THMANAGER) == 0) {
+ if (old_delta)
+ thread_update_ipc_override(wqthread, override_index);
+ else
+ thread_add_ipc_override(wqthread, override_index);
+ }
+ }
+ }
+ kqwq_req_unlock(kqwq);
+}
+
+/* called with the kqworkq lock held */
+static void
+kqworkq_bind_thread(
+ struct kqworkq *kqwq,
+ kq_index_t qos_index,
+ thread_t thread,
+ unsigned int flags)
+{
+ struct kqrequest *kqr = kqworkq_get_request(kqwq, qos_index);
+ thread_t old_thread = kqr->kqr_thread;
+ struct uthread *ut;
+
+ assert(kqr->kqr_state & KQWQ_THREQUESTED);
+
+ /* If no identity yet, just set flags as needed */
+ if (thread == THREAD_NULL) {
+ assert(old_thread == THREAD_NULL);
+
+ /* emergency or unidentified */
+ if (flags & KEVENT_FLAG_WORKQ_MANAGER) {
+ assert((kqr->kqr_state & KQWQ_THMANAGER) == 0);
+ kqr->kqr_state |= KQWQ_THMANAGER;
+ }
+ return;
+ }
+
+ /* Known thread identity */
+ ut = get_bsdthread_info(thread);
+
+ /*
+ * If this is a manager, and the manager request bit is
+ * not set, ensure no other thread is bound. If the bit
+ * is set, make sure the old thread is us (or not set).
+ */
+ if (flags & KEVENT_FLAG_WORKQ_MANAGER) {
+ if ((kqr->kqr_state & KQWQ_THMANAGER) == 0) {
+ assert(old_thread == THREAD_NULL);
+ kqr->kqr_state |= KQWQ_THMANAGER;
+ } else if (old_thread == THREAD_NULL) {
+ kqr->kqr_thread = thread;
+ ut->uu_kqueue_bound = KQWQ_QOS_MANAGER;
+ ut->uu_kqueue_flags = (KEVENT_FLAG_WORKQ |
+ KEVENT_FLAG_WORKQ_MANAGER);
+ } else {
+ assert(thread == old_thread);
+ assert(ut->uu_kqueue_bound == KQWQ_QOS_MANAGER);
+ assert(ut->uu_kqueue_flags & KEVENT_FLAG_WORKQ_MANAGER);
+ }
+ return;
+ }
+
+ /* Just a normal one-queue servicing thread */
+ assert(old_thread == THREAD_NULL);
+ assert((kqr->kqr_state & KQWQ_THMANAGER) == 0);
+
+ kqr->kqr_thread = thread;
+
+ /* apply an ipc QoS override if one is needed */
+ if (kqr->kqr_override_delta)
+ thread_add_ipc_override(thread, qos_index + kqr->kqr_override_delta);
+
+ /* indicate that we are processing in the uthread */
+ ut->uu_kqueue_bound = qos_index;
+ ut->uu_kqueue_flags = flags;
+}
+
+/* called with the kqworkq lock held */
+static void
+kqworkq_unbind_thread(
+ struct kqworkq *kqwq,
+ kq_index_t qos_index,
+ thread_t thread,
+ __unused unsigned int flags)
+{
+ struct kqrequest *kqr = kqworkq_get_request(kqwq, qos_index);
+ kq_index_t override = 0;
+
+ assert(thread == current_thread());
+
+ /*
+ * If there is an override, drop it from the current thread
+ * and then we are free to recompute (a potentially lower)
+ * minimum override to apply to the next thread request.
+ */
+ if (kqr->kqr_override_delta) {
+ struct kqtailq *base_queue = kqueue_get_base_queue(&kqwq->kqwq_kqueue, qos_index);
+ struct kqtailq *queue = kqueue_get_high_queue(&kqwq->kqwq_kqueue, qos_index);
+
+ /* if not bound to a manager thread, drop the current ipc override */
+ if ((kqr->kqr_state & KQWQ_THMANAGER) == 0) {
+ assert(thread == kqr->kqr_thread);
+ thread_drop_ipc_override(thread);
+ }
+
+ /* recompute the new override */
+ do {
+ if (!TAILQ_EMPTY(queue)) {
+ override = queue - base_queue;
+ break;
+ }
+ } while (queue-- > base_queue);
+ }
+
+ /* unbind the thread and apply the new override */
+ kqr->kqr_thread = THREAD_NULL;
+ kqr->kqr_override_delta = override;
+}
+
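+/* return the per-QoS request state embedded in the workq kqueue */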
+struct kqrequest *
+kqworkq_get_request(struct kqworkq *kqwq, kq_index_t qos_index)
+{
+ assert(qos_index < KQWQ_NQOS);
+ return &kqwq->kqwq_request[qos_index];
+}
+
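+/*
+ * Adjust the requested QoS and override of a knote attached to a
+ * workq kqueue; kn_qos acts as a floor for both. If the knote was
+ * queued, it is re-queued and a wakeup is posted so it lands on the
+ * proper queue. Non-workq kqueues are unaffected.
+ */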
+void
+knote_adjust_qos(struct knote *kn, qos_t new_qos, qos_t new_override)
+{
+ if (knote_get_kq(kn)->kq_state & KQ_WORKQ) {
+ kq_index_t new_qos_index;
+ kq_index_t new_override_index;
+ kq_index_t servicer_qos_index;
+
+ new_qos_index = qos_index_from_qos(new_qos, FALSE);
+ new_override_index = qos_index_from_qos(new_override, TRUE);
+
+ /* make sure the servicer qos acts as a floor */
+ servicer_qos_index = qos_index_from_qos(kn->kn_qos, FALSE);
+ if (servicer_qos_index > new_qos_index)
+ new_qos_index = servicer_qos_index;
+ if (servicer_qos_index > new_override_index)
+ new_override_index = servicer_qos_index;
+
+ kqlock(knote_get_kq(kn));
+ if (new_qos_index != knote_get_req_index(kn) ||
+ new_override_index != knote_get_qos_override_index(kn)) {
+ if (kn->kn_status & KN_QUEUED) {
+ knote_dequeue(kn);
+ knote_set_qos_index(kn, new_qos_index);
+ knote_set_qos_override_index(kn, new_override_index);
+ knote_enqueue(kn);
+ knote_wakeup(kn);
+ } else {
+ knote_set_qos_index(kn, new_qos_index);
+ knote_set_qos_override_index(kn, new_override_index);
+ }
+ }
+ kqunlock(knote_get_kq(kn));
+ }
+}
+
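+/*
+ * On a workq kqueue, request a servicing thread at the knote's QoS.
+ * On a file kqueue, note the wakeup if a processing pass is in
+ * flight and wake any threads sleeping or selecting on the queue.
+ */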
+static void
+knote_wakeup(struct knote *kn)
+{
+ struct kqueue *kq = knote_get_kq(kn);
+
+ if (kq->kq_state & KQ_WORKQ) {
+ /* request a servicing thread */
+ struct kqworkq *kqwq = (struct kqworkq *)kq;
+ kq_index_t qos_index = knote_get_qos_index(kn);
+
+ kqworkq_request_help(kqwq, qos_index, KQWQ_WAKEUP);
+
+ } else {
+ struct kqfile *kqf = (struct kqfile *)kq;
+
+ /* flag wakeups during processing */
+ if (kq->kq_state & KQ_PROCESSING)
+ kq->kq_state |= KQ_WAKEUP;
+
+ /* wakeup a thread waiting on this queue */
+ if (kq->kq_state & (KQ_SLEEP | KQ_SEL)) {
+ kq->kq_state &= ~(KQ_SLEEP | KQ_SEL);
+ waitq_wakeup64_all((struct waitq *)&kq->kq_wqs,
+ KQ_EVENT,
+ THREAD_AWAKENED,
+ WAITQ_ALL_PRIORITIES);
+ }