2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
38 /* Forward declaration of a function defined in hiredis.c. In this file it
 * is called from __redisAsyncCommand with the context, the formatted
 * command buffer and that buffer's length. */
39 void __redisAppendCommand(redisContext
*c
, char *cmd
, size_t len
);
/* Converts a redisContext into a redisAsyncContext.
 *
 * The memory block of `c` is resized with realloc() to
 * sizeof(redisAsyncContext), the REDIS_CONNECTED flag is cleared, and the
 * adapter data, the write hooks, the disconnect callback and both ends of
 * the reply-callback list are reset to NULL in the lines shown here.
 *
 * NOTE(review): the realloc() result is not checked against NULL in the
 * visible lines. `c` is also dereferenced after the realloc(); presumably it
 * is re-pointed at the embedded context on a line not shown - confirm.
 * NOTE(review): the return statement is not part of the visible lines;
 * callers in this file use the result as the new async context. */
41 static redisAsyncContext
*redisAsyncInitialize(redisContext
*c
) {
42 redisAsyncContext
*ac
= realloc(c
,sizeof(redisAsyncContext
));
45 /* The regular connect functions will always set the flag REDIS_CONNECTED.
46 * For the async API, we want to wait until the first write event is
47 * received before setting this flag, so reset it here. */
48 c
->flags
&= ~REDIS_CONNECTED
;
53 ac
->_adapter_data
= NULL
;
57 ac
->evAddWrite
= NULL
;
58 ac
->evDelWrite
= NULL
;
62 ac
->onDisconnect
= NULL
;
64 ac
->replies
.head
= NULL
;
65 ac
->replies
.tail
= NULL
;
69 /* We want the error field to be accessible directly instead of requiring
70 * an indirection to the redisContext struct.
 *
 * The lines shown copy the errstr pointer of the embedded redisContext
 * (ac->c) into ac->errstr. Only the pointer is copied, not the string.
 * NOTE(review): presumably the numeric error code is copied as well on a
 * line not shown here - confirm. */
71 static void __redisAsyncCopyError(redisAsyncContext
*ac
) {
72 redisContext
*c
= &(ac
->c
);
74 ac
->errstr
= c
->errstr
;
/* Creates an async context for the given ip and port.
 *
 * A context is obtained from redisConnectNonBlock(ip, port), handed to
 * redisAsyncInitialize() and the resulting async context is passed to
 * __redisAsyncCopyError() so that any error is visible on it.
 * NOTE(review): the return statement is not part of the visible lines;
 * presumably the async context is returned - confirm. */
77 redisAsyncContext
*redisAsyncConnect(const char *ip
, int port
) {
78 redisContext
*c
= redisConnectNonBlock(ip
,port
);
79 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
80 __redisAsyncCopyError(ac
);
/* Creates an async context for the given path.
 *
 * A context is obtained from redisConnectUnixNonBlock(path), handed to
 * redisAsyncInitialize() and the resulting async context is passed to
 * __redisAsyncCopyError() so that any error is visible on it.
 * NOTE(review): the return statement is not part of the visible lines;
 * presumably the async context is returned - confirm. */
84 redisAsyncContext
*redisAsyncConnectUnix(const char *path
) {
85 redisContext
*c
= redisConnectUnixNonBlock(path
);
86 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
87 __redisAsyncCopyError(ac
);
/* Installs the reply object functions `fn` for an async context by
 * forwarding to redisSetReplyObjectFunctions() on the embedded redisContext
 * (ac->c). Returns whatever that call returns. */
91 int redisAsyncSetReplyObjectFunctions(redisAsyncContext
*ac
, redisReplyObjectFunctions
*fn
) {
92 redisContext
*c
= &(ac
->c
);
93 return redisSetReplyObjectFunctions(c
,fn
);
/* Registers the connect callback of an async context.
 *
 * Only the guard is visible here: the guarded branch runs when no
 * onConnect callback has been set yet (the field is still NULL).
 * NOTE(review): presumably the branch stores `fn` in ac->onConnect and the
 * function reports REDIS_OK / REDIS_ERR, mirroring
 * redisAsyncSetDisconnectCallback - confirm against the full file. */
96 int redisAsyncSetConnectCallback(redisAsyncContext
*ac
, redisConnectCallback
*fn
) {
97 if (ac
->onConnect
== NULL
) {
/* Registers the disconnect callback of an async context.
 *
 * `fn` is stored in ac->onDisconnect only when no disconnect callback has
 * been set yet (the field is still NULL).
 * NOTE(review): the return statements are not part of the visible lines;
 * presumably REDIS_OK when stored and REDIS_ERR otherwise - confirm. */
104 int redisAsyncSetDisconnectCallback(redisAsyncContext
*ac
, redisDisconnectCallback
*fn
) {
105 if (ac
->onDisconnect
== NULL
) {
106 ac
->onDisconnect
= fn
;
112 /* Helper functions to push/shift callbacks */
/* Appends a callback to `list`.
 *
 * A new node is allocated zero-initialised with calloc(). When `source` is
 * not NULL its privdata is copied into the node. The node is then linked
 * in: the visible lines test for an empty list (head == NULL) and chain the
 * node behind the current tail when there is one.
 * NOTE(review): the calloc() result is not checked in the visible lines.
 * NOTE(review): the declaration of `cb`, the head/tail assignments and the
 * return statement are not shown; presumably the callback function pointer
 * is copied next to privdata - confirm. */
113 static int __redisPushCallback(redisCallbackList
*list
, redisCallback
*source
) {
116 /* Copy callback from stack to heap */
117 cb
= calloc(1,sizeof(*cb
));
119 if (source
!= NULL
) {
121 cb
->privdata
= source
->privdata
;
124 /* Store callback in list */
125 if (list
->head
== NULL
)
127 if (list
->tail
!= NULL
)
128 list
->tail
->next
= cb
;
/* Removes the callback at the head of `list`.
 *
 * The head is advanced to cb->next, a check is made for the node also
 * being the tail, and the node is copied into *target with memcpy().
 * Callers in this file compare the result with REDIS_OK and REDIS_ERR.
 * NOTE(review): one caller passes target == NULL and callers expect
 * REDIS_ERR for an empty list, so the guards for an empty list and a NULL
 * target presumably sit on lines not shown here - confirm. Releasing the
 * heap node is not visible either. */
133 static int __redisShiftCallback(redisCallbackList
*list
, redisCallback
*target
) {
134 redisCallback
*cb
= list
->head
;
136 list
->head
= cb
->next
;
137 if (cb
== list
->tail
)
140 /* Copy callback from heap to stack */
142 memcpy(target
,cb
,sizeof(*cb
));
149 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
150 * from being issued, but tries to flush the output buffer and execute
151 * callbacks for all remaining replies.
153 * This function is generally called from within a callback, so the
154 * processCallbacks function will pick up the flag when there are no
 * more replies.
 *
 * The function itself only sets REDIS_DISCONNECTING on the flags of the
 * embedded redisContext (ac->c). */
156 void redisAsyncDisconnect(redisAsyncContext
*ac
) {
157 redisContext
*c
= &(ac
->c
);
158 c
->flags
|= REDIS_DISCONNECTING
;
161 /* Helper function to make the disconnect happen and clean up.
 *
 * Steps visible here, in order:
 *  - copy the error onto the async context and derive `status` from
 *    ac->err (REDIS_OK when it is 0, REDIS_ERR otherwise);
 *  - for a clean disconnect, assert that no callback is pending;
 *  - set REDIS_DISCONNECTING and invoke every pending callback with a NULL
 *    reply (presumably the non-clean branch of the status check - the
 *    branch structure is not fully visible, confirm);
 *  - call the evCleanup hook with the adapter data, if set;
 *  - call the onDisconnect callback with `status`, if set.
 * NOTE(review): the declarations of `status` and `cb` and any release of
 * the context are not part of the visible lines. */
162 static void __redisAsyncDisconnect(redisAsyncContext
*ac
) {
163 redisContext
*c
= &(ac
->c
);
167 /* Make sure error is accessible if there is any */
168 __redisAsyncCopyError(ac
);
169 status
= (ac
->err
== 0) ? REDIS_OK
: REDIS_ERR
;
171 if (status
== REDIS_OK
) {
172 /* When the connection is cleanly disconnected, there should not
173 * be pending callbacks. */
174 assert(__redisShiftCallback(&ac
->replies
,NULL
) == REDIS_ERR
);
176 /* Callbacks should not be able to issue new commands. */
177 c
->flags
|= REDIS_DISCONNECTING
;
179 /* Execute pending callbacks with NULL reply. */
180 while (__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
) {
182 cb
.fn(ac
,NULL
,cb
.privdata
);
186 /* Signal event lib to clean up */
187 if (ac
->evCleanup
) ac
->evCleanup(ac
->_adapter_data
);
189 /* Execute callback with proper status */
190 if (ac
->onDisconnect
) ac
->onDisconnect(ac
,status
);
/* Reads replies from the context and dispatches them to their callbacks.
 *
 * Loops while redisGetReply() returns REDIS_OK. Inside the loop the visible
 * lines (a) really disconnect when REDIS_DISCONNECTING is set and the
 * output buffer is empty, (b) shift the next callback off ac->replies,
 * invoke it with the reply and its privdata, and (c) release the reply via
 * c->fn->freeObject(). After the loop, a status other than REDIS_OK
 * triggers a disconnect.
 * NOTE(review): the test that decides "no more replies" and the statements
 * leaving the loop are not part of the visible lines - confirm.
 * NOTE(review): __redisShiftCallback() is invoked inside assert(); when the
 * file is compiled with NDEBUG the call is removed and `cb` would not be
 * filled in before cb.fn is used. */
196 void redisProcessCallbacks(redisAsyncContext
*ac
) {
197 redisContext
*c
= &(ac
->c
);
202 while((status
= redisGetReply(c
,&reply
)) == REDIS_OK
) {
204 /* When the connection is being disconnected and there are
205 * no more replies, this is the cue to really disconnect. */
206 if (c
->flags
& REDIS_DISCONNECTING
&& sdslen(c
->obuf
) == 0) {
207 __redisAsyncDisconnect(ac
);
211 /* When the connection is not being disconnected, simply stop
212 * trying to get replies and wait for the next loop tick. */
216 /* Shift callback and execute it */
217 assert(__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
);
219 cb
.fn(ac
,reply
,cb
.privdata
);
221 c
->fn
->freeObject(reply
);
225 /* Disconnect when there was an error reading the reply */
226 if (status
!= REDIS_OK
)
227 __redisAsyncDisconnect(ac
);
230 /* This function should be called when the socket is readable.
231 * It processes all replies that can be read and executes their callbacks.
 *
 * A REDIS_ERR result from redisBufferRead() leads to a disconnect.
 * The evAddRead hook is invoked with the adapter data when it is set, and
 * redisProcessCallbacks() then dispatches the replies.
 * NOTE(review): the branch structure between the error path and the
 * re-scheduling path is not fully visible - presumably an else - confirm. */
233 void redisAsyncHandleRead(redisAsyncContext
*ac
) {
234 redisContext
*c
= &(ac
->c
);
236 if (redisBufferRead(c
) == REDIS_ERR
) {
237 __redisAsyncDisconnect(ac
);
239 /* Always re-schedule reads */
240 if (ac
->evAddRead
) ac
->evAddRead(ac
->_adapter_data
);
241 redisProcessCallbacks(ac
);
/* Write-side handler of an async context.
 *
 * A REDIS_ERR result from redisBufferWrite() leads to a disconnect.
 * The visible lines also invoke the evAddWrite hook (keep writing), the
 * evDelWrite hook (stop writing) and the evAddRead hook, each only when set
 * and each with the adapter data. When REDIS_CONNECTED is not yet set on
 * the context, the flag is set and the onConnect callback is invoked if
 * one is registered.
 * NOTE(review): the declaration of `done` and the condition that selects
 * between evAddWrite and evDelWrite are not part of the visible lines;
 * presumably it tests `done` - confirm. */
245 void redisAsyncHandleWrite(redisAsyncContext
*ac
) {
246 redisContext
*c
= &(ac
->c
);
249 if (redisBufferWrite(c
,&done
) == REDIS_ERR
) {
250 __redisAsyncDisconnect(ac
);
252 /* Continue writing when not done, stop writing otherwise */
254 if (ac
->evAddWrite
) ac
->evAddWrite(ac
->_adapter_data
);
256 if (ac
->evDelWrite
) ac
->evDelWrite(ac
->_adapter_data
);
259 /* Always schedule reads after writes */
260 if (ac
->evAddRead
) ac
->evAddRead(ac
->_adapter_data
);
262 /* Fire onConnect when this is the first write event. */
263 if (!(c
->flags
& REDIS_CONNECTED
)) {
264 c
->flags
|= REDIS_CONNECTED
;
265 if (ac
->onConnect
) ac
->onConnect(ac
);
270 /* Helper function for the redisAsyncCommand* family of functions.
272 * Write a formatted command to the output buffer and register the provided
273 * callback function with the context.
 *
 * Returns REDIS_ERR without queuing anything when REDIS_DISCONNECTING is
 * set on the context. Otherwise the command is appended with
 * __redisAppendCommand(), a callback record carrying `privdata` is pushed
 * onto ac->replies, and the evAddWrite hook is invoked when set.
 * NOTE(review): the declaration of `cb`, the assignment of `fn` to it and
 * the final return are not part of the visible lines - confirm.
 * NOTE(review): the result of __redisPushCallback() is not inspected in
 * the visible lines. */
275 static int __redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, char *cmd
, size_t len
) {
276 redisContext
*c
= &(ac
->c
);
279 /* Don't accept new commands when the connection is lazily closed. */
280 if (c
->flags
& REDIS_DISCONNECTING
) return REDIS_ERR
;
281 __redisAppendCommand(c
,cmd
,len
);
285 cb
.privdata
= privdata
;
286 __redisPushCallback(&ac
->replies
,&cb
);
288 /* Always schedule a write when the write buffer is non-empty */
289 if (ac
->evAddWrite
) ac
->evAddWrite(ac
->_adapter_data
);
/* va_list variant of redisAsyncCommand.
 *
 * Formats the command with redisvFormatCommand(&cmd, format, ap) and hands
 * the resulting buffer and length to __redisAsyncCommand() together with
 * the callback `fn` and `privdata`.
 * NOTE(review): the declarations of `cmd`, `len` and `status`, any release
 * of `cmd` and the return statement are not part of the visible lines;
 * presumably `status` is returned - confirm. */
294 int redisvAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, va_list ap
) {
298 len
= redisvFormatCommand(&cmd
,format
,ap
);
299 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);
/* Variadic front end: queues a command built from `format` and the
 * following arguments by delegating to redisvAsyncCommand() with the
 * callback `fn` and `privdata`.
 * NOTE(review): the va_list setup and teardown for `ap`, the declaration
 * of `status` and the return statement are not part of the visible lines;
 * presumably `status` is returned - confirm. */
304 int redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, ...) {
308 status
= redisvAsyncCommand(ac
,fn
,privdata
,format
,ap
);
313 int redisAsyncCommandArgv(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, int argc
, const char **argv
, const size_t *argvlen
) {
317 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
318 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);