git.saurik.com Git - redis.git/blobdiff - redis.c
New protocol fix for LREM
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index fa0d6158f4ae173ac313667a9af60bfe2ad3b094..bb7737b5c503e1e09185df6d85fdc81ed7fbc3d2 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "0.07"
+#define REDIS_VERSION "0.08"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -89,6 +89,7 @@
 #define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
 #define REDIS_SLAVE 2       /* This client is a slave server */
 #define REDIS_MASTER 4      /* This client is a master server */
+#define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */
 
 /* Server replication state */
 #define REDIS_REPL_NONE 0   /* No active replication */
@@ -138,7 +139,7 @@ typedef struct redisClient {
     list *reply;
     int sentlen;
     time_t lastinteraction; /* time of the last interaction, used for timeout */
-    int flags; /* REDIS_CLOSE | REDIS_SLAVE */
+    int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
     int slaveseldb; /* slave selected db, if this client is a slave */
 } redisClient;
 
@@ -154,7 +155,7 @@ struct redisServer {
     dict **dict;
     long long dirty;            /* changes to DB from the last save */
     list *clients;
-    list *slaves;
+    list *slaves, *monitors;
     char neterr[ANET_ERR_LEN];
     aeEventLoop *el;
     int cronloops;              /* number of times the cron function run */
@@ -213,10 +214,10 @@ typedef struct _redisSortOperation {
 } redisSortOperation;
 
 struct sharedObjectsStruct {
-    robj *crlf, *ok, *err, *zerobulk, *nil, *zero, *one, *pong, *space,
-    *minus1, *minus2, *minus3, *minus4,
-    *wrongtypeerr, *nokeyerr, *wrongtypeerrbulk, *nokeyerrbulk,
-    *syntaxerr, *syntaxerrbulk,
+    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
+    *colon, *nullbulk, *nullmultibulk,
+    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
+    *outofrangeerr, *plus,
     *select0, *select1, *select2, *select3, *select4,
     *select5, *select6, *select7, *select8, *select9;
 } shared;
@@ -235,7 +236,7 @@ static void addReplySds(redisClient *c, sds s);
 static void incrRefCount(robj *o);
 static int saveDbBackground(char *filename);
 static robj *createStringObject(char *ptr, size_t len);
-static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc);
+static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
 
 static void pingCommand(redisClient *c);
@@ -283,6 +284,7 @@ static void sortCommand(redisClient *c);
 static void lremCommand(redisClient *c);
 static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
+static void monitorCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
@@ -335,6 +337,7 @@ static struct redisCommand cmdTable[] = {
     {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
     {"sort",sortCommand,-2,REDIS_CMD_INLINE},
     {"info",infoCommand,1,REDIS_CMD_INLINE},
+    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
     {NULL,NULL,0,0}
 };
 
@@ -657,29 +660,27 @@ static void createSharedObjects(void) {
     shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
     shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
     shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
-    shared.zerobulk = createObject(REDIS_STRING,sdsnew("0\r\n\r\n"));
-    shared.nil = createObject(REDIS_STRING,sdsnew("nil\r\n"));
-    shared.zero = createObject(REDIS_STRING,sdsnew("0\r\n"));
-    shared.one = createObject(REDIS_STRING,sdsnew("1\r\n"));
+    shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
+    shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
+    shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
+    shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
+    shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
+    shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
     /* no such key */
-    shared.minus1 = createObject(REDIS_STRING,sdsnew("-1\r\n"));
-    /* operation against key holding a value of the wrong type */
-    shared.minus2 = createObject(REDIS_STRING,sdsnew("-2\r\n"));
-    /* src and dest objects are the same */
-    shared.minus3 = createObject(REDIS_STRING,sdsnew("-3\r\n"));
-    /* out of range argument */
-    shared.minus4 = createObject(REDIS_STRING,sdsnew("-4\r\n"));
     shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
     shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
         "-ERR Operation against a key holding the wrong kind of value\r\n"));
-    shared.wrongtypeerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.wrongtypeerr->ptr)+2,shared.wrongtypeerr->ptr));
     shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
         "-ERR no such key\r\n"));
-    shared.nokeyerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.nokeyerr->ptr)+2,shared.nokeyerr->ptr));
     shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
         "-ERR syntax error\r\n"));
