]> git.saurik.com Git - apple/xnu.git/blobdiff - bsd/kern/sys_pipe.c
xnu-2782.1.97.tar.gz
[apple/xnu.git] / bsd / kern / sys_pipe.c
index c374ea07e6dd3d32e761034a7c8417b652327e05..374d82381f46dc38b56986ac68bf6a043e894dec 100644 (file)
@@ -17,7 +17,7 @@
  *    are met.
  */
 /*
- * Copyright (c) 2003-2007 Apple Inc. All rights reserved.
+ * Copyright (c) 2003-2014 Apple Inc. All rights reserved.
  *
  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
  * 
  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
  * all features of sockets, but does do everything that pipes normally
  * do.
+ *
+ * Pipes are implemented as circular buffers. Following are the valid states in pipes operations
+ *  
+ *      _________________________________
+ * 1.  |_________________________________| r=w, c=0
+ * 
+ *      _________________________________
+ * 2.  |__r:::::wc_______________________| r <= w , c > 0
+ *
+ *      _________________________________
+ * 3.  |::::wc_____r:::::::::::::::::::::| r>w , c > 0
+ *
+ *      _________________________________
+ * 4.  |:::::::wrc:::::::::::::::::::::::| w=r, c = Max size
+ *
+ *
+ *  Nomenclature:-
+ *  a-z define the steps in a program flow
+ *  1-4 are the states as defined aboe
+ *  Action: is what file operation is done on the pipe
+ *  
+ *  Current:None  Action: initialize with size M=200
+ *  a. State 1 ( r=0, w=0, c=0)
+ *  
+ *  Current: a    Action: write(100) (w < M)
+ *  b. State 2 (r=0, w=100, c=100)
+ *  
+ *  Current: b    Action: write(100) (w = M-w)
+ *  c. State 4 (r=0,w=0,c=200)
+ *  
+ *  Current: b    Action: read(70)  ( r < c )
+ *  d. State 2(r=70,w=100,c=30)
+ *  
+ *  Current: d   Action: write(75) ( w < (m-w))
+ *  e. State 2 (r=70,w=175,c=105)
+ *  
+ *  Current: d    Action: write(110) ( w > (m-w))
+ *  f. State 3 (r=70,w=10,c=140)
+ *  
+ *  Current: d   Action: read(30) (r >= c )
+ *  g. State 1 (r=100,w=100,c=0)
+ *  
  */
 
 /*
- * This code has two modes of operation, a small write mode and a large
- * write mode.  The small write mode acts like conventional pipes with
- * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
- * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
- * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
- * the receiving process can copy it directly from the pages in the sending
- * process.
- *
- * If the sending process receives a signal, it is possible that it will
- * go away, and certainly its address space can change, because control
- * is returned back to the user-mode side.  In that case, the pipe code
- * arranges to copy the buffer supplied by the user process, to a pageable
- * kernel buffer, and the receiving process will grab the data from the
- * pageable kernel buffer.  Since signals don't happen all that often,
- * the copy operation is normally eliminated.
- *
- * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
- * happen for small transfers so that the system will not spend all of
- * its time context switching.
+ * This code create half duplex pipe buffers for facilitating file like
+ * operations on pipes. The initial buffer is very small, but this can
+ * dynamically change to larger sizes based on usage. The buffer size is never
+ * reduced. The total amount of kernel memory used is governed by maxpipekva.
+ * In case of dynamic expansion limit is reached, the output thread is blocked
+ * until the pipe buffer empties enough to continue. 
  *
  * In order to limit the resource use of pipes, two sysctls exist:
  *
  * kern.ipc.maxpipekva - This is a hard limit on the amount of pageable
- * address space available to us in pipe_map.  Whenever the amount in use
- * exceeds half of this value, all new pipes will be created with size
- * SMALL_PIPE_SIZE, rather than PIPE_SIZE.  Big pipe creation will be limited
- * as well.  This value is loader tunable only.
- *
- * kern.ipc.maxpipekvawired - This value limits the amount of memory that may
- * be wired in order to facilitate direct copies using page flipping.
- * Whenever this value is exceeded, pipes will fall back to using regular
- * copies.  This value is sysctl controllable at all times.
- *
- * These values are autotuned in subr_param.c.
+ * address space available to us in pipe_map. 
  *
  * Memory usage may be monitored through the sysctls
- * kern.ipc.pipes, kern.ipc.pipekva and kern.ipc.pipekvawired.
+ * kern.ipc.pipes, kern.ipc.pipekva.
  *
  */
 
 #include <sys/kdebug.h>
 
 #include <kern/zalloc.h>
+#include <kern/kalloc.h>
 #include <vm/vm_kern.h>
 #include <libkern/OSAtomic.h>
 
 #define f_flag f_fglob->fg_flag
-#define f_type f_fglob->fg_type
 #define f_msgcount f_fglob->fg_msgcount
 #define f_cred f_fglob->fg_cred
 #define f_ops f_fglob->fg_ops
 #define f_offset f_fglob->fg_offset
 #define f_data f_fglob->fg_data
-/*
- * Use this define if you want to disable *fancy* VM things.  Expect an
- * approx 30% decrease in transfer rate.  This could be useful for
- * NetBSD or OpenBSD.
- *
- * this needs to be ported to X and the performance measured
- * before committing to supporting it
- */
-#define PIPE_NODIRECT  1
-
-#ifndef PIPE_NODIRECT
-
-#include <vm/vm.h>
-#include <vm/vm_param.h>
-#include <vm/vm_object.h>
-#include <vm/vm_kern.h>
-#include <vm/vm_extern.h>
-#include <vm/pmap.h>
-#include <vm/vm_map.h>
-#include <vm/vm_page.h>
-#include <vm/uma.h>
-
-#endif
 
 /*
- * interfaces to the outside world
+ * interfaces to the outside world exported through file operations 
  */
 static int pipe_read(struct fileproc *fp, struct uio *uio,
                 int flags, vfs_context_t ctx);
-
 static int pipe_write(struct fileproc *fp, struct uio *uio,
                 int flags, vfs_context_t ctx);
-
 static int pipe_close(struct fileglob *fg, vfs_context_t ctx);
-
 static int pipe_select(struct fileproc *fp, int which, void * wql,
                vfs_context_t ctx);
-
 static int pipe_kqfilter(struct fileproc *fp, struct knote *kn,
                vfs_context_t ctx);
-
 static int pipe_ioctl(struct fileproc *fp, u_long cmd, caddr_t data,
                vfs_context_t ctx);
-
 static int pipe_drain(struct fileproc *fp,vfs_context_t ctx);
 
