Merge master with resolved conflict in src/redis-cli.c
authorPieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 13 Oct 2010 16:55:46 +0000 (18:55 +0200)
committerPieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 13 Oct 2010 16:55:46 +0000 (18:55 +0200)
1  2 
src/aof.c
src/config.c
src/networking.c
src/rdb.c
src/redis-benchmark.c
src/redis-cli.c
src/redis.c
src/redis.h

diff --combined src/aof.c
index 942d4afd20a064dd12a0096974e410ef36cac8e5,167134818ac3292c8095a205378fa9e15bf110da..eb67a7bd546132df6335bd6be2a0eefd2be583ee
+++ b/src/aof.c
@@@ -189,6 -189,7 +189,7 @@@ struct redisClient *createFakeClient(vo
      c->querybuf = sdsempty();
      c->argc = 0;
      c->argv = NULL;
+     c->bufpos = 0;
      c->flags = 0;
      /* We set the fake client as a slave waiting for the synchronization
       * so that Redis will not try to send replies to this client. */
@@@ -272,12 -273,14 +273,14 @@@ int loadAppendOnlyFile(char *filename) 
          fakeClient->argc = argc;
          fakeClient->argv = argv;
          cmd->proc(fakeClient);
-         /* Discard the reply objects list from the fake client */
-         while(listLength(fakeClient->reply))
-             listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+         /* The fake client should not have a reply */
+         redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
          /* Clean up, ready for the next command */
          for (j = 0; j < argc; j++) decrRefCount(argv[j]);
          zfree(argv);
          /* Handle swapping while loading big datasets when VM is on */
          force_swapout = 0;
          if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
@@@ -307,7 -310,7 +310,7 @@@ readerr
      }
      exit(1);
  fmterr:
-     redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+     redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
      exit(1);
  }
  
@@@ -463,20 -466,30 +466,30 @@@ int rewriteAppendOnlyFile(char *filenam
                      redisPanic("Unknown list encoding");
                  }
              } else if (o->type == REDIS_SET) {
-                 /* Emit the SADDs needed to rebuild the set */
-                 dict *set = o->ptr;
-                 dictIterator *di = dictGetIterator(set);
-                 dictEntry *de;
+                 char cmd[]="*3\r\n$4\r\nSADD\r\n";
  
-                 while((de = dictNext(di)) != NULL) {
-                     char cmd[]="*3\r\n$4\r\nSADD\r\n";
-                     robj *eleobj = dictGetEntryKey(de);
-                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                     if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                     if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                 /* Emit the SADDs needed to rebuild the set */
+                 if (o->encoding == REDIS_ENCODING_INTSET) {
+                     int ii = 0;
+                     int64_t llval;
+                     while(intsetGet(o->ptr,ii++,&llval)) {
+                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                         if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                     }
+                 } else if (o->encoding == REDIS_ENCODING_HT) {
+                     dictIterator *di = dictGetIterator(o->ptr);
+                     dictEntry *de;
+                     while((de = dictNext(di)) != NULL) {
+                         robj *eleobj = dictGetEntryKey(de);
+                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                         if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                     }
+                     dictReleaseIterator(di);
+                 } else {
+                     redisPanic("Unknown set encoding");
                  }
-                 dictReleaseIterator(di);
              } else if (o->type == REDIS_ZSET) {
                  /* Emit the ZADDs needed to rebuild the sorted set */
                  zset *zs = o->ptr;
@@@ -588,8 -601,7 +601,8 @@@ int rewriteAppendOnlyFileBackground(voi
          char tmpfile[256];
  
          if (server.vm_enabled) vmReopenSwapFile();
 -        close(server.fd);
 +        if (server.ipfd > 0) close(server.ipfd);
 +        if (server.sofd > 0) close(server.sofd);
          snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
          if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
              _exit(0);
  
  void bgrewriteaofCommand(redisClient *c) {
      if (server.bgrewritechildpid != -1) {
-         addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+         addReplyError(c,"Background append only file rewriting already in progress");
          return;
      }
      if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
-         char *status = "+Background append only file rewriting started\r\n";
-         addReplySds(c,sdsnew(status));
+         addReplyStatus(c,"Background append only file rewriting started");
      } else {
          addReply(c,shared.err);
      }
diff --combined src/config.c
index eeec9e8c6e0d51ac08cec46dc6e8dc1cc053a798,1bd678c783050927f662f99e1a78df9ede38d771..4257fc36a30463b4e2160232078238f28e19ffe8
@@@ -71,8 -71,6 +71,8 @@@ void loadServerConfig(char *filename) 
              }
          } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
              server.bindaddr = zstrdup(argv[1]);
 +        } else if (!strcasecmp(argv[0],"unixsocket") && argc == 2) {
 +            server.unixsocket = zstrdup(argv[1]);
          } else if (!strcasecmp(argv[0],"save") && argc == 3) {
              int seconds = atoi(argv[1]);
              int changes = atoi(argv[2]);
              server.list_max_ziplist_entries = memtoll(argv[1], NULL);
          } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){
              server.list_max_ziplist_value = memtoll(argv[1], NULL);
+         } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2){
+             server.set_max_intset_entries = memtoll(argv[1], NULL);
          } else {
              err = "Bad directive or wrong number of arguments"; goto loaderr;
          }
@@@ -241,6 -241,7 +243,7 @@@ void configSetCommand(redisClient *c) 
          if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
              ll < 0) goto badfmt;
          server.maxmemory = ll;
+         if (server.maxmemory) freeMemoryIfNeeded();
      } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) {
          if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
              ll < 0 || ll > LONG_MAX) goto badfmt;
                  stopAppendOnly();
              } else {
                  if (startAppendOnly() == REDIS_ERR) {
-                     addReplySds(c,sdscatprintf(sdsempty(),
-                         "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+                     addReplyError(c,
+                         "Unable to turn on AOF. Check server logs.");
                      decrRefCount(o);
                      return;
                  }
          }
          sdsfreesplitres(v,vlen);
      } else {
-         addReplySds(c,sdscatprintf(sdsempty(),
-             "-ERR not supported CONFIG parameter %s\r\n",
-             (char*)c->argv[2]->ptr));
+         addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
+             (char*)c->argv[2]->ptr);
          decrRefCount(o);
          return;
      }
      return;
  
  badfmt: /* Bad format errors */
-     addReplySds(c,sdscatprintf(sdsempty(),
-         "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+     addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'",
              (char*)o->ptr,
-             (char*)c->argv[2]->ptr));
+             (char*)c->argv[2]->ptr);
      decrRefCount(o);
  }
  
  void configGetCommand(redisClient *c) {
      robj *o = getDecodedObject(c->argv[2]);
-     robj *lenobj = createObject(REDIS_STRING,NULL);
+     void *replylen = addDeferredMultiBulkLength(c);
      char *pattern = o->ptr;
      int matches = 0;
  
-     addReply(c,lenobj);
-     decrRefCount(lenobj);
      if (stringmatch(pattern,"dbfilename",0)) {
          addReplyBulkCString(c,"dbfilename");
          addReplyBulkCString(c,server.dbfilename);
          matches++;
      }
      decrRefCount(o);
-     lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+     setDeferredMultiBulkLength(c,replylen,matches*2);
  }
  
  void configCommand(redisClient *c) {
          server.stat_starttime = time(NULL);
          addReply(c,shared.ok);
      } else {
-         addReplySds(c,sdscatprintf(sdsempty(),
-             "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+         addReplyError(c,
+             "CONFIG subcommand must be one of GET, SET, RESETSTAT");
      }
      return;
  
  badarity:
-     addReplySds(c,sdscatprintf(sdsempty(),
-         "-ERR Wrong number of arguments for CONFIG %s\r\n",
-         (char*) c->argv[1]->ptr));
+     addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s",
+         (char*) c->argv[1]->ptr);
  }
diff --combined src/networking.c
index 4e93186e02549163236eef3ed0ae53a45b27c49b,632fd04769dca5a3c464b26b982d8a9a064a4faa..d1c6a75add020014e0a7baa44ab2be105d7287da
@@@ -1,5 -1,4 +1,4 @@@
  #include "redis.h"
  #include <sys/uio.h>
  
  void *dupClientReplyValue(void *o) {
@@@ -12,14 -11,24 +11,24 @@@ int listMatchObjects(void *a, void *b) 
  }
  
  redisClient *createClient(int fd) {
-     redisClient *c = zmalloc(sizeof(*c));
+     redisClient *c = zmalloc(sizeof(redisClient));
+     c->bufpos = 0;
  
      anetNonBlock(NULL,fd);
      anetTcpNoDelay(NULL,fd);
      if (!c) return NULL;
+     if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+         readQueryFromClient, c) == AE_ERR)
+     {
+         close(fd);
+         zfree(c);
+         return NULL;
+     }
      selectDb(c,0);
      c->fd = fd;
      c->querybuf = sdsempty();
+     c->newline = NULL;
      c->argc = 0;
      c->argv = NULL;
      c->bulklen = -1;
      c->pubsub_patterns = listCreate();
      listSetFreeMethod(c->pubsub_patterns,decrRefCount);
      listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
-     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
-         readQueryFromClient, c) == AE_ERR) {
-         freeClient(c);
-         return NULL;
-     }
      listAddNodeTail(server.clients,c);
      initClientMultiState(c);
      return c;
  }
  
