]> git.saurik.com Git - redis.git/blame_incremental - src/syncio.c
Remove the write handler only if there are no longer objects in the output queue...
[redis.git] / src / syncio.c
... / ...
CommitLineData
1/* Synchronous socket and file I/O operations useful across the core.
2 *
3 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#include "redis.h"
32
33/* ----------------- Blocking sockets I/O with timeouts --------------------- */
34
/* Redis performs most of its I/O in a nonblocking way, with the exception
 * of the SYNC command, where the slave does it in a blocking way, and
 * the MIGRATE command, which must be blocking in order to be atomic from the
 * point of view of the two instances (one migrating the key and one receiving
 * the key). This is why we need the following blocking I/O functions. */
40
41int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
42 ssize_t nwritten, ret = size;
43 time_t start = time(NULL);
44
45 timeout++;
46 while(size) {
47 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
48 nwritten = write(fd,ptr,size);
49 if (nwritten == -1) return -1;
50 ptr += nwritten;
51 size -= nwritten;
52 }
53 if ((time(NULL)-start) > timeout) {
54 errno = ETIMEDOUT;
55 return -1;
56 }
57 }
58 return ret;
59}
60
61int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
62 ssize_t nread, totread = 0;
63 time_t start = time(NULL);
64
65 timeout++;
66 while(size) {
67 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
68 nread = read(fd,ptr,size);
69 if (nread <= 0) return -1;
70 ptr += nread;
71 size -= nread;
72 totread += nread;
73 }
74 if ((time(NULL)-start) > timeout) {
75 errno = ETIMEDOUT;
76 return -1;
77 }
78 }
79 return totread;
80}
81
/* Read a line from 'fd' into the 'size' bytes buffer 'ptr', one byte at a
 * time through syncRead(), stopping at the first '\n'.
 *
 * The newline is not stored, and a '\r' immediately preceding it is replaced
 * by the terminator. The buffer is always left NUL-terminated. At most
 * 'size'-1 bytes are stored: if no newline shows up before the buffer is
 * full, reading stops and what was read so far is returned.
 *
 * Returns the number of bytes stored before the newline (a stripped '\r' is
 * still counted), or -1 if syncRead() fails. Returns -1 with errno set to
 * EINVAL if 'size' is not positive, since there is no room for even the
 * terminator. */
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }

    /* Make sure the buffer is a valid string even if nothing gets stored. */
    *ptr = '\0';

    size--; /* Reserve one byte for the NUL terminator. */
    while (size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the byte just stored: without this the loop never
         * notices that the buffer is full and writes past its end. */
        size--;
    }
    return nread;
}
102
/* ------------- Bulk protocol output helpers for FILE streams -------------- */
104
/* Emit the binary-safe string 's' of 'len' bytes to 'fp' in bulk format:
 *
 *   $<len>\r\n<payload>\r\n
 *
 * The payload fwrite() is skipped when 'len' is zero. Returns 1 on success,
 * 0 as soon as one of the fwrite() calls reports that nothing was written. */
int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
    char header[128];
    int hlen = 1;
    int ok;

    /* Header is '$', the decimal length, then CRLF. */
    header[0] = '$';
    hlen += ll2string(header+1, sizeof(header)-1, len);
    header[hlen++] = '\r';
    header[hlen++] = '\n';

    /* Short-circuit evaluation keeps the writes in order and stops at the
     * first failure. */
    ok = fwrite(header, hlen, 1, fp) != 0 &&
         (len == 0 || fwrite(s, len, 1, fp) != 0) &&
         fwrite("\r\n", 2, 1, fp) != 0;
    return ok ? 1 : 0;
}
120
/* Emit a count line of the form "<prefix><count>\r\n" to 'fp', for instance
 * "*<count>\r\n" for a multi bulk count when 'prefix' is '*'.
 *
 * Returns 1 on success, 0 if fwrite() reports that nothing was written. */
int fwriteBulkCount(FILE *fp, char prefix, int count) {
    char line[128];
    int used = 1;

    line[0] = prefix;
    used += ll2string(line+1, sizeof(line)-1, count);
    line[used++] = '\r';
    line[used++] = '\n';

    return (fwrite(line, used, 1, fp) == 0) ? 0 : 1;
}
133
/* Emit the double 'd' to 'fp' in bulk format, $<count>\r\n<payload>\r\n,
 * where the payload is 'd' formatted with "%.17g".
 *
 * Returns 1 on success, 0 if either fwrite() reports that nothing was
 * written. */
int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t plen, hlen;

    /* The payload is formatted together with its trailing CRLF, so the
     * length announced in the header is two bytes shorter than the string. */
    snprintf(payload, sizeof(payload), "%.17g\r\n", d);
    plen = strlen(payload);

    snprintf(header, sizeof(header), "$%lu\r\n", (unsigned long)plen-2);
    hlen = strlen(header);

    if (fwrite(header, hlen, 1, fp) == 0) return 0;
    if (fwrite(payload, plen, 1, fp) == 0) return 0;
    return 1;
}
144
/* Emit the integer 'l' to 'fp' in bulk format, $<count>\r\n<payload>\r\n,
 * where the payload is the decimal representation produced by ll2string().
 * The whole reply is assembled in a local buffer and sent with a single
 * fwrite().
 *
 * Returns 1 on success, 0 if fwrite() reports that nothing was written. */
int fwriteBulkLongLong(FILE *fp, long long l) {
    char digits[128], reply[128];
    unsigned int ndigits, replylen;

    /* 'digits' is later consumed through "%s", so this relies on
     * ll2string() leaving it NUL-terminated. */
    ndigits = ll2string(digits, 32, l);
    replylen = snprintf(reply, sizeof(reply), "$%u\r\n%s\r\n", ndigits, digits);

    return (fwrite(reply, replylen, 1, fp) == 0) ? 0 : 1;
}
154
155/* Delegate writing an object to writing a bulk string or bulk long long. */
156int fwriteBulkObject(FILE *fp, robj *obj) {
157 /* Avoid using getDecodedObject to help copy-on-write (we are often
158 * in a child process when this function is called). */
159 if (obj->encoding == REDIS_ENCODING_INT) {
160 return fwriteBulkLongLong(fp,(long)obj->ptr);
161 } else if (obj->encoding == REDIS_ENCODING_RAW) {
162 return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
163 } else {
164 redisPanic("Unknown string encoding");
165 }
166}
167
168