2 * Ring buffer I/O for sslThroughput test.
5 #include "ringBufferIo.h"
9 #include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacErrors.h>
11 /* synchronizes multi-threaded access to printf() */
12 pthread_mutex_t printfMutex
= PTHREAD_MUTEX_INITIALIZER
;
14 /* initialize a RingBuffer */
23 memset(ring
, 0, sizeof(*ring
));
24 ring
->numElements
= numElements
;
25 ring
->elements
= (RingElement
*)malloc(sizeof(RingElement
) * numElements
);
26 memset(ring
->elements
, 0, sizeof(RingElement
) * numElements
);
27 for(dex
=0; dex
<numElements
; dex
++) {
28 RingElement
*elt
= &ring
->elements
[dex
];
29 elt
->buf
= (unsigned char *)malloc(bufSize
);
30 elt
->capacity
= bufSize
;
34 ring
->bufName
= bufName
;
38 #define LOG_RING_DUMP 0
41 static void logRingWrite(
47 pthread_mutex_lock(&printfMutex
);
48 printf("+++ wrote %4u bytes to %s buf %2u\n",
49 (unsigned)written
, ring
->bufName
, dex
);
53 unsigned char *cp
= (unsigned char *)from
;
55 for(i
=0; i
<written
; i
++) {
56 printf("%02X ", cp
[i
]);
57 if((i
< (written
- 1)) && ((i
% 16) == 15)) {
64 pthread_mutex_unlock(&printfMutex
);
67 static void logRingRead(
73 pthread_mutex_lock(&printfMutex
);
74 printf("--- read %4u bytes from %s buf %2u\n",
75 (unsigned)bytesRead
, ring
->bufName
, dex
);
79 unsigned char *cp
= (unsigned char *)to
;
81 for(i
=0; i
<bytesRead
; i
++) {
82 printf("%02X ", cp
[i
]);
83 if((i
< (bytesRead
- 1)) && ((i
% 16) == 15)) {
90 pthread_mutex_unlock(&printfMutex
);
93 static void logRingStall(
98 pthread_mutex_lock(&printfMutex
);
99 printf("=== %s stalled on %s buf %u\n",
100 readerOrWriter
, ring
->bufName
, dex
);
101 pthread_mutex_unlock(&printfMutex
);
104 static void logRingClose(
106 char *readerOrWriter
)
108 pthread_mutex_lock(&printfMutex
);
109 printf("=== %s CLOSED by %s\n",
110 ring
->bufName
, readerOrWriter
);
111 pthread_mutex_unlock(&printfMutex
);
114 static void logRingReset(
117 pthread_mutex_lock(&printfMutex
);
118 printf("=== %s RESET\n", ring
->bufName
);
119 pthread_mutex_unlock(&printfMutex
);
123 #define logRingWrite(r, w, d, t)
124 #define logRingRead(r, b, d, t)
125 #define logRingStall(r, row, d)
126 #define logRingClose(r, row)
127 #define logRingReset(r)
128 #endif /* LOG_RING */
130 void ringBufferReset(
134 for(dex
=0; dex
<ring
->numElements
; dex
++) {
135 RingElement
*elt
= &ring
->elements
[dex
];
141 ring
->closed
= false;
146 * The "I/O" callbacks for SecureTransport.
147 * The SSLConnectionRef is a RingBuffers *.
149 OSStatus
ringReadFunc(
150 SSLConnectionRef connRef
,
152 size_t *dataLen
) /* IN/OUT */
154 RingBuffer
*ring
= ((RingBuffers
*)connRef
)->rdBuf
;
156 if(ring
->writerDex
== ring
->readerDex
) {
159 * Handle race condition: we saw a stall, then writer filled a
160 * RingElement and then set closed. Make sure we read the data before
161 * handling the close event.
163 if(ring
->writerDex
== ring
->readerDex
) {
164 /* writer closed: ECONNRESET */
166 return errSSLClosedAbort
;
168 /* else proceed to read data */
171 /* read stalled, writer thread is writing to our next element */
173 return errSSLWouldBlock
;
177 unsigned char *outp
= (unsigned char *)data
;
178 size_t toMove
= *dataLen
;
179 size_t haveMoved
= 0;
181 /* we own ring->elements[ring->readerDex] */
184 * Read as much data as there is in the buffer, or
185 * toMove, whichever is less
187 RingElement
*elt
= &ring
->elements
[ring
->readerDex
];
188 size_t thisMove
= elt
->validBytes
;
189 if(thisMove
> toMove
) {
192 memmove(outp
, elt
->buf
+ elt
->readOffset
, thisMove
);
193 logRingRead(ring
, thisMove
, ring
->readerDex
, outp
);
195 /* should never happen */
196 printf("***thisMove 0!\n");
197 return internalComponentErr
;
199 elt
->validBytes
-= thisMove
;
200 elt
->readOffset
+= thisMove
;
202 haveMoved
+= thisMove
;
205 if(elt
->validBytes
== 0) {
207 * End of this buffer - advance to next one and keep going if it's
212 /* increment and wrap must be atomic from the point of
213 * view of readerDex */
214 nextDex
= ring
->readerDex
+ 1;
215 if(nextDex
== ring
->numElements
) {
218 ring
->readerDex
= nextDex
;
221 /* caller got what they want */
224 if(ring
->readerDex
== ring
->writerDex
) {
225 logRingStall(ring
, "reader ", ring
->readerDex
);
231 OSStatus ortn
= noErr
;
232 if(haveMoved
!= *dataLen
) {
233 if((haveMoved
== 0) && ring
->closed
) {
234 /* writer closed: ECONNRESET */
235 ortn
= errSSLClosedAbort
;
238 ortn
= errSSLWouldBlock
;
241 *dataLen
= haveMoved
;
246 * This never returns errSSLWouldBlock - we block (spinning) if
247 * we stall because we run into the reader's element.
248 * Also, each call to this function uses up at least one
249 * RingElement - we don't coalesce multiple writes into one
252 * On entry, writerDex is the element we're going to write to.
253 * On exit, writerDex is the element we're going to write to next,
254 * and we might stall before we update it as such.
256 OSStatus
ringWriteFunc(
257 SSLConnectionRef connRef
,
259 size_t *dataLen
) /* IN/OUT */
261 RingBuffer
*ring
= ((RingBuffers
*)connRef
)->wrtBuf
;
262 unsigned char *inp
= (unsigned char *)data
;
263 size_t toMove
= *dataLen
;
264 size_t haveMoved
= 0;
266 OSStatus ortn
= noErr
;
268 /* we own ring->elements[ring->writerDex] */
270 RingElement
*elt
= &ring
->elements
[ring
->writerDex
];
273 size_t thisMove
= toMove
;
274 if(thisMove
> elt
->capacity
) {
275 thisMove
= elt
->capacity
;
277 memmove(elt
->buf
, inp
, thisMove
);
278 logRingWrite(ring
, thisMove
, ring
->writerDex
, inp
);
280 elt
->validBytes
= thisMove
;
282 haveMoved
+= thisMove
;
285 /* move on to next element, when it becomes available */
286 nextDex
= ring
->writerDex
+ 1;
287 if(nextDex
== ring
->numElements
) {
290 if(nextDex
== ring
->readerDex
) {
291 logRingStall(ring
, "writer", nextDex
);
292 while(nextDex
== ring
->readerDex
) {
293 /* if(ring->closed) {
301 ring
->writerDex
= nextDex
;
306 if(ring
->closed
&& (haveMoved
== 0)) {
307 /* reader closed socket: EPIPE */
308 ortn
= errSSLClosedAbort
;
310 *dataLen
= haveMoved
;
314 /* close both sides of a RingBuffers */
315 void ringBuffersClose(
322 logRingClose(rbs
->rdBuf
, "reader");
323 rbs
->rdBuf
->closed
= true;
326 logRingClose(rbs
->wrtBuf
, "writer");
327 rbs
->wrtBuf
->closed
= true;