]> git.saurik.com Git - apple/cf.git/blobdiff - CFMessagePort.c
CF-550.42.tar.gz
[apple/cf.git] / CFMessagePort.c
index 0e9b79b818f88b67eaefce2e8544565185c20fc3..40afce5ed0ac8a1ecdf4e03b0d9e9a54d5a3bdb2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008 Apple Inc. All rights reserved.
+ * Copyright (c) 2010 Apple Inc. All rights reserved.
  *
  * @APPLE_LICENSE_HEADER_START@
  * 
@@ -20,8 +20,9 @@
  * 
  * @APPLE_LICENSE_HEADER_END@
  */
+
 /*     CFMessagePort.c
-       Copyright 1998-2002, Apple, Inc. All rights reserved.
+       Copyright (c) 1998-2010, Apple Inc. All rights reserved.
        Responsibility: Christopher Kane
 */
 
@@ -39,6 +40,9 @@
 #include <math.h>
 #include <mach/mach_time.h>
 #include <dlfcn.h>
+#include <dispatch/dispatch.h>
+
+extern pid_t getpid(void);
 
 
 #define __kCFMessagePortMaxNameLengthMax 255
     #define __kCFMessagePortMaxNameLength __kCFMessagePortMaxNameLengthMax
 #endif
 
+#define __CFMessagePortMaxDataSize 0x60000000L /* largest data payload, in bytes, that may be carried by one message */
+
+
+DISPATCH_HELPER_FUNCTIONS(mport, CFMessagePort)
+
+
 static CFSpinLock_t __CFAllMessagePortsLock = CFSpinLockInit;
 static CFMutableDictionaryRef __CFAllLocalMessagePorts = NULL;
 static CFMutableDictionaryRef __CFAllRemoteMessagePorts = NULL;
