]> git.saurik.com Git - redis.git/blobdiff - src/syncio.c
"SORT by nosort" (skip sorting) respect sorted set ordering.
[redis.git] / src / syncio.c
index 28ac181140d0963afba5a07580693e72f7a030fb..0c202c9e242a47f730a2f7fa95aa64ae24354d2f 100644 (file)
  * of the SYNC command where the slave does it in a blocking way, and
  * the MIGRATE command that must be blocking in order to be atomic from the
  * point of view of the two instances (one migrating the key and one receiving
- * the key). This is why need the following blocking I/O functions. */
+ * the key). This is why need the following blocking I/O functions.
+ *
+ * All the functions take the timeout in milliseconds. */
 
-int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
-    ssize_t nwritten, ret = size;
-    time_t start = time(NULL);
+#define REDIS_SYNCIO_RESOLUTION 10 /* Resolution in milliseconds */
 
-    timeout++;
-    while(size) {
-        if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
-            nwritten = write(fd,ptr,size);
-            if (nwritten == -1) return -1;
+/* Write the specified payload to 'fd'. If writing the whole payload will be
+ * done within 'timeout' milliseconds the operation succeeds and 'size' is
+ * returned. Otherwise the operation fails, -1 is returned, and an unspecified
+ * partial write could be performed against the file descriptor. */
+ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
+    ssize_t nwritten, ret = size;
+    long long start = mstime();
+    long long remaining = timeout;
+
+    while(1) {
+        long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
+                          remaining : REDIS_SYNCIO_RESOLUTION;
+        long long elapsed;
+
+        /* Optimistically try to write before checking if the file descriptor
+         * is actually writable. At worst we get EAGAIN. */
+        nwritten = write(fd,ptr,size);
+        if (nwritten == -1) {
+            if (errno != EAGAIN) return -1;
+        } else {
             ptr += nwritten;
             size -= nwritten;
         }
-        if ((time(NULL)-start) > timeout) {
+        if (size == 0) return ret;
+
+        /* Wait */
+        aeWait(fd,AE_WRITABLE,wait);
+        elapsed = mstime() - start;
+        if (elapsed >= timeout) {
             errno = ETIMEDOUT;
             return -1;
         }
+        remaining = timeout - elapsed;
     }
-    return ret;
 }
 
-int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
+/* Read the specified amount of bytes from 'fd'. If all the bytes are read
+ * within 'timeout' milliseconds the operation succeed and 'size' is returned.
+ * Otherwise the operation fails, -1 is returned, and an unspecified amount of
+ * data could be read from the file descriptor. */
+ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
     ssize_t nread, totread = 0;
-    time_t start = time(NULL);
-
-    timeout++;
-    while(size) {
-        if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
-            nread = read(fd,ptr,size);
-            if (nread <= 0) return -1;
+    long long start = mstime();
+    long long remaining = timeout;
+
+    if (size == 0) return 0;
+    while(1) {
+        long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
+                          remaining : REDIS_SYNCIO_RESOLUTION;
+        long long elapsed;
+
+        /* Optimistically try to read before checking if the file descriptor
+         * is actually readable. At worst we get EAGAIN. */
+        nread = read(fd,ptr,size);
+        if (nread == 0) return -1; /* short read. */
+        if (nread == -1) {
+            if (errno != EAGAIN) return -1;
+        } else {
             ptr += nread;
             size -= nread;
             totread += nread;
         }
-        if ((time(NULL)-start) > timeout) {
+        if (size == 0) return totread;
+
+        /* Wait */
+        aeWait(fd,AE_READABLE,wait);
+        elapsed = mstime() - start;
+        if (elapsed >= timeout) {
             errno = ETIMEDOUT;
             return -1;
         }
+        remaining = timeout - elapsed;
     }
-    return totread;
 }
 
-int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
+/* Read a line making sure that every char will not require more than 'timeout'
+ * milliseconds to be read.
+ * 
+ * On success the number of bytes read is returned, otherwise -1.
+ * On success the string is always correctly terminated with a 0 byte. */
+ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
     ssize_t nread = 0;
 
     size--;
@@ -99,56 +142,3 @@ int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
     }
     return nread;
 }
-
-/* ----------------- Blocking sockets I/O with timeouts --------------------- */
-
-/* Write binary-safe string into a file in the bulkformat
- * $<count>\r\n<payload>\r\n */
-int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
-    char cbuf[128];
-    int clen;
-    cbuf[0] = '$';
-    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
-    cbuf[clen++] = '\r';
-    cbuf[clen++] = '\n';
-    if (fwrite(cbuf,clen,1,fp) == 0) return 0;
-    if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
-    if (fwrite("\r\n",2,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkDouble(FILE *fp, double d) {
-    char buf[128], dbuf[128];
-
-    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
-    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
-    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
-    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkLongLong(FILE *fp, long long l) {
-    char bbuf[128], lbuf[128];
-    unsigned int blen, llen;
-    llen = ll2string(lbuf,32,l);
-    blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
-    if (fwrite(bbuf,blen,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Delegate writing an object to writing a bulk string or bulk long long. */
-int fwriteBulkObject(FILE *fp, robj *obj) {
-    /* Avoid using getDecodedObject to help copy-on-write (we are often
-     * in a child process when this function is called). */
-    if (obj->encoding == REDIS_ENCODING_INT) {
-        return fwriteBulkLongLong(fp,(long)obj->ptr);
-    } else if (obj->encoding == REDIS_ENCODING_RAW) {
-        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
-    } else {
-        redisPanic("Unknown string encoding");
-    }
-}
-
-