-
-struct  fileops pipeops =
-  { pipe_read,
-    pipe_write,
-    pipe_ioctl,
-    pipe_select,
-    pipe_close,
-    pipe_kqfilter,
-    pipe_drain };
-
+static const struct fileops pipeops = {
+       DTYPE_PIPE,
+       pipe_read,
+       pipe_write,
+       pipe_ioctl,
+       pipe_select,
+       pipe_close,
+       pipe_kqfilter,
+       pipe_drain
+};
 
 static void    filt_pipedetach(struct knote *kn);
 static int     filt_piperead(struct knote *kn, long hint);
@@ -200,33 +190,18 @@ static struct filterops pipe_rfiltops = {
         .f_detach = filt_pipedetach,
         .f_event = filt_piperead,
 };
+
 static struct filterops pipe_wfiltops = {
         .f_isfd = 1,
         .f_detach = filt_pipedetach,
         .f_event = filt_pipewrite,
 };
 
-/*
- * Default pipe buffer size(s), this can be kind-of large now because pipe
- * space is pageable.  The pipe code will try to maintain locality of
- * reference for performance reasons, so small amounts of outstanding I/O
- * will not wipe the cache.
- */
-#define MINPIPESIZE (PIPE_SIZE/3)
+static int nbigpipe;      /* for compatibility sake. no longer used */
+static int amountpipes;   /* total number of pipes in system */
+static int amountpipekva; /* total memory used by pipes */
 
-/*
- * Limit the number of "big" pipes
- */
-#define LIMITBIGPIPES  32
-static int nbigpipe;
-
-static int amountpipes;
-static int amountpipekva;
-
-#ifndef PIPE_NODIRECT
-static int amountpipekvawired;
-#endif
-int maxpipekva = 1024 * 1024 * 16;
+int maxpipekva __attribute__((used)) = PIPE_KVAMAX;  /* allowing 16MB max. */
 
 #if PIPE_SYSCTLS
 SYSCTL_DECL(_kern_ipc);
@@ -248,46 +223,70 @@ SYSCTL_INT(_kern_ipc, OID_AUTO, pipekvawired, CTLFLAG_RD|CTLFLAG_LOCKED,
 static void pipeclose(struct pipe *cpipe);
 static void pipe_free_kmem(struct pipe *cpipe);
 static int pipe_create(struct pipe **cpipep);
+static int pipespace(struct pipe *cpipe, int size);
+static int choose_pipespace(unsigned long current, unsigned long expected);
+static int expand_pipespace(struct pipe *p, int target_size);
 static void pipeselwakeup(struct pipe *cpipe, struct pipe *spipe);
-static __inline int pipelock(struct pipe *cpipe, int catch);
-static __inline void pipeunlock(struct pipe *cpipe);
-
-#ifndef PIPE_NODIRECT
-static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
-static void pipe_destroy_write_buffer(struct pipe *wpipe);
-static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
-static void pipe_clone_write_buffer(struct pipe *wpipe);
-#endif
+static __inline int pipeio_lock(struct pipe *cpipe, int catch);
+static __inline void pipeio_unlock(struct pipe *cpipe);
 
 extern int postpipeevent(struct pipe *, int);
 extern void evpipefree(struct pipe *cpipe);
 
-
-static int pipespace(struct pipe *cpipe, int size);
-
 static lck_grp_t       *pipe_mtx_grp;
 static lck_attr_t      *pipe_mtx_attr;
 static lck_grp_attr_t  *pipe_mtx_grp_attr;
 
 static zone_t pipe_zone;
 
+#define MAX_PIPESIZE(pipe)             ( MAX(PIPE_SIZE, (pipe)->pipe_buffer.size) )
+
+#define        PIPE_GARBAGE_AGE_LIMIT          5000    /* In milliseconds */
+#define PIPE_GARBAGE_QUEUE_LIMIT       32000
+
+struct pipe_garbage {
+       struct pipe             *pg_pipe;
+       struct pipe_garbage     *pg_next;
+       uint64_t                pg_timestamp;
+};
+
+static zone_t pipe_garbage_zone;
+static struct pipe_garbage *pipe_garbage_head = NULL;
+static struct pipe_garbage *pipe_garbage_tail = NULL;
+static uint64_t pipe_garbage_age_limit = PIPE_GARBAGE_AGE_LIMIT;
+static int pipe_garbage_count = 0;
+static lck_mtx_t *pipe_garbage_lock;
+static void pipe_garbage_collect(struct pipe *cpipe);
+
 SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
 
+/* initial setup done at time of sysinit */
 void
 pipeinit(void)
 {
-        pipe_zone = (zone_t)zinit(sizeof(struct pipe), 8192 * sizeof(struct pipe), 4096, "pipe zone");
+       nbigpipe=0;
+       vm_size_t zone_size;
+       zone_size = 8192 * sizeof(struct pipe);
+        pipe_zone = zinit(sizeof(struct pipe), zone_size, 4096, "pipe zone");
 
-       /*
-        * allocate lock group attribute and group for pipe mutexes
-        */
+
+       /* allocate lock group attribute and group for pipe mutexes */
        pipe_mtx_grp_attr = lck_grp_attr_alloc_init();
        pipe_mtx_grp = lck_grp_alloc_init("pipe", pipe_mtx_grp_attr);
 
+       /* allocate the lock attribute for pipe mutexes */
+       pipe_mtx_attr = lck_attr_alloc_init();
+
        /*
-        * allocate the lock attribute for pipe mutexes
+        * Set up garbage collection for dead pipes
         */
-       pipe_mtx_attr = lck_attr_alloc_init();
+       zone_size = (PIPE_GARBAGE_QUEUE_LIMIT + 20) *
+           sizeof(struct pipe_garbage);
+        pipe_garbage_zone = (zone_t)zinit(sizeof(struct pipe_garbage),
+           zone_size, 4096, "pipe garbage zone");
+       pipe_garbage_lock = lck_mtx_alloc_init(pipe_mtx_grp, pipe_mtx_attr);
+       
 }
 
 /* Bitmap for things to touch in pipe_touch() */
@@ -318,10 +317,80 @@ pipe_touch(struct pipe *tpipe, int touch)
        }
 }
 