-    shared.syntaxerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.syntaxerr->ptr)+2,shared.syntaxerr->ptr));
+    shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
+        "-ERR source and destination objects are the same\r\n"));
+    shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
+        "-ERR index out of range\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
+    shared.colon = createObject(REDIS_STRING,sdsnew(":"));
+    shared.plus = createObject(REDIS_STRING,sdsnew("+"));
     shared.select0 = createStringObject("select 0\r\n",10);
     shared.select1 = createStringObject("select 1\r\n",10);
     shared.select2 = createStringObject("select 2\r\n",10);
@@ -739,11 +740,12 @@ static void initServer() {
 
     server.clients = listCreate();
     server.slaves = listCreate();
+    server.monitors = listCreate();
     server.objfreelist = listCreate();
     createSharedObjects();
     server.el = aeCreateEventLoop();
     server.dict = zmalloc(sizeof(dict*)*server.dbnum);
-    if (!server.dict || !server.clients || !server.slaves || !server.el || !server.objfreelist)
+    if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
         oom("server initialization"); /* Fatal OOM */
     server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
     if (server.fd == -1) {
@@ -922,9 +924,10 @@ static void freeClient(redisClient *c) {
     assert(ln != NULL);
     listDelNode(server.clients,ln);
     if (c->flags & REDIS_SLAVE) {
-        ln = listSearchKey(server.slaves,c);
+        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
+        ln = listSearchKey(l,c);
         assert(ln != NULL);
-        listDelNode(server.slaves,ln);
+        listDelNode(l,ln);
     }
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
@@ -1083,7 +1086,9 @@ static int processCommand(redisClient *c) {
     dirty = server.dirty;
     cmd->proc(c);
     if (server.dirty-dirty != 0 && listLength(server.slaves))
-        replicationFeedSlaves(cmd,c->dictid,c->argv,c->argc);
+        replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc);
+    if (listLength(server.monitors))
+        replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc);
     server.stat_numcommands++;
 
     /* Prepare the client for the next command */
@@ -1095,8 +1100,8 @@ static int processCommand(redisClient *c) {
     return 1;
 }
 
-static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
-    listNode *ln = server.slaves->head;
+static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    listNode *ln = slaves->head;
     robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
     int outc = 0, j;
     
@@ -1526,13 +1531,19 @@ static int saveDbBackground(char *filename) {
     return REDIS_OK; /* unreached */
 }
 
+static int loadType(FILE *fp) { /* Read the next one-byte type tag from fp. */
+    uint8_t type; /* unsigned byte: a successful read is returned as 0..255 */
+    if (fread(&type,1,1,fp) == 0) return -1; /* nothing read: -1 cannot collide with a byte value */
+    return type;
+}
+
 static int loadDb(char *filename) {
     FILE *fp;
     char buf[REDIS_LOADBUF_LEN];    /* Try to use this buffer instead of */
     char vbuf[REDIS_LOADBUF_LEN];   /* malloc() when the element is small */
     char *key = NULL, *val = NULL;
     uint32_t klen,vlen,dbid;
-    uint8_t type;
+    int type;
     int retval;
     dict *d = server.dict[0];
 
@@ -1548,7 +1559,7 @@ static int loadDb(char *filename) {
         robj *o;
 
         /* Read type. */
-        if (fread(&type,1,1,fp) == 0) goto eoferr;
+        if ((type = loadType(fp)) == -1) goto eoferr;
         if (type == REDIS_EOF) break;
         /* Handle SELECT DB opcode as a special case */
         if (type == REDIS_SELECTDB) {
@@ -1647,7 +1658,7 @@ static void pingCommand(redisClient *c) {
 }
 
 static void echoCommand(redisClient *c) {
-    addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",
+    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
         (int)sdslen(c->argv[1]->ptr)));
     addReply(c,c->argv[1]);
     addReply(c,shared.crlf);
@@ -1664,7 +1675,7 @@ static void setGenericCommand(redisClient *c, int nx) {
             dictReplace(c->dict,c->argv[1],c->argv[2]);
             incrRefCount(c->argv[2]);
         } else {
-            addReply(c,shared.zero);
+            addReply(c,shared.czero);
             return;
         }
     } else {
@@ -1672,7 +1683,7 @@ static void setGenericCommand(redisClient *c, int nx) {
         incrRefCount(c->argv[2]);
     }
     server.dirty++;
-    addReply(c, nx ? shared.one : shared.ok);
+    addReply(c, nx ? shared.cone : shared.ok);
 }
 
 static void setCommand(redisClient *c) {
@@ -1688,14 +1699,14 @@ static void getCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.nil);
+        addReply(c,shared.nullbulk);
     } else {
         robj *o = dictGetEntryVal(de);
         
         if (o->type != REDIS_STRING) {
-            addReply(c,shared.wrongtypeerrbulk);
+            addReply(c,shared.wrongtypeerr);
         } else {
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr)));
+            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
             addReply(c,o);
             addReply(c,shared.crlf);
         }
