]> git.saurik.com Git - redis.git/commitdiff
Merge branch 'zset-mem' into zrevrangebyscore
authorPieter Noordhuis <pcnoordhuis@gmail.com>
Thu, 16 Sep 2010 12:32:30 +0000 (14:32 +0200)
committerPieter Noordhuis <pcnoordhuis@gmail.com>
Thu, 16 Sep 2010 12:32:30 +0000 (14:32 +0200)
1  2 
src/redis.c
src/redis.h
src/t_zset.c

diff --combined src/redis.c
index caa9624739ef9defa9a58c1b55f330a096eb9eb0,e6a1a1370179fead88efc1b751a60b11ab52aadf..7b1b3f4fb7c7c539165c0e919b85322dadd8e0ee
@@@ -51,7 -51,6 +51,7 @@@
  #include <float.h>
  #include <math.h>
  #include <pthread.h>
 +#include <sys/resource.h>
  
  /* Our shared "common" objects */
  
@@@ -171,7 -170,6 +171,7 @@@ struct redisCommand readonlyCommandTabl
      {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
 +    {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
      {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
      {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
      {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
@@@ -340,7 -338,7 +340,7 @@@ dictType zsetDictType = 
      NULL,                      /* val dup */
      dictEncObjKeyCompare,      /* key compare */
      dictRedisObjectDestructor, /* key destructor */
-     dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
+     NULL                       /* val destructor */
  };
  
  /* Db->dict, keys are sds strings, vals are Redis objects. */
@@@ -437,48 -435,6 +437,48 @@@ void updateDictResizePolicy(void) 
  
  /* ======================= Cron: called every 100 ms ======================== */
  
 +/* Try to expire a few timed out keys. The algorithm used is adaptive and
 + * will use few CPU cycles if there are few expiring keys, otherwise
 + * it will get more aggressive to avoid that too much memory is used by
 + * keys that can be removed from the keyspace. */
 +void activeExpireCycle(void) {
 +    int j;
 +
 +    for (j = 0; j < server.dbnum; j++) {
 +        int expired;
 +        redisDb *db = server.db+j;
 +
 +        /* Continue to expire if at the end of the cycle more than 25%
 +         * of the keys were expired. */
 +        do {
 +            long num = dictSize(db->expires);
 +            time_t now = time(NULL);
 +
 +            expired = 0;
 +            if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
 +                num = REDIS_EXPIRELOOKUPS_PER_CRON;
 +            while (num--) {
 +                dictEntry *de;
 +                time_t t;
 +
 +                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
 +                t = (time_t) dictGetEntryVal(de);
 +                if (now > t) {
 +                    sds key = dictGetEntryKey(de);
 +                    robj *keyobj = createStringObject(key,sdslen(key));
 +
 +                    propagateExpire(db,keyobj);
 +                    dbDelete(db,keyobj);
 +                    decrRefCount(keyobj);
 +                    expired++;
 +                    server.stat_expiredkeys++;
 +                }
 +            }
 +        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
 +    }
 +}
 +
 +
  int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      int j, loops = server.cronloops++;
      REDIS_NOTUSED(eventLoop);
           }
      }
  
 -    /* Try to expire a few timed out keys. The algorithm used is adaptive and
 -     * will use few CPU cycles if there are few expiring keys, otherwise
 -     * it will get more aggressive to avoid that too much memory is used by
 -     * keys that can be removed from the keyspace. */
 -    for (j = 0; j < server.dbnum; j++) {
 -        int expired;
 -        redisDb *db = server.db+j;
 -
 -        /* Continue to expire if at the end of the cycle more than 25%
 -         * of the keys were expired. */
 -        do {
 -            long num = dictSize(db->expires);
 -            time_t now = time(NULL);
 -
 -            expired = 0;
 -            if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
 -                num = REDIS_EXPIRELOOKUPS_PER_CRON;
 -            while (num--) {
 -                dictEntry *de;
 -                time_t t;
 -
 -                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
 -                t = (time_t) dictGetEntryVal(de);
 -                if (now > t) {
 -                    sds key = dictGetEntryKey(de);
 -                    robj *keyobj = createStringObject(key,sdslen(key));
 -
 -                    dbDelete(db,keyobj);
 -                    decrRefCount(keyobj);
 -                    expired++;
 -                    server.stat_expiredkeys++;
 -                }
 -            }
 -        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
 -    }
 +    /* Expire a few keys per cycle, only if this is a master.
 +     * On slaves we wait for DEL operations synthesized by the master
 +     * in order to guarantee a strict consistency. */
 +    if (server.masterhost == NULL) activeExpireCycle();
  
      /* Swap a few keys on disk if we are over the memory limit and VM
       * is enbled. Try to free objects from the free list first. */
@@@ -744,7 -731,6 +744,7 @@@ void initServerConfig() 
      server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
      server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
      server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
 +    server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
      server.shutdown_asap = 0;
  
      resetServerSaveParams();
@@@ -910,20 -896,15 +910,20 @@@ int processCommand(redisClient *c) 
      } else if (c->multibulk) {
          if (c->bulklen == -1) {
              if (((char*)c->argv[0]->ptr)[0] != '$') {
 -                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
 +                addReplyError(c,"multi bulk protocol error");
                  resetClient(c);
                  return 1;
              } else {
 -                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
 +                char *eptr;
 +                long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
 +                int perr = eptr[0] != '\0';
 +
                  decrRefCount(c->argv[0]);
 -                if (bulklen < 0 || bulklen > 1024*1024*1024) {
 +                if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
 +                    bulklen < 0 || bulklen > 1024*1024*1024)
 +                {
                      c->argc--;
 -                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
 +                    addReplyError(c,"invalid bulk write count");
                      resetClient(c);
                      return 1;
                  }
       * such wrong arity, bad command name and so forth. */
      cmd = lookupCommand(c->argv[0]->ptr);
      if (!cmd) {
 -        addReplySds(c,
 -            sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
 -                (char*)c->argv[0]->ptr));
 +        addReplyErrorFormat(c,"unknown command '%s'",
 +            (char*)c->argv[0]->ptr);
          resetClient(c);
          return 1;
      } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
                 (c->argc < -cmd->arity)) {
 -        addReplySds(c,
 -            sdscatprintf(sdsempty(),
 -                "-ERR wrong number of arguments for '%s' command\r\n",
 -                cmd->name));
 +        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
 +            cmd->name);
          resetClient(c);
          return 1;
      } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
          /* This is a bulk command, we have to read the last argument yet. */
 -        int bulklen = atoi(c->argv[c->argc-1]->ptr);
 +        char *eptr;
 +        long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
 +        int perr = eptr[0] != '\0';
  
          decrRefCount(c->argv[c->argc-1]);
 -        if (bulklen < 0 || bulklen > 1024*1024*1024) {
 +        if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
 +            bulklen < 0 || bulklen > 1024*1024*1024)
 +        {
              c->argc--;
 -            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
 +            addReplyError(c,"invalid bulk write count");
              resetClient(c);
              return 1;
          }
  
      /* Check if the user is authenticated */
      if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
 -        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
 +        addReplyError(c,"operation not permitted");
          resetClient(c);
          return 1;
      }
      if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
          zmalloc_used_memory() > server.maxmemory)
      {
 -        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
 +        addReplyError(c,"command not allowed when used memory > 'maxmemory'");
          resetClient(c);
          return 1;
      }
          &&
          cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
          cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
 -        addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
 +        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
          resetClient(c);
          return 1;
      }
