]> git.saurik.com Git - redis.git/blame_incremental - deps/hiredis/async.c
redis-check-dump now understands dumps produced by Redis 2.6
[redis.git] / deps / hiredis / async.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "fmacros.h"
33#include <stdlib.h>
34#include <string.h>
35#include <strings.h>
36#include <assert.h>
37#include <ctype.h>
38#include <errno.h>
39#include "async.h"
40#include "net.h"
41#include "dict.c"
42#include "sds.h"
43
/* Event-loop hook helpers.
 *
 * Each macro calls the matching adapter hook stored in (ctx)->ev, passing
 * (ctx)->ev.data as its only argument, and does nothing when that hook is
 * NULL. Every macro expands to exactly one statement WITHOUT a trailing
 * semicolon, so it can be used safely as the body of an unbraced if/else. */
#define _EL_ADD_READ(ctx) do { \
        if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_READ(ctx) do { \
        if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
    } while(0)
#define _EL_ADD_WRITE(ctx) do { \
        if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_WRITE(ctx) do { \
        if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)
#define _EL_CLEANUP(ctx) do { \
        if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
    } while(0)
59
60/* Forward declaration of function in hiredis.c */
61void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
62
/* Functions managing dictionary of callbacks for pub/sub. */

/* Hash function of the callback dictionaries: hashes the bytes of the sds
 * string used as key (a channel or pattern name). */
