]> git.saurik.com Git - redis.git/blobdiff - redis.c
function to create a new ziplist encoded list
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index da5baa2ec6eb217c353b11a3c83fe21c9f42bcae..33750b933b36ba5027f61ad0e6cb3093d62818ab 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -75,6 +75,7 @@
 #include "lzf.h"    /* LZF compression library */
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 #include "zipmap.h" /* Compact dictionary-alike data structure */
 #include "lzf.h"    /* LZF compression library */
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 #include "zipmap.h" /* Compact dictionary-alike data structure */
+#include "ziplist.h" /* Compact list data structure */
 #include "sha1.h"   /* SHA1 is used for DEBUG DIGEST */
 #include "release.h" /* Release and/or git repository information */
 
 #include "sha1.h"   /* SHA1 is used for DEBUG DIGEST */
 #include "release.h" /* Release and/or git repository information */
 
 /* Objects encoding. Some kind of objects like Strings and Hashes can be
  * internally represented in multiple ways. The 'encoding' field of the object
  * is set to one of this fields for this object. */
 /* Objects encoding. Some kind of objects like Strings and Hashes can be
  * internally represented in multiple ways. The 'encoding' field of the object
  * is set to one of this fields for this object. */
-#define REDIS_ENCODING_RAW 0    /* Raw representation */
-#define REDIS_ENCODING_INT 1    /* Encoded as integer */
-#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
-#define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
+#define REDIS_ENCODING_RAW 0     /* Raw representation */
+#define REDIS_ENCODING_INT 1     /* Encoded as integer */
+#define REDIS_ENCODING_HT 2      /* Encoded as hash table */
+#define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
+#define REDIS_ENCODING_LIST 4    /* Encoded as zipmap */
+#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
 
 static char* strencoding[] = {
     "raw", "int", "zipmap", "hashtable"
 
 static char* strencoding[] = {
     "raw", "int", "zipmap", "hashtable"
@@ -752,7 +755,8 @@ static void unwatchCommand(redisClient *c);
 
 /* Global vars */
 static struct redisServer server; /* server global state */
 
 /* Global vars */
 static struct redisServer server; /* server global state */
-static struct redisCommand cmdTable[] = {
+static struct redisCommand *commandTable;
+static struct redisCommand readonlyCommandTable[] = {
     {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
     {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
     {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
     {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
     {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
     {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0},
@@ -860,8 +864,7 @@ static struct redisCommand cmdTable[] = {
     {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
     {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
     {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
     {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
     {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
     {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
-    {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
-    {NULL,NULL,0,0,NULL,0,0,0}
+    {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
 };
 
 /*============================ Utility functions ============================ */
@@ -2247,13 +2250,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int
     }
 }
 
     }
 }
 
+static int qsortRedisCommands(const void *r1, const void *r2) {
+    return strcasecmp(
+        ((struct redisCommand*)r1)->name,
+        ((struct redisCommand*)r2)->name);
+}
+
+static void sortCommandTable() {
+    /* Copy and sort the read-only version of the command table */
+    commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable));
+    memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable));
+    qsort(commandTable,
+        sizeof(readonlyCommandTable)/sizeof(struct redisCommand),
+        sizeof(struct redisCommand),qsortRedisCommands);
+}
+
 static struct redisCommand *lookupCommand(char *name) {
 static struct redisCommand *lookupCommand(char *name) {
-    int j = 0;
-    while(cmdTable[j].name != NULL) {
-        if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
-        j++;
-    }
-    return NULL;
+    struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0};
+    return bsearch(
+        &tmp,
+        commandTable,
+        sizeof(readonlyCommandTable)/sizeof(struct redisCommand),
+        sizeof(struct redisCommand),
+        qsortRedisCommands);
 }
 
 /* resetClient prepare the client to process the next command */
 }
 
 /* resetClient prepare the client to process the next command */
@@ -2965,9 +2984,17 @@ static robj *dupStringObject(robj *o) {
 
 static robj *createListObject(void) {
     list *l = listCreate();
 
 static robj *createListObject(void) {
     list *l = listCreate();
-
+    robj *o = createObject(REDIS_LIST,l);
     listSetFreeMethod(l,decrRefCount);
     listSetFreeMethod(l,decrRefCount);
-    return createObject(REDIS_LIST,l);
+    o->encoding = REDIS_ENCODING_LIST;
+    return o;
+}
+
+static robj *createZiplistObject(void) {
+    unsigned char *zl = ziplistNew();
+    robj *o = createObject(REDIS_LIST,zl);
+    o->encoding = REDIS_ENCODING_ZIPLIST;
+    return o;
 }
 
 static robj *createSetObject(void) {
 }
 
 static robj *createSetObject(void) {
@@ -3000,7 +3027,16 @@ static void freeStringObject(robj *o) {
 }
 
 static void freeListObject(robj *o) {
 }
 
 static void freeListObject(robj *o) {
-    listRelease((list*) o->ptr);
+    switch (o->encoding) {
+    case REDIS_ENCODING_LIST:
+        listRelease((list*) o->ptr);
+        break;
+    case REDIS_ENCODING_ZIPLIST:
+        zfree(o->ptr);
+        break;
+    default:
+        redisPanic("Unknown list encoding type");
+    }
 }
 
 static void freeSetObject(robj *o) {
 }
 
 static void freeSetObject(robj *o) {
@@ -3516,38 +3552,32 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) {
     return 0;
 }
 
     return 0;
 }
 
+/* Save a long long value as either an encoded string or a string. */
+static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) {
+    unsigned char buf[32];
+    int enclen = rdbEncodeInteger(value,buf);
+    if (enclen > 0) {
+        if (fwrite(buf,enclen,1,fp) == 0) return -1;
+    } else {
+        /* Encode as string */
+        enclen = ll2string((char*)buf,32,value);
+        redisAssert(enclen < 32);
+        if (rdbSaveLen(fp,enclen) == -1) return -1;
+        if (fwrite(buf,enclen,1,fp) == 0) return -1;
+    }
+    return 0;
+}
+
 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
 static int rdbSaveStringObject(FILE *fp, robj *obj) {
 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
 static int rdbSaveStringObject(FILE *fp, robj *obj) {
-    int retval;
-
     /* Avoid to decode the object, then encode it again, if the
      * object is alrady integer encoded. */
     if (obj->encoding == REDIS_ENCODING_INT) {
     /* Avoid to decode the object, then encode it again, if the
      * object is alrady integer encoded. */
     if (obj->encoding == REDIS_ENCODING_INT) {
-        long val = (long) obj->ptr;
-        unsigned char buf[5];
-        int enclen;
-
-        if ((enclen = rdbEncodeInteger(val,buf)) > 0) {
-            if (fwrite(buf,enclen,1,fp) == 0) return -1;
-            return 0;
-        }
-        /* otherwise... fall throught and continue with the usual
-         * code path. */
-    }
-
-    /* Avoid incr/decr ref count business when possible.
-     * This plays well with copy-on-write given that we are probably
-     * in a child process (BGSAVE). Also this makes sure key objects
-     * of swapped objects are not incRefCount-ed (an assert does not allow
-     * this in order to avoid bugs) */
-    if (obj->encoding != REDIS_ENCODING_RAW) {
-        obj = getDecodedObject(obj);
-        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
-        decrRefCount(obj);
+        return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr);
     } else {
     } else {
-        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
+        redisAssert(obj->encoding == REDIS_ENCODING_RAW);
+        return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
     }
     }
-    return retval;
 }
 
 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
 }
 
 /* Save a double value. Doubles are saved as strings prefixed by an unsigned
@@ -3600,16 +3630,37 @@ static int rdbSaveObject(FILE *fp, robj *o) {
         if (rdbSaveStringObject(fp,o) == -1) return -1;
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
         if (rdbSaveStringObject(fp,o) == -1) return -1;
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
-        list *list = o->ptr;
-        listIter li;
-        listNode *ln;
-
-        if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
-        listRewind(list,&li);
-        while((ln = listNext(&li))) {
-            robj *eleobj = listNodeValue(ln);
+        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+            unsigned char *p;
+            unsigned char *vstr;
+            unsigned int vlen;
+            long long vlong;
+
+            if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1;
+            p = ziplistIndex(o->ptr,0);
+            while(ziplistGet(p,&vstr,&vlen,&vlong)) {
+                if (vstr) {
+                    if (rdbSaveRawString(fp,vstr,vlen) == -1)
+                        return -1;
+                } else {
+                    if (rdbSaveLongLongAsStringObject(fp,vlong) == -1)
+                        return -1;
+                }
+                p = ziplistNext(o->ptr,p);
+            }
+        } else if (o->encoding == REDIS_ENCODING_LIST) {
+            list *list = o->ptr;
+            listIter li;
+            listNode *ln;
 
 
-            if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+            if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
+            listRewind(list,&li);
+            while((ln = listNext(&li))) {
+                robj *eleobj = listNodeValue(ln);
+                if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+            }
+        } else {
+            redisPanic("Unknown list encoding");
         }
     } else if (o->type == REDIS_SET) {
         /* Save a set value */
         }
     } else if (o->type == REDIS_SET) {
         /* Save a set value */
@@ -3976,34 +4027,48 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) {
 /* Load a Redis object of the specified type from the specified file.
  * On success a newly allocated object is returned, otherwise NULL. */
 static robj *rdbLoadObject(int type, FILE *fp) {
 /* Load a Redis object of the specified type from the specified file.
  * On success a newly allocated object is returned, otherwise NULL. */
 static robj *rdbLoadObject(int type, FILE *fp) {
-    robj *o;
+    robj *o, *ele, *dec;
+    size_t len;
 
     redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
         o = tryObjectEncoding(o);
 
     redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
         o = tryObjectEncoding(o);
-    } else if (type == REDIS_LIST || type == REDIS_SET) {
-        /* Read list/set value */
-        uint32_t listlen;
+    } else if (type == REDIS_LIST) {
+        /* Read list value */
+        if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+
+        o = createZiplistObject();
+
+        /* Load every single element of the list */
+        while(len--) {
+            if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
 
 
-        if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
-        o = (type == REDIS_LIST) ? createListObject() : createSetObject();
+            if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+                dec = getDecodedObject(ele);
+                o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL);
+                decrRefCount(dec);
+                decrRefCount(ele);
+            } else {
+                ele = tryObjectEncoding(ele);
+                listAddNodeTail(o->ptr,ele);
+                incrRefCount(ele);
+            }
+        }
+    } else if (type == REDIS_SET) {
+        /* Read list/set value */
+        if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+        o = createSetObject();
         /* It's faster to expand the dict to the right size asap in order
          * to avoid rehashing */
         /* It's faster to expand the dict to the right size asap in order
          * to avoid rehashing */
-        if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE)
-            dictExpand(o->ptr,listlen);
+        if (len > DICT_HT_INITIAL_SIZE)
+            dictExpand(o->ptr,len);
         /* Load every single element of the list/set */
         /* Load every single element of the list/set */
-        while(listlen--) {
-            robj *ele;
-
+        while(len--) {
             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
             ele = tryObjectEncoding(ele);
             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
             ele = tryObjectEncoding(ele);
-            if (type == REDIS_LIST) {
-                listAddNodeTail((list*)o->ptr,ele);
-            } else {
-                dictAdd((dict*)o->ptr,ele,NULL);
-            }
+            dictAdd((dict*)o->ptr,ele,NULL);
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
@@ -4744,26 +4809,211 @@ static void moveCommand(redisClient *c) {
 }
 
 /* =================================== Lists ================================ */
 }
 
 /* =================================== Lists ================================ */
-static void pushGenericCommand(redisClient *c, int where) {
-    robj *lobj;
-    list *list;
+static void lPush(robj *subject, robj *value, int where) {
+    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
+        int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
+        value = getDecodedObject(value);
+        subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
+        decrRefCount(value);
+    } else if (subject->encoding == REDIS_ENCODING_LIST) {
+        if (where == REDIS_HEAD) {
+            listAddNodeHead(subject->ptr,value);
+        } else {
+            listAddNodeTail(subject->ptr,value);
+        }
+        incrRefCount(value);
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+}
+
+static robj *lPop(robj *subject, int where) {
+    robj *value = NULL;
+    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p;
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+        int pos = (where == REDIS_HEAD) ? 0 : -1;
+        p = ziplistIndex(subject->ptr,pos);
+        if (ziplistGet(p,&vstr,&vlen,&vlong)) {
+            if (vstr) {
+                value = createStringObject((char*)vstr,vlen);
+            } else {
+                value = createStringObjectFromLongLong(vlong);
+            }
+            /* We only need to delete an element when it exists */
+            subject->ptr = ziplistDelete(subject->ptr,&p);
+        }
+    } else if (subject->encoding == REDIS_ENCODING_LIST) {
+        list *list = subject->ptr;
+        listNode *ln;
+        if (where == REDIS_HEAD) {
+            ln = listFirst(list);
+        } else {
+            ln = listLast(list);
+        }
+        if (ln != NULL) {
+            value = listNodeValue(ln);
+            incrRefCount(value);
+            listDelNode(list,ln);
+        }
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+    return value;
+}
 
 
-    lobj = lookupKeyWrite(c->db,c->argv[1]);
+static unsigned long lLength(robj *subject) {
+    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
+        return ziplistLen(subject->ptr);
+    } else if (subject->encoding == REDIS_ENCODING_LIST) {
+        return listLength((list*)subject->ptr);
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+}
+
+/* Structure to hold set iteration abstraction. */
+typedef struct {
+    robj *subject;
+    unsigned char encoding;
+    unsigned char direction; /* Iteration direction */
+    unsigned char *zi;
+    listNode *ln;
+} lIterator;
+
+/* Structure for an entry while iterating over a list. */
+typedef struct {
+    lIterator *li;
+    unsigned char *zi;  /* Entry in ziplist */
+    listNode *ln;       /* Entry in linked list */
+} lEntry;
+
+/* Initialize an iterator at the specified index. */
+static lIterator *lInitIterator(robj *subject, int index, unsigned char direction) {
+    lIterator *li = zmalloc(sizeof(lIterator));
+    li->subject = subject;
+    li->encoding = subject->encoding;
+    li->direction = direction;
+    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
+        li->zi = ziplistIndex(subject->ptr,index);
+    } else if (li->encoding == REDIS_ENCODING_LIST) {
+        li->ln = listIndex(subject->ptr,index);
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+    return li;
+}
+
+/* Clean up the iterator. */
+static void lReleaseIterator(lIterator *li) {
+    zfree(li);
+}
+
+/* Stores pointer to current the entry in the provided entry structure
+ * and advances the position of the iterator. Returns 1 when the current
+ * entry is in fact an entry, 0 otherwise. */
+static int lNext(lIterator *li, lEntry *entry) {
+    entry->li = li;
+    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
+        entry->zi = li->zi;
+        if (entry->zi != NULL) {
+            if (li->direction == REDIS_TAIL)
+                li->zi = ziplistNext(li->subject->ptr,li->zi);
+            else
+                li->zi = ziplistPrev(li->subject->ptr,li->zi);
+            return 1;
+        }
+    } else if (li->encoding == REDIS_ENCODING_LIST) {
+        entry->ln = li->ln;
+        if (entry->ln != NULL) {
+            if (li->direction == REDIS_TAIL)
+                li->ln = li->ln->next;
+            else
+                li->ln = li->ln->prev;
+            return 1;
+        }
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+    return 0;
+}
+
+/* Return entry or NULL at the current position of the iterator. */
+static robj *lGet(lEntry *entry) {
+    lIterator *li = entry->li;
+    robj *value = NULL;
+    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+        redisAssert(entry->zi != NULL);
+        if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
+            if (vstr) {
+                value = createStringObject((char*)vstr,vlen);
+            } else {
+                value = createStringObjectFromLongLong(vlong);
+            }
+        }
+    } else if (li->encoding == REDIS_ENCODING_LIST) {
+        redisAssert(entry->ln != NULL);
+        value = listNodeValue(entry->ln);
+        incrRefCount(value);
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+    return value;
+}
+
+/* Compare the given object with the entry at the current position. */
+static int lEqual(lEntry *entry, robj *o) {
+    lIterator *li = entry->li;
+    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
+        redisAssert(o->encoding == REDIS_ENCODING_RAW);
+        return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
+    } else if (li->encoding == REDIS_ENCODING_LIST) {
+        return equalStringObjects(o,listNodeValue(entry->ln));
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+}
+
+/* Delete the element pointed to. */
+static void lDelete(lEntry *entry) {
+    lIterator *li = entry->li;
+    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p = entry->zi;
+        li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
+
+        /* Update position of the iterator depending on the direction */
+        if (li->direction == REDIS_TAIL)
+            li->zi = p;
+        else
+            li->zi = ziplistPrev(li->subject->ptr,p);
+    } else if (entry->li->encoding == REDIS_ENCODING_LIST) {
+        listNode *next;
+        if (li->direction == REDIS_TAIL)
+            next = entry->ln->next;
+        else
+            next = entry->ln->prev;
+        listDelNode(li->subject->ptr,entry->ln);
+        li->ln = next;
+    } else {
+        redisPanic("Unknown list encoding");
+    }
+}
+
+static void pushGenericCommand(redisClient *c, int where) {
+    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
     if (lobj == NULL) {
         if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
             addReply(c,shared.cone);
             return;
         }
     if (lobj == NULL) {
         if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
             addReply(c,shared.cone);
             return;
         }
-        lobj = createListObject();
-        list = lobj->ptr;
-        if (where == REDIS_HEAD) {
-            listAddNodeHead(list,c->argv[2]);
-        } else {
-            listAddNodeTail(list,c->argv[2]);
-        }
+        lobj = createZiplistObject();
         dictAdd(c->db->dict,c->argv[1],lobj);
         incrRefCount(c->argv[1]);
         dictAdd(c->db->dict,c->argv[1],lobj);
         incrRefCount(c->argv[1]);
-        incrRefCount(c->argv[2]);
     } else {
         if (lobj->type != REDIS_LIST) {
             addReply(c,shared.wrongtypeerr);
     } else {
         if (lobj->type != REDIS_LIST) {
             addReply(c,shared.wrongtypeerr);
@@ -4773,16 +5023,10 @@ static void pushGenericCommand(redisClient *c, int where) {
             addReply(c,shared.cone);
             return;
         }
             addReply(c,shared.cone);
             return;
         }
-        list = lobj->ptr;
-        if (where == REDIS_HEAD) {
-            listAddNodeHead(list,c->argv[2]);
-        } else {
-            listAddNodeTail(list,c->argv[2]);
-        }
-        incrRefCount(c->argv[2]);
     }
     }
+    lPush(lobj,c->argv[2],where);
+    addReplyLongLong(c,lLength(lobj));
     server.dirty++;
     server.dirty++;
-    addReplyLongLong(c,listLength(list));
 }
 
 static void lpushCommand(redisClient *c) {
 }
 
 static void lpushCommand(redisClient *c) {
@@ -4794,80 +5038,93 @@ static void rpushCommand(redisClient *c) {
 }
 
 static void llenCommand(redisClient *c) {
 }
 
 static void llenCommand(redisClient *c) {
-    robj *o;
-    list *l;
-
-    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
-        checkType(c,o,REDIS_LIST)) return;
-
-    l = o->ptr;
-    addReplyUlong(c,listLength(l));
+    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
+    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
+    addReplyUlong(c,lLength(o));
 }
 
 static void lindexCommand(redisClient *c) {
 }
 
 static void lindexCommand(redisClient *c) {
-    robj *o;
+    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
+    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
     int index = atoi(c->argv[2]->ptr);
     int index = atoi(c->argv[2]->ptr);
-    list *list;
-    listNode *ln;
-
-    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
-        checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
+    robj *value = NULL;
 
 
-    ln = listIndex(list, index);
-    if (ln == NULL) {
-        addReply(c,shared.nullbulk);
+    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p;
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+        p = ziplistIndex(o->ptr,index);
+        if (ziplistGet(p,&vstr,&vlen,&vlong)) {
+            if (vstr) {
+                value = createStringObject((char*)vstr,vlen);
+            } else {
+                value = createStringObjectFromLongLong(vlong);
+            }
+            addReplyBulk(c,value);
+            decrRefCount(value);
+        } else {
+            addReply(c,shared.nullbulk);
+        }
+    } else if (o->encoding == REDIS_ENCODING_LIST) {
+        listNode *ln = listIndex(o->ptr,index);
+        if (ln != NULL) {
+            value = listNodeValue(ln);
+            addReplyBulk(c,value);
+        } else {
+            addReply(c,shared.nullbulk);
+        }
     } else {
     } else {
-        robj *ele = listNodeValue(ln);
-        addReplyBulk(c,ele);
+        redisPanic("Unknown list encoding");
     }
 }
 
 static void lsetCommand(redisClient *c) {
     }
 }
 
 static void lsetCommand(redisClient *c) {
-    robj *o;
+    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
+    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
     int index = atoi(c->argv[2]->ptr);
     int index = atoi(c->argv[2]->ptr);
-    list *list;
-    listNode *ln;
-
-    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL ||
-        checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
+    robj *value = c->argv[3];
 
 
-    ln = listIndex(list, index);
-    if (ln == NULL) {
-        addReply(c,shared.outofrangeerr);
+    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p, *zl = o->ptr;
+        p = ziplistIndex(zl,index);
+        if (p == NULL) {
+            addReply(c,shared.outofrangeerr);
+        } else {
+            o->ptr = ziplistDelete(o->ptr,&p);
+            value = getDecodedObject(value);
+            o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
+            decrRefCount(value);
+            addReply(c,shared.ok);
+            server.dirty++;
+        }
+    } else if (o->encoding == REDIS_ENCODING_LIST) {
+        listNode *ln = listIndex(o->ptr,index);
+        if (ln == NULL) {
+            addReply(c,shared.outofrangeerr);
+        } else {
+            decrRefCount((robj*)listNodeValue(ln));
+            listNodeValue(ln) = value;
+            incrRefCount(value);
+            addReply(c,shared.ok);
+            server.dirty++;
+        }
     } else {
     } else {
-        robj *ele = listNodeValue(ln);
-
-        decrRefCount(ele);
-        listNodeValue(ln) = c->argv[3];
-        incrRefCount(c->argv[3]);
-        addReply(c,shared.ok);
-        server.dirty++;
+        redisPanic("Unknown list encoding");
     }
 }
 
 static void popGenericCommand(redisClient *c, int where) {
     }
 }
 
 static void popGenericCommand(redisClient *c, int where) {
-    robj *o;
-    list *list;
-    listNode *ln;
-
-    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
-        checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
-
-    if (where == REDIS_HEAD)
-        ln = listFirst(list);
-    else
-        ln = listLast(list);
+    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
+    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 
 
-    if (ln == NULL) {
+    robj *value = lPop(o,where);
+    if (value == NULL) {
         addReply(c,shared.nullbulk);
     } else {
         addReply(c,shared.nullbulk);
     } else {
-        robj *ele = listNodeValue(ln);
-        addReplyBulk(c,ele);
-        listDelNode(list,ln);
-        if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
+        addReplyBulk(c,value);
+        decrRefCount(value);
+        if (lLength(o) == 0) deleteKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -4881,19 +5138,16 @@ static void rpopCommand(redisClient *c) {
 }
 
 static void lrangeCommand(redisClient *c) {
 }
 
 static void lrangeCommand(redisClient *c) {
-    robj *o;
+    robj *o, *value;
     int start = atoi(c->argv[2]->ptr);
     int end = atoi(c->argv[3]->ptr);
     int llen;
     int rangelen, j;
     int start = atoi(c->argv[2]->ptr);
     int end = atoi(c->argv[3]->ptr);
     int llen;
     int rangelen, j;
-    list *list;
-    listNode *ln;
-    robj *ele;
+    lEntry entry;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
-    llen = listLength(list);
+    llen = lLength(o);
 
     /* convert negative indexes */
     if (start < 0) start = llen+start;
 
     /* convert negative indexes */
     if (start < 0) start = llen+start;
@@ -4911,13 +5165,15 @@ static void lrangeCommand(redisClient *c) {
     rangelen = (end-start)+1;
 
     /* Return the result in form of a multi-bulk reply */
     rangelen = (end-start)+1;
 
     /* Return the result in form of a multi-bulk reply */
-    ln = listIndex(list, start);
     addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
     addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
+    lIterator *li = lInitIterator(o,start,REDIS_TAIL);
     for (j = 0; j < rangelen; j++) {
     for (j = 0; j < rangelen; j++) {
-        ele = listNodeValue(ln);
-        addReplyBulk(c,ele);
-        ln = ln->next;
+        redisAssert(lNext(li,&entry));
+        value = lGet(&entry);
+        addReplyBulk(c,value);
+        decrRefCount(value);
     }
     }
+    lReleaseIterator(li);
 }
 
 static void ltrimCommand(redisClient *c) {
 }
 
 static void ltrimCommand(redisClient *c) {
@@ -4931,8 +5187,7 @@ static void ltrimCommand(redisClient *c) {
 
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
 
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
-    llen = listLength(list);
+    llen = lLength(o);
 
     /* convert negative indexes */
     if (start < 0) start = llen+start;
 
     /* convert negative indexes */
     if (start < 0) start = llen+start;
@@ -4952,49 +5207,63 @@ static void ltrimCommand(redisClient *c) {
     }
 
     /* Remove list elements to perform the trim */
     }
 
     /* Remove list elements to perform the trim */
-    for (j = 0; j < ltrim; j++) {
-        ln = listFirst(list);
-        listDelNode(list,ln);
-    }
-    for (j = 0; j < rtrim; j++) {
-        ln = listLast(list);
-        listDelNode(list,ln);
+    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+        o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
+        o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
+    } else if (o->encoding == REDIS_ENCODING_LIST) {
+        list = o->ptr;
+        for (j = 0; j < ltrim; j++) {
+            ln = listFirst(list);
+            listDelNode(list,ln);
+        }
+        for (j = 0; j < rtrim; j++) {
+            ln = listLast(list);
+            listDelNode(list,ln);
+        }
+    } else {
+        redisPanic("Unknown list encoding");
     }
     }
-    if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
+    if (lLength(o) == 0) deleteKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.ok);
 }
 
 static void lremCommand(redisClient *c) {
     server.dirty++;
     addReply(c,shared.ok);
 }
 
 static void lremCommand(redisClient *c) {
-    robj *o;
-    list *list;
-    listNode *ln, *next;
+    robj *subject, *obj = c->argv[3];
     int toremove = atoi(c->argv[2]->ptr);
     int removed = 0;
     int toremove = atoi(c->argv[2]->ptr);
     int removed = 0;
-    int fromtail = 0;
+    lEntry entry;
 
 
-    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
-        checkType(c,o,REDIS_LIST)) return;
-    list = o->ptr;
+    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
+    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
+
+    /* Make sure obj is raw when we're dealing with a ziplist */
+    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
+        obj = getDecodedObject(obj);
 
 
+    lIterator *li;
     if (toremove < 0) {
         toremove = -toremove;
     if (toremove < 0) {
         toremove = -toremove;
-        fromtail = 1;
+        li = lInitIterator(subject,-1,REDIS_HEAD);
+    } else {
+        li = lInitIterator(subject,0,REDIS_TAIL);
     }
     }
-    ln = fromtail ? list->tail : list->head;
-    while (ln) {
-        robj *ele = listNodeValue(ln);
 
 
-        next = fromtail ? ln->prev : ln->next;
-        if (equalStringObjects(ele,c->argv[3])) {
-            listDelNode(list,ln);
+    while (lNext(li,&entry)) {
+        if (lEqual(&entry,obj)) {
+            lDelete(&entry);
             server.dirty++;
             removed++;
             if (toremove && removed == toremove) break;
         }
             server.dirty++;
             removed++;
             if (toremove && removed == toremove) break;
         }
-        ln = next;
     }
     }
-    if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
+    lReleaseIterator(li);
+
+    /* Clean up raw encoded object */
+    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
+        decrRefCount(obj);
+
+    if (lLength(subject) == 0) deleteKey(c->db,c->argv[1]);
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
 }
 
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
 }
 
@@ -5014,47 +5283,37 @@ static void lremCommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 static void rpoplpushcommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 static void rpoplpushcommand(redisClient *c) {
-    robj *sobj;
-    list *srclist;
-    listNode *ln;
-
+    robj *sobj, *value;
     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,sobj,REDIS_LIST)) return;
     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,sobj,REDIS_LIST)) return;
