]> git.saurik.com Git - redis.git/blame - deps/hiredis/async.c
Merge branch 'master' into unstable
[redis.git] / deps / hiredis / async.c
CommitLineData
24f753a8
PN
1/*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
a1e97d69
PN
3 * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
24f753a8
PN
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include <string.h>
33#include <assert.h>
34#include "async.h"
35#include "sds.h"
36#include "util.h"
37
38/* Forward declaration of function in hiredis.c */
39void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
40
41static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
42 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
a1e97d69
PN
43 c = &(ac->c);
44
45 /* The regular connect functions will always set the flag REDIS_CONNECTED.
46 * For the async API, we want to wait until the first write event is
47 * received up before setting this flag, so reset it here. */
48 c->flags &= ~REDIS_CONNECTED;
49
50 ac->err = 0;
51 ac->errstr = NULL;
52 ac->data = NULL;
53 ac->_adapter_data = NULL;
54
55 ac->evAddRead = NULL;
56 ac->evDelRead = NULL;
57 ac->evAddWrite = NULL;
58 ac->evDelWrite = NULL;
59 ac->evCleanup = NULL;
60
61 ac->onConnect = NULL;
62 ac->onDisconnect = NULL;
63
64 ac->replies.head = NULL;
65 ac->replies.tail = NULL;
24f753a8
PN
66 return ac;
67}
68
69/* We want the error field to be accessible directly instead of requiring
70 * an indirection to the redisContext struct. */
71static void __redisAsyncCopyError(redisAsyncContext *ac) {
72 redisContext *c = &(ac->c);
73 ac->err = c->err;
74 ac->errstr = c->errstr;
75}
76
77redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
78 redisContext *c = redisConnectNonBlock(ip,port);
79 redisAsyncContext *ac = redisAsyncInitialize(c);
80 __redisAsyncCopyError(ac);
81 return ac;
82}
83
84redisAsyncContext *redisAsyncConnectUnix(const char *path) {
85 redisContext *c = redisConnectUnixNonBlock(path);
86 redisAsyncContext *ac = redisAsyncInitialize(c);
87 __redisAsyncCopyError(ac);
88 return ac;
89}
90
91int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) {
92 redisContext *c = &(ac->c);
93 return redisSetReplyObjectFunctions(c,fn);
94}
95
a1e97d69
PN
96int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
97 if (ac->onConnect == NULL) {
98 ac->onConnect = fn;
99 return REDIS_OK;
100 }
101 return REDIS_ERR;
102}
103
24f753a8
PN
104int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
105 if (ac->onDisconnect == NULL) {
106 ac->onDisconnect = fn;
107 return REDIS_OK;
108 }
109 return REDIS_ERR;
110}
111
112/* Helper functions to push/shift callbacks */
113static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
114 redisCallback *cb;
115
116 /* Copy callback from stack to heap */
117 cb = calloc(1,sizeof(*cb));
118 if (!cb) redisOOM();
119 if (source != NULL) {
120 cb->fn = source->fn;
121 cb->privdata = source->privdata;
122 }
123
124 /* Store callback in list */
125 if (list->head == NULL)
126 list->head = cb;
127 if (list->tail != NULL)
128 list->tail->next = cb;
129 list->tail = cb;
130 return REDIS_OK;
131}
132
133static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
134 redisCallback *cb = list->head;
135 if (cb != NULL) {
136 list->head = cb->next;
137 if (cb == list->tail)
138 list->tail = NULL;
139
140 /* Copy callback from heap to stack */
141 if (target != NULL)
142 memcpy(target,cb,sizeof(*cb));
143 free(cb);
144 return REDIS_OK;
145 }
146 return REDIS_ERR;
147}
148
149/* Tries to do a clean disconnect from Redis, meaning it stops new commands
150 * from being issued, but tries to flush the output buffer and execute
151 * callbacks for all remaining replies.
152 *
153 * This functions is generally called from within a callback, so the
154 * processCallbacks function will pick up the flag when there are no
155 * more replies. */
156void redisAsyncDisconnect(redisAsyncContext *ac) {
157 redisContext *c = &(ac->c);
158 c->flags |= REDIS_DISCONNECTING;
159}
160
161/* Helper function to make the disconnect happen and clean up. */
162static void __redisAsyncDisconnect(redisAsyncContext *ac) {
163 redisContext *c = &(ac->c);
164 redisCallback cb;
165 int status;
166
167 /* Make sure error is accessible if there is any */
168 __redisAsyncCopyError(ac);
169 status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
170
171 if (status == REDIS_OK) {
172 /* When the connection is cleanly disconnected, there should not
173 * be pending callbacks. */
174 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
175 } else {
176 /* Callbacks should not be able to issue new commands. */
177 c->flags |= REDIS_DISCONNECTING;
178
179 /* Execute pending callbacks with NULL reply. */
180 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) {
181 if (cb.fn != NULL)
182 cb.fn(ac,NULL,cb.privdata);
183 }
184 }
185
186 /* Signal event lib to clean up */
a1e97d69 187 if (ac->evCleanup) ac->evCleanup(ac->_adapter_data);
24f753a8
PN
188
189 /* Execute callback with proper status */
190 if (ac->onDisconnect) ac->onDisconnect(ac,status);
191
192 /* Cleanup self */
193 redisFree(c);
194}
195
196void redisProcessCallbacks(redisAsyncContext *ac) {
197 redisContext *c = &(ac->c);
198 redisCallback cb;
199 void *reply = NULL;
200 int status;
201
202 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
203 if (reply == NULL) {
204 /* When the connection is being disconnected and there are
205 * no more replies, this is the cue to really disconnect. */
206 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
207 __redisAsyncDisconnect(ac);
208 return;
209 }
210
211 /* When the connection is not being disconnected, simply stop
212 * trying to get replies and wait for the next loop tick. */
213 break;
214 }
215
216 /* Shift callback and execute it */
217 assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK);
218 if (cb.fn != NULL) {
219 cb.fn(ac,reply,cb.privdata);
220 } else {
221 c->fn->freeObject(reply);
222 }
223 }
224
225 /* Disconnect when there was an error reading the reply */
226 if (status != REDIS_OK)
227 __redisAsyncDisconnect(ac);
228}
229
230/* This function should be called when the socket is readable.
231 * It processes all replies that can be read and executes their callbacks.
232 */
233void redisAsyncHandleRead(redisAsyncContext *ac) {
234 redisContext *c = &(ac->c);
235
236 if (redisBufferRead(c) == REDIS_ERR) {
237 __redisAsyncDisconnect(ac);
238 } else {
239 /* Always re-schedule reads */
a1e97d69 240 if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
24f753a8
PN
241 redisProcessCallbacks(ac);
242 }
243}
244
245void redisAsyncHandleWrite(redisAsyncContext *ac) {
246 redisContext *c = &(ac->c);
247 int done = 0;
248
249 if (redisBufferWrite(c,&done) == REDIS_ERR) {
250 __redisAsyncDisconnect(ac);
251 } else {
252 /* Continue writing when not done, stop writing otherwise */
253 if (!done) {
a1e97d69 254 if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data);
24f753a8 255 } else {
a1e97d69 256 if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data);
24f753a8
PN
257 }
258
a1e97d69
PN
259 /* Always schedule reads after writes */
260 if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
261
262 /* Fire onConnect when this is the first write event. */
263 if (!(c->flags & REDIS_CONNECTED)) {
264 c->flags |= REDIS_CONNECTED;
265 if (ac->onConnect) ac->onConnect(ac);
266 }
24f753a8
PN
267 }
268}
269
270/* Helper function for the redisAsyncCommand* family of functions.
271 *
272 * Write a formatted command to the output buffer and register the provided
273 * callback function with the context.
274 */
275static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
276 redisContext *c = &(ac->c);
277 redisCallback cb;
278
279 /* Don't accept new commands when the connection is lazily closed. */
280 if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR;
281 __redisAppendCommand(c,cmd,len);
282
283 /* Store callback */
284 cb.fn = fn;
285 cb.privdata = privdata;
286 __redisPushCallback(&ac->replies,&cb);
287
288 /* Always schedule a write when the write buffer is non-empty */
a1e97d69 289 if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data);
24f753a8
PN
290
291 return REDIS_OK;
292}
293
294int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
295 char *cmd;
296 int len;
297 int status;
298 len = redisvFormatCommand(&cmd,format,ap);
299 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
300 free(cmd);
301 return status;
302}
303
304int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
305 va_list ap;
306 int status;
307 va_start(ap,format);
308 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
309 va_end(ap);
310 return status;
311}
312
313int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
314 char *cmd;
315 int len;
316 int status;
317 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
318 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
319 free(cmd);
320 return status;
321}