]>
Commit | Line | Data |
---|---|---|
d288ee65 | 1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> | |
3 | * All rights reserved. | |
4 | * | |
5 | * Redistribution and use in source and binary forms, with or without | |
6 | * modification, are permitted provided that the following conditions are met: | |
7 | * | |
8 | * * Redistributions of source code must retain the above copyright notice, | |
9 | * this list of conditions and the following disclaimer. | |
10 | * * Redistributions in binary form must reproduce the above copyright | |
11 | * notice, this list of conditions and the following disclaimer in the | |
12 | * documentation and/or other materials provided with the distribution. | |
13 | * * Neither the name of Redis nor the names of its contributors may be used | |
14 | * to endorse or promote products derived from this software without | |
15 | * specific prior written permission. | |
16 | * | |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
27 | * POSSIBILITY OF SUCH DAMAGE. | |
28 | */ | |
29 | ||
e2641e09 | 30 | #include "redis.h" |
31 | ||
32 | /* ================================ MULTI/EXEC ============================== */ | |
33 | ||
34 | /* Client state initialization for MULTI/EXEC */ | |
35 | void initClientMultiState(redisClient *c) { | |
36 | c->mstate.commands = NULL; | |
37 | c->mstate.count = 0; | |
38 | } | |
39 | ||
40 | /* Release all the resources associated with MULTI/EXEC state */ | |
41 | void freeClientMultiState(redisClient *c) { | |
42 | int j; | |
43 | ||
44 | for (j = 0; j < c->mstate.count; j++) { | |
45 | int i; | |
46 | multiCmd *mc = c->mstate.commands+j; | |
47 | ||
48 | for (i = 0; i < mc->argc; i++) | |
49 | decrRefCount(mc->argv[i]); | |
50 | zfree(mc->argv); | |
51 | } | |
52 | zfree(c->mstate.commands); | |
53 | } | |
54 | ||
55 | /* Add a new command into the MULTI commands queue */ | |
09e2d9ee | 56 | void queueMultiCommand(redisClient *c) { |
e2641e09 | 57 | multiCmd *mc; |
58 | int j; | |
59 | ||
60 | c->mstate.commands = zrealloc(c->mstate.commands, | |
61 | sizeof(multiCmd)*(c->mstate.count+1)); | |
62 | mc = c->mstate.commands+c->mstate.count; | |
09e2d9ee | 63 | mc->cmd = c->cmd; |
e2641e09 | 64 | mc->argc = c->argc; |
65 | mc->argv = zmalloc(sizeof(robj*)*c->argc); | |
66 | memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); | |
67 | for (j = 0; j < c->argc; j++) | |
68 | incrRefCount(mc->argv[j]); | |
69 | c->mstate.count++; | |
70 | } | |
71 | ||
05406168 | 72 | void discardTransaction(redisClient *c) { |
73 | freeClientMultiState(c); | |
74 | initClientMultiState(c); | |
41f0f927 | 75 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);; |
05406168 | 76 | unwatchAllKeys(c); |
77 | } | |
78 | ||
41f0f927 | 79 | /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. |
80 | * Should be called every time there is an error while queueing a command. */ | |
81 | void flagTransaction(redisClient *c) { | |
82 | if (c->flags & REDIS_MULTI) | |
83 | c->flags |= REDIS_DIRTY_EXEC; | |
84 | } | |
85 | ||
e2641e09 | 86 | void multiCommand(redisClient *c) { |
87 | if (c->flags & REDIS_MULTI) { | |
3ab20376 | 88 | addReplyError(c,"MULTI calls can not be nested"); |
e2641e09 | 89 | return; |
90 | } | |
91 | c->flags |= REDIS_MULTI; | |
92 | addReply(c,shared.ok); | |
93 | } | |
94 | ||
95 | void discardCommand(redisClient *c) { | |
96 | if (!(c->flags & REDIS_MULTI)) { | |
3ab20376 | 97 | addReplyError(c,"DISCARD without MULTI"); |
e2641e09 | 98 | return; |
99 | } | |
05406168 | 100 | discardTransaction(c); |
e2641e09 | 101 | addReply(c,shared.ok); |
102 | } | |
103 | ||
104 | /* Send a MULTI command to all the slaves and AOF file. Check the execCommand | |
105 | * implememntation for more information. */ | |
106 | void execCommandReplicateMulti(redisClient *c) { | |
e2641e09 | 107 | robj *multistring = createStringObject("MULTI",5); |
108 | ||
e394114d | 109 | if (server.aof_state != REDIS_AOF_OFF) |
1b1f47c9 | 110 | feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1); |
e2641e09 | 111 | if (listLength(server.slaves)) |
112 | replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); | |
113 | decrRefCount(multistring); | |
114 | } | |
115 | ||
116 | void execCommand(redisClient *c) { | |
117 | int j; | |
118 | robj **orig_argv; | |
119 | int orig_argc; | |
09e2d9ee | 120 | struct redisCommand *orig_cmd; |
e2641e09 | 121 | |
122 | if (!(c->flags & REDIS_MULTI)) { | |
3ab20376 | 123 | addReplyError(c,"EXEC without MULTI"); |
e2641e09 | 124 | return; |
125 | } | |
126 | ||
41f0f927 | 127 | /* Check if we need to abort the EXEC because: |
128 | * 1) Some WATCHed key was touched. | |
129 | * 2) There was a previous error while queueing commands. | |
130 | * A failed EXEC in the first case returns a multi bulk nil object | |
131 | * (technically it is not an error but a special behavior), while | |
132 | * in the second an EXECABORT error is returned. */ | |
133 | if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { | |
134 | addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : | |
135 | shared.nullmultibulk); | |
e2641e09 | 136 | freeClientMultiState(c); |
137 | initClientMultiState(c); | |
41f0f927 | 138 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); |
e2641e09 | 139 | unwatchAllKeys(c); |
21645232 | 140 | goto handle_monitor; |
e2641e09 | 141 | } |
142 | ||
143 | /* Replicate a MULTI request now that we are sure the block is executed. | |
144 | * This way we'll deliver the MULTI/..../EXEC block as a whole and | |
145 | * both the AOF and the replication link will have the same consistency | |
146 | * and atomicity guarantees. */ | |
147 | execCommandReplicateMulti(c); | |
148 | ||
149 | /* Exec all the queued commands */ | |
150 | unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ | |
151 | orig_argv = c->argv; | |
152 | orig_argc = c->argc; | |
09e2d9ee | 153 | orig_cmd = c->cmd; |
0537e7bf | 154 | addReplyMultiBulkLen(c,c->mstate.count); |
e2641e09 | 155 | for (j = 0; j < c->mstate.count; j++) { |
156 | c->argc = c->mstate.commands[j].argc; | |
157 | c->argv = c->mstate.commands[j].argv; | |
09e2d9ee | 158 | c->cmd = c->mstate.commands[j].cmd; |
ce8b772b | 159 | call(c,REDIS_CALL_FULL); |
6c682e55 PN |
160 | |
161 | /* Commands may alter argc/argv, restore mstate. */ | |
162 | c->mstate.commands[j].argc = c->argc; | |
163 | c->mstate.commands[j].argv = c->argv; | |
09e2d9ee | 164 | c->mstate.commands[j].cmd = c->cmd; |
e2641e09 | 165 | } |
166 | c->argv = orig_argv; | |
167 | c->argc = orig_argc; | |
09e2d9ee | 168 | c->cmd = orig_cmd; |
e2641e09 | 169 | freeClientMultiState(c); |
170 | initClientMultiState(c); | |
41f0f927 | 171 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); |
e2641e09 | 172 | /* Make sure the EXEC command is always replicated / AOF, since we |
173 | * always send the MULTI command (we can't know beforehand if the | |
174 | * next operations will contain at least a modification to the DB). */ | |
175 | server.dirty++; | |
21645232 | 176 | |
177 | handle_monitor: | |
178 | /* Send EXEC to clients waiting data from MONITOR. We do it here | |
179 | * since the natural order of commands execution is actually: | |
180 | * MUTLI, EXEC, ... commands inside transaction ... | |
181 | * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command | |
182 | * table, and we do it here with correct ordering. */ | |
183 | if (listLength(server.monitors) && !server.loading) | |
184 | replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); | |
e2641e09 | 185 | } |
186 | ||
187 | /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== | |
188 | * | |
189 | * The implementation uses a per-DB hash table mapping keys to list of clients | |
190 | * WATCHing those keys, so that given a key that is going to be modified | |
191 | * we can mark all the associated clients as dirty. | |
192 | * | |
193 | * Also every client contains a list of WATCHed keys so that's possible to | |
194 | * un-watch such keys when the client is freed or when UNWATCH is called. */ | |
195 | ||
196 | /* In the client->watched_keys list we need to use watchedKey structures | |
197 | * as in order to identify a key in Redis we need both the key name and the | |
198 | * DB */ | |
199 | typedef struct watchedKey { | |
200 | robj *key; | |
201 | redisDb *db; | |
202 | } watchedKey; | |
203 | ||
204 | /* Watch for the specified key */ | |
205 | void watchForKey(redisClient *c, robj *key) { | |
206 | list *clients = NULL; | |
207 | listIter li; | |
208 | listNode *ln; | |
209 | watchedKey *wk; | |
210 | ||
211 | /* Check if we are already watching for this key */ | |
212 | listRewind(c->watched_keys,&li); | |
213 | while((ln = listNext(&li))) { | |
214 | wk = listNodeValue(ln); | |
215 | if (wk->db == c->db && equalStringObjects(key,wk->key)) | |
216 | return; /* Key already watched */ | |
217 | } | |
218 | /* This key is not already watched in this DB. Let's add it */ | |
219 | clients = dictFetchValue(c->db->watched_keys,key); | |
220 | if (!clients) { | |
221 | clients = listCreate(); | |
222 | dictAdd(c->db->watched_keys,key,clients); | |
223 | incrRefCount(key); | |
224 | } | |
225 | listAddNodeTail(clients,c); | |
226 | /* Add the new key to the lits of keys watched by this client */ | |
227 | wk = zmalloc(sizeof(*wk)); | |
228 | wk->key = key; | |
229 | wk->db = c->db; | |
230 | incrRefCount(key); | |
231 | listAddNodeTail(c->watched_keys,wk); | |
232 | } | |
233 | ||
234 | /* Unwatch all the keys watched by this client. To clean the EXEC dirty | |
235 | * flag is up to the caller. */ | |
236 | void unwatchAllKeys(redisClient *c) { | |
237 | listIter li; | |
238 | listNode *ln; | |
239 | ||
240 | if (listLength(c->watched_keys) == 0) return; | |
241 | listRewind(c->watched_keys,&li); | |
242 | while((ln = listNext(&li))) { | |
243 | list *clients; | |
244 | watchedKey *wk; | |
245 | ||
246 | /* Lookup the watched key -> clients list and remove the client | |
247 | * from the list */ | |
248 | wk = listNodeValue(ln); | |
249 | clients = dictFetchValue(wk->db->watched_keys, wk->key); | |
eab0e26e | 250 | redisAssertWithInfo(c,NULL,clients != NULL); |
e2641e09 | 251 | listDelNode(clients,listSearchKey(clients,c)); |
252 | /* Kill the entry at all if this was the only client */ | |
253 | if (listLength(clients) == 0) | |
254 | dictDelete(wk->db->watched_keys, wk->key); | |
255 | /* Remove this watched key from the client->watched list */ | |
256 | listDelNode(c->watched_keys,ln); | |
257 | decrRefCount(wk->key); | |
258 | zfree(wk); | |
259 | } | |
260 | } | |
261 | ||
262 | /* "Touch" a key, so that if this key is being WATCHed by some client the | |
263 | * next EXEC will fail. */ | |
264 | void touchWatchedKey(redisDb *db, robj *key) { | |
265 | list *clients; | |
266 | listIter li; | |
267 | listNode *ln; | |
268 | ||
269 | if (dictSize(db->watched_keys) == 0) return; | |
270 | clients = dictFetchValue(db->watched_keys, key); | |
271 | if (!clients) return; | |
272 | ||
273 | /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ | |
274 | /* Check if we are already watching for this key */ | |
275 | listRewind(clients,&li); | |
276 | while((ln = listNext(&li))) { | |
277 | redisClient *c = listNodeValue(ln); | |
278 | ||
279 | c->flags |= REDIS_DIRTY_CAS; | |
280 | } | |
281 | } | |
282 | ||
283 | /* On FLUSHDB or FLUSHALL all the watched keys that are present before the | |
284 | * flush but will be deleted as effect of the flushing operation should | |
285 | * be touched. "dbid" is the DB that's getting the flush. -1 if it is | |
286 | * a FLUSHALL operation (all the DBs flushed). */ | |
287 | void touchWatchedKeysOnFlush(int dbid) { | |
288 | listIter li1, li2; | |
289 | listNode *ln; | |
290 | ||
291 | /* For every client, check all the waited keys */ | |
292 | listRewind(server.clients,&li1); | |
293 | while((ln = listNext(&li1))) { | |
294 | redisClient *c = listNodeValue(ln); | |
295 | listRewind(c->watched_keys,&li2); | |
296 | while((ln = listNext(&li2))) { | |
297 | watchedKey *wk = listNodeValue(ln); | |
298 | ||
299 | /* For every watched key matching the specified DB, if the | |
300 | * key exists, mark the client as dirty, as the key will be | |
301 | * removed. */ | |
302 | if (dbid == -1 || wk->db->id == dbid) { | |
303 | if (dictFind(wk->db->dict, wk->key->ptr) != NULL) | |
304 | c->flags |= REDIS_DIRTY_CAS; | |
305 | } | |
306 | } | |
307 | } | |
308 | } | |
309 | ||
310 | void watchCommand(redisClient *c) { | |
311 | int j; | |
312 | ||
313 | if (c->flags & REDIS_MULTI) { | |
3ab20376 | 314 | addReplyError(c,"WATCH inside MULTI is not allowed"); |
e2641e09 | 315 | return; |
316 | } | |
317 | for (j = 1; j < c->argc; j++) | |
318 | watchForKey(c,c->argv[j]); | |
319 | addReply(c,shared.ok); | |
320 | } | |
321 | ||
322 | void unwatchCommand(redisClient *c) { | |
323 | unwatchAllKeys(c); | |
324 | c->flags &= (~REDIS_DIRTY_CAS); | |
325 | addReply(c,shared.ok); | |
326 | } |