+static const unsigned int pipesize_blocks[] = {128,256,1024,2048,4096, 4096 * 2, PIPE_SIZE , PIPE_SIZE * 4 };
+
+/* 
+ * finds the right size from possible sizes in pipesize_blocks 
+ * returns the size which matches max(current,expected) 
+ */
+static int 
+choose_pipespace(unsigned long current, unsigned long expected)
+{
+       int i = sizeof(pipesize_blocks)/sizeof(unsigned int) -1;
+       unsigned long target;
+
+       if (expected > current) 
+               target = expected;
+       else
+               target = current;
+
+       while ( i >0 && pipesize_blocks[i-1] > target) {
+               i=i-1;
+
+       }
+       
+       return pipesize_blocks[i];
+}
+
+
+/*
+ * expand the size of pipe while there is data to be read,
+ * and then free the old buffer once the current buffered
+ * data has been transferred to new storage.
+ * Required: PIPE_LOCK and io lock to be held by caller.
+ * returns 0 on success or no expansion possible
+ */
+static int 
+expand_pipespace(struct pipe *p, int target_size)
+{
+       struct pipe tmp, oldpipe;
+       int error;
+       tmp.pipe_buffer.buffer = 0;
+       
+       if (p->pipe_buffer.size >= (unsigned) target_size) {
+               return 0; /* the existing buffer is max size possible */
+       }
+       
+       /* create enough space in the target */
+       error = pipespace(&tmp, target_size);
+       if (error != 0)
+               return (error);
+
+       oldpipe.pipe_buffer.buffer = p->pipe_buffer.buffer;
+       oldpipe.pipe_buffer.size = p->pipe_buffer.size;
+       
+       memcpy(tmp.pipe_buffer.buffer, p->pipe_buffer.buffer, p->pipe_buffer.size);
+       if (p->pipe_buffer.cnt > 0 && p->pipe_buffer.in <= p->pipe_buffer.out ){
+               /* we are in State 3 and need extra copying for read to be consistent */
+               memcpy(&tmp.pipe_buffer.buffer[p->pipe_buffer.size], p->pipe_buffer.buffer, p->pipe_buffer.size);
+               p->pipe_buffer.in += p->pipe_buffer.size;
+       }
+
+       p->pipe_buffer.buffer = tmp.pipe_buffer.buffer;
+       p->pipe_buffer.size = tmp.pipe_buffer.size;
 
 
+       pipe_free_kmem(&oldpipe);
+       return 0;
+}
+
 /*
  * The pipe system call for the DTYPE_PIPE type of pipes
+ * 
+ * returns:
+ *  FREAD  | fd0 | -->[struct rpipe] --> |~~buffer~~| \  
+ *                                                    (pipe_mutex)
+ *  FWRITE | fd1 | -->[struct wpipe] --X              / 
  */
 
 /* ARGSUSED */
@@ -344,22 +413,12 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval)
         /*
         * allocate the space for the normal I/O direction up
         * front... we'll delay the allocation for the other
-        * direction until a write actually occurs (most
-        * likely it won't)...
-        *
-         * Reduce to 1/4th pipe size if we're over our global max.
+        * direction until a write actually occurs (most likely it won't)...
          */
-        if (amountpipekva > maxpipekva / 2)
-               error = pipespace(rpipe, SMALL_PIPE_SIZE);
-        else
-               error = pipespace(rpipe, PIPE_SIZE);
+       error = pipespace(rpipe, choose_pipespace(rpipe->pipe_buffer.size, 0));
         if (error)
                goto freepipes;
 
-#ifndef PIPE_NODIRECT
-       rpipe->pipe_state |= PIPE_DIRECTOK;
-       wpipe->pipe_state |= PIPE_DIRECTOK;
-#endif
        TAILQ_INIT(&rpipe->pipe_evlist);
        TAILQ_INIT(&wpipe->pipe_evlist);
 
@@ -370,12 +429,10 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval)
        retval[0] = fd;
 
        /*
-        * for now we'll create half-duplex
-        * pipes... this is what we've always
-        * supported..
+        * for now we'll create half-duplex pipes(refer returns section above). 
+        * this is what we've always supported..
         */
        rf->f_flag = FREAD;
-       rf->f_type = DTYPE_PIPE;
        rf->f_data = (caddr_t)rpipe;
        rf->f_ops = &pipeops;
 
@@ -385,13 +442,13 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval)
                goto freepipes;
        }
        wf->f_flag = FWRITE;
-       wf->f_type = DTYPE_PIPE;
        wf->f_data = (caddr_t)wpipe;
        wf->f_ops = &pipeops;
 
        rpipe->pipe_peer = wpipe;
        wpipe->pipe_peer = rpipe;
-       rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
+       /* both structures share the same mutex */
+       rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx; 
 
        retval[1] = fd;
 #if CONFIG_MACF
@@ -448,20 +505,16 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64)
        }
 #endif
        if (cpipe->pipe_buffer.buffer == 0) {
-               /*
-                * must be stat'ing the write fd
-                */
+               /* must be stat'ing the write fd */
                if (cpipe->pipe_peer) {
-                       /*
-                        * the peer still exists, use it's info
-                        */
-                       pipe_size  = cpipe->pipe_peer->pipe_buffer.size;
+                       /* the peer still exists, use it's info */
+                       pipe_size  = MAX_PIPESIZE(cpipe->pipe_peer);
                        pipe_count = cpipe->pipe_peer->pipe_buffer.cnt;
                } else {
                        pipe_count = 0;
                }
        } else {
-               pipe_size  = cpipe->pipe_buffer.size;
+               pipe_size  = MAX_PIPESIZE(cpipe);
                pipe_count = cpipe->pipe_buffer.cnt;
        }
        /*
@@ -469,7 +522,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64)
         * we might catch it in transient state
         */
        if (pipe_size == 0)
-               pipe_size  = PIPE_SIZE;
+               pipe_size  = MAX(PIPE_SIZE, pipesize_blocks[0]);
 
        if (isstat64 != 0) {
                sb64 = (struct stat64 *)ub;     
@@ -497,7 +550,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64)
                * address of this pipe's struct pipe.  This number may be recycled
                * relatively quickly.
                */
-               sb64->st_ino = (ino64_t)((uintptr_t)cpipe);
+               sb64->st_ino = (ino64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
        } else {
                sb = (struct stat *)ub; 
 
@@ -524,7 +577,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64)
                * address of this pipe's struct pipe.  This number may be recycled
                * relatively quickly.
                */
-               sb->st_ino = (ino_t)(uintptr_t)cpipe;
+               sb->st_ino = (ino_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
        }
        PIPE_UNLOCK(cpipe);
 
