1 #include "StreamSource.h"
7 CFStringRef gStreamSourceName
= CFSTR("StreamSource");
9 const CFIndex kMaximumSize
= 2048;
// Constructor: forwards gStreamSourceName, `transform`, and `name` to the
// Source base class and creates the dispatch group stored in mReading.
// NOTE(review): no initializer for mReadStream is visible in this listing even
// though it is retained below -- presumably it is initialized from `input`;
// confirm against the full file.
11 StreamSource::StreamSource(CFReadStreamRef input
, Transform
* transform
, CFStringRef name
)
12 : Source(gStreamSourceName
, transform
, name
),
14 mReading(dispatch_group_create())
// Enter the group at construction time; the matching dispatch_group_leave()
// is issued by DoActivate().
16 dispatch_group_enter(mReading
);
// Take our own reference on the stream; the destructor calls CFRelease on it.
17 CFRetain(mReadStream
);
// Reads data from mReadStream and pushes it to mDestination under
// mDestinationName. Invoked from the block that DoActivate() submits to a
// global dispatch queue.
// NOTE(review): the declaration of `result` and the control flow that selects
// between the data, error, and end-of-stream paths below are not visible in
// this listing -- confirm against the full file.
20 void StreamSource::BackgroundActivate()
26 // make a buffer big enough to handle the object
27 // NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ than malloc and CFDataCreateWithBytes(..., kCFAllocatorMalloc) by a fair margin. At least for 2K chunks. Retest if changing the size.
28 UInt8 buffer
[kMaximumSize
];
// Read at most kMaximumSize bytes from the stream into the stack buffer.
30 result
= CFReadStreamRead(mReadStream
, buffer
, kMaximumSize
);
32 if (result
> 0) // was data returned?
34 // make the data and send it to the transform
// CFDataCreate copies the `result` bytes that were just read out of `buffer`.
35 CFDataRef data
= CFDataCreate(NULL
, buffer
, result
);
// NOTE(review): no CFRelease(data) is visible in this listing; `data` comes
// from a Create function, so confirm it is released after this call.
37 CFErrorRef error
= mDestination
->SetAttribute(mDestinationName
, data
);
41 if (error
!= NULL
) // we have a problem, there was probably an abort on the chain
43 return; // quiesce the source
// Error path: fetch the stream's error object and forward it to the
// destination. Presumably reached when the read returned a negative count --
// TODO confirm.
51 CFErrorRef error
= CFReadStreamCopyError(mReadStream
);
52 mDestination
->SetAttribute(mDestinationName
, error
);
55 // NOTE: CF doesn't always tell us about this error. Arguably it could be better to
56 // "invent" a generic error, but it is hard to argue that we want to crash in CFRelease(NULL)...
// End-of-stream path: a NULL attribute value tells the destination that no
// more data will follow.
63 mDestination
->SetAttribute(mDestinationName
, NULL
); // end of stream
67 void StreamSource::DoActivate()
69 CFRetain(mDestination
->GetCFObject());
70 dispatch_group_async(mReading
, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW
, 0), ^{
71 this->BackgroundActivate();
72 CFRelease(mDestination
->GetCFObject());
74 dispatch_group_leave(mReading
);
// Registers a block on the high-priority global queue via
// dispatch_group_notify, so it is submitted once the mReading group has no
// outstanding work.
// NOTE(review): the body of the notify block is not visible in this listing --
// confirm what it does against the full file.
77 void StreamSource::Finalize()
79 dispatch_group_notify(mReading
, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH
, 0), ^{
84 StreamSource::~StreamSource()
86 CFRelease(mReadStream
);
88 dispatch_release(mReading
);
// Compares this source with `object`. When the base-class Source::Equal check
// passes, `object` is treated as a StreamSource and the result is CFEqual of
// the two read streams.
// NOTE(review): the value returned when Source::Equal fails is not visible in
// this listing -- presumably false; confirm against the full file.
93 Boolean
StreamSource::Equal(const CoreFoundationObject
* object
)
95 // only compare the streams when the base class already considers the objects equal
96 if (Source::Equal(object
))
// The C-style cast downcasts to StreamSource and also drops the const on
// `object`; constness is re-added by the declared type of `ss`.
98 const StreamSource
* ss
= (StreamSource
*) object
;
99 return CFEqual(ss
->mReadStream
, mReadStream
);
107 CFTypeRef
StreamSource::Make(CFReadStreamRef input
, Transform
* transform
, CFStringRef name
)
109 return CoreFoundationHolder::MakeHolder(gInternalCFObjectName
, new StreamSource(input
, transform
, name
));
114 string
StreamSource::DebugDescription()
116 string result
= Source::DebugDescription() + ": Stream ";
119 snprintf(buffer
, sizeof(buffer
), "(mReadStream = %p)", mReadStream
);