+ if ((m != NULL) && (m_pktlen(m) <= 0)) {
+ m_freem(m);
+ return (ret);
+ }
+
+ if (so->so_flags & SOF_ENABLE_MSGS) {
+ ret = sbappendmsgstream_rcv(&so->so_rcv, m, seqnum, unordered);
+ }
+#if MPTCP
+ else if (so->so_flags & SOF_MPTCP_TRUE) {
+ ret = sbappendmptcpstream_rcv(&so->so_rcv, m);
+ }
+#endif /* MPTCP */
+ else {
+ ret = sbappendstream(&so->so_rcv, m);
+ }
+ return (ret);
+}
+
+#if MPTCP
+/*
+ * Append the packet m to the receive socket buffer sb of an MPTCP
+ * subflow socket.
+ *
+ * Returns 1 when m was handed to sbcompress(), 0 when nothing was
+ * appended.
+ */
+int
+sbappendmptcpstream_rcv(struct sockbuf *sb, struct mbuf *m)
+{
+	struct socket *so = sb->sb_so;
+
+	/* A non-NULL chain must start with a packet header mbuf */
+	VERIFY(m == NULL || (m->m_flags & M_PKTHDR));
+	/* SB_NOCOMPRESS must be set to prevent loss of M_PKTHDR data */
+	VERIFY((sb->sb_flags & (SB_RECV|SB_NOCOMPRESS)) ==
+	    (SB_RECV|SB_NOCOMPRESS));
+
+	/* Nothing was passed in, so there is nothing to free either */
+	if (m == NULL)
+		return (0);
+
+	/*
+	 * Free the chain instead of queueing it when the packet is empty
+	 * or when SB_DROP / SS_CANTRCVMORE is set.
+	 */
+	if (m_pktlen(m) == 0 || (sb->sb_flags & SB_DROP) != 0 ||
+	    (so->so_state & SS_CANTRCVMORE) != 0) {
+		m_freem(m);
+		return (0);
+	}
+
+	/* the socket is not closed, so SOF_MP_SUBFLOW must be set */
+	VERIFY(so->so_flags & SOF_MP_SUBFLOW);
+
+	/*
+	 * Only a single packet may be passed in, and the buffer must hold
+	 * exactly one record (sb_mb == sb_lastrecord).
+	 */
+	if (m->m_nextpkt != NULL || (sb->sb_mb != sb->sb_lastrecord)) {
+		panic("%s: nexpkt %p || mb %p != lastrecord %p\n", __func__,
+		    m->m_nextpkt, sb->sb_mb, sb->sb_lastrecord);
+		/* NOTREACHED */
+	}
+
+	SBLASTMBUFCHK(sb, __func__);
+
+	/*
+	 * NOTE(review): m is not freed here when mptcp_adj_rmap() fails;
+	 * presumably that routine disposes of the chain itself - confirm.
+	 */
+	if (mptcp_adj_rmap(so, m) != 0)
+		return (0);
+
+	/* No filter support (SB_RECV) on mptcp subflow sockets */
+
+	/* Link m behind the current tail and keep the single-record shape */
+	sbcompress(sb, m, sb->sb_mbtail);
+	sb->sb_lastrecord = sb->sb_mb;
+	SBLASTRECORDCHK(sb, __func__);
+	return (1);
+}
+#endif /* MPTCP */
+
+/*
+ * Append message to send socket buffer based on priority.
+ *
+ * The chain m is queued on the priority queue selected by
+ * m->m_pkthdr.msg_pri.  If the first mbuf carries M_EOR, the flag is
+ * moved to the last mbuf of the chain so that it marks the end of the
+ * message.  A chain is started as a new message when the queue is empty
+ * or its tail carries M_EOR; otherwise it is appended to the message
+ * currently being built.
+ *
+ * Returns 1 when the chain was queued.  Returns 0 when m is NULL, or
+ * when the chain was freed because SB_DROP is set or there is no
+ * message state.
+ */
+int
+sbappendmsg_snd(struct sockbuf *sb, struct mbuf *m)
+{
+	struct socket *so = sb->sb_so;
+	struct msg_priq *priq;
+	int set_eor = 0;
+
+	VERIFY(so->so_msg_state != NULL);
+
+	/*
+	 * Sanity-check the caller's chain and the buffer's record layout.
+	 * m may legitimately be NULL (handled just below), so it must not
+	 * be dereferenced unconditionally here.
+	 */
+	if ((m != NULL && m->m_nextpkt != NULL) ||
+	    (sb->sb_mb != sb->sb_lastrecord)) {
+		panic("sbappendstream: nexpkt %p || mb %p != lastrecord %p\n",
+		    (m != NULL) ? m->m_nextpkt : NULL, sb->sb_mb,
+		    sb->sb_lastrecord);
+		/* NOTREACHED */
+	}
+
+	SBLASTMBUFCHK(sb, __func__);
+
+	if (m == NULL || (sb->sb_flags & SB_DROP) || so->so_msg_state == NULL) {
+		if (m != NULL)
+			m_freem(m);
+		return (0);
+	}
+
+	priq = &so->so_msg_state->msg_priq[m->m_pkthdr.msg_pri];
+
+	/* note if we need to propagate M_EOR to the last mbuf */
+	if (m->m_flags & M_EOR) {
+		set_eor = 1;
+
+		/* Reset M_EOR from the first mbuf */
+		m->m_flags &= ~(M_EOR);
+	}
+
+	if (priq->msgq_head == NULL) {
+		/* Empty queue: m becomes both its head and its last message */
+		VERIFY(priq->msgq_tail == NULL && priq->msgq_lastmsg == NULL);
+		priq->msgq_head = priq->msgq_lastmsg = m;
+	} else {
+		VERIFY(priq->msgq_tail->m_next == NULL);
+
+		/* Check if the last message has M_EOR flag set */
+		if (priq->msgq_tail->m_flags & M_EOR) {
+			/* Insert as a new message */
+			priq->msgq_lastmsg->m_nextpkt = m;
+
+			/* move the lastmsg pointer */
+			priq->msgq_lastmsg = m;
+		} else {
+			/* Append to the existing message */
+			priq->msgq_tail->m_next = m;
+		}
+	}
+
+	/*
+	 * Update accounting and the queue tail pointer.  The loop charges
+	 * every mbuf but the last; the last one is charged afterwards so
+	 * that m is left pointing at the tail of the chain.
+	 */
+	while (m->m_next != NULL) {
+		sballoc(sb, m);
+		priq->msgq_bytes += m->m_len;
+		m = m->m_next;
+	}
+	sballoc(sb, m);
+	priq->msgq_bytes += m->m_len;
+
+	if (set_eor) {
+		m->m_flags |= M_EOR;
+
+		/*
+		 * Since the user space can not write a new msg
+		 * without completing the previous one, we can
+		 * reset this flag to start sending again.
+		 */
+		priq->msgq_flags &= ~(MSGQ_MSG_NOTDONE);
+	}
+
+	priq->msgq_tail = m;
+
+	SBLASTRECORDCHK(sb, "sbappendstream 2");
+	postevent(0, sb, EV_RWBYTES);
+	return (1);
+}
+
+/*
+ * Pull data from priority queues to the serial snd queue
+ * right before sending.
+ *
+ * The priority queues in so->so_msg_state->msg_priq[] are scanned from
+ * MSG_PRI_MAX down to MSG_PRI_MIN, and mbufs are moved onto so->so_snd
+ * until msg_serial_bytes covers (off + len), every queue has been
+ * visited, or a queue is marked MSGQ_MSG_NOTDONE (a message whose end
+ * has not been seen yet).
+ */
+void
+sbpull_unordered_data(struct socket *so, int32_t off, int32_t len)
+{
+ int32_t topull, i;
+ struct msg_priq *priq = NULL;
+
+ VERIFY(so->so_msg_state != NULL);
+
+ /* Bytes the serial queue is still short of (off + len) */
+ topull = (off + len) - so->so_msg_state->msg_serial_bytes;
+
+ i = MSG_PRI_MAX;
+ while (i >= MSG_PRI_MIN && topull > 0) {
+ struct mbuf *m = NULL, *mqhead = NULL, *mend = NULL;
+ priq = &so->so_msg_state->msg_priq[i];
+ if ((priq->msgq_flags & MSGQ_MSG_NOTDONE) &&
+ priq->msgq_head == NULL) {
+ /*
+ * We were in the middle of sending
+ * a message and we have not seen the
+ * end of it.
+ *
+ * Note that this early return also skips the
+ * sanity checks at the bottom of the function.
+ */
+ VERIFY(priq->msgq_lastmsg == NULL &&
+ priq->msgq_tail == NULL);
+ return;
+ }
+ if (priq->msgq_head != NULL) {
+ int32_t bytes = 0, topull_tmp = topull;
+ /*
+ * We found a msg while scanning the priority
+ * queue from high to low priority.
+ */
+ m = priq->msgq_head;
+ mqhead = m;
+ mend = m;
+
+ /*
+ * Move bytes from the priority queue to the
+ * serial queue. Compute the number of bytes
+ * being added.
+ *
+ * The walk follows the m_next chain of the first
+ * message and stops at its last mbuf or once
+ * topull_tmp is used up; mend trails one mbuf
+ * behind mqhead.
+ */
+ while (mqhead->m_next != NULL && topull_tmp > 0) {
+ bytes += mqhead->m_len;
+ topull_tmp -= mqhead->m_len;
+ mend = mqhead;
+ mqhead = mqhead->m_next;
+ }
+
+ if (mqhead->m_next == NULL) {
+ /*
+ * If we have only one more mbuf left,
+ * move the last mbuf of this message to
+ * serial queue and set the head of the
+ * queue to be the next message.
+ */
+ bytes += mqhead->m_len;
+ mend = mqhead;
+ mqhead = m->m_nextpkt;
+ if (!(mend->m_flags & M_EOR)) {
+ /*
+ * We have not seen the end of
+ * this message, so we can not
+ * pull anymore.
+ */
+ priq->msgq_flags |= MSGQ_MSG_NOTDONE;
+ } else {
+ /* Reset M_EOR */
+ mend->m_flags &= ~(M_EOR);
+ }
+ } else {
+ /*
+ * Only part of the message is pulled: mqhead is
+ * the first mbuf left behind, so propagate the
+ * next msg pointer to it.
+ */
+ mqhead->m_nextpkt = m->m_nextpkt;
+ }
+ priq->msgq_head = mqhead;
+
+ /*
+ * if the lastmsg pointer points to
+ * the mbuf that is being dequeued, update
+ * it to point to the new head.
+ */
+ if (priq->msgq_lastmsg == m)
+ priq->msgq_lastmsg = priq->msgq_head;
+
+ /* Cut the pulled chain (m .. mend) loose from the queue */
+ m->m_nextpkt = NULL;
+ mend->m_next = NULL;
+
+ if (priq->msgq_head == NULL) {
+ /* Moved all messages, update tail */
+ priq->msgq_tail = NULL;
+ VERIFY(priq->msgq_lastmsg == NULL);
+ }
+
+ /* Move it to serial sb_mb queue */
+ if (so->so_snd.sb_mb == NULL) {
+ so->so_snd.sb_mb = m;
+ } else {
+ so->so_snd.sb_mbtail->m_next = m;
+ }
+
+ /* Shift the byte accounting from the priority queue ... */
+ priq->msgq_bytes -= bytes;
+ VERIFY(priq->msgq_bytes >= 0);
+ sbwakeup(&so->so_snd);
+
+ /* ... to the serial queue, whose tail is now mend */
+ so->so_msg_state->msg_serial_bytes += bytes;
+ so->so_snd.sb_mbtail = mend;
+ so->so_snd.sb_lastrecord = so->so_snd.sb_mb;
+
+ /* Recompute how much is still needed */
+ topull =
+ (off + len) - so->so_msg_state->msg_serial_bytes;
+
+ /* The end of this message has not been seen; stop pulling */
+ if (priq->msgq_flags & MSGQ_MSG_NOTDONE)
+ break;
+ } else {
+ /* This queue is empty; try the next lower priority */
+ --i;
+ }
+ }
+ sblastrecordchk(&so->so_snd, "sbpull_unordered_data");
+ sblastmbufchk(&so->so_snd, "sbpull_unordered_data");
+}
+
+/*
+ * Compress mbuf chain m into the socket
+ * buffer sb following mbuf n. If n
+ * is null, the buffer is presumed empty.
+ *
+ * Each mbuf of m is either freed (zero-length), copied into the
+ * trailing space of n and freed, or linked in after n.  Both of the
+ * first two cases apply only when SB_NOCOMPRESS is clear.  M_EOR seen
+ * on any mbuf of m is accumulated and re-applied to the mbuf that ends
+ * up last.
+ */
+static inline void
+sbcompress(struct sockbuf *sb, struct mbuf *m, struct mbuf *n)
+{
+ int eor = 0, compress = (!(sb->sb_flags & SB_NOCOMPRESS));
+ struct mbuf *o;
+
+ if (m == NULL) {
+ /* There is nothing to compress; just update the tail */
+ /*
+ * NOTE(review): n is dereferenced unconditionally on this
+ * path, so callers presumably never pass m == NULL together
+ * with n == NULL - confirm.
+ */
+ for (; n->m_next != NULL; n = n->m_next)
+ ;
+ sb->sb_mbtail = n;
+ goto done;
+ }
+
+ while (m != NULL) {
+ /* Remember M_EOR from every mbuf; it is restored after the loop */
+ eor |= m->m_flags & M_EOR;
+ /*
+ * Case 1: drop an empty mbuf.  Once M_EOR has been seen this is
+ * only done when a neighbour (the next mbuf, else n) exists and
+ * has the same type.
+ */
+ if (compress && m->m_len == 0 && (eor == 0 ||
+ (((o = m->m_next) || (o = n)) && o->m_type == m->m_type))) {
+ if (sb->sb_lastrecord == m)
+ sb->sb_lastrecord = m->m_next;
+ m = m_free(m);
+ continue;
+ }
+ /*
+ * Case 2: m is small enough to fit in the trailing space of n,
+ * n does not end a record and both have the same type, so copy
+ * the data into n and free m.
+ */
+ if (compress && n != NULL && (n->m_flags & M_EOR) == 0 &&
+#ifndef __APPLE__
+ M_WRITABLE(n) &&
+#endif
+ m->m_len <= MCLBYTES / 4 && /* XXX: Don't copy too much */
+ m->m_len <= M_TRAILINGSPACE(n) &&
+ n->m_type == m->m_type) {
+ bcopy(mtod(m, caddr_t), mtod(n, caddr_t) + n->m_len,
+ (unsigned)m->m_len);
+ n->m_len += m->m_len;
+ /* n is already in the buffer, so only the byte counts grow */
+ sb->sb_cc += m->m_len;
+ if (m->m_type != MT_DATA && m->m_type != MT_HEADER &&
+ m->m_type != MT_OOBDATA) {
+ /* XXX: Probably don't need */
+ sb->sb_ctl += m->m_len;
+ }
+
+ /* update send byte count */
+ if (sb->sb_flags & SB_SNDBYTE_CNT) {
+ inp_incr_sndbytes_total(sb->sb_so,
+ m->m_len);
+ inp_incr_sndbytes_unsent(sb->sb_so,
+ m->m_len);
+ }
+ m = m_free(m);
+ continue;
+ }
+ /*
+ * Case 3: link m itself in after n (or as the first mbuf of the
+ * buffer), account for it, and make it the new n.
+ */
+ if (n != NULL)
+ n->m_next = m;
+ else
+ sb->sb_mb = m;
+ sb->sb_mbtail = m;
+ sballoc(sb, m);
+ n = m;
+ /* M_EOR is cleared here and re-applied to the final mbuf below */
+ m->m_flags &= ~M_EOR;
+ m = m->m_next;
+ n->m_next = NULL;
+ }
+ if (eor != 0) {
+ /* Put the accumulated M_EOR on the last mbuf */
+ if (n != NULL)
+ n->m_flags |= eor;
+ else
+ printf("semi-panic: sbcompress\n");
+ }
+done:
+ SBLASTMBUFCHK(sb, __func__);
+ postevent(0, sb, EV_RWBYTES);
+}
+
+/*
+ * Panic unless sb is completely empty: zero byte and mbuf counts and no
+ * mbuf, tail or last-record pointer.  `where' identifies the caller in
+ * the panic message.
+ */
+void
+sb_empty_assert(struct sockbuf *sb, const char *where)
+{
+	/* Everything is in the expected empty state; nothing to report */
+	if (sb->sb_cc == 0 && sb->sb_mb == NULL && sb->sb_mbcnt == 0 &&
+	    sb->sb_mbtail == NULL && sb->sb_lastrecord == NULL)
+		return;
+
+	panic("%s: sb %p so %p cc %d mbcnt %d mb %p mbtail %p "
+	    "lastrecord %p\n", where, sb, sb->sb_so, sb->sb_cc,
+	    sb->sb_mbcnt, sb->sb_mb, sb->sb_mbtail,
+	    sb->sb_lastrecord);
+	/* NOTREACHED */
+}
+
+/*
+ * Free every mbuf queued on the priority queue priq and reset the queue
+ * to its empty state.
+ */
+static void
+sbflush_priq(struct msg_priq *priq)
+{
+	struct mbuf *head = priq->msgq_head;
+
+	if (head != NULL)
+		m_freem_list(head);
+
+	/* Clear the queue pointers ... */
+	priq->msgq_lastmsg = NULL;
+	priq->msgq_tail = NULL;
+	priq->msgq_head = NULL;
+
+	/* ... and the state flags and byte count */
+	priq->msgq_flags = 0;
+	priq->msgq_bytes = 0;
+}
+
+/*
+ * Release every mbuf held by a sockbuf, along with any data queued on
+ * the per-priority message queues of a send buffer, and verify that
+ * the buffer really ended up empty.
+ */
+void
+sbflush(struct sockbuf *sb)
+{
+	void *caller = __builtin_return_address(0);
+	struct socket *so = sb->sb_so;
+	u_int32_t pri;
+
+	/* so_usecount may be 0 if we get here from sofreelastref() */
+	if (so == NULL) {
+		panic("%s: null so, sb=%p sb_flags=0x%x lr=%p\n",
+		    __func__, sb, sb->sb_flags, caller);
+		/* NOTREACHED */
+	} else if (so->so_usecount < 0) {
+		panic("%s: sb=%p sb_flags=0x%x sb_so=%p usecount=%d lr=%p "
+		    "lrh= %s\n", __func__, sb, sb->sb_flags, so,
+		    so->so_usecount, caller, solockhistory_nr(so));
+		/* NOTREACHED */
+	}
+
+	/*
+	 * Take SB_LOCK on the socket buffer so that another thread in
+	 * socket send/receive cannot alter it underneath us.
+	 *
+	 * sblock() is not allowed to fail here; the assertion checks it.
+	 */
+	(void) sblock(sb, SBL_WAIT | SBL_NOINTR | SBL_IGNDEFUNCT);
+	VERIFY(sb->sb_flags & SB_LOCK);
+
+	/*
+	 * Keep dropping while mbufs remain.  sbdrop(sb, 0) is only useful
+	 * when the leading mbuf is empty; with no bytes and a missing or
+	 * non-empty leading mbuf we would spin forever, so stop and let
+	 * sb_empty_assert() below panic instead.
+	 */
+	while (sb->sb_mbcnt > 0 &&
+	    (sb->sb_cc != 0 ||
+	    (sb->sb_mb != NULL && sb->sb_mb->m_len == 0))) {
+		sbdrop(sb, (int)sb->sb_cc);
+	}
+
+	/* Send buffers of message-enabled sockets also own priority queues */
+	if ((sb->sb_flags & SB_RECV) == 0 &&
+	    (so->so_flags & SOF_ENABLE_MSGS) != 0) {
+		VERIFY(so->so_msg_state != NULL);
+		for (pri = MSG_PRI_MIN; pri <= MSG_PRI_MAX; pri++)
+			sbflush_priq(&so->so_msg_state->msg_priq[pri]);
+		so->so_msg_state->msg_serial_bytes = 0;
+		so->so_msg_state->msg_uno_bytes = 0;
+	}
+
+	sb_empty_assert(sb, __func__);
+	postevent(0, sb, EV_RWBYTES);
+
+	sbunlock(sb, TRUE);	/* keep socket locked */
+}
+
+/*
+ * Drop data from (the front of) a sockbuf.
+ * use m_freem_list to free the mbuf structures
+ * under a single lock... this is done by pruning
+ * the top of the tree from the body by keeping track
+ * of where we get to in the tree and then zeroing the
+ * two pertinent pointers m_nextpkt and m_next
+ * the socket buffer is then updated to point at the new
+ * top of the tree and the pruned area is released via
+ * m_freem_list.
+ */
+void
+sbdrop(struct sockbuf *sb, int len)
+{
+ struct mbuf *m, *free_list, *ml;
+ struct mbuf *next, *last;
+
+ next = (m = sb->sb_mb) ? m->m_nextpkt : 0;
+#if MPTCP
+ if ((m != NULL) && (len > 0) &&
+ (!(sb->sb_flags & SB_RECV)) &&
+ ((sb->sb_so->so_flags & SOF_MP_SUBFLOW) ||
+ ((SOCK_CHECK_DOM(sb->sb_so, PF_MULTIPATH)) &&
+ (SOCK_CHECK_PROTO(sb->sb_so, IPPROTO_TCP)))) &&
+ (!(sb->sb_so->so_flags1 & SOF1_POST_FALLBACK_SYNC))) {
+ mptcp_preproc_sbdrop(sb->sb_so, m, (unsigned int)len);
+ }
+#endif /* MPTCP */
+ KERNEL_DEBUG((DBG_FNC_SBDROP | DBG_FUNC_START), sb, len, 0, 0, 0);
+
+ free_list = last = m;
+ ml = (struct mbuf *)0;