]> git.saurik.com Git - redis.git/blob - deps/hiredis/async.c
added the mandatory Cheers in the release notes
[redis.git] / deps / hiredis / async.c
1 /*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <string.h>
31 #include <assert.h>
32 #include "async.h"
33 #include "sds.h"
34 #include "util.h"
35
36 /* Forward declaration of function in hiredis.c */
37 void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
38
39 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
40 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
41 /* Set all bytes in the async part of the context to 0 */
42 memset(ac+sizeof(redisContext),0,sizeof(redisAsyncContext)-sizeof(redisContext));
43 return ac;
44 }
45
46 /* We want the error field to be accessible directly instead of requiring
47 * an indirection to the redisContext struct. */
48 static void __redisAsyncCopyError(redisAsyncContext *ac) {
49 redisContext *c = &(ac->c);
50 ac->err = c->err;
51 ac->errstr = c->errstr;
52 }
53
54 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
55 redisContext *c = redisConnectNonBlock(ip,port);
56 redisAsyncContext *ac = redisAsyncInitialize(c);
57 __redisAsyncCopyError(ac);
58 return ac;
59 }
60
61 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
62 redisContext *c = redisConnectUnixNonBlock(path);
63 redisAsyncContext *ac = redisAsyncInitialize(c);
64 __redisAsyncCopyError(ac);
65 return ac;
66 }
67
68 int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) {
69 redisContext *c = &(ac->c);
70 return redisSetReplyObjectFunctions(c,fn);
71 }
72
73 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
74 if (ac->onDisconnect == NULL) {
75 ac->onDisconnect = fn;
76 return REDIS_OK;
77 }
78 return REDIS_ERR;
79 }
80
81 /* Helper functions to push/shift callbacks */
82 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
83 redisCallback *cb;
84
85 /* Copy callback from stack to heap */
86 cb = calloc(1,sizeof(*cb));
87 if (!cb) redisOOM();
88 if (source != NULL) {
89 cb->fn = source->fn;
90 cb->privdata = source->privdata;
91 }
92
93 /* Store callback in list */
94 if (list->head == NULL)
95 list->head = cb;
96 if (list->tail != NULL)
97 list->tail->next = cb;
98 list->tail = cb;
99 return REDIS_OK;
100 }
101
102 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
103 redisCallback *cb = list->head;
104 if (cb != NULL) {
105 list->head = cb->next;
106 if (cb == list->tail)
107 list->tail = NULL;
108
109 /* Copy callback from heap to stack */
110 if (target != NULL)
111 memcpy(target,cb,sizeof(*cb));
112 free(cb);
113 return REDIS_OK;
114 }
115 return REDIS_ERR;
116 }
117
118 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
119 * from being issued, but tries to flush the output buffer and execute
120 * callbacks for all remaining replies.
121 *
122 * This functions is generally called from within a callback, so the
123 * processCallbacks function will pick up the flag when there are no
124 * more replies. */
125 void redisAsyncDisconnect(redisAsyncContext *ac) {
126 redisContext *c = &(ac->c);
127 c->flags |= REDIS_DISCONNECTING;
128 }
129
130 /* Helper function to make the disconnect happen and clean up. */
131 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
132 redisContext *c = &(ac->c);
133 redisCallback cb;
134 int status;
135
136 /* Make sure error is accessible if there is any */
137 __redisAsyncCopyError(ac);
138 status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
139
140 if (status == REDIS_OK) {
141 /* When the connection is cleanly disconnected, there should not
142 * be pending callbacks. */
143 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
144 } else {
145 /* Callbacks should not be able to issue new commands. */
146 c->flags |= REDIS_DISCONNECTING;
147
148 /* Execute pending callbacks with NULL reply. */
149 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) {
150 if (cb.fn != NULL)
151 cb.fn(ac,NULL,cb.privdata);
152 }
153 }
154
155 /* Signal event lib to clean up */
156 if (ac->evCleanup) ac->evCleanup(ac->data);
157
158 /* Execute callback with proper status */
159 if (ac->onDisconnect) ac->onDisconnect(ac,status);
160
161 /* Cleanup self */
162 redisFree(c);
163 }
164
165 void redisProcessCallbacks(redisAsyncContext *ac) {
166 redisContext *c = &(ac->c);
167 redisCallback cb;
168 void *reply = NULL;
169 int status;
170
171 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
172 if (reply == NULL) {
173 /* When the connection is being disconnected and there are
174 * no more replies, this is the cue to really disconnect. */
175 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
176 __redisAsyncDisconnect(ac);
177 return;
178 }
179
180 /* When the connection is not being disconnected, simply stop
181 * trying to get replies and wait for the next loop tick. */
182 break;
183 }
184
185 /* Shift callback and execute it */
186 assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK);
187 if (cb.fn != NULL) {
188 cb.fn(ac,reply,cb.privdata);
189 } else {
190 c->fn->freeObject(reply);
191 }
192 }
193
194 /* Disconnect when there was an error reading the reply */
195 if (status != REDIS_OK)
196 __redisAsyncDisconnect(ac);
197 }
198
199 /* This function should be called when the socket is readable.
200 * It processes all replies that can be read and executes their callbacks.
201 */
202 void redisAsyncHandleRead(redisAsyncContext *ac) {
203 redisContext *c = &(ac->c);
204
205 if (redisBufferRead(c) == REDIS_ERR) {
206 __redisAsyncDisconnect(ac);
207 } else {
208 /* Always re-schedule reads */
209 if (ac->evAddRead) ac->evAddRead(ac->data);
210 redisProcessCallbacks(ac);
211 }
212 }
213
214 void redisAsyncHandleWrite(redisAsyncContext *ac) {
215 redisContext *c = &(ac->c);
216 int done = 0;
217
218 if (redisBufferWrite(c,&done) == REDIS_ERR) {
219 __redisAsyncDisconnect(ac);
220 } else {
221 /* Continue writing when not done, stop writing otherwise */
222 if (!done) {
223 if (ac->evAddWrite) ac->evAddWrite(ac->data);
224 } else {
225 if (ac->evDelWrite) ac->evDelWrite(ac->data);
226 }
227
228 /* Always schedule reads when something was written */
229 if (ac->evAddRead) ac->evAddRead(ac->data);
230 }
231 }
232
233 /* Helper function for the redisAsyncCommand* family of functions.
234 *
235 * Write a formatted command to the output buffer and register the provided
236 * callback function with the context.
237 */
238 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
239 redisContext *c = &(ac->c);
240 redisCallback cb;
241
242 /* Don't accept new commands when the connection is lazily closed. */
243 if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR;
244 __redisAppendCommand(c,cmd,len);
245
246 /* Store callback */
247 cb.fn = fn;
248 cb.privdata = privdata;
249 __redisPushCallback(&ac->replies,&cb);
250
251 /* Always schedule a write when the write buffer is non-empty */
252 if (ac->evAddWrite) ac->evAddWrite(ac->data);
253
254 return REDIS_OK;
255 }
256
257 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
258 char *cmd;
259 int len;
260 int status;
261 len = redisvFormatCommand(&cmd,format,ap);
262 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
263 free(cmd);
264 return status;
265 }
266
267 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
268 va_list ap;
269 int status;
270 va_start(ap,format);
271 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
272 va_end(ap);
273 return status;
274 }
275
276 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
277 char *cmd;
278 int len;
279 int status;
280 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
281 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
282 free(cmd);
283 return status;
284 }