]> git.saurik.com Git - apple/xnu.git/blobdiff - bsd/kern/kern_aio.c
xnu-3247.10.11.tar.gz
[apple/xnu.git] / bsd / kern / kern_aio.c
index 5586c7846356fc49c6c2132c0eea356ca62938c0..44c956e9bb63c65639db5a295db0064c393603db 100644 (file)
@@ -1,31 +1,29 @@
 /*
- * Copyright (c) 2003-2004 Apple Computer, Inc. All rights reserved.
+ * Copyright (c) 2003-2014 Apple Inc. All rights reserved.
  *
- * @APPLE_LICENSE_OSREFERENCE_HEADER_START@
+ * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
  * 
- * This file contains Original Code and/or Modifications of Original Code 
- * as defined in and that are subject to the Apple Public Source License 
- * Version 2.0 (the 'License'). You may not use this file except in 
- * compliance with the License.  The rights granted to you under the 
- * License may not be used to create, or enable the creation or 
- * redistribution of, unlawful or unlicensed copies of an Apple operating 
- * system, or to circumvent, violate, or enable the circumvention or 
- * violation of, any terms of an Apple operating system software license 
- * agreement.
- *
- * Please obtain a copy of the License at 
- * http://www.opensource.apple.com/apsl/ and read it before using this 
- * file.
- *
- * The Original Code and all software distributed under the License are 
- * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 
- * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 
- * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 
- * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 
- * Please see the License for the specific language governing rights and 
+ * This file contains Original Code and/or Modifications of Original Code
+ * as defined in and that are subject to the Apple Public Source License
+ * Version 2.0 (the 'License'). You may not use this file except in
+ * compliance with the License. The rights granted to you under the License
+ * may not be used to create, or enable the creation or redistribution of,
+ * unlawful or unlicensed copies of an Apple operating system, or to
+ * circumvent, violate, or enable the circumvention or violation of, any
+ * terms of an Apple operating system software license agreement.
+ * 
+ * Please obtain a copy of the License at
+ * http://www.opensource.apple.com/apsl/ and read it before using this file.
+ * 
+ * The Original Code and all software distributed under the License are
+ * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
+ * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
+ * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
+ * Please see the License for the specific language governing rights and
  * limitations under the License.
- *
- * @APPLE_LICENSE_OSREFERENCE_HEADER_END@
+ * 
+ * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
  */
 
 
 
 #include <mach/mach_types.h>
 #include <kern/kern_types.h>
+#include <kern/waitq.h>
 #include <kern/zalloc.h>
 #include <kern/task.h>
 #include <kern/sched_prim.h>
 
 #include <vm/vm_map.h>
 
+#include <libkern/OSAtomic.h>
+
 #include <sys/kdebug.h>
 #define AIO_work_queued                                        1
 #define AIO_worker_wake                                        2
  * user process calls aio_return or the process exits, either way that is our 
  * trigger to release aio resources. 
  */
+typedef struct aio_workq   {
+       TAILQ_HEAD(, aio_workq_entry)   aioq_entries;
+       int                             aioq_count;
+       lck_mtx_t                       aioq_mtx;
+       struct waitq                    aioq_waitq;
+} *aio_workq_t;
+
+#define AIO_NUM_WORK_QUEUES 1
 struct aio_anchor_cb
 {
-       int                                                                     aio_async_workq_count;  /* entries on aio_async_workq */
-       int                                                                     lio_sync_workq_count;   /* entries on lio_sync_workq */
-       int                                                                     aio_active_count;       /* entries on all active queues (proc.aio_activeq) */
-       int                                                                     aio_done_count;         /* entries on all done queues (proc.aio_doneq) */
-       TAILQ_HEAD( , aio_workq_entry )         aio_async_workq;
-       TAILQ_HEAD( , aio_workq_entry )         lio_sync_workq;
+       volatile int32_t        aio_inflight_count;     /* entries that have been taken from a workq */
+       volatile int32_t        aio_done_count;         /* entries on all done queues (proc.aio_doneq) */
+       volatile int32_t        aio_total_count;        /* total extant entries */
+       
+       /* Hash table of queues here */
+       int                     aio_num_workqs;
+       struct aio_workq        aio_async_workqs[AIO_NUM_WORK_QUEUES];
 };
 typedef struct aio_anchor_cb aio_anchor_cb;
 
+struct aio_lio_context
+{
+       int             io_waiter;
+       int             io_issued;
+       int             io_completed;
+};
+typedef struct aio_lio_context aio_lio_context;
+
 
 /*
  * Notes on aio sleep / wake channels.
@@ -137,88 +155,344 @@ typedef struct aio_anchor_cb aio_anchor_cb;
  * us sleep channels that currently do not collide with any other kernel routines.
  * At this time, for binary compatibility reasons, we cannot create new proc fields.
  */
-#define AIO_SUSPEND_SLEEP_CHAN  p_estcpu
-#define AIO_CLEANUP_SLEEP_CHAN         p_pctcpu
-
-
-/*
- * aysnc IO locking macros used to protect critical sections.
- */
-#define AIO_LOCK       lck_mtx_lock(aio_lock)
-#define AIO_UNLOCK     lck_mtx_unlock(aio_lock)
+#define AIO_SUSPEND_SLEEP_CHAN  p_aio_active_count
+#define AIO_CLEANUP_SLEEP_CHAN         p_aio_total_count
 
+#define ASSERT_AIO_FROM_PROC(aiop, theproc)    \
+       if ((aiop)->procp != (theproc)) {       \
+               panic("AIO on a proc list that does not belong to that proc.\n"); \
+       }
 
 /*
  *  LOCAL PROTOTYPES
  */
-static int                     aio_active_requests_for_process( struct proc *procp );
+static void            aio_proc_lock(proc_t procp);
+static void            aio_proc_lock_spin(proc_t procp);
+static void            aio_proc_unlock(proc_t procp);
+static lck_mtx_t*      aio_proc_mutex(proc_t procp);
+static void            aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp);
+static void            aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp);
+static int             aio_get_process_count(proc_t procp );
+static int             aio_active_requests_for_process(proc_t procp );
+static int             aio_proc_active_requests_for_file(proc_t procp, int fd);
+static boolean_t       is_already_queued(proc_t procp, user_addr_t aiocbp );
+static boolean_t       should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd);
+
+static void            aio_entry_lock(aio_workq_entry *entryp);
+static void            aio_entry_lock_spin(aio_workq_entry *entryp);
+static aio_workq_t     aio_entry_workq(aio_workq_entry *entryp);
+static lck_mtx_t*      aio_entry_mutex(__unused aio_workq_entry *entryp);
+static void            aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
+static void            aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
+static void            aio_entry_ref_locked(aio_workq_entry *entryp);
+static void            aio_entry_unref_locked(aio_workq_entry *entryp);
+static void            aio_entry_ref(aio_workq_entry *entryp);
+static void            aio_entry_unref(aio_workq_entry *entryp);
+static void            aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled, 
+                                       int wait_for_completion, boolean_t disable_notification);
+static int             aio_entry_try_workq_remove(aio_workq_entry *entryp);
 static boolean_t       aio_delay_fsync_request( aio_workq_entry *entryp );
-static int                     aio_free_request( aio_workq_entry *entryp, vm_map_t the_map );
-static int                     aio_get_all_queues_count( void );
-static int                     aio_get_process_count( struct proc *procp );
-static aio_workq_entry *  aio_get_some_work( void );
-static boolean_t       aio_last_group_io( aio_workq_entry *entryp );
-static void                    aio_mark_requests( aio_workq_entry *entryp );
-static int                     aio_queue_async_request( struct proc *procp, 
-                                                                                        user_addr_t aiocbp,
-                                                                                        int kindOfIO );
-static int                     aio_validate( aio_workq_entry *entryp );
-static void                    aio_work_thread( void );
-static int                     do_aio_cancel(  struct proc *p, 
-                                                                       int fd, 
-                                                                       user_addr_t aiocbp, 
-                                                                       boolean_t wait_for_completion,
-                                                                       boolean_t disable_notification );
-static void                    do_aio_completion( aio_workq_entry *entryp );
-static int                     do_aio_fsync( aio_workq_entry *entryp );
-static int                     do_aio_read( aio_workq_entry *entryp );
-static int                     do_aio_write( aio_workq_entry *entryp );
-static void            do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp );
-static boolean_t       is_already_queued(      struct proc *procp, 
-                                                                               user_addr_t aiocbp );
-static int                     lio_create_async_entry( struct proc *procp, 
-                                                                                        user_addr_t aiocbp, 
-                                                                                        user_addr_t sigp, 
-                                                                                        long group_tag,
-                                                                                        aio_workq_entry **entrypp );
-static int                     lio_create_sync_entry( struct proc *procp, 
-                                                                                       user_addr_t aiocbp, 
-                                                                                       long group_tag,
-                                                                                       aio_workq_entry **entrypp );
-
+static int             aio_free_request(aio_workq_entry *entryp);
+
+static void            aio_workq_init(aio_workq_t wq);
+static void            aio_workq_lock_spin(aio_workq_t wq);
+static void            aio_workq_unlock(aio_workq_t wq);
+static lck_mtx_t*      aio_workq_mutex(aio_workq_t wq);
+
+static void            aio_work_thread( void );
+static aio_workq_entry *aio_get_some_work( void );
+
+static int             aio_get_all_queues_count( void );
+static int             aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO );
+static int             aio_validate( aio_workq_entry *entryp );
+static int             aio_increment_total_count(void);
+static int             aio_decrement_total_count(void);
+
+static int             do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, int wait_for_completion, boolean_t disable_notification );
+static void            do_aio_completion( aio_workq_entry *entryp );
+static int             do_aio_fsync( aio_workq_entry *entryp );
+static int             do_aio_read( aio_workq_entry *entryp );
+static int             do_aio_write( aio_workq_entry *entryp );
+static void            do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp );
+static void            do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp );
+static int     lio_create_entry(proc_t procp, 
+                                        user_addr_t aiocbp, 
+                                        void *group_tag,
+                                        aio_workq_entry **entrypp );
+static aio_workq_entry *aio_create_queue_entry(proc_t procp,
+                                       user_addr_t aiocbp,
+                                       void *group_tag,
+                                       int kindOfIO);
+static user_addr_t *aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent);
+static void            free_lio_context(aio_lio_context* context);
+static void            aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked);
+
+#define ASSERT_AIO_PROC_LOCK_OWNED(p)  lck_mtx_assert(aio_proc_mutex((p)), LCK_MTX_ASSERT_OWNED)
+#define ASSERT_AIO_WORKQ_LOCK_OWNED(q) lck_mtx_assert(aio_workq_mutex((q)), LCK_MTX_ASSERT_OWNED)
+#define ASSERT_AIO_ENTRY_LOCK_OWNED(e) lck_mtx_assert(aio_entry_mutex((e)), LCK_MTX_ASSERT_OWNED)
 
 /*
  *  EXTERNAL PROTOTYPES
  */
 
 /* in ...bsd/kern/sys_generic.c */
-extern int                     dofileread( struct proc *p, struct fileproc *fp, int fd, 
-                                                               user_addr_t bufp, user_size_t nbyte, 
-                                                               off_t offset, int flags, user_ssize_t *retval );
-extern int                     dofilewrite( struct proc *p, struct fileproc *fp, int fd, 
-                                                                user_addr_t bufp, user_size_t nbyte, off_t offset, 
-                                                                int flags, user_ssize_t *retval );
+extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
+                       user_addr_t bufp, user_size_t nbyte, 
+                       off_t offset, int flags, user_ssize_t *retval );
+extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
+                        user_addr_t bufp, user_size_t nbyte, off_t offset, 
+                        int flags, user_ssize_t *retval );
+#if DEBUG
+static uint32_t                         lio_contexts_alloced = 0; 
+#endif  /* DEBUG */
 
 /*
  * aio external global variables.
  */
-extern int aio_max_requests;                           /* AIO_MAX - configurable */
+extern int aio_max_requests;                   /* AIO_MAX - configurable */
 extern int aio_max_requests_per_process;       /* AIO_PROCESS_MAX - configurable */
-extern int aio_worker_threads;                         /* AIO_THREAD_COUNT - configurable */
+extern int aio_worker_threads;                 /* AIO_THREAD_COUNT - configurable */
 
 
 /*
  * aio static variables.
  */
-static aio_anchor_cb           aio_anchor;
-static lck_mtx_t *             aio_lock;
-static lck_grp_t *             aio_lock_grp;
-static lck_attr_t *            aio_lock_attr;
-static lck_grp_attr_t *        aio_lock_grp_attr;
-static struct zone             *aio_workq_zonep;
+static aio_anchor_cb   aio_anchor;
+static lck_grp_t       *aio_proc_lock_grp;
+static lck_grp_t       *aio_entry_lock_grp;
+static lck_grp_t       *aio_queue_lock_grp;
+static lck_attr_t      *aio_lock_attr;
+static lck_grp_attr_t  *aio_lock_grp_attr;
+static struct zone     *aio_workq_zonep;
+static lck_mtx_t       aio_entry_mtx;
+static lck_mtx_t       aio_proc_mtx;
+
+static void
+aio_entry_lock(__unused aio_workq_entry *entryp)
+{
+       lck_mtx_lock(&aio_entry_mtx);
+}
+
+static void            
+aio_entry_lock_spin(__unused aio_workq_entry *entryp)
+{
+       lck_mtx_lock_spin(&aio_entry_mtx);
+}
+
+static void    
+aio_entry_unlock(__unused aio_workq_entry *entryp)
+{
+       lck_mtx_unlock(&aio_entry_mtx);
+}
+
+/* Hash */
+static aio_workq_t
+aio_entry_workq(__unused aio_workq_entry *entryp) 
+{
+       return &aio_anchor.aio_async_workqs[0];
+}
+
+static lck_mtx_t*
+aio_entry_mutex(__unused aio_workq_entry *entryp) 
+{
+       return &aio_entry_mtx;
+}
+
+static void 
+aio_workq_init(aio_workq_t wq)
+{
+       TAILQ_INIT(&wq->aioq_entries);
+       wq->aioq_count = 0;
+       lck_mtx_init(&wq->aioq_mtx, aio_queue_lock_grp, aio_lock_attr);
+       waitq_init(&wq->aioq_waitq, SYNC_POLICY_FIFO|SYNC_POLICY_DISABLE_IRQ);
+}
+
+
+/* 
+ * Can be passed a queue which is locked spin.
+ */
+static void            
+aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
+{
+       ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
+
+       if (entryp->aio_workq_link.tqe_prev == NULL) {
+               panic("Trying to remove an entry from a work queue, but it is not on a queue\n");
+       }
+       
+       TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
+       queue->aioq_count--;
+       entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
+       
+       if (queue->aioq_count  < 0) {
+               panic("Negative count on a queue.\n");
+       }
+}
+
+static void            
+aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
+{
+       ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
+
+       TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
+       if (queue->aioq_count  < 0) {
+               panic("Negative count on a queue.\n");
+       }
+       queue->aioq_count++;
+}
+
+static void            
+aio_proc_lock(proc_t procp) 
+{
+       lck_mtx_lock(aio_proc_mutex(procp));
+}
+
+static void            
+aio_proc_lock_spin(proc_t procp)
+{
+       lck_mtx_lock_spin(aio_proc_mutex(procp));
+}
+
+static void
+aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
+{
+       ASSERT_AIO_PROC_LOCK_OWNED(procp);
+
+       TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link );
+       TAILQ_INSERT_TAIL( &procp->p_aio_doneq, entryp, aio_proc_link);
+       procp->p_aio_active_count--;
+       OSIncrementAtomic(&aio_anchor.aio_done_count);
+}
 
