* are met.
*/
/*
- * Copyright (c) 2003-2007 Apple Inc. All rights reserved.
+ * Copyright (c) 2003-2014 Apple Inc. All rights reserved.
*
* @APPLE_OSREFERENCE_LICENSE_HEADER_START@
*
* pipes scheme originally used in FreeBSD/4.4Lite. It does not support
* all features of sockets, but does do everything that pipes normally
* do.
+ *
+ * Pipes are implemented as circular buffers. The following are the valid states of pipe operations:
+ *
+ *    _________________________________
+ * 1. |_________________________________| r=w, c=0
+ *
+ *    _________________________________
+ * 2. |__r:::::w________________________| r <= w , c > 0
+ *
+ *    _________________________________
+ * 3. |::::w______r:::::::::::::::::::::| r > w , c > 0
+ *
+ *    _________________________________
+ * 4. |:::::::wr::::::::::::::::::::::::| w=r, c = Max size
+ *
+ *
+ * Nomenclature:
+ * a-z define the steps in a program flow
+ * 1-4 are the states as defined above
+ * n is the byte count of the file operation
+ * Action: is the file operation done on the pipe
+ *
+ * Current: None	Action: initialize with size M=200
+ * a. State 1 (r=0, w=0, c=0)
+ *
+ * Current: a	Action: write(100) (n < M-w)
+ * b. State 2 (r=0, w=100, c=100)
+ *
+ * Current: b	Action: write(100) (n = M-w)
+ * c. State 4 (r=0, w=0, c=200)
+ *
+ * Current: b	Action: read(70) (n < c)
+ * d. State 2 (r=70, w=100, c=30)
+ *
+ * Current: d	Action: write(75) (n < M-w)
+ * e. State 2 (r=70, w=175, c=105)
+ *
+ * Current: d	Action: write(110) (n > M-w)
+ * f. State 3 (r=70, w=10, c=140)
+ *
+ * Current: d	Action: read(30) (n >= c)
+ * g. State 1 (r=100, w=100, c=0)
+ *
*/
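+/*
+ * A minimal userland model of the bookkeeping above (illustrative only;
+ * the model_* helpers are hypothetical -- the kernel keeps the same
+ * quantities in pipe_buffer.in, pipe_buffer.out and pipe_buffer.cnt):
+ *
+ *	#define M 200
+ *	static unsigned w, r, c;	// write ptr, read ptr, byte count
+ *
+ *	static void model_write(unsigned n)	// caller ensures n <= M - c
+ *	{ w = (w + n) % M; c += n; }
+ *
+ *	static void model_read(unsigned n)	// caller ensures n <= c
+ *	{ r = (r + n) % M; c -= n; }
+ *
+ * Replaying steps a..g above with these helpers reproduces states 1-4.
+ */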
/*
- * This code has two modes of operation, a small write mode and a large
- * write mode. The small write mode acts like conventional pipes with
- * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the
- * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT
- * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
- * the receiving process can copy it directly from the pages in the sending
- * process.
- *
- * If the sending process receives a signal, it is possible that it will
- * go away, and certainly its address space can change, because control
- * is returned back to the user-mode side. In that case, the pipe code
- * arranges to copy the buffer supplied by the user process, to a pageable
- * kernel buffer, and the receiving process will grab the data from the
- * pageable kernel buffer. Since signals don't happen all that often,
- * the copy operation is normally eliminated.
- *
- * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
- * happen for small transfers so that the system will not spend all of
- * its time context switching.
+ * This code creates half-duplex pipe buffers to facilitate file-like
+ * operations on pipes. The initial buffer is very small, but it can
+ * grow dynamically to larger sizes based on usage. The buffer size is never
+ * reduced. The total amount of kernel memory used is governed by maxpipekva.
+ * If the dynamic expansion limit is reached, the writing thread blocks
+ * until the pipe buffer empties enough to continue.
*
* In order to limit the resource use of pipes, two sysctls exist:
*
* kern.ipc.maxpipekva - This is a hard limit on the amount of pageable
- * address space available to us in pipe_map. Whenever the amount in use
- * exceeds half of this value, all new pipes will be created with size
- * SMALL_PIPE_SIZE, rather than PIPE_SIZE. Big pipe creation will be limited
- * as well. This value is loader tunable only.
- *
- * kern.ipc.maxpipekvawired - This value limits the amount of memory that may
- * be wired in order to facilitate direct copies using page flipping.
- * Whenever this value is exceeded, pipes will fall back to using regular
- * copies. This value is sysctl controllable at all times.
- *
- * These values are autotuned in subr_param.c.
+ * address space available to us in pipe_map.
*
* Memory usage may be monitored through the sysctls
- * kern.ipc.pipes, kern.ipc.pipekva and kern.ipc.pipekvawired.
+ * kern.ipc.pipes and kern.ipc.pipekva.
*
*/
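+/*
+ * For reference, the monitoring sysctls can be read from userland with
+ * sysctlbyname(3) -- a minimal sketch, assuming the sysctls are compiled
+ * in (see PIPE_SYSCTLS below):
+ *
+ *	#include <sys/sysctl.h>
+ *	#include <stdio.h>
+ *
+ *	int npipes;
+ *	size_t len = sizeof(npipes);
+ *	if (sysctlbyname("kern.ipc.pipes", &npipes, &len, NULL, 0) == 0)
+ *		printf("pipes in use: %d\n", npipes);
+ */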
#include <sys/kdebug.h>
#include <kern/zalloc.h>
+#include <kern/kalloc.h>
#include <vm/vm_kern.h>
#include <libkern/OSAtomic.h>
+#include <libkern/section_keywords.h>
+
+#if CONFIG_MACF
+#include <security/mac_framework.h>
+#endif
#define f_flag f_fglob->fg_flag
-#define f_type f_fglob->fg_type
#define f_msgcount f_fglob->fg_msgcount
#define f_cred f_fglob->fg_cred
#define f_ops f_fglob->fg_ops
#define f_offset f_fglob->fg_offset
#define f_data f_fglob->fg_data
-/*
- * Use this define if you want to disable *fancy* VM things. Expect an
- * approx 30% decrease in transfer rate. This could be useful for
- * NetBSD or OpenBSD.
- *
- * this needs to be ported to X and the performance measured
- * before committing to supporting it
- */
-#define PIPE_NODIRECT 1
-
-#ifndef PIPE_NODIRECT
-
-#include <vm/vm.h>
-#include <vm/vm_param.h>
-#include <vm/vm_object.h>
-#include <vm/vm_kern.h>
-#include <vm/vm_extern.h>
-#include <vm/pmap.h>
-#include <vm/vm_map.h>
-#include <vm/vm_page.h>
-#include <vm/uma.h>
-
-#endif
/*
- * interfaces to the outside world
+ * interfaces to the outside world exported through file operations
*/
static int pipe_read(struct fileproc *fp, struct uio *uio,
- int flags, vfs_context_t ctx);
-
+ int flags, vfs_context_t ctx);
static int pipe_write(struct fileproc *fp, struct uio *uio,
- int flags, vfs_context_t ctx);
-
+ int flags, vfs_context_t ctx);
static int pipe_close(struct fileglob *fg, vfs_context_t ctx);
-
static int pipe_select(struct fileproc *fp, int which, void * wql,
vfs_context_t ctx);
-
static int pipe_kqfilter(struct fileproc *fp, struct knote *kn,
- vfs_context_t ctx);
-
+ struct kevent_internal_s *kev, vfs_context_t ctx);
static int pipe_ioctl(struct fileproc *fp, u_long cmd, caddr_t data,
vfs_context_t ctx);
-
static int pipe_drain(struct fileproc *fp,vfs_context_t ctx);
+static const struct fileops pipeops = {
+ .fo_type = DTYPE_PIPE,
+ .fo_read = pipe_read,
+ .fo_write = pipe_write,
+ .fo_ioctl = pipe_ioctl,
+ .fo_select = pipe_select,
+ .fo_close = pipe_close,
+ .fo_kqfilter = pipe_kqfilter,
+ .fo_drain = pipe_drain,
+};
-struct fileops pipeops =
- { pipe_read,
- pipe_write,
- pipe_ioctl,
- pipe_select,
- pipe_close,
- pipe_kqfilter,
- pipe_drain };
+static void filt_pipedetach(struct knote *kn);
+static int filt_piperead(struct knote *kn, long hint);
+static int filt_pipereadtouch(struct knote *kn, struct kevent_internal_s *kev);
+static int filt_pipereadprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev);
-static void filt_pipedetach(struct knote *kn);
-static int filt_piperead(struct knote *kn, long hint);
-static int filt_pipewrite(struct knote *kn, long hint);
+static int filt_pipewrite(struct knote *kn, long hint);
+static int filt_pipewritetouch(struct knote *kn, struct kevent_internal_s *kev);
+static int filt_pipewriteprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev);
-static struct filterops pipe_rfiltops = {
+SECURITY_READ_ONLY_EARLY(struct filterops) pipe_rfiltops = {
.f_isfd = 1,
.f_detach = filt_pipedetach,
.f_event = filt_piperead,
+ .f_touch = filt_pipereadtouch,
+ .f_process = filt_pipereadprocess,
};
-static struct filterops pipe_wfiltops = {
+
+SECURITY_READ_ONLY_EARLY(struct filterops) pipe_wfiltops = {
.f_isfd = 1,
.f_detach = filt_pipedetach,
.f_event = filt_pipewrite,
+ .f_touch = filt_pipewritetouch,
+ .f_process = filt_pipewriteprocess,
};
-/*
- * Default pipe buffer size(s), this can be kind-of large now because pipe
- * space is pageable. The pipe code will try to maintain locality of
- * reference for performance reasons, so small amounts of outstanding I/O
- * will not wipe the cache.
- */
-#define MINPIPESIZE (PIPE_SIZE/3)
-
-/*
- * Limit the number of "big" pipes
- */
-#define LIMITBIGPIPES 32
-static int nbigpipe;
-
-static int amountpipes;
-static int amountpipekva;
+static int nbigpipe;		/* retained for compatibility; no longer used */
+static int amountpipes;	/* total number of pipes in system */
+static int amountpipekva;	/* total kernel memory used by pipe buffers */
-#ifndef PIPE_NODIRECT
-static int amountpipekvawired;
-#endif
-int maxpipekva = 1024 * 1024 * 16;
+int maxpipekva __attribute__((used)) = PIPE_KVAMAX; /* allowing 16MB max. */
#if PIPE_SYSCTLS
SYSCTL_DECL(_kern_ipc);
static void pipeclose(struct pipe *cpipe);
static void pipe_free_kmem(struct pipe *cpipe);
static int pipe_create(struct pipe **cpipep);
+static int pipespace(struct pipe *cpipe, int size);
+static int choose_pipespace(unsigned long current, unsigned long expected);
+static int expand_pipespace(struct pipe *p, int target_size);
static void pipeselwakeup(struct pipe *cpipe, struct pipe *spipe);
-static __inline int pipelock(struct pipe *cpipe, int catch);
-static __inline void pipeunlock(struct pipe *cpipe);
-
-#ifndef PIPE_NODIRECT
-static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
-static void pipe_destroy_write_buffer(struct pipe *wpipe);
-static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
-static void pipe_clone_write_buffer(struct pipe *wpipe);
-#endif
+static __inline int pipeio_lock(struct pipe *cpipe, int catch);
+static __inline void pipeio_unlock(struct pipe *cpipe);
extern int postpipeevent(struct pipe *, int);
extern void evpipefree(struct pipe *cpipe);
-
-static int pipespace(struct pipe *cpipe, int size);
-
static lck_grp_t *pipe_mtx_grp;
static lck_attr_t *pipe_mtx_attr;
static lck_grp_attr_t *pipe_mtx_grp_attr;
static zone_t pipe_zone;
+#define MAX_PIPESIZE(pipe) ( MAX(PIPE_SIZE, (pipe)->pipe_buffer.size) )
+
#define PIPE_GARBAGE_AGE_LIMIT 5000 /* In milliseconds */
#define PIPE_GARBAGE_QUEUE_LIMIT 32000
static uint64_t pipe_garbage_age_limit = PIPE_GARBAGE_AGE_LIMIT;
static int pipe_garbage_count = 0;
static lck_mtx_t *pipe_garbage_lock;
+static void pipe_garbage_collect(struct pipe *cpipe);
SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
+/* initial setup done at time of sysinit */
void
pipeinit(void)
{
	vm_size_t zone_size;
+
+	nbigpipe = 0;
zone_size = 8192 * sizeof(struct pipe);
pipe_zone = zinit(sizeof(struct pipe), zone_size, 4096, "pipe zone");
- /*
- * allocate lock group attribute and group for pipe mutexes
- */
+
+ /* allocate lock group attribute and group for pipe mutexes */
pipe_mtx_grp_attr = lck_grp_attr_alloc_init();
pipe_mtx_grp = lck_grp_alloc_init("pipe", pipe_mtx_grp_attr);
- /*
- * allocate the lock attribute for pipe mutexes
- */
+ /* allocate the lock attribute for pipe mutexes */
pipe_mtx_attr = lck_attr_alloc_init();
/*
pipe_garbage_zone = (zone_t)zinit(sizeof(struct pipe_garbage),
zone_size, 4096, "pipe garbage zone");
pipe_garbage_lock = lck_mtx_alloc_init(pipe_mtx_grp, pipe_mtx_attr);
+
}
+#ifndef CONFIG_EMBEDDED
/* Bitmap for things to touch in pipe_touch() */
#define PIPE_ATIME 0x00000001 /* time of last access */
#define PIPE_MTIME 0x00000002 /* time of last modification */
static void
pipe_touch(struct pipe *tpipe, int touch)
{
- struct timeval now;
+ struct timespec now;
- microtime(&now);
+ nanotime(&now);
if (touch & PIPE_ATIME) {
tpipe->st_atimespec.tv_sec = now.tv_sec;
- tpipe->st_atimespec.tv_nsec = now.tv_usec * 1000;
+ tpipe->st_atimespec.tv_nsec = now.tv_nsec;
}
if (touch & PIPE_MTIME) {
tpipe->st_mtimespec.tv_sec = now.tv_sec;
- tpipe->st_mtimespec.tv_nsec = now.tv_usec * 1000;
+ tpipe->st_mtimespec.tv_nsec = now.tv_nsec;
}
if (touch & PIPE_CTIME) {
tpipe->st_ctimespec.tv_sec = now.tv_sec;
- tpipe->st_ctimespec.tv_nsec = now.tv_usec * 1000;
+ tpipe->st_ctimespec.tv_nsec = now.tv_nsec;
+ }
+}
+#endif
+
+static const unsigned int pipesize_blocks[] = {512, 1024, 2048, 4096, 4096 * 2, PIPE_SIZE, PIPE_SIZE * 4};
+
+/*
+ * finds the right buffer size from the possible sizes in pipesize_blocks;
+ * returns the block size that accommodates max(current, expected)
+ */
+static int
+choose_pipespace(unsigned long current, unsigned long expected)
+{
+	int i = sizeof(pipesize_blocks) / sizeof(unsigned int) - 1;
+ unsigned long target;
+
+ /*
+ * assert that we always get an atomic transaction sized pipe buffer,
+ * even if the system pipe buffer high-water mark has been crossed.
+ */
+ assert(PIPE_BUF == pipesize_blocks[0]);
+
+ if (expected > current)
+ target = expected;
+ else
+ target = current;
+
+	while (i > 0 && pipesize_blocks[i-1] > target) {
+		i = i - 1;
}
+
+ return pipesize_blocks[i];
}
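+/*
+ * Worked examples (assuming the pipesize_blocks table above, with
+ * PIPE_SIZE == 16384):
+ *	choose_pipespace(0, 0)     -> 512   (new pipe: smallest block)
+ *	choose_pipespace(512, 700) -> 1024  (smallest block holding 700)
+ *	choose_pipespace(1024, 0)  -> 2048  (a target equal to a block
+ *					     size rounds up to the next)
+ */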
+/*
+ * expand the size of the pipe while preserving the data to be read;
+ * the old buffer is freed once the currently buffered data has been
+ * transferred to the new storage.
+ * Required: PIPE_LOCK and io lock to be held by caller.
+ * returns 0 on success (or when no expansion is needed), an errno otherwise
+ */
+static int
+expand_pipespace(struct pipe *p, int target_size)
+{
+ struct pipe tmp, oldpipe;
+ int error;
+ tmp.pipe_buffer.buffer = 0;
+
+ if (p->pipe_buffer.size >= (unsigned) target_size) {
+		return 0; /* the existing buffer is already large enough */
+ }
+
+ /* create enough space in the target */
+ error = pipespace(&tmp, target_size);
+ if (error != 0)
+ return (error);
+
+ oldpipe.pipe_buffer.buffer = p->pipe_buffer.buffer;
+ oldpipe.pipe_buffer.size = p->pipe_buffer.size;
+
+ memcpy(tmp.pipe_buffer.buffer, p->pipe_buffer.buffer, p->pipe_buffer.size);
+	if (p->pipe_buffer.cnt > 0 && p->pipe_buffer.in <= p->pipe_buffer.out) {
+ /* we are in State 3 and need extra copying for read to be consistent */
+ memcpy(&tmp.pipe_buffer.buffer[p->pipe_buffer.size], p->pipe_buffer.buffer, p->pipe_buffer.size);
+ p->pipe_buffer.in += p->pipe_buffer.size;
+ }
+
+ p->pipe_buffer.buffer = tmp.pipe_buffer.buffer;
+ p->pipe_buffer.size = tmp.pipe_buffer.size;
+
+
+ pipe_free_kmem(&oldpipe);
+ return 0;
+}
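+/*
+ * Example of the State 3 fixup above: with size=8, out=5, in=2, cnt=5,
+ * the live bytes occupy slots 5,6,7,0,1. The first memcpy places them at
+ * the same offsets in the new buffer; duplicating the old image at offset
+ * 8 and advancing in by the old size (in=10) makes the live bytes
+ * contiguous from out=5 to in=10, so subsequent reads work unchanged.
+ */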
/*
* The pipe system call for the DTYPE_PIPE type of pipes
+ *
+ * returns:
+ * FREAD | fd0 | -->[struct rpipe] --> |~~buffer~~| \
+ * (pipe_mutex)
+ * FWRITE | fd1 | -->[struct wpipe] --X /
*/
/* ARGSUSED */
/*
* allocate the space for the normal I/O direction up
* front... we'll delay the allocation for the other
- * direction until a write actually occurs (most
- * likely it won't)...
- *
- * Reduce to 1/4th pipe size if we're over our global max.
+ * direction until a write actually occurs (most likely it won't)...
*/
- if (amountpipekva > maxpipekva / 2)
- error = pipespace(rpipe, SMALL_PIPE_SIZE);
- else
- error = pipespace(rpipe, PIPE_SIZE);
+ error = pipespace(rpipe, choose_pipespace(rpipe->pipe_buffer.size, 0));
if (error)
goto freepipes;
-#ifndef PIPE_NODIRECT
- rpipe->pipe_state |= PIPE_DIRECTOK;
- wpipe->pipe_state |= PIPE_DIRECTOK;
-#endif
TAILQ_INIT(&rpipe->pipe_evlist);
TAILQ_INIT(&wpipe->pipe_evlist);
retval[0] = fd;
/*
- * for now we'll create half-duplex
- * pipes... this is what we've always
- * supported..
+	 * for now we'll create half-duplex pipes (see the returns diagram
+	 * above). this is what we've always supported..
*/
rf->f_flag = FREAD;
- rf->f_type = DTYPE_PIPE;
rf->f_data = (caddr_t)rpipe;
rf->f_ops = &pipeops;
goto freepipes;
}
wf->f_flag = FWRITE;
- wf->f_type = DTYPE_PIPE;
wf->f_data = (caddr_t)wpipe;
wf->f_ops = &pipeops;
rpipe->pipe_peer = wpipe;
wpipe->pipe_peer = rpipe;
- rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
+ /* both structures share the same mutex */
+ rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
retval[1] = fd;
#if CONFIG_MACF
}
#endif
if (cpipe->pipe_buffer.buffer == 0) {
- /*
- * must be stat'ing the write fd
- */
+ /* must be stat'ing the write fd */
if (cpipe->pipe_peer) {
- /*
- * the peer still exists, use it's info
- */
- pipe_size = cpipe->pipe_peer->pipe_buffer.size;
+			/* the peer still exists, use its info */
+ pipe_size = MAX_PIPESIZE(cpipe->pipe_peer);
pipe_count = cpipe->pipe_peer->pipe_buffer.cnt;
} else {
pipe_count = 0;
}
} else {
- pipe_size = cpipe->pipe_buffer.size;
+ pipe_size = MAX_PIPESIZE(cpipe);
pipe_count = cpipe->pipe_buffer.cnt;
}
/*
* we might catch it in transient state
*/
if (pipe_size == 0)
- pipe_size = PIPE_SIZE;
+ pipe_size = MAX(PIPE_SIZE, pipesize_blocks[0]);
if (isstat64 != 0) {
sb64 = (struct stat64 *)ub;
* address of this pipe's struct pipe. This number may be recycled
* relatively quickly.
*/
- sb64->st_ino = (ino64_t)((uintptr_t)cpipe);
+ sb64->st_ino = (ino64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
} else {
sb = (struct stat *)ub;
* address of this pipe's struct pipe. This number may be recycled
* relatively quickly.
*/
- sb->st_ino = (ino_t)(uintptr_t)cpipe;
+ sb->st_ino = (ino_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
}
PIPE_UNLOCK(cpipe);
{
vm_offset_t buffer;
- size = round_page(size);
+ if (size <= 0)
+ return(EINVAL);
- if (kmem_alloc(kernel_map, &buffer, size) != KERN_SUCCESS)
- return(ENOMEM);
+	if ((buffer = (vm_offset_t)kalloc(size)) == 0)
+ return(ENOMEM);
/* free old resources if we're resizing */
pipe_free_kmem(cpipe);
pipe_create(struct pipe **cpipep)
{
struct pipe *cpipe;
-
cpipe = (struct pipe *)zalloc(pipe_zone);
if ((*cpipep = cpipe) == NULL)
*/
bzero(cpipe, sizeof *cpipe);
+#ifndef CONFIG_EMBEDDED
/* Initial times are all the time of creation of the pipe */
pipe_touch(cpipe, PIPE_ATIME | PIPE_MTIME | PIPE_CTIME);
-
+#endif
return (0);
}
* lock a pipe for I/O, blocking other access
*/
static inline int
-pipelock(struct pipe *cpipe, int catch)
+pipeio_lock(struct pipe *cpipe, int catch)
{
int error;
-
while (cpipe->pipe_state & PIPE_LOCKFL) {
cpipe->pipe_state |= PIPE_LWANT;
-
error = msleep(cpipe, PIPE_MTX(cpipe), catch ? (PRIBIO | PCATCH) : PRIBIO,
"pipelk", 0);
if (error != 0)
return (error);
}
cpipe->pipe_state |= PIPE_LOCKFL;
-
return (0);
}
* unlock a pipe I/O lock
*/
static inline void
-pipeunlock(struct pipe *cpipe)
+pipeio_unlock(struct pipe *cpipe)
{
cpipe->pipe_state &= ~PIPE_LOCKFL;
-
if (cpipe->pipe_state & PIPE_LWANT) {
cpipe->pipe_state &= ~PIPE_LWANT;
wakeup(cpipe);
}
}
+/*
+ * wake up anyone who's blocked in select
+ */
static void
pipeselwakeup(struct pipe *cpipe, struct pipe *spipe)
{
}
}
+/*
+ * Read n bytes from the buffer. Semantics are similar to a file read.
+ * returns: 0 on success or an errno (bytes read are reflected in the uio)
+ */
/* ARGSUSED */
static int
pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags,
PIPE_LOCK(rpipe);
++rpipe->pipe_busy;
- error = pipelock(rpipe, 1);
+ error = pipeio_lock(rpipe, 1);
if (error)
goto unlocked_error;
goto locked_error;
#endif
+
while (uio_resid(uio)) {
/*
* normal pipe buffer receive
*/
if (rpipe->pipe_buffer.cnt > 0) {
+ /*
+ * # bytes to read is min( bytes from read pointer until end of buffer,
+ * total unread bytes,
+ * user requested byte count)
+ */
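+			/*
+			 * e.g. size=8, out=5, cnt=5, uio_resid=4:
+			 * min(8-5, 5, 4) = 3 bytes are copied this pass;
+			 * out wraps to 0 and the next loop iteration
+			 * delivers the remaining requested byte.
+			 */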
size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
if (size > rpipe->pipe_buffer.cnt)
size = rpipe->pipe_buffer.cnt;
if (size > (u_int) uio_resid(uio))
size = (u_int) uio_resid(uio);
- PIPE_UNLOCK(rpipe);
+ PIPE_UNLOCK(rpipe); /* we still hold io lock.*/
error = uiomove(
&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
size, uio);
rpipe->pipe_buffer.out = 0;
rpipe->pipe_buffer.cnt -= size;
-
+
/*
* If there is no more to read in the pipe, reset
* its pointers to the beginning. This improves
rpipe->pipe_buffer.out = 0;
}
nread += size;
-#ifndef PIPE_NODIRECT
- /*
- * Direct copy, bypassing a kernel buffer.
- */
- } else if ((size = rpipe->pipe_map.cnt) &&
- (rpipe->pipe_state & PIPE_DIRECTW)) {
- caddr_t va;
- // LP64todo - fix this!
- if (size > (u_int) uio_resid(uio))
- size = (u_int) uio_resid(uio);
-
- va = (caddr_t) rpipe->pipe_map.kva +
- rpipe->pipe_map.pos;
- PIPE_UNLOCK(rpipe);
- error = uiomove(va, size, uio);
- PIPE_LOCK(rpipe);
- if (error)
- break;
- nread += size;
- rpipe->pipe_map.pos += size;
- rpipe->pipe_map.cnt -= size;
- if (rpipe->pipe_map.cnt == 0) {
- rpipe->pipe_state &= ~PIPE_DIRECTW;
- wakeup(rpipe);
- }
-#endif
} else {
/*
* detect EOF condition
}
/*
- * Break if some data was read.
+ * Break if some data was read in previous iteration.
*/
if (nread > 0)
break;
* We will either break out with an error or we will
* sleep and relock to loop.
*/
- pipeunlock(rpipe);
+ pipeio_unlock(rpipe);
/*
* Handle non-blocking mode operation or
error = EAGAIN;
} else {
rpipe->pipe_state |= PIPE_WANTR;
-
error = msleep(rpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH, "piperd", 0);
-
if (error == 0)
- error = pipelock(rpipe, 1);
+ error = pipeio_lock(rpipe, 1);
}
if (error)
goto unlocked_error;
#if CONFIG_MACF
locked_error:
#endif
- pipeunlock(rpipe);
+ pipeio_unlock(rpipe);
unlocked_error:
--rpipe->pipe_busy;
if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
wakeup(rpipe);
- } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
+ } else if (rpipe->pipe_buffer.cnt < rpipe->pipe_buffer.size) {
/*
* Handle write blocking hysteresis.
*/
}
}
- if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
+ if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > 0)
pipeselwakeup(rpipe, rpipe->pipe_peer);
+#ifndef CONFIG_EMBEDDED
/* update last read time */
pipe_touch(rpipe, PIPE_ATIME);
+#endif
PIPE_UNLOCK(rpipe);
return (error);
}
-
-
-#ifndef PIPE_NODIRECT
-/*
- * Map the sending processes' buffer into kernel space and wire it.
- * This is similar to a physical write operation.
- */
-static int
-pipe_build_write_buffer(wpipe, uio)
- struct pipe *wpipe;
- struct uio *uio;
-{
- pmap_t pmap;
- u_int size;
- int i, j;
- vm_offset_t addr, endaddr;
-
-
- size = (u_int) uio->uio_iov->iov_len;
- if (size > wpipe->pipe_buffer.size)
- size = wpipe->pipe_buffer.size;
-
- pmap = vmspace_pmap(curproc->p_vmspace);
- endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
- addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
- for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
- /*
- * vm_fault_quick() can sleep. Consequently,
- * vm_page_lock_queue() and vm_page_unlock_queue()
- * should not be performed outside of this loop.
- */
- race:
- if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0) {
- vm_page_lock_queues();
- for (j = 0; j < i; j++)
- vm_page_unhold(wpipe->pipe_map.ms[j]);
- vm_page_unlock_queues();
- return (EFAULT);
- }
- wpipe->pipe_map.ms[i] = pmap_extract_and_hold(pmap, addr,
- VM_PROT_READ);
- if (wpipe->pipe_map.ms[i] == NULL)
- goto race;
- }
-
-/*
- * set up the control block
- */
- wpipe->pipe_map.npages = i;
- wpipe->pipe_map.pos =
- ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
- wpipe->pipe_map.cnt = size;
-
-/*
- * and map the buffer
- */
- if (wpipe->pipe_map.kva == 0) {
- /*
- * We need to allocate space for an extra page because the
- * address range might (will) span pages at times.
- */
- wpipe->pipe_map.kva = kmem_alloc_nofault(kernel_map,
- wpipe->pipe_buffer.size + PAGE_SIZE);
- atomic_add_int(&amountpipekvawired,
- wpipe->pipe_buffer.size + PAGE_SIZE);
- }
- pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
- wpipe->pipe_map.npages);
-
-/*
- * and update the uio data
- */
-
- uio->uio_iov->iov_len -= size;
- uio->uio_iov->iov_base = (char *)uio->uio_iov->iov_base + size;
- if (uio->uio_iov->iov_len == 0)
- uio->uio_iov++;
- uio_setresid(uio, (uio_resid(uio) - size));
- uio->uio_offset += size;
- return (0);
-}
-
-/*
- * unmap and unwire the process buffer
- */
-static void
-pipe_destroy_write_buffer(wpipe)
- struct pipe *wpipe;
-{
- int i;
-
- if (wpipe->pipe_map.kva) {
- pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
-
- if (amountpipekvawired > maxpipekvawired / 2) {
- /* Conserve address space */
- vm_offset_t kva = wpipe->pipe_map.kva;
- wpipe->pipe_map.kva = 0;
- kmem_free(kernel_map, kva,
- wpipe->pipe_buffer.size + PAGE_SIZE);
- atomic_subtract_int(&amountpipekvawired,
- wpipe->pipe_buffer.size + PAGE_SIZE);
- }
- }
- vm_page_lock_queues();
- for (i = 0; i < wpipe->pipe_map.npages; i++) {
- vm_page_unhold(wpipe->pipe_map.ms[i]);
- }
- vm_page_unlock_queues();
- wpipe->pipe_map.npages = 0;
-}
-
-/*
- * In the case of a signal, the writing process might go away. This
- * code copies the data into the circular buffer so that the source
- * pages can be freed without loss of data.
- */
-static void
-pipe_clone_write_buffer(wpipe)
- struct pipe *wpipe;
-{
- int size;
- int pos;
-
- size = wpipe->pipe_map.cnt;
- pos = wpipe->pipe_map.pos;
-
- wpipe->pipe_buffer.in = size;
- wpipe->pipe_buffer.out = 0;
- wpipe->pipe_buffer.cnt = size;
- wpipe->pipe_state &= ~PIPE_DIRECTW;
-
- PIPE_UNLOCK(wpipe);
- bcopy((caddr_t) wpipe->pipe_map.kva + pos,
- wpipe->pipe_buffer.buffer, size);
- pipe_destroy_write_buffer(wpipe);
- PIPE_LOCK(wpipe);
-}
-
/*
- * This implements the pipe buffer write mechanism. Note that only
- * a direct write OR a normal pipe write can be pending at any given time.
- * If there are any characters in the pipe buffer, the direct write will
- * be deferred until the receiving process grabs all of the bytes from
- * the pipe buffer. Then the direct mapping write is set-up.
+ * perform a write of n bytes into the read side of the buffer. Since
+ * pipes are unidirectional, a write is meant to be read by the other side only.
*/
-static int
-pipe_direct_write(wpipe, uio)
- struct pipe *wpipe;
- struct uio *uio;
-{
- int error;
-
-retry:
- while (wpipe->pipe_state & PIPE_DIRECTW) {
- if (wpipe->pipe_state & PIPE_WANTR) {
- wpipe->pipe_state &= ~PIPE_WANTR;
- wakeup(wpipe);
- }
- wpipe->pipe_state |= PIPE_WANTW;
- error = msleep(wpipe, PIPE_MTX(wpipe),
- PRIBIO | PCATCH, "pipdww", 0);
- if (error)
- goto error1;
- if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
- error = EPIPE;
- goto error1;
- }
- }
- wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
- if (wpipe->pipe_buffer.cnt > 0) {
- if (wpipe->pipe_state & PIPE_WANTR) {
- wpipe->pipe_state &= ~PIPE_WANTR;
- wakeup(wpipe);
- }
-
- wpipe->pipe_state |= PIPE_WANTW;
- error = msleep(wpipe, PIPE_MTX(wpipe),
- PRIBIO | PCATCH, "pipdwc", 0);
- if (error)
- goto error1;
- if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
- error = EPIPE;
- goto error1;
- }
- goto retry;
- }
-
- wpipe->pipe_state |= PIPE_DIRECTW;
-
- pipelock(wpipe, 0);
- PIPE_UNLOCK(wpipe);
- error = pipe_build_write_buffer(wpipe, uio);
- PIPE_LOCK(wpipe);
- pipeunlock(wpipe);
- if (error) {
- wpipe->pipe_state &= ~PIPE_DIRECTW;
- goto error1;
- }
-
- error = 0;
- while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
- if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
- pipelock(wpipe, 0);
- PIPE_UNLOCK(wpipe);
- pipe_destroy_write_buffer(wpipe);
- PIPE_LOCK(wpipe);
- pipeselwakeup(wpipe, wpipe);
- pipeunlock(wpipe);
- error = EPIPE;
- goto error1;
- }
- if (wpipe->pipe_state & PIPE_WANTR) {
- wpipe->pipe_state &= ~PIPE_WANTR;
- wakeup(wpipe);
- }
- pipeselwakeup(wpipe, wpipe);
- error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
- "pipdwt", 0);
- }
-
- pipelock(wpipe,0);
- if (wpipe->pipe_state & PIPE_DIRECTW) {
- /*
- * this bit of trickery substitutes a kernel buffer for
- * the process that might be going away.
- */
- pipe_clone_write_buffer(wpipe);
- } else {
- PIPE_UNLOCK(wpipe);
- pipe_destroy_write_buffer(wpipe);
- PIPE_LOCK(wpipe);
- }
- pipeunlock(wpipe);
- return (error);
-
-error1:
- wakeup(wpipe);
- return (error);
-}
-#endif
-
-
-
static int
pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags,
__unused vfs_context_t ctx)
int orig_resid;
int pipe_size;
struct pipe *wpipe, *rpipe;
+ // LP64todo - fix this!
+ orig_resid = uio_resid(uio);
+ int space;
rpipe = (struct pipe *)fp->f_data;
pipe_size = 0;
- if (wpipe->pipe_buffer.buffer == 0) {
- /*
- * need to allocate some storage... we delay the allocation
- * until the first write on fd[0] to avoid allocating storage for both
- * 'pipe ends'... most pipes are half-duplex with the writes targeting
- * fd[1], so allocating space for both ends is a waste...
- *
- * Reduce to 1/4th pipe size if we're over our global max.
- */
- if (amountpipekva > maxpipekva / 2)
- pipe_size = SMALL_PIPE_SIZE;
- else
- pipe_size = PIPE_SIZE;
- }
-
/*
- * If it is advantageous to resize the pipe buffer, do
- * so.
+ * need to allocate some storage... we delay the allocation
+ * until the first write on fd[0] to avoid allocating storage for both
+ * 'pipe ends'... most pipes are half-duplex with the writes targeting
+ * fd[1], so allocating space for both ends is a waste...
*/
- if ((uio_resid(uio) > PIPE_SIZE) &&
- (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
- (amountpipekva < maxpipekva / 2) &&
- (nbigpipe < LIMITBIGPIPES) &&
-#ifndef PIPE_NODIRECT
- (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
-#endif
- (wpipe->pipe_buffer.cnt == 0)) {
- pipe_size = BIG_PIPE_SIZE;
+	if (wpipe->pipe_buffer.buffer == 0 ||
+	    ((unsigned)orig_resid > wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt &&
+	     amountpipekva < maxpipekva)) {
+		pipe_size = choose_pipespace(wpipe->pipe_buffer.size, wpipe->pipe_buffer.cnt + orig_resid);
}
if (pipe_size) {
/*
* need to do initial allocation or resizing of pipe
+ * holding both structure and io locks.
*/
- if ((error = pipelock(wpipe, 1)) == 0) {
- PIPE_UNLOCK(wpipe);
- if (pipespace(wpipe, pipe_size) == 0)
- OSAddAtomic(1, &nbigpipe);
- PIPE_LOCK(wpipe);
- pipeunlock(wpipe);
-
- if (wpipe->pipe_buffer.buffer == 0) {
- /*
- * initial allocation failed
- */
+ if ((error = pipeio_lock(wpipe, 1)) == 0) {
+ if (wpipe->pipe_buffer.cnt == 0)
+ error = pipespace(wpipe, pipe_size);
+ else
+ error = expand_pipespace(wpipe, pipe_size);
+
+ pipeio_unlock(wpipe);
+
+ /* allocation failed */
+ if (wpipe->pipe_buffer.buffer == 0)
error = ENOMEM;
- }
}
if (error) {
/*
return(error);
}
}
- // LP64todo - fix this!
- orig_resid = uio_resid(uio);
while (uio_resid(uio)) {
- int space;
-
-#ifndef PIPE_NODIRECT
- /*
- * If the transfer is large, we can gain performance if
- * we do process-to-process copies directly.
- * If the write is non-blocking, we don't use the
- * direct write mechanism.
- *
- * The direct write mechanism will detect the reader going
- * away on us.
- */
- if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
- (fp->f_flag & FNONBLOCK) == 0 &&
- amountpipekvawired + uio_resid(uio) < maxpipekvawired) {
- error = pipe_direct_write(wpipe, uio);
- if (error)
- break;
- continue;
- }
-
- /*
- * Pipe buffered writes cannot be coincidental with
- * direct writes. We wait until the currently executing
- * direct write is completed before we start filling the
- * pipe buffer. We break out if a signal occurs or the
- * reader goes away.
- */
- retrywrite:
- while (wpipe->pipe_state & PIPE_DIRECTW) {
- if (wpipe->pipe_state & PIPE_WANTR) {
- wpipe->pipe_state &= ~PIPE_WANTR;
- wakeup(wpipe);
- }
- error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH, "pipbww", 0);
- if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))
- break;
- if (error)
- break;
- }
-#else
retrywrite:
-#endif
space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
- /*
- * Writes of size <= PIPE_BUF must be atomic.
- */
+ /* Writes of size <= PIPE_BUF must be atomic. */
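+		/*
+		 * e.g. with PIPE_BUF == 512 (pipesize_blocks[0]), a 300-byte
+		 * write that finds only 100 free bytes forces space to 0, so
+		 * the whole write waits below instead of being split and
+		 * interleaved with other writers' data.
+		 */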
if ((space < uio_resid(uio)) && (orig_resid <= PIPE_BUF))
space = 0;
if (space > 0) {
- if ((error = pipelock(wpipe,1)) == 0) {
+ if ((error = pipeio_lock(wpipe,1)) == 0) {
int size; /* Transfer size */
int segsize; /* first segment to transfer */
if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) {
- pipeunlock(wpipe);
+ pipeio_unlock(wpipe);
error = EPIPE;
break;
}
-#ifndef PIPE_NODIRECT
- /*
- * It is possible for a direct write to
- * slip in on us... handle it here...
- */
- if (wpipe->pipe_state & PIPE_DIRECTW) {
- pipeunlock(wpipe);
- goto retrywrite;
- }
-#endif
- /*
- * If a process blocked in pipelock, our
- * value for space might be bad... the mutex
- * is dropped while we're blocked
+ /*
+ * If a process blocked in pipeio_lock, our
+ * value for space might be bad... the mutex
+ * is dropped while we're blocked
*/
if (space > (int)(wpipe->pipe_buffer.size -
wpipe->pipe_buffer.cnt)) {
- pipeunlock(wpipe);
+ pipeio_unlock(wpipe);
goto retrywrite;
}
/*
* Transfer remaining part now, to
* support atomic writes. Wraparound
- * happened.
+ * happened. (State 3)
*/
if (wpipe->pipe_buffer.in + segsize !=
wpipe->pipe_buffer.size)
size - segsize, uio);
PIPE_LOCK(rpipe);
}
+ /*
+			 * readers never know to read until the count is updated.
+ */
if (error == 0) {
wpipe->pipe_buffer.in += size;
- if (wpipe->pipe_buffer.in >=
+ if (wpipe->pipe_buffer.in >
wpipe->pipe_buffer.size) {
if (wpipe->pipe_buffer.in !=
size - segsize +
panic("Pipe buffer overflow");
}
- pipeunlock(wpipe);
+ pipeio_unlock(wpipe);
}
if (error)
break;
pipeselwakeup(wpipe, wpipe);
}
+#ifndef CONFIG_EMBEDDED
/* Update modification, status change (# of bytes in pipe) times */
pipe_touch(rpipe, PIPE_MTIME | PIPE_CTIME);
pipe_touch(wpipe, PIPE_MTIME | PIPE_CTIME);
+#endif
PIPE_UNLOCK(rpipe);
return (error);
return (0);
case FIONREAD:
-#ifndef PIPE_NODIRECT
- if (mpipe->pipe_state & PIPE_DIRECTW)
- *(int *)data = mpipe->pipe_map.cnt;
- else
-#endif
- *(int *)data = mpipe->pipe_buffer.cnt;
+ *(int *)data = mpipe->pipe_buffer.cnt;
PIPE_UNLOCK(mpipe);
return (0);
PIPE_LOCK(rpipe);
wpipe = rpipe->pipe_peer;
+
#if CONFIG_MACF
/*
wpipe->pipe_state |= PIPE_WSELECT;
if (wpipe == NULL || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) ||
(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
- (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
+ (MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
retnum = 1;
} else {
cpipe = (struct pipe *)fg->fg_data;
fg->fg_data = NULL;
proc_fdunlock(vfs_context_proc(ctx));
-
if (cpipe)
pipeclose(cpipe);
static void
pipe_free_kmem(struct pipe *cpipe)
{
-
if (cpipe->pipe_buffer.buffer != NULL) {
- if (cpipe->pipe_buffer.size > PIPE_SIZE)
- OSAddAtomic(-1, &nbigpipe);
OSAddAtomic(-(cpipe->pipe_buffer.size), &amountpipekva);
OSAddAtomic(-1, &amountpipes);
-
- kmem_free(kernel_map, (vm_offset_t)cpipe->pipe_buffer.buffer,
+ kfree((void *)cpipe->pipe_buffer.buffer,
cpipe->pipe_buffer.size);
cpipe->pipe_buffer.buffer = NULL;
+ cpipe->pipe_buffer.size = 0;
}
-#ifndef PIPE_NODIRECT
- if (cpipe->pipe_map.kva != 0) {
- atomic_subtract_int(&amountpipekvawired,
- cpipe->pipe_buffer.size + PAGE_SIZE);
- kmem_free(kernel_map,
- cpipe->pipe_map.kva,
- cpipe->pipe_buffer.size + PAGE_SIZE);
- cpipe->pipe_map.cnt = 0;
- cpipe->pipe_map.kva = 0;
- cpipe->pipe_map.pos = 0;
- cpipe->pipe_map.npages = 0;
- }
-#endif
-}
-
-/*
- * When a thread sets a write-select on a pipe, it creates an implicit,
- * untracked dependency between that thread and the peer of the pipe
- * on which the select is set. If the peer pipe is closed and freed
- * before the select()ing thread wakes up, the system will panic as
- * it attempts to unwind the dangling select(). To avoid that panic,
- * we notice whenever a dangerous select() is set on a pipe, and
- * defer the final deletion of the pipe until that select()s are all
- * resolved. Since we can't currently detect exactly when that
- * resolution happens, we use a simple garbage collection queue to
- * reap the at-risk pipes 'later'.
- */
-static void
-pipe_garbage_collect(struct pipe *cpipe)
-{
- uint64_t old, now;
- struct pipe_garbage *pgp;
-
- /* Convert msecs to nsecs and then to abstime */
- old = pipe_garbage_age_limit * 1000000;
- nanoseconds_to_absolutetime(old, &old);
-
- lck_mtx_lock(pipe_garbage_lock);
-
- /* Free anything that's been on the queue for <mumble> seconds */
- now = mach_absolute_time();
- old = now - old;
- while ((pgp = pipe_garbage_head) && pgp->pg_timestamp < old) {
- pipe_garbage_head = pgp->pg_next;
- if (pipe_garbage_head == NULL)
- pipe_garbage_tail = NULL;
- pipe_garbage_count--;
- zfree(pipe_zone, pgp->pg_pipe);
- zfree(pipe_garbage_zone, pgp);
- }
-
- /* Add the new pipe (if any) to the tail of the garbage queue */
- if (cpipe) {
- cpipe->pipe_state = PIPE_DEAD;
- pgp = (struct pipe_garbage *)zalloc(pipe_garbage_zone);
- if (pgp == NULL) {
- /*
- * We're too low on memory to garbage collect the
- * pipe. Freeing it runs the risk of panicing the
- * system. All we can do is leak it and leave
- * a breadcrumb behind. The good news, such as it
- * is, is that this will probably never happen.
- * We will probably hit the panic below first.
- */
- printf("Leaking pipe %p - no room left in the queue",
- cpipe);
- lck_mtx_unlock(pipe_garbage_lock);
- return;
- }
-
- pgp->pg_pipe = cpipe;
- pgp->pg_timestamp = now;
- pgp->pg_next = NULL;
-
- if (pipe_garbage_tail)
- pipe_garbage_tail->pg_next = pgp;
- pipe_garbage_tail = pgp;
- if (pipe_garbage_head == NULL)
- pipe_garbage_head = pipe_garbage_tail;
-
- if (pipe_garbage_count++ >= PIPE_GARBAGE_QUEUE_LIMIT)
- panic("Length of pipe garbage queue exceeded %d",
- PIPE_GARBAGE_QUEUE_LIMIT);
- }
- lck_mtx_unlock(pipe_garbage_lock);
}
/*
if (cpipe == NULL)
return;
-
/* partially created pipes won't have a valid mutex. */
if (PIPE_MTX(cpipe) != NULL)
PIPE_LOCK(cpipe);
* free resources
*/
if (PIPE_MTX(cpipe) != NULL) {
- if (ppipe != NULL) {
- /*
+ if (ppipe != NULL) {
+ /*
* since the mutex is shared and the peer is still
* alive, we need to release the mutex, not free it
*/
- PIPE_UNLOCK(cpipe);
+ PIPE_UNLOCK(cpipe);
} else {
- /*
+ /*
* peer is gone, so we're the sole party left with
- * interest in this mutex... we can just free it
+ * interest in this mutex... unlock and free it
*/
+ PIPE_UNLOCK(cpipe);
lck_mtx_free(PIPE_MTX(cpipe), pipe_mtx_grp);
}
}
zfree(pipe_zone, cpipe);
pipe_garbage_collect(NULL);
}
+
}
/*ARGSUSED*/
static int
-pipe_kqfilter(__unused struct fileproc *fp, struct knote *kn, __unused vfs_context_t ctx)
+filt_piperead_common(struct knote *kn, struct pipe *rpipe)
{
- struct pipe *cpipe;
+ struct pipe *wpipe;
+ int retval;
+
+ /*
+ * we're being called back via the KNOTE post
+ * we made in pipeselwakeup, and we already hold the mutex...
+ */
+
+ wpipe = rpipe->pipe_peer;
+ kn->kn_data = rpipe->pipe_buffer.cnt;
+ if ((rpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) ||
+ (wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) {
+ kn->kn_flags |= EV_EOF;
+ retval = 1;
+ } else {
+ int64_t lowwat = 1;
+ if (kn->kn_sfflags & NOTE_LOWAT) {
+ if (rpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(rpipe))
+ lowwat = MAX_PIPESIZE(rpipe);
+ else if (kn->kn_sdata > lowwat)
+ lowwat = kn->kn_sdata;
+ }
+ retval = kn->kn_data >= lowwat;
+ }
+ return (retval);
+}
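+/*
+ * Userland counterpart of the low-water handling above -- a minimal
+ * kqueue sketch (standard API; fds[] as returned by pipe(2)):
+ *
+ *	#include <sys/event.h>
+ *
+ *	int kq = kqueue();
+ *	struct kevent kev;
+ *	// deliver the event only once at least 64 bytes are readable
+ *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, NOTE_LOWAT, 64, NULL);
+ *	kevent(kq, &kev, 1, NULL, 0, NULL);
+ */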
+
+static int
+filt_piperead(struct knote *kn, long hint)
+{
+#pragma unused(hint)
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+
+ return filt_piperead_common(kn, rpipe);
+}
+
+static int
+filt_pipereadtouch(struct knote *kn, struct kevent_internal_s *kev)
+{
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+ int retval;
+
+ PIPE_LOCK(rpipe);
- cpipe = (struct pipe *)kn->kn_fp->f_data;
+ /* accept new inputs (and save the low water threshold and flag) */
+ kn->kn_sdata = kev->data;
+ kn->kn_sfflags = kev->fflags;
+ if ((kn->kn_status & KN_UDATA_SPECIFIC) == 0)
+ kn->kn_udata = kev->udata;
+
+ /* identify if any events are now fired */
+ retval = filt_piperead_common(kn, rpipe);
+
+ PIPE_UNLOCK(rpipe);
+
+ return retval;
+}
+
+static int
+filt_pipereadprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev)
+{
+#pragma unused(data)
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+ int retval;
+
+ PIPE_LOCK(rpipe);
+ retval = filt_piperead_common(kn, rpipe);
+ if (retval) {
+ *kev = kn->kn_kevent;
+ if (kn->kn_flags & EV_CLEAR) {
+ kn->kn_fflags = 0;
+ kn->kn_data = 0;
+ }
+ }
+ PIPE_UNLOCK(rpipe);
+
+ return (retval);
+}
+
+/*ARGSUSED*/
+static int
+filt_pipewrite_common(struct knote *kn, struct pipe *rpipe)
+{
+ struct pipe *wpipe;
+
+ /*
+ * we're being called back via the KNOTE post
+ * we made in pipeselwakeup, and we already hold the mutex...
+ */
+ wpipe = rpipe->pipe_peer;
+
+ if ((wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) {
+ kn->kn_data = 0;
+ kn->kn_flags |= EV_EOF;
+ return (1);
+ }
+ kn->kn_data = MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt;
+
+ int64_t lowwat = PIPE_BUF;
+ if (kn->kn_sfflags & NOTE_LOWAT) {
+ if (wpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(wpipe))
+ lowwat = MAX_PIPESIZE(wpipe);
+ else if (kn->kn_sdata > lowwat)
+ lowwat = kn->kn_sdata;
+ }
+
+ return (kn->kn_data >= lowwat);
+}
+
+/*ARGSUSED*/
+static int
+filt_pipewrite(struct knote *kn, long hint)
+{
+#pragma unused(hint)
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+
+ return filt_pipewrite_common(kn, rpipe);
+}
+
+
+static int
+filt_pipewritetouch(struct knote *kn, struct kevent_internal_s *kev)
+{
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+ int res;
+
+ PIPE_LOCK(rpipe);
+
+ /* accept new kevent data (and save off lowat threshold and flag) */
+ kn->kn_sfflags = kev->fflags;
+ kn->kn_sdata = kev->data;
+ if ((kn->kn_status & KN_UDATA_SPECIFIC) == 0)
+ kn->kn_udata = kev->udata;
+
+ /* determine if any event is now deemed fired */
+ res = filt_pipewrite_common(kn, rpipe);
+
+ PIPE_UNLOCK(rpipe);
+
+ return res;
+}
+
+static int
+filt_pipewriteprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev)
+{
+#pragma unused(data)
+ struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+ int res;
+
+ PIPE_LOCK(rpipe);
+ res = filt_pipewrite_common(kn, rpipe);
+ if (res) {
+ *kev = kn->kn_kevent;
+ if (kn->kn_flags & EV_CLEAR) {
+ kn->kn_fflags = 0;
+ kn->kn_data = 0;
+ }
+ }
+ PIPE_UNLOCK(rpipe);
+
+ return res;
+}
+
+/*ARGSUSED*/
+static int
+pipe_kqfilter(__unused struct fileproc *fp, struct knote *kn,
+ __unused struct kevent_internal_s *kev, __unused vfs_context_t ctx)
+{
+ struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;
+ int res;
PIPE_LOCK(cpipe);
#if CONFIG_MACF
*/
if (mac_pipe_check_kqfilter(vfs_context_ucred(ctx), kn, cpipe) != 0) {
PIPE_UNLOCK(cpipe);
- return (1);
+ kn->kn_flags = EV_ERROR;
+ kn->kn_data = EPERM;
+ return 0;
}
#endif
switch (kn->kn_filter) {
case EVFILT_READ:
- kn->kn_fop = &pipe_rfiltops;
+ kn->kn_filtid = EVFILTID_PIPE_R;
+ /* determine initial state */
+ res = filt_piperead_common(kn, cpipe);
break;
+
case EVFILT_WRITE:
- kn->kn_fop = &pipe_wfiltops;
+ kn->kn_filtid = EVFILTID_PIPE_W;
if (cpipe->pipe_peer == NULL) {
/*
* other end of pipe has been closed
*/
PIPE_UNLOCK(cpipe);
- return (EPIPE);
+ kn->kn_flags = EV_ERROR;
+ kn->kn_data = EPIPE;
+ return 0;
}
if (cpipe->pipe_peer)
cpipe = cpipe->pipe_peer;
+
+	/* determine initial state */
+ res = filt_pipewrite_common(kn, cpipe);
break;
default:
PIPE_UNLOCK(cpipe);
- return (1);
+ kn->kn_flags = EV_ERROR;
+ kn->kn_data = EINVAL;
+ return 0;
}
if (KNOTE_ATTACH(&cpipe->pipe_sel.si_note, kn))
cpipe->pipe_state |= PIPE_KNOTE;
PIPE_UNLOCK(cpipe);
- return (0);
+ return res;
}
static void
PIPE_UNLOCK(cpipe);
}
-/*ARGSUSED*/
-static int
-filt_piperead(struct knote *kn, long hint)
-{
- struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
- struct pipe *wpipe;
- int retval;
-
- /*
- * if hint == 0, then we've been called from the kevent
- * world directly and do not currently hold the pipe mutex...
- * if hint == 1, we're being called back via the KNOTE post
- * we made in pipeselwakeup, and we already hold the mutex...
- */
- if (hint == 0)
- PIPE_LOCK(rpipe);
-
- wpipe = rpipe->pipe_peer;
- kn->kn_data = rpipe->pipe_buffer.cnt;
-
-#ifndef PIPE_NODIRECT
- if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
- kn->kn_data = rpipe->pipe_map.cnt;
-#endif
- if ((rpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) ||
- (wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) {
- kn->kn_flags |= EV_EOF;
- retval = 1;
- } else {
- int64_t lowwat = 1;
- if (kn->kn_sfflags & NOTE_LOWAT) {
- if (rpipe->pipe_buffer.size && kn->kn_sdata > rpipe->pipe_buffer.size)
- lowwat = rpipe->pipe_buffer.size;
- else if (kn->kn_sdata > lowwat)
- lowwat = kn->kn_sdata;
- }
- retval = kn->kn_data >= lowwat;
- }
-
- if (hint == 0)
- PIPE_UNLOCK(rpipe);
-
- return (retval);
-}
-
-/*ARGSUSED*/
-static int
-filt_pipewrite(struct knote *kn, long hint)
-{
- struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
- struct pipe *wpipe;
-
- /*
- * if hint == 0, then we've been called from the kevent
- * world directly and do not currently hold the pipe mutex...
- * if hint == 1, we're being called back via the KNOTE post
- * we made in pipeselwakeup, and we already hold the mutex...
- */
- if (hint == 0)
- PIPE_LOCK(rpipe);
-
- wpipe = rpipe->pipe_peer;
-
- if ((wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) {
- kn->kn_data = 0;
- kn->kn_flags |= EV_EOF;
-
- if (hint == 0)
- PIPE_UNLOCK(rpipe);
- return (1);
- }
- kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
- if (!kn->kn_data && wpipe->pipe_buffer.size == 0)
- kn->kn_data = PIPE_BUF; /* unwritten pipe is ready for write */
-
-#ifndef PIPE_NODIRECT
- if (wpipe->pipe_state & PIPE_DIRECTW)
- kn->kn_data = 0;
-#endif
- int64_t lowwat = PIPE_BUF;
- if (kn->kn_sfflags & NOTE_LOWAT) {
- if (wpipe->pipe_buffer.size && kn->kn_sdata > wpipe->pipe_buffer.size)
- lowwat = wpipe->pipe_buffer.size;
- else if (kn->kn_sdata > lowwat)
- lowwat = kn->kn_sdata;
- }
-
- if (hint == 0)
- PIPE_UNLOCK(rpipe);
-
- return (kn->kn_data >= lowwat);
-}
-
int
fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo)
{
#if CONFIG_MACF
int error;
#endif
- struct timeval now;
+ struct timespec now;
struct vinfo_stat * ub;
int pipe_size = 0;
int pipe_count;
/*
* the peer still exists, use it's info
*/
- pipe_size = cpipe->pipe_peer->pipe_buffer.size;
+ pipe_size = MAX_PIPESIZE(cpipe->pipe_peer);
pipe_count = cpipe->pipe_peer->pipe_buffer.cnt;
} else {
pipe_count = 0;
}
} else {
- pipe_size = cpipe->pipe_buffer.size;
+ pipe_size = MAX_PIPESIZE(cpipe);
pipe_count = cpipe->pipe_buffer.cnt;
}
/*
ub->vst_uid = kauth_getuid();
ub->vst_gid = kauth_getgid();
- microtime(&now);
+ nanotime(&now);
ub->vst_atime = now.tv_sec;
- ub->vst_atimensec = now.tv_usec * 1000;
+ ub->vst_atimensec = now.tv_nsec;
ub->vst_mtime = now.tv_sec;
- ub->vst_mtimensec = now.tv_usec * 1000;
+ ub->vst_mtimensec = now.tv_nsec;
ub->vst_ctime = now.tv_sec;
- ub->vst_ctimensec = now.tv_usec * 1000;
+ ub->vst_ctimensec = now.tv_nsec;
/*
* Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen, st_uid, st_gid.
* XXX (st_dev, st_ino) should be unique.
*/
- pinfo->pipe_handle = (uint64_t)((uintptr_t)cpipe);
- pinfo->pipe_peerhandle = (uint64_t)((uintptr_t)(cpipe->pipe_peer));
+ pinfo->pipe_handle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe);
+ pinfo->pipe_peerhandle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)(cpipe->pipe_peer));
pinfo->pipe_status = cpipe->pipe_state;
PIPE_UNLOCK(cpipe);
}
+/*
+ * When a thread sets a write-select on a pipe, it creates an implicit,
+ * untracked dependency between that thread and the peer of the pipe
+ * on which the select is set. If the peer pipe is closed and freed
+ * before the select()ing thread wakes up, the system will panic as
+ * it attempts to unwind the dangling select(). To avoid that panic,
+ * we notice whenever a dangerous select() is set on a pipe, and
+ * defer the final deletion of the pipe until that select()s are all
+ * resolved. Since we can't currently detect exactly when that
+ * resolution happens, we use a simple garbage collection queue to
+ * reap the at-risk pipes 'later'.
+ */
+static void
+pipe_garbage_collect(struct pipe *cpipe)
+{
+ uint64_t old, now;
+ struct pipe_garbage *pgp;
+
+ /* Convert msecs to nsecs and then to abstime */
+ old = pipe_garbage_age_limit * 1000000;
+ nanoseconds_to_absolutetime(old, &old);
+
+ lck_mtx_lock(pipe_garbage_lock);
+
+ /* Free anything that's been on the queue for <mumble> seconds */
+ now = mach_absolute_time();
+ old = now - old;
+ while ((pgp = pipe_garbage_head) && pgp->pg_timestamp < old) {
+ pipe_garbage_head = pgp->pg_next;
+ if (pipe_garbage_head == NULL)
+ pipe_garbage_tail = NULL;
+ pipe_garbage_count--;
+ zfree(pipe_zone, pgp->pg_pipe);
+ zfree(pipe_garbage_zone, pgp);
+ }
+
+ /* Add the new pipe (if any) to the tail of the garbage queue */
+ if (cpipe) {
+ cpipe->pipe_state = PIPE_DEAD;
+ pgp = (struct pipe_garbage *)zalloc(pipe_garbage_zone);
+ if (pgp == NULL) {
+ /*
+ * We're too low on memory to garbage collect the
+ * pipe. Freeing it runs the risk of panicing the
+ * system. All we can do is leak it and leave
+ * a breadcrumb behind. The good news, such as it
+ * is, is that this will probably never happen.
+ * We will probably hit the panic below first.
+ */
+ printf("Leaking pipe %p - no room left in the queue",
+ cpipe);
+ lck_mtx_unlock(pipe_garbage_lock);
+ return;
+ }
+
+ pgp->pg_pipe = cpipe;
+ pgp->pg_timestamp = now;
+ pgp->pg_next = NULL;
+ if (pipe_garbage_tail)
+ pipe_garbage_tail->pg_next = pgp;
+ pipe_garbage_tail = pgp;
+ if (pipe_garbage_head == NULL)
+ pipe_garbage_head = pipe_garbage_tail;
+ if (pipe_garbage_count++ >= PIPE_GARBAGE_QUEUE_LIMIT)
+ panic("Length of pipe garbage queue exceeded %d",
+ PIPE_GARBAGE_QUEUE_LIMIT);
+ }
+ lck_mtx_unlock(pipe_garbage_lock);
+}