typedef struct _IODataQueueEntry {
    uint32_t  size;
    uint8_t   data[0];
} IODataQueueEntry;

#define DATA_QUEUE_ENTRY_HEADER_SIZE sizeof(IODataQueueEntry)

typedef struct _IODataQueueMemory {
    volatile uint32_t   head;
    volatile uint32_t   tail;
    volatile uint8_t    needServicedCallback;
    volatile uint8_t    _resv[31];
    IODataQueueEntry    queue[0];
} IODataQueueMemory;
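// Layout note: a single IODataQueueMemory block is shared between the enqueuing
// and dequeuing sides. The header (head/tail offsets, the needServicedCallback
// flag, and 31 bytes of reserved padding) is followed by a circular region of
// variable-length IODataQueueEntry records; head and tail are byte offsets into
// that region.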
struct IODataQueueDispatchSource_IVars {
    IODataQueueMemory         * dataQueue;
    IODataQueueDispatchSource * source;
//  IODispatchQueue           * queue;
    IOMemoryDescriptor        * memory;
    OSAction                  * dataAvailableAction;
    OSAction                  * dataServicedAction;
    uint64_t                    options;
    uint32_t                    queueByteCount;
    bool                        enable;
};
bool
IODataQueueDispatchSource::init()
{
    if (!super::init()) {
        return false;
    }

    ivars = IONewZero(IODataQueueDispatchSource_IVars, 1);
    ivars->source = this;

    kern_return_t ret;
    uint64_t      address;
    uint64_t      length;

    ret = CopyMemory(&ivars->memory);
    assert(kIOReturnSuccess == ret);

    ret = ivars->memory->Map(0, 0, 0, 0, &address, &length);
    assert(kIOReturnSuccess == ret);
    ivars->dataQueue = (typeof(ivars->dataQueue))(uintptr_t) address;
    ivars->queueByteCount = length;

    return true;
}
kern_return_t
IMPL(IODataQueueDispatchSource, CheckForWork)
{
    IOReturn ret = kIOReturnNotReady;

    return ret;
}
kern_return_t
IMPL(IODataQueueDispatchSource, Create)
{
    IODataQueueDispatchSource * inst;
    IOBufferMemoryDescriptor  * bmd;

    if (3 & queueByteCount) {
        // Queue size must be a multiple of 4 bytes.
        return kIOReturnBadArgument;
    }

    inst = OSTypeAlloc(IODataQueueDispatchSource);
    if (!inst) {
        return kIOReturnNoMemory;
    }
    if (!inst->init()) {
        inst->release();
        return kIOReturnError;
    }

    bmd = IOBufferMemoryDescriptor::withOptions(
        kIODirectionOutIn | kIOMemoryKernelUserShared,
        queueByteCount, page_size);
    if (!bmd) {
        inst->release();
        return kIOReturnNoMemory;
    }
    inst->ivars->memory         = bmd;
    inst->ivars->queueByteCount = queueByteCount;
    inst->ivars->options        = 0;
    inst->ivars->dataQueue      = (typeof(inst->ivars->dataQueue))bmd->getBytesNoCopy();

    *source = inst;

    return kIOReturnSuccess;
}
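// Usage sketch (illustrative only): a driver would typically create the source
// against a dispatch queue and install an OSAction before enqueuing. The exact
// parameter list and the names "defaultQueue"/"dataAvailableAction" below are
// assumptions for illustration, not taken from this file.
//
//   IODataQueueDispatchSource * dqSource = NULL;
//   kern_return_t kr = IODataQueueDispatchSource::Create(16 * 4096 /* multiple of 4 */,
//       defaultQueue, &dqSource);
//   if (kr == kIOReturnSuccess) {
//       kr = dqSource->SetDataAvailableHandler(dataAvailableAction);
//   }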
kern_return_t
IMPL(IODataQueueDispatchSource, CopyMemory)
{
    kern_return_t ret;
    IOMemoryDescriptor * result;

    result = ivars->memory;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *memory = result;

    return ret;
}
kern_return_t
IMPL(IODataQueueDispatchSource, CopyDataAvailableHandler)
{
    kern_return_t ret;
    OSAction * result;

    result = ivars->dataAvailableAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;

    return ret;
}
kern_return_t
IMPL(IODataQueueDispatchSource, CopyDataServicedHandler)
{
    kern_return_t ret;
    OSAction * result;

    result = ivars->dataServicedAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;

    return ret;
}
kern_return_t
IMPL(IODataQueueDispatchSource, SetDataAvailableHandler)
{
    IOReturn ret;
    OSAction * oldAction;

    oldAction = ivars->dataAvailableAction;
    if (oldAction && OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataAvailableAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataAvailableAction = action;
        if (IsDataAvailable()) {
            DataAvailable(ivars->dataAvailableAction);
        }
    }
    ret = kIOReturnSuccess;

    return ret;
}
kern_return_t
IMPL(IODataQueueDispatchSource, SetDataServicedHandler)
{
    IOReturn ret;
    OSAction * oldAction;

    oldAction = ivars->dataServicedAction;
    if (oldAction && OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataServicedAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataServicedAction = action;
    }
    ret = kIOReturnSuccess;

    return ret;
}
void
IODataQueueDispatchSource::SendDataAvailable(void)
{
    IOReturn ret;

    if (!ivars->dataAvailableAction) {
        ret = CopyDataAvailableHandler(&ivars->dataAvailableAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataAvailableAction = NULL;
        }
    }
    if (ivars->dataAvailableAction) {
        DataAvailable(ivars->dataAvailableAction);
    }
}
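// SendDataAvailable() and SendDataServiced() (below) fetch their OSAction lazily
// the first time they fire and cache it in ivars, so subsequent notifications
// skip the CopyDataAvailableHandler()/CopyDataServicedHandler() round trip.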
void
IODataQueueDispatchSource::SendDataServiced(void)
{
    IOReturn ret;

    if (!ivars->dataServicedAction) {
        ret = CopyDataServicedHandler(&ivars->dataServicedAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataServicedAction = NULL;
        }
    }
    if (ivars->dataServicedAction) {
        ivars->dataQueue->needServicedCallback = false;
        DataServiced(ivars->dataServicedAction);
    }
}
kern_return_t
IMPL(IODataQueueDispatchSource, SetEnableWithCompletion)
{
    IOReturn ret;

    ivars->enable = enable;

    ret = kIOReturnSuccess;

    return ret;
}
void
IODataQueueDispatchSource::free()
{
    OSSafeReleaseNULL(ivars->memory);
    OSSafeReleaseNULL(ivars->dataAvailableAction);
    OSSafeReleaseNULL(ivars->dataServicedAction);
    IOSafeDeleteNULL(ivars, IODataQueueDispatchSource_IVars, 1);
    super::free();
}
kern_return_t
IMPL(IODataQueueDispatchSource, Cancel)
{
    return kIOReturnSuccess;
}
bool
IODataQueueDispatchSource::IsDataAvailable(void)
{
    IODataQueueMemory *dataQueue = ivars->dataQueue;

    return dataQueue && (dataQueue->head != dataQueue->tail);
}
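// Because EnqueueWithCoalesce() below never lets the tail catch up to the head,
// head == tail always means "empty" (a full queue still keeps the offsets
// distinct), so this lock-free check is unambiguous.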
kern_return_t
IODataQueueDispatchSource::Peek(IODataQueueClientDequeueEntryBlock callback)
{
    IODataQueueEntry *  entry = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t            callerDataSize;
    uint32_t            dataSize;
    uint32_t            headOffset;
    uint32_t            tailOffset;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry * head = NULL;
        uint32_t           headSize = 0;
        uint32_t           queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }

        // Check if there's enough room before the end of the queue for a header.
        // If there is room, check if there's enough room to hold the header and
        // the data.

        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // No room for the header or the data, wrap to the beginning of the queue.
            // Note: wrapping even with the UINT32_MAX checks, as we have to support
            // queueSize of UINT32_MAX
            entry = dataQueue->queue;
            callerDataSize = entry->size;
            dataSize = entry->size;
            if (os_add_overflow(3, callerDataSize, &dataSize)) {
                return kIOReturnError;
            }

            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }

            callback(&entry->data, callerDataSize);
            return kIOReturnSuccess;
        } else {
            callback(&head->data, callerDataSize);
            return kIOReturnSuccess;
        }
    }

    return kIOReturnUnderrun;
}
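// Peek() hands the oldest entry to the caller's block without consuming it:
// unlike DequeueWithCoalesce() below, it never advances dataQueue->head, so the
// same entry is seen again by the next Peek() or Dequeue().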
kern_return_t
IODataQueueDispatchSource::Dequeue(IODataQueueClientDequeueEntryBlock callback)
{
    IOReturn ret;
    bool sendDataServiced;

    sendDataServiced = false;
    ret = DequeueWithCoalesce(&sendDataServiced, callback);
    if (sendDataServiced) {
        SendDataServiced();
    }
    return ret;
}
kern_return_t
IODataQueueDispatchSource::DequeueWithCoalesce(bool * sendDataServiced,
    IODataQueueClientDequeueEntryBlock callback)
{
    IOReturn            retVal = kIOReturnSuccess;
    IODataQueueEntry *  entry = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t            callerDataSize;
    uint32_t            dataSize = 0;
    uint32_t            headOffset = 0;
    uint32_t            tailOffset = 0;
    uint32_t            newHeadOffset = 0;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry * head = NULL;
        uint32_t           headSize = 0;
        uint32_t           queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }

        // we wrapped around to beginning, so read from there
        // either there was not even room for the header
        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            // or there was room for the header, but not for the data
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // Note: we have to wrap to the beginning even with the UINT32_MAX checks
            // because we have to support a queueSize of UINT32_MAX.
            entry = dataQueue->queue;
            callerDataSize = entry->size;

            if (os_add_overflow(callerDataSize, 3, &dataSize)) {
                return kIOReturnError;
            }

            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
            // else it is at the end
        } else {
            entry = head;
            if ((headSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headOffset) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE + headOffset > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
        }
    } else {
        // Queue is empty.
        if (dataQueue->needServicedCallback) {
            *sendDataServiced = true;
        }
        return kIOReturnUnderrun;
    }

    callback(&entry->data, callerDataSize);
    if (dataQueue->needServicedCallback) {
        *sendDataServiced = true;
    }

    // Publish the new head with release semantics.
    __c11_atomic_store((_Atomic uint32_t *)&dataQueue->head, newHeadOffset, __ATOMIC_RELEASE);

    if (newHeadOffset == tailOffset) {
        //
        // If we are making the queue empty, then we need to make sure
        // that either the enqueuer notices, or we notice the enqueue
        // that raced with our making of the queue empty.
        //
        __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
    }

    return retVal;
}
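// The SEQ_CST fence above pairs with the fence in EnqueueWithCoalesce(): when a
// dequeue empties the queue at the same moment an enqueue publishes a new tail,
// at least one side is guaranteed to observe the other, so a DataAvailable
// notification cannot be lost across that race.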
kern_return_t
IODataQueueDispatchSource::Enqueue(uint32_t callerDataSize,
    IODataQueueClientEnqueueEntryBlock callback)
{
    IOReturn ret;
    bool     sendDataAvailable;

    sendDataAvailable = false;
    ret = EnqueueWithCoalesce(callerDataSize, &sendDataAvailable, callback);
    if (sendDataAvailable) {
        SendDataAvailable();
    }
    return ret;
}
kern_return_t
IODataQueueDispatchSource::EnqueueWithCoalesce(uint32_t callerDataSize,
    bool * sendDataAvailable,
    IODataQueueClientEnqueueEntryBlock callback)
{
    IODataQueueMemory * dataQueue;
    IODataQueueEntry *  entry;
    uint32_t            head;
    uint32_t            tail;
    uint32_t            newTail = 0;
    uint32_t            dataSize;
    uint32_t            queueSize;
    uint32_t            entrySize;
    IOReturn            retVal = kIOReturnSuccess;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }
    queueSize = ivars->queueByteCount;

    // Force a single read of head and tail
    tail = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_RELAXED);
    head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_ACQUIRE);

    if (os_add_overflow(callerDataSize, 3, &dataSize)) {
        return kIOReturnOverrun;
    }

    // Check for overflow of entrySize
    if (os_add_overflow(DATA_QUEUE_ENTRY_HEADER_SIZE, dataSize, &entrySize)) {
        return kIOReturnOverrun;
    }

    // Check for underflow of (getQueueSize() - tail)
    if (queueSize < tail || queueSize < head) {
        return kIOReturnUnderrun;
    }

    if (tail >= head) {
        // Is there enough room at the end for the entry?
        if ((entrySize <= (UINT32_MAX - tail)) &&
            ((tail + entrySize) <= queueSize)) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            // The tail can be out of bound when the size of the new entry
            // exactly matches the available space at the end of the queue.
            // The tail can range from 0 to queueSize inclusive.

            newTail = tail + entrySize;
        } else if (head > entrySize) { // Is there enough room at the beginning?
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue);

            callback(&entry->data, callerDataSize);

            // Wrap around to the beginning, but do not allow the tail to catch
            // up to the head.

            entry->size = callerDataSize;

            // We need to make sure that there is enough room to set the size before
            // doing this. The user client checks for this and will look for the size
            // at the beginning if there isn't room for it at the end.

            if ((queueSize - tail) >= DATA_QUEUE_ENTRY_HEADER_SIZE) {
                ((IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail))->size = dataSize;
            }

            newTail = entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    } else {
        // Do not allow the tail to catch up to the head when the queue is full.
        // That's why the comparison uses a '>' rather than '>='.

        if ((head - tail) > entrySize) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            newTail = tail + entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    }

    // Send notification (via mach message) that data is available.

    if (retVal == kIOReturnSuccess) {
        // Publish the data we just enqueued
        __c11_atomic_store((_Atomic uint32_t *)&dataQueue->tail, newTail, __ATOMIC_RELEASE);

        if (tail != head) {
            //
            // The memory barrier below pairs with the one in dequeue
            // so that either our store to the tail cannot be missed by
            // the next dequeue attempt, or we will observe the dequeuer
            // making the queue empty.
            //
            // Of course, if we already think the queue is empty,
            // there's no point paying this extra cost.
            //
            __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
            head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
        }

        if (tail == head) {
            // Send notification that data is now available.
            *sendDataAvailable = true;
            retVal = kIOReturnSuccess;
        }
    } else if (retVal == kIOReturnOverrun) {
        // ask to be notified of Dequeue()
        dataQueue->needServicedCallback = true;
        *sendDataAvailable = true;
    }

    return retVal;
}
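// Coalescing note: *sendDataAvailable is only set when this enqueue observes the
// consumer caught up (tail == head after the re-read above), so a burst of
// back-to-back enqueues produces a single DataAvailable notification rather than
// one per entry. On overrun, needServicedCallback asks the dequeuing side to send
// DataServiced once it has drained entries, so the enqueuing side can retry.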