+static void
+aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
+{
+       TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
+       OSDecrementAtomic(&aio_anchor.aio_done_count);
+       aio_decrement_total_count();
+       procp->p_aio_total_count--;
+}
+
+static void            
+aio_proc_unlock(proc_t procp)
+{
+       lck_mtx_unlock(aio_proc_mutex(procp));
+}
+
+static lck_mtx_t*
+aio_proc_mutex(proc_t procp)
+{
+       return &procp->p_mlock;
+}
+
+static void            
+aio_entry_ref_locked(aio_workq_entry *entryp)
+{
+       ASSERT_AIO_ENTRY_LOCK_OWNED(entryp);
+
+       if (entryp->aio_refcount < 0) {
+               panic("AIO workq entry with a negative refcount.\n");
+       }
+       entryp->aio_refcount++;
+}
+
+
+/* Return 1 if you've freed it */
+static void
+aio_entry_unref_locked(aio_workq_entry *entryp)
+{
+       ASSERT_AIO_ENTRY_LOCK_OWNED(entryp);
+
+       entryp->aio_refcount--;
+       if (entryp->aio_refcount < 0) {
+               panic("AIO workq entry with a negative refcount.\n");
+       }
+}
+
+static void    
+aio_entry_ref(aio_workq_entry *entryp)
+{
+       aio_entry_lock_spin(entryp);
+       aio_entry_ref_locked(entryp);
+       aio_entry_unlock(entryp);
+}
+static void            
+aio_entry_unref(aio_workq_entry *entryp)
+{
+       aio_entry_lock_spin(entryp);
+       aio_entry_unref_locked(entryp);
+
+       if ((entryp->aio_refcount == 0) && ((entryp->flags & AIO_DO_FREE) != 0)) {
+               aio_entry_unlock(entryp);
+               aio_free_request(entryp);
+       } else {
+               aio_entry_unlock(entryp);
+       }
+       
+       return;
+}
+
+static void            
+aio_entry_update_for_cancel(aio_workq_entry *entryp, boolean_t cancelled, int wait_for_completion, boolean_t disable_notification)
+{
+       aio_entry_lock_spin(entryp);
 
+       if (cancelled) {
+               aio_entry_ref_locked(entryp);
+               entryp->errorval = ECANCELED;
+               entryp->returnval = -1;
+       }
+       
+       if ( wait_for_completion ) {
+               entryp->flags |= wait_for_completion; /* flag for special completion processing */
+       }
+       
+       if ( disable_notification ) { 
+               entryp->flags |= AIO_DISABLE; /* Don't want a signal */
+       }
+
+       aio_entry_unlock(entryp); 
+}
+
+static int
+aio_entry_try_workq_remove(aio_workq_entry *entryp)
+{      
+       /* Can only be cancelled if it's still on a work queue */
+       if (entryp->aio_workq_link.tqe_prev != NULL) {
+               aio_workq_t queue;
+
+               /* Will have to check again under the lock */
+               queue = aio_entry_workq(entryp);
+               aio_workq_lock_spin(queue);
+               if (entryp->aio_workq_link.tqe_prev != NULL) {
+                       aio_workq_remove_entry_locked(queue, entryp);
+                       aio_workq_unlock(queue);
+                       return 1;
+               }  else {
+                       aio_workq_unlock(queue);
+               }
+       }
+
+       return 0;
+}
+
+static void            
+aio_workq_lock_spin(aio_workq_t wq)
+{
+       lck_mtx_lock_spin(aio_workq_mutex(wq));
+}
+
+static void            
+aio_workq_unlock(aio_workq_t wq)
+{
+       lck_mtx_unlock(aio_workq_mutex(wq));
+}
 
+static lck_mtx_t*
+aio_workq_mutex(aio_workq_t wq)
+{
+       return &wq->aioq_mtx;
+}
 
 /*
  * aio_cancel - attempt to cancel one or more async IO requests currently
@@ -227,9 +501,8 @@ static struct zone                  *aio_workq_zonep;
  * is NULL then all outstanding async IO request for the given file
  * descriptor are cancelled (if possible).
  */
-
 int
-aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval )
+aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval )
 {
        struct user_aiocb               my_aiocb;
        int                                                     result;
@@ -238,24 +511,28 @@ aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval )
                          (int)p, (int)uap->aiocbp, 0, 0, 0 );
 
        /* quick check to see if there are any async IO requests queued up */
-       AIO_LOCK;
-       result = aio_get_all_queues_count( );
-       AIO_UNLOCK;
-       if ( result < 1 ) {
-               result = EBADF;
+       if (aio_get_all_queues_count() < 1) {
+               result = 0;
+               *retval = AIO_ALLDONE;
                goto ExitRoutine;
        }
        
        *retval = -1; 
        if ( uap->aiocbp != USER_ADDR_NULL ) {
-               if ( !IS_64BIT_PROCESS(p) ) {
-                       struct aiocb aiocb32;
+               if ( proc_is64bit(p) ) {
+                       struct user64_aiocb aiocb64;
+                       
+                       result = copyin( uap->aiocbp, &aiocb64, sizeof(aiocb64) );
+                       if (result == 0 )
+                               do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
+
+               } else {
+                       struct user32_aiocb aiocb32;
 
                        result = copyin( uap->aiocbp, &aiocb32, sizeof(aiocb32) );
                        if ( result == 0 )
-                               do_munge_aiocb( &aiocb32, &my_aiocb );
-               } else
-                       result = copyin( uap->aiocbp, &my_aiocb, sizeof(my_aiocb) );
+                               do_munge_aiocb_user32_to_user( &aiocb32, &my_aiocb );
+               }
 
                if ( result != 0 ) {
                        result = EAGAIN; 
@@ -271,7 +548,11 @@ aio_cancel( struct proc *p, struct aio_cancel_args *uap, int *retval )
                        goto ExitRoutine;
                }
        }
-       result = do_aio_cancel( p, uap->fd, uap->aiocbp, FALSE, FALSE );
+
+       aio_proc_lock(p);
+       result = do_aio_cancel_locked( p, uap->fd, uap->aiocbp, 0, FALSE );
+       ASSERT_AIO_PROC_LOCK_OWNED(p);
+       aio_proc_unlock(p);
 
        if ( result != -1 ) {
                *retval = result;
@@ -295,24 +576,23 @@ ExitRoutine:
  * a file descriptor that is closing.  
  * THIS MAY BLOCK.
  */
-
 __private_extern__ void
-_aio_close( struct proc *p, int fd )
+_aio_close(proc_t p, int fd )
 {
-       int                     error, count;
+       int                     error;
 
        /* quick check to see if there are any async IO requests queued up */
-       AIO_LOCK;
-       count = aio_get_all_queues_count( );
-       AIO_UNLOCK;
-       if ( count < 1 )
+       if (aio_get_all_queues_count() < 1) {
                return;
+       }
 
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_START,
                          (int)p, fd, 0, 0, 0 );
        
        /* cancel all async IO requests on our todo queues for this file descriptor */
-       error = do_aio_cancel( p, fd, 0, TRUE, FALSE );
+       aio_proc_lock(p);
+       error = do_aio_cancel_locked( p, fd, 0, AIO_CLOSE_WAIT, FALSE );
+       ASSERT_AIO_PROC_LOCK_OWNED(p);
        if ( error == AIO_NOTCANCELED ) {
                /* 
                 * AIO_NOTCANCELED is returned when we find an aio request for this process 
@@ -326,9 +606,14 @@ _aio_close( struct proc *p, int fd )
                KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep)) | DBG_FUNC_NONE,
                                  (int)p, fd, 0, 0, 0 );
 
-               tsleep( &p->AIO_CLEANUP_SLEEP_CHAN, PRIBIO, "aio_close", 0 );
-       }
+               while (aio_proc_active_requests_for_file(p, fd) > 0) {
+                       msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0 );
+               }
 
+       }
+       
+       aio_proc_unlock(p);
+       
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_close)) | DBG_FUNC_END,
                          (int)p, fd, 0, 0, 0 );
 
@@ -343,9 +628,8 @@ _aio_close( struct proc *p, int fd )
  * value that would be set by the corresponding IO request (read, wrtie,
  * fdatasync, or sync).
  */
-
 int
-aio_error( struct proc *p, struct aio_error_args *uap, int *retval )
+aio_error(proc_t p, struct aio_error_args *uap, int *retval )
 {
        aio_workq_entry                         *entryp;
        int                                                     error;
@@ -353,19 +637,22 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval )
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_START,
                          (int)p, (int)uap->aiocbp, 0, 0, 0 );
 
-       AIO_LOCK;
-
-       /* quick check to see if there are any async IO requests queued up */
-       if ( aio_get_all_queues_count( ) < 1 ) {
-               error = EINVAL;
-               goto ExitRoutine;
+       /* see if there are any aios to check */
+       if (aio_get_all_queues_count() < 1) {
+               return EINVAL;
        }
        
+       aio_proc_lock(p);
+       
        /* look for a match on our queue of async IO requests that have completed */
-       TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
                if ( entryp->uaiocbp == uap->aiocbp ) {
+                       ASSERT_AIO_FROM_PROC(entryp, p);
+
+                       aio_entry_lock_spin(entryp);
                        *retval = entryp->errorval;
                        error = 0;
+                       aio_entry_unlock(entryp);
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val)) | DBG_FUNC_NONE,
                                           (int)p, (int)uap->aiocbp, *retval, 0, 0 );
                        goto ExitRoutine;
@@ -373,8 +660,9 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval )
        }
        
        /* look for a match on our queue of active async IO requests */
-       TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) {
                if ( entryp->uaiocbp == uap->aiocbp ) {
+                       ASSERT_AIO_FROM_PROC(entryp, p);
                        *retval = EINPROGRESS;
                        error = 0;
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq)) | DBG_FUNC_NONE,
@@ -382,23 +670,13 @@ aio_error( struct proc *p, struct aio_error_args *uap, int *retval )
                        goto ExitRoutine;
                }
        }
-       
-       /* look for a match on our queue of todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( p == entryp->procp && entryp->uaiocbp == uap->aiocbp ) {
-                       *retval = EINPROGRESS;
-                       error = 0;
-                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error_workq)) | DBG_FUNC_NONE,
-                                          (int)p, (int)uap->aiocbp, *retval, 0, 0 );
-                       goto ExitRoutine;
-               }
-       }
+
        error = EINVAL;
        
 ExitRoutine:
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_error)) | DBG_FUNC_END,
                          (int)p, (int)uap->aiocbp, error, 0, 0 );
-       AIO_UNLOCK;
+       aio_proc_unlock(p);
 
        return( error );
 
@@ -412,9 +690,8 @@ ExitRoutine:
  * NOTE - we do not support op O_DSYNC at this point since we do not support the 
  * fdatasync() call.
  */
-
 int
-aio_fsync( struct proc *p, struct aio_fsync_args *uap, int *retval )
+aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval )
 {
        int                     error;
        int                     fsync_kind;
@@ -426,10 +703,8 @@ aio_fsync( struct proc *p, struct aio_fsync_args *uap, int *retval )
        /* 0 := O_SYNC for binary backward compatibility with Panther */
        if (uap->op == O_SYNC || uap->op == 0)
                fsync_kind = AIO_FSYNC;
-#if 0 // we don't support fdatasync() call yet
        else if ( uap->op == O_DSYNC )
                fsync_kind = AIO_DSYNC;
-#endif
        else {
                *retval = -1;
                error = EINVAL;
@@ -453,9 +728,8 @@ ExitRoutine:
  * file descriptor (uap->aiocbp->aio_fildes) into the buffer 
  * (uap->aiocbp->aio_buf).
  */
-
 int
-aio_read( struct proc *p, struct aio_read_args *uap, int *retval )
+aio_read(proc_t p, struct aio_read_args *uap, int *retval )
 {
        int                     error;
 
@@ -479,54 +753,57 @@ aio_read( struct proc *p, struct aio_read_args *uap, int *retval )
 /*
  * aio_return - return the return status associated with the async IO
  * request referred to by uap->aiocbp.  The return status is the value
- * that would be returned by corresponding IO request (read, wrtie,
+ * that would be returned by corresponding IO request (read, write,
  * fdatasync, or sync).  This is where we release kernel resources 
  * held for async IO call associated with the given aiocb pointer.
  */
-
 int
-aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval )
+aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval )
 {
        aio_workq_entry                         *entryp;
        int                                                     error;
-       boolean_t                                       lock_held;
+       boolean_t                                       proc_lock_held = FALSE;
        
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_START,
                          (int)p, (int)uap->aiocbp, 0, 0, 0 );
 
-       AIO_LOCK;
-       lock_held = TRUE;
-       *retval = 0;
-       
-       /* quick check to see if there are any async IO requests queued up */
-       if ( aio_get_all_queues_count( ) < 1 ) {
+       /* See if there are any entries to check */
+       if (aio_get_all_queues_count() < 1) {
                error = EINVAL;
                goto ExitRoutine;
        }
 
+       aio_proc_lock(p);
+       proc_lock_held = TRUE;
+       *retval = 0;
+
        /* look for a match on our queue of async IO requests that have completed */
-       TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
+               ASSERT_AIO_FROM_PROC(entryp, p);
                if ( entryp->uaiocbp == uap->aiocbp ) {
-                       TAILQ_REMOVE( &p->aio_doneq, entryp, aio_workq_link );
-                       aio_anchor.aio_done_count--;
-                       p->aio_done_count--;
+                       /* Done and valid for aio_return(), pull it off the list */
+                       aio_proc_remove_done_locked(p, entryp);
                        
+                       /* Drop the proc lock, but keep the entry locked */
+                       aio_entry_lock(entryp);
+                       aio_proc_unlock(p);
+                       proc_lock_held = FALSE;
+
                        *retval = entryp->returnval;
+                       error = 0;
 
-                       /* we cannot free requests that are still completing */
-                       if ( (entryp->flags & AIO_COMPLETION) == 0 ) {
-                               vm_map_t                my_map;
-                       
-                               my_map = entryp->aio_map;
-                               entryp->aio_map = VM_MAP_NULL;
-                               AIO_UNLOCK;
-                               lock_held = FALSE;
-                               aio_free_request( entryp, my_map );
+                       /* No references and off all lists, safe to free */
+                       if (entryp->aio_refcount == 0) {
+                               aio_entry_unlock(entryp);
+                               aio_free_request(entryp);
                        }
-                       else
-                               /* tell completion code to free this request */
+                       else {
+                               /* Whoever has the refcount will have to free it */
                                entryp->flags |= AIO_DO_FREE;
-                       error = 0;
+                               aio_entry_unlock(entryp);
+                       }
+
+
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val)) | DBG_FUNC_NONE,
                                           (int)p, (int)uap->aiocbp, *retval, 0, 0 );
                        goto ExitRoutine;
@@ -534,7 +811,8 @@ aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval )
        }
        
        /* look for a match on our queue of active async IO requests */
