X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ed329fcf25bb1c41949678eec125034326f86af2..f2aa84bd638b8d18e80531abfd7191f5d0a58c71:/redis.c?ds=sidebyside diff --git a/redis.c b/redis.c index dae4e0af..fb18e090 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.07" +#define REDIS_VERSION "0.08" #include #include @@ -46,6 +46,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -82,13 +83,33 @@ #define REDIS_LIST 1 #define REDIS_SET 2 #define REDIS_HASH 3 + +/* Object types only used for dumping to disk */ #define REDIS_SELECTDB 254 #define REDIS_EOF 255 +/* Defines related to the dump file format. To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpret the length: + * + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte + * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow + * 11|000000 reserved for future uses + * + * Lengths up to 63 are stored using a single byte, most DB keys, and many + * values, will fit inside. 
*/ +#define REDIS_RDB_6BITLEN 0 +#define REDIS_RDB_14BITLEN 1 +#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_64BITLEN 3 +#define REDIS_RDB_LENERR UINT_MAX + /* Client flags */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_SLAVE 2 /* This client is a slave server */ #define REDIS_MASTER 4 /* This client is a master server */ +#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */ /* Server replication state */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -138,8 +159,9 @@ typedef struct redisClient { list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ - int flags; /* REDIS_CLOSE | REDIS_SLAVE */ + int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ int slaveseldb; /* slave selected db, if this client is a slave */ + int authenticated; /* when requirepass is non-NULL */ } redisClient; struct saveparam { @@ -152,9 +174,11 @@ struct redisServer { int port; int fd; dict **dict; + dict *sharingpool; + unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; - list *slaves; + list *slaves, *monitors; char neterr[ANET_ERR_LEN]; aeEventLoop *el; int cronloops; /* number of times the cron function run */ @@ -178,6 +202,8 @@ struct redisServer { char *logfile; char *bindaddr; char *dbfilename; + char *requirepass; + int shareobjects; /* Replication related */ int isslave; char *masterhost; @@ -213,10 +239,10 @@ typedef struct _redisSortOperation { } redisSortOperation; struct sharedObjectsStruct { - robj *crlf, *ok, *err, *zerobulk, *nil, *zero, *one, *pong, *space, - *minus1, *minus2, *minus3, *minus4, - *wrongtypeerr, *nokeyerr, *wrongtypeerrbulk, *nokeyerrbulk, - *syntaxerr, *syntaxerrbulk, + robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, + *colon, *nullbulk, *nullmultibulk, + *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, + *outofrangeerr, *plus, *select0, 
*select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9; } shared; @@ -229,15 +255,17 @@ static void freeSetObject(robj *o); static void decrRefCount(void *o); static robj *createObject(int type, void *ptr); static void freeClient(redisClient *c); -static int loadDb(char *filename); +static int rdbLoad(char *filename); static void addReply(redisClient *c, robj *obj); static void addReplySds(redisClient *c, sds s); static void incrRefCount(robj *o); -static int saveDbBackground(char *filename); +static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); -static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc); +static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); +static robj *tryObjectSharing(robj *o); +static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); @@ -283,6 +311,7 @@ static void sortCommand(redisClient *c); static void lremCommand(redisClient *c); static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); +static void monitorCommand(redisClient *c); /*================================= Globals ================================= */ @@ -323,6 +352,7 @@ static struct redisCommand cmdTable[] = { {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, + {"auth",authCommand,2,REDIS_CMD_INLINE}, {"ping",pingCommand,1,REDIS_CMD_INLINE}, {"echo",echoCommand,2,REDIS_CMD_BULK}, {"save",saveCommand,1,REDIS_CMD_INLINE}, @@ -335,6 +365,7 @@ static struct redisCommand cmdTable[] = { {"flushall",flushallCommand,1,REDIS_CMD_INLINE}, {"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE}, + {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, 
{NULL,NULL,0,0} }; @@ -604,7 +635,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.usedmemory); + server.usedmemory, + dictGetHashTableUsed(server.sharingpool)); } /* Close connections of timedout clients */ @@ -638,7 +670,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { now-server.lastsave > sp->seconds) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); - saveDbBackground(server.dbfilename); + rdbSaveBackground(server.dbfilename); break; } } @@ -657,29 +689,27 @@ static void createSharedObjects(void) { shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n")); shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n")); shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n")); - shared.zerobulk = createObject(REDIS_STRING,sdsnew("0\r\n\r\n")); - shared.nil = createObject(REDIS_STRING,sdsnew("nil\r\n")); - shared.zero = createObject(REDIS_STRING,sdsnew("0\r\n")); - shared.one = createObject(REDIS_STRING,sdsnew("1\r\n")); + shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n")); + shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n")); + shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n")); + shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n")); + shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); + shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); /* no such key */ - shared.minus1 = createObject(REDIS_STRING,sdsnew("-1\r\n")); - /* operation against key holding a value of the wrong type */ - shared.minus2 = createObject(REDIS_STRING,sdsnew("-2\r\n")); - /* src and dest objects are the same */ - shared.minus3 = createObject(REDIS_STRING,sdsnew("-3\r\n")); - /* out of range argument */ - shared.minus4 = 
createObject(REDIS_STRING,sdsnew("-4\r\n")); shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n")); shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew( "-ERR Operation against a key holding the wrong kind of value\r\n")); - shared.wrongtypeerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.wrongtypeerr->ptr)+2,shared.wrongtypeerr->ptr)); shared.nokeyerr = createObject(REDIS_STRING,sdsnew( "-ERR no such key\r\n")); - shared.nokeyerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.nokeyerr->ptr)+2,shared.nokeyerr->ptr)); shared.syntaxerr = createObject(REDIS_STRING,sdsnew( "-ERR syntax error\r\n")); - shared.syntaxerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.syntaxerr->ptr)+2,shared.syntaxerr->ptr)); + shared.sameobjecterr = createObject(REDIS_STRING,sdsnew( + "-ERR source and destination objects are the same\r\n")); + shared.outofrangeerr = createObject(REDIS_STRING,sdsnew( + "-ERR index out of range\r\n")); shared.space = createObject(REDIS_STRING,sdsnew(" ")); + shared.colon = createObject(REDIS_STRING,sdsnew(":")); + shared.plus = createObject(REDIS_STRING,sdsnew("+")); shared.select0 = createStringObject("select 0\r\n",10); shared.select1 = createStringObject("select 1\r\n",10); shared.select2 = createStringObject("select 2\r\n",10); @@ -718,6 +748,8 @@ static void initServerConfig() { server.daemonize = 0; server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; + server.requirepass = NULL; + server.shareobjects = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -739,22 +771,22 @@ static void initServer() { server.clients = listCreate(); server.slaves = listCreate(); + server.monitors = listCreate(); server.objfreelist = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.dict = zmalloc(sizeof(dict*)*server.dbnum); - if (!server.dict || !server.clients 
|| !server.slaves || !server.el || !server.objfreelist) + server.sharingpool = dictCreate(&setDictType,NULL); + server.sharingpoolsize = 1024; + if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); exit(1); } - for (j = 0; j < server.dbnum; j++) { + for (j = 0; j < server.dbnum; j++) server.dict[j] = dictCreate(&hashDictType,NULL); - if (!server.dict[j]) - oom("dictCreate"); /* Fatal OOM */ - } server.cronloops = 0; server.bgsaveinprogress = 0; server.lastsave = time(NULL); @@ -873,6 +905,13 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { + sdstolower(argv[1]); + if (!strcmp(argv[1],"yes")) server.shareobjects = 1; + else if (!strcmp(argv[1],"no")) server.shareobjects = 0; + else { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcmp(argv[0],"daemonize") && argc == 2) { sdstolower(argv[1]); if (!strcmp(argv[1],"yes")) server.daemonize = 1; @@ -880,6 +919,8 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + server.requirepass = zstrdup(argv[1]); } else if (!strcmp(argv[0],"pidfile") && argc == 2) { server.pidfile = zstrdup(argv[1]); } else { @@ -922,9 +963,10 @@ static void freeClient(redisClient *c) { assert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { - ln = listSearchKey(server.slaves,c); + list *l = (c->flags & REDIS_MONITOR) ? 
server.monitors : server.slaves; + ln = listSearchKey(l,c); assert(ln != NULL); - listDelNode(server.slaves,ln); + listDelNode(l,ln); } if (c->flags & REDIS_MASTER) { server.master = NULL; @@ -1079,11 +1121,26 @@ static int processCommand(redisClient *c) { return 1; } } + /* Let's try to share objects on the command arguments vector */ + if (server.shareobjects) { + int j; + for(j = 1; j < c->argc; j++) + c->argv[j] = tryObjectSharing(c->argv[j]); + } + /* Check if the user is authenticated */ + if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { + addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ dirty = server.dirty; cmd->proc(c); if (server.dirty-dirty != 0 && listLength(server.slaves)) - replicationFeedSlaves(cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc); + if (listLength(server.monitors)) + replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc); server.stat_numcommands++; /* Prepare the client for the next command */ @@ -1095,8 +1152,8 @@ static int processCommand(redisClient *c) { return 1; } -static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - listNode *ln = server.slaves->head; +static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) { + listNode *ln = slaves->head; robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ int outc = 0, j; @@ -1260,6 +1317,7 @@ static redisClient *createClient(int fd) { c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); + c->authenticated = 0; if ((c->reply = listCreate()) == NULL) oom("listCreate"); listSetFreeMethod(c->reply,decrRefCount); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, @@ -1387,14 +1445,92 @@ static void decrRefCount(void *obj) { } } +/* Try to share an object against the shared objects pool */ +static robj 
*tryObjectSharing(robj *o) { + struct dictEntry *de; + unsigned long c; + + if (server.shareobjects == 0) return o; + + assert(o->type == REDIS_STRING); + de = dictFind(server.sharingpool,o); + if (de) { + robj *shared = dictGetEntryKey(de); + + c = ((unsigned long) dictGetEntryVal(de))+1; + dictGetEntryVal(de) = (void*) c; + incrRefCount(shared); + decrRefCount(o); + return shared; + } else { + /* Here we are using a stream algorithm: Every time an object is + * shared we increment its count, every time there is a miss we + * decrement the counter of a random object. If this object reaches + * zero we remove the object and put the current object instead. */ + if (dictGetHashTableUsed(server.sharingpool) >= + server.sharingpoolsize) { + de = dictGetRandomKey(server.sharingpool); + assert(de != NULL); + c = ((unsigned long) dictGetEntryVal(de))-1; + dictGetEntryVal(de) = (void*) c; + if (c == 0) { + dictDelete(server.sharingpool,de->key); + } + } else { + c = 0; /* If the pool is not full we want to add this object */ + } + if (c == 0) { + int retval; + + retval = dictAdd(server.sharingpool,o,(void*)1); + assert(retval == DICT_OK); + incrRefCount(o); + } + return o; + } +} + /*============================ DB saving/loading ============================ */ +static int rdbSaveType(FILE *fp, unsigned char type) { + if (fwrite(&type,1,1,fp) == 0) return -1; + return 0; +} + +static int rdbSaveLen(FILE *fp, uint32_t len) { + unsigned char buf[2]; + + if (len < (1<<6)) { + /* Save a 6 bit len */ + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + } else if (len < (1<<14)) { + /* Save a 14 bit len */ + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); + buf[1] = len&0xFF; + if (fwrite(buf,4,1,fp) == 0) return -1; + } else { + /* Save a 32 bit len */ + buf[0] = (REDIS_RDB_32BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + len = htonl(len); + if (fwrite(&len,4,1,fp) == 0) return -1; + } + return 0; +} + +static int 
rdbSaveStringObject(FILE *fp, robj *obj) { + size_t len = sdslen(obj->ptr); + + if (rdbSaveLen(fp,len) == -1) return -1; + if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; + return 0; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ -static int saveDb(char *filename) { +static int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; - uint32_t len; - uint8_t type; FILE *fp; char tmpfile[256]; int j; @@ -1405,7 +1541,7 @@ static int saveDb(char *filename) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } - if (fwrite("REDIS0000",9,1,fp) == 0) goto werr; + if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { dict *d = server.dict[j]; if (dictGetHashTableUsed(d) == 0) continue; @@ -1416,60 +1552,43 @@ static int saveDb(char *filename) { } /* Write the SELECT DB opcode */ - type = REDIS_SELECTDB; - len = htonl(j); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; + if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *o = dictGetEntryVal(de); - type = o->type; - len = htonl(sdslen(key->ptr)); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr; - if (type == REDIS_STRING) { + if (rdbSaveType(fp,o->type) == -1) goto werr; + if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (o->type == REDIS_STRING) { /* Save a string value */ - sds sval = o->ptr; - len = htonl(sdslen(sval)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(sval) && - fwrite(sval,sdslen(sval),1,fp) == 0) goto werr; - } else if (type == REDIS_LIST) { + if (rdbSaveStringObject(fp,o) == -1) goto werr; + } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; 
listNode *ln = list->head; - len = htonl(listLength(list)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; while(ln) { robj *eleobj = listNodeValue(ln); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; ln = ln->next; } - } else if (type == REDIS_SET) { + } else if (o->type == REDIS_SET) { /* Save a set value */ dict *set = o->ptr; dictIterator *di = dictGetIterator(set); dictEntry *de; if (!set) oom("dictGetIteraotr"); - len = htonl(dictGetHashTableUsed(set)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { - robj *eleobj; + robj *eleobj = dictGetEntryKey(de); - eleobj = dictGetEntryKey(de); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } dictReleaseIterator(di); } else { @@ -1479,8 +1598,9 @@ static int saveDb(char *filename) { dictReleaseIterator(di); } /* EOF opcode */ - type = REDIS_EOF; - if (fwrite(&type,1,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; + + /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); fclose(fp); @@ -1505,14 +1625,14 @@ werr: return REDIS_ERR; } -static int saveDbBackground(char *filename) { +static int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsaveinprogress) return REDIS_ERR; if ((childpid = fork()) == 0) { /* Child */ close(server.fd); - if (saveDb(filename) == REDIS_OK) { + if (rdbSave(filename) == REDIS_OK) { exit(0); } else { exit(1); @@ -1526,84 +1646,107 @@ static int saveDbBackground(char *filename) { return REDIS_OK; /* unreached */ } 
-static int loadDb(char *filename) { +static int rdbLoadType(FILE *fp) { + unsigned char type; + if (fread(&type,1,1,fp) == 0) return -1; + return type; +} + +static uint32_t rdbLoadLen(FILE *fp, int rdbver) { + unsigned char buf[2]; + uint32_t len; + + if (rdbver == 0) { + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } else { + if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; + if ((buf[0]&0xC0) == REDIS_RDB_6BITLEN) { + /* Read a 6 bit len */ + return buf[0]; + } else if ((buf[0]&0xC0) == REDIS_RDB_14BITLEN) { + /* Read a 14 bit len */ + if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; + return ((buf[0]&0x3F)<<8)|buf[1]; + } else { + /* Read a 32 bit len */ + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } + } +} + +static robj *rdbLoadStringObject(FILE*fp,int rdbver) { + uint32_t len = rdbLoadLen(fp,rdbver); + sds val; + + if (len == REDIS_RDB_LENERR) return NULL; + val = sdsnewlen(NULL,len); + if (len && fread(val,len,1,fp) == 0) { + sdsfree(val); + return NULL; + } + return tryObjectSharing(createObject(REDIS_STRING,val)); +} + +static int rdbLoad(char *filename) { FILE *fp; - char buf[REDIS_LOADBUF_LEN]; /* Try to use this buffer instead of */ - char vbuf[REDIS_LOADBUF_LEN]; /* malloc() when the element is small */ - char *key = NULL, *val = NULL; - uint32_t klen,vlen,dbid; - uint8_t type; + robj *keyobj = NULL; + uint32_t dbid; + int type; int retval; dict *d = server.dict[0]; - + char buf[1024]; + int rdbver; fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; - if (memcmp(buf,"REDIS0000",9) != 0) { + buf[9] = '\0'; + if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); return REDIS_ERR; } + rdbver = atoi(buf+5); + if (rdbver > 1) { + fclose(fp); + redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); + return REDIS_ERR; + } while(1) { robj *o; /* Read type. 
*/ - if (fread(&type,1,1,fp) == 0) goto eoferr; + if ((type = rdbLoadType(fp)) == -1) goto eoferr; if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { - if (fread(&dbid,4,1,fp) == 0) goto eoferr; - dbid = ntohl(dbid); + if ((dbid = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR) goto eoferr; if (dbid >= (unsigned)server.dbnum) { - redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server compiled to handle more than %d databases. Exiting\n", server.dbnum); + redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } d = server.dict[dbid]; continue; } /* Read key */ - if (fread(&klen,4,1,fp) == 0) goto eoferr; - klen = ntohl(klen); - if (klen <= REDIS_LOADBUF_LEN) { - key = buf; - } else { - key = zmalloc(klen); - if (!key) oom("Loading DB from file"); - } - if (fread(key,klen,1,fp) == 0) goto eoferr; + if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_STRING) { /* Read string value */ - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - o = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; } else if (type == REDIS_LIST || type == REDIS_SET) { /* Read list/set value */ uint32_t listlen; - if (fread(&listlen,4,1,fp) == 0) goto eoferr; - listlen = ntohl(listlen); + + if ((listlen = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR) + goto eoferr; o = (type == REDIS_LIST) ? 
createListObject() : createSetObject(); /* Load every single element of the list/set */ while(listlen--) { robj *ele; - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - ele = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_LIST) { if (!listAddNodeTail((list*)o->ptr,ele)) oom("listAddNodeTail"); @@ -1611,30 +1754,23 @@ static int loadDb(char *filename) { if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR) oom("dictAdd"); } - /* free the temp buffer if needed */ - if (val != vbuf) zfree(val); - val = NULL; } } else { assert(0 != 0); } /* Add the new object in the hash table */ - retval = dictAdd(d,createStringObject(key,klen),o); + retval = dictAdd(d,keyobj,o); if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key found! Unrecoverable error, exiting now."); + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); exit(1); } - /* Iteration cleanup */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - key = val = NULL; + keyobj = o = NULL; } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); + decrRefCount(keyobj); redisLog(REDIS_WARNING,"Short read loading DB. 
Unrecoverable error, exiting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ @@ -1642,12 +1778,22 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ /*================================== Commands =============================== */ +static void authCommand(redisClient *c) { + if (!strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReply(c,shared.err); + } +} + static void pingCommand(redisClient *c) { addReply(c,shared.pong); } static void echoCommand(redisClient *c) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", (int)sdslen(c->argv[1]->ptr))); addReply(c,c->argv[1]); addReply(c,shared.crlf); @@ -1664,7 +1810,7 @@ static void setGenericCommand(redisClient *c, int nx) { dictReplace(c->dict,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } } else { @@ -1672,7 +1818,7 @@ static void setGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } server.dirty++; - addReply(c, nx ? shared.one : shared.ok); + addReply(c, nx ? 
shared.cone : shared.ok); } static void setCommand(redisClient *c) { @@ -1688,14 +1834,14 @@ static void getCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr))); addReply(c,o); addReply(c,shared.crlf); } @@ -1706,18 +1852,18 @@ static void mgetCommand(redisClient *c) { dictEntry *de; int j; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",c->argc-1)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { de = dictFind(c->dict,c->argv[j]); if (de == NULL) { - addReply(c,shared.minus1); + addReply(c,shared.nullbulk); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { - addReply(c,shared.minus1); + addReply(c,shared.nullbulk); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr))); addReply(c,o); addReply(c,shared.crlf); } @@ -1755,6 +1901,7 @@ static void incrDecrCommand(redisClient *c, int incr) { incrRefCount(c->argv[1]); } server.dirty++; + addReply(c,shared.colon); addReply(c,o); addReply(c,shared.crlf); } @@ -1782,9 +1929,9 @@ static void decrbyCommand(redisClient *c) { static void delCommand(redisClient *c) { if (dictDelete(c->dict,c->argv[1]) == DICT_OK) { server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } @@ -1793,9 +1940,9 @@ static void existsCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) - addReply(c,shared.zero); + addReply(c,shared.czero); else - addReply(c,shared.one); + addReply(c,shared.cone); } static void 
selectCommand(redisClient *c) { @@ -1815,6 +1962,7 @@ static void randomkeyCommand(redisClient *c) { if (de == NULL) { addReply(c,shared.crlf); } else { + addReply(c,shared.plus); addReply(c,dictGetEntryKey(de)); addReply(c,shared.crlf); } @@ -1845,18 +1993,18 @@ static void keysCommand(redisClient *c) { } } dictReleaseIterator(di); - lenobj->ptr = sdscatprintf(sdsempty(),"%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0)); + lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0)); addReply(c,shared.crlf); } static void dbsizeCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),"%lu\r\n",dictGetHashTableUsed(c->dict))); + sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict))); } static void lastsaveCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),"%lu\r\n",server.lastsave)); + sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave)); } static void typeCommand(redisClient *c) { @@ -1865,14 +2013,14 @@ static void typeCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - type = "none"; + type = "+none"; } else { robj *o = dictGetEntryVal(de); switch(o->type) { - case REDIS_STRING: type = "string"; break; - case REDIS_LIST: type = "list"; break; - case REDIS_SET: type = "set"; break; + case REDIS_STRING: type = "+string"; break; + case REDIS_LIST: type = "+list"; break; + case REDIS_SET: type = "+set"; break; default: type = "unknown"; break; } } @@ -1881,7 +2029,7 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { - if (saveDb(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1893,7 +2041,7 @@ static void bgsaveCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); return; } - if (saveDbBackground(server.dbfilename) == REDIS_OK) { + if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { 
addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1902,7 +2050,7 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - if (saveDb(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.dbfilename) == REDIS_OK) { if (server.daemonize) { unlink(server.pidfile); } @@ -1920,19 +2068,13 @@ static void renameGenericCommand(redisClient *c, int nx) { /* To use the same key as src and dst is probably an error */ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) { - if (nx) - addReply(c,shared.minus3); - else - addReplySds(c,sdsnew("-ERR src and dest key are the same\r\n")); + addReply(c,shared.sameobjecterr); return; } de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - if (nx) - addReply(c,shared.minus1); - else - addReply(c,shared.nokeyerr); + addReply(c,shared.nokeyerr); return; } o = dictGetEntryVal(de); @@ -1940,7 +2082,7 @@ static void renameGenericCommand(redisClient *c, int nx) { if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) { if (nx) { decrRefCount(o); - addReply(c,shared.zero); + addReply(c,shared.czero); return; } dictReplace(c->dict,c->argv[2],o); @@ -1949,7 +2091,7 @@ static void renameGenericCommand(redisClient *c, int nx) { } dictDelete(c->dict,c->argv[1]); server.dirty++; - addReply(c,nx ? shared.one : shared.ok); + addReply(c,nx ? shared.cone : shared.ok); } static void renameCommand(redisClient *c) { @@ -1970,7 +2112,7 @@ static void moveCommand(redisClient *c) { src = c->dict; srcid = c->dictid; if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) { - addReply(c,shared.minus4); + addReply(c,shared.outofrangeerr); return; } dst = c->dict; @@ -1980,14 +2122,14 @@ static void moveCommand(redisClient *c) { /* If the user is moving using as target the same * DB as the source DB it is probably an error. 
*/ if (src == dst) { - addReply(c,shared.minus3); + addReply(c,shared.sameobjecterr); return; } /* Check if the element exists and get a reference */ de = dictFind(c->dict,c->argv[1]); if (!de) { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } @@ -1995,7 +2137,7 @@ static void moveCommand(redisClient *c) { key = dictGetEntryKey(de); o = dictGetEntryVal(de); if (dictAdd(dst,key,o) == DICT_ERR) { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } incrRefCount(key); @@ -2004,7 +2146,7 @@ static void moveCommand(redisClient *c) { /* OK! key moved, free the entry in the source DB */ dictDelete(src,c->argv[1]); server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } /* =================================== Lists ================================ */ @@ -2057,15 +2199,15 @@ static void llenCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { l = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",listLength(l))); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l))); } } } @@ -2076,22 +2218,22 @@ static void lindexCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; ln = listIndex(list, index); if (ln == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); } @@ -2117,7 +2259,7 @@ 
static void lsetCommand(redisClient *c) { ln = listIndex(list, index); if (ln == NULL) { - addReplySds(c,sdsnew("-ERR index out of range\r\n")); + addReply(c,shared.outofrangeerr); } else { robj *ele = listNodeValue(ln); @@ -2136,12 +2278,12 @@ static void popGenericCommand(redisClient *c, int where) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; @@ -2152,10 +2294,10 @@ static void popGenericCommand(redisClient *c, int where) { ln = listLast(list); if (ln == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); listDelNode(list,ln); @@ -2180,12 +2322,12 @@ static void lrangeCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullmultibulk); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; @@ -2202,7 +2344,7 @@ static void lrangeCommand(redisClient *c) { /* indexes sanity checks */ if (start > end || start >= llen) { /* Out of range start or start > end result in empty list */ - addReply(c,shared.zero); + addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; @@ -2210,10 +2352,10 @@ static void lrangeCommand(redisClient *c) { /* Return the result in form of a multi-bulk reply */ ln = listIndex(list, start); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",rangelen)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); for (j = 0; j < rangelen; 
j++) { ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); ln = ln->next; @@ -2278,12 +2420,12 @@ static void lremCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.minus1); + addReply(c,shared.nokeyerr); } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln, *next; @@ -2307,7 +2449,7 @@ static void lremCommand(redisClient *c) { } ln = next; } - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",removed)); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } } } @@ -2326,16 +2468,16 @@ static void saddCommand(redisClient *c) { } else { set = dictGetEntryVal(de); if (set->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } } if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) { incrRefCount(c->argv[2]); server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } @@ -2344,20 +2486,20 @@ static void sremCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.zero); + addReply(c,shared.czero); } else { robj *set; set = dictGetEntryVal(de); if (set->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } } @@ -2367,19 +2509,19 @@ static void sismemberCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.zero); + addReply(c,shared.czero); } else { robj *set; set = dictGetEntryVal(de); if (set->type != REDIS_SET) { 
- addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } if (dictFind(set->ptr,c->argv[2])) - addReply(c,shared.one); + addReply(c,shared.cone); else - addReply(c,shared.zero); + addReply(c,shared.czero); } } @@ -2389,15 +2531,15 @@ static void scardCommand(redisClient *c) { de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } else { robj *o = dictGetEntryVal(de); if (o->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { s = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", dictGetHashTableUsed(s))); } } @@ -2424,13 +2566,13 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r de = dictFind(c->dict,setskeys[j]); if (!de) { zfree(dv); - addReply(c,dstkey ? shared.nokeyerr : shared.nil); + addReply(c,shared.nokeyerr); return; } setobj = dictGetEntryVal(de); if (setobj->type != REDIS_SET) { zfree(dv); - addReply(c,dstkey ? 
shared.wrongtypeerr : shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); return; } dv[j] = setobj->ptr; @@ -2472,7 +2614,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r continue; /* at least one set does not contain the member */ ele = dictGetEntryKey(de); if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); cardinality++; @@ -2484,7 +2626,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r dictReleaseIterator(di); if (!dstkey) - lenobj->ptr = sdscatprintf(sdsempty(),"%d\r\n",cardinality); + lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); else addReply(c,shared.ok); zfree(dv); @@ -2501,13 +2643,13 @@ static void sinterstoreCommand(redisClient *c) { static void flushdbCommand(redisClient *c) { dictEmpty(c->dict); addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { emptyDb(); addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -2614,19 +2756,19 @@ static void sortCommand(redisClient *c) { /* Lookup the key to sort. It must be of the right types */ de = dictFind(c->dict,c->argv[1]); if (de == NULL) { - addReply(c,shared.nokeyerrbulk); + addReply(c,shared.nokeyerr); return; } sortval = dictGetEntryVal(de); if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); return; } /* Create a list of operations to perform for every sorted element. 
* Operations can be GET/DEL/INCR/DECR */ operations = listCreate(); - listSetFreeMethod(operations,free); + listSetFreeMethod(operations,zfree); j = 2; /* Now we need to protect sortval incrementing its count, in the future @@ -2673,7 +2815,7 @@ static void sortCommand(redisClient *c) { } else { decrRefCount(sortval); listRelease(operations); - addReply(c,shared.syntaxerrbulk); + addReply(c,shared.syntaxerr); return; } j++; @@ -2754,11 +2896,11 @@ static void sortCommand(redisClient *c) { /* Send command output to the output buffer, performing the specified * GET/DEL/INCR/DECR operations if any. */ outputlen = getop ? getop*(end-start+1) : end-start+1; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",outputlen)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); for (j = start; j <= end; j++) { listNode *ln = operations->head; if (!getop) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(vector[j].obj->ptr))); addReply(c,vector[j].obj); addReply(c,shared.crlf); @@ -2770,9 +2912,9 @@ static void sortCommand(redisClient *c) { if (sop->type == REDIS_SORT_GET) { if (!val || val->type != REDIS_STRING) { - addReply(c,shared.minus1); + addReply(c,shared.nullbulk); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(val->ptr))); addReply(c,val); addReply(c,shared.crlf); @@ -2820,7 +2962,7 @@ static void infoCommand(redisClient *c) { uptime, uptime/(3600*24) ); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(info))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); } @@ -2912,15 +3054,19 @@ static void syncCommand(redisClient *c) { time_t start = time(NULL); char sizebuf[32]; + /* ignore SYNC if already slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + redisLog(REDIS_NOTICE,"Slave ask for syncronization"); - if (flushClientOutput(c) == REDIS_ERR || 
saveDb(server.dbfilename) != REDIS_OK) + if (flushClientOutput(c) == REDIS_ERR || + rdbSave(server.dbfilename) != REDIS_OK) goto closeconn; fd = open(server.dbfilename, O_RDONLY); if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn; len = sb.st_size; - snprintf(sizebuf,32,"%d\r\n",len); + snprintf(sizebuf,32,"$%d\r\n",len); if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn; while(len) { char buf[1024]; @@ -2972,7 +3118,7 @@ static int syncWithMaster(void) { strerror(errno)); return REDIS_ERR; } - dumpsize = atoi(buf); + dumpsize = atoi(buf+1); redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize); /* Read the bulk write data on a temp file */ snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); @@ -3010,7 +3156,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } emptyDb(); - if (loadDb(server.dbfilename) != REDIS_OK) { + if (rdbLoad(server.dbfilename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); close(fd); return REDIS_ERR; @@ -3021,6 +3167,16 @@ static int syncWithMaster(void) { return REDIS_OK; } +static void monitorCommand(redisClient *c) { + /* ignore MONITOR if already slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + + c->flags |= (REDIS_SLAVE|REDIS_MONITOR); + c->slaveseldb = 0; + if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); + addReply(c,shared.ok); +} + /* =================================== Main! 
================================ */ static void daemonize(void) { @@ -3059,11 +3215,11 @@ int main(int argc, char **argv) { initServer(); if (server.daemonize) daemonize(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); - if (loadDb(server.dbfilename) == REDIS_OK) + if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk"); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); - redisLog(REDIS_NOTICE,"The server is now ready to accept connections"); + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); aeMain(server.el); aeDeleteEventLoop(server.el); return 0;