X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/6d2010ae8f7a6078e10b361c6962983bab233e0f..d26ffc64f583ab2d29df48f13518685602bc8832:/bsd/kern/sys_pipe.c?ds=sidebyside diff --git a/bsd/kern/sys_pipe.c b/bsd/kern/sys_pipe.c index c374ea07e..9e8b346e9 100644 --- a/bsd/kern/sys_pipe.c +++ b/bsd/kern/sys_pipe.c @@ -17,7 +17,7 @@ * are met. */ /* - * Copyright (c) 2003-2007 Apple Inc. All rights reserved. + * Copyright (c) 2003-2014 Apple Inc. All rights reserved. * * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * @@ -55,46 +55,65 @@ * pipes scheme originally used in FreeBSD/4.4Lite. It does not support * all features of sockets, but does do everything that pipes normally * do. + * + * Pipes are implemented as circular buffers. The following are the valid states of pipe operations: + * + * _________________________________ + * 1. |_________________________________| r=w, c=0 + * + * _________________________________ + * 2. |__r:::::wc_______________________| r <= w , c > 0 + * + * _________________________________ + * 3. |::::wc_____r:::::::::::::::::::::| r>w , c > 0 + * + * _________________________________ + * 4. |:::::::wrc:::::::::::::::::::::::| w=r, c = Max size + * + * + * Nomenclature: + * a-z define the steps in a program flow + * 1-4 are the states as defined above + * Action: is what file operation is done on the pipe + * + * Current:None Action: initialize with size M=200 + * a. State 1 ( r=0, w=0, c=0) + * + * Current: a Action: write(100) (w < M) + * b. State 2 (r=0, w=100, c=100) + * + * Current: b Action: write(100) (w = M-w) + * c. State 4 (r=0,w=0,c=200) + * + * Current: b Action: read(70) ( r < c ) + * d. State 2 (r=70,w=100,c=30) + * + * Current: d Action: write(75) ( w < (M-w)) + * e. State 2 (r=70,w=175,c=105) + * + * Current: d Action: write(110) ( w > (M-w)) + * f. State 3 (r=70,w=10,c=140) + * + * Current: d Action: read(30) (r >= c ) + * g. State 1 (r=100,w=100,c=0) + * */ /* - * This code has two modes of operation, a small write mode and a large - * write mode. The small write mode acts like conventional pipes with - * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the - * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT - * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and - * the receiving process can copy it directly from the pages in the sending - * process. - * - * If the sending process receives a signal, it is possible that it will - * go away, and certainly its address space can change, because control - * is returned back to the user-mode side. In that case, the pipe code - * arranges to copy the buffer supplied by the user process, to a pageable - * kernel buffer, and the receiving process will grab the data from the - * pageable kernel buffer. Since signals don't happen all that often, - * the copy operation is normally eliminated. - * - * The constant PIPE_MINDIRECT is chosen to make sure that buffering will - * happen for small transfers so that the system will not spend all of - * its time context switching. + * This code creates half-duplex pipe buffers for facilitating file-like + * operations on pipes. The initial buffer is very small, but it can + * grow dynamically to larger sizes based on usage. The buffer size is never + * reduced. The total amount of kernel memory used is governed by maxpipekva. + * If the dynamic expansion limit is reached, the writing thread is blocked + * until the pipe buffer empties enough to continue. 
* * In order to limit the resource use of pipes, two sysctls exist: * * kern.ipc.maxpipekva - This is a hard limit on the amount of pageable - * address space available to us in pipe_map. Whenever the amount in use - * exceeds half of this value, all new pipes will be created with size - * SMALL_PIPE_SIZE, rather than PIPE_SIZE. Big pipe creation will be limited - * as well. This value is loader tunable only. - * - * kern.ipc.maxpipekvawired - This value limits the amount of memory that may - * be wired in order to facilitate direct copies using page flipping. - * Whenever this value is exceeded, pipes will fall back to using regular - * copies. This value is sysctl controllable at all times. - * - * These values are autotuned in subr_param.c. + * address space available to us in pipe_map. * * Memory usage may be monitored through the sysctls - * kern.ipc.pipes, kern.ipc.pipekva and kern.ipc.pipekvawired. + * kern.ipc.pipes, kern.ipc.pipekva. * */ @@ -124,109 +143,80 @@ #include #include +#include #include #include +#include + +#if CONFIG_MACF +#include +#endif #define f_flag f_fglob->fg_flag -#define f_type f_fglob->fg_type #define f_msgcount f_fglob->fg_msgcount #define f_cred f_fglob->fg_cred #define f_ops f_fglob->fg_ops #define f_offset f_fglob->fg_offset #define f_data f_fglob->fg_data -/* - * Use this define if you want to disable *fancy* VM things. Expect an - * approx 30% decrease in transfer rate. This could be useful for - * NetBSD or OpenBSD. - * - * this needs to be ported to X and the performance measured - * before committing to supporting it - */ -#define PIPE_NODIRECT 1 - -#ifndef PIPE_NODIRECT - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#endif /* - * interfaces to the outside world + * interfaces to the outside world exported through file operations */ static int pipe_read(struct fileproc *fp, struct uio *uio, - int flags, vfs_context_t ctx); - + int flags, vfs_context_t ctx); static int pipe_write(struct fileproc *fp, struct uio *uio, - int flags, vfs_context_t ctx); - + int flags, vfs_context_t ctx); static int pipe_close(struct fileglob *fg, vfs_context_t ctx); - static int pipe_select(struct fileproc *fp, int which, void * wql, vfs_context_t ctx); - static int pipe_kqfilter(struct fileproc *fp, struct knote *kn, - vfs_context_t ctx); - + struct kevent_internal_s *kev, vfs_context_t ctx); static int pipe_ioctl(struct fileproc *fp, u_long cmd, caddr_t data, vfs_context_t ctx); - static int pipe_drain(struct fileproc *fp,vfs_context_t ctx); +static const struct fileops pipeops = { + .fo_type = DTYPE_PIPE, + .fo_read = pipe_read, + .fo_write = pipe_write, + .fo_ioctl = pipe_ioctl, + .fo_select = pipe_select, + .fo_close = pipe_close, + .fo_kqfilter = pipe_kqfilter, + .fo_drain = pipe_drain, +}; -struct fileops pipeops = - { pipe_read, - pipe_write, - pipe_ioctl, - pipe_select, - pipe_close, - pipe_kqfilter, - pipe_drain }; +static void filt_pipedetach(struct knote *kn); +static int filt_piperead(struct knote *kn, long hint); +static int filt_pipereadtouch(struct knote *kn, struct kevent_internal_s *kev); +static int filt_pipereadprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev); -static void filt_pipedetach(struct knote *kn); -static int filt_piperead(struct knote *kn, long hint); -static int filt_pipewrite(struct knote *kn, long hint); +static int filt_pipewrite(struct knote *kn, long hint); +static int filt_pipewritetouch(struct knote *kn, struct kevent_internal_s *kev); +static int 
filt_pipewriteprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev); -static struct filterops pipe_rfiltops = { +SECURITY_READ_ONLY_EARLY(struct filterops) pipe_rfiltops = { .f_isfd = 1, .f_detach = filt_pipedetach, .f_event = filt_piperead, + .f_touch = filt_pipereadtouch, + .f_process = filt_pipereadprocess, }; -static struct filterops pipe_wfiltops = { + +SECURITY_READ_ONLY_EARLY(struct filterops) pipe_wfiltops = { .f_isfd = 1, .f_detach = filt_pipedetach, .f_event = filt_pipewrite, + .f_touch = filt_pipewritetouch, + .f_process = filt_pipewriteprocess, }; -/* - * Default pipe buffer size(s), this can be kind-of large now because pipe - * space is pageable. The pipe code will try to maintain locality of - * reference for performance reasons, so small amounts of outstanding I/O - * will not wipe the cache. - */ -#define MINPIPESIZE (PIPE_SIZE/3) - -/* - * Limit the number of "big" pipes - */ -#define LIMITBIGPIPES 32 -static int nbigpipe; - -static int amountpipes; -static int amountpipekva; +static int nbigpipe; /* for compatibility sake. no longer used */ +static int amountpipes; /* total number of pipes in system */ +static int amountpipekva; /* total memory used by pipes */ -#ifndef PIPE_NODIRECT -static int amountpipekvawired; -#endif -int maxpipekva = 1024 * 1024 * 16; +int maxpipekva __attribute__((used)) = PIPE_KVAMAX; /* allowing 16MB max. */ #if PIPE_SYSCTLS SYSCTL_DECL(_kern_ipc); @@ -248,48 +238,73 @@ SYSCTL_INT(_kern_ipc, OID_AUTO, pipekvawired, CTLFLAG_RD|CTLFLAG_LOCKED, static void pipeclose(struct pipe *cpipe); static void pipe_free_kmem(struct pipe *cpipe); static int pipe_create(struct pipe **cpipep); +static int pipespace(struct pipe *cpipe, int size); +static int choose_pipespace(unsigned long current, unsigned long expected); +static int expand_pipespace(struct pipe *p, int target_size); static void pipeselwakeup(struct pipe *cpipe, struct pipe *spipe); -static __inline int pipelock(struct pipe *cpipe, int catch); -static __inline void pipeunlock(struct pipe *cpipe); - -#ifndef PIPE_NODIRECT -static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio); -static void pipe_destroy_write_buffer(struct pipe *wpipe); -static int pipe_direct_write(struct pipe *wpipe, struct uio *uio); -static void pipe_clone_write_buffer(struct pipe *wpipe); -#endif +static __inline int pipeio_lock(struct pipe *cpipe, int catch); +static __inline void pipeio_unlock(struct pipe *cpipe); extern int postpipeevent(struct pipe *, int); extern void evpipefree(struct pipe *cpipe); - -static int pipespace(struct pipe *cpipe, int size); - static lck_grp_t *pipe_mtx_grp; static lck_attr_t *pipe_mtx_attr; static lck_grp_attr_t *pipe_mtx_grp_attr; static zone_t pipe_zone; +#define MAX_PIPESIZE(pipe) ( MAX(PIPE_SIZE, (pipe)->pipe_buffer.size) ) + +#define PIPE_GARBAGE_AGE_LIMIT 5000 /* In milliseconds */ +#define PIPE_GARBAGE_QUEUE_LIMIT 32000 + +struct pipe_garbage { + struct pipe *pg_pipe; + struct pipe_garbage *pg_next; + uint64_t pg_timestamp; +}; + +static zone_t pipe_garbage_zone; +static struct pipe_garbage *pipe_garbage_head = NULL; +static struct pipe_garbage *pipe_garbage_tail = NULL; +static uint64_t pipe_garbage_age_limit = PIPE_GARBAGE_AGE_LIMIT; +static int pipe_garbage_count = 0; +static lck_mtx_t *pipe_garbage_lock; +static void pipe_garbage_collect(struct pipe *cpipe); + SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL); +/* initial setup done at time of sysinit */ void pipeinit(void) { - pipe_zone = (zone_t)zinit(sizeof(struct pipe), 
8192 * sizeof(struct pipe), 4096, "pipe zone"); + nbigpipe=0; + vm_size_t zone_size; + + zone_size = 8192 * sizeof(struct pipe); + pipe_zone = zinit(sizeof(struct pipe), zone_size, 4096, "pipe zone"); - /* - * allocate lock group attribute and group for pipe mutexes - */ + + /* allocate lock group attribute and group for pipe mutexes */ pipe_mtx_grp_attr = lck_grp_attr_alloc_init(); pipe_mtx_grp = lck_grp_alloc_init("pipe", pipe_mtx_grp_attr); + /* allocate the lock attribute for pipe mutexes */ + pipe_mtx_attr = lck_attr_alloc_init(); + /* - * allocate the lock attribute for pipe mutexes + * Set up garbage collection for dead pipes */ - pipe_mtx_attr = lck_attr_alloc_init(); + zone_size = (PIPE_GARBAGE_QUEUE_LIMIT + 20) * + sizeof(struct pipe_garbage); + pipe_garbage_zone = (zone_t)zinit(sizeof(struct pipe_garbage), + zone_size, 4096, "pipe garbage zone"); + pipe_garbage_lock = lck_mtx_alloc_init(pipe_mtx_grp, pipe_mtx_attr); + } +#ifndef CONFIG_EMBEDDED /* Bitmap for things to touch in pipe_touch() */ #define PIPE_ATIME 0x00000001 /* time of last access */ #define PIPE_MTIME 0x00000002 /* time of last modification */ @@ -298,30 +313,107 @@ pipeinit(void) static void pipe_touch(struct pipe *tpipe, int touch) { - struct timeval now; + struct timespec now; - microtime(&now); + nanotime(&now); if (touch & PIPE_ATIME) { tpipe->st_atimespec.tv_sec = now.tv_sec; - tpipe->st_atimespec.tv_nsec = now.tv_usec * 1000; + tpipe->st_atimespec.tv_nsec = now.tv_nsec; } if (touch & PIPE_MTIME) { tpipe->st_mtimespec.tv_sec = now.tv_sec; - tpipe->st_mtimespec.tv_nsec = now.tv_usec * 1000; + tpipe->st_mtimespec.tv_nsec = now.tv_nsec; } if (touch & PIPE_CTIME) { tpipe->st_ctimespec.tv_sec = now.tv_sec; - tpipe->st_ctimespec.tv_nsec = now.tv_usec * 1000; + tpipe->st_ctimespec.tv_nsec = now.tv_nsec; + } +} +#endif + +static const unsigned int pipesize_blocks[] = {512,1024,2048,4096, 4096 * 2, PIPE_SIZE , PIPE_SIZE * 4 }; + +/* + * finds the right size from possible sizes in pipesize_blocks + * returns the size which matches max(current,expected) + */ +static int +choose_pipespace(unsigned long current, unsigned long expected) +{ + int i = sizeof(pipesize_blocks)/sizeof(unsigned int) -1; + unsigned long target; + + /* + * assert that we always get an atomic transaction sized pipe buffer, + * even if the system pipe buffer high-water mark has been crossed. + */ + assert(PIPE_BUF == pipesize_blocks[0]); + + if (expected > current) + target = expected; + else + target = current; + + while ( i >0 && pipesize_blocks[i-1] > target) { + i=i-1; + } + + return pipesize_blocks[i]; } +/* + * expand the size of pipe while there is data to be read, + * and then free the old buffer once the current buffered + * data has been transferred to new storage. + * Required: PIPE_LOCK and io lock to be held by caller. 
+ * returns 0 on success or no expansion possible + */ +static int +expand_pipespace(struct pipe *p, int target_size) +{ + struct pipe tmp, oldpipe; + int error; + tmp.pipe_buffer.buffer = 0; + + if (p->pipe_buffer.size >= (unsigned) target_size) { + return 0; /* the existing buffer is max size possible */ + } + + /* create enough space in the target */ + error = pipespace(&tmp, target_size); + if (error != 0) + return (error); + + oldpipe.pipe_buffer.buffer = p->pipe_buffer.buffer; + oldpipe.pipe_buffer.size = p->pipe_buffer.size; + + memcpy(tmp.pipe_buffer.buffer, p->pipe_buffer.buffer, p->pipe_buffer.size); + if (p->pipe_buffer.cnt > 0 && p->pipe_buffer.in <= p->pipe_buffer.out ){ + /* we are in State 3 and need extra copying for read to be consistent */ + memcpy(&tmp.pipe_buffer.buffer[p->pipe_buffer.size], p->pipe_buffer.buffer, p->pipe_buffer.size); + p->pipe_buffer.in += p->pipe_buffer.size; + } + + p->pipe_buffer.buffer = tmp.pipe_buffer.buffer; + p->pipe_buffer.size = tmp.pipe_buffer.size; + + + pipe_free_kmem(&oldpipe); + return 0; +} /* * The pipe system call for the DTYPE_PIPE type of pipes + * + * returns: + * FREAD | fd0 | -->[struct rpipe] --> |~~buffer~~| \ + * (pipe_mutex) + * FWRITE | fd1 | -->[struct wpipe] --X / */ /* ARGSUSED */ @@ -344,22 +436,12 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval) /* * allocate the space for the normal I/O direction up * front... we'll delay the allocation for the other - * direction until a write actually occurs (most - * likely it won't)... - * - * Reduce to 1/4th pipe size if we're over our global max. + * direction until a write actually occurs (most likely it won't)... */ - if (amountpipekva > maxpipekva / 2) - error = pipespace(rpipe, SMALL_PIPE_SIZE); - else - error = pipespace(rpipe, PIPE_SIZE); + error = pipespace(rpipe, choose_pipespace(rpipe->pipe_buffer.size, 0)); if (error) goto freepipes; -#ifndef PIPE_NODIRECT - rpipe->pipe_state |= PIPE_DIRECTOK; - wpipe->pipe_state |= PIPE_DIRECTOK; -#endif TAILQ_INIT(&rpipe->pipe_evlist); TAILQ_INIT(&wpipe->pipe_evlist); @@ -370,12 +452,10 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval) retval[0] = fd; /* - * for now we'll create half-duplex - * pipes... this is what we've always - * supported.. + * for now we'll create half-duplex pipes(refer returns section above). + * this is what we've always supported.. 
*/ rf->f_flag = FREAD; - rf->f_type = DTYPE_PIPE; rf->f_data = (caddr_t)rpipe; rf->f_ops = &pipeops; @@ -385,13 +465,13 @@ pipe(proc_t p, __unused struct pipe_args *uap, int32_t *retval) goto freepipes; } wf->f_flag = FWRITE; - wf->f_type = DTYPE_PIPE; wf->f_data = (caddr_t)wpipe; wf->f_ops = &pipeops; rpipe->pipe_peer = wpipe; wpipe->pipe_peer = rpipe; - rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx; + /* both structures share the same mutex */ + rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx; retval[1] = fd; #if CONFIG_MACF @@ -448,20 +528,16 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64) } #endif if (cpipe->pipe_buffer.buffer == 0) { - /* - * must be stat'ing the write fd - */ + /* must be stat'ing the write fd */ if (cpipe->pipe_peer) { - /* - * the peer still exists, use it's info - */ - pipe_size = cpipe->pipe_peer->pipe_buffer.size; + /* the peer still exists, use it's info */ + pipe_size = MAX_PIPESIZE(cpipe->pipe_peer); pipe_count = cpipe->pipe_peer->pipe_buffer.cnt; } else { pipe_count = 0; } } else { - pipe_size = cpipe->pipe_buffer.size; + pipe_size = MAX_PIPESIZE(cpipe); pipe_count = cpipe->pipe_buffer.cnt; } /* @@ -469,7 +545,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64) * we might catch it in transient state */ if (pipe_size == 0) - pipe_size = PIPE_SIZE; + pipe_size = MAX(PIPE_SIZE, pipesize_blocks[0]); if (isstat64 != 0) { sb64 = (struct stat64 *)ub; @@ -497,7 +573,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64) * address of this pipe's struct pipe. This number may be recycled * relatively quickly. */ - sb64->st_ino = (ino64_t)((uintptr_t)cpipe); + sb64->st_ino = (ino64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe); } else { sb = (struct stat *)ub; @@ -524,7 +600,7 @@ pipe_stat(struct pipe *cpipe, void *ub, int isstat64) * address of this pipe's struct pipe. This number may be recycled * relatively quickly. */ - sb->st_ino = (ino_t)(uintptr_t)cpipe; + sb->st_ino = (ino_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe); } PIPE_UNLOCK(cpipe); @@ -551,10 +627,11 @@ pipespace(struct pipe *cpipe, int size) { vm_offset_t buffer; - size = round_page(size); + if (size <= 0) + return(EINVAL); - if (kmem_alloc(kernel_map, &buffer, size) != KERN_SUCCESS) - return(ENOMEM); + if ((buffer = (vm_offset_t)kalloc(size)) == 0 ) + return(ENOMEM); /* free old resources if we're resizing */ pipe_free_kmem(cpipe); @@ -577,7 +654,6 @@ static int pipe_create(struct pipe **cpipep) { struct pipe *cpipe; - cpipe = (struct pipe *)zalloc(pipe_zone); if ((*cpipep = cpipe) == NULL) @@ -589,9 +665,10 @@ pipe_create(struct pipe **cpipep) */ bzero(cpipe, sizeof *cpipe); +#ifndef CONFIG_EMBEDDED /* Initial times are all the time of creation of the pipe */ pipe_touch(cpipe, PIPE_ATIME | PIPE_MTIME | PIPE_CTIME); - +#endif return (0); } @@ -600,20 +677,17 @@ pipe_create(struct pipe **cpipep) * lock a pipe for I/O, blocking other access */ static inline int -pipelock(struct pipe *cpipe, int catch) +pipeio_lock(struct pipe *cpipe, int catch) { int error; - while (cpipe->pipe_state & PIPE_LOCKFL) { cpipe->pipe_state |= PIPE_LWANT; - error = msleep(cpipe, PIPE_MTX(cpipe), catch ? 
(PRIBIO | PCATCH) : PRIBIO, "pipelk", 0); if (error != 0) return (error); } cpipe->pipe_state |= PIPE_LOCKFL; - return (0); } @@ -621,16 +695,18 @@ pipelock(struct pipe *cpipe, int catch) * unlock a pipe I/O lock */ static inline void -pipeunlock(struct pipe *cpipe) +pipeio_unlock(struct pipe *cpipe) { cpipe->pipe_state &= ~PIPE_LOCKFL; - if (cpipe->pipe_state & PIPE_LWANT) { cpipe->pipe_state &= ~PIPE_LWANT; wakeup(cpipe); } } +/* + * wakeup anyone whos blocked in select + */ static void pipeselwakeup(struct pipe *cpipe, struct pipe *spipe) { @@ -651,6 +727,10 @@ pipeselwakeup(struct pipe *cpipe, struct pipe *spipe) } } +/* + * Read n bytes from the buffer. Semantics are similar to file read. + * returns: number of bytes read from the buffer + */ /* ARGSUSED */ static int pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, @@ -664,7 +744,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, PIPE_LOCK(rpipe); ++rpipe->pipe_busy; - error = pipelock(rpipe, 1); + error = pipeio_lock(rpipe, 1); if (error) goto unlocked_error; @@ -674,11 +754,17 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, goto locked_error; #endif + while (uio_resid(uio)) { /* * normal pipe buffer receive */ if (rpipe->pipe_buffer.cnt > 0) { + /* + * # bytes to read is min( bytes from read pointer until end of buffer, + * total unread bytes, + * user requested byte count) + */ size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out; if (size > rpipe->pipe_buffer.cnt) size = rpipe->pipe_buffer.cnt; @@ -686,7 +772,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, if (size > (u_int) uio_resid(uio)) size = (u_int) uio_resid(uio); - PIPE_UNLOCK(rpipe); + PIPE_UNLOCK(rpipe); /* we still hold io lock.*/ error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out], size, uio); @@ -699,7 +785,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, rpipe->pipe_buffer.out = 0; rpipe->pipe_buffer.cnt -= size; - + /* * If there is no more to read in the pipe, reset * its pointers to the beginning. This improves @@ -710,32 +796,6 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, rpipe->pipe_buffer.out = 0; } nread += size; -#ifndef PIPE_NODIRECT - /* - * Direct copy, bypassing a kernel buffer. - */ - } else if ((size = rpipe->pipe_map.cnt) && - (rpipe->pipe_state & PIPE_DIRECTW)) { - caddr_t va; - // LP64todo - fix this! - if (size > (u_int) uio_resid(uio)) - size = (u_int) uio_resid(uio); - - va = (caddr_t) rpipe->pipe_map.kva + - rpipe->pipe_map.pos; - PIPE_UNLOCK(rpipe); - error = uiomove(va, size, uio); - PIPE_LOCK(rpipe); - if (error) - break; - nread += size; - rpipe->pipe_map.pos += size; - rpipe->pipe_map.cnt -= size; - if (rpipe->pipe_map.cnt == 0) { - rpipe->pipe_state &= ~PIPE_DIRECTW; - wakeup(rpipe); - } -#endif } else { /* * detect EOF condition @@ -754,7 +814,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, } /* - * Break if some data was read. + * Break if some data was read in previous iteration. */ if (nread > 0) break; @@ -764,7 +824,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, * We will either break out with an error or we will * sleep and relock to loop. 
*/ - pipeunlock(rpipe); + pipeio_unlock(rpipe); /* * Handle non-blocking mode operation or @@ -774,11 +834,9 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, error = EAGAIN; } else { rpipe->pipe_state |= PIPE_WANTR; - error = msleep(rpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH, "piperd", 0); - if (error == 0) - error = pipelock(rpipe, 1); + error = pipeio_lock(rpipe, 1); } if (error) goto unlocked_error; @@ -787,7 +845,7 @@ pipe_read(struct fileproc *fp, struct uio *uio, __unused int flags, #if CONFIG_MACF locked_error: #endif - pipeunlock(rpipe); + pipeio_unlock(rpipe); unlocked_error: --rpipe->pipe_busy; @@ -798,7 +856,7 @@ unlocked_error: if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) { rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW); wakeup(rpipe); - } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) { + } else if (rpipe->pipe_buffer.cnt < rpipe->pipe_buffer.size) { /* * Handle write blocking hysteresis. */ @@ -808,261 +866,23 @@ unlocked_error: } } - if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) + if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > 0) pipeselwakeup(rpipe, rpipe->pipe_peer); +#ifndef CONFIG_EMBEDDED /* update last read time */ pipe_touch(rpipe, PIPE_ATIME); +#endif PIPE_UNLOCK(rpipe); return (error); } - - -#ifndef PIPE_NODIRECT -/* - * Map the sending processes' buffer into kernel space and wire it. - * This is similar to a physical write operation. - */ -static int -pipe_build_write_buffer(wpipe, uio) - struct pipe *wpipe; - struct uio *uio; -{ - pmap_t pmap; - u_int size; - int i, j; - vm_offset_t addr, endaddr; - - - size = (u_int) uio->uio_iov->iov_len; - if (size > wpipe->pipe_buffer.size) - size = wpipe->pipe_buffer.size; - - pmap = vmspace_pmap(curproc->p_vmspace); - endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size); - addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base); - for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) { - /* - * vm_fault_quick() can sleep. Consequently, - * vm_page_lock_queue() and vm_page_unlock_queue() - * should not be performed outside of this loop. - */ - race: - if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0) { - vm_page_lock_queues(); - for (j = 0; j < i; j++) - vm_page_unhold(wpipe->pipe_map.ms[j]); - vm_page_unlock_queues(); - return (EFAULT); - } - wpipe->pipe_map.ms[i] = pmap_extract_and_hold(pmap, addr, - VM_PROT_READ); - if (wpipe->pipe_map.ms[i] == NULL) - goto race; - } - -/* - * set up the control block - */ - wpipe->pipe_map.npages = i; - wpipe->pipe_map.pos = - ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK; - wpipe->pipe_map.cnt = size; - -/* - * and map the buffer - */ - if (wpipe->pipe_map.kva == 0) { - /* - * We need to allocate space for an extra page because the - * address range might (will) span pages at times. 
- */ - wpipe->pipe_map.kva = kmem_alloc_nofault(kernel_map, - wpipe->pipe_buffer.size + PAGE_SIZE); - atomic_add_int(&amountpipekvawired, - wpipe->pipe_buffer.size + PAGE_SIZE); - } - pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms, - wpipe->pipe_map.npages); - -/* - * and update the uio data - */ - - uio->uio_iov->iov_len -= size; - uio->uio_iov->iov_base = (char *)uio->uio_iov->iov_base + size; - if (uio->uio_iov->iov_len == 0) - uio->uio_iov++; - uio_setresid(uio, (uio_resid(uio) - size)); - uio->uio_offset += size; - return (0); -} - -/* - * unmap and unwire the process buffer - */ -static void -pipe_destroy_write_buffer(wpipe) - struct pipe *wpipe; -{ - int i; - - if (wpipe->pipe_map.kva) { - pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages); - - if (amountpipekvawired > maxpipekvawired / 2) { - /* Conserve address space */ - vm_offset_t kva = wpipe->pipe_map.kva; - wpipe->pipe_map.kva = 0; - kmem_free(kernel_map, kva, - wpipe->pipe_buffer.size + PAGE_SIZE); - atomic_subtract_int(&amountpipekvawired, - wpipe->pipe_buffer.size + PAGE_SIZE); - } - } - vm_page_lock_queues(); - for (i = 0; i < wpipe->pipe_map.npages; i++) { - vm_page_unhold(wpipe->pipe_map.ms[i]); - } - vm_page_unlock_queues(); - wpipe->pipe_map.npages = 0; -} - /* - * In the case of a signal, the writing process might go away. This - * code copies the data into the circular buffer so that the source - * pages can be freed without loss of data. + * perform a write of n bytes into the read side of buffer. Since + * pipes are unidirectional a write is meant to be read by the otherside only. */ -static void -pipe_clone_write_buffer(wpipe) - struct pipe *wpipe; -{ - int size; - int pos; - - size = wpipe->pipe_map.cnt; - pos = wpipe->pipe_map.pos; - - wpipe->pipe_buffer.in = size; - wpipe->pipe_buffer.out = 0; - wpipe->pipe_buffer.cnt = size; - wpipe->pipe_state &= ~PIPE_DIRECTW; - - PIPE_UNLOCK(wpipe); - bcopy((caddr_t) wpipe->pipe_map.kva + pos, - wpipe->pipe_buffer.buffer, size); - pipe_destroy_write_buffer(wpipe); - PIPE_LOCK(wpipe); -} - -/* - * This implements the pipe buffer write mechanism. Note that only - * a direct write OR a normal pipe write can be pending at any given time. - * If there are any characters in the pipe buffer, the direct write will - * be deferred until the receiving process grabs all of the bytes from - * the pipe buffer. Then the direct mapping write is set-up. 
- */ -static int -pipe_direct_write(wpipe, uio) - struct pipe *wpipe; - struct uio *uio; -{ - int error; - -retry: - while (wpipe->pipe_state & PIPE_DIRECTW) { - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); - } - wpipe->pipe_state |= PIPE_WANTW; - error = msleep(wpipe, PIPE_MTX(wpipe), - PRIBIO | PCATCH, "pipdww", 0); - if (error) - goto error1; - if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) { - error = EPIPE; - goto error1; - } - } - wpipe->pipe_map.cnt = 0; /* transfer not ready yet */ - if (wpipe->pipe_buffer.cnt > 0) { - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); - } - - wpipe->pipe_state |= PIPE_WANTW; - error = msleep(wpipe, PIPE_MTX(wpipe), - PRIBIO | PCATCH, "pipdwc", 0); - if (error) - goto error1; - if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) { - error = EPIPE; - goto error1; - } - goto retry; - } - - wpipe->pipe_state |= PIPE_DIRECTW; - - pipelock(wpipe, 0); - PIPE_UNLOCK(wpipe); - error = pipe_build_write_buffer(wpipe, uio); - PIPE_LOCK(wpipe); - pipeunlock(wpipe); - if (error) { - wpipe->pipe_state &= ~PIPE_DIRECTW; - goto error1; - } - - error = 0; - while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) { - if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) { - pipelock(wpipe, 0); - PIPE_UNLOCK(wpipe); - pipe_destroy_write_buffer(wpipe); - PIPE_LOCK(wpipe); - pipeselwakeup(wpipe, wpipe); - pipeunlock(wpipe); - error = EPIPE; - goto error1; - } - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); - } - pipeselwakeup(wpipe, wpipe); - error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH, - "pipdwt", 0); - } - - pipelock(wpipe,0); - if (wpipe->pipe_state & PIPE_DIRECTW) { - /* - * this bit of trickery substitutes a kernel buffer for - * the process that might be going away. - */ - pipe_clone_write_buffer(wpipe); - } else { - PIPE_UNLOCK(wpipe); - pipe_destroy_write_buffer(wpipe); - PIPE_LOCK(wpipe); - } - pipeunlock(wpipe); - return (error); - -error1: - wakeup(wpipe); - return (error); -} -#endif - - - static int pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, __unused vfs_context_t ctx) @@ -1071,6 +891,9 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, int orig_resid; int pipe_size; struct pipe *wpipe, *rpipe; + // LP64todo - fix this! + orig_resid = uio_resid(uio); + int space; rpipe = (struct pipe *)fp->f_data; @@ -1095,54 +918,35 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, pipe_size = 0; - if (wpipe->pipe_buffer.buffer == 0) { - /* - * need to allocate some storage... we delay the allocation - * until the first write on fd[0] to avoid allocating storage for both - * 'pipe ends'... most pipes are half-duplex with the writes targeting - * fd[1], so allocating space for both ends is a waste... - * - * Reduce to 1/4th pipe size if we're over our global max. - */ - if (amountpipekva > maxpipekva / 2) - pipe_size = SMALL_PIPE_SIZE; - else - pipe_size = PIPE_SIZE; - } - /* - * If it is advantageous to resize the pipe buffer, do - * so. + * need to allocate some storage... we delay the allocation + * until the first write on fd[0] to avoid allocating storage for both + * 'pipe ends'... most pipes are half-duplex with the writes targeting + * fd[1], so allocating space for both ends is a waste... 
*/ - if ((uio_resid(uio) > PIPE_SIZE) && - (wpipe->pipe_buffer.size <= PIPE_SIZE) && - (amountpipekva < maxpipekva / 2) && - (nbigpipe < LIMITBIGPIPES) && -#ifndef PIPE_NODIRECT - (wpipe->pipe_state & PIPE_DIRECTW) == 0 && -#endif - (wpipe->pipe_buffer.cnt == 0)) { - pipe_size = BIG_PIPE_SIZE; + if ( wpipe->pipe_buffer.buffer == 0 || ( + (unsigned)orig_resid > wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt && + amountpipekva < maxpipekva ) ) { + pipe_size = choose_pipespace(wpipe->pipe_buffer.size, wpipe->pipe_buffer.cnt + orig_resid); } if (pipe_size) { /* * need to do initial allocation or resizing of pipe + * holding both structure and io locks. */ - if ((error = pipelock(wpipe, 1)) == 0) { - PIPE_UNLOCK(wpipe); - if (pipespace(wpipe, pipe_size) == 0) - OSAddAtomic(1, &nbigpipe); - PIPE_LOCK(wpipe); - pipeunlock(wpipe); - - if (wpipe->pipe_buffer.buffer == 0) { - /* - * initial allocation failed - */ + if ((error = pipeio_lock(wpipe, 1)) == 0) { + if (wpipe->pipe_buffer.cnt == 0) + error = pipespace(wpipe, pipe_size); + else + error = expand_pipespace(wpipe, pipe_size); + + pipeio_unlock(wpipe); + + /* allocation failed */ + if (wpipe->pipe_buffer.buffer == 0) error = ENOMEM; - } } if (error) { /* @@ -1159,91 +963,35 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, return(error); } } - // LP64todo - fix this! - orig_resid = uio_resid(uio); while (uio_resid(uio)) { - int space; - -#ifndef PIPE_NODIRECT - /* - * If the transfer is large, we can gain performance if - * we do process-to-process copies directly. - * If the write is non-blocking, we don't use the - * direct write mechanism. - * - * The direct write mechanism will detect the reader going - * away on us. - */ - if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) && - (fp->f_flag & FNONBLOCK) == 0 && - amountpipekvawired + uio_resid(uio) < maxpipekvawired) { - error = pipe_direct_write(wpipe, uio); - if (error) - break; - continue; - } - /* - * Pipe buffered writes cannot be coincidental with - * direct writes. We wait until the currently executing - * direct write is completed before we start filling the - * pipe buffer. We break out if a signal occurs or the - * reader goes away. - */ retrywrite: - while (wpipe->pipe_state & PIPE_DIRECTW) { - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); - } - error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH, "pipbww", 0); - - if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) - break; - if (error) - break; - } -#else - retrywrite: -#endif space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; - /* - * Writes of size <= PIPE_BUF must be atomic. - */ + /* Writes of size <= PIPE_BUF must be atomic. */ if ((space < uio_resid(uio)) && (orig_resid <= PIPE_BUF)) space = 0; if (space > 0) { - if ((error = pipelock(wpipe,1)) == 0) { + if ((error = pipeio_lock(wpipe,1)) == 0) { int size; /* Transfer size */ int segsize; /* first segment to transfer */ if (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) { - pipeunlock(wpipe); + pipeio_unlock(wpipe); error = EPIPE; break; } -#ifndef PIPE_NODIRECT - /* - * It is possible for a direct write to - * slip in on us... handle it here... - */ - if (wpipe->pipe_state & PIPE_DIRECTW) { - pipeunlock(wpipe); - goto retrywrite; - } -#endif /* - * If a process blocked in pipelock, our + * If a process blocked in pipeio_lock, our * value for space might be bad... 
the mutex * is dropped while we're blocked */ if (space > (int)(wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt)) { - pipeunlock(wpipe); + pipeio_unlock(wpipe); goto retrywrite; } @@ -1279,7 +1027,7 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, /* * Transfer remaining part now, to * support atomic writes. Wraparound - * happened. + * happened. (State 3) */ if (wpipe->pipe_buffer.in + segsize != wpipe->pipe_buffer.size) @@ -1292,9 +1040,12 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, size - segsize, uio); PIPE_LOCK(rpipe); } + /* + * readers never know to read until count is updated. + */ if (error == 0) { wpipe->pipe_buffer.in += size; - if (wpipe->pipe_buffer.in >= + if (wpipe->pipe_buffer.in > wpipe->pipe_buffer.size) { if (wpipe->pipe_buffer.in != size - segsize + @@ -1311,7 +1062,7 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, panic("Pipe buffer overflow"); } - pipeunlock(wpipe); + pipeio_unlock(wpipe); } if (error) break; @@ -1377,9 +1128,11 @@ pipe_write(struct fileproc *fp, struct uio *uio, __unused int flags, pipeselwakeup(wpipe, wpipe); } +#ifndef CONFIG_EMBEDDED /* Update modification, status change (# of bytes in pipe) times */ pipe_touch(rpipe, PIPE_MTIME | PIPE_CTIME); pipe_touch(wpipe, PIPE_MTIME | PIPE_CTIME); +#endif PIPE_UNLOCK(rpipe); return (error); @@ -1425,12 +1178,7 @@ pipe_ioctl(struct fileproc *fp, u_long cmd, caddr_t data, return (0); case FIONREAD: -#ifndef PIPE_NODIRECT - if (mpipe->pipe_state & PIPE_DIRECTW) - *(int *)data = mpipe->pipe_map.cnt; - else -#endif - *(int *)data = mpipe->pipe_buffer.cnt; + *(int *)data = mpipe->pipe_buffer.cnt; PIPE_UNLOCK(mpipe); return (0); @@ -1465,6 +1213,7 @@ pipe_select(struct fileproc *fp, int which, void *wql, vfs_context_t ctx) PIPE_LOCK(rpipe); wpipe = rpipe->pipe_peer; + #if CONFIG_MACF /* @@ -1492,9 +1241,11 @@ pipe_select(struct fileproc *fp, int which, void *wql, vfs_context_t ctx) break; case FWRITE: + if (wpipe) + wpipe->pipe_state |= PIPE_WSELECT; if (wpipe == NULL || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) || (((wpipe->pipe_state & PIPE_DIRECTW) == 0) && - (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) { + (MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) { retnum = 1; } else { @@ -1523,7 +1274,6 @@ pipe_close(struct fileglob *fg, __unused vfs_context_t ctx) cpipe = (struct pipe *)fg->fg_data; fg->fg_data = NULL; proc_fdunlock(vfs_context_proc(ctx)); - if (cpipe) pipeclose(cpipe); @@ -1533,30 +1283,14 @@ pipe_close(struct fileglob *fg, __unused vfs_context_t ctx) static void pipe_free_kmem(struct pipe *cpipe) { - if (cpipe->pipe_buffer.buffer != NULL) { - if (cpipe->pipe_buffer.size > PIPE_SIZE) - OSAddAtomic(-1, &nbigpipe); OSAddAtomic(-(cpipe->pipe_buffer.size), &amountpipekva); OSAddAtomic(-1, &amountpipes); - - kmem_free(kernel_map, (vm_offset_t)cpipe->pipe_buffer.buffer, + kfree((void *)cpipe->pipe_buffer.buffer, cpipe->pipe_buffer.size); cpipe->pipe_buffer.buffer = NULL; + cpipe->pipe_buffer.size = 0; } -#ifndef PIPE_NODIRECT - if (cpipe->pipe_map.kva != 0) { - atomic_subtract_int(&amountpipekvawired, - cpipe->pipe_buffer.size + PAGE_SIZE); - kmem_free(kernel_map, - cpipe->pipe_map.kva, - cpipe->pipe_buffer.size + PAGE_SIZE); - cpipe->pipe_map.cnt = 0; - cpipe->pipe_map.kva = 0; - cpipe->pipe_map.pos = 0; - cpipe->pipe_map.npages = 0; - } -#endif } /* @@ -1569,7 +1303,6 @@ pipeclose(struct pipe *cpipe) if (cpipe == NULL) return; - /* partially created pipes won't have a valid mutex. 
*/ if (PIPE_MTX(cpipe) != NULL) PIPE_LOCK(cpipe); @@ -1622,33 +1355,205 @@ pipeclose(struct pipe *cpipe) * free resources */ if (PIPE_MTX(cpipe) != NULL) { - if (ppipe != NULL) { - /* + if (ppipe != NULL) { + /* * since the mutex is shared and the peer is still * alive, we need to release the mutex, not free it */ - PIPE_UNLOCK(cpipe); + PIPE_UNLOCK(cpipe); } else { - /* + /* * peer is gone, so we're the sole party left with - * interest in this mutex... we can just free it + * interest in this mutex... unlock and free it */ + PIPE_UNLOCK(cpipe); lck_mtx_free(PIPE_MTX(cpipe), pipe_mtx_grp); } } pipe_free_kmem(cpipe); + if (cpipe->pipe_state & PIPE_WSELECT) { + pipe_garbage_collect(cpipe); + } else { + zfree(pipe_zone, cpipe); + pipe_garbage_collect(NULL); + } + +} + +/*ARGSUSED*/ +static int +filt_piperead_common(struct knote *kn, struct pipe *rpipe) +{ + struct pipe *wpipe; + int retval; + + /* + * we're being called back via the KNOTE post + * we made in pipeselwakeup, and we already hold the mutex... + */ + + wpipe = rpipe->pipe_peer; + kn->kn_data = rpipe->pipe_buffer.cnt; + if ((rpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) || + (wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) { + kn->kn_flags |= EV_EOF; + retval = 1; + } else { + int64_t lowwat = 1; + if (kn->kn_sfflags & NOTE_LOWAT) { + if (rpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(rpipe)) + lowwat = MAX_PIPESIZE(rpipe); + else if (kn->kn_sdata > lowwat) + lowwat = kn->kn_sdata; + } + retval = kn->kn_data >= lowwat; + } + return (retval); +} + +static int +filt_piperead(struct knote *kn, long hint) +{ +#pragma unused(hint) + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + + return filt_piperead_common(kn, rpipe); +} + +static int +filt_pipereadtouch(struct knote *kn, struct kevent_internal_s *kev) +{ + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + int retval; + + PIPE_LOCK(rpipe); + + /* accept new inputs (and save the low water threshold and flag) */ + kn->kn_sdata = kev->data; + kn->kn_sfflags = kev->fflags; + if ((kn->kn_status & KN_UDATA_SPECIFIC) == 0) + kn->kn_udata = kev->udata; + + /* identify if any events are now fired */ + retval = filt_piperead_common(kn, rpipe); + + PIPE_UNLOCK(rpipe); + + return retval; +} + +static int +filt_pipereadprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev) +{ +#pragma unused(data) + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + int retval; - zfree(pipe_zone, cpipe); + PIPE_LOCK(rpipe); + retval = filt_piperead_common(kn, rpipe); + if (retval) { + *kev = kn->kn_kevent; + if (kn->kn_flags & EV_CLEAR) { + kn->kn_fflags = 0; + kn->kn_data = 0; + } + } + PIPE_UNLOCK(rpipe); + return (retval); } /*ARGSUSED*/ static int -pipe_kqfilter(__unused struct fileproc *fp, struct knote *kn, __unused vfs_context_t ctx) +filt_pipewrite_common(struct knote *kn, struct pipe *rpipe) { - struct pipe *cpipe; + struct pipe *wpipe; + + /* + * we're being called back via the KNOTE post + * we made in pipeselwakeup, and we already hold the mutex... 
+ */ + wpipe = rpipe->pipe_peer; + + if ((wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) { + kn->kn_data = 0; + kn->kn_flags |= EV_EOF; + return (1); + } + kn->kn_data = MAX_PIPESIZE(wpipe) - wpipe->pipe_buffer.cnt; + + int64_t lowwat = PIPE_BUF; + if (kn->kn_sfflags & NOTE_LOWAT) { + if (wpipe->pipe_buffer.size && kn->kn_sdata > MAX_PIPESIZE(wpipe)) + lowwat = MAX_PIPESIZE(wpipe); + else if (kn->kn_sdata > lowwat) + lowwat = kn->kn_sdata; + } + + return (kn->kn_data >= lowwat); +} + +/*ARGSUSED*/ +static int +filt_pipewrite(struct knote *kn, long hint) +{ +#pragma unused(hint) + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + + return filt_pipewrite_common(kn, rpipe); +} + + +static int +filt_pipewritetouch(struct knote *kn, struct kevent_internal_s *kev) +{ + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + int res; + + PIPE_LOCK(rpipe); + + /* accept new kevent data (and save off lowat threshold and flag) */ + kn->kn_sfflags = kev->fflags; + kn->kn_sdata = kev->data; + if ((kn->kn_status & KN_UDATA_SPECIFIC) == 0) + kn->kn_udata = kev->udata; + + /* determine if any event is now deemed fired */ + res = filt_pipewrite_common(kn, rpipe); + + PIPE_UNLOCK(rpipe); + + return res; +} + +static int +filt_pipewriteprocess(struct knote *kn, struct filt_process_s *data, struct kevent_internal_s *kev) +{ +#pragma unused(data) + struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + int res; + + PIPE_LOCK(rpipe); + res = filt_pipewrite_common(kn, rpipe); + if (res) { + *kev = kn->kn_kevent; + if (kn->kn_flags & EV_CLEAR) { + kn->kn_fflags = 0; + kn->kn_data = 0; + } + } + PIPE_UNLOCK(rpipe); + + return res; +} - cpipe = (struct pipe *)kn->kn_fp->f_data; +/*ARGSUSED*/ +static int +pipe_kqfilter(__unused struct fileproc *fp, struct knote *kn, + __unused struct kevent_internal_s *kev, __unused vfs_context_t ctx) +{ + struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data; + int res; PIPE_LOCK(cpipe); #if CONFIG_MACF @@ -1659,38 +1564,50 @@ pipe_kqfilter(__unused struct fileproc *fp, struct knote *kn, __unused vfs_conte */ if (mac_pipe_check_kqfilter(vfs_context_ucred(ctx), kn, cpipe) != 0) { PIPE_UNLOCK(cpipe); - return (1); + kn->kn_flags = EV_ERROR; + kn->kn_data = EPERM; + return 0; } #endif switch (kn->kn_filter) { case EVFILT_READ: - kn->kn_fop = &pipe_rfiltops; + kn->kn_filtid = EVFILTID_PIPE_R; + /* determine initial state */ + res = filt_piperead_common(kn, cpipe); break; + case EVFILT_WRITE: - kn->kn_fop = &pipe_wfiltops; + kn->kn_filtid = EVFILTID_PIPE_W; if (cpipe->pipe_peer == NULL) { /* * other end of pipe has been closed */ PIPE_UNLOCK(cpipe); - return (EPIPE); + kn->kn_flags = EV_ERROR; + kn->kn_data = EPIPE; + return 0; } if (cpipe->pipe_peer) cpipe = cpipe->pipe_peer; + + /* determine inital state */ + res = filt_pipewrite_common(kn, cpipe); break; default: PIPE_UNLOCK(cpipe); - return (1); + kn->kn_flags = EV_ERROR; + kn->kn_data = EINVAL; + return 0; } if (KNOTE_ATTACH(&cpipe->pipe_sel.si_note, kn)) cpipe->pipe_state |= PIPE_KNOTE; PIPE_UNLOCK(cpipe); - return (0); + return res; } static void @@ -1714,106 +1631,13 @@ filt_pipedetach(struct knote *kn) PIPE_UNLOCK(cpipe); } -/*ARGSUSED*/ -static int -filt_piperead(struct knote *kn, long hint) -{ - struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; - struct pipe *wpipe; - int retval; - - /* - * if hint == 0, then we've been called from the kevent - * world directly and do not currently hold the pipe mutex... 
- * if hint == 1, we're being called back via the KNOTE post - * we made in pipeselwakeup, and we already hold the mutex... - */ - if (hint == 0) - PIPE_LOCK(rpipe); - - wpipe = rpipe->pipe_peer; - kn->kn_data = rpipe->pipe_buffer.cnt; - -#ifndef PIPE_NODIRECT - if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW)) - kn->kn_data = rpipe->pipe_map.cnt; -#endif - if ((rpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF)) || - (wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) { - kn->kn_flags |= EV_EOF; - retval = 1; - } else { - int64_t lowwat = 1; - if (kn->kn_sfflags & NOTE_LOWAT) { - if (rpipe->pipe_buffer.size && kn->kn_sdata > rpipe->pipe_buffer.size) - lowwat = rpipe->pipe_buffer.size; - else if (kn->kn_sdata > lowwat) - lowwat = kn->kn_sdata; - } - retval = kn->kn_data >= lowwat; - } - - if (hint == 0) - PIPE_UNLOCK(rpipe); - - return (retval); -} - -/*ARGSUSED*/ -static int -filt_pipewrite(struct knote *kn, long hint) -{ - struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; - struct pipe *wpipe; - - /* - * if hint == 0, then we've been called from the kevent - * world directly and do not currently hold the pipe mutex... - * if hint == 1, we're being called back via the KNOTE post - * we made in pipeselwakeup, and we already hold the mutex... - */ - if (hint == 0) - PIPE_LOCK(rpipe); - - wpipe = rpipe->pipe_peer; - - if ((wpipe == NULL) || (wpipe->pipe_state & (PIPE_DRAIN | PIPE_EOF))) { - kn->kn_data = 0; - kn->kn_flags |= EV_EOF; - - if (hint == 0) - PIPE_UNLOCK(rpipe); - return (1); - } - kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; - if (!kn->kn_data && wpipe->pipe_buffer.size == 0) - kn->kn_data = PIPE_BUF; /* unwritten pipe is ready for write */ - -#ifndef PIPE_NODIRECT - if (wpipe->pipe_state & PIPE_DIRECTW) - kn->kn_data = 0; -#endif - int64_t lowwat = PIPE_BUF; - if (kn->kn_sfflags & NOTE_LOWAT) { - if (wpipe->pipe_buffer.size && kn->kn_sdata > wpipe->pipe_buffer.size) - lowwat = wpipe->pipe_buffer.size; - else if (kn->kn_sdata > lowwat) - lowwat = kn->kn_sdata; - } - - if (hint == 0) - PIPE_UNLOCK(rpipe); - - return (kn->kn_data >= lowwat); -} - int fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo) { #if CONFIG_MACF int error; #endif - struct timeval now; + struct timespec now; struct vinfo_stat * ub; int pipe_size = 0; int pipe_count; @@ -1837,13 +1661,13 @@ fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo) /* * the peer still exists, use it's info */ - pipe_size = cpipe->pipe_peer->pipe_buffer.size; + pipe_size = MAX_PIPESIZE(cpipe->pipe_peer); pipe_count = cpipe->pipe_peer->pipe_buffer.cnt; } else { pipe_count = 0; } } else { - pipe_size = cpipe->pipe_buffer.size; + pipe_size = MAX_PIPESIZE(cpipe); pipe_count = cpipe->pipe_buffer.cnt; } /* @@ -1866,23 +1690,23 @@ fill_pipeinfo(struct pipe * cpipe, struct pipe_info * pinfo) ub->vst_uid = kauth_getuid(); ub->vst_gid = kauth_getgid(); - microtime(&now); + nanotime(&now); ub->vst_atime = now.tv_sec; - ub->vst_atimensec = now.tv_usec * 1000; + ub->vst_atimensec = now.tv_nsec; ub->vst_mtime = now.tv_sec; - ub->vst_mtimensec = now.tv_usec * 1000; + ub->vst_mtimensec = now.tv_nsec; ub->vst_ctime = now.tv_sec; - ub->vst_ctimensec = now.tv_usec * 1000; + ub->vst_ctimensec = now.tv_nsec; /* * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen, st_uid, st_gid. * XXX (st_dev, st_ino) should be unique. 
*/ - pinfo->pipe_handle = (uint64_t)((uintptr_t)cpipe); - pinfo->pipe_peerhandle = (uint64_t)((uintptr_t)(cpipe->pipe_peer)); + pinfo->pipe_handle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)cpipe); + pinfo->pipe_peerhandle = (uint64_t)VM_KERNEL_ADDRPERM((uintptr_t)(cpipe->pipe_peer)); pinfo->pipe_status = cpipe->pipe_state; PIPE_UNLOCK(cpipe); @@ -1919,6 +1743,75 @@ pipe_drain(struct fileproc *fp, __unused vfs_context_t ctx) } + /* + * When a thread sets a write-select on a pipe, it creates an implicit, + * untracked dependency between that thread and the peer of the pipe + * on which the select is set. If the peer pipe is closed and freed + * before the select()ing thread wakes up, the system will panic as + * it attempts to unwind the dangling select(). To avoid that panic, + * we notice whenever a dangerous select() is set on a pipe, and + * defer the final deletion of the pipe until those select()s are all + * resolved. Since we can't currently detect exactly when that + * resolution happens, we use a simple garbage collection queue to + * reap the at-risk pipes 'later'. + */ +static void +pipe_garbage_collect(struct pipe *cpipe) +{ + uint64_t old, now; + struct pipe_garbage *pgp; + + /* Convert msecs to nsecs and then to abstime */ + old = pipe_garbage_age_limit * 1000000; + nanoseconds_to_absolutetime(old, &old); + + lck_mtx_lock(pipe_garbage_lock); + + /* Free anything that's been on the queue longer than the age limit */ + now = mach_absolute_time(); + old = now - old; + while ((pgp = pipe_garbage_head) && pgp->pg_timestamp < old) { + pipe_garbage_head = pgp->pg_next; + if (pipe_garbage_head == NULL) + pipe_garbage_tail = NULL; + pipe_garbage_count--; + zfree(pipe_zone, pgp->pg_pipe); + zfree(pipe_garbage_zone, pgp); + } + /* Add the new pipe (if any) to the tail of the garbage queue */ + if (cpipe) { + cpipe->pipe_state = PIPE_DEAD; + pgp = (struct pipe_garbage *)zalloc(pipe_garbage_zone); + if (pgp == NULL) { + /* + * We're too low on memory to garbage collect the + * pipe. Freeing it runs the risk of panicking the + * system. All we can do is leak it and leave + * a breadcrumb behind. The good news, such as it + * is, is that this will probably never happen. + * We will probably hit the panic below first. + */ + printf("Leaking pipe %p - no room left in the queue", + cpipe); + lck_mtx_unlock(pipe_garbage_lock); + return; + } + + pgp->pg_pipe = cpipe; + pgp->pg_timestamp = now; + pgp->pg_next = NULL; + if (pipe_garbage_tail) + pipe_garbage_tail->pg_next = pgp; + pipe_garbage_tail = pgp; + if (pipe_garbage_head == NULL) + pipe_garbage_head = pipe_garbage_tail; + + if (pipe_garbage_count++ >= PIPE_GARBAGE_QUEUE_LIMIT) + panic("Length of pipe garbage queue exceeded %d", + PIPE_GARBAGE_QUEUE_LIMIT); + } + lck_mtx_unlock(pipe_garbage_lock); +}
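
The state walk-through in the new header comment (steps a through g) can be checked with a small user-space model of the circular buffer. This is a standalone sketch for illustration, not kernel code; M, r, w and c mirror the names used in the comment, and the modulo arithmetic stands in for the kernel's wraparound handling.

#include <assert.h>
#include <stdio.h>

#define M 200                  /* buffer size used in the comment's example */

static unsigned r, w, c;       /* read index, write index, byte count */

static void wr(unsigned n)     /* model a write of n bytes (n <= free space) */
{
	assert(n <= M - c);
	w = (w + n) % M;
	c += n;
}

static void rd(unsigned n)     /* model a read of n bytes (n <= c) */
{
	assert(n <= c);
	r = (r + n) % M;
	c -= n;
}

int main(void)
{
	/* a. State 1: r=0 w=0 c=0 */
	wr(100); printf("b. r=%u w=%u c=%u\n", r, w, c); /* State 2: 0 100 100 */
	rd(70);  printf("d. r=%u w=%u c=%u\n", r, w, c); /* State 2: 70 100 30 */
	wr(110); printf("f. r=%u w=%u c=%u\n", r, w, c); /* State 3: 70 10 140 */
	rd(140); printf("   r=%u w=%u c=%u\n", r, w, c); /* State 1: 10 10 0   */
	return 0;
}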
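
choose_pipespace() returns the smallest entry of pipesize_blocks strictly larger than max(current, expected), falling back to the largest entry when the target exceeds the table. The probe below copies the function into user space to show the rounding; PIPE_SIZE is assumed to be 16384 (the traditional value) for this illustration, so the table reads 512, 1024, 2048, 4096, 8192, 16384, 65536. Note that an exact fit rounds up: a target of 512 yields 1024, so a PIPE_BUF-sized write always lands in a buffer with head-room.

#include <stdio.h>

#define PIPE_SIZE 16384        /* assumed value for this illustration */

static const unsigned int pipesize_blocks[] =
	{512, 1024, 2048, 4096, 4096 * 2, PIPE_SIZE, PIPE_SIZE * 4};

/* user-space copy of choose_pipespace() from the diff */
static int
choose_pipespace(unsigned long current, unsigned long expected)
{
	int i = sizeof(pipesize_blocks)/sizeof(unsigned int) - 1;
	unsigned long target = (expected > current) ? expected : current;

	while (i > 0 && pipesize_blocks[i-1] > target)
		i = i - 1;
	return pipesize_blocks[i];
}

int main(void)
{
	printf("%d\n", choose_pipespace(0, 0));       /* 512: smallest block */
	printf("%d\n", choose_pipespace(512, 100));   /* 1024: exact fit rounds up */
	printf("%d\n", choose_pipespace(512, 5000));  /* 8192 */
	printf("%d\n", choose_pipespace(0, 1 << 20)); /* 65536: capped at the top */
	return 0;
}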
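
expand_pipespace() handles a wrapped buffer (State 3: in <= out with data present) by copying the old contents into the new buffer twice and advancing `in` past the first copy, which makes the readable span from `out` linear again without splitting it into two segments. The double copy always fits because every entry of pipesize_blocks is at least twice its predecessor. Below is a user-space reconstruction of that trick on a bare ring buffer; the struct and function names are illustrative, not the kernel's.

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* bare ring buffer; field names shadow the kernel's pipe_buffer */
struct ring { char *buf; unsigned size, in, out, cnt; };

/* grow a wrapped ring the way expand_pipespace() does */
static void
grow(struct ring *r, unsigned newsize)
{
	char *nb = malloc(newsize);

	assert(nb && newsize >= 2 * r->size); /* pipesize_blocks guarantees 2x */
	memcpy(nb, r->buf, r->size);
	if (r->cnt > 0 && r->in <= r->out) {
		/* wrapped (State 3): append a second copy and move `in`
		 * past the end of the first, so out..in is linear again */
		memcpy(nb + r->size, r->buf, r->size);
		r->in += r->size;
	}
	free(r->buf);
	r->buf = nb;
	r->size = newsize;
}

int main(void)
{
	/* 8-byte ring holding "ABCFGH": bytes at 5,6,7 then wrap to 0,1,2 */
	struct ring r = { strdup("FGHxxABC"), 8, 3, 5, 6 };

	grow(&r, 16);
	printf("%.*s\n", (int)r.cnt, r.buf + r.out); /* prints ABCFGH */
	free(r.buf);
	return 0;
}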
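
With the direct-write path removed, FIONREAD reduces to reporting pipe_buffer.cnt. A minimal user-space check (error handling elided):

#include <sys/ioctl.h>
#include <sys/filio.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fds[2], n = 0;

	pipe(fds);
	write(fds[1], "hello", 5);
	ioctl(fds[0], FIONREAD, &n);      /* reports pipe_buffer.cnt */
	printf("%d bytes buffered\n", n); /* 5 */
	return 0;
}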
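
The new read filter honors NOTE_LOWAT, clamping the requested threshold to MAX_PIPESIZE() per filt_piperead_common() above. From user space this surfaces through kqueue; the sketch below (macOS/BSD API, error handling elided) registers a read filter that stays silent until at least 64 bytes are buffered.

#include <sys/event.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

int main(void)
{
	int fds[2];
	struct kevent kev;
	struct timespec ts = { 0, 0 };   /* poll, don't block */

	pipe(fds);
	int kq = kqueue();
	/* fire only once at least 64 bytes are readable */
	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, NOTE_LOWAT, 64, NULL);
	kevent(kq, &kev, 1, NULL, 0, NULL);

	write(fds[1], "0123456789", 10);
	printf("%d\n", kevent(kq, NULL, 0, &kev, 1, &ts)); /* 0: below low water */

	for (int i = 0; i < 6; i++)
		write(fds[1], "0123456789", 10);
	printf("%d\n", kevent(kq, NULL, 0, &kev, 1, &ts)); /* 1: 70 >= 64 */
	return 0;
}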
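
pipe_garbage_collect() amortizes the reaping: every call first frees queue entries older than the age limit, then appends the new victim (if any) to the tail. The same pattern, stripped of the kernel's zones, locking and abstime conversion, looks like this; the names here are illustrative, not the kernel's.

#include <stdlib.h>
#include <time.h>

struct garbage {
	void           *payload;
	struct garbage *next;
	time_t          timestamp;
};

static struct garbage *g_head, *g_tail;
static const time_t g_age_limit = 5;   /* seconds, cf. PIPE_GARBAGE_AGE_LIMIT */

/* reap aged entries, then (optionally) defer-free a new payload */
static void
garbage_collect(void *payload)
{
	time_t now = time(NULL);
	struct garbage *gp;

	while ((gp = g_head) && gp->timestamp < now - g_age_limit) {
		g_head = gp->next;
		if (g_head == NULL)
			g_tail = NULL;
		free(gp->payload);
		free(gp);
	}
	if (payload == NULL)
		return;
	/* the kernel leaks the pipe (with a log line) if this fails */
	if ((gp = malloc(sizeof(*gp))) == NULL)
		return;
	gp->payload = payload;
	gp->timestamp = now;
	gp->next = NULL;
	if (g_tail)
		g_tail->next = gp;
	g_tail = gp;
	if (g_head == NULL)
		g_head = g_tail;
}

int main(void)
{
	garbage_collect(malloc(16)); /* deferred, not freed yet */
	garbage_collect(NULL);       /* reap pass only */
	return 0;
}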