]> git.saurik.com Git - redis.git/blob - deps/hiredis/async.c
TODO updated
[redis.git] / deps / hiredis / async.c
1 /*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include <string.h>
33 #include <strings.h>
34 #include <assert.h>
35 #include <ctype.h>
36 #include "async.h"
37 #include "dict.c"
38 #include "sds.h"
39 #include "util.h"
40
41 /* Forward declaration of function in hiredis.c */
42 void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
43
44 /* Functions managing dictionary of callbacks for pub/sub. */
/* Hash callback for the pub/sub dictionaries. The key is treated as an sds
 * string and all sdslen() bytes of it are fed to dictGenHashFunction(). */
static unsigned int callbackHash(const void *key) {
    unsigned char *bytes = (unsigned char*)key;

    return dictGenHashFunction(bytes,sdslen((char*)key));
}
48
49 static void *callbackValDup(void *privdata, const void *src) {
50 ((void) privdata);
51 redisCallback *dup = malloc(sizeof(*dup));
52 memcpy(dup,src,sizeof(*dup));
53 return dup;
54 }
55
56 static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
57 int l1, l2;
58 ((void) privdata);
59
60 l1 = sdslen((sds)key1);
61 l2 = sdslen((sds)key2);
62 if (l1 != l2) return 0;
63 return memcmp(key1,key2,l1) == 0;
64 }
65
66 static void callbackKeyDestructor(void *privdata, void *key) {
67 ((void) privdata);
68 sdsfree((sds)key);
69 }
70
/* Value destructor for the pub/sub dictionaries: releases the value with
 * free(). privdata is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    (void)privdata;

    free(val);
}
75
76 static dictType callbackDict = {
77 callbackHash,
78 NULL,
79 callbackValDup,
80 callbackKeyCompare,
81 callbackKeyDestructor,
82 callbackValDestructor
83 };
84
85 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
86 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
87 c = &(ac->c);
88
89 /* The regular connect functions will always set the flag REDIS_CONNECTED.
90 * For the async API, we want to wait until the first write event is
91 * received up before setting this flag, so reset it here. */
92 c->flags &= ~REDIS_CONNECTED;
93
94 ac->err = 0;
95 ac->errstr = NULL;
96 ac->data = NULL;
97
98 ac->ev.data = NULL;
99 ac->ev.addRead = NULL;
100 ac->ev.delRead = NULL;
101 ac->ev.addWrite = NULL;
102 ac->ev.delWrite = NULL;
103 ac->ev.cleanup = NULL;
104
105 ac->onConnect = NULL;
106 ac->onDisconnect = NULL;
107
108 ac->replies.head = NULL;
109 ac->replies.tail = NULL;
110 ac->sub.invalid.head = NULL;
111 ac->sub.invalid.tail = NULL;
112 ac->sub.channels = dictCreate(&callbackDict,NULL);
113 ac->sub.patterns = dictCreate(&callbackDict,NULL);
114 return ac;
115 }
116
117 /* We want the error field to be accessible directly instead of requiring
118 * an indirection to the redisContext struct. */
119 static void __redisAsyncCopyError(redisAsyncContext *ac) {
120 redisContext *c = &(ac->c);
121 ac->err = c->err;
122 ac->errstr = c->errstr;
123 }
124
125 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
126 redisContext *c = redisConnectNonBlock(ip,port);
127 redisAsyncContext *ac = redisAsyncInitialize(c);
128 __redisAsyncCopyError(ac);
129 return ac;
130 }
131
132 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
133 redisContext *c = redisConnectUnixNonBlock(path);
134 redisAsyncContext *ac = redisAsyncInitialize(c);
135 __redisAsyncCopyError(ac);
136 return ac;
137 }
138
139 int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) {
140 redisContext *c = &(ac->c);
141 return redisSetReplyObjectFunctions(c,fn);
142 }
143
144 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
145 if (ac->onConnect == NULL) {
146 ac->onConnect = fn;
147
148 /* The common way to detect an established connection is to wait for
149 * the first write event to be fired. This assumes the related event
150 * library functions are already set. */
151 if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
152 return REDIS_OK;
153 }
154 return REDIS_ERR;
155 }
156
157 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
158 if (ac->onDisconnect == NULL) {
159 ac->onDisconnect = fn;
160 return REDIS_OK;
161 }
162 return REDIS_ERR;
163 }
164
165 /* Helper functions to push/shift callbacks */
166 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
167 redisCallback *cb;
168
169 /* Copy callback from stack to heap */
170 cb = malloc(sizeof(*cb));
171 if (!cb) redisOOM();
172 if (source != NULL) {
173 memcpy(cb,source,sizeof(*cb));
174 cb->next = NULL;
175 }
176
177 /* Store callback in list */
178 if (list->head == NULL)
179 list->head = cb;
180 if (list->tail != NULL)
181 list->tail->next = cb;
182 list->tail = cb;
183 return REDIS_OK;
184 }
185
186 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
187 redisCallback *cb = list->head;
188 if (cb != NULL) {
189 list->head = cb->next;
190 if (cb == list->tail)
191 list->tail = NULL;
192
193 /* Copy callback from heap to stack */
194 if (target != NULL)
195 memcpy(target,cb,sizeof(*cb));
196 free(cb);
197 return REDIS_OK;
198 }
199 return REDIS_ERR;
200 }
201
202 static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
203 redisContext *c = &(ac->c);
204 if (cb->fn != NULL) {
205 c->flags |= REDIS_IN_CALLBACK;
206 cb->fn(ac,reply,cb->privdata);
207 c->flags &= ~REDIS_IN_CALLBACK;
208 }
209 }
210
211 /* Helper function to free the context. */
212 static void __redisAsyncFree(redisAsyncContext *ac) {
213 redisContext *c = &(ac->c);
214 redisCallback cb;
215 dictIterator *it;
216 dictEntry *de;
217
218 /* Execute pending callbacks with NULL reply. */
219 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
220 __redisRunCallback(ac,&cb,NULL);
221
222 /* Execute callbacks for invalid commands */
223 while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
224 __redisRunCallback(ac,&cb,NULL);
225
226 /* Run subscription callbacks callbacks with NULL reply */
227 it = dictGetIterator(ac->sub.channels);
228 while ((de = dictNext(it)) != NULL)
229 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
230 dictReleaseIterator(it);
231 dictRelease(ac->sub.channels);
232
233 it = dictGetIterator(ac->sub.patterns);
234 while ((de = dictNext(it)) != NULL)
235 __redisRunCallback(ac,dictGetEntryVal(de),NULL);
236 dictReleaseIterator(it);
237 dictRelease(ac->sub.patterns);
238
239 /* Signal event lib to clean up */
240 if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data);
241
242 /* Execute disconnect callback. When redisAsyncFree() initiated destroying
243 * this context, the status will always be REDIS_OK. */
244 if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
245 if (c->flags & REDIS_FREEING) {
246 ac->onDisconnect(ac,REDIS_OK);
247 } else {
248 ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
249 }
250 }
251
252 /* Cleanup self */
253 redisFree(c);
254 }
255
256 /* Free the async context. When this function is called from a callback,
257 * control needs to be returned to redisProcessCallbacks() before actual
258 * free'ing. To do so, a flag is set on the context which is picked up by
259 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
260 void redisAsyncFree(redisAsyncContext *ac) {
261 redisContext *c = &(ac->c);
262 c->flags |= REDIS_FREEING;
263 if (!(c->flags & REDIS_IN_CALLBACK))
264 __redisAsyncFree(ac);
265 }
266
267 /* Helper function to make the disconnect happen and clean up. */
268 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
269 redisContext *c = &(ac->c);
270
271 /* Make sure error is accessible if there is any */
272 __redisAsyncCopyError(ac);
273
274 if (ac->err == 0) {
275 /* For clean disconnects, there should be no pending callbacks. */
276 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
277 } else {
278 /* Disconnection is caused by an error, make sure that pending
279 * callbacks cannot call new commands. */
280 c->flags |= REDIS_DISCONNECTING;
281 }
282
283 /* For non-clean disconnects, __redisAsyncFree() will execute pending
284 * callbacks with a NULL-reply. */
285 __redisAsyncFree(ac);
286 }
287
288 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
289 * from being issued, but tries to flush the output buffer and execute
290 * callbacks for all remaining replies. When this function is called from a
291 * callback, there might be more replies and we can safely defer disconnecting
292 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
293 * when there are no pending callbacks. */
294 void redisAsyncDisconnect(redisAsyncContext *ac) {
295 redisContext *c = &(ac->c);
296 c->flags |= REDIS_DISCONNECTING;
297 if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
298 __redisAsyncDisconnect(ac);
299 }
300
301 static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
302 redisContext *c = &(ac->c);
303 dict *callbacks;
304 dictEntry *de;
305 int pvariant;
306 char *stype;
307 sds sname;
308
309 /* Custom reply functions are not supported for pub/sub. This will fail
310 * very hard when they are used... */
311 if (reply->type == REDIS_REPLY_ARRAY) {
312 assert(reply->elements >= 2);
313 assert(reply->element[0]->type == REDIS_REPLY_STRING);
314 stype = reply->element[0]->str;
315 pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
316
317 if (pvariant)
318 callbacks = ac->sub.patterns;
319 else
320 callbacks = ac->sub.channels;
321
322 /* Locate the right callback */
323 assert(reply->element[1]->type == REDIS_REPLY_STRING);
324 sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
325 de = dictFind(callbacks,sname);
326 if (de != NULL) {
327 memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
328
329 /* If this is an unsubscribe message, remove it. */
330 if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
331 dictDelete(callbacks,sname);
332
333 /* If this was the last unsubscribe message, revert to
334 * non-subscribe mode. */
335 assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
336 if (reply->element[2]->integer == 0)
337 c->flags &= ~REDIS_SUBSCRIBED;
338 }
339 }
340 sdsfree(sname);
341 } else {
342 /* Shift callback for invalid commands. */
343 __redisShiftCallback(&ac->sub.invalid,dstcb);
344 }
345 return REDIS_OK;
346 }
347
348 void redisProcessCallbacks(redisAsyncContext *ac) {
349 redisContext *c = &(ac->c);
350 redisCallback cb;
351 void *reply = NULL;
352 int status;
353
354 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
355 if (reply == NULL) {
356 /* When the connection is being disconnected and there are
357 * no more replies, this is the cue to really disconnect. */
358 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
359 __redisAsyncDisconnect(ac);
360 return;
361 }
362
363 /* When the connection is not being disconnected, simply stop
364 * trying to get replies and wait for the next loop tick. */
365 break;
366 }
367
368 /* Even if the context is subscribed, pending regular callbacks will
369 * get a reply before pub/sub messages arrive. */
370 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
371 /* No more regular callbacks, the context *must* be subscribed. */
372 assert(c->flags & REDIS_SUBSCRIBED);
373 __redisGetSubscribeCallback(ac,reply,&cb);
374 }
375
376 if (cb.fn != NULL) {
377 __redisRunCallback(ac,&cb,reply);
378 c->fn->freeObject(reply);
379
380 /* Proceed with free'ing when redisAsyncFree() was called. */
381 if (c->flags & REDIS_FREEING) {
382 __redisAsyncFree(ac);
383 return;
384 }
385 } else {
386 /* No callback for this reply. This can either be a NULL callback,
387 * or there were no callbacks to begin with. Either way, don't
388 * abort with an error, but simply ignore it because the client
389 * doesn't know what the server will spit out over the wire. */
390 c->fn->freeObject(reply);
391 }
392 }
393
394 /* Disconnect when there was an error reading the reply */
395 if (status != REDIS_OK)
396 __redisAsyncDisconnect(ac);
397 }
398
399 /* This function should be called when the socket is readable.
400 * It processes all replies that can be read and executes their callbacks.
401 */
402 void redisAsyncHandleRead(redisAsyncContext *ac) {
403 redisContext *c = &(ac->c);
404
405 if (redisBufferRead(c) == REDIS_ERR) {
406 __redisAsyncDisconnect(ac);
407 } else {
408 /* Always re-schedule reads */
409 if (ac->ev.addRead) ac->ev.addRead(ac->ev.data);
410 redisProcessCallbacks(ac);
411 }
412 }
413
414 void redisAsyncHandleWrite(redisAsyncContext *ac) {
415 redisContext *c = &(ac->c);
416 int done = 0;
417
418 if (redisBufferWrite(c,&done) == REDIS_ERR) {
419 __redisAsyncDisconnect(ac);
420 } else {
421 /* Continue writing when not done, stop writing otherwise */
422 if (!done) {
423 if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
424 } else {
425 if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data);
426 }
427
428 /* Always schedule reads after writes */
429 if (ac->ev.addRead) ac->ev.addRead(ac->ev.data);
430
431 /* Fire onConnect when this is the first write event. */
432 if (!(c->flags & REDIS_CONNECTED)) {
433 c->flags |= REDIS_CONNECTED;
434 if (ac->onConnect) ac->onConnect(ac);
435 }
436 }
437 }
438
/* Locates the next bulk argument ("$<len>\r\n<data>\r\n") at or after start
 * in a NUL-terminated, already formatted command.
 *
 * On success *str points at the first data byte, *len holds the announced
 * length, and the return value points just past the argument's trailing
 * "\r\n" (i.e. at the next argument, or at the terminating NUL).
 *
 * Returns NULL, leaving *str and *len untouched, when there is no further
 * '$' marker, when the announced length is negative, or when the header is
 * not terminated by '\r'. The announced length is trusted: it is not checked
 * against the amount of data actually present. */
