]> git.saurik.com Git - redis.git/blame - src/syncio.c
Allow Pub/Sub in contexts where other commands are blocked.
[redis.git] / src / syncio.c
CommitLineData
d08fac3e 1/* Synchronous socket and file I/O operations useful across the core.
19e61097 2 *
3 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include "redis.h"
32
d08fac3e 33/* ----------------- Blocking sockets I/O with timeouts --------------------- */
34
35/* Redis performs most of the I/O in a nonblocking way, with the exception
36 * of the SYNC command where the slave does it in a blocking way, and
37 * the MIGRATE command that must be blocking in order to be atomic from the
38 * point of view of the two instances (one migrating the key and one receiving
04d360fd 39 * the key). This is why we need the following blocking I/O functions.
40 *
41 * All the functions take the timeout in milliseconds. */
42
43#define REDIS_SYNCIO_RESOLUTION 10 /* Resolution in milliseconds */
d08fac3e 44
af3853c3 45/* Write the specified payload to 'fd'. If writing the whole payload will be
46 * done within 'timeout' milliseconds the operation succeeds and 'size' is
47 * returned. Otherwise the operation fails, -1 is returned, and an unspecified
48 * partial write could be performed against the file descriptor. */
04d360fd 49ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
19e61097 50 ssize_t nwritten, ret = size;
04d360fd 51 long long start = mstime();
52 long long remaining = timeout;
19e61097 53
04d360fd 54 while(1) {
55 long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
56 remaining : REDIS_SYNCIO_RESOLUTION;
57 long long elapsed;
58
af3853c3 59 /* Optimistically try to write before checking if the file descriptor
60 * is actually writable. At worst we get EAGAIN. */
61 nwritten = write(fd,ptr,size);
62 if (nwritten == -1) {
63 if (errno != EAGAIN) return -1;
64 } else {
19e61097 65 ptr += nwritten;
66 size -= nwritten;
67 }
af3853c3 68 if (size == 0) return ret;
69
70 /* Wait */
71 aeWait(fd,AE_WRITABLE,wait);
04d360fd 72 elapsed = mstime() - start;
73 if (elapsed >= timeout) {
19e61097 74 errno = ETIMEDOUT;
75 return -1;
76 }
04d360fd 77 remaining = timeout - elapsed;
19e61097 78 }
19e61097 79}
80
af3853c3 81/* Read the specified amount of bytes from 'fd'. If all the bytes are read
82 * within 'timeout' milliseconds the operation succeed and 'size' is returned.
04d360fd 83 * Otherwise the operation fails, -1 is returned, and an unspecified amount of
84 * data could be read from the file descriptor. */
85ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
19e61097 86 ssize_t nread, totread = 0;
04d360fd 87 long long start = mstime();
88 long long remaining = timeout;
19e61097 89
af3853c3 90 if (size == 0) return 0;
04d360fd 91 while(1) {
92 long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
93 remaining : REDIS_SYNCIO_RESOLUTION;
94 long long elapsed;
95
af3853c3 96 /* Optimistically try to read before checking if the file descriptor
97 * is actually readable. At worst we get EAGAIN. */
98 nread = read(fd,ptr,size);
99 if (nread == 0) return -1; /* short read. */
100 if (nread == -1) {
101 if (errno != EAGAIN) return -1;
102 } else {
19e61097 103 ptr += nread;
104 size -= nread;
105 totread += nread;
106 }
af3853c3 107 if (size == 0) return totread;
108
109 /* Wait */
110 aeWait(fd,AE_READABLE,wait);
04d360fd 111 elapsed = mstime() - start;
112 if (elapsed >= timeout) {
19e61097 113 errno = ETIMEDOUT;
114 return -1;
115 }
04d360fd 116 remaining = timeout - elapsed;
19e61097 117 }
19e61097 118}
119
/* Read a line from 'fd' into the buffer 'ptr' of capacity 'size' bytes,
 * one character at a time, so that no single character may take more than
 * 'timeout' milliseconds to arrive.
 *
 * Reading stops at the first '\n', which is not stored. If the character
 * stored right before it is '\r', that one is replaced by the terminator
 * (it is still included in the returned count). Reading also stops once
 * size-1 characters have been stored without meeting a newline; in that case
 * the line is truncated and the rest of it is left unread on the descriptor.
 *
 * Returns the number of characters read before the newline, or -1 if
 * syncRead() fails. Also returns -1 with errno set to EINVAL when 'size'
 * is <= 0, since not even the terminator would fit.
 * On success the string is always terminated with a 0 byte. */
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
    ssize_t nread = 0;

    /* A non-positive capacity would make the counter below negative and
     * the loop unbounded. */
    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }

    /* Keep the termination guarantee even if the loop never runs
     * (size == 1). */
    *ptr = '\0';

    size--; /* Reserve one byte for the terminator. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the stored byte: without this the loop never ends on
         * a full buffer and a long line is written past the end of 'ptr'. */
        size--;
    }
    return nread;
}