@@ -1706,18 +1717,18 @@ static void mgetCommand(redisClient *c) {
     dictEntry *de;
     int j;
   
-    addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",c->argc-1));
+    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
     for (j = 1; j < c->argc; j++) {
         de = dictFind(c->dict,c->argv[j]);
         if (de == NULL) {
-            addReply(c,shared.minus1);
+            addReply(c,shared.nullbulk);
         } else {
             robj *o = dictGetEntryVal(de);
             
             if (o->type != REDIS_STRING) {
-                addReply(c,shared.minus1);
+                addReply(c,shared.nullbulk);
             } else {
-                addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr)));
+                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
                 addReply(c,o);
                 addReply(c,shared.crlf);
             }
@@ -1755,6 +1766,7 @@ static void incrDecrCommand(redisClient *c, int incr) {
         incrRefCount(c->argv[1]);
     }
     server.dirty++;
+    addReply(c,shared.colon);
     addReply(c,o);
     addReply(c,shared.crlf);
 }
@@ -1782,9 +1794,9 @@ static void decrbyCommand(redisClient *c) {
 static void delCommand(redisClient *c) {
     if (dictDelete(c->dict,c->argv[1]) == DICT_OK) {
         server.dirty++;
-        addReply(c,shared.one);
+        addReply(c,shared.cone);
     } else {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
     }
 }
 
@@ -1793,9 +1805,9 @@ static void existsCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL)
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
     else
-        addReply(c,shared.one);
+        addReply(c,shared.cone);
 }
 
 static void selectCommand(redisClient *c) {
@@ -1815,6 +1827,7 @@ static void randomkeyCommand(redisClient *c) {
     if (de == NULL) {
         addReply(c,shared.crlf);
     } else {
+        addReply(c,shared.plus);
         addReply(c,dictGetEntryKey(de));
         addReply(c,shared.crlf);
     }
@@ -1845,18 +1858,18 @@ static void keysCommand(redisClient *c) {
         }
     }
     dictReleaseIterator(di);
-    lenobj->ptr = sdscatprintf(sdsempty(),"%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
+    lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
     addReply(c,shared.crlf);
 }
 
 static void dbsizeCommand(redisClient *c) {
     addReplySds(c,
-        sdscatprintf(sdsempty(),"%lu\r\n",dictGetHashTableUsed(c->dict)));
+        sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict)));
 }
 
 static void lastsaveCommand(redisClient *c) {
     addReplySds(c,
-        sdscatprintf(sdsempty(),"%lu\r\n",server.lastsave));
+        sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
 }
 
 static void typeCommand(redisClient *c) {
@@ -1865,14 +1878,14 @@ static void typeCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        type = "none";
+        type = "+none";
     } else {
         robj *o = dictGetEntryVal(de);
         
         switch(o->type) {
-        case REDIS_STRING: type = "string"; break;
-        case REDIS_LIST: type = "list"; break;
-        case REDIS_SET: type = "set"; break;
+        case REDIS_STRING: type = "+string"; break;
+        case REDIS_LIST: type = "+list"; break;
+        case REDIS_SET: type = "+set"; break;
         default: type = "unknown"; break;
         }
     }
@@ -1920,19 +1933,13 @@ static void renameGenericCommand(redisClient *c, int nx) {
 
     /* To use the same key as src and dst is probably an error */
     if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
-        if (nx)
-            addReply(c,shared.minus3);
-        else
-            addReplySds(c,sdsnew("-ERR src and dest key are the same\r\n"));
+        addReply(c,shared.sameobjecterr);
         return;
     }
 
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        if (nx)
-            addReply(c,shared.minus1);
-        else
-            addReply(c,shared.nokeyerr);
+        addReply(c,shared.nokeyerr);
         return;
     }
     o = dictGetEntryVal(de);
