Commit | Line | Data |
---|---|---|
e2641e09 | 1 | #include "redis.h" |
2 | ||
3 | /* ================================ MULTI/EXEC ============================== */ | |
4 | ||
5 | /* Client state initialization for MULTI/EXEC */ | |
6 | void initClientMultiState(redisClient *c) { | |
7 | c->mstate.commands = NULL; | |
8 | c->mstate.count = 0; | |
9 | } | |
10 | ||
11 | /* Release all the resources associated with MULTI/EXEC state */ | |
12 | void freeClientMultiState(redisClient *c) { | |
13 | int j; | |
14 | ||
15 | for (j = 0; j < c->mstate.count; j++) { | |
16 | int i; | |
17 | multiCmd *mc = c->mstate.commands+j; | |
18 | ||
19 | for (i = 0; i < mc->argc; i++) | |
20 | decrRefCount(mc->argv[i]); | |
21 | zfree(mc->argv); | |
22 | } | |
23 | zfree(c->mstate.commands); | |
24 | } | |
25 | ||
26 | /* Add a new command into the MULTI commands queue */ | |
09e2d9ee | 27 | void queueMultiCommand(redisClient *c) { |
e2641e09 | 28 | multiCmd *mc; |
29 | int j; | |
30 | ||
31 | c->mstate.commands = zrealloc(c->mstate.commands, | |
32 | sizeof(multiCmd)*(c->mstate.count+1)); | |
33 | mc = c->mstate.commands+c->mstate.count; | |
09e2d9ee | 34 | mc->cmd = c->cmd; |
e2641e09 | 35 | mc->argc = c->argc; |
36 | mc->argv = zmalloc(sizeof(robj*)*c->argc); | |
37 | memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); | |
38 | for (j = 0; j < c->argc; j++) | |
39 | incrRefCount(mc->argv[j]); | |
40 | c->mstate.count++; | |
41 | } | |
42 | ||
05406168 | 43 | void discardTransaction(redisClient *c) { |
44 | freeClientMultiState(c); | |
45 | initClientMultiState(c); | |
46 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);; | |
47 | unwatchAllKeys(c); | |
48 | } | |
49 | ||
e2641e09 | 50 | void multiCommand(redisClient *c) { |
51 | if (c->flags & REDIS_MULTI) { | |
3ab20376 | 52 | addReplyError(c,"MULTI calls can not be nested"); |
e2641e09 | 53 | return; |
54 | } | |
55 | c->flags |= REDIS_MULTI; | |
56 | addReply(c,shared.ok); | |
57 | } | |
58 | ||
59 | void discardCommand(redisClient *c) { | |
60 | if (!(c->flags & REDIS_MULTI)) { | |
3ab20376 | 61 | addReplyError(c,"DISCARD without MULTI"); |
e2641e09 | 62 | return; |
63 | } | |
05406168 | 64 | discardTransaction(c); |
e2641e09 | 65 | addReply(c,shared.ok); |
66 | } | |
67 | ||
68 | /* Send a MULTI command to all the slaves and AOF file. Check the execCommand | |
69 | * implememntation for more information. */ | |
70 | void execCommandReplicateMulti(redisClient *c) { | |
e2641e09 | 71 | robj *multistring = createStringObject("MULTI",5); |
72 | ||
e394114d | 73 | if (server.aof_state != REDIS_AOF_OFF) |
1b1f47c9 | 74 | feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1); |
e2641e09 | 75 | if (listLength(server.slaves)) |
76 | replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); | |
77 | decrRefCount(multistring); | |
78 | } | |
79 | ||
80 | void execCommand(redisClient *c) { | |
81 | int j; | |
82 | robj **orig_argv; | |
83 | int orig_argc; | |
09e2d9ee | 84 | struct redisCommand *orig_cmd; |
e2641e09 | 85 | |
86 | if (!(c->flags & REDIS_MULTI)) { | |
3ab20376 | 87 | addReplyError(c,"EXEC without MULTI"); |
e2641e09 | 88 | return; |
89 | } | |
90 | ||
91 | /* Check if we need to abort the EXEC if some WATCHed key was touched. | |
92 | * A failed EXEC will return a multi bulk nil object. */ | |
93 | if (c->flags & REDIS_DIRTY_CAS) { | |
94 | freeClientMultiState(c); | |
95 | initClientMultiState(c); | |
96 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); | |
97 | unwatchAllKeys(c); | |
98 | addReply(c,shared.nullmultibulk); | |
99 | return; | |
100 | } | |
101 | ||
102 | /* Replicate a MULTI request now that we are sure the block is executed. | |
103 | * This way we'll deliver the MULTI/..../EXEC block as a whole and | |
104 | * both the AOF and the replication link will have the same consistency | |
105 | * and atomicity guarantees. */ | |
106 | execCommandReplicateMulti(c); | |
107 | ||
108 | /* Exec all the queued commands */ | |
109 | unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ | |
110 | orig_argv = c->argv; | |
111 | orig_argc = c->argc; | |
09e2d9ee | 112 | orig_cmd = c->cmd; |
0537e7bf | 113 | addReplyMultiBulkLen(c,c->mstate.count); |
e2641e09 | 114 | for (j = 0; j < c->mstate.count; j++) { |
115 | c->argc = c->mstate.commands[j].argc; | |
116 | c->argv = c->mstate.commands[j].argv; | |
09e2d9ee | 117 | c->cmd = c->mstate.commands[j].cmd; |
ce8b772b | 118 | call(c,REDIS_CALL_FULL); |
6c682e55 PN |
119 | |
120 | /* Commands may alter argc/argv, restore mstate. */ | |
121 | c->mstate.commands[j].argc = c->argc; | |
122 | c->mstate.commands[j].argv = c->argv; | |
09e2d9ee | 123 | c->mstate.commands[j].cmd = c->cmd; |
e2641e09 | 124 | } |
125 | c->argv = orig_argv; | |
126 | c->argc = orig_argc; | |
09e2d9ee | 127 | c->cmd = orig_cmd; |
e2641e09 | 128 | freeClientMultiState(c); |
129 | initClientMultiState(c); | |
130 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); | |
131 | /* Make sure the EXEC command is always replicated / AOF, since we | |
132 | * always send the MULTI command (we can't know beforehand if the | |
133 | * next operations will contain at least a modification to the DB). */ | |
134 | server.dirty++; | |
135 | } | |
136 | ||
/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
 *
 * The implementation uses a per-DB hash table mapping keys to lists of
 * clients WATCHing those keys, so that given a key that is going to be
 * modified we can mark all the associated clients as dirty.
 *
 * Also every client contains a list of WATCHed keys so that it is possible
 * to un-watch such keys when the client is freed or when UNWATCH is
 * called. */
