--- /dev/null
+/*
+ * Ring buffer I/O for sslThroughput test.
+ */
+
+#include "ringBufferIo.h"
+#include <strings.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacErrors.h>
+
+/* synchronizes multi-threaded access to printf() */
+pthread_mutex_t printfMutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* initialize a RingBuffer */
+void ringBufSetup(
+ RingBuffer *ring,
+ const char *bufName,
+ size_t numElements,
+ size_t bufSize)
+{
+ unsigned dex;
+
+ memset(ring, 0, sizeof(*ring));
+ ring->numElements = numElements;
+ ring->elements = (RingElement *)malloc(sizeof(RingElement) * numElements);
+ memset(ring->elements, 0, sizeof(RingElement) * numElements);
+ for(dex=0; dex<numElements; dex++) {
+ RingElement *elt = &ring->elements[dex];
+ elt->buf = (unsigned char *)malloc(bufSize);
+ elt->capacity = bufSize;
+ }
+ ring->writerDex = 0;
+ ring->readerDex = 0;
+ ring->bufName = bufName;
+}
+
+#define LOG_RING 0
+#define LOG_RING_DUMP 0
+#if LOG_RING
+
+static void logRingWrite(
+ RingBuffer *ring,
+ size_t written,
+ unsigned dex,
+ void *from)
+{
+ pthread_mutex_lock(&printfMutex);
+ printf("+++ wrote %4u bytes to %s buf %2u\n",
+ (unsigned)written, ring->bufName, dex);
+ #if LOG_RING_DUMP
+ {
+ unsigned i;
+ unsigned char *cp = (unsigned char *)from;
+
+ for(i=0; i<written; i++) {
+ printf("%02X ", cp[i]);
+ if((i < (written - 1)) && ((i % 16) == 15)) {
+ printf("\n");
+ }
+ }
+ printf("\n");
+ }
+ #endif
+ pthread_mutex_unlock(&printfMutex);
+}
+
+static void logRingRead(
+ RingBuffer *ring,
+ size_t bytesRead,
+ unsigned dex,
+ void *to)
+{
+ pthread_mutex_lock(&printfMutex);
+ printf("--- read %4u bytes from %s buf %2u\n",
+ (unsigned)bytesRead, ring->bufName, dex);
+ #if LOG_RING_DUMP
+ {
+ unsigned i;
+ unsigned char *cp = (unsigned char *)to;
+
+ for(i=0; i<bytesRead; i++) {
+ printf("%02X ", cp[i]);
+ if((i < (bytesRead - 1)) && ((i % 16) == 15)) {
+ printf("\n");
+ }
+ }
+ printf("\n");
+ }
+ #endif
+ pthread_mutex_unlock(&printfMutex);
+}
+
+static void logRingStall(
+ RingBuffer *ring,
+ char *readerOrWriter,
+ unsigned dex)
+{
+ pthread_mutex_lock(&printfMutex);
+ printf("=== %s stalled on %s buf %u\n",
+ readerOrWriter, ring->bufName, dex);
+ pthread_mutex_unlock(&printfMutex);
+}
+
+static void logRingClose(
+ RingBuffer *ring,
+ char *readerOrWriter)
+{
+ pthread_mutex_lock(&printfMutex);
+ printf("=== %s CLOSED by %s\n",
+ ring->bufName, readerOrWriter);
+ pthread_mutex_unlock(&printfMutex);
+}
+
+static void logRingReset(
+ RingBuffer *ring)
+{
+ pthread_mutex_lock(&printfMutex);
+ printf("=== %s RESET\n", ring->bufName);
+ pthread_mutex_unlock(&printfMutex);
+}
+
+#else /* LOG_RING */
+#define logRingWrite(r, w, d, t)
+#define logRingRead(r, b, d, t)
+#define logRingStall(r, row, d)
+#define logRingClose(r, row)
+#define logRingReset(r)
+#endif /* LOG_RING */
+
+void ringBufferReset(
+ RingBuffer *ring)
+{
+ unsigned dex;
+ for(dex=0; dex<ring->numElements; dex++) {
+ RingElement *elt = &ring->elements[dex];
+ elt->validBytes = 0;
+ elt->readOffset = 0;
+ }
+ ring->writerDex = 0;
+ ring->readerDex = 0;
+ ring->closed = false;
+ logRingReset(ring);
+}
+
+/*
+ * The "I/O" callbacks for SecureTransport.
+ * The SSLConnectionRef is a RingBuffers *.
+ */
+OSStatus ringReadFunc(
+ SSLConnectionRef connRef,
+ void *data,
+ size_t *dataLen) /* IN/OUT */
+{
+ RingBuffer *ring = ((RingBuffers *)connRef)->rdBuf;
+
+ if(ring->writerDex == ring->readerDex) {
+ if(ring->closed) {
+ /*
+ * Handle race condition: we saw a stall, then writer filled a
+ * RingElement and then set closed. Make sure we read the data before
+ * handling the close event.
+ */
+ if(ring->writerDex == ring->readerDex) {
+ /* writer closed: ECONNRESET */
+ *dataLen = 0;
+ return errSSLClosedAbort;
+ }
+ /* else proceed to read data */
+ }
+ else {
+ /* read stalled, writer thread is writing to our next element */
+ *dataLen = 0;
+ return errSSLWouldBlock;
+ }
+ }
+
+ unsigned char *outp = (unsigned char *)data;
+ size_t toMove = *dataLen;
+ size_t haveMoved = 0;
+
+ /* we own ring->elements[ring->readerDex] */
+ do {
+ /*
+ * Read as much data as there is in the buffer, or
+ * toMove, whichever is less
+ */
+ RingElement *elt = &ring->elements[ring->readerDex];
+ size_t thisMove = elt->validBytes;
+ if(thisMove > toMove) {
+ thisMove = toMove;
+ }
+ memmove(outp, elt->buf + elt->readOffset, thisMove);
+ logRingRead(ring, thisMove, ring->readerDex, outp);
+ if(thisMove == 0) {
+ /* should never happen */
+ printf("***thisMove 0!\n");
+ return internalComponentErr;
+ }
+ elt->validBytes -= thisMove;
+ elt->readOffset += thisMove;
+ toMove -= thisMove;
+ haveMoved += thisMove;
+ outp += thisMove;
+
+ if(elt->validBytes == 0) {
+ /*
+ * End of this buffer - advance to next one and keep going if it's
+ * not in use
+ */
+ unsigned nextDex;
+ elt->readOffset = 0;
+ /* increment and wrap must be atomic from the point of
+ * view of readerDex */
+ nextDex = ring->readerDex + 1;
+ if(nextDex == ring->numElements) {
+ nextDex = 0;
+ }
+ ring->readerDex = nextDex;
+ }
+ if(toMove == 0) {
+ /* caller got what they want */
+ break;
+ }
+ if(ring->readerDex == ring->writerDex) {
+ logRingStall(ring, "reader ", ring->readerDex);
+ /* stalled */
+ break;
+ }
+ } while(toMove);
+
+ OSStatus ortn = noErr;
+ if(haveMoved != *dataLen) {
+ if((haveMoved == 0) && ring->closed) {
+ /* writer closed: ECONNRESET */
+ ortn = errSSLClosedAbort;
+ }
+ else {
+ ortn = errSSLWouldBlock;
+ }
+ }
+ *dataLen = haveMoved;
+ return ortn;
+}
+
+/*
+ * This never returns errSSLWouldBlock - we block (spinning) if
+ * we stall because we run into the reader's element.
+ * Also, each call to this function uses up at least one
+ * RingElement - we don't coalesce multiple writes into one
+ * RingElement.
+ *
+ * On entry, writerDex is the element we're going to write to.
+ * On exit, writerDex is the element we're going to write to next,
+ * and we might stall before we update it as such.
+ */
+OSStatus ringWriteFunc(
+ SSLConnectionRef connRef,
+ const void *data,
+ size_t *dataLen) /* IN/OUT */
+{
+ RingBuffer *ring = ((RingBuffers *)connRef)->wrtBuf;
+ unsigned char *inp = (unsigned char *)data;
+ size_t toMove = *dataLen;
+ size_t haveMoved = 0;
+ unsigned nextDex;
+ OSStatus ortn = noErr;
+
+ /* we own ring->elements[ring->writerDex] */
+ do {
+ RingElement *elt = &ring->elements[ring->writerDex];
+ elt->validBytes = 0;
+
+ size_t thisMove = toMove;
+ if(thisMove > elt->capacity) {
+ thisMove = elt->capacity;
+ }
+ memmove(elt->buf, inp, thisMove);
+ logRingWrite(ring, thisMove, ring->writerDex, inp);
+
+ elt->validBytes = thisMove;
+ toMove -= thisMove;
+ haveMoved += thisMove;
+ inp += thisMove;
+
+ /* move on to next element, when it becomes available */
+ nextDex = ring->writerDex + 1;
+ if(nextDex == ring->numElements) {
+ nextDex = 0;
+ }
+ if(nextDex == ring->readerDex) {
+ logRingStall(ring, "writer", nextDex);
+ while(nextDex == ring->readerDex) {
+ /* if(ring->closed) {
+ break;
+ } */
+ ;
+ /* else stall */
+ }
+ }
+ /* we own nextDex */
+ ring->writerDex = nextDex;
+ if(ring->closed) {
+ break;
+ }
+ } while(toMove);
+ if(ring->closed && (haveMoved == 0)) {
+ /* reader closed socket: EPIPE */
+ ortn = errSSLClosedAbort;
+ }
+ *dataLen = haveMoved;
+ return ortn;
+}
+
+/* close both sides of a RingBuffers */
+void ringBuffersClose(
+ RingBuffers *rbs)
+{
+ if(rbs == NULL) {
+ return;
+ }
+ if(rbs->rdBuf) {
+ logRingClose(rbs->rdBuf, "reader");
+ rbs->rdBuf->closed = true;
+ }
+ if(rbs->wrtBuf) {
+ logRingClose(rbs->wrtBuf, "writer");
+ rbs->wrtBuf->closed = true;
+ }
+}
+