]> git.saurik.com Git - redis.git/blob - deps/hiredis/async.c
Re-use variable data in redis-benchmark
[redis.git] / deps / hiredis / async.c
1 /*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 *
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include <string.h>
33 #include <assert.h>
34 #include "async.h"
35 #include "sds.h"
36 #include "util.h"
37
38 /* Forward declaration of function in hiredis.c */
39 void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
40
41 static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
42 redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext));
43 c = &(ac->c);
44
45 /* The regular connect functions will always set the flag REDIS_CONNECTED.
46 * For the async API, we want to wait until the first write event is
47 * received up before setting this flag, so reset it here. */
48 c->flags &= ~REDIS_CONNECTED;
49
50 ac->err = 0;
51 ac->errstr = NULL;
52 ac->data = NULL;
53 ac->_adapter_data = NULL;
54
55 ac->evAddRead = NULL;
56 ac->evDelRead = NULL;
57 ac->evAddWrite = NULL;
58 ac->evDelWrite = NULL;
59 ac->evCleanup = NULL;
60
61 ac->onConnect = NULL;
62 ac->onDisconnect = NULL;
63
64 ac->replies.head = NULL;
65 ac->replies.tail = NULL;
66 return ac;
67 }
68
69 /* We want the error field to be accessible directly instead of requiring
70 * an indirection to the redisContext struct. */
71 static void __redisAsyncCopyError(redisAsyncContext *ac) {
72 redisContext *c = &(ac->c);
73 ac->err = c->err;
74 ac->errstr = c->errstr;
75 }
76
77 redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
78 redisContext *c = redisConnectNonBlock(ip,port);
79 redisAsyncContext *ac = redisAsyncInitialize(c);
80 __redisAsyncCopyError(ac);
81 return ac;
82 }
83
84 redisAsyncContext *redisAsyncConnectUnix(const char *path) {
85 redisContext *c = redisConnectUnixNonBlock(path);
86 redisAsyncContext *ac = redisAsyncInitialize(c);
87 __redisAsyncCopyError(ac);
88 return ac;
89 }
90
91 int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) {
92 redisContext *c = &(ac->c);
93 return redisSetReplyObjectFunctions(c,fn);
94 }
95
96 int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
97 if (ac->onConnect == NULL) {
98 ac->onConnect = fn;
99 return REDIS_OK;
100 }
101 return REDIS_ERR;
102 }
103
104 int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
105 if (ac->onDisconnect == NULL) {
106 ac->onDisconnect = fn;
107 return REDIS_OK;
108 }
109 return REDIS_ERR;
110 }
111
112 /* Helper functions to push/shift callbacks */
113 static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
114 redisCallback *cb;
115
116 /* Copy callback from stack to heap */
117 cb = calloc(1,sizeof(*cb));
118 if (!cb) redisOOM();
119 if (source != NULL) {
120 cb->fn = source->fn;
121 cb->privdata = source->privdata;
122 }
123
124 /* Store callback in list */
125 if (list->head == NULL)
126 list->head = cb;
127 if (list->tail != NULL)
128 list->tail->next = cb;
129 list->tail = cb;
130 return REDIS_OK;
131 }
132
133 static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
134 redisCallback *cb = list->head;
135 if (cb != NULL) {
136 list->head = cb->next;
137 if (cb == list->tail)
138 list->tail = NULL;
139
140 /* Copy callback from heap to stack */
141 if (target != NULL)
142 memcpy(target,cb,sizeof(*cb));
143 free(cb);
144 return REDIS_OK;
145 }
146 return REDIS_ERR;
147 }
148
149 /* Tries to do a clean disconnect from Redis, meaning it stops new commands
150 * from being issued, but tries to flush the output buffer and execute
151 * callbacks for all remaining replies.
152 *
153 * This functions is generally called from within a callback, so the
154 * processCallbacks function will pick up the flag when there are no
155 * more replies. */
156 void redisAsyncDisconnect(redisAsyncContext *ac) {
157 redisContext *c = &(ac->c);
158 c->flags |= REDIS_DISCONNECTING;
159 }
160
161 /* Helper function to make the disconnect happen and clean up. */
162 static void __redisAsyncDisconnect(redisAsyncContext *ac) {
163 redisContext *c = &(ac->c);
164 redisCallback cb;
165 int status;
166
167 /* Make sure error is accessible if there is any */
168 __redisAsyncCopyError(ac);
169 status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
170
171 if (status == REDIS_OK) {
172 /* When the connection is cleanly disconnected, there should not
173 * be pending callbacks. */
174 assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
175 } else {
176 /* Callbacks should not be able to issue new commands. */
177 c->flags |= REDIS_DISCONNECTING;
178
179 /* Execute pending callbacks with NULL reply. */
180 while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) {
181 if (cb.fn != NULL)
182 cb.fn(ac,NULL,cb.privdata);
183 }
184 }
185
186 /* Signal event lib to clean up */
187 if (ac->evCleanup) ac->evCleanup(ac->_adapter_data);
188
189 /* Execute callback with proper status */
190 if (ac->onDisconnect) ac->onDisconnect(ac,status);
191
192 /* Cleanup self */
193 redisFree(c);
194 }
195
196 void redisProcessCallbacks(redisAsyncContext *ac) {
197 redisContext *c = &(ac->c);
198 redisCallback cb;
199 void *reply = NULL;
200 int status;
201
202 while((status = redisGetReply(c,&reply)) == REDIS_OK) {
203 if (reply == NULL) {
204 /* When the connection is being disconnected and there are
205 * no more replies, this is the cue to really disconnect. */
206 if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
207 __redisAsyncDisconnect(ac);
208 return;
209 }
210
211 /* When the connection is not being disconnected, simply stop
212 * trying to get replies and wait for the next loop tick. */
213 break;
214 }
215
216 /* Shift callback and execute it */
217 assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK);
218 if (cb.fn != NULL) {
219 cb.fn(ac,reply,cb.privdata);
220 } else {
221 c->fn->freeObject(reply);
222 }
223 }
224
225 /* Disconnect when there was an error reading the reply */
226 if (status != REDIS_OK)
227 __redisAsyncDisconnect(ac);
228 }
229
230 /* This function should be called when the socket is readable.
231 * It processes all replies that can be read and executes their callbacks.
232 */
233 void redisAsyncHandleRead(redisAsyncContext *ac) {
234 redisContext *c = &(ac->c);
235
236 if (redisBufferRead(c) == REDIS_ERR) {
237 __redisAsyncDisconnect(ac);
238 } else {
239 /* Always re-schedule reads */
240 if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
241 redisProcessCallbacks(ac);
242 }
243 }
244
245 void redisAsyncHandleWrite(redisAsyncContext *ac) {
246 redisContext *c = &(ac->c);
247 int done = 0;
248
249 if (redisBufferWrite(c,&done) == REDIS_ERR) {
250 __redisAsyncDisconnect(ac);
251 } else {
252 /* Continue writing when not done, stop writing otherwise */
253 if (!done) {
254 if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data);
255 } else {
256 if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data);
257 }
258
259 /* Always schedule reads after writes */
260 if (ac->evAddRead) ac->evAddRead(ac->_adapter_data);
261
262 /* Fire onConnect when this is the first write event. */
263 if (!(c->flags & REDIS_CONNECTED)) {
264 c->flags |= REDIS_CONNECTED;
265 if (ac->onConnect) ac->onConnect(ac);
266 }
267 }
268 }
269
270 /* Helper function for the redisAsyncCommand* family of functions.
271 *
272 * Write a formatted command to the output buffer and register the provided
273 * callback function with the context.
274 */
275 static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
276 redisContext *c = &(ac->c);
277 redisCallback cb;
278
279 /* Don't accept new commands when the connection is lazily closed. */
280 if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR;
281 __redisAppendCommand(c,cmd,len);
282
283 /* Store callback */
284 cb.fn = fn;
285 cb.privdata = privdata;
286 __redisPushCallback(&ac->replies,&cb);
287
288 /* Always schedule a write when the write buffer is non-empty */
289 if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data);
290
291 return REDIS_OK;
292 }
293
294 int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
295 char *cmd;
296 int len;
297 int status;
298 len = redisvFormatCommand(&cmd,format,ap);
299 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
300 free(cmd);
301 return status;
302 }
303
304 int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
305 va_list ap;
306 int status;
307 va_start(ap,format);
308 status = redisvAsyncCommand(ac,fn,privdata,format,ap);
309 va_end(ap);
310 return status;
311 }
312
313 int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
314 char *cmd;
315 int len;
316 int status;
317 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
318 status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
319 free(cmd);
320 return status;
321 }