@@ -551,10 +604,11 @@ pipespace(struct pipe *cpipe, int size)
 {
        vm_offset_t buffer;
 
-       size = round_page(size);
+       if (size <= 0)
+               return(EINVAL);
 
-       if (kmem_alloc(kernel_map, &buffer, size) != KERN_SUCCESS)
-               return(ENOMEM);
+       if ((buffer = (vm_offset_t)kalloc(size)) == 0 )
+               return(ENOMEM);
 
        /* free old resources if we're resizing */
        pipe_free_kmem(cpipe);
@@ -577,7 +631,6 @@ static int
 pipe_create(struct pipe **cpipep)
 {
        struct pipe *cpipe;
-
        cpipe = (struct pipe *)zalloc(pipe_zone);
 
        if ((*cpipep = cpipe) == NULL)
@@ -591,7 +644,6 @@ pipe_create(struct pipe **cpipep)
 
        /* Initial times are all the time of creation of the pipe */
        pipe_touch(cpipe, PIPE_ATIME | PIPE_MTIME | PIPE_CTIME);
-
        return (0);
 }
 
@@ -600,20 +652,17 @@ pipe_create(struct pipe **cpipep)
  * lock a pipe for I/O, blocking other access
  */
 static inline int
-pipelock(struct pipe *cpipe, int catch)
+pipeio_lock(struct pipe *cpipe, int catch)
 {
        int error;
-
        while (cpipe->pipe_state & PIPE_LOCKFL) {
                cpipe->pipe_state |= PIPE_LWANT;
-
                error = msleep(cpipe, PIPE_MTX(cpipe), catch ? (PRIBIO | PCATCH) : PRIBIO,
                               "pipelk", 0);
                if (error != 0) 
                        return (error);
        }
        cpipe->pipe_state |= PIPE_LOCKFL;
-
        return (0);
 }
 
@@ -621,16 +670,18 @@ pipelock(struct pipe *cpipe, int catch)
  * unlock a pipe I/O lock
  */
 static inline void
-pipeunlock(struct pipe *cpipe)
+pipeio_unlock(struct pipe *cpipe)
 {
        cpipe->pipe_state &= ~PIPE_LOCKFL;
-
        if (cpipe->pipe_state & PIPE_LWANT) {
                cpipe->pipe_state &= ~PIPE_LWANT;
                wakeup(cpipe);
        }
 }
 
+/*
+ * wakeup anyone whos blocked in select
+ */
 static void
 pipeselwakeup(struct pipe *cpipe, struct pipe *spipe)
 {
@@ -651,6 +702,10 @@ pipeselwakeup(struct pipe *cpipe, struct pipe *spipe)
         }
 }
 
+/*
+ * Read n bytes from the buffer. Semantics are similar to file read.
+ * returns: number of bytes read from the buffer
+ */
 /* ARGSUSED */
 static int
 pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
@@ -664,7 +719,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
        PIPE_LOCK(rpipe);
        ++rpipe->pipe_busy;
 
-       error = pipelock(rpipe, 1);
+       error = pipeio_lock(rpipe, 1);
        if (error)
                goto unlocked_error;
 
@@ -674,11 +729,17 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                goto locked_error;
 #endif
 
+
        while (uio_resid(uio)) {
                /*
                 * normal pipe buffer receive
                 */
                if (rpipe->pipe_buffer.cnt > 0) {
+                       /*
+                        * # bytes to read is min( bytes from read pointer until end of buffer,
+                        *                         total unread bytes, 
+                        *                         user requested byte count)
+                        */
                        size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
                        if (size > rpipe->pipe_buffer.cnt)
                                size = rpipe->pipe_buffer.cnt;
@@ -686,7 +747,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                        if (size > (u_int) uio_resid(uio))
                                size = (u_int) uio_resid(uio);
 
-                       PIPE_UNLOCK(rpipe);
+                       PIPE_UNLOCK(rpipe); /* we still hold io lock.*/
                        error = uiomove(
                            &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
                            size, uio);
@@ -699,7 +760,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                                rpipe->pipe_buffer.out = 0;
 
                        rpipe->pipe_buffer.cnt -= size;
-
+                       
                        /*
                         * If there is no more to read in the pipe, reset
                         * its pointers to the beginning.  This improves
@@ -710,32 +771,6 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                                rpipe->pipe_buffer.out = 0;
                        }
                        nread += size;
-#ifndef PIPE_NODIRECT
-               /*
-                * Direct copy, bypassing a kernel buffer.
-                */
-               } else if ((size = rpipe->pipe_map.cnt) &&
-                          (rpipe->pipe_state & PIPE_DIRECTW)) {
-                       caddr_t va;
-                       // LP64todo - fix this!
-                       if (size > (u_int) uio_resid(uio))
-                               size = (u_int) uio_resid(uio);
-
-                       va = (caddr_t) rpipe->pipe_map.kva +
-                           rpipe->pipe_map.pos;
-                       PIPE_UNLOCK(rpipe);
-                       error = uiomove(va, size, uio);
-                       PIPE_LOCK(rpipe);
-                       if (error)
-                               break;
-                       nread += size;
-                       rpipe->pipe_map.pos += size;
-                       rpipe->pipe_map.cnt -= size;
-                       if (rpipe->pipe_map.cnt == 0) {
-                               rpipe->pipe_state &= ~PIPE_DIRECTW;
-                               wakeup(rpipe);
-                       }
-#endif
                } else {
                        /*
                         * detect EOF condition
@@ -754,7 +789,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                        }
 
                        /*
-                        * Break if some data was read.
+                        * Break if some data was read in previous iteration.
                         */
                        if (nread > 0)
                                break;
@@ -764,7 +799,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                         * We will either break out with an error or we will
                         * sleep and relock to loop.
                         */
-                       pipeunlock(rpipe);
+                       pipeio_unlock(rpipe);
 
                        /*
                         * Handle non-blocking mode operation or
@@ -774,11 +809,9 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
                                error = EAGAIN;
                        } else {
                                rpipe->pipe_state |= PIPE_WANTR;
-
                                error = msleep(rpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH, "piperd", 0);
-
                                if (error == 0)
-                                       error = pipelock(rpipe, 1);
+                                       error = pipeio_lock(rpipe, 1);
                        }
                        if (error)
                                goto unlocked_error;
@@ -787,7 +820,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
 #if CONFIG_MACF
 locked_error:
 #endif
-       pipeunlock(rpipe);
+       pipeio_unlock(rpipe);
 
 unlocked_error:
        --rpipe->pipe_busy;
@@ -798,7 +831,7 @@ unlocked_error:
        if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
                rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
                wakeup(rpipe);
-       } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
+       } else if (rpipe->pipe_buffer.cnt < rpipe->pipe_buffer.size) {
                /*
                 * Handle write blocking hysteresis.
                 */
@@ -808,7 +841,7 @@ unlocked_error:
                }
        }
 