- void addReply(redisClient *c, robj *obj) {
-     if (listLength(c->reply) == 0 &&
+ int _installWriteEvent(redisClient *c) {
+     if (c->fd <= 0) return REDIS_ERR;
+     if (c->bufpos == 0 && listLength(c->reply) == 0 &&
          (c->replstate == REDIS_REPL_NONE ||
           c->replstate == REDIS_REPL_ONLINE) &&
          aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
-         sendReplyToClient, c) == AE_ERR) return;
+         sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
+     return REDIS_OK;
+ }
  
-     if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
-         obj = dupStringObject(obj);
-         obj->refcount = 0; /* getDecodedObject() will increment the refcount */
+ /* Create a duplicate of the last object in the reply list when
+  * it is not exclusively owned by the reply list. */
+ robj *dupLastObjectIfNeeded(list *reply) {
+     robj *new, *cur;
+     listNode *ln;
+     redisAssert(listLength(reply) > 0);
+     ln = listLast(reply);
+     cur = listNodeValue(ln);
+     if (cur->refcount > 1) {
+         new = dupStringObject(cur);
+         decrRefCount(cur);
+         listNodeValue(ln) = new;
      }
-     listAddNodeTail(c->reply,getDecodedObject(obj));
+     return listNodeValue(ln);
  }
  
- void addReplySds(redisClient *c, sds s) {
-     robj *o = createObject(REDIS_STRING,s);
-     addReply(c,o);
-     decrRefCount(o);
+ int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
+     size_t available = sizeof(c->buf)-c->bufpos;
+     /* If there already are entries in the reply list, we cannot
+      * add anything more to the static buffer. */
+     if (listLength(c->reply) > 0) return REDIS_ERR;
+     /* Check that the buffer has enough space available for this string. */
+     if (len > available) return REDIS_ERR;
+     memcpy(c->buf+c->bufpos,s,len);
+     c->bufpos+=len;
+     return REDIS_OK;
  }
  
- void addReplyDouble(redisClient *c, double d) {
-     char buf[128];
+ void _addReplyObjectToList(redisClient *c, robj *o) {
+     robj *tail;
+     if (listLength(c->reply) == 0) {
+         incrRefCount(o);
+         listAddNodeTail(c->reply,o);
+     } else {
+         tail = listNodeValue(listLast(c->reply));
  
-     snprintf(buf,sizeof(buf),"%.17g",d);
-     addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
-         (unsigned long) strlen(buf),buf));
+         /* Append to this object when possible. */
+         if (tail->ptr != NULL &&
+             sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
+         {
+             tail = dupLastObjectIfNeeded(c->reply);
+             tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
+         } else {
+             incrRefCount(o);
+             listAddNodeTail(c->reply,o);
+         }
+     }
  }
  
- void addReplyLongLong(redisClient *c, long long ll) {
-     char buf[128];
-     size_t len;
+ /* This method takes responsibility over the sds. When it is no longer
+  * needed it will be free'd, otherwise it ends up in a robj. */
+ void _addReplySdsToList(redisClient *c, sds s) {
+     robj *tail;
+     if (listLength(c->reply) == 0) {
+         listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+     } else {
+         tail = listNodeValue(listLast(c->reply));
  
-     if (ll == 0) {
-         addReply(c,shared.czero);
-         return;
-     } else if (ll == 1) {
-         addReply(c,shared.cone);
+         /* Append to this object when possible. */
+         if (tail->ptr != NULL &&
+             sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
+         {
+             tail = dupLastObjectIfNeeded(c->reply);
+             tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
+             sdsfree(s);
+         } else {
+             listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+         }
+     }
+ }
+ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
+     robj *tail;
+     if (listLength(c->reply) == 0) {
+         listAddNodeTail(c->reply,createStringObject(s,len));
+     } else {
+         tail = listNodeValue(listLast(c->reply));
+         /* Append to this object when possible. */
+         if (tail->ptr != NULL &&
+             sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
+         {
+             tail = dupLastObjectIfNeeded(c->reply);
+             tail->ptr = sdscatlen(tail->ptr,s,len);
+         } else {
+             listAddNodeTail(c->reply,createStringObject(s,len));
+         }
+     }
+ }
+ void addReply(redisClient *c, robj *obj) {
+     if (_installWriteEvent(c) != REDIS_OK) return;
+     redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
+     /* This is an important place where we can avoid copy-on-write
+      * when there is a saving child running, avoiding touching the
+      * refcount field of the object if it's not needed.
+      *
+      * If the encoding is RAW and there is room in the static buffer
+      * we'll be able to send the object to the client without
+      * messing with its page. */
+     if (obj->encoding == REDIS_ENCODING_RAW) {
+         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
+             _addReplyObjectToList(c,obj);
+     } else {
+         obj = getDecodedObject(obj);
+         if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
+             _addReplyObjectToList(c,obj);
+         decrRefCount(obj);
+     }
+ }
+ void addReplySds(redisClient *c, sds s) {
+     if (_installWriteEvent(c) != REDIS_OK) {
+         /* The caller expects the sds to be free'd. */
+         sdsfree(s);
          return;
      }
-     buf[0] = ':';
+     if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
+         sdsfree(s);
+     } else {
+         /* This method free's the sds when it is no longer needed. */
+         _addReplySdsToList(c,s);
+     }
+ }
+ void addReplyString(redisClient *c, char *s, size_t len) {
+     if (_installWriteEvent(c) != REDIS_OK) return;
+     if (_addReplyToBuffer(c,s,len) != REDIS_OK)
+         _addReplyStringToList(c,s,len);
+ }
+ void _addReplyError(redisClient *c, char *s, size_t len) {
+     addReplyString(c,"-ERR ",5);
+     addReplyString(c,s,len);
+     addReplyString(c,"\r\n",2);
+ }
+ void addReplyError(redisClient *c, char *err) {
+     _addReplyError(c,err,strlen(err));
+ }
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+     va_list ap;
+     va_start(ap,fmt);
+     sds s = sdscatvprintf(sdsempty(),fmt,ap);
+     va_end(ap);
+     _addReplyError(c,s,sdslen(s));
+     sdsfree(s);
+ }
+ void _addReplyStatus(redisClient *c, char *s, size_t len) {
+     addReplyString(c,"+",1);
+     addReplyString(c,s,len);
+     addReplyString(c,"\r\n",2);
+ }
+ void addReplyStatus(redisClient *c, char *status) {
+     _addReplyStatus(c,status,strlen(status));
+ }
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
+     va_list ap;
+     va_start(ap,fmt);
+     sds s = sdscatvprintf(sdsempty(),fmt,ap);
+     va_end(ap);
+     _addReplyStatus(c,s,sdslen(s));
+     sdsfree(s);
+ }
+ /* Adds an empty object to the reply list that will contain the multi bulk
+  * length, which is not known when this function is called. */
+ void *addDeferredMultiBulkLength(redisClient *c) {
+     /* Note that we install the write event here even if the object is not
+      * ready to be sent, since we are sure that before returning to the
+      * event loop setDeferredMultiBulkLength() will be called. */
+     if (_installWriteEvent(c) != REDIS_OK) return NULL;
+     listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
+     return listLast(c->reply);
+ }
+ /* Populate the length object and try glueing it to the next chunk. */
+ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
+     listNode *ln = (listNode*)node;
+     robj *len, *next;
+     /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
+     if (node == NULL) return;
+     len = listNodeValue(ln);
+     len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
+     if (ln->next != NULL) {
+         next = listNodeValue(ln->next);
+         /* Only glue when the next node is non-NULL (an sds in this case) */
+         if (next->ptr != NULL) {
+             len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+             listDelNode(c->reply,ln->next);
+         }
+     }
+ }
+ void addReplyDouble(redisClient *c, double d) {
+     char dbuf[128], sbuf[128];
+     int dlen, slen;
+     dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
+     slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
+     addReplyString(c,sbuf,slen);
+ }
+ void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
+     char buf[128];
+     int len;
+     buf[0] = prefix;
      len = ll2string(buf+1,sizeof(buf)-1,ll);
      buf[len+1] = '\r';
      buf[len+2] = '\n';
-     addReplySds(c,sdsnewlen(buf,len+3));
+     addReplyString(c,buf,len+3);
  }
  
- void addReplyUlong(redisClient *c, unsigned long ul) {
-     char buf[128];
-     size_t len;
+ void addReplyLongLong(redisClient *c, long long ll) {
+     _addReplyLongLong(c,ll,':');
+ }
  
-     if (ul == 0) {
-         addReply(c,shared.czero);
-         return;
-     } else if (ul == 1) {
-         addReply(c,shared.cone);
-         return;
-     }
-     len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
-     addReplySds(c,sdsnewlen(buf,len));
+ void addReplyMultiBulkLen(redisClient *c, long length) {
+     _addReplyLongLong(c,length,'*');
  }
  
  void addReplyBulkLen(redisClient *c, robj *obj) {
-     size_t len, intlen;
-     char buf[128];
+     size_t len;
  
      if (obj->encoding == REDIS_ENCODING_RAW) {
          len = sdslen(obj->ptr);
              len++;
          }
      }
-     buf[0] = '$';
-     intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
-     buf[intlen+1] = '\r';
-     buf[intlen+2] = '\n';
-     addReplySds(c,sdsnewlen(buf,intlen+3));
+     _addReplyLongLong(c,len,'$');
  }
  
  void addReplyBulk(redisClient *c, robj *obj) {
@@@ -157,11 -336,23 +336,11 @@@ void addReplyBulkCString(redisClient *c
      }
  }
  
 -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 -    int cport, cfd;
 -    char cip[128];
 +static void acceptCommonHandler(int fd) {
      redisClient *c;
 -    REDIS_NOTUSED(el);
 -    REDIS_NOTUSED(mask);
 -    REDIS_NOTUSED(privdata);
 -
 -    cfd = anetAccept(server.neterr, fd, cip, &cport);
 -    if (cfd == AE_ERR) {
 -        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
 -        return;
 -    }
 -    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
 -    if ((c = createClient(cfd)) == NULL) {
 +    if ((c = createClient(fd)) == NULL) {
          redisLog(REDIS_WARNING,"Error allocating resoures for the client");
 -        close(cfd); /* May be already closed, just ingore errors */
 +        close(fd); /* May be already closed, just ingore errors */
          return;
      }
      /* If maxclient directive is set and this is one client more... close the
      server.stat_numconnections++;
  }
  
 +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 +    int cport, cfd;
 +    char cip[128];
 +    REDIS_NOTUSED(el);
 +    REDIS_NOTUSED(mask);
 +    REDIS_NOTUSED(privdata);
 +
 +    cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
 +    if (cfd == AE_ERR) {
 +        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
 +        return;
 +    }
 +    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
 +    acceptCommonHandler(cfd);
 +}
 +
 +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 +    int cfd;
 +    REDIS_NOTUSED(el);
 +    REDIS_NOTUSED(mask);
 +    REDIS_NOTUSED(privdata);
 +
 +    cfd = anetUnixAccept(server.neterr, fd);
 +    if (cfd == AE_ERR) {
 +        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
 +        return;
 +    }
 +    redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
 +    acceptCommonHandler(cfd);
 +}
 +
 +
  static void freeClientArgv(redisClient *c) {
      int j;
  
@@@ -275,7 -434,8 +454,8 @@@ void freeClient(redisClient *c) 
          server.vm_blocked_clients--;
      }
      listRelease(c->io_keys);
-     /* Master/slave cleanup */
+     /* Master/slave cleanup.
+      * Case 1: we lost the connection with a slave. */
      if (c->flags & REDIS_SLAVE) {
          if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
              close(c->repldbfd);
          redisAssert(ln != NULL);
          listDelNode(l,ln);
      }
+     /* Case 2: we lost the connection with the master. */
      if (c->flags & REDIS_MASTER) {
          server.master = NULL;
          server.replstate = REDIS_REPL_CONNECT;
+         /* Since we lost the connection with the master, we should also
+          * close the connection with all our slaves if we have any, so
+          * when we'll resync with the master the other slaves will sync again
+          * with us as well. Note that also when the slave is not connected
+          * to the master it will keep refusing connections by other slaves. */
+         while (listLength(server.slaves)) {
+             ln = listFirst(server.slaves);
+             freeClient((redisClient*)ln->value);
+         }
      }
      /* Release memory */
      zfree(c->argv);
      zfree(c);
  }
  
- #define GLUEREPLY_UP_TO (1024)
- static void glueReplyBuffersIfNeeded(redisClient *c) {
-     int copylen = 0;
-     char buf[GLUEREPLY_UP_TO];
-     listNode *ln;
-     listIter li;
-     robj *o;
-     listRewind(c->reply,&li);
-     while((ln = listNext(&li))) {
-         int objlen;
-         o = ln->value;
-         objlen = sdslen(o->ptr);
-         if (copylen + objlen <= GLUEREPLY_UP_TO) {
-             memcpy(buf+copylen,o->ptr,objlen);
-             copylen += objlen;
-             listDelNode(c->reply,ln);
-         } else {
-             if (copylen == 0) return;
-             break;
-         }
-     }
-     /* Now the output buffer is empty, add the new single element */
-     o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
-     listAddNodeHead(c->reply,o);
- }
  void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
      redisClient *c = privdata;
      int nwritten = 0, totwritten = 0, objlen;
          return;
      }
  