145 | ||
146 | /* In the client->watched_keys list we need to use watchedKey structures | |
147 | * as in order to identify a key in Redis we need both the key name and the | |
148 | * DB */ | |
149 | typedef struct watchedKey { | |
150 | robj *key; | |
151 | redisDb *db; | |
152 | } watchedKey; | |
153 | ||
154 | /* Watch for the specified key */ | |
155 | void watchForKey(redisClient *c, robj *key) { | |
156 | list *clients = NULL; | |
157 | listIter li; | |
158 | listNode *ln; | |
159 | watchedKey *wk; | |
160 | ||
161 | /* Check if we are already watching for this key */ | |
162 | listRewind(c->watched_keys,&li); | |
163 | while((ln = listNext(&li))) { | |
164 | wk = listNodeValue(ln); | |
165 | if (wk->db == c->db && equalStringObjects(key,wk->key)) | |
166 | return; /* Key already watched */ | |
167 | } | |
168 | /* This key is not already watched in this DB. Let's add it */ | |
169 | clients = dictFetchValue(c->db->watched_keys,key); | |
170 | if (!clients) { | |
171 | clients = listCreate(); | |
172 | dictAdd(c->db->watched_keys,key,clients); | |
173 | incrRefCount(key); | |
174 | } | |
175 | listAddNodeTail(clients,c); | |
176 | /* Add the new key to the lits of keys watched by this client */ | |
177 | wk = zmalloc(sizeof(*wk)); | |
178 | wk->key = key; | |
179 | wk->db = c->db; | |
180 | incrRefCount(key); | |
181 | listAddNodeTail(c->watched_keys,wk); | |
182 | } | |
183 | ||
184 | /* Unwatch all the keys watched by this client. To clean the EXEC dirty | |
185 | * flag is up to the caller. */ | |
186 | void unwatchAllKeys(redisClient *c) { | |
187 | listIter li; | |
188 | listNode *ln; | |
189 | ||
190 | if (listLength(c->watched_keys) == 0) return; | |
191 | listRewind(c->watched_keys,&li); | |
192 | while((ln = listNext(&li))) { | |
193 | list *clients; | |
194 | watchedKey *wk; | |
195 | ||
196 | /* Lookup the watched key -> clients list and remove the client | |
197 | * from the list */ | |
198 | wk = listNodeValue(ln); | |
199 | clients = dictFetchValue(wk->db->watched_keys, wk->key); | |
eab0e26e | 200 | redisAssertWithInfo(c,NULL,clients != NULL); |
e2641e09 | 201 | listDelNode(clients,listSearchKey(clients,c)); |
202 | /* Kill the entry at all if this was the only client */ | |
203 | if (listLength(clients) == 0) | |
204 | dictDelete(wk->db->watched_keys, wk->key); | |
205 | /* Remove this watched key from the client->watched list */ | |
206 | listDelNode(c->watched_keys,ln); | |
207 | decrRefCount(wk->key); | |
208 | zfree(wk); | |
209 | } | |
210 | } | |
211 | ||
212 | /* "Touch" a key, so that if this key is being WATCHed by some client the | |
213 | * next EXEC will fail. */ | |
214 | void touchWatchedKey(redisDb *db, robj *key) { | |
215 | list *clients; | |
216 | listIter li; | |
217 | listNode *ln; | |
218 | ||
219 | if (dictSize(db->watched_keys) == 0) return; | |
220 | clients = dictFetchValue(db->watched_keys, key); | |
221 | if (!clients) return; | |
222 | ||
223 | /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ | |
224 | /* Check if we are already watching for this key */ | |
225 | listRewind(clients,&li); | |
226 | while((ln = listNext(&li))) { | |
227 | redisClient *c = listNodeValue(ln); | |
228 | ||
229 | c->flags |= REDIS_DIRTY_CAS; | |
230 | } | |
231 | } | |
232 | ||
233 | /* On FLUSHDB or FLUSHALL all the watched keys that are present before the | |
234 | * flush but will be deleted as effect of the flushing operation should | |
235 | * be touched. "dbid" is the DB that's getting the flush. -1 if it is | |
236 | * a FLUSHALL operation (all the DBs flushed). */ | |
237 | void touchWatchedKeysOnFlush(int dbid) { | |
238 | listIter li1, li2; | |
239 | listNode *ln; | |
240 | ||
241 | /* For every client, check all the waited keys */ | |
242 | listRewind(server.clients,&li1); | |
243 | while((ln = listNext(&li1))) { | |
244 | redisClient *c = listNodeValue(ln); | |
245 | listRewind(c->watched_keys,&li2); | |
246 | while((ln = listNext(&li2))) { | |
247 | watchedKey *wk = listNodeValue(ln); | |
248 | ||
249 | /* For every watched key matching the specified DB, if the | |
250 | * key exists, mark the client as dirty, as the key will be | |
251 | * removed. */ | |
252 | if (dbid == -1 || wk->db->id == dbid) { | |
253 | if (dictFind(wk->db->dict, wk->key->ptr) != NULL) | |
254 | c->flags |= REDIS_DIRTY_CAS; | |
255 | } | |
256 | } | |
257 | } | |
258 | } | |
259 | ||
260 | void watchCommand(redisClient *c) { | |
261 | int j; | |
262 | ||
263 | if (c->flags & REDIS_MULTI) { | |
3ab20376 | 264 | addReplyError(c,"WATCH inside MULTI is not allowed"); |
e2641e09 | 265 | return; |
266 | } | |
267 | for (j = 1; j < c->argc; j++) | |
268 | watchForKey(c,c->argv[j]); | |
269 | addReply(c,shared.ok); | |
270 | } | |
271 | ||
272 | void unwatchCommand(redisClient *c) { | |
273 | unwatchAllKeys(c); | |
274 | c->flags &= (~REDIS_DIRTY_CAS); | |
275 | addReply(c,shared.ok); | |
276 | } |