@@ -1940,7 +1947,7 @@ static void renameGenericCommand(redisClient *c, int nx) {
     if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) {
         if (nx) {
             decrRefCount(o);
-            addReply(c,shared.zero);
+            addReply(c,shared.czero);
             return;
         }
         dictReplace(c->dict,c->argv[2],o);
@@ -1949,7 +1956,7 @@ static void renameGenericCommand(redisClient *c, int nx) {
     }
     dictDelete(c->dict,c->argv[1]);
     server.dirty++;
-    addReply(c,nx ? shared.one : shared.ok);
+    addReply(c,nx ? shared.cone : shared.ok);
 }
 
 static void renameCommand(redisClient *c) {
@@ -1970,7 +1977,7 @@ static void moveCommand(redisClient *c) {
     src = c->dict;
     srcid = c->dictid;
     if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
-        addReply(c,shared.minus4);
+        addReply(c,shared.outofrangeerr);
         return;
     }
     dst = c->dict;
@@ -1980,14 +1987,14 @@ static void moveCommand(redisClient *c) {
     /* If the user is moving using as target the same
      * DB as the source DB it is probably an error. */
     if (src == dst) {
-        addReply(c,shared.minus3);
+        addReply(c,shared.sameobjecterr);
         return;
     }
 
     /* Check if the element exists and get a reference */
     de = dictFind(c->dict,c->argv[1]);
     if (!de) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
         return;
     }
 
@@ -1995,7 +2002,7 @@ static void moveCommand(redisClient *c) {
     key = dictGetEntryKey(de);
     o = dictGetEntryVal(de);
     if (dictAdd(dst,key,o) == DICT_ERR) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
         return;
     }
     incrRefCount(key);
@@ -2004,7 +2011,7 @@ static void moveCommand(redisClient *c) {
     /* OK! key moved, free the entry in the source DB */
     dictDelete(src,c->argv[1]);
     server.dirty++;
-    addReply(c,shared.one);
+    addReply(c,shared.cone);
 }
 
 /* =================================== Lists ================================ */
@@ -2057,15 +2064,15 @@ static void llenCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
         return;
     } else {
         robj *o = dictGetEntryVal(de);
         if (o->type != REDIS_LIST) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
         } else {
             l = o->ptr;
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",listLength(l)));
+            addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
         }
     }
 }
@@ -2076,22 +2083,22 @@ static void lindexCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.nil);
+        addReply(c,shared.nullbulk);
     } else {
         robj *o = dictGetEntryVal(de);
         
         if (o->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerrbulk);
+            addReply(c,shared.wrongtypeerr);
         } else {
             list *list = o->ptr;
             listNode *ln;
             
             ln = listIndex(list, index);
             if (ln == NULL) {
-                addReply(c,shared.nil);
+                addReply(c,shared.nullbulk);
             } else {
                 robj *ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr)));
+                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
                 addReply(c,ele);
                 addReply(c,shared.crlf);
             }
@@ -2117,7 +2124,7 @@ static void lsetCommand(redisClient *c) {
             
             ln = listIndex(list, index);
             if (ln == NULL) {
-                addReplySds(c,sdsnew("-ERR index out of range\r\n"));
+                addReply(c,shared.outofrangeerr);
             } else {
                 robj *ele = listNodeValue(ln);
 
@@ -2136,12 +2143,12 @@ static void popGenericCommand(redisClient *c, int where) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.nil);
+        addReply(c,shared.nullbulk);
     } else {
         robj *o = dictGetEntryVal(de);
         
         if (o->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerrbulk);
+            addReply(c,shared.wrongtypeerr);
         } else {
             list *list = o->ptr;
             listNode *ln;
@@ -2152,10 +2159,10 @@ static void popGenericCommand(redisClient *c, int where) {
                 ln = listLast(list);
 
             if (ln == NULL) {
-                addReply(c,shared.nil);
+                addReply(c,shared.nullbulk);
             } else {
                 robj *ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr)));
+                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
                 addReply(c,ele);
                 addReply(c,shared.crlf);
                 listDelNode(list,ln);