-    srclist = sobj->ptr;
-    ln = listLast(srclist);
 
 
-    if (ln == NULL) {
+    if (lLength(sobj) == 0) {
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
-        robj *ele = listNodeValue(ln);
-        list *dstlist;
-
-        if (dobj && dobj->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerr);
-            return;
-        }
+        if (dobj && checkType(c,dobj,REDIS_LIST)) return;
+        value = lPop(sobj,REDIS_TAIL);
 
         /* Add the element to the target list (unless it's directly
          * passed to some BLPOP-ing client */
 
         /* Add the element to the target list (unless it's directly
          * passed to some BLPOP-ing client */
-        if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
-            if (dobj == NULL) {
-                /* Create the list if the key does not exist */
-                dobj = createListObject();
+        if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
+            /* Create the list if the key does not exist */
+            if (!dobj) {
+                dobj = createZiplistObject();
                 dictAdd(c->db->dict,c->argv[2],dobj);
                 incrRefCount(c->argv[2]);
             }
                 dictAdd(c->db->dict,c->argv[2],dobj);
                 incrRefCount(c->argv[2]);
             }
-            dstlist = dobj->ptr;
-            listAddNodeHead(dstlist,ele);
-            incrRefCount(ele);
+            lPush(dobj,value,REDIS_HEAD);
         }
 
         /* Send the element to the client as reply as well */
         }
 
         /* Send the element to the client as reply as well */
