]> git.saurik.com Git - redis.git/blobdiff - redis.c
sorted sets saving fixed
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 48ad6287f0d6cdb98df81e02d8bac1283fabe09d..9221dc0d2ffb3b2bdf8624bc82b31c79e5bc5290 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "1.050"
+#define REDIS_VERSION "1.06"
 
 #include "fmacros.h"
 #include "config.h"
@@ -56,6 +56,7 @@
 #include <fcntl.h>
 #include <sys/time.h>
 #include <sys/resource.h>
+#include <sys/uio.h>
 #include <limits.h>
 #include <math.h>
 
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
-#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */
+#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
+
+/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
+#define REDIS_WRITEV_THRESHOLD      3
+/* Max number of iovecs used for each writev call */
+#define REDIS_WRITEV_IOVEC_COUNT    256
 
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
@@ -394,6 +400,7 @@ static void processInputBuffer(redisClient *c);
 static zskiplist *zslCreate(void);
 static void zslFree(zskiplist *zsl);
 static void zslInsert(zskiplist *zsl, double score, robj *obj);
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -446,6 +453,7 @@ static void flushdbCommand(redisClient *c);
 static void flushallCommand(redisClient *c);
 static void sortCommand(redisClient *c);
 static void lremCommand(redisClient *c);
+static void rpoplpushcommand(redisClient *c);
 static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
@@ -458,6 +466,7 @@ static void debugCommand(redisClient *c);
 static void msetCommand(redisClient *c);
 static void msetnxCommand(redisClient *c);
 static void zaddCommand(redisClient *c);
+static void zincrbyCommand(redisClient *c);
 static void zrangeCommand(redisClient *c);
 static void zrangebyscoreCommand(redisClient *c);
 static void zrevrangeCommand(redisClient *c);