-       TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &p->p_aio_activeq, aio_proc_link) {
+               ASSERT_AIO_FROM_PROC(entryp, p);
                if ( entryp->uaiocbp == uap->aiocbp ) {
                        error = EINPROGRESS;
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq)) | DBG_FUNC_NONE,
@@ -543,20 +821,11 @@ aio_return( struct proc *p, struct aio_return_args *uap, user_ssize_t *retval )
                }
        }
        
-       /* look for a match on our queue of todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( p == entryp->procp && entryp->uaiocbp == uap->aiocbp ) {
-                       error = EINPROGRESS;
-                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return_workq)) | DBG_FUNC_NONE,
-                                          (int)p, (int)uap->aiocbp, *retval, 0, 0 );
-                       goto ExitRoutine;
-               }
-       }
        error = EINVAL;
        
 ExitRoutine:
-       if ( lock_held )
-               AIO_UNLOCK;
+       if (proc_lock_held)
+               aio_proc_unlock(p);
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_return)) | DBG_FUNC_END,
                          (int)p, (int)uap->aiocbp, error, 0, 0 );
 
@@ -572,9 +841,8 @@ ExitRoutine:
  * for cancelled or active aio requests that complete. 
  * This routine MAY block!
  */
-
 __private_extern__ void
-_aio_exec( struct proc *p )
+_aio_exec(proc_t p )
 {
 
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exec)) | DBG_FUNC_START,
@@ -596,29 +864,29 @@ _aio_exec( struct proc *p )
  * we can and wait for those already active.  We also disable signaling
  * for cancelled or active aio requests that complete.  This routine MAY block!
  */
-
 __private_extern__ void
-_aio_exit( struct proc *p )
+_aio_exit(proc_t p )
 {
-       int                                             error, count;
+       int                                             error;
        aio_workq_entry                 *entryp;
 
+
        /* quick check to see if there are any async IO requests queued up */
-       AIO_LOCK;
-       count = aio_get_all_queues_count( );
-       AIO_UNLOCK;
-       if ( count < 1 ) {
+       if (aio_get_all_queues_count() < 1) {
                return;
        }
 
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_START,
                          (int)p, 0, 0, 0, 0 );
 
+       aio_proc_lock(p);
+
        /* 
         * cancel async IO requests on the todo work queue and wait for those  
         * already active to complete. 
         */
-       error = do_aio_cancel( p, 0, 0, TRUE, TRUE );
+       error = do_aio_cancel_locked( p, 0, 0, AIO_EXIT_WAIT, TRUE );
+       ASSERT_AIO_PROC_LOCK_OWNED(p);
        if ( error == AIO_NOTCANCELED ) {
                /* 
                 * AIO_NOTCANCELED is returned when we find an aio request for this process 
@@ -632,52 +900,69 @@ _aio_exit( struct proc *p )
                KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep)) | DBG_FUNC_NONE,
                                  (int)p, 0, 0, 0, 0 );
 
-               tsleep( &p->AIO_CLEANUP_SLEEP_CHAN, PRIBIO, "aio_exit", 0 );
+               while (p->p_aio_active_count != 0) {
+                       msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0 );
+               }
+       }
+               
+       if (p->p_aio_active_count != 0) {
+               panic("Exiting process has %d active AIOs after cancellation has completed.\n", p->p_aio_active_count);
        }
        
        /* release all aio resources used by this process */
-       AIO_LOCK;
-       entryp = TAILQ_FIRST( &p->aio_doneq );
+       entryp = TAILQ_FIRST( &p->p_aio_doneq );
        while ( entryp != NULL ) {
+               ASSERT_AIO_FROM_PROC(entryp, p);
                aio_workq_entry                 *next_entryp;
                        
-               next_entryp = TAILQ_NEXT( entryp, aio_workq_link );
-               TAILQ_REMOVE( &p->aio_doneq, entryp, aio_workq_link );
-               aio_anchor.aio_done_count--;
-               p->aio_done_count--;
+               next_entryp = TAILQ_NEXT( entryp, aio_proc_link);
+               aio_proc_remove_done_locked(p, entryp);
                        
                /* we cannot free requests that are still completing */
-               if ( (entryp->flags & AIO_COMPLETION) == 0 ) {
-                       vm_map_t                my_map;
-                       
-                       my_map = entryp->aio_map;
-                       entryp->aio_map = VM_MAP_NULL;
-                       AIO_UNLOCK;
-                       aio_free_request( entryp, my_map );
+               aio_entry_lock_spin(entryp);
+               if (entryp->aio_refcount == 0) {
+                       aio_proc_unlock(p);
+                       aio_entry_unlock(entryp);
+                       aio_free_request(entryp);
 
                        /* need to start over since aio_doneq may have been */
                        /* changed while we were away.  */
-                       AIO_LOCK;
-                       entryp = TAILQ_FIRST( &p->aio_doneq );
+                       aio_proc_lock(p);
+                       entryp = TAILQ_FIRST( &p->p_aio_doneq );
                        continue;
                }
-               else
-                       /* tell completion code to free this request */
+               else {
+                       /* whoever has the reference will have to do the free */
                        entryp->flags |= AIO_DO_FREE;
+               } 
+
+               aio_entry_unlock(entryp);
                entryp = next_entryp;
        }
-       AIO_UNLOCK;
-
+       
+       aio_proc_unlock(p);
+       
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_exit)) | DBG_FUNC_END,
                          (int)p, 0, 0, 0, 0 );
-
        return;
        
 } /* _aio_exit */
 
 
+static boolean_t
+should_cancel(aio_workq_entry *entryp, user_addr_t aiocbp, int fd) 
+{
+       if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
+                       (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
+                       (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
 /*
- * do_aio_cancel - cancel async IO requests (if possible).  We get called by
+ * do_aio_cancel_locked - cancel async IO requests (if possible).  We get called by
  * aio_cancel, close, and at exit.  
  * There are three modes of operation: 1) cancel all async IOs for a process - 
  * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd 
@@ -689,167 +974,113 @@ _aio_exit( struct proc *p )
  * were already complete.
  * WARNING - do not deference aiocbp in this routine, it may point to user 
  * land data that has not been copied in (when called from aio_cancel() )
+ *
+ * Called with proc locked, and returns the same way.
  */
-
 static int
-do_aio_cancel(         struct proc *p, int fd, user_addr_t aiocbp, 
-                               boolean_t wait_for_completion, boolean_t disable_notification )
+do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, 
+       int wait_for_completion, boolean_t disable_notification )
 {
+       ASSERT_AIO_PROC_LOCK_OWNED(p);
+
        aio_workq_entry                 *entryp;
        int                                             result;
 
        result = -1;
                
        /* look for a match on our queue of async todo work. */
-       AIO_LOCK;
-       entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq );
+       entryp = TAILQ_FIRST(&p->p_aio_activeq);
        while ( entryp != NULL ) {
+               ASSERT_AIO_FROM_PROC(entryp, p);
                aio_workq_entry                 *next_entryp;
-               
-               next_entryp = TAILQ_NEXT( entryp, aio_workq_link );
-               if ( p == entryp->procp ) {
-                       if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
-                                (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
-                                (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
-                               /* we found a match so we remove the entry from the */
-                               /* todo work queue and place it on the done queue */
-                               TAILQ_REMOVE( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
-                               aio_anchor.aio_async_workq_count--;
-                               entryp->errorval = ECANCELED;
-                               entryp->returnval = -1;
-                               if ( disable_notification )
-                                       entryp->flags |= AIO_DISABLE; /* flag for special completion processing */
-                               result = AIO_CANCELED;
-
-                               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq)) | DBG_FUNC_NONE,
-                                                         (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
-
-                               TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link );
-                               aio_anchor.aio_done_count++;
-                               p->aio_done_count++;
-                               entryp->flags |= AIO_COMPLETION;
-                               AIO_UNLOCK;
-                               
-                               /* do completion processing for this request */
-                               do_aio_completion( entryp );
-                       
-                               AIO_LOCK;
-                               entryp->flags &= ~AIO_COMPLETION;
-                               if ( (entryp->flags & AIO_DO_FREE) != 0 ) {
-                                       vm_map_t                my_map;
-                                       
-                                       my_map = entryp->aio_map;
-                                       entryp->aio_map = VM_MAP_NULL;
-                                       AIO_UNLOCK;
-                                       aio_free_request( entryp, my_map );
-                               }
-                               else
-                                       AIO_UNLOCK;
 
-                               if ( aiocbp != USER_ADDR_NULL ) {
-                                       return( result );
-                               }
-                               
-                               /* need to start over since aio_async_workq may have been */
-                               /* changed while we were away doing completion processing.  */
-                               AIO_LOCK;
-                               entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq );
-                               continue;
-                       }
+               next_entryp = TAILQ_NEXT( entryp, aio_proc_link);
+               if (!should_cancel(entryp, aiocbp, fd)) {
+                       entryp = next_entryp;
+                       continue;
                }
-               entryp = next_entryp;
-       } /* while... */
-               
-       /* 
-        * look for a match on our queue of synchronous todo work.  This will 
-        * be a rare occurrence but could happen if a process is terminated while 
-        * processing a lio_listio call. 
-        */
-       entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq );
-       while ( entryp != NULL ) {
-               aio_workq_entry                 *next_entryp;
-               
-               next_entryp = TAILQ_NEXT( entryp, aio_workq_link );
-               if ( p == entryp->procp ) {
-                       if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
-                                (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
-                                (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
-                               /* we found a match so we remove the entry from the */
-                               /* todo work queue and place it on the done queue */
-                               TAILQ_REMOVE( &aio_anchor.lio_sync_workq, entryp, aio_workq_link );
-                               aio_anchor.lio_sync_workq_count--;
-                               entryp->errorval = ECANCELED;
-                               entryp->returnval = -1;
-                               if ( disable_notification )
-                                       entryp->flags |= AIO_DISABLE; /* flag for special completion processing */
-                               result = AIO_CANCELED;
-
-                               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_sync_workq)) | DBG_FUNC_NONE,
-                                                         (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
-
-                               TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link );
-                               aio_anchor.aio_done_count++;
-                               p->aio_done_count++;
-                               if ( aiocbp != USER_ADDR_NULL ) {
-                                       AIO_UNLOCK;
-                                       return( result );
-                               }
+
+               /* Can only be cancelled if it's still on a work queue */
+               if (aio_entry_try_workq_remove(entryp) != 0) {
+                       /* Have removed from workq. Update entry state and take a ref */
+                       aio_entry_update_for_cancel(entryp, TRUE, 0, disable_notification);
+
+                       /* Put on the proc done queue and update counts, then unlock the proc */
+                       aio_proc_move_done_locked(p, entryp);
+                       aio_proc_unlock(p);
+
+                       /* Now it's officially cancelled.  Do the completion */
+                       result = AIO_CANCELED;
+                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq)) | DBG_FUNC_NONE,
+                                       (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
+                       do_aio_completion(entryp);
+
+                       /* This will free if the aio_return() has already happened ... */
+                       aio_entry_unref(entryp);
+                       aio_proc_lock(p);
+
+                       if ( aiocbp != USER_ADDR_NULL ) {
+                               return( result );
                        }
-               }
-               entryp = next_entryp;
-       } /* while... */
 
-       /* 
-        * look for a match on our queue of active async IO requests and 
-        * return AIO_NOTCANCELED result. 
-        */
-       TAILQ_FOREACH( entryp, &p->aio_activeq, aio_workq_link ) {
-               if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
-                        (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
-                        (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
+                       /* 
+                        * Restart from the head of the proc active queue since it 
+                        * may have been changed while we were away doing completion 
+                        * processing. 
+                        * 
+                        * Note that if we found an uncancellable AIO before, we will
+                        * either find it again or discover that it's been completed,
+                        * so resetting the result will not cause us to return success
+                        * despite outstanding AIOs.
+                        */
+                       entryp = TAILQ_FIRST(&p->p_aio_activeq);
+                       result = -1; /* As if beginning anew */
+               } else {
+                       /* 
+                        * It's been taken off the active queue already, i.e. is in flight.
+                        * All we can do is ask for notification.
+                        */
                        result = AIO_NOTCANCELED;
 
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq)) | DBG_FUNC_NONE,
-                                                 (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
+                                       (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
+
+                       /* Mark for waiting and such; will not take a ref if "cancelled" arg is FALSE */
+                       aio_entry_update_for_cancel(entryp, FALSE, wait_for_completion, disable_notification);
 
-                       if ( wait_for_completion )
-                               entryp->flags |= AIO_WAITING; /* flag for special completion processing */
-                       if ( disable_notification )
-                               entryp->flags |= AIO_DISABLE; /* flag for special completion processing */
                        if ( aiocbp != USER_ADDR_NULL ) {
-                               AIO_UNLOCK;
                                return( result );
                        }
+                       entryp = next_entryp;
                }
-       }
-       
+       } /* while... */
+               
        /* 
         * if we didn't find any matches on the todo or active queues then look for a 
         * match on our queue of async IO requests that have completed and if found 
         * return AIO_ALLDONE result.  
+        *
+        * Proc AIO lock is still held.
         */
        if ( result == -1 ) {
-               TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) {
-               if ( (aiocbp == USER_ADDR_NULL && fd == 0) ||
-                        (aiocbp != USER_ADDR_NULL && entryp->uaiocbp == aiocbp) ||
-                        (aiocbp == USER_ADDR_NULL && fd == entryp->aiocb.aio_fildes) ) {
+               TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
+                       ASSERT_AIO_FROM_PROC(entryp, p);
+                       if (should_cancel(entryp, aiocbp, fd)) {
                                result = AIO_ALLDONE;
-
                                KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq)) | DBG_FUNC_NONE,
-                                                         (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
+                                               (int)entryp->procp, (int)entryp->uaiocbp, fd, 0, 0 );
 
                                if ( aiocbp != USER_ADDR_NULL ) {
-                                       AIO_UNLOCK;
                                        return( result );
                                }
                        }
                }
        }
-       AIO_UNLOCK;
 
        return( result );
        
-} /* do_aio_cancel */
+}
+ /* do_aio_cancel_locked */
 
 
 /*
@@ -861,9 +1092,16 @@ do_aio_cancel(    struct proc *p, int fd, user_addr_t aiocbp,
  * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt
  * woke us up.
  */
+int
+aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval )
+{
+       __pthread_testcancel(1);
+       return(aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval));
+}
+
 
 int
-aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
+aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval )
 {
        int                                     error;
        int                                     i, count;
@@ -879,10 +1117,7 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
        abstime = 0;
        aiocbpp = NULL;
 
-       /* quick check to see if there are any async IO requests queued up */
-       AIO_LOCK;
-       count = aio_get_all_queues_count( );
-       AIO_UNLOCK;
+       count = aio_get_all_queues_count( ); 
        if ( count < 1 ) {
                error = EINVAL;
                goto ExitThisRoutine;
@@ -895,10 +1130,15 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
 
        if ( uap->timeoutp != USER_ADDR_NULL ) {
                if ( proc_is64bit(p) ) {
-                       error = copyin( uap->timeoutp, &ts, sizeof(ts) );
+                       struct user64_timespec temp;
+                       error = copyin( uap->timeoutp, &temp, sizeof(temp) );
+                       if ( error == 0 ) {
+                               ts.tv_sec = temp.tv_sec;
+                               ts.tv_nsec = temp.tv_nsec;
+                       }
                }
                else {
-                       struct timespec temp;
+                       struct user32_timespec temp;
                        error = copyin( uap->timeoutp, &temp, sizeof(temp) );
                        if ( error == 0 ) {
                                ts.tv_sec = temp.tv_sec;
@@ -910,7 +1150,7 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
                        goto ExitThisRoutine;
                }
                        
-               if ( ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000 ) {
+               if ( ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000 ) {
                        error = EINVAL;
                        goto ExitThisRoutine;
                }
@@ -920,35 +1160,15 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
                clock_absolutetime_interval_to_deadline( abstime, &abstime );
        }
 
-       /* we reserve enough space for largest possible pointer size */
-       MALLOC( aiocbpp, user_addr_t *, (uap->nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK );
+       aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent);
        if ( aiocbpp == NULL ) {
                error = EAGAIN;
                goto ExitThisRoutine;
        }
 
-       /* copyin our aiocb pointers from list */
-       error = copyin( uap->aiocblist, aiocbpp, 
-                                       proc_is64bit(p) ? (uap->nent * sizeof(user_addr_t)) 
-                                                                       : (uap->nent * sizeof(uintptr_t)) );
-       if ( error != 0 ) {
-               error = EAGAIN;
-               goto ExitThisRoutine;
-       }
-       
-       /* we depend on a list of user_addr_t's so we need to munge and expand */
-       /* when these pointers came from a 32-bit process */
-       if ( !proc_is64bit(p) && sizeof(uintptr_t) < sizeof(user_addr_t) ) {
-               /* position to the last entry and work back from there */
-               uintptr_t       *my_ptrp = ((uintptr_t *)aiocbpp) + (uap->nent - 1);
-               user_addr_t *my_addrp = aiocbpp + (uap->nent - 1);
-               for (i = 0; i < uap->nent; i++, my_ptrp--, my_addrp--) {
-                       *my_addrp = (user_addr_t) (*my_ptrp);
-               }
-       }
-       
        /* check list of aio requests to see if any have completed */
-       AIO_LOCK;
+check_for_our_aiocbp:
+       aio_proc_lock_spin(p);
        for ( i = 0; i < uap->nent; i++ ) {
                user_addr_t     aiocbp;  
 
@@ -958,11 +1178,12 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
                        continue;
        
                /* return immediately if any aio request in the list is done */
-               TAILQ_FOREACH( entryp, &p->aio_doneq, aio_workq_link ) {
+               TAILQ_FOREACH( entryp, &p->p_aio_doneq, aio_proc_link) {
+                       ASSERT_AIO_FROM_PROC(entryp, p);
                        if ( entryp->uaiocbp == aiocbp ) {
+                               aio_proc_unlock(p);
                                *retval = 0;
                                error = 0;
-                               AIO_UNLOCK;
                                goto ExitThisRoutine;
                        }
                }
@@ -977,17 +1198,20 @@ aio_suspend( struct proc *p, struct aio_suspend_args *uap, int *retval )
         * interrupts us.  If an async IO completes before a signal fires or our 
         * timeout expires, we get a wakeup call from aio_work_thread().
         */
-       assert_wait_deadline( (event_t) &p->AIO_SUSPEND_SLEEP_CHAN, THREAD_ABORTSAFE, abstime );
-       AIO_UNLOCK;
 
-       error = thread_block( THREAD_CONTINUE_NULL );
-
-       if ( error == THREAD_AWAKENED ) {
-               /* got our wakeup call from aio_work_thread() */
-               *retval = 0;
-               error = 0;
+       error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p), PCATCH | PWAIT | PDROP, "aio_suspend", abstime); /* XXX better priority? */
+       if ( error == 0 ) {
+               /* 
+                * got our wakeup call from aio_work_thread().
+                * Since we can get a wakeup on this channel from another thread in the 
+                * same process we head back up to make sure this is for the correct aiocbp.  
+                * If it is the correct aiocbp we will return from where we do the check 
+                * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label)
+                * else we will fall out and just sleep again.  
+                */
+               goto check_for_our_aiocbp;
        }
-       else if ( error == THREAD_TIMED_OUT ) {
+       else if ( error == EWOULDBLOCK ) {
                /* our timeout expired */
                error = EAGAIN;
        }
@@ -1014,7 +1238,7 @@ ExitThisRoutine:
  */
 
 int
-aio_write( struct proc *p, struct aio_write_args *uap, int *retval )
+aio_write(proc_t p, struct aio_write_args *uap, int *retval )
 {
        int                     error;
        
@@ -1035,28 +1259,218 @@ aio_write( struct proc *p, struct aio_write_args *uap, int *retval )
 } /* aio_write */
 
 
+static user_addr_t *
+aio_copy_in_list(proc_t procp, user_addr_t aiocblist, int nent)
+{
+       user_addr_t     *aiocbpp;
+       int             i, result;
+
+       /* we reserve enough space for largest possible pointer size */
+       MALLOC( aiocbpp, user_addr_t *, (nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK );
+       if ( aiocbpp == NULL )
+               goto err;
+
+       /* copyin our aiocb pointers from list */
+       result = copyin( aiocblist, aiocbpp, 
+                       proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
+                                           : (nent * sizeof(user32_addr_t)) );
+       if ( result) {
+               FREE( aiocbpp, M_TEMP );
+               aiocbpp = NULL;
+               goto err;
+       }
+
+       /*
+        * We depend on a list of user_addr_t's so we need to
+        * munge and expand when these pointers came from a
+        * 32-bit process
+        */
+       if ( !proc_is64bit(procp) ) {
+               /* copy from last to first to deal with overlap */
+               user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
+               user_addr_t *my_addrp = aiocbpp + (nent - 1);
+
+               for (i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
+                       *my_addrp = (user_addr_t) (*my_ptrp);
+               }
+       }
+
+err:
+       return (aiocbpp);
+}
+
+
+static int
+aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
+{
+       int     result = 0;
+
+       if (sigp == USER_ADDR_NULL)
+               goto out;
+
+       /*
+        * We need to munge aio_sigevent since it contains pointers.
+        * Since we do not know if sigev_value is an int or a ptr we do
+        * NOT cast the ptr to a user_addr_t.   This means if we send
+        * this info back to user space we need to remember sigev_value
+        * was not expanded for the 32-bit case.
+        *
+        * Notes:        This does NOT affect us since we don't support
+        *              sigev_value yet in the aio context.
+        */
+       if ( proc_is64bit(procp) ) {
+               struct user64_sigevent sigevent64;
+
+               result = copyin( sigp, &sigevent64, sizeof(sigevent64) );
+               if ( result == 0 ) {
+                       sigev->sigev_notify = sigevent64.sigev_notify;
+                       sigev->sigev_signo = sigevent64.sigev_signo;
+                       sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
+                       sigev->sigev_notify_function = sigevent64.sigev_notify_function;
+                       sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
+               }
+               
+       } else {
+               struct user32_sigevent sigevent32;
+
+               result = copyin( sigp, &sigevent32, sizeof(sigevent32) );
+               if ( result == 0 ) {
+                       sigev->sigev_notify = sigevent32.sigev_notify;
+                       sigev->sigev_signo = sigevent32.sigev_signo;
+                       sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
+                       sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
+                       sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
+               }
+       }
+
+       if ( result != 0 ) {
+               result = EAGAIN;
+       }
+
+out:
+       return (result);
+}
+
 /*
- * lio_listio - initiate a list of IO requests.  We process the list of aiocbs
- * either synchronously (mode == LIO_WAIT) or asynchronously (mode == LIO_NOWAIT).
- * The caller gets error and return status for each aiocb in the list via aio_error 
- * and aio_return.  We must keep completed requests until released by the 
- * aio_return call.
+ * aio_enqueue_work
+ *
+ * Queue up the entry on the aio asynchronous work queue in priority order
+ * based on the relative priority of the request.  We calculate the relative
+ * priority using the nice value of the caller and the value
+ *
+ * Parameters: procp                   Process queueing the I/O
+ *             entryp                  The work queue entry being queued
+ *
+ * Returns:    (void)                  No failure modes
+ *
+ * Notes:      This function is used for both lio_listio and aio
+ *
+ * XXX:                At some point, we may have to consider thread priority
+ *             rather than process priority, but we don't maintain the
+ *             adjusted priority for threads the POSIX way.
+ *
+ *
+ * Called with proc locked.
  */
+static void
+aio_enqueue_work( proc_t procp, aio_workq_entry *entryp, int proc_locked)
+{
+#if 0
+       aio_workq_entry *my_entryp;     /* used for insertion sort */
+#endif /* 0 */
+       aio_workq_t queue = aio_entry_workq(entryp);
+
+       if (proc_locked == 0) {
+               aio_proc_lock(procp);
+       }
+
+       ASSERT_AIO_PROC_LOCK_OWNED(procp);
 
+       /* Onto proc queue */
+       TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp,  aio_proc_link);
+       procp->p_aio_active_count++;
+       procp->p_aio_total_count++;
+
+       /* And work queue */
+       aio_workq_lock_spin(queue);
+       aio_workq_add_entry_locked(queue, entryp);
+       waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
+                          THREAD_AWAKENED, WAITQ_ALL_PRIORITIES);
+       aio_workq_unlock(queue);
+       
+       if (proc_locked == 0) {
+               aio_proc_unlock(procp);
+       }
+
+#if 0
+       /*
+        * Procedure:
+        *
+        * (1)  The nice value is in the range PRIO_MIN..PRIO_MAX [-20..20]
+        * (2)  The normalized nice value is in the range 0..((2 * NZERO) - 1)
+        *      which is [0..39], with 0 not being used.  In nice values, the
+        *      lower the nice value, the higher the priority.
+        * (3)  The normalized scheduling prioritiy is the highest nice value
+        *      minus the current nice value.  In I/O scheduling priority, the
+        *      higher the value the lower the priority, so it is the inverse
+        *      of the nice value (the higher the number, the higher the I/O
+        *      priority).
+        * (4)  From the normalized scheduling priority, we subtract the
+        *      request priority to get the request priority value number;
+        *      this means that requests are only capable of depressing their
+        *      priority relative to other requests,
+        */
+       entryp->priority = (((2 * NZERO) - 1) - procp->p_nice);
+
+       /* only premit depressing the priority */
+       if (entryp->aiocb.aio_reqprio < 0)
+               entryp->aiocb.aio_reqprio = 0;
+       if (entryp->aiocb.aio_reqprio > 0) {
+               entryp->priority -= entryp->aiocb.aio_reqprio;
+               if (entryp->priority < 0)
+                       entryp->priority = 0;
+       }
+
+       /* Insertion sort the entry; lowest ->priority to highest */
+       TAILQ_FOREACH(my_entryp, &aio_anchor.aio_async_workq, aio_workq_link) {
+               if ( entryp->priority <= my_entryp->priority) {
+                       TAILQ_INSERT_BEFORE(my_entryp, entryp, aio_workq_link);
+                       break;
+               }
+       }
+       if (my_entryp == NULL)
+               TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
+#endif /* 0 */
+}
+
+
+/*
+ * lio_listio - initiate a list of IO requests.  We process the list of
+ * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously
+ * (mode == LIO_NOWAIT).
+ *
+ * The caller gets error and return status for each aiocb in the list
+ * via aio_error and aio_return.  We must keep completed requests until
+ * released by the aio_return call.
+ */
 int
-lio_listio( struct proc *p, struct lio_listio_args *uap, int *retval )
+lio_listio(proc_t p, struct lio_listio_args *uap, int *retval )
 {
-       int                                                     i;
-       int                                                     call_result;
-       int                                                     result;
-       long                                            group_tag;
-       aio_workq_entry *                       *entryp_listp;
-       user_addr_t                                     *aiocbpp;
-
+       int                             i;
+       int                             call_result;
+       int                             result;
+       int                             old_count;
+       aio_workq_entry                 **entryp_listp;
+       user_addr_t                     *aiocbpp;
+       struct user_sigevent            aiosigev;
+       aio_lio_context         *lio_context;
+       boolean_t                       free_context = FALSE;
+       
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_START,
                          (int)p, uap->nent, uap->mode, 0, 0 );
        
        entryp_listp = NULL;
+       lio_context = NULL;
        aiocbpp = NULL;
        call_result = -1;
        *retval = -1;
@@ -1069,171 +1483,149 @@ lio_listio( struct proc *p, struct lio_listio_args *uap, int *retval )
                call_result = EINVAL;
                goto ExitRoutine;
        }
-       
-       /* 
-        * we use group_tag to mark IO requests for delayed completion processing
-        * which means we wait until all IO requests in the group have completed 
-        * before we either return to the caller when mode is LIO_WAIT or signal
-        * user when mode is LIO_NOWAIT. 
-        */
-       group_tag = random();
                
        /* 
-        * allocate a list of aio_workq_entry pointers that we will use to queue
-        * up all our requests at once while holding our lock.
+        * allocate a list of aio_workq_entry pointers that we will use
+        * to queue up all our requests at once while holding our lock.
         */
        MALLOC( entryp_listp, void *, (uap->nent * sizeof(aio_workq_entry *)), M_TEMP, M_WAITOK );
        if ( entryp_listp == NULL ) {
                call_result = EAGAIN;
                goto ExitRoutine;
        }
-
-       /* we reserve enough space for largest possible pointer size */
-       MALLOC( aiocbpp, user_addr_t *, (uap->nent * sizeof(user_addr_t)), M_TEMP, M_WAITOK );
-       if ( aiocbpp == NULL ) {
+       
+       MALLOC( lio_context, aio_lio_context*, sizeof(aio_lio_context), M_TEMP, M_WAITOK );
+       if ( lio_context == NULL ) {
                call_result = EAGAIN;
                goto ExitRoutine;
        }
 
-       /* copyin our aiocb pointers from list */
-       result = copyin( uap->aiocblist, aiocbpp, 
-                                       IS_64BIT_PROCESS(p) ? (uap->nent * sizeof(user_addr_t)) 
-                                                                               : (uap->nent * sizeof(uintptr_t)) );
-       if ( result != 0 ) {
+#if DEBUG      
+       OSIncrementAtomic(&lio_contexts_alloced);
+#endif /* DEBUG */
+
+       bzero(lio_context, sizeof(aio_lio_context));
+       
+       aiocbpp = aio_copy_in_list(p, uap->aiocblist, uap->nent);
+       if ( aiocbpp == NULL ) {
                call_result = EAGAIN;
                goto ExitRoutine;
        }
-       
-       /* we depend on a list of user_addr_t's so we need to munge and expand */
-       /* when these pointers came from a 32-bit process */
-       if ( !IS_64BIT_PROCESS(p) && sizeof(uintptr_t) < sizeof(user_addr_t) ) {
-               /* position to the last entry and work back from there */
-               uintptr_t       *my_ptrp = ((uintptr_t *)aiocbpp) + (uap->nent - 1);
-               user_addr_t *my_addrp = aiocbpp + (uap->nent - 1);
-               for (i = 0; i < uap->nent; i++, my_ptrp--, my_addrp--) {
-                       *my_addrp = (user_addr_t) (*my_ptrp);
-               }
+
+       /*
+        * Use sigevent passed in to lio_listio for each of our calls, but
+        * only do completion notification after the last request completes.
+        */
+       bzero(&aiosigev, sizeof(aiosigev));
+       /* Only copy in an sigev if the user supplied one */
+       if (uap->sigp != USER_ADDR_NULL) {
+               call_result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
+               if ( call_result)
+                       goto ExitRoutine;
        }
 
        /* process list of aio requests */
+       lio_context->io_issued = uap->nent;
+       lio_context->io_waiter = uap->mode == LIO_WAIT ? 1 : 0; /* Should it be freed by last AIO */
        for ( i = 0; i < uap->nent; i++ ) {
                user_addr_t my_aiocbp; 
+               aio_workq_entry                         *entryp;
        
                *(entryp_listp + i) = NULL;
                my_aiocbp = *(aiocbpp + i);
                
                /* NULL elements are legal so check for 'em */
-               if ( my_aiocbp == USER_ADDR_NULL )
+               if ( my_aiocbp == USER_ADDR_NULL ) {
+                       aio_proc_lock_spin(p);
+                       lio_context->io_issued--;
+                       aio_proc_unlock(p);
                        continue;
+               }
 
-               if ( uap->mode == LIO_NOWAIT )
-                       result = lio_create_async_entry( p, my_aiocbp, uap->sigp, 
-                                                                                        group_tag, (entryp_listp + i) );
-               else
-                       result = lio_create_sync_entry( p, my_aiocbp, group_tag, 
-                                                                                       (entryp_listp + i) );
-
+               /* 
+                * We use lio_context to mark IO requests for delayed completion
+                * processing which means we wait until all IO requests in the
+                * group have completed before we either return to the caller
+                * when mode is LIO_WAIT or signal user when mode is LIO_NOWAIT.
+                *
+                * We use the address of the lio_context for this, since it is
+                * unique in the address space.
+                */
+               result = lio_create_entry( p, my_aiocbp, lio_context, (entryp_listp + i) );
                if ( result != 0 && call_result == -1 )
                        call_result = result;
-       }
-
-       /* 
-        * we need to protect this section since we do not want any of these grouped 
-        * IO requests to begin until we have them all on the queue.
-        */
-       AIO_LOCK;
-       for ( i = 0; i < uap->nent; i++ ) {
-               aio_workq_entry                         *entryp;
                
                /* NULL elements are legal so check for 'em */
                entryp = *(entryp_listp + i);
-               if ( entryp == NULL )
+               if ( entryp == NULL ) {
+                       aio_proc_lock_spin(p);
+                       lio_context->io_issued--;
+                       aio_proc_unlock(p);
                        continue;
+               }
+       
+               if ( uap->mode == LIO_NOWAIT ) {
+                       /* Set signal hander, if any */
+                       entryp->aiocb.aio_sigevent = aiosigev;
+               } else {
+                       /* flag that this thread blocks pending completion */
+                       entryp->flags |= AIO_LIO_NOTIFY;
+               }
 
                /* check our aio limits to throttle bad or rude user land behavior */
-               if ( aio_get_all_queues_count( ) >= aio_max_requests || 
+               old_count = aio_increment_total_count();
+
+               aio_proc_lock_spin(p);
+               if ( old_count >= aio_max_requests ||
                         aio_get_process_count( entryp->procp ) >= aio_max_requests_per_process ||
                         is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) {
-                       vm_map_t                my_map;
                        
-                       my_map = entryp->aio_map;
-                       entryp->aio_map = VM_MAP_NULL;
+                       lio_context->io_issued--;
+                       aio_proc_unlock(p);
+       
+                       aio_decrement_total_count();
+
                        if ( call_result == -1 )
-                               call_result = EAGAIN; 
-                       AIO_UNLOCK;
-                       aio_free_request( entryp, my_map );
-                       AIO_LOCK;
+                               call_result = EAGAIN;
+                       aio_free_request(entryp);
+                       entryp_listp[i] = NULL;
                        continue;
                }
                
-               /* place the request on the appropriate queue */
-               if ( uap->mode == LIO_NOWAIT ) {
-                       TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
-                       aio_anchor.aio_async_workq_count++;
-
-                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
-                                         (int)p, (int)entryp->uaiocbp, 0, 0, 0 );
-               }
-               else {
-                       TAILQ_INSERT_TAIL( &aio_anchor.lio_sync_workq, entryp, aio_workq_link );
-                       aio_anchor.lio_sync_workq_count++;
-               }
-       }
-
-       if ( uap->mode == LIO_NOWAIT ) { 
-               /* caller does not want to wait so we'll fire off a worker thread and return */
-               wakeup_one( (caddr_t) &aio_anchor.aio_async_workq );
+               lck_mtx_convert_spin(aio_proc_mutex(p));
+               aio_enqueue_work(p, entryp, 1);
+               aio_proc_unlock(p);
+               
+               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
+                                 (int)p, (int)entryp->uaiocbp, 0, 0, 0 );
        }
-       else {
-               aio_workq_entry                 *entryp;
-               int                                     error;
 
-               /* 
-                * mode is LIO_WAIT - handle the IO requests now.
-                */
-               entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq );
-               while ( entryp != NULL ) {
-                       if ( p == entryp->procp && group_tag == entryp->group_tag ) {
-                                       
-                               TAILQ_REMOVE( &aio_anchor.lio_sync_workq, entryp, aio_workq_link );
-                               aio_anchor.lio_sync_workq_count--;
-                               AIO_UNLOCK;
-                               
-                               if ( (entryp->flags & AIO_READ) != 0 ) {
-                                       error = do_aio_read( entryp );
-                               }
-                               else if ( (entryp->flags & AIO_WRITE) != 0 ) {
-                                       error = do_aio_write( entryp );
-                               }
-                               else if ( (entryp->flags & AIO_FSYNC) != 0 ) {
-                                       error = do_aio_fsync( entryp );
-                               }
-                               else {
-                                       printf( "%s - unknown aio request - flags 0x%02X \n", 
-                                                       __FUNCTION__, entryp->flags );
-                                       error = EINVAL;
-                               }
-                               entryp->errorval = error;       
-                               if ( error != 0 && call_result == -1 )
-                                       call_result = EIO;
-
-                               AIO_LOCK;
-                               /* we're done with the IO request so move it on the done queue */
-                               TAILQ_INSERT_TAIL( &p->aio_doneq, entryp, aio_workq_link );
-                               aio_anchor.aio_done_count++;
-                               p->aio_done_count++;
-
-                               /* need to start over since lio_sync_workq may have been changed while we */
-                               /* were away doing the IO.  */
-                               entryp = TAILQ_FIRST( &aio_anchor.lio_sync_workq );
-                               continue;
-                       } /* p == entryp->procp */
+       switch(uap->mode) {
+       case LIO_WAIT:
+               aio_proc_lock_spin(p);
+               while (lio_context->io_completed < lio_context->io_issued) {
+                       result = msleep(lio_context, aio_proc_mutex(p), PCATCH | PRIBIO | PSPIN, "lio_listio", 0);
                        
-                       entryp = TAILQ_NEXT( entryp, aio_workq_link );
-        } /* while ( entryp != NULL ) */
-       } /* uap->mode == LIO_WAIT */
-       AIO_UNLOCK;
+                       /* If we were interrupted, fail out (even if all finished) */
+                       if (result != 0) {
+                               call_result = EINTR;
+                               lio_context->io_waiter = 0;
+                               break;
+                       } 
+               }
 
+               /* If all IOs have finished must free it */
+               if (lio_context->io_completed == lio_context->io_issued) {
+                       free_context = TRUE;
+               } 
+
+               aio_proc_unlock(p);
+               break;
+               
+       case LIO_NOWAIT:
+               break;
+       }
+       
        /* call_result == -1 means we had no trouble queueing up requests */
        if ( call_result == -1 ) {
                call_result = 0;
@@ -1245,7 +1637,10 @@ ExitRoutine:
                FREE( entryp_listp, M_TEMP );
        if ( aiocbpp != NULL )
                FREE( aiocbpp, M_TEMP );
-
+       if ((lio_context != NULL) && ((lio_context->io_issued == 0) || (free_context == TRUE))) {
+               free_lio_context(lio_context);
+       }
+       
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_listio)) | DBG_FUNC_END,
                          (int)p, call_result, 0, 0, 0 );
        
@@ -1259,115 +1654,93 @@ ExitRoutine:
  * we get a wake up call on sleep channel &aio_anchor.aio_async_workq 
  * after new work is queued up.
  */
-
 static void
 aio_work_thread( void )
 {
        aio_workq_entry                 *entryp;
+       int                     error;
+       vm_map_t                currentmap;
+       vm_map_t                oldmap = VM_MAP_NULL;
+       task_t                  oldaiotask = TASK_NULL;
+       struct uthread  *uthreadp = NULL;
        
        for( ;; ) {
-               AIO_LOCK;
-               entryp = aio_get_some_work();
-        if ( entryp == NULL ) {
-               /* 
-                * aio worker threads wait for some work to get queued up 
-                * by aio_queue_async_request.  Once some work gets queued 
-                * it will wake up one of these worker threads just before 
-                * returning to our caller in user land.
-                */
-                       assert_wait( (event_t) &aio_anchor.aio_async_workq, THREAD_UNINT );
-                       AIO_UNLOCK; 
-                       
-                       thread_block( (thread_continue_t)aio_work_thread );
-                       /* NOT REACHED */
-        }
+               /* 
+                * returns with the entry ref'ed.
+                * sleeps until work is available. 
+                */
+               entryp = aio_get_some_work();         
+
+               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_START,
+                               (int)entryp->procp, (int)entryp->uaiocbp, entryp->flags, 0, 0 );
+
+               /*
+                * Assume the target's address space identity for the duration
+                * of the IO.  Note: don't need to have the entryp locked,
+                * because the proc and map don't change until it's freed.
+                */
+               currentmap = get_task_map( (current_proc())->task );
+               if ( currentmap != entryp->aio_map ) {
+                       uthreadp = (struct uthread *) get_bsdthread_info(current_thread());
+                       oldaiotask = uthreadp->uu_aio_task;
+                       uthreadp->uu_aio_task = entryp->procp->task;
+                       oldmap = vm_map_switch( entryp->aio_map );
+               }
+
+               if ( (entryp->flags & AIO_READ) != 0 ) {
+                       error = do_aio_read( entryp );
+               }
+               else if ( (entryp->flags & AIO_WRITE) != 0 ) {
+                       error = do_aio_write( entryp );
+               }
+               else if ( (entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0 ) {
+                       error = do_aio_fsync( entryp );
+               }
                else {
-                       int                     error;
-                       vm_map_t                currentmap;
-                       vm_map_t                oldmap = VM_MAP_NULL;
-                       task_t                  oldaiotask = TASK_NULL;
-                       struct uthread  *uthreadp = NULL;
+                       printf( "%s - unknown aio request - flags 0x%02X \n", 
+                                       __FUNCTION__, entryp->flags );
+                       error = EINVAL;
+               }
 
-                       AIO_UNLOCK; 
+               /* Restore old map */
+               if ( currentmap != entryp->aio_map ) {
+                       (void) vm_map_switch( oldmap );
+                       uthreadp->uu_aio_task = oldaiotask;
+               }
 
-                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_START,
-                                                 (int)entryp->procp, (int)entryp->uaiocbp, entryp->flags, 0, 0 );
-                       
-                       /*
-                        * Assume the target's address space identity for the duration
-                        * of the IO.
-                        */
-                       currentmap = get_task_map( (current_proc())->task );
-                       if ( currentmap != entryp->aio_map ) {
-                               uthreadp = (struct uthread *) get_bsdthread_info(current_thread());
-                               oldaiotask = uthreadp->uu_aio_task;
-                               uthreadp->uu_aio_task = entryp->procp->task;
-                               oldmap = vm_map_switch( entryp->aio_map );
-                       }
-                       
-                       if ( (entryp->flags & AIO_READ) != 0 ) {
-                               error = do_aio_read( entryp );
-                       }
-                       else if ( (entryp->flags & AIO_WRITE) != 0 ) {
-                               error = do_aio_write( entryp );
-                       }
-                       else if ( (entryp->flags & AIO_FSYNC) != 0 ) {
-                               error = do_aio_fsync( entryp );
-                       }
-                       else {
-                               printf( "%s - unknown aio request - flags 0x%02X \n", 
-                                               __FUNCTION__, entryp->flags );
-                               error = EINVAL;
-                       }
-                       entryp->errorval = error;               
-                       if ( currentmap != entryp->aio_map ) {
-                               (void) vm_map_switch( oldmap );
-                               uthreadp->uu_aio_task = oldaiotask;
-                       }
-                               
-                       /* we're done with the IO request so pop it off the active queue and */
-                       /* push it on the done queue */
-                       AIO_LOCK;
-                       TAILQ_REMOVE( &entryp->procp->aio_activeq, entryp, aio_workq_link );
-                       aio_anchor.aio_active_count--;
-                       entryp->procp->aio_active_count--;
-                       TAILQ_INSERT_TAIL( &entryp->procp->aio_doneq, entryp, aio_workq_link );
-                       aio_anchor.aio_done_count++;
-                       entryp->procp->aio_done_count++;
-                       entryp->flags |= AIO_COMPLETION;
-
-                       /* remove our reference to the user land map. */
-                       if ( VM_MAP_NULL != entryp->aio_map ) {
-                               vm_map_t                my_map;
-                               
-                               my_map = entryp->aio_map;
-                               entryp->aio_map = VM_MAP_NULL;
-                               AIO_UNLOCK;  /* must unlock before calling vm_map_deallocate() */
-                               vm_map_deallocate( my_map );
-                       }
-                       else {
-                               AIO_UNLOCK;
-                       }
-                       
-                       do_aio_completion( entryp );
-                       
-                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_END,
-                                                 (int)entryp->procp, (int)entryp->uaiocbp, entryp->errorval, 
-                                                 entryp->returnval, 0 );
-                       
-                       AIO_LOCK;
-                       entryp->flags &= ~AIO_COMPLETION;
-                       if ( (entryp->flags & AIO_DO_FREE) != 0 ) {
-                               vm_map_t                my_map;
-                       
-                               my_map = entryp->aio_map;
-                               entryp->aio_map = VM_MAP_NULL;
-                               AIO_UNLOCK;
-                               aio_free_request( entryp, my_map );
-                       }
-                       else
-                               AIO_UNLOCK;
+               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread)) | DBG_FUNC_END,
+                               (int)entryp->procp, (int)entryp->uaiocbp, entryp->errorval, 
+                               entryp->returnval, 0 );
+
+               
+               /* XXX COUNTS */
+               aio_entry_lock_spin(entryp);
+               entryp->errorval = error;       
+               aio_entry_unlock(entryp);
+
+               /* we're done with the IO request so pop it off the active queue and */
+               /* push it on the done queue */
+               aio_proc_lock(entryp->procp);
+               aio_proc_move_done_locked(entryp->procp, entryp);
+               aio_proc_unlock(entryp->procp);
+
+               OSDecrementAtomic(&aio_anchor.aio_inflight_count);
+
+               /* remove our reference to the user land map. */
+               if ( VM_MAP_NULL != entryp->aio_map ) {
+                       vm_map_t                my_map;
+
+                       my_map = entryp->aio_map;
+                       entryp->aio_map = VM_MAP_NULL;
+                       vm_map_deallocate( my_map );
                }