-        addReplyBulk(c,ele);
+        addReplyBulk(c,value);
+
+        /* lPop returns an object with its refcount incremented */
+        decrRefCount(value);
 
 
-        /* Finally remove the element from the source list */
-        listDelNode(srclist,ln);
-        if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]);
+        /* Delete the source list when it is empty */
+        if (lLength(sobj) == 0) deleteKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -5734,6 +5993,11 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor
     zset *zs;
     double *score;
 
     zset *zs;
     double *score;
 
+    if (isnan(scoreval)) {
+        addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n"));
+        return;
+    }
+
     zsetobj = lookupKeyWrite(c->db,key);
     if (zsetobj == NULL) {
         zsetobj = createZsetObject();
     zsetobj = lookupKeyWrite(c->db,key);
     if (zsetobj == NULL) {
         zsetobj = createZsetObject();
@@ -5762,6 +6026,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor
         } else {
             *score = scoreval;
         }
         } else {
             *score = scoreval;
         }
+        if (isnan(*score)) {
+            addReplySds(c,
+                sdsnew("-ERR resulting score is Not A Number (nan)\r\n"));
+            zfree(score);
+            /* Note that we don't need to check if the zset may be empty and
+             * should be removed here, as we can only obtain Nan as score if
+             * there was already an element in the sorted set. */
+            return;
+        }
     } else {
         *score = scoreval;
     }
     } else {
         *score = scoreval;
     }