@@ -489,6 +498,7 @@ static struct redisCommand cmdTable[] = {
     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
     {"lrem",lremCommand,4,REDIS_CMD_BULK},
+    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
     {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"srem",sremCommand,3,REDIS_CMD_BULK},
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
@@ -504,6 +514,7 @@ static struct redisCommand cmdTable[] = {
     {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
     {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"zrem",zremCommand,3,REDIS_CMD_BULK},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
     {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
@@ -544,6 +555,7 @@ static struct redisCommand cmdTable[] = {
     {"debug",debugCommand,-2,REDIS_CMD_INLINE},
     {NULL,NULL,0,0}
 };
+
 /*============================ Utility functions ============================ */
 
 /* Glob-style pattern matching. */
@@ -903,7 +915,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     /* Check if a background saving in progress terminated */
     if (server.bgsaveinprogress) {
         int statloc;
-        if (wait4(-1,&statloc,WNOHANG,NULL)) {
+        if (wait3(&statloc,WNOHANG,NULL)) {
             int exitcode = WEXITSTATUS(statloc);
             int bysignal = WIFSIGNALED(statloc);
 
@@ -940,14 +952,21 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
          }
     }
 
-    /* Try to expire a few timed out keys */
+    /* Try to expire a few timed out keys. The algorithm used is adaptive and
+     * will use few CPU cycles if there are few expiring keys, otherwise
+     * it will get more aggressive to avoid that too much memory is used by
+     * keys that can be removed from the keyspace. */
     for (j = 0; j < server.dbnum; j++) {
+        int expired;
         redisDb *db = server.db+j;
-        int num = dictSize(db->expires);
 
-        if (num) {
+        /* Continue to expire if at the end of the cycle more than 25%
+         * of the keys were expired. */
+        do {
+            int num = dictSize(db->expires);
             time_t now = time(NULL);
 
+            expired = 0;
             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
             while (num--) {
@@ -958,9 +977,10 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
                 t = (time_t) dictGetEntryVal(de);
                 if (now > t) {
                     deleteKey(db,dictGetEntryKey(de));
+                    expired++;
                 }
             }
-        }
+        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
     }
 
     /* Check if we should connect to a MASTER */
@@ -1017,7 +1037,7 @@ static void appendServerSaveParams(time_t seconds, int changes) {
     server.saveparamslen++;
 }
 
-static void ResetServerSaveParams() {
+static void resetServerSaveParams() {
     zfree(server.saveparams);
     server.saveparams = NULL;
     server.saveparamslen = 0;
@@ -1046,7 +1066,7 @@ static void initServerConfig() {
     server.sharingpoolsize = 1024;
     server.maxclients = 0;
     server.maxmemory = 0;
-    ResetServerSaveParams();
+    resetServerSaveParams();
 
     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
@@ -1331,34 +1351,31 @@ static void freeClient(redisClient *c) {
     zfree(c);
 }
 
+#define GLUEREPLY_UP_TO (1024) /* max bytes glued into a single reply object */
 static void glueReplyBuffersIfNeeded(redisClient *c) {
-    int totlen = 0;
+    int copylen = 0;
+    char buf[GLUEREPLY_UP_TO];
     listNode *ln;
     robj *o;
 
     listRewind(c->reply);
     while((ln = listYield(c->reply))) {
+        int objlen;
+
         o = ln->value;
-        totlen += sdslen(o->ptr);
-        /* This optimization makes more sense if we don't have to copy
-         * too much data */
-        if (totlen > 1024) return;
-    }
-    if (totlen > 0) {
-        char buf[1024];
-        int copylen = 0;
-
-        listRewind(c->reply);
-        while((ln = listYield(c->reply))) {
-            o = ln->value;
-            memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
-            copylen += sdslen(o->ptr);
+        objlen = sdslen(o->ptr);
+        if (copylen + objlen <= GLUEREPLY_UP_TO) { /* still fits in buf */
+            memcpy(buf+copylen,o->ptr,objlen);
+            copylen += objlen;
             listDelNode(c->reply,ln);
+        } else {
+            if (copylen == 0) return; /* nothing was glued: leave the list untouched */
+            break;
         }
-        /* Now the output buffer is empty, add the new single element */
-        o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
-        listAddNodeTail(c->reply,o);
     }
+    /* The glued nodes were deleted above: re-add their bytes as one object at the head */
+    o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
+    listAddNodeHead(c->reply,o);
 }
 
 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -1368,9 +1385,19 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
-    if (server.glueoutputbuf && listLength(c->reply) > 1)
-        glueReplyBuffersIfNeeded(c);
+    /* Use writev() if we have enough buffers to send */
+    if (!server.glueoutputbuf &&
+        listLength(c->reply) > REDIS_WRITEV_THRESHOLD && 
+        !(c->flags & REDIS_MASTER))
+    {
+        sendReplyToClientWritev(el, fd, privdata, mask);
+        return;
+    }
+
     while(listLength(c->reply)) {
+        if (server.glueoutputbuf && listLength(c->reply) > 1)
+            glueReplyBuffersIfNeeded(c);
+
         o = listNodeValue(listFirst(c->reply));
         objlen = sdslen(o->ptr);
 
@@ -1394,10 +1421,10 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
             c->sentlen = 0;
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
-         * bytes, in a single threaded server it's a good idea to server
+         * bytes, in a single threaded server it's a good idea to serve
          * other clients as well, even if a very large request comes from
          * super fast link that is always able to accept data (in real world
-         * terms think to 'KEYS *' against the loopback interfae) */
+         * scenario think about 'KEYS *' against the loopback interface) */
         if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
     }
     if (nwritten == -1) {
@@ -1417,6 +1444,84 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     }
 }
 
+/* Write handler used instead of sendReplyToClient() when many reply objects
+ * are pending: gathers up to REDIS_WRITEV_IOVEC_COUNT of them per writev()
+ * call, then drops fully sent objects and records partial ones in sentlen. */
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask) {
+    redisClient *c = privdata;
+    int nwritten = 0, totwritten = 0, objlen, willwrite, offset, ion = 0;
+    robj *o;
+    listNode *node;
+    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+    while (listLength(c->reply)) {
+        offset = c->sentlen; /* bytes of the head object already sent */
+        ion = 0;
+        willwrite = 0;
+
+        /* fill-in the iov[] array */
+        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
+            o = listNodeValue(node);
+            objlen = sdslen(o->ptr);
+            /* Honour REDIS_MAX_WRITE_PER_EVENT, counting the bytes already
+             * queued too, but always accept the very first buffer of the
+             * event: an object larger than the cap would never be sent. */
+            if (totwritten + willwrite > 0 &&
+                totwritten + willwrite + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
+                break;
+            if (ion == REDIS_WRITEV_IOVEC_COUNT) break; /* no more iovecs */
+            iov[ion].iov_base = ((char*)o->ptr) + offset;
+            iov[ion].iov_len = objlen - offset;
+            willwrite += objlen - offset;
+            offset = 0; /* just for the first item */
+            ion++;
+        }
+
+        if(willwrite == 0)
+            break;
+
+        /* write all collected blocks at once */
+        if((nwritten = writev(fd, iov, ion)) < 0) {
+            if (errno != EAGAIN) {
+                redisLog(REDIS_DEBUG,
+                         "Error writing to client: %s", strerror(errno));
+                freeClient(c);
+                return;
+            }
+            break;
+        }
+
+        totwritten += nwritten;
+        offset = c->sentlen;
+
+        /* remove written robjs from c->reply */
+        while (nwritten && listLength(c->reply)) {
+            o = listNodeValue(listFirst(c->reply));
+            objlen = sdslen(o->ptr);
+
+            if(nwritten >= objlen - offset) {
+                listDelNode(c->reply, listFirst(c->reply));
+                nwritten -= objlen - offset;
+                c->sentlen = 0;
+            } else {
+                /* partial write */
+                c->sentlen += nwritten;
+                break;
+            }
+            offset = 0;
+        }
+    }
+
+    if (totwritten > 0)
+        c->lastinteraction = time(NULL);
+
+    if (listLength(c->reply) == 0) {
+        c->sentlen = 0;
+        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+    }
+}
+
 static struct redisCommand *lookupCommand(char *name) {
     int j = 0;
     while(cmdTable[j].name != NULL) {
@@ -1807,7 +1912,7 @@ static redisClient *createClient(int fd) {
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
-        readQueryFromClient, c, NULL) == AE_ERR) {
+        readQueryFromClient, c) == AE_ERR) {
         freeClient(c);
         return NULL;
     }
@@ -1820,7 +1925,7 @@ static void addReply(redisClient *c, robj *obj) {
         (c->replstate == REDIS_REPL_NONE ||
          c->replstate == REDIS_REPL_ONLINE) &&
         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
-        sendReplyToClient, c, NULL) == AE_ERR) return;
+        sendReplyToClient, c) == AE_ERR) return;
     if (obj->encoding != REDIS_ENCODING_RAW) {
         obj = getDecodedObject(obj);
     } else {
@@ -1835,6 +1940,14 @@ static void addReplySds(redisClient *c, sds s) {
     decrRefCount(o);
 }
 
+/* Reply with 'd' as a bulk string formatted with 17 significant digits. */
+static void addReplyDouble(redisClient *c, double d) {
+    char buf[128];
+    snprintf(buf,sizeof(buf),"%.17g",d);
+    /* %d expects an int: strlen() returns size_t, so convert explicitly. */
+    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",(int)strlen(buf),buf));
+}
+
 static void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
@@ -1882,7 +1995,9 @@ static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         char *err = "-ERR max number of clients reached\r\n";
 
         /* That's a best effort error message, don't check write errors */
-        (void) write(c->fd,err,strlen(err));
+        if (write(c->fd,err,strlen(err)) == -1) {
+            /* Nothing to do, Just to avoid the warning... */
+        }
         freeClient(c);
         return;
     }
@@ -2336,7 +2451,7 @@ static int rdbSaveDoubleValue(FILE *fp, double val) {
         buf[0] = (val < 0) ? 255 : 254;
     } else {
         snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
-        buf[0] = strlen((char*)buf);
+        buf[0] = strlen((char*)buf+1);
         len = buf[0]+1;
     }
     if (fwrite(buf,len,1,fp) == 0) return -1;
@@ -2861,6 +2976,49 @@ static void mgetCommand(redisClient *c) {
     }
 }
 
+static void msetGenericCommand(redisClient *c, int nx) { /* MSET (nx == 0) and MSETNX (nx != 0) */
+    int j;
+
+    if ((c->argc % 2) == 0) { /* command name plus key/value pairs is always odd */
+        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
+        return;
+    }
+    /* Handle the NX flag. The MSETNX semantic is to return zero and set
+     * nothing at all if at least one of the keys already exists. */
+    if (nx) {
+        for (j = 1; j < c->argc; j += 2) {
+            if (dictFind(c->db->dict,c->argv[j]) != NULL) {
+                addReply(c, shared.czero);
+                return;
+            }
+        }
+    }
+
+    for (j = 1; j < c->argc; j += 2) {
+        int retval;
+
+        retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
+        if (retval == DICT_ERR) { /* the add was refused: replace the stored value instead */
+            dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
+            incrRefCount(c->argv[j+1]); /* only the value gains a reference here */
+        } else {
+            incrRefCount(c->argv[j]); /* both key and value are now referenced by the dict */
+            incrRefCount(c->argv[j+1]);
+        }
+        removeExpire(c->db,c->argv[j]);
+    }
+    server.dirty += (c->argc-1)/2; /* one change per key/value pair */
+    addReply(c, nx ? shared.cone : shared.ok);
+}
+
+static void msetCommand(redisClient *c) {
+    msetGenericCommand(c,0); /* MSET: no existence check before setting */
+}
+
+static void msetnxCommand(redisClient *c) {
+    msetGenericCommand(c,1); /* MSETNX: set nothing if any key exists */
+}
+
 static void incrDecrCommand(redisClient *c, long long incr) {
     long long value;
     int retval;
@@ -3459,6 +3617,70 @@ static void lremCommand(redisClient *c) {
     }
 }
 
+/* RPOPLPUSH srclist dstlist
+ *
+ *   IF LLEN(srclist) > 0
+ *     element = RPOP srclist
+ *     LPUSH dstlist element
+ *     RETURN element
+ *   ELSE
+ *     RETURN nil
+ *   END
+ *
+ * Replies with a nil bulk when srclist is missing or empty, and with a
+ * wrong-type error when either key holds a non-list value. The idea is to
+ * be able to get an element from a list in a reliable way, since the element
+ * is not just returned but pushed onto another list as well. This command
+ * was originally proposed by Ezra Zygmuntowicz. */
+static void rpoplpushcommand(redisClient *c) {
+    robj *sobj;
+
+    sobj = lookupKeyWrite(c->db,c->argv[1]);
+    if (sobj == NULL) {
+        addReply(c,shared.nullbulk);
+    } else {
+        if (sobj->type != REDIS_LIST) {
+            addReply(c,shared.wrongtypeerr);
+        } else {
+            list *srclist = sobj->ptr;
+            listNode *ln = listLast(srclist);
+
+            if (ln == NULL) {
+                addReply(c,shared.nullbulk);
+            } else {
+                robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+                robj *ele = listNodeValue(ln);
+                list *dstlist;
+
+                if (dobj == NULL) {
+
+                    /* Create the list if the key does not exist */
+                    dobj = createListObject();
+                    dictAdd(c->db->dict,c->argv[2],dobj);
+                    incrRefCount(c->argv[2]);
+                } else if (dobj->type != REDIS_LIST) {
+                    addReply(c,shared.wrongtypeerr);
+                    return;
+                }
+                /* Add the element to the target list */
+                dstlist = dobj->ptr;
+                listAddNodeHead(dstlist,ele);
+                incrRefCount(ele);
+
+                /* Send the element to the client as reply as well */
+                addReplyBulkLen(c,ele);
+                addReply(c,ele);
+                addReply(c,shared.crlf);
+
+                /* Finally remove the element from the source list */
+                listDelNode(srclist,ln);
+                server.dirty++;
+            }
+        }
+    }
+}
+
+
 /* ==================================== Sets ================================ */
 
 static void saddCommand(redisClient *c) {
@@ -4050,56 +4272,101 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
 
 /* The actual Z-commands implementations */
 
-static void zaddCommand(redisClient *c) {
+/* Shared implementation of ZADD (doincrement == 0: scoreval is the new score)
+ * and ZINCRBY (doincrement == 1: scoreval is added to the current score, or
+ * to 0 if 'ele' is missing). ZINCRBY replies with the resulting score. */
+static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
     robj *zsetobj;
     zset *zs;
     double *score;
 
-    zsetobj = lookupKeyWrite(c->db,c->argv[1]);
+    zsetobj = lookupKeyWrite(c->db,key);
     if (zsetobj == NULL) {
         zsetobj = createZsetObject();
-        dictAdd(c->db->dict,c->argv[1],zsetobj);
-        incrRefCount(c->argv[1]);
+        dictAdd(c->db->dict,key,zsetobj);
+        incrRefCount(key);
     } else {
         if (zsetobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
             return;
         }
     }
-    score = zmalloc(sizeof(double));
-    *score = strtod(c->argv[2]->ptr,NULL);
     zs = zsetobj->ptr;
-    if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
+
+    /* Ok now since we implement both ZADD and ZINCRBY here the code
+     * needs to handle the two different conditions. It's all about setting
+     * '*score', that is, the new score to set, to the right value. */
+    score = zmalloc(sizeof(double));
+    if (doincrement) {
+        dictEntry *de;
+
+        /* Read the old score. If the element was not present starts from 0 */
+        de = dictFind(zs->dict,ele);
+        if (de) {
+            double *oldscore = dictGetEntryVal(de);
+            scoreval += *oldscore;
+        }
+    }
+    /* 'scoreval' now holds the final score. Replies below use it rather than
+     * '*score', because that buffer is freed when the score is unchanged and
+     * is owned by the hash table otherwise. */
+    *score = scoreval;
+
+    /* What follows is a simple remove and re-insert operation that is common
+     * to both ZADD and ZINCRBY... */
+    if (dictAdd(zs->dict,ele,score) == DICT_OK) {
         /* case 1: New element */
-        incrRefCount(c->argv[3]); /* added to hash */
-        zslInsert(zs->zsl,*score,c->argv[3]);
-        incrRefCount(c->argv[3]); /* added to skiplist */
+        incrRefCount(ele); /* added to hash */
+        zslInsert(zs->zsl,*score,ele);
+        incrRefCount(ele); /* added to skiplist */
         server.dirty++;
-        addReply(c,shared.cone);
+        if (doincrement)
+            addReplyDouble(c,scoreval);
+        else
+            addReply(c,shared.cone);
     } else {
         dictEntry *de;
         double *oldscore;
         
         /* case 2: Score update operation */
-        de = dictFind(zs->dict,c->argv[3]);
+        de = dictFind(zs->dict,ele);
         assert(de != NULL);
         oldscore = dictGetEntryVal(de);
         if (*score != *oldscore) {
             int deleted;
 
-            deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
+            /* Remove and insert the element in the skip list with new score */
+            deleted = zslDelete(zs->zsl,*oldscore,ele);
             assert(deleted != 0);
-            zslInsert(zs->zsl,*score,c->argv[3]);
-            incrRefCount(c->argv[3]);
-            dictReplace(zs->dict,c->argv[3],score);
+            zslInsert(zs->zsl,*score,ele);
+            incrRefCount(ele);
+            /* Update the score in the hash table */
+            dictReplace(zs->dict,ele,score);
             server.dirty++;
         } else {
             zfree(score);
         }
-        addReply(c,shared.czero);
+        if (doincrement)
+            addReplyDouble(c,scoreval);
+        else
+            addReply(c,shared.czero);
     }
 }
 
+static void zaddCommand(redisClient *c) {
+    double scoreval;
+    /* argv: ZADD key score member. strtod() yields 0 for a non-numeric score. */
+    scoreval = strtod(c->argv[2]->ptr,NULL);
+    zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
+}
+
+static void zincrbyCommand(redisClient *c) {
+    double scoreval;
+    /* argv: ZINCRBY key increment member. strtod() yields 0 for a non-numeric increment. */
+    scoreval = strtod(c->argv[2]->ptr,NULL);
+    zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
+}
+
 static void zremCommand(redisClient *c) {
     robj *zsetobj;
     zset *zs;
@@ -4295,7 +4562,7 @@ static void zscoreCommand(redisClient *c) {
     
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
-        addReply(c,shared.czero);
+        addReply(c,shared.nullbulk);
         return;
     } else {
         if (o->type != REDIS_ZSET) {
@@ -4308,12 +4575,9 @@ static void zscoreCommand(redisClient *c) {
             if (!de) {
                 addReply(c,shared.nullbulk);
             } else {
-                char buf[128];
                 double *score = dictGetEntryVal(de);
 
-                snprintf(buf,sizeof(buf),"%.17g",*score);
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
-                    strlen(buf),buf));
+                addReplyDouble(c,*score);
             }
         }
     }
@@ -4356,17 +4620,31 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
         char buf[REDIS_SORTKEY_MAX+1];
     } keyname;
 
+    /* If the pattern is "#" return the substitution object itself in order
+     * to implement the "SORT ... GET #" feature. */
+    spat = pattern->ptr;
+    if (spat[0] == '#' && spat[1] == '\0') {
+        return subst;
+    }
+
+    /* The substitution object may be specially encoded. If so we create
+     * a decoded object on the fly. */
     if (subst->encoding == REDIS_ENCODING_RAW)
+        /* If we don't need to get a decoded object increment the refcount
+         * so that the final decrRefCount() call will restore the original
+         * count */
         incrRefCount(subst);
     else {
         subst = getDecodedObject(subst);
     }
 
-    spat = pattern->ptr;
     ssub = subst->ptr;
     if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
     p = strchr(spat,'*');
-    if (!p) return NULL;
+    if (!p) {
+        decrRefCount(subst);
+        return NULL;
+    }
 
     prefixlen = p-spat;
     sublen = sdslen(ssub);
@@ -4859,49 +5137,6 @@ static void ttlCommand(redisClient *c) {
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
 }
 
-static void msetGenericCommand(redisClient *c, int nx) {
-    int j;
-
-    if ((c->argc % 2) == 0) {
-        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
-        return;
-    }
-    /* Handle the NX flag. The MSETNX semantic is to return zero and don't
-     * set nothing at all if at least one already key exists. */
-    if (nx) {
-        for (j = 1; j < c->argc; j += 2) {
-            if (dictFind(c->db->dict,c->argv[j]) != NULL) {
-                addReply(c, shared.czero);
-                return;
-            }
-        }
-    }
-
-    for (j = 1; j < c->argc; j += 2) {
-        int retval;
-
-        retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
-        if (retval == DICT_ERR) {
-            dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
-            incrRefCount(c->argv[j+1]);
-        } else {
-            incrRefCount(c->argv[j]);
-            incrRefCount(c->argv[j+1]);
-        }
-        removeExpire(c->db,c->argv[j]);
-    }
-    server.dirty += (c->argc-1)/2;
-    addReply(c, nx ? shared.cone : shared.ok);
-}
-
-static void msetCommand(redisClient *c) {
-    msetGenericCommand(c,0);
-}
-
-static void msetnxCommand(redisClient *c) {
-    msetGenericCommand(c,1);
-}
-
 /* =============================== Replication  ============================= */
 
 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
@@ -5069,7 +5304,7 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
         slave->replstate = REDIS_REPL_ONLINE;
         if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
-            sendReplyToClient, slave, NULL) == AE_ERR) {
+            sendReplyToClient, slave) == AE_ERR) {
             freeClient(slave);
             return;
         }