+
+               /* Provide notifications */
+               do_aio_completion( entryp );
+
+               /* Will free if needed */
+               aio_entry_unref(entryp);
+
        } /* for ( ;; ) */
 
        /* NOT REACHED */
@@ -1381,387 +1754,327 @@ aio_work_thread( void )
  * IO requests at the time the aio_fsync call came in have completed.
  * NOTE - AIO_LOCK must be held by caller
  */
-
 static aio_workq_entry *
 aio_get_some_work( void )
 {
-       aio_workq_entry                         *entryp;
-       
-       /* pop some work off the work queue and add to our active queue */
-       for ( entryp = TAILQ_FIRST( &aio_anchor.aio_async_workq );
-                 entryp != NULL;
-                 entryp = TAILQ_NEXT( entryp, aio_workq_link ) ) {
+       aio_workq_entry                         *entryp = NULL;
+       aio_workq_t                             queue = NULL;
 
+       /* Just one queue for the moment.  In the future there will be many. */
+       queue = &aio_anchor.aio_async_workqs[0];        
+       aio_workq_lock_spin(queue);
+       if (queue->aioq_count == 0) {
+               goto nowork;
+       }
+
+       /* 
+        * Hold the queue lock.
+        *
+        * pop some work off the work queue and add to our active queue
+        * Always start with the queue lock held. 
+        */
+       for(;;) {
+               /* 
+                * Pull of of work queue.  Once it's off, it can't be cancelled,
+                * so we can take our ref once we drop the queue lock.
+                */
+               entryp = TAILQ_FIRST(&queue->aioq_entries);
+
+               /* 
+                * If there's no work or only fsyncs that need delay, go to sleep 
+                * and then start anew from aio_work_thread 
+                */
+               if (entryp == NULL) {
+                       goto nowork;
+               }
+
+               aio_workq_remove_entry_locked(queue, entryp);
+               
+               aio_workq_unlock(queue);
+
+               /* 
+                * Check if it's an fsync that must be delayed.  No need to lock the entry;
+                * that flag would have been set at initialization.
+                */
                if ( (entryp->flags & AIO_FSYNC) != 0 ) {
-                       /* leave aio_fsync calls on the work queue if there are IO */
-                       /* requests on the active queue for the same file descriptor. */
+                       /* 
+                        * Check for unfinished operations on the same file
+                        * in this proc's queue.
+                        */
+                       aio_proc_lock_spin(entryp->procp);
                        if ( aio_delay_fsync_request( entryp ) ) {
-
+                               /* It needs to be delayed.  Put it back on the end of the work queue */
                                KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay)) | DBG_FUNC_NONE,
                                                          (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
+
+                               aio_proc_unlock(entryp->procp);
+
+                               aio_workq_lock_spin(queue);
+                               aio_workq_add_entry_locked(queue, entryp);
                                continue;
-                       }
+                       } 
+                       aio_proc_unlock(entryp->procp);
                }
+               
                break;
        }
