]> git.saurik.com Git - redis.git/blobdiff - redis.c
extract preloading of multiple keys according to the command prototype to a separate...
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index b3db3f583b0ad13de2da99b3c1d15c6737e40c06..e342416f1b8a48808d3a681992b9ba6aa5602b84 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -37,8 +37,6 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
-#define __USE_POSIX199309
-#define __USE_UNIX98
 #include <signal.h>
 
 #ifdef HAVE_BACKTRACE
@@ -372,6 +370,7 @@ struct redisServer {
     pid_t bgsavechildpid;
     pid_t bgrewritechildpid;
     sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
+    sds aofbuf;       /* AOF buffer, written before entering the event loop */
     struct saveparam *saveparams;
     int saveparamslen;
     char *logfile;
@@ -515,8 +514,9 @@ struct sharedObjectsStruct {
     *outofrangeerr, *plus,
     *select0, *select1, *select2, *select3, *select4,
     *select5, *select6, *select7, *select8, *select9,
-    *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
-    *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS];
+    *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
+    *mbulk4, *psubscribebulk, *punsubscribebulk,
+    *integers[REDIS_SHARED_INTEGERS];
 } shared;
 
 /* Global vars that are actally used as constants. The following double
@@ -557,6 +557,8 @@ static int rdbSaveBackground(char *filename);
 static robj *createStringObject(char *ptr, size_t len);
 static robj *dupStringObject(robj *o);
 static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
+static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
+static void flushAppendOnlyFile(void);
 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
 static robj *tryObjectEncoding(robj *o);
@@ -621,12 +623,14 @@ static void freePubsubPattern(void *p);
 static int listMatchPubsubPattern(void *a, void *b);
 static int compareStringObjects(robj *a, robj *b);
 static void usage();
+static int rewriteAppendOnlyFileBackground(void);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
 static void echoCommand(redisClient *c);
 static void setCommand(redisClient *c);
 static void setnxCommand(redisClient *c);
+static void setexCommand(redisClient *c);
 static void getCommand(redisClient *c);
 static void delCommand(redisClient *c);
 static void existsCommand(redisClient *c);
@@ -735,6 +739,7 @@ static struct redisCommand cmdTable[] = {
     {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
     {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
     {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
+    {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
     {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
     {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1},
     {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
@@ -989,17 +994,17 @@ static long long memtoll(const char *p, int *err) {
     while(*u && isdigit(*u)) u++;
     if (*u == '\0' || !strcasecmp(u,"b")) {
         mul = 1;
-    } else if (!strcasecmp(u,"k") || !strcasecmp(u,"kb")) {
+    } else if (!strcasecmp(u,"k")) {
         mul = 1000;
-    } else if (!strcasecmp(u,"ki") || !strcasecmp(u,"kib")) {
+    } else if (!strcasecmp(u,"kb")) {
         mul = 1024;
-    } else if (!strcasecmp(u,"m") || !strcasecmp(u,"mb")) {
+    } else if (!strcasecmp(u,"m")) {
         mul = 1000*1000;
-    } else if (!strcasecmp(u,"mi") || !strcasecmp(u,"mib")) {
+    } else if (!strcasecmp(u,"mb")) {
         mul = 1024*1024;
-    } else if (!strcasecmp(u,"g") || !strcasecmp(u,"hb")) {
+    } else if (!strcasecmp(u,"g")) {
         mul = 1000L*1000*1000;
-    } else if (!strcasecmp(u,"gi") || !strcasecmp(u,"gib")) {
+    } else if (!strcasecmp(u,"gb")) {
         mul = 1024L*1024*1024;
     } else {
         if (err) *err = 1;
@@ -1513,6 +1518,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
         redisLog(REDIS_NOTICE,"Connecting to MASTER...");
         if (syncWithMaster() == REDIS_OK) {
             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
+            if (server.appendonly) rewriteAppendOnlyFileBackground();
         }
     }
     return 100;
@@ -1524,6 +1530,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
 static void beforeSleep(struct aeEventLoop *eventLoop) {
     REDIS_NOTUSED(eventLoop);
 
+    /* Awake clients that got all the swapped keys they requested */
     if (server.vm_enabled && listLength(server.io_ready_clients)) {
         listIter li;
         listNode *ln;
@@ -1548,6 +1555,8 @@ static void beforeSleep(struct aeEventLoop *eventLoop) {
                 processInputBuffer(c);
         }
     }
