+ VERIFY(sb->sb_mbtail->m_flags & M_EOR);
+ return (ret);
+}
+
+/*
+ * Append an mbuf chain to the receive buffer of a TCP socket, picking the
+ * append routine from the socket flags: message based out of order
+ * delivery (SOF_ENABLE_MSGS), Multipath TCP (SOF_MPTCP_TRUE, only when
+ * built with MPTCP), or a regular TCP stream.
+ *
+ * A non-NULL chain whose packet length is not positive is freed and 0 is
+ * returned; otherwise the value of the selected append routine is returned.
+ */
+int
+sbappendstream_rcvdemux(struct socket *so, struct mbuf *m, uint32_t seqnum,
+	int unordered)
+{
+	struct sockbuf *rcv = &so->so_rcv;
+
+	/* Nothing to queue: release the empty chain and report no append */
+	if (m != NULL && m_pktlen(m) <= 0) {
+		m_freem(m);
+		return (0);
+	}
+
+	if (so->so_flags & SOF_ENABLE_MSGS) {
+		return (sbappendmsgstream_rcv(rcv, m, seqnum, unordered));
+	}
+#if MPTCP
+	if (so->so_flags & SOF_MPTCP_TRUE) {
+		return (sbappendmptcpstream_rcv(rcv, m));
+	}
+#endif /* MPTCP */
+	return (sbappendstream(rcv, m));
+}
+
+#if MPTCP
+/*
+ * Append a packet-header mbuf chain to the receive buffer of an MPTCP
+ * subflow socket.
+ *
+ * Returns 1 when the chain was handed to sbcompress(), 0 otherwise.  A
+ * chain that is empty, or that arrives while SB_DROP or SS_CANTRCVMORE is
+ * set, is freed here.  When mptcp_adj_rmap() reports failure the chain is
+ * not appended and is not freed by this function.
+ * NOTE(review): presumably mptcp_adj_rmap() disposes of the chain on
+ * failure -- confirm against its definition.
+ */
+int
+sbappendmptcpstream_rcv(struct sockbuf *sb, struct mbuf *m)
+{
+	struct socket *so = sb->sb_so;
+
+	VERIFY(m == NULL || (m->m_flags & M_PKTHDR));
+	/* SB_NOCOMPRESS must be set to prevent loss of M_PKTHDR data */
+	VERIFY((sb->sb_flags & (SB_RECV|SB_NOCOMPRESS)) ==
+	    (SB_RECV|SB_NOCOMPRESS));
+
+	/* No chain at all: nothing to free, nothing to append */
+	if (m == NULL) {
+		return (0);
+	}
+
+	/* Empty chain, or a buffer/socket that no longer accepts data */
+	if (m_pktlen(m) == 0 || (sb->sb_flags & SB_DROP) ||
+	    (so->so_state & SS_CANTRCVMORE)) {
+		m_freem(m);
+		return (0);
+	}
+
+	/* the socket is not closed, so SOF_MP_SUBFLOW must be set */
+	VERIFY(so->so_flags & SOF_MP_SUBFLOW);
+
+	/* Only a single record is expected in the buffer */
+	if (m->m_nextpkt != NULL || (sb->sb_mb != sb->sb_lastrecord)) {
+		panic("%s: nexpkt %p || mb %p != lastrecord %p\n", __func__,
+		    m->m_nextpkt, sb->sb_mb, sb->sb_lastrecord);
+		/* NOTREACHED */
+	}
+
+	SBLASTMBUFCHK(sb, __func__);
+
+	if (mptcp_adj_rmap(so, m) != 0) {
+		return (0);
+	}
+
+	/* No filter support (SB_RECV) on mptcp subflow sockets */
+
+	sbcompress(sb, m, sb->sb_mbtail);
+	sb->sb_lastrecord = sb->sb_mb;
+	SBLASTRECORDCHK(sb, __func__);
+
+	return (1);
+}
+#endif /* MPTCP */
+
+/*
+ * Append message to send socket buffer based on priority.
+ *
+ * The chain m is linked onto the priority queue selected by
+ * m->m_pkthdr.msg_pri.  If the queue's tail mbuf carries M_EOR, m starts a
+ * new message (linked through m_nextpkt); otherwise m is appended to the
+ * message still in progress.  An M_EOR flag found on the first mbuf of m is
+ * moved to the last mbuf of the chain.
+ *
+ * Returns 1 when the chain was queued and 0 when it was not (m is NULL,
+ * SB_DROP is set on the buffer, or the socket has no message state); in
+ * the latter cases a non-NULL chain is freed.
+ */
+int
+sbappendmsg_snd(struct sockbuf *sb, struct mbuf *m)
+{
+	struct socket *so = sb->sb_so;
+	struct msg_priq *priq;
+	int set_eor = 0;
+
+	VERIFY(so->so_msg_state != NULL);
+
+	/*
+	 * Reject a NULL chain before anything dereferences m; the sanity
+	 * check below reads m->m_nextpkt.
+	 */
+	if (m == NULL) {
+		return (0);
+	}
+
+	if (m->m_nextpkt != NULL || (sb->sb_mb != sb->sb_lastrecord)) {
+		panic("%s: nexpkt %p || mb %p != lastrecord %p\n", __func__,
+		    m->m_nextpkt, sb->sb_mb, sb->sb_lastrecord);
+		/* NOTREACHED */
+	}
+
+	SBLASTMBUFCHK(sb, __func__);
+
+	if ((sb->sb_flags & SB_DROP) || so->so_msg_state == NULL) {
+		m_freem(m);
+		return (0);
+	}
+
+	priq = &so->so_msg_state->msg_priq[m->m_pkthdr.msg_pri];
+
+	/* note if we need to propagate M_EOR to the last mbuf */
+	if (m->m_flags & M_EOR) {
+		set_eor = 1;
+
+		/* Reset M_EOR from the first mbuf */
+		m->m_flags &= ~(M_EOR);
+	}
+
+	if (priq->msgq_head == NULL) {
+		VERIFY(priq->msgq_tail == NULL && priq->msgq_lastmsg == NULL);
+		priq->msgq_head = priq->msgq_lastmsg = m;
+	} else {
+		VERIFY(priq->msgq_tail->m_next == NULL);
+
+		/* Check if the last message has M_EOR flag set */
+		if (priq->msgq_tail->m_flags & M_EOR) {
+			/* Insert as a new message */
+			priq->msgq_lastmsg->m_nextpkt = m;
+
+			/* move the lastmsg pointer */
+			priq->msgq_lastmsg = m;
+		} else {
+			/* Append to the existing message */
+			priq->msgq_tail->m_next = m;
+		}
+	}
+
+	/*
+	 * Update accounting and the queue tail pointer: charge every mbuf
+	 * of the chain to the sockbuf and to the priority queue, leaving m
+	 * pointing at the last mbuf of the chain.
+	 */
+	while (m->m_next != NULL) {
+		sballoc(sb, m);
+		priq->msgq_bytes += m->m_len;
+		m = m->m_next;
+	}
+	sballoc(sb, m);
+	priq->msgq_bytes += m->m_len;
+
+	if (set_eor) {
+		m->m_flags |= M_EOR;
+
+		/*
+		 * Since the user space can not write a new msg
+		 * without completing the previous one, we can
+		 * reset this flag to start sending again.
+		 */
+		priq->msgq_flags &= ~(MSGQ_MSG_NOTDONE);
+	}
+
+	priq->msgq_tail = m;
+
+	SBLASTRECORDCHK(sb, __func__);
+	postevent(0, sb, EV_RWBYTES);
+	return (1);
+}
+
+/*
+ * Pull data from priority queues to the serial snd queue
+ * right before sending.
+ */
+void
+sbpull_unordered_data(struct socket *so, int32_t off, int32_t len)
+{
+ int32_t topull, i;
+ struct msg_priq *priq = NULL;
+
+ VERIFY(so->so_msg_state != NULL);
+
+ topull = (off + len) - so->so_msg_state->msg_serial_bytes;
+
+ i = MSG_PRI_MAX;
+ while (i >= MSG_PRI_MIN && topull > 0) {
+ struct mbuf *m = NULL, *mqhead = NULL, *mend = NULL;
+ priq = &so->so_msg_state->msg_priq[i];
+ if ((priq->msgq_flags & MSGQ_MSG_NOTDONE) &&
+ priq->msgq_head == NULL) {
+ /*
+ * We were in the middle of sending
+ * a message and we have not seen the
+ * end of it.
+ */
+ VERIFY(priq->msgq_lastmsg == NULL &&
+ priq->msgq_tail == NULL);
+ return;
+ }
+ if (priq->msgq_head != NULL) {
+ int32_t bytes = 0, topull_tmp = topull;
+ /*
+ * We found a msg while scanning the priority
+ * queue from high to low priority.
+ */
+ m = priq->msgq_head;
+ mqhead = m;
+ mend = m;
+
+ /*
+ * Move bytes from the priority queue to the
+ * serial queue. Compute the number of bytes
+ * being added.
+ */
+ while (mqhead->m_next != NULL && topull_tmp > 0) {
+ bytes += mqhead->m_len;
+ topull_tmp -= mqhead->m_len;
+ mend = mqhead;
+ mqhead = mqhead->m_next;
+ }
+
+ if (mqhead->m_next == NULL) {
+ /*
+ * If we have only one more mbuf left,
+ * move the last mbuf of this message to
+ * serial queue and set the head of the
+ * queue to be the next message.
+ */
+ bytes += mqhead->m_len;
+ mend = mqhead;
+ mqhead = m->m_nextpkt;
+ if (!(mend->m_flags & M_EOR)) {
+ /*
+ * We have not seen the end of
+ * this message, so we can not
+ * pull anymore.
+ */
+ priq->msgq_flags |= MSGQ_MSG_NOTDONE;
+ } else {
+ /* Reset M_EOR */
+ mend->m_flags &= ~(M_EOR);
+ }
+ } else {
+ /* propagate the next msg pointer */
+ mqhead->m_nextpkt = m->m_nextpkt;
+ }
+ priq->msgq_head = mqhead;
+
+ /*
+ * if the lastmsg pointer points to
+ * the mbuf that is being dequeued, update
+ * it to point to the new head.
+ */
+ if (priq->msgq_lastmsg == m)
+ priq->msgq_lastmsg = priq->msgq_head;
+
+ m->m_nextpkt = NULL;
+ mend->m_next = NULL;
+
+ if (priq->msgq_head == NULL) {
+ /* Moved all messages, update tail */
+ priq->msgq_tail = NULL;
+ VERIFY(priq->msgq_lastmsg == NULL);
+ }
+
+ /* Move it to serial sb_mb queue */
+ if (so->so_snd.sb_mb == NULL) {
+ so->so_snd.sb_mb = m;
+ } else {
+ so->so_snd.sb_mbtail->m_next = m;
+ }
+
+ priq->msgq_bytes -= bytes;
+ VERIFY(priq->msgq_bytes >= 0);
+ sbwakeup(&so->so_snd);
+
+ so->so_msg_state->msg_serial_bytes += bytes;
+ so->so_snd.sb_mbtail = mend;
+ so->so_snd.sb_lastrecord = so->so_snd.sb_mb;
+
+ topull =
+ (off + len) - so->so_msg_state->msg_serial_bytes;
+
+ if (priq->msgq_flags & MSGQ_MSG_NOTDONE)
+ break;
+ } else {
+ --i;
+ }
+ }
+ sblastrecordchk(&so->so_snd, "sbpull_unordered_data");
+ sblastmbufchk(&so->so_snd, "sbpull_unordered_data");