@@@ -1084,7 -1064,11 +1084,7 @@@ int prepareForShutdown() 
          if (server.vm_enabled) unlink(server.vm_swap_file);
      } else {
          /* Snapshotting. Perform a SYNC SAVE and exit */
 -        if (rdbSave(server.dbfilename) == REDIS_OK) {
 -            if (server.daemonize)
 -                unlink(server.pidfile);
 -            redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
 -        } else {
 +        if (rdbSave(server.dbfilename) != REDIS_OK) {
              /* Ooops.. error saving! The best we can do is to continue
               * operating. Note that if there was a background saving process,
               * in the next cron() Redis will be notified that the background
              return REDIS_ERR;
          }
      }
 +    if (server.daemonize) unlink(server.pidfile);
      redisLog(REDIS_WARNING,"Server exit now, bye bye...");
      return REDIS_OK;
  }
@@@ -1107,7 -1090,7 +1107,7 @@@ void authCommand(redisClient *c) 
        addReply(c,shared.ok);
      } else {
        c->authenticated = 0;
 -      addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
 +      addReplyError(c,"invalid password");
      }
  }
  
@@@ -1148,10 -1131,6 +1148,10 @@@ sds genRedisInfoString(void) 
      time_t uptime = time(NULL)-server.stat_starttime;
      int j;
      char hmem[64];
 +    struct rusage self_ru, c_ru;
 +
 +    getrusage(RUSAGE_SELF, &self_ru);
 +    getrusage(RUSAGE_CHILDREN, &c_ru);
  
      bytesToHuman(hmem,zmalloc_used_memory());
      info = sdscatprintf(sdsempty(),
          "process_id:%ld\r\n"
          "uptime_in_seconds:%ld\r\n"
          "uptime_in_days:%ld\r\n"
 +        "used_cpu_sys:%.2f\r\n"
 +        "used_cpu_user:%.2f\r\n"
 +        "used_cpu_sys_childrens:%.2f\r\n"
 +        "used_cpu_user_childrens:%.2f\r\n"
          "connected_clients:%d\r\n"
          "connected_slaves:%d\r\n"
          "blocked_clients:%d\r\n"
          "used_memory:%zu\r\n"
          "used_memory_human:%s\r\n"
 +        "mem_fragmentation_ratio:%.2f\r\n"
          "changes_since_last_save:%lld\r\n"
          "bgsave_in_progress:%d\r\n"
          "last_save_time:%ld\r\n"
          (long) getpid(),
          uptime,
          uptime/(3600*24),
 +        (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
 +        (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
 +        (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000,
 +        (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
          listLength(server.clients)-listLength(server.slaves),
          listLength(server.slaves),
          server.blpop_blocked_clients,
          zmalloc_used_memory(),
          hmem,
 +        zmalloc_get_fragmentation_ratio(),
          server.dirty,
          server.bgsavechildpid != -1,
          server.lastsave,
@@@ -1333,8 -1302,7 +1333,8 @@@ void freeMemoryIfNeeded(void) 
          if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
          for (j = 0; j < server.dbnum; j++) {
              int minttl = -1;
 -            robj *minkey = NULL;
 +            sds minkey = NULL;
 +            robj *keyobj = NULL;
              struct dictEntry *de;
  
              if (dictSize(server.db[j].expires)) {
                          minttl = t;
                      }
                  }
 -                dbDelete(server.db+j,minkey);
 +                keyobj = createStringObject(minkey,sdslen(minkey));
 +                dbDelete(server.db+j,keyobj);
 +                server.stat_expiredkeys++;
 +                decrRefCount(keyobj);
              }
          }
          if (!freed) return; /* nothing to free... */
@@@ -1385,17 -1350,9 +1385,17 @@@ void linuxOvercommitMemoryWarning(void
  }
  #endif /* __linux__ */
  
 +void createPidFile(void) {
 +    /* Try to write the pid file in a best-effort way. */
 +    FILE *fp = fopen(server.pidfile,"w");
 +    if (fp) {
 +        fprintf(fp,"%d\n",getpid());
 +        fclose(fp);
 +    }
 +}
 +
  void daemonize(void) {
      int fd;
 -    FILE *fp;
  
      if (fork() != 0) exit(0); /* parent exits */
      setsid(); /* create a new session */
          dup2(fd, STDERR_FILENO);
          if (fd > STDERR_FILENO) close(fd);
      }
 -    /* Try to write the pid file */
 -    fp = fopen(server.pidfile,"w");
 -    if (fp) {
 -        fprintf(fp,"%d\n",getpid());
 -        fclose(fp);
 -    }
  }
  
  void version() {
@@@ -1441,7 -1404,6 +1441,7 @@@ int main(int argc, char **argv) 
      }
      if (server.daemonize) daemonize();
      initServer();
 +    if (server.daemonize) createPidFile();
      redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
  #ifdef __linux__
      linuxOvercommitMemoryWarning();
@@@ -1518,7 -1480,6 +1518,7 @@@ void segvHandler(int sig, siginfo_t *in
          redisLog(REDIS_WARNING,"%s", messages[i]);
  
      /* free(messages); Don't call free() with possibly corrupted memory. */
 +    if (server.daemonize) unlink(server.pidfile);
      _exit(0);
  }
  
diff --combined src/redis.h
index 38727ae200d48493aaf2cac7420e3485eab18ed1,4b45f5f4b40856c2d990dfc58a73773842f31128..3de054f286140abeafd642ca3c88312d4673a90e
@@@ -26,7 -26,6 +26,7 @@@
  #include "anet.h"   /* Networking the easy way */
  #include "zipmap.h" /* Compact string -> string data structure */
  #include "ziplist.h" /* Compact list data structure */
 +#include "intset.h" /* Compact integer set structure */
  #include "version.h"
  
  /* Error codes */
@@@ -47,7 -46,6 +47,7 @@@
  #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
  #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
  #define REDIS_SHARED_INTEGERS 10000
 +#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
  
  /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
  #define REDIS_WRITEV_THRESHOLD      3
@@@ -84,7 -82,6 +84,7 @@@
  #define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
  #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
  #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
 +#define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
  
  /* Object types only used for dumping to disk */
  #define REDIS_EXPIRETIME 253
  #define REDIS_HASH_MAX_ZIPMAP_VALUE 512
  #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024
  #define REDIS_LIST_MAX_ZIPLIST_VALUE 32
 +#define REDIS_SET_MAX_INTSET_ENTRIES 4096
  
  /* Sets operations codes */
  #define REDIS_OP_UNION 0
@@@ -287,7 -283,7 +287,7 @@@ typedef struct redisClient 
      sds querybuf;
      robj **argv, **mbargv;
      int argc, mbargc;
 -    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
 +    long bulklen;            /* bulk read len. -1 if not in bulk read mode */
      int multibulk;          /* multi bulk command format active */
      list *reply;
      int sentlen;
      list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
      dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
      list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
 +
 +    /* Response buffer */
 +    int bufpos;
 +    char buf[REDIS_REPLY_CHUNK_BYTES];
  } redisClient;
  
  struct saveparam {
@@@ -340,7 -332,6 +340,7 @@@ struct redisServer 
      int fd;
      redisDb *db;
      long long dirty;            /* changes to DB from the last save */
 +    long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
      list *clients;
      list *slaves, *monitors;
      char neterr[ANET_ERR_LEN];
      size_t hash_max_zipmap_value;
      size_t list_max_ziplist_entries;
      size_t list_max_ziplist_value;
 +    size_t set_max_intset_entries;
      /* Virtual memory state */
      FILE *vm_fp;
      int vm_fd;
@@@ -490,13 -480,14 +490,14 @@@ typedef struct _redisSortOperation 
  } redisSortOperation;
  
  /* ZSETs use a specialized version of Skiplists */
  typedef struct zskiplistNode {
-     struct zskiplistNode **forward;
-     struct zskiplistNode *backward;
-     unsigned int *span;
-     double score;
      robj *obj;
+     double score;
+     struct zskiplistNode *backward;
+     struct zskiplistLevel {
+         struct zskiplistNode *forward;
+         unsigned int span;
+     } level[];
  } zskiplistNode;
  
  typedef struct zskiplist {
@@@ -545,14 -536,6 +546,14 @@@ typedef struct 
      listNode *ln;       /* Entry in linked list */
  } listTypeEntry;
  
 +/* Structure to hold set iteration abstraction. */
 +typedef struct {
 +    robj *subject;
 +    int encoding;
 +    int ii; /* intset iterator */
 +    dictIterator *di;
 +} setTypeIterator;
 +
  /* Structure to hold hash iteration abstration. Note that iteration over
   * hashes involves both fields and values. Because it is possible that
   * not both are required, store pointers in the iterator to avoid
@@@ -593,8 -576,6 +594,8 @@@ void resetClient(redisClient *c)
  void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
  void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReply(redisClient *c, robj *obj);
 +void *addDeferredMultiBulkLength(redisClient *c);
 +void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
  void addReplySds(redisClient *c, sds s);
  void processInputBuffer(redisClient *c);
  void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@@ -604,23 -585,11 +605,23 @@@ void addReplyBulkCString(redisClient *c
  void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReply(redisClient *c, robj *obj);
  void addReplySds(redisClient *c, sds s);
 +void addReplyError(redisClient *c, char *err);
 +void addReplyStatus(redisClient *c, char *status);
  void addReplyDouble(redisClient *c, double d);
  void addReplyLongLong(redisClient *c, long long ll);
 -void addReplyUlong(redisClient *c, unsigned long ul);
 +void addReplyMultiBulkLen(redisClient *c, long length);
  void *dupClientReplyValue(void *o);
  
 +#ifdef __GNUC__
 +void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
 +    __attribute__((format(printf, 2, 3)));
 +void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
 +    __attribute__((format(printf, 2, 3)));
 +#else
 +void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
 +void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
 +#endif
 +
  /* List data type */
  void listTypeTryConversion(robj *subject, robj *value);
  void listTypePush(robj *subject, robj *value, int where);
@@@ -665,7 -634,6 +666,7 @@@ robj *createStringObjectFromLongLong(lo
  robj *createListObject(void);
  robj *createZiplistObject(void);
  robj *createSetObject(void);
 +robj *createIntsetObject(void);
  robj *createHashObject(void);
  robj *createZsetObject(void);
  int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
@@@ -707,7 -675,7 +708,7 @@@ void backgroundRewriteDoneHandler(int s
  /* Sorted sets data type */
  zskiplist *zslCreate(void);
  void zslFree(zskiplist *zsl);
void zslInsert(zskiplist *zsl, double score, robj *obj);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
  
  /* Core functions */
  void freeMemoryIfNeeded(void);
@@@ -749,18 -717,6 +750,18 @@@ int dontWaitForSwappedKey(redisClient *
  void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
  vmpointer *vmSwapObjectBlocking(robj *val);
  
 +/* Set data type */
 +robj *setTypeCreate(robj *value);
 +int setTypeAdd(robj *subject, robj *value);
 +int setTypeRemove(robj *subject, robj *value);
 +int setTypeIsMember(robj *subject, robj *value);
 +setTypeIterator *setTypeInitIterator(robj *subject);
 +void setTypeReleaseIterator(setTypeIterator *si);
 +robj *setTypeNext(setTypeIterator *si);
 +robj *setTypeRandomElement(robj *subject);
 +unsigned long setTypeSize(robj *subject);
 +void setTypeConvert(robj *subject, int enc);
 +
  /* Hash data type */
  void convertToRealHash(robj *o);
  void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
@@@ -789,8 -745,6 +790,8 @@@ int stringmatch(const char *pattern, co
  long long memtoll(const char *p, int *err);
  int ll2string(char *s, size_t len, long long value);
  int isStringRepresentableAsLong(sds s, long *longval);
 +int isStringRepresentableAsLongLong(sds s, long long *longval);
 +int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
  
  /* Configuration */
  void loadServerConfig(char *filename);
@@@ -799,10 -753,10 +800,10 @@@ void resetServerSaveParams()
  
  /* db.c -- Keyspace access API */
  int removeExpire(redisDb *db, robj *key);
 +void propagateExpire(redisDb *db, robj *key);
  int expireIfNeeded(redisDb *db, robj *key);
 -int deleteIfVolatile(redisDb *db, robj *key);
  time_t getExpire(redisDb *db, robj *key);
 -int setExpire(redisDb *db, robj *key, time_t when);
 +void setExpire(redisDb *db, robj *key, time_t when);
  robj *lookupKey(redisDb *db, robj *key);
  robj *lookupKeyRead(redisDb *db, robj *key);
  robj *lookupKeyWrite(redisDb *db, robj *key);
@@@ -885,7 -839,6 +886,7 @@@ void expireCommand(redisClient *c)
  void expireatCommand(redisClient *c);
  void getsetCommand(redisClient *c);
  void ttlCommand(redisClient *c);
 +void persistCommand(redisClient *c);
  void slaveofCommand(redisClient *c);
  void debugCommand(redisClient *c);
  void msetCommand(redisClient *c);
diff --combined src/t_zset.c
index d944e92329fca9d8f8f0033216dbf7f0ae631706,3d9f612afd66eb96c9f2d40584627c9dd5a799b2..93ade5aaf01b6f07d7c976594bd1aa194486b285
   * from tail to head, useful for ZREVRANGE. */
  
  zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
-     zskiplistNode *zn = zmalloc(sizeof(*zn));
-     zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
-     if (level > 1)
-         zn->span = zmalloc(sizeof(unsigned int) * (level - 1));
-     else
-         zn->span = NULL;
+     zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
      zn->score = score;
      zn->obj = obj;
      return zn;
@@@ -45,11 -39,8 +39,8 @@@ zskiplist *zslCreate(void) 
      zsl->length = 0;
      zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
      for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
-         zsl->header->forward[j] = NULL;
-         /* span has space for ZSKIPLIST_MAXLEVEL-1 elements */
-         if (j < ZSKIPLIST_MAXLEVEL-1)
-             zsl->header->span[j] = 0;
+         zsl->header->level[j].forward = NULL;
+         zsl->header->level[j].span = 0;
      }
      zsl->header->backward = NULL;
      zsl->tail = NULL;
  
  void zslFreeNode(zskiplistNode *node) {
      decrRefCount(node->obj);
-     zfree(node->forward);
-     zfree(node->span);
      zfree(node);
  }
  
  void zslFree(zskiplist *zsl) {
-     zskiplistNode *node = zsl->header->forward[0], *next;
+     zskiplistNode *node = zsl->header->level[0].forward, *next;
  
-     zfree(zsl->header->forward);
-     zfree(zsl->header->span);
      zfree(zsl->header);
      while(node) {
-         next = node->forward[0];
+         next = node->level[0].forward;
          zslFreeNode(node);
          node = next;
      }
@@@ -84,7 -71,7 +71,7 @@@ int zslRandomLevel(void) 
      return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
  }
  
void zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      unsigned int rank[ZSKIPLIST_MAXLEVEL];
      int i, level;
      for (i = zsl->level-1; i >= 0; i--) {
          /* store rank that is crossed to reach the insert position */
          rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
-         while (x->forward[i] &&
-             (x->forward[i]->score < score ||
-                 (x->forward[i]->score == score &&
-                 compareStringObjects(x->forward[i]->obj,obj) < 0))) {
-             rank[i] += i > 0 ? x->span[i-1] : 1;
-             x = x->forward[i];
+         while (x->level[i].forward &&
+             (x->level[i].forward->score < score ||
+                 (x->level[i].forward->score == score &&
+                 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
+             rank[i] += x->level[i].span;
+             x = x->level[i].forward;
          }
          update[i] = x;
      }
          for (i = zsl->level; i < level; i++) {
              rank[i] = 0;
              update[i] = zsl->header;
-             update[i]->span[i-1] = zsl->length;
+             update[i]->level[i].span = zsl->length;
          }
          zsl->level = level;
      }
      x = zslCreateNode(level,score,obj);
      for (i = 0; i < level; i++) {
-         x->forward[i] = update[i]->forward[i];
-         update[i]->forward[i] = x;
+         x->level[i].forward = update[i]->level[i].forward;
+         update[i]->level[i].forward = x;
  
          /* update span covered by update[i] as x is inserted here */
-         if (i > 0) {
-             x->span[i-1] = update[i]->span[i-1] - (rank[0] - rank[i]);
-             update[i]->span[i-1] = (rank[0] - rank[i]) + 1;
-         }
+         x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
+         update[i]->level[i].span = (rank[0] - rank[i]) + 1;
      }
  
      /* increment span for untouched levels */
      for (i = level; i < zsl->level; i++) {
-         update[i]->span[i-1]++;
+         update[i]->level[i].span++;
      }
  
      x->backward = (update[0] == zsl->header) ? NULL : update[0];
-     if (x->forward[0])
-         x->forward[0]->backward = x;
+     if (x->level[0].forward)
+         x->level[0].forward->backward = x;
      else
          zsl->tail = x;
      zsl->length++;
+     return x;
  }
  
  /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
  void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
      int i;
      for (i = 0; i < zsl->level; i++) {
-         if (update[i]->forward[i] == x) {
-             if (i > 0) {
-                 update[i]->span[i-1] += x->span[i-1] - 1;
-             }
-             update[i]->forward[i] = x->forward[i];
+         if (update[i]->level[i].forward == x) {
+             update[i]->level[i].span += x->level[i].span - 1;
+             update[i]->level[i].forward = x->level[i].forward;
          } else {
-             /* invariant: i > 0, because update[0]->forward[0]
-              * is always equal to x */
-             update[i]->span[i-1] -= 1;
+             update[i]->level[i].span -= 1;
          }
      }
-     if (x->forward[0]) {
-         x->forward[0]->backward = x->backward;
+     if (x->level[0].forward) {
+         x->level[0].forward->backward = x->backward;
      } else {
          zsl->tail = x->backward;
      }
-     while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
+     while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
          zsl->level--;
      zsl->length--;
  }
@@@ -173,16 -154,16 +154,16 @@@ int zslDelete(zskiplist *zsl, double sc
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] &&
-             (x->forward[i]->score < score ||
-                 (x->forward[i]->score == score &&
-                 compareStringObjects(x->forward[i]->obj,obj) < 0)))
-             x = x->forward[i];
+         while (x->level[i].forward &&
+             (x->level[i].forward->score < score ||
+                 (x->level[i].forward->score == score &&
+                 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
+             x = x->level[i].forward;
          update[i] = x;
      }
      /* We may have multiple elements with the same score, what we need
       * is to find the element with both the right score and object. */
-     x = x->forward[0];
+     x = x->level[0].forward;
      if (x && score == x->score && equalStringObjects(x->obj,obj)) {
          zslDeleteNode(zsl, x, update);
          zslFreeNode(x);
@@@ -204,16 -185,16 +185,16 @@@ unsigned long zslDeleteRangeByScore(zsk
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] && x->forward[i]->score < min)
-             x = x->forward[i];
+         while (x->level[i].forward && x->level[i].forward->score < min)
+             x = x->level[i].forward;
          update[i] = x;
      }
      /* We may have multiple elements with the same score, what we need
       * is to find the element with both the right score and object. */
-     x = x->forward[0];
+     x = x->level[0].forward;
      while (x && x->score <= max) {
-         zskiplistNode *next = x->forward[0];
-         zslDeleteNode(zsl, x, update);
+         zskiplistNode *next = x->level[0].forward;
+         zslDeleteNode(zsl,x,update);
          dictDelete(dict,x->obj);
          zslFreeNode(x);
          removed++;
@@@ -231,18 -212,18 +212,18 @@@ unsigned long zslDeleteRangeByRank(zski
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
-             traversed += i > 0 ? x->span[i-1] : 1;
-             x = x->forward[i];
+         while (x->level[i].forward && (traversed + x->level[i].span) < start) {
+             traversed += x->level[i].span;
+             x = x->level[i].forward;
          }
          update[i] = x;
      }
  
      traversed++;
-     x = x->forward[0];
+     x = x->level[0].forward;
      while (x && traversed <= end) {
-         zskiplistNode *next = x->forward[0];
-         zslDeleteNode(zsl, x, update);
+         zskiplistNode *next = x->level[0].forward;
+         zslDeleteNode(zsl,x,update);
          dictDelete(dict,x->obj);
          zslFreeNode(x);
          removed++;
@@@ -260,12 -241,12 +241,12 @@@ zskiplistNode *zslFirstWithScore(zskipl
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] && x->forward[i]->score < score)
-             x = x->forward[i];
+         while (x->level[i].forward && x->level[i].forward->score < score)
+             x = x->level[i].forward;
      }
      /* We may have multiple elements with the same score, what we need
       * is to find the element with both the right score and object. */
-     return x->forward[0];
+     return x->level[0].forward;
  }
  
  /* Find the rank for an element by both score and key.
@@@ -279,12 -260,12 +260,12 @@@ unsigned long zslistTypeGetRank(zskipli
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] &&
-             (x->forward[i]->score < score ||
-                 (x->forward[i]->score == score &&
-                 compareStringObjects(x->forward[i]->obj,o) <= 0))) {
-             rank += i > 0 ? x->span[i-1] : 1;
-             x = x->forward[i];
+         while (x->level[i].forward &&
+             (x->level[i].forward->score < score ||
+                 (x->level[i].forward->score == score &&
+                 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
+             rank += x->level[i].span;
+             x = x->level[i].forward;
          }
  
          /* x might be equal to zsl->header, so test if obj is non-NULL */
@@@ -303,10 -284,10 +284,10 @@@ zskiplistNode* zslistTypeGetElementByRa
  
      x = zsl->header;
      for (i = zsl->level-1; i >= 0; i--) {
-         while (x->forward[i] && (traversed + (i>0 ? x->span[i-1] : 1)) <= rank)
+         while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
          {
-             traversed += i > 0 ? x->span[i-1] : 1;
-             x = x->forward[i];
+             traversed += x->level[i].span;
+             x = x->level[i].forward;
          }
          if (traversed == rank) {
              return x;
   * Sorted set commands 
   *----------------------------------------------------------------------------*/
  
- /* This generic command implements both ZADD and ZINCRBY.
-  * scoreval is the score if the operation is a ZADD (doincrement == 0) or
-  * the increment if the operation is a ZINCRBY (doincrement == 1). */
- void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
+ /* This generic command implements both ZADD and ZINCRBY. */
+ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
      robj *zsetobj;
      zset *zs;
-     double *score;
+     zskiplistNode *znode;
  
      zsetobj = lookupKeyWrite(c->db,key);
      if (zsetobj == NULL) {
      }
      zs = zsetobj->ptr;
  
-     /* Ok now since we implement both ZADD and ZINCRBY here the code
-      * needs to handle the two different conditions. It's all about setting
-      * '*score', that is, the new score to set, to the right value. */
-     score = zmalloc(sizeof(double));
-     if (doincrement) {
-         dictEntry *de;
+     /* Since both ZADD and ZINCRBY are implemented here, we need to increment
+      * the score first by the current score if ZINCRBY is called. */
+     if (incr) {
          /* Read the old score. If the element was not present starts from 0 */
-         de = dictFind(zs->dict,ele);
-         if (de) {
-             double *oldscore = dictGetEntryVal(de);
-             *score = *oldscore + scoreval;
-         } else {
-             *score = scoreval;
-         }
-         if (isnan(*score)) {
+         dictEntry *de = dictFind(zs->dict,ele);
+         if (de != NULL)
+             score += *(double*)dictGetEntryVal(de);
+         if (isnan(score)) {
 -            addReplySds(c,
 -                sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
 +            addReplyError(c,"resulting score is not a number (NaN)");
-             zfree(score);
              /* Note that we don't need to check if the zset may be empty and
               * should be removed here, as we can only obtain Nan as score if
               * there was already an element in the sorted set. */
              return;
          }
-     } else {
-         *score = scoreval;
      }
  
-     /* What follows is a simple remove and re-insert operation that is common
-      * to both ZADD and ZINCRBY... */
-     if (dictAdd(zs->dict,ele,score) == DICT_OK) {
-         /* case 1: New element */
+     /* We need to remove and re-insert the element when it was already present
+      * in the dictionary, to update the skiplist. Note that we delay adding a
+      * pointer to the score because we want to reference the score in the
+      * skiplist node. */
+     if (dictAdd(zs->dict,ele,NULL) == DICT_OK) {
+         dictEntry *de;
+         /* New element */
          incrRefCount(ele); /* added to hash */
-         zslInsert(zs->zsl,*score,ele);
+         znode = zslInsert(zs->zsl,score,ele);
          incrRefCount(ele); /* added to skiplist */
+         /* Update the score in the dict entry */
+         de = dictFind(zs->dict,ele);
+         redisAssert(de != NULL);
+         dictGetEntryVal(de) = &znode->score;
          touchWatchedKey(c->db,c->argv[1]);
          server.dirty++;
-         if (doincrement)
-             addReplyDouble(c,*score);
+         if (incr)
+             addReplyDouble(c,score);
          else
              addReply(c,shared.cone);
      } else {
          dictEntry *de;
-         double *oldscore;
+         robj *curobj;
+         double *curscore;
+         int deleted;
  
-         /* case 2: Score update operation */
+         /* Update score */
          de = dictFind(zs->dict,ele);
          redisAssert(de != NULL);
-         oldscore = dictGetEntryVal(de);
-         if (*score != *oldscore) {
-             int deleted;
+         curobj = dictGetEntryKey(de);
+         curscore = dictGetEntryVal(de);
  
-             /* Remove and insert the element in the skip list with new score */
-             deleted = zslDelete(zs->zsl,*oldscore,ele);
+         /* When the score is updated, reuse the existing string object to
+          * prevent extra alloc/dealloc of strings on ZINCRBY. */
+         if (score != *curscore) {
+             deleted = zslDelete(zs->zsl,*curscore,curobj);
              redisAssert(deleted != 0);
-             zslInsert(zs->zsl,*score,ele);
-             incrRefCount(ele);
-             /* Update the score in the hash table */
-             dictReplace(zs->dict,ele,score);
+             znode = zslInsert(zs->zsl,score,curobj);
+             incrRefCount(curobj);
+             /* Update the score in the current dict entry */
+             dictGetEntryVal(de) = &znode->score;
              touchWatchedKey(c->db,c->argv[1]);
              server.dirty++;
-         } else {
-             zfree(score);
          }
-         if (doincrement)
-             addReplyDouble(c,*score);
+         if (incr)
+             addReplyDouble(c,score);
          else
              addReply(c,shared.czero);
      }
@@@ -425,7 -406,7 +405,7 @@@ void zremCommand(redisClient *c) 
      robj *zsetobj;
      zset *zs;
      dictEntry *de;
-     double *oldscore;
+     double curscore;
      int deleted;
  
      if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
          return;
      }
      /* Delete from the skiplist */
-     oldscore = dictGetEntryVal(de);
-     deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
+     curscore = *(double*)dictGetEntryVal(de);
+     deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
      redisAssert(deleted != 0);
  
      /* Delete from the hash table */
@@@ -553,6 -534,7 +533,7 @@@ void zunionInterGenericCommand(redisCli
      zsetopsrc *src;
      robj *dstobj;
      zset *dstzset;
+     zskiplistNode *znode;
      dictIterator *di;
      dictEntry *de;
      int touched = 0;
      /* expect setnum input keys to be given */
      setnum = atoi(c->argv[2]->ptr);
      if (setnum < 1) {
 -        addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
 +        addReplyError(c,
 +            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
          return;
      }
  
                      zfree(score);
                  } else {
                      robj *o = dictGetEntryKey(de);
-                     dictAdd(dstzset->dict,o,score);
-                     incrRefCount(o); /* added to dictionary */
-                     zslInsert(dstzset->zsl,*score,o);
+                     znode = zslInsert(dstzset->zsl,*score,o);
                      incrRefCount(o); /* added to skiplist */
+                     dictAdd(dstzset->dict,o,&znode->score);
+                     incrRefCount(o); /* added to dictionary */
                  }
              }
              dictReleaseIterator(di);
                  }
  
                  robj *o = dictGetEntryKey(de);
-                 dictAdd(dstzset->dict,o,score);
-                 incrRefCount(o); /* added to dictionary */
-                 zslInsert(dstzset->zsl,*score,o);
+                 znode = zslInsert(dstzset->zsl,*score,o);
                  incrRefCount(o); /* added to skiplist */
+                 dictAdd(dstzset->dict,o,&znode->score);
+                 incrRefCount(o); /* added to dictionary */
              }
              dictReleaseIterator(di);
          }
@@@ -778,17 -759,18 +759,17 @@@ void zrangeGenericCommand(redisClient *
          ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start);
      } else {
          ln = start == 0 ?
-             zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1);
+             zsl->header->level[0].forward : zslistTypeGetElementByRank(zsl, start+1);
      }
  
      /* Return the result in form of a multi-bulk reply */
 -    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
 -        withscores ? (rangelen*2) : rangelen));
 +    addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
      for (j = 0; j < rangelen; j++) {
          ele = ln->obj;
          addReplyBulk(c,ele);
          if (withscores)
              addReplyDouble(c,ln->score);
-         ln = reverse ? ln->backward : ln->forward[0];
+         ln = reverse ? ln->backward : ln->level[0].forward;
      }
  }
  
@@@ -839,7 -821,8 +820,7 @@@ void genericZrangebyscoreCommand(redisC
      if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
          badsyntax = 1;
      if (badsyntax) {
 -        addReplySds(c,
 -            sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
 +        addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
          return;
      }
  
              zset *zsetobj = o->ptr;
              zskiplist *zsl = zsetobj->zsl;
              zskiplistNode *ln;
 -            robj *ele, *lenobj = NULL;
 +            robj *ele;
 +            void *replylen = NULL;
              unsigned long rangelen = 0;
  
              /* Get the first node with the score >= min, or with
               * score > min if 'minex' is true. */
              ln = zslFirstWithScore(zsl,min);
-             while (minex && ln && ln->score == min) ln = ln->forward[0];
+             while (minex && ln && ln->score == min) ln = ln->level[0].forward;
  
              if (ln == NULL) {
                  /* No element matching the speciifed interval */
               * are in the list, so we push this object that will represent
               * the multi-bulk length in the output buffer, and will "fix"
               * it later */
 -            if (!justcount) {
 -                lenobj = createObject(REDIS_STRING,NULL);
 -                addReply(c,lenobj);
 -                decrRefCount(lenobj);
 -            }
 +            if (!justcount)
 +                replylen = addDeferredMultiBulkLength(c);
  
              while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
                  if (offset) {
                      offset--;
-                     ln = ln->forward[0];
+                     ln = ln->level[0].forward;
                      continue;
                  }
                  if (limit == 0) break;
                      if (withscores)
                          addReplyDouble(c,ln->score);
                  }
-                 ln = ln->forward[0];
+                 ln = ln->level[0].forward;
                  rangelen++;
                  if (limit > 0) limit--;
              }
              if (justcount) {
                  addReplyLongLong(c,(long)rangelen);
              } else {
 -                lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
 +                setDeferredMultiBulkLength(c,replylen,
                       withscores ? (rangelen*2) : rangelen);
              }
          }
@@@ -929,7 -914,7 +910,7 @@@ void zcardCommand(redisClient *c) 
          checkType(c,o,REDIS_ZSET)) return;
  
      zs = o->ptr;
 -    addReplyUlong(c,zs->zsl->length);
 +    addReplyLongLong(c,zs->zsl->length);
  }
  
  void zscoreCommand(redisClient *c) {