/*
 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
 * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
41 /* Forward declaration of function in hiredis.c */
42 void __redisAppendCommand(redisContext
*c
, char *cmd
, size_t len
);
44 /* Functions managing dictionary of callbacks for pub/sub. */
/* Hash function for the pub/sub callback dictionaries.
 *
 * The key is read as an sds string: its bytes and its sdslen() length are
 * handed to dictGenHashFunction() and the resulting hash is returned. */
static unsigned int callbackHash(const void *key) {
    return dictGenHashFunction((unsigned char*)key,sdslen((char*)key));
}
49 static void *callbackValDup(void *privdata
, const void *src
) {
51 redisCallback
*dup
= malloc(sizeof(*dup
));
52 memcpy(dup
,src
,sizeof(*dup
));
56 static int callbackKeyCompare(void *privdata
, const void *key1
, const void *key2
) {
60 l1
= sdslen((sds
)key1
);
61 l2
= sdslen((sds
)key2
);
62 if (l1
!= l2
) return 0;
63 return memcmp(key1
,key2
,l1
) == 0;
66 static void callbackKeyDestructor(void *privdata
, void *key
) {
71 static void callbackValDestructor(void *privdata
, void *val
) {
76 static dictType callbackDict
= {
81 callbackKeyDestructor
,
85 static redisAsyncContext
*redisAsyncInitialize(redisContext
*c
) {
86 redisAsyncContext
*ac
= realloc(c
,sizeof(redisAsyncContext
));
89 /* The regular connect functions will always set the flag REDIS_CONNECTED.
90 * For the async API, we want to wait until the first write event is
91 * received up before setting this flag, so reset it here. */
92 c
->flags
&= ~REDIS_CONNECTED
;
99 ac
->ev
.addRead
= NULL
;
100 ac
->ev
.delRead
= NULL
;
101 ac
->ev
.addWrite
= NULL
;
102 ac
->ev
.delWrite
= NULL
;
103 ac
->ev
.cleanup
= NULL
;
105 ac
->onConnect
= NULL
;
106 ac
->onDisconnect
= NULL
;
108 ac
->replies
.head
= NULL
;
109 ac
->replies
.tail
= NULL
;
110 ac
->sub
.invalid
.head
= NULL
;
111 ac
->sub
.invalid
.tail
= NULL
;
112 ac
->sub
.channels
= dictCreate(&callbackDict
,NULL
);
113 ac
->sub
.patterns
= dictCreate(&callbackDict
,NULL
);
117 /* We want the error field to be accessible directly instead of requiring
118 * an indirection to the redisContext struct. */
119 static void __redisAsyncCopyError(redisAsyncContext
*ac
) {
120 redisContext
*c
= &(ac
->c
);
122 ac
->errstr
= c
->errstr
;
125 redisAsyncContext
*redisAsyncConnect(const char *ip
, int port
) {
126 redisContext
*c
= redisConnectNonBlock(ip
,port
);
127 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
128 __redisAsyncCopyError(ac
);
132 redisAsyncContext
*redisAsyncConnectUnix(const char *path
) {
133 redisContext
*c
= redisConnectUnixNonBlock(path
);
134 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
135 __redisAsyncCopyError(ac
);
139 int redisAsyncSetReplyObjectFunctions(redisAsyncContext
*ac
, redisReplyObjectFunctions
*fn
) {
140 redisContext
*c
= &(ac
->c
);
141 return redisSetReplyObjectFunctions(c
,fn
);
144 int redisAsyncSetConnectCallback(redisAsyncContext
*ac
, redisConnectCallback
*fn
) {
145 if (ac
->onConnect
== NULL
) {
148 /* The common way to detect an established connection is to wait for
149 * the first write event to be fired. This assumes the related event
150 * library functions are already set. */
151 if (ac
->ev
.addWrite
) ac
->ev
.addWrite(ac
->ev
.data
);
157 int redisAsyncSetDisconnectCallback(redisAsyncContext
*ac
, redisDisconnectCallback
*fn
) {
158 if (ac
->onDisconnect
== NULL
) {
159 ac
->onDisconnect
= fn
;
165 /* Helper functions to push/shift callbacks */
166 static int __redisPushCallback(redisCallbackList
*list
, redisCallback
*source
) {
169 /* Copy callback from stack to heap */
170 cb
= malloc(sizeof(*cb
));
172 if (source
!= NULL
) {
173 memcpy(cb
,source
,sizeof(*cb
));
177 /* Store callback in list */
178 if (list
->head
== NULL
)
180 if (list
->tail
!= NULL
)
181 list
->tail
->next
= cb
;
186 static int __redisShiftCallback(redisCallbackList
*list
, redisCallback
*target
) {
187 redisCallback
*cb
= list
->head
;
189 list
->head
= cb
->next
;
190 if (cb
== list
->tail
)
193 /* Copy callback from heap to stack */
195 memcpy(target
,cb
,sizeof(*cb
));
202 static void __redisRunCallback(redisAsyncContext
*ac
, redisCallback
*cb
, redisReply
*reply
) {
203 redisContext
*c
= &(ac
->c
);
204 if (cb
->fn
!= NULL
) {
205 c
->flags
|= REDIS_IN_CALLBACK
;
206 cb
->fn(ac
,reply
,cb
->privdata
);
207 c
->flags
&= ~REDIS_IN_CALLBACK
;
211 /* Helper function to free the context. */
212 static void __redisAsyncFree(redisAsyncContext
*ac
) {
213 redisContext
*c
= &(ac
->c
);
218 /* Execute pending callbacks with NULL reply. */
219 while (__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
)
220 __redisRunCallback(ac
,&cb
,NULL
);
222 /* Execute callbacks for invalid commands */
223 while (__redisShiftCallback(&ac
->sub
.invalid
,&cb
) == REDIS_OK
)
224 __redisRunCallback(ac
,&cb
,NULL
);
/* Run subscription callbacks with NULL reply */
227 it
= dictGetIterator(ac
->sub
.channels
);
228 while ((de
= dictNext(it
)) != NULL
)
229 __redisRunCallback(ac
,dictGetEntryVal(de
),NULL
);
230 dictReleaseIterator(it
);
231 dictRelease(ac
->sub
.channels
);
233 it
= dictGetIterator(ac
->sub
.patterns
);
234 while ((de
= dictNext(it
)) != NULL
)
235 __redisRunCallback(ac
,dictGetEntryVal(de
),NULL
);
236 dictReleaseIterator(it
);
237 dictRelease(ac
->sub
.patterns
);
239 /* Signal event lib to clean up */
240 if (ac
->ev
.cleanup
) ac
->ev
.cleanup(ac
->ev
.data
);
242 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
243 * this context, the status will always be REDIS_OK. */
244 if (ac
->onDisconnect
&& (c
->flags
& REDIS_CONNECTED
)) {
245 if (c
->flags
& REDIS_FREEING
) {
246 ac
->onDisconnect(ac
,REDIS_OK
);
248 ac
->onDisconnect(ac
,(ac
->err
== 0) ? REDIS_OK
: REDIS_ERR
);
256 /* Free the async context. When this function is called from a callback,
257 * control needs to be returned to redisProcessCallbacks() before actual
258 * free'ing. To do so, a flag is set on the context which is picked up by
259 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
260 void redisAsyncFree(redisAsyncContext
*ac
) {
261 redisContext
*c
= &(ac
->c
);
262 c
->flags
|= REDIS_FREEING
;
263 if (!(c
->flags
& REDIS_IN_CALLBACK
))
264 __redisAsyncFree(ac
);
267 /* Helper function to make the disconnect happen and clean up. */
268 static void __redisAsyncDisconnect(redisAsyncContext
*ac
) {
269 redisContext
*c
= &(ac
->c
);
271 /* Make sure error is accessible if there is any */
272 __redisAsyncCopyError(ac
);
275 /* For clean disconnects, there should be no pending callbacks. */
276 assert(__redisShiftCallback(&ac
->replies
,NULL
) == REDIS_ERR
);
278 /* Disconnection is caused by an error, make sure that pending
279 * callbacks cannot call new commands. */
280 c
->flags
|= REDIS_DISCONNECTING
;
283 /* For non-clean disconnects, __redisAsyncFree() will execute pending
284 * callbacks with a NULL-reply. */
285 __redisAsyncFree(ac
);
288 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
289 * from being issued, but tries to flush the output buffer and execute
290 * callbacks for all remaining replies. When this function is called from a
291 * callback, there might be more replies and we can safely defer disconnecting
292 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
293 * when there are no pending callbacks. */
294 void redisAsyncDisconnect(redisAsyncContext
*ac
) {
295 redisContext
*c
= &(ac
->c
);
296 c
->flags
|= REDIS_DISCONNECTING
;
297 if (!(c
->flags
& REDIS_IN_CALLBACK
) && ac
->replies
.head
== NULL
)
298 __redisAsyncDisconnect(ac
);
301 static int __redisGetSubscribeCallback(redisAsyncContext
*ac
, redisReply
*reply
, redisCallback
*dstcb
) {
302 redisContext
*c
= &(ac
->c
);
309 /* Custom reply functions are not supported for pub/sub. This will fail
310 * very hard when they are used... */
311 if (reply
->type
== REDIS_REPLY_ARRAY
) {
312 assert(reply
->elements
>= 2);
313 assert(reply
->element
[0]->type
== REDIS_REPLY_STRING
);
314 stype
= reply
->element
[0]->str
;
315 pvariant
= (tolower(stype
[0]) == 'p') ? 1 : 0;
318 callbacks
= ac
->sub
.patterns
;
320 callbacks
= ac
->sub
.channels
;
322 /* Locate the right callback */
323 assert(reply
->element
[1]->type
== REDIS_REPLY_STRING
);
324 sname
= sdsnewlen(reply
->element
[1]->str
,reply
->element
[1]->len
);
325 de
= dictFind(callbacks
,sname
);
327 memcpy(dstcb
,dictGetEntryVal(de
),sizeof(*dstcb
));
329 /* If this is an unsubscribe message, remove it. */
330 if (strcasecmp(stype
+pvariant
,"unsubscribe") == 0) {
331 dictDelete(callbacks
,sname
);
333 /* If this was the last unsubscribe message, revert to
334 * non-subscribe mode. */
335 assert(reply
->element
[2]->type
== REDIS_REPLY_INTEGER
);
336 if (reply
->element
[2]->integer
== 0)
337 c
->flags
&= ~REDIS_SUBSCRIBED
;
342 /* Shift callback for invalid commands. */
343 __redisShiftCallback(&ac
->sub
.invalid
,dstcb
);
348 void redisProcessCallbacks(redisAsyncContext
*ac
) {
349 redisContext
*c
= &(ac
->c
);
354 while((status
= redisGetReply(c
,&reply
)) == REDIS_OK
) {
356 /* When the connection is being disconnected and there are
357 * no more replies, this is the cue to really disconnect. */
358 if (c
->flags
& REDIS_DISCONNECTING
&& sdslen(c
->obuf
) == 0) {
359 __redisAsyncDisconnect(ac
);
363 /* When the connection is not being disconnected, simply stop
364 * trying to get replies and wait for the next loop tick. */
368 /* Even if the context is subscribed, pending regular callbacks will
369 * get a reply before pub/sub messages arrive. */
370 if (__redisShiftCallback(&ac
->replies
,&cb
) != REDIS_OK
) {
371 /* No more regular callbacks, the context *must* be subscribed. */
372 assert(c
->flags
& REDIS_SUBSCRIBED
);
373 __redisGetSubscribeCallback(ac
,reply
,&cb
);
377 __redisRunCallback(ac
,&cb
,reply
);
378 c
->fn
->freeObject(reply
);
380 /* Proceed with free'ing when redisAsyncFree() was called. */
381 if (c
->flags
& REDIS_FREEING
) {
382 __redisAsyncFree(ac
);
386 /* No callback for this reply. This can either be a NULL callback,
387 * or there were no callbacks to begin with. Either way, don't
388 * abort with an error, but simply ignore it because the client
389 * doesn't know what the server will spit out over the wire. */
390 c
->fn
->freeObject(reply
);
394 /* Disconnect when there was an error reading the reply */
395 if (status
!= REDIS_OK
)
396 __redisAsyncDisconnect(ac
);
/* This function should be called when the socket is readable.
 * It processes all replies that can be read and executes their callbacks.
 */
402 void redisAsyncHandleRead(redisAsyncContext
*ac
) {
403 redisContext
*c
= &(ac
->c
);
405 if (redisBufferRead(c
) == REDIS_ERR
) {
406 __redisAsyncDisconnect(ac
);
408 /* Always re-schedule reads */
409 if (ac
->ev
.addRead
) ac
->ev
.addRead(ac
->ev
.data
);
410 redisProcessCallbacks(ac
);
414 void redisAsyncHandleWrite(redisAsyncContext
*ac
) {
415 redisContext
*c
= &(ac
->c
);
418 if (redisBufferWrite(c
,&done
) == REDIS_ERR
) {
419 __redisAsyncDisconnect(ac
);
421 /* Continue writing when not done, stop writing otherwise */
423 if (ac
->ev
.addWrite
) ac
->ev
.addWrite(ac
->ev
.data
);
425 if (ac
->ev
.delWrite
) ac
->ev
.delWrite(ac
->ev
.data
);
428 /* Always schedule reads after writes */
429 if (ac
->ev
.addRead
) ac
->ev
.addRead(ac
->ev
.data
);
431 /* Fire onConnect when this is the first write event. */
432 if (!(c
->flags
& REDIS_CONNECTED
)) {
433 c
->flags
|= REDIS_CONNECTED
;
434 if (ac
->onConnect
) ac
->onConnect(ac
);
439 /* Sets a pointer to the first argument and its length starting at p. Returns
440 * the number of bytes to skip to get to the following argument. */
441 static char *nextArgument(char *start
, char **str
, size_t *len
) {
445 if (p
== NULL
) return NULL
;
448 *len
= (int)strtol(p
+1,NULL
,10);
455 /* Helper function for the redisAsyncCommand* family of functions. Writes a
456 * formatted command to the output buffer and registers the provided callback
457 * function with the context. */
458 static int __redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, char *cmd
, size_t len
) {
459 redisContext
*c
= &(ac
->c
);
461 int pvariant
, hasnext
;
467 /* Don't accept new commands when the connection is about to be closed. */
468 if (c
->flags
& (REDIS_DISCONNECTING
| REDIS_FREEING
)) return REDIS_ERR
;
472 cb
.privdata
= privdata
;
474 /* Find out which command will be appended. */
475 p
= nextArgument(cmd
,&cstr
,&clen
);
477 hasnext
= (p
[0] == '$');
478 pvariant
= (tolower(cstr
[0]) == 'p') ? 1 : 0;
482 if (hasnext
&& strncasecmp(cstr
,"subscribe\r\n",11) == 0) {
483 c
->flags
|= REDIS_SUBSCRIBED
;
485 /* Add every channel/pattern to the list of subscription callbacks. */
486 while ((p
= nextArgument(p
,&astr
,&alen
)) != NULL
) {
487 sname
= sdsnewlen(astr
,alen
);
489 dictReplace(ac
->sub
.patterns
,sname
,&cb
);
491 dictReplace(ac
->sub
.channels
,sname
,&cb
);
493 } else if (strncasecmp(cstr
,"unsubscribe\r\n",13) == 0) {
494 /* It is only useful to call (P)UNSUBSCRIBE when the context is
495 * subscribed to one or more channels or patterns. */
496 if (!(c
->flags
& REDIS_SUBSCRIBED
)) return REDIS_ERR
;
498 /* (P)UNSUBSCRIBE does not have its own response: every channel or
499 * pattern that is unsubscribed will receive a message. This means we
500 * should not append a callback function for this command. */
502 if (c
->flags
& REDIS_SUBSCRIBED
)
503 /* This will likely result in an error reply, but it needs to be
504 * received and passed to the callback. */
505 __redisPushCallback(&ac
->sub
.invalid
,&cb
);
507 __redisPushCallback(&ac
->replies
,&cb
);
510 __redisAppendCommand(c
,cmd
,len
);
512 /* Always schedule a write when the write buffer is non-empty */
513 if (ac
->ev
.addWrite
) ac
->ev
.addWrite(ac
->ev
.data
);
518 int redisvAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, va_list ap
) {
522 len
= redisvFormatCommand(&cmd
,format
,ap
);
523 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);
528 int redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, ...) {
532 status
= redisvAsyncCommand(ac
,fn
,privdata
,format
,ap
);
537 int redisAsyncCommandArgv(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, int argc
, const char **argv
, const size_t *argvlen
) {
541 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
542 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);