-       if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
+       if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > 0)
                pipeselwakeup(rpipe, rpipe->pipe_peer);
 
        /* update last read time */
@@ -819,250 +852,10 @@ unlocked_error:
        return (error);
 }
 
-
-
-#ifndef PIPE_NODIRECT
-/*
- * Map the sending processes' buffer into kernel space and wire it.
- * This is similar to a physical write operation.
- */
-static int
-pipe_build_write_buffer(wpipe, uio)
-       struct pipe *wpipe;
-       struct uio *uio;
-{
-       pmap_t pmap;
-       u_int size;
-       int i, j;
-       vm_offset_t addr, endaddr;
-
-
-       size = (u_int) uio->uio_iov->iov_len;
-       if (size > wpipe->pipe_buffer.size)
-               size = wpipe->pipe_buffer.size;
-
-       pmap = vmspace_pmap(curproc->p_vmspace);
-       endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
-       addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
-       for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
-               /*
-                * vm_fault_quick() can sleep.  Consequently,
-                * vm_page_lock_queue() and vm_page_unlock_queue()
-                * should not be performed outside of this loop.
-                */
-       race:
-               if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0) {
-                       vm_page_lock_queues();
-                       for (j = 0; j < i; j++)
-                               vm_page_unhold(wpipe->pipe_map.ms[j]);
-                       vm_page_unlock_queues();
-                       return (EFAULT);
-               }
-               wpipe->pipe_map.ms[i] = pmap_extract_and_hold(pmap, addr,
-                   VM_PROT_READ);
-               if (wpipe->pipe_map.ms[i] == NULL)
-                       goto race;
-       }
-
-/*
- * set up the control block
- */
-       wpipe->pipe_map.npages = i;
-       wpipe->pipe_map.pos =
-           ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
-       wpipe->pipe_map.cnt = size;
-
-/*
- * and map the buffer
- */
-       if (wpipe->pipe_map.kva == 0) {
-               /*
-                * We need to allocate space for an extra page because the
-                * address range might (will) span pages at times.
-                */
-               wpipe->pipe_map.kva = kmem_alloc_nofault(kernel_map,
-                       wpipe->pipe_buffer.size + PAGE_SIZE);
-               atomic_add_int(&amountpipekvawired,
-                   wpipe->pipe_buffer.size + PAGE_SIZE);
-       }
-       pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
-               wpipe->pipe_map.npages);
-
-/*
- * and update the uio data
- */
-
-       uio->uio_iov->iov_len -= size;
-       uio->uio_iov->iov_base = (char *)uio->uio_iov->iov_base + size;
-       if (uio->uio_iov->iov_len == 0)
-               uio->uio_iov++;
-       uio_setresid(uio, (uio_resid(uio) - size));
-       uio->uio_offset += size;
-       return (0);
-}
-
-/*
- * unmap and unwire the process buffer
- */
-static void
-pipe_destroy_write_buffer(wpipe)
-       struct pipe *wpipe;
-{
-       int i;
-
-       if (wpipe->pipe_map.kva) {
-               pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
-
-               if (amountpipekvawired > maxpipekvawired / 2) {
-                       /* Conserve address space */
-                       vm_offset_t kva = wpipe->pipe_map.kva;
-                       wpipe->pipe_map.kva = 0;
-                       kmem_free(kernel_map, kva,
-                           wpipe->pipe_buffer.size + PAGE_SIZE);
-                       atomic_subtract_int(&amountpipekvawired,
-                           wpipe->pipe_buffer.size + PAGE_SIZE);
-               }
-       }
-       vm_page_lock_queues();
-       for (i = 0; i < wpipe->pipe_map.npages; i++) {
-               vm_page_unhold(wpipe->pipe_map.ms[i]);
-       }
-       vm_page_unlock_queues();
-       wpipe->pipe_map.npages = 0;
-}
-
 /*
- * In the case of a signal, the writing process might go away.  This
- * code copies the data into the circular buffer so that the source
- * pages can be freed without loss of data.
+ * perform a write of n bytes into the read side of buffer. Since 
+ * pipes are unidirectional a write is meant to be read by the otherside only.
  */
-static void
-pipe_clone_write_buffer(wpipe)
-       struct pipe *wpipe;
-{
-       int size;
-       int pos;
-
-       size = wpipe->pipe_map.cnt;
-       pos = wpipe->pipe_map.pos;
-
-       wpipe->pipe_buffer.in = size;
-       wpipe->pipe_buffer.out = 0;
-       wpipe->pipe_buffer.cnt = size;
-       wpipe->pipe_state &= ~PIPE_DIRECTW;
-
-       PIPE_UNLOCK(wpipe);
-       bcopy((caddr_t) wpipe->pipe_map.kva + pos,
-           wpipe->pipe_buffer.buffer, size);
-       pipe_destroy_write_buffer(wpipe);
-       PIPE_LOCK(wpipe);
-}
-
-/*
- * This implements the pipe buffer write mechanism.  Note that only
- * a direct write OR a normal pipe write can be pending at any given time.
- * If there are any characters in the pipe buffer, the direct write will
- * be deferred until the receiving process grabs all of the bytes from
- * the pipe buffer.  Then the direct mapping write is set-up.
- */
-static int
-pipe_direct_write(wpipe, uio)
-       struct pipe *wpipe;
-       struct uio *uio;
-{
-       int error;
-
-retry:
-       while (wpipe->pipe_state & PIPE_DIRECTW) {
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       wpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(wpipe);
-               }
-               wpipe->pipe_state |= PIPE_WANTW;
-               error = msleep(wpipe, PIPE_MTX(wpipe),
-                   PRIBIO | PCATCH, "pipdww", 0);
-               if (error)
-                       goto error1;
-               if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
-                       error = EPIPE;
-                       goto error1;
-               }
-       }
-       wpipe->pipe_map.cnt = 0;        /* transfer not ready yet */
-       if (wpipe->pipe_buffer.cnt > 0) {
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       wpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(wpipe);
-               }
-                       
-               wpipe->pipe_state |= PIPE_WANTW;
-               error = msleep(wpipe, PIPE_MTX(wpipe),
-                   PRIBIO | PCATCH, "pipdwc", 0);
-               if (error)
-                       goto error1;
-               if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
-                       error = EPIPE;
-                       goto error1;
-               }
-               goto retry;
-       }
-
-       wpipe->pipe_state |= PIPE_DIRECTW;
-
-       pipelock(wpipe, 0);
-       PIPE_UNLOCK(wpipe);
-       error = pipe_build_write_buffer(wpipe, uio);
-       PIPE_LOCK(wpipe);
-       pipeunlock(wpipe);
-       if (error) {
-               wpipe->pipe_state &= ~PIPE_DIRECTW;
-               goto error1;
-       }
-
-       error = 0;
-       while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
-               if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
-                       pipelock(wpipe, 0);
-                       PIPE_UNLOCK(wpipe);
-                       pipe_destroy_write_buffer(wpipe);
-                       PIPE_LOCK(wpipe);
-                       pipeselwakeup(wpipe, wpipe);
-                       pipeunlock(wpipe);
-                       error = EPIPE;
-                       goto error1;
-               }
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       wpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(wpipe);
-               }
-               pipeselwakeup(wpipe, wpipe);
-               error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
-                   "pipdwt", 0);
-       }
-
-       pipelock(wpipe,0);
-       if (wpipe->pipe_state & PIPE_DIRECTW) {
-               /*
-                * this bit of trickery substitutes a kernel buffer for
-                * the process that might be going away.
-                */
-               pipe_clone_write_buffer(wpipe);
-       } else {
-               PIPE_UNLOCK(wpipe);
-               pipe_destroy_write_buffer(wpipe);
-               PIPE_LOCK(wpipe);
-       }
-       pipeunlock(wpipe);
-       return (error);
-
-error1:
-       wakeup(wpipe);
-       return (error);
-}
-#endif
-       
-
-
 static int
 pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
        __unused vfs_context_t ctx)