+    /* Write the AOF buffer on disk */
+    flushAppendOnlyFile();
 }
 
 static void createSharedObjects(void) {
@@ -1588,11 +1597,13 @@ static void createSharedObjects(void) {
     shared.select8 = createStringObject("select 8\r\n",10);
     shared.select9 = createStringObject("select 9\r\n",10);
     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
+    shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
     shared.mbulk3 = createStringObject("*3\r\n",4);
+    shared.mbulk4 = createStringObject("*4\r\n",4);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1706,6 +1717,7 @@ static void initServer() {
     server.bgsavechildpid = -1;
     server.bgrewritechildpid = -1;
     server.bgrewritebuf = sdsempty();
+    server.aofbuf = sdsempty();
     server.lastsave = time(NULL);
     server.dirty = 0;
     server.stat_numcommands = 0;
@@ -1874,6 +1886,9 @@ static void loadServerConfig(char *filename) {
             if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
+            zfree(server.appendfilename);
+            server.appendfilename = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
             if (!strcasecmp(argv[1],"no")) {
                 server.appendfsync = APPENDFSYNC_NO;
@@ -2205,7 +2220,7 @@ static void call(redisClient *c, struct redisCommand *cmd) {
         listLength(server.slaves))
         replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
-        replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc);
+        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
     server.stat_numcommands++;
 }
 
@@ -2460,6 +2475,64 @@ static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int arg
     if (outv != static_outv) zfree(outv);
 }
 
+static sds sdscatrepr(sds s, char *p, size_t len) {
+    s = sdscatlen(s,"\"",1);
+    while(len--) {
+        switch(*p) {
+        case '\\':
+        case '"':
+            s = sdscatprintf(s,"\\%c",*p);
+            break;
+        case '\n': s = sdscatlen(s,"\\n",1); break;
+        case '\r': s = sdscatlen(s,"\\r",1); break;
+        case '\t': s = sdscatlen(s,"\\t",1); break;
+        case '\a': s = sdscatlen(s,"\\a",1); break;
+        case '\b': s = sdscatlen(s,"\\b",1); break;
+        default:
+            if (isprint(*p))
+                s = sdscatprintf(s,"%c",*p);
+            else
+                s = sdscatprintf(s,"\\x%02x",(unsigned char)*p);
+            break;
+        }
+        p++;
+    }
+    return sdscatlen(s,"\"",1);
+}
+
+static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {
+    listNode *ln;
+    listIter li;
+    int j;
+    sds cmdrepr = sdsnew("+");
+    robj *cmdobj;
+    struct timeval tv;
+
+    gettimeofday(&tv,NULL);
+    cmdrepr = sdscatprintf(cmdrepr,"%ld.%ld ",(long)tv.tv_sec,(long)tv.tv_usec);
+    if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid);
+
+    for (j = 0; j < argc; j++) {
+        if (argv[j]->encoding == REDIS_ENCODING_INT) {
+            cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr);
+        } else {
+            cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
+                        sdslen(argv[j]->ptr));
+        }
+        if (j != argc-1)
+            cmdrepr = sdscatlen(cmdrepr," ",1);
+    }
+    cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
+    cmdobj = createObject(REDIS_STRING,cmdrepr);
+
+    listRewind(monitors,&li);
+    while((ln = listNext(&li))) {
+        redisClient *monitor = ln->value;
+        addReply(monitor,cmdobj);
+    }
+    decrRefCount(cmdobj);
+}
+
 static void processInputBuffer(redisClient *c) {
 again:
     /* Before to process the input buffer, make sure the client is not
@@ -3157,7 +3230,7 @@ static int getDoubleFromObject(robj *o, double *target) {
         } else if (o->encoding == REDIS_ENCODING_INT) {
             value = (long)o->ptr;
         } else {
-            redisAssert(1 != 1);
+            redisPanic("Unknown string encoding");
         }
     }
 
@@ -3194,7 +3267,7 @@ static int getLongLongFromObject(robj *o, long long *target) {
         } else if (o->encoding == REDIS_ENCODING_INT) {
             value = (long)o->ptr;
         } else {
-            redisAssert(1 != 1);
+            redisPanic("Unknown string encoding");
         }
     }
 
@@ -3981,40 +4054,55 @@ static void echoCommand(redisClient *c) {
 
 /*=================================== Strings =============================== */
 
-static void setGenericCommand(redisClient *c, int nx) {
+static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) {
     int retval;
+    long seconds = 0; /* initialized to avoid an harmness warning */
+
+    if (expire) {
+        if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK)
+            return;
+        if (seconds <= 0) {
+            addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n"));
+            return;
+        }
+    }
 
-    if (nx) deleteIfVolatile(c->db,c->argv[1]);
-    retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
+    if (nx) deleteIfVolatile(c->db,key);
+    retval = dictAdd(c->db->dict,key,val);
     if (retval == DICT_ERR) {
         if (!nx) {
             /* If the key is about a swapped value, we want a new key object
              * to overwrite the old. So we delete the old key in the database.
              * This will also make sure that swap pages about the old object
              * will be marked as free. */
-            if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1]))
-                incrRefCount(c->argv[1]);
-            dictReplace(c->db->dict,c->argv[1],c->argv[2]);
-            incrRefCount(c->argv[2]);
+            if (server.vm_enabled && deleteIfSwapped(c->db,key))
+                incrRefCount(key);
+            dictReplace(c->db->dict,key,val);
+            incrRefCount(val);
         } else {
             addReply(c,shared.czero);
             return;
         }
     } else {
-        incrRefCount(c->argv[1]);
-        incrRefCount(c->argv[2]);
+        incrRefCount(key);
+        incrRefCount(val);
     }
     server.dirty++;
