-/* Synchronous socket I/O operations, with timeouts.
- * Redis performs most of the I/O in a nonblocking way, with the exception
- * of the SYNC command where the slave does it in a blocking way, and
- * the MIGRATE command that must be blocking in order to be atomic from the
- * point of view of the two instances (one migrating the key and one receiving
- * the key). This is why need the following blocking I/O functions.
+/* Synchronous socket and file I/O operations useful across the core.
*
* Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
#include "redis.h"
-int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
+/* ----------------- Blocking sockets I/O with timeouts --------------------- */
+
+/* Redis performs most of the I/O in a nonblocking way, with the exception
+ * of the SYNC command where the slave does it in a blocking way, and
+ * the MIGRATE command that must be blocking in order to be atomic from the
+ * point of view of the two instances (one migrating the key and one receiving
+ * the key). This is why we need the following blocking I/O functions.
+ *
+ * All the functions take the timeout in milliseconds. */
+
+/* Lower bound, in milliseconds, of the timeout handed to each aeWait() call
+ * made by syncWrite() and syncRead(): they wait for the time still left
+ * before their deadline, but never for less than this value. */
+#define REDIS_SYNCIO_RESOLUTION 10 /* Resolution in milliseconds */
+
+/* Write the whole 'size'-byte payload at 'ptr' to 'fd', blocking for at most
+ * 'timeout' milliseconds overall.
+ *
+ * Returns 'size' once every byte has been written. An empty payload
+ * (size == 0) succeeds immediately with 0, without waiting on or writing to
+ * 'fd'. If write(2) fails, -1 is returned with errno left as set by write(2);
+ * if the timeout expires first, -1 is returned with errno set to ETIMEDOUT.
+ * In both failure cases an unspecified partial write could have been
+ * performed against the file descriptor. */
+ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
ssize_t nwritten, ret = size;
- time_t start = time(NULL);
+ long long start = mstime();
+ long long remaining = timeout;
- timeout++;
- while(size) {
- if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
+ /* Nothing to send: succeed right away, as the previous while(size) loop
+ * did, instead of waiting for 'fd' to become writable and possibly
+ * failing with ETIMEDOUT. */
+ if (size == 0) return ret;
+
+ while(1) {
+ /* Wait for the time left, but never less than the resolution. */
+ long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
+ remaining : REDIS_SYNCIO_RESOLUTION;
+ long long elapsed;
+
+ if (aeWait(fd,AE_WRITABLE,wait) & AE_WRITABLE) {
nwritten = write(fd,ptr,size);
if (nwritten == -1) return -1;
ptr += nwritten;
size -= nwritten;
+ if (size == 0) return ret;
}
- if ((time(NULL)-start) > timeout) {
+ /* The deadline is checked after every wait, whether or not any data
+ * was written in this iteration. */
+ elapsed = mstime() - start;
+ if (elapsed >= timeout) {
errno = ETIMEDOUT;
return -1;
}
+ remaining = timeout - elapsed;
}
- return ret;
}
-int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
+/* Read exactly 'size' bytes from 'fd' into 'ptr', blocking for at most
+ * 'timeout' milliseconds overall.
+ *
+ * Returns 'size' once all the bytes have been read. A zero-length request
+ * (size == 0) succeeds immediately with 0, without waiting on or reading
+ * from 'fd'. If read(2) fails or reports end of file, -1 is returned; if the
+ * timeout expires first, -1 is returned with errno set to ETIMEDOUT.
+ * In the failure cases an unspecified amount of data could have been read
+ * from the file descriptor. */
+ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
ssize_t nread, totread = 0;
- time_t start = time(NULL);
+ long long start = mstime();
+ long long remaining = timeout;
- timeout++;
- while(size) {
- if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
+ /* Nothing to read: succeed right away, as the previous while(size) loop
+ * did. Without this guard a read(2) of zero bytes returns 0, which the
+ * check below would report as a failure. */
+ if (size == 0) return 0;
+
+ while(1) {
+ /* Wait for the time left, but never less than the resolution. */
+ long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
+ remaining : REDIS_SYNCIO_RESOLUTION;
+ long long elapsed;
+
+ if (aeWait(fd,AE_READABLE,wait) & AE_READABLE) {
nread = read(fd,ptr,size);
if (nread <= 0) return -1;
ptr += nread;
size -= nread;
totread += nread;
+ if (size == 0) return totread;
}
- if ((time(NULL)-start) > timeout) {
+ /* The deadline is checked after every wait, whether or not any data
+ * was read in this iteration. */
+ elapsed = mstime() - start;
+ if (elapsed >= timeout) {
errno = ETIMEDOUT;
return -1;
}
+ remaining = timeout - elapsed;
}
- return totread;
}
-int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
+/* Read a line making sure that every char will not require more than 'timeout'
+ * milliseconds to be read.
+ *
+ * On success the number of bytes read is returned, otherwise -1.
+ * On success the string is always correctly terminated with a 0 byte. */
+ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
ssize_t nread = 0;
size--;