@@ -1071,6 +864,9 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
        int orig_resid;
        int pipe_size;
        struct pipe *wpipe, *rpipe;
+       // LP64todo - fix this!
+       orig_resid = uio_resid(uio);
+       int space;
 
        rpipe = (struct pipe *)fp->f_data;
 
@@ -1095,54 +891,35 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
 
        pipe_size = 0;
 
-       if (wpipe->pipe_buffer.buffer == 0) {
-               /*
-                * need to allocate some storage... we delay the allocation
-                * until the first write on fd[0] to avoid allocating storage for both
-                * 'pipe ends'... most pipes are half-duplex with the writes targeting
-                * fd[1], so allocating space for both ends is a waste...
-                *
-                * Reduce to 1/4th pipe size if we're over our global max.
-                */
-               if (amountpipekva > maxpipekva / 2)
-                       pipe_size = SMALL_PIPE_SIZE;
-               else
-                       pipe_size = PIPE_SIZE;
-       }
-
        /*
-        * If it is advantageous to resize the pipe buffer, do
-        * so.
+        * need to allocate some storage... we delay the allocation
+        * until the first write on fd[0] to avoid allocating storage for both
+        * 'pipe ends'... most pipes are half-duplex with the writes targeting
+        * fd[1], so allocating space for both ends is a waste...
         */