-       
-       if ( entryp != NULL ) {
-               TAILQ_REMOVE( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
-               aio_anchor.aio_async_workq_count--;
-               TAILQ_INSERT_TAIL( &entryp->procp->aio_activeq, entryp, aio_workq_link );
-               aio_anchor.aio_active_count++;
-               entryp->procp->aio_active_count++;
-       }
-               
+
+       aio_entry_ref(entryp);
+
+       OSIncrementAtomic(&aio_anchor.aio_inflight_count);
        return( entryp );
-       
-} /* aio_get_some_work */
 
+nowork:
+       /* We will wake up when someone enqueues something */
+       waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
+       aio_workq_unlock(queue);
+       thread_block( (thread_continue_t)aio_work_thread );
+
+       // notreached
+       return NULL;
+}
 
 /*
- * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed at
- * this time.  Delay will happen when there are any active IOs for the same file 
- * descriptor that were queued at time the aio_sync call was queued.  
- * NOTE - AIO_LOCK must be held by caller
+ * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed.
+ * A big, simple hammer: only send it off if it's the most recently filed IO which has
+ * not been completed.
  */
 static boolean_t
 aio_delay_fsync_request( aio_workq_entry *entryp )
 {
-       aio_workq_entry                 *my_entryp;
-
-       TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) {
-               if ( my_entryp->fsyncp != USER_ADDR_NULL &&
-                        entryp->uaiocbp == my_entryp->fsyncp &&
-                        entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) {
-                       return( TRUE );
-               }
+       if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
+               return FALSE;
        }
                
-       return( FALSE );
-       
+       return TRUE;
 } /* aio_delay_fsync_request */
 
-
-/*
- * aio_queue_async_request - queue up an async IO request on our work queue then
- * wake up one of our worker threads to do the actual work.  We get a reference
- * to our caller's user land map in order to keep it around while we are
- * processing the request. 
- */
-
-static int
-aio_queue_async_request( struct proc *procp, user_addr_t aiocbp, int kindOfIO )
+static aio_workq_entry *
+aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, void *group_tag, int kindOfIO)
 {
-       aio_workq_entry                 *entryp;
-       int                                             result;
+       aio_workq_entry *entryp;
+       int             result = 0;
 
        entryp = (aio_workq_entry *) zalloc( aio_workq_zonep );
        if ( entryp == NULL ) {
                result = EAGAIN; 
                goto error_exit;
        }
+
        bzero( entryp, sizeof(*entryp) );
 
        /* fill in the rest of the aio_workq_entry */
        entryp->procp = procp;
        entryp->uaiocbp = aiocbp;
        entryp->flags |= kindOfIO;
+       entryp->group_tag = group_tag;
        entryp->aio_map = VM_MAP_NULL;
+       entryp->aio_refcount = 0;
 
-       if ( !IS_64BIT_PROCESS(procp) ) {
-               struct aiocb aiocb32;
-
+       if ( proc_is64bit(procp) ) {
+               struct user64_aiocb aiocb64;
+               
+               result = copyin( aiocbp, &aiocb64, sizeof(aiocb64) );
+               if (result == 0 )
+                       do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
+               
+       } else {
+               struct user32_aiocb aiocb32;
+               
                result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) );
                if ( result == 0 )
-                       do_munge_aiocb( &aiocb32, &entryp->aiocb );
-       } else 
-               result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) );
+                       do_munge_aiocb_user32_to_user( &aiocb32, &entryp->aiocb );
+       }
 
        if ( result != 0 ) {
                result = EAGAIN;
                goto error_exit;
        }
 
-       /* do some more validation on the aiocb and embedded file descriptor */
-       result = aio_validate( entryp );
-       if ( result != 0 ) 
-               goto error_exit;
-
        /* get a reference to the user land map in order to keep it around */
        entryp->aio_map = get_task_map( procp->task );
        vm_map_reference( entryp->aio_map );
 
-       AIO_LOCK;
+       /* do some more validation on the aiocb and embedded file descriptor */
+       result = aio_validate( entryp );
+       if ( result != 0 )
+               goto error_exit_with_ref;
 
-       if ( is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) {
-               AIO_UNLOCK;
-               result = EAGAIN; 
-               goto error_exit;
-       }
+       /* get a reference on the current_thread, which is passed in vfs_context. */
+       entryp->thread = current_thread();
+       thread_reference( entryp->thread );
+       return ( entryp );
 
-       /* check our aio limits to throttle bad or rude user land behavior */
-       if ( aio_get_all_queues_count( ) >= aio_max_requests || 
-                aio_get_process_count( procp ) >= aio_max_requests_per_process ) {
-               AIO_UNLOCK;
-               result = EAGAIN; 
-               goto error_exit;
+error_exit_with_ref:
+       if ( VM_MAP_NULL != entryp->aio_map ) {
+               vm_map_deallocate( entryp->aio_map );
        }
-       
-       /* 
-        * aio_fsync calls sync up all async IO requests queued at the time 
-        * the aio_fsync call was made.  So we mark each currently queued async 
-        * IO with a matching file descriptor as must complete before we do the 
-        * fsync.  We set the fsyncp field of each matching async IO 
-        * request with the aiocb pointer passed in on the aio_fsync call to 
-        * know which IOs must complete before we process the aio_fsync call. 
-        */
-       if ( (kindOfIO & AIO_FSYNC) != 0 )
-               aio_mark_requests( entryp );
-       
-       /* queue up on our aio asynchronous work queue */
-       TAILQ_INSERT_TAIL( &aio_anchor.aio_async_workq, entryp, aio_workq_link );
-       aio_anchor.aio_async_workq_count++;
-       
-       wakeup_one( (caddr_t) &aio_anchor.aio_async_workq );
-       AIO_UNLOCK; 
-
-       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
-                         (int)procp, (int)aiocbp, 0, 0, 0 );
-       
-       return( 0 );
-       
 error_exit:
-       if ( entryp != NULL ) {
-               /* this entry has not been queued up so no worries about unlocked */
-               /* state and aio_map */
-               aio_free_request( entryp, entryp->aio_map );
+       if ( result && entryp != NULL ) {
+               zfree( aio_workq_zonep, entryp );
+               entryp = NULL;
        }
-               
-       return( result );
-       
-} /* aio_queue_async_request */
+
+       return ( entryp );
+}
 
 
 /*
- * lio_create_async_entry - allocate an aio_workq_entry and fill it in.
- * If all goes well return 0 and pass the aio_workq_entry pointer back to
- * our caller.  We get a reference to our caller's user land map in order to keep 
- * it around while we are processing the request.  
- * lio_listio calls behave differently at completion they do completion notification 
- * when all async IO requests have completed.  We use group_tag to tag IO requests 
- * that behave in the delay notification manner. 
+ * aio_queue_async_request - queue up an async IO request on our work queue then
+ * wake up one of our worker threads to do the actual work.  We get a reference
+ * to our caller's user land map in order to keep it around while we are
+ * processing the request. 
  */
-
 static int
-lio_create_async_entry( struct proc *procp, user_addr_t aiocbp, 
-                                                user_addr_t sigp, long group_tag,
-                                                aio_workq_entry **entrypp )
+aio_queue_async_request(proc_t procp, user_addr_t aiocbp, int kindOfIO )
 {
-       aio_workq_entry                         *entryp;
-       int                                                     result;
+       aio_workq_entry *entryp;
+       int             result;
+       int             old_count;
 
-       entryp = (aio_workq_entry *) zalloc( aio_workq_zonep );
-       if ( entryp == NULL ) {
-               result = EAGAIN; 
-               goto error_exit;
+       old_count = aio_increment_total_count();
+       if (old_count >= aio_max_requests) {
+               result = EAGAIN;
+               goto error_noalloc;
        }
-       bzero( entryp, sizeof(*entryp) );
-
-       /* fill in the rest of the aio_workq_entry */
-       entryp->procp = procp;
-       entryp->uaiocbp = aiocbp;
-       entryp->flags |= AIO_LIO;
-       entryp->group_tag = group_tag;
-       entryp->aio_map = VM_MAP_NULL;
 
-       if ( !IS_64BIT_PROCESS(procp) ) {
-               struct aiocb aiocb32;
-
-               result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) );
-               if ( result == 0 )
-                       do_munge_aiocb( &aiocb32, &entryp->aiocb );
-       } else
-               result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) );
-
-       if ( result != 0 ) {
+       entryp = aio_create_queue_entry( procp, aiocbp, 0, kindOfIO);
+       if ( entryp == NULL ) {
                result = EAGAIN;
-               goto error_exit;
+               goto error_noalloc;
        }
 
-       /* look for lio_listio LIO_NOP requests and ignore them. */
-       /* Not really an error, but we need to free our aio_workq_entry.  */
-       if ( entryp->aiocb.aio_lio_opcode == LIO_NOP ) {
-               result = 0;
-               goto error_exit;
-       }
 
-       /* use sigevent passed in to lio_listio for each of our calls, but only */
-       /* do completion notification after the last request completes. */
-       if ( sigp != USER_ADDR_NULL ) {
-               if ( !IS_64BIT_PROCESS(procp) ) {
-                       struct sigevent sigevent32;
-
-                       result = copyin( sigp, &sigevent32, sizeof(sigevent32) );
-                       if ( result == 0 ) {
-                               /* also need to munge aio_sigevent since it contains pointers */
-                               /* special case here.  since we do not know if sigev_value is an */
-                               /* int or a ptr we do NOT cast the ptr to a user_addr_t.   This  */
-                               /* means if we send this info back to user space we need to remember */
-                               /* sigev_value was not expanded for the 32-bit case.  */
-                               /* NOTE - this does NOT affect us since we don't support sigev_value */
-                               /* yet in the aio context.  */
-                               //LP64
-                               entryp->aiocb.aio_sigevent.sigev_notify = sigevent32.sigev_notify;
-                               entryp->aiocb.aio_sigevent.sigev_signo = sigevent32.sigev_signo;
-                               entryp->aiocb.aio_sigevent.sigev_value.size_equivalent.sival_int = 
-                                       sigevent32.sigev_value.sival_int;
-                               entryp->aiocb.aio_sigevent.sigev_notify_function = 
-                                       CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
-                               entryp->aiocb.aio_sigevent.sigev_notify_attributes = 
-                                       CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
-                       }
-               } else
-                       result = copyin( sigp, &entryp->aiocb.aio_sigevent, sizeof(entryp->aiocb.aio_sigevent) );
+       aio_proc_lock_spin(procp);
 
-               if ( result != 0 ) {
-                       result = EAGAIN;
-                       goto error_exit;
-               }
+       if ( is_already_queued( entryp->procp, entryp->uaiocbp ) == TRUE ) {
+               result = EAGAIN; 
+               goto error_exit;
        }
 
-       /* do some more validation on the aiocb and embedded file descriptor */
-       result = aio_validate( entryp );
-       if ( result != 0 ) 
+       /* check our aio limits to throttle bad or rude user land behavior */
+       if (aio_get_process_count( procp ) >= aio_max_requests_per_process) {
+               printf("aio_queue_async_request(): too many in flight for proc: %d.\n", procp->p_aio_total_count);
+               result = EAGAIN; 
                goto error_exit;
-
-       /* get a reference to the user land map in order to keep it around */
-       entryp->aio_map = get_task_map( procp->task );
-       vm_map_reference( entryp->aio_map );
+       }
        
-       *entrypp = entryp;
-       return( 0 );
+       /* Add the IO to proc and work queues, wake up threads as appropriate */
+       lck_mtx_convert_spin(aio_proc_mutex(procp));
+       aio_enqueue_work(procp, entryp, 1);
        
-error_exit:
-       if ( entryp != NULL )
-               zfree( aio_workq_zonep, entryp );
-               
-       return( result );
+       aio_proc_unlock(procp);
        
-} /* lio_create_async_entry */
-
+       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued)) | DBG_FUNC_NONE,
+                         (int)procp, (int)aiocbp, 0, 0, 0 );
 
-/*
- * aio_mark_requests - aio_fsync calls synchronize file data for all queued async IO
- * requests at the moment the aio_fsync call is queued.  We use aio_workq_entry.fsyncp
- * to mark each async IO that must complete before the fsync is done.  We use the uaiocbp
- * field from the aio_fsync call as the aio_workq_entry.fsyncp in marked requests.
- * NOTE - AIO_LOCK must be held by caller
- */
+       return( 0 );
+       
+error_exit:
+       /*
+        * This entry has not been queued up so no worries about
+        * unlocked state and aio_map
+        */
+       aio_proc_unlock(procp);
+       aio_free_request(entryp);
 
-static void
-aio_mark_requests( aio_workq_entry *entryp )
-{
-       aio_workq_entry                 *my_entryp;
+error_noalloc:
+       aio_decrement_total_count();
 
-       TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) {
-               if ( entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) {
-                       my_entryp->fsyncp = entryp->uaiocbp;
-               }
-       }
+       return( result );
        