@@ -68,6 +78,8 @@ struct __CFMessagePort {
     int32_t _perPID;                   /* zero if not per-pid, else pid */
     CFMachPortRef _replyPort;          /* only used by remote port; immutable once created; invalidated */
     CFRunLoopSourceRef _source;                /* only used by local port; immutable once created; invalidated */
+    dispatch_source_t _dispatchSource;  /* only used by local port; invalidated */
+    dispatch_queue_t _dispatchQ;       /* only used by local port */
     CFMessagePortInvalidationCallBack _icallout;
     CFMessagePortCallBack _callout;    /* only used by local port; immutable */
     CFMessagePortContext _context;     /* not part of remote port; immutable; invalidated */
@@ -133,61 +145,74 @@ CF_INLINE void __CFMessagePortUnlock(CFMessagePortRef ms) {
 // Just a heuristic
 #define __CFMessagePortMaxInlineBytes 4096*10
 
-struct __CFMessagePortMachMsg0 {
+struct __CFMessagePortMachMessage0 { /* layout of a message whose payload is carried inline (descriptor count 0) */
+    mach_msg_base_t base; /* Mach message header followed by the body (descriptor count) */
+    int32_t magic; /* set to MAGIC when the message is built; checked when a reply is received */
     int32_t msgid; /* message id, stored little-endian */
     int32_t byteslen; /* payload length in bytes, stored little-endian */
-    uint8_t bytes[__CFMessagePortMaxInlineBytes];
+    uint8_t bytes[0]; /* payload; the message is allocated with room for byteslen rounded up to a multiple of 4 */
 };
 
-struct __CFMessagePortMachMsg1 {
-    mach_msg_descriptor_t desc;
+struct __CFMessagePortMachMessage1 { /* layout of a complex message whose payload is carried out-of-line (descriptor count 1) */
+    mach_msg_base_t base; /* Mach message header followed by the body (descriptor count) */
+    mach_msg_ool_descriptor_t ool; /* describes the out-of-line payload (address and size) */
+    int32_t magic; /* set to MAGIC when the message is built; checked when a reply is received */
     int32_t msgid; /* message id, stored little-endian */
+    int32_t byteslen; /* payload length in bytes, stored little-endian; the builder sets it equal to ool.size */
 };
 
-struct __CFMessagePortMachMessage {
-    mach_msg_header_t head;
-    mach_msg_body_t body;
-    union {
-       struct __CFMessagePortMachMsg0 msg0;
-       struct __CFMessagePortMachMsg1 msg1;
-    } contents;
-};
-
-static struct __CFMessagePortMachMessage *__CFMessagePortCreateMessage(CFAllocatorRef allocator, bool reply, mach_port_t port, mach_port_t replyPort, int32_t convid, int32_t msgid, const uint8_t *bytes, int32_t byteslen) {
-    struct __CFMessagePortMachMessage *msg;
-    int32_t size = sizeof(mach_msg_header_t) + sizeof(mach_msg_body_t);
-    if (byteslen < __CFMessagePortMaxInlineBytes) {
-       size += 2 * sizeof(int32_t) + ((byteslen + 3) & ~0x3);
-    } else {
-       size += sizeof(struct __CFMessagePortMachMsg1);
-    }
-    msg = CFAllocatorAllocate(allocator, size, 0);
-    msg->head.msgh_id = convid;
-    msg->head.msgh_size = size;
-    msg->head.msgh_remote_port = port;
-    msg->head.msgh_local_port = replyPort;
-    msg->head.msgh_reserved = 0;
-//    msg->head.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, (replyPort ? MACH_MSG_TYPE_MAKE_SEND : 0));
-    msg->head.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
-    if (byteslen < __CFMessagePortMaxInlineBytes) {
-       msg->body.msgh_descriptor_count = 0;
-       msg->contents.msg0.msgid = CFSwapInt32HostToLittle(msgid);
-       msg->contents.msg0.byteslen = CFSwapInt32HostToLittle(byteslen);
+#define MAGIC 0xF1F2F3F4 /* marker stored in the magic field of every message built here */
+
+#define MSGP0_FIELD(msgp, ident) (((struct __CFMessagePortMachMessage0 *)(msgp))->ident) /* field of an inline-layout message */
+#define MSGP1_FIELD(msgp, ident) (((struct __CFMessagePortMachMessage1 *)(msgp))->ident) /* field of an out-of-line-layout message */
+#define MSGP_GET(msgp, ident) \
+    ((((mach_msg_base_t *)(msgp))->body.msgh_descriptor_count) ? MSGP1_FIELD(msgp, ident) : MSGP0_FIELD(msgp, ident))
+
+static mach_msg_base_t *__CFMessagePortCreateMessage(bool reply, mach_port_t port, mach_port_t replyPort, int32_t convid, int32_t msgid, const uint8_t *bytes, int32_t byteslen) { /* Builds a message allocated from kCFAllocatorSystemDefault; returns NULL when byteslen is negative or above the cap, or when allocation fails. */
+    if (byteslen < 0 || __CFMessagePortMaxDataSize < byteslen) return NULL; /* a negative length would otherwise shrink the allocation size computed below */
+    int32_t rounded_byteslen = ((byteslen + 3) & ~0x3); /* round up to a multiple of 4 */
+    if (rounded_byteslen <= __CFMessagePortMaxInlineBytes) { /* small payload: copy it inline after the fixed fields */
+        int32_t size = sizeof(struct __CFMessagePortMachMessage0) + rounded_byteslen;
+        struct __CFMessagePortMachMessage0 *msg = CFAllocatorAllocate(kCFAllocatorSystemDefault, size, 0);
+        if (!msg) return NULL;
+        memset(msg, 0, size); /* also zeroes the padding bytes that follow the payload */
+        msg->base.header.msgh_id = convid;
+        msg->base.header.msgh_size = size;
+        msg->base.header.msgh_remote_port = port;
+        msg->base.header.msgh_local_port = replyPort;
+        msg->base.header.msgh_reserved = 0;
+        msg->base.header.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
+       msg->base.body.msgh_descriptor_count = 0;
+        msg->magic = MAGIC;
+        msg->msgid = CFSwapInt32HostToLittle(msgid);
+        msg->byteslen = CFSwapInt32HostToLittle(byteslen);
        if (NULL != bytes && 0 < byteslen) {
-           memmove(msg->contents.msg0.bytes, bytes, byteslen);
+           memmove(msg->bytes, bytes, byteslen);
        }
-       memset(msg->contents.msg0.bytes + byteslen, 0, ((byteslen + 3) & ~0x3) - byteslen);
+        return (mach_msg_base_t *)msg;
     } else {
-       msg->head.msgh_bits |= MACH_MSGH_BITS_COMPLEX;
-       msg->body.msgh_descriptor_count = 1;
-       msg->contents.msg1.desc.out_of_line.deallocate = false;
-       msg->contents.msg1.desc.out_of_line.copy = MACH_MSG_VIRTUAL_COPY;
-       msg->contents.msg1.desc.out_of_line.address = (void *)bytes;
-       msg->contents.msg1.desc.out_of_line.size = byteslen;
-       msg->contents.msg1.desc.out_of_line.type = MACH_MSG_OOL_DESCRIPTOR;
-       msg->contents.msg1.msgid = CFSwapInt32HostToLittle(msgid);
+        int32_t size = sizeof(struct __CFMessagePortMachMessage1); /* large payload: fixed-size message carrying one out-of-line descriptor */
+        struct __CFMessagePortMachMessage1 *msg = CFAllocatorAllocate(kCFAllocatorSystemDefault, size, 0);
+        if (!msg) return NULL;
+        memset(msg, 0, size);
+        msg->base.header.msgh_id = convid;
+        msg->base.header.msgh_size = size;
+        msg->base.header.msgh_remote_port = port;
+        msg->base.header.msgh_local_port = replyPort;
+        msg->base.header.msgh_reserved = 0;
+        msg->base.header.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
+       msg->base.header.msgh_bits |= MACH_MSGH_BITS_COMPLEX;
+       msg->base.body.msgh_descriptor_count = 1;
+        msg->magic = MAGIC;
+        msg->msgid = CFSwapInt32HostToLittle(msgid);
+        msg->byteslen = CFSwapInt32HostToLittle(byteslen);
+       msg->ool.deallocate = false;
+       msg->ool.copy = MACH_MSG_VIRTUAL_COPY;
+       msg->ool.address = (void *)bytes; /* the caller's buffer is referenced here, not copied into the message */
+       msg->ool.size = byteslen;
+       msg->ool.type = MACH_MSG_OOL_DESCRIPTOR;
+        return (mach_msg_base_t *)msg;
     }
-    return msg;
 }
 
 static CFStringRef __CFMessagePortCopyDescription(CFTypeRef cf) {
@@ -196,15 +221,15 @@ static CFStringRef __CFMessagePortCopyDescription(CFTypeRef cf) {
     const char *locked;
     CFStringRef contextDesc = NULL;
     locked = ms->_lock ? "Yes" : "No";
-    if (!__CFMessagePortIsRemote(ms)) {
+    if (__CFMessagePortIsRemote(ms)) {
+       result = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFMessagePort %p [%p]>{locked = %s, valid = %s, remote = %s, name = %@}"), cf, CFGetAllocator(ms), locked, (__CFMessagePortIsValid(ms) ? "Yes" : "No"), (__CFMessagePortIsRemote(ms) ? "Yes" : "No"), ms->_name);
+    } else {
        if (NULL != ms->_context.info && NULL != ms->_context.copyDescription) {
            contextDesc = ms->_context.copyDescription(ms->_context.info);
        }
        if (NULL == contextDesc) {
            contextDesc = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFMessagePort context %p>"), ms->_context.info);
        }
-       result = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFMessagePort %p [%p]>{locked = %s, valid = %s, remote = %s, name = %@}"), cf, CFGetAllocator(ms), locked, (__CFMessagePortIsValid(ms) ? "Yes" : "No"), (__CFMessagePortIsRemote(ms) ? "Yes" : "No"), ms->_name);
-    } else {
        void *addr = ms->_callout;
        Dl_info info;
        const char *name = (dladdr(addr, &info) && info.dli_saddr == addr && info.dli_sname) ? info.dli_sname : "???";
@@ -287,22 +312,22 @@ CFTypeID CFMessagePortGetTypeID(void) {
     return __kCFMessagePortTypeID;
 }
 
-static CFStringRef __CFMessagePortSanitizeStringName(CFAllocatorRef allocator, CFStringRef name, uint8_t **utfnamep, CFIndex *utfnamelenp) {
+static CFStringRef __CFMessagePortSanitizeStringName(CFStringRef name, uint8_t **utfnamep, CFIndex *utfnamelenp) {
     uint8_t *utfname;
     CFIndex utflen;
     CFStringRef result;
-    utfname = CFAllocatorAllocate(allocator, __kCFMessagePortMaxNameLength + 1, 0);
+    utfname = CFAllocatorAllocate(kCFAllocatorSystemDefault, __kCFMessagePortMaxNameLength + 1, 0);
     CFStringGetBytes(name, CFRangeMake(0, CFStringGetLength(name)), kCFStringEncodingUTF8, 0, false, utfname, __kCFMessagePortMaxNameLength, &utflen);
     utfname[utflen] = '\0';
     /* A new string is created, because the original string may have been
        truncated to the max length, and we want the string name to definitely
        match the raw UTF-8 chunk that has been created. Also, this is useful
        to get a constant string in case the original name string was mutable. */
-    result = CFStringCreateWithBytes(allocator, utfname, utflen, kCFStringEncodingUTF8, false);
+    result = CFStringCreateWithBytes(kCFAllocatorSystemDefault, utfname, utflen, kCFStringEncodingUTF8, false);
     if (NULL != utfnamep) {
        *utfnamep = utfname;
     } else {
-       CFAllocatorDeallocate(allocator, utfname);
+       CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
     }
     if (NULL != utfnamelenp) {
        *utfnamelenp = utflen;
@@ -316,7 +341,7 @@ static void __CFMessagePortDummyCallback(CFMachPortRef port, void *msg, CFIndex
 
 static void __CFMessagePortInvalidationCallBack(CFMachPortRef port, void *info) {
     // info has been set up as the CFMessagePort owning the CFMachPort
-    CFMessagePortInvalidate(info);
+    if (info) CFMessagePortInvalidate(info); // skip the call when no owning CFMessagePort was supplied
 }
 
 static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFStringRef name, CFMessagePortCallBack callout, CFMessagePortContext *context, Boolean *shouldFreeInfo, Boolean perPID) {
@@ -325,7 +350,7 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
 
     if (shouldFreeInfo) *shouldFreeInfo = true;
     if (NULL != name) {
-       name = __CFMessagePortSanitizeStringName(allocator, name, &utfname, NULL);
+       name = __CFMessagePortSanitizeStringName(name, &utfname, NULL);
     }
     __CFSpinLock(&__CFAllMessagePortsLock);
     if (!perPID && NULL != name) {
@@ -334,7 +359,11 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
            CFRetain(existing);
            __CFSpinUnlock(&__CFAllMessagePortsLock);
            CFRelease(name);
-           CFAllocatorDeallocate(allocator, utfname);
+           CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+            if (!CFMessagePortIsValid(existing)) { // must do this outside lock to avoid deadlock
+               CFRelease(existing);
+                existing = NULL;
+            }
            return (CFMessagePortRef)(existing);
        }
     }
@@ -345,7 +374,7 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
        if (NULL != name) {
            CFRelease(name);
        }
-       CFAllocatorDeallocate(allocator, utfname);
+       CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
        return NULL;
     }
     __CFMessagePortUnsetValid(memory);
@@ -359,6 +388,8 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
     memory->_perPID = perPID ? getpid() : 0;   // actual value not terribly useful for local ports
     memory->_replyPort = NULL;
     memory->_source = NULL;
+    memory->_dispatchSource = NULL;
+    memory->_dispatchQ = NULL;
     memory->_icallout = NULL;
     memory->_callout = callout;
     memory->_context.info = NULL;
@@ -380,9 +411,9 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
                    native = CFMachPortCreateWithPort(allocator, mp, __CFMessagePortDummyCallback, &ctx, NULL);
                    __CFMessagePortSetExtraMachRef(memory);
                } else {
-                   CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: mach_port_insert_member() after bootstrap_check_in(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'"), ret, ret, bootstrap_strerror(ret), mp, utfname);
+                   CFLog(kCFLogLevelDebug, CFSTR("*** CFMessagePort: mach_port_insert_member() after bootstrap_check_in(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'"), ret, ret, bootstrap_strerror(ret), mp, utfname);
                    mach_port_destroy(mach_task_self(), mp);
-                   CFAllocatorDeallocate(allocator, utfname);
+                   CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
                    // name is released by deallocation
                    CFRelease(memory);
                    return NULL;
@@ -392,13 +423,19 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
        if (!native) {
            CFMachPortContext ctx = {0, memory, NULL, NULL, NULL};
            native = CFMachPortCreate(allocator, __CFMessagePortDummyCallback, &ctx, NULL);
+           if (!native) {
+               CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+               // name is released by deallocation
+               CFRelease(memory);
+               return NULL;
+           }
            mp = CFMachPortGetPort(native);
            ret = bootstrap_register2(bs, (char *)utfname, mp, perPID ? BOOTSTRAP_PER_PID_SERVICE : 0);
            if (ret != KERN_SUCCESS) {
-               CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: bootstrap_register(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'\nSee /usr/include/servers/bootstrap_defs.h for the error codes."), ret, ret, bootstrap_strerror(ret), mp, utfname);
+               CFLog(kCFLogLevelDebug, CFSTR("*** CFMessagePort: bootstrap_register(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'\nSee /usr/include/servers/bootstrap_defs.h for the error codes."), ret, ret, bootstrap_strerror(ret), mp, utfname);
                CFMachPortInvalidate(native);
                CFRelease(native);
-               CFAllocatorDeallocate(allocator, utfname);
+               CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
                // name is released by deallocation
                CFRelease(memory);
                return NULL;
@@ -408,7 +445,7 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
        memory->_port = native;
     }
 
-    CFAllocatorDeallocate(allocator, utfname);
+    CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
     __CFMessagePortSetValid(memory);
     if (NULL != context) {
        memmove(&memory->_context, context, sizeof(CFMessagePortContext));
@@ -450,7 +487,7 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
     mach_port_t bp, port;
     kern_return_t ret;
 
-    name = __CFMessagePortSanitizeStringName(allocator, name, &utfname, NULL);
+    name = __CFMessagePortSanitizeStringName(name, &utfname, NULL);
     if (NULL == name) {
        return NULL;
     }
@@ -461,7 +498,11 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
            CFRetain(existing);
            __CFSpinUnlock(&__CFAllMessagePortsLock);
            CFRelease(name);
-           CFAllocatorDeallocate(allocator, utfname);
+           CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+            if (!CFMessagePortIsValid(existing)) { // must do this outside lock to avoid deadlock
+               CFRelease(existing);
+                existing = NULL;
+            }
            return (CFMessagePortRef)(existing);
        }
     }
@@ -472,7 +513,7 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
        if (NULL != name) {
            CFRelease(name);
        }
-       CFAllocatorDeallocate(allocator, utfname);
+       CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
        return NULL;
     }
     __CFMessagePortUnsetValid(memory);
@@ -486,6 +527,8 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
     memory->_perPID = perPID ? pid : 0;
     memory->_replyPort = NULL;
     memory->_source = NULL;
+    memory->_dispatchSource = NULL;
+    memory->_dispatchQ = NULL;
     memory->_icallout = NULL;
     memory->_callout = NULL;
     ctx.version = 0;
@@ -496,14 +539,13 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
     task_get_bootstrap_port(mach_task_self(), &bp);
     ret = bootstrap_look_up2(bp, (char *)utfname, &port, perPID ? (pid_t)pid : 0, perPID ? BOOTSTRAP_PER_PID_SERVICE : 0);
     native = (KERN_SUCCESS == ret) ? CFMachPortCreateWithPort(allocator, port, __CFMessagePortDummyCallback, &ctx, NULL) : NULL;
-    CFAllocatorDeallocate(allocator, utfname);
+    CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
     if (NULL == native) {
        // name is released by deallocation
        CFRelease(memory);
        return NULL;
     }
     memory->_port = native;
-    CFMachPortSetInvalidationCallBack(native, __CFMessagePortInvalidationCallBack);
     __CFMessagePortSetValid(memory);
     __CFSpinLock(&__CFAllMessagePortsLock);
     if (!perPID && NULL != name) {
@@ -520,6 +562,15 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
        CFDictionaryAddValue(__CFAllRemoteMessagePorts, name, memory);
     }
     __CFSpinUnlock(&__CFAllMessagePortsLock);
+    CFMachPortSetInvalidationCallBack(native, __CFMessagePortInvalidationCallBack);
+    // that set-invalidation-callback might have called back into us
+    // if the CFMachPort is already bad, but that was a no-op since
+    // there was no callback setup at the (previous) time the CFMachPort
+    // went invalid; so check for validity manually and react
+    if (!CFMachPortIsValid(native)) {
+        CFRelease(memory); // does the invalidate
+        return NULL;
+    }
     return (CFMessagePortRef)memory;
 }
 
@@ -547,7 +598,7 @@ Boolean CFMessagePortSetName(CFMessagePortRef ms, CFStringRef name) {
 
     __CFGenericValidateType(ms, __kCFMessagePortTypeID);
     if (ms->_perPID || __CFMessagePortIsRemote(ms)) return false;
-    name = __CFMessagePortSanitizeStringName(allocator, name, &utfname, NULL);
+    name = __CFMessagePortSanitizeStringName(name, &utfname, NULL);
     if (NULL == name) {
        return false;
     }
@@ -557,7 +608,7 @@ Boolean CFMessagePortSetName(CFMessagePortRef ms, CFStringRef name) {
        if (NULL != __CFAllLocalMessagePorts && CFDictionaryGetValueIfPresent(__CFAllLocalMessagePorts, name, (const void **)&existing)) {
            __CFSpinUnlock(&__CFAllMessagePortsLock);
            CFRelease(name);
-           CFAllocatorDeallocate(allocator, utfname);
+           CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
            return false;
        }
     }
@@ -578,7 +629,7 @@ Boolean CFMessagePortSetName(CFMessagePortRef ms, CFStringRef name) {
                __CFMessagePortSetExtraMachRef(ms);
             } else {
                 mach_port_destroy(mach_task_self(), mp);
-                CFAllocatorDeallocate(allocator, utfname);
+                CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
                CFRelease(name);
                 return false;
             }
@@ -586,13 +637,18 @@ Boolean CFMessagePortSetName(CFMessagePortRef ms, CFStringRef name) {
         if (!native) {
             CFMachPortContext ctx = {0, ms, NULL, NULL, NULL};
             native = CFMachPortCreate(allocator, __CFMessagePortDummyCallback, &ctx, NULL);
+           if (!native) {
+                CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+                CFRelease(name);
+                return false;
+           }
             mp = CFMachPortGetPort(native);
             ret = bootstrap_register2(bs, (char *)utfname, mp, 0);
             if (ret != KERN_SUCCESS) {
-                CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: bootstrap_register(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'\nSee /usr/include/servers/bootstrap_defs.h for the error codes."), ret, ret, bootstrap_strerror(ret), mp, utfname);
+                CFLog(kCFLogLevelDebug, CFSTR("*** CFMessagePort: bootstrap_register(): failed %d (0x%x) '%s', port = 0x%x, name = '%s'\nSee /usr/include/servers/bootstrap_defs.h for the error codes."), ret, ret, bootstrap_strerror(ret), mp, utfname);
                 CFMachPortInvalidate(native);
                 CFRelease(native);
-                CFAllocatorDeallocate(allocator, utfname);
+                CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
                 CFRelease(name);
                 return false;
             }
@@ -627,7 +683,7 @@ Boolean CFMessagePortSetName(CFMessagePortRef ms, CFStringRef name) {
        __CFSpinUnlock(&__CFAllMessagePortsLock);
     }
 
-    CFAllocatorDeallocate(allocator, utfname);
+    CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
     return true;
 }
 
@@ -645,6 +701,12 @@ void CFMessagePortInvalidate(CFMessagePortRef ms) {
     }
     __CFMessagePortLock(ms);
     if (__CFMessagePortIsValid(ms)) {
+        if (ms->_dispatchSource) {
+            dispatch_source_cancel(ms->_dispatchSource);
+            ms->_dispatchSource = NULL;
+           ms->_dispatchQ = NULL;
+        }
+
        CFMessagePortInvalidationCallBack callout = ms->_icallout;
        CFRunLoopSourceRef source = ms->_source;
        CFMachPortRef replyPort = ms->_replyPort;
@@ -710,10 +772,6 @@ Boolean CFMessagePortIsValid(CFMessagePortRef ms) {
        CFMessagePortInvalidate(ms);
        return false;
     }
-    if (NULL != ms->_source && !CFRunLoopSourceIsValid(ms->_source)) {
-       CFMessagePortInvalidate(ms);
-       return false;
-    }
     return true;
 }
 
@@ -733,40 +791,66 @@ void CFMessagePortSetInvalidationCallBack(CFMessagePortRef ms, CFMessagePortInva
 
 static void __CFMessagePortReplyCallBack(CFMachPortRef port, void *msg, CFIndex size, void *info) {
     CFMessagePortRef ms = info;
-    struct __CFMessagePortMachMessage *msgp = msg;
-    struct __CFMessagePortMachMessage *replymsg;
+    mach_msg_base_t *msgp = msg;
+    mach_msg_base_t *replymsg;
     __CFMessagePortLock(ms);
     if (!__CFMessagePortIsValid(ms)) {
        __CFMessagePortUnlock(ms);
        return;
     }
-// assert: (int32_t)msgp->head.msgh_id < 0
-    if (CFDictionaryContainsKey(ms->_replies, (void *)(uintptr_t)msgp->head.msgh_id)) {
+
+    int32_t byteslen = 0;
+
+    Boolean invalidMagic = (MSGP_GET(msgp, magic) != MAGIC) && (CFSwapInt32(MSGP_GET(msgp, magic)) != MAGIC);
+    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX);
+    invalidComplex = invalidComplex || ((msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
+    Boolean wayTooBig = ((msgp->body.msgh_descriptor_count) ? sizeof(struct __CFMessagePortMachMessage1) : sizeof(struct __CFMessagePortMachMessage0) + __CFMessagePortMaxInlineBytes) < msgp->header.msgh_size;
+    Boolean wayTooSmall = msgp->header.msgh_size < sizeof(struct __CFMessagePortMachMessage0);
+    Boolean wrongSize = false;
+    if (!(invalidComplex || wayTooBig || wayTooSmall)) {
+        byteslen = CFSwapInt32LittleToHost(MSGP_GET(msgp, byteslen));
+        wrongSize = (byteslen < 0) || (__CFMessagePortMaxDataSize < byteslen);
+        if (0 != msgp->body.msgh_descriptor_count) {
+            wrongSize = wrongSize || (MSGP1_FIELD(msgp, ool).size != byteslen);
+        } else {
+            wrongSize = wrongSize || (msgp->header.msgh_size - sizeof(struct __CFMessagePortMachMessage0) < byteslen);
+        }
+    }
+    Boolean invalidMsgID = (0 <= msgp->header.msgh_id) && (msgp->header.msgh_id <= INT32_MAX); // conversation id
+    if (invalidMagic || invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt reply Mach message (0b%d%d%d%d%d%d)"), invalidMagic, invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
+        mach_msg_destroy((mach_msg_header_t *)msgp);
+        __CFMessagePortUnlock(ms);
+        return;
+    }
+
+    if (CFDictionaryContainsKey(ms->_replies, (void *)(uintptr_t)msgp->header.msgh_id)) {
        CFDataRef reply = NULL;
-       replymsg = (struct __CFMessagePortMachMessage *)msg;
+       replymsg = (mach_msg_base_t *)msg;
        if (0 == replymsg->body.msgh_descriptor_count) {
-           int32_t byteslen = CFSwapInt32LittleToHost(replymsg->contents.msg0.byteslen);
-           if (0 <= byteslen) {
-               reply = CFDataCreate(kCFAllocatorSystemDefault, replymsg->contents.msg0.bytes, byteslen);
+           uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->header.msgh_size);
+           uintptr_t data_extent = (uintptr_t)((uint8_t *)&(MSGP0_FIELD(replymsg, bytes)) + byteslen);
+           if (0 <= byteslen && data_extent <= msgp_extent) {
+               reply = CFDataCreate(kCFAllocatorSystemDefault, MSGP0_FIELD(replymsg, bytes), byteslen);
            } else {
                reply = (void *)~0;     // means NULL data
            }
        } else {
 //#warning CF: should create a no-copy data here that has a custom VM-freeing allocator, and not vm_dealloc here
-           reply = CFDataCreate(kCFAllocatorSystemDefault, replymsg->contents.msg1.desc.out_of_line.address, replymsg->contents.msg1.desc.out_of_line.size);
-           vm_deallocate(mach_task_self(), (vm_address_t)replymsg->contents.msg1.desc.out_of_line.address, replymsg->contents.msg1.desc.out_of_line.size);
+           reply = CFDataCreate(kCFAllocatorSystemDefault, MSGP1_FIELD(replymsg, ool).address, MSGP1_FIELD(replymsg, ool).size);
+           vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(replymsg, ool).address, MSGP1_FIELD(replymsg, ool).size);
        }
-       CFDictionarySetValue(ms->_replies, (void *)(uintptr_t)msgp->head.msgh_id, (void *)reply);
+       CFDictionarySetValue(ms->_replies, (void *)(uintptr_t)msgp->header.msgh_id, (void *)reply);
     } else {   /* discard message */
        if (1 == msgp->body.msgh_descriptor_count) {
-           vm_deallocate(mach_task_self(), (vm_address_t)msgp->contents.msg1.desc.out_of_line.address, msgp->contents.msg1.desc.out_of_line.size);
+           vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size);
        }
     }
     __CFMessagePortUnlock(ms);
 }
 
 SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef data, CFTimeInterval sendTimeout, CFTimeInterval rcvTimeout, CFStringRef replyMode, CFDataRef *returnDatap) {
-    struct __CFMessagePortMachMessage *sendmsg;
+    mach_msg_base_t *sendmsg;
     CFRunLoopRef currentRL = CFRunLoopGetCurrent();
     CFRunLoopSourceRef source = NULL;
     CFDataRef reply = NULL;
@@ -778,8 +862,16 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
 
     //#warning CF: This should be an assert
     // if (!__CFMessagePortIsRemote(remote)) return -999;
-    if (!__CFMessagePortIsValid(remote)) return kCFMessagePortIsInvalid;
+    if (data && __CFMessagePortMaxDataSize < CFDataGetLength(data)) {
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePortSendRequest: CFMessagePort cannot send more than %lu bytes of data"), __CFMessagePortMaxDataSize);
+        return kCFMessagePortTransportError;
+    }
     __CFMessagePortLock(remote);
+    if (!__CFMessagePortIsValid(remote)) {
+        __CFMessagePortUnlock(remote);
+        return kCFMessagePortIsInvalid;
+    }
+    CFRetain(remote); // retain during run loop to avoid invalidation causing freeing
     if (NULL == remote->_replyPort) {
        CFMachPortContext context;
        context.version = 0;
@@ -791,8 +883,12 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
     }
     remote->_convCounter++;
     desiredReply = -remote->_convCounter;
-    sendmsg = __CFMessagePortCreateMessage(kCFAllocatorSystemDefault, false, CFMachPortGetPort(remote->_port), (replyMode != NULL ? CFMachPortGetPort(remote->_replyPort) : MACH_PORT_NULL), -desiredReply, msgid, (data ? CFDataGetBytePtr(data) : NULL), (data ? CFDataGetLength(data) : 0));
-    __CFMessagePortUnlock(remote);
+    sendmsg = __CFMessagePortCreateMessage(false, CFMachPortGetPort(remote->_port), (replyMode != NULL ? CFMachPortGetPort(remote->_replyPort) : MACH_PORT_NULL), -desiredReply, msgid, (data ? CFDataGetBytePtr(data) : NULL), (data ? CFDataGetLength(data) : 0));
+    if (!sendmsg) {
+        __CFMessagePortUnlock(remote);
+        CFRelease(remote);
+        return kCFMessagePortTransportError;
+    }
     if (replyMode != NULL) {
         CFDictionarySetValue(remote->_replies, (void *)(uintptr_t)desiredReply, NULL);
         source = CFMachPortCreateRunLoopSource(CFGetAllocator(remote), remote->_replyPort, -100);
@@ -808,7 +904,7 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
        if (sendTimeout < 1.0) sendTimeout = 0.0;
        sendTimeOut = floor(sendTimeout);
     }
-    ret = mach_msg((mach_msg_header_t *)sendmsg, MACH_SEND_MSG|sendOpts, sendmsg->head.msgh_size, 0, MACH_PORT_NULL, sendTimeOut, MACH_PORT_NULL);
+    ret = mach_msg((mach_msg_header_t *)sendmsg, MACH_SEND_MSG|sendOpts, sendmsg->header.msgh_size, 0, MACH_PORT_NULL, sendTimeOut, MACH_PORT_NULL);
     if (KERN_SUCCESS != ret) {
        // need to deallocate the send-once right that might have been created
        if (replyMode != NULL) mach_port_deallocate(mach_task_self(), ((mach_msg_header_t *)sendmsg)->msgh_local_port);
@@ -816,14 +912,17 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
            CFRunLoopRemoveSource(currentRL, source, replyMode);
        }
        if (source) CFRelease(source);
+        __CFMessagePortUnlock(remote);
         CFAllocatorDeallocate(kCFAllocatorSystemDefault, sendmsg);
+        CFRelease(remote);
        return (MACH_SEND_TIMED_OUT == ret) ? kCFMessagePortSendTimeout : kCFMessagePortTransportError;
     }
+    __CFMessagePortUnlock(remote);
     CFAllocatorDeallocate(kCFAllocatorSystemDefault, sendmsg);
     if (replyMode == NULL) {
+        CFRelease(remote);
        return kCFMessagePortSuccess;
     }
-    CFRetain(remote); // retain during run loop to avoid invalidation causing freeing
     _CFMachPortInstallNotifyPort(currentRL, replyMode);
     termTSR = mach_absolute_time() + __CFTimeIntervalToTSR(rcvTimeout);
     for (;;) {
@@ -846,7 +945,7 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
     if (NULL == reply) {
        CFDictionaryRemoveValue(remote->_replies, (void *)(uintptr_t)desiredReply);
        CFRelease(remote);
-       return CFMessagePortIsValid(remote) ? kCFMessagePortReceiveTimeout : -5;
+       return CFMessagePortIsValid(remote) ? kCFMessagePortReceiveTimeout : kCFMessagePortBecameInvalidError;
     }
     if (NULL != returnDatap) {
        *returnDatap = ((void *)~0 == reply) ? NULL : reply;
@@ -860,14 +959,14 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
 
 static mach_port_t __CFMessagePortGetPort(void *info) {
     CFMessagePortRef ms = info;
-    if (!ms->_port) CFLog(kCFLogLevelWarning, CFSTR("*** Warning: A local CFMessagePort (%p) is being put in a run loop, but it has not been named yet, so this will be a no-op and no messages are going to be received, even if named later."), info);
+    if (!ms->_port) CFLog(kCFLogLevelWarning, CFSTR("*** Warning: A local CFMessagePort (%p) is being put in a run loop or dispatch queue, but it has not been named yet, so this will be a no-op and no messages are going to be received, even if named later."), info); /* warning only: the unnamed case still falls through and yields MACH_PORT_NULL on the next line */
     return ms->_port ? CFMachPortGetPort(ms->_port) : MACH_PORT_NULL;
 }
 
 static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allocator, void *info) {
     CFMessagePortRef ms = info;
-    struct __CFMessagePortMachMessage *msgp = msg;
-    struct __CFMessagePortMachMessage *replymsg;
+    mach_msg_base_t *msgp = msg;
+    mach_msg_base_t *replymsg;
     void *context_info;
     void (*context_release)(const void *);
     CFDataRef returnData, data = NULL;
@@ -880,7 +979,6 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
        __CFMessagePortUnlock(ms);
        return NULL;
     }
-// assert: 0 < (int32_t)msgp->head.msgh_id
     if (NULL != ms->_context.retain) {
        context_info = (void *)ms->_context.retain(ms->_context.info);
        context_release = ms->_context.release;
@@ -889,16 +987,42 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
        context_release = NULL;
     }
     __CFMessagePortUnlock(ms);
+
+    int32_t byteslen = 0;
+
+    Boolean invalidMagic = (MSGP_GET(msgp, magic) != MAGIC) && (CFSwapInt32(MSGP_GET(msgp, magic)) != MAGIC);
+    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX);
+    invalidComplex = invalidComplex || ((msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
+    Boolean wayTooBig = ((msgp->body.msgh_descriptor_count) ? sizeof(struct __CFMessagePortMachMessage1) : sizeof(struct __CFMessagePortMachMessage0) + __CFMessagePortMaxInlineBytes) < msgp->header.msgh_size;
+    Boolean wayTooSmall = msgp->header.msgh_size < sizeof(struct __CFMessagePortMachMessage0);
+    Boolean wrongSize = false;
+    if (!(invalidComplex || wayTooBig || wayTooSmall)) {
+        byteslen = CFSwapInt32LittleToHost(MSGP_GET(msgp, byteslen));
+        wrongSize = (byteslen < 0) || (__CFMessagePortMaxDataSize < byteslen);
+        if (0 != msgp->body.msgh_descriptor_count) {
+            wrongSize = wrongSize || (MSGP1_FIELD(msgp, ool).size != byteslen);
+        } else {
+            wrongSize = wrongSize || (msgp->header.msgh_size - sizeof(struct __CFMessagePortMachMessage0) < byteslen);
+        }
+    }
+    Boolean invalidMsgID = (msgp->header.msgh_id <= 0) || (INT32_MAX < msgp->header.msgh_id); // conversation id
+    if (invalidMagic || invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
+       CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt request Mach message (0b%d%d%d%d%d%d)"), invalidMagic, invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
+        mach_msg_destroy((mach_msg_header_t *)msgp);
+        return NULL;
+    }
+
     /* Create no-copy, no-free-bytes wrapper CFData */
     if (0 == msgp->body.msgh_descriptor_count) {
-       int32_t byteslen = CFSwapInt32LittleToHost(msgp->contents.msg0.byteslen);
-       msgid = CFSwapInt32LittleToHost(msgp->contents.msg0.msgid);
-       if (0 <= byteslen) {
-           data = CFDataCreateWithBytesNoCopy(kCFAllocatorSystemDefault, msgp->contents.msg0.bytes, byteslen, kCFAllocatorNull);
+       uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->header.msgh_size);
+       uintptr_t data_extent = (uintptr_t)((uint8_t *)&(MSGP0_FIELD(msgp, bytes)) + byteslen);
+       msgid = CFSwapInt32LittleToHost(MSGP_GET(msgp, msgid));
+       if (0 <= byteslen && data_extent <= msgp_extent) {
+           data = CFDataCreateWithBytesNoCopy(allocator, MSGP0_FIELD(msgp, bytes), byteslen, kCFAllocatorNull);
        }
     } else {
-       msgid = CFSwapInt32LittleToHost(msgp->contents.msg1.msgid);
-       data = CFDataCreateWithBytesNoCopy(kCFAllocatorSystemDefault, msgp->contents.msg1.desc.out_of_line.address, msgp->contents.msg1.desc.out_of_line.size, kCFAllocatorNull);
+       msgid = CFSwapInt32LittleToHost(MSGP_GET(msgp, msgid));
+       data = CFDataCreateWithBytesNoCopy(allocator, MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size, kCFAllocatorNull);
     }
     returnData = ms->_callout(ms, msgid, data, context_info);
     /* Now, returnData could be (1) NULL, (2) an ordinary data < MAX_INLINE,
@@ -913,9 +1037,14 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
     wad until the message was sent either, if we didn't make the copy. */
     if (NULL != returnData) {
        return_len = CFDataGetLength(returnData);
-       if (return_len < __CFMessagePortMaxInlineBytes) {
+        if (__CFMessagePortMaxDataSize < return_len) {
+            CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort reply: CFMessagePort cannot send more than %lu bytes of data"), __CFMessagePortMaxDataSize);
+            return_len = 0;
+            returnData = NULL;
+        }
+       if (returnData && return_len < __CFMessagePortMaxInlineBytes) {
            return_bytes = (void *)CFDataGetBytePtr(returnData);
-       } else {
+       } else if (returnData) {
            return_bytes = NULL;
            vm_allocate(mach_task_self(), (vm_address_t *)&return_bytes, return_len, VM_FLAGS_ANYWHERE | VM_MAKE_TAG(VM_MEMORY_MACH_MSG));
            /* vm_copy would only be a win here if the source address
@@ -925,13 +1054,13 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
            memmove(return_bytes, CFDataGetBytePtr(returnData), return_len);
        }
     }
-    replymsg = __CFMessagePortCreateMessage(allocator, true, msgp->head.msgh_remote_port, MACH_PORT_NULL, -1 * (int32_t)msgp->head.msgh_id, msgid, return_bytes, return_len);
+    replymsg = __CFMessagePortCreateMessage(true, msgp->header.msgh_remote_port, MACH_PORT_NULL, -1 * (int32_t)msgp->header.msgh_id, msgid, return_bytes, return_len);
     if (1 == replymsg->body.msgh_descriptor_count) {
-       replymsg->contents.msg1.desc.out_of_line.deallocate = true;
+       MSGP1_FIELD(replymsg, ool).deallocate = true;
     }
     if (data) CFRelease(data);
     if (1 == msgp->body.msgh_descriptor_count) {
-       vm_deallocate(mach_task_self(), (vm_address_t)msgp->contents.msg1.desc.out_of_line.address, msgp->contents.msg1.desc.out_of_line.size);
+       vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size);
     }
     if (returnData) CFRelease(returnData);
     if (context_release) {
@@ -943,10 +1072,14 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
 CFRunLoopSourceRef CFMessagePortCreateRunLoopSource(CFAllocatorRef allocator, CFMessagePortRef ms, CFIndex order) {
     CFRunLoopSourceRef result = NULL;
     __CFGenericValidateType(ms, __kCFMessagePortTypeID);
-//#warning CF: This should be an assert
-   // if (__CFMessagePortIsRemote(ms)) return NULL;
+    if (!CFMessagePortIsValid(ms)) return NULL;
+    if (__CFMessagePortIsRemote(ms)) return NULL;
     __CFMessagePortLock(ms);
-    if (NULL == ms->_source && __CFMessagePortIsValid(ms)) {
+    if (NULL != ms->_source && !CFRunLoopSourceIsValid(ms->_source)) {
+        CFRelease(ms->_source);
+        ms->_source = NULL;
+    }
+    if (NULL == ms->_source && NULL == ms->_dispatchSource && __CFMessagePortIsValid(ms)) {
        CFRunLoopSourceContext1 context;
        context.version = 1;
        context.info = (void *)ms;
@@ -966,4 +1099,80 @@ CFRunLoopSourceRef CFMessagePortCreateRunLoopSource(CFAllocatorRef allocator, CF
     return result;
 }
 
+/* Arrange for requests arriving on the local port `ms` to be handled on `queue` through a
+ * dispatch source instead of a run loop source. A NULL `queue` only cancels any existing
+ * dispatch source. Invalid, remote, or run-loop-attached ports are rejected with a warning. */
+void CFMessagePortSetDispatchQueue(CFMessagePortRef ms, dispatch_queue_t queue) {
+    __CFGenericValidateType(ms, __kCFMessagePortTypeID);
+    __CFMessagePortLock(ms);
+    if (!__CFMessagePortIsValid(ms)) {
+        __CFMessagePortUnlock(ms);
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePortSetDispatchQueue(): CFMessagePort is invalid"));
+        return;
+    }
+    if (__CFMessagePortIsRemote(ms)) {
+        __CFMessagePortUnlock(ms);
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePortSetDispatchQueue(): CFMessagePort is not a local port, queue cannot be set"));
+        return;
+    }
+    if (NULL != ms->_source) {
+        __CFMessagePortUnlock(ms);
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePortSetDispatchQueue(): CFMessagePort already has a CFRunLoopSourceRef, queue cannot be set"));
+        return;
+    }
+    /* Cancel a source installed by an earlier call; its cancel handler (see below) releases both the source and the queue it retained. */
+    if (ms->_dispatchSource) {
+        dispatch_source_cancel(ms->_dispatchSource);
+        ms->_dispatchSource = NULL;
+        ms->_dispatchQ = NULL;
+    }
+    if (queue) {
+        mach_port_t port = __CFMessagePortGetPort(ms);
+        /* dispatch_source_create() may return NULL; a NULL source must never reach the handler setters, so guard on it. */
+        dispatch_source_t theSource = (MACH_PORT_NULL != port) ? dispatch_source_create(DISPATCH_SOURCE_TYPE_MACH_RECV, port, 0, __mportQueue()) : NULL;
+        if (NULL != theSource) {
+            dispatch_source_set_cancel_handler(theSource, ^{
+                dispatch_release(queue);
+                dispatch_release(theSource);
+            });
+            dispatch_source_set_event_handler(theSource, ^{
+                CFRetain(ms); /* balanced by the CFRelease(ms) at the end of the async block */
+                mach_msg_header_t *msg = (mach_msg_header_t *)CFAllocatorAllocate(kCFAllocatorSystemDefault, 2048, 0);
+                msg->msgh_size = 2048;
+                /* Receive one message; on MACH_RCV_TOO_LARGE regrow the buffer from the reported size and retry, any other failure halts. */
+                for (;;) {
+                    msg->msgh_bits = 0;
+                    msg->msgh_local_port = port;
+                    msg->msgh_remote_port = MACH_PORT_NULL;
+                    msg->msgh_id = 0;
+                    kern_return_t ret = mach_msg(msg, MACH_RCV_MSG|MACH_RCV_LARGE|MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)|MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AV), 0, msg->msgh_size, port, 0, MACH_PORT_NULL);
+                    if (MACH_MSG_SUCCESS == ret) break;
+                    if (MACH_RCV_TOO_LARGE != ret) HALT;
+                    uint32_t newSize = round_msg(msg->msgh_size + MAX_TRAILER_SIZE);
+                    msg = CFAllocatorReallocate(kCFAllocatorSystemDefault, msg, newSize, 0);
+                    msg->msgh_size = newSize;
+                }
+                dispatch_async(queue, ^{
+                    mach_msg_header_t *reply = __CFMessagePortPerform(msg, msg->msgh_size, kCFAllocatorSystemDefault, ms);
+                    if (NULL != reply) {
+                        kern_return_t ret = mach_msg(reply, MACH_SEND_MSG, reply->msgh_size, 0, MACH_PORT_NULL, 0, MACH_PORT_NULL);
+                        if (KERN_SUCCESS != ret) mach_msg_destroy(reply);
+                        CFAllocatorDeallocate(kCFAllocatorSystemDefault, reply);
+                    }
+                    CFAllocatorDeallocate(kCFAllocatorSystemDefault, msg); /* the receive buffer is owned by this block once dispatched */
+                    CFRelease(ms);
+                });
+            });
+            ms->_dispatchSource = theSource;
+        }
+        if (ms->_dispatchSource) {
+            dispatch_retain(queue); /* released by the source's cancel handler */
+            ms->_dispatchQ = queue;
+            dispatch_resume(ms->_dispatchSource);
+        } else {
+            CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePortSetDispatchQueue(): dispatch source could not be created"));
+        }
+    }
+    __CFMessagePortUnlock(ms);
+}