-       if ((uio_resid(uio) > PIPE_SIZE) &&
-               (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
-               (amountpipekva < maxpipekva / 2) &&
-               (nbigpipe < LIMITBIGPIPES) &&
-#ifndef PIPE_NODIRECT
-               (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
-#endif
-               (wpipe->pipe_buffer.cnt == 0)) {
 
-               pipe_size = BIG_PIPE_SIZE;
+       if ( wpipe->pipe_buffer.buffer == 0 || ( 
+               (unsigned)orig_resid > wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt &&
+               amountpipekva < maxpipekva ) ) {
 
+               pipe_size = choose_pipespace(wpipe->pipe_buffer.size, wpipe->pipe_buffer.cnt + orig_resid);
        }
        if (pipe_size) {
                /*
                 * need to do initial allocation or resizing of pipe
+                * holding both structure and io locks. 
                 */
-               if ((error = pipelock(wpipe, 1)) == 0) {
-                       PIPE_UNLOCK(wpipe);
-                       if (pipespace(wpipe, pipe_size) == 0)
-                               OSAddAtomic(1, &nbigpipe);
-                       PIPE_LOCK(wpipe);
-                       pipeunlock(wpipe);
-
-                       if (wpipe->pipe_buffer.buffer == 0) {
-                               /*
-                                * initial allocation failed
-                                */
+               if ((error = pipeio_lock(wpipe, 1)) == 0) {
+                       if (wpipe->pipe_buffer.cnt == 0)                        
+                               error = pipespace(wpipe, pipe_size);
+                       else 
+                               error = expand_pipespace(wpipe, pipe_size);
+               
+                       pipeio_unlock(wpipe);
+                       
+                       /* allocation failed */
+                       if (wpipe->pipe_buffer.buffer == 0)
                                error = ENOMEM;
-                       }
                }
                if (error) {
                        /*
@@ -1159,91 +936,35 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
                        return(error);
                }
        }
-       // LP64todo - fix this!
-       orig_resid = uio_resid(uio);
 
        while (uio_resid(uio)) {
-               int space;
-
-#ifndef PIPE_NODIRECT
-               /*
-                * If the transfer is large, we can gain performance if
-                * we do process-to-process copies directly.
-                * If the write is non-blocking, we don't use the
-                * direct write mechanism.
-                *
-                * The direct write mechanism will detect the reader going
-                * away on us.
-                */
-               if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
-                   (fp->f_flag & FNONBLOCK) == 0 &&
-                   amountpipekvawired + uio_resid(uio) < maxpipekvawired) { 
-                       error = pipe_direct_write(wpipe, uio);
-                       if (error)
-                               break;
-                       continue;
-               }
-
-               /*
-                * Pipe buffered writes cannot be coincidental with
-                * direct writes.  We wait until the currently executing
-                * direct write is completed before we start filling the
-                * pipe buffer.  We break out if a signal occurs or the
-                * reader goes away.
-                */
-       retrywrite:
-               while (wpipe->pipe_state & PIPE_DIRECTW) {
-                       if (wpipe->pipe_state & PIPE_WANTR) {
-                               wpipe->pipe_state &= ~PIPE_WANTR;
-                               wakeup(wpipe);
-                       }
-                       error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH, "pipbww", 0);
 
-                       if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))
-                               break;
-                       if (error)
-                               break;
-               }
-#else
        retrywrite:
-#endif
                space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
 
-               /*
-                * Writes of size <= PIPE_BUF must be atomic.
-                */
+               /* Writes of size <= PIPE_BUF must be atomic. */
                if ((space < uio_resid(uio)) && (orig_resid <= PIPE_BUF))
                        space = 0;
 
                if (space > 0) {
 
-                       if ((error = pipelock(wpipe,1)) == 0) {
+                       if ((error = pipeio_lock(wpipe,1)) == 0) {
                                int size;       /* Transfer size */
                                int segsize;    /* first segment to transfer */
 
                                if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
-                                       pipeunlock(wpipe);
+                                       pipeio_unlock(wpipe);
                                        error = EPIPE;
                                        break;
                                }
-#ifndef PIPE_NODIRECT
-                               /*
-                                * It is possible for a direct write to
-                                * slip in on us... handle it here...
-                                */
-                               if (wpipe->pipe_state & PIPE_DIRECTW) {
-                                       pipeunlock(wpipe);
-                                       goto retrywrite;
-                               }
-#endif
                                /* 
-                                * If a process blocked in pipelock, our
+                                * If a process blocked in pipeio_lock, our
                                 * value for space might be bad... the mutex
                                 * is dropped while we're blocked
                                 */
                                if (space > (int)(wpipe->pipe_buffer.size - 
                                    wpipe->pipe_buffer.cnt)) {
-                                       pipeunlock(wpipe);
+                                       pipeio_unlock(wpipe);
                                        goto retrywrite;
                                }
 
@@ -1279,7 +1000,7 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
                                        /* 
                                         * Transfer remaining part now, to
                                         * support atomic writes.  Wraparound
-                                        * happened.
+                                        * happened. (State 3)
                                         */
                                        if (wpipe->pipe_buffer.in + segsize != 
                                            wpipe->pipe_buffer.size)
@@ -1292,9 +1013,12 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
                                            size - segsize, uio);
                                        PIPE_LOCK(rpipe);
                                }
+                               /* 
+                                * readers never know to read until count is updated.
+                                */
                                if (error == 0) {
                                        wpipe->pipe_buffer.in += size;
-                                       if (wpipe->pipe_buffer.in >=
+                                       if (wpipe->pipe_buffer.in >
                                            wpipe->pipe_buffer.size) {
                                                if (wpipe->pipe_buffer.in !=
                                                    size - segsize +
@@ -1311,7 +1035,7 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
                                                panic("Pipe buffer overflow");
                                
                                }
-                               pipeunlock(wpipe);
+                               pipeio_unlock(wpipe);
                        }
                        if (error)
                                break;
@@ -1425,12 +1149,7 @@ pipe_ioctl(struct fileproc *fp, u_long cmd, caddr_t data,
                return (0);
 
        case FIONREAD:
-#ifndef PIPE_NODIRECT
-               if (mpipe->pipe_state & PIPE_DIRECTW)
-                       *(int *)data = mpipe->pipe_map.cnt;
-               else
-#endif
-                       *(int *)data = mpipe->pipe_buffer.cnt;
+               *(int *)data = mpipe->pipe_buffer.cnt;
                PIPE_UNLOCK(mpipe);
                return (0);
 
@@ -1465,6 +1184,7 @@ pipe_select(struct fileproc *fp, int which, void *wql, vfs_context_t ctx)
        PIPE_LOCK(rpipe);
 
        wpipe = rpipe->pipe_peer;
+       
 
 #if CONFIG_MACF
        /*
@@ -1492,9 +1212,11 @@ pipe_select(struct fileproc *fp, int which, void *wql, vfs_context_t ctx)
                break;
 
         case FWRITE:
+               if (wpipe)
+                       wpipe->pipe_state |= PIPE_WSELECT;
                if (wpipe == NULL || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) ||
                    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
-                    (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
+                    (MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
 
                        retnum = 1;
                } else {
@@ -1523,7 +1245,6 @@ pipe_close(struct fileglob *fg, __unused vfs_context_t ctx)
        cpipe = (struct pipe *)fg->fg_data;
        fg->fg_data = NULL;
        proc_fdunlock(vfs_context_proc(ctx));
-
        if (cpipe)
                pipeclose(cpipe);
 
@@ -1533,30 +1254,14 @@ pipe_close(struct fileglob *fg, __unused vfs_context_t ctx)
 static void
 pipe_free_kmem(struct pipe *cpipe)
 {
-
        if (cpipe->pipe_buffer.buffer != NULL) {
-               if (cpipe->pipe_buffer.size > PIPE_SIZE)
-                       OSAddAtomic(-1, &nbigpipe);
                OSAddAtomic(-(cpipe->pipe_buffer.size), &amountpipekva);
                OSAddAtomic(-1, &amountpipes);
-
-               kmem_free(kernel_map, (vm_offset_t)cpipe->pipe_buffer.buffer,
+               kfree((void *)cpipe->pipe_buffer.buffer,
                          cpipe->pipe_buffer.size);
                cpipe->pipe_buffer.buffer = NULL;
+               cpipe->pipe_buffer.size = 0;
        }
-#ifndef PIPE_NODIRECT
-       if (cpipe->pipe_map.kva != 0) {
-               atomic_subtract_int(&amountpipekvawired,
-                   cpipe->pipe_buffer.size + PAGE_SIZE);
-               kmem_free(kernel_map,
-                       cpipe->pipe_map.kva,
-                       cpipe->pipe_buffer.size + PAGE_SIZE);
-               cpipe->pipe_map.cnt = 0;
-               cpipe->pipe_map.kva = 0;
-               cpipe->pipe_map.pos = 0;
-               cpipe->pipe_map.npages = 0;
-       }
-#endif
 }
 
 /*
@@ -1569,7 +1274,6 @@ pipeclose(struct pipe *cpipe)
 
        if (cpipe == NULL)
                return;
-
        /* partially created pipes won't have a valid mutex. */
        if (PIPE_MTX(cpipe) != NULL)
                PIPE_LOCK(cpipe);
@@ -1622,23 +1326,28 @@ pipeclose(struct pipe *cpipe)
         * free resources
         */
        if (PIPE_MTX(cpipe) != NULL) {
-               if (ppipe != NULL) {
-                       /*
+               if (ppipe != NULL) {
+                       /*
                         * since the mutex is shared and the peer is still
                         * alive, we need to release the mutex, not free it
                         */
-                       PIPE_UNLOCK(cpipe);
+                       PIPE_UNLOCK(cpipe);
                } else {
-                       /*
+                       /*
                         * peer is gone, so we're the sole party left with
-                        * interest in this mutex... we can just free it
+                        * interest in this mutex... unlock and free it
                         */
+                       PIPE_UNLOCK(cpipe);
                        lck_mtx_free(PIPE_MTX(cpipe), pipe_mtx_grp);
                }
        }
        pipe_free_kmem(cpipe);
-
-       zfree(pipe_zone, cpipe);
+       if (cpipe->pipe_state & PIPE_WSELECT) {
+               pipe_garbage_collect(cpipe);
+       } else {
+               zfree(pipe_zone, cpipe);
+               pipe_garbage_collect(NULL);
+       }
 
 }
 
@@ -1733,11 +1442,6 @@ filt_piperead(struct knote *kn, long hint)
 
        wpipe = rpipe->pipe_peer;
        kn->kn_data = rpipe->pipe_buffer.cnt;
-
-#ifndef PIPE_NODIRECT
-       if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
-               kn->kn_data = rpipe->pipe_map.cnt;
-#endif
        if ((rpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) ||
            (wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) {
                kn->kn_flags |= EV_EOF;
@@ -1745,8 +1449,8 @@ filt_piperead(struct knote *kn, long hint)
        } else {
                int64_t lowwat = 1;
                if (kn->kn_sfflags & NOTE_LOWAT) {
-                       if (rpipe->pipe_buffer.size && kn->kn_sdata > rpipe->pipe_buffer.size)
-                               lowwat = rpipe->pipe_buffer.size;
+                       if (rpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(rpipe))
+                               lowwat = MAX_PIPESIZE(rpipe);
                        else if (kn->kn_sdata > lowwat)
                                lowwat = kn->kn_sdata;
                }
@@ -1785,18 +1489,12 @@ filt_pipewrite(struct knote *kn, long hint)
                        PIPE_UNLOCK(rpipe);
                return (1);
        }
-       kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
-       if (!kn->kn_data && wpipe->pipe_buffer.size == 0)
-               kn->kn_data = PIPE_BUF; /* unwritten pipe is ready for write */
+       kn->kn_data = MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt;
 
-#ifndef PIPE_NODIRECT
-       if (wpipe->pipe_state & PIPE_DIRECTW)
-               kn->kn_data = 0;
-#endif
        int64_t lowwat = PIPE_BUF;
        if (kn->kn_sfflags & NOTE_LOWAT) {
-               if (wpipe->pipe_buffer.size && kn->kn_sdata > wpipe->pipe_buffer.size)
-                       lowwat = wpipe->pipe_buffer.size;
+               if (wpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(wpipe))
+                       lowwat = MAX_PIPESIZE(wpipe);
                else if (kn->kn_sdata > lowwat)
                        lowwat = kn->kn_sdata;
        }
@@ -1837,13 +1535,13 @@ fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo)
                        /*
                         * the peer still exists, use it's info
                         */
-                       pipe_size  = cpipe->pipe_peer->pipe_buffer.size;
+                       pipe_size  = MAX_PIPESIZE(cpipe->pipe_peer);
                        pipe_count = cpipe->pipe_peer->pipe_buffer.cnt;
                } else {
                        pipe_count = 0;
                }
        } else {
-               pipe_size  = cpipe->pipe_buffer.size;
+               pipe_size  = MAX_PIPESIZE(cpipe);
                pipe_count = cpipe->pipe_buffer.cnt;
        }
        /*
@@ -1881,8 +1579,8 @@ fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo)
         * XXX (st_dev, st_ino) should be unique.
         */
 
-       pinfo->pipe_handle = (uint64_t)((uintptr_t)cpipe);
-       pinfo->pipe_peerhandle = (uint64_t)((uintptr_t)(cpipe->pipe_peer));
+       pinfo->pipe_handle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
+       pinfo->pipe_peerhandle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)(cpipe->pipe_peer));
        pinfo->pipe_status = cpipe->pipe_state;
 
        PIPE_UNLOCK(cpipe);
