+++ /dev/null
-#include "StreamSource.h"
-#include <errno.h>
-#include <string>
-#include "misc.h"
-
-using namespace std;
-
-// Name that StreamSource's constructor passes to the Source base-class constructor.
-CFStringRef gStreamSourceName = CFSTR("StreamSource");
-
-// Size, in bytes, of the stack buffer StreamSource::BackgroundActivate fills on each
-// CFReadStreamRead call (i.e. the largest chunk sent downstream in one piece).
-const CFIndex kMaximumSize = 2048;
-
-/*
- * Constructs a source that reads from `input`.
- *
- * `transform` and `name` are forwarded unchanged to the Source base class.
- * The stream is retained here and released in the destructor. The mReading
- * group is entered once here; the matching leave is in DoActivate().
- */
-StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name)
-    : Source(gStreamSourceName, transform, name),
-      mReadStream(input),
-      mReading(dispatch_group_create())
-{
-    // take our own reference on the caller's stream
-    CFRetain(mReadStream);
-
-    // keep the group non-empty until DoActivate() leaves it
-    dispatch_group_enter(mReading);
-}
-
-/*
- * Drains mReadStream in chunks of at most kMaximumSize bytes and forwards each
- * chunk to mDestination as a CFData set on mDestinationName. DoActivate() runs
- * this from a block on a global dispatch queue.
- *
- * How the loop ends:
- *   - SetAttribute returns an error: stop at once and send nothing further.
- *   - The read returns 0: send NULL, which marks end of stream.
- *   - The read returns a negative value: send the stream's CFError. If CF has
- *     no error to report, send a generic POSIX EIO error instead, so that a
- *     failed read is never mistaken for a clean end of stream.
- */
-void StreamSource::BackgroundActivate()
-{
-    CFIndex result = 0;
-
-    do
-    {
-        // make a buffer big enough to handle the object
-        // NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ than malloc and
-        // CFDataCreateWithBytesNoCopy(..., kCFAllocatorMalloc) by a fair margin. At least for 2K chunks.
-        // Retest if changing the size.
-        UInt8 buffer[kMaximumSize];
-
-        result = CFReadStreamRead(mReadStream, buffer, kMaximumSize);
-
-        if (result > 0) // was data returned?
-        {
-            // make the data and send it to the transform
-            CFDataRef data = CFDataCreate(NULL, buffer, result);
-
-            CFErrorRef error = mDestination->SetAttribute(mDestinationName, data);
-
-            CFRelease(data);
-
-            if (error != NULL) // we have a problem, there was probably an abort on the chain
-            {
-                return; // quiesce the source
-            }
-        }
-    } while (result > 0);
-
-    if (result < 0)
-    {
-        // we got an error!
-        CFErrorRef error = CFReadStreamCopyError(mReadStream);
-
-        if (error == NULL)
-        {
-            // CF doesn't always tell us what went wrong. Sending NULL here would be
-            // indistinguishable from end of stream, so report a generic I/O error.
-            error = CFErrorCreate(NULL, kCFErrorDomainPOSIX, EIO, NULL);
-        }
-
-        mDestination->SetAttribute(mDestinationName, error);
-
-        if (error != NULL) // CFErrorCreate itself can fail; never CFRelease(NULL)
-        {
-            CFRelease(error);
-        }
-    }
-    else
-    {
-        // send an EOS
-        mDestination->SetAttribute(mDestinationName, NULL); // end of stream
-    }
-}
-
-/*
- * Starts the read: queues BackgroundActivate() on the low-priority global
- * queue as a member of the mReading group, then leaves the group once to
- * balance the enter performed in the constructor.
- *
- * The destination's CF object is retained before the block is queued and
- * released by the block after BackgroundActivate() returns.
- */
-void StreamSource::DoActivate()
-{
-    dispatch_queue_t readQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0);
-
-    CFRetain(mDestination->GetCFObject());
-
-    dispatch_group_async(mReading, readQueue, ^{
-        BackgroundActivate();
-        CFRelease(mDestination->GetCFObject());
-    });
-
-    // matches dispatch_group_enter() in the constructor
-    dispatch_group_leave(mReading);
-}
-
-/*
- * Schedules deletion of this object on the high-priority global queue, to run
- * once the mReading group becomes empty (dispatch_group_notify).
- */
-void StreamSource::Finalize()
-{
-    dispatch_queue_t teardownQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
-
-    dispatch_group_notify(mReading, teardownQueue, ^{
-        delete this;
-    });
-}
-
-/*
- * Drops the references taken in the constructor: the dispatch group and the
- * retained read stream. Both members are cleared after release.
- */
-StreamSource::~StreamSource()
-{
-    dispatch_release(mReading);
-    mReading = NULL;
-
-    CFRelease(mReadStream);
-    mReadStream = NULL;
-}
-
-
-/*
- * Returns true only when Source::Equal() accepts `object` and CFEqual reports
- * both read streams as equal; returns false otherwise.
- */
-Boolean StreamSource::Equal(const CoreFoundationObject* object)
-{
-    // the base class gets the first say; bail out if it rejects the object
-    if (!Source::Equal(object))
-    {
-        return false;
-    }
-
-    const StreamSource* other = (const StreamSource*) object;
-    return CFEqual(other->mReadStream, mReadStream);
-}
-
-
-
-/*
- * Factory: allocates a new StreamSource for `input`, `transform` and `name`
- * and returns the result of wrapping it with CoreFoundationHolder::MakeHolder.
- */
-CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name)
-{
-    StreamSource* source = new StreamSource(input, transform, name);
-    return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, source);
-}
-
-
-
-/*
- * Returns the base class's debug description followed by ": Stream " and the
- * address of the read stream, formatted as "(mReadStream = <pointer>)".
- */
-string StreamSource::DebugDescription()
-{
-    char streamInfo[256];
-    snprintf(streamInfo, sizeof(streamInfo), "(mReadStream = %p)", mReadStream);
-
-    return Source::DebugDescription() + ": Stream " + streamInfo;
-}