]>
Commit | Line | Data |
---|---|---|
24f753a8 PN |
1 | /* |
2 | * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com> | |
3 | * All rights reserved. | |
4 | * | |
5 | * Redistribution and use in source and binary forms, with or without | |
6 | * modification, are permitted provided that the following conditions are met: | |
7 | * | |
8 | * * Redistributions of source code must retain the above copyright notice, | |
9 | * this list of conditions and the following disclaimer. | |
10 | * * Redistributions in binary form must reproduce the above copyright | |
11 | * notice, this list of conditions and the following disclaimer in the | |
12 | * documentation and/or other materials provided with the distribution. | |
13 | * * Neither the name of Redis nor the names of its contributors may be used | |
14 | * to endorse or promote products derived from this software without | |
15 | * specific prior written permission. | |
16 | * | |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
27 | * POSSIBILITY OF SUCH DAMAGE. | |
28 | */ | |
29 | ||
30 | #include <string.h> | |
31 | #include <assert.h> | |
32 | #include "async.h" | |
33 | #include "sds.h" | |
34 | #include "util.h" | |
35 | ||
36 | /* Forward declaration of function in hiredis.c */ | |
37 | void __redisAppendCommand(redisContext *c, char *cmd, size_t len); | |
38 | ||
39 | static redisAsyncContext *redisAsyncInitialize(redisContext *c) { | |
40 | redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); | |
41 | /* Set all bytes in the async part of the context to 0 */ | |
42 | memset(ac+sizeof(redisContext),0,sizeof(redisAsyncContext)-sizeof(redisContext)); | |
43 | return ac; | |
44 | } | |
45 | ||
46 | /* We want the error field to be accessible directly instead of requiring | |
47 | * an indirection to the redisContext struct. */ | |
48 | static void __redisAsyncCopyError(redisAsyncContext *ac) { | |
49 | redisContext *c = &(ac->c); | |
50 | ac->err = c->err; | |
51 | ac->errstr = c->errstr; | |
52 | } | |
53 | ||
54 | redisAsyncContext *redisAsyncConnect(const char *ip, int port) { | |
55 | redisContext *c = redisConnectNonBlock(ip,port); | |
56 | redisAsyncContext *ac = redisAsyncInitialize(c); | |
57 | __redisAsyncCopyError(ac); | |
58 | return ac; | |
59 | } | |
60 | ||
61 | redisAsyncContext *redisAsyncConnectUnix(const char *path) { | |
62 | redisContext *c = redisConnectUnixNonBlock(path); | |
63 | redisAsyncContext *ac = redisAsyncInitialize(c); | |
64 | __redisAsyncCopyError(ac); | |
65 | return ac; | |
66 | } | |
67 | ||
68 | int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) { | |
69 | redisContext *c = &(ac->c); | |
70 | return redisSetReplyObjectFunctions(c,fn); | |
71 | } | |
72 | ||
73 | int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { | |
74 | if (ac->onDisconnect == NULL) { | |
75 | ac->onDisconnect = fn; | |
76 | return REDIS_OK; | |
77 | } | |
78 | return REDIS_ERR; | |
79 | } | |
80 | ||
81 | /* Helper functions to push/shift callbacks */ | |
82 | static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { | |
83 | redisCallback *cb; | |
84 | ||
85 | /* Copy callback from stack to heap */ | |
86 | cb = calloc(1,sizeof(*cb)); | |
87 | if (!cb) redisOOM(); | |
88 | if (source != NULL) { | |
89 | cb->fn = source->fn; | |
90 | cb->privdata = source->privdata; | |
91 | } | |
92 | ||
93 | /* Store callback in list */ | |
94 | if (list->head == NULL) | |
95 | list->head = cb; | |
96 | if (list->tail != NULL) | |
97 | list->tail->next = cb; | |
98 | list->tail = cb; | |
99 | return REDIS_OK; | |
100 | } | |
101 | ||
102 | static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) { | |
103 | redisCallback *cb = list->head; | |
104 | if (cb != NULL) { | |
105 | list->head = cb->next; | |
106 | if (cb == list->tail) | |
107 | list->tail = NULL; | |
108 | ||
109 | /* Copy callback from heap to stack */ | |
110 | if (target != NULL) | |
111 | memcpy(target,cb,sizeof(*cb)); | |
112 | free(cb); | |
113 | return REDIS_OK; | |
114 | } | |
115 | return REDIS_ERR; | |
116 | } | |
117 | ||
118 | /* Tries to do a clean disconnect from Redis, meaning it stops new commands | |
119 | * from being issued, but tries to flush the output buffer and execute | |
120 | * callbacks for all remaining replies. | |
121 | * | |
122 | * This functions is generally called from within a callback, so the | |
123 | * processCallbacks function will pick up the flag when there are no | |
124 | * more replies. */ | |
125 | void redisAsyncDisconnect(redisAsyncContext *ac) { | |
126 | redisContext *c = &(ac->c); | |
127 | c->flags |= REDIS_DISCONNECTING; | |
128 | } | |
129 | ||
130 | /* Helper function to make the disconnect happen and clean up. */ | |
131 | static void __redisAsyncDisconnect(redisAsyncContext *ac) { | |
132 | redisContext *c = &(ac->c); | |
133 | redisCallback cb; | |
134 | int status; | |
135 | ||
136 | /* Make sure error is accessible if there is any */ | |
137 | __redisAsyncCopyError(ac); | |
138 | status = (ac->err == 0) ? REDIS_OK : REDIS_ERR; | |
139 | ||
140 | if (status == REDIS_OK) { | |
141 | /* When the connection is cleanly disconnected, there should not | |
142 | * be pending callbacks. */ | |
143 | assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR); | |
144 | } else { | |
145 | /* Callbacks should not be able to issue new commands. */ | |
146 | c->flags |= REDIS_DISCONNECTING; | |
147 | ||
148 | /* Execute pending callbacks with NULL reply. */ | |
149 | while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { | |
150 | if (cb.fn != NULL) | |
151 | cb.fn(ac,NULL,cb.privdata); | |
152 | } | |
153 | } | |
154 | ||
155 | /* Signal event lib to clean up */ | |
156 | if (ac->evCleanup) ac->evCleanup(ac->data); | |
157 | ||
158 | /* Execute callback with proper status */ | |
159 | if (ac->onDisconnect) ac->onDisconnect(ac,status); | |
160 | ||
161 | /* Cleanup self */ | |
162 | redisFree(c); | |
163 | } | |
164 | ||
165 | void redisProcessCallbacks(redisAsyncContext *ac) { | |
166 | redisContext *c = &(ac->c); | |
167 | redisCallback cb; | |
168 | void *reply = NULL; | |
169 | int status; | |
170 | ||
171 | while((status = redisGetReply(c,&reply)) == REDIS_OK) { | |
172 | if (reply == NULL) { | |
173 | /* When the connection is being disconnected and there are | |
174 | * no more replies, this is the cue to really disconnect. */ | |
175 | if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) { | |
176 | __redisAsyncDisconnect(ac); | |
177 | return; | |
178 | } | |
179 | ||
180 | /* When the connection is not being disconnected, simply stop | |
181 | * trying to get replies and wait for the next loop tick. */ | |
182 | break; | |
183 | } | |
184 | ||
185 | /* Shift callback and execute it */ | |
186 | assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK); | |
187 | if (cb.fn != NULL) { | |
188 | cb.fn(ac,reply,cb.privdata); | |
189 | } else { | |
190 | c->fn->freeObject(reply); | |
191 | } | |
192 | } | |
193 | ||
194 | /* Disconnect when there was an error reading the reply */ | |
195 | if (status != REDIS_OK) | |
196 | __redisAsyncDisconnect(ac); | |
197 | } | |
198 | ||
199 | /* This function should be called when the socket is readable. | |
200 | * It processes all replies that can be read and executes their callbacks. | |
201 | */ | |
202 | void redisAsyncHandleRead(redisAsyncContext *ac) { | |
203 | redisContext *c = &(ac->c); | |
204 | ||
205 | if (redisBufferRead(c) == REDIS_ERR) { | |
206 | __redisAsyncDisconnect(ac); | |
207 | } else { | |
208 | /* Always re-schedule reads */ | |
209 | if (ac->evAddRead) ac->evAddRead(ac->data); | |
210 | redisProcessCallbacks(ac); | |
211 | } | |
212 | } | |
213 | ||
214 | void redisAsyncHandleWrite(redisAsyncContext *ac) { | |
215 | redisContext *c = &(ac->c); | |
216 | int done = 0; | |
217 | ||
218 | if (redisBufferWrite(c,&done) == REDIS_ERR) { | |
219 | __redisAsyncDisconnect(ac); | |
220 | } else { | |
221 | /* Continue writing when not done, stop writing otherwise */ | |
222 | if (!done) { | |
223 | if (ac->evAddWrite) ac->evAddWrite(ac->data); | |
224 | } else { | |
225 | if (ac->evDelWrite) ac->evDelWrite(ac->data); | |
226 | } | |
227 | ||
228 | /* Always schedule reads when something was written */ | |
229 | if (ac->evAddRead) ac->evAddRead(ac->data); | |
230 | } | |
231 | } | |
232 | ||
233 | /* Helper function for the redisAsyncCommand* family of functions. | |
234 | * | |
235 | * Write a formatted command to the output buffer and register the provided | |
236 | * callback function with the context. | |
237 | */ | |
238 | static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { | |
239 | redisContext *c = &(ac->c); | |
240 | redisCallback cb; | |
241 | ||
242 | /* Don't accept new commands when the connection is lazily closed. */ | |
243 | if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; | |
244 | __redisAppendCommand(c,cmd,len); | |
245 | ||
246 | /* Store callback */ | |
247 | cb.fn = fn; | |
248 | cb.privdata = privdata; | |
249 | __redisPushCallback(&ac->replies,&cb); | |
250 | ||
251 | /* Always schedule a write when the write buffer is non-empty */ | |
252 | if (ac->evAddWrite) ac->evAddWrite(ac->data); | |
253 | ||
254 | return REDIS_OK; | |
255 | } | |
256 | ||
257 | int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { | |
258 | char *cmd; | |
259 | int len; | |
260 | int status; | |
261 | len = redisvFormatCommand(&cmd,format,ap); | |
262 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); | |
263 | free(cmd); | |
264 | return status; | |
265 | } | |
266 | ||
267 | int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) { | |
268 | va_list ap; | |
269 | int status; | |
270 | va_start(ap,format); | |
271 | status = redisvAsyncCommand(ac,fn,privdata,format,ap); | |
272 | va_end(ap); | |
273 | return status; | |
274 | } | |
275 | ||
276 | int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { | |
277 | char *cmd; | |
278 | int len; | |
279 | int status; | |
280 | len = redisFormatCommandArgv(&cmd,argc,argv,argvlen); | |
281 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); | |
282 | free(cmd); | |
283 | return status; | |
284 | } |