-    removeExpire(c->db,c->argv[1]);
+    removeExpire(c->db,key);
+    if (expire) setExpire(c->db,key,time(NULL)+seconds);
     addReply(c, nx ? shared.cone : shared.ok);
 }
 
 static void setCommand(redisClient *c) {
-    setGenericCommand(c,0);
+    setGenericCommand(c,0,c->argv[1],c->argv[2],NULL);
 }
 
 static void setnxCommand(redisClient *c) {
-    setGenericCommand(c,1);
+    setGenericCommand(c,1,c->argv[1],c->argv[2],NULL);
+}
+
+static void setexCommand(redisClient *c) {
+    setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]);
 }
 
 static int getGenericCommand(redisClient *c) {
@@ -4258,7 +4346,12 @@ static void delCommand(redisClient *c) {
 }
 
 static void existsCommand(redisClient *c) {
-    addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
+    expireIfNeeded(c->db,c->argv[1]);
+    if (dictFind(c->db->dict,c->argv[1])) {
+        addReply(c, shared.cone);
+    } else {
+        addReply(c, shared.czero);
+    }
 }
 
 static void selectCommand(redisClient *c) {
@@ -4400,7 +4493,6 @@ static void shutdownCommand(redisClient *c) {
                 unlink(server.pidfile);
             redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
             redisLog(REDIS_WARNING,"Server exit now, bye bye...");
-            if (server.vm_enabled) unlink(server.vm_swap_file);
             exit(0);
         } else {
             /* Ooops.. error saving! The best we can do is to continue
@@ -6383,12 +6475,11 @@ static void hincrbyCommand(redisClient *c) {
     if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
     if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
     if ((current = hashGet(o,c->argv[2])) != NULL) {
-        if (current->encoding == REDIS_ENCODING_RAW)
-            value = strtoll(current->ptr,NULL,10);
-        else if (current->encoding == REDIS_ENCODING_INT)
-            value = (long)current->ptr;
-        else
-            redisAssert(1 != 1);
+        if (getLongLongFromObjectOrReply(c,current,&value,
+            "hash value is not an integer") != REDIS_OK) {
+            decrRefCount(current);
+            return;
+        }
         decrRefCount(current);
     } else {
         value = 0;
@@ -7929,11 +8020,55 @@ static void freeMemoryIfNeeded(void) {
 
 /* ============================== Append Only file ========================== */
 
+/* Write the append only file buffer on disk.
+ *
+ * Since we are required to write the AOF before replying to the client,
+ * and the only way the client socket can get a write is entering when the
+ * the event loop, we accumulate all the AOF writes in a memory
+ * buffer and write it on disk using this function just before entering
+ * the event loop again. */
+static void flushAppendOnlyFile(void) {
+    time_t now;
+    ssize_t nwritten;
+
+    if (sdslen(server.aofbuf) == 0) return;
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+     nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
+     if (nwritten != (signed)sdslen(server.aofbuf)) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * aborting instead of giving the illusion that everything is
+         * working as expected. */
+         if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+         } else {
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
+         }
+         exit(1);
+    }
+    sdsfree(server.aofbuf);
+    server.aofbuf = sdsempty();
+
+    /* Fsync if needed */
+    now = time(NULL);
+    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
+        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
+         now-server.lastfsync > 1))
+    {
+        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
+         * flushing metadata. */
+        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
+        server.lastfsync = now;
+    }
+}
+
 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     sds buf = sdsempty();
     int j;
