]> git.saurik.com Git - redis.git/commitdiff
touched key for WATCH refactored into a more general thing that can be used also...
authorantirez <antirez@gmail.com>
Wed, 29 Dec 2010 18:39:42 +0000 (19:39 +0100)
committerantirez <antirez@gmail.com>
Wed, 29 Dec 2010 18:39:42 +0000 (19:39 +0100)
12 files changed:
src/db.c
src/diskstore.c
src/object.c
src/rdb.c
src/redis.c
src/redis.h
src/sort.c
src/t_hash.c
src/t_list.c
src/t_set.c
src/t_string.c
src/t_zset.c

index 8ce5d67334ff423c4003c9b585cf964878ce94de..f0701c2a5685f7d7aad9c4b3ee1912085e53360b 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -152,20 +152,41 @@ int selectDb(redisClient *c, int id) {
     return REDIS_OK;
 }
 
+/*-----------------------------------------------------------------------------
+ * Hooks for key space changes.
+ *
+ * Every time a key in the database is modified the function
+ * signalModifiedKey() is called.
+ *
+ * Every time a DB is flushed the function signalFlushDb() is called.
+ *----------------------------------------------------------------------------*/
+
+void signalModifiedKey(redisDb *db, robj *key) {
+    touchWatchedKey(db,key);
+    if (server.ds_enabled)
+        cacheScheduleForFlush(db,key);
+}
+
+void signalFlushedDb(int dbid) {
+    touchWatchedKeysOnFlush(dbid);
+    if (server.ds_enabled)
+        dsFlushDb(dbid);
+}
+
 /*-----------------------------------------------------------------------------
  * Type agnostic commands operating on the key space
  *----------------------------------------------------------------------------*/
 
 void flushdbCommand(redisClient *c) {
     server.dirty += dictSize(c->db->dict);
-    touchWatchedKeysOnFlush(c->db->id);
+    signalFlushedDb(c->db->id);
     dictEmpty(c->db->dict);
     dictEmpty(c->db->expires);
     addReply(c,shared.ok);
 }
 
 void flushallCommand(redisClient *c) {
-    touchWatchedKeysOnFlush(-1);
+    signalFlushedDb(-1);
     server.dirty += emptyDb();
     addReply(c,shared.ok);
     if (server.bgsavechildpid != -1) {
@@ -181,7 +202,7 @@ void delCommand(redisClient *c) {
 
     for (j = 1; j < c->argc; j++) {
         if (dbDelete(c->db,c->argv[j])) {
-            touchWatchedKey(c->db,c->argv[j]);
+            signalModifiedKey(c->db,c->argv[j]);
             server.dirty++;
             deleted++;
         }
@@ -327,8 +348,8 @@ void renameGenericCommand(redisClient *c, int nx) {
         dbReplace(c->db,c->argv[2],o);
     }
     dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[2]);
+    signalModifiedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[2]);
     server.dirty++;
     addReply(c,nx ? shared.cone : shared.ok);
 }
@@ -487,13 +508,13 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
     if (seconds <= 0) {
         if (dbDelete(c->db,key)) server.dirty++;
         addReply(c, shared.cone);
-        touchWatchedKey(c->db,key);
+        signalModifiedKey(c->db,key);
         return;
     } else {
         time_t when = time(NULL)+seconds;
         setExpire(c->db,key,when);
         addReply(c,shared.cone);
-        touchWatchedKey(c->db,key);
+        signalModifiedKey(c->db,key);
         server.dirty++;
         return;
     }
index abaecb6b1b6bdd7acc9f6f710779d7d2250d4907..0eece403d98da86e08fa519dcac7ede0112d9730 100644 (file)
@@ -120,3 +120,6 @@ int dsDel(redisDb *db, robj *key) {
 
 int dsExists(redisDb *db, robj *key) {
 }
+
+int dsFlushDb(int dbid) {
+}
index 4374a07c1562b5f4a186b8fd2d7cc94870896f73..f4c34dcfd8ab039d2f5801632b5dfff9bff38ae7 100644 (file)
@@ -208,16 +208,16 @@ robj *tryObjectEncoding(robj *o) {
     /* Ok, this object can be encoded...
      *
      * Can I use a shared object? Only if the object is inside a given
-     * range and if this is the main thread, since when VM is enabled we
-     * have the constraint that I/O thread should only handle non-shared
-     * objects, in order to avoid race conditions (we don't have per-object
-     * locking).
+     * range and if the back end in use is in-memory. For disk store every
+     * object in memory used as value should be independent.
      *
      * Note that we also avoid using shared integers when maxmemory is used
-     * because very object needs to have a private LRU field for the LRU
+     * because every object needs to have a private LRU field for the LRU
      * algorithm to work well. */
-    if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
-        pthread_equal(pthread_self(),server.mainthread)) {
+    if (server.ds_enabled == 0 &&
+        server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
+        pthread_equal(pthread_self(),server.mainthread))
+    {
         decrRefCount(o);
         incrRefCount(shared.integers[value]);
         return shared.integers[value];
index e9ca111e5cd3a92f75c73c32166b64dbb8949d4d..acec829a6322a2ed2d28d4fd658dc8d5f5ca0027 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -848,7 +848,6 @@ int rdbLoad(char *filename) {
     FILE *fp;
     uint32_t dbid;
     int type, retval, rdbver;
-    int swap_all_values = 0;
     redisDb *db = server.db+0;
     char buf[1024];
     time_t expiretime, now = time(NULL);
@@ -919,28 +918,6 @@ int rdbLoad(char *filename) {
         /* Set the expire time if needed */
         if (expiretime != -1) setExpire(db,key,expiretime);
 
-        /* Handle swapping while loading big datasets when VM is on */
-
-        /* If we detecter we are hopeless about fitting something in memory
-         * we just swap every new key on disk. Directly...
-         * Note that's important to check for this condition before resorting
-         * to random sampling, otherwise we may try to swap already
-         * swapped keys. */
-        if (swap_all_values) {
-            dictEntry *de = dictFind(db->dict,key->ptr);
-
-            /* de may be NULL since the key already expired */
-            if (de) {
-                vmpointer *vp;
-                val = dictGetEntryVal(de);
-
-                if (val->refcount == 1 &&
-                    (vp = vmSwapObjectBlocking(val)) != NULL)
-                    dictGetEntryVal(de) = vp;
-            }
-            decrRefCount(key);
-            continue;
-        }
         decrRefCount(key);
     }
     fclose(fp);
index b28d717cec4eab56edc99910e373ee44023081b0..12ff99064f299eba758cdb263efa7b184f1d5c9b 100644 (file)
@@ -830,6 +830,9 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.cache_flush_queue = listCreate();
+    server.cache_flush_delay = 0;
+
     createSharedObjects();
     server.el = aeCreateEventLoop();
     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
index 1557e2604fd3296c2ecd870591d05659150de831..25d76ab09dcb15b8b2b27c5bd9de3707f3186e7c 100644 (file)
@@ -431,7 +431,9 @@ struct redisServer {
     /* Blocked clients */
     unsigned int bpop_blocked_clients;
     unsigned int cache_blocked_clients;
-    list *unblocked_clients;
+    list *unblocked_clients; /* list of clients to unblock before next loop */
+    list *cache_flush_queue; /* keys to flush on disk */
+    int cache_flush_delay;   /* seconds to wait before flushing keys */
     /* Sort parameters - qsort_r() is only available under BSD so we
      * have to take this state global, in order to pass it to sortCompare() */
     int sort_desc;
@@ -554,6 +556,14 @@ typedef struct iojob {
                  * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
 } iojob;
 
+/* When diskstore is enabled and a flush operation is requested we push
+ * one of this structures into server.cache_flush_queue. */
+typedef struct dirtykey {
+    redisDb *db;
+    robj *key;
+    time_t ctime; /* This is the creation time of the entry. */
+} dirtykey;
+
 /* Structure to hold list iteration abstraction. */
 typedef struct {
     robj *subject;
@@ -774,33 +784,22 @@ int dsSet(redisDb *db, robj *key, robj *val);
 robj *dsGet(redisDb *db, robj *key);
 int dsDel(redisDb *db, robj *key);
 int dsExists(redisDb *db, robj *key);
+int dsFlushDb(int dbid);
 
 /* Disk Store Cache */
-void vmInit(void);
-void vmMarkPagesFree(off_t page, off_t count);
-robj *vmLoadObject(robj *o);
-robj *vmPreviewObject(robj *o);
-int vmSwapOneObjectBlocking(void);
-int vmSwapOneObjectThreaded(void);
-int vmCanSwapOut(void);
+void dsInit(void);
 void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
-void vmCancelThreadedIOJob(robj *o);
 void lockThreadedIO(void);
 void unlockThreadedIO(void);
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db);
 void freeIOJob(iojob *j);
 void queueIOJob(iojob *j);
-int vmWriteObjectOnSwap(robj *o, off_t page);
-robj *vmReadObjectFromSwap(off_t page, int type);
 void waitEmptyIOJobsQueue(void);
-void vmReopenSwapFile(void);
-int vmFreePage(off_t page);
 void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
 void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
 int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
 int dontWaitForSwappedKey(redisClient *c, robj *key);
 void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
-vmpointer *vmSwapObjectBlocking(robj *val);
+int cacheFreeOneEntry(void);
 
 /* Set data type */
 robj *setTypeCreate(robj *value);
@@ -871,6 +870,8 @@ robj *dbRandomKey(redisDb *db);
 int dbDelete(redisDb *db, robj *key);
 long long emptyDb();
 int selectDb(redisClient *c, int id);
+void signalModifiedKey(redisDb *db, robj *key);
+void signalFlushedDb(int dbid);
 
 /* Git SHA1 */
 char *redisGitSHA1(void);
index a44a6d63ba891ca21ea32c2fe881c58b62750dd1..1cf8932e3758c14d756d92dd0da5241b1ca49e05 100644 (file)
@@ -368,7 +368,7 @@ void sortCommand(redisClient *c) {
          * SORT result is empty a new key is set and maybe the old content
          * replaced. */
         server.dirty += 1+outputlen;
-        touchWatchedKey(c->db,storekey);
+        signalModifiedKey(c->db,storekey);
         addReplyLongLong(c,outputlen);
     }
 
index 488bf6b7a039449137ca874dbfba50687ba256fa..5e7525bd13a383f38dc54f1420783c01a4acaf64 100644 (file)
@@ -279,7 +279,7 @@ void hsetCommand(redisClient *c) {
     hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
     update = hashTypeSet(o,c->argv[2],c->argv[3]);
     addReply(c, update ? shared.czero : shared.cone);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
@@ -294,7 +294,7 @@ void hsetnxCommand(redisClient *c) {
         hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
         hashTypeSet(o,c->argv[2],c->argv[3]);
         addReply(c, shared.cone);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
@@ -315,7 +315,7 @@ void hmsetCommand(redisClient *c) {
         hashTypeSet(o,c->argv[i],c->argv[i+1]);
     }
     addReply(c, shared.ok);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
@@ -342,7 +342,7 @@ void hincrbyCommand(redisClient *c) {
     hashTypeSet(o,c->argv[2],new);
     decrRefCount(new);
     addReplyLongLong(c,value);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
@@ -402,7 +402,7 @@ void hdelCommand(redisClient *c) {
     if (hashTypeDelete(o,c->argv[2])) {
         if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         addReply(c,shared.cone);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     } else {
         addReply(c,shared.czero);
index 8ee3b987215a059a73c9900c152383c7988a90a3..d5db6feb324c21e7811203e0ed320662d6e0dd4c 100644 (file)
@@ -274,14 +274,14 @@ void pushGenericCommand(redisClient *c, int where) {
             return;
         }
         if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             addReply(c,shared.cone);
             return;
         }
     }
     listTypePush(lobj,c->argv[2],where);
     addReplyLongLong(c,listTypeLength(lobj));
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
@@ -330,7 +330,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
             if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
                 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                     listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         } else {
             /* Notify client of a failed insert */
@@ -339,7 +339,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
         }
     } else {
         listTypePush(subject,val,where);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 
@@ -427,7 +427,7 @@ void lsetCommand(redisClient *c) {
             o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
             decrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
@@ -439,7 +439,7 @@ void lsetCommand(redisClient *c) {
             listNodeValue(ln) = value;
             incrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else {
@@ -458,7 +458,7 @@ void popGenericCommand(redisClient *c, int where) {
         addReplyBulk(c,value);
         decrRefCount(value);
         if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
@@ -573,7 +573,7 @@ void ltrimCommand(redisClient *c) {
         redisPanic("Unknown list encoding");
     }
     if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.ok);
 }
@@ -616,7 +616,7 @@ void lremCommand(redisClient *c) {
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
     addReplyLongLong(c,removed);
-    if (removed) touchWatchedKey(c->db,c->argv[1]);
+    if (removed) signalModifiedKey(c->db,c->argv[1]);
 }
 
 /* This is the semantic of this command:
@@ -642,7 +642,7 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
             dstobj = createZiplistObject();
             dbAdd(c->db,dstkey,dstobj);
         } else {
-            touchWatchedKey(c->db,dstkey);
+            signalModifiedKey(c->db,dstkey);
             server.dirty++;
         }
         listTypePush(dstobj,value,REDIS_HEAD);
@@ -670,7 +670,7 @@ void rpoplpushCommand(redisClient *c) {
 
         /* Delete the source list when it is empty */
         if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
index 0b4128adf4c804053b9e237b0a7a9b9f354e8f1c..2af383d10f60a951de475247dffb8d66fdf67d10 100644 (file)
@@ -231,7 +231,7 @@ void saddCommand(redisClient *c) {
         }
     }
     if (setTypeAdd(set,c->argv[2])) {
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
         addReply(c,shared.cone);
     } else {
@@ -248,7 +248,7 @@ void sremCommand(redisClient *c) {
     c->argv[2] = tryObjectEncoding(c->argv[2]);
     if (setTypeRemove(set,c->argv[2])) {
         if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
         addReply(c,shared.cone);
     } else {
@@ -287,8 +287,8 @@ void smoveCommand(redisClient *c) {
 
     /* Remove the src set from the database when empty */
     if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[2]);
+    signalModifiedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[2]);
     server.dirty++;
 
     /* Create the destination set when it doesn't exist */
@@ -341,7 +341,7 @@ void spopCommand(redisClient *c) {
         setTypeRemove(set,ele);
     }
     if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
@@ -382,7 +382,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
             zfree(sets);
             if (dstkey) {
                 if (dbDelete(c->db,dstkey)) {
-                    touchWatchedKey(c->db,dstkey);
+                    signalModifiedKey(c->db,dstkey);
                     server.dirty++;
                 }
                 addReply(c,shared.czero);
@@ -486,7 +486,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
-        touchWatchedKey(c->db,dstkey);
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     } else {
         setDeferredMultiBulkLength(c,replylen,cardinality);
@@ -578,7 +578,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
-        touchWatchedKey(c->db,dstkey);
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     }
     zfree(sets);
index c3e3607f305b5ee31ca76462047b6d5133d60d2c..2d7785ff3e33dde5d9f7bd9489048c23e816811f 100644 (file)
@@ -38,7 +38,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
     } else {
         incrRefCount(val);
     }
-    touchWatchedKey(c->db,key);
+    signalModifiedKey(c->db,key);
     server.dirty++;
     removeExpire(c->db,key);
     if (expire) setExpire(c->db,key,time(NULL)+seconds);
@@ -84,7 +84,7 @@ void getsetCommand(redisClient *c) {
     c->argv[2] = tryObjectEncoding(c->argv[2]);
     dbReplace(c->db,c->argv[1],c->argv[2]);
     incrRefCount(c->argv[2]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     removeExpire(c->db,c->argv[1]);
 }
@@ -156,7 +156,7 @@ void setbitCommand(redisClient *c) {
     byteval &= ~(1 << bit);
     byteval |= ((on & 0x1) << bit);
     ((char*)o->ptr)[byte] = byteval;
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c, bitval ? shared.cone : shared.czero);
 }
@@ -244,7 +244,7 @@ void setrangeCommand(redisClient *c) {
     if (sdslen(value) > 0) {
         o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
         memcpy((char*)o->ptr+offset,value,sdslen(value));
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
     addReplyLongLong(c,sdslen(o->ptr));
@@ -331,7 +331,7 @@ void msetGenericCommand(redisClient *c, int nx) {
         dbReplace(c->db,c->argv[j],c->argv[j+1]);
         incrRefCount(c->argv[j+1]);
         removeExpire(c->db,c->argv[j]);
-        touchWatchedKey(c->db,c->argv[j]);
+        signalModifiedKey(c->db,c->argv[j]);
     }
     server.dirty += (c->argc-1)/2;
     addReply(c, nx ? shared.cone : shared.ok);
@@ -361,7 +361,7 @@ void incrDecrCommand(redisClient *c, long long incr) {
     }
     o = createStringObjectFromLongLong(value);
     dbReplace(c->db,c->argv[1],o);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.colon);
     addReply(c,o);
@@ -424,7 +424,7 @@ void appendCommand(redisClient *c) {
         o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
         totlen = sdslen(o->ptr);
     }
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReplyLongLong(c,totlen);
 }
index 8139b53d84bb43b94ea7455786eac7ee54403003..27522367fb277e59f95157a1cc5501e67c5707bb 100644 (file)
@@ -399,7 +399,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int
         de = dictFind(zs->dict,ele);
         redisAssert(de != NULL);
         dictGetEntryVal(de) = &znode->score;
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
         if (incr)
             addReplyDouble(c,score);
@@ -427,7 +427,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int
 
             /* Update the score in the current dict entry */
             dictGetEntryVal(de) = &znode->score;
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
         if (incr)
@@ -477,7 +477,7 @@ void zremCommand(redisClient *c) {
     dictDelete(zs->dict,c->argv[2]);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.cone);
 }
@@ -501,7 +501,7 @@ void zremrangebyscoreCommand(redisClient *c) {
     deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
-    if (deleted) touchWatchedKey(c->db,c->argv[1]);
+    if (deleted) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += deleted;
     addReplyLongLong(c,deleted);
 }
@@ -540,7 +540,7 @@ void zremrangebyrankCommand(redisClient *c) {
     deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
-    if (deleted) touchWatchedKey(c->db,c->argv[1]);
+    if (deleted) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += deleted;
     addReplyLongLong(c, deleted);
 }
@@ -741,14 +741,14 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
     }
 
     if (dbDelete(c->db,dstkey)) {
-        touchWatchedKey(c->db,dstkey);
+        signalModifiedKey(c->db,dstkey);
         touched = 1;
         server.dirty++;
     }
     if (dstzset->zsl->length) {
         dbAdd(c->db,dstkey,dstobj);
         addReplyLongLong(c, dstzset->zsl->length);
-        if (!touched) touchWatchedKey(c->db,dstkey);
+        if (!touched) signalModifiedKey(c->db,dstkey);
         server.dirty++;
     } else {
         decrRefCount(dstobj);