@@ -7563,6 +7836,7 @@ static void execCommand(redisClient *c) {
     execCommandReplicateMulti(c);
 
     /* Exec all the queued commands */
     execCommandReplicateMulti(c);
 
     /* Exec all the queued commands */
+    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
     orig_argv = c->argv;
     orig_argc = c->argc;
     addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
     orig_argv = c->argv;
     orig_argc = c->argc;
     addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
@@ -7575,8 +7849,7 @@ static void execCommand(redisClient *c) {
     c->argc = orig_argc;
     freeClientMultiState(c);
     initClientMultiState(c);
     c->argc = orig_argc;
     freeClientMultiState(c);
     initClientMultiState(c);
-    c->flags &= (~REDIS_MULTI);
-    unwatchAllKeys(c);
+    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
     /* Make sure the EXEC command is always replicated / AOF, since we
      * always send the MULTI command (we can't know beforehand if the
      * next operations will contain at least a modification to the DB). */
     /* Make sure the EXEC command is always replicated / AOF, since we
      * always send the MULTI command (we can't know beforehand if the
      * next operations will contain at least a modification to the DB). */
@@ -10469,7 +10742,7 @@ static void unwatchAllKeys(redisClient *c) {
     }
 }
 
     }
 }
 
-/* "Touch" a key, so that if this key is being WATCHed by soem client the
+/* "Touch" a key, so that if this key is being WATCHed by some client the
  * next EXEC will fail. */
 static void touchWatchedKey(redisDb *db, robj *key) {
     list *clients;
  * next EXEC will fail. */
 static void touchWatchedKey(redisDb *db, robj *key) {
     list *clients;
@@ -10906,7 +11179,8 @@ static void daemonize(void) {
 }
 
 static void version() {
 }
 
 static void version() {
-    printf("Redis server version %s\n", REDIS_VERSION);
+    printf("Redis server version %s (%s:%d)\n", REDIS_VERSION,
+        REDIS_GIT_SHA1, atoi(REDIS_GIT_DIRTY) > 0);
     exit(0);
 }
 
     exit(0);
 }
 
@@ -10920,6 +11194,7 @@ int main(int argc, char **argv) {
     time_t start;
 
     initServerConfig();
     time_t start;
 
     initServerConfig();
+    sortCommandTable();
     if (argc == 2) {
         if (strcmp(argv[1], "-v") == 0 ||
             strcmp(argv[1], "--version") == 0) version();
     if (argc == 2) {
         if (strcmp(argv[1], "-v") == 0 ||
             strcmp(argv[1], "--version") == 0) version();