]> git.saurik.com Git - redis.git/blob - src/syncio.c
Handle BRPOPLPUSH inside a transaction.
[redis.git] / src / syncio.c
1 /* Synchronous socket and file I/O operations useful across the core.
2 *
3 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "redis.h"
32
33 /* ----------------- Blocking sockets I/O with timeouts --------------------- */
34
35 /* Redis performs most of the I/O in a nonblocking way, with the exception
36 * of the SYNC command where the slave does it in a blocking way, and
37 * the MIGRATE command that must be blocking in order to be atomic from the
38 * point of view of the two instances (one migrating the key and one receiving
39 * the key). This is why we need the following blocking I/O functions. */
40
41 int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
42 ssize_t nwritten, ret = size;
43 time_t start = time(NULL);
44
45 timeout++;
46 while(size) {
47 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
48 nwritten = write(fd,ptr,size);
49 if (nwritten == -1) return -1;
50 ptr += nwritten;
51 size -= nwritten;
52 }
53 if ((time(NULL)-start) > timeout) {
54 errno = ETIMEDOUT;
55 return -1;
56 }
57 }
58 return ret;
59 }
60
61 int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
62 ssize_t nread, totread = 0;
63 time_t start = time(NULL);
64
65 timeout++;
66 while(size) {
67 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
68 nread = read(fd,ptr,size);
69 if (nread <= 0) return -1;
70 ptr += nread;
71 size -= nread;
72 totread += nread;
73 }
74 if ((time(NULL)-start) > timeout) {
75 errno = ETIMEDOUT;
76 return -1;
77 }
78 }
79 return totread;
80 }
81
/* Read a single line from 'fd' into the buffer 'ptr' of 'size' bytes, one
 * byte at a time through syncRead(), using 'timeout' for every byte.
 *
 * Reading stops at the first '\n', which is not stored. A '\r' immediately
 * preceding it is overwritten with the terminator. The buffer is always
 * null terminated, and at most size-1 characters are stored: if the buffer
 * fills up before a newline is seen, the function returns what it has and
 * leaves the rest of the line unread.
 *
 * Returns the number of characters stored before the '\n' (a stripped '\r'
 * is still included in this count), or -1 if syncRead() fails. A
 * non-positive 'size' is rejected with -1 and errno set to EINVAL, since
 * there would be no room even for the terminator. */
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }
    *ptr = '\0';

    size--; /* Reserve one byte for the null terminator. */
    while (size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the byte just stored, so the loop can never write
         * past the end of the caller's buffer. */
        size--;
    }
    return nread;
}
102
103 /* ---------------- Bulk format output to stdio FILE streams ---------------- */
104
/* Emit the 'len' bytes at 's' to 'fp' as a bulk string:
 *
 *     $<len>\r\n<bytes>\r\n
 *
 * The payload is written verbatim, so it may contain any byte value. When
 * 'len' is zero the payload write is skipped entirely.
 *
 * Returns 1 if every fwrite() succeeded, 0 as soon as one of them fails. */
int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
    char header[128];
    int hlen;
    int ok;

    /* Build the "$<len>\r\n" prefix. ll2string() is relied upon to return
     * the number of characters it stored after the leading '$'. */
    header[0] = '$';
    hlen = ll2string(header+1,sizeof(header)-1,len) + 1;
    header[hlen] = '\r';
    header[hlen+1] = '\n';
    hlen += 2;

    /* Short-circuit evaluation keeps the writes in order and stops at the
     * first one that fails. */
    ok = fwrite(header,hlen,1,fp) != 0 &&
         (len == 0 || fwrite(s,len,1,fp) != 0) &&
         fwrite("\r\n",2,1,fp) != 0;
    return ok;
}
119
/* Emit the double 'd' to 'fp' as a bulk string whose payload is the value
 * formatted with "%.17g":
 *
 *     $<payload length>\r\n<payload>\r\n
 *
 * Returns 1 if both fwrite() calls succeeded, 0 otherwise. */
int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t payloadlen, headerlen;

    /* The payload is formatted together with its trailing CRLF, so the
     * advertised count has to leave those two bytes out. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    payloadlen = strlen(payload);

    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)payloadlen-2);
    headerlen = strlen(header);

    if (fwrite(header,headerlen,1,fp) == 0) return 0;
    if (fwrite(payload,payloadlen,1,fp) == 0) return 0;
    return 1;
}
130
/* Emit the long long 'l' to 'fp' as a bulk string whose payload is its
 * decimal representation:
 *
 *     $<digit count>\r\n<digits>\r\n
 *
 * The whole reply is assembled in memory and sent with a single fwrite().
 * Returns 1 on success, 0 if the fwrite() fails. */
int fwriteBulkLongLong(FILE *fp, long long l) {
    char digits[128], reply[128];
    unsigned int ndigits, replylen;

    /* ll2string() is relied upon to null terminate 'digits' and to return
     * the number of characters it produced. */
    ndigits = ll2string(digits,32,l);
    replylen = snprintf(reply,sizeof(reply),"$%u\r\n%s\r\n",ndigits,digits);
    return fwrite(reply,replylen,1,fp) != 0;
}
140
141 /* Delegate writing an object to writing a bulk string or bulk long long. */
142 int fwriteBulkObject(FILE *fp, robj *obj) {
143 /* Avoid using getDecodedObject to help copy-on-write (we are often
144 * in a child process when this function is called). */
145 if (obj->encoding == REDIS_ENCODING_INT) {
146 return fwriteBulkLongLong(fp,(long)obj->ptr);
147 } else if (obj->encoding == REDIS_ENCODING_RAW) {
148 return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
149 } else {
150 redisPanic("Unknown string encoding");
151 }
152 }
153
154