2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
36 /* Forward declaration of function in hiredis.c */
37 void __redisAppendCommand(redisContext
*c
, char *cmd
, size_t len
);
39 static redisAsyncContext
*redisAsyncInitialize(redisContext
*c
) {
40 redisAsyncContext
*ac
= realloc(c
,sizeof(redisAsyncContext
));
41 /* Set all bytes in the async part of the context to 0 */
42 memset(ac
+sizeof(redisContext
),0,sizeof(redisAsyncContext
)-sizeof(redisContext
));
46 /* We want the error field to be accessible directly instead of requiring
47 * an indirection to the redisContext struct. */
48 static void __redisAsyncCopyError(redisAsyncContext
*ac
) {
49 redisContext
*c
= &(ac
->c
);
51 ac
->errstr
= c
->errstr
;
54 redisAsyncContext
*redisAsyncConnect(const char *ip
, int port
) {
55 redisContext
*c
= redisConnectNonBlock(ip
,port
);
56 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
57 __redisAsyncCopyError(ac
);
61 redisAsyncContext
*redisAsyncConnectUnix(const char *path
) {
62 redisContext
*c
= redisConnectUnixNonBlock(path
);
63 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
64 __redisAsyncCopyError(ac
);
68 int redisAsyncSetReplyObjectFunctions(redisAsyncContext
*ac
, redisReplyObjectFunctions
*fn
) {
69 redisContext
*c
= &(ac
->c
);
70 return redisSetReplyObjectFunctions(c
,fn
);
73 int redisAsyncSetDisconnectCallback(redisAsyncContext
*ac
, redisDisconnectCallback
*fn
) {
74 if (ac
->onDisconnect
== NULL
) {
75 ac
->onDisconnect
= fn
;
81 /* Helper functions to push/shift callbacks */
82 static int __redisPushCallback(redisCallbackList
*list
, redisCallback
*source
) {
85 /* Copy callback from stack to heap */
86 cb
= calloc(1,sizeof(*cb
));
90 cb
->privdata
= source
->privdata
;
93 /* Store callback in list */
94 if (list
->head
== NULL
)
96 if (list
->tail
!= NULL
)
97 list
->tail
->next
= cb
;
102 static int __redisShiftCallback(redisCallbackList
*list
, redisCallback
*target
) {
103 redisCallback
*cb
= list
->head
;
105 list
->head
= cb
->next
;
106 if (cb
== list
->tail
)
109 /* Copy callback from heap to stack */
111 memcpy(target
,cb
,sizeof(*cb
));
118 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
119 * from being issued, but tries to flush the output buffer and execute
120 * callbacks for all remaining replies.
122 * This functions is generally called from within a callback, so the
123 * processCallbacks function will pick up the flag when there are no
125 void redisAsyncDisconnect(redisAsyncContext
*ac
) {
126 redisContext
*c
= &(ac
->c
);
127 c
->flags
|= REDIS_DISCONNECTING
;
130 /* Helper function to make the disconnect happen and clean up. */
131 static void __redisAsyncDisconnect(redisAsyncContext
*ac
) {
132 redisContext
*c
= &(ac
->c
);
136 /* Make sure error is accessible if there is any */
137 __redisAsyncCopyError(ac
);
138 status
= (ac
->err
== 0) ? REDIS_OK
: REDIS_ERR
;
140 if (status
== REDIS_OK
) {
141 /* When the connection is cleanly disconnected, there should not
142 * be pending callbacks. */
143 assert(__redisShiftCallback(&ac
->replies
,NULL
) == REDIS_ERR
);
145 /* Callbacks should not be able to issue new commands. */
146 c
->flags
|= REDIS_DISCONNECTING
;
148 /* Execute pending callbacks with NULL reply. */
149 while (__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
) {
151 cb
.fn(ac
,NULL
,cb
.privdata
);
155 /* Signal event lib to clean up */
156 if (ac
->evCleanup
) ac
->evCleanup(ac
->data
);
158 /* Execute callback with proper status */
159 if (ac
->onDisconnect
) ac
->onDisconnect(ac
,status
);
165 void redisProcessCallbacks(redisAsyncContext
*ac
) {
166 redisContext
*c
= &(ac
->c
);
171 while((status
= redisGetReply(c
,&reply
)) == REDIS_OK
) {
173 /* When the connection is being disconnected and there are
174 * no more replies, this is the cue to really disconnect. */
175 if (c
->flags
& REDIS_DISCONNECTING
&& sdslen(c
->obuf
) == 0) {
176 __redisAsyncDisconnect(ac
);
180 /* When the connection is not being disconnected, simply stop
181 * trying to get replies and wait for the next loop tick. */
185 /* Shift callback and execute it */
186 assert(__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
);
188 cb
.fn(ac
,reply
,cb
.privdata
);
190 c
->fn
->freeObject(reply
);
194 /* Disconnect when there was an error reading the reply */
195 if (status
!= REDIS_OK
)
196 __redisAsyncDisconnect(ac
);
199 /* This function should be called when the socket is readable.
200 * It processes all replies that can be read and executes their callbacks.
202 void redisAsyncHandleRead(redisAsyncContext
*ac
) {
203 redisContext
*c
= &(ac
->c
);
205 if (redisBufferRead(c
) == REDIS_ERR
) {
206 __redisAsyncDisconnect(ac
);
208 /* Always re-schedule reads */
209 if (ac
->evAddRead
) ac
->evAddRead(ac
->data
);
210 redisProcessCallbacks(ac
);
214 void redisAsyncHandleWrite(redisAsyncContext
*ac
) {
215 redisContext
*c
= &(ac
->c
);
218 if (redisBufferWrite(c
,&done
) == REDIS_ERR
) {
219 __redisAsyncDisconnect(ac
);
221 /* Continue writing when not done, stop writing otherwise */
223 if (ac
->evAddWrite
) ac
->evAddWrite(ac
->data
);
225 if (ac
->evDelWrite
) ac
->evDelWrite(ac
->data
);
228 /* Always schedule reads when something was written */
229 if (ac
->evAddRead
) ac
->evAddRead(ac
->data
);
233 /* Helper function for the redisAsyncCommand* family of functions.
235 * Write a formatted command to the output buffer and register the provided
236 * callback function with the context.
238 static int __redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, char *cmd
, size_t len
) {
239 redisContext
*c
= &(ac
->c
);
242 /* Don't accept new commands when the connection is lazily closed. */
243 if (c
->flags
& REDIS_DISCONNECTING
) return REDIS_ERR
;
244 __redisAppendCommand(c
,cmd
,len
);
248 cb
.privdata
= privdata
;
249 __redisPushCallback(&ac
->replies
,&cb
);
251 /* Always schedule a write when the write buffer is non-empty */
252 if (ac
->evAddWrite
) ac
->evAddWrite(ac
->data
);
257 int redisvAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, va_list ap
) {
261 len
= redisvFormatCommand(&cmd
,format
,ap
);
262 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);
267 int redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, ...) {
271 status
= redisvAsyncCommand(ac
,fn
,privdata
,format
,ap
);
276 int redisAsyncCommandArgv(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, int argc
, const char **argv
, const size_t *argvlen
) {
280 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
281 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);