+ }
+ sb->sb_lastrecord = control;
+ sb->sb_mbtail = mlast;
+
+ SBLASTMBUFCHK(sb, __func__);
+ SBLASTRECORDCHK(sb, "sbappendcontrol 2");
+
+ postevent(0, sb, EV_RWBYTES);
+ return (1);
+}
+
+/*
+ * Queue m0 together with its control chain on sb.
+ *
+ * For a buffer flagged SB_RECV the data is first handed to sflt_data_in()
+ * and, when CONTENT_FILTER is compiled in, to cfil_sock_data_in(); this is
+ * skipped when m0 carries M_SKIPCFIL, in which case that flag is cleared.
+ *
+ * Returns the non-zero result of sbappendcontrol_internal() on success and
+ * 0 otherwise.  When error_out is non-NULL it is reset to 0 on entry and
+ * then set to:
+ *	EINVAL		the buffer is flagged SB_DROP
+ *	<filter error>	a filter returned an error other than EJUSTRETURN
+ *	ENOBUFS		sbappendcontrol_internal() returned 0
+ * On each of those failures m0 is freed, and control is freed as well
+ * unless the buffer is flagged SB_UNIX (presumably the caller keeps
+ * ownership of control in that case -- confirm against the callers).
+ * A filter result of EJUSTRETURN yields 0 with nothing freed here and
+ * *error_out left at 0.
+ */
+int
+sbappendcontrol(struct sockbuf *sb, struct mbuf *m0, struct mbuf *control,
+    int *error_out)
+{
+	boolean_t sb_unix = (sb->sb_flags & SB_UNIX);
+	int result;
+	int failure;
+
+	if (error_out != NULL)
+		*error_out = 0;
+
+	if (sb->sb_flags & SB_DROP) {
+		failure = EINVAL;
+		goto discard;
+	}
+
+	if ((sb->sb_flags & SB_RECV) &&
+	    (m0 == NULL || !(m0->m_flags & M_SKIPCFIL))) {
+		failure = sflt_data_in(sb->sb_so, NULL, &m0, &control, 0);
+		SBLASTRECORDCHK(sb, __func__);
+
+#if CONTENT_FILTER
+		if (failure == 0)
+			failure = cfil_sock_data_in(sb->sb_so, NULL, m0,
+			    control, 0);
+#endif /* CONTENT_FILTER */
+
+		/* EJUSTRETURN: nothing is freed and no error is reported */
+		if (failure == EJUSTRETURN)
+			return (0);
+		if (failure != 0)
+			goto discard;
+	} else if (m0 != NULL) {
+		m0->m_flags &= ~M_SKIPCFIL;
+	}
+
+	result = sbappendcontrol_internal(sb, m0, control);
+	if (result != 0)
+		return (result);
+
+	failure = ENOBUFS;
+
+discard:
+	/* common failure exit: release the chains and report the reason */
+	if (m0 != NULL)
+		m_freem(m0);
+	if (control != NULL && !sb_unix)
+		m_freem(control);
+	if (error_out != NULL)
+		*error_out = failure;
+	return (0);
+}
+
+/*
+ * Append a contiguous TCP data blob to the receive socket buffer as a
+ * message, stamping its TCP sequence number into the packet header.
+ *
+ * m must be a non-empty packet-header chain, the socket must have message
+ * state, and sb must be flagged SB_RECV; each of these is enforced with
+ * VERIFY.  The last mbuf of the chain is marked M_EOR.
+ *
+ * Returns 1 when the data was merged behind the current tail with
+ * sbcompress(), otherwise whatever sbappendrecord() returns.
+ */
+int
+sbappendmsgstream_rcv(struct sockbuf *sb, struct mbuf *m, uint32_t seqnum,
+    int unordered)
+{
+	struct socket *so = sb->sb_so;
+	struct mbuf *cur;
+	struct mbuf *last = NULL;
+	u_int32_t uno_len = 0;
+	int ret;
+
+	VERIFY((m->m_flags & M_PKTHDR) && m_pktlen(m) > 0);
+	VERIFY(so->so_msg_state != NULL);
+	VERIFY(sb->sb_flags & SB_RECV);
+
+	/* the sequence number travels in the mbuf packet header */
+	m->m_pkthdr.msg_seq = seqnum;
+
+	/*
+	 * Walk the whole chain, remembering its final mbuf.  Unordered
+	 * bytes are tallied so the receive buffer can be grown by the same
+	 * amount below; otherwise the advertised receive window would
+	 * shrink because of the extra unordered bytes held in the buffer.
+	 */
+	for (cur = m; cur != NULL; cur = cur->m_next) {
+		if (unordered) {
+			cur->m_flags |= M_UNORDERED_DATA;
+			uno_len += cur->m_len;
+			so->so_msg_state->msg_uno_bytes += cur->m_len;
+		} else {
+			cur->m_flags &= ~M_UNORDERED_DATA;
+		}
+		last = cur;
+	}
+
+	/* the end of the blob is the end of the record */
+	last->m_flags |= M_EOR;
+
+	/*
+	 * Make room for the unordered bytes.  A failed reservation is only
+	 * logged here; the append below still goes ahead.
+	 */
+	if (unordered && !sbreserve(sb, sb->sb_hiwat + uno_len)) {
+		printf("%s: could not reserve space for unordered data\n",
+		    __func__);
+	}
+
+	if (unordered || sb->sb_mbtail == NULL ||
+	    (sb->sb_mbtail->m_flags & M_UNORDERED_DATA)) {
+		/* start a new record */
+		ret = sbappendrecord(sb, m);
+	} else {
+		/*
+		 * Ordered data following an ordered tail: drop the tail's
+		 * M_EOR and merge the new chain behind it.
+		 */
+		sb->sb_mbtail->m_flags &= ~M_EOR;
+		sbcompress(sb, m, sb->sb_mbtail);
+		ret = 1;
+	}
+	VERIFY(sb->sb_mbtail->m_flags & M_EOR);
+	return (ret);
+}
+
+/*
+ * Hand inbound TCP data to the receive-buffer append routine that matches
+ * the socket: message-based delivery (SOF_ENABLE_MSGS), Multipath TCP
+ * (SOF_MPTCP_TRUE, only when MPTCP is compiled in), or a regular stream.
+ *
+ * so		socket whose so_rcv buffer receives the data
+ * m		mbuf chain to append; may be NULL
+ * seqnum	TCP sequence number, used only by the message-based path
+ * unordered	non-zero for out-of-order data, used only by the
+ *		message-based path
+ *
+ * A non-NULL chain whose packet length is <= 0 is freed and 0 is
+ * returned.  A NULL chain on a message-based socket returns 0.  Otherwise
+ * the selected append routine's return value is passed through.
+ */
+int
+sbappendstream_rcvdemux(struct socket *so, struct mbuf *m, uint32_t seqnum,
+    int unordered)
+{
+	int ret = 0;
+
+	if ((m != NULL) && (m_pktlen(m) <= 0)) {
+		m_freem(m);
+		return (ret);
+	}
+
+	if (so->so_flags & SOF_ENABLE_MSGS) {
+		/*
+		 * sbappendmsgstream_rcv() dereferences m in its first
+		 * VERIFY, so a NULL chain (which the check above lets
+		 * through) must be turned away here rather than faulting.
+		 */
+		if (m == NULL)
+			return (ret);
+		ret = sbappendmsgstream_rcv(&so->so_rcv, m, seqnum, unordered);
+	}
+#if MPTCP
+	else if (so->so_flags & SOF_MPTCP_TRUE) {
+		ret = sbappendmptcpstream_rcv(&so->so_rcv, m);
+	}
+#endif /* MPTCP */
+	else {
+		ret = sbappendstream(&so->so_rcv, m);
+	}
+	return (ret);
+}
+
+#if MPTCP
+/*
+ * Append a packet-header mbuf chain to the receive buffer of an MPTCP
+ * subflow socket.
+ *
+ * Returns 1 once the chain has been merged into sb.  Returns 0 when m is
+ * NULL, when the chain is empty, when the buffer is flagged SB_DROP or the
+ * socket is flagged SS_CANTRCVMORE (m is freed in those three cases), or
+ * when mptcp_adj_rmap() reports failure.
+ */
+int
+sbappendmptcpstream_rcv(struct sockbuf *sb, struct mbuf *m)
+{
+	struct socket *so = sb->sb_so;
+
+	VERIFY(m == NULL || (m->m_flags & M_PKTHDR));
+	/* SB_NOCOMPRESS must be set to prevent loss of M_PKTHDR data */
+	VERIFY((sb->sb_flags & (SB_RECV|SB_NOCOMPRESS)) ==
+	    (SB_RECV|SB_NOCOMPRESS));
+
+	/* nothing to append */
+	if (m == NULL)
+		return (0);
+
+	/* empty chain, or a buffer/socket that no longer takes data */
+	if (m_pktlen(m) == 0 || (sb->sb_flags & SB_DROP) ||
+	    (so->so_state & SS_CANTRCVMORE)) {
+		m_freem(m);
+		return (0);
+	}
+
+	/* the socket is not closed, so SOF_MP_SUBFLOW must be set */
+	VERIFY(so->so_flags & SOF_MP_SUBFLOW);
+
+	/* a stream buffer holds a single record and m must be one packet */
+	if (m->m_nextpkt != NULL || (sb->sb_mb != sb->sb_lastrecord)) {
+		panic("%s: nexpkt %p || mb %p != lastrecord %p\n", __func__,
+		    m->m_nextpkt, sb->sb_mb, sb->sb_lastrecord);
+		/* NOTREACHED */
+	}
+
+	SBLASTMBUFCHK(sb, __func__);
+
+	/*
+	 * m is not freed here on failure; whether mptcp_adj_rmap()
+	 * disposes of it is not visible from this function -- confirm.
+	 */
+	if (mptcp_adj_rmap(so, m) != 0)
+		return (0);
+
+	/* No filter support (SB_RECV) on mptcp subflow sockets */
+
+	sbcompress(sb, m, sb->sb_mbtail);
+	sb->sb_lastrecord = sb->sb_mb;
+	SBLASTRECORDCHK(sb, __func__);
+	return (1);
+}
+#endif /* MPTCP */
+
+/*
+ * Append message to send socket buffer based on priority.
+ */
+int
+sbappendmsg_snd(struct sockbuf *sb, struct mbuf *m)
+{
+ struct socket *so = sb->sb_so;
+ struct msg_priq *priq;
+ int set_eor = 0;
+
+ VERIFY(so->so_msg_state != NULL);
+
+ if (m->m_nextpkt != NULL || (sb->sb_mb != sb->sb_lastrecord))
+ panic("sbappendstream: nexpkt %p || mb %p != lastrecord %p\n",
+ m->m_nextpkt, sb->sb_mb, sb->sb_lastrecord);
+
+ SBLASTMBUFCHK(sb, __func__);
+
+ if (m == NULL || (sb->sb_flags & SB_DROP) || so->so_msg_state == NULL) {
+ if (m != NULL)
+ m_freem(m);
+ return (0);
+ }
+
+ priq = &so->so_msg_state->msg_priq[m->m_pkthdr.msg_pri];
+
+ /* note if we need to propagate M_EOR to the last mbuf */
+ if (m->m_flags & M_EOR) {
+ set_eor = 1;
+
+ /* Reset M_EOR from the first mbuf */
+ m->m_flags &= ~(M_EOR);
+ }
+
+ if (priq->msgq_head == NULL) {
+ VERIFY(priq->msgq_tail == NULL && priq->msgq_lastmsg == NULL);
+ priq->msgq_head = priq->msgq_lastmsg = m;
+ } else {
+ VERIFY(priq->msgq_tail->m_next == NULL);
+
+ /* Check if the last message has M_EOR flag set */
+ if (priq->msgq_tail->m_flags & M_EOR) {
+ /* Insert as a new message */
+ priq->msgq_lastmsg->m_nextpkt = m;
+
+ /* move the lastmsg pointer */
+ priq->msgq_lastmsg = m;
+ } else {
+ /* Append to the existing message */
+ priq->msgq_tail->m_next = m;
+ }
+ }
+
+ /* Update accounting and the queue tail pointer */
+
+ while (m->m_next != NULL) {
+ sballoc(sb, m);
+ priq->msgq_bytes += m->m_len;
+ m = m->m_next;
+ }
+ sballoc(sb, m);
+ priq->msgq_bytes += m->m_len;
+
+ if (set_eor) {
+ m->m_flags |= M_EOR;
+
+ /*
+ * Since the user space can not write a new msg
+ * without completing the previous one, we can
+ * reset this flag to start sending again.
+ */
+ priq->msgq_flags &= ~(MSGQ_MSG_NOTDONE);
+ }
+
+ priq->msgq_tail = m;
+
+ SBLASTRECORDCHK(sb, "sbappendstream 2");
+ postevent(0, sb, EV_RWBYTES);