+/*
+ * Obtain a list element that can describe a new record in the stream buffer.
+ *
+ * The element is taken from oslog_stream_free_head when one is available.
+ * Otherwise the first non-metadata element found from the head of
+ * oslog_stream_buf_head is unlinked and reused: the read index (msg_bufr) is
+ * advanced past that record's bytes and oslog_s_dropped_msgcount is bumped.
+ *
+ * Must be called with oslog_stream_lock held.
+ *
+ * Returns the detached element, or NULL when the free list is empty and the
+ * buffer list holds no non-metadata element that could be reclaimed.
+ */
+static oslog_stream_buf_entry_t
+oslog_stream_find_free_buf_entry_locked(void)
+{
+	struct msgbuf *mbp;
+	oslog_stream_buf_entry_t buf_entry = NULL;
+	oslog_stream_buf_entry_t prev_entry = NULL;
+
+	LCK_SPIN_ASSERT(&oslog_stream_lock, LCK_ASSERT_OWNED);
+
+	mbp = oslog_streambufp;
+
+	// Fast path: a spare element is waiting on the free list.
+	buf_entry = STAILQ_FIRST(&oslog_stream_free_head);
+	if (buf_entry != NULL) {
+		STAILQ_REMOVE_HEAD(&oslog_stream_free_head, buf_entries);
+		return buf_entry;
+	}
+
+	// If no list elements are available in the free-list,
+	// consume the next log line so we can free up its list element.
+	// Metadata elements are skipped; prev_entry tracks the predecessor
+	// needed to unlink from a singly-linked tail queue.
+	buf_entry = STAILQ_FIRST(&oslog_stream_buf_head);
+	while (buf_entry != NULL &&
+	    buf_entry->type == oslog_stream_link_type_metadata) {
+		prev_entry = buf_entry;
+		buf_entry = STAILQ_NEXT(buf_entry, buf_entries);
+	}
+
+	// The list was empty or contained only metadata: nothing to reclaim.
+	// Report that to the caller instead of dereferencing a NULL element.
+	if (buf_entry == NULL) {
+		return NULL;
+	}
+
+	if (prev_entry == NULL) {
+		STAILQ_REMOVE_HEAD(&oslog_stream_buf_head, buf_entries);
+	} else {
+		STAILQ_REMOVE_AFTER(&oslog_stream_buf_head, prev_entry, buf_entries);
+	}
+
+	// Release the evicted record's bytes by moving the read index past
+	// them, wrapping around the end of the ring.
+	mbp->msg_bufr += buf_entry->size;
+	oslog_s_dropped_msgcount++;
+	if (mbp->msg_bufr >= mbp->msg_size) {
+		mbp->msg_bufr = (mbp->msg_bufr % mbp->msg_size);
+	}
+
+	return buf_entry;
+}
+
+/*
+ * Link a caller-supplied list element onto the tail of oslog_stream_buf_head.
+ *
+ * Must be called with oslog_stream_lock held. The element is inserted as-is;
+ * this function neither sets nor checks its type (presumably the caller has
+ * already marked it as metadata -- confirm against callers).
+ */
+void
+oslog_streamwrite_metadata_locked(oslog_stream_buf_entry_t m_entry)
+{
+	LCK_SPIN_ASSERT(&oslog_stream_lock, LCK_ASSERT_OWNED);
+
+	STAILQ_INSERT_TAIL(&oslog_stream_buf_head, m_entry, buf_entries);
+}
+
+/*
+ * Copy buflen bytes from buffer into the stream ring at the write index
+ * (msg_bufx) and advance that index, wrapping to the start of the ring when
+ * the data runs past the end.
+ *
+ * Must be called with oslog_stream_lock held. No check is made here that the
+ * bytes being overwritten are free; the caller is expected to have made room.
+ */
+static void
+oslog_streamwrite_append_bytes(const char *buffer, int buflen)
+{
+	struct msgbuf *stream;
+
+	LCK_SPIN_ASSERT(&oslog_stream_lock, LCK_ASSERT_OWNED);
+
+	stream = oslog_streambufp;
+
+	if (stream->msg_bufx + buflen > stream->msg_size) {
+		// The data straddles the end of the ring: the first segment
+		// fills the space up to the end, the second restarts at offset 0.
+		int head_len = stream->msg_size - stream->msg_bufx;
+		int tail_len = buflen - head_len;
+
+		memcpy((void *)(stream->msg_bufc + stream->msg_bufx), buffer, head_len);
+		memcpy((void *)stream->msg_bufc, buffer + head_len, tail_len);
+
+		stream->msg_bufx = tail_len;
+		return;
+	}
+
+	// The data fits in one piece before the end of the ring.
+	memcpy((void *)(stream->msg_bufc + stream->msg_bufx), buffer, buflen);
+	stream->msg_bufx += buflen;
+
+	// Landing exactly on the end means the next write starts at offset 0.
+	if (stream->msg_bufx == stream->msg_size) {
+		stream->msg_bufx = 0;
+	}
+}
+
+
+/*
+ * Append one log record to the stream ring buffer.
+ *
+ * A record is a firehose tracepoint header (everything in
+ * struct firehose_tracepoint_s up to ft_data) followed by publen bytes of
+ * pubdata. A list element describing the record (offset, size, timestamp,
+ * type) is appended to oslog_stream_buf_head.
+ *
+ * If the ring lacks room, the first non-metadata records from the head of
+ * the list are evicted until the new record fits; each eviction increments
+ * oslog_s_dropped_msgcount and recycles the element onto the free list.
+ *
+ * Must be called with oslog_stream_lock held.
+ *
+ * ftid     tracepoint id stored in the header
+ * stamp    timestamp recorded in the list element (not in the header)
+ * pubdata  payload bytes
+ * publen   number of payload bytes
+ *
+ * The record is discarded and oslog_s_error_count incremented when it cannot
+ * fit in the ring at all, when no list element can be obtained, or when
+ * there is nothing left to evict while space is still insufficient.
+ */
+void
+oslog_streamwrite_locked(firehose_tracepoint_id_u ftid,
+		uint64_t stamp, const void *pubdata, size_t publen)
+{
+	struct msgbuf *mbp;
+	int available_space = 0;
+	int ft_length;
+	oslog_stream_buf_entry_t buf_entry = NULL;
+	oslog_stream_buf_entry_t next_entry = NULL;
+
+	// Header bytes that precede the payload in the stream.
+	const size_t ft_size = offsetof(struct firehose_tracepoint_s, ft_data);
+	// Keep the total in size_t until it has been validated, so an oversized
+	// publen cannot wrap or turn negative when narrowed to int.
+	const size_t record_size = ft_size + publen;
+
+	LCK_SPIN_ASSERT(&oslog_stream_lock, LCK_ASSERT_OWNED);
+
+	mbp = oslog_streambufp;
+	if (record_size < publen || mbp->msg_size <= 0 ||
+	    record_size > (size_t)mbp->msg_size) {
+		(void)hw_atomic_add(&oslog_s_error_count, 1);
+		return;
+	}
+	// Safe: record_size is bounded by msg_size, checked just above.
+	ft_length = (int)record_size;
+
+	// Ensure that we have a list element for this record
+	buf_entry = oslog_stream_find_free_buf_entry_locked();
+	if (buf_entry == NULL) {
+		// Checked at runtime rather than only by assert, so builds
+		// without assertions do not dereference NULL below.
+		(void)hw_atomic_add(&oslog_s_error_count, 1);
+		return;
+	}
+
+	// Ensure that we have space in the ring buffer for the current logline
+	// NOTE(review): msg_bufr == msg_bufx is treated as "entirely free";
+	// confirm a completely full ring can never produce equal indices.
+	if (mbp->msg_bufr > mbp->msg_bufx) {
+		available_space = mbp->msg_bufr - mbp->msg_bufx;
+	} else {
+		available_space = mbp->msg_size - mbp->msg_bufx + mbp->msg_bufr;
+	}
+	while (ft_length > available_space) {
+		oslog_stream_buf_entry_t prev_entry = NULL;
+
+		// Locate the first non-metadata element; metadata is never evicted.
+		next_entry = STAILQ_FIRST(&oslog_stream_buf_head);
+		while (next_entry != NULL &&
+		    next_entry->type == oslog_stream_link_type_metadata) {
+			prev_entry = next_entry;
+			next_entry = STAILQ_NEXT(next_entry, buf_entries);
+		}
+
+		if (next_entry == NULL) {
+			// Nothing left to evict although space is still short.
+			// Hand the unused element back and drop this record
+			// instead of walking off the end of the list.
+			STAILQ_INSERT_TAIL(&oslog_stream_free_head, buf_entry, buf_entries);
+			(void)hw_atomic_add(&oslog_s_error_count, 1);
+			return;
+		}
+
+		if (prev_entry == NULL) {
+			STAILQ_REMOVE_HEAD(&oslog_stream_buf_head, buf_entries);
+		} else {
+			STAILQ_REMOVE_AFTER(&oslog_stream_buf_head, prev_entry, buf_entries);
+		}
+
+		// Release the evicted record's bytes, wrapping the read index.
+		mbp->msg_bufr += next_entry->size;
+		if (mbp->msg_bufr >= mbp->msg_size) {
+			mbp->msg_bufr = (mbp->msg_bufr % mbp->msg_size);
+		}
+
+		oslog_s_dropped_msgcount++;
+		available_space += next_entry->size;
+
+		STAILQ_INSERT_TAIL(&oslog_stream_free_head, next_entry, buf_entries);
+	}
+
+	assert(ft_length <= available_space);
+
+	// Write the log line and update the list entry for this record
+	buf_entry->offset = mbp->msg_bufx;
+	buf_entry->size = ft_length;
+	buf_entry->timestamp = stamp;
+	buf_entry->type = oslog_stream_link_type_log;
+
+	// Construct a tracepoint
+	struct firehose_tracepoint_s fs = {
+		.ft_thread = thread_tid(current_thread()),
+		.ft_id.ftid_value = ftid.ftid_value,
+		.ft_length = publen
+	};
+
+	// Emit exactly the header length that was accounted for in ft_length,
+	// so the write index stays in step with the recorded record sizes.
+	oslog_streamwrite_append_bytes((const char *)&fs, (int)ft_size);
+	oslog_streamwrite_append_bytes(pubdata, (int)publen);
+
+	assert(mbp->msg_bufr < mbp->msg_size);
+	// Insert the element to the buffer data list
+	STAILQ_INSERT_TAIL(&oslog_stream_buf_head, buf_entry, buf_entries);
+}
+
+