/*
 * Source: git.saurik.com Git - redis.git/blob - deps/hiredis/async.c
 *
 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
/* Event-loop hook wrappers. Each macro invokes the matching callback stored
 * in (ctx)->ev, passing (ctx)->ev.data, and does nothing when that hook is
 * NULL. The do { ... } while(0) wrapper makes an invocation followed by a
 * semicolon behave as one statement, so the macros are safe inside unbraced
 * if/else bodies. */
#define _EL_ADD_READ(ctx) do { \
        if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_READ(ctx) do { \
        if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
    } while(0)
#define _EL_ADD_WRITE(ctx) do { \
        if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_WRITE(ctx) do { \
        if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)
#define _EL_CLEANUP(ctx) do { \
        if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
    } while(0)
60 /* Forward declaration of function in hiredis.c */
61 void __redisAppendCommand(redisContext
*c
, char *cmd
, size_t len
);
/* Functions managing dictionary of callbacks for pub/sub. */

/* Hash function for the callback dictionaries: feeds the key bytes and the
 * key's sdslen() length to dictGenHashFunction(). The key is treated as an
 * sds string. */
static unsigned int callbackHash(const void *key) {
    return dictGenHashFunction((unsigned char*)key,sdslen((char*)key));
}
68 static void *callbackValDup(void *privdata
, const void *src
) {
70 redisCallback
*dup
= malloc(sizeof(*dup
));
71 memcpy(dup
,src
,sizeof(*dup
));
75 static int callbackKeyCompare(void *privdata
, const void *key1
, const void *key2
) {
79 l1
= sdslen((sds
)key1
);
80 l2
= sdslen((sds
)key2
);
81 if (l1
!= l2
) return 0;
82 return memcmp(key1
,key2
,l1
) == 0;
85 static void callbackKeyDestructor(void *privdata
, void *key
) {
/* Value-destructor hook for the callback dictionaries; privdata is unused.
 * Values are the malloc()'ed copies produced by callbackValDup(), so they are
 * released with free(). */
static void callbackValDestructor(void *privdata, void *val) {
    ((void) privdata);
    free(val);
}
95 static dictType callbackDict
= {
100 callbackKeyDestructor
,
101 callbackValDestructor
104 static redisAsyncContext
*redisAsyncInitialize(redisContext
*c
) {
105 redisAsyncContext
*ac
= realloc(c
,sizeof(redisAsyncContext
));
108 /* The regular connect functions will always set the flag REDIS_CONNECTED.
109 * For the async API, we want to wait until the first write event is
110 * received up before setting this flag, so reset it here. */
111 c
->flags
&= ~REDIS_CONNECTED
;
118 ac
->ev
.addRead
= NULL
;
119 ac
->ev
.delRead
= NULL
;
120 ac
->ev
.addWrite
= NULL
;
121 ac
->ev
.delWrite
= NULL
;
122 ac
->ev
.cleanup
= NULL
;
124 ac
->onConnect
= NULL
;
125 ac
->onDisconnect
= NULL
;
127 ac
->replies
.head
= NULL
;
128 ac
->replies
.tail
= NULL
;
129 ac
->sub
.invalid
.head
= NULL
;
130 ac
->sub
.invalid
.tail
= NULL
;
131 ac
->sub
.channels
= dictCreate(&callbackDict
,NULL
);
132 ac
->sub
.patterns
= dictCreate(&callbackDict
,NULL
);
136 /* We want the error field to be accessible directly instead of requiring
137 * an indirection to the redisContext struct. */
138 static void __redisAsyncCopyError(redisAsyncContext
*ac
) {
139 redisContext
*c
= &(ac
->c
);
141 ac
->errstr
= c
->errstr
;
144 redisAsyncContext
*redisAsyncConnect(const char *ip
, int port
) {
145 redisContext
*c
= redisConnectNonBlock(ip
,port
);
146 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
147 __redisAsyncCopyError(ac
);
151 redisAsyncContext
*redisAsyncConnectUnix(const char *path
) {
152 redisContext
*c
= redisConnectUnixNonBlock(path
);
153 redisAsyncContext
*ac
= redisAsyncInitialize(c
);
154 __redisAsyncCopyError(ac
);
158 int redisAsyncSetConnectCallback(redisAsyncContext
*ac
, redisConnectCallback
*fn
) {
159 if (ac
->onConnect
== NULL
) {
162 /* The common way to detect an established connection is to wait for
163 * the first write event to be fired. This assumes the related event
164 * library functions are already set. */
171 int redisAsyncSetDisconnectCallback(redisAsyncContext
*ac
, redisDisconnectCallback
*fn
) {
172 if (ac
->onDisconnect
== NULL
) {
173 ac
->onDisconnect
= fn
;
179 /* Helper functions to push/shift callbacks */
180 static int __redisPushCallback(redisCallbackList
*list
, redisCallback
*source
) {
183 /* Copy callback from stack to heap */
184 cb
= malloc(sizeof(*cb
));
185 if (source
!= NULL
) {
186 memcpy(cb
,source
,sizeof(*cb
));
190 /* Store callback in list */
191 if (list
->head
== NULL
)
193 if (list
->tail
!= NULL
)
194 list
->tail
->next
= cb
;
199 static int __redisShiftCallback(redisCallbackList
*list
, redisCallback
*target
) {
200 redisCallback
*cb
= list
->head
;
202 list
->head
= cb
->next
;
203 if (cb
== list
->tail
)
206 /* Copy callback from heap to stack */
208 memcpy(target
,cb
,sizeof(*cb
));
215 static void __redisRunCallback(redisAsyncContext
*ac
, redisCallback
*cb
, redisReply
*reply
) {
216 redisContext
*c
= &(ac
->c
);
217 if (cb
->fn
!= NULL
) {
218 c
->flags
|= REDIS_IN_CALLBACK
;
219 cb
->fn(ac
,reply
,cb
->privdata
);
220 c
->flags
&= ~REDIS_IN_CALLBACK
;
224 /* Helper function to free the context. */
225 static void __redisAsyncFree(redisAsyncContext
*ac
) {
226 redisContext
*c
= &(ac
->c
);
231 /* Execute pending callbacks with NULL reply. */
232 while (__redisShiftCallback(&ac
->replies
,&cb
) == REDIS_OK
)
233 __redisRunCallback(ac
,&cb
,NULL
);
235 /* Execute callbacks for invalid commands */
236 while (__redisShiftCallback(&ac
->sub
.invalid
,&cb
) == REDIS_OK
)
237 __redisRunCallback(ac
,&cb
,NULL
);
239 /* Run subscription callbacks callbacks with NULL reply */
240 it
= dictGetIterator(ac
->sub
.channels
);
241 while ((de
= dictNext(it
)) != NULL
)
242 __redisRunCallback(ac
,dictGetEntryVal(de
),NULL
);
243 dictReleaseIterator(it
);
244 dictRelease(ac
->sub
.channels
);
246 it
= dictGetIterator(ac
->sub
.patterns
);
247 while ((de
= dictNext(it
)) != NULL
)
248 __redisRunCallback(ac
,dictGetEntryVal(de
),NULL
);
249 dictReleaseIterator(it
);
250 dictRelease(ac
->sub
.patterns
);
252 /* Signal event lib to clean up */
255 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
256 * this context, the status will always be REDIS_OK. */
257 if (ac
->onDisconnect
&& (c
->flags
& REDIS_CONNECTED
)) {
258 if (c
->flags
& REDIS_FREEING
) {
259 ac
->onDisconnect(ac
,REDIS_OK
);
261 ac
->onDisconnect(ac
,(ac
->err
== 0) ? REDIS_OK
: REDIS_ERR
);
269 /* Free the async context. When this function is called from a callback,
270 * control needs to be returned to redisProcessCallbacks() before actual
271 * free'ing. To do so, a flag is set on the context which is picked up by
272 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
273 void redisAsyncFree(redisAsyncContext
*ac
) {
274 redisContext
*c
= &(ac
->c
);
275 c
->flags
|= REDIS_FREEING
;
276 if (!(c
->flags
& REDIS_IN_CALLBACK
))
277 __redisAsyncFree(ac
);
280 /* Helper function to make the disconnect happen and clean up. */
281 static void __redisAsyncDisconnect(redisAsyncContext
*ac
) {
282 redisContext
*c
= &(ac
->c
);
284 /* Make sure error is accessible if there is any */
285 __redisAsyncCopyError(ac
);
288 /* For clean disconnects, there should be no pending callbacks. */
289 assert(__redisShiftCallback(&ac
->replies
,NULL
) == REDIS_ERR
);
291 /* Disconnection is caused by an error, make sure that pending
292 * callbacks cannot call new commands. */
293 c
->flags
|= REDIS_DISCONNECTING
;
296 /* For non-clean disconnects, __redisAsyncFree() will execute pending
297 * callbacks with a NULL-reply. */
298 __redisAsyncFree(ac
);
301 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
302 * from being issued, but tries to flush the output buffer and execute
303 * callbacks for all remaining replies. When this function is called from a
304 * callback, there might be more replies and we can safely defer disconnecting
305 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
306 * when there are no pending callbacks. */
307 void redisAsyncDisconnect(redisAsyncContext
*ac
) {
308 redisContext
*c
= &(ac
->c
);
309 c
->flags
|= REDIS_DISCONNECTING
;
310 if (!(c
->flags
& REDIS_IN_CALLBACK
) && ac
->replies
.head
== NULL
)
311 __redisAsyncDisconnect(ac
);
314 static int __redisGetSubscribeCallback(redisAsyncContext
*ac
, redisReply
*reply
, redisCallback
*dstcb
) {
315 redisContext
*c
= &(ac
->c
);
322 /* Custom reply functions are not supported for pub/sub. This will fail
323 * very hard when they are used... */
324 if (reply
->type
== REDIS_REPLY_ARRAY
) {
325 assert(reply
->elements
>= 2);
326 assert(reply
->element
[0]->type
== REDIS_REPLY_STRING
);
327 stype
= reply
->element
[0]->str
;
328 pvariant
= (tolower(stype
[0]) == 'p') ? 1 : 0;
331 callbacks
= ac
->sub
.patterns
;
333 callbacks
= ac
->sub
.channels
;
335 /* Locate the right callback */
336 assert(reply
->element
[1]->type
== REDIS_REPLY_STRING
);
337 sname
= sdsnewlen(reply
->element
[1]->str
,reply
->element
[1]->len
);
338 de
= dictFind(callbacks
,sname
);
340 memcpy(dstcb
,dictGetEntryVal(de
),sizeof(*dstcb
));
342 /* If this is an unsubscribe message, remove it. */
343 if (strcasecmp(stype
+pvariant
,"unsubscribe") == 0) {
344 dictDelete(callbacks
,sname
);
346 /* If this was the last unsubscribe message, revert to
347 * non-subscribe mode. */
348 assert(reply
->element
[2]->type
== REDIS_REPLY_INTEGER
);
349 if (reply
->element
[2]->integer
== 0)
350 c
->flags
&= ~REDIS_SUBSCRIBED
;
355 /* Shift callback for invalid commands. */
356 __redisShiftCallback(&ac
->sub
.invalid
,dstcb
);
361 void redisProcessCallbacks(redisAsyncContext
*ac
) {
362 redisContext
*c
= &(ac
->c
);
367 while((status
= redisGetReply(c
,&reply
)) == REDIS_OK
) {
369 /* When the connection is being disconnected and there are
370 * no more replies, this is the cue to really disconnect. */
371 if (c
->flags
& REDIS_DISCONNECTING
&& sdslen(c
->obuf
) == 0) {
372 __redisAsyncDisconnect(ac
);
376 /* If monitor mode, repush callback */
377 if(c
->flags
& REDIS_MONITORING
) {
378 __redisPushCallback(&ac
->replies
,&cb
);
381 /* When the connection is not being disconnected, simply stop
382 * trying to get replies and wait for the next loop tick. */
386 /* Even if the context is subscribed, pending regular callbacks will
387 * get a reply before pub/sub messages arrive. */
388 if (__redisShiftCallback(&ac
->replies
,&cb
) != REDIS_OK
) {
390 * A spontaneous reply in a not-subscribed context can be the error
391 * reply that is sent when a new connection exceeds the maximum
392 * number of allowed connections on the server side.
394 * This is seen as an error instead of a regular reply because the
395 * server closes the connection after sending it.
397 * To prevent the error from being overwritten by an EOF error the
398 * connection is closed here. See issue #43.
400 * Another possibility is that the server is loading its dataset.
401 * In this case we also want to close the connection, and have the
402 * user wait until the server is ready to take our request.
404 if (((redisReply
*)reply
)->type
== REDIS_REPLY_ERROR
) {
405 c
->err
= REDIS_ERR_OTHER
;
406 snprintf(c
->errstr
,sizeof(c
->errstr
),"%s",((redisReply
*)reply
)->str
);
407 __redisAsyncDisconnect(ac
);
410 /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
411 assert((c
->flags
& REDIS_SUBSCRIBED
|| c
->flags
& REDIS_MONITORING
));
412 if(c
->flags
& REDIS_SUBSCRIBED
)
413 __redisGetSubscribeCallback(ac
,reply
,&cb
);
417 __redisRunCallback(ac
,&cb
,reply
);
418 c
->reader
->fn
->freeObject(reply
);
420 /* Proceed with free'ing when redisAsyncFree() was called. */
421 if (c
->flags
& REDIS_FREEING
) {
422 __redisAsyncFree(ac
);
426 /* No callback for this reply. This can either be a NULL callback,
427 * or there were no callbacks to begin with. Either way, don't
428 * abort with an error, but simply ignore it because the client
429 * doesn't know what the server will spit out over the wire. */
430 c
->reader
->fn
->freeObject(reply
);
434 /* Disconnect when there was an error reading the reply */
435 if (status
!= REDIS_OK
)
436 __redisAsyncDisconnect(ac
);
439 /* Internal helper function to detect socket status the first time a read or
440 * write event fires. When connecting was not succesful, the connect callback
441 * is called with a REDIS_ERR status and the context is free'd. */
442 static int __redisAsyncHandleConnect(redisAsyncContext
*ac
) {
443 redisContext
*c
= &(ac
->c
);
445 if (redisCheckSocketError(c
,c
->fd
) == REDIS_ERR
) {
446 /* Try again later when connect(2) is still in progress. */
447 if (errno
== EINPROGRESS
)
450 if (ac
->onConnect
) ac
->onConnect(ac
,REDIS_ERR
);
451 __redisAsyncDisconnect(ac
);
455 /* Mark context as connected. */
456 c
->flags
|= REDIS_CONNECTED
;
457 if (ac
->onConnect
) ac
->onConnect(ac
,REDIS_OK
);
461 /* This function should be called when the socket is readable.
462 * It processes all replies that can be read and executes their callbacks.
464 void redisAsyncHandleRead(redisAsyncContext
*ac
) {
465 redisContext
*c
= &(ac
->c
);
467 if (!(c
->flags
& REDIS_CONNECTED
)) {
468 /* Abort connect was not successful. */
469 if (__redisAsyncHandleConnect(ac
) != REDIS_OK
)
471 /* Try again later when the context is still not connected. */
472 if (!(c
->flags
& REDIS_CONNECTED
))
476 if (redisBufferRead(c
) == REDIS_ERR
) {
477 __redisAsyncDisconnect(ac
);
479 /* Always re-schedule reads */
481 redisProcessCallbacks(ac
);
485 void redisAsyncHandleWrite(redisAsyncContext
*ac
) {
486 redisContext
*c
= &(ac
->c
);
489 if (!(c
->flags
& REDIS_CONNECTED
)) {
490 /* Abort connect was not successful. */
491 if (__redisAsyncHandleConnect(ac
) != REDIS_OK
)
493 /* Try again later when the context is still not connected. */
494 if (!(c
->flags
& REDIS_CONNECTED
))
498 if (redisBufferWrite(c
,&done
) == REDIS_ERR
) {
499 __redisAsyncDisconnect(ac
);
501 /* Continue writing when not done, stop writing otherwise */
507 /* Always schedule reads after writes */
/* Locate the next bulk argument ("$<len>\r\n<bytes>\r\n") at or after start.
 *
 * start: position in a formatted command to search from.
 * str:   receives a pointer to the first byte of the argument.
 * len:   receives the argument length taken from the "$<len>" header.
 *
 * Returns a pointer to the first byte following the argument and its
 * trailing "\r\n", or NULL when there is no further '$' header (in which
 * case *str and *len are left untouched). */
static char *nextArgument(char *start, char **str, size_t *len) {
    char *p = start;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* Convert straight to size_t; going through int would truncate
     * lengths that do not fit in an int. */
    *len = (size_t)strtol(p+1,NULL,10);
    p = strchr(p,'\r');
    assert(p);
    *str = p+2;
    return p+2+(*len)+2;
}
528 /* Helper function for the redisAsyncCommand* family of functions. Writes a
529 * formatted command to the output buffer and registers the provided callback
530 * function with the context. */
531 static int __redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, char *cmd
, size_t len
) {
532 redisContext
*c
= &(ac
->c
);
534 int pvariant
, hasnext
;
540 /* Don't accept new commands when the connection is about to be closed. */
541 if (c
->flags
& (REDIS_DISCONNECTING
| REDIS_FREEING
)) return REDIS_ERR
;
545 cb
.privdata
= privdata
;
547 /* Find out which command will be appended. */
548 p
= nextArgument(cmd
,&cstr
,&clen
);
550 hasnext
= (p
[0] == '$');
551 pvariant
= (tolower(cstr
[0]) == 'p') ? 1 : 0;
555 if (hasnext
&& strncasecmp(cstr
,"subscribe\r\n",11) == 0) {
556 c
->flags
|= REDIS_SUBSCRIBED
;
558 /* Add every channel/pattern to the list of subscription callbacks. */
559 while ((p
= nextArgument(p
,&astr
,&alen
)) != NULL
) {
560 sname
= sdsnewlen(astr
,alen
);
562 dictReplace(ac
->sub
.patterns
,sname
,&cb
);
564 dictReplace(ac
->sub
.channels
,sname
,&cb
);
566 } else if (strncasecmp(cstr
,"unsubscribe\r\n",13) == 0) {
567 /* It is only useful to call (P)UNSUBSCRIBE when the context is
568 * subscribed to one or more channels or patterns. */
569 if (!(c
->flags
& REDIS_SUBSCRIBED
)) return REDIS_ERR
;
571 /* (P)UNSUBSCRIBE does not have its own response: every channel or
572 * pattern that is unsubscribed will receive a message. This means we
573 * should not append a callback function for this command. */
574 } else if(strncasecmp(cstr
,"monitor\r\n",9) == 0) {
575 /* Set monitor flag and push callback */
576 c
->flags
|= REDIS_MONITORING
;
577 __redisPushCallback(&ac
->replies
,&cb
);
579 if (c
->flags
& REDIS_SUBSCRIBED
)
580 /* This will likely result in an error reply, but it needs to be
581 * received and passed to the callback. */
582 __redisPushCallback(&ac
->sub
.invalid
,&cb
);
584 __redisPushCallback(&ac
->replies
,&cb
);
587 __redisAppendCommand(c
,cmd
,len
);
589 /* Always schedule a write when the write buffer is non-empty */
595 int redisvAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, va_list ap
) {
599 len
= redisvFormatCommand(&cmd
,format
,ap
);
600 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);
605 int redisAsyncCommand(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, const char *format
, ...) {
609 status
= redisvAsyncCommand(ac
,fn
,privdata
,format
,ap
);
614 int redisAsyncCommandArgv(redisAsyncContext
*ac
, redisCallbackFn
*fn
, void *privdata
, int argc
, const char **argv
, const size_t *argvlen
) {
618 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
619 status
= __redisAsyncCommand(ac
,fn
,privdata
,cmd
,len
);