]> git.saurik.com Git - redis.git/blob - deps/hiredis/async.c
redis-benchmark: a few fixes to pipelining implementation.
[redis.git] / deps / hiredis / async.c
1 /*
2 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "fmacros.h"
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <assert.h>
37 #include <ctype.h>
38 #include <errno.h>
39 #include "async.h"
40 #include "net.h"
41 #include "dict.c"
42 #include "sds.h"
43
44 #define _EL_ADD_READ(ctx) do { \
45 if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
46 } while(0)
47 #define _EL_DEL_READ(ctx) do { \
48 if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
49 } while(0)
50 #define _EL_ADD_WRITE(ctx) do { \
51 if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
52 } while(0)
53 #define _EL_DEL_WRITE(ctx) do { \
54 if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
55 } while(0)
56 #define _EL_CLEANUP(ctx) do { \
57 if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
58 } while(0);
59
60 /* Forward declaration of function in hiredis.c */
61 void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
62
63 /* Functions managing dictionary of callbacks for pub/sub. */
64 static unsigned int callbackHash(const void *key) {
65 return dictGenHashFunction((unsigned char*)key,sdslen((char*)key));
66 }
67
68 static void *callbackValDup(void *privdata, const void *src) {
69 ((void) privdata);
70 redisCallback *dup = malloc(sizeof(*dup));
71 memcpy(dup,src,sizeof(*dup));
72 return dup;
73 }
74
75 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
76 int l1, l2;
77 ((void) privdata);
78
79 l1 = sdslen((sds)key1);
80 l2 = sdslen((sds)key2);
81 if (l1 != l2) return 0;
82 return memcmp(key1,key2,l1) == 0;
83 }
84
85 static void callbackKeyDestructor(void *privdata, void *key) {
86 ((void) privdata);
87 sdsfree((sds)key);
88 }
89
90 static void callbackValDestructor(void *privdata, void *val) {
91 ((void) privdata);
92 free(val);
93 }
94
95 static dictType callbackDict = {
96 callbackHash,
97 NULL,
98 callbackValDup,
99 callbackKeyCompare,
100 callbackKeyDestructor,
101 callbackValDestructor
102 };
103
104 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
105 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
106 c = &(ac->c);
107
108 /* The regular connect functions will always set the flag REDIS_CONNECTED.
109 * For the async API, we want to wait until the first write event is
110 * received up before setting this flag, so reset it here. */
111 c->flags &= ~REDIS_CONNECTED;
112
113 ac->err = 0;
114 ac->errstr = NULL;
115 ac->data = NULL;
116
117 ac->ev.data = NULL;
118 ac->ev.addRead = NULL;
119 ac->ev.delRead = NULL;
120 ac->ev.addWrite = NULL;
121 ac->ev.delWrite = NULL;
122 ac->ev.cleanup = NULL;
123
124 ac->onConnect = NULL;
125 ac->onDisconnect = NULL;
126
127 ac->replies.head = NULL;
128 ac->replies.tail = NULL;
129 ac->sub.invalid.head = NULL;
130 ac->sub.invalid.tail = NULL;
131 ac->sub.channels = dictCreate(&callbackDict,NULL);
132 ac->sub.patterns = dictCreate(&callbackDict,NULL);
133 return ac;
134 }
135
136 /* We want the error field to be accessible directly instead of requiring
137 * an indirection to the redisContext struct. */
138 static void __redisAsyncCopyError(redisAsyncContext *ac) {
139 redisContext *c = &(ac->c);
140 ac->err = c->err;
141 ac->errstr = c->errstr;
142 }
143
144 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
145 redisContext *c = redisConnectNonBlock(ip,port);
146 redisAsyncContext *ac = redisAsyncInitialize(c);
147 __redisAsyncCopyError(ac);
148 return ac;
149 }
150
151 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
152 redisContext *c = redisConnectUnixNonBlock(path);
153 redisAsyncContext *ac = redisAsyncInitialize(c);
154 __redisAsyncCopyError(ac);
155 return ac;
156 }
157
158 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
159 if (ac->onConnect == NULL) {
160 ac->onConnect = fn;
161
162 /* The common way to detect an established connection is to wait for
163 * the first write event to be fired. This assumes the related event
164 * library functions are already set. */
165 _EL_ADD_WRITE(ac);
166 return REDIS_OK;
167 }
168 return REDIS_ERR;
169 }
170
171 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
172 if (ac->onDisconnect == NULL) {
173 ac->onDisconnect = fn;
174 return REDIS_OK;
175 }
176 return REDIS_ERR;
177 }
178
179 /* Helper functions to push/shift callbacks */
180 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
181 redisCallback *cb;
182
183 /* Copy callback from stack to heap */
184 cb = malloc(sizeof(*cb));
185 if (source != NULL) {
186 memcpy(cb,source,sizeof(*cb));
187 cb->next = NULL;
188 }
189
190 /* Store callback in list */
191 if (list->head == NULL)
192 list->head = cb;
193 if (list->tail != NULL)
194 list->tail->next = cb;
195 list->tail = cb;
196 return REDIS_OK;
197 }
198
199 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
200 redisCallback *cb = list->head;
201 if (cb != NULL) {
202 list->head = cb->next;
203 if (cb == list->tail)
204 list->tail = NULL;
205
206 /* Copy callback from heap to stack */
207 if (target != NULL)
208 memcpy(target,cb,sizeof(*cb));
209 free(cb);
210 return REDIS_OK;
211 }
212 return REDIS_ERR;
213 }
214
215 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
216 redisContext *c = &(ac->c);
217 if (cb->fn != NULL) {
218 c->flags |= REDIS_IN_CALLBACK;
219 cb->fn(ac,reply,cb->privdata);
220 c->flags &= ~REDIS_IN_CALLBACK;
221 }
222 }
223
224 /* Helper function to free the context. */
225 static void __redisAsyncFree(redisAsyncContext *ac) {
226 redisContext *c = &(ac->c);
227 redisCallback cb;
228 dictIterator *it;
229 dictEntry *de;
230
231 /* Execute pending callbacks with NULL reply. */
232 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
233 __redisRunCallback(ac,&cb,NULL);
234
235 /* Execute callbacks for invalid commands */
236 while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
237 __redisRunCallback(ac,&cb,NULL);
238
239 /* Run subscription callbacks callbacks with NULL reply */
240 it = dictGetIterator(ac->sub.channels);
241 while ((de = dictNext(it)) != NULL)
242 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
243 dictReleaseIterator(it);
244 dictRelease(ac->sub.channels);
245
246 it = dictGetIterator(ac->sub.patterns);
247 while ((de = dictNext(it)) != NULL)
248 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
249 dictReleaseIterator(it);
250 dictRelease(ac->sub.patterns);
251
252 /* Signal event lib to clean up */
253 _EL_CLEANUP(ac);
254
255 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
256 * this context, the status will always be REDIS_OK. */
257 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
258 if (c->flags & REDIS_FREEING) {
259 ac->onDisconnect(ac,REDIS_OK);
260 } else {
261 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
262 }
263 }
264
265 /* Cleanup self */
266 redisFree(c);
267 }
268
269 /* Free the async context. When this function is called from a callback,
270 * control needs to be returned to redisProcessCallbacks() before actual
271 * free'ing. To do so, a flag is set on the context which is picked up by
272 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
273 void redisAsyncFree(redisAsyncContext *ac) {
274 redisContext *c = &(ac->c);
275 c->flags |= REDIS_FREEING;
276 if (!(c->flags & REDIS_IN_CALLBACK))
277 __redisAsyncFree(ac);
278 }
279
280 /* Helper function to make the disconnect happen and clean up. */
281 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
282 redisContext *c = &(ac->c);
283
284 /* Make sure error is accessible if there is any */
285 __redisAsyncCopyError(ac);
286
287 if (ac->err == 0) {
288 /* For clean disconnects, there should be no pending callbacks. */
289 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
290 } else {
291 /* Disconnection is caused by an error, make sure that pending
292 * callbacks cannot call new commands. */
293 c->flags |= REDIS_DISCONNECTING;
294 }
295
296 /* For non-clean disconnects, __redisAsyncFree() will execute pending
297 * callbacks with a NULL-reply. */
298 __redisAsyncFree(ac);
299 }
300
301 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
302 * from being issued, but tries to flush the output buffer and execute
303 * callbacks for all remaining replies. When this function is called from a
304 * callback, there might be more replies and we can safely defer disconnecting
305 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
306 * when there are no pending callbacks. */
307 void redisAsyncDisconnect(redisAsyncContext *ac) {
308 redisContext *c = &(ac->c);
309 c->flags |= REDIS_DISCONNECTING;
310 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
311 __redisAsyncDisconnect(ac);
312 }
313
314 static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
315 redisContext *c = &(ac->c);
316 dict *callbacks;
317 dictEntry *de;
318 int pvariant;
319 char *stype;
320 sds sname;
321
322 /* Custom reply functions are not supported for pub/sub. This will fail
323 * very hard when they are used... */
324 if (reply->type == REDIS_REPLY_ARRAY) {
325 assert(reply->elements >= 2);
326 assert(reply->element[0]->type == REDIS_REPLY_STRING);
327 stype = reply->element[0]->str;
328 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
329
330 if (pvariant)
331 callbacks = ac->sub.patterns;
332 else
333 callbacks = ac->sub.channels;
334
335 /* Locate the right callback */
336 assert(reply->element[1]->type == REDIS_REPLY_STRING);
337 sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
338 de = dictFind(callbacks,sname);
339 if (de != NULL) {
340 memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
341
342 /* If this is an unsubscribe message, remove it. */
343 if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
344 dictDelete(callbacks,sname);
345
346 /* If this was the last unsubscribe message, revert to
347 * non-subscribe mode. */
348 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
349 if (reply->element[2]->integer == 0)
350 c->flags &= ~REDIS_SUBSCRIBED;
351 }
352 }
353 sdsfree(sname);
354 } else {
355 /* Shift callback for invalid commands. */
356 __redisShiftCallback(&ac->sub.invalid,dstcb);
357 }
358 return REDIS_OK;
359 }
360
361 void redisProcessCallbacks(redisAsyncContext *ac) {
362 redisContext *c = &(ac->c);
363 redisCallback cb;
364 void *reply = NULL;
365 int status;
366
367 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
368 if (reply == NULL) {
369 /* When the connection is being disconnected and there are
370 * no more replies, this is the cue to really disconnect. */
371 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
372 __redisAsyncDisconnect(ac);
373 return;
374 }
375
376 /* When the connection is not being disconnected, simply stop
377 * trying to get replies and wait for the next loop tick. */
378 break;
379 }
380
381 /* Even if the context is subscribed, pending regular callbacks will
382 * get a reply before pub/sub messages arrive. */
383 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
384 /* A spontaneous reply in a not-subscribed context can only be the
385 * error reply that is sent when a new connection exceeds the
386 * maximum number of allowed connections on the server side. This
387 * is seen as an error instead of a regular reply because the
388 * server closes the connection after sending it. To prevent the
389 * error from being overwritten by an EOF error the connection is
390 * closed here. See issue #43. */
391 if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
392 c->err = REDIS_ERR_OTHER;
393 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
394 __redisAsyncDisconnect(ac);
395 return;
396 }
397 /* No more regular callbacks and no errors, the context *must* be subscribed. */
398 assert(c->flags & REDIS_SUBSCRIBED);
399 __redisGetSubscribeCallback(ac,reply,&cb);
400 }
401
402 if (cb.fn != NULL) {
403 __redisRunCallback(ac,&cb,reply);
404 c->reader->fn->freeObject(reply);
405
406 /* Proceed with free'ing when redisAsyncFree() was called. */
407 if (c->flags & REDIS_FREEING) {
408 __redisAsyncFree(ac);
409 return;
410 }
411 } else {
412 /* No callback for this reply. This can either be a NULL callback,
413 * or there were no callbacks to begin with. Either way, don't
414 * abort with an error, but simply ignore it because the client
415 * doesn't know what the server will spit out over the wire. */
416 c->reader->fn->freeObject(reply);
417 }
418 }
419
420 /* Disconnect when there was an error reading the reply */
421 if (status != REDIS_OK)
422 __redisAsyncDisconnect(ac);
423 }
424
425 /* Internal helper function to detect socket status the first time a read or
426 * write event fires. When connecting was not succesful, the connect callback
427 * is called with a REDIS_ERR status and the context is free'd. */
428 static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
429 redisContext *c = &(ac->c);
430
431 if (redisCheckSocketError(c,c->fd) == REDIS_ERR) {
432 /* Try again later when connect(2) is still in progress. */
433 if (errno == EINPROGRESS)
434 return REDIS_OK;
435
436 if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
437 __redisAsyncDisconnect(ac);
438 return REDIS_ERR;
439 }
440
441 /* Mark context as connected. */
442 c->flags |= REDIS_CONNECTED;
443 if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
444 return REDIS_OK;
445 }
446
447 /* This function should be called when the socket is readable.
448 * It processes all replies that can be read and executes their callbacks.
449 */
450 void redisAsyncHandleRead(redisAsyncContext *ac) {
451 redisContext *c = &(ac->c);
452
453 if (!(c->flags & REDIS_CONNECTED)) {
454 /* Abort connect was not successful. */
455 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
456 return;
457 /* Try again later when the context is still not connected. */
458 if (!(c->flags & REDIS_CONNECTED))
459 return;
460 }
461
462 if (redisBufferRead(c) == REDIS_ERR) {
463 __redisAsyncDisconnect(ac);
464 } else {
465 /* Always re-schedule reads */
466 _EL_ADD_READ(ac);
467 redisProcessCallbacks(ac);
468 }
469 }
470
471 void redisAsyncHandleWrite(redisAsyncContext *ac) {
472 redisContext *c = &(ac->c);
473 int done = 0;
474
475 if (!(c->flags & REDIS_CONNECTED)) {
476 /* Abort connect was not successful. */
477 if (__redisAsyncHandleConnect(ac) != REDIS_OK)
478 return;
479 /* Try again later when the context is still not connected. */
480 if (!(c->flags & REDIS_CONNECTED))
481 return;
482 }
483
484 if (redisBufferWrite(c,&done) == REDIS_ERR) {
485 __redisAsyncDisconnect(ac);
486 } else {
487 /* Continue writing when not done, stop writing otherwise */
488 if (!done)
489 _EL_ADD_WRITE(ac);
490 else
491 _EL_DEL_WRITE(ac);
492
493 /* Always schedule reads after writes */
494 _EL_ADD_READ(ac);
495 }
496 }
497
498 /* Sets a pointer to the first argument and its length starting at p. Returns
499 * the number of bytes to skip to get to the following argument. */
500 static char *nextArgument(char *start, char **str, size_t *len) {
501 char *p = start;
502 if (p[0] != '$') {
503 p = strchr(p,'$');
504 if (p == NULL) return NULL;
505 }
506
507 *len = (int)strtol(p+1,NULL,10);
508 p = strchr(p,'\r');
509 assert(p);
510 *str = p+2;
511 return p+2+(*len)+2;
512 }
513
514 /* Helper function for the redisAsyncCommand* family of functions. Writes a
515 * formatted command to the output buffer and registers the provided callback
516 * function with the context. */
517 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
518 redisContext *c = &(ac->c);
519 redisCallback cb;
520 int pvariant, hasnext;
521 char *cstr, *astr;
522 size_t clen, alen;
523 char *p;
524 sds sname;
525
526 /* Don't accept new commands when the connection is about to be closed. */
527 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
528
529 /* Setup callback */
530 cb.fn = fn;
531 cb.privdata = privdata;
532
533 /* Find out which command will be appended. */
534 p = nextArgument(cmd,&cstr,&clen);
535 assert(p != NULL);
536 hasnext = (p[0] == '$');
537 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
538 cstr += pvariant;
539 clen -= pvariant;
540
541 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
542 c->flags |= REDIS_SUBSCRIBED;
543
544 /* Add every channel/pattern to the list of subscription callbacks. */
545 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
546 sname = sdsnewlen(astr,alen);
547 if (pvariant)
548 dictReplace(ac->sub.patterns,sname,&cb);
549 else
550 dictReplace(ac->sub.channels,sname,&cb);
551 }
552 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
553 /* It is only useful to call (P)UNSUBSCRIBE when the context is
554 * subscribed to one or more channels or patterns. */
555 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
556
557 /* (P)UNSUBSCRIBE does not have its own response: every channel or
558 * pattern that is unsubscribed will receive a message. This means we
559 * should not append a callback function for this command. */
560 } else {
561 if (c->flags & REDIS_SUBSCRIBED)
562 /* This will likely result in an error reply, but it needs to be
563 * received and passed to the callback. */
564 __redisPushCallback(&ac->sub.invalid,&cb);
565 else
566 __redisPushCallback(&ac->replies,&cb);
567 }
568
569 __redisAppendCommand(c,cmd,len);
570
571 /* Always schedule a write when the write buffer is non-empty */
572 _EL_ADD_WRITE(ac);
573
574 return REDIS_OK;
575 }
576
577 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
578 char *cmd;
579 int len;
580 int status;
581 len = redisvFormatCommand(&cmd,format,ap);
582 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
583 free(cmd);
584 return status;
585 }
586
587 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
588 va_list ap;
589 int status;
590 va_start(ap,format);
591 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
592 va_end(ap);
593 return status;
594 }
595
596 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
597 char *cmd;
598 int len;
599 int status;
600 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
601 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
602 free(cmd);
603 return status;
604 }