| 1 | #include "redis.h" |
| 2 | |
| 3 | /* ================================ MULTI/EXEC ============================== */ |
| 4 | |
| 5 | /* Client state initialization for MULTI/EXEC */ |
| 6 | void initClientMultiState(redisClient *c) { |
| 7 | c->mstate.commands = NULL; |
| 8 | c->mstate.count = 0; |
| 9 | } |
| 10 | |
| 11 | /* Release all the resources associated with MULTI/EXEC state */ |
| 12 | void freeClientMultiState(redisClient *c) { |
| 13 | int j; |
| 14 | |
| 15 | for (j = 0; j < c->mstate.count; j++) { |
| 16 | int i; |
| 17 | multiCmd *mc = c->mstate.commands+j; |
| 18 | |
| 19 | for (i = 0; i < mc->argc; i++) |
| 20 | decrRefCount(mc->argv[i]); |
| 21 | zfree(mc->argv); |
| 22 | } |
| 23 | zfree(c->mstate.commands); |
| 24 | } |
| 25 | |
| 26 | /* Add a new command into the MULTI commands queue */ |
| 27 | void queueMultiCommand(redisClient *c) { |
| 28 | multiCmd *mc; |
| 29 | int j; |
| 30 | |
| 31 | c->mstate.commands = zrealloc(c->mstate.commands, |
| 32 | sizeof(multiCmd)*(c->mstate.count+1)); |
| 33 | mc = c->mstate.commands+c->mstate.count; |
| 34 | mc->cmd = c->cmd; |
| 35 | mc->argc = c->argc; |
| 36 | mc->argv = zmalloc(sizeof(robj*)*c->argc); |
| 37 | memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); |
| 38 | for (j = 0; j < c->argc; j++) |
| 39 | incrRefCount(mc->argv[j]); |
| 40 | c->mstate.count++; |
| 41 | } |
| 42 | |
| 43 | void discardTransaction(redisClient *c) { |
| 44 | freeClientMultiState(c); |
| 45 | initClientMultiState(c); |
| 46 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);; |
| 47 | unwatchAllKeys(c); |
| 48 | } |
| 49 | |
| 50 | void multiCommand(redisClient *c) { |
| 51 | if (c->flags & REDIS_MULTI) { |
| 52 | addReplyError(c,"MULTI calls can not be nested"); |
| 53 | return; |
| 54 | } |
| 55 | c->flags |= REDIS_MULTI; |
| 56 | addReply(c,shared.ok); |
| 57 | } |
| 58 | |
| 59 | void discardCommand(redisClient *c) { |
| 60 | if (!(c->flags & REDIS_MULTI)) { |
| 61 | addReplyError(c,"DISCARD without MULTI"); |
| 62 | return; |
| 63 | } |
| 64 | discardTransaction(c); |
| 65 | addReply(c,shared.ok); |
| 66 | } |
| 67 | |
| 68 | /* Send a MULTI command to all the slaves and AOF file. Check the execCommand |
| 69 | * implememntation for more information. */ |
| 70 | void execCommandReplicateMulti(redisClient *c) { |
| 71 | robj *multistring = createStringObject("MULTI",5); |
| 72 | |
| 73 | if (server.aof_state != REDIS_AOF_OFF) |
| 74 | feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1); |
| 75 | if (listLength(server.slaves)) |
| 76 | replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); |
| 77 | decrRefCount(multistring); |
| 78 | } |
| 79 | |
| 80 | void execCommand(redisClient *c) { |
| 81 | int j; |
| 82 | robj **orig_argv; |
| 83 | int orig_argc; |
| 84 | struct redisCommand *orig_cmd; |
| 85 | |
| 86 | if (!(c->flags & REDIS_MULTI)) { |
| 87 | addReplyError(c,"EXEC without MULTI"); |
| 88 | return; |
| 89 | } |
| 90 | |
| 91 | /* Check if we need to abort the EXEC if some WATCHed key was touched. |
| 92 | * A failed EXEC will return a multi bulk nil object. */ |
| 93 | if (c->flags & REDIS_DIRTY_CAS) { |
| 94 | freeClientMultiState(c); |
| 95 | initClientMultiState(c); |
| 96 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); |
| 97 | unwatchAllKeys(c); |
| 98 | addReply(c,shared.nullmultibulk); |
| 99 | return; |
| 100 | } |
| 101 | |
| 102 | /* Replicate a MULTI request now that we are sure the block is executed. |
| 103 | * This way we'll deliver the MULTI/..../EXEC block as a whole and |
| 104 | * both the AOF and the replication link will have the same consistency |
| 105 | * and atomicity guarantees. */ |
| 106 | execCommandReplicateMulti(c); |
| 107 | |
| 108 | /* Exec all the queued commands */ |
| 109 | unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ |
| 110 | orig_argv = c->argv; |
| 111 | orig_argc = c->argc; |
| 112 | orig_cmd = c->cmd; |
| 113 | addReplyMultiBulkLen(c,c->mstate.count); |
| 114 | for (j = 0; j < c->mstate.count; j++) { |
| 115 | c->argc = c->mstate.commands[j].argc; |
| 116 | c->argv = c->mstate.commands[j].argv; |
| 117 | c->cmd = c->mstate.commands[j].cmd; |
| 118 | call(c,REDIS_CALL_FULL); |
| 119 | |
| 120 | /* Commands may alter argc/argv, restore mstate. */ |
| 121 | c->mstate.commands[j].argc = c->argc; |
| 122 | c->mstate.commands[j].argv = c->argv; |
| 123 | c->mstate.commands[j].cmd = c->cmd; |
| 124 | } |
| 125 | c->argv = orig_argv; |
| 126 | c->argc = orig_argc; |
| 127 | c->cmd = orig_cmd; |
| 128 | freeClientMultiState(c); |
| 129 | initClientMultiState(c); |
| 130 | c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); |
| 131 | /* Make sure the EXEC command is always replicated / AOF, since we |
| 132 | * always send the MULTI command (we can't know beforehand if the |
| 133 | * next operations will contain at least a modification to the DB). */ |
| 134 | server.dirty++; |
| 135 | } |
| 136 | |
| 137 | /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== |
| 138 | * |
| 139 | * The implementation uses a per-DB hash table mapping keys to list of clients |
| 140 | * WATCHing those keys, so that given a key that is going to be modified |
| 141 | * we can mark all the associated clients as dirty. |
| 142 | * |
| 143 | * Also every client contains a list of WATCHed keys so that's possible to |
| 144 | * un-watch such keys when the client is freed or when UNWATCH is called. */ |
| 145 | |
| 146 | /* In the client->watched_keys list we need to use watchedKey structures |
| 147 | * as in order to identify a key in Redis we need both the key name and the |
| 148 | * DB */ |
| 149 | typedef struct watchedKey { |
| 150 | robj *key; |
| 151 | redisDb *db; |
| 152 | } watchedKey; |
| 153 | |
| 154 | /* Watch for the specified key */ |
| 155 | void watchForKey(redisClient *c, robj *key) { |
| 156 | list *clients = NULL; |
| 157 | listIter li; |
| 158 | listNode *ln; |
| 159 | watchedKey *wk; |
| 160 | |
| 161 | /* Check if we are already watching for this key */ |
| 162 | listRewind(c->watched_keys,&li); |
| 163 | while((ln = listNext(&li))) { |
| 164 | wk = listNodeValue(ln); |
| 165 | if (wk->db == c->db && equalStringObjects(key,wk->key)) |
| 166 | return; /* Key already watched */ |
| 167 | } |
| 168 | /* This key is not already watched in this DB. Let's add it */ |
| 169 | clients = dictFetchValue(c->db->watched_keys,key); |
| 170 | if (!clients) { |
| 171 | clients = listCreate(); |
| 172 | dictAdd(c->db->watched_keys,key,clients); |
| 173 | incrRefCount(key); |
| 174 | } |
| 175 | listAddNodeTail(clients,c); |
| 176 | /* Add the new key to the lits of keys watched by this client */ |
| 177 | wk = zmalloc(sizeof(*wk)); |
| 178 | wk->key = key; |
| 179 | wk->db = c->db; |
| 180 | incrRefCount(key); |
| 181 | listAddNodeTail(c->watched_keys,wk); |
| 182 | } |
| 183 | |
| 184 | /* Unwatch all the keys watched by this client. To clean the EXEC dirty |
| 185 | * flag is up to the caller. */ |
| 186 | void unwatchAllKeys(redisClient *c) { |
| 187 | listIter li; |
| 188 | listNode *ln; |
| 189 | |
| 190 | if (listLength(c->watched_keys) == 0) return; |
| 191 | listRewind(c->watched_keys,&li); |
| 192 | while((ln = listNext(&li))) { |
| 193 | list *clients; |
| 194 | watchedKey *wk; |
| 195 | |
| 196 | /* Lookup the watched key -> clients list and remove the client |
| 197 | * from the list */ |
| 198 | wk = listNodeValue(ln); |
| 199 | clients = dictFetchValue(wk->db->watched_keys, wk->key); |
| 200 | redisAssertWithInfo(c,NULL,clients != NULL); |
| 201 | listDelNode(clients,listSearchKey(clients,c)); |
| 202 | /* Kill the entry at all if this was the only client */ |
| 203 | if (listLength(clients) == 0) |
| 204 | dictDelete(wk->db->watched_keys, wk->key); |
| 205 | /* Remove this watched key from the client->watched list */ |
| 206 | listDelNode(c->watched_keys,ln); |
| 207 | decrRefCount(wk->key); |
| 208 | zfree(wk); |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | /* "Touch" a key, so that if this key is being WATCHed by some client the |
| 213 | * next EXEC will fail. */ |
| 214 | void touchWatchedKey(redisDb *db, robj *key) { |
| 215 | list *clients; |
| 216 | listIter li; |
| 217 | listNode *ln; |
| 218 | |
| 219 | if (dictSize(db->watched_keys) == 0) return; |
| 220 | clients = dictFetchValue(db->watched_keys, key); |
| 221 | if (!clients) return; |
| 222 | |
| 223 | /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ |
| 224 | /* Check if we are already watching for this key */ |
| 225 | listRewind(clients,&li); |
| 226 | while((ln = listNext(&li))) { |
| 227 | redisClient *c = listNodeValue(ln); |
| 228 | |
| 229 | c->flags |= REDIS_DIRTY_CAS; |
| 230 | } |
| 231 | } |
| 232 | |
| 233 | /* On FLUSHDB or FLUSHALL all the watched keys that are present before the |
| 234 | * flush but will be deleted as effect of the flushing operation should |
| 235 | * be touched. "dbid" is the DB that's getting the flush. -1 if it is |
| 236 | * a FLUSHALL operation (all the DBs flushed). */ |
| 237 | void touchWatchedKeysOnFlush(int dbid) { |
| 238 | listIter li1, li2; |
| 239 | listNode *ln; |
| 240 | |
| 241 | /* For every client, check all the waited keys */ |
| 242 | listRewind(server.clients,&li1); |
| 243 | while((ln = listNext(&li1))) { |
| 244 | redisClient *c = listNodeValue(ln); |
| 245 | listRewind(c->watched_keys,&li2); |
| 246 | while((ln = listNext(&li2))) { |
| 247 | watchedKey *wk = listNodeValue(ln); |
| 248 | |
| 249 | /* For every watched key matching the specified DB, if the |
| 250 | * key exists, mark the client as dirty, as the key will be |
| 251 | * removed. */ |
| 252 | if (dbid == -1 || wk->db->id == dbid) { |
| 253 | if (dictFind(wk->db->dict, wk->key->ptr) != NULL) |
| 254 | c->flags |= REDIS_DIRTY_CAS; |
| 255 | } |
| 256 | } |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | void watchCommand(redisClient *c) { |
| 261 | int j; |
| 262 | |
| 263 | if (c->flags & REDIS_MULTI) { |
| 264 | addReplyError(c,"WATCH inside MULTI is not allowed"); |
| 265 | return; |
| 266 | } |
| 267 | for (j = 1; j < c->argc; j++) |
| 268 | watchForKey(c,c->argv[j]); |
| 269 | addReply(c,shared.ok); |
| 270 | } |
| 271 | |
| 272 | void unwatchCommand(redisClient *c) { |
| 273 | unwatchAllKeys(c); |
| 274 | c->flags &= (~REDIS_DIRTY_CAS); |
| 275 | addReply(c,shared.ok); |
| 276 | } |