@@ -1919,6 +1617,75 @@ pipe_drain(struct fileproc *fp, __unused vfs_context_t ctx)
 }
 
 
+ /*
+ * When a thread sets a write-select on a pipe, it creates an implicit,
+ * untracked dependency between that thread and the peer of the pipe
+ * on which the select is set.  If the peer pipe is closed and freed
+ * before the select()ing thread wakes up, the system will panic as
+ * it attempts to unwind the dangling select().  To avoid that panic,
+ * we notice whenever a dangerous select() is set on a pipe, and
+ * defer the final deletion of the pipe until that select()s are all
+ * resolved.  Since we can't currently detect exactly when that
+ * resolution happens, we use a simple garbage collection queue to 
+ * reap the at-risk pipes 'later'.
+ */
+static void
+pipe_garbage_collect(struct pipe *cpipe)
+{
+       uint64_t old, now;
+       struct pipe_garbage *pgp;
+
+       /* Convert msecs to nsecs and then to abstime */
+       old = pipe_garbage_age_limit * 1000000;
+       nanoseconds_to_absolutetime(old, &old);
+
+       lck_mtx_lock(pipe_garbage_lock);
+
+       /* Free anything that's been on the queue for <mumble> seconds */
+       now = mach_absolute_time();
+       old = now - old;
+       while ((pgp = pipe_garbage_head) && pgp->pg_timestamp < old) {
+               pipe_garbage_head = pgp->pg_next;
+               if (pipe_garbage_head == NULL)
+                       pipe_garbage_tail = NULL;
+               pipe_garbage_count--;
+               zfree(pipe_zone, pgp->pg_pipe);
+               zfree(pipe_garbage_zone, pgp);
+       }
+
+       /* Add the new pipe (if any) to the tail of the garbage queue */
+       if (cpipe) {
+               cpipe->pipe_state = PIPE_DEAD;
+               pgp = (struct pipe_garbage *)zalloc(pipe_garbage_zone);
+               if (pgp == NULL) {
+                       /*
+                        * We're too low on memory to garbage collect the
+                        * pipe.  Freeing it runs the risk of panicing the
+                        * system.  All we can do is leak it and leave
+                        * a breadcrumb behind.  The good news, such as it
+                        * is, is that this will probably never happen.
+                        * We will probably hit the panic below first.
+                        */
+                       printf("Leaking pipe %p - no room left in the queue",
+                           cpipe);
+                       lck_mtx_unlock(pipe_garbage_lock);
+                       return;
+               }
+
+               pgp->pg_pipe = cpipe;
+               pgp->pg_timestamp = now;
+               pgp->pg_next = NULL;
 
+               if (pipe_garbage_tail)
+                       pipe_garbage_tail->pg_next = pgp;
+               pipe_garbage_tail = pgp;
+               if (pipe_garbage_head == NULL)
+                       pipe_garbage_head = pipe_garbage_tail;
 
+               if (pipe_garbage_count++ >= PIPE_GARBAGE_QUEUE_LIMIT)
+                       panic("Length of pipe garbage queue exceeded %d",
+                           PIPE_GARBAGE_QUEUE_LIMIT);
+       }
+       lck_mtx_unlock(pipe_garbage_lock);
+}