-       TAILQ_FOREACH( my_entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( entryp->procp == my_entryp->procp &&
-                        entryp->aiocb.aio_fildes == my_entryp->aiocb.aio_fildes ) {
-                       my_entryp->fsyncp = entryp->uaiocbp;
-               }
-       }
-                               
-} /* aio_mark_requests */
+} /* aio_queue_async_request */
 
 
 /*
- * lio_create_sync_entry - allocate an aio_workq_entry and fill it in.
- * If all goes well return 0 and pass the aio_workq_entry pointer back to
- * our caller.  
- * lio_listio calls behave differently at completion they do completion notification 
- * when all async IO requests have completed.  We use group_tag to tag IO requests 
- * that behave in the delay notification manner. 
+ * lio_create_entry
+ *
+ * Allocate an aio_workq_entry and fill it in.  If all goes well return 0
+ * and pass the aio_workq_entry pointer back to our caller.
+ *
+ * Parameters: procp                   The process makign the request
+ *             aiocbp                  The aio context buffer pointer
+ *             group_tag               The group tag used to indicate a
+ *                                     group of operations has completed
+ *             entrypp                 Pointer to the pointer to receive the
+ *                                     address of the created aio_workq_entry
+ *
+ * Returns:    0                       Successfully created
+ *             EAGAIN                  Try again (usually resource shortage)
+ *
+ *
+ * Notes:      We get a reference to our caller's user land map in order
+ *             to keep it around while we are processing the request.  
+ *
+ *             lio_listio calls behave differently at completion they do
+ *             completion notification when all async IO requests have
+ *             completed.  We use group_tag to tag IO requests that behave
+ *             in the delay notification manner. 
+ *
+ *             All synchronous operations are considered to not have a
+ *             signal routine associated with them (sigp == USER_ADDR_NULL).
  */
-
 static int
-lio_create_sync_entry( struct proc *procp, user_addr_t aiocbp, 
-                                               long group_tag, aio_workq_entry **entrypp )
+lio_create_entry(proc_t procp, user_addr_t aiocbp, void *group_tag,
+               aio_workq_entry **entrypp )
 {
-       aio_workq_entry                         *entryp;
-       int                                                     result;
+       aio_workq_entry *entryp;
+       int             result;
 
-       entryp = (aio_workq_entry *) zalloc( aio_workq_zonep );
+       entryp = aio_create_queue_entry( procp, aiocbp, group_tag, AIO_LIO);
        if ( entryp == NULL ) {
                result = EAGAIN; 
                goto error_exit;
        }
-       bzero( entryp, sizeof(*entryp) );
-
-       /* fill in the rest of the aio_workq_entry */
-       entryp->procp = procp;
-       entryp->uaiocbp = aiocbp;
-       entryp->flags |= AIO_LIO;
-       entryp->group_tag = group_tag;
-       entryp->aio_map = VM_MAP_NULL;
-
-       if ( !IS_64BIT_PROCESS(procp) ) {
-               struct aiocb aiocb32;
-
-               result = copyin( aiocbp, &aiocb32, sizeof(aiocb32) );
-               if ( result == 0 )
-                       do_munge_aiocb( &aiocb32, &entryp->aiocb );
-       } else 
-               result = copyin( aiocbp, &entryp->aiocb, sizeof(entryp->aiocb) );
-
-       if ( result != 0 ) {
-               result = EAGAIN;
-               goto error_exit;
-       }
 
-       /* look for lio_listio LIO_NOP requests and ignore them. */
-       /* Not really an error, but we need to free our aio_workq_entry.  */
+       /*
+        * Look for lio_listio LIO_NOP requests and ignore them; this is
+        * not really an error, but we need to free our aio_workq_entry.
+        */
        if ( entryp->aiocb.aio_lio_opcode == LIO_NOP ) {
                result = 0;
                goto error_exit;
        }
 
-       result = aio_validate( entryp );
-       if ( result != 0 ) {
-               goto error_exit;
-       }
-
        *entrypp = entryp;
        return( 0 );
        
 error_exit:
-       if ( entryp != NULL )
-               zfree( aio_workq_zonep, entryp );
+
+       if ( entryp != NULL ) {
+               /*
+                * This entry has not been queued up so no worries about
+                * unlocked state and aio_map
+                */
+               aio_free_request(entryp);
+       }
                
        return( result );
        
-} /* lio_create_sync_entry */
+} /* lio_create_entry */
 
 
 /*
  * aio_free_request - remove our reference on the user land map and
- * free the work queue entry resources.
- * We are not holding the lock here thus aio_map is passed in and
- * zeroed while we did have the lock.
+ * free the work queue entry resources.  The entry is off all lists
+ * and has zero refcount, so no one can have a pointer to it.
  */
 
 static int
-aio_free_request( aio_workq_entry *entryp, vm_map_t the_map )
+aio_free_request(aio_workq_entry *entryp)
 {
        /* remove our reference to the user land map. */
-       if ( VM_MAP_NULL != the_map ) {
-               vm_map_deallocate( the_map );
+       if ( VM_MAP_NULL != entryp->aio_map) {
+               vm_map_deallocate(entryp->aio_map);
        }
-               
+
+       /* remove our reference to thread which enqueued the request */
+       if ( NULL != entryp->thread ) {
+               thread_deallocate( entryp->thread );
+       }
+
+       entryp->aio_refcount = -1; /* A bit of poisoning in case of bad refcounting. */
+       
        zfree( aio_workq_zonep, entryp );
 
        return( 0 );
@@ -1769,9 +2082,11 @@ aio_free_request( aio_workq_entry *entryp, vm_map_t the_map )
 } /* aio_free_request */
 
 
-/* aio_validate - validate the aiocb passed in by one of the aio syscalls.
+/*
+ * aio_validate
+ *
+ * validate the aiocb passed in by one of the aio syscalls.
  */
-
 static int
 aio_validate( aio_workq_entry *entryp ) 
 {
@@ -1793,32 +2108,46 @@ aio_validate( aio_workq_entry *entryp )
        }
 
        flag = FREAD;
-       if ( (entryp->flags & (AIO_WRITE | AIO_FSYNC)) != 0 ) {
+       if ( (entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0 ) {
                flag = FWRITE;
        }
 
        if ( (entryp->flags & (AIO_READ | AIO_WRITE)) != 0 ) {
-               // LP64todo - does max value for aio_nbytes need to grow? 
                if ( entryp->aiocb.aio_nbytes > INT_MAX         ||
                         entryp->aiocb.aio_buf == USER_ADDR_NULL ||
                         entryp->aiocb.aio_offset < 0 )
                        return( EINVAL );
        }
 
-       /* validate aiocb.aio_sigevent.  at this point we only support sigev_notify
-        * equal to SIGEV_SIGNAL or SIGEV_NONE.  this means sigev_value, 
-        * sigev_notify_function, and sigev_notify_attributes are ignored.
+       /*
+        * validate aiocb.aio_sigevent.  at this point we only support
+        * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE.  this means
+        * sigev_value, sigev_notify_function, and sigev_notify_attributes
+        * are ignored, since SIGEV_THREAD is unsupported.  This is consistent
+        * with no [RTS] (RalTime Signal) option group support.
         */
-       if ( entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ) {
+       switch ( entryp->aiocb.aio_sigevent.sigev_notify ) {
+       case SIGEV_SIGNAL:
+           {
                int             signum;
+
                /* make sure we have a valid signal number */
                signum = entryp->aiocb.aio_sigevent.sigev_signo;
                if ( signum <= 0 || signum >= NSIG || 
                         signum == SIGKILL || signum == SIGSTOP )
                        return (EINVAL);
-       }
-       else if ( entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_NONE )
+           }
+           break;
+
+       case SIGEV_NONE:
+               break;
+
+       case SIGEV_THREAD:
+               /* Unsupported [RTS] */
+
+       default:
                return (EINVAL);
+       }
        
        /* validate the file descriptor and that the file was opened
         * for the appropriate read / write access.
@@ -1831,7 +2160,7 @@ aio_validate( aio_workq_entry *entryp )
                        /* we don't have read or write access */
                        result = EBADF;
                }
-               else if ( fp->f_fglob->fg_type != DTYPE_VNODE ) {
+               else if ( FILEGLOB_DTYPE(fp->f_fglob) != DTYPE_VNODE ) {
                        /* this is not a file */
                        result = ESPIPE;
                } else
@@ -1849,61 +2178,34 @@ aio_validate( aio_workq_entry *entryp )
 
 } /* aio_validate */
 
+static int 
+aio_increment_total_count()
+{
+       return OSIncrementAtomic(&aio_anchor.aio_total_count);
+}
 
-/*
- * aio_get_process_count - runs through our queues that hold outstanding 
- * async IO reqests and totals up number of requests for the given
- * process. 
- * NOTE - caller must hold aio lock! 
- */
+static int             
+aio_decrement_total_count()
+{
+       int old = OSDecrementAtomic(&aio_anchor.aio_total_count);
+       if (old <= 0) {
+               panic("Negative total AIO count!\n");
+       }
+
+       return old;
+}
 
 static int
-aio_get_process_count( struct proc *procp ) 
+aio_get_process_count(proc_t procp ) 
 {
-       aio_workq_entry                         *entryp;
-       int                                                     count;
-       
-       /* begin with count of completed async IO requests for this process */
-       count = procp->aio_done_count;
-       
-       /* add in count of active async IO requests for this process */
-       count += procp->aio_active_count;
-       
-       /* look for matches on our queue of asynchronous todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( procp == entryp->procp ) {
-                       count++;
-               }
-       }
-       
-       /* look for matches on our queue of synchronous todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) {
-               if ( procp == entryp->procp ) {
-                       count++;
-               }
-       }
-       
-       return( count );
+       return procp->p_aio_total_count;
        
 } /* aio_get_process_count */
 
-
-/*
- * aio_get_all_queues_count - get total number of entries on all aio work queues.  
- * NOTE - caller must hold aio lock! 
- */
-
 static int
 aio_get_all_queues_count( void ) 
 {
-       int                                                     count;
-       
-       count = aio_anchor.aio_async_workq_count;
-       count += aio_anchor.lio_sync_workq_count;
-       count += aio_anchor.aio_active_count;
-       count += aio_anchor.aio_done_count;
-               
-       return( count );
+       return aio_anchor.aio_total_count;
        
 } /* aio_get_all_queues_count */
 
@@ -1911,113 +2213,143 @@ aio_get_all_queues_count( void )
 /*
  * do_aio_completion.  Handle async IO completion.  
  */
-
 static void
 do_aio_completion( aio_workq_entry *entryp ) 
 {
-       /* signal user land process if appropriate */
+
+       boolean_t               lastLioCompleted = FALSE;
+       aio_lio_context *lio_context = NULL;
+       int waiter = 0;
+       
+       lio_context = (aio_lio_context *)entryp->group_tag;
+       
+       if (lio_context != NULL) {
+               
+               aio_proc_lock_spin(entryp->procp);
+
+               /* Account for this I/O completing. */
+               lio_context->io_completed++;
+               
+               /* Are we done with this lio context? */
+               if (lio_context->io_issued == lio_context->io_completed) {
+                       lastLioCompleted = TRUE;
+               }
+               
+               waiter = lio_context->io_waiter;
+               
+               /* explicit wakeup of lio_listio() waiting in LIO_WAIT */
+               if ((entryp->flags & AIO_LIO_NOTIFY) && (lastLioCompleted) && (waiter != 0)) {
+                       /* wake up the waiter */
+                       wakeup(lio_context);
+               }
+               
+               aio_proc_unlock(entryp->procp);
+       }
+       
        if ( entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL &&
                 (entryp->flags & AIO_DISABLE) == 0 ) {
-
-               /* 
-                * if group_tag is non zero then make sure this is the last IO request
-                * in the group before we signal.
-                */
-               if ( entryp->group_tag == 0 || 
-                        (entryp->group_tag != 0 && aio_last_group_io( entryp )) ) {
+               
+               boolean_t       performSignal = FALSE;
+                if (lio_context == NULL) {
+                       performSignal = TRUE;
+                }
+                else {
+                       /* 
+                        * If this was the last request in the group and a signal
+                        * is desired, send one.
+                        */
+                       performSignal = lastLioCompleted;
+                }
+                
+                if (performSignal) {
+                       
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig)) | DBG_FUNC_NONE,
-                                                 (int)entryp->procp, (int)entryp->uaiocbp, 
-                                                 entryp->aiocb.aio_sigevent.sigev_signo, 0, 0 );
+                                (int)entryp->procp, (int)entryp->uaiocbp, 
+                                entryp->aiocb.aio_sigevent.sigev_signo, 0, 0 );
                        
                        psignal( entryp->procp, entryp->aiocb.aio_sigevent.sigev_signo );
-                       return;
                }
        }
 
+       if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
+               panic("Close and exit flags set at the same time\n");
+       }
+       
        /*
-        * need to handle case where a process is trying to exit, exec, or close
-        * and is currently waiting for active aio requests to complete.  If  
-        * AIO_WAITING is set then we need to look to see if there are any 
+        * need to handle case where a process is trying to exit, exec, or
+        * close and is currently waiting for active aio requests to complete.
+        * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any 
         * other requests in the active queue for this process.  If there are 
-        * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.  If 
-        * there are some still active then do nothing - we only want to wakeup 
-        * when all active aio requests for the process are complete. 
+        * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.
+        * If there are some still active then do nothing - we only want to
+        * wakeup when all active aio requests for the process are complete. 
+        *
+        * Don't need to lock the entry or proc to check the cleanup flag.  It can only be
+        * set for cancellation, while the entryp is still on a proc list; now it's 
+        * off, so that flag is already set if it's going to be.
         */
-       if ( (entryp->flags & AIO_WAITING) != 0 ) {
+       if ( (entryp->flags & AIO_EXIT_WAIT) != 0 ) {
                int             active_requests;
 
                KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE,
                                          (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
                
-               AIO_LOCK;
+               aio_proc_lock_spin(entryp->procp);
                active_requests = aio_active_requests_for_process( entryp->procp );
-               //AIO_UNLOCK;
                if ( active_requests < 1 ) {
-                       /* no active aio requests for this process, continue exiting */
-                       wakeup_one( (caddr_t) &entryp->procp->AIO_CLEANUP_SLEEP_CHAN );
+                       /* 
+                        * no active aio requests for this process, continue exiting.  In this
+                        * case, there should be no one else waiting ont he proc in AIO...
+                        */
+                       wakeup_one((caddr_t)&entryp->procp->AIO_CLEANUP_SLEEP_CHAN);
+                       aio_proc_unlock(entryp->procp);
 
                        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE,
                                                  (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
+               } else {
+                       aio_proc_unlock(entryp->procp);
                }
-               AIO_UNLOCK;
-               return;
        }
+       
+       if ( (entryp->flags & AIO_CLOSE_WAIT) != 0 ) {
+               int             active_requests;
+
+               KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait)) | DBG_FUNC_NONE,
+                                         (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
+               
+               aio_proc_lock_spin(entryp->procp);
+               active_requests = aio_proc_active_requests_for_file( entryp->procp, entryp->aiocb.aio_fildes);
+               if ( active_requests < 1 ) {
+                       /* Can't wakeup_one(); multiple closes might be in progress. */
+                       wakeup(&entryp->procp->AIO_CLEANUP_SLEEP_CHAN);
+                       aio_proc_unlock(entryp->procp);
 
+                       KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake)) | DBG_FUNC_NONE,
+                                                 (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
+               } else {
+                       aio_proc_unlock(entryp->procp);
+               }
+       }
        /* 
-        * aio_suspend case when a signal was not requested.  In that scenario we  
-        * are sleeping on the AIO_SUSPEND_SLEEP_CHAN channel.   
-        * NOTE - the assumption here is that this wakeup call is inexpensive.
-        * we really only need to do this when an aio_suspend call is pending.
-        * If we find the wakeup call should be avoided we could mark the 
-        * async IO requests given in the list provided by aio_suspend and only
-        * call wakeup for them.  If we do mark them we should unmark them after
-        * the aio_suspend wakes up.
+        * A thread in aio_suspend() wants to known about completed IOs.  If it checked
+        * the done list before we moved our AIO there, then it already asserted its wait,
+        * and we can wake it up without holding the lock.  If it checked the list after
+        * we did our move, then it already has seen the AIO that we moved.  Herego, we
+        * can do our wakeup without holding the lock.
         */
-       AIO_LOCK; 
-       wakeup_one( (caddr_t) &entryp->procp->AIO_SUSPEND_SLEEP_CHAN ); 
-       AIO_UNLOCK;
-
+       wakeup( (caddr_t) &entryp->procp->AIO_SUSPEND_SLEEP_CHAN ); 
        KERNEL_DEBUG( (BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake)) | DBG_FUNC_NONE,
                                  (int)entryp->procp, (int)entryp->uaiocbp, 0, 0, 0 );
