]>
Commit | Line | Data |
---|---|---|
24f753a8 PN |
1 | /* |
2 | * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com> | |
a1e97d69 PN |
3 | * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com> |
4 | * | |
24f753a8 PN |
5 | * All rights reserved. |
6 | * | |
7 | * Redistribution and use in source and binary forms, with or without | |
8 | * modification, are permitted provided that the following conditions are met: | |
9 | * | |
10 | * * Redistributions of source code must retain the above copyright notice, | |
11 | * this list of conditions and the following disclaimer. | |
12 | * * Redistributions in binary form must reproduce the above copyright | |
13 | * notice, this list of conditions and the following disclaimer in the | |
14 | * documentation and/or other materials provided with the distribution. | |
15 | * * Neither the name of Redis nor the names of its contributors may be used | |
16 | * to endorse or promote products derived from this software without | |
17 | * specific prior written permission. | |
18 | * | |
19 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
20 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
21 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
22 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
23 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
24 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
25 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
26 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
27 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
28 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
29 | * POSSIBILITY OF SUCH DAMAGE. | |
30 | */ | |
31 | ||
32 | #include <string.h> | |
33 | #include <assert.h> | |
34 | #include "async.h" | |
35 | #include "sds.h" | |
36 | #include "util.h" | |
37 | ||
38 | /* Forward declaration of function in hiredis.c */ | |
39 | void __redisAppendCommand(redisContext *c, char *cmd, size_t len); | |
40 | ||
41 | static redisAsyncContext *redisAsyncInitialize(redisContext *c) { | |
42 | redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); | |
a1e97d69 PN |
43 | c = &(ac->c); |
44 | ||
45 | /* The regular connect functions will always set the flag REDIS_CONNECTED. | |
46 | * For the async API, we want to wait until the first write event is | |
47 | * received up before setting this flag, so reset it here. */ | |
48 | c->flags &= ~REDIS_CONNECTED; | |
49 | ||
50 | ac->err = 0; | |
51 | ac->errstr = NULL; | |
52 | ac->data = NULL; | |
53 | ac->_adapter_data = NULL; | |
54 | ||
55 | ac->evAddRead = NULL; | |
56 | ac->evDelRead = NULL; | |
57 | ac->evAddWrite = NULL; | |
58 | ac->evDelWrite = NULL; | |
59 | ac->evCleanup = NULL; | |
60 | ||
61 | ac->onConnect = NULL; | |
62 | ac->onDisconnect = NULL; | |
63 | ||
64 | ac->replies.head = NULL; | |
65 | ac->replies.tail = NULL; | |
24f753a8 PN |
66 | return ac; |
67 | } | |
68 | ||
69 | /* We want the error field to be accessible directly instead of requiring | |
70 | * an indirection to the redisContext struct. */ | |
71 | static void __redisAsyncCopyError(redisAsyncContext *ac) { | |
72 | redisContext *c = &(ac->c); | |
73 | ac->err = c->err; | |
74 | ac->errstr = c->errstr; | |
75 | } | |
76 | ||
77 | redisAsyncContext *redisAsyncConnect(const char *ip, int port) { | |
78 | redisContext *c = redisConnectNonBlock(ip,port); | |
79 | redisAsyncContext *ac = redisAsyncInitialize(c); | |
80 | __redisAsyncCopyError(ac); | |
81 | return ac; | |
82 | } | |
83 | ||
84 | redisAsyncContext *redisAsyncConnectUnix(const char *path) { | |
85 | redisContext *c = redisConnectUnixNonBlock(path); | |
86 | redisAsyncContext *ac = redisAsyncInitialize(c); | |
87 | __redisAsyncCopyError(ac); | |
88 | return ac; | |
89 | } | |
90 | ||
91 | int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) { | |
92 | redisContext *c = &(ac->c); | |
93 | return redisSetReplyObjectFunctions(c,fn); | |
94 | } | |
95 | ||
a1e97d69 PN |
96 | int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { |
97 | if (ac->onConnect == NULL) { | |
98 | ac->onConnect = fn; | |
99 | return REDIS_OK; | |
100 | } | |
101 | return REDIS_ERR; | |
102 | } | |
103 | ||
24f753a8 PN |
104 | int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { |
105 | if (ac->onDisconnect == NULL) { | |
106 | ac->onDisconnect = fn; | |
107 | return REDIS_OK; | |
108 | } | |
109 | return REDIS_ERR; | |
110 | } | |
111 | ||
112 | /* Helper functions to push/shift callbacks */ | |
113 | static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { | |
114 | redisCallback *cb; | |
115 | ||
116 | /* Copy callback from stack to heap */ | |
117 | cb = calloc(1,sizeof(*cb)); | |
118 | if (!cb) redisOOM(); | |
119 | if (source != NULL) { | |
120 | cb->fn = source->fn; | |
121 | cb->privdata = source->privdata; | |
122 | } | |
123 | ||
124 | /* Store callback in list */ | |
125 | if (list->head == NULL) | |
126 | list->head = cb; | |
127 | if (list->tail != NULL) | |
128 | list->tail->next = cb; | |
129 | list->tail = cb; | |
130 | return REDIS_OK; | |
131 | } | |
132 | ||
133 | static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) { | |
134 | redisCallback *cb = list->head; | |
135 | if (cb != NULL) { | |
136 | list->head = cb->next; | |
137 | if (cb == list->tail) | |
138 | list->tail = NULL; | |
139 | ||
140 | /* Copy callback from heap to stack */ | |
141 | if (target != NULL) | |
142 | memcpy(target,cb,sizeof(*cb)); | |
143 | free(cb); | |
144 | return REDIS_OK; | |
145 | } | |
146 | return REDIS_ERR; | |
147 | } | |
148 | ||
149 | /* Tries to do a clean disconnect from Redis, meaning it stops new commands | |
150 | * from being issued, but tries to flush the output buffer and execute | |
151 | * callbacks for all remaining replies. | |
152 | * | |
153 | * This functions is generally called from within a callback, so the | |
154 | * processCallbacks function will pick up the flag when there are no | |
155 | * more replies. */ | |
156 | void redisAsyncDisconnect(redisAsyncContext *ac) { | |
157 | redisContext *c = &(ac->c); | |
158 | c->flags |= REDIS_DISCONNECTING; | |
159 | } | |
160 | ||
161 | /* Helper function to make the disconnect happen and clean up. */ | |
162 | static void __redisAsyncDisconnect(redisAsyncContext *ac) { | |
163 | redisContext *c = &(ac->c); | |
164 | redisCallback cb; | |
165 | int status; | |
166 | ||
167 | /* Make sure error is accessible if there is any */ | |
168 | __redisAsyncCopyError(ac); | |
169 | status = (ac->err == 0) ? REDIS_OK : REDIS_ERR; | |
170 | ||
171 | if (status == REDIS_OK) { | |
172 | /* When the connection is cleanly disconnected, there should not | |
173 | * be pending callbacks. */ | |
174 | assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR); | |
175 | } else { | |
176 | /* Callbacks should not be able to issue new commands. */ | |
177 | c->flags |= REDIS_DISCONNECTING; | |
178 | ||
179 | /* Execute pending callbacks with NULL reply. */ | |
180 | while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { | |
181 | if (cb.fn != NULL) | |
182 | cb.fn(ac,NULL,cb.privdata); | |
183 | } | |
184 | } | |
185 | ||
186 | /* Signal event lib to clean up */ | |
a1e97d69 | 187 | if (ac->evCleanup) ac->evCleanup(ac->_adapter_data); |
24f753a8 PN |
188 | |
189 | /* Execute callback with proper status */ | |
190 | if (ac->onDisconnect) ac->onDisconnect(ac,status); | |
191 | ||
192 | /* Cleanup self */ | |
193 | redisFree(c); | |
194 | } | |
195 | ||
196 | void redisProcessCallbacks(redisAsyncContext *ac) { | |
197 | redisContext *c = &(ac->c); | |
198 | redisCallback cb; | |
199 | void *reply = NULL; | |
200 | int status; | |
201 | ||
202 | while((status = redisGetReply(c,&reply)) == REDIS_OK) { | |
203 | if (reply == NULL) { | |
204 | /* When the connection is being disconnected and there are | |
205 | * no more replies, this is the cue to really disconnect. */ | |
206 | if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) { | |
207 | __redisAsyncDisconnect(ac); | |
208 | return; | |
209 | } | |
210 | ||
211 | /* When the connection is not being disconnected, simply stop | |
212 | * trying to get replies and wait for the next loop tick. */ | |
213 | break; | |
214 | } | |
215 | ||
216 | /* Shift callback and execute it */ | |
217 | assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK); | |
218 | if (cb.fn != NULL) { | |
219 | cb.fn(ac,reply,cb.privdata); | |
220 | } else { | |
221 | c->fn->freeObject(reply); | |
222 | } | |
223 | } | |
224 | ||
225 | /* Disconnect when there was an error reading the reply */ | |
226 | if (status != REDIS_OK) | |
227 | __redisAsyncDisconnect(ac); | |
228 | } | |
229 | ||
230 | /* This function should be called when the socket is readable. | |
231 | * It processes all replies that can be read and executes their callbacks. | |
232 | */ | |
233 | void redisAsyncHandleRead(redisAsyncContext *ac) { | |
234 | redisContext *c = &(ac->c); | |
235 | ||
236 | if (redisBufferRead(c) == REDIS_ERR) { | |
237 | __redisAsyncDisconnect(ac); | |
238 | } else { | |
239 | /* Always re-schedule reads */ | |
a1e97d69 | 240 | if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); |
24f753a8 PN |
241 | redisProcessCallbacks(ac); |
242 | } | |
243 | } | |
244 | ||
245 | void redisAsyncHandleWrite(redisAsyncContext *ac) { | |
246 | redisContext *c = &(ac->c); | |
247 | int done = 0; | |
248 | ||
249 | if (redisBufferWrite(c,&done) == REDIS_ERR) { | |
250 | __redisAsyncDisconnect(ac); | |
251 | } else { | |
252 | /* Continue writing when not done, stop writing otherwise */ | |
253 | if (!done) { | |
a1e97d69 | 254 | if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); |
24f753a8 | 255 | } else { |
a1e97d69 | 256 | if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data); |
24f753a8 PN |
257 | } |
258 | ||
a1e97d69 PN |
259 | /* Always schedule reads after writes */ |
260 | if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); | |
261 | ||
262 | /* Fire onConnect when this is the first write event. */ | |
263 | if (!(c->flags & REDIS_CONNECTED)) { | |
264 | c->flags |= REDIS_CONNECTED; | |
265 | if (ac->onConnect) ac->onConnect(ac); | |
266 | } | |
24f753a8 PN |
267 | } |
268 | } | |
269 | ||
270 | /* Helper function for the redisAsyncCommand* family of functions. | |
271 | * | |
272 | * Write a formatted command to the output buffer and register the provided | |
273 | * callback function with the context. | |
274 | */ | |
275 | static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { | |
276 | redisContext *c = &(ac->c); | |
277 | redisCallback cb; | |
278 | ||
279 | /* Don't accept new commands when the connection is lazily closed. */ | |
280 | if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; | |
281 | __redisAppendCommand(c,cmd,len); | |
282 | ||
283 | /* Store callback */ | |
284 | cb.fn = fn; | |
285 | cb.privdata = privdata; | |
286 | __redisPushCallback(&ac->replies,&cb); | |
287 | ||
288 | /* Always schedule a write when the write buffer is non-empty */ | |
a1e97d69 | 289 | if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); |
24f753a8 PN |
290 | |
291 | return REDIS_OK; | |
292 | } | |
293 | ||
294 | int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { | |
295 | char *cmd; | |
296 | int len; | |
297 | int status; | |
298 | len = redisvFormatCommand(&cmd,format,ap); | |
299 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); | |
300 | free(cmd); | |
301 | return status; | |
302 | } | |
303 | ||
304 | int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) { | |
305 | va_list ap; | |
306 | int status; | |
307 | va_start(ap,format); | |
308 | status = redisvAsyncCommand(ac,fn,privdata,format,ap); | |
309 | va_end(ap); | |
310 | return status; | |
311 | } | |
312 | ||
313 | int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { | |
314 | char *cmd; | |
315 | int len; | |
316 | int status; | |
317 | len = redisFormatCommandArgv(&cmd,argc,argv,argvlen); | |
318 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); | |
319 | free(cmd); | |
320 | return status; | |
321 | } |