]> git.saurik.com Git - apple/cf.git/blobdiff - CFConcreteStreams.c
CF-476.10.tar.gz
[apple/cf.git] / CFConcreteStreams.c
diff --git a/CFConcreteStreams.c b/CFConcreteStreams.c
new file mode 100644 (file)
index 0000000..e71ca10
--- /dev/null
@@ -0,0 +1,853 @@
+/*
+ * Copyright (c) 2008 Apple Inc. All rights reserved.
+ *
+ * @APPLE_LICENSE_HEADER_START@
+ * 
+ * This file contains Original Code and/or Modifications of Original Code
+ * as defined in and that are subject to the Apple Public Source License
+ * Version 2.0 (the 'License'). You may not use this file except in
+ * compliance with the License. Please obtain a copy of the License at
+ * http://www.opensource.apple.com/apsl/ and read it before using this
+ * file.
+ * 
+ * The Original Code and all software distributed under the License are
+ * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
+ * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
+ * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
+ * Please see the License for the specific language governing rights and
+ * limitations under the License.
+ * 
+ * @APPLE_LICENSE_HEADER_END@
+ */
+/*     CFConcreteStreams.c
+       Copyright 2000-2002, Apple, Inc. All rights reserved.
+       Responsibility: Becky Willrich
+*/
+
+#define _DARWIN_UNLIMITED_SELECT 1
+
+#include "CFStreamInternal.h"
+#include "CFInternal.h"
+#include "CFPriv.h"
+#include <CoreFoundation/CFNumber.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <string.h>
+#include <stdio.h>
+#if DEPLOYMENT_TARGET_MACOSX
+#include <sys/time.h>
+#include <unistd.h>
+#endif
+
+// On Unix, you can schedule an fd with the RunLoop by creating a CFSocket around it.  On Win32
+// files and sockets are not interchangeable, and we do cheapo scheduling, where the file is
+// always readable and writable until we hit EOF (similar to the way CFData streams are scheduled).
+#if DEPLOYMENT_TARGET_MACOSX
+#define REAL_FILE_SCHEDULING (1)
+#endif
+
+#define SCHEDULE_AFTER_WRITE  (0)   /* flag bit: set by fileCallBack on a write callback; fileWrite clears it and re-enables the socket's write callback */
+#define SCHEDULE_AFTER_READ   (1)   /* flag bit: set by fileCallBack on a read callback; fileRead clears it and re-enables the socket's read callback */
+#define APPEND                (3)   /* flag bit: set through kCFStreamPropertyAppendToFile; makes constructFD add O_APPEND */
+#define AT_EOF                (4)   /* flag bit: set by fileRead at end of file when REAL_FILE_SCHEDULING is not defined */
+#define USE_RUNLOOP_ARRAY     (5)   /* flag bit: set by constructFD when opening fails; fileUnschedule then treats rlInfo as the rlArray member */
+
+
+/* File callbacks: per-stream state shared by all file stream callbacks below */
+typedef struct {
+    CFURLRef url;              // file URL to open; NULL when the stream wraps a caller-supplied fd
+    int fd;                    // file descriptor; -1 until opened (and again after fileClose)
+#ifdef REAL_FILE_SCHEDULING
+    union {
+        CFSocketRef sock;              // socket created once we open and have an fd
+        CFMutableArrayRef rlArray;     // scheduling information prior to open
+    } rlInfo; // If fd > 0, sock exists.  Otherwise, rlArray.
+#else
+    uint16_t scheduled;        // ref count of how many times we've been scheduled
+#endif
+    CFOptionFlags flags;       // bit set indexed by the SCHEDULE_AFTER_*/APPEND/AT_EOF/USE_RUNLOOP_ARRAY macros
+    off_t offset;              // offset to seek to when opening; -1 means "do not seek"
+} _CFFileStreamContext;
+
+
+CONST_STRING_DECL(kCFStreamPropertyFileCurrentOffset, "kCFStreamPropertyFileCurrentOffset"); /* property key handled by fileCopyProperty / fileSetProperty */
+
+
+#ifdef REAL_FILE_SCHEDULING
+static void fileCallBack(CFSocketRef s, CFSocketCallBackType type, CFDataRef address, const void *data, void *info);
+
+/*
+ * Wraps fileStream->fd in a CFSocket whose callback is fileCallBack, asking
+ * for read callbacks when forRead is true and write callbacks otherwise.
+ * If scheduling requests were queued in rlInfo.rlArray (stored as
+ * runloop/mode pairs), one run loop source is created and added for each
+ * pair, then the array is released.  On return rlInfo holds the socket.
+ */
+static void constructCFSocket(_CFFileStreamContext *fileStream, Boolean forRead, struct _CFStream *stream) {
+    CFAllocatorRef alloc = CFGetAllocator(stream);
+    CFOptionFlags wantedCallBacks = forRead ? kCFSocketReadCallBack : kCFSocketWriteCallBack;
+    CFSocketContext socketCtxt = {0, stream, NULL, NULL, CFCopyDescription};
+    CFSocketRef fileSocket = CFSocketCreateWithNative(alloc, fileStream->fd, wantedCallBacks, fileCallBack, &socketCtxt);
+    CFMutableArrayRef pending = fileStream->rlInfo.rlArray;
+
+    /* Clear every socket flag on the new socket. */
+    CFSocketSetSocketFlags(fileSocket, 0);
+
+    if (pending != NULL) {
+        CFIndex count = CFArrayGetCount(pending);
+        CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(alloc, fileSocket, 0);
+        CFIndex idx = 0;
+        while (idx + 1 < count) {
+            CFRunLoopRef rl = (CFRunLoopRef)CFArrayGetValueAtIndex(pending, idx);
+            CFStringRef mode = CFArrayGetValueAtIndex(pending, idx + 1);
+            CFRunLoopAddSource(rl, source, mode);
+            idx += 2;
+        }
+        CFRelease(pending);
+        CFRelease(source);
+    }
+
+    /* rlInfo is a union: from here on it refers to the socket, not the array. */
+    fileStream->rlInfo.sock = fileSocket;
+}
+#endif
+
+static Boolean constructFD(_CFFileStreamContext *fileStream, CFStreamError *error, Boolean forRead, struct _CFStream *stream) {
+    UInt8 path[1024];
+    int flags = forRead ? O_RDONLY : (O_CREAT | O_TRUNC | O_WRONLY);
+
+    if (CFURLGetFileSystemRepresentation(fileStream->url, TRUE, path, 1024) == FALSE) {
+        error->error = ENOENT;
+        error->domain = kCFStreamErrorDomainPOSIX;
+        return FALSE;
+    }
+    if (__CFBitIsSet(fileStream->flags, APPEND)) {
+        flags |= O_APPEND;
+        if(_CFExecutableLinkedOnOrAfter(CFSystemVersionPanther)) flags &= ~O_TRUNC;
+    }
+    
+    do {
+        fileStream->fd = open((const char *)path, flags, 0666);
+        
+        if (fileStream->fd < 0)
+            break;
+        
+        if ((fileStream->offset != -1) && (lseek(fileStream->fd, fileStream->offset, SEEK_SET) == -1))
+            break;
+
+#ifdef REAL_FILE_SCHEDULING
+        if (fileStream->rlInfo.rlArray != NULL) {
+            constructCFSocket(fileStream, forRead, stream);
+        }
+#endif
+
+        return TRUE;
+    } while (1);
+
+    __CFBitSet(fileStream->flags, USE_RUNLOOP_ARRAY);
+    error->error = errno;
+    error->domain = kCFStreamErrorDomainPOSIX;
+
+    return FALSE;
+}
+
+static Boolean fileOpen(struct _CFStream *stream, CFStreamError *errorCode, Boolean *openComplete, void *info) {
+    _CFFileStreamContext *ctxt = (_CFFileStreamContext *)info;
+    Boolean forRead = (CFGetTypeID(stream) == CFReadStreamGetTypeID());
+    *openComplete = TRUE;
+    if (ctxt->url) {
+        if (constructFD(ctxt, errorCode, forRead, stream)) {
+#ifndef REAL_FILE_SCHEDULING
+            if (ctxt->scheduled > 0) {
+                if (forRead)
+                    CFReadStreamSignalEvent((CFReadStreamRef)stream, kCFStreamEventHasBytesAvailable, NULL);
+                else
+                    CFWriteStreamSignalEvent((CFWriteStreamRef)stream, kCFStreamEventCanAcceptBytes, NULL);
+            }
+#endif
+            return TRUE;
+        } else {
+            return FALSE;
+        }
+#ifdef REAL_FILE_SCHEDULING
+    } else if (ctxt->rlInfo.rlArray != NULL) {
+        constructCFSocket(ctxt, forRead, stream);
+#endif
+    }
+    return TRUE;
+}
+
+/*
+ * Reads up to bufferLength bytes from fd into buffer.
+ *
+ * Returns the number of bytes read, with *atEOF set to TRUE when read()
+ * returned 0 and errorCode->error set to 0.  On failure returns -1 with
+ * *errorCode holding errno in the POSIX domain; *atEOF is set to FALSE so
+ * callers that inspect it after an error never read an indeterminate value.
+ */
+__private_extern__ CFIndex fdRead(int fd, UInt8 *buffer, CFIndex bufferLength, CFStreamError *errorCode, Boolean *atEOF) {
+    CFIndex bytesRead = read(fd, buffer, bufferLength);
+    if (bytesRead < 0) {
+        errorCode->error = errno;
+        errorCode->domain = kCFStreamErrorDomainPOSIX;
+        *atEOF = FALSE;
+        return -1;
+    } else {
+        *atEOF = (bytesRead == 0) ? TRUE : FALSE;
+        errorCode->error = 0;
+        return bytesRead;
+    }
+}
+
+/*
+ * Read callback for file streams: delegates the actual read to fdRead, then
+ * re-arms notification.  With REAL_FILE_SCHEDULING the socket's read callback
+ * is re-enabled if fileCallBack had flagged SCHEDULE_AFTER_READ; otherwise
+ * EOF is recorded in the AT_EOF flag and a scheduled stream that is not at
+ * EOF is signalled as having more bytes.
+ */
+static CFIndex fileRead(CFReadStreamRef stream, UInt8 *buffer, CFIndex bufferLength, CFStreamError *errorCode, Boolean *atEOF, void *info) {
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    CFIndex bytesRead = fdRead(fileStream->fd, buffer, bufferLength, errorCode, atEOF);
+
+#ifdef REAL_FILE_SCHEDULING
+    if (__CFBitIsSet(fileStream->flags, SCHEDULE_AFTER_READ)) {
+        CFSocketRef fileSocket;
+        __CFBitClear(fileStream->flags, SCHEDULE_AFTER_READ);
+        fileSocket = fileStream->rlInfo.sock;
+        if (fileSocket) {
+            CFSocketEnableCallBacks(fileSocket, kCFSocketReadCallBack);
+        }
+    }
+#else
+    if (*atEOF) {
+        __CFBitSet(fileStream->flags, AT_EOF);
+    }
+    if (fileStream->scheduled > 0 && !*atEOF) {
+        CFReadStreamSignalEvent(stream, kCFStreamEventHasBytesAvailable, NULL);
+    }
+#endif
+    return bytesRead;
+}
+
+#ifdef REAL_FILE_SCHEDULING
+/*
+ * Polls fd with a zero-timeout select() and returns TRUE when it is readable.
+ *
+ * Descriptors at or above FD_SETSIZE do not fit in a stack fd_set, so a
+ * zeroed bit array large enough for fd is heap-allocated instead (this file
+ * defines _DARWIN_UNLIMITED_SELECT for that purpose).  Returns FALSE for a
+ * negative fd or when that allocation fails, rather than writing out of
+ * bounds or dereferencing NULL.
+ */
+__private_extern__ Boolean fdCanRead(int fd) {
+    struct timeval timeout = {0, 0};
+    fd_set *readSetPtr;
+    fd_set readSet;
+    Boolean result;
+    if (fd < 0) {
+        // FD_SET with a negative index would write before the start of the set
+        return FALSE;
+    }
+// fd_set is not a mask in Win32, so checking for an fd that's too big is not relevant
+    if (fd < FD_SETSIZE) {
+        FD_ZERO(&readSet);
+        readSetPtr = &readSet;
+    } else {
+        size_t size = howmany(fd+1, NFDBITS) * sizeof(uint32_t);
+        readSetPtr = (fd_set *)calloc(1, size);
+        if (readSetPtr == NULL) {
+            return FALSE;
+        }
+    }
+    FD_SET(fd, readSetPtr);
+    result = (select(fd + 1, readSetPtr, NULL, NULL, &timeout) == 1) ? TRUE : FALSE;
+    if (readSetPtr != &readSet) {
+        free(readSetPtr);
+    }
+    return result;
+}
+#endif
+
+/*
+ * canRead callback for file streams.  With REAL_FILE_SCHEDULING the answer
+ * comes from polling the descriptor; otherwise the file counts as readable
+ * until the AT_EOF flag has been set.
+ */
+static Boolean fileCanRead(CFReadStreamRef stream, void *info) {
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+#ifdef REAL_FILE_SCHEDULING
+    Boolean readable = fdCanRead(fileStream->fd);
+    return readable;
+#else
+    if (__CFBitIsSet(fileStream->flags, AT_EOF)) {
+        return FALSE;
+    }
+    return TRUE;
+#endif
+}
+
+/*
+ * Writes up to bufferLength bytes from buffer to fd.  Returns the count
+ * reported by write() with errorCode->error cleared, or -1 with *errorCode
+ * carrying errno in the POSIX domain when write() fails.
+ */
+__private_extern__ CFIndex fdWrite(int fd, const UInt8 *buffer, CFIndex bufferLength, CFStreamError *errorCode) {
+    CFIndex written = write(fd, buffer, bufferLength);
+    if (written >= 0) {
+        errorCode->error = 0;
+        return written;
+    }
+    errorCode->error = errno;
+    errorCode->domain = kCFStreamErrorDomainPOSIX;
+    return -1;
+}
+
+/*
+ * Write callback for file streams: writes through fdWrite, then re-arms
+ * notification.  With REAL_FILE_SCHEDULING the socket's write callback is
+ * re-enabled if fileCallBack had flagged SCHEDULE_AFTER_WRITE; otherwise a
+ * scheduled stream is immediately signalled as able to accept more bytes.
+ */
+static CFIndex fileWrite(CFWriteStreamRef stream, const UInt8 *buffer, CFIndex bufferLength, CFStreamError *errorCode, void *info) {
+    _CFFileStreamContext *ctxt = (_CFFileStreamContext *)info;
+    CFIndex bytesWritten = fdWrite(ctxt->fd, buffer, bufferLength, errorCode);
+
+#ifdef REAL_FILE_SCHEDULING
+    if (__CFBitIsSet(ctxt->flags, SCHEDULE_AFTER_WRITE)) {
+        CFSocketRef fileSocket;
+        __CFBitClear(ctxt->flags, SCHEDULE_AFTER_WRITE);
+        fileSocket = ctxt->rlInfo.sock;
+        if (fileSocket) {
+            CFSocketEnableCallBacks(fileSocket, kCFSocketWriteCallBack);
+        }
+    }
+#else
+    if (ctxt->scheduled > 0) {
+        CFWriteStreamSignalEvent(stream, kCFStreamEventCanAcceptBytes, NULL);
+    }
+#endif
+    return bytesWritten;
+}
+
+#ifdef REAL_FILE_SCHEDULING
+/*
+ * Polls fd with a zero-timeout select() and returns TRUE when it is writable.
+ *
+ * Descriptors at or above FD_SETSIZE do not fit in a stack fd_set, so a
+ * zeroed bit array large enough for fd is heap-allocated instead (this file
+ * defines _DARWIN_UNLIMITED_SELECT for that purpose).  Returns FALSE for a
+ * negative fd or when that allocation fails, rather than writing out of
+ * bounds or dereferencing NULL.
+ */
+__private_extern__ Boolean fdCanWrite(int fd) {
+    struct timeval timeout = {0, 0};
+    fd_set *writeSetPtr;
+    fd_set writeSet;
+    Boolean result;
+    if (fd < 0) {
+        // FD_SET with a negative index would write before the start of the set
+        return FALSE;
+    }
+    if (fd < FD_SETSIZE) {
+        FD_ZERO(&writeSet);
+        writeSetPtr = &writeSet;
+    } else {
+        size_t size = howmany(fd+1, NFDBITS) * sizeof(uint32_t);
+        writeSetPtr = (fd_set *)calloc(1, size);
+        if (writeSetPtr == NULL) {
+            return FALSE;
+        }
+    }
+    FD_SET(fd, writeSetPtr);
+    result = (select(fd + 1, NULL, writeSetPtr, NULL, &timeout) == 1) ? TRUE : FALSE;
+    if (writeSetPtr != &writeSet) {
+        free(writeSetPtr);
+    }
+    return result;
+}
+#endif
+
+/*
+ * canWrite callback for file streams.  With REAL_FILE_SCHEDULING the
+ * descriptor is polled; otherwise a file is always considered writable.
+ */
+static Boolean fileCanWrite(CFWriteStreamRef stream, void *info) {
+#ifdef REAL_FILE_SCHEDULING
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    return fdCanWrite(fileStream->fd);
+#else
+    return TRUE;
+#endif
+}
+
+/*
+ * Close callback for file streams.  An open descriptor (fd >= 0) is closed
+ * and reset to -1; under REAL_FILE_SCHEDULING its socket, if any, is then
+ * invalidated and released.  If there was no open descriptor, a pending
+ * scheduling array (if any) is released instead.
+ */
+static void fileClose(struct _CFStream *stream, void *info) {
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    Boolean hadFD = (fileStream->fd >= 0) ? TRUE : FALSE;
+
+    if (hadFD) {
+        close(fileStream->fd);
+        fileStream->fd = -1;
+    }
+#ifdef REAL_FILE_SCHEDULING
+    if (hadFD) {
+        CFSocketRef fileSocket = fileStream->rlInfo.sock;
+        if (fileSocket) {
+            CFSocketInvalidate(fileSocket);
+            CFRelease(fileSocket);
+            fileStream->rlInfo.sock = NULL;
+        }
+    } else if (fileStream->rlInfo.rlArray) {
+        CFRelease(fileStream->rlInfo.rlArray);
+        fileStream->rlInfo.rlArray = NULL;
+    }
+#endif
+}
+
+#ifdef REAL_FILE_SCHEDULING
+/*
+ * CFSocket callback installed by constructCFSocket; info is the owning
+ * stream.  Records in the stream's flags that the socket callback must be
+ * re-enabled after the next read/write, then signals the matching stream
+ * event.  Any callback type other than kCFSocketWriteCallBack is handled as
+ * a read callback.
+ */
+static void fileCallBack(CFSocketRef s, CFSocketCallBackType type, CFDataRef address, const void *data, void *info) {
+    struct _CFStream *stream = (struct _CFStream *)info;
+    _CFFileStreamContext *ctxt;
+
+    if (CFGetTypeID(stream) == CFReadStreamGetTypeID()) {
+        ctxt = CFReadStreamGetInfoPointer((CFReadStreamRef)stream);
+    } else {
+        ctxt = CFWriteStreamGetInfoPointer((CFWriteStreamRef)stream);
+    }
+
+    if (type != kCFSocketWriteCallBack) {
+        // type == kCFSocketReadCallBack
+        __CFBitSet(ctxt->flags, SCHEDULE_AFTER_READ);
+        CFReadStreamSignalEvent((CFReadStreamRef)stream, kCFStreamEventHasBytesAvailable, NULL);
+        return;
+    }
+    __CFBitSet(ctxt->flags, SCHEDULE_AFTER_WRITE);
+    CFWriteStreamSignalEvent((CFWriteStreamRef)stream, kCFStreamEventCanAcceptBytes, NULL);
+}
+#endif
+
+static void fileSchedule(struct _CFStream *stream, CFRunLoopRef runLoop, CFStringRef runLoopMode, void *info) {
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    Boolean isReadStream = (CFGetTypeID(stream) == CFReadStreamGetTypeID());
+    CFStreamStatus status = isReadStream ? CFReadStreamGetStatus((CFReadStreamRef)stream) : CFWriteStreamGetStatus((CFWriteStreamRef)stream);
+    if (fileStream->fd < 0 && status != kCFStreamStatusNotOpen) {
+        // Stream's already closed or error-ed out 
+        return;
+    }
+#ifdef REAL_FILE_SCHEDULING
+    if (status == kCFStreamStatusNotOpen) {
+        if (!fileStream->rlInfo.rlArray) {
+            fileStream->rlInfo.rlArray = CFArrayCreateMutable(CFGetAllocator(stream), 0, &kCFTypeArrayCallBacks);
+        }
+        CFArrayAppendValue(fileStream->rlInfo.rlArray, runLoop);
+        CFArrayAppendValue(fileStream->rlInfo.rlArray, runLoopMode);
+    } else {
+        CFRunLoopSourceRef rlSrc;
+        if (!fileStream->rlInfo.sock) {
+            constructCFSocket(fileStream, isReadStream, stream);
+        }
+        rlSrc = CFSocketCreateRunLoopSource(CFGetAllocator(stream), fileStream->rlInfo.sock, 0);
+        CFRunLoopAddSource(runLoop, rlSrc, runLoopMode);
+        CFRelease(rlSrc);
+    }
+#else
+    fileStream->scheduled++;
+    if (fileStream->scheduled == 1 && fileStream->fd > 0 && status == kCFStreamStatusOpen) {
+        if (isReadStream)
+            CFReadStreamSignalEvent((CFReadStreamRef)stream, kCFStreamEventHasBytesAvailable, NULL);
+        else
+            CFWriteStreamSignalEvent((CFWriteStreamRef)stream, kCFStreamEventCanAcceptBytes, NULL);
+    }
+#endif
+}
+
+/*
+ * Unschedule callback for file streams.
+ *
+ * With REAL_FILE_SCHEDULING: if rlInfo currently holds the scheduling array
+ * (stream not yet opened, or the USE_RUNLOOP_ARRAY flag is set), the first
+ * matching (runLoop, mode) pair is removed from it; otherwise the socket's
+ * run loop source is removed from the run loop.  Without it, the schedule
+ * count is decremented, never below zero.
+ */
+static void fileUnschedule(struct _CFStream *stream, CFRunLoopRef runLoop, CFStringRef runLoopMode, void *info) {
+    _CFFileStreamContext *ctxt = (_CFFileStreamContext *)info;
+#ifdef REAL_FILE_SCHEDULING
+    Boolean forRead = (CFGetTypeID(stream) == CFReadStreamGetTypeID());
+    CFStreamStatus status = forRead ? CFReadStreamGetStatus((CFReadStreamRef)stream) : CFWriteStreamGetStatus((CFWriteStreamRef)stream);
+    CFMutableArrayRef pairs = NULL;
+
+    if (status == kCFStreamStatusNotOpen) {
+        // Not opened yet
+        pairs = ctxt->rlInfo.rlArray;
+    } else if (ctxt->rlInfo.sock) {
+        if (__CFBitIsSet(ctxt->flags, USE_RUNLOOP_ARRAY)) {
+            // rlArray is known to be non-NULL here: it shares the union with the non-NULL sock
+            pairs = ctxt->rlInfo.rlArray;
+        } else {
+            CFRunLoopSourceRef source = CFSocketCreateRunLoopSource(CFGetAllocator(stream), ctxt->rlInfo.sock, 0);
+            CFRunLoopRemoveSource(runLoop, source, runLoopMode);
+            CFRelease(source);
+        }
+    }
+
+    if (pairs != NULL) {
+        CFIndex count = CFArrayGetCount(pairs);
+        CFIndex idx;
+        for (idx = 0; idx + 1 < count; idx += 2) {
+            if (!CFEqual(CFArrayGetValueAtIndex(pairs, idx), runLoop)) continue;
+            if (!CFEqual(CFArrayGetValueAtIndex(pairs, idx + 1), runLoopMode)) continue;
+            /* Removing at idx twice drops both the run loop and its mode. */
+            CFArrayRemoveValueAtIndex(pairs, idx);
+            CFArrayRemoveValueAtIndex(pairs, idx);
+            break;
+        }
+    }
+#else
+    if (ctxt->scheduled > 0) {
+        ctxt->scheduled--;
+    }
+#endif
+}
+
+/*
+ * copyProperty callback for file streams.  Only
+ * kCFStreamPropertyFileCurrentOffset is handled: unless the stream is in
+ * append mode or has no descriptor, the cached offset is refreshed from the
+ * descriptor's current position, and a new SInt64 CFNumber is returned for
+ * it.  Returns NULL for other properties or when the offset is -1.
+ */
+static CFTypeRef fileCopyProperty(struct _CFStream *stream, CFStringRef propertyName, void *info) {
+    _CFFileStreamContext *ctxt = (_CFFileStreamContext *)info;
+
+    if (!CFEqual(propertyName, kCFStreamPropertyFileCurrentOffset)) {
+        return NULL;
+    }
+
+    // A zero-distance lseek from the current position reports where we are
+    // without moving; the result is cached in the offset field.
+    if (ctxt->fd != -1 && !__CFBitIsSet(ctxt->flags, APPEND)) {
+        ctxt->offset = lseek(ctxt->fd, 0, SEEK_CUR);
+    }
+
+    if (ctxt->offset == -1) {
+        return NULL;
+    }
+    return CFNumberCreate(CFGetAllocator((CFTypeRef)stream), kCFNumberSInt64Type, &(ctxt->offset));
+}
+
+/*
+ * setProperty callback for file streams.
+ *
+ * kCFStreamPropertyAppendToFile: accepted only on a write stream that has
+ * not been opened.  kCFBooleanTrue sets the APPEND flag and clears any
+ * requested offset; any other value clears the flag.
+ *
+ * kCFStreamPropertyFileCurrentOffset: val must be a CFNumber.  Unless the
+ * stream is in append mode the number is stored as the stream offset; if a
+ * descriptor is already open it is then repositioned to the stored offset.
+ *
+ * Returns TRUE when the property was applied, FALSE otherwise (unknown
+ * property, wrong stream kind or state, NULL or non-number value, failed
+ * conversion or failed seek).
+ */
+static Boolean fileSetProperty(struct _CFStream *stream, CFStringRef prop, CFTypeRef val, void *info) {
+    
+    Boolean result = FALSE;
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+
+    if (CFEqual(prop, kCFStreamPropertyAppendToFile) && CFGetTypeID(stream) == CFWriteStreamGetTypeID() &&
+        CFWriteStreamGetStatus((CFWriteStreamRef)stream) == kCFStreamStatusNotOpen)
+    {
+        if (val == kCFBooleanTrue) {
+            __CFBitSet(fileStream->flags, APPEND);
+            fileStream->offset = -1;                           // Can't offset and append on the stream
+        } else {
+            __CFBitClear(fileStream->flags, APPEND);
+        }
+        result = TRUE;
+    }
+    
+    else if (CFEqual(prop, kCFStreamPropertyFileCurrentOffset)) {
+        
+        if (!__CFBitIsSet(fileStream->flags, APPEND))
+        {
+            // Reject NULL or non-number values instead of handing them to CFNumberGetValue
+            if (val == NULL || CFGetTypeID(val) != CFNumberGetTypeID()) {
+                return FALSE;
+            }
+            result = CFNumberGetValue((CFNumberRef)val, kCFNumberSInt64Type, &(fileStream->offset));
+        }
+        
+        if ((fileStream->fd != -1) && (lseek(fileStream->fd, fileStream->offset, SEEK_SET) == -1)) {
+            result = FALSE;
+        }
+    }
+    
+    return result;
+}
+
+static void *fileCreate(struct _CFStream *stream, void *info) {
+    _CFFileStreamContext *ctxt = (_CFFileStreamContext *)info;
+    _CFFileStreamContext *newCtxt = (_CFFileStreamContext *)CFAllocatorAllocate(CFGetAllocator(stream), sizeof(_CFFileStreamContext), 0);
+    if (!newCtxt) return NULL;
+    newCtxt->url = ctxt->url;
+    if (newCtxt->url) {
+        CFRetain(newCtxt->url);
+    }
+    newCtxt->fd = ctxt->fd;
+#ifdef REAL_FILE_SCHEDULING
+    newCtxt->rlInfo.sock = NULL;
+#else
+    newCtxt->scheduled = 0;
+#endif
+    newCtxt->flags = 0;
+    newCtxt->offset = -1;
+    return newCtxt;
+}
+
+/*
+ * Finalize callback for file streams: tears down whatever is still attached
+ * to the context and frees it.  When fd > 0 the socket (if any) is
+ * invalidated and released and the descriptor is closed; otherwise a pending
+ * scheduling array (if any) is released.  The URL is released last.
+ *
+ * NOTE(review): this tests fd > 0 whereas fileClose tests fd >= 0, so a
+ * stream on descriptor 0 takes the "array" path here -- confirm whether
+ * that is intended.
+ */
+static void    fileFinalize(struct _CFStream *stream, void *info) {
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    Boolean hasFD = (fileStream->fd > 0) ? TRUE : FALSE;
+
+#ifdef REAL_FILE_SCHEDULING
+    if (hasFD) {
+        CFSocketRef fileSocket = fileStream->rlInfo.sock;
+        if (fileSocket) {
+            CFSocketInvalidate(fileSocket);
+            CFRelease(fileSocket);
+        }
+    } else if (fileStream->rlInfo.rlArray) {
+        CFRelease(fileStream->rlInfo.rlArray);
+    }
+#endif
+    if (hasFD) {
+        close(fileStream->fd);
+    }
+    if (fileStream->url != NULL) {
+        CFRelease(fileStream->url);
+    }
+    CFAllocatorDeallocate(CFGetAllocator(stream), fileStream);
+}
+
+/*
+ * copyDescription callback for file streams: the URL's description when the
+ * stream has a URL, otherwise a string naming the file descriptor.
+ */
+static CFStringRef fileCopyDescription(struct _CFStream *stream, void *info) {
+    // This needs work
+    _CFFileStreamContext *fileStream = (_CFFileStreamContext *)info;
+    if (!fileStream->url) {
+        return CFStringCreateWithFormat(CFGetAllocator(stream), NULL, CFSTR("fd = %d"), fileStream->fd);
+    }
+    return CFCopyDescription(fileStream->url);
+}
+
+/* CFData stream callbacks: state for a read stream that serves bytes out of a CFData */
+typedef struct {
+    CFDataRef data; // Mutable if the stream was constructed writable
+    const UInt8 *loc; // Current read position within data's bytes
+    Boolean scheduled; // TRUE once readDataSchedule has been called
+    char _padding[3];
+} _CFReadDataStreamContext;
+
+#define BUF_SIZE 1024 /* capacity of the first buffer and minimum capacity of each additional one */
+typedef struct _CFStreamByteBuffer {
+    UInt8 *bytes;              /* storage for this buffer */
+    CFIndex capacity, length;  /* total size of bytes, and how much of it has been written */
+    struct _CFStreamByteBuffer *next; /* following buffer in the chain, or NULL */
+} _CFStreamByteBuffer;
+
+typedef struct {
+    _CFStreamByteBuffer *firstBuf, *currentBuf; /* head of the buffer chain, and the buffer currently being filled */
+    CFAllocatorRef bufferAllocator; /* allocator for additional buffers; kCFAllocatorNull means a single fixed caller buffer */
+    Boolean scheduled; /* TRUE once writeDataSchedule has been called */
+    char _padding[3];
+} _CFWriteDataStreamContext;
+
+/*
+ * Open callback for data read streams.  Always succeeds synchronously.  A
+ * stream that was scheduled before opening is signalled right away: bytes
+ * available when the data is non-empty, end encountered otherwise.
+ */
+static Boolean readDataOpen(struct _CFStream *stream, CFStreamError *errorCode, Boolean *openComplete, void *info) {
+    _CFReadDataStreamContext *ctxt = (_CFReadDataStreamContext *)info;
+
+    if (ctxt->scheduled) {
+        CFStreamEventType event = kCFStreamEventEndEncountered;
+        if (CFDataGetLength(ctxt->data) != 0) {
+            event = kCFStreamEventHasBytesAvailable;
+        }
+        CFReadStreamSignalEvent((CFReadStreamRef)stream, event, NULL);
+    }
+    *openComplete = TRUE;
+    errorCode->error = 0;
+    return TRUE;
+}
+
+/*
+ * Schedule callback for data read streams.  Only the first call has any
+ * effect: it marks the stream scheduled and, if the stream is already open,
+ * signals either bytes available or end encountered depending on whether
+ * unread bytes remain.
+ */
+static void readDataSchedule(struct _CFStream *stream, CFRunLoopRef rl, CFStringRef rlMode, void *info) {
+    _CFReadDataStreamContext *ctxt = (_CFReadDataStreamContext *)info;
+    const UInt8 *dataEnd;
+
+    if (ctxt->scheduled != FALSE) {
+        return;
+    }
+    ctxt->scheduled = TRUE;
+
+    if (CFReadStreamGetStatus((CFReadStreamRef)stream) != kCFStreamStatusOpen) {
+        return;
+    }
+
+    dataEnd = CFDataGetBytePtr(ctxt->data) + CFDataGetLength(ctxt->data);
+    if (dataEnd > ctxt->loc) {
+        CFReadStreamSignalEvent((CFReadStreamRef)stream, kCFStreamEventHasBytesAvailable, NULL);
+    } else {
+        CFReadStreamSignalEvent((CFReadStreamRef)stream, kCFStreamEventEndEncountered, NULL);
+    }
+}
+
+/*
+ * Read callback for data read streams.  Copies as many unread bytes as fit
+ * in buffer (never a negative amount), advances the read position, and
+ * reports through *atEOF whether the position has reached the end of the
+ * data.  A scheduled stream with bytes left is signalled again.  Never
+ * fails: error->error is always cleared.
+ */
+static CFIndex dataRead(CFReadStreamRef stream, UInt8 *buffer, CFIndex bufferLength, CFStreamError *error, Boolean *atEOF, void *info) {
+    _CFReadDataStreamContext *ctxt = (_CFReadDataStreamContext *)info;
+    const UInt8 *dataStart = CFDataGetBytePtr(ctxt->data);
+    const UInt8 *dataEnd = dataStart + CFDataGetLength(ctxt->data);
+    CFIndex count = dataEnd - ctxt->loc;
+
+    if (count > bufferLength) {
+        count = bufferLength;
+    }
+    if (count < 0) {
+        count = 0;
+    }
+    if (count != 0) {
+        memmove(buffer, ctxt->loc, count);
+        ctxt->loc += count;
+    }
+
+    error->error = 0;
+    *atEOF = (ctxt->loc < dataEnd) ? FALSE : TRUE;
+    if (ctxt->scheduled && !*atEOF) {
+        CFReadStreamSignalEvent(stream, kCFStreamEventHasBytesAvailable, NULL);
+    }
+    return count;
+}
+
+/*
+ * getBuffer callback for data read streams: hands out a pointer directly
+ * into the CFData's bytes instead of copying.
+ *
+ * The available amount is the number of UNREAD bytes (end of data minus the
+ * current position), capped at maxBytesToRead.  *numBytesRead receives that
+ * amount, *atEOF is TRUE when this call consumes everything that was left,
+ * and the read position advances past the returned bytes.  A scheduled
+ * stream with bytes left is signalled again.  Never fails: error->error is
+ * always cleared.
+ *
+ * Returns the position the stream was at on entry.
+ */
+static const UInt8 *dataGetBuffer(CFReadStreamRef stream, CFIndex maxBytesToRead, CFIndex *numBytesRead, CFStreamError *error, Boolean *atEOF, void *info) {
+    _CFReadDataStreamContext *dataCtxt = (_CFReadDataStreamContext *)info;
+    const UInt8 *end = CFDataGetBytePtr(dataCtxt->data) + CFDataGetLength(dataCtxt->data);
+    const UInt8 *bytes = dataCtxt->loc;
+    /* Bytes still unread.  Measuring from the start of the data instead would
+       count bytes already consumed and could run past the end of the data. */
+    CFIndex remaining = end - dataCtxt->loc;
+    if (remaining < 0) {
+        remaining = 0;
+    }
+    if (remaining > maxBytesToRead) {
+        *numBytesRead = maxBytesToRead;
+        *atEOF = FALSE;
+    } else {
+        *numBytesRead = remaining;
+        *atEOF = TRUE;
+    }
+    error->error = 0;
+    dataCtxt->loc += *numBytesRead;
+    if (dataCtxt->scheduled && !*atEOF) {
+        CFReadStreamSignalEvent(stream, kCFStreamEventHasBytesAvailable, NULL);
+    }
+    return bytes;
+}
+
+/* canRead callback for data read streams: TRUE while the read position is before the end of the data. */
+static Boolean dataCanRead(CFReadStreamRef stream, void *info) {
+    _CFReadDataStreamContext *ctxt = (_CFReadDataStreamContext *)info;
+    const UInt8 *dataEnd = CFDataGetBytePtr(ctxt->data) + CFDataGetLength(ctxt->data);
+    if (dataEnd > ctxt->loc) {
+        return TRUE;
+    }
+    return FALSE;
+}
+
+/*
+ * Open callback for data write streams.  Always succeeds synchronously.  A
+ * stream that was scheduled before opening is signalled right away: it can
+ * accept bytes when it may allocate more buffers or its current buffer has
+ * room, and has reached its end otherwise.
+ */
+static Boolean writeDataOpen(struct _CFStream *stream, CFStreamError *errorCode, Boolean *openComplete, void *info) {
+    _CFWriteDataStreamContext *ctxt = (_CFWriteDataStreamContext *)info;
+
+    if (ctxt->scheduled) {
+        CFStreamEventType event;
+        if (ctxt->bufferAllocator != kCFAllocatorNull || ctxt->currentBuf->capacity > ctxt->currentBuf->length) {
+            event = kCFStreamEventCanAcceptBytes;
+        } else {
+            event = kCFStreamEventEndEncountered;
+        }
+        CFWriteStreamSignalEvent((CFWriteStreamRef)stream, event, NULL);
+    }
+    *openComplete = TRUE;
+    errorCode->error = 0;
+    return TRUE;
+}
+
+/*
+ * Schedule callback for data write streams.  Only the first call has any
+ * effect: it marks the stream scheduled and, if the stream is already open,
+ * signals whether it can still accept bytes or has reached its end.
+ */
+static void writeDataSchedule(struct _CFStream *stream, CFRunLoopRef rl, CFStringRef rlMode, void *info) {
+    _CFWriteDataStreamContext *ctxt = (_CFWriteDataStreamContext *)info;
+
+    if (ctxt->scheduled != FALSE) {
+        return;
+    }
+    ctxt->scheduled = TRUE;
+
+    if (CFWriteStreamGetStatus((CFWriteStreamRef)stream) != kCFStreamStatusOpen) {
+        return;
+    }
+
+    if (ctxt->bufferAllocator != kCFAllocatorNull || ctxt->currentBuf->capacity > ctxt->currentBuf->length) {
+        CFWriteStreamSignalEvent((CFWriteStreamRef)stream, kCFStreamEventCanAcceptBytes, NULL);
+    } else {
+        CFWriteStreamSignalEvent((CFWriteStreamRef)stream, kCFStreamEventEndEncountered, NULL);
+    }
+}
+
+/*
+ * Write callback for data write streams.
+ *
+ * Fixed-buffer streams (bufferAllocator == kCFAllocatorNull) fail with
+ * ENOMEM when the request does not fit in the remaining space.  Otherwise
+ * the bytes are copied into the current buffer and, when it fills up, into
+ * newly allocated buffers appended to the chain; each new buffer holds at
+ * least BUF_SIZE bytes.  If such an allocation fails the call reports ENOMEM
+ * instead of dereferencing NULL (bytes copied before the failure stay in
+ * the chain).
+ *
+ * Returns bufferLength on success, -1 with *errorCode set (POSIX domain) on
+ * failure.  A scheduled stream that can take more data is signalled again.
+ */
+static CFIndex dataWrite(CFWriteStreamRef stream, const UInt8 *buffer, CFIndex bufferLength, CFStreamError *errorCode, void *info) {
+    _CFWriteDataStreamContext *dataStream = (_CFWriteDataStreamContext *)info;
+    CFIndex result;
+    CFIndex freeSpace = dataStream->currentBuf->capacity - dataStream->currentBuf->length;
+    if (dataStream->bufferAllocator == kCFAllocatorNull && bufferLength > freeSpace) {
+        errorCode->error = ENOMEM;
+        errorCode->domain = kCFStreamErrorDomainPOSIX;
+        return -1;
+    } else {
+        result = bufferLength;
+        while (bufferLength > 0) {
+            CFIndex amountToCopy = (bufferLength > freeSpace) ? freeSpace : bufferLength;
+            if (freeSpace > 0) {
+                memmove(dataStream->currentBuf->bytes + dataStream->currentBuf->length, buffer, amountToCopy);
+                buffer += amountToCopy;
+                bufferLength -= amountToCopy;
+                dataStream->currentBuf->length += amountToCopy;
+            }
+            if (bufferLength > 0) {
+                /* Current buffer is full: chain a new one big enough for the rest. */
+                CFIndex bufSize = BUF_SIZE > bufferLength ? BUF_SIZE : bufferLength;
+                _CFStreamByteBuffer *newBuf = (_CFStreamByteBuffer *)CFAllocatorAllocate(dataStream->bufferAllocator, sizeof(_CFStreamByteBuffer) + bufSize, 0);
+                if (newBuf == NULL) {
+                    errorCode->error = ENOMEM;
+                    errorCode->domain = kCFStreamErrorDomainPOSIX;
+                    return -1;
+                }
+                /* The payload lives directly after the header in the same allocation. */
+                newBuf->bytes = (UInt8 *)(newBuf + 1);
+                newBuf->capacity = bufSize;
+                newBuf->length = 0;
+                newBuf->next = NULL;
+                dataStream->currentBuf->next = newBuf;
+                dataStream->currentBuf = newBuf;
+                freeSpace = bufSize;
+            }
+        }
+        errorCode->error = 0;
+    }
+    if (dataStream->scheduled && (dataStream->bufferAllocator != kCFAllocatorNull || dataStream->currentBuf->capacity > dataStream->currentBuf->length)) {
+        CFWriteStreamSignalEvent(stream, kCFStreamEventCanAcceptBytes, NULL);
+    }
+    return result;
+}
+
+/*
+ * canWrite callback for data write streams: TRUE when more buffers can be
+ * allocated, or when the current (fixed) buffer still has free space.
+ */
+static  Boolean dataCanWrite(CFWriteStreamRef stream, void *info) {
+    _CFWriteDataStreamContext *ctxt = (_CFWriteDataStreamContext *)info;
+    Boolean growable = (ctxt->bufferAllocator != kCFAllocatorNull) ? TRUE : FALSE;
+    if (growable || ctxt->currentBuf->capacity > ctxt->currentBuf->length) {
+        return TRUE;
+    }
+    return FALSE;
+}
+
+/*
+ * copyProperty callback for data write streams.  Only
+ * kCFStreamPropertyDataWritten is handled, and only for streams that
+ * allocate their own buffers: the contents of every buffer in the chain are
+ * concatenated into one freshly allocated block which is wrapped in a CFData
+ * that takes ownership of it.
+ *
+ * Returns NULL for other properties, for fixed-buffer streams, when nothing
+ * has been written, or when memory cannot be allocated.
+ */
+static CFPropertyListRef dataCopyProperty(struct _CFStream *stream, CFStringRef propertyName, void *info) {
+    _CFWriteDataStreamContext *dataStream = (_CFWriteDataStreamContext *)info;
+    CFIndex size = 0;
+    _CFStreamByteBuffer *buf;
+    CFAllocatorRef alloc;
+    UInt8 *bytes, *currByte;
+    CFDataRef result;
+    if (!CFEqual(propertyName, kCFStreamPropertyDataWritten)) return NULL;
+    if (dataStream->bufferAllocator == kCFAllocatorNull)  return NULL;
+    alloc = dataStream->bufferAllocator;
+    for (buf = dataStream->firstBuf; buf != NULL; buf = buf->next) {
+        size += buf->length;
+    }
+    if (size == 0) return NULL;
+    bytes = (UInt8 *)CFAllocatorAllocate(alloc, size, 0);
+    if (bytes == NULL) return NULL;
+    currByte = bytes;
+    for (buf = dataStream->firstBuf; buf != NULL; buf = buf->next) {
+        memmove(currByte, buf->bytes, buf->length);
+        currByte += buf->length;
+    }
+    /* alloc doubles as the deallocator, so the CFData frees bytes when it goes away. */
+    result = CFDataCreateWithBytesNoCopy(alloc, bytes, size, alloc);
+    if (result == NULL) {
+        /* Ownership was never transferred; do not leak the copy. */
+        CFAllocatorDeallocate(alloc, bytes);
+    }
+    return result;
+}
+
+/*
+ * Create callback for data read streams: allocates the stream's own context,
+ * retains the template's CFData, and starts the read position at the first
+ * byte with the stream unscheduled.  Returns NULL if the allocation fails.
+ */
+static void *readDataCreate(struct _CFStream *stream, void *info) {
+    const _CFReadDataStreamContext *templateCtxt = (const _CFReadDataStreamContext *)info;
+    _CFReadDataStreamContext *dataCtxt = (_CFReadDataStreamContext *)CFAllocatorAllocate(CFGetAllocator(stream), sizeof(_CFReadDataStreamContext), 0);
+    if (dataCtxt == NULL) {
+        return NULL;
+    }
+    dataCtxt->scheduled = FALSE;
+    dataCtxt->data = (CFDataRef)CFRetain(templateCtxt->data);
+    dataCtxt->loc = CFDataGetBytePtr(dataCtxt->data);
+    return (void *)dataCtxt;
+}
+
+/* Finalize callback for data read streams: drops the CFData reference and frees the context. */
+static void readDataFinalize(struct _CFStream *stream, void *info) {
+    _CFReadDataStreamContext *dataCtxt = (_CFReadDataStreamContext *)info;
+    CFAllocatorRef alloc = CFGetAllocator(stream);
+    CFRelease(dataCtxt->data);
+    CFAllocatorDeallocate(alloc, dataCtxt);
+}
+
+/* copyDescription callback for data read streams: the description of the underlying CFData. */
+static CFStringRef readDataCopyDescription(struct _CFStream *stream, void *info) {
+    _CFReadDataStreamContext *dataCtxt = (_CFReadDataStreamContext *)info;
+    return CFCopyDescription(dataCtxt->data);
+}
+
+/*
+ * Create callback for data write streams: builds the stream's own context
+ * from the template passed as info.
+ *
+ * Growable streams (bufferAllocator != kCFAllocatorNull) get the context,
+ * the first buffer header and BUF_SIZE bytes of storage in one allocation,
+ * and retain the buffer allocator; a NULL allocator in the template is
+ * replaced by the current default allocator.  Fixed-buffer streams get the
+ * context plus a buffer header that points at the caller's storage.
+ *
+ * Returns NULL if the context cannot be allocated, matching fileCreate and
+ * readDataCreate.
+ */
+static void *writeDataCreate(struct _CFStream *stream, void *info) {
+    _CFWriteDataStreamContext *ctxt = (_CFWriteDataStreamContext *)info;
+    _CFWriteDataStreamContext *newCtxt;
+    if (ctxt->bufferAllocator != kCFAllocatorNull) {
+        if (ctxt->bufferAllocator == NULL) ctxt->bufferAllocator = CFAllocatorGetDefault();
+        newCtxt = (_CFWriteDataStreamContext *)CFAllocatorAllocate(CFGetAllocator(stream), sizeof(_CFWriteDataStreamContext) + sizeof(_CFStreamByteBuffer) + BUF_SIZE, 0);
+        if (newCtxt == NULL) return NULL;
+        /* Retain only once the context exists, so a failed allocation leaks nothing. */
+        CFRetain(ctxt->bufferAllocator);
+        /* Layout of the single allocation: context, buffer header, buffer bytes. */
+        newCtxt->firstBuf = (_CFStreamByteBuffer *)(newCtxt + 1);
+        newCtxt->firstBuf->bytes = (UInt8 *)(newCtxt->firstBuf + 1);
+        newCtxt->firstBuf->capacity = BUF_SIZE;
+        newCtxt->firstBuf->length = 0;
+        newCtxt->firstBuf->next = NULL;
+        newCtxt->currentBuf = newCtxt->firstBuf;
+        newCtxt->bufferAllocator = ctxt->bufferAllocator;
+        newCtxt->scheduled = FALSE;
+    } else {
+        newCtxt = (_CFWriteDataStreamContext *)CFAllocatorAllocate(CFGetAllocator(stream), sizeof(_CFWriteDataStreamContext) + sizeof(_CFStreamByteBuffer), 0);
+        if (newCtxt == NULL) return NULL;
+        /* Only the header is copied; bytes keeps pointing at the caller's buffer. */
+        newCtxt->firstBuf = (_CFStreamByteBuffer *)(newCtxt+1);
+        newCtxt->firstBuf->bytes = ctxt->firstBuf->bytes;
+        newCtxt->firstBuf->capacity = ctxt->firstBuf->capacity;
+        newCtxt->firstBuf->length = 0;
+        newCtxt->firstBuf->next = NULL;
+        newCtxt->currentBuf = newCtxt->firstBuf;
+        newCtxt->bufferAllocator = kCFAllocatorNull;
+        newCtxt->scheduled = FALSE;
+    }
+    return (void *)newCtxt;
+}
+
+/*
+ * Finalize callback for data write streams.  For growable streams every
+ * buffer after the first is freed with the buffer allocator, which is then
+ * released; the first buffer is not freed separately because it is part of
+ * the context allocation, which is freed last.
+ */
+static void writeDataFinalize(struct _CFStream *stream, void *info) {
+    _CFWriteDataStreamContext *dataStream = (_CFWriteDataStreamContext *)info;
+    if (dataStream->bufferAllocator != kCFAllocatorNull) {
+        _CFStreamByteBuffer *cursor;
+        _CFStreamByteBuffer *following;
+        for (cursor = dataStream->firstBuf->next; cursor != NULL; cursor = following) {
+            following = cursor->next;
+            CFAllocatorDeallocate(dataStream->bufferAllocator, cursor);
+        }
+        CFRelease(dataStream->bufferAllocator);
+    }
+    CFAllocatorDeallocate(CFGetAllocator(stream), dataStream);
+}
+
+/* copyDescription callback for data write streams: a string naming the context's address. */
+static CFStringRef writeDataCopyDescription(struct _CFStream *stream, void *info) {
+    CFStringRef description = CFStringCreateWithFormat(kCFAllocatorSystemDefault, NULL, CFSTR("<CFWriteDataContext %p>"), info);
+    return description;
+}
+
+static const struct _CFStreamCallBacksV1 fileCallBacks = {1, fileCreate, fileFinalize, fileCopyDescription, fileOpen, NULL, fileRead, NULL, fileCanRead, fileWrite, fileCanWrite, fileClose, fileCopyProperty, fileSetProperty, NULL, fileSchedule, fileUnschedule}; /* version-1 callback table shared by file read and write streams; NULL entries are callbacks this stream type does not supply */
+
+/*
+ * Shared constructor behind CFReadStreamCreateWithFile and
+ * CFWriteStreamCreateWithFile.  Returns NULL unless fileURL is non-NULL and
+ * its scheme equals "file"; otherwise creates a stream using fileCallBacks
+ * with the URL recorded and no descriptor yet (fd == -1).
+ */
+static struct _CFStream *_CFStreamCreateWithFile(CFAllocatorRef alloc, CFURLRef fileURL, Boolean forReading) {
+    _CFFileStreamContext fileContext;
+    CFStringRef scheme;
+    Boolean isFileScheme;
+
+    if (!fileURL) {
+        return NULL;
+    }
+    scheme = CFURLCopyScheme(fileURL);
+    if (!scheme) {
+        return NULL;
+    }
+    isFileScheme = CFEqual(scheme, CFSTR("file"));
+    CFRelease(scheme);
+    if (!isFileScheme) {
+        return NULL;
+    }
+
+    fileContext.fd = -1;
+    fileContext.url = fileURL;
+    return _CFStreamCreateWithConstantCallbacks(alloc, &fileContext, (struct _CFStreamCallBacks *)(&fileCallBacks), forReading);
+}
+
+/* Creates a read stream for the file at fileURL; NULL if fileURL is NULL or not a "file" URL. */
+CF_EXPORT CFReadStreamRef CFReadStreamCreateWithFile(CFAllocatorRef alloc, CFURLRef fileURL) {
+    struct _CFStream *newStream = _CFStreamCreateWithFile(alloc, fileURL, TRUE);
+    return (CFReadStreamRef)newStream;
+}
+
+/* Creates a write stream for the file at fileURL; NULL if fileURL is NULL or not a "file" URL. */
+CF_EXPORT CFWriteStreamRef CFWriteStreamCreateWithFile(CFAllocatorRef alloc, CFURLRef fileURL) {
+    struct _CFStream *newStream = _CFStreamCreateWithFile(alloc, fileURL, FALSE);
+    return (CFWriteStreamRef)newStream;
+}
+
+/* Creates a file read stream around an existing descriptor; the stream has no URL. */
+CFReadStreamRef _CFReadStreamCreateFromFileDescriptor(CFAllocatorRef alloc, int fd) {
+    _CFFileStreamContext templateCtxt;
+    struct _CFStream *newStream;
+    templateCtxt.fd = fd;
+    templateCtxt.url = NULL;
+    newStream = _CFStreamCreateWithConstantCallbacks(alloc, &templateCtxt, (struct _CFStreamCallBacks *)(&fileCallBacks), TRUE);
+    return (CFReadStreamRef)newStream;
+}
+
+/* Creates a file write stream around an existing descriptor; the stream has no URL. */
+CFWriteStreamRef _CFWriteStreamCreateFromFileDescriptor(CFAllocatorRef alloc, int fd) {
+    _CFFileStreamContext templateCtxt;
+    struct _CFStream *newStream;
+    templateCtxt.fd = fd;
+    templateCtxt.url = NULL;
+    newStream = _CFStreamCreateWithConstantCallbacks(alloc, &templateCtxt, (struct _CFStreamCallBacks *)(&fileCallBacks), FALSE);
+    return (CFWriteStreamRef)newStream;
+}
+
+
+
+static const struct _CFStreamCallBacksV1 readDataCallBacks = {1, readDataCreate, readDataFinalize, readDataCopyDescription, readDataOpen, NULL, dataRead, dataGetBuffer, dataCanRead, NULL, NULL, NULL, NULL, NULL, NULL, readDataSchedule, NULL}; /* version-1 callback table for CFData-backed read streams; no write, close, property or unschedule callbacks */
+static const struct _CFStreamCallBacksV1 writeDataCallBacks = {1, writeDataCreate, writeDataFinalize, writeDataCopyDescription, writeDataOpen, NULL, NULL, NULL, NULL, dataWrite, dataCanWrite, NULL, dataCopyProperty, NULL, NULL, writeDataSchedule, NULL}; /* version-1 callback table for buffer-backed write streams; no read, close, setProperty or unschedule callbacks */
+
+/*
+ * Creates a read stream over caller-supplied bytes without copying them.
+ * The bytes are wrapped in a CFData created with bytesDeallocator; the
+ * stream's create callback takes its own reference on that CFData, so the
+ * local reference is dropped before returning.
+ *
+ * Returns NULL if the wrapping CFData cannot be created.
+ */
+CF_EXPORT CFReadStreamRef CFReadStreamCreateWithBytesNoCopy(CFAllocatorRef alloc, const UInt8 *bytes, CFIndex length, CFAllocatorRef bytesDeallocator) {
+    _CFReadDataStreamContext ctxt;
+    CFReadStreamRef result;
+    ctxt.data = CFDataCreateWithBytesNoCopy(alloc, bytes, length, bytesDeallocator);
+    if (ctxt.data == NULL) {
+        /* Nothing to stream from; CFRetain/CFRelease must not see NULL. */
+        return NULL;
+    }
+    result = (CFReadStreamRef)_CFStreamCreateWithConstantCallbacks(alloc, &ctxt, (struct _CFStreamCallBacks *)(&readDataCallBacks), TRUE);
+    CFRelease(ctxt.data);
+    return result;
+}
+
+/* This needs to be exported to make it callable from Foundation. */
+/*
+ * Creates a read stream over an existing CFData.  The data is kept alive
+ * with a temporary retain for the duration of stream creation.
+ */
+CF_EXPORT CFReadStreamRef CFReadStreamCreateWithData(CFAllocatorRef alloc, CFDataRef data) {
+    _CFReadDataStreamContext templateCtxt;
+    struct _CFStream *newStream;
+
+    templateCtxt.data = CFRetain(data);
+    newStream = _CFStreamCreateWithConstantCallbacks(alloc, &templateCtxt, (struct _CFStreamCallBacks *)(&readDataCallBacks), TRUE);
+    CFRelease(data);
+    return (CFReadStreamRef)newStream;
+}
+
+/*
+ * Creates a write stream that fills a caller-supplied, fixed-size buffer.
+ * The template context marks the stream as non-growable by using
+ * kCFAllocatorNull as its buffer allocator.
+ */
+CFWriteStreamRef CFWriteStreamCreateWithBuffer(CFAllocatorRef alloc, UInt8 *buffer, CFIndex bufferCapacity) {
+    _CFStreamByteBuffer fixedBuf;
+    _CFWriteDataStreamContext templateCtxt;
+    struct _CFStream *newStream;
+
+    fixedBuf.next = NULL;
+    fixedBuf.length = 0;
+    fixedBuf.capacity = bufferCapacity;
+    fixedBuf.bytes = buffer;
+
+    templateCtxt.bufferAllocator = kCFAllocatorNull;
+    templateCtxt.firstBuf = &fixedBuf;
+    templateCtxt.currentBuf = &fixedBuf;
+
+    newStream = _CFStreamCreateWithConstantCallbacks(alloc, &templateCtxt, (struct _CFStreamCallBacks *)(&writeDataCallBacks), FALSE);
+    return (CFWriteStreamRef)newStream;
+}
+
+/*
+ * Creates a write stream that stores written bytes in buffers obtained from
+ * bufferAllocator.  The template carries no buffers; the stream's create
+ * callback sets up the first one.
+ */
+CF_EXPORT CFWriteStreamRef CFWriteStreamCreateWithAllocatedBuffers(CFAllocatorRef alloc, CFAllocatorRef bufferAllocator) {
+    _CFWriteDataStreamContext templateCtxt;
+    struct _CFStream *newStream;
+
+    templateCtxt.bufferAllocator = bufferAllocator;
+    templateCtxt.currentBuf = NULL;
+    templateCtxt.firstBuf = NULL;
+
+    newStream = _CFStreamCreateWithConstantCallbacks(alloc, &templateCtxt, (struct _CFStreamCallBacks *)(&writeDataCallBacks), FALSE);
+    return (CFWriteStreamRef)newStream;
+}
+
+#undef BUF_SIZE
+