]> git.saurik.com Git - apple/xnu.git/blobdiff - bsd/net/content_filter.c
xnu-6153.101.6.tar.gz
[apple/xnu.git] / bsd / net / content_filter.c
index 62988b66b456454d8a7961a3bf95cb1b8ce5ae16..626c2b2bf6ddbb965f793e99966ee1e16c6ead88 100644 (file)
@@ -359,6 +359,7 @@ struct content_filter **content_filters = NULL;
 uint32_t cfil_active_count = 0; /* Number of active content filters */
 uint32_t cfil_sock_attached_count = 0;  /* Number of sockets attachements */
 uint32_t cfil_sock_udp_attached_count = 0;      /* Number of UDP sockets attachements */
+uint32_t cfil_sock_attached_stats_count = 0;    /* Number of sockets requested periodic stats report */
 uint32_t cfil_close_wait_timeout = 1000; /* in milliseconds */
 
 static kern_ctl_ref cfil_kctlref = NULL;
@@ -408,6 +409,11 @@ struct cfil_entry {
        uint32_t                cfe_necp_control_unit;
        struct timeval          cfe_last_event; /* To user space */
        struct timeval          cfe_last_action; /* From user space */
+       uint64_t                cfe_byte_inbound_count_reported; /* stats already been reported */
+       uint64_t                cfe_byte_outbound_count_reported; /* stats already been reported */
+       struct timeval          cfe_stats_report_ts; /* Timestamp for last stats report */
+       uint32_t                cfe_stats_report_frequency; /* Interval for stats report in msecs */
+       boolean_t               cfe_laddr_sent;
 
        struct cfe_buf {
                /*
@@ -455,6 +461,7 @@ struct cfil_hash_entry;
  */
 struct cfil_info {
        TAILQ_ENTRY(cfil_info)  cfi_link;
+       TAILQ_ENTRY(cfil_info)  cfi_link_stats;
        struct socket           *cfi_so;
        uint64_t                cfi_flags;
        uint64_t                cfi_sock_id;
@@ -517,6 +524,7 @@ struct cfil_info {
 #define CFI_ENTRY_KCUNIT(i, e) (((e) - &((i)->cfi_entries[0])) + 1)
 
 TAILQ_HEAD(cfil_sock_head, cfil_info) cfil_sock_head;
+TAILQ_HEAD(cfil_sock_head_stats, cfil_info) cfil_sock_head_stats;
 
 #define CFIL_QUEUE_VERIFY(x) if (cfil_debug) cfil_queue_verify(x)
 #define CFIL_INFO_VERIFY(x) if (cfil_debug) cfil_info_verify(x)
@@ -538,6 +546,23 @@ LIST_HEAD(cfilhashhead, cfil_hash_entry);
                           (addr.sa.sa_family == AF_INET && addr.sin.sin_addr.s_addr == 0) || \
                           (addr.sa.sa_family == AF_INET6 && IN6_IS_ADDR_UNSPECIFIED(&addr.sin6.sin6_addr)))
 
+/*
+ * Periodic Statistics Report:
+ */
+static struct thread *cfil_stats_report_thread;
+#define CFIL_STATS_REPORT_INTERVAL_MIN_MSEC  500   // Highest report frequency
+#define CFIL_STATS_REPORT_RUN_INTERVAL_NSEC  (CFIL_STATS_REPORT_INTERVAL_MIN_MSEC * NSEC_PER_MSEC)
+#define CFIL_STATS_REPORT_MAX_COUNT          50    // Max stats to be reported per run
+
+/* This buffer must have same layout as struct cfil_msg_stats_report */
+struct cfil_stats_report_buffer {
+       struct cfil_msg_hdr        msghdr;
+       uint32_t                   count;
+       struct cfil_msg_sock_stats stats[CFIL_STATS_REPORT_MAX_COUNT];
+};
+static struct cfil_stats_report_buffer *global_cfil_stats_report_buffers[MAX_CONTENT_FILTER];
+static uint32_t global_cfil_stats_counts[MAX_CONTENT_FILTER];
+
 /*
  * UDP Garbage Collection:
  */
@@ -571,7 +596,7 @@ struct cfil_hash_entry {
        u_short cfentry_lport;
        sa_family_t                    cfentry_family;
        u_int32_t                      cfentry_flowhash;
-       u_int32_t                      cfentry_lastused;
+       u_int64_t                      cfentry_lastused;
        union {
                /* foreign host table entry */
                struct in_addr_4in6 addr46;
@@ -637,6 +662,7 @@ int cfil_debug = 1;
 #define DATA_DEBUG 0
 #define SHOW_DEBUG 0
 #define GC_DEBUG 0
+#define STATS_DEBUG 0
 
 /*
  * Sysctls for logs and statistics
@@ -754,6 +780,8 @@ static void cfil_sock_received_verdict(struct socket *so);
 static void cfil_fill_event_msg_addresses(struct cfil_hash_entry *, struct inpcb *,
     union sockaddr_in_4_6 *, union sockaddr_in_4_6 *,
     boolean_t, boolean_t);
+static void cfil_stats_report_thread_func(void *, wait_result_t);
+static void cfil_stats_report(void *v, wait_result_t w);
 
 bool check_port(struct sockaddr *, u_short);
 
@@ -1190,6 +1218,34 @@ cfil_ctl_connect(kern_ctl_ref kctlref, struct sockaddr_ctl *sac,
 
                *unitinfo = cfc;
                cfil_active_count++;
+
+               // Allocate periodic stats buffer for this filter
+               if (global_cfil_stats_report_buffers[cfc->cf_kcunit - 1] == NULL) {
+                       cfil_rw_unlock_exclusive(&cfil_lck_rw);
+
+                       struct cfil_stats_report_buffer *buf;
+
+                       MALLOC(buf,
+                           struct cfil_stats_report_buffer *,
+                           sizeof(struct cfil_stats_report_buffer),
+                           M_TEMP,
+                           M_WAITOK | M_ZERO);
+
+                       cfil_rw_lock_exclusive(&cfil_lck_rw);
+
+                       if (buf == NULL) {
+                               error = ENOMEM;
+                               cfil_rw_unlock_exclusive(&cfil_lck_rw);
+                               goto done;
+                       }
+
+                       /* Another thread may have won the race */
+                       if (global_cfil_stats_report_buffers[cfc->cf_kcunit - 1] != NULL) {
+                               FREE(buf, M_TEMP);
+                       } else {
+                               global_cfil_stats_report_buffers[cfc->cf_kcunit - 1] = buf;
+                       }
+               }
        }
        cfil_rw_unlock_exclusive(&cfil_lck_rw);
 done:
@@ -1334,6 +1390,11 @@ release:
        }
        verify_content_filter(cfc);
 
+       /* Free the stats buffer for this filter */
+       if (global_cfil_stats_report_buffers[cfc->cf_kcunit - 1] != NULL) {
+               FREE(global_cfil_stats_report_buffers[cfc->cf_kcunit - 1], M_TEMP);
+               global_cfil_stats_report_buffers[cfc->cf_kcunit - 1] = NULL;
+       }
        VERIFY(cfc->cf_sock_count == 0);
 
        /*
@@ -1593,6 +1654,90 @@ done:
        return so;
 }
 
+static void
+cfil_info_stats_toggle(struct cfil_info *cfil_info, struct cfil_entry *entry, uint32_t report_frequency)
+{
+       struct cfil_info *cfil = NULL;
+       Boolean found = FALSE;
+       int kcunit;
+
+       if (cfil_info == NULL) {
+               return;
+       }
+
+       if (report_frequency) {
+               if (entry == NULL) {
+                       return;
+               }
+
+               // Update stats reporting frequency.
+               if (entry->cfe_stats_report_frequency != report_frequency) {
+                       entry->cfe_stats_report_frequency = report_frequency;
+                       if (entry->cfe_stats_report_frequency < CFIL_STATS_REPORT_INTERVAL_MIN_MSEC) {
+                               entry->cfe_stats_report_frequency = CFIL_STATS_REPORT_INTERVAL_MIN_MSEC;
+                       }
+                       microuptime(&entry->cfe_stats_report_ts);
+
+                       // Insert cfil_info into list only if it is not in yet.
+                       TAILQ_FOREACH(cfil, &cfil_sock_head_stats, cfi_link_stats) {
+                               if (cfil == cfil_info) {
+                                       return;
+                               }
+                       }
+
+                       TAILQ_INSERT_TAIL(&cfil_sock_head_stats, cfil_info, cfi_link_stats);
+
+                       // Wake up stats thread if this is first flow added
+                       if (cfil_sock_attached_stats_count == 0) {
+                               thread_wakeup((caddr_t)&cfil_sock_attached_stats_count);
+                       }
+                       cfil_sock_attached_stats_count++;
+#if STATS_DEBUG
+                       CFIL_LOG(LOG_ERR, "CFIL: VERDICT RECEIVED - STATS FLOW INSERTED: <so %llx sockID %llu> stats frequency %d msecs",
+                           cfil_info->cfi_so ? (uint64_t)VM_KERNEL_ADDRPERM(cfil_info->cfi_so) : 0,
+                           cfil_info->cfi_sock_id,
+                           entry->cfe_stats_report_frequency);
+#endif
+               }
+       } else {
+               // Turn off stats reporting for this filter.
+               if (entry != NULL) {
+                       // Already off, no change.
+                       if (entry->cfe_stats_report_frequency == 0) {
+                               return;
+                       }
+
+                       entry->cfe_stats_report_frequency = 0;
+                       // If cfil_info still has filter(s) asking for stats, no need to remove from list.
+                       for (kcunit = 1; kcunit <= MAX_CONTENT_FILTER; kcunit++) {
+                               if (cfil_info->cfi_entries[kcunit - 1].cfe_stats_report_frequency > 0) {
+                                       return;
+                               }
+                       }
+               }
+
+               // No more filter asking for stats for this cfil_info, remove from list.
+               if (!TAILQ_EMPTY(&cfil_sock_head_stats)) {
+                       found = FALSE;
+                       TAILQ_FOREACH(cfil, &cfil_sock_head_stats, cfi_link_stats) {
+                               if (cfil == cfil_info) {
+                                       found = TRUE;
+                                       break;
+                               }
+                       }
+                       if (found) {
+                               cfil_sock_attached_stats_count--;
+                               TAILQ_REMOVE(&cfil_sock_head_stats, cfil_info, cfi_link_stats);
+#if STATS_DEBUG
+                               CFIL_LOG(LOG_ERR, "CFIL: VERDICT RECEIVED - STATS FLOW DELETED: <so %llx sockID %llu> stats frequency reset",
+                                   cfil_info->cfi_so ? (uint64_t)VM_KERNEL_ADDRPERM(cfil_info->cfi_so) : 0,
+                                   cfil_info->cfi_sock_id);
+#endif
+                       }
+               }
+       }
+}
+
 static errno_t
 cfil_ctl_send(kern_ctl_ref kctlref, u_int32_t kcunit, void *unitinfo, mbuf_t m,
     int flags)
@@ -1787,6 +1932,12 @@ cfil_ctl_send(kern_ctl_ref kctlref, u_int32_t kcunit, void *unitinfo, mbuf_t m,
                if (error == EJUSTRETURN) {
                        error = 0;
                }
+
+               // Toggle stats reporting according to received verdict.
+               cfil_rw_lock_exclusive(&cfil_lck_rw);
+               cfil_info_stats_toggle(cfil_info, entry, action_msg->cfa_stats_frequency);
+               cfil_rw_unlock_exclusive(&cfil_lck_rw);
+
                break;
 
        case CFM_OP_DROP:
@@ -2247,6 +2398,7 @@ cfil_init(void)
        lck_rw_init(&cfil_lck_rw, cfil_lck_grp, cfil_lck_attr);
 
        TAILQ_INIT(&cfil_sock_head);
+       TAILQ_INIT(&cfil_sock_head_stats);
 
        /*
         * Register kernel control
@@ -2278,10 +2430,21 @@ cfil_init(void)
        /* this must not fail */
        VERIFY(cfil_udp_gc_thread != NULL);
 
+       // Spawn thread for statistics reporting
+       if (kernel_thread_start(cfil_stats_report_thread_func, NULL,
+           &cfil_stats_report_thread) != KERN_SUCCESS) {
+               panic_plain("%s: Can't create statistics report thread", __func__);
+               /* NOTREACHED */
+       }
+       /* this must not fail */
+       VERIFY(cfil_stats_report_thread != NULL);
+
        // Set UDP per-flow mbuf thresholds to 1/32 of platform max
        mbuf_limit = MAX(UDP_FLOW_GC_MBUF_CNT_MAX, (nmbclusters << MCLSHIFT) >> UDP_FLOW_GC_MBUF_SHIFT);
        cfil_udp_gc_mbuf_num_max = (mbuf_limit >> MCLSHIFT);
        cfil_udp_gc_mbuf_cnt_max = mbuf_limit;
+
+       memset(&global_cfil_stats_report_buffers, 0, sizeof(global_cfil_stats_report_buffers));
 }
 
 struct cfil_info *
@@ -2486,6 +2649,9 @@ cfil_info_free(struct cfil_info *cfil_info)
        cfil_sock_attached_count--;
        TAILQ_REMOVE(&cfil_sock_head, cfil_info, cfi_link);
 
+       // Turn off stats reporting for cfil_info.
+       cfil_info_stats_toggle(cfil_info, NULL, 0);
+
        out_drained += cfil_queue_drain(&cfil_info->cfi_snd.cfi_inject_q);
        in_drain += cfil_queue_drain(&cfil_info->cfi_rcv.cfi_inject_q);
 
@@ -3258,6 +3424,10 @@ static void
 fill_ip6_sockaddr_4_6(union sockaddr_in_4_6 *sin46,
     struct in6_addr *ip6, u_int16_t port)
 {
+       if (sin46 == NULL) {
+               return;
+       }
+
        struct sockaddr_in6 *sin6 = &sin46->sin6;
 
        sin6->sin6_family = AF_INET6;
@@ -3274,6 +3444,10 @@ static void
 fill_ip_sockaddr_4_6(union sockaddr_in_4_6 *sin46,
     struct in_addr ip, u_int16_t port)
 {
+       if (sin46 == NULL) {
+               return;
+       }
+
        struct sockaddr_in *sin = &sin46->sin;
 
        sin->sin_family = AF_INET;
@@ -6548,7 +6722,7 @@ cfil_info_udp_expire(void *v, wait_result_t w)
        struct cfil_hash_entry *hash_entry;
        struct cfil_db *db;
        struct socket *so;
-       u_int32_t current_time = 0;
+       u_int64_t current_time = 0;
 
        current_time = net_uptime();
 
@@ -6699,3 +6873,318 @@ cfil_udp_get_socket_state(struct mbuf *m, uint32_t *state_change_cnt, short *opt
        }
        return NULL;
 }
+
+static int
+cfil_dispatch_stats_event_locked(int kcunit, struct cfil_stats_report_buffer *buffer, uint32_t stats_count)
+{
+       struct content_filter *cfc = NULL;
+       errno_t error = 0;
+       size_t msgsize = 0;
+
+       if (buffer == NULL || stats_count == 0) {
+               return error;
+       }
+
+       if (content_filters == NULL || kcunit > MAX_CONTENT_FILTER) {
+               return error;
+       }
+
+       cfc = content_filters[kcunit - 1];
+       if (cfc == NULL) {
+               return error;
+       }
+
+       /* Would be wasteful to try */
+       if (cfc->cf_flags & CFF_FLOW_CONTROLLED) {
+               error = ENOBUFS;
+               goto done;
+       }
+
+       msgsize = sizeof(struct cfil_msg_stats_report) + (sizeof(struct cfil_msg_sock_stats) * stats_count);
+       buffer->msghdr.cfm_len = msgsize;
+       buffer->msghdr.cfm_version = 1;
+       buffer->msghdr.cfm_type = CFM_TYPE_EVENT;
+       buffer->msghdr.cfm_op = CFM_OP_STATS;
+       buffer->msghdr.cfm_sock_id = 0;
+       buffer->count = stats_count;
+
+#if STATS_DEBUG
+       CFIL_LOG(LOG_ERR, "STATS (kcunit %d): msg size %lu - %lu %lu %lu",
+           kcunit,
+           (unsigned long)msgsize,
+           (unsigned long)sizeof(struct cfil_msg_stats_report),
+           (unsigned long)sizeof(struct cfil_msg_sock_stats),
+           (unsigned long)stats_count);
+#endif
+
+       error = ctl_enqueuedata(cfc->cf_kcref, cfc->cf_kcunit,
+           buffer,
+           msgsize,
+           CTL_DATA_EOR);
+       if (error != 0) {
+               CFIL_LOG(LOG_ERR, "ctl_enqueuedata() failed: %d", error);
+               goto done;
+       }
+       OSIncrementAtomic(&cfil_stats.cfs_stats_event_ok);
+
+#if STATS_DEBUG
+       CFIL_LOG(LOG_ERR, "CFIL: STATS REPORT: send msg to %d", kcunit);
+#endif
+
+done:
+
+       if (error == ENOBUFS) {
+               OSIncrementAtomic(
+                       &cfil_stats.cfs_stats_event_flow_control);
+
+               if (!cfil_rw_lock_shared_to_exclusive(&cfil_lck_rw)) {
+                       cfil_rw_lock_exclusive(&cfil_lck_rw);
+               }
+
+               cfc->cf_flags |= CFF_FLOW_CONTROLLED;
+
+               cfil_rw_unlock_exclusive(&cfil_lck_rw);
+       } else if (error != 0) {
+               OSIncrementAtomic(&cfil_stats.cfs_stats_event_fail);
+       }
+
+       return error;
+}
+
+static void
+cfil_stats_report_thread_sleep(bool forever)
+{
+#if STATS_DEBUG
+       CFIL_LOG(LOG_ERR, "CFIL: STATS COLLECTION SLEEP");
+#endif
+
+       if (forever) {
+               (void) assert_wait((event_t) &cfil_sock_attached_stats_count,
+                   THREAD_INTERRUPTIBLE);
+       } else {
+               uint64_t deadline = 0;
+               nanoseconds_to_absolutetime(CFIL_STATS_REPORT_RUN_INTERVAL_NSEC, &deadline);
+               clock_absolutetime_interval_to_deadline(deadline, &deadline);
+
+               (void) assert_wait_deadline(&cfil_sock_attached_stats_count,
+                   THREAD_INTERRUPTIBLE, deadline);
+       }
+}
+
+static void
+cfil_stats_report_thread_func(void *v, wait_result_t w)
+{
+#pragma unused(v, w)
+
+       ASSERT(cfil_stats_report_thread == current_thread());
+       thread_set_thread_name(current_thread(), "CFIL_STATS_REPORT");
+
+       // Kick off gc shortly
+       cfil_stats_report_thread_sleep(false);
+       thread_block_parameter((thread_continue_t) cfil_stats_report, NULL);
+       /* NOTREACHED */
+}
+
+static bool
+cfil_stats_collect_flow_stats_for_filter(int kcunit,
+    struct cfil_info *cfil_info,
+    struct cfil_entry *entry,
+    struct timeval current_tv)
+{
+       struct cfil_stats_report_buffer *buffer = NULL;
+       struct cfil_msg_sock_stats *flow_array = NULL;
+       struct cfil_msg_sock_stats *stats = NULL;
+       struct inpcb *inp = NULL;
+       struct timeval diff_time;
+       uint64_t diff_time_usecs;
+       int index = 0;
+
+       if (entry->cfe_stats_report_frequency == 0) {
+               return false;
+       }
+
+       buffer = global_cfil_stats_report_buffers[kcunit - 1];
+       if (buffer == NULL) {
+#if STATS_DEBUG
+               CFIL_LOG(LOG_ERR, "CFIL: STATS: no buffer");
+#endif
+               return false;
+       }
+
+       timersub(&current_tv, &entry->cfe_stats_report_ts, &diff_time);
+       diff_time_usecs = (diff_time.tv_sec * USEC_PER_SEC) + diff_time.tv_usec;
+
+#if STATS_DEBUG
+       CFIL_LOG(LOG_ERR, "CFIL: STATS REPORT - elapsed time - ts %llu %llu cur ts %llu %llu diff %llu %llu (usecs %llu) @freq %llu usecs sockID %llu",
+           (unsigned long long)entry->cfe_stats_report_ts.tv_sec,
+           (unsigned long long)entry->cfe_stats_report_ts.tv_usec,
+           (unsigned long long)current_tv.tv_sec,
+           (unsigned long long)current_tv.tv_usec,
+           (unsigned long long)diff_time.tv_sec,
+           (unsigned long long)diff_time.tv_usec,
+           (unsigned long long)diff_time_usecs,
+           (unsigned long long)((entry->cfe_stats_report_frequency * NSEC_PER_MSEC) / NSEC_PER_USEC),
+           cfil_info->cfi_sock_id);
+#endif
+
+       // Compare elapsed time in usecs
+       if (diff_time_usecs >= (entry->cfe_stats_report_frequency * NSEC_PER_MSEC) / NSEC_PER_USEC) {
+#if STATS_DEBUG
+               CFIL_LOG(LOG_ERR, "CFIL: STATS REPORT - in %llu reported %llu",
+                   cfil_info->cfi_byte_inbound_count,
+                   entry->cfe_byte_inbound_count_reported);
+               CFIL_LOG(LOG_ERR, "CFIL: STATS REPORT - out %llu reported %llu",
+                   cfil_info->cfi_byte_outbound_count,
+                   entry->cfe_byte_outbound_count_reported);
+#endif
+               // Check if flow has new bytes that have not been reported
+               if (entry->cfe_byte_inbound_count_reported < cfil_info->cfi_byte_inbound_count ||
+                   entry->cfe_byte_outbound_count_reported < cfil_info->cfi_byte_outbound_count) {
+                       flow_array = (struct cfil_msg_sock_stats *)&buffer->stats;
+                       index = global_cfil_stats_counts[kcunit - 1];
+
+                       stats = &flow_array[index];
+                       stats->cfs_sock_id = cfil_info->cfi_sock_id;
+                       stats->cfs_byte_inbound_count = cfil_info->cfi_byte_inbound_count;
+                       stats->cfs_byte_outbound_count = cfil_info->cfi_byte_outbound_count;
+
+                       if (entry->cfe_laddr_sent == false) {
+                               /* cache it if necessary */
+                               if (cfil_info->cfi_so_attach_laddr.sa.sa_len == 0) {
+                                       inp = cfil_info->cfi_so ? sotoinpcb(cfil_info->cfi_so) : NULL;
+                                       if (inp != NULL) {
+                                               boolean_t outgoing = (cfil_info->cfi_dir == CFS_CONNECTION_DIR_OUT);
+                                               union sockaddr_in_4_6 *src = outgoing ? &cfil_info->cfi_so_attach_laddr : NULL;
+                                               union sockaddr_in_4_6 *dst = outgoing ? NULL : &cfil_info->cfi_so_attach_laddr;
+                                               cfil_fill_event_msg_addresses(cfil_info->cfi_hash_entry, inp,
+                                                   src, dst, inp->inp_vflag & INP_IPV4, outgoing);
+                                       }
+                               }
+
+                               if (cfil_info->cfi_so_attach_laddr.sa.sa_len != 0) {
+                                       stats->cfs_laddr.sin6 = cfil_info->cfi_so_attach_laddr.sin6;
+                                       entry->cfe_laddr_sent = true;
+                               }
+                       }
+
+                       global_cfil_stats_counts[kcunit - 1]++;
+
+                       entry->cfe_stats_report_ts = current_tv;
+                       entry->cfe_byte_inbound_count_reported = cfil_info->cfi_byte_inbound_count;
+                       entry->cfe_byte_outbound_count_reported = cfil_info->cfi_byte_outbound_count;
+#if STATS_DEBUG
+                       cfil_info_log(LOG_ERR, cfil_info, "CFIL: LIFECYCLE: STATS COLLECTED");
+#endif
+                       CFI_ADD_TIME_LOG(cfil_info, &current_tv, &cfil_info->cfi_first_event, CFM_OP_STATS);
+                       return true;
+               }
+       }
+       return false;
+}
+
+static void
+cfil_stats_report(void *v, wait_result_t w)
+{
+#pragma unused(v, w)
+
+       struct cfil_info *cfil_info = NULL;
+       struct cfil_entry *entry = NULL;
+       struct timeval current_tv;
+       uint32_t flow_count = 0;
+       uint64_t saved_next_sock_id = 0; // Next sock id to be reported for next loop
+       bool flow_reported = false;
+
+#if STATS_DEBUG
+       CFIL_LOG(LOG_ERR, "CFIL: STATS COLLECTION RUNNING");
+#endif
+
+       do {
+               // Collect all sock ids of flows that has new stats
+               cfil_rw_lock_shared(&cfil_lck_rw);
+
+               if (cfil_sock_attached_stats_count == 0) {
+#if STATS_DEBUG
+                       CFIL_LOG(LOG_ERR, "CFIL: STATS: no flow");
+#endif
+                       cfil_rw_unlock_shared(&cfil_lck_rw);
+                       goto go_sleep;
+               }
+
+               for (int kcunit = 1; kcunit <= MAX_CONTENT_FILTER; kcunit++) {
+                       if (global_cfil_stats_report_buffers[kcunit - 1] != NULL) {
+                               memset(global_cfil_stats_report_buffers[kcunit - 1], 0, sizeof(struct cfil_stats_report_buffer));
+                       }
+                       global_cfil_stats_counts[kcunit - 1] = 0;
+               }
+
+               microuptime(&current_tv);
+               flow_count = 0;
+
+               TAILQ_FOREACH(cfil_info, &cfil_sock_head_stats, cfi_link_stats) {
+                       if (saved_next_sock_id != 0 &&
+                           saved_next_sock_id == cfil_info->cfi_sock_id) {
+                               // Here is where we left off previously, start accumulating
+                               saved_next_sock_id = 0;
+                       }
+
+                       if (saved_next_sock_id == 0) {
+                               if (flow_count >= CFIL_STATS_REPORT_MAX_COUNT) {
+                                       // Examine a fixed number of flows each round.  Remember the current flow
+                                       // so we can start from here for next loop
+                                       saved_next_sock_id = cfil_info->cfi_sock_id;
+                                       break;
+                               }
+
+                               flow_reported = false;
+                               for (int kcunit = 1; kcunit <= MAX_CONTENT_FILTER; kcunit++) {
+                                       entry = &cfil_info->cfi_entries[kcunit - 1];
+                                       if (entry->cfe_filter == NULL) {
+#if STATS_DEBUG
+                                               CFIL_LOG(LOG_NOTICE, "CFIL: STATS REPORT - so %llx no filter",
+                                                   cfil_info->cfi_so ? (uint64_t)VM_KERNEL_ADDRPERM(cfil_info->cfi_so) : 0);
+#endif
+                                               continue;
+                                       }
+
+                                       if ((entry->cfe_stats_report_frequency > 0) &&
+                                           cfil_stats_collect_flow_stats_for_filter(kcunit, cfil_info, entry, current_tv) == true) {
+                                               flow_reported = true;
+                                       }
+                               }
+                               if (flow_reported == true) {
+                                       flow_count++;
+                               }
+                       }
+               }
+
+               if (flow_count > 0) {
+#if STATS_DEBUG
+                       CFIL_LOG(LOG_ERR, "CFIL: STATS reporting for %d flows", flow_count);
+#endif
+                       for (int kcunit = 1; kcunit <= MAX_CONTENT_FILTER; kcunit++) {
+                               if (global_cfil_stats_report_buffers[kcunit - 1] != NULL &&
+                                   global_cfil_stats_counts[kcunit - 1] > 0) {
+                                       cfil_dispatch_stats_event_locked(kcunit,
+                                           global_cfil_stats_report_buffers[kcunit - 1],
+                                           global_cfil_stats_counts[kcunit - 1]);
+                               }
+                       }
+               } else {
+                       cfil_rw_unlock_shared(&cfil_lck_rw);
+                       goto go_sleep;
+               }
+
+               cfil_rw_unlock_shared(&cfil_lck_rw);
+
+               // Loop again if we haven't finished the whole cfil_info list
+       } while (saved_next_sock_id != 0);
+
+go_sleep:
+
+       // Sleep forever (until waken up) if no more flow to report
+       cfil_rw_lock_shared(&cfil_lck_rw);
+       cfil_stats_report_thread_sleep(cfil_sock_attached_stats_count == 0 ? true : false);
+       cfil_rw_unlock_shared(&cfil_lck_rw);
+       thread_block_parameter((thread_continue_t) cfil_stats_report, NULL);
+       /* NOTREACHED */
+}