@@ -2180,12 +2187,12 @@ static void lrangeCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.nil);
+        addReply(c,shared.nullmultibulk);
     } else {
         robj *o = dictGetEntryVal(de);
         
         if (o->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerrbulk);
+            addReply(c,shared.wrongtypeerr);
         } else {
             list *list = o->ptr;
             listNode *ln;
@@ -2202,7 +2209,7 @@ static void lrangeCommand(redisClient *c) {
             /* indexes sanity checks */
             if (start > end || start >= llen) {
                 /* Out of range start or start > end result in empty list */
-                addReply(c,shared.zero);
+                addReply(c,shared.emptymultibulk);
                 return;
             }
             if (end >= llen) end = llen-1;
@@ -2210,10 +2217,10 @@ static void lrangeCommand(redisClient *c) {
 
             /* Return the result in form of a multi-bulk reply */
             ln = listIndex(list, start);
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",rangelen));
+            addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
             for (j = 0; j < rangelen; j++) {
                 ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr)));
+                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
                 addReply(c,ele);
                 addReply(c,shared.crlf);
                 ln = ln->next;
@@ -2278,12 +2285,12 @@ static void lremCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.minus1);
+        addReply(c,shared.nokeyerr);
     } else {
         robj *o = dictGetEntryVal(de);
         
         if (o->type != REDIS_LIST) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
         } else {
             list *list = o->ptr;
             listNode *ln, *next;
@@ -2307,7 +2314,7 @@ static void lremCommand(redisClient *c) {
                 }
                 ln = next;
             }
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",removed));
+            addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
         }
     }
 }
@@ -2326,16 +2333,16 @@ static void saddCommand(redisClient *c) {
     } else {
         set = dictGetEntryVal(de);
         if (set->type != REDIS_SET) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
             return;
         }
     }
     if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
         incrRefCount(c->argv[2]);
         server.dirty++;
-        addReply(c,shared.one);
+        addReply(c,shared.cone);
     } else {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
     }
 }
 
@@ -2344,20 +2351,20 @@ static void sremCommand(redisClient *c) {
 
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
     } else {
         robj *set;
 
         set = dictGetEntryVal(de);
         if (set->type != REDIS_SET) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
             return;
         }
         if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
             server.dirty++;
-            addReply(c,shared.one);
+            addReply(c,shared.cone);
         } else {
-            addReply(c,shared.zero);
+            addReply(c,shared.czero);
         }
     }
 }
@@ -2367,19 +2374,19 @@ static void sismemberCommand(redisClient *c) {
 
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
     } else {
         robj *set;
 
         set = dictGetEntryVal(de);
         if (set->type != REDIS_SET) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
             return;
         }
         if (dictFind(set->ptr,c->argv[2]))
-            addReply(c,shared.one);
+            addReply(c,shared.cone);
         else
-            addReply(c,shared.zero);
+            addReply(c,shared.czero);
     }
 }
 
@@ -2389,15 +2396,15 @@ static void scardCommand(redisClient *c) {
     
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.zero);
+        addReply(c,shared.czero);
         return;
     } else {
         robj *o = dictGetEntryVal(de);
         if (o->type != REDIS_SET) {
-            addReply(c,shared.minus2);
+            addReply(c,shared.wrongtypeerr);
         } else {
             s = o->ptr;
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",
+            addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
                 dictGetHashTableUsed(s)));
         }
     }
@@ -2424,13 +2431,13 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
         de = dictFind(c->dict,setskeys[j]);
         if (!de) {
             zfree(dv);
-            addReply(c,dstkey ? shared.nokeyerr : shared.nil);
+            addReply(c,shared.nokeyerr);
             return;
         }
         setobj = dictGetEntryVal(de);
         if (setobj->type != REDIS_SET) {
             zfree(dv);
-            addReply(c,dstkey ? shared.wrongtypeerr : shared.wrongtypeerrbulk);
+            addReply(c,shared.wrongtypeerr);
             return;
         }
         dv[j] = setobj->ptr;
@@ -2472,7 +2479,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
             continue; /* at least one set does not contain the member */
         ele = dictGetEntryKey(de);
         if (!dstkey) {
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(ele->ptr)));
+            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
             addReply(c,ele);
             addReply(c,shared.crlf);
             cardinality++;
@@ -2484,7 +2491,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
     dictReleaseIterator(di);
 
     if (!dstkey)
-        lenobj->ptr = sdscatprintf(sdsempty(),"%d\r\n",cardinality);
+        lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
     else
         addReply(c,shared.ok);
     zfree(dv);