-    ssize_t nwritten;
-    time_t now;
     robj *tmpargv[3];
 
     /* The DB this command was targetting is not the same as the last command
@@ -7979,23 +8114,11 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv
             decrRefCount(argv[j]);
     }
 
-    /* We want to perform a single write. This should be guaranteed atomic
-     * at least if the filesystem we are writing is a real physical one.
-     * While this will save us against the server being killed I don't think
-     * there is much to do about the whole server stopping for power problems
-     * or alike */
-     nwritten = write(server.appendfd,buf,sdslen(buf));
-     if (nwritten != (signed)sdslen(buf)) {
-        /* Ooops, we are in troubles. The best thing to do for now is
-         * to simply exit instead to give the illusion that everything is
-         * working as expected. */
-         if (nwritten == -1) {
-            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
-         } else {
-            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
-         }
-         exit(1);
-    }
+    /* Append to the AOF buffer. This will be flushed on disk just before
+     * of re-entering the event loop, so before the client will get a
+     * positive reply about the operation performed. */
+    server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
+
     /* If a background append only file rewriting is in progress we want to
      * accumulate the differences between the child DB and the current one
      * in a buffer, so that when the child process will do its work we
@@ -8004,14 +8127,6 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv
         server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
 
     sdsfree(buf);
-    now = time(NULL);
-    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
-        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
-         now-server.lastfsync > 1))
-    {
-        fsync(server.appendfd); /* Let's try to get this data on the disk */
-        server.lastfsync = now;
-    }
 }
 
 /* In Redis commands are always executed in the context of a client, so in
@@ -8031,12 +8146,14 @@ static struct redisClient *createFakeClient(void) {
     c->reply = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
+    initClientMultiState(c);
     return c;
 }
 
 static void freeFakeClient(struct redisClient *c) {
     sdsfree(c->querybuf);
     listRelease(c->reply);
+    freeClientMultiState(c);
     zfree(c);
 }
 
@@ -8048,6 +8165,7 @@ int loadAppendOnlyFile(char *filename) {
     FILE *fp = fopen(filename,"r");
     struct redis_stat sb;
     unsigned long long loadedkeys = 0;
+    int appendonly = server.appendonly;
 
     if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
         return REDIS_ERR;
@@ -8057,6 +8175,10 @@ int loadAppendOnlyFile(char *filename) {
         exit(1);
     }
 
+    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
+     * to the same file we're about to read. */
+    server.appendonly = 0;
+
     fakeClient = createFakeClient();
     while(1) {
         int argc, j;
@@ -8112,8 +8234,14 @@ int loadAppendOnlyFile(char *filename) {
             }
         }
     }