-       
-       return;
-       
-} /* do_aio_completion */
 
+       /*   
+        * free the LIO context if the last lio completed and no thread is
+        * waiting
+        */
+       if (lastLioCompleted && (waiter == 0)) 
+               free_lio_context (lio_context);
 
-/*
- * aio_last_group_io - checks to see if this is the last unfinished IO request
- * for the given group_tag.  Returns TRUE if there are no other active IO 
- * requests for this group or FALSE if the are active IO requests 
- * NOTE - AIO_LOCK must be held by caller
- */
-
-static boolean_t
-aio_last_group_io( aio_workq_entry *entryp ) 
-{
-       aio_workq_entry                         *my_entryp;
-                       
-       /* look for matches on our queue of active async IO requests */
-       TAILQ_FOREACH( my_entryp, &entryp->procp->aio_activeq, aio_workq_link ) {
-               if ( my_entryp->group_tag == entryp->group_tag )
-                       return( FALSE );
-       }
-       
-       /* look for matches on our queue of asynchronous todo work */
-       TAILQ_FOREACH( my_entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( my_entryp->group_tag == entryp->group_tag )
-                       return( FALSE );
-       }
-       
-       /* look for matches on our queue of synchronous todo work */
-       TAILQ_FOREACH( my_entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) {
-               if ( my_entryp->group_tag == entryp->group_tag )
-                       return( FALSE );
-       }
-
-       return( TRUE );
        
-} /* aio_last_group_io */
+} /* do_aio_completion */
 
 
 /*
@@ -2026,8 +2358,9 @@ aio_last_group_io( aio_workq_entry *entryp )
 static int
 do_aio_read( aio_workq_entry *entryp )
 {
-       struct fileproc                         *fp;
-       int                                             error;
+       struct fileproc         *fp;
+       int                                     error;
+       struct vfs_context      context;
 
        if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) )
                return(error);
@@ -2035,18 +2368,16 @@ do_aio_read( aio_workq_entry *entryp )
                fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
                return(EBADF);
        }
-       if ( fp != NULL ) {
-               error = dofileread( entryp->procp, fp, entryp->aiocb.aio_fildes, 
-                                                       entryp->aiocb.aio_buf, 
-                                                       entryp->aiocb.aio_nbytes,
-                                                       entryp->aiocb.aio_offset, FOF_OFFSET, 
-                                                       &entryp->returnval );
-               fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
-       }
-       else {
-               fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
-               error = EBADF;
-       }
+
+       context.vc_thread = entryp->thread;     /* XXX */
+       context.vc_ucred = fp->f_fglob->fg_cred;
+
+       error = dofileread(&context, fp, 
+                               entryp->aiocb.aio_buf, 
+                               entryp->aiocb.aio_nbytes,
+                               entryp->aiocb.aio_offset, FOF_OFFSET, 
+                               &entryp->returnval);
+       fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
                        
        return( error );
        
@@ -2060,7 +2391,8 @@ static int
 do_aio_write( aio_workq_entry *entryp )
 {
        struct fileproc                 *fp;
-       int                                             error;
+       int                             error, flags;
+       struct vfs_context              context;
 
        if ( (error = fp_lookup(entryp->procp, entryp->aiocb.aio_fildes, &fp , 0)) )
                return(error);
@@ -2068,23 +2400,28 @@ do_aio_write( aio_workq_entry *entryp )
                fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
                return(EBADF);
        }
-       if ( fp != NULL ) {
-               /* NB: tell dofilewrite the offset, and to use the proc cred */
-               error = dofilewrite( entryp->procp,
-                                    fp,
-                                    entryp->aiocb.aio_fildes,
-                                    entryp->aiocb.aio_buf,
-                                    entryp->aiocb.aio_nbytes,
-                                    entryp->aiocb.aio_offset,
-                                    FOF_OFFSET | FOF_PCRED,
-                                    &entryp->returnval);
-               
-               fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
+
+       flags = FOF_PCRED;
+       if ( (fp->f_fglob->fg_flag & O_APPEND) == 0 ) {
+               flags |= FOF_OFFSET;
        }
-       else {
+
+       context.vc_thread = entryp->thread;     /* XXX */
+       context.vc_ucred = fp->f_fglob->fg_cred;
+
+       /* NB: tell dofilewrite the offset, and to use the proc cred */
+       error = dofilewrite(&context,
+                               fp,
+                               entryp->aiocb.aio_buf,
+                               entryp->aiocb.aio_nbytes,
+                               entryp->aiocb.aio_offset,
+                               flags,
+                               &entryp->returnval);
+
+       if (entryp->returnval)
+               fp_drop_written(entryp->procp, entryp->aiocb.aio_fildes, fp);
+       else
                fp_drop(entryp->procp, entryp->aiocb.aio_fildes, fp, 0);
-               error = EBADF;
-       }
 
        return( error );
 
@@ -2094,18 +2431,33 @@ do_aio_write( aio_workq_entry *entryp )
 /*
  * aio_active_requests_for_process - return number of active async IO
  * requests for the given process.
- * NOTE - caller must hold aio lock!
  */
+static int
+aio_active_requests_for_process(proc_t procp )
+{
+       return( procp->p_aio_active_count );
+
+} /* aio_active_requests_for_process */
 
+/*
+ * Called with the proc locked.
+ */
 static int
-aio_active_requests_for_process( struct proc *procp )
+aio_proc_active_requests_for_file(proc_t procp, int fd)
 {
-                               
-       return( procp->aio_active_count );
+       int count = 0;
+       aio_workq_entry *entryp;
+       TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
+               if (entryp->aiocb.aio_fildes == fd) {
+                       count++;
+               }
+       }
 
+       return count;
 } /* aio_active_requests_for_process */
 
 
+
 /*
  * do_aio_fsync
  */
@@ -2115,14 +2467,28 @@ do_aio_fsync( aio_workq_entry *entryp )
        struct vfs_context      context;
        struct vnode            *vp;
        struct fileproc         *fp;
-       int                                     error;
-       
-       /* 
-        * NOTE - we will not support AIO_DSYNC until fdatasync() is supported.  
-        * AIO_DSYNC is caught before we queue up a request and flagged as an error.  
-        * The following was shamelessly extracted from fsync() implementation. 
-        */
+       int                     sync_flag;
+       int                     error;
 
+       /*
+        * We are never called unless either AIO_FSYNC or AIO_DSYNC are set.
+        *
+        * If AIO_DSYNC is set, we can tell the lower layers that it is OK
+        * to mark for update the metadata not strictly necessary for data
+        * retrieval, rather than forcing it to disk.
+        *
+        * If AIO_FSYNC is set, we have to also wait for metadata not really
+        * necessary to data retrival are committed to stable storage (e.g.
+        * atime, mtime, ctime, etc.).
+        *
+        * Metadata necessary for data retrieval ust be committed to stable
+        * storage in either case (file length, etc.).
+        */
+       if (entryp->flags & AIO_FSYNC)
+               sync_flag = MNT_WAIT;
+       else
+               sync_flag = MNT_DWAIT;
+       
        error = fp_getfvp( entryp->procp, entryp->aiocb.aio_fildes, &fp, &vp);
        if ( error == 0 ) {
                if ( (error = vnode_getwithref(vp)) ) {
@@ -2130,10 +2496,10 @@ do_aio_fsync( aio_workq_entry *entryp )
                        entryp->returnval = -1;
                        return(error);
                }
-               context.vc_proc = entryp->procp;
+               context.vc_thread = current_thread();
                context.vc_ucred = fp->f_fglob->fg_cred;
 
-               error = VNOP_FSYNC( vp, MNT_WAIT, &context);
+               error = VNOP_FSYNC( vp, sync_flag, &context);
 
                (void)vnode_put(vp);
 
@@ -2151,20 +2517,20 @@ do_aio_fsync( aio_workq_entry *entryp )
  * is_already_queued - runs through our queues to see if the given  
  * aiocbp / process is there.  Returns TRUE if there is a match
  * on any of our aio queues.
- * NOTE - callers must hold aio lock!
+ *
+ * Called with proc aio lock held (can be held spin)
  */
-
 static boolean_t
-is_already_queued(     struct proc *procp, 
+is_already_queued(proc_t procp, 
                                        user_addr_t aiocbp ) 
 {
        aio_workq_entry                 *entryp;
        boolean_t                               result;
-               
+       
        result = FALSE;
                
        /* look for matches on our queue of async IO requests that have completed */
-       TAILQ_FOREACH( entryp, &procp->aio_doneq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &procp->p_aio_doneq, aio_proc_link ) {
                if ( aiocbp == entryp->uaiocbp ) {
                        result = TRUE;
                        goto ExitThisRoutine;
@@ -2172,35 +2538,32 @@ is_already_queued(      struct proc *procp,
        }
        
        /* look for matches on our queue of active async IO requests */
-       TAILQ_FOREACH( entryp, &procp->aio_activeq, aio_workq_link ) {
+       TAILQ_FOREACH( entryp, &procp->p_aio_activeq, aio_proc_link ) {
                if ( aiocbp == entryp->uaiocbp ) {
                        result = TRUE;
                        goto ExitThisRoutine;
                }
        }
        
-       /* look for matches on our queue of asynchronous todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.aio_async_workq, aio_workq_link ) {
-               if ( procp == entryp->procp && aiocbp == entryp->uaiocbp ) {
-                       result = TRUE;
-                       goto ExitThisRoutine;
-               }
-       }
-       
-       /* look for matches on our queue of synchronous todo work */
-       TAILQ_FOREACH( entryp, &aio_anchor.lio_sync_workq, aio_workq_link ) {
-               if ( procp == entryp->procp && aiocbp == entryp->uaiocbp ) {
-                       result = TRUE;
-                       goto ExitThisRoutine;
-               }
-       }
-
 ExitThisRoutine:
        return( result );
        
 } /* is_already_queued */
 
 
+static void
+free_lio_context(aio_lio_context* context)
+{
+
+#if DEBUG      
+       OSDecrementAtomic(&lio_contexts_alloced);
+#endif /* DEBUG */
+
+       FREE( context, M_TEMP );
+
+} /* free_lio_context */
+
+
 /*
  * aio initialization
  */
@@ -2210,26 +2573,28 @@ aio_init( void )
        int                     i;
        
        aio_lock_grp_attr = lck_grp_attr_alloc_init();
-       aio_lock_grp = lck_grp_alloc_init("aio", aio_lock_grp_attr);
+       aio_proc_lock_grp = lck_grp_alloc_init("aio_proc", aio_lock_grp_attr);;
+       aio_entry_lock_grp = lck_grp_alloc_init("aio_entry", aio_lock_grp_attr);;
+       aio_queue_lock_grp = lck_grp_alloc_init("aio_queue", aio_lock_grp_attr);;
        aio_lock_attr = lck_attr_alloc_init();
 
-       aio_lock = lck_mtx_alloc_init(aio_lock_grp, aio_lock_attr);
+       lck_mtx_init(&aio_entry_mtx, aio_entry_lock_grp, aio_lock_attr);
+       lck_mtx_init(&aio_proc_mtx, aio_proc_lock_grp, aio_lock_attr);
 
-       AIO_LOCK;
-       TAILQ_INIT( &aio_anchor.aio_async_workq );      
-       TAILQ_INIT( &aio_anchor.lio_sync_workq );       
-       aio_anchor.aio_async_workq_count = 0;
-       aio_anchor.lio_sync_workq_count = 0;
-       aio_anchor.aio_active_count = 0;
+       aio_anchor.aio_inflight_count = 0;
        aio_anchor.aio_done_count = 0;
-       AIO_UNLOCK;
+       aio_anchor.aio_total_count = 0;
+       aio_anchor.aio_num_workqs = AIO_NUM_WORK_QUEUES;
+
+       for (i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
+               aio_workq_init(&aio_anchor.aio_async_workqs[i]);
+       }
+
 
        i = sizeof( aio_workq_entry );
        aio_workq_zonep = zinit( i, i * aio_max_requests, i * aio_max_requests, "aiowq" );
                
        _aio_create_worker_threads( aio_worker_threads );
-
-       return;
        
 } /* aio_init */
 
@@ -2246,10 +2611,11 @@ _aio_create_worker_threads( int num )
        for ( i = 0; i < num; i++ ) {
                thread_t                myThread;
                
-               myThread = kernel_thread( kernel_task, aio_work_thread );
-               if ( THREAD_NULL == myThread ) {
+               if ( KERN_SUCCESS != kernel_thread_start((thread_continue_t)aio_work_thread, NULL, &myThread) ) {
                        printf( "%s - failed to create a work thread \n", __FUNCTION__ ); 
                }
+               else
+                       thread_deallocate(myThread);
        }
        
        return;
@@ -2273,7 +2639,7 @@ get_aiotask(void)
  * aiocb (in our case that is a user_aiocb)
  */
 static void 
-do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) 
+do_munge_aiocb_user32_to_user( struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) 
 {
        the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
        the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
@@ -2298,3 +2664,26 @@ do_munge_aiocb( struct aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp )
        the_user_aiocbp->aio_sigevent.sigev_notify_attributes = 
                CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
 }
+
+/* Similar for 64-bit user process, so that we don't need to satisfy
+ * the alignment constraints of the original user64_aiocb
+ */
+static void 
+do_munge_aiocb_user64_to_user( struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp ) 
+{
+       the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
+       the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
+       the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
+       the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
+       the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
+       the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;
+       
+       the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
+       the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
+       the_user_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int = 
+               my_aiocbp->aio_sigevent.sigev_value.size_equivalent.sival_int;
+       the_user_aiocbp->aio_sigevent.sigev_notify_function = 
+               my_aiocbp->aio_sigevent.sigev_notify_function;
+       the_user_aiocbp->aio_sigevent.sigev_notify_attributes = 
+               my_aiocbp->aio_sigevent.sigev_notify_attributes;
+}