static unsigned int callbackHash(const void *key) {
    unsigned char *bytes = (unsigned char*)key;
    size_t nbytes = sdslen((char*)key);

    return dictGenHashFunction(bytes,nbytes);
}
67
68static void *callbackValDup(void *privdata, const void *src) {
69 ((void) privdata);
70 redisCallback *dup = malloc(sizeof(*dup));
71 memcpy(dup,src,sizeof(*dup));
72 return dup;
73}
74
75static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
76 int l1, l2;
77 ((void) privdata);
78
79 l1 = sdslen((sds)key1);
80 l2 = sdslen((sds)key2);
81 if (l1 != l2) return 0;
82 return memcmp(key1,key2,l1) == 0;
83}
84
85static void callbackKeyDestructor(void *privdata, void *key) {
86 ((void) privdata);
87 sdsfree((sds)key);
88}
89
/* Value destructor of the callback dictionaries: values are heap copies made
 * by callbackValDup(), so a plain free() releases them. privdata is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    (void)privdata;
    free(val);
}
94
95static dictType callbackDict = {
96 callbackHash,
97 NULL,
98 callbackValDup,
99 callbackKeyCompare,
100 callbackKeyDestructor,
101 callbackValDestructor
102};
103
104static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
105 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
106 c = &(ac->c);
107
108 /* The regular connect functions will always set the flag REDIS_CONNECTED.
109 * For the async API, we want to wait until the first write event is
110 * received up before setting this flag, so reset it here. */
111 c->flags &= ~REDIS_CONNECTED;
112
113 ac->err = 0;
114 ac->errstr = NULL;
115 ac->data = NULL;
116
117 ac->ev.data = NULL;
118 ac->ev.addRead = NULL;
119 ac->ev.delRead = NULL;
120 ac->ev.addWrite = NULL;
121 ac->ev.delWrite = NULL;
122 ac->ev.cleanup = NULL;
123
124 ac->onConnect = NULL;
125 ac->onDisconnect = NULL;
126
127 ac->replies.head = NULL;
128 ac->replies.tail = NULL;
129 ac->sub.invalid.head = NULL;
130 ac->sub.invalid.tail = NULL;
131 ac->sub.channels = dictCreate(&callbackDict,NULL);
132 ac->sub.patterns = dictCreate(&callbackDict,NULL);
133 return ac;
134}
135
136/* We want the error field to be accessible directly instead of requiring
137 * an indirection to the redisContext struct. */
138static void __redisAsyncCopyError(redisAsyncContext *ac) {
139 redisContext *c = &(ac->c);
140 ac->err = c->err;
141 ac->errstr = c->errstr;
142}
143
144redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
145 redisContext *c = redisConnectNonBlock(ip,port);
146 redisAsyncContext *ac = redisAsyncInitialize(c);
147 __redisAsyncCopyError(ac);
148 return ac;
149}
150
151redisAsyncContext *redisAsyncConnectUnix(const char *path) {
152 redisContext *c = redisConnectUnixNonBlock(path);
153 redisAsyncContext *ac = redisAsyncInitialize(c);
154 __redisAsyncCopyError(ac);
155 return ac;
156}
157
158int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
159 if (ac->onConnect == NULL) {
160 ac->onConnect = fn;
161
162 /* The common way to detect an established connection is to wait for
163 * the first write event to be fired. This assumes the related event
164 * library functions are already set. */
165 _EL_ADD_WRITE(ac);
166 return REDIS_OK;
167 }
168 return REDIS_ERR;
169}
170
171int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
172 if (ac->onDisconnect == NULL) {
173 ac->onDisconnect = fn;
174 return REDIS_OK;
175 }
176 return REDIS_ERR;
177}
178
179/* Helper functions to push/shift callbacks */
180static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
181 redisCallback *cb;
182
183 /* Copy callback from stack to heap */
184 cb = malloc(sizeof(*cb));
185 if (source != NULL) {
186 memcpy(cb,source,sizeof(*cb));
187 cb->next = NULL;
188 }
189
190 /* Store callback in list */
191 if (list->head == NULL)
192 list->head = cb;
193 if (list->tail != NULL)
194 list->tail->next = cb;
195 list->tail = cb;
196 return REDIS_OK;
197}
198
199static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
200 redisCallback *cb = list->head;
201 if (cb != NULL) {
202 list->head = cb->next;
203 if (cb == list->tail)
204 list->tail = NULL;
205
206 /* Copy callback from heap to stack */
207 if (target != NULL)
208 memcpy(target,cb,sizeof(*cb));
209 free(cb);
210 return REDIS_OK;
211 }
212 return REDIS_ERR;
213}
214
215static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
216 redisContext *c = &(ac->c);
217 if (cb->fn != NULL) {
218 c->flags |= REDIS_IN_CALLBACK;
219 cb->fn(ac,reply,cb->privdata);
220 c->flags &= ~REDIS_IN_CALLBACK;
221 }
222}
223
224/* Helper function to free the context. */
225static void __redisAsyncFree(redisAsyncContext *ac) {
226 redisContext *c = &(ac->c);
227 redisCallback cb;
228 dictIterator *it;
229 dictEntry *de;
230
231 /* Execute pending callbacks with NULL reply. */
232 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
233 __redisRunCallback(ac,&cb,NULL);
234
235 /* Execute callbacks for invalid commands */
236 while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
237 __redisRunCallback(ac,&cb,NULL);
238
239 /* Run subscription callbacks callbacks with NULL reply */
240 it = dictGetIterator(ac->sub.channels);
241 while ((de = dictNext(it)) != NULL)
242 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
243 dictReleaseIterator(it);
244 dictRelease(ac->sub.channels);
245
246 it = dictGetIterator(ac->sub.patterns);
247 while ((de = dictNext(it)) != NULL)
248 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
249 dictReleaseIterator(it);
250 dictRelease(ac->sub.patterns);
251
252 /* Signal event lib to clean up */
253 _EL_CLEANUP(ac);
254
255 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
256 * this context, the status will always be REDIS_OK. */
257 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
258 if (c->flags & REDIS_FREEING) {
259 ac->onDisconnect(ac,REDIS_OK);
260 } else {
261 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
262 }
263 }
264
265 /* Cleanup self */
266 redisFree(c);
267}
268
269/* Free the async context. When this function is called from a callback,
270 * control needs to be returned to redisProcessCallbacks() before actual
271 * free'ing. To do so, a flag is set on the context which is picked up by
272 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
273void redisAsyncFree(redisAsyncContext *ac) {
274 redisContext *c = &(ac->c);
275 c->flags |= REDIS_FREEING;
276 if (!(c->flags & REDIS_IN_CALLBACK))
277 __redisAsyncFree(ac);
278}
279
280/* Helper function to make the disconnect happen and clean up. */
281static void __redisAsyncDisconnect(redisAsyncContext *ac) {
282 redisContext *c = &(ac->c);
283
284 /* Make sure error is accessible if there is any */
285 __redisAsyncCopyError(ac);
286
287 if (ac->err == 0) {
288 /* For clean disconnects, there should be no pending callbacks. */
289 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
290 } else {
291 /* Disconnection is caused by an error, make sure that pending
292 * callbacks cannot call new commands. */
293 c->flags |= REDIS_DISCONNECTING;
294 }
295
296 /* For non-clean disconnects, __redisAsyncFree() will execute pending
297 * callbacks with a NULL-reply. */
298 __redisAsyncFree(ac);
299}
300
301/* Tries to do a clean disconnect from Redis, meaning it stops new commands
302 * from being issued, but tries to flush the output buffer and execute
303 * callbacks for all remaining replies. When this function is called from a
304 * callback, there might be more replies and we can safely defer disconnecting
305 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
306 * when there are no pending callbacks. */
307void redisAsyncDisconnect(redisAsyncContext *ac) {
308 redisContext *c = &(ac->c);
309 c->flags |= REDIS_DISCONNECTING;
310 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
311 __redisAsyncDisconnect(ac);
312}
313
314static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
315 redisContext *c = &(ac->c);
316 dict *callbacks;
317 dictEntry *de;
318 int pvariant;
319 char *stype;
320 sds sname;
321
322 /* Custom reply functions are not supported for pub/sub. This will fail
323 * very hard when they are used... */
324 if (reply->type == REDIS_REPLY_ARRAY) {
325 assert(reply->elements >= 2);
326 assert(reply->element[0]->type == REDIS_REPLY_STRING);
327 stype = reply->element[0]->str;
328 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
329
330 if (pvariant)
331 callbacks = ac->sub.patterns;
332 else
333 callbacks = ac->sub.channels;
334
335 /* Locate the right callback */
336 assert(reply->element[1]->type == REDIS_REPLY_STRING);
337 sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
338 de = dictFind(callbacks,sname);
339 if (de != NULL) {
340 memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
341
342 /* If this is an unsubscribe message, remove it. */
343 if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
344 dictDelete(callbacks,sname);
345
346 /* If this was the last unsubscribe message, revert to
347 * non-subscribe mode. */
348 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
349 if (reply->element[2]->integer == 0)
350 c->flags &= ~REDIS_SUBSCRIBED;
351 }
352 }
353 sdsfree(sname);
354 } else {
355 /* Shift callback for invalid commands. */
356 __redisShiftCallback(&ac->sub.invalid,dstcb);
357 }
358 return REDIS_OK;
359}
360
361void redisProcessCallbacks(redisAsyncContext *ac) {
362 redisContext *c = &(ac->c);
363 redisCallback cb;
364 void *reply = NULL;
365 int status;
366
367 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
368 if (reply == NULL) {
369 /* When the connection is being disconnected and there are
370 * no more replies, this is the cue to really disconnect. */
371 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
372 __redisAsyncDisconnect(ac);
373 return;
374 }
375
376 /* If monitor mode, repush callback */
377 if(c->flags & REDIS_MONITORING) {
378 __redisPushCallback(&ac->replies,&cb);
379 }
380
381 /* When the connection is not being disconnected, simply stop
382 * trying to get replies and wait for the next loop tick. */
383 break;
384 }
385
386 /* Even if the context is subscribed, pending regular callbacks will
387 * get a reply before pub/sub messages arrive. */
388 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
389 /*
390 * A spontaneous reply in a not-subscribed context can be the error
391 * reply that is sent when a new connection exceeds the maximum
392 * number of allowed connections on the server side.
393 *
394 * This is seen as an error instead of a regular reply because the
395 * server closes the connection after sending it.
396 *
397 * To prevent the error from being overwritten by an EOF error the
398 * connection is closed here. See issue #43.
399 *
400 * Another possibility is that the server is loading its dataset.
401 * In this case we also want to close the connection, and have the
402 * user wait until the server is ready to take our request.
403 */
404 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
405 c->err = REDIS_ERR_OTHER;
406 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
407 __redisAsyncDisconnect(ac);
408 return;
409 }
410 /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
411 assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
412 if(c->flags & REDIS_SUBSCRIBED)
413 __redisGetSubscribeCallback(ac,reply,&cb);
414 }
415
416 if (cb.fn != NULL) {
417 __redisRunCallback(ac,&cb,reply);
418 c->reader->fn->freeObject(reply);
419
420 /* Proceed with free'ing when redisAsyncFree() was called. */
421 if (c->flags & REDIS_FREEING) {
422 __redisAsyncFree(ac);
423 return;
424 }
425 } else {
426 /* No callback for this reply. This can either be a NULL callback,
427 * or there were no callbacks to begin with. Either way, don't
428 * abort with an error, but simply ignore it because the client
429 * doesn't know what the server will spit out over the wire. */
430 c->reader->fn->freeObject(reply);
431 }
432 }
433
434 /* Disconnect when there was an error reading the reply */
435 if (status != REDIS_OK)
436 __redisAsyncDisconnect(ac);
437}
438
439/* Internal helper function to detect socket status the first time a read or
440 * write event fires. When connecting was not succesful, the connect callback
441 * is called with a REDIS_ERR status and the context is free'd. */
442static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
443 redisContext *c = &(ac->c);
444
445 if (redisCheckSocketError(c,c->fd) == REDIS_ERR) {
446 /* Try again later when connect(2) is still in progress. */
447 if (errno == EINPROGRESS)
448 return REDIS_OK;
449
450 if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
451 __redisAsyncDisconnect(ac);
452 return REDIS_ERR;
453 }
454
455 /* Mark context as connected. */
456 c->flags |= REDIS_CONNECTED;
457 if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
458 return REDIS_OK;
459}
460
461/* This function should be called when the socket is readable.
462 * It processes all replies that can be read and executes their callbacks.
463 */
464void redisAsyncHandleRead(redisAsyncContext *ac) {
465 redisContext *c = &(ac->c);
466
467 if (!(c->flags & REDIS_CONNECTED)) {
468 /* Abort connect was not successful. */
469 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
470 return;
471 /* Try again later when the context is still not connected. */
472 if (!(c->flags & REDIS_CONNECTED))
473 return;
474 }
475
476 if (redisBufferRead(c) == REDIS_ERR) {
477 __redisAsyncDisconnect(ac);
478 } else {
479 /* Always re-schedule reads */
480 _EL_ADD_READ(ac);
481 redisProcessCallbacks(ac);
482 }
483}
484
485void redisAsyncHandleWrite(redisAsyncContext *ac) {
486 redisContext *c = &(ac->c);
487 int done = 0;
488
489 if (!(c->flags & REDIS_CONNECTED)) {
490 /* Abort connect was not successful. */
491 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
492 return;
493 /* Try again later when the context is still not connected. */
494 if (!(c->flags & REDIS_CONNECTED))
495 return;
496 }
497
498 if (redisBufferWrite(c,&done) == REDIS_ERR) {
499 __redisAsyncDisconnect(ac);
500 } else {
501 /* Continue writing when not done, stop writing otherwise */
502 if (!done)
503 _EL_ADD_WRITE(ac);
504 else
505 _EL_DEL_WRITE(ac);
506
507 /* Always schedule reads after writes */
508 _EL_ADD_READ(ac);
509 }
510}
511
/* Locate the first bulk argument ("$<len>\r\n<data>\r\n") at or after start
 * in a command formatted in the Redis protocol.
 *
 * On success *str points to the first byte of the argument data, *len holds
 * its length, and the returned pointer addresses the byte right after the
 * argument's trailing "\r\n", i.e. where the next argument would start.
 * Returns NULL, leaving *str and *len untouched, when no further argument is
 * present or when its "$<len>\r" header is malformed (no digits, negative
 * length, missing '\r'). */
