]> git.saurik.com Git - apple/cf.git/blobdiff - CFMessagePort.c
CF-550.42.tar.gz
[apple/cf.git] / CFMessagePort.c
index f9c0964abde1107e3a6dac989eb3c9fc46cc9f1f..40afce5ed0ac8a1ecdf4e03b0d9e9a54d5a3bdb2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009 Apple Inc. All rights reserved.
+ * Copyright (c) 2010 Apple Inc. All rights reserved.
  *
  * @APPLE_LICENSE_HEADER_START@
  * 
@@ -20,8 +20,9 @@
  * 
  * @APPLE_LICENSE_HEADER_END@
  */
+
 /*     CFMessagePort.c
-       Copyright (c) 1998-2009, Apple Inc. All rights reserved.
+       Copyright (c) 1998-2010, Apple Inc. All rights reserved.
        Responsibility: Christopher Kane
 */
 
@@ -144,54 +145,74 @@ CF_INLINE void __CFMessagePortUnlock(CFMessagePortRef ms) {
 // Just a heuristic
 #define __CFMessagePortMaxInlineBytes 4096*10
 
-struct __CFMessagePortMachMessage {
-    mach_msg_header_t head;
-    mach_msg_body_t body;
-    union {
-        mach_msg_ool_descriptor32_t _0;
-        mach_msg_ool_descriptor64_t _1;
-        mach_msg_ool_descriptor_t out_of_line;
-    } desc; // ignored for inline bytes messages
+struct __CFMessagePortMachMessage0 {
+    mach_msg_base_t base;
+    int32_t magic;
     int32_t msgid;
     int32_t byteslen;
     uint8_t bytes[0];
 };
 
-static struct __CFMessagePortMachMessage *__CFMessagePortCreateMessage(bool reply, mach_port_t port, mach_port_t replyPort, int32_t convid, int32_t msgid, const uint8_t *bytes, int32_t byteslen) {
+struct __CFMessagePortMachMessage1 {
+    mach_msg_base_t base;
+    mach_msg_ool_descriptor_t ool;
+    int32_t magic;
+    int32_t msgid;
+    int32_t byteslen;
+};
+
+#define MAGIC 0xF1F2F3F4
+
+#define MSGP0_FIELD(msgp, ident) ((struct __CFMessagePortMachMessage0 *)msgp)->ident
+#define MSGP1_FIELD(msgp, ident) ((struct __CFMessagePortMachMessage1 *)msgp)->ident
+#define MSGP_GET(msgp, ident) \
+    ((((mach_msg_base_t *)msgp)->body.msgh_descriptor_count) ? MSGP1_FIELD(msgp, ident) : MSGP0_FIELD(msgp, ident))
+
+static mach_msg_base_t *__CFMessagePortCreateMessage(bool reply, mach_port_t port, mach_port_t replyPort, int32_t convid, int32_t msgid, const uint8_t *bytes, int32_t byteslen) {
     if (__CFMessagePortMaxDataSize < byteslen) return NULL;
-    struct __CFMessagePortMachMessage *msg;
-    int32_t size = sizeof(struct __CFMessagePortMachMessage);
     int32_t rounded_byteslen = ((byteslen + 3) & ~0x3);
     if (rounded_byteslen <= __CFMessagePortMaxInlineBytes) {
-       size += rounded_byteslen;
-    }
-    msg = CFAllocatorAllocate(kCFAllocatorSystemDefault, size, 0);
-    if (!msg) return NULL;
-    memset(msg, 0, size);
-    msg->head.msgh_id = convid;
-    msg->head.msgh_size = size;
-    msg->head.msgh_remote_port = port;
-    msg->head.msgh_local_port = replyPort;
-    msg->head.msgh_reserved = 0;
-//    msg->head.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, (replyPort ? MACH_MSG_TYPE_MAKE_SEND : 0));
-    msg->head.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
-    msg->msgid = CFSwapInt32HostToLittle(msgid);
-    msg->byteslen = CFSwapInt32HostToLittle(byteslen);
-    if (rounded_byteslen <= __CFMessagePortMaxInlineBytes) {
-       msg->body.msgh_descriptor_count = 0;
+        int32_t size = sizeof(struct __CFMessagePortMachMessage0) + rounded_byteslen;
+        struct __CFMessagePortMachMessage0 *msg = CFAllocatorAllocate(kCFAllocatorSystemDefault, size, 0);
+        if (!msg) return NULL;
+        memset(msg, 0, size);
+        msg->base.header.msgh_id = convid;
+        msg->base.header.msgh_size = size;
+        msg->base.header.msgh_remote_port = port;
+        msg->base.header.msgh_local_port = replyPort;
+        msg->base.header.msgh_reserved = 0;
+        msg->base.header.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
+       msg->base.body.msgh_descriptor_count = 0;
+        msg->magic = MAGIC;
+        msg->msgid = CFSwapInt32HostToLittle(msgid);
+        msg->byteslen = CFSwapInt32HostToLittle(byteslen);
        if (NULL != bytes && 0 < byteslen) {
            memmove(msg->bytes, bytes, byteslen);
        }
+        return (mach_msg_base_t *)msg;
     } else {
-       msg->head.msgh_bits |= MACH_MSGH_BITS_COMPLEX;
-       msg->body.msgh_descriptor_count = 1;
-       msg->desc.out_of_line.deallocate = false;
-       msg->desc.out_of_line.copy = MACH_MSG_VIRTUAL_COPY;
-       msg->desc.out_of_line.address = (void *)bytes;
-       msg->desc.out_of_line.size = byteslen;
-       msg->desc.out_of_line.type = MACH_MSG_OOL_DESCRIPTOR;
-    }
-    return msg;
+        int32_t size = sizeof(struct __CFMessagePortMachMessage1);
+        struct __CFMessagePortMachMessage1 *msg = CFAllocatorAllocate(kCFAllocatorSystemDefault, size, 0);
+        if (!msg) return NULL;
+        memset(msg, 0, size);
+        msg->base.header.msgh_id = convid;
+        msg->base.header.msgh_size = size;
+        msg->base.header.msgh_remote_port = port;
+        msg->base.header.msgh_local_port = replyPort;
+        msg->base.header.msgh_reserved = 0;
+        msg->base.header.msgh_bits = MACH_MSGH_BITS((reply ? MACH_MSG_TYPE_MOVE_SEND_ONCE : MACH_MSG_TYPE_COPY_SEND), (MACH_PORT_NULL != replyPort ? MACH_MSG_TYPE_MAKE_SEND_ONCE : 0));
+       msg->base.header.msgh_bits |= MACH_MSGH_BITS_COMPLEX;
+       msg->base.body.msgh_descriptor_count = 1;
+        msg->magic = MAGIC;
+        msg->msgid = CFSwapInt32HostToLittle(msgid);
+        msg->byteslen = CFSwapInt32HostToLittle(byteslen);
+       msg->ool.deallocate = false;
+       msg->ool.copy = MACH_MSG_VIRTUAL_COPY;
+       msg->ool.address = (void *)bytes;
+       msg->ool.size = byteslen;
+       msg->ool.type = MACH_MSG_OOL_DESCRIPTOR;
+        return (mach_msg_base_t *)msg;
+    }
 }
 
 static CFStringRef __CFMessagePortCopyDescription(CFTypeRef cf) {
@@ -339,6 +360,10 @@ static CFMessagePortRef __CFMessagePortCreateLocal(CFAllocatorRef allocator, CFS
            __CFSpinUnlock(&__CFAllMessagePortsLock);
            CFRelease(name);
            CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+            if (!CFMessagePortIsValid(existing)) { // must do this outside lock to avoid deadlock
+               CFRelease(existing);
+                existing = NULL;
+            }
            return (CFMessagePortRef)(existing);
        }
     }
@@ -474,6 +499,10 @@ static CFMessagePortRef __CFMessagePortCreateRemote(CFAllocatorRef allocator, CF
            __CFSpinUnlock(&__CFAllMessagePortsLock);
            CFRelease(name);
            CFAllocatorDeallocate(kCFAllocatorSystemDefault, utfname);
+            if (!CFMessagePortIsValid(existing)) { // must do this outside lock to avoid deadlock
+               CFRelease(existing);
+                existing = NULL;
+            }
            return (CFMessagePortRef)(existing);
        }
     }
@@ -673,7 +702,7 @@ void CFMessagePortInvalidate(CFMessagePortRef ms) {
     __CFMessagePortLock(ms);
     if (__CFMessagePortIsValid(ms)) {
         if (ms->_dispatchSource) {
-            dispatch_cancel(ms->_dispatchSource);
+            dispatch_source_cancel(ms->_dispatchSource);
             ms->_dispatchSource = NULL;
            ms->_dispatchQ = NULL;
         }
@@ -743,10 +772,6 @@ Boolean CFMessagePortIsValid(CFMessagePortRef ms) {
        CFMessagePortInvalidate(ms);
        return false;
     }
-    if (NULL != ms->_source && !CFRunLoopSourceIsValid(ms->_source)) {
-       CFMessagePortInvalidate(ms);
-       return false;
-    }
     return true;
 }
 
@@ -766,8 +791,8 @@ void CFMessagePortSetInvalidationCallBack(CFMessagePortRef ms, CFMessagePortInva
 
 static void __CFMessagePortReplyCallBack(CFMachPortRef port, void *msg, CFIndex size, void *info) {
     CFMessagePortRef ms = info;
-    struct __CFMessagePortMachMessage *msgp = msg;
-    struct __CFMessagePortMachMessage *replymsg;
+    mach_msg_base_t *msgp = msg;
+    mach_msg_base_t *replymsg;
     __CFMessagePortLock(ms);
     if (!__CFMessagePortIsValid(ms)) {
        __CFMessagePortUnlock(ms);
@@ -776,55 +801,56 @@ static void __CFMessagePortReplyCallBack(CFMachPortRef port, void *msg, CFIndex
 
     int32_t byteslen = 0;
 
-    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX);
-    invalidComplex = invalidComplex || ((msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
-    Boolean wayTooBig = sizeof(struct __CFMessagePortMachMessage) + __CFMessagePortMaxInlineBytes < msgp->head.msgh_size;
-    Boolean wayTooSmall = msgp->head.msgh_size < sizeof(struct __CFMessagePortMachMessage);
+    Boolean invalidMagic = (MSGP_GET(msgp, magic) != MAGIC) && (CFSwapInt32(MSGP_GET(msgp, magic)) != MAGIC);
+    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX);
+    invalidComplex = invalidComplex || ((msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
+    Boolean wayTooBig = ((msgp->body.msgh_descriptor_count) ? sizeof(struct __CFMessagePortMachMessage1) : sizeof(struct __CFMessagePortMachMessage0) + __CFMessagePortMaxInlineBytes) < msgp->header.msgh_size;
+    Boolean wayTooSmall = msgp->header.msgh_size < sizeof(struct __CFMessagePortMachMessage0);
     Boolean wrongSize = false;
     if (!(invalidComplex || wayTooBig || wayTooSmall)) {
-        byteslen = CFSwapInt32LittleToHost(msgp->byteslen);
+        byteslen = CFSwapInt32LittleToHost(MSGP_GET(msgp, byteslen));
         wrongSize = (byteslen < 0) || (__CFMessagePortMaxDataSize < byteslen);
-        if (msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
-            wrongSize = wrongSize || (msgp->desc.out_of_line.size != byteslen);
+        if (0 != msgp->body.msgh_descriptor_count) {
+            wrongSize = wrongSize || (MSGP1_FIELD(msgp, ool).size != byteslen);
         } else {
-            wrongSize = wrongSize || (msgp->head.msgh_size - sizeof(struct __CFMessagePortMachMessage) < byteslen);
+            wrongSize = wrongSize || (msgp->header.msgh_size - sizeof(struct __CFMessagePortMachMessage0) < byteslen);
         }
     }
-    Boolean invalidMsgID = (0 <= msgp->head.msgh_id) && (msgp->head.msgh_id <= INT32_MAX); // conversation id
-    if (invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
-        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt reply Mach message (0b%d%d%d%d%d)"), invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
+    Boolean invalidMsgID = (0 <= msgp->header.msgh_id) && (msgp->header.msgh_id <= INT32_MAX); // conversation id
+    if (invalidMagic || invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
+        CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt reply Mach message (0b%d%d%d%d%d%d)"), invalidMagic, invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
         mach_msg_destroy((mach_msg_header_t *)msgp);
         __CFMessagePortUnlock(ms);
         return;
     }
 
-    if (CFDictionaryContainsKey(ms->_replies, (void *)(uintptr_t)msgp->head.msgh_id)) {
+    if (CFDictionaryContainsKey(ms->_replies, (void *)(uintptr_t)msgp->header.msgh_id)) {
        CFDataRef reply = NULL;
-       replymsg = (struct __CFMessagePortMachMessage *)msg;
+       replymsg = (mach_msg_base_t *)msg;
        if (0 == replymsg->body.msgh_descriptor_count) {
-           uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->head.msgh_size);
-           uintptr_t data_extent = (uintptr_t)((uint8_t *)&(replymsg->bytes) + byteslen);
+           uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->header.msgh_size);
+           uintptr_t data_extent = (uintptr_t)((uint8_t *)&(MSGP0_FIELD(replymsg, bytes)) + byteslen);
            if (0 <= byteslen && data_extent <= msgp_extent) {
-               reply = CFDataCreate(kCFAllocatorSystemDefault, replymsg->bytes, byteslen);
+               reply = CFDataCreate(kCFAllocatorSystemDefault, MSGP0_FIELD(replymsg, bytes), byteslen);
            } else {
                reply = (void *)~0;     // means NULL data
            }
        } else {
 //#warning CF: should create a no-copy data here that has a custom VM-freeing allocator, and not vm_dealloc here
-           reply = CFDataCreate(kCFAllocatorSystemDefault, replymsg->desc.out_of_line.address, replymsg->desc.out_of_line.size);
-           vm_deallocate(mach_task_self(), (vm_address_t)replymsg->desc.out_of_line.address, replymsg->desc.out_of_line.size);
+           reply = CFDataCreate(kCFAllocatorSystemDefault, MSGP1_FIELD(replymsg, ool).address, MSGP1_FIELD(replymsg, ool).size);
+           vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(replymsg, ool).address, MSGP1_FIELD(replymsg, ool).size);
        }
-       CFDictionarySetValue(ms->_replies, (void *)(uintptr_t)msgp->head.msgh_id, (void *)reply);
+       CFDictionarySetValue(ms->_replies, (void *)(uintptr_t)msgp->header.msgh_id, (void *)reply);
     } else {   /* discard message */
        if (1 == msgp->body.msgh_descriptor_count) {
-           vm_deallocate(mach_task_self(), (vm_address_t)msgp->desc.out_of_line.address, msgp->desc.out_of_line.size);
+           vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size);
        }
     }
     __CFMessagePortUnlock(ms);
 }
 
 SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef data, CFTimeInterval sendTimeout, CFTimeInterval rcvTimeout, CFStringRef replyMode, CFDataRef *returnDatap) {
-    struct __CFMessagePortMachMessage *sendmsg;
+    mach_msg_base_t *sendmsg;
     CFRunLoopRef currentRL = CFRunLoopGetCurrent();
     CFRunLoopSourceRef source = NULL;
     CFDataRef reply = NULL;
@@ -878,7 +904,7 @@ SInt32 CFMessagePortSendRequest(CFMessagePortRef remote, SInt32 msgid, CFDataRef
        if (sendTimeout < 1.0) sendTimeout = 0.0;
        sendTimeOut = floor(sendTimeout);
     }
-    ret = mach_msg((mach_msg_header_t *)sendmsg, MACH_SEND_MSG|sendOpts, sendmsg->head.msgh_size, 0, MACH_PORT_NULL, sendTimeOut, MACH_PORT_NULL);
+    ret = mach_msg((mach_msg_header_t *)sendmsg, MACH_SEND_MSG|sendOpts, sendmsg->header.msgh_size, 0, MACH_PORT_NULL, sendTimeOut, MACH_PORT_NULL);
     if (KERN_SUCCESS != ret) {
        // need to deallocate the send-once right that might have been created
        if (replyMode != NULL) mach_port_deallocate(mach_task_self(), ((mach_msg_header_t *)sendmsg)->msgh_local_port);
@@ -939,8 +965,8 @@ static mach_port_t __CFMessagePortGetPort(void *info) {
 
 static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allocator, void *info) {
     CFMessagePortRef ms = info;
-    struct __CFMessagePortMachMessage *msgp = msg;
-    struct __CFMessagePortMachMessage *replymsg;
+    mach_msg_base_t *msgp = msg;
+    mach_msg_base_t *replymsg;
     void *context_info;
     void (*context_release)(const void *);
     CFDataRef returnData, data = NULL;
@@ -964,39 +990,39 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
 
     int32_t byteslen = 0;
 
-    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX);
-    invalidComplex = invalidComplex || ((msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
-    Boolean wayTooBig = sizeof(struct __CFMessagePortMachMessage) + __CFMessagePortMaxInlineBytes < msgp->head.msgh_size;
-    Boolean wayTooSmall = msgp->head.msgh_size < sizeof(struct __CFMessagePortMachMessage);
+    Boolean invalidMagic = (MSGP_GET(msgp, magic) != MAGIC) && (CFSwapInt32(MSGP_GET(msgp, magic)) != MAGIC);
+    Boolean invalidComplex = (0 != msgp->body.msgh_descriptor_count) && !(msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX);
+    invalidComplex = invalidComplex || ((msgp->header.msgh_bits & MACH_MSGH_BITS_COMPLEX) && (0 == msgp->body.msgh_descriptor_count));
+    Boolean wayTooBig = ((msgp->body.msgh_descriptor_count) ? sizeof(struct __CFMessagePortMachMessage1) : sizeof(struct __CFMessagePortMachMessage0) + __CFMessagePortMaxInlineBytes) < msgp->header.msgh_size;
+    Boolean wayTooSmall = msgp->header.msgh_size < sizeof(struct __CFMessagePortMachMessage0);
     Boolean wrongSize = false;
     if (!(invalidComplex || wayTooBig || wayTooSmall)) {
-        byteslen = CFSwapInt32LittleToHost(msgp->byteslen);
+        byteslen = CFSwapInt32LittleToHost(MSGP_GET(msgp, byteslen));
         wrongSize = (byteslen < 0) || (__CFMessagePortMaxDataSize < byteslen);
-        if (msgp->head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
-            wrongSize = wrongSize || (msgp->desc.out_of_line.size != byteslen);
+        if (0 != msgp->body.msgh_descriptor_count) {
+            wrongSize = wrongSize || (MSGP1_FIELD(msgp, ool).size != byteslen);
         } else {
-            wrongSize = wrongSize || (msgp->head.msgh_size - sizeof(struct __CFMessagePortMachMessage) < byteslen);
+            wrongSize = wrongSize || (msgp->header.msgh_size - sizeof(struct __CFMessagePortMachMessage0) < byteslen);
         }
     }
-    Boolean invalidMsgID = (msgp->head.msgh_id <= 0) || (INT32_MAX < msgp->head.msgh_id); // conversation id
-    if (invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
-        mach_msg_security_trailer_t *trailer = (void *)msgp + msgp->head.msgh_size;
-       CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt request Mach message (0b%d%d%d%d%d)"), invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
+    Boolean invalidMsgID = (msgp->header.msgh_id <= 0) || (INT32_MAX < msgp->header.msgh_id); // conversation id
+    if (invalidMagic || invalidComplex || wayTooBig || wayTooSmall || wrongSize || invalidMsgID) {
+       CFLog(kCFLogLevelWarning, CFSTR("*** CFMessagePort: dropping corrupt request Mach message (0b%d%d%d%d%d%d)"), invalidMagic, invalidComplex, wayTooBig, wayTooSmall, wrongSize, invalidMsgID);
         mach_msg_destroy((mach_msg_header_t *)msgp);
         return NULL;
     }
 
     /* Create no-copy, no-free-bytes wrapper CFData */
     if (0 == msgp->body.msgh_descriptor_count) {
-       uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->head.msgh_size);
-       uintptr_t data_extent = (uintptr_t)((uint8_t *)&(msgp->bytes) + byteslen);
-       msgid = CFSwapInt32LittleToHost(msgp->msgid);
+       uintptr_t msgp_extent = (uintptr_t)((uint8_t *)msgp + msgp->header.msgh_size);
+       uintptr_t data_extent = (uintptr_t)((uint8_t *)&(MSGP0_FIELD(msgp, bytes)) + byteslen);
+       msgid = CFSwapInt32LittleToHost(MSGP_GET(msgp, msgid));
        if (0 <= byteslen && data_extent <= msgp_extent) {
-           data = CFDataCreateWithBytesNoCopy(allocator, msgp->bytes, byteslen, kCFAllocatorNull);
+           data = CFDataCreateWithBytesNoCopy(allocator, MSGP0_FIELD(msgp, bytes), byteslen, kCFAllocatorNull);
        }
     } else {
-       msgid = CFSwapInt32LittleToHost(msgp->msgid);
-       data = CFDataCreateWithBytesNoCopy(allocator, msgp->desc.out_of_line.address, msgp->desc.out_of_line.size, kCFAllocatorNull);
+       msgid = CFSwapInt32LittleToHost(MSGP_GET(msgp, msgid));
+       data = CFDataCreateWithBytesNoCopy(allocator, MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size, kCFAllocatorNull);
     }
     returnData = ms->_callout(ms, msgid, data, context_info);
     /* Now, returnData could be (1) NULL, (2) an ordinary data < MAX_INLINE,
@@ -1028,13 +1054,13 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
            memmove(return_bytes, CFDataGetBytePtr(returnData), return_len);
        }
     }
-    replymsg = __CFMessagePortCreateMessage(true, msgp->head.msgh_remote_port, MACH_PORT_NULL, -1 * (int32_t)msgp->head.msgh_id, msgid, return_bytes, return_len);
+    replymsg = __CFMessagePortCreateMessage(true, msgp->header.msgh_remote_port, MACH_PORT_NULL, -1 * (int32_t)msgp->header.msgh_id, msgid, return_bytes, return_len);
     if (1 == replymsg->body.msgh_descriptor_count) {
-       replymsg->desc.out_of_line.deallocate = true;
+       MSGP1_FIELD(replymsg, ool).deallocate = true;
     }
     if (data) CFRelease(data);
     if (1 == msgp->body.msgh_descriptor_count) {
-       vm_deallocate(mach_task_self(), (vm_address_t)msgp->desc.out_of_line.address, msgp->desc.out_of_line.size);
+       vm_deallocate(mach_task_self(), (vm_address_t)MSGP1_FIELD(msgp, ool).address, MSGP1_FIELD(msgp, ool).size);
     }
     if (returnData) CFRelease(returnData);
     if (context_release) {
@@ -1046,9 +1072,13 @@ static void *__CFMessagePortPerform(void *msg, CFIndex size, CFAllocatorRef allo
 CFRunLoopSourceRef CFMessagePortCreateRunLoopSource(CFAllocatorRef allocator, CFMessagePortRef ms, CFIndex order) {
     CFRunLoopSourceRef result = NULL;
     __CFGenericValidateType(ms, __kCFMessagePortTypeID);
-//#warning CF: This should be an assert
-   // if (__CFMessagePortIsRemote(ms)) return NULL;
+    if (!CFMessagePortIsValid(ms)) return NULL;
+    if (__CFMessagePortIsRemote(ms)) return NULL;
     __CFMessagePortLock(ms);
+    if (NULL != ms->_source && !CFRunLoopSourceIsValid(ms->_source)) {
+        CFRelease(ms->_source);
+        ms->_source = NULL;
+    }
     if (NULL == ms->_source && NULL == ms->_dispatchSource && __CFMessagePortIsValid(ms)) {
        CFRunLoopSourceContext1 context;
        context.version = 1;
@@ -1089,7 +1119,7 @@ void CFMessagePortSetDispatchQueue(CFMessagePortRef ms, dispatch_queue_t queue)
     }
 
     if (ms->_dispatchSource) {
-        dispatch_cancel(ms->_dispatchSource);
+        dispatch_source_cancel(ms->_dispatchSource);
         ms->_dispatchSource = NULL;
         ms->_dispatchQ = NULL;
     }
@@ -1097,49 +1127,44 @@ void CFMessagePortSetDispatchQueue(CFMessagePortRef ms, dispatch_queue_t queue)
     if (queue) {
         mach_port_t port = __CFMessagePortGetPort(ms);
         if (MACH_PORT_NULL != port) {
-           ms->_dispatchSource = dispatch_source_machport_create(port, DISPATCH_MACHPORT_RECV, DISPATCH_SOURCE_CREATE_SUSPENDED, __mportQueue(), ^(dispatch_source_t source) {
-                    long e = 0, d = dispatch_source_get_error(source, &e);
-                    if (DISPATCH_ERROR_DOMAIN_POSIX == d && ECANCELED == e) {
-                        dispatch_release(queue);
-                        dispatch_release(source);
-                        return;
-                    }
-                    if (DISPATCH_ERROR_DOMAIN_NO_ERROR != d) {
-                        HALT;
-                    }
-
-                    CFRetain(ms);
-                   mach_port_t port = dispatch_source_get_handle(source);
-                    mach_msg_header_t *msg = (mach_msg_header_t *)CFAllocatorAllocate(kCFAllocatorSystemDefault, 2048, 0);
-                    msg->msgh_size = 2048;
-
-                    for (;;) {
-                        msg->msgh_bits = 0;
-                        msg->msgh_local_port = port;
-                        msg->msgh_remote_port = MACH_PORT_NULL;
-                        msg->msgh_id = 0;
-
-                        kern_return_t ret = mach_msg(msg, MACH_RCV_MSG|MACH_RCV_LARGE|MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)|MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AV), 0, msg->msgh_size, port, 0, MACH_PORT_NULL);
-                        if (MACH_MSG_SUCCESS == ret) break;
-                        if (MACH_RCV_TOO_LARGE != ret) HALT;
-
-                        uint32_t newSize = round_msg(msg->msgh_size + MAX_TRAILER_SIZE);
-                        msg = CFAllocatorReallocate(kCFAllocatorSystemDefault, msg, newSize, 0);
-                        msg->msgh_size = newSize;
-                    }
-
-                    dispatch_async(queue, ^{
-                            mach_msg_header_t *reply = __CFMessagePortPerform(msg, msg->msgh_size, kCFAllocatorSystemDefault, ms);
-                            if (NULL != reply) {
-                                kern_return_t ret = mach_msg(reply, MACH_SEND_MSG, reply->msgh_size, 0, MACH_PORT_NULL, 0, MACH_PORT_NULL);
-                                if (KERN_SUCCESS != ret) mach_msg_destroy(reply);
-                                CFAllocatorDeallocate(kCFAllocatorSystemDefault, reply);
-                            }
-                            CFAllocatorDeallocate(kCFAllocatorSystemDefault, msg);
-                            CFRelease(ms);
-                        });
-                });
-       }
+               dispatch_source_t theSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_MACH_RECV, port, 0, __mportQueue());
+               dispatch_source_set_cancel_handler(theSource, ^{
+                   dispatch_release(queue);
+                   dispatch_release(theSource);
+               });
+               dispatch_source_set_event_handler(theSource, ^{
+                   CFRetain(ms);
+                   mach_msg_header_t *msg = (mach_msg_header_t *)CFAllocatorAllocate(kCFAllocatorSystemDefault, 2048, 0);
+                   msg->msgh_size = 2048;
+
+                   for (;;) {
+                       msg->msgh_bits = 0;
+                       msg->msgh_local_port = port;
+                       msg->msgh_remote_port = MACH_PORT_NULL;
+                       msg->msgh_id = 0;
+
+                       kern_return_t ret = mach_msg(msg, MACH_RCV_MSG|MACH_RCV_LARGE|MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)|MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AV), 0, msg->msgh_size, port, 0, MACH_PORT_NULL);
+                       if (MACH_MSG_SUCCESS == ret) break;
+                       if (MACH_RCV_TOO_LARGE != ret) HALT;
+
+                       uint32_t newSize = round_msg(msg->msgh_size + MAX_TRAILER_SIZE);
+                       msg = CFAllocatorReallocate(kCFAllocatorSystemDefault, msg, newSize, 0);
+                       msg->msgh_size = newSize;
+                   }
+
+                   dispatch_async(queue, ^{
+                       mach_msg_header_t *reply = __CFMessagePortPerform(msg, msg->msgh_size, kCFAllocatorSystemDefault, ms);
+                       if (NULL != reply) {
+                           kern_return_t ret = mach_msg(reply, MACH_SEND_MSG, reply->msgh_size, 0, MACH_PORT_NULL, 0, MACH_PORT_NULL);
+                           if (KERN_SUCCESS != ret) mach_msg_destroy(reply);
+                           CFAllocatorDeallocate(kCFAllocatorSystemDefault, reply);
+                       }
+                       CFAllocatorDeallocate(kCFAllocatorSystemDefault, msg);
+                       CFRelease(ms);
+                   });
+               });
+               ms->_dispatchSource = theSource;
+           }
         if (ms->_dispatchSource) {
             dispatch_retain(queue);
             ms->_dispatchQ = queue;