@@ -2614,19 +2621,19 @@ static void sortCommand(redisClient *c) {
     /* Lookup the key to sort. It must be of the right types */
     de = dictFind(c->dict,c->argv[1]);
     if (de == NULL) {
-        addReply(c,shared.nokeyerrbulk);
+        addReply(c,shared.nokeyerr);
         return;
     }
     sortval = dictGetEntryVal(de);
     if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
-        addReply(c,shared.wrongtypeerrbulk);
+        addReply(c,shared.wrongtypeerr);
         return;
     }
 
     /* Create a list of operations to perform for every sorted element.
      * Operations can be GET/DEL/INCR/DECR */
     operations = listCreate();
-    listSetFreeMethod(operations,free);
+    listSetFreeMethod(operations,zfree);
     j = 2;
 
     /* Now we need to protect sortval incrementing its count, in the future
@@ -2673,7 +2680,7 @@ static void sortCommand(redisClient *c) {
         } else {
             decrRefCount(sortval);
             listRelease(operations);
-            addReply(c,shared.syntaxerrbulk);
+            addReply(c,shared.syntaxerr);
             return;
         }
         j++;
@@ -2754,11 +2761,11 @@ static void sortCommand(redisClient *c) {
     /* Send command output to the output buffer, performing the specified
      * GET/DEL/INCR/DECR operations if any. */
     outputlen = getop ? getop*(end-start+1) : end-start+1;
-    addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",outputlen));
+    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
     for (j = start; j <= end; j++) {
         listNode *ln = operations->head;
         if (!getop) {
-            addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",
+            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
                 sdslen(vector[j].obj->ptr)));
             addReply(c,vector[j].obj);
             addReply(c,shared.crlf);
@@ -2770,9 +2777,9 @@ static void sortCommand(redisClient *c) {
 
             if (sop->type == REDIS_SORT_GET) {
                 if (!val || val->type != REDIS_STRING) {
-                    addReply(c,shared.minus1);
+                    addReply(c,shared.nullbulk);
                 } else {
-                    addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",
+                    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
                         sdslen(val->ptr)));
                     addReply(c,val);
                     addReply(c,shared.crlf);
@@ -2820,7 +2827,7 @@ static void infoCommand(redisClient *c) {
         uptime,
         uptime/(3600*24)
     );
-    addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(info)));
+    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
     addReplySds(c,info);
     addReply(c,shared.crlf);
 }
@@ -2912,6 +2919,9 @@ static void syncCommand(redisClient *c) {
     time_t start = time(NULL);
     char sizebuf[32];
 
+    /* ignore SYNC if already slave or in monitor mode */
+    if (c->flags & REDIS_SLAVE) return;
+
     redisLog(REDIS_NOTICE,"Slave ask for syncronization");
     if (flushClientOutput(c) == REDIS_ERR || saveDb(server.dbfilename) != REDIS_OK)
         goto closeconn;
@@ -2920,7 +2930,7 @@ static void syncCommand(redisClient *c) {
     if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn;
     len = sb.st_size;
 
-    snprintf(sizebuf,32,"%d\r\n",len);
+    snprintf(sizebuf,32,"$%d\r\n",len);
     if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn;
     while(len) {
         char buf[1024];
@@ -2972,7 +2982,7 @@ static int syncWithMaster(void) {
             strerror(errno));
         return REDIS_ERR;
     }
-    dumpsize = atoi(buf);
+    dumpsize = atoi(buf+1);
     redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
     /* Read the bulk write data on a temp file */
     snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
@@ -3021,6 +3031,16 @@ static int syncWithMaster(void) {
     return REDIS_OK;
 }
 
+static void monitorCommand(redisClient *c) { /* MONITOR: register client c in server.monitors */
+    /* ignore MONITOR if already slave or in monitor mode */
+    if (c->flags & REDIS_SLAVE) return; /* this function sets REDIS_MONITOR together with REDIS_SLAVE, so one test covers both; no reply is queued here */
+
+    c->flags |= (REDIS_SLAVE|REDIS_MONITOR); /* both bits: freeClient checks REDIS_SLAVE first, then REDIS_MONITOR to pick the list */
+    c->slaveseldb = 0; /* NOTE(review): presumably the db the feed assumes is selected for this client; confirm in replicationFeedSlaves */
+    if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); /* a failed list append is handed to oom() */
+    addReply(c,shared.ok); /* acknowledge with the shared "+OK\r\n" object */
+}
+
 /* =================================== Main! ================================ */
 
 static void daemonize(void) {