+
+    /* This point can only be reached when EOF is reached without errors.
+     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
+    if (fakeClient->flags & REDIS_MULTI) goto readerr;
+
     fclose(fp);
     freeFakeClient(fakeClient);
+    server.appendonly = appendonly;
     return REDIS_OK;
 
 readerr:
@@ -8463,42 +8591,38 @@ static void aofRemoveTempFile(pid_t childpid) {
 
 /* =================== Virtual Memory - Blocking Side  ====================== */
 
-/* substitute the first occurrence of '%p' with the process pid in the
- * swap file name. */
-static void expandVmSwapFilename(void) {
-    char *p = strstr(server.vm_swap_file,"%p");
-    sds new;
-
-    if (!p) return;
-    new = sdsempty();
-    *p = '\0';
-    new = sdscat(new,server.vm_swap_file);
-    new = sdscatprintf(new,"%ld",(long) getpid());
-    new = sdscat(new,p+2);
-    zfree(server.vm_swap_file);
-    server.vm_swap_file = new;
-}
-
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
     size_t stacksize;
+    struct flock fl;
 
     if (server.vm_max_threads != 0)
         zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
 
-    expandVmSwapFilename();
     redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
+    /* Try to open the old swap file, otherwise create it */
     if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
         server.vm_fp = fopen(server.vm_swap_file,"w+b");
     }
     if (server.vm_fp == NULL) {
         redisLog(REDIS_WARNING,
-            "Impossible to open the swap file: %s. Exiting.",
+            "Can't open the swap file: %s. Exiting.",
             strerror(errno));
         exit(1);
     }
     server.vm_fd = fileno(server.vm_fp);
+    /* Lock the swap file for writing, this is useful in order to avoid
+     * another instance to use the same swap file for a config error. */
+    fl.l_type = F_WRLCK;
+    fl.l_whence = SEEK_SET;
+    fl.l_start = fl.l_len = 0;
+    if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
+        redisLog(REDIS_WARNING,
+            "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno));
+        exit(1);
+    }
+    /* Initialize */
     server.vm_next_page = 0;
     server.vm_near_pages = 0;
     server.vm_stats_used_pages = 0;
@@ -9424,6 +9548,19 @@ static int waitForSwappedKey(redisClient *c, robj *key) {
     return 1;
 }
 
+/* Preload keys for any command with first, last and step values for
+ * the command keys prototype, as defined in the command table. */
+static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
+    int j, last;
+    if (cmd->vm_firstkey == 0) return;
+    last = cmd->vm_lastkey;
+    if (last < 0) last = argc+last;
+    for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) {
+        redisAssert(j < argc);
+        waitForSwappedKey(c,argv[j]);
+    }
+}
+
 /* Preload keys needed for the ZUNION and ZINTER commands. */
 static void zunionInterBlockClientOnSwappedKeys(redisClient *c) {
     int i, num;
@@ -9444,16 +9581,10 @@ static void zunionInterBlockClientOnSwappedKeys(redisClient *c) {
  * Return 1 if the client is marked as blocked, 0 if the client can
  * continue as the keys it is going to access appear to be in memory. */
 static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) {
-    int j, last;
-
     if (cmd->vm_preload_proc != NULL) {
         cmd->vm_preload_proc(c);
     } else {
-        if (cmd->vm_firstkey == 0) return 0;
-        last = cmd->vm_lastkey;
-        if (last < 0) last = c->argc+last;
-        for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep)
-            waitForSwappedKey(c,c->argv[j]);
+        waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv);
     }
 
     /* If the client was blocked for at least one key, mark it as blocked. */
@@ -9817,8 +9948,9 @@ static int pubsubPublishMessage(robj *channel, robj *message) {
                                 sdslen(pat->pattern->ptr),
                                 (char*)channel->ptr,
                                 sdslen(channel->ptr),0)) {
-                addReply(pat->client,shared.mbulk3);
-                addReply(pat->client,shared.messagebulk);
+                addReply(pat->client,shared.mbulk4);
+                addReply(pat->client,shared.pmessagebulk);
+                addReplyBulk(pat->client,pat->pattern);
                 addReplyBulk(pat->client,channel);
                 addReplyBulk(pat->client,message);
                 receivers++;