#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
+#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
- dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
+ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
+ dict *watched_keys; /* WATCHed keys for MULTI/EXEC CAS: each key maps to
+ * the list of clients watching it */
int id;
} redisDb;
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- robj **blockingkeys; /* The key we are waiting to terminate a blocking
+ robj **blocking_keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
- int blockingkeysnum; /* Number of blocking keys */
+ int blocking_keys_num; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
+ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;
static int rewriteAppendOnlyFileBackground(void);
static int vmSwapObjectBlocking(robj *key, robj *val);
static int prepareForShutdown();
+static void touchWatchedKey(redisDb *db, robj *key);
+static void unwatchAllKeys(redisClient *c);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void psubscribeCommand(redisClient *c);
static void punsubscribeCommand(redisClient *c);
static void publishCommand(redisClient *c);
+static void watchCommand(redisClient *c);
+static void unwatchCommand(redisClient *c);
/*================================= Globals ================================= */
{"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
+ {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{NULL,NULL,0,0,NULL,0,0,0}
};
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
- server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
+ server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.vm_enabled)
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
+ /* UNWATCH all the keys */
+ unwatchAllKeys(c);
+ listRelease(c->watched_keys);
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
- /* Remove from the list of clients waiting for swapped keys */
+ /* Remove from the list of clients that are now ready to be restarted
+ * after waiting for swapped keys */
if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
if (ln) {
server.vm_blocked_clients--;
}
}
+ /* Remove from the list of clients waiting for swapped keys */
while (server.vm_enabled && listLength(c->io_keys)) {
ln = listFirst(c->io_keys);
dontWaitForSwappedKey(c,ln->value);
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->blockingkeys = NULL;
- c->blockingkeysnum = 0;
+ c->blocking_keys = NULL;
+ c->blocking_keys_num = 0;
c->io_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
c->pubsub_channels = dictCreate(&setDictType,NULL);
static robj *lookupKeyWrite(redisDb *db, robj *key) {
deleteIfVolatile(db,key);
+ /* A lookup for writing is treated as a modification for WATCH purposes:
+ * every client watching this key gets flagged REDIS_DIRTY_CAS, so that
+ * its next EXEC is aborted. */
+ touchWatchedKey(db,key);
return lookupKey(db,key);
}
}
}
+ touchWatchedKey(c->db,key);
if (nx) deleteIfVolatile(c->db,key);
retval = dictAdd(c->db->dict,key,val);
if (retval == DICT_ERR) {
for (j = 1; j < c->argc; j++) {
if (deleteKey(c->db,c->argv[j])) {
+ touchWatchedKey(c->db,c->argv[j]);
server.dirty++;
deleted++;
}
return;
}
+ /* Check if we need to abort the EXEC if some WATCHed key was touched.
+ * A failed EXEC will return a multi bulk nil object. */
+ if (c->flags & REDIS_DIRTY_CAS) {
+ freeClientMultiState(c);
+ initClientMultiState(c);
+ c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
+ unwatchAllKeys(c);
+ addReply(c,shared.nullmultibulk);
+ return;
+ }
+
/* Replicate a MULTI request now that we are sure the block is executed.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= (~REDIS_MULTI);
+ unwatchAllKeys(c);
/* Make sure the EXEC command is always replicated / AOF, since we
* always send the MULTI command (we can't know beforehand if the
* next operations will contain at least a modification to the DB). */
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
- * in a dictionary (db->blockingkeys) mapping keys to a list of clients
+ * in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we serve the first in the list: basically instead to push
list *l;
int j;
- c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
- c->blockingkeysnum = numkeys;
+ c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
+ c->blocking_keys_num = numkeys;
c->blockingto = timeout;
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
- c->blockingkeys[j] = keys[j];
+ c->blocking_keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blockingkeys,keys[j]);
+ de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
- retval = dictAdd(c->db->blockingkeys,keys[j],l);
+ retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
assert(retval == DICT_OK);
} else {
list *l;
int j;
- assert(c->blockingkeys != NULL);
+ assert(c->blocking_keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
- for (j = 0; j < c->blockingkeysnum; j++) {
+ for (j = 0; j < c->blocking_keys_num; j++) {
/* Remove this client from the list of clients waiting for this key. */
- de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
+ de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
assert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
- dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
- decrRefCount(c->blockingkeys[j]);
+ dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
+ decrRefCount(c->blocking_keys[j]);
}
/* Cleanup the client structure */
- zfree(c->blockingkeys);
- c->blockingkeys = NULL;
+ zfree(c->blocking_keys);
+ c->blocking_keys = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting
list *l;
listNode *ln;
- de = dictFind(c->db->blockingkeys,key);
+ de = dictFind(c->db->blocking_keys,key);
if (de == NULL) return 0;
l = dictGetEntryVal(de);
ln = listFirst(l);
addReplyLongLong(c,receivers);
}
+/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
+ *
+ * The implementation uses a per-DB hash table mapping each key to the list
+ * of clients WATCHing it, so that given a key that is going to be modified
+ * we can mark all the associated clients as dirty.
+ *
+ * Every client also keeps a list of the keys it WATCHes, so that it is
+ * possible to un-watch those keys when the client is freed or when UNWATCH
+ * is called. */
+
+/* Entries of the client->watched_keys list are watchedKey structures: a key
+ * name alone is not enough to identify a key in Redis, the DB is needed as
+ * well. */
+typedef struct watchedKey {
+ robj *key; /* Name of the watched key (a reference is held on it) */
+ redisDb *db; /* DB that was selected when the key was watched */
+} watchedKey;
+
+/* Register 'key', in the DB currently selected by the client, as WATCHed
+ * by 'c'.
+ *
+ * Two structures are updated: the per-DB watched_keys table, mapping the key
+ * to the list of clients watching it, and the client's own watched_keys
+ * list, holding one watchedKey entry per (key, DB) pair. Each of the two
+ * takes its own reference on 'key'. Watching a key that this client already
+ * watches in the same DB is a no-op. */
+static void watchForKey(redisClient *c, robj *key) {
+    list *watchers;
+    listIter iter;
+    listNode *node;
+    watchedKey *entry;
+
+    /* Nothing to do if the client already watches this key in this DB */
+    listRewind(c->watched_keys,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        entry = listNodeValue(node);
+        if (entry->db != c->db) continue;
+        if (equalStringObjects(key,entry->key)) return;
+    }
+
+    /* DB side: fetch the list of clients watching the key, creating it on
+     * first use (the table then retains the key as well) */
+    watchers = dictFetchValue(c->db->watched_keys,key);
+    if (watchers == NULL) {
+        watchers = listCreate();
+        dictAdd(c->db->watched_keys,key,watchers);
+        incrRefCount(key);
+    }
+    listAddNodeTail(watchers,c);
+
+    /* Client side: remember the (key, DB) pair so it can be unwatched */
+    entry = zmalloc(sizeof(*entry));
+    entry->db = c->db;
+    entry->key = key;
+    incrRefCount(key);
+    listAddNodeTail(c->watched_keys,entry);
+}
+
+/* Stop watching every key currently WATCHed by client 'c'.
+ *
+ * The client is removed from the watchers list of each key in the DB the
+ * key was watched in, and the client's own watched_keys list is emptied.
+ * The REDIS_DIRTY_CAS flag is not modified here: clearing it is up to the
+ * caller. */
+static void unwatchAllKeys(redisClient *c) {
+    listIter iter;
+    listNode *node;
+
+    if (listLength(c->watched_keys) == 0) return;
+
+    listRewind(c->watched_keys,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        watchedKey *entry = listNodeValue(node);
+        list *watchers;
+
+        /* Detach the client from the key -> clients list */
+        watchers = dictFetchValue(entry->db->watched_keys,entry->key);
+        assert(watchers != NULL);
+        listDelNode(watchers,listSearchKey(watchers,c));
+
+        /* Nobody else watches this key: drop the whole table entry */
+        if (listLength(watchers) == 0)
+            dictDelete(entry->db->watched_keys,entry->key);
+
+        /* Finally unlink the entry from the client list and release it */
+        listDelNode(c->watched_keys,node);
+        decrRefCount(entry->key);
+        zfree(entry);
+    }
+}
+
+/* "Touch" a key: flag every client WATCHing 'key' in 'db' with
+ * REDIS_DIRTY_CAS, so that the next EXEC issued by those clients fails. */
+static void touchWatchedKey(redisDb *db, robj *key) {
+    list *watchers;
+    listIter iter;
+    listNode *node;
+
+    /* Fast path: no key at all is being watched in this DB */
+    if (dictSize(db->watched_keys) == 0) return;
+
+    watchers = dictFetchValue(db->watched_keys,key);
+    if (watchers == NULL) return;
+
+    /* Mark all the clients watching this key as dirty */
+    listRewind(watchers,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        redisClient *watcher = listNodeValue(node);
+
+        watcher->flags |= REDIS_DIRTY_CAS;
+    }
+}
+
+/* WATCH command: start watching, on behalf of the calling client, every
+ * argument from index 1 onward as a key, then reply with shared.ok. */
+static void watchCommand(redisClient *c) {
+    int argidx = 1;
+
+    while (argidx < c->argc) {
+        watchForKey(c,c->argv[argidx]);
+        argidx++;
+    }
+    addReply(c,shared.ok);
+}
+
+/* UNWATCH command: forget every key watched by the calling client and clear
+ * its REDIS_DIRTY_CAS flag, so that keys touched before this call no longer
+ * make the next EXEC fail. Replies with shared.ok. */
+static void unwatchCommand(redisClient *c) {
+    unwatchAllKeys(c);
+    c->flags = c->flags & ~REDIS_DIRTY_CAS;
+    addReply(c,shared.ok);
+}
+
/* ================================= Debugging ============================== */
/* Compute the sha1 of string at 's' with 'len' bytes long.