static char *nextArgument(char *start, char **str, size_t *len) {
    char *p = start;
    long arglen;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* Keep the full width of strtol() and reject negative values rather
     * than letting them wrap into a huge size_t. */
    arglen = strtol(p+1,NULL,10);
    if (arglen < 0) return NULL;

    p = strchr(p,'\r');
    if (p == NULL) return NULL;

    *len = (size_t)arglen;
    *str = p+2;
    return p+2+(*len)+2;
}
454
455 /* Helper function for the redisAsyncCommand* family of functions. Writes a
456 * formatted command to the output buffer and registers the provided callback
457 * function with the context. */
458 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
459 redisContext *c = &(ac->c);
460 redisCallback cb;
461 int pvariant, hasnext;
462 char *cstr, *astr;
463 size_t clen, alen;
464 char *p;
465 sds sname;
466
467 /* Don't accept new commands when the connection is about to be closed. */
468 if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
469
470 /* Setup callback */
471 cb.fn = fn;
472 cb.privdata = privdata;
473
474 /* Find out which command will be appended. */
475 p = nextArgument(cmd,&cstr,&clen);
476 assert(p != NULL);
477 hasnext = (p[0] == '$');
478 pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
479 cstr += pvariant;
480 clen -= pvariant;
481
482 if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
483 c->flags |= REDIS_SUBSCRIBED;
484
485 /* Add every channel/pattern to the list of subscription callbacks. */
486 while ((p = nextArgument(p,&astr,&alen)) != NULL) {
487 sname = sdsnewlen(astr,alen);
488 if (pvariant)
489 dictReplace(ac->sub.patterns,sname,&cb);
490 else
491 dictReplace(ac->sub.channels,sname,&cb);
492 }
493 } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
494 /* It is only useful to call (P)UNSUBSCRIBE when the context is
495 * subscribed to one or more channels or patterns. */
496 if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
497
498 /* (P)UNSUBSCRIBE does not have its own response: every channel or
499 * pattern that is unsubscribed will receive a message. This means we
500 * should not append a callback function for this command. */
501 } else {
502 if (c->flags & REDIS_SUBSCRIBED)
503 /* This will likely result in an error reply, but it needs to be
504 * received and passed to the callback. */
505 __redisPushCallback(&ac->sub.invalid,&cb);
506 else
507 __redisPushCallback(&ac->replies,&cb);
508 }
509
510 __redisAppendCommand(c,cmd,len);
511
512 /* Always schedule a write when the write buffer is non-empty */
513 if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
514
515 return REDIS_OK;
516 }
517
518 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
519 char *cmd;
520 int len;
521 int status;
522 len = redisvFormatCommand(&cmd,format,ap);
523 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
524 free(cmd);
525 return status;
526 }
527
528 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
529 va_list ap;
530 int status;
531 va_start(ap,format);
532 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
533 va_end(ap);
534 return status;
535 }
536
537 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
538 char *cmd;
539 int len;
540 int status;
541 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
542 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
543 free(cmd);
544 return status;
545 }