static char *nextArgument(char *start, char **str, size_t *len) {
    char *p = start;
    char *digits_end;
    long arglen;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* Keep the parsed length in a long: squeezing it through an int first
     * would truncate it, and a negative value would turn into a huge size_t. */
    arglen = strtol(p+1,&digits_end,10);
    if (digits_end == p+1 || arglen < 0) return NULL;

    p = strchr(p,'\r');
    if (p == NULL) return NULL;

    *len = (size_t)arglen;
    *str = p+2;             /* skip "\r\n" after the length */
    return p+2+(*len)+2;    /* skip the data and its trailing "\r\n" */
}
527
528/* Helper function for the redisAsyncCommand* family of functions. Writes a
529 * formatted command to the output buffer and registers the provided callback
530 * function with the context. */
531static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
532 redisContext *c = &(ac->c);
533 redisCallback cb;
534 int pvariant, hasnext;
535 char *cstr, *astr;
536 size_t clen, alen;
537 char *p;
538 sds sname;
539
540 /* Don't accept new commands when the connection is about to be closed. */
541 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
542
543 /* Setup callback */
544 cb.fn = fn;
545 cb.privdata = privdata;
546
547 /* Find out which command will be appended. */
548 p = nextArgument(cmd,&cstr,&clen);
549 assert(p != NULL);
550 hasnext = (p[0] == '$');
551 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
552 cstr += pvariant;
553 clen -= pvariant;
554
555 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
556 c->flags |= REDIS_SUBSCRIBED;
557
558 /* Add every channel/pattern to the list of subscription callbacks. */
559 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
560 sname = sdsnewlen(astr,alen);
561 if (pvariant)
562 dictReplace(ac->sub.patterns,sname,&cb);
563 else
564 dictReplace(ac->sub.channels,sname,&cb);
565 }
566 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
567 /* It is only useful to call (P)UNSUBSCRIBE when the context is
568 * subscribed to one or more channels or patterns. */
569 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
570
571 /* (P)UNSUBSCRIBE does not have its own response: every channel or
572 * pattern that is unsubscribed will receive a message. This means we
573 * should not append a callback function for this command. */
574 } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
575 /* Set monitor flag and push callback */
576 c->flags |= REDIS_MONITORING;
577 __redisPushCallback(&ac->replies,&cb);
578 } else {
579 if (c->flags & REDIS_SUBSCRIBED)
580 /* This will likely result in an error reply, but it needs to be
581 * received and passed to the callback. */
582 __redisPushCallback(&ac->sub.invalid,&cb);
583 else
584 __redisPushCallback(&ac->replies,&cb);
585 }
586
587 __redisAppendCommand(c,cmd,len);
588
589 /* Always schedule a write when the write buffer is non-empty */
590 _EL_ADD_WRITE(ac);
591
592 return REDIS_OK;
593}
594
595int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
596 char *cmd;
597 int len;
598 int status;
599 len = redisvFormatCommand(&cmd,format,ap);
600 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
601 free(cmd);
602 return status;
603}
604
605int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
606 va_list ap;
607 int status;
608 va_start(ap,format);
609 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
610 va_end(ap);
611 return status;
612}
613
614int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
615 char *cmd;
616 int len;
617 int status;
618 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
619 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
620 free(cmd);
621 return status;
622}