]> git.saurik.com Git - redis.git/commitdiff
Merge remote branch 'origin/unstable' into unstable
authorantirez <antirez@gmail.com>
Sun, 5 Jun 2011 18:51:49 +0000 (20:51 +0200)
committerantirez <antirez@gmail.com>
Sun, 5 Jun 2011 18:51:49 +0000 (20:51 +0200)
12 files changed:
src/aof.c
src/object.c
src/rdb.c
src/redis-cli.c
src/redis.c
src/redis.h
src/replication.c
src/t_set.c
src/t_zset.c
tests/test_helper.tcl
tests/unit/type/set.tcl
tests/unit/type/zset.tcl

index ef72a2b169a007387179e5d24d4830fcbb50e047..cd409a0b376d85d2df9bdc6e583abd8e3b6aacb9 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -565,16 +565,18 @@ werr:
  */
 int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
+    long long start;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
     if (server.ds_enabled != 0) {
         redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
         return REDIS_ERR;
     }
+    start = ustime();
     if ((childpid = fork()) == 0) {
-        /* Child */
         char tmpfile[256];
 
+        /* Child */
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
@@ -585,6 +587,7 @@ int rewriteAppendOnlyFileBackground(void) {
         }
     } else {
         /* Parent */
+        server.stat_fork_time = ustime()-start;
         if (childpid == -1) {
             redisLog(REDIS_WARNING,
                 "Can't rewrite append only file in background: fork: %s",
index 22f538371a72bca93c4bd1095febf12a98766c66..20e7f57a3f0a9ce77a0e8e4d7ebb76d804fa9e73 100644 (file)
@@ -180,7 +180,7 @@ void decrRefCount(void *obj) {
     robj *o = obj;
 
     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
-    if (--(o->refcount) == 0) {
+    if (o->refcount == 1) {
         switch(o->type) {
         case REDIS_STRING: freeStringObject(o); break;
         case REDIS_LIST: freeListObject(o); break;
@@ -189,8 +189,9 @@ void decrRefCount(void *obj) {
         case REDIS_HASH: freeHashObject(o); break;
         default: redisPanic("Unknown object type"); break;
         }
-        o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */
         zfree(o);
+    } else {
+        o->refcount--;
     }
 }
 
index eeafc053d30fbbcf4471ee935f3b562cfb6a28b6..0d4940d264e5fe55828e760d16b40970c19e7ee6 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -482,6 +482,7 @@ werr:
 
 int rdbSaveBackground(char *filename) {
     pid_t childpid;
+    long long start;
 
     if (server.bgsavechildpid != -1 ||
         server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
@@ -493,6 +494,7 @@ int rdbSaveBackground(char *filename) {
         return dsRdbSaveBackground(filename);
     }
 
+    start = ustime();
     if ((childpid = fork()) == 0) {
         int retval;
 
@@ -503,6 +505,7 @@ int rdbSaveBackground(char *filename) {
         _exit((retval == REDIS_OK) ? 0 : 1);
     } else {
         /* Parent */
+        server.stat_fork_time = ustime()-start;
         if (childpid == -1) {
             redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                 strerror(errno));
index b53a4c8239fa1247791d713bc74f09bd3ed35b72..d0c9d979f014a769e88e78e311bf484f651e8f08 100644 (file)
@@ -55,6 +55,7 @@ static struct config {
     int hostport;
     char *hostsocket;
     long repeat;
+    long interval;
     int dbnum;
     int interactive;
     int shutdown;
@@ -87,9 +88,11 @@ static long long mstime(void) {
 
 static void cliRefreshPrompt(void) {
     if (config.dbnum == 0)
-        snprintf(config.prompt,sizeof(config.prompt),"redis> ");
+        snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d> ",
+            config.hostip, config.hostport);
     else
-        snprintf(config.prompt,sizeof(config.prompt),"redis:%d> ",config.dbnum);
+        snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d[%d]> ",
+            config.hostip, config.hostport, config.dbnum);
 }
 
 /*------------------------------------------------------------------------------
@@ -314,10 +317,9 @@ static int cliConnect(int force) {
     return REDIS_OK;
 }
 
-static void cliPrintContextErrorAndExit() {
+static void cliPrintContextError() {
     if (context == NULL) return;
     fprintf(stderr,"Error: %s\n",context->errstr);
-    exit(1);
 }
 
 static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
@@ -436,7 +438,8 @@ static int cliReadReply(int output_raw_strings) {
             if (context->err == REDIS_ERR_EOF)
                 return REDIS_ERR;
         }
-        cliPrintContextErrorAndExit();
+        cliPrintContextError();
+        exit(1);
         return REDIS_ERR; /* avoid compiler warning */
     }
 
@@ -462,10 +465,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
     size_t *argvlen;
     int j, output_raw;
 
-    if (context == NULL) {
-        printf("Not connected, please use: connect <host> <port>\n");
-        return REDIS_OK;
-    }
+    if (context == NULL) return REDIS_ERR;
 
     output_raw = 0;
     if (!strcasecmp(command,"info") ||
@@ -518,6 +518,8 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
                 cliRefreshPrompt();
             }
         }
+        if (config.interval) usleep(config.interval);
+        fflush(stdout); /* Make it grep friendly */
     }
 
     free(argvlen);
@@ -553,6 +555,10 @@ static int parseOptions(int argc, char **argv) {
         } else if (!strcmp(argv[i],"-r") && !lastarg) {
             config.repeat = strtoll(argv[i+1],NULL,10);
             i++;
+        } else if (!strcmp(argv[i],"-i") && !lastarg) {
+            double seconds = atof(argv[i+1]);
+            config.interval = seconds*1000000;
+            i++;
         } else if (!strcmp(argv[i],"-n") && !lastarg) {
             config.dbnum = atoi(argv[i+1]);
             i++;
@@ -605,6 +611,8 @@ static void usage() {
 "  -s <socket>      Server socket (overrides hostname and port)\n"
 "  -a <password>    Password to use when connecting to the server\n"
 "  -r <repeat>      Execute specified command N times\n"
+"  -i <interval>    When -r is used, waits <interval> seconds per command.\n"
+"                   It is possible to specify sub-second times like -i 0.1.\n"
 "  -n <db>          Database number\n"
 "  -x               Read last argument from STDIN\n"
 "  -d <delimiter>   Multi-bulk delimiter in for raw formatting (default: \\n)\n"
@@ -616,6 +624,7 @@ static void usage() {
 "  cat /etc/passwd | redis-cli -x set mypasswd\n"
 "  redis-cli get mypasswd\n"
 "  redis-cli -r 100 lpush mylist x\n"
+"  redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
 "\n"
 "When no command is given, redis-cli starts in interactive mode.\n"
 "Type \"help\" in interactive mode for information on available commands.\n"
@@ -681,14 +690,25 @@ static void repl() {
                     linenoiseClearScreen();
                 } else {
                     long long start_time = mstime(), elapsed;
+                    int repeat, skipargs = 0;
+
+                    repeat = atoi(argv[0]);
+                    if (repeat) {
+                        skipargs = 1;
+                    } else {
+                        repeat = 1;
+                    }
 
-                    if (cliSendCommand(argc,argv,1) != REDIS_OK) {
+                    if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
+                        != REDIS_OK)
+                    {
                         cliConnect(1);
 
-                        /* If we still cannot send the command,
-                         * print error and abort. */
-                        if (cliSendCommand(argc,argv,1) != REDIS_OK)
-                            cliPrintContextErrorAndExit();
+                        /* If we still cannot send the command print error.
+                         * We'll try to reconnect the next time. */
+                        if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
+                            != REDIS_OK)
+                            cliPrintContextError();
                     }
                     elapsed = mstime()-start_time;
                     if (elapsed >= 500) {
@@ -726,6 +746,7 @@ int main(int argc, char **argv) {
     config.hostport = 6379;
     config.hostsocket = NULL;
     config.repeat = 1;
+    config.interval = 0;
     config.dbnum = 0;
     config.interactive = 0;
     config.shutdown = 0;
@@ -741,11 +762,15 @@ int main(int argc, char **argv) {
     argc -= firstarg;
     argv += firstarg;
 
-    /* Try to connect */
-    if (cliConnect(0) != REDIS_OK) exit(1);
-
     /* Start interactive mode when no command is provided */
-    if (argc == 0) repl();
+    if (argc == 0) {
+        /* Note that in repl mode we don't abort on connection error.
+         * A new attempt will be performed for every command send. */
+        cliConnect(0);
+        repl();
+    }
+
     /* Otherwise, we have some arguments to execute */
+    if (cliConnect(0) != REDIS_OK) exit(1);
     return noninteractive(argc,convertToSds(argc,argv));
 }
index 63b41ba841a0467f2057f3845e84880efdffeb2b..dc8a0032caf5329ae462f439df77b78ed8767b25 100644 (file)
@@ -116,9 +116,9 @@ struct redisCommand redisCommandTable[] = {
     {"sdiff",sdiffCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1,0,0},
     {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1,0,0},
     {"smembers",sinterCommand,2,0,NULL,1,1,1,0,0},
-    {"zadd",zaddCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
+    {"zadd",zaddCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
     {"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
-    {"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
+    {"zrem",zremCommand,-3,0,NULL,1,1,1,0,0},
     {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
     {"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
     {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
@@ -871,6 +871,7 @@ void initServerConfig() {
     server.masterport = 6379;
     server.master = NULL;
     server.replstate = REDIS_REPL_NONE;
+    server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
     server.repl_serve_stale_data = 1;
 
     /* Double constants initialization */
@@ -963,6 +964,7 @@ void initServer() {
     server.stat_keyspace_misses = 0;
     server.stat_keyspace_hits = 0;
     server.stat_peak_memory = 0;
+    server.stat_fork_time = 0;
     server.unixtime = time(NULL);
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
@@ -1438,7 +1440,8 @@ sds genRedisInfoString(char *section) {
             "keyspace_hits:%lld\r\n"
             "keyspace_misses:%lld\r\n"
             "pubsub_channels:%ld\r\n"
-            "pubsub_patterns:%u\r\n",
+            "pubsub_patterns:%u\r\n"
+            "latest_fork_usec:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
             server.stat_expiredkeys,
@@ -1446,7 +1449,8 @@ sds genRedisInfoString(char *section) {
             server.stat_keyspace_hits,
             server.stat_keyspace_misses,
             dictSize(server.pubsub_channels),
-            listLength(server.pubsub_patterns));
+            listLength(server.pubsub_patterns),
+            server.stat_fork_time);
     }
 
     /* Replication */
index 5934b6a6f111203b6b98fd596e7e345384c95004..4249985ffad190a191d31d0b2a5de92657f8eaaa 100644 (file)
@@ -41,7 +41,6 @@
 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
 #define REDIS_IOBUF_LEN         1024
 #define REDIS_LOADBUF_LEN       1024
-#define REDIS_STATIC_ARGS       8
 #define REDIS_DEFAULT_DBNUM     16
 #define REDIS_CONFIGLINE_MAX    1024
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_REQ_MULTIBULK 2
 
 /* Slave replication state - slave side */
-#define REDIS_REPL_NONE 0   /* No active replication */
-#define REDIS_REPL_CONNECT 1    /* Must connect to master */
-#define REDIS_REPL_TRANSFER 2    /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 3  /* Connected to master */
+#define REDIS_REPL_NONE 0 /* No active replication */
+#define REDIS_REPL_CONNECT 1 /* Must connect to master */
+#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
+#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 4 /* Connected to master */
+
+/* Synchronous read timeout - slave side */
+#define REDIS_REPL_SYNCIO_TIMEOUT 5
 
 /* Slave replication state - from the point of view of master
  * Note that in SEND_BULK and ONLINE state the slave receives new updates
@@ -543,6 +546,7 @@ struct redisServer {
     long long stat_keyspace_hits;   /* number of successful lookups of keys */
     long long stat_keyspace_misses; /* number of failed lookups of keys */
     size_t stat_peak_memory;        /* max used memory record */
+    long long stat_fork_time;       /* time needed to perform latets fork() */
     /* Configuration */
     int verbosity;
     int maxidletime;
@@ -585,6 +589,7 @@ struct redisServer {
     char *masterhost;
     int masterport;
     redisClient *master;    /* client that is master for this slave */
+    int repl_syncio_timeout; /* timeout for synchronous I/O calls */
     int replstate;          /* replication status if the instance is a slave */
     off_t repl_transfer_left;  /* bytes left reading .rdb  */
     int repl_transfer_s;    /* slave -> master SYNC socket */
@@ -888,7 +893,6 @@ int fwriteBulkCount(FILE *fp, char prefix, int count);
 /* Replication */
 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
-int syncWithMaster(void);
 void updateSlavesWaitingBgsave(int bgsaveerr);
 void replicationCron(void);
 
index b0fa7055672816a5289b04c8001648e3f191c209..c8ebb6e7fb7cf6f19bb8bb63150f2d4372706c86 100644 (file)
 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listNode *ln;
     listIter li;
-    int outc = 0, j;
-    robj **outv;
-    /* We need 1+(ARGS*3) objects since commands are using the new protocol
-     * and we one 1 object for the first "*<count>\r\n" multibulk count, then
-     * for every additional object we have "$<count>\r\n" + object + "\r\n". */
-    robj *static_outv[REDIS_STATIC_ARGS*3+1];
-    robj *lenobj;
-
-    if (argc <= REDIS_STATIC_ARGS) {
-        outv = static_outv;
-    } else {
-        outv = zmalloc(sizeof(robj*)*(argc*3+1));
-    }
-
-    lenobj = createObject(REDIS_STRING,
-            sdscatprintf(sdsempty(), "*%d\r\n", argc));
-    lenobj->refcount = 0;
-    outv[outc++] = lenobj;
-    for (j = 0; j < argc; j++) {
-        lenobj = createObject(REDIS_STRING,
-            sdscatprintf(sdsempty(),"$%lu\r\n",
-                (unsigned long) stringObjectLen(argv[j])));
-        lenobj->refcount = 0;
-        outv[outc++] = lenobj;
-        outv[outc++] = argv[j];
-        outv[outc++] = shared.crlf;
-    }
+    int j;
 
-    /* Increment all the refcounts at start and decrement at end in order to
-     * be sure to free objects if there is no slave in a replication state
-     * able to be feed with commands */
-    for (j = 0; j < outc; j++) incrRefCount(outv[j]);
     listRewind(slaves,&li);
     while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
@@ -49,7 +19,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
 
-        /* Feed all the other slaves, MONITORs and so on */
+        /* Feed slaves that are waiting for the initial SYNC (so these commands
+         * are queued in the output buffer until the intial SYNC completes),
+         * or are already in sync with the master. */
         if (slave->slaveseldb != dictid) {
             robj *selectcmd;
 
@@ -73,10 +45,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
             addReply(slave,selectcmd);
             slave->slaveseldb = dictid;
         }
-        for (j = 0; j < outc; j++) addReply(slave,outv[j]);
+        addReplyMultiBulkLen(slave,argc);
+        for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
     }
-    for (j = 0; j < outc; j++) decrRefCount(outv[j]);
-    if (outv != static_outv) zfree(outv);
 }
 
 void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {
@@ -315,19 +286,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     /* If repl_transfer_left == -1 we still have to read the bulk length
      * from the master reply. */
     if (server.repl_transfer_left == -1) {
-        if (syncReadLine(fd,buf,1024,3600) == -1) {
+        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
                 strerror(errno));
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         }
+
         if (buf[0] == '-') {
             redisLog(REDIS_WARNING,
                 "MASTER aborted replication with an error: %s",
                 buf+1);
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         } else if (buf[0] == '\0') {
             /* At this stage just a newline works as a PING in order to take
              * the connection live. So we refresh our last interaction
@@ -336,8 +306,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
             return;
         } else if (buf[0] != '$') {
             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
-            replicationAbortSyncTransfer();
-            return;
+            goto error;
         }
         server.repl_transfer_left = strtol(buf+1,NULL,10);
         redisLog(REDIS_NOTICE,
@@ -359,8 +328,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     server.repl_transfer_lastio = time(NULL);
     if (write(server.repl_transfer_fd,buf,nread) != nread) {
         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
-        replicationAbortSyncTransfer();
-        return;
+        goto error;
     }
     server.repl_transfer_left -= nread;
     /* Check if the transfer is now complete */
@@ -391,48 +359,54 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
         server.replstate = REDIS_REPL_CONNECTED;
         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
     }
+
+    return;
+
+error:
+    replicationAbortSyncTransfer();
+    return;
 }
 
-int syncWithMaster(void) {
-    char buf[1024], tmpfile[256], authcmd[1024];
-    int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
+void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
+    char buf[1024], tmpfile[256];
     int dfd, maxtries = 5;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(privdata);
+    REDIS_NOTUSED(mask);
 
-    if (fd == -1) {
-        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
-            strerror(errno));
-        return REDIS_ERR;
-    }
+    /* This event should only be triggered once since it is used to have a
+     * non-blocking connect(2) to the master. It has been triggered when this
+     * function is called, so we can delete it. */
+    aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
 
     /* AUTH with the master if required. */
     if(server.masterauth) {
-       snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
-       if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
-            close(fd);
+        char authcmd[1024];
+        size_t authlen;
+
+        authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
+        if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
                 strerror(errno));
-            return REDIS_ERR;
-       }
+            goto error;
+        }
         /* Read the AUTH result.  */
-        if (syncReadLine(fd,buf,1024,3600) == -1) {
-            close(fd);
+        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
             redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
                 strerror(errno));
-            return REDIS_ERR;
+            goto error;
         }
         if (buf[0] != '+') {
-            close(fd);
             redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
-            return REDIS_ERR;
+            goto error;
         }
     }
 
     /* Issue the SYNC command */
-    if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
-        close(fd);
+    if (syncWrite(fd,"SYNC \r\n",7,server.repl_syncio_timeout) == -1) {
         redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
             strerror(errno));
-        return REDIS_ERR;
+        goto error;
     }
 
     /* Prepare a suitable temp file for bulk transfer */
@@ -444,25 +418,51 @@ int syncWithMaster(void) {
         sleep(1);
     }
     if (dfd == -1) {
-        close(fd);
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
-        return REDIS_ERR;
+        goto error;
     }
 
     /* Setup the non blocking download of the bulk file. */
-    if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
+    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
             == AE_ERR)
     {
-        close(fd);
         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
-        return REDIS_ERR;
+        goto error;
     }
+
     server.replstate = REDIS_REPL_TRANSFER;
     server.repl_transfer_left = -1;
-    server.repl_transfer_s = fd;
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = time(NULL);
     server.repl_transfer_tmpfile = zstrdup(tmpfile);
+    return;
+
+error:
+    server.replstate = REDIS_REPL_CONNECT;
+    close(fd);
+    return;
+}
+
+int connectWithMaster(void) {
+    int fd;
+
+    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
+    if (fd == -1) {
+        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    }
+
+    if (aeCreateFileEvent(server.el,fd,AE_WRITABLE,syncWithMaster,NULL) ==
+            AE_ERR)
+    {
+        close(fd);
+        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
+        return REDIS_ERR;
+    }
+
+    server.repl_transfer_s = fd;
+    server.replstate = REDIS_REPL_CONNECTING;
     return REDIS_OK;
 }
 
@@ -517,8 +517,8 @@ void replicationCron(void) {
     /* Check if we should connect to a MASTER */
     if (server.replstate == REDIS_REPL_CONNECT) {
         redisLog(REDIS_NOTICE,"Connecting to MASTER...");
-        if (syncWithMaster() == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
+        if (connectWithMaster() == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
             if (server.appendonly) rewriteAppendOnlyFileBackground();
         }
     }
index be083c8b0a66346c03452c80478a85d806f7cb6d..c7d05c2f1f8f82c68f10b54596bae8816481da33 100644 (file)
@@ -249,8 +249,11 @@ void sremCommand(redisClient *c) {
 
     for (j = 2; j < c->argc; j++) {
         if (setTypeRemove(set,c->argv[j])) {
-            if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
             deleted++;
+            if (setTypeSize(set) == 0) {
+                dbDelete(c->db,c->argv[1]);
+                break;
+            }
         }
     }
     if (deleted) {
index a5dc27c7685e7677f0f138d9284ddc41b25a68ab..6a56c3b4eaf8fd25dfccfd598c4778893bba4d38 100644 (file)
@@ -810,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) {
     robj *ele;
     robj *zobj;
     robj *curobj;
-    double score, curscore = 0.0;
+    double score = 0, *scores, curscore = 0.0;
+    int j, elements = (c->argc-2)/2;
+    int added = 0;
 
-    if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
+    if (c->argc % 2) {
+        addReply(c,shared.syntaxerr);
         return;
+    }
+
+    /* Start parsing all the scores, we need to emit any syntax error
+     * before executing additions to the sorted set, as the command should
+     * either execute fully or nothing at all. */
+    scores = zmalloc(sizeof(double)*elements);
+    for (j = 0; j < elements; j++) {
+        if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
+            != REDIS_OK)
+        {
+            zfree(scores);
+            return;
+        }
+    }
 
+    /* Lookup the key and create the sorted set if does not exist. */
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
         if (server.zset_max_ziplist_entries == 0 ||
@@ -828,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) {
     } else {
         if (zobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
+            zfree(scores);
             return;
         }
     }
 
-    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
-        unsigned char *eptr;
+    for (j = 0; j < elements; j++) {
+        score = scores[j];
 
-        /* Prefer non-encoded element when dealing with ziplists. */
-        ele = c->argv[3];
-        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+            unsigned char *eptr;
+
+            /* Prefer non-encoded element when dealing with ziplists. */
+            ele = c->argv[3+j*2];
+            if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. */
-            if (score != curscore) {
-                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                /* Remove and re-insert when score changed. */
+                if (score != curscore) {
+                    zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                /* Optimize: check if the element is too large or the list
+                 * becomes too long *before* executing zzlInsert. */
                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+                if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
+                if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
+        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
+            zset *zs = zobj->ptr;
+            zskiplistNode *znode;
+            dictEntry *de;
 
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
-        } else {
-            /* Optimize: check if the element is too large or the list becomes
-             * too long *before* executing zzlInsert. */
-            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
-            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
-                zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
-            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
-                zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
-
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
-        }
-    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
-        zset *zs = zobj->ptr;
-        zskiplistNode *znode;
-        dictEntry *de;
-
-        ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            curobj = dictGetEntryKey(de);
-            curscore = *(double*)dictGetEntryVal(de);
-
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+            ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
+            de = dictFind(zs->dict,ele);
+            if (de != NULL) {
+                curobj = dictGetEntryKey(de);
+                curscore = *(double*)dictGetEntryVal(de);
+
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. We can safely delete
-             * the key object from the skiplist, since the dictionary still has
-             * a reference to it. */
-            if (score != curscore) {
-                redisAssert(zslDelete(zs->zsl,curscore,curobj));
-                znode = zslInsert(zs->zsl,score,curobj);
-                incrRefCount(curobj); /* Re-inserted in skiplist. */
-                dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+                /* Remove and re-insert when score changed. We can safely
+                 * delete the key object from the skiplist, since the
+                 * dictionary still has a reference to it. */
+                if (score != curscore) {
+                    redisAssert(zslDelete(zs->zsl,curscore,curobj));
+                    znode = zslInsert(zs->zsl,score,curobj);
+                    incrRefCount(curobj); /* Re-inserted in skiplist. */
+                    dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                znode = zslInsert(zs->zsl,score,ele);
+                incrRefCount(ele); /* Inserted in skiplist. */
+                redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
+                incrRefCount(ele); /* Added to dictionary. */
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
         } else {
-            znode = zslInsert(zs->zsl,score,ele);
-            incrRefCount(ele); /* Inserted in skiplist. */
-            redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
-            incrRefCount(ele); /* Added to dictionary. */
-
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
+            redisPanic("Unknown sorted set encoding");
         }
-    } else {
-        redisPanic("Unknown sorted set encoding");
     }
+    zfree(scores);
+    if (incr) /* ZINCRBY */
+        addReplyDouble(c,score);
+    else /* ZADD */
+        addReplyLongLong(c,added);
 }
 
 void zaddCommand(redisClient *c) {
@@ -945,8 +957,8 @@ void zincrbyCommand(redisClient *c) {
 
 void zremCommand(redisClient *c) {
     robj *key = c->argv[1];
-    robj *ele = c->argv[2];
     robj *zobj;
+    int deleted = 0, j;
 
     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
         checkType(c,zobj,REDIS_ZSET)) return;
@@ -954,39 +966,48 @@ void zremCommand(redisClient *c) {
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *eptr;
 
-        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
-            zobj->ptr = zzlDelete(zobj->ptr,eptr);
-            if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+        for (j = 2; j < c->argc; j++) {
+            if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
+                deleted++;
+                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                if (zzlLength(zobj->ptr) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
     } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         dictEntry *de;
         double score;
 
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            /* Delete from the skiplist */
-            score = *(double*)dictGetEntryVal(de);
-            redisAssert(zslDelete(zs->zsl,score,ele));
-
-            /* Delete from the hash table */
-            dictDelete(zs->dict,ele);
-            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-            if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+        for (j = 2; j < c->argc; j++) {
+            de = dictFind(zs->dict,c->argv[j]);
+            if (de != NULL) {
+                deleted++;
+
+                /* Delete from the skiplist */
+                score = *(double*)dictGetEntryVal(de);
+                redisAssert(zslDelete(zs->zsl,score,c->argv[j]));
+
+                /* Delete from the hash table */
+                dictDelete(zs->dict,c->argv[j]);
+                if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+                if (dictSize(zs->dict) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
     } else {
         redisPanic("Unknown sorted set encoding");
     }
 
-    signalModifiedKey(c->db,key);
-    server.dirty++;
-    addReply(c,shared.cone);
+    if (deleted) {
+        signalModifiedKey(c->db,key);
+        server.dirty += deleted;
+    }
+    addReplyLongLong(c,deleted);
 }
 
 void zremrangebyscoreCommand(redisClient *c) {
index 6dc85eff37c4a137fad0c382d76af675335f0d52..e2a9e52524da91f7cb651c2c1dce4268a4802b8b 100644 (file)
@@ -110,6 +110,13 @@ proc cleanup {} {
 }
 
 proc execute_everything {} {
+    if 0 {
+        # Use this when hacking on new tests.
+        set ::verbose 1
+        execute_tests "unit/first"
+        return
+    }
+
     execute_tests "unit/printver"
     execute_tests "unit/auth"
     execute_tests "unit/protocol"
index 1a37ed616ae48648996f80722f1e22bf2237b5aa..bdd1f9bfa11955a2026ccb9caad974204126267a 100644 (file)
@@ -105,6 +105,12 @@ start_server {
         lsort [r smembers myset]
     } {a c}
 
+    test {SREM variadic version with more args needed to destroy the key} {
+        r del myset
+        r sadd myset 1 2 3
+        r srem myset 1 2 3 4 5 6 7 8
+    } {3}
+
     foreach {type} {hashtable intset} {
         for {set i 1} {$i <= 5} {incr i} {
             r del [format "set%d" $i]
index 761cac49cc77a6553cfb7a0ca829729f125fbf25..46d40f6fb10ffeeec7931e6bec8f646f985e47de 100644 (file)
@@ -48,6 +48,34 @@ start_server {tags {"zset"}} {
             assert_error "*NaN*" {r zincrby myzset -inf abc}
         }
 
+        test {ZADD - Variadic version base case} {
+            r del myzset
+            list [r zadd myzset 10 a 20 b 30 c] [r zrange myzset 0 -1 withscores]
+        } {3 {a 10 b 20 c 30}}
+
+        test {ZADD - Return value is the number of actually added items} {
+            list [r zadd myzset 5 x 20 b 30 c] [r zrange myzset 0 -1 withscores]
+        } {1 {x 5 a 10 b 20 c 30}}
+
+        test {ZADD - Variadic version does not add nothing on single parsing err} {
+            r del myzset
+            catch {r zadd myzset 10 a 20 b 30.badscore c} e
+            assert_match {*ERR*not*double*} $e
+            r exists myzset
+        } {0}
+
+        test {ZADD - Variadic version will raise error on missing arg} {
+            r del myzset
+            catch {r zadd myzset 10 a 20 b 30 c 40} e
+            assert_match {*ERR*syntax*} $e
+        }
+
+        test {ZINCRBY does not work variadic even if shares ZADD implementation} {
+            r del myzset
+            catch {r zincrby myzset 10 a 20 b 30 c} e
+            assert_match {*ERR*wrong*number*arg*} $e
+        }
+
         test "ZCARD basics - $encoding" {
             assert_equal 3 [r zcard ztmp]
             assert_equal 0 [r zcard zdoesntexist]
@@ -65,6 +93,21 @@ start_server {tags {"zset"}} {
             assert_equal 0 [r exists ztmp]
         }
 
+        test "ZREM variadic version" {
+            r del ztmp
+            r zadd ztmp 10 a 20 b 30 c
+            assert_equal 2 [r zrem ztmp x y a b k]
+            assert_equal 0 [r zrem ztmp foo bar]
+            assert_equal 1 [r zrem ztmp c]
+            r exists ztmp
+        } {0}
+
+        test "ZREM variadic version -- remove elements after key deletion" {
+            r del ztmp
+            r zadd ztmp 10 a 20 b 30 c
+            r zrem ztmp a b c d e f g
+        } {3}
+
         test "ZRANGE basics - $encoding" {
             r del ztmp
             r zadd ztmp 1 a