X-Git-Url: https://git.saurik.com/apple/xnu.git/blobdiff_plain/89b3af67bb32e691275bf6fa803d1834b2284115..a1c7dba18ef36983396c282fe85292db066e39db:/bsd/kern/kern_aio.c diff --git a/bsd/kern/kern_aio.c b/bsd/kern/kern_aio.c index aadb1654d..2513122e6 100644 --- a/bsd/kern/kern_aio.c +++ b/bsd/kern/kern_aio.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2003-2004 Apple Computer, Inc. All rights reserved. + * Copyright (c) 2003-2014 Apple Inc. All rights reserved. * * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * @@ -69,6 +69,8 @@ #include +#include + #include #define AIO_work_queued 1 #define AIO_worker_wake 2 @@ -117,17 +119,34 @@ * user process calls aio_return or the process exits, either way that is our * trigger to release aio resources. */ +typedef struct aio_workq { + TAILQ_HEAD(, aio_workq_entry) aioq_entries; + int aioq_count; + lck_mtx_t aioq_mtx; + wait_queue_t aioq_waitq; +} *aio_workq_t; + +#define AIO_NUM_WORK_QUEUES 1 struct aio_anchor_cb { - int aio_async_workq_count; /* entries on aio_async_workq */ - int lio_sync_workq_count; /* entries on lio_sync_workq */ - int aio_active_count; /* entries on all active queues (proc.aio_activeq) */ - int aio_done_count; /* entries on all done queues (proc.aio_doneq) */ - TAILQ_HEAD( , aio_workq_entry ) aio_async_workq; - TAILQ_HEAD( , aio_workq_entry ) lio_sync_workq; + volatile int32_t aio_inflight_count; /* entries that have been taken from a workq */ + volatile int32_t aio_done_count; /* entries on all done queues (proc.aio_doneq) */ + volatile int32_t aio_total_count; /* total extant entries */ + + /* Hash table of queues here */ + int aio_num_workqs; + struct aio_workq aio_async_workqs[AIO_NUM_WORK_QUEUES]; }; typedef struct aio_anchor_cb aio_anchor_cb; +struct aio_lio_context +{ + int io_waiter; + int io_issued; + int io_completed; +}; +typedef struct aio_lio_context aio_lio_context; + /* * Notes on aio sleep / wake channels. 
@@ -135,88 +154,344 @@ typedef struct aio_anchor_cb aio_anchor_cb; * us sleep channels that currently do not collide with any other kernel routines. * At this time, for binary compatibility reasons, we cannot create new proc fields. */ -#define AIO_SUSPEND_SLEEP_CHAN p_estcpu -#define AIO_CLEANUP_SLEEP_CHAN p_pctcpu - - -/* - * aysnc IO locking macros used to protect critical sections. - */ -#define AIO_LOCK lck_mtx_lock(aio_lock) -#define AIO_UNLOCK lck_mtx_unlock(aio_lock) +#define AIO_SUSPEND_SLEEP_CHAN p_aio_active_count +#define AIO_CLEANUP_SLEEP_CHAN p_aio_total_count +#define ASSERT_AIO_FROM_PROC(aiop, theproc) \ + if ((aiop)->procp != (theproc)) { \ + panic("AIO on a proc list that does not belong to that proc.\n"); \ + } /* * LOCAL PROTOTYPES */ -static int aio_active_requests_for_process( struct proc *procp ); +static void aio_proc_lock(proc_t procp); +static void aio_proc_lock_spin(proc_t procp); +static void aio_proc_unlock(proc_t procp); +static lck_mtx_t* aio_proc_mutex(proc_t procp); +static void aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp); +static void aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp); +static int aio_get_process_count(proc_t procp ); +static int aio_active_requests_for_process(proc_t procp ); +static int aio_proc_active_requests_for_file(proc_t procp, int fd); +static boolean_t is_already_queued(proc_t procp, user_addr_t aiocbp ); +static boolean_t should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd); + +static void aio_entry_lock(aio_workq_entry *entryp); +static void aio_entry_lock_spin(aio_workq_entry *entryp); +static aio_workq_t aio_entry_workq(aio_workq_entry *entryp); +static lck_mtx_t* aio_entry_mutex(__unused aio_workq_entry *entryp); +static void aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp); +static void aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp); +static void aio_entry_ref_locked(aio_workq_entry *entryp); +static 
void aio_entry_unref_locked(aio_workq_entry *entryp); +static void aio_entry_ref(aio_workq_entry *entryp); +static void aio_entry_unref(aio_workq_entry *entryp); +static void aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled, + int wait_for_completion, boolean_t disable_notification); +static int aio_entry_try_workq_remove(aio_workq_entry *entryp); static boolean_t aio_delay_fsync_request( aio_workq_entry *entryp ); -static int aio_free_request( aio_workq_entry *entryp, vm_map_t the_map ); -static int aio_get_all_queues_count( void ); -static int aio_get_process_count( struct proc *procp ); -static aio_workq_entry * aio_get_some_work( void ); -static boolean_t aio_last_group_io( aio_workq_entry *entryp ); -static void aio_mark_requests( aio_workq_entry *entryp ); -static int aio_queue_async_request( struct proc *procp, - user_addr_t aiocbp, - int kindOfIO ); -static int aio_validate( aio_workq_entry *entryp ); -static void aio_work_thread( void ); -static int do_aio_cancel( struct proc *p, - int fd, - user_addr_t aiocbp, - boolean_t wait_for_completion, - boolean_t disable_notification ); -static void do_aio_completion( aio_workq_entry *entryp ); -static int do_aio_fsync( aio_workq_entry *entryp ); -static int do_aio_read( aio_workq_entry *entryp ); -static int do_aio_write( aio_workq_entry *entryp ); -static void do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ); -static boolean_t is_already_queued( struct proc *procp, - user_addr_t aiocbp ); -static int lio_create_async_entry( struct proc *procp, - user_addr_t aiocbp, - user_addr_t sigp, - long group_tag, - aio_workq_entry **entrypp ); -static int lio_create_sync_entry( struct proc *procp, - user_addr_t aiocbp, - long group_tag, - aio_workq_entry **entrypp ); - +static int aio_free_request(aio_workq_entry *entryp); + +static void aio_workq_init(aio_workq_t wq); +static void aio_workq_lock_spin(aio_workq_t wq); +static void aio_workq_unlock(aio_workq_t wq); 
+static lck_mtx_t* aio_workq_mutex(aio_workq_t wq); + +static void aio_work_thread( void ); +static aio_workq_entry *aio_get_some_work( void ); + +static int aio_get_all_queues_count( void ); +static int aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO ); +static int aio_validate( aio_workq_entry *entryp ); +static int aio_increment_total_count(void); +static int aio_decrement_total_count(void); + +static int do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, int wait_for_completion, boolean_t disable_notification ); +static void do_aio_completion( aio_workq_entry *entryp ); +static int do_aio_fsync( aio_workq_entry *entryp ); +static int do_aio_read( aio_workq_entry *entryp ); +static int do_aio_write( aio_workq_entry *entryp ); +static void do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ); +static void do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ); +static int lio_create_entry(proc_t procp, + user_addr_t aiocbp, + void *group_tag, + aio_workq_entry **entrypp ); +static aio_workq_entry *aio_create_queue_entry(proc_t procp, + user_addr_t aiocbp, + void *group_tag, + int kindOfIO); +static user_addr_t *aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent); +static void free_lio_context(aio_lio_context* context); +static void aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked); + +#define ASSERT_AIO_PROC_LOCK_OWNED(p) lck_mtx_assert(aio_proc_mutex((p)), LCK_MTX_ASSERT_OWNED) +#define ASSERT_AIO_WORKQ_LOCK_OWNED(q) lck_mtx_assert(aio_workq_mutex((q)), LCK_MTX_ASSERT_OWNED) +#define ASSERT_AIO_ENTRY_LOCK_OWNED(e) lck_mtx_assert(aio_entry_mutex((e)), LCK_MTX_ASSERT_OWNED) /* * EXTERNAL PROTOTYPES */ /* in ...bsd/kern/sys_generic.c */ -extern int dofileread( struct proc *p, struct fileproc *fp, int fd, - user_addr_t bufp, user_size_t nbyte, - off_t offset, int flags, user_ssize_t *retval ); -extern int 
dofilewrite( struct proc *p, struct fileproc *fp, int fd, - user_addr_t bufp, user_size_t nbyte, off_t offset, - int flags, user_ssize_t *retval ); +extern int dofileread(vfs_context_t ctx, struct fileproc *fp, + user_addr_t bufp, user_size_t nbyte, + off_t offset, int flags, user_ssize_t *retval ); +extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp, + user_addr_t bufp, user_size_t nbyte, off_t offset, + int flags, user_ssize_t *retval ); +#if DEBUG +static uint32_t lio_contexts_alloced = 0; +#endif /* DEBUG */ /* * aio external global variables. */ -extern int aio_max_requests; /* AIO_MAX - configurable */ +extern int aio_max_requests; /* AIO_MAX - configurable */ extern int aio_max_requests_per_process; /* AIO_PROCESS_MAX - configurable */ -extern int aio_worker_threads; /* AIO_THREAD_COUNT - configurable */ +extern int aio_worker_threads; /* AIO_THREAD_COUNT - configurable */ /* * aio static variables. */ -static aio_anchor_cb aio_anchor; -static lck_mtx_t * aio_lock; -static lck_grp_t * aio_lock_grp; -static lck_attr_t * aio_lock_attr; -static lck_grp_attr_t * aio_lock_grp_attr; -static struct zone *aio_workq_zonep; +static aio_anchor_cb aio_anchor; +static lck_grp_t *aio_proc_lock_grp; +static lck_grp_t *aio_entry_lock_grp; +static lck_grp_t *aio_queue_lock_grp; +static lck_attr_t *aio_lock_attr; +static lck_grp_attr_t *aio_lock_grp_attr; +static struct zone *aio_workq_zonep; +static lck_mtx_t aio_entry_mtx; +static lck_mtx_t aio_proc_mtx; + +static void +aio_entry_lock(__unused aio_workq_entry *entryp) +{ + lck_mtx_lock(&aio_entry_mtx); +} + +static void +aio_entry_lock_spin(__unused aio_workq_entry *entryp) +{ + lck_mtx_lock_spin(&aio_entry_mtx); +} + +static void +aio_entry_unlock(__unused aio_workq_entry *entryp) +{ + lck_mtx_unlock(&aio_entry_mtx); +} + +/* Hash */ +static aio_workq_t +aio_entry_workq(__unused aio_workq_entry *entryp) +{ + return &aio_anchor.aio_async_workqs[0]; +} + +static lck_mtx_t* +aio_entry_mutex(__unused aio_workq_entry 
*entryp) +{ + return &aio_entry_mtx; +} + +static void +aio_workq_init(aio_workq_t wq) +{ + TAILQ_INIT(&wq->aioq_entries); + wq->aioq_count = 0; + lck_mtx_init(&wq->aioq_mtx, aio_queue_lock_grp, aio_lock_attr); + wq->aioq_waitq = wait_queue_alloc(SYNC_POLICY_FIFO); +} + + +/* + * Can be passed a queue which is locked spin. + */ +static void +aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp) +{ + ASSERT_AIO_WORKQ_LOCK_OWNED(queue); + + if (entryp->aio_workq_link.tqe_prev == NULL) { + panic("Trying to remove an entry from a work queue, but it is not on a queue\n"); + } + + TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link); + queue->aioq_count--; + entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */ + + if (queue->aioq_count < 0) { + panic("Negative count on a queue.\n"); + } +} + +static void +aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp) +{ + ASSERT_AIO_WORKQ_LOCK_OWNED(queue); + + TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link); + if (queue->aioq_count < 0) { + panic("Negative count on a queue.\n"); + } + queue->aioq_count++; +} + +static void +aio_proc_lock(proc_t procp) +{ + lck_mtx_lock(aio_proc_mutex(procp)); +} + +static void +aio_proc_lock_spin(proc_t procp) +{ + lck_mtx_lock_spin(aio_proc_mutex(procp)); +} + +static void +aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp) +{ + ASSERT_AIO_PROC_LOCK_OWNED(procp); + + TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link ); + TAILQ_INSERT_TAIL( &procp->p_aio_doneq, entryp, aio_proc_link); + procp->p_aio_active_count--; + OSIncrementAtomic(&aio_anchor.aio_done_count); +} + +static void +aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp) +{ + TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link); + OSDecrementAtomic(&aio_anchor.aio_done_count); + aio_decrement_total_count(); + procp->p_aio_total_count--; +} + +static void +aio_proc_unlock(proc_t procp) +{ + lck_mtx_unlock(aio_proc_mutex(procp)); 
+} + +static lck_mtx_t* +aio_proc_mutex(proc_t procp) +{ + return &procp->p_mlock; +} + +static void +aio_entry_ref_locked(aio_workq_entry *entryp) +{ + ASSERT_AIO_ENTRY_LOCK_OWNED(entryp); + + if (entryp->aio_refcount < 0) { + panic("AIO workq entry with a negative refcount.\n"); + } + entryp->aio_refcount++; +} + + +/* Return 1 if you've freed it */ +static void +aio_entry_unref_locked(aio_workq_entry *entryp) +{ + ASSERT_AIO_ENTRY_LOCK_OWNED(entryp); + + entryp->aio_refcount--; + if (entryp->aio_refcount < 0) { + panic("AIO workq entry with a negative refcount.\n"); + } +} + +static void +aio_entry_ref(aio_workq_entry *entryp) +{ + aio_entry_lock_spin(entryp); + aio_entry_ref_locked(entryp); + aio_entry_unlock(entryp); +} +static void +aio_entry_unref(aio_workq_entry *entryp) +{ + aio_entry_lock_spin(entryp); + aio_entry_unref_locked(entryp); + + if ((entryp->aio_refcount == 0) && ((entryp->flags & AIO_DO_FREE) != 0)) { + aio_entry_unlock(entryp); + aio_free_request(entryp); + } else { + aio_entry_unlock(entryp); + } + + return; +} + +static void +aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled, int wait_for_completion, boolean_t disable_notification) +{ + aio_entry_lock_spin(entryp); + + if (cancelled) { + aio_entry_ref_locked(entryp); + entryp->errorval = ECANCELED; + entryp->returnval = -1; + } + + if ( wait_for_completion ) { + entryp->flags |= wait_for_completion; /* flag for special completion processing */ + } + + if ( disable_notification ) { + entryp->flags |= AIO_DISABLE; /* Don't want a signal */ + } + + aio_entry_unlock(entryp); +} + +static int +aio_entry_try_workq_remove(aio_workq_entry *entryp) +{ + /* Can only be cancelled if it's still on a work queue */ + if (entryp->aio_workq_link.tqe_prev != NULL) { + aio_workq_t queue; + + /* Will have to check again under the lock */ + queue = aio_entry_workq(entryp); + aio_workq_lock_spin(queue); + if (entryp->aio_workq_link.tqe_prev != NULL) { + 
aio_workq_remove_entry_locked(queue, entryp); + aio_workq_unlock(queue); + return 1; + } else { + aio_workq_unlock(queue); + } + } + return 0; +} + +static void +aio_workq_lock_spin(aio_workq_t wq) +{ + lck_mtx_lock_spin(aio_workq_mutex(wq)); +} +static void +aio_workq_unlock(aio_workq_t wq) +{ + lck_mtx_unlock(aio_workq_mutex(wq)); +} +static lck_mtx_t* +aio_workq_mutex(aio_workq_t wq) +{ + return &wq->aioq_mtx; +} /* * aio_cancel - attempt to cancel one or more async IO requests currently @@ -225,9 +500,8 @@ static struct zone *aio_workq_zonep; * is NULL then all outstanding async IO request for the given file * descriptor are cancelled (if possible). */ - int -aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval ) +aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval ) { struct user_aiocb my_aiocb; int result; @@ -236,24 +510,28 @@ aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval ) (int)p, (int)uap->aiocbp, 0, 0, 0 ); /* quick check to see if there are any async IO requests queued up */ - AIO_LOCK; - result = aio_get_all_queues_count( ); - AIO_UNLOCK; - if ( result < 1 ) { - result = EBADF; + if (aio_get_all_queues_count() < 1) { + result = 0; + *retval = AIO_ALLDONE; goto ExitRoutine; } *retval = -1; if ( uap->aiocbp != USER_ADDR_NULL ) { - if ( !IS_64BIT_PROCESS(p) ) { - struct aiocb aiocb32; + if ( proc_is64bit(p) ) { + struct user64_aiocb aiocb64; + + result = copyin( uap->aiocbp, &aiocb64, sizeof(aiocb64) ); + if (result == 0 ) + do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb); + + } else { + struct user32_aiocb aiocb32; result = copyin( uap->aiocbp, &aiocb32, sizeof(aiocb32) ); if ( result == 0 ) - do_munge_aiocb( &aiocb32, &my_aiocb ); - } else - result = copyin( uap->aiocbp, &my_aiocb, sizeof(my_aiocb) ); + do_munge_aiocb_user32_to_user( &aiocb32, &my_aiocb ); + } if ( result != 0 ) { result = EAGAIN; @@ -269,7 +547,11 @@ aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval ) goto 
ExitRoutine; } } - result = do_aio_cancel( p, uap->fd, uap->aiocbp, FALSE, FALSE ); + + aio_proc_lock(p); + result = do_aio_cancel_locked( p, uap->fd, uap->aiocbp, 0, FALSE ); + ASSERT_AIO_PROC_LOCK_OWNED(p); + aio_proc_unlock(p); if ( result != -1 ) { *retval = result; @@ -293,24 +575,23 @@ ExitRoutine: * a file descriptor that is closing. * THIS MAY BLOCK. */ - __private_extern__ void -_aio_close( struct proc *p, int fd ) +_aio_close(proc_t p, int fd ) { - int error, count; + int error; /* quick check to see if there are any async IO requests queued up */ - AIO_LOCK; - count = aio_get_all_queues_count( ); - AIO_UNLOCK; - if ( count < 1 ) + if (aio_get_all_queues_count() < 1) { return; + } KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_START, (int)p, fd, 0, 0, 0 ); /* cancel all async IO requests on our todo queues for this file descriptor */ - error = do_aio_cancel( p, fd, 0, TRUE, FALSE ); + aio_proc_lock(p); + error = do_aio_cancel_locked( p, fd, 0, AIO_CLOSE_WAIT, FALSE ); + ASSERT_AIO_PROC_LOCK_OWNED(p); if ( error == AIO_NOTCANCELED ) { /* * AIO_NOTCANCELED is returned when we find an aio request for this process @@ -324,9 +605,14 @@ _aio_close( struct proc *p, int fd ) KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep)) | DBG_FUNC_NONE, (int)p, fd, 0, 0, 0 ); - tsleep( &p->AIO_CLEANUP_SLEEP_CHAN, PRIBIO, "aio_close", 0 ); - } + while (aio_proc_active_requests_for_file(p, fd) > 0) { + msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0 ); + } + } + + aio_proc_unlock(p); + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_END, (int)p, fd, 0, 0, 0 ); @@ -341,9 +627,8 @@ _aio_close( struct proc *p, int fd ) * value that would be set by the corresponding IO request (read, wrtie, * fdatasync, or sync). 
*/ - int -aio_error( struct proc *p, struct aio_error_args *uap, int *retval ) +aio_error(proc_t p, struct aio_error_args *uap, int *retval ) { aio_workq_entry *entryp; int error; @@ -351,19 +636,22 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval ) KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_START, (int)p, (int)uap->aiocbp, 0, 0, 0 ); - AIO_LOCK; - - /* quick check to see if there are any async IO requests queued up */ - if ( aio_get_all_queues_count( ) < 1 ) { - error = EINVAL; - goto ExitRoutine; + /* see if there are any aios to check */ + if (aio_get_all_queues_count() < 1) { + return EINVAL; } + aio_proc_lock(p); + /* look for a match on our queue of async IO requests that have completed */ - TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) { if ( entryp->uaiocbp == uap->aiocbp ) { + ASSERT_AIO_FROM_PROC(entryp, p); + + aio_entry_lock_spin(entryp); *retval = entryp->errorval; error = 0; + aio_entry_unlock(entryp); KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val)) | DBG_FUNC_NONE, (int)p, (int)uap->aiocbp, *retval, 0, 0 ); goto ExitRoutine; @@ -371,8 +659,9 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval ) } /* look for a match on our queue of active async IO requests */ - TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) { if ( entryp->uaiocbp == uap->aiocbp ) { + ASSERT_AIO_FROM_PROC(entryp, p); *retval = EINPROGRESS; error = 0; KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq)) | DBG_FUNC_NONE, @@ -380,23 +669,13 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval ) goto ExitRoutine; } } - - /* look for a match on our queue of todo work */ - TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( p == entryp->procp && entryp->uaiocbp == uap->aiocbp ) { - *retval = EINPROGRESS; - error = 0; - KERNEL_DEBUG( 
(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_workq)) | DBG_FUNC_NONE, - (int)p, (int)uap->aiocbp, *retval, 0, 0 ); - goto ExitRoutine; - } - } + error = EINVAL; ExitRoutine: KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_END, (int)p, (int)uap->aiocbp, error, 0, 0 ); - AIO_UNLOCK; + aio_proc_unlock(p); return( error ); @@ -410,9 +689,8 @@ ExitRoutine: * NOTE - we do not support op O_DSYNC at this point since we do not support the * fdatasync() call. */ - int -aio_fsync( struct proc *p, struct aio_fsync_args *uap, int *retval ) +aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval ) { int error; int fsync_kind; @@ -424,10 +702,8 @@ aio_fsync( struct proc *p, struct aio_fsync_args *uap, int *retval ) /* 0 := O_SYNC for binary backward compatibility with Panther */ if (uap->op == O_SYNC || uap->op == 0) fsync_kind = AIO_FSYNC; -#if 0 // we don't support fdatasync() call yet else if ( uap->op == O_DSYNC ) fsync_kind = AIO_DSYNC; -#endif else { *retval = -1; error = EINVAL; @@ -451,9 +727,8 @@ ExitRoutine: * file descriptor (uap->aiocbp->aio_fildes) into the buffer * (uap->aiocbp->aio_buf). */ - int -aio_read( struct proc *p, struct aio_read_args *uap, int *retval ) +aio_read(proc_t p, struct aio_read_args *uap, int *retval ) { int error; @@ -477,54 +752,57 @@ aio_read( struct proc *p, struct aio_read_args *uap, int *retval ) /* * aio_return - return the return status associated with the async IO * request referred to by uap->aiocbp. The return status is the value - * that would be returned by corresponding IO request (read, wrtie, + * that would be returned by corresponding IO request (read, write, * fdatasync, or sync). This is where we release kernel resources * held for async IO call associated with the given aiocb pointer. 
*/ - int -aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval ) +aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval ) { aio_workq_entry *entryp; int error; - boolean_t lock_held; + boolean_t proc_lock_held = FALSE; KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_START, (int)p, (int)uap->aiocbp, 0, 0, 0 ); - AIO_LOCK; - lock_held = TRUE; - *retval = 0; - - /* quick check to see if there are any async IO requests queued up */ - if ( aio_get_all_queues_count( ) < 1 ) { + /* See if there are any entries to check */ + if (aio_get_all_queues_count() < 1) { error = EINVAL; goto ExitRoutine; } + aio_proc_lock(p); + proc_lock_held = TRUE; + *retval = 0; + /* look for a match on our queue of async IO requests that have completed */ - TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) { + ASSERT_AIO_FROM_PROC(entryp, p); if ( entryp->uaiocbp == uap->aiocbp ) { - TAILQ_REMOVE( &p->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count--; - p->aio_done_count--; + /* Done and valid for aio_return(), pull it off the list */ + aio_proc_remove_done_locked(p, entryp); + /* Drop the proc lock, but keep the entry locked */ + aio_entry_lock(entryp); + aio_proc_unlock(p); + proc_lock_held = FALSE; + *retval = entryp->returnval; + error = 0; - /* we cannot free requests that are still completing */ - if ( (entryp->flags & AIO_COMPLETION) == 0 ) { - vm_map_t my_map; - - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; - AIO_UNLOCK; - lock_held = FALSE; - aio_free_request( entryp, my_map ); + /* No references and off all lists, safe to free */ + if (entryp->aio_refcount == 0) { + aio_entry_unlock(entryp); + aio_free_request(entryp); } - else - /* tell completion code to free this request */ + else { + /* Whoever has the refcount will have to free it */ entryp->flags |= AIO_DO_FREE; - error = 0; + aio_entry_unlock(entryp); + } + + KERNEL_DEBUG( 
(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val)) | DBG_FUNC_NONE, (int)p, (int)uap->aiocbp, *retval, 0, 0 ); goto ExitRoutine; @@ -532,7 +810,8 @@ aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval ) } /* look for a match on our queue of active async IO requests */ - TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) { + ASSERT_AIO_FROM_PROC(entryp, p); if ( entryp->uaiocbp == uap->aiocbp ) { error = EINPROGRESS; KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq)) | DBG_FUNC_NONE, @@ -541,20 +820,11 @@ aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval ) } } - /* look for a match on our queue of todo work */ - TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( p == entryp->procp && entryp->uaiocbp == uap->aiocbp ) { - error = EINPROGRESS; - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_workq)) | DBG_FUNC_NONE, - (int)p, (int)uap->aiocbp, *retval, 0, 0 ); - goto ExitRoutine; - } - } error = EINVAL; ExitRoutine: - if ( lock_held ) - AIO_UNLOCK; + if (proc_lock_held) + aio_proc_unlock(p); KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_END, (int)p, (int)uap->aiocbp, error, 0, 0 ); @@ -570,9 +840,8 @@ ExitRoutine: * for cancelled or active aio requests that complete. * This routine MAY block! */ - __private_extern__ void -_aio_exec( struct proc *p ) +_aio_exec(proc_t p ) { KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exec)) | DBG_FUNC_START, @@ -594,29 +863,29 @@ _aio_exec( struct proc *p ) * we can and wait for those already active. We also disable signaling * for cancelled or active aio requests that complete. This routine MAY block! 
*/ - __private_extern__ void -_aio_exit( struct proc *p ) +_aio_exit(proc_t p ) { - int error, count; + int error; aio_workq_entry *entryp; + /* quick check to see if there are any async IO requests queued up */ - AIO_LOCK; - count = aio_get_all_queues_count( ); - AIO_UNLOCK; - if ( count < 1 ) { + if (aio_get_all_queues_count() < 1) { return; } KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_START, (int)p, 0, 0, 0, 0 ); + aio_proc_lock(p); + /* * cancel async IO requests on the todo work queue and wait for those * already active to complete. */ - error = do_aio_cancel( p, 0, 0, TRUE, TRUE ); + error = do_aio_cancel_locked( p, 0, 0, AIO_EXIT_WAIT, TRUE ); + ASSERT_AIO_PROC_LOCK_OWNED(p); if ( error == AIO_NOTCANCELED ) { /* * AIO_NOTCANCELED is returned when we find an aio request for this process @@ -630,52 +899,69 @@ _aio_exit( struct proc *p ) KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep)) | DBG_FUNC_NONE, (int)p, 0, 0, 0, 0 ); - tsleep( &p->AIO_CLEANUP_SLEEP_CHAN, PRIBIO, "aio_exit", 0 ); + while (p->p_aio_active_count != 0) { + msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0 ); + } + } + + if (p->p_aio_active_count != 0) { + panic("Exiting process has %d active AIOs after cancellation has completed.\n", p->p_aio_active_count); } /* release all aio resources used by this process */ - AIO_LOCK; - entryp = TAILQ_FIRST( &p->aio_doneq ); + entryp = TAILQ_FIRST( &p->p_aio_doneq ); while ( entryp != NULL ) { + ASSERT_AIO_FROM_PROC(entryp, p); aio_workq_entry *next_entryp; - next_entryp = TAILQ_NEXT( entryp, aio_workq_link ); - TAILQ_REMOVE( &p->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count--; - p->aio_done_count--; + next_entryp = TAILQ_NEXT( entryp, aio_proc_link); + aio_proc_remove_done_locked(p, entryp); /* we cannot free requests that are still completing */ - if ( (entryp->flags & AIO_COMPLETION) == 0 ) { - vm_map_t my_map; - - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; - 
AIO_UNLOCK; - aio_free_request( entryp, my_map ); + aio_entry_lock_spin(entryp); + if (entryp->aio_refcount == 0) { + aio_proc_unlock(p); + aio_entry_unlock(entryp); + aio_free_request(entryp); /* need to start over since aio_doneq may have been */ /* changed while we were away. */ - AIO_LOCK; - entryp = TAILQ_FIRST( &p->aio_doneq ); + aio_proc_lock(p); + entryp = TAILQ_FIRST( &p->p_aio_doneq ); continue; } - else - /* tell completion code to free this request */ + else { + /* whoever has the reference will have to do the free */ entryp->flags |= AIO_DO_FREE; + } + + aio_entry_unlock(entryp); entryp = next_entryp; } - AIO_UNLOCK; - + + aio_proc_unlock(p); + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_END, (int)p, 0, 0, 0, 0 ); - return; } /* _aio_exit */ +static boolean_t +should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd) +{ + if ( (aiocbp == USER_ADDR_NULL && fd == 0) || + (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) || + (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) { + return TRUE; + } + + return FALSE; +} + /* - * do_aio_cancel - cancel async IO requests (if possible). We get called by + * do_aio_cancel_locked - cancel async IO requests (if possible). We get called by * aio_cancel, close, and at exit. * There are three modes of operation: 1) cancel all async IOs for a process - * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd @@ -687,167 +973,113 @@ _aio_exit( struct proc *p ) * were already complete. * WARNING - do not deference aiocbp in this routine, it may point to user * land data that has not been copied in (when called from aio_cancel() ) + * + * Called with proc locked, and returns the same way. 
*/ - static int -do_aio_cancel( struct proc *p, int fd, user_addr_t aiocbp, - boolean_t wait_for_completion, boolean_t disable_notification ) +do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, + int wait_for_completion, boolean_t disable_notification ) { + ASSERT_AIO_PROC_LOCK_OWNED(p); + aio_workq_entry *entryp; int result; result = -1; /* look for a match on our queue of async todo work. */ - AIO_LOCK; - entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq ); + entryp = TAILQ_FIRST(&p->p_aio_activeq); while ( entryp != NULL ) { + ASSERT_AIO_FROM_PROC(entryp, p); aio_workq_entry *next_entryp; - - next_entryp = TAILQ_NEXT( entryp, aio_workq_link ); - if ( p == entryp->procp ) { - if ( (aiocbp == USER_ADDR_NULL && fd == 0) || - (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) || - (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) { - /* we found a match so we remove the entry from the */ - /* todo work queue and place it on the done queue */ - TAILQ_REMOVE( &aio_anchor.aio_async_workq, entryp, aio_workq_link ); - aio_anchor.aio_async_workq_count--; - entryp->errorval = ECANCELED; - entryp->returnval = -1; - if ( disable_notification ) - entryp->flags |= AIO_DISABLE; /* flag for special completion processing */ - result = AIO_CANCELED; - - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq)) | DBG_FUNC_NONE, - (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); - - TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count++; - p->aio_done_count++; - entryp->flags |= AIO_COMPLETION; - AIO_UNLOCK; - - /* do completion processing for this request */ - do_aio_completion( entryp ); - - AIO_LOCK; - entryp->flags &= ~AIO_COMPLETION; - if ( (entryp->flags & AIO_DO_FREE) != 0 ) { - vm_map_t my_map; - - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; - AIO_UNLOCK; - aio_free_request( entryp, my_map ); - } - else - AIO_UNLOCK; - if ( aiocbp != USER_ADDR_NULL ) { - return( result ); - } - - /* need 
to start over since aio_async_workq may have been */ - /* changed while we were away doing completion processing. */ - AIO_LOCK; - entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq ); - continue; - } + next_entryp = TAILQ_NEXT( entryp, aio_proc_link); + if (!should_cancel(entryp, aiocbp, fd)) { + entryp = next_entryp; + continue; } - entryp = next_entryp; - } /* while... */ - - /* - * look for a match on our queue of synchronous todo work. This will - * be a rare occurrence but could happen if a process is terminated while - * processing a lio_listio call. - */ - entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq ); - while ( entryp != NULL ) { - aio_workq_entry *next_entryp; - - next_entryp = TAILQ_NEXT( entryp, aio_workq_link ); - if ( p == entryp->procp ) { - if ( (aiocbp == USER_ADDR_NULL && fd == 0) || - (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) || - (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) { - /* we found a match so we remove the entry from the */ - /* todo work queue and place it on the done queue */ - TAILQ_REMOVE( &aio_anchor.lio_sync_workq, entryp, aio_workq_link ); - aio_anchor.lio_sync_workq_count--; - entryp->errorval = ECANCELED; - entryp->returnval = -1; - if ( disable_notification ) - entryp->flags |= AIO_DISABLE; /* flag for special completion processing */ - result = AIO_CANCELED; - - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_sync_workq)) | DBG_FUNC_NONE, - (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); - - TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count++; - p->aio_done_count++; - if ( aiocbp != USER_ADDR_NULL ) { - AIO_UNLOCK; - return( result ); - } + + /* Can only be cancelled if it's still on a work queue */ + if (aio_entry_try_workq_remove(entryp) != 0) { + /* Have removed from workq. 
Update entry state and take a ref */ + aio_entry_update_for_cancel(entryp, TRUE, 0, disable_notification); + + /* Put on the proc done queue and update counts, then unlock the proc */ + aio_proc_move_done_locked(p, entryp); + aio_proc_unlock(p); + + /* Now it's officially cancelled. Do the completion */ + result = AIO_CANCELED; + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq)) | DBG_FUNC_NONE, + (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); + do_aio_completion(entryp); + + /* This will free if the aio_return() has already happened ... */ + aio_entry_unref(entryp); + aio_proc_lock(p); + + if ( aiocbp != USER_ADDR_NULL ) { + return( result ); } - } - entryp = next_entryp; - } /* while... */ - /* - * look for a match on our queue of active async IO requests and - * return AIO_NOTCANCELED result. - */ - TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) { - if ( (aiocbp == USER_ADDR_NULL && fd == 0) || - (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) || - (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) { + /* + * Restart from the head of the proc active queue since it + * may have been changed while we were away doing completion + * processing. + * + * Note that if we found an uncancellable AIO before, we will + * either find it again or discover that it's been completed, + * so resetting the result will not cause us to return success + * despite outstanding AIOs. + */ + entryp = TAILQ_FIRST(&p->p_aio_activeq); + result = -1; /* As if beginning anew */ + } else { + /* + * It's been taken off the active queue already, i.e. is in flight. + * All we can do is ask for notification. 
+ */ result = AIO_NOTCANCELED; KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq)) | DBG_FUNC_NONE, - (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); + (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); + + /* Mark for waiting and such; will not take a ref if "cancelled" arg is FALSE */ + aio_entry_update_for_cancel(entryp, FALSE, wait_for_completion, disable_notification); - if ( wait_for_completion ) - entryp->flags |= AIO_WAITING; /* flag for special completion processing */ - if ( disable_notification ) - entryp->flags |= AIO_DISABLE; /* flag for special completion processing */ if ( aiocbp != USER_ADDR_NULL ) { - AIO_UNLOCK; return( result ); } + entryp = next_entryp; } - } - + } /* while... */ + /* * if we didn't find any matches on the todo or active queues then look for a * match on our queue of async IO requests that have completed and if found * return AIO_ALLDONE result. + * + * Proc AIO lock is still held. */ if ( result == -1 ) { - TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) { - if ( (aiocbp == USER_ADDR_NULL && fd == 0) || - (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) || - (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) { + TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) { + ASSERT_AIO_FROM_PROC(entryp, p); + if (should_cancel(entryp, aiocbp, fd)) { result = AIO_ALLDONE; - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq)) | DBG_FUNC_NONE, - (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); + (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 ); if ( aiocbp != USER_ADDR_NULL ) { - AIO_UNLOCK; return( result ); } } } } - AIO_UNLOCK; return( result ); -} /* do_aio_cancel */ +} + /* do_aio_cancel_locked */ /* @@ -859,9 +1091,16 @@ do_aio_cancel( struct proc *p, int fd, user_addr_t aiocbp, * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt * woke us up. 
*/ +int +aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval ) +{ + __pthread_testcancel(1); + return(aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval)); +} + int -aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) +aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval ) { int error; int i, count; @@ -877,10 +1116,7 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) abstime = 0; aiocbpp = NULL; - /* quick check to see if there are any async IO requests queued up */ - AIO_LOCK; - count = aio_get_all_queues_count( ); - AIO_UNLOCK; + count = aio_get_all_queues_count( ); if ( count < 1 ) { error = EINVAL; goto ExitThisRoutine; @@ -893,10 +1129,15 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) if ( uap->timeoutp != USER_ADDR_NULL ) { if ( proc_is64bit(p) ) { - error = copyin( uap->timeoutp, &ts, sizeof(ts) ); + struct user64_timespec temp; + error = copyin( uap->timeoutp, &temp, sizeof(temp) ); + if ( error == 0 ) { + ts.tv_sec = temp.tv_sec; + ts.tv_nsec = temp.tv_nsec; + } } else { - struct timespec temp; + struct user32_timespec temp; error = copyin( uap->timeoutp, &temp, sizeof(temp) ); if ( error == 0 ) { ts.tv_sec = temp.tv_sec; @@ -908,7 +1149,7 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) goto ExitThisRoutine; } - if ( ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000 ) { + if ( ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000 ) { error = EINVAL; goto ExitThisRoutine; } @@ -918,35 +1159,15 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) clock_absolutetime_interval_to_deadline( abstime, &abstime ); } - /* we reserve enough space for largest possible pointer size */ - MALLOC( aiocbpp, user_addr_t *, (uap->nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK ); + aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent); if ( aiocbpp == NULL ) { error = EAGAIN; goto 
ExitThisRoutine; } - /* copyin our aiocb pointers from list */ - error = copyin( uap->aiocblist, aiocbpp, - proc_is64bit(p) ? (uap->nent * sizeof(user_addr_t)) - : (uap->nent * sizeof(uintptr_t)) ); - if ( error != 0 ) { - error = EAGAIN; - goto ExitThisRoutine; - } - - /* we depend on a list of user_addr_t's so we need to munge and expand */ - /* when these pointers came from a 32-bit process */ - if ( !proc_is64bit(p) && sizeof(uintptr_t) < sizeof(user_addr_t) ) { - /* position to the last entry and work back from there */ - uintptr_t *my_ptrp = ((uintptr_t *)aiocbpp) + (uap->nent - 1); - user_addr_t *my_addrp = aiocbpp + (uap->nent - 1); - for (i = 0; i < uap->nent; i++, my_ptrp--, my_addrp--) { - *my_addrp = (user_addr_t) (*my_ptrp); - } - } - /* check list of aio requests to see if any have completed */ - AIO_LOCK; +check_for_our_aiocbp: + aio_proc_lock_spin(p); for ( i = 0; i < uap->nent; i++ ) { user_addr_t aiocbp; @@ -956,11 +1177,12 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) continue; /* return immediately if any aio request in the list is done */ - TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) { + ASSERT_AIO_FROM_PROC(entryp, p); if ( entryp->uaiocbp == aiocbp ) { + aio_proc_unlock(p); *retval = 0; error = 0; - AIO_UNLOCK; goto ExitThisRoutine; } } @@ -975,17 +1197,20 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval ) * interrupts us. If an async IO completes before a signal fires or our * timeout expires, we get a wakeup call from aio_work_thread(). 
*/ - assert_wait_deadline( (event_t) &p->AIO_SUSPEND_SLEEP_CHAN, THREAD_ABORTSAFE, abstime ); - AIO_UNLOCK; - - error = thread_block( THREAD_CONTINUE_NULL ); - if ( error == THREAD_AWAKENED ) { - /* got our wakeup call from aio_work_thread() */ - *retval = 0; - error = 0; + error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p), PCATCH | PWAIT | PDROP, "aio_suspend", abstime); /* XXX better priority? */ + if ( error == 0 ) { + /* + * got our wakeup call from aio_work_thread(). + * Since we can get a wakeup on this channel from another thread in the + * same process we head back up to make sure this is for the correct aiocbp. + * If it is the correct aiocbp we will return from where we do the check + * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label) + * else we will fall out and just sleep again. + */ + goto check_for_our_aiocbp; } - else if ( error == THREAD_TIMED_OUT ) { + else if ( error == EWOULDBLOCK ) { /* our timeout expired */ error = EAGAIN; } @@ -1012,7 +1237,7 @@ ExitThisRoutine: */ int -aio_write( struct proc *p, struct aio_write_args *uap, int *retval ) +aio_write(proc_t p, struct aio_write_args *uap, int *retval ) { int error; @@ -1033,28 +1258,217 @@ aio_write( struct proc *p, struct aio_write_args *uap, int *retval ) } /* aio_write */ +static user_addr_t * +aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent) +{ + user_addr_t *aiocbpp; + int i, result; + + /* we reserve enough space for largest possible pointer size */ + MALLOC( aiocbpp, user_addr_t *, (nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK ); + if ( aiocbpp == NULL ) + goto err; + + /* copyin our aiocb pointers from list */ + result = copyin( aiocblist, aiocbpp, + proc_is64bit(procp) ? 
(nent * sizeof(user64_addr_t)) + : (nent * sizeof(user32_addr_t)) ); + if ( result) { + FREE( aiocbpp, M_TEMP ); + aiocbpp = NULL; + goto err; + } + + /* + * We depend on a list of user_addr_t's so we need to + * munge and expand when these pointers came from a + * 32-bit process + */ + if ( !proc_is64bit(procp) ) { + /* copy from last to first to deal with overlap */ + user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1); + user_addr_t *my_addrp = aiocbpp + (nent - 1); + + for (i = 0; i < nent; i++, my_ptrp--, my_addrp--) { + *my_addrp = (user_addr_t) (*my_ptrp); + } + } + +err: + return (aiocbpp); +} + + +static int +aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev) +{ + int result = 0; + + if (sigp == USER_ADDR_NULL) + goto out; + + /* + * We need to munge aio_sigevent since it contains pointers. + * Since we do not know if sigev_value is an int or a ptr we do + * NOT cast the ptr to a user_addr_t. This means if we send + * this info back to user space we need to remember sigev_value + * was not expanded for the 32-bit case. + * + * Notes: This does NOT affect us since we don't support + * sigev_value yet in the aio context. 
+ */ + if ( proc_is64bit(procp) ) { + struct user64_sigevent sigevent64; + + result = copyin( sigp, &sigevent64, sizeof(sigevent64) ); + if ( result == 0 ) { + sigev->sigev_notify = sigevent64.sigev_notify; + sigev->sigev_signo = sigevent64.sigev_signo; + sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int; + sigev->sigev_notify_function = sigevent64.sigev_notify_function; + sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes; + } + + } else { + struct user32_sigevent sigevent32; + + result = copyin( sigp, &sigevent32, sizeof(sigevent32) ); + if ( result == 0 ) { + sigev->sigev_notify = sigevent32.sigev_notify; + sigev->sigev_signo = sigevent32.sigev_signo; + sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int; + sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function); + sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes); + } + } + + if ( result != 0 ) { + result = EAGAIN; + } + +out: + return (result); +} + /* - * lio_listio - initiate a list of IO requests. We process the list of aiocbs - * either synchronously (mode == LIO_WAIT) or asynchronously (mode == LIO_NOWAIT). - * The caller gets error and return status for each aiocb in the list via aio_error - * and aio_return. We must keep completed requests until released by the - * aio_return call. + * aio_enqueue_work + * + * Queue up the entry on the aio asynchronous work queue in priority order + * based on the relative priority of the request. 
We calculate the relative + * priority using the nice value of the caller and the request's aio_reqprio value + * + * Parameters: procp Process queueing the I/O + * entryp The work queue entry being queued + * + * Returns: (void) No failure modes + * + * Notes: This function is used for both lio_listio and aio + * + * XXX: At some point, we may have to consider thread priority + * rather than process priority, but we don't maintain the + * adjusted priority for threads the POSIX way. + * + * + * Called with proc locked if proc_locked is nonzero; otherwise the proc lock is taken here. */ +static void +aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked) +{ +#if 0 + aio_workq_entry *my_entryp; /* used for insertion sort */ +#endif /* 0 */ + aio_workq_t queue = aio_entry_workq(entryp); + + if (proc_locked == 0) { + aio_proc_lock(procp); + } + + ASSERT_AIO_PROC_LOCK_OWNED(procp); + + /* Onto proc queue */ + TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link); + procp->p_aio_active_count++; + procp->p_aio_total_count++; + /* And work queue */ + aio_workq_lock_spin(queue); + aio_workq_add_entry_locked(queue, entryp); + wait_queue_wakeup_one(queue->aioq_waitq, queue, THREAD_AWAKENED, -1); + aio_workq_unlock(queue); + + if (proc_locked == 0) { + aio_proc_unlock(procp); + } + +#if 0 + /* + * Procedure: + * + * (1) The nice value is in the range PRIO_MIN..PRIO_MAX [-20..20] + * (2) The normalized nice value is in the range 0..((2 * NZERO) - 1) + * which is [0..39], with 0 not being used. In nice values, the + * lower the nice value, the higher the priority. + * (3) The normalized scheduling priority is the highest nice value + * minus the current nice value. In I/O scheduling priority, the + * higher the value the lower the priority, so it is the inverse + * of the nice value (the higher the number, the higher the I/O + * priority). 
+ * (4) From the normalized scheduling priority, we subtract the + * request priority to get the request priority value number; + * this means that requests are only capable of depressing their + * priority relative to other requests. + */ + entryp->priority = (((2 * NZERO) - 1) - procp->p_nice); + + /* only permit depressing the priority */ + if (entryp->aiocb.aio_reqprio < 0) + entryp->aiocb.aio_reqprio = 0; + if (entryp->aiocb.aio_reqprio > 0) { + entryp->priority -= entryp->aiocb.aio_reqprio; + if (entryp->priority < 0) + entryp->priority = 0; + } + + /* Insertion sort the entry; lowest ->priority to highest */ + TAILQ_FOREACH(my_entryp, &aio_anchor.aio_async_workq, aio_workq_link) { + if ( entryp->priority <= my_entryp->priority) { + TAILQ_INSERT_BEFORE(my_entryp, entryp, aio_workq_link); + break; + } + } + if (my_entryp == NULL) + TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link ); +#endif /* 0 */ +} + + +/* + * lio_listio - initiate a list of IO requests. We process the list of + * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously + * (mode == LIO_NOWAIT). + * + * The caller gets error and return status for each aiocb in the list + * via aio_error and aio_return. We must keep completed requests until + * released by the aio_return call. 
+ */ int -lio_listio( struct proc *p, struct lio_listio_args *uap, int *retval ) +lio_listio(proc_t p, struct lio_listio_args *uap, int *retval ) { - int i; - int call_result; - int result; - long group_tag; - aio_workq_entry * *entryp_listp; - user_addr_t *aiocbpp; - + int i; + int call_result; + int result; + int old_count; + aio_workq_entry **entryp_listp; + user_addr_t *aiocbpp; + struct user_sigevent aiosigev; + aio_lio_context *lio_context; + boolean_t free_context = FALSE; + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_START, (int)p, uap->nent, uap->mode, 0, 0 ); entryp_listp = NULL; + lio_context = NULL; aiocbpp = NULL; call_result = -1; *retval = -1; @@ -1067,171 +1481,149 @@ lio_listio( struct proc *p, struct lio_listio_args *uap, int *retval ) call_result = EINVAL; goto ExitRoutine; } - - /* - * we use group_tag to mark IO requests for delayed completion processing - * which means we wait until all IO requests in the group have completed - * before we either return to the caller when mode is LIO_WAIT or signal - * user when mode is LIO_NOWAIT. - */ - group_tag = random(); /* - * allocate a list of aio_workq_entry pointers that we will use to queue - * up all our requests at once while holding our lock. + * allocate a list of aio_workq_entry pointers that we will use + * to queue up all our requests at once while holding our lock. 
*/ MALLOC( entryp_listp, void *, (uap->nent * sizeof(aio_workq_entry *)), M_TEMP, M_WAITOK ); if ( entryp_listp == NULL ) { call_result = EAGAIN; goto ExitRoutine; } - - /* we reserve enough space for largest possible pointer size */ - MALLOC( aiocbpp, user_addr_t *, (uap->nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK ); - if ( aiocbpp == NULL ) { + + MALLOC( lio_context, aio_lio_context*, sizeof(aio_lio_context), M_TEMP, M_WAITOK ); + if ( lio_context == NULL ) { call_result = EAGAIN; goto ExitRoutine; } - /* copyin our aiocb pointers from list */ - result = copyin( uap->aiocblist, aiocbpp, - IS_64BIT_PROCESS(p) ? (uap->nent * sizeof(user_addr_t)) - : (uap->nent * sizeof(uintptr_t)) ); - if ( result != 0 ) { +#if DEBUG + OSIncrementAtomic(&lio_contexts_alloced); +#endif /* DEBUG */ + + bzero(lio_context, sizeof(aio_lio_context)); + + aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent); + if ( aiocbpp == NULL ) { call_result = EAGAIN; goto ExitRoutine; } - - /* we depend on a list of user_addr_t's so we need to munge and expand */ - /* when these pointers came from a 32-bit process */ - if ( !IS_64BIT_PROCESS(p) && sizeof(uintptr_t) < sizeof(user_addr_t) ) { - /* position to the last entry and work back from there */ - uintptr_t *my_ptrp = ((uintptr_t *)aiocbpp) + (uap->nent - 1); - user_addr_t *my_addrp = aiocbpp + (uap->nent - 1); - for (i = 0; i < uap->nent; i++, my_ptrp--, my_addrp--) { - *my_addrp = (user_addr_t) (*my_ptrp); - } + + /* + * Use sigevent passed in to lio_listio for each of our calls, but + * only do completion notification after the last request completes. + */ + bzero(&aiosigev, sizeof(aiosigev)); + /* Only copy in an sigev if the user supplied one */ + if (uap->sigp != USER_ADDR_NULL) { + call_result = aio_copy_in_sigev(p, uap->sigp, &aiosigev); + if ( call_result) + goto ExitRoutine; } /* process list of aio requests */ + lio_context->io_issued = uap->nent; + lio_context->io_waiter = uap->mode == LIO_WAIT ? 
1 : 0; /* Should it be freed by last AIO */ for ( i = 0; i < uap->nent; i++ ) { user_addr_t my_aiocbp; + aio_workq_entry *entryp; *(entryp_listp + i) = NULL; my_aiocbp = *(aiocbpp + i); /* NULL elements are legal so check for 'em */ - if ( my_aiocbp == USER_ADDR_NULL ) + if ( my_aiocbp == USER_ADDR_NULL ) { + aio_proc_lock_spin(p); + lio_context->io_issued--; + aio_proc_unlock(p); continue; + } - if ( uap->mode == LIO_NOWAIT ) - result = lio_create_async_entry( p, my_aiocbp, uap->sigp, - group_tag, (entryp_listp + i) ); - else - result = lio_create_sync_entry( p, my_aiocbp, group_tag, - (entryp_listp + i) ); - + /* + * We use lio_context to mark IO requests for delayed completion + * processing which means we wait until all IO requests in the + * group have completed before we either return to the caller + * when mode is LIO_WAIT or signal user when mode is LIO_NOWAIT. + * + * We use the address of the lio_context for this, since it is + * unique in the address space. + */ + result = lio_create_entry( p, my_aiocbp, lio_context, (entryp_listp + i) ); if ( result != 0 && call_result == -1 ) call_result = result; - } - - /* - * we need to protect this section since we do not want any of these grouped - * IO requests to begin until we have them all on the queue. 
- */ - AIO_LOCK; - for ( i = 0; i < uap->nent; i++ ) { - aio_workq_entry *entryp; /* NULL elements are legal so check for 'em */ entryp = *(entryp_listp + i); - if ( entryp == NULL ) + if ( entryp == NULL ) { + aio_proc_lock_spin(p); + lio_context->io_issued--; + aio_proc_unlock(p); continue; + } + + if ( uap->mode == LIO_NOWAIT ) { + /* Set signal handler, if any */ + entryp->aiocb.aio_sigevent = aiosigev; + } else { + /* flag that this thread blocks pending completion */ + entryp->flags |= AIO_LIO_NOTIFY; + } /* check our aio limits to throttle bad or rude user land behavior */ - if ( aio_get_all_queues_count( ) >= aio_max_requests || + old_count = aio_increment_total_count(); + + aio_proc_lock_spin(p); + if ( old_count >= aio_max_requests || aio_get_process_count( entryp->procp ) >= aio_max_requests_per_process || is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) { - vm_map_t my_map; - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; + lio_context->io_issued--; + aio_proc_unlock(p); + + aio_decrement_total_count(); + if ( call_result == -1 ) - call_result = EAGAIN; - AIO_UNLOCK; - aio_free_request( entryp, my_map ); - AIO_LOCK; + call_result = EAGAIN; + aio_free_request(entryp); + entryp_listp[i] = NULL; continue; } - /* place the request on the appropriate queue */ - if ( uap->mode == LIO_NOWAIT ) { - TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link ); - aio_anchor.aio_async_workq_count++; - - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE, - (int)p, (int)entryp->uaiocbp, 0, 0, 0 ); - } - else { - TAILQ_INSERT_TAIL( &aio_anchor.lio_sync_workq, entryp, aio_workq_link ); - aio_anchor.lio_sync_workq_count++; - } - } - - if ( uap->mode == LIO_NOWAIT ) { - /* caller does not want to wait so we'll fire off a worker thread and return */ - wakeup_one( (caddr_t) &aio_anchor.aio_async_workq ); + lck_mtx_convert_spin(aio_proc_mutex(p)); + aio_enqueue_work(p, entryp, 1); + aio_proc_unlock(p); + + 
KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE, + (int)p, (int)entryp->uaiocbp, 0, 0, 0 ); } - else { - aio_workq_entry *entryp; - int error; - /* - * mode is LIO_WAIT - handle the IO requests now. - */ - entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq ); - while ( entryp != NULL ) { - if ( p == entryp->procp && group_tag == entryp->group_tag ) { - - TAILQ_REMOVE( &aio_anchor.lio_sync_workq, entryp, aio_workq_link ); - aio_anchor.lio_sync_workq_count--; - AIO_UNLOCK; - - if ( (entryp->flags & AIO_READ) != 0 ) { - error = do_aio_read( entryp ); - } - else if ( (entryp->flags & AIO_WRITE) != 0 ) { - error = do_aio_write( entryp ); - } - else if ( (entryp->flags & AIO_FSYNC) != 0 ) { - error = do_aio_fsync( entryp ); - } - else { - printf( "%s - unknown aio request - flags 0x%02X \n", - __FUNCTION__, entryp->flags ); - error = EINVAL; - } - entryp->errorval = error; - if ( error != 0 && call_result == -1 ) - call_result = EIO; - - AIO_LOCK; - /* we're done with the IO request so move it on the done queue */ - TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count++; - p->aio_done_count++; - - /* need to start over since lio_sync_workq may have been changed while we */ - /* were away doing the IO. 
*/ - entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq ); - continue; - } /* p == entryp->procp */ + switch(uap->mode) { + case LIO_WAIT: + aio_proc_lock_spin(p); + while (lio_context->io_completed < lio_context->io_issued) { + result = msleep(lio_context, aio_proc_mutex(p), PCATCH | PRIBIO | PSPIN, "lio_listio", 0); - entryp = TAILQ_NEXT( entryp, aio_workq_link ); - } /* while ( entryp != NULL ) */ - } /* uap->mode == LIO_WAIT */ - AIO_UNLOCK; + /* If we were interrupted, fail out (even if all finished) */ + if (result != 0) { + call_result = EINTR; + lio_context->io_waiter = 0; + break; + } + } + /* If all IOs have finished must free it */ + if (lio_context->io_completed == lio_context->io_issued) { + free_context = TRUE; + } + + aio_proc_unlock(p); + break; + + case LIO_NOWAIT: + break; + } + /* call_result == -1 means we had no trouble queueing up requests */ if ( call_result == -1 ) { call_result = 0; @@ -1243,7 +1635,10 @@ ExitRoutine: FREE( entryp_listp, M_TEMP ); if ( aiocbpp != NULL ) FREE( aiocbpp, M_TEMP ); - + if ((lio_context != NULL) && ((lio_context->io_issued == 0) || (free_context == TRUE))) { + free_lio_context(lio_context); + } + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_END, (int)p, call_result, 0, 0, 0 ); @@ -1257,115 +1652,93 @@ ExitRoutine: * we get a wake up call on sleep channel &aio_anchor.aio_async_workq * after new work is queued up. */ - static void aio_work_thread( void ) { aio_workq_entry *entryp; + int error; + vm_map_t currentmap; + vm_map_t oldmap = VM_MAP_NULL; + task_t oldaiotask = TASK_NULL; + struct uthread *uthreadp = NULL; for( ;; ) { - AIO_LOCK; - entryp = aio_get_some_work(); - if ( entryp == NULL ) { - /* - * aio worker threads wait for some work to get queued up - * by aio_queue_async_request. Once some work gets queued - * it will wake up one of these worker threads just before - * returning to our caller in user land. 
- */ - assert_wait( (event_t) &aio_anchor.aio_async_workq, THREAD_UNINT ); - AIO_UNLOCK; - - thread_block( (thread_continue_t)aio_work_thread ); - /* NOT REACHED */ - } + /* + * returns with the entry ref'ed. + * sleeps until work is available. + */ + entryp = aio_get_some_work(); + + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_START, + (int)entryp->procp, (int)entryp->uaiocbp, entryp->flags, 0, 0 ); + + /* + * Assume the target's address space identity for the duration + * of the IO. Note: don't need to have the entryp locked, + * because the proc and map don't change until it's freed. + */ + currentmap = get_task_map( (current_proc())->task ); + if ( currentmap != entryp->aio_map ) { + uthreadp = (struct uthread *) get_bsdthread_info(current_thread()); + oldaiotask = uthreadp->uu_aio_task; + uthreadp->uu_aio_task = entryp->procp->task; + oldmap = vm_map_switch( entryp->aio_map ); + } + + if ( (entryp->flags & AIO_READ) != 0 ) { + error = do_aio_read( entryp ); + } + else if ( (entryp->flags & AIO_WRITE) != 0 ) { + error = do_aio_write( entryp ); + } + else if ( (entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0 ) { + error = do_aio_fsync( entryp ); + } else { - int error; - vm_map_t currentmap; - vm_map_t oldmap = VM_MAP_NULL; - task_t oldaiotask = TASK_NULL; - struct uthread *uthreadp = NULL; + printf( "%s - unknown aio request - flags 0x%02X \n", + __FUNCTION__, entryp->flags ); + error = EINVAL; + } - AIO_UNLOCK; + /* Restore old map */ + if ( currentmap != entryp->aio_map ) { + (void) vm_map_switch( oldmap ); + uthreadp->uu_aio_task = oldaiotask; + } - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_START, - (int)entryp->procp, (int)entryp->uaiocbp, entryp->flags, 0, 0 ); - - /* - * Assume the target's address space identity for the duration - * of the IO. 
- */ - currentmap = get_task_map( (current_proc())->task ); - if ( currentmap != entryp->aio_map ) { - uthreadp = (struct uthread *) get_bsdthread_info(current_thread()); - oldaiotask = uthreadp->uu_aio_task; - uthreadp->uu_aio_task = entryp->procp->task; - oldmap = vm_map_switch( entryp->aio_map ); - } - - if ( (entryp->flags & AIO_READ) != 0 ) { - error = do_aio_read( entryp ); - } - else if ( (entryp->flags & AIO_WRITE) != 0 ) { - error = do_aio_write( entryp ); - } - else if ( (entryp->flags & AIO_FSYNC) != 0 ) { - error = do_aio_fsync( entryp ); - } - else { - printf( "%s - unknown aio request - flags 0x%02X \n", - __FUNCTION__, entryp->flags ); - error = EINVAL; - } - entryp->errorval = error; - if ( currentmap != entryp->aio_map ) { - (void) vm_map_switch( oldmap ); - uthreadp->uu_aio_task = oldaiotask; - } - - /* we're done with the IO request so pop it off the active queue and */ - /* push it on the done queue */ - AIO_LOCK; - TAILQ_REMOVE( &entryp->procp->aio_activeq, entryp, aio_workq_link ); - aio_anchor.aio_active_count--; - entryp->procp->aio_active_count--; - TAILQ_INSERT_TAIL( &entryp->procp->aio_doneq, entryp, aio_workq_link ); - aio_anchor.aio_done_count++; - entryp->procp->aio_done_count++; - entryp->flags |= AIO_COMPLETION; - - /* remove our reference to the user land map. 
*/ - if ( VM_MAP_NULL != entryp->aio_map ) { - vm_map_t my_map; - - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; - AIO_UNLOCK; /* must unlock before calling vm_map_deallocate() */ - vm_map_deallocate( my_map ); - } - else { - AIO_UNLOCK; - } - - do_aio_completion( entryp ); - - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_END, - (int)entryp->procp, (int)entryp->uaiocbp, entryp->errorval, - entryp->returnval, 0 ); - - AIO_LOCK; - entryp->flags &= ~AIO_COMPLETION; - if ( (entryp->flags & AIO_DO_FREE) != 0 ) { - vm_map_t my_map; - - my_map = entryp->aio_map; - entryp->aio_map = VM_MAP_NULL; - AIO_UNLOCK; - aio_free_request( entryp, my_map ); - } - else - AIO_UNLOCK; + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_END, + (int)entryp->procp, (int)entryp->uaiocbp, entryp->errorval, + entryp->returnval, 0 ); + + + /* XXX COUNTS */ + aio_entry_lock_spin(entryp); + entryp->errorval = error; + aio_entry_unlock(entryp); + + /* we're done with the IO request so pop it off the active queue and */ + /* push it on the done queue */ + aio_proc_lock(entryp->procp); + aio_proc_move_done_locked(entryp->procp, entryp); + aio_proc_unlock(entryp->procp); + + OSDecrementAtomic(&aio_anchor.aio_inflight_count); + + /* remove our reference to the user land map. */ + if ( VM_MAP_NULL != entryp->aio_map ) { + vm_map_t my_map; + + my_map = entryp->aio_map; + entryp->aio_map = VM_MAP_NULL; + vm_map_deallocate( my_map ); } + + /* Provide notifications */ + do_aio_completion( entryp ); + + /* Will free if needed */ + aio_entry_unref(entryp); + } /* for ( ;; ) */ /* NOT REACHED */ @@ -1379,387 +1752,327 @@ aio_work_thread( void ) * IO requests at the time the aio_fsync call came in have completed. 
* NOTE - AIO_LOCK must be held by caller */ - static aio_workq_entry * aio_get_some_work( void ) { - aio_workq_entry *entryp; - - /* pop some work off the work queue and add to our active queue */ - for ( entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq ); - entryp != NULL; - entryp = TAILQ_NEXT( entryp, aio_workq_link ) ) { + aio_workq_entry *entryp = NULL; + aio_workq_t queue = NULL; + + /* Just one queue for the moment. In the future there will be many. */ + queue = &aio_anchor.aio_async_workqs[0]; + aio_workq_lock_spin(queue); + if (queue->aioq_count == 0) { + goto nowork; + } + /* + * Hold the queue lock. + * + * pop some work off the work queue and add to our active queue + * Always start with the queue lock held. + */ + for(;;) { + /* + * Pull off of the work queue. Once it's off, it can't be cancelled, + * so we can take our ref once we drop the queue lock. + */ + entryp = TAILQ_FIRST(&queue->aioq_entries); + + /* + * If there's no work or only fsyncs that need delay, go to sleep + * and then start anew from aio_work_thread + */ + if (entryp == NULL) { + goto nowork; + } + + aio_workq_remove_entry_locked(queue, entryp); + + aio_workq_unlock(queue); + + /* + * Check if it's an fsync that must be delayed. No need to lock the entry; + * that flag would have been set at initialization. + */ if ( (entryp->flags & AIO_FSYNC) != 0 ) { - /* leave aio_fsync calls on the work queue if there are IO */ - /* requests on the active queue for the same file descriptor. */ + /* + * Check for unfinished operations on the same file + * in this proc's queue. + */ + aio_proc_lock_spin(entryp->procp); if ( aio_delay_fsync_request( entryp ) ) { - + /* It needs to be delayed. 
Put it back on the end of the work queue */ KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay)) | DBG_FUNC_NONE, (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); + + aio_proc_unlock(entryp->procp); + + aio_workq_lock_spin(queue); + aio_workq_add_entry_locked(queue, entryp); continue; - } + } + aio_proc_unlock(entryp->procp); } + break; } - - if ( entryp != NULL ) { - TAILQ_REMOVE( &aio_anchor.aio_async_workq, entryp, aio_workq_link ); - aio_anchor.aio_async_workq_count--; - TAILQ_INSERT_TAIL( &entryp->procp->aio_activeq, entryp, aio_workq_link ); - aio_anchor.aio_active_count++; - entryp->procp->aio_active_count++; - } - + + aio_entry_ref(entryp); + + OSIncrementAtomic(&aio_anchor.aio_inflight_count); return( entryp ); - -} /* aio_get_some_work */ +nowork: + /* We will wake up when someone enqueues something */ + wait_queue_assert_wait(queue->aioq_waitq, queue, THREAD_UNINT, 0); + aio_workq_unlock(queue); + thread_block( (thread_continue_t)aio_work_thread ); + + // notreached + return NULL; +} /* - * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed at - * this time. Delay will happen when there are any active IOs for the same file - * descriptor that were queued at time the aio_sync call was queued. - * NOTE - AIO_LOCK must be held by caller + * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed. + * A big, simple hammer: only send it off if it's the oldest IO which has + * not been completed (i.e. it is first on the proc's active queue). 
*/ static boolean_t aio_delay_fsync_request( aio_workq_entry *entryp ) { - aio_workq_entry *my_entryp; - - TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) { - if ( my_entryp->fsyncp != USER_ADDR_NULL && - entryp->uaiocbp == my_entryp->fsyncp && - entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) { - return( TRUE ); - } + if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) { + return FALSE; } - return( FALSE ); - + return TRUE; } /* aio_delay_fsync_request */ - -/* - * aio_queue_async_request - queue up an async IO request on our work queue then - * wake up one of our worker threads to do the actual work. We get a reference - * to our caller's user land map in order to keep it around while we are - * processing the request. - */ - -static int -aio_queue_async_request( struct proc *procp, user_addr_t aiocbp, int kindOfIO ) +static aio_workq_entry * +aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, void *group_tag, int kindOfIO) { - aio_workq_entry *entryp; - int result; + aio_workq_entry *entryp; + int result = 0; entryp = (aio_workq_entry *) zalloc( aio_workq_zonep ); if ( entryp == NULL ) { result = EAGAIN; goto error_exit; } + bzero( entryp, sizeof(*entryp) ); /* fill in the rest of the aio_workq_entry */ entryp->procp = procp; entryp->uaiocbp = aiocbp; entryp->flags |= kindOfIO; + entryp->group_tag = group_tag; entryp->aio_map = VM_MAP_NULL; + entryp->aio_refcount = 0; - if ( !IS_64BIT_PROCESS(procp) ) { - struct aiocb aiocb32; - + if ( proc_is64bit(procp) ) { + struct user64_aiocb aiocb64; + + result = copyin( aiocbp, &aiocb64, sizeof(aiocb64) ); + if (result == 0 ) + do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb); + + } else { + struct user32_aiocb aiocb32; + result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) ); if ( result == 0 ) - do_munge_aiocb( &aiocb32, &entryp->aiocb ); - } else - result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) ); + do_munge_aiocb_user32_to_user( &aiocb32, 
&entryp->aiocb ); + } if ( result != 0 ) { result = EAGAIN; goto error_exit; } - /* do some more validation on the aiocb and embedded file descriptor */ - result = aio_validate( entryp ); - if ( result != 0 ) - goto error_exit; - /* get a reference to the user land map in order to keep it around */ entryp->aio_map = get_task_map( procp->task ); vm_map_reference( entryp->aio_map ); - AIO_LOCK; + /* do some more validation on the aiocb and embedded file descriptor */ + result = aio_validate( entryp ); + if ( result != 0 ) + goto error_exit_with_ref; - if ( is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) { - AIO_UNLOCK; - result = EAGAIN; - goto error_exit; - } + /* get a reference on the current_thread, which is passed in vfs_context. */ + entryp->thread = current_thread(); + thread_reference( entryp->thread ); + return ( entryp ); - /* check our aio limits to throttle bad or rude user land behavior */ - if ( aio_get_all_queues_count( ) >= aio_max_requests || - aio_get_process_count( procp ) >= aio_max_requests_per_process ) { - AIO_UNLOCK; - result = EAGAIN; - goto error_exit; +error_exit_with_ref: + if ( VM_MAP_NULL != entryp->aio_map ) { + vm_map_deallocate( entryp->aio_map ); } - - /* - * aio_fsync calls sync up all async IO requests queued at the time - * the aio_fsync call was made. So we mark each currently queued async - * IO with a matching file descriptor as must complete before we do the - * fsync. We set the fsyncp field of each matching async IO - * request with the aiocb pointer passed in on the aio_fsync call to - * know which IOs must complete before we process the aio_fsync call. 
- */ - if ( (kindOfIO & AIO_FSYNC) != 0 ) - aio_mark_requests( entryp ); - - /* queue up on our aio asynchronous work queue */ - TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link ); - aio_anchor.aio_async_workq_count++; - - wakeup_one( (caddr_t) &aio_anchor.aio_async_workq ); - AIO_UNLOCK; - - KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE, - (int)procp, (int)aiocbp, 0, 0, 0 ); - - return( 0 ); - error_exit: - if ( entryp != NULL ) { - /* this entry has not been queued up so no worries about unlocked */ - /* state and aio_map */ - aio_free_request( entryp, entryp->aio_map ); + if ( result && entryp != NULL ) { + zfree( aio_workq_zonep, entryp ); + entryp = NULL; } - - return( result ); - -} /* aio_queue_async_request */ + + return ( entryp ); +} /* - * lio_create_async_entry - allocate an aio_workq_entry and fill it in. - * If all goes well return 0 and pass the aio_workq_entry pointer back to - * our caller. We get a reference to our caller's user land map in order to keep - * it around while we are processing the request. - * lio_listio calls behave differently at completion they do completion notification - * when all async IO requests have completed. We use group_tag to tag IO requests - * that behave in the delay notification manner. + * aio_queue_async_request - queue up an async IO request on our work queue then + * wake up one of our worker threads to do the actual work. We get a reference + * to our caller's user land map in order to keep it around while we are + * processing the request. 
*/ - static int -lio_create_async_entry( struct proc *procp, user_addr_t aiocbp, - user_addr_t sigp, long group_tag, - aio_workq_entry **entrypp ) +aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO ) { - aio_workq_entry *entryp; - int result; + aio_workq_entry *entryp; + int result; + int old_count; - entryp = (aio_workq_entry *) zalloc( aio_workq_zonep ); - if ( entryp == NULL ) { - result = EAGAIN; - goto error_exit; + old_count = aio_increment_total_count(); + if (old_count >= aio_max_requests) { + result = EAGAIN; + goto error_noalloc; } - bzero( entryp, sizeof(*entryp) ); - - /* fill in the rest of the aio_workq_entry */ - entryp->procp = procp; - entryp->uaiocbp = aiocbp; - entryp->flags |= AIO_LIO; - entryp->group_tag = group_tag; - entryp->aio_map = VM_MAP_NULL; - - if ( !IS_64BIT_PROCESS(procp) ) { - struct aiocb aiocb32; - - result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) ); - if ( result == 0 ) - do_munge_aiocb( &aiocb32, &entryp->aiocb ); - } else - result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) ); - if ( result != 0 ) { + entryp = aio_create_queue_entry( procp, aiocbp, 0, kindOfIO); + if ( entryp == NULL ) { result = EAGAIN; - goto error_exit; + goto error_noalloc; } - /* look for lio_listio LIO_NOP requests and ignore them. */ - /* Not really an error, but we need to free our aio_workq_entry. */ - if ( entryp->aiocb.aio_lio_opcode == LIO_NOP ) { - result = 0; - goto error_exit; - } - /* use sigevent passed in to lio_listio for each of our calls, but only */ - /* do completion notification after the last request completes. */ - if ( sigp != USER_ADDR_NULL ) { - if ( !IS_64BIT_PROCESS(procp) ) { - struct sigevent sigevent32; - - result = copyin( sigp, &sigevent32, sizeof(sigevent32) ); - if ( result == 0 ) { - /* also need to munge aio_sigevent since it contains pointers */ - /* special case here. since we do not know if sigev_value is an */ - /* int or a ptr we do NOT cast the ptr to a user_addr_t. 
This */ - /* means if we send this info back to user space we need to remember */ - /* sigev_value was not expanded for the 32-bit case. */ - /* NOTE - this does NOT affect us since we don't support sigev_value */ - /* yet in the aio context. */ - //LP64 - entryp->aiocb.aio_sigevent.sigev_notify = sigevent32.sigev_notify; - entryp->aiocb.aio_sigevent.sigev_signo = sigevent32.sigev_signo; - entryp->aiocb.aio_sigevent.sigev_value.size_equivalent.sival_int = - sigevent32.sigev_value.sival_int; - entryp->aiocb.aio_sigevent.sigev_notify_function = - CAST_USER_ADDR_T(sigevent32.sigev_notify_function); - entryp->aiocb.aio_sigevent.sigev_notify_attributes = - CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes); - } - } else - result = copyin( sigp, &entryp->aiocb.aio_sigevent, sizeof(entryp->aiocb.aio_sigevent) ); + aio_proc_lock_spin(procp); - if ( result != 0 ) { - result = EAGAIN; - goto error_exit; - } + if ( is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) { + result = EAGAIN; + goto error_exit; } - /* do some more validation on the aiocb and embedded file descriptor */ - result = aio_validate( entryp ); - if ( result != 0 ) + /* check our aio limits to throttle bad or rude user land behavior */ + if (aio_get_process_count( procp ) >= aio_max_requests_per_process) { + printf("aio_queue_async_request(): too many in flight for proc: %d.\n", procp->p_aio_total_count); + result = EAGAIN; goto error_exit; - - /* get a reference to the user land map in order to keep it around */ - entryp->aio_map = get_task_map( procp->task ); - vm_map_reference( entryp->aio_map ); + } - *entrypp = entryp; - return( 0 ); + /* Add the IO to proc and work queues, wake up threads as appropriate */ + lck_mtx_convert_spin(aio_proc_mutex(procp)); + aio_enqueue_work(procp, entryp, 1); -error_exit: - if ( entryp != NULL ) - zfree( aio_workq_zonep, entryp ); - - return( result ); + aio_proc_unlock(procp); -} /* lio_create_async_entry */ - + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, 
AIO_work_queued)) | DBG_FUNC_NONE, + (int)procp, (int)aiocbp, 0, 0, 0 ); -/* - * aio_mark_requests - aio_fsync calls synchronize file data for all queued async IO - * requests at the moment the aio_fsync call is queued. We use aio_workq_entry.fsyncp - * to mark each async IO that must complete before the fsync is done. We use the uaiocbp - * field from the aio_fsync call as the aio_workq_entry.fsyncp in marked requests. - * NOTE - AIO_LOCK must be held by caller - */ + return( 0 ); + +error_exit: + /* + * This entry has not been queued up so no worries about + * unlocked state and aio_map + */ + aio_proc_unlock(procp); + aio_free_request(entryp); -static void -aio_mark_requests( aio_workq_entry *entryp ) -{ - aio_workq_entry *my_entryp; +error_noalloc: + aio_decrement_total_count(); - TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) { - if ( entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) { - my_entryp->fsyncp = entryp->uaiocbp; - } - } + return( result ); - TAILQ_FOREACH( my_entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( entryp->procp == my_entryp->procp && - entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) { - my_entryp->fsyncp = entryp->uaiocbp; - } - } - -} /* aio_mark_requests */ +} /* aio_queue_async_request */ /* - * lio_create_sync_entry - allocate an aio_workq_entry and fill it in. - * If all goes well return 0 and pass the aio_workq_entry pointer back to - * our caller. - * lio_listio calls behave differently at completion they do completion notification - * when all async IO requests have completed. We use group_tag to tag IO requests - * that behave in the delay notification manner. + * lio_create_entry + * + * Allocate an aio_workq_entry and fill it in. If all goes well return 0 + * and pass the aio_workq_entry pointer back to our caller. 
+ * + * Parameters: procp The process makign the request + * aiocbp The aio context buffer pointer + * group_tag The group tag used to indicate a + * group of operations has completed + * entrypp Pointer to the pointer to receive the + * address of the created aio_workq_entry + * + * Returns: 0 Successfully created + * EAGAIN Try again (usually resource shortage) + * + * + * Notes: We get a reference to our caller's user land map in order + * to keep it around while we are processing the request. + * + * lio_listio calls behave differently at completion they do + * completion notification when all async IO requests have + * completed. We use group_tag to tag IO requests that behave + * in the delay notification manner. + * + * All synchronous operations are considered to not have a + * signal routine associated with them (sigp == USER_ADDR_NULL). */ - static int -lio_create_sync_entry( struct proc *procp, user_addr_t aiocbp, - long group_tag, aio_workq_entry **entrypp ) +lio_create_entry(proc_t procp, user_addr_t aiocbp, void *group_tag, + aio_workq_entry **entrypp ) { - aio_workq_entry *entryp; - int result; + aio_workq_entry *entryp; + int result; - entryp = (aio_workq_entry *) zalloc( aio_workq_zonep ); + entryp = aio_create_queue_entry( procp, aiocbp, group_tag, AIO_LIO); if ( entryp == NULL ) { result = EAGAIN; goto error_exit; } - bzero( entryp, sizeof(*entryp) ); - - /* fill in the rest of the aio_workq_entry */ - entryp->procp = procp; - entryp->uaiocbp = aiocbp; - entryp->flags |= AIO_LIO; - entryp->group_tag = group_tag; - entryp->aio_map = VM_MAP_NULL; - - if ( !IS_64BIT_PROCESS(procp) ) { - struct aiocb aiocb32; - - result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) ); - if ( result == 0 ) - do_munge_aiocb( &aiocb32, &entryp->aiocb ); - } else - result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) ); - - if ( result != 0 ) { - result = EAGAIN; - goto error_exit; - } - /* look for lio_listio LIO_NOP requests and ignore them. 
*/ - /* Not really an error, but we need to free our aio_workq_entry. */ + /* + * Look for lio_listio LIO_NOP requests and ignore them; this is + * not really an error, but we need to free our aio_workq_entry. + */ if ( entryp->aiocb.aio_lio_opcode == LIO_NOP ) { result = 0; goto error_exit; } - result = aio_validate( entryp ); - if ( result != 0 ) { - goto error_exit; - } - *entrypp = entryp; return( 0 ); error_exit: - if ( entryp != NULL ) - zfree( aio_workq_zonep, entryp ); + + if ( entryp != NULL ) { + /* + * This entry has not been queued up so no worries about + * unlocked state and aio_map + */ + aio_free_request(entryp); + } return( result ); -} /* lio_create_sync_entry */ +} /* lio_create_entry */ /* * aio_free_request - remove our reference on the user land map and - * free the work queue entry resources. - * We are not holding the lock here thus aio_map is passed in and - * zeroed while we did have the lock. + * free the work queue entry resources. The entry is off all lists + * and has zero refcount, so no one can have a pointer to it. */ static int -aio_free_request( aio_workq_entry *entryp, vm_map_t the_map ) +aio_free_request(aio_workq_entry *entryp) { /* remove our reference to the user land map. */ - if ( VM_MAP_NULL != the_map ) { - vm_map_deallocate( the_map ); + if ( VM_MAP_NULL != entryp->aio_map) { + vm_map_deallocate(entryp->aio_map); } - + + /* remove our reference to thread which enqueued the request */ + if ( NULL != entryp->thread ) { + thread_deallocate( entryp->thread ); + } + + entryp->aio_refcount = -1; /* A bit of poisoning in case of bad refcounting. */ + zfree( aio_workq_zonep, entryp ); return( 0 ); @@ -1767,9 +2080,11 @@ aio_free_request( aio_workq_entry *entryp, vm_map_t the_map ) } /* aio_free_request */ -/* aio_validate - validate the aiocb passed in by one of the aio syscalls. +/* + * aio_validate + * + * validate the aiocb passed in by one of the aio syscalls. 
*/ - static int aio_validate( aio_workq_entry *entryp ) { @@ -1791,32 +2106,46 @@ aio_validate( aio_workq_entry *entryp ) } flag = FREAD; - if ( (entryp->flags & (AIO_WRITE | AIO_FSYNC)) != 0 ) { + if ( (entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0 ) { flag = FWRITE; } if ( (entryp->flags & (AIO_READ | AIO_WRITE)) != 0 ) { - // LP64todo - does max value for aio_nbytes need to grow? if ( entryp->aiocb.aio_nbytes > INT_MAX || entryp->aiocb.aio_buf == USER_ADDR_NULL || entryp->aiocb.aio_offset < 0 ) return( EINVAL ); } - /* validate aiocb.aio_sigevent. at this point we only support sigev_notify - * equal to SIGEV_SIGNAL or SIGEV_NONE. this means sigev_value, - * sigev_notify_function, and sigev_notify_attributes are ignored. + /* + * validate aiocb.aio_sigevent. at this point we only support + * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE. this means + * sigev_value, sigev_notify_function, and sigev_notify_attributes + * are ignored, since SIGEV_THREAD is unsupported. This is consistent + * with no [RTS] (RalTime Signal) option group support. */ - if ( entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ) { + switch ( entryp->aiocb.aio_sigevent.sigev_notify ) { + case SIGEV_SIGNAL: + { int signum; + /* make sure we have a valid signal number */ signum = entryp->aiocb.aio_sigevent.sigev_signo; if ( signum <= 0 || signum >= NSIG || signum == SIGKILL || signum == SIGSTOP ) return (EINVAL); - } - else if ( entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_NONE ) + } + break; + + case SIGEV_NONE: + break; + + case SIGEV_THREAD: + /* Unsupported [RTS] */ + + default: return (EINVAL); + } /* validate the file descriptor and that the file was opened * for the appropriate read / write access. 
@@ -1829,7 +2158,7 @@ aio_validate( aio_workq_entry *entryp ) /* we don't have read or write access */ result = EBADF; } - else if ( fp->f_fglob->fg_type != DTYPE_VNODE ) { + else if ( FILEGLOB_DTYPE(fp->f_fglob) != DTYPE_VNODE ) { /* this is not a file */ result = ESPIPE; } else @@ -1847,61 +2176,34 @@ aio_validate( aio_workq_entry *entryp ) } /* aio_validate */ +static int +aio_increment_total_count() +{ + return OSIncrementAtomic(&aio_anchor.aio_total_count); +} -/* - * aio_get_process_count - runs through our queues that hold outstanding - * async IO reqests and totals up number of requests for the given - * process. - * NOTE - caller must hold aio lock! - */ +static int +aio_decrement_total_count() +{ + int old = OSDecrementAtomic(&aio_anchor.aio_total_count); + if (old <= 0) { + panic("Negative total AIO count!\n"); + } + + return old; +} static int -aio_get_process_count( struct proc *procp ) +aio_get_process_count(proc_t procp ) { - aio_workq_entry *entryp; - int count; - - /* begin with count of completed async IO requests for this process */ - count = procp->aio_done_count; - - /* add in count of active async IO requests for this process */ - count += procp->aio_active_count; - - /* look for matches on our queue of asynchronous todo work */ - TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( procp == entryp->procp ) { - count++; - } - } - - /* look for matches on our queue of synchronous todo work */ - TAILQ_FOREACH( entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) { - if ( procp == entryp->procp ) { - count++; - } - } - - return( count ); + return procp->p_aio_total_count; } /* aio_get_process_count */ - -/* - * aio_get_all_queues_count - get total number of entries on all aio work queues. - * NOTE - caller must hold aio lock! 
- */ - static int aio_get_all_queues_count( void ) { - int count; - - count = aio_anchor.aio_async_workq_count; - count += aio_anchor.lio_sync_workq_count; - count += aio_anchor.aio_active_count; - count += aio_anchor.aio_done_count; - - return( count ); + return aio_anchor.aio_total_count; } /* aio_get_all_queues_count */ @@ -1909,113 +2211,143 @@ aio_get_all_queues_count( void ) /* * do_aio_completion. Handle async IO completion. */ - static void do_aio_completion( aio_workq_entry *entryp ) { - /* signal user land process if appropriate */ + + boolean_t lastLioCompleted = FALSE; + aio_lio_context *lio_context = NULL; + int waiter = 0; + + lio_context = (aio_lio_context *)entryp->group_tag; + + if (lio_context != NULL) { + + aio_proc_lock_spin(entryp->procp); + + /* Account for this I/O completing. */ + lio_context->io_completed++; + + /* Are we done with this lio context? */ + if (lio_context->io_issued == lio_context->io_completed) { + lastLioCompleted = TRUE; + } + + waiter = lio_context->io_waiter; + + /* explicit wakeup of lio_listio() waiting in LIO_WAIT */ + if ((entryp->flags & AIO_LIO_NOTIFY) && (lastLioCompleted) && (waiter != 0)) { + /* wake up the waiter */ + wakeup(lio_context); + } + + aio_proc_unlock(entryp->procp); + } + if ( entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL && (entryp->flags & AIO_DISABLE) == 0 ) { - - /* - * if group_tag is non zero then make sure this is the last IO request - * in the group before we signal. - */ - if ( entryp->group_tag == 0 || - (entryp->group_tag != 0 && aio_last_group_io( entryp )) ) { + + boolean_t performSignal = FALSE; + if (lio_context == NULL) { + performSignal = TRUE; + } + else { + /* + * If this was the last request in the group and a signal + * is desired, send one. 
+ */ + performSignal = lastLioCompleted; + } + + if (performSignal) { + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig)) | DBG_FUNC_NONE, - (int)entryp->procp, (int)entryp->uaiocbp, - entryp->aiocb.aio_sigevent.sigev_signo, 0, 0 ); + (int)entryp->procp, (int)entryp->uaiocbp, + entryp->aiocb.aio_sigevent.sigev_signo, 0, 0 ); psignal( entryp->procp, entryp->aiocb.aio_sigevent.sigev_signo ); - return; } } + if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) { + panic("Close and exit flags set at the same time\n"); + } + /* - * need to handle case where a process is trying to exit, exec, or close - * and is currently waiting for active aio requests to complete. If - * AIO_WAITING is set then we need to look to see if there are any + * need to handle case where a process is trying to exit, exec, or + * close and is currently waiting for active aio requests to complete. + * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any * other requests in the active queue for this process. If there are - * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel. If - * there are some still active then do nothing - we only want to wakeup - * when all active aio requests for the process are complete. + * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel. + * If there are some still active then do nothing - we only want to + * wakeup when all active aio requests for the process are complete. + * + * Don't need to lock the entry or proc to check the cleanup flag. It can only be + * set for cancellation, while the entryp is still on a proc list; now it's + * off, so that flag is already set if it's going to be. 
*/ - if ( (entryp->flags & AIO_WAITING) != 0 ) { + if ( (entryp->flags & AIO_EXIT_WAIT) != 0 ) { int active_requests; KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE, (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); - AIO_LOCK; + aio_proc_lock_spin(entryp->procp); active_requests = aio_active_requests_for_process( entryp->procp ); - //AIO_UNLOCK; if ( active_requests < 1 ) { - /* no active aio requests for this process, continue exiting */ - wakeup_one( (caddr_t) &entryp->procp->AIO_CLEANUP_SLEEP_CHAN ); + /* + * no active aio requests for this process, continue exiting. In this + * case, there should be no one else waiting ont he proc in AIO... + */ + wakeup_one((caddr_t)&entryp->procp->AIO_CLEANUP_SLEEP_CHAN); + aio_proc_unlock(entryp->procp); KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE, (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); + } else { + aio_proc_unlock(entryp->procp); } - AIO_UNLOCK; - return; } + + if ( (entryp->flags & AIO_CLOSE_WAIT) != 0 ) { + int active_requests; + + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE, + (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); + + aio_proc_lock_spin(entryp->procp); + active_requests = aio_proc_active_requests_for_file( entryp->procp, entryp->aiocb.aio_fildes); + if ( active_requests < 1 ) { + /* Can't wakeup_one(); multiple closes might be in progress. */ + wakeup(&entryp->procp->AIO_CLEANUP_SLEEP_CHAN); + aio_proc_unlock(entryp->procp); + KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE, + (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); + } else { + aio_proc_unlock(entryp->procp); + } + } /* - * aio_suspend case when a signal was not requested. In that scenario we - * are sleeping on the AIO_SUSPEND_SLEEP_CHAN channel. - * NOTE - the assumption here is that this wakeup call is inexpensive. 
- * we really only need to do this when an aio_suspend call is pending. - * If we find the wakeup call should be avoided we could mark the - * async IO requests given in the list provided by aio_suspend and only - * call wakeup for them. If we do mark them we should unmark them after - * the aio_suspend wakes up. + * A thread in aio_suspend() wants to known about completed IOs. If it checked + * the done list before we moved our AIO there, then it already asserted its wait, + * and we can wake it up without holding the lock. If it checked the list after + * we did our move, then it already has seen the AIO that we moved. Herego, we + * can do our wakeup without holding the lock. */ - AIO_LOCK; - wakeup_one( (caddr_t) &entryp->procp->AIO_SUSPEND_SLEEP_CHAN ); - AIO_UNLOCK; - + wakeup( (caddr_t) &entryp->procp->AIO_SUSPEND_SLEEP_CHAN ); KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake)) | DBG_FUNC_NONE, (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 ); - - return; - -} /* do_aio_completion */ + /* + * free the LIO context if the last lio completed and no thread is + * waiting + */ + if (lastLioCompleted && (waiter == 0)) + free_lio_context (lio_context); -/* - * aio_last_group_io - checks to see if this is the last unfinished IO request - * for the given group_tag. 
Returns TRUE if there are no other active IO - * requests for this group or FALSE if the are active IO requests - * NOTE - AIO_LOCK must be held by caller - */ - -static boolean_t -aio_last_group_io( aio_workq_entry *entryp ) -{ - aio_workq_entry *my_entryp; - - /* look for matches on our queue of active async IO requests */ - TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) { - if ( my_entryp->group_tag == entryp->group_tag ) - return( FALSE ); - } - - /* look for matches on our queue of asynchronous todo work */ - TAILQ_FOREACH( my_entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( my_entryp->group_tag == entryp->group_tag ) - return( FALSE ); - } - - /* look for matches on our queue of synchronous todo work */ - TAILQ_FOREACH( my_entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) { - if ( my_entryp->group_tag == entryp->group_tag ) - return( FALSE ); - } - - return( TRUE ); -} /* aio_last_group_io */ +} /* do_aio_completion */ /* @@ -2024,8 +2356,9 @@ aio_last_group_io( aio_workq_entry *entryp ) static int do_aio_read( aio_workq_entry *entryp ) { - struct fileproc *fp; - int error; + struct fileproc *fp; + int error; + struct vfs_context context; if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) ) return(error); @@ -2033,18 +2366,16 @@ do_aio_read( aio_workq_entry *entryp ) fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); return(EBADF); } - if ( fp != NULL ) { - error = dofileread( entryp->procp, fp, entryp->aiocb.aio_fildes, - entryp->aiocb.aio_buf, - entryp->aiocb.aio_nbytes, - entryp->aiocb.aio_offset, FOF_OFFSET, - &entryp->returnval ); - fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); - } - else { - fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); - error = EBADF; - } + + context.vc_thread = entryp->thread; /* XXX */ + context.vc_ucred = fp->f_fglob->fg_cred; + + error = dofileread(&context, fp, + entryp->aiocb.aio_buf, + entryp->aiocb.aio_nbytes, + 
entryp->aiocb.aio_offset, FOF_OFFSET, + &entryp->returnval); + fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); return( error ); @@ -2058,7 +2389,8 @@ static int do_aio_write( aio_workq_entry *entryp ) { struct fileproc *fp; - int error; + int error, flags; + struct vfs_context context; if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) ) return(error); @@ -2066,23 +2398,28 @@ do_aio_write( aio_workq_entry *entryp ) fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); return(EBADF); } - if ( fp != NULL ) { - /* NB: tell dofilewrite the offset, and to use the proc cred */ - error = dofilewrite( entryp->procp, - fp, - entryp->aiocb.aio_fildes, - entryp->aiocb.aio_buf, - entryp->aiocb.aio_nbytes, - entryp->aiocb.aio_offset, - FOF_OFFSET | FOF_PCRED, - &entryp->returnval); - - fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); + + flags = FOF_PCRED; + if ( (fp->f_fglob->fg_flag & O_APPEND) == 0 ) { + flags |= FOF_OFFSET; } - else { + + context.vc_thread = entryp->thread; /* XXX */ + context.vc_ucred = fp->f_fglob->fg_cred; + + /* NB: tell dofilewrite the offset, and to use the proc cred */ + error = dofilewrite(&context, + fp, + entryp->aiocb.aio_buf, + entryp->aiocb.aio_nbytes, + entryp->aiocb.aio_offset, + flags, + &entryp->returnval); + + if (entryp->returnval) + fp_drop_written(entryp->procp, entryp->aiocb.aio_fildes, fp); + else fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0); - error = EBADF; - } return( error ); @@ -2092,18 +2429,33 @@ do_aio_write( aio_workq_entry *entryp ) /* * aio_active_requests_for_process - return number of active async IO * requests for the given process. - * NOTE - caller must hold aio lock! */ +static int +aio_active_requests_for_process(proc_t procp ) +{ + return( procp->p_aio_active_count ); + +} /* aio_active_requests_for_process */ +/* + * Called with the proc locked. 
+ */ static int -aio_active_requests_for_process( struct proc *procp ) +aio_proc_active_requests_for_file(proc_t procp, int fd) { - - return( procp->aio_active_count ); + int count = 0; + aio_workq_entry *entryp; + TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) { + if (entryp->aiocb.aio_fildes == fd) { + count++; + } + } + return count; } /* aio_active_requests_for_process */ + /* * do_aio_fsync */ @@ -2113,14 +2465,28 @@ do_aio_fsync( aio_workq_entry *entryp ) struct vfs_context context; struct vnode *vp; struct fileproc *fp; - int error; - - /* - * NOTE - we will not support AIO_DSYNC until fdatasync() is supported. - * AIO_DSYNC is caught before we queue up a request and flagged as an error. - * The following was shamelessly extracted from fsync() implementation. - */ + int sync_flag; + int error; + /* + * We are never called unless either AIO_FSYNC or AIO_DSYNC are set. + * + * If AIO_DSYNC is set, we can tell the lower layers that it is OK + * to mark for update the metadata not strictly necessary for data + * retrieval, rather than forcing it to disk. + * + * If AIO_FSYNC is set, we have to also wait for metadata not really + * necessary to data retrival are committed to stable storage (e.g. + * atime, mtime, ctime, etc.). + * + * Metadata necessary for data retrieval ust be committed to stable + * storage in either case (file length, etc.). 
+ */ + if (entryp->flags & AIO_FSYNC) + sync_flag = MNT_WAIT; + else + sync_flag = MNT_DWAIT; + error = fp_getfvp( entryp->procp, entryp->aiocb.aio_fildes, &fp, &vp); if ( error == 0 ) { if ( (error = vnode_getwithref(vp)) ) { @@ -2128,10 +2494,10 @@ do_aio_fsync( aio_workq_entry *entryp ) entryp->returnval = -1; return(error); } - context.vc_proc = entryp->procp; + context.vc_thread = current_thread(); context.vc_ucred = fp->f_fglob->fg_cred; - error = VNOP_FSYNC( vp, MNT_WAIT, &context); + error = VNOP_FSYNC( vp, sync_flag, &context); (void)vnode_put(vp); @@ -2149,20 +2515,20 @@ do_aio_fsync( aio_workq_entry *entryp ) * is_already_queued - runs through our queues to see if the given * aiocbp / process is there. Returns TRUE if there is a match * on any of our aio queues. - * NOTE - callers must hold aio lock! + * + * Called with proc aio lock held (can be held spin) */ - static boolean_t -is_already_queued( struct proc *procp, +is_already_queued(proc_t procp, user_addr_t aiocbp ) { aio_workq_entry *entryp; boolean_t result; - + result = FALSE; /* look for matches on our queue of async IO requests that have completed */ - TAILQ_FOREACH( entryp, &procp->aio_doneq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &procp->p_aio_doneq, aio_proc_link ) { if ( aiocbp == entryp->uaiocbp ) { result = TRUE; goto ExitThisRoutine; @@ -2170,35 +2536,32 @@ is_already_queued( struct proc *procp, } /* look for matches on our queue of active async IO requests */ - TAILQ_FOREACH( entryp, &procp->aio_activeq, aio_workq_link ) { + TAILQ_FOREACH( entryp, &procp->p_aio_activeq, aio_proc_link ) { if ( aiocbp == entryp->uaiocbp ) { result = TRUE; goto ExitThisRoutine; } } - /* look for matches on our queue of asynchronous todo work */ - TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) { - if ( procp == entryp->procp && aiocbp == entryp->uaiocbp ) { - result = TRUE; - goto ExitThisRoutine; - } - } - - /* look for matches on our queue of synchronous todo work */ - 
TAILQ_FOREACH( entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) { - if ( procp == entryp->procp && aiocbp == entryp->uaiocbp ) { - result = TRUE; - goto ExitThisRoutine; - } - } - ExitThisRoutine: return( result ); } /* is_already_queued */ +static void +free_lio_context(aio_lio_context* context) +{ + +#if DEBUG + OSDecrementAtomic(&lio_contexts_alloced); +#endif /* DEBUG */ + + FREE( context, M_TEMP ); + +} /* free_lio_context */ + + /* * aio initialization */ @@ -2208,26 +2571,28 @@ aio_init( void ) int i; aio_lock_grp_attr = lck_grp_attr_alloc_init(); - aio_lock_grp = lck_grp_alloc_init("aio", aio_lock_grp_attr); + aio_proc_lock_grp = lck_grp_alloc_init("aio_proc", aio_lock_grp_attr);; + aio_entry_lock_grp = lck_grp_alloc_init("aio_entry", aio_lock_grp_attr);; + aio_queue_lock_grp = lck_grp_alloc_init("aio_queue", aio_lock_grp_attr);; aio_lock_attr = lck_attr_alloc_init(); - aio_lock = lck_mtx_alloc_init(aio_lock_grp, aio_lock_attr); + lck_mtx_init(&aio_entry_mtx, aio_entry_lock_grp, aio_lock_attr); + lck_mtx_init(&aio_proc_mtx, aio_proc_lock_grp, aio_lock_attr); - AIO_LOCK; - TAILQ_INIT( &aio_anchor.aio_async_workq ); - TAILQ_INIT( &aio_anchor.lio_sync_workq ); - aio_anchor.aio_async_workq_count = 0; - aio_anchor.lio_sync_workq_count = 0; - aio_anchor.aio_active_count = 0; + aio_anchor.aio_inflight_count = 0; aio_anchor.aio_done_count = 0; - AIO_UNLOCK; + aio_anchor.aio_total_count = 0; + aio_anchor.aio_num_workqs = AIO_NUM_WORK_QUEUES; + + for (i = 0; i < AIO_NUM_WORK_QUEUES; i++) { + aio_workq_init(&aio_anchor.aio_async_workqs[i]); + } + i = sizeof( aio_workq_entry ); aio_workq_zonep = zinit( i, i * aio_max_requests, i * aio_max_requests, "aiowq" ); _aio_create_worker_threads( aio_worker_threads ); - - return; } /* aio_init */ @@ -2244,10 +2609,11 @@ _aio_create_worker_threads( int num ) for ( i = 0; i < num; i++ ) { thread_t myThread; - myThread = kernel_thread( kernel_task, aio_work_thread ); - if ( THREAD_NULL == myThread ) { + if ( KERN_SUCCESS != 
kernel_thread_start((thread_continue_t)aio_work_thread, NULL, &myThread) ) { printf( "%s - failed to create a work thread \n", __FUNCTION__ ); } + else + thread_deallocate(myThread); } return; @@ -2271,7 +2637,7 @@ get_aiotask(void) * aiocb (in our case that is a user_aiocb) */ static void -do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) +do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) { the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes; the_user_aiocbp->aio_offset = my_aiocbp->aio_offset; @@ -2296,3 +2662,26 @@ do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) the_user_aiocbp->aio_sigevent.sigev_notify_attributes = CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes); } + +/* Similar for 64-bit user process, so that we don't need to satisfy + * the alignment constraints of the original user64_aiocb + */ +static void +do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) +{ + the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes; + the_user_aiocbp->aio_offset = my_aiocbp->aio_offset; + the_user_aiocbp->aio_buf = my_aiocbp->aio_buf; + the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes; + the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio; + the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode; + + the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify; + the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo; + the_user_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int = + my_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int; + the_user_aiocbp->aio_sigevent.sigev_notify_function = + my_aiocbp->aio_sigevent.sigev_notify_function; + the_user_aiocbp->aio_sigevent.sigev_notify_attributes = + my_aiocbp->aio_sigevent.sigev_notify_attributes; +}