@@ -5113,7 +5348,7 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) {
             slave->repldbsize = buf.st_size;
             slave->replstate = REDIS_REPL_SEND_BULK;
             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
-            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
+            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                 freeClient(slave);
                 continue;
             }
@@ -5526,149 +5761,90 @@ static void debugCommand(redisClient *c) {
     }
 }
 
-#ifdef HAVE_BACKTRACE
-static struct redisFunctionSym symsTable[] = {
-{"compareStringObjects", (unsigned long)compareStringObjects},
-{"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
-{"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
-{"dictEncObjHash", (unsigned long)dictEncObjHash},
-{"incrDecrCommand", (unsigned long)incrDecrCommand},
-{"freeStringObject", (unsigned long)freeStringObject},
-{"freeListObject", (unsigned long)freeListObject},
-{"freeSetObject", (unsigned long)freeSetObject},
-{"decrRefCount", (unsigned long)decrRefCount},
-{"createObject", (unsigned long)createObject},
-{"freeClient", (unsigned long)freeClient},
-{"rdbLoad", (unsigned long)rdbLoad},
-{"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
-{"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
-{"addReply", (unsigned long)addReply},
-{"addReplySds", (unsigned long)addReplySds},
-{"incrRefCount", (unsigned long)incrRefCount},
-{"rdbSaveBackground", (unsigned long)rdbSaveBackground},
-{"createStringObject", (unsigned long)createStringObject},
-{"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
-{"syncWithMaster", (unsigned long)syncWithMaster},
-{"tryObjectSharing", (unsigned long)tryObjectSharing},
-{"tryObjectEncoding", (unsigned long)tryObjectEncoding},
-{"getDecodedObject", (unsigned long)getDecodedObject},
-{"removeExpire", (unsigned long)removeExpire},
-{"expireIfNeeded", (unsigned long)expireIfNeeded},
-{"deleteIfVolatile", (unsigned long)deleteIfVolatile},
-{"deleteKey", (unsigned long)deleteKey},
-{"getExpire", (unsigned long)getExpire},
-{"setExpire", (unsigned long)setExpire},
-{"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
-{"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
-{"authCommand", (unsigned long)authCommand},
-{"pingCommand", (unsigned long)pingCommand},
-{"echoCommand", (unsigned long)echoCommand},
-{"setCommand", (unsigned long)setCommand},
-{"setnxCommand", (unsigned long)setnxCommand},
-{"getCommand", (unsigned long)getCommand},
-{"delCommand", (unsigned long)delCommand},
-{"existsCommand", (unsigned long)existsCommand},
-{"incrCommand", (unsigned long)incrCommand},
-{"decrCommand", (unsigned long)decrCommand},
-{"incrbyCommand", (unsigned long)incrbyCommand},
-{"decrbyCommand", (unsigned long)decrbyCommand},
-{"selectCommand", (unsigned long)selectCommand},
-{"randomkeyCommand", (unsigned long)randomkeyCommand},
-{"keysCommand", (unsigned long)keysCommand},
-{"dbsizeCommand", (unsigned long)dbsizeCommand},
-{"lastsaveCommand", (unsigned long)lastsaveCommand},
-{"saveCommand", (unsigned long)saveCommand},
-{"bgsaveCommand", (unsigned long)bgsaveCommand},
-{"shutdownCommand", (unsigned long)shutdownCommand},
-{"moveCommand", (unsigned long)moveCommand},
-{"renameCommand", (unsigned long)renameCommand},
-{"renamenxCommand", (unsigned long)renamenxCommand},
-{"lpushCommand", (unsigned long)lpushCommand},
-{"rpushCommand", (unsigned long)rpushCommand},
-{"lpopCommand", (unsigned long)lpopCommand},
-{"rpopCommand", (unsigned long)rpopCommand},
-{"llenCommand", (unsigned long)llenCommand},
-{"lindexCommand", (unsigned long)lindexCommand},
-{"lrangeCommand", (unsigned long)lrangeCommand},
-{"ltrimCommand", (unsigned long)ltrimCommand},
-{"typeCommand", (unsigned long)typeCommand},
-{"lsetCommand", (unsigned long)lsetCommand},
-{"saddCommand", (unsigned long)saddCommand},
-{"sremCommand", (unsigned long)sremCommand},
-{"smoveCommand", (unsigned long)smoveCommand},
-{"sismemberCommand", (unsigned long)sismemberCommand},
-{"scardCommand", (unsigned long)scardCommand},
-{"spopCommand", (unsigned long)spopCommand},
-{"srandmemberCommand", (unsigned long)srandmemberCommand},
-{"sinterCommand", (unsigned long)sinterCommand},
-{"sinterstoreCommand", (unsigned long)sinterstoreCommand},
-{"sunionCommand", (unsigned long)sunionCommand},
-{"sunionstoreCommand", (unsigned long)sunionstoreCommand},
-{"sdiffCommand", (unsigned long)sdiffCommand},
-{"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
-{"syncCommand", (unsigned long)syncCommand},
-{"flushdbCommand", (unsigned long)flushdbCommand},
-{"flushallCommand", (unsigned long)flushallCommand},
-{"sortCommand", (unsigned long)sortCommand},
-{"lremCommand", (unsigned long)lremCommand},
-{"infoCommand", (unsigned long)infoCommand},
-{"mgetCommand", (unsigned long)mgetCommand},
-{"monitorCommand", (unsigned long)monitorCommand},
-{"expireCommand", (unsigned long)expireCommand},
-{"expireatCommand", (unsigned long)expireatCommand},
-{"getsetCommand", (unsigned long)getsetCommand},
-{"ttlCommand", (unsigned long)ttlCommand},
-{"slaveofCommand", (unsigned long)slaveofCommand},
-{"debugCommand", (unsigned long)debugCommand},
-{"processCommand", (unsigned long)processCommand},
-{"setupSigSegvAction", (unsigned long)setupSigSegvAction},
-{"readQueryFromClient", (unsigned long)readQueryFromClient},
-{"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
-{"msetGenericCommand", (unsigned long)msetGenericCommand},
-{"msetCommand", (unsigned long)msetCommand},
-{"msetnxCommand", (unsigned long)msetnxCommand},
-{"zslCreateNode", (unsigned long)zslCreateNode},
-{"zslCreate", (unsigned long)zslCreate},
-{"zslFreeNode",(unsigned long)zslFreeNode},
-{"zslFree",(unsigned long)zslFree},
-{"zslRandomLevel",(unsigned long)zslRandomLevel},
-{"zslInsert",(unsigned long)zslInsert},
-{"zslDelete",(unsigned long)zslDelete},
-{"createZsetObject",(unsigned long)createZsetObject},
-{"zaddCommand",(unsigned long)zaddCommand},
-{"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
-{"zrangeCommand",(unsigned long)zrangeCommand},
-{"zrevrangeCommand",(unsigned long)zrevrangeCommand},
-{"zremCommand",(unsigned long)zremCommand},
-{"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
-{"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
-{"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
-{NULL,0}
-};
+/* =================================== Main! ================================ */
 
-/* This function try to convert a pointer into a function name. It's used in
- * oreder to provide a backtrace under segmentation fault that's able to
- * display functions declared as static (otherwise the backtrace is useless). */
-static char *findFuncName(void *pointer, unsigned long *offset){
-    int i, ret = -1;
-    unsigned long off, minoff = 0;
+#ifdef __linux__
+int linuxOvercommitMemoryValue(void) { /* value read from the proc file, or -1 if unreadable */
+    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
+    char buf[64];
 
-    /* Try to match against the Symbol with the smallest offset */
-    for (i=0; symsTable[i].pointer; i++) {
-        unsigned long lp = (unsigned long) pointer;
+    if (!fp) return -1; /* file could not be opened */
+    if (fgets(buf,64,fp) == NULL) { /* nothing could be read */
+        fclose(fp);
+        return -1;
+    }
+    fclose(fp);
 
-        if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
-            off=lp-symsTable[i].pointer;
-            if (ret < 0 || off < minoff) {
-                minoff=off;
-                ret=i;
-            }
-        }
+    return atoi(buf); /* atoi() gives 0 for non-numeric text, same as a real 0 setting */
+}
+
+void linuxOvercommitMemoryWarning(void) { /* log a warning when overcommit_memory reads as 0 */
+    if (linuxOvercommitMemoryValue() == 0) { /* -1 (unreadable) logs nothing */
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
     }
-    if (ret == -1) return NULL;
-    *offset = minoff;
-    return symsTable[ret].name;
 }
+#endif /* __linux__ */
+
+static void daemonize(void) {
+    int fd;
+    FILE *fp;
+    /* NOTE(review): a failed fork() returns -1, so that case also exits here with status 0. */
+    if (fork() != 0) exit(0); /* parent exits */
+    setsid(); /* create a new session */
+
+    /* Every output goes to /dev/null. If Redis is daemonized but
+     * the 'logfile' is set to 'stdout' in the configuration file
+     * it will not log at all. */
+    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
+        dup2(fd, STDIN_FILENO);
+        dup2(fd, STDOUT_FILENO);
+        dup2(fd, STDERR_FILENO);
+        if (fd > STDERR_FILENO) close(fd); /* keep fd open when it is itself 0, 1 or 2 */
+    }
+    /* Try to write the pid file */
+    fp = fopen(server.pidfile,"w");
+    if (fp) { /* best effort: a pid file that cannot be opened is skipped silently */
+        fprintf(fp,"%d\n",getpid());
+        fclose(fp);
+    }
+}
+
+int main(int argc, char **argv) { /* accepts at most one argument: the config file path */
+    initServerConfig();
+    if (argc == 2) {
+        resetServerSaveParams(); /* clear the current save points before reading the file */
+        loadServerConfig(argv[1]);
+    } else if (argc > 2) {
+        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
+        exit(1);
+    } else {
+        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+    }
+    initServer();
+    if (server.daemonize) daemonize();
+    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
+#ifdef __linux__
+    linuxOvercommitMemoryWarning();
+#endif
+    if (server.appendonly) { /* only one source is loaded: the append only file or the dump */
+        if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file");
+    } else {
+        if (rdbLoad(server.dbfilename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from disk");
+    }
+    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
+        acceptHandler, NULL) == AE_ERR) oom("creating file event"); /* acceptHandler runs when server.fd is readable */
+    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+    aeMain(server.el); /* NOTE(review): presumably blocks until the event loop stops - confirm in ae.c */
+    aeDeleteEventLoop(server.el);
+    return 0;
+}
+
+/* ============================= Backtrace support ========================= */
+
+#ifdef HAVE_BACKTRACE
+static char *findFuncName(void *pointer, unsigned long *offset);
 
 static void *getMcontextEip(ucontext_t *uc) {
 #if defined(__FreeBSD__)
@@ -5764,87 +5940,39 @@ static void setupSigSegvAction(void) {
     sigaction (SIGBUS, &act, NULL);
     return;
 }
-#else /* HAVE_BACKTRACE */
-static void setupSigSegvAction(void) {
-}
-#endif /* HAVE_BACKTRACE */
 
-/* =================================== Main! ================================ */
+#include "staticsymbols.h"
+/* This function tries to convert a pointer into a function name. It's used in
+ * order to provide a backtrace under segmentation fault that's able to
+ * display functions declared as static (otherwise the backtrace is useless). */
+static char *findFuncName(void *pointer, unsigned long *offset){
+    int i, ret = -1; /* ret: index of the best match so far, -1 while nothing matched */
+    unsigned long off, minoff = 0;
 
-#ifdef __linux__
-int linuxOvercommitMemoryValue(void) {
-    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
-    char buf[64];
+    /* Try to match against the Symbol with the smallest offset */
+    for (i=0; symsTable[i].pointer; i++) { /* the scan stops at the first entry whose pointer field is zero */
+        unsigned long lp = (unsigned long) pointer;
 
-    if (!fp) return -1;
-    if (fgets(buf,64,fp) == NULL) {
-        fclose(fp);
-        return -1;
+        if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) { /* skip the all-ones pointer value and entries located above the pointer */
+            off=lp-symsTable[i].pointer;
+            if (ret < 0 || off < minoff) { /* first candidate, or one closer to the pointer than the current best */
+                minoff=off;
+                ret=i;
+            }
+        }
     }
-    fclose(fp);
-
-    return atoi(buf);
+    if (ret == -1) return NULL; /* no entry matched; *offset is left unmodified in this case */
+    *offset = minoff; /* distance from the matched entry's address to the pointer */
+    return symsTable[ret].name;
 }
-
-void linuxOvercommitMemoryWarning(void) {
-    if (linuxOvercommitMemoryValue() == 0) {
-        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
-    }
+#else /* HAVE_BACKTRACE */
+static void setupSigSegvAction(void) { /* intentionally empty: this variant is compiled when HAVE_BACKTRACE is not defined */
 }
-#endif /* __linux__ */
+#endif /* HAVE_BACKTRACE */
 
-static void daemonize(void) {
-    int fd;
-    FILE *fp;
 
-    if (fork() != 0) exit(0); /* parent exits */
-    setsid(); /* create a new session */
 
-    /* Every output goes to /dev/null. If Redis is daemonized but
-     * the 'logfile' is set to 'stdout' in the configuration file
-     * it will not log at all. */
-    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
-        dup2(fd, STDIN_FILENO);
-        dup2(fd, STDOUT_FILENO);
-        dup2(fd, STDERR_FILENO);
-        if (fd > STDERR_FILENO) close(fd);
-    }
-    /* Try to write the pid file */
-    fp = fopen(server.pidfile,"w");
-    if (fp) {
-        fprintf(fp,"%d\n",getpid());
-        fclose(fp);
-    }
-}
+/* The End */
+
+
 
-int main(int argc, char **argv) {
-    initServerConfig();
-    if (argc == 2) {
-        ResetServerSaveParams();
-        loadServerConfig(argv[1]);
-    } else if (argc > 2) {
-        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
-        exit(1);
-    } else {
-        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
-    }
-    initServer();
-    if (server.daemonize) daemonize();
-    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    if (server.appendonly) {
-        if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file");
-    } else {
-        if (rdbLoad(server.dbfilename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from disk");
-    }
-    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
-        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
-    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    aeMain(server.el);
-    aeDeleteEventLoop(server.el);
-    return 0;
-}