typedef struct _IODataQueueEntry {
    uint32_t  size;
    uint8_t   data[0];
} IODataQueueEntry;

#define DATA_QUEUE_ENTRY_HEADER_SIZE sizeof(IODataQueueEntry)
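// Illustrative sketch, not part of the original file: each record in the ring
// occupies this 4-byte header plus its payload rounded up to a 4-byte boundary,
// which is the same "+ 3 then mask" arithmetic the enqueue/dequeue paths below
// perform. The helper name is hypothetical, and overflow checks are omitted
// here (the real paths use os_add_overflow).
static inline uint32_t
ExampleEntryFootprint(uint32_t callerDataSize)
{
    uint32_t roundedPayload = (callerDataSize + 3) & ~3U;              // 4-byte aligned payload
    return (uint32_t) DATA_QUEUE_ENTRY_HEADER_SIZE + roundedPayload;   // header + payload
}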
typedef struct _IODataQueueMemory {
    volatile uint32_t  head;
    volatile uint32_t  tail;
    volatile uint8_t   needServicedCallback;
    volatile uint8_t   _resv[31];
    IODataQueueEntry   queue[0];
} IODataQueueMemory;
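// Illustrative checks, not in the original source: the control block that
// precedes the entry storage is head + tail + needServicedCallback + 31
// reserved bytes, so both sides of the shared mapping agree on where queue[]
// starts. Relies on the clang __builtin_offsetof extension.
static_assert(sizeof(IODataQueueEntry) == sizeof(uint32_t),
    "entry header is just the 4-byte size field");
static_assert(__builtin_offsetof(IODataQueueMemory, queue) == 40,
    "queue[] expected to start 40 bytes into the shared region");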
struct IODataQueueDispatchSource_IVars {
    IODataQueueMemory          * dataQueue;
    IODataQueueDispatchSource  * source;
//  IODispatchQueue            * queue;
    IOMemoryDescriptor         * memory;
    OSAction                   * dataAvailableAction;
    OSAction                   * dataServicedAction;
    uint64_t                     options;
    uint32_t                     queueByteCount;
    bool                         enable;
};
bool
IODataQueueDispatchSource::init()
{
    if (!super::init()) {
        return false;
    }

    ivars = IONewZero(IODataQueueDispatchSource_IVars, 1);
    ivars->source = this;

    kern_return_t ret;
    uint64_t      address;
    uint64_t      length;

    // Map the shared queue memory and record its address and size.
    ret = CopyMemory(&ivars->memory);
    assert(kIOReturnSuccess == ret);

    ret = ivars->memory->Map(0, 0, 0, 0, &address, &length);
    assert(kIOReturnSuccess == ret);
    ivars->dataQueue = (typeof(ivars->dataQueue))(uintptr_t) address;
    ivars->queueByteCount = length;

    return true;
}
kern_return_t
IODataQueueDispatchSource::CheckForWork_Impl(
    const IORPC rpc,
    bool synchronous)
{
    IOReturn ret = kIOReturnNotReady;

    return ret;
}
kern_return_t
IODataQueueDispatchSource::Create_Impl(
    uint64_t queueByteCount,
    IODispatchQueue * queue,
    IODataQueueDispatchSource ** source)
{
    IODataQueueDispatchSource * inst;
    IOBufferMemoryDescriptor  * bmd;

    // The queue size must be 4-byte aligned and fit in 32 bits.
    if (3 & queueByteCount) {
        return kIOReturnBadArgument;
    }
    if (queueByteCount > UINT_MAX) {
        return kIOReturnBadArgument;
    }

    inst = OSTypeAlloc(IODataQueueDispatchSource);
    if (!inst) {
        return kIOReturnNoMemory;
    }
    if (!inst->init()) {
        inst->release();
        return kIOReturnError;
    }

    bmd = IOBufferMemoryDescriptor::withOptions(
        kIODirectionOutIn | kIOMemoryKernelUserShared,
        queueByteCount, page_size);
    if (!bmd) {
        inst->release();
        return kIOReturnNoMemory;
    }
    inst->ivars->memory         = bmd;
    inst->ivars->queueByteCount = ((uint32_t) queueByteCount);
    inst->ivars->options        = 0;
    inst->ivars->dataQueue      = (typeof(inst->ivars->dataQueue)) bmd->getBytesNoCopy();

    *source = inst;

    return kIOReturnSuccess;
}
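// Hypothetical usage sketch, not part of this file: a driver creating a 16 KB
// queue on a dispatch queue it already owns. The helper name and the idea that
// the caller obtained `queue` elsewhere (e.g. via CopyDispatchQueue) are
// assumptions for illustration.
static kern_return_t
ExampleCreateQueueSource(IODispatchQueue * queue, IODataQueueDispatchSource ** outSource)
{
    // The size must be a multiple of 4 and no larger than UINT_MAX,
    // or Create_Impl above rejects it with kIOReturnBadArgument.
    return IODataQueueDispatchSource::Create(16 * 1024, queue, outSource);
}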
kern_return_t
IODataQueueDispatchSource::CopyMemory_Impl(
    IOMemoryDescriptor ** memory)
{
    kern_return_t        ret;
    IOMemoryDescriptor * result;

    result = ivars->memory;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *memory = result;

    return ret;
}
kern_return_t
IODataQueueDispatchSource::CopyDataAvailableHandler_Impl(
    OSAction ** action)
{
    kern_return_t ret;
    OSAction    * result;

    result = ivars->dataAvailableAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;

    return ret;
}
kern_return_t
IODataQueueDispatchSource::CopyDataServicedHandler_Impl(
    OSAction ** action)
{
    kern_return_t ret;
    OSAction    * result;

    result = ivars->dataServicedAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;

    return ret;
}
kern_return_t
IODataQueueDispatchSource::SetDataAvailableHandler_Impl(
    OSAction * action)
{
    IOReturn   ret;
    OSAction * oldAction;

    oldAction = ivars->dataAvailableAction;
    if (oldAction &&
        OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataAvailableAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataAvailableAction = action;
        // If entries are already pending, deliver a callback right away.
        if (IsDataAvailable()) {
            DataAvailable(ivars->dataAvailableAction);
        }
    }
    ret = kIOReturnSuccess;

    return ret;
}
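// Hypothetical sketch, not in the original file: installing the data-available
// handler from a driver. The OSAction would normally come from the
// iig-generated CreateAction<MethodName>() helper for a method declared with
// TYPE(IODataQueueDispatchSource::DataAvailable); that helper's exact name is
// driver-specific and assumed here.
static kern_return_t
ExampleInstallDataAvailableHandler(IODataQueueDispatchSource * source, OSAction * action)
{
    // Note that SetDataAvailableHandler_Impl above fires an immediate
    // DataAvailable callback if entries are already queued.
    return source->SetDataAvailableHandler(action);
}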
kern_return_t
IODataQueueDispatchSource::SetDataServicedHandler_Impl(
    OSAction * action)
{
    IOReturn   ret;
    OSAction * oldAction;

    oldAction = ivars->dataServicedAction;
    if (oldAction &&
        OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataServicedAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataServicedAction = action;
    }
    ret = kIOReturnSuccess;

    return ret;
}
void
IODataQueueDispatchSource::SendDataAvailable(void)
{
    IOReturn ret;

    if (!ivars->dataAvailableAction) {
        ret = CopyDataAvailableHandler(&ivars->dataAvailableAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataAvailableAction = NULL;
        }
    }
    if (ivars->dataAvailableAction) {
        DataAvailable(ivars->dataAvailableAction);
    }
}
void
IODataQueueDispatchSource::SendDataServiced(void)
{
    IOReturn ret;

    if (!ivars->dataServicedAction) {
        ret = CopyDataServicedHandler(&ivars->dataServicedAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataServicedAction = NULL;
        }
    }
    if (ivars->dataServicedAction) {
        // Clear the request before notifying so the enqueuer can re-arm it.
        ivars->dataQueue->needServicedCallback = false;
        DataServiced(ivars->dataServicedAction);
    }
}
kern_return_t
IODataQueueDispatchSource::SetEnableWithCompletion_Impl(
    bool enable,
    IODispatchSourceCancelHandler handler)
{
    IOReturn ret;

    ivars->enable = enable;

    ret = kIOReturnSuccess;
    return ret;
}
void
IODataQueueDispatchSource::free()
{
    OSSafeReleaseNULL(ivars->memory);
    OSSafeReleaseNULL(ivars->dataAvailableAction);
    OSSafeReleaseNULL(ivars->dataServicedAction);
    IOSafeDeleteNULL(ivars, IODataQueueDispatchSource_IVars, 1);
    super::free();
}
kern_return_t
IODataQueueDispatchSource::Cancel_Impl(
    IODispatchSourceCancelHandler handler)
{
    return kIOReturnSuccess;
}
bool
IODataQueueDispatchSource::IsDataAvailable(void)
{
    IODataQueueMemory * dataQueue = ivars->dataQueue;

    return dataQueue && (dataQueue->head != dataQueue->tail);
}
IOReturn
IODataQueueDispatchSource::Peek(IODataQueueClientDequeueEntryBlock callback)
{
    IODataQueueEntry *  entry = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t            callerDataSize;
    uint32_t            dataSize;
    uint32_t            headOffset;
    uint32_t            tailOffset;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry *  head      = NULL;
        uint32_t            headSize  = 0;
        uint32_t            queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head           = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }
        headSize &= ~3U;

        // Check if there's enough room before the end of the queue for a header.
        // If there is room, check if there's enough room to hold the header and
        // the data.
        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // No room for the header or the data, wrap to the beginning of the queue.
            // Note: wrapping even with the UINT32_MAX checks, as we have to support
            // a queueSize of UINT32_MAX.
            entry          = dataQueue->queue;
            callerDataSize = entry->size;
            dataSize       = entry->size;
            if (os_add_overflow(3, callerDataSize, &dataSize)) {
                return kIOReturnError;
            }
            dataSize &= ~3U;

            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }

            callback(&entry->data, callerDataSize);
            return kIOReturnSuccess;
        } else {
            callback(&head->data, callerDataSize);
            return kIOReturnSuccess;
        }
    }

    return kIOReturnUnderrun;
}
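// Hypothetical usage sketch, not from the original file: inspecting the head
// entry without consuming it. The block signature follows
// IODataQueueClientDequeueEntryBlock (a data pointer and its size); the helper
// name is illustrative.
static void
ExamplePeekHead(IODataQueueDispatchSource * source)
{
    (void) source->Peek(^(const void * data, size_t dataSize) {
        // Examine data/dataSize here; the entry stays in the queue until Dequeue().
    });
}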
IOReturn
IODataQueueDispatchSource::Dequeue(IODataQueueClientDequeueEntryBlock callback)
{
    IOReturn ret;
    bool     sendDataServiced;

    sendDataServiced = false;
    ret = DequeueWithCoalesce(&sendDataServiced, callback);
    if (sendDataServiced) {
        SendDataServiced();
    }
    return ret;
}
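// Hypothetical sketch, not in the original: draining every pending entry.
// DequeueWithCoalesce() (and therefore Dequeue()) reports kIOReturnUnderrun
// once the queue is empty, so the loop simply runs until that point.
static void
ExampleDrainQueue(IODataQueueDispatchSource * source)
{
    kern_return_t kr;

    do {
        kr = source->Dequeue(^(const void * data, size_t dataSize) {
            // Consume one entry's payload here.
        });
    } while (kr == kIOReturnSuccess);
}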
IOReturn
IODataQueueDispatchSource::DequeueWithCoalesce(bool * sendDataServiced,
    IODataQueueClientDequeueEntryBlock callback)
{
    IOReturn            retVal        = kIOReturnSuccess;
    IODataQueueEntry *  entry         = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t            callerDataSize;
    uint32_t            dataSize      = 0;
    uint32_t            headOffset    = 0;
    uint32_t            tailOffset    = 0;
    uint32_t            newHeadOffset = 0;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry *  head      = NULL;
        uint32_t            headSize  = 0;
        uint32_t            queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head           = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }
        headSize &= ~3U;

        // We wrapped around to the beginning, so read from there:
        // either there was not even room for the header
        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            // or there was room for the header, but not for the data.
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // Note: we have to wrap to the beginning even with the UINT32_MAX checks
            // because we have to support a queueSize of UINT32_MAX.
            entry          = dataQueue->queue;
            callerDataSize = entry->size;

            if (os_add_overflow(callerDataSize, 3, &dataSize)) {
                return kIOReturnError;
            }
            dataSize &= ~3U;

            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
            // else it is at the end
        } else {
            entry = head;

            if ((headSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headOffset) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE + headOffset > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
        }
    } else {
        // Queue is empty; if the enqueuer asked to be told when the queue is
        // serviced, let the caller send that notification.
        if (dataQueue->needServicedCallback) {
            *sendDataServiced = true;
        }
        return kIOReturnUnderrun;
    }

    callback(&entry->data, callerDataSize);
    if (dataQueue->needServicedCallback) {
        *sendDataServiced = true;
    }

    // Publish the new head with release semantics.
    __c11_atomic_store((_Atomic uint32_t *)&dataQueue->head, newHeadOffset, __ATOMIC_RELEASE);

    if (newHeadOffset == tailOffset) {
        //
        // If we are making the queue empty, then we need to make sure
        // that either the enqueuer notices, or we notice the enqueue
        // that raced with our making of the queue empty.
        //
        __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
    }

    return retVal;
}
IOReturn
IODataQueueDispatchSource::Enqueue(uint32_t callerDataSize,
    IODataQueueClientEnqueueEntryBlock callback)
{
    IOReturn ret;
    bool     sendDataAvailable;

    sendDataAvailable = false;
    ret = EnqueueWithCoalesce(callerDataSize, &sendDataAvailable, callback);
    if (sendDataAvailable) {
        SendDataAvailable();
    }
    return ret;
}
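// Hypothetical sketch, not in the original: producing one record. The block
// fills the payload in place, and the Enqueue() wrapper above already posts
// the DataAvailable notification when one is needed. The helper name and the
// assumption that `length` bytes are readable at `bytes` are illustrative.
static kern_return_t
ExampleEnqueueRecord(IODataQueueDispatchSource * source,
    const void * bytes, uint32_t length)
{
    return source->Enqueue(length, ^(void * data, size_t dataSize) {
        // dataSize equals the requested length here.
        memcpy(data, bytes, length);
    });
}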
IOReturn
IODataQueueDispatchSource::EnqueueWithCoalesce(uint32_t callerDataSize,
    bool * sendDataAvailable,
    IODataQueueClientEnqueueEntryBlock callback)
{
    IODataQueueMemory * dataQueue;
    IODataQueueEntry *  entry;
    uint32_t            head;
    uint32_t            tail;
    uint32_t            newTail;
    uint32_t            dataSize;
    uint32_t            queueSize;
    uint32_t            entrySize;
    IOReturn            retVal = kIOReturnSuccess;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }
    queueSize = ivars->queueByteCount;

    // Force a single read of head and tail
    tail = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_RELAXED);
    head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_ACQUIRE);

    // Round the payload up to a 4-byte boundary, checking for overflow.
    if (os_add_overflow(callerDataSize, 3, &dataSize)) {
        return kIOReturnOverrun;
    }
    dataSize &= ~3U;

    // Check for overflow of entrySize
    if (os_add_overflow(DATA_QUEUE_ENTRY_HEADER_SIZE, dataSize, &entrySize)) {
        return kIOReturnOverrun;
    }

    // Check for underflow of (getQueueSize() - tail)
    if (queueSize < tail || queueSize < head) {
        return kIOReturnUnderrun;
    }

    if (tail >= head) {
        // Is there enough room at the end for the entry?
        if ((entrySize <= (UINT32_MAX - tail)) &&
            ((tail + entrySize) <= queueSize)) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            // The tail can be out of bound when the size of the new entry
            // exactly matches the available space at the end of the queue.
            // The tail can range from 0 to queueSize inclusive.

            newTail = tail + entrySize;
        } else if (head > entrySize) { // Is there enough room at the beginning?
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue);

            callback(&entry->data, callerDataSize);

            // Wrap around to the beginning, but do not allow the tail to catch
            // up to the head.

            entry->size = callerDataSize;

            // We need to make sure that there is enough room to set the size before
            // doing this. The user client checks for this and will look for the size
            // at the beginning if there isn't room for it at the end.

            if ((queueSize - tail) >= DATA_QUEUE_ENTRY_HEADER_SIZE) {
                ((IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail))->size = dataSize;
            }

            newTail = entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    } else {
        // Do not allow the tail to catch up to the head when the queue is full.
        // That's why the comparison uses a '>' rather than '>='.

        if ((head - tail) > entrySize) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            newTail = tail + entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    }

    // Send notification (via mach message) that data is available.

    if (retVal == kIOReturnSuccess) {
        // Publish the data we just enqueued
        __c11_atomic_store((_Atomic uint32_t *)&dataQueue->tail, newTail, __ATOMIC_RELEASE);

        if (tail != head) {
            //
            // The memory barrier below pairs with the one in dequeue
            // so that either our store to the tail cannot be missed by
            // the next dequeue attempt, or we will observe the dequeuer
            // making the queue empty.
            //
            // Of course, if we already think the queue is empty,
            // there's no point paying this extra cost.
            //
            __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
            head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
        }

        if (tail == head) {
            // Send notification that data is now available.
            *sendDataAvailable = true;
            retVal = kIOReturnSuccess;
        }
    } else if (retVal == kIOReturnOverrun) {
        // ask to be notified of the next Dequeue()
        dataQueue->needServicedCallback = true;
        *sendDataAvailable = true;