-     while(listLength(c->reply)) {
-         if (server.glueoutputbuf && listLength(c->reply) > 1)
-             glueReplyBuffersIfNeeded(c);
+     while(c->bufpos > 0 || listLength(c->reply)) {
+         if (c->bufpos > 0) {
+             if (c->flags & REDIS_MASTER) {
+                 /* Don't reply to a master */
+                 nwritten = c->bufpos - c->sentlen;
+             } else {
+                 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
+                 if (nwritten <= 0) break;
+             }
+             c->sentlen += nwritten;
+             totwritten += nwritten;
+             /* If the buffer was sent, set bufpos to zero to continue with
+              * the remainder of the reply. */
+             if (c->sentlen == c->bufpos) {
+                 c->bufpos = 0;
+                 c->sentlen = 0;
+             }
+         } else {
+             o = listNodeValue(listFirst(c->reply));
+             objlen = sdslen(o->ptr);
  
-         o = listNodeValue(listFirst(c->reply));
-         objlen = sdslen(o->ptr);
+             if (objlen == 0) {
+                 listDelNode(c->reply,listFirst(c->reply));
+                 continue;
+             }
  
-         if (objlen == 0) {
-             listDelNode(c->reply,listFirst(c->reply));
-             continue;
-         }
+             if (c->flags & REDIS_MASTER) {
+                 /* Don't reply to a master */
+                 nwritten = objlen - c->sentlen;
+             } else {
+                 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
+                 if (nwritten <= 0) break;
+             }
+             c->sentlen += nwritten;
+             totwritten += nwritten;
  
-         if (c->flags & REDIS_MASTER) {
-             /* Don't reply to a master */
-             nwritten = objlen - c->sentlen;
-         } else {
-             nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
-             if (nwritten <= 0) break;
-         }
-         c->sentlen += nwritten;
-         totwritten += nwritten;
-         /* If we fully sent the object on head go to the next one */
-         if (c->sentlen == objlen) {
-             listDelNode(c->reply,listFirst(c->reply));
-             c->sentlen = 0;
+             /* If we fully sent the object on head go to the next one */
+             if (c->sentlen == objlen) {
+                 listDelNode(c->reply,listFirst(c->reply));
+                 c->sentlen = 0;
+             }
          }
          /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
           * bytes, in a single threaded server it's a good idea to serve
@@@ -472,6 -632,7 +652,7 @@@ void resetClient(redisClient *c) 
      freeClientArgv(c);
      c->bulklen = -1;
      c->multibulk = 0;
+     c->newline = NULL;
  }
  
  void closeTimedoutClients(void) {
          if (server.maxidletime &&
              !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
              !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+             !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
              dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
              listLength(c->pubsub_patterns) == 0 &&
              (now - c->lastinteraction > server.maxidletime))
  }
  
  void processInputBuffer(redisClient *c) {
+     int seeknewline = 0;
  again:
      /* Before to process the input buffer, make sure the client is not
       * waitig for a blocking operation such as BLPOP. Note that the first
       * in the input buffer the client may be blocked, and the "goto again"
       * will try to reiterate. The following line will make it return asap. */
      if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
+     if (seeknewline && c->bulklen == -1) c->newline = strchr(c->querybuf,'\n');
+     seeknewline = 1;
      if (c->bulklen == -1) {
          /* Read the first line of the query */
-         char *p = strchr(c->querybuf,'\n');
          size_t querylen;
  
-         if (p) {
+         if (c->newline) {
+             char *p = c->newline;
              sds query, *argv;
              int argc, j;
  
+             c->newline = NULL;
              query = c->querybuf;
              c->querybuf = sdsempty();
              querylen = 1+(p-(query));
@@@ -605,8 -773,14 +793,14 @@@ void readQueryFromClient(aeEventLoop *e
          return;
      }
      if (nread) {
+         size_t oldlen = sdslen(c->querybuf);
          c->querybuf = sdscatlen(c->querybuf, buf, nread);
          c->lastinteraction = time(NULL);
+         /* Scan this new piece of the query for the newline. We do this
+          * here in order to make sure we perform this scan just one time
+          * per piece of buffer, leading to an O(N) scan instead of O(N*N) */
+         if (c->bulklen == -1 && c->newline == NULL)
+             c->newline = strchr(c->querybuf+oldlen,'\n');
      } else {
          return;
      }
diff --combined src/rdb.c
index 3fa284e127be9cd78a1a6bc9c5a7dfaa492189df,a401a5b9dd2d5e59c525796c3be63fe22aeeb950..589b536af5f466b106bb60f4d85c1d5d93652730
+++ b/src/rdb.c
@@@ -260,17 -260,29 +260,29 @@@ int rdbSaveObject(FILE *fp, robj *o) 
          }
      } else if (o->type == REDIS_SET) {
          /* Save a set value */
-         dict *set = o->ptr;
-         dictIterator *di = dictGetIterator(set);
-         dictEntry *de;
-         if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
-         while((de = dictNext(di)) != NULL) {
-             robj *eleobj = dictGetEntryKey(de);
+         if (o->encoding == REDIS_ENCODING_HT) {
+             dict *set = o->ptr;
+             dictIterator *di = dictGetIterator(set);
+             dictEntry *de;
  
-             if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+             if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
+             while((de = dictNext(di)) != NULL) {
+                 robj *eleobj = dictGetEntryKey(de);
+                 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+             }
+             dictReleaseIterator(di);
+         } else if (o->encoding == REDIS_ENCODING_INTSET) {
+             intset *is = o->ptr;
+             int64_t llval;
+             int i = 0;
+             if (rdbSaveLen(fp,intsetLen(is)) == -1) return -1;
+             while(intsetGet(is,i++,&llval)) {
+                 if (rdbSaveLongLongAsStringObject(fp,llval) == -1) return -1;
+             }
+         } else {
+             redisPanic("Unknown set encoding");
          }
-         dictReleaseIterator(di);
      } else if (o->type == REDIS_ZSET) {
          /* Save a set value */
          zset *zs = o->ptr;
@@@ -445,11 -457,11 +457,12 @@@ int rdbSaveBackground(char *filename) 
  
      if (server.bgsavechildpid != -1) return REDIS_ERR;
      if (server.vm_enabled) waitEmptyIOJobsQueue();
+     server.dirty_before_bgsave = server.dirty;
      if ((childpid = fork()) == 0) {
          /* Child */
          if (server.vm_enabled) vmReopenSwapFile();
 -        close(server.fd);
 +        if (server.ipfd > 0) close(server.ipfd);
 +        if (server.sofd > 0) close(server.sofd);
          if (rdbSave(filename) == REDIS_OK) {
              _exit(0);
          } else {
@@@ -629,6 -641,7 +642,7 @@@ int rdbLoadDoubleValue(FILE *fp, doubl
  robj *rdbLoadObject(int type, FILE *fp) {
      robj *o, *ele, *dec;
      size_t len;
+     unsigned int i;
  
      redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
      if (type == REDIS_STRING) {
      } else if (type == REDIS_SET) {
          /* Read list/set value */
          if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
-         o = createSetObject();
-         /* It's faster to expand the dict to the right size asap in order
-          * to avoid rehashing */
-         if (len > DICT_HT_INITIAL_SIZE)
-             dictExpand(o->ptr,len);
+         /* Use a regular set when there are too many entries. */
+         if (len > server.set_max_intset_entries) {
+             o = createSetObject();
+             /* It's faster to expand the dict to the right size asap in order
+              * to avoid rehashing */
+             if (len > DICT_HT_INITIAL_SIZE)
+                 dictExpand(o->ptr,len);
+         } else {
+             o = createIntsetObject();
+         }
          /* Load every single element of the list/set */
-         while(len--) {
+         for (i = 0; i < len; i++) {
+             long long llval;
              if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
-             dictAdd((dict*)o->ptr,ele,NULL);
+             if (o->encoding == REDIS_ENCODING_INTSET) {
+                 /* Fetch integer value from element */
+                 if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) {
+                     o->ptr = intsetAdd(o->ptr,llval,NULL);
+                 } else {
+                     setTypeConvert(o,REDIS_ENCODING_HT);
+                     dictExpand(o->ptr,len);
+                 }
+             }
+             /* This will also be called when the set was just converted
+              * to regular hashtable encoded set */
+             if (o->encoding == REDIS_ENCODING_HT) {
+                 dictAdd((dict*)o->ptr,ele,NULL);
+             } else {
+                 decrRefCount(ele);
+             }
          }
      } else if (type == REDIS_ZSET) {
          /* Read list/set value */
          /* Load every single element of the list/set */
          while(zsetlen--) {
              robj *ele;
-             double *score = zmalloc(sizeof(double));
+             double score;
+             zskiplistNode *znode;
  
              if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
-             if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
-             dictAdd(zs->dict,ele,score);
-             zslInsert(zs->zsl,*score,ele);
+             if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
+             znode = zslInsert(zs->zsl,score,ele);
+             dictAdd(zs->dict,ele,&znode->score);
              incrRefCount(ele); /* added to skiplist */
          }
      } else if (type == REDIS_HASH) {
@@@ -876,7 -915,7 +916,7 @@@ void backgroundSaveDoneHandler(int stat
      if (!bysignal && exitcode == 0) {
          redisLog(REDIS_NOTICE,
              "Background saving terminated with success");
-         server.dirty = 0;
+         server.dirty = server.dirty - server.dirty_before_bgsave;
          server.lastsave = time(NULL);
      } else if (!bysignal && exitcode != 0) {
          redisLog(REDIS_WARNING, "Background saving error");
diff --combined src/redis-benchmark.c
index b3e729f128a7e811982adb57727f8c5a1f68f775,297ecc6c5a87ab28cc8f135fcd7842de4c26f4d4..68c46ad84e6fdd22f436f4803dae2322d7436a3c
@@@ -71,11 -71,11 +71,12 @@@ static struct config 
      aeEventLoop *el;
      char *hostip;
      int hostport;
 +    char *hostsocket;
      int keepalive;
      long long start;
      long long totlatency;
      int *latency;
+     char *title;
      list *clients;
      int quiet;
      int loop;
@@@ -207,16 -207,27 +208,27 @@@ static void clientDone(client c) 
      }
  }
  
+ /* Read a length from the buffer pointed to by *p, store the length in *len,
+  * and return the number of bytes that the cursor advanced. */
+ static int readLen(char *p, int *len) {
+     char *tail = strstr(p,"\r\n");
+     if (tail == NULL)
+         return 0;
+     *tail = '\0';
+     *len = atoi(p+1);
+     return tail+2-p;
+ }
  static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
  {
-     char buf[1024];
-     int nread;
+     char buf[1024], *p;
+     int nread, pos=0, len=0;
      client c = privdata;
      REDIS_NOTUSED(el);
      REDIS_NOTUSED(fd);
      REDIS_NOTUSED(mask);
  
-     nread = read(c->fd, buf, 1024);
+     nread = read(c->fd,buf,sizeof(buf));
      if (nread == -1) {
          fprintf(stderr, "Reading from socket: %s\n", strerror(errno));
          freeClient(c);
      }
      c->totreceived += nread;
      c->ibuf = sdscatlen(c->ibuf,buf,nread);
+     len = sdslen(c->ibuf);
  
- processdata:
-     /* Are we waiting for the first line of the command of for  sdf 
-      * count in bulk or multi bulk operations? */
      if (c->replytype == REPLY_INT ||
-         c->replytype == REPLY_RETCODE ||
-         (c->replytype == REPLY_BULK && c->readlen == -1) ||
-         (c->replytype == REPLY_MBULK && c->readlen == -1) ||
-         (c->replytype == REPLY_MBULK && c->mbulk == -1)) {
-         char *p;
-         /* Check if the first line is complete. This is only true if
-          * there is a newline inside the buffer. */
-         if ((p = strchr(c->ibuf,'\n')) != NULL) {
-             if (c->replytype == REPLY_BULK ||
-                 (c->replytype == REPLY_MBULK && c->mbulk != -1))
-             {
-                 /* Read the count of a bulk reply (being it a single bulk or
-                  * a multi bulk reply). "$<count>" for the protocol spec. */
-                 *p = '\0';
-                 *(p-1) = '\0';
-                 c->readlen = atoi(c->ibuf+1)+2;
-                 // printf("BULK ATOI: %s\n", c->ibuf+1);
-                 /* Handle null bulk reply "$-1" */
-                 if (c->readlen-2 == -1) {
-                     clientDone(c);
-                     return;
-                 }
-                 /* Leave all the rest in the input buffer */
-                 c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
-                 /* fall through to reach the point where the code will try
-                  * to check if the bulk reply is complete. */
-             } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) {
-                 /* Read the count of a multi bulk reply. That is, how many
-                  * bulk replies we have to read next. "*<count>" protocol. */
-                 *p = '\0';
-                 *(p-1) = '\0';
-                 c->mbulk = atoi(c->ibuf+1);
-                 /* Handle null bulk reply "*-1" */
-                 if (c->mbulk == -1) {
-                     clientDone(c);
-                     return;
+         c->replytype == REPLY_RETCODE)
+     {
+         /* Check if the first line is complete. This is everything we need
+          * when waiting for an integer or status code reply.*/
+         if ((p = strstr(c->ibuf,"\r\n")) != NULL)
+             goto done;
+     } else if (c->replytype == REPLY_BULK) {
+         int advance = 0;
+         if (c->readlen < 0) {
+             advance = readLen(c->ibuf+pos,&c->readlen);
+             if (advance) {
+                 pos += advance;
+                 if (c->readlen == -1) {
+                     goto done;
+                 } else {
+                     /* include the trailing \r\n */
+                     c->readlen += 2;
                  }
-                 // printf("%p) %d elements list\n", c, c->mbulk);
-                 /* Leave all the rest in the input buffer */
-                 c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
-                 goto processdata;
              } else {
-                 c->ibuf = sdstrim(c->ibuf,"\r\n");
-                 clientDone(c);
-                 return;
+                 goto skip;
              }
          }
-     }
-     /* bulk read, did we read everything? */
-     if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || 
-          (c->replytype == REPLY_BULK)) && c->readlen != -1 &&
-           (unsigned)c->readlen <= sdslen(c->ibuf))
-     {
-         // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n",
-         //    c->mbulk,c->readlen,sdslen(c->ibuf));
-         if (c->replytype == REPLY_BULK) {
-             clientDone(c);
-         } else if (c->replytype == REPLY_MBULK) {
-             // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen);
-             // fwrite(c->ibuf,c->readlen,1,stdout);
-             // printf("\n");
-             if (--c->mbulk == 0) {
-                 clientDone(c);
+         int canconsume;
+         if (c->readlen > 0) {
+             canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+             c->readlen -= canconsume;
+             pos += canconsume;
+         }
+         if (c->readlen == 0)
+             goto done;
+     } else if (c->replytype == REPLY_MBULK) {
+         int advance = 0;
+         if (c->mbulk == -1) {
+             advance = readLen(c->ibuf+pos,&c->mbulk);
+             if (advance) {
+                 pos += advance;
+                 if (c->mbulk == -1)
+                     goto done;
+             } else {
+                 goto skip;
+             }
+         }
+         int canconsume;
+         while(c->mbulk > 0 && pos < len) {
+             if (c->readlen > 0) {
+                 canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+                 c->readlen -= canconsume;
+                 pos += canconsume;
+                 if (c->readlen == 0)
+                     c->mbulk--;
              } else {
-                 c->ibuf = sdsrange(c->ibuf,c->readlen,-1);
-                 c->readlen = -1;
-                 goto processdata;
+                 advance = readLen(c->ibuf+pos,&c->readlen);
+                 if (advance) {
+                     pos += advance;
+                     if (c->readlen == -1) {
+                         c->mbulk--;
+                         continue;
+                     } else {
+                         /* include the trailing \r\n */
+                         c->readlen += 2;
+                     }
+                 } else {
+                     goto skip;
+                 }
              }
          }
+         if (c->mbulk == 0)
+             goto done;
      }
+ skip:
+     c->ibuf = sdsrange(c->ibuf,pos,-1);
+     return;
+ done:
+     clientDone(c);
+     return;
  }
  
  static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask)
@@@ -341,11 -359,7 +360,11 @@@ static client createClient(void) 
      client c = zmalloc(sizeof(struct _client));
      char err[ANET_ERR_LEN];
  
 -    c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport);
 +    if (config.hostsocket == NULL)
 +        c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport);
 +    else
 +        c->fd = anetUnixNonBlockConnect(err,config.hostsocket);
 +
      if (c->fd == ANET_ERR) {
          zfree(c);
          fprintf(stderr,"Connect: %s\n",err);
@@@ -376,13 -390,13 +395,13 @@@ static void createMissingClients(clien
      }
  }
  
- static void showLatencyReport(char *title) {
+ static void showLatencyReport(void) {
      int j, seen = 0;
      float perc, reqpersec;
  
      reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
      if (!config.quiet) {
-         printf("====== %s ======\n", title);
+         printf("====== %s ======\n", config.title);
          printf("  %d requests completed in %.2f seconds\n", config.donerequests,
              (float)config.totlatency/1000);
          printf("  %d parallel clients\n", config.numclients);
          }
          printf("%.2f requests per second\n\n", reqpersec);
      } else {
-         printf("%s: %.2f requests per second\n", title, reqpersec);
+         printf("%s: %.2f requests per second\n", config.title, reqpersec);
      }
  }
  
- static void prepareForBenchmark(void)
- {
+ static void prepareForBenchmark(char *title) {
      memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1));
+     config.title = title;
      config.start = mstime();
      config.donerequests = 0;
  }
  
- static void endBenchmark(char *title) {
+ static void endBenchmark(void) {
      config.totlatency = mstime()-config.start;
-     showLatencyReport(title);
+     showLatencyReport();
      freeAllClients();
  }
  
@@@ -441,9 -455,6 +460,9 @@@ void parseOptions(int argc, char **argv
          } else if (!strcmp(argv[i],"-p") && !lastarg) {
              config.hostport = atoi(argv[i+1]);
              i++;
 +        } else if (!strcmp(argv[i],"-s") && !lastarg) {
 +            config.hostsocket = argv[i+1];
 +            i++;
          } else if (!strcmp(argv[i],"-d") && !lastarg) {
              config.datasize = atoi(argv[i+1]);
              i++;
              printf("Wrong option '%s' or option argument missing\n\n",argv[i]);
              printf("Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests]> [-k <boolean>]\n\n");
              printf(" -h <hostname>      Server hostname (default 127.0.0.1)\n");
 -            printf(" -p <hostname>      Server port (default 6379)\n");
 +            printf(" -p <port>          Server port (default 6379)\n");
 +            printf(" -s <socket>        Server socket (overrides host and port)\n");
              printf(" -c <clients>       Number of parallel connections (default 50)\n");
              printf(" -n <requests>      Total number of requests (default 10000)\n");
              printf(" -d <size>          Data size of SET/GET value in bytes (default 2)\n");
      }
  }
  
+ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+     REDIS_NOTUSED(eventLoop);
+     REDIS_NOTUSED(id);
+     REDIS_NOTUSED(clientData);
+     float dt = (float)(mstime()-config.start)/1000.0;
+     float rps = (float)config.donerequests/dt;
+     printf("%s: %.2f\r", config.title, rps);
+     fflush(stdout);
+     return 250; /* every 250ms */
+ }
  int main(int argc, char **argv) {
      client c;
  
      config.requests = 10000;
      config.liveclients = 0;
      config.el = aeCreateEventLoop();
+     aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
      config.keepalive = 1;
      config.donerequests = 0;
      config.datasize = 3;
  
      config.hostip = "127.0.0.1";
      config.hostport = 6379;
 +    config.hostsocket = NULL;
  
      parseOptions(argc,argv);
  
  
      if (config.idlemode) {
          printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
-         prepareForBenchmark();
+         prepareForBenchmark("IDLE");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdsempty();
      }
  
      do {
-         prepareForBenchmark();
+         prepareForBenchmark("PING");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"PING\r\n");
          prepareClientForReply(c,REPLY_RETCODE);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("PING");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("PING (multi bulk)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n");
          prepareClientForReply(c,REPLY_RETCODE);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("PING (multi bulk)");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("SET");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize);
          prepareClientForReply(c,REPLY_RETCODE);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("SET");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("GET");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n");
          prepareClientForReply(c,REPLY_BULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("GET");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("INCR");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n");
          prepareClientForReply(c,REPLY_INT);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("INCR");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LPUSH");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
          prepareClientForReply(c,REPLY_INT);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LPUSH");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LPOP");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LPOP mylist\r\n");
          prepareClientForReply(c,REPLY_BULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LPOP");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("SADD");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n");
          prepareClientForReply(c,REPLY_RETCODE);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("SADD");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("SPOP");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"SPOP myset\r\n");
          prepareClientForReply(c,REPLY_BULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("SPOP");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LPUSH (again, in order to bench LRANGE)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
          prepareClientForReply(c,REPLY_RETCODE);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LPUSH (again, in order to bench LRANGE)");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LRANGE (first 100 elements)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n");
          prepareClientForReply(c,REPLY_MBULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LRANGE (first 100 elements)");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LRANGE (first 300 elements)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n");
          prepareClientForReply(c,REPLY_MBULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LRANGE (first 300 elements)");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LRANGE (first 450 elements)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n");
          prepareClientForReply(c,REPLY_MBULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LRANGE (first 450 elements)");
+         endBenchmark();
  
-         prepareForBenchmark();
+         prepareForBenchmark("LRANGE (first 600 elements)");
          c = createClient();
          if (!c) exit(1);
          c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n");
          prepareClientForReply(c,REPLY_MBULK);
          createMissingClients(c);
          aeMain(config.el);
-         endBenchmark("LRANGE (first 600 elements)");
+         endBenchmark();
  
          printf("\n");
      } while(config.loop);
diff --combined src/redis-cli.c
index 4dafba325905544ec03e5dbf63b3f2f7e34d8b35,5071604b871112b23b0acdb3a7abd40881dc89b1..8866678b206c1876d4709905188a4dd9f4d4b6c9
@@@ -36,6 -36,8 +36,8 @@@
  #include <stdlib.h>
  #include <unistd.h>
  #include <ctype.h>
+ #include <errno.h>
+ #include <sys/stat.h>
  
  #include "anet.h"
  #include "sds.h"
  static struct config {
      char *hostip;
      int hostport;
 +    char *hostsocket;
      long repeat;
      int dbnum;
-     int argn_from_stdin;
      int interactive;
      int shutdown;
      int monitor_mode;
      int pubsub_mode;
-     int raw_output;
+     int raw_output; /* output mode per command */
+     int tty; /* flag for default output format */
+     int stdinarg; /* get last arg from stdin. (-x option) */
+     char mb_sep;
      char *auth;
      char *historyfile;
  } config;
  static int cliReadReply(int fd);
  static void usage();
  
- static int cliConnect(void) {
+ /* Connect to the client. If force is not zero the connection is performed
+  * even if there is already a connected socket. */
+ static int cliConnect(int force) {
      char err[ANET_ERR_LEN];
      static int fd = ANET_ERR;
  
-     if (fd == ANET_ERR) {
+     if (fd == ANET_ERR || force) {
+         if (force) close(fd);
 -        fd = anetTcpConnect(err,config.hostip,config.hostport);
 +        if (config.hostsocket == NULL) {
 +            fd = anetTcpConnect(err,config.hostip,config.hostport);
 +        } else {
 +            fd = anetUnixConnect(err,config.hostsocket);
-             if (fd == ANET_ERR) {
-                 fprintf(stderr, "Could not connect to Redis at %s: %s", config.hostsocket, err);
-                 return -1;
-             }
 +        }
          if (fd == ANET_ERR) {
 -            fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err);
 +            fprintf(stderr,"Could not connect to Redis at ");
 +            if (config.hostsocket == NULL)
 +                fprintf(stderr,"%s:%d: %s",config.hostip,config.hostport,err);
 +            else
 +                fprintf(stderr,"%s: %s",config.hostsocket,err);
              return -1;
          }
          anetTcpNoDelay(NULL,fd);
@@@ -103,7 -97,7 +106,7 @@@ static sds cliReadLine(int fd) 
          ssize_t ret;
  
          ret = read(fd,&c,1);
-         if (ret == -1) {
+         if (ret <= 0) {
              sdsfree(line);
              return NULL;
          } else if ((ret == 0) || (c == '\n')) {
@@@ -120,7 -114,7 +123,7 @@@ static int cliReadSingleLineReply(int f
  
      if (reply == NULL) return 1;
      if (!quiet)
-         printf("%s\n", reply);
+         printf("%s", reply);
      sdsfree(reply);
      return 0;
  }
@@@ -147,7 -141,7 +150,7 @@@ static void printStringRepr(char *s, in
          }
          s++;
      }
-     printf("\"\n");
+     printf("\"");
  }
  
  static int cliReadBulkReply(int fd) {
      reply = zmalloc(bulklen);
      anetRead(fd,reply,bulklen);
      anetRead(fd,crlf,2);
-     if (config.raw_output || !isatty(fileno(stdout))) {
+     if (config.raw_output || !config.tty) {
          if (bulklen && fwrite(reply,bulklen,1,stdout) == 0) {
              zfree(reply);
              return 1;
  static int cliReadMultiBulkReply(int fd) {
      sds replylen = cliReadLine(fd);
      int elements, c = 1;
+     int retval = 0;
  
      if (replylen == NULL) return 1;
      elements = atoi(replylen);
          printf("(empty list or set)\n");
      }
      while(elements--) {
-         printf("%d. ", c);
-         if (cliReadReply(fd)) return 1;
+         if (config.tty) printf("%d. ", c);
+         if (cliReadReply(fd)) retval = 1;
+         if (elements) printf("%c",config.mb_sep);
          c++;
      }
-     return 0;
+     return retval;
  }
  
  static int cliReadReply(int fd) {
      char type;
+     int nread;
  
-     if (anetRead(fd,&type,1) <= 0) {
+     if ((nread = anetRead(fd,&type,1)) <= 0) {
          if (config.shutdown) return 0;
-         exit(1);
+         if (config.interactive &&
+             (nread == 0 || (nread == -1 && errno == ECONNRESET)))
+         {
+             return ECONNRESET;
+         } else {
+             printf("I/O error while reading from socket: %s",strerror(errno));
+             exit(1);
+         }
      }
      switch(type) {
      case '-':
-         printf("(error) ");
+         if (config.tty) printf("(error) ");
          cliReadSingleLineReply(fd,0);
          return 1;
      case '+':
          return cliReadSingleLineReply(fd,0);
      case ':':
-         printf("(integer) ");
+         if (config.tty) printf("(integer) ");
          return cliReadSingleLineReply(fd,0);
      case '$':
          return cliReadBulkReply(fd);
      case '*':
          return cliReadMultiBulkReply(fd);
      default:
-         printf("protocol error, got '%c' as reply type byte\n", type);
+         printf("protocol error, got '%c' as reply type byte", type);
          return 1;
      }
  }
@@@ -248,17 -252,37 +261,37 @@@ static int selectDb(int fd) 
      return 0;
  }
  
+ static void showInteractiveHelp(void) {
+     printf(
+     "\n"
+     "Welcome to redis-cli " REDIS_VERSION "!\n"
+     "Just type any valid Redis command to see a pretty printed output.\n"
+     "\n"
+     "It is possible to quote strings, like in:\n"
+     "  set \"my key\" \"some string \\xff\\n\"\n"
+     "\n"
+     "You can find a list of valid Redis commands at\n"
+     "  http://code.google.com/p/redis/wiki/CommandReference\n"
+     "\n"
+     "Note: redis-cli supports line editing, use up/down arrows for history."
+     "\n\n");
+ }
  static int cliSendCommand(int argc, char **argv, int repeat) {
      char *command = argv[0];
      int fd, j, retval = 0;
      sds cmd;
  
      config.raw_output = !strcasecmp(command,"info");
+     if (!strcasecmp(command,"help")) {
+         showInteractiveHelp();
+         return 0;
+     }
      if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
      if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
      if (!strcasecmp(command,"subscribe") ||
          !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
-     if ((fd = cliConnect()) == -1) return 1;
+     if ((fd = cliConnect(0)) == -1) return 1;
  
      /* Select db number */
      retval = selectDb(fd);
      while(repeat--) {
          anetWrite(fd,cmd,sdslen(cmd));
          while (config.monitor_mode) {
-             cliReadSingleLineReply(fd,0);
+             if (cliReadSingleLineReply(fd,0)) exit(1);
+             printf("\n");
          }
  
          if (config.pubsub_mode) {
              printf("Reading messages... (press Ctrl-c to quit)\n");
              while (1) {
                  cliReadReply(fd);
-                 printf("\n");
+                 printf("\n\n");
              }
          }
  
          retval = cliReadReply(fd);
-         if (retval) {
-             return retval;
-         }
+         if (!config.raw_output && config.tty) printf("\n");
+         if (retval) return retval;
      }
      return 0;
  }
@@@ -314,12 -338,11 +347,14 @@@ static int parseOptions(int argc, char 
              i++;
          } else if (!strcmp(argv[i],"-h") && lastarg) {
              usage();
+         } else if (!strcmp(argv[i],"-x")) {
+             config.stdinarg = 1;
          } else if (!strcmp(argv[i],"-p") && !lastarg) {
              config.hostport = atoi(argv[i+1]);
              i++;
 +        } else if (!strcmp(argv[i],"-s") && !lastarg) {
 +            config.hostsocket = argv[i+1];
 +            i++;
          } else if (!strcmp(argv[i],"-r") && !lastarg) {
              config.repeat = strtoll(argv[i+1],NULL,10);
              i++;
              config.auth = argv[i+1];
              i++;
          } else if (!strcmp(argv[i],"-i")) {
-             config.interactive = 1;
+             fprintf(stderr,
+ "Starting interactive mode using -i is deprecated. Interactive mode is started\n"
+ "by default when redis-cli is executed without a command to execute.\n"
+             );
          } else if (!strcmp(argv[i],"-c")) {
-             config.argn_from_stdin = 1;
+             fprintf(stderr,
+ "Reading last argument from standard input using -c is deprecated.\n"
+ "When standard input is connected to a pipe or regular file, it is\n"
+ "automatically used as last argument.\n"
+             );
          } else if (!strcmp(argv[i],"-v")) {
-             printf("redis-cli shipped with Redis verison %s\n", REDIS_VERSION);
+             printf("redis-cli shipped with Redis version %s\n", REDIS_VERSION);
              exit(0);
          } else {
              break;
@@@ -361,10 -391,9 +403,9 @@@ static sds readArgFromStdin(void) 
  }
  
  static void usage() {
 -    fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n");
 +    fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n");
-     fprintf(stderr, "usage: echo \"argN\" | redis-cli -c [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 ... arg(N-1)\n");
-     fprintf(stderr, "\nIf a pipe from standard input is detected this data is used as last argument.\n\n");
-     fprintf(stderr, "example: cat /etc/passwd | redis-cli set my_passwd\n");
+     fprintf(stderr, "usage: echo \"argN\" | redis-cli -x [options] cmd arg1 arg2 ... arg(N-1)\n\n");
+     fprintf(stderr, "example: cat /etc/passwd | redis-cli -x set my_passwd\n");
      fprintf(stderr, "example: redis-cli get my_passwd\n");
      fprintf(stderr, "example: redis-cli -r 100 lpush mylist x\n");
      fprintf(stderr, "\nRun in interactive mode: redis-cli -i or just don't pass any command\n");
@@@ -382,87 -411,39 +423,39 @@@ static char **convertToSds(int count, c
    return sds;
  }
  
- static char **splitArguments(char *line, int *argc) {
-     char *p = line;
-     char *current = NULL;
-     char **vector = NULL;
-     *argc = 0;
-     while(1) {
-         /* skip blanks */
-         while(*p && isspace(*p)) p++;
-         if (*p) {
-             /* get a token */
-             int inq=0; /* set to 1 if we are in "quotes" */
-             int done = 0;
-             if (current == NULL) current = sdsempty();
-             while(!done) {
-                 if (inq) {
-                     if (*p == '\\' && *(p+1)) {
-                         char c;
-                         p++;
-                         switch(*p) {
-                         case 'n': c = '\n'; break;
-                         case 'r': c = '\r'; break;
-                         case 't': c = '\t'; break;
-                         case 'b': c = '\b'; break;
-                         case 'a': c = '\a'; break;
-                         default: c = *p; break;
-                         }
-                         current = sdscatlen(current,&c,1);
-                     } else if (*p == '"') {
-                         done = 1;
-                     } else {
-                         current = sdscatlen(current,p,1);
-                     }
-                 } else {
-                     switch(*p) {
-                     case ' ':
-                     case '\n':
-                     case '\r':
-                     case '\t':
-                     case '\0':
-                         done=1;
-                         break;
-                     case '"':
-                         inq=1;
-                         break;
-                     default:
-                         current = sdscatlen(current,p,1);
-                         break;
-                     }
-                 }
-                 if (*p) p++;
-             }
-             /* add the token to the vector */
-             vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
-             vector[*argc] = current;
-             (*argc)++;
-             current = NULL;
-         } else {
-             return vector;
-         }
-     }
- }
  #define LINE_BUFLEN 4096
  static void repl() {
      int argc, j;
-     char *line, **argv;
+     char *line;
+     sds *argv;
  
+     config.interactive = 1;
      while((line = linenoise("redis> ")) != NULL) {
          if (line[0] != '\0') {
-             argv = splitArguments(line,&argc);
+             argv = sdssplitargs(line,&argc);
              linenoiseHistoryAdd(line);
              if (config.historyfile) linenoiseHistorySave(config.historyfile);
-             if (argc > 0) {
+             if (argv == NULL) {
+                 printf("Invalid argument(s)\n");
+                 continue;
+             } else if (argc > 0) {
                  if (strcasecmp(argv[0],"quit") == 0 ||
                      strcasecmp(argv[0],"exit") == 0)
-                         exit(0);
-                 else
-                     cliSendCommand(argc, argv, 1);
+                 {
+                     exit(0);
+                 } else {
+                     int err;
+                     if ((err = cliSendCommand(argc, argv, 1)) != 0) {
+                         if (err == ECONNRESET) {
+                             printf("Reconnecting... ");
+                             fflush(stdout);
+                             if (cliConnect(1) == -1) exit(1);
+                             printf("OK\n");
+                             cliSendCommand(argc,argv,1);
+                         }
+                     }
+                 }
              }
              /* Free the argument vector */
              for (j = 0; j < argc; j++)
      exit(0);
  }
  
+ static int noninteractive(int argc, char **argv) {
+     int retval = 0;
+     if (config.stdinarg) {
+         argv = zrealloc(argv, (argc+1)*sizeof(char*));
+         argv[argc] = readArgFromStdin();
+         retval = cliSendCommand(argc+1, argv, config.repeat);
+     } else {
+         /* stdin is probably a tty, can be tested with S_ISCHR(s.st_mode) */
+         retval = cliSendCommand(argc, argv, config.repeat);
+     }
+     return retval;
+ }
  int main(int argc, char **argv) {
      int firstarg;
-     char **argvcopy;
  
      config.hostip = "127.0.0.1";
      config.hostport = 6379;
 +    config.hostsocket = NULL;
      config.repeat = 1;
      config.dbnum = 0;
-     config.argn_from_stdin = 0;
-     config.shutdown = 0;
      config.interactive = 0;
+     config.shutdown = 0;
      config.monitor_mode = 0;
      config.pubsub_mode = 0;
      config.raw_output = 0;
+     config.stdinarg = 0;
      config.auth = NULL;
      config.historyfile = NULL;
+     config.tty = isatty(fileno(stdout)) || (getenv("FAKETTY") != NULL);
+     config.mb_sep = '\n';
  
      if (getenv("HOME") != NULL) {
          config.historyfile = malloc(256);
  
      if (config.auth != NULL) {
          char *authargv[2];
+         int dbnum = config.dbnum;
  
+         /* We need to save the real configured database number and set it to
+          * zero here, otherwise cliSendCommand() will try to perform the
+          * SELECT command before the authentication, and it will fail. */
+         config.dbnum = 0;
          authargv[0] = "AUTH";
          authargv[1] = config.auth;
          cliSendCommand(2, convertToSds(2, authargv), 1);
+         config.dbnum = dbnum; /* restore the right DB number */
      }
  
-     if (argc == 0 || config.interactive == 1) repl();
-     argvcopy = convertToSds(argc+1, argv);
-     if (config.argn_from_stdin) {
-         sds lastarg = readArgFromStdin();
-         argvcopy[argc] = lastarg;
-         argc++;
-     }
-     return cliSendCommand(argc, argvcopy, config.repeat);
+     /* Start interactive mode when no command is provided */
+     if (argc == 0) repl();
+     /* Otherwise, we have some arguments to execute */
+     return noninteractive(argc,convertToSds(argc,argv));
  }
diff --combined src/redis.c
index 0e9b73b777405180d484781843c63b6231e47449,27a855d97a0d0f6d4ff5c7f5328af250f6bc7c91..50cf2f6c275e94abd0d9c16faf9dc9e00443489d
@@@ -51,6 -51,7 +51,7 @@@
  #include <float.h>
  #include <math.h>
  #include <pthread.h>
+ #include <sys/resource.h>
  
  /* Our shared "common" objects */
  
@@@ -170,6 -171,7 +171,7 @@@ struct redisCommand readonlyCommandTabl
      {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
      {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
+     {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
      {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
      {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
      {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
@@@ -338,7 -340,7 +340,7 @@@ dictType zsetDictType = 
      NULL,                      /* val dup */
      dictEncObjKeyCompare,      /* key compare */
      dictRedisObjectDestructor, /* key destructor */
-     dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
+     NULL                       /* val destructor */
  };
  
  /* Db->dict, keys are sds strings, vals are Redis objects. */
@@@ -435,6 -437,48 +437,48 @@@ void updateDictResizePolicy(void) 
  
  /* ======================= Cron: called every 100 ms ======================== */
  
+ /* Try to expire a few timed out keys. The algorithm used is adaptive and
+  * will use few CPU cycles if there are few expiring keys, otherwise
+  * it will get more aggressive to avoid that too much memory is used by
+  * keys that can be removed from the keyspace. */
+ void activeExpireCycle(void) {
+     int j;
+     for (j = 0; j < server.dbnum; j++) {
+         int expired;
+         redisDb *db = server.db+j;
+         /* Continue to expire if at the end of the cycle more than 25%
+          * of the keys were expired. */
+         do {
+             long num = dictSize(db->expires);
+             time_t now = time(NULL);
+             expired = 0;
+             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
+                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
+             while (num--) {
+                 dictEntry *de;
+                 time_t t;
+                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
+                 t = (time_t) dictGetEntryVal(de);
+                 if (now > t) {
+                     sds key = dictGetEntryKey(de);
+                     robj *keyobj = createStringObject(key,sdslen(key));
+                     propagateExpire(db,keyobj);
+                     dbDelete(db,keyobj);
+                     decrRefCount(keyobj);
+                     expired++;
+                     server.stat_expiredkeys++;
+                 }
+             }
+         } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
+     }
+ }
  int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      int j, loops = server.cronloops++;
      REDIS_NOTUSED(eventLoop);
           }
      }
  
-     /* Try to expire a few timed out keys. The algorithm used is adaptive and
-      * will use few CPU cycles if there are few expiring keys, otherwise
-      * it will get more aggressive to avoid that too much memory is used by
-      * keys that can be removed from the keyspace. */
-     for (j = 0; j < server.dbnum; j++) {
-         int expired;
-         redisDb *db = server.db+j;
-         /* Continue to expire if at the end of the cycle more than 25%
-          * of the keys were expired. */
-         do {
-             long num = dictSize(db->expires);
-             time_t now = time(NULL);
-             expired = 0;
-             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
-                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
-             while (num--) {
-                 dictEntry *de;
-                 time_t t;
-                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
-                 t = (time_t) dictGetEntryVal(de);
-                 if (now > t) {
-                     sds key = dictGetEntryKey(de);
-                     robj *keyobj = createStringObject(key,sdslen(key));
-                     dbDelete(db,keyobj);
-                     decrRefCount(keyobj);
-                     expired++;
-                     server.stat_expiredkeys++;
-                 }
-             }
-         } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
-     }
+     /* Expire a few keys per cycle, only if this is a master.
+      * On slaves we wait for DEL operations synthesized by the master
+      * in order to guarantee a strict consistency. */
+     if (server.masterhost == NULL) activeExpireCycle();
  
      /* Swap a few keys on disk if we are over the memory limit and VM
       * is enbled. Try to free objects from the free list first. */
@@@ -696,16 -709,13 +709,16 @@@ void createSharedObjects(void) 
  }
  
  void initServerConfig() {
 -    server.dbnum = REDIS_DEFAULT_DBNUM;
      server.port = REDIS_SERVERPORT;
 +    server.bindaddr = NULL;
 +    server.unixsocket = NULL;
 +    server.ipfd = -1;
 +    server.sofd = -1;
 +    server.dbnum = REDIS_DEFAULT_DBNUM;
      server.verbosity = REDIS_VERBOSE;
      server.maxidletime = REDIS_MAXIDLETIME;
      server.saveparams = NULL;
      server.logfile = NULL; /* NULL = log on standard output */
 -    server.bindaddr = NULL;
      server.glueoutputbuf = 1;
      server.daemonize = 0;
      server.appendonly = 0;
      server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
      server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
      server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
+     server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
      server.shutdown_asap = 0;
  
      resetServerSaveParams();
@@@ -776,21 -787,9 +790,21 @@@ void initServer() 
      createSharedObjects();
      server.el = aeCreateEventLoop();
      server.db = zmalloc(sizeof(redisDb)*server.dbnum);
 -    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
 -    if (server.fd == -1) {
 -        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
 +    server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
 +    if (server.ipfd == ANET_ERR) {
 +        redisLog(REDIS_WARNING, "Opening port: %s", server.neterr);
 +        exit(1);
 +    }
 +    if (server.unixsocket != NULL) {
 +        unlink(server.unixsocket); /* don't care if this fails */
 +        server.sofd = anetUnixServer(server.neterr,server.unixsocket);
 +        if (server.sofd == ANET_ERR) {
 +            redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
 +            exit(1);
 +        }
 +    }
 +    if (server.ipfd < 0 && server.sofd < 0) {
 +        redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
          exit(1);
      }
      for (j = 0; j < server.dbnum; j++) {
      server.stat_starttime = time(NULL);
      server.unixtime = time(NULL);
      aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
 -    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
 -        acceptHandler, NULL) == AE_ERR) oom("creating file event");
 +    if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
 +        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
 +    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
 +        acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
  
      if (server.appendonly) {
          server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
@@@ -892,9 -889,6 +906,6 @@@ void call(redisClient *c, struct redisC
  int processCommand(redisClient *c) {
      struct redisCommand *cmd;
  
-     /* Free some memory if needed (maxmemory setting) */
-     if (server.maxmemory) freeMemoryIfNeeded();
      /* Handle the multi bulk command type. This is an alternative protocol
       * supported by Redis in order to receive commands that are composed of
       * multiple binary-safe "bulk" arguments. The latency of processing is
      } else if (c->multibulk) {
          if (c->bulklen == -1) {
              if (((char*)c->argv[0]->ptr)[0] != '$') {
-                 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
+                 addReplyError(c,"multi bulk protocol error");
                  resetClient(c);
                  return 1;
              } else {
-                 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
+                 char *eptr;
+                 long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
+                 int perr = eptr[0] != '\0';
                  decrRefCount(c->argv[0]);
-                 if (bulklen < 0 || bulklen > 1024*1024*1024) {
+                 if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
+                     bulklen < 0 || bulklen > 1024*1024*1024)
+                 {
                      c->argc--;
-                     addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+                     addReplyError(c,"invalid bulk write count");
                      resetClient(c);
                      return 1;
                  }
       * such wrong arity, bad command name and so forth. */
      cmd = lookupCommand(c->argv[0]->ptr);
      if (!cmd) {
-         addReplySds(c,
-             sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
-                 (char*)c->argv[0]->ptr));
+         addReplyErrorFormat(c,"unknown command '%s'",
+             (char*)c->argv[0]->ptr);
          resetClient(c);
          return 1;
      } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
                 (c->argc < -cmd->arity)) {
-         addReplySds(c,
-             sdscatprintf(sdsempty(),
-                 "-ERR wrong number of arguments for '%s' command\r\n",
-                 cmd->name));
+         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
+             cmd->name);
          resetClient(c);
          return 1;
      } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
          /* This is a bulk command, we have to read the last argument yet. */
-         int bulklen = atoi(c->argv[c->argc-1]->ptr);
+         char *eptr;
+         long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
+         int perr = eptr[0] != '\0';
  
          decrRefCount(c->argv[c->argc-1]);
-         if (bulklen < 0 || bulklen > 1024*1024*1024) {
+         if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
+             bulklen < 0 || bulklen > 1024*1024*1024)
+         {
              c->argc--;
-             addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+             addReplyError(c,"invalid bulk write count");
              resetClient(c);
              return 1;
          }
  
      /* Check if the user is authenticated */
      if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
-         addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
+         addReplyError(c,"operation not permitted");
          resetClient(c);
          return 1;
      }
  
-     /* Handle the maxmemory directive */
+     /* Handle the maxmemory directive.
+      *
+      * First we try to free some memory if possible (if there are volatile
+      * keys in the dataset). If there are not the only thing we can do
+      * is returning an error. */
+     if (server.maxmemory) freeMemoryIfNeeded();
      if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
          zmalloc_used_memory() > server.maxmemory)
      {
-         addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+         addReplyError(c,"command not allowed when used memory > 'maxmemory'");
          resetClient(c);
          return 1;
      }
          &&
          cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
          cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
-         addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
+         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
          resetClient(c);
          return 1;
      }
@@@ -1081,11 -1086,7 +1103,7 @@@ int prepareForShutdown() 
          if (server.vm_enabled) unlink(server.vm_swap_file);
      } else {
          /* Snapshotting. Perform a SYNC SAVE and exit */
-         if (rdbSave(server.dbfilename) == REDIS_OK) {
-             if (server.daemonize)
-                 unlink(server.pidfile);
-             redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
-         } else {
+         if (rdbSave(server.dbfilename) != REDIS_OK) {
              /* Ooops.. error saving! The best we can do is to continue
               * operating. Note that if there was a background saving process,
               * in the next cron() Redis will be notified that the background
              return REDIS_ERR;
          }
      }
+     if (server.daemonize) unlink(server.pidfile);
      redisLog(REDIS_WARNING,"Server exit now, bye bye...");
      return REDIS_OK;
  }
@@@ -1107,7 -1109,7 +1126,7 @@@ void authCommand(redisClient *c) 
        addReply(c,shared.ok);
      } else {
        c->authenticated = 0;
-       addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
+       addReplyError(c,"invalid password");
      }
  }
  
@@@ -1148,6 -1150,10 +1167,10 @@@ sds genRedisInfoString(void) 
      time_t uptime = time(NULL)-server.stat_starttime;
      int j;
      char hmem[64];
+     struct rusage self_ru, c_ru;
+     getrusage(RUSAGE_SELF, &self_ru);
+     getrusage(RUSAGE_CHILDREN, &c_ru);
  
      bytesToHuman(hmem,zmalloc_used_memory());
      info = sdscatprintf(sdsempty(),
          "process_id:%ld\r\n"
          "uptime_in_seconds:%ld\r\n"
          "uptime_in_days:%ld\r\n"
+         "used_cpu_sys:%.2f\r\n"
+         "used_cpu_user:%.2f\r\n"
+         "used_cpu_sys_childrens:%.2f\r\n"
+         "used_cpu_user_childrens:%.2f\r\n"
          "connected_clients:%d\r\n"
          "connected_slaves:%d\r\n"
          "blocked_clients:%d\r\n"
          "used_memory:%zu\r\n"
          "used_memory_human:%s\r\n"
+         "mem_fragmentation_ratio:%.2f\r\n"
          "changes_since_last_save:%lld\r\n"
          "bgsave_in_progress:%d\r\n"
          "last_save_time:%ld\r\n"
          (long) getpid(),
          uptime,
          uptime/(3600*24),
+         (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
+         (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
+         (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000,
+         (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
          listLength(server.clients)-listLength(server.slaves),
          listLength(server.slaves),
          server.blpop_blocked_clients,
          zmalloc_used_memory(),
          hmem,
+         zmalloc_get_fragmentation_ratio(),
          server.dirty,
          server.bgsavechildpid != -1,
          server.lastsave,
@@@ -1319,7 -1335,8 +1352,8 @@@ void freeMemoryIfNeeded(void) 
          if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
          for (j = 0; j < server.dbnum; j++) {
              int minttl = -1;
-             robj *minkey = NULL;
+             sds minkey = NULL;
+             robj *keyobj = NULL;
              struct dictEntry *de;
  
              if (dictSize(server.db[j].expires)) {
                          minttl = t;
                      }
                  }
-                 dbDelete(server.db+j,minkey);
+                 keyobj = createStringObject(minkey,sdslen(minkey));
+                 dbDelete(server.db+j,keyobj);
+                 server.stat_expiredkeys++;
+                 decrRefCount(keyobj);
              }
          }
          if (!freed) return; /* nothing to free... */
@@@ -1367,9 -1387,17 +1404,17 @@@ void linuxOvercommitMemoryWarning(void
  }
  #endif /* __linux__ */
  
+ void createPidFile(void) {
+     /* Try to write the pid file in a best-effort way. */
+     FILE *fp = fopen(server.pidfile,"w");
+     if (fp) {
+         fprintf(fp,"%d\n",getpid());
+         fclose(fp);
+     }
+ }
  void daemonize(void) {
      int fd;
-     FILE *fp;
  
      if (fork() != 0) exit(0); /* parent exits */
      setsid(); /* create a new session */
          dup2(fd, STDERR_FILENO);
          if (fd > STDERR_FILENO) close(fd);
      }
-     /* Try to write the pid file */
-     fp = fopen(server.pidfile,"w");
-     if (fp) {
-         fprintf(fp,"%d\n",getpid());
-         fclose(fp);
-     }
  }
  
  void version() {
@@@ -1421,6 -1443,7 +1460,7 @@@ int main(int argc, char **argv) 
      }
      if (server.daemonize) daemonize();
      initServer();
+     if (server.daemonize) createPidFile();
      redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
  #ifdef __linux__
      linuxOvercommitMemoryWarning();
          if (rdbLoad(server.dbfilename) == REDIS_OK)
              redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start);
      }
 -    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
 +    if (server.ipfd > 0)
 +        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
 +    if (server.sofd > 0)
 +        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
      aeSetBeforeSleepProc(server.el,beforeSleep);
      aeMain(server.el);
      aeDeleteEventLoop(server.el);
@@@ -1500,6 -1520,7 +1540,7 @@@ void segvHandler(int sig, siginfo_t *in
          redisLog(REDIS_WARNING,"%s", messages[i]);
  
      /* free(messages); Don't call free() with possibly corrupted memory. */
+     if (server.daemonize) unlink(server.pidfile);
      _exit(0);
  }
  
diff --combined src/redis.h
index 38f0c140c5feaa3a505fb7f11765fa16e715c92e,3e9fc2369cdfacf9f3f3536be582b22c34791fba..8e05a4d4e826a1ad1a1b74defed63dee2653a798
@@@ -26,6 -26,7 +26,7 @@@
  #include "anet.h"   /* Networking the easy way */
  #include "zipmap.h" /* Compact string -> string data structure */
  #include "ziplist.h" /* Compact list data structure */
+ #include "intset.h" /* Compact integer set structure */
  #include "version.h"
  
  /* Error codes */
@@@ -46,6 -47,7 +47,7 @@@
  #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
  #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
  #define REDIS_SHARED_INTEGERS 10000
+ #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
  
  /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
  #define REDIS_WRITEV_THRESHOLD      3
@@@ -82,6 -84,7 +84,7 @@@
  #define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
  #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
  #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
+ #define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
  
  /* Object types only used for dumping to disk */
  #define REDIS_EXPIRETIME 253
  #define REDIS_HASH_MAX_ZIPMAP_VALUE 512
  #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024
  #define REDIS_LIST_MAX_ZIPLIST_VALUE 32
+ #define REDIS_SET_MAX_INTSET_ENTRIES 4096
  
  /* Sets operations codes */
  #define REDIS_OP_UNION 0
@@@ -282,8 -286,9 +286,9 @@@ typedef struct redisClient 
      int dictid;
      sds querybuf;
      robj **argv, **mbargv;
+     char *newline;          /* pointing to the detected newline in querybuf */
      int argc, mbargc;
-     int bulklen;            /* bulk read len. -1 if not in bulk read mode */
+     long bulklen;            /* bulk read len. -1 if not in bulk read mode */
      int multibulk;          /* multi bulk command format active */
      list *reply;
      int sentlen;
      list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
      dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
      list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
+     /* Response buffer */
+     int bufpos;
+     char buf[REDIS_REPLY_CHUNK_BYTES];
  } redisClient;
  
  struct saveparam {
@@@ -329,12 -338,10 +338,13 @@@ struct sharedObjectsStruct 
  struct redisServer {
      pthread_t mainthread;
      int port;
 -    int fd;
 +    char *bindaddr;
 +    char *unixsocket;
 +    int ipfd;
 +    int sofd;
      redisDb *db;
      long long dirty;            /* changes to DB from the last save */
+     long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
      list *clients;
      list *slaves, *monitors;
      char neterr[ANET_ERR_LEN];
      struct saveparam *saveparams;
      int saveparamslen;
      char *logfile;
 -    char *bindaddr;
      char *dbfilename;
      char *appendfilename;
      char *requirepass;
      size_t hash_max_zipmap_value;
      size_t list_max_ziplist_entries;
      size_t list_max_ziplist_value;
+     size_t set_max_intset_entries;
      /* Virtual memory state */
      FILE *vm_fp;
      int vm_fd;
@@@ -482,13 -491,14 +493,14 @@@ typedef struct _redisSortOperation 
  } redisSortOperation;
  
  /* ZSETs use a specialized version of Skiplists */
  typedef struct zskiplistNode {
-     struct zskiplistNode **forward;
-     struct zskiplistNode *backward;
-     unsigned int *span;
-     double score;
      robj *obj;
+     double score;
+     struct zskiplistNode *backward;
+     struct zskiplistLevel {
+         struct zskiplistNode *forward;
+         unsigned int span;
+     } level[];
  } zskiplistNode;
  
  typedef struct zskiplist {
@@@ -537,6 -547,14 +549,14 @@@ typedef struct 
      listNode *ln;       /* Entry in linked list */
  } listTypeEntry;
  
+ /* Structure to hold set iteration abstraction. */
+ typedef struct {
+     robj *subject;
+     int encoding;
+     int ii; /* intset iterator */
+     dictIterator *di;
+ } setTypeIterator;
  /* Structure to hold hash iteration abstration. Note that iteration over
   * hashes involves both fields and values. Because it is possible that
   * not both are required, store pointers in the iterator to avoid
@@@ -577,21 -595,34 +597,35 @@@ void resetClient(redisClient *c)
  void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
  void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReply(redisClient *c, robj *obj);
+ void *addDeferredMultiBulkLength(redisClient *c);
+ void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
  void addReplySds(redisClient *c, sds s);
  void processInputBuffer(redisClient *c);
 -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
  void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReplyBulk(redisClient *c, robj *obj);
  void addReplyBulkCString(redisClient *c, char *s);
  void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReply(redisClient *c, robj *obj);
  void addReplySds(redisClient *c, sds s);
+ void addReplyError(redisClient *c, char *err);
+ void addReplyStatus(redisClient *c, char *status);
  void addReplyDouble(redisClient *c, double d);
  void addReplyLongLong(redisClient *c, long long ll);
- void addReplyUlong(redisClient *c, unsigned long ul);
+ void addReplyMultiBulkLen(redisClient *c, long length);
  void *dupClientReplyValue(void *o);
  
+ #ifdef __GNUC__
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
+     __attribute__((format(printf, 2, 3)));
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
+     __attribute__((format(printf, 2, 3)));
+ #else
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
+ #endif
  /* List data type */
  void listTypeTryConversion(robj *subject, robj *value);
  void listTypePush(robj *subject, robj *value, int where);
@@@ -636,6 -667,7 +670,7 @@@ robj *createStringObjectFromLongLong(lo
  robj *createListObject(void);
  robj *createZiplistObject(void);
  robj *createSetObject(void);
+ robj *createIntsetObject(void);
  robj *createHashObject(void);
  robj *createZsetObject(void);
  int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
@@@ -677,7 -709,7 +712,7 @@@ void backgroundRewriteDoneHandler(int s
  /* Sorted sets data type */
  zskiplist *zslCreate(void);
  void zslFree(zskiplist *zsl);
void zslInsert(zskiplist *zsl, double score, robj *obj);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
  
  /* Core functions */
  void freeMemoryIfNeeded(void);
@@@ -719,6 -751,18 +754,18 @@@ int dontWaitForSwappedKey(redisClient *
  void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
  vmpointer *vmSwapObjectBlocking(robj *val);
  
+ /* Set data type */
+ robj *setTypeCreate(robj *value);
+ int setTypeAdd(robj *subject, robj *value);
+ int setTypeRemove(robj *subject, robj *value);
+ int setTypeIsMember(robj *subject, robj *value);
+ setTypeIterator *setTypeInitIterator(robj *subject);
+ void setTypeReleaseIterator(setTypeIterator *si);
+ robj *setTypeNext(setTypeIterator *si);
+ robj *setTypeRandomElement(robj *subject);
+ unsigned long setTypeSize(robj *subject);
+ void setTypeConvert(robj *subject, int enc);
  /* Hash data type */
  void convertToRealHash(robj *o);
  void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
@@@ -747,6 -791,8 +794,8 @@@ int stringmatch(const char *pattern, co
  long long memtoll(const char *p, int *err);
  int ll2string(char *s, size_t len, long long value);
  int isStringRepresentableAsLong(sds s, long *longval);
+ int isStringRepresentableAsLongLong(sds s, long long *longval);
+ int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
  
  /* Configuration */
  void loadServerConfig(char *filename);
@@@ -755,10 -801,10 +804,10 @@@ void resetServerSaveParams()
  
  /* db.c -- Keyspace access API */
  int removeExpire(redisDb *db, robj *key);
+ void propagateExpire(redisDb *db, robj *key);
  int expireIfNeeded(redisDb *db, robj *key);
- int deleteIfVolatile(redisDb *db, robj *key);
  time_t getExpire(redisDb *db, robj *key);
int setExpire(redisDb *db, robj *key, time_t when);
void setExpire(redisDb *db, robj *key, time_t when);
  robj *lookupKey(redisDb *db, robj *key);
  robj *lookupKeyRead(redisDb *db, robj *key);
  robj *lookupKeyWrite(redisDb *db, robj *key);
@@@ -841,6 -887,7 +890,7 @@@ void expireCommand(redisClient *c)
  void expireatCommand(redisClient *c);
  void getsetCommand(redisClient *c);
  void ttlCommand(redisClient *c);
+ void persistCommand(redisClient *c);
  void slaveofCommand(redisClient *c);
  void debugCommand(redisClient *c);
  void msetCommand(redisClient *c);