X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/092dac2a64f912fe67be2e924c58979b200aa64b..bb32ede52e8e222735d12d7b350532a06b648c8d:/redis.c?ds=inline diff --git a/redis.c b/redis.c index e6606f18..3d225ba9 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,9 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.07" +#define REDIS_VERSION "0.09" + +#include "fmacros.h" #include #include @@ -46,6 +48,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -53,6 +56,7 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ +#include "lzf.h" /* Error codes */ #define REDIS_OK 0 @@ -68,6 +72,7 @@ #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ +#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -82,13 +87,44 @@ #define REDIS_LIST 1 #define REDIS_SET 2 #define REDIS_HASH 3 + +/* Object types only used for dumping to disk */ +#define REDIS_EXPIRETIME 253 #define REDIS_SELECTDB 254 #define REDIS_EOF 255 +/* Defines related to the dump file format. To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpreter the length: + * + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte + * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow + * 11|000000 this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the REDIS_RDB_ENC_* defines. 
+ * + * Lenghts up to 63 are stored using a single byte, most DB keys, and may + * values, will fit inside. */ +#define REDIS_RDB_6BITLEN 0 +#define REDIS_RDB_14BITLEN 1 +#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_ENCVAL 3 +#define REDIS_RDB_LENERR UINT_MAX + +/* When a length of a string object stored on disk has the first two bits + * set, the remaining two bits specify a special encoding for the object + * accordingly to the following defines: */ +#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + /* Client flags */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_SLAVE 2 /* This client is a slave server */ #define REDIS_MASTER 4 /* This client is a master server */ +#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */ /* Server replication state */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -125,11 +161,17 @@ typedef struct redisObject { int refcount; } robj; +typedef struct redisDb { + dict *dict; + dict *expires; + int id; +} redisDb; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. 
*/ typedef struct redisClient { int fd; - dict *dict; + redisDb *db; int dictid; sds querybuf; robj *argv[REDIS_MAX_ARGS]; @@ -138,8 +180,9 @@ typedef struct redisClient { list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ - int flags; /* REDIS_CLOSE | REDIS_SLAVE */ + int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ int slaveseldb; /* slave selected db, if this client is a slave */ + int authenticated; /* when requirepass is non-NULL */ } redisClient; struct saveparam { @@ -151,10 +194,12 @@ struct saveparam { struct redisServer { int port; int fd; - dict **dict; + redisDb *db; + dict *sharingpool; + unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; - list *slaves; + list *slaves, *monitors; char neterr[ANET_ERR_LEN]; aeEventLoop *el; int cronloops; /* number of times the cron function run */ @@ -171,12 +216,15 @@ struct redisServer { int maxidletime; int dbnum; int daemonize; + char *pidfile; int bgsaveinprogress; struct saveparam *saveparams; int saveparamslen; char *logfile; char *bindaddr; char *dbfilename; + char *requirepass; + int shareobjects; /* Replication related */ int isslave; char *masterhost; @@ -212,10 +260,10 @@ typedef struct _redisSortOperation { } redisSortOperation; struct sharedObjectsStruct { - robj *crlf, *ok, *err, *zerobulk, *nil, *zero, *one, *pong, *space, - *minus1, *minus2, *minus3, *minus4, - *wrongtypeerr, *nokeyerr, *wrongtypeerrbulk, *nokeyerrbulk, - *syntaxerr, *syntaxerrbulk, + robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, + *colon, *nullbulk, *nullmultibulk, + *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, + *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9; } shared; @@ -228,15 +276,23 @@ static void freeSetObject(robj *o); static void decrRefCount(void *o); static robj *createObject(int type, void 
*ptr); static void freeClient(redisClient *c); -static int loadDb(char *filename); +static int rdbLoad(char *filename); static void addReply(redisClient *c, robj *obj); static void addReplySds(redisClient *c, sds s); static void incrRefCount(robj *o); -static int saveDbBackground(char *filename); +static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); -static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc); +static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); - +static robj *tryObjectSharing(robj *o); +static int removeExpire(redisDb *db, robj *key); +static int expireIfNeeded(redisDb *db, robj *key); +static int deleteIfVolatile(redisDb *db, robj *key); +static int deleteKey(redisDb *db, robj *key); +static time_t getExpire(redisDb *db, robj *key); +static int setExpire(redisDb *db, robj *key, time_t when); + +static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); @@ -282,6 +338,8 @@ static void sortCommand(redisClient *c); static void lremCommand(redisClient *c); static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); +static void monitorCommand(redisClient *c); +static void expireCommand(redisClient *c); /*================================= Globals ================================= */ @@ -322,6 +380,7 @@ static struct redisCommand cmdTable[] = { {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, + {"auth",authCommand,2,REDIS_CMD_INLINE}, {"ping",pingCommand,1,REDIS_CMD_INLINE}, {"echo",echoCommand,2,REDIS_CMD_BULK}, {"save",saveCommand,1,REDIS_CMD_INLINE}, @@ -334,6 +393,8 @@ static struct redisCommand cmdTable[] = { {"flushall",flushallCommand,1,REDIS_CMD_INLINE}, 
{"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE}, + {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -573,7 +634,7 @@ void closeTimedoutClients(void) { } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, size, used, loops = server.cronloops++; + int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -584,16 +645,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ for (j = 0; j < server.dbnum; j++) { - size = dictGetHashTableSize(server.dict[j]); - used = dictGetHashTableUsed(server.dict[j]); + int size, used, vkeys; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + vkeys = dictSize(server.db[j].expires); if (!(loops % 5) && used > 0) { - redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size); - // dictPrintStats(server.dict); + redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); + /* dictPrintStats(server.dict); */ } if (size && used && size > REDIS_HT_MINSLOTS && (used*100/size < REDIS_HT_MINFILL)) { redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.dict[j]); + dictResize(server.db[j].dict); redisLog(REDIS_NOTICE,"Hash table %d resized.",j); } } @@ -603,7 +667,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.usedmemory); + server.usedmemory, + dictSize(server.sharingpool)); } /* Close connections of timedout clients */ @@ -637,11 +702,35 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void 
*clientData) { now-server.lastsave > sp->seconds) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); - saveDbBackground(server.dbfilename); + rdbSaveBackground(server.dbfilename); break; } } } + + /* Try to expire a few timed out keys */ + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + int num = dictSize(db->expires); + + if (num) { + time_t now = time(NULL); + + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + deleteKey(db,dictGetEntryKey(de)); + } + } + } + } + /* Check if we should connect to a MASTER */ if (server.replstate == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); @@ -656,29 +745,27 @@ static void createSharedObjects(void) { shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n")); shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n")); shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n")); - shared.zerobulk = createObject(REDIS_STRING,sdsnew("0\r\n\r\n")); - shared.nil = createObject(REDIS_STRING,sdsnew("nil\r\n")); - shared.zero = createObject(REDIS_STRING,sdsnew("0\r\n")); - shared.one = createObject(REDIS_STRING,sdsnew("1\r\n")); + shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n")); + shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n")); + shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n")); + shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n")); + shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); + shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); /* no such key */ - shared.minus1 = createObject(REDIS_STRING,sdsnew("-1\r\n")); - /* operation against key holding a value of the wrong type */ - shared.minus2 = createObject(REDIS_STRING,sdsnew("-2\r\n")); - /* src and dest objects are the same */ - 
shared.minus3 = createObject(REDIS_STRING,sdsnew("-3\r\n")); - /* out of range argument */ - shared.minus4 = createObject(REDIS_STRING,sdsnew("-4\r\n")); shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n")); shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew( "-ERR Operation against a key holding the wrong kind of value\r\n")); - shared.wrongtypeerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.wrongtypeerr->ptr)+2,shared.wrongtypeerr->ptr)); shared.nokeyerr = createObject(REDIS_STRING,sdsnew( "-ERR no such key\r\n")); - shared.nokeyerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.nokeyerr->ptr)+2,shared.nokeyerr->ptr)); shared.syntaxerr = createObject(REDIS_STRING,sdsnew( "-ERR syntax error\r\n")); - shared.syntaxerrbulk = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%d\r\n%s",-sdslen(shared.syntaxerr->ptr)+2,shared.syntaxerr->ptr)); + shared.sameobjecterr = createObject(REDIS_STRING,sdsnew( + "-ERR source and destination objects are the same\r\n")); + shared.outofrangeerr = createObject(REDIS_STRING,sdsnew( + "-ERR index out of range\r\n")); shared.space = createObject(REDIS_STRING,sdsnew(" ")); + shared.colon = createObject(REDIS_STRING,sdsnew(":")); + shared.plus = createObject(REDIS_STRING,sdsnew("+")); shared.select0 = createStringObject("select 0\r\n",10); shared.select1 = createStringObject("select 1\r\n",10); shared.select2 = createStringObject("select 2\r\n",10); @@ -715,7 +802,10 @@ static void initServerConfig() { server.bindaddr = NULL; server.glueoutputbuf = 1; server.daemonize = 0; + server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; + server.requirepass = NULL; + server.shareobjects = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -737,11 +827,14 @@ static void initServer() { server.clients = listCreate(); server.slaves = listCreate(); + server.monitors = listCreate(); server.objfreelist 
= listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); - server.dict = zmalloc(sizeof(dict*)*server.dbnum); - if (!server.dict || !server.clients || !server.slaves || !server.el || !server.objfreelist) + server.db = zmalloc(sizeof(redisDb)*server.dbnum); + server.sharingpool = dictCreate(&setDictType,NULL); + server.sharingpoolsize = 1024; + if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { @@ -749,9 +842,9 @@ static void initServer() { exit(1); } for (j = 0; j < server.dbnum; j++) { - server.dict[j] = dictCreate(&hashDictType,NULL); - if (!server.dict[j]) - oom("dictCreate"); /* Fatal OOM */ + server.db[j].dict = dictCreate(&hashDictType,NULL); + server.db[j].expires = dictCreate(&setDictType,NULL); + server.db[j].id = j; } server.cronloops = 0; server.bgsaveinprogress = 0; @@ -768,8 +861,10 @@ static void initServer() { static void emptyDb() { int j; - for (j = 0; j < server.dbnum; j++) - dictEmpty(server.dict[j]); + for (j = 0; j < server.dbnum; j++) { + dictEmpty(server.db[j].dict); + dictEmpty(server.db[j].expires); + } } /* I agree, this is a very rudimental way to load a configuration... 
@@ -871,6 +966,13 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { + sdstolower(argv[1]); + if (!strcmp(argv[1],"yes")) server.shareobjects = 1; + else if (!strcmp(argv[1],"no")) server.shareobjects = 0; + else { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcmp(argv[0],"daemonize") && argc == 2) { sdstolower(argv[1]); if (!strcmp(argv[1],"yes")) server.daemonize = 1; @@ -878,6 +980,10 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + server.requirepass = zstrdup(argv[1]); + } else if (!strcmp(argv[0],"pidfile") && argc == 2) { + server.pidfile = zstrdup(argv[1]); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -918,9 +1024,10 @@ static void freeClient(redisClient *c) { assert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { - ln = listSearchKey(server.slaves,c); + list *l = (c->flags & REDIS_MONITOR) ? 
server.monitors : server.slaves; + ln = listSearchKey(l,c); assert(ln != NULL); - listDelNode(server.slaves,ln); + listDelNode(l,ln); } if (c->flags & REDIS_MASTER) { server.master = NULL; @@ -981,7 +1088,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) if (c->flags & REDIS_MASTER) { nwritten = objlen - c->sentlen; } else { - nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen); + nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; } c->sentlen += nwritten; @@ -1075,11 +1182,26 @@ static int processCommand(redisClient *c) { return 1; } } + /* Let's try to share objects on the command arguments vector */ + if (server.shareobjects) { + int j; + for(j = 1; j < c->argc; j++) + c->argv[j] = tryObjectSharing(c->argv[j]); + } + /* Check if the user is authenticated */ + if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { + addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ dirty = server.dirty; cmd->proc(c); if (server.dirty-dirty != 0 && listLength(server.slaves)) - replicationFeedSlaves(cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc); + if (listLength(server.monitors)) + replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc); server.stat_numcommands++; /* Prepare the client for the next command */ @@ -1091,8 +1213,8 @@ static int processCommand(redisClient *c) { return 1; } -static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - listNode *ln = server.slaves->head; +static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) { + listNode *ln = slaves->head; robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ int outc = 0, j; @@ -1237,8 +1359,7 @@ again: static int selectDb(redisClient *c, int id) { if (id < 0 || id 
>= server.dbnum) return REDIS_ERR; - c->dict = server.dict[id]; - c->dictid = id; + c->db = &server.db[id]; return REDIS_OK; } @@ -1256,6 +1377,7 @@ static redisClient *createClient(int fd) { c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); + c->authenticated = 0; if ((c->reply = listCreate()) == NULL) oom("listCreate"); listSetFreeMethod(c->reply,decrRefCount); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, @@ -1339,14 +1461,6 @@ static robj *createSetObject(void) { return createObject(REDIS_SET,d); } -#if 0 -static robj *createHashObject(void) { - dict *d = dictCreate(&hashDictType,NULL); - if (!d) oom("dictCreate"); - return createObject(REDIS_SET,d); -} -#endif - static void freeStringObject(robj *o) { sdsfree(o->ptr); } @@ -1365,10 +1479,19 @@ static void freeHashObject(robj *o) { static void incrRefCount(robj *o) { o->refcount++; +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount); +#endif } static void decrRefCount(void *obj) { robj *o = obj; + +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1); +#endif if (--(o->refcount) == 0) { switch(o->type) { case REDIS_STRING: freeStringObject(o); break; @@ -1383,17 +1506,223 @@ static void decrRefCount(void *obj) { } } +/* Try to share an object against the shared objects pool */ +static robj *tryObjectSharing(robj *o) { + struct dictEntry *de; + unsigned long c; + + if (o == NULL || server.shareobjects == 0) return o; + + assert(o->type == REDIS_STRING); + de = dictFind(server.sharingpool,o); + if (de) { + robj *shared = dictGetEntryKey(de); + + c = ((unsigned long) dictGetEntryVal(de))+1; + dictGetEntryVal(de) = (void*) c; + incrRefCount(shared); + decrRefCount(o); + return shared; + } else { + /* Here we are using a stream algorihtm: Every time an object is + * shared we increment its count, everytime there is a miss we + * recrement the counter of a random 
object. If this object reaches + * zero we remove the object and put the current object instead. */ + if (dictSize(server.sharingpool) >= + server.sharingpoolsize) { + de = dictGetRandomKey(server.sharingpool); + assert(de != NULL); + c = ((unsigned long) dictGetEntryVal(de))-1; + dictGetEntryVal(de) = (void*) c; + if (c == 0) { + dictDelete(server.sharingpool,de->key); + } + } else { + c = 0; /* If the pool is empty we want to add this object */ + } + if (c == 0) { + int retval; + + retval = dictAdd(server.sharingpool,o,(void*)1); + assert(retval == DICT_OK); + incrRefCount(o); + } + return o; + } +} + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key); + return de ? dictGetEntryVal(de) : NULL; +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + return lookupKey(db,key); +} + +static int deleteKey(redisDb *db, robj *key) { + int retval; + + /* We need to protect key from destruction: after the first dictDelete() + * it may happen that 'key' is no longer valid if we don't increment + * it's count. 
This may happen when we get the object reference directly + * from the hash table with dictRandomKey() or dict iterators */ + incrRefCount(key); + if (dictSize(db->expires)) dictDelete(db->expires,key); + retval = dictDelete(db->dict,key); + decrRefCount(key); + + return retval == DICT_OK; +} + /*============================ DB saving/loading ============================ */ +static int rdbSaveType(FILE *fp, unsigned char type) { + if (fwrite(&type,1,1,fp) == 0) return -1; + return 0; +} + +static int rdbSaveTime(FILE *fp, time_t t) { + int32_t t32 = (int32_t) t; + if (fwrite(&t32,4,1,fp) == 0) return -1; + return 0; +} + +/* check rdbLoadLen() comments for more info */ +static int rdbSaveLen(FILE *fp, uint32_t len) { + unsigned char buf[2]; + + if (len < (1<<6)) { + /* Save a 6 bit len */ + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + } else if (len < (1<<14)) { + /* Save a 14 bit len */ + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); + buf[1] = len&0xFF; + if (fwrite(buf,2,1,fp) == 0) return -1; + } else { + /* Save a 32 bit len */ + buf[0] = (REDIS_RDB_32BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + len = htonl(len); + if (fwrite(&len,4,1,fp) == 0) return -1; + } + return 0; +} + +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +int rdbTryIntegerEncoding(sds s, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + snprintf(buf,32,"%lld",value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0; + + /* Finally check if it fits in our ranges */ + if (value >= -(1<<7) && value <= 
(1<<7)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; + enc[1] = value&0xFF; + return 2; + } else if (value >= -(1<<15) && value <= (1<<15)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + return 3; + } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + enc[3] = (value>>16)&0xFF; + enc[4] = (value>>24)&0xFF; + return 5; + } else { + return 0; + } +} + +static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { + unsigned int comprlen, outlen; + unsigned char byte; + void *out; + + /* We require at least four bytes compression for this to be worth it */ + outlen = sdslen(obj->ptr)-4; + if (outlen <= 0) return 0; + if ((out = zmalloc(outlen)) == NULL) return 0; + comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); + if (comprlen == 0) { + zfree(out); + return 0; + } + /* Data compressed! Let's save it on disk */ + byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF; + if (fwrite(&byte,1,1,fp) == 0) goto writeerr; + if (rdbSaveLen(fp,comprlen) == -1) goto writeerr; + if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr; + if (fwrite(out,comprlen,1,fp) == 0) goto writeerr; + zfree(out); + return comprlen; + +writeerr: + zfree(out); + return -1; +} + +/* Save a string objet as [len][data] on disk. 
If the object is a string + * representation of an integer value we try to safe it in a special form */ +static int rdbSaveStringObject(FILE *fp, robj *obj) { + size_t len = sdslen(obj->ptr); + int enclen; + + /* Try integer encoding */ + if (len <= 11) { + unsigned char buf[5]; + if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + return 0; + } + } + + /* Try LZF compression - under 20 bytes it's unable to compress even + * aaaaaaaaaaaaaaaaaa so skip it */ + if (len > 20) { + int retval; + + retval = rdbSaveLzfStringObject(fp,obj); + if (retval == -1) return -1; + if (retval > 0) return 0; + /* retval == 0 means data can't be compressed, save the old way */ + } + + /* Store verbatim */ + if (rdbSaveLen(fp,len) == -1) return -1; + if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; + return 0; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ -static int saveDb(char *filename) { +static int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; - uint32_t len; - uint8_t type; FILE *fp; char tmpfile[256]; int j; + time_t now = time(NULL); snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); fp = fopen(tmpfile,"w"); @@ -1401,10 +1730,11 @@ static int saveDb(char *filename) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } - if (fwrite("REDIS0000",9,1,fp) == 0) goto werr; + if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { - dict *d = server.dict[j]; - if (dictGetHashTableUsed(d) == 0) continue; + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; di = dictGetIterator(d); if (!di) { fclose(fp); @@ -1412,60 +1742,52 @@ static int saveDb(char *filename) { } /* Write the SELECT DB opcode */ - type = REDIS_SELECTDB; - len = htonl(j); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; + if 
(rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; + if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *o = dictGetEntryVal(de); - - type = o->type; - len = htonl(sdslen(key->ptr)); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr; - if (type == REDIS_STRING) { + time_t expiretime = getExpire(db,key); + + /* Save the expire time */ + if (expiretime != -1) { + /* If this key is already expired skip it */ + if (expiretime < now) continue; + if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; + if (rdbSaveTime(fp,expiretime) == -1) goto werr; + } + /* Save the key and associated value */ + if (rdbSaveType(fp,o->type) == -1) goto werr; + if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (o->type == REDIS_STRING) { /* Save a string value */ - sds sval = o->ptr; - len = htonl(sdslen(sval)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(sval) && - fwrite(sval,sdslen(sval),1,fp) == 0) goto werr; - } else if (type == REDIS_LIST) { + if (rdbSaveStringObject(fp,o) == -1) goto werr; + } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; listNode *ln = list->head; - len = htonl(listLength(list)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; while(ln) { robj *eleobj = listNodeValue(ln); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; ln = ln->next; } - } else if (type == REDIS_SET) { + } else if (o->type == REDIS_SET) { /* Save a set value */ dict *set = o->ptr; dictIterator *di = dictGetIterator(set); dictEntry *de; if (!set) oom("dictGetIteraotr"); - len = htonl(dictGetHashTableUsed(set)); - if 
(fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { - robj *eleobj; + robj *eleobj = dictGetEntryKey(de); - eleobj = dictGetEntryKey(de); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } dictReleaseIterator(di); } else { @@ -1475,8 +1797,9 @@ static int saveDb(char *filename) { dictReleaseIterator(di); } /* EOF opcode */ - type = REDIS_EOF; - if (fwrite(&type,1,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; + + /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); fclose(fp); @@ -1501,14 +1824,14 @@ werr: return REDIS_ERR; } -static int saveDbBackground(char *filename) { +static int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsaveinprogress) return REDIS_ERR; if ((childpid = fork()) == 0) { /* Child */ close(server.fd); - if (saveDb(filename) == REDIS_OK) { + if (rdbSave(filename) == REDIS_OK) { exit(0); } else { exit(1); @@ -1522,84 +1845,191 @@ static int saveDbBackground(char *filename) { return REDIS_OK; /* unreached */ } -static int loadDb(char *filename) { +static int rdbLoadType(FILE *fp) { + unsigned char type; + if (fread(&type,1,1,fp) == 0) return -1; + return type; +} + +static time_t rdbLoadTime(FILE *fp) { + int32_t t32; + if (fread(&t32,4,1,fp) == 0) return -1; + return (time_t) t32; +} + +/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top + * of this file for a description of how this are stored on disk. 
+ * + * isencoded is set to 1 if the readed length is not actually a length but + * an "encoding type", check the above comments for more info */ +static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) { + unsigned char buf[2]; + uint32_t len; + + if (isencoded) *isencoded = 0; + if (rdbver == 0) { + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } else { + int type; + + if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; + type = (buf[0]&0xC0)>>6; + if (type == REDIS_RDB_6BITLEN) { + /* Read a 6 bit len */ + return buf[0]&0x3F; + } else if (type == REDIS_RDB_ENCVAL) { + /* Read a 6 bit len encoding type */ + if (isencoded) *isencoded = 1; + return buf[0]&0x3F; + } else if (type == REDIS_RDB_14BITLEN) { + /* Read a 14 bit len */ + if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; + return ((buf[0]&0x3F)<<8)|buf[1]; + } else { + /* Read a 32 bit len */ + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } + } +} + +static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { + unsigned char enc[4]; + long long val; + + if (enctype == REDIS_RDB_ENC_INT8) { + if (fread(enc,1,1,fp) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == REDIS_RDB_ENC_INT16) { + uint16_t v; + if (fread(enc,2,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8); + val = (int16_t)v; + } else if (enctype == REDIS_RDB_ENC_INT32) { + uint32_t v; + if (fread(enc,4,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); + val = (int32_t)v; + } else { + val = 0; /* anti-warning */ + assert(0!=0); + } + return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); +} + +static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { + unsigned int len, clen; + unsigned char *c = NULL; + sds val = NULL; + + if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((c = zmalloc(clen)) == NULL) 
goto err; + if ((val = sdsnewlen(NULL,len)) == NULL) goto err; + if (fread(c,clen,1,fp) == 0) goto err; + if (lzf_decompress(c,clen,val,len) == 0) goto err; + return createObject(REDIS_STRING,val); +err: + zfree(c); + sdsfree(val); + return NULL; +} + +static robj *rdbLoadStringObject(FILE*fp, int rdbver) { + int isencoded; + uint32_t len; + sds val; + + len = rdbLoadLen(fp,rdbver,&isencoded); + if (isencoded) { + switch(len) { + case REDIS_RDB_ENC_INT8: + case REDIS_RDB_ENC_INT16: + case REDIS_RDB_ENC_INT32: + return tryObjectSharing(rdbLoadIntegerObject(fp,len)); + case REDIS_RDB_ENC_LZF: + return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver)); + default: + assert(0!=0); + } + } + + if (len == REDIS_RDB_LENERR) return NULL; + val = sdsnewlen(NULL,len); + if (len && fread(val,len,1,fp) == 0) { + sdsfree(val); + return NULL; + } + return tryObjectSharing(createObject(REDIS_STRING,val)); +} + +static int rdbLoad(char *filename) { FILE *fp; - char buf[REDIS_LOADBUF_LEN]; /* Try to use this buffer instead of */ - char vbuf[REDIS_LOADBUF_LEN]; /* malloc() when the element is small */ - char *key = NULL, *val = NULL; - uint32_t klen,vlen,dbid; - uint8_t type; - int retval; - dict *d = server.dict[0]; + robj *keyobj = NULL; + uint32_t dbid; + int type, retval, rdbver; + dict *d = server.db[0].dict; + redisDb *db = server.db+0; + char buf[1024]; + time_t expiretime = -1, now = time(NULL); fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; - if (memcmp(buf,"REDIS0000",9) != 0) { + buf[9] = '\0'; + if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); return REDIS_ERR; } + rdbver = atoi(buf+5); + if (rdbver > 1) { + fclose(fp); + redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); + return REDIS_ERR; + } while(1) { robj *o; /* Read type. 
*/ - if (fread(&type,1,1,fp) == 0) goto eoferr; + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + if (type == REDIS_EXPIRETIME) { + if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; + /* We read the time so we need to read the object type again */ + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + } if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { - if (fread(&dbid,4,1,fp) == 0) goto eoferr; - dbid = ntohl(dbid); + if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; if (dbid >= (unsigned)server.dbnum) { - redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server compiled to handle more than %d databases. Exiting\n", server.dbnum); + redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } - d = server.dict[dbid]; + db = server.db+dbid; + d = db->dict; continue; } /* Read key */ - if (fread(&klen,4,1,fp) == 0) goto eoferr; - klen = ntohl(klen); - if (klen <= REDIS_LOADBUF_LEN) { - key = buf; - } else { - key = zmalloc(klen); - if (!key) oom("Loading DB from file"); - } - if (fread(key,klen,1,fp) == 0) goto eoferr; + if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_STRING) { /* Read string value */ - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - o = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; } else if (type == REDIS_LIST || type == REDIS_SET) { /* Read list/set value */ uint32_t listlen; - if (fread(&listlen,4,1,fp) == 0) goto eoferr; - listlen = ntohl(listlen); + + if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; o = (type == 
REDIS_LIST) ? createListObject() : createSetObject(); /* Load every single element of the list/set */ while(listlen--) { robj *ele; - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - ele = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_LIST) { if (!listAddNodeTail((list*)o->ptr,ele)) oom("listAddNodeTail"); @@ -1607,43 +2037,53 @@ static int loadDb(char *filename) { if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR) oom("dictAdd"); } - /* free the temp buffer if needed */ - if (val != vbuf) zfree(val); - val = NULL; } } else { assert(0 != 0); } /* Add the new object in the hash table */ - retval = dictAdd(d,createStringObject(key,klen),o); + retval = dictAdd(d,keyobj,o); if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key found! Unrecoverable error, exiting now."); + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); exit(1); } - /* Iteration cleanup */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - key = val = NULL; + /* Set the expire time if needed */ + if (expiretime != -1) { + setExpire(db,keyobj,expiretime); + /* Delete this key if already expired */ + if (expiretime < now) deleteKey(db,keyobj); + expiretime = -1; + } + keyobj = o = NULL; } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - redisLog(REDIS_WARNING,"Short read loading DB. Unrecoverable error, exiting now."); + if (keyobj) decrRefCount(keyobj); + redisLog(REDIS_WARNING,"Short read or OOM loading DB. 
Unrecoverable error, exiting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } /*================================== Commands =============================== */ +static void authCommand(redisClient *c) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReply(c,shared.err); + } +} + static void pingCommand(redisClient *c) { addReply(c,shared.pong); } static void echoCommand(redisClient *c) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", (int)sdslen(c->argv[1]->ptr))); addReply(c,c->argv[1]); addReply(c,shared.crlf); @@ -1654,13 +2094,13 @@ static void echoCommand(redisClient *c) { static void setGenericCommand(redisClient *c, int nx) { int retval; - retval = dictAdd(c->dict,c->argv[1],c->argv[2]); + retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); if (retval == DICT_ERR) { if (!nx) { - dictReplace(c->dict,c->argv[1],c->argv[2]); + dictReplace(c->db->dict,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); return; } } else { @@ -1668,30 +2108,28 @@ static void setGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } server.dirty++; - addReply(c, nx ? shared.one : shared.ok); + removeExpire(c->db,c->argv[1]); + addReply(c, nx ? 
shared.cone : shared.ok); } static void setCommand(redisClient *c) { - return setGenericCommand(c,0); + setGenericCommand(c,0); } static void setnxCommand(redisClient *c) { - return setGenericCommand(c,1); + setGenericCommand(c,1); } static void getCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.nil); + robj *o = lookupKeyRead(c->db,c->argv[1]); + + if (o == NULL) { + addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr))); addReply(c,o); addReply(c,shared.crlf); } @@ -1699,21 +2137,18 @@ static void getCommand(redisClient *c) { } static void mgetCommand(redisClient *c) { - dictEntry *de; int j; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",c->argc-1)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { - de = dictFind(c->dict,c->argv[j]); - if (de == NULL) { - addReply(c,shared.minus1); + robj *o = lookupKeyRead(c->db,c->argv[j]); + if (o == NULL) { + addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { - addReply(c,shared.minus1); + addReply(c,shared.nullbulk); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(o->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr))); addReply(c,o); addReply(c,shared.crlf); } @@ -1722,17 +2157,14 @@ static void mgetCommand(redisClient *c) { } static void incrDecrCommand(redisClient *c, int incr) { - dictEntry *de; long long value; int retval; robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { value = 0; } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) 
{ value = 0; } else { @@ -1744,61 +2176,57 @@ static void incrDecrCommand(redisClient *c, int incr) { value += incr; o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - retval = dictAdd(c->dict,c->argv[1],o); + retval = dictAdd(c->db->dict,c->argv[1],o); if (retval == DICT_ERR) { - dictReplace(c->dict,c->argv[1],o); + dictReplace(c->db->dict,c->argv[1],o); + removeExpire(c->db,c->argv[1]); } else { incrRefCount(c->argv[1]); } server.dirty++; + addReply(c,shared.colon); addReply(c,o); addReply(c,shared.crlf); } static void incrCommand(redisClient *c) { - return incrDecrCommand(c,1); + incrDecrCommand(c,1); } static void decrCommand(redisClient *c) { - return incrDecrCommand(c,-1); + incrDecrCommand(c,-1); } static void incrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,incr); + incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,-incr); + incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (dictDelete(c->dict,c->argv[1]) == DICT_OK) { + if (deleteKey(c->db,c->argv[1])) { server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } static void existsCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) - addReply(c,shared.zero); - else - addReply(c,shared.one); + addReply(c,lookupKeyRead(c->db,c->argv[1]) ? 
shared.cone : shared.czero); } static void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,"-ERR invalid DB index\r\n"); + addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); } else { addReply(c,shared.ok); } @@ -1806,11 +2234,15 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; - - de = dictGetRandomKey(c->dict); + + while(1) { + de = dictGetRandomKey(c->db->dict); + if (expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; + } if (de == NULL) { addReply(c,shared.crlf); } else { + addReply(c,shared.plus); addReply(c,dictGetEntryKey(de)); addReply(c,shared.crlf); } @@ -1824,51 +2256,52 @@ static void keysCommand(redisClient *c) { int numkeys = 0, keyslen = 0; robj *lenobj = createObject(REDIS_STRING,NULL); - di = dictGetIterator(c->dict); + di = dictGetIterator(c->db->dict); if (!di) oom("dictGetIterator"); addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { robj *keyobj = dictGetEntryKey(de); + sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - if (numkeys != 0) - addReply(c,shared.space); - addReply(c,keyobj); - numkeys++; - keyslen += sdslen(key); + if (expireIfNeeded(c->db,keyobj) == 0) { + if (numkeys != 0) + addReply(c,shared.space); + addReply(c,keyobj); + numkeys++; + keyslen += sdslen(key); + } } } dictReleaseIterator(di); - lenobj->ptr = sdscatprintf(sdsempty(),"%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0)); + lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? 
(numkeys-1) : 0)); addReply(c,shared.crlf); } static void dbsizeCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),"%lu\r\n",dictGetHashTableUsed(c->dict))); + sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); } static void lastsaveCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),"%lu\r\n",server.lastsave)); + sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave)); } static void typeCommand(redisClient *c) { - dictEntry *de; + robj *o; char *type; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - type = "none"; + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + type = "+none"; } else { - robj *o = dictGetEntryVal(de); - switch(o->type) { - case REDIS_STRING: type = "string"; break; - case REDIS_LIST: type = "list"; break; - case REDIS_SET: type = "set"; break; + case REDIS_STRING: type = "+string"; break; + case REDIS_LIST: type = "+list"; break; + case REDIS_SET: type = "+set"; break; default: type = "unknown"; break; } } @@ -1877,7 +2310,7 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { - if (saveDb(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1889,7 +2322,7 @@ static void bgsaveCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); return; } - if (saveDbBackground(server.dbfilename) == REDIS_OK) { + if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1898,7 +2331,10 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - if (saveDb(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) { + unlink(server.pidfile); + } redisLog(REDIS_WARNING,"Server exit now, bye bye..."); exit(1); } else { @@ -1908,41 
+2344,34 @@ static void shutdownCommand(redisClient *c) { } static void renameGenericCommand(redisClient *c, int nx) { - dictEntry *de; robj *o; /* To use the same key as src and dst is probably an error */ if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) { - if (nx) - addReply(c,shared.minus3); - else - addReplySds(c,sdsnew("-ERR src and dest key are the same\r\n")); + addReply(c,shared.sameobjecterr); return; } - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - if (nx) - addReply(c,shared.minus1); - else - addReply(c,shared.nokeyerr); + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nokeyerr); return; } - o = dictGetEntryVal(de); incrRefCount(o); - if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) { + deleteIfVolatile(c->db,c->argv[2]); + if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { if (nx) { decrRefCount(o); - addReply(c,shared.zero); + addReply(c,shared.czero); return; } - dictReplace(c->dict,c->argv[2],o); + dictReplace(c->db->dict,c->argv[2],o); } else { incrRefCount(c->argv[2]); } - dictDelete(c->dict,c->argv[1]); + deleteKey(c->db,c->argv[1]); server.dirty++; - addReply(c,nx ? shared.one : shared.ok); + addReply(c,nx ? shared.cone : shared.ok); } static void renameCommand(redisClient *c) { @@ -1954,60 +2383,56 @@ static void renamenxCommand(redisClient *c) { } static void moveCommand(redisClient *c) { - dictEntry *de; - robj *o, *key; - dict *src, *dst; + robj *o; + redisDb *src, *dst; int srcid; /* Obtain source and target DB pointers */ - src = c->dict; - srcid = c->dictid; + src = c->db; + srcid = c->db->id; if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) { - addReply(c,shared.minus4); + addReply(c,shared.outofrangeerr); return; } - dst = c->dict; - c->dict = src; - c->dictid = srcid; + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ /* If the user is moving using as target the same * DB as the source DB it is probably an error. 
*/ if (src == dst) { - addReply(c,shared.minus3); + addReply(c,shared.sameobjecterr); return; } /* Check if the element exists and get a reference */ - de = dictFind(c->dict,c->argv[1]); - if (!de) { - addReply(c,shared.zero); + o = lookupKeyWrite(c->db,c->argv[1]); + if (!o) { + addReply(c,shared.czero); return; } /* Try to add the element to the target DB */ - key = dictGetEntryKey(de); - o = dictGetEntryVal(de); - if (dictAdd(dst,key,o) == DICT_ERR) { - addReply(c,shared.zero); + deleteIfVolatile(dst,c->argv[1]); + if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { + addReply(c,shared.czero); return; } - incrRefCount(key); + incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - dictDelete(src,c->argv[1]); + deleteKey(src,c->argv[1]); server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } /* =================================== Lists ================================ */ static void pushGenericCommand(redisClient *c, int where) { robj *lobj; - dictEntry *de; list *list; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + lobj = lookupKeyWrite(c->db,c->argv[1]); + if (lobj == NULL) { lobj = createListObject(); list = lobj->ptr; if (where == REDIS_HEAD) { @@ -2015,11 +2440,10 @@ static void pushGenericCommand(redisClient *c, int where) { } else { if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail"); } - dictAdd(c->dict,c->argv[1],lobj); + dictAdd(c->db->dict,c->argv[1],lobj); incrRefCount(c->argv[1]); incrRefCount(c->argv[2]); } else { - lobj = dictGetEntryVal(de); if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2045,46 +2469,43 @@ static void rpushCommand(redisClient *c) { } static void llenCommand(redisClient *c) { - dictEntry *de; + robj *o; list *l; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.zero); + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.czero); return; } else { - robj *o = 
dictGetEntryVal(de); if (o->type != REDIS_LIST) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { l = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",listLength(l))); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l))); } } } static void lindexCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.nil); + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; ln = listIndex(list, index); if (ln == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); } @@ -2093,15 +2514,13 @@ static void lindexCommand(redisClient *c) { } static void lsetCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2110,7 +2529,7 @@ static void lsetCommand(redisClient *c) { ln = listIndex(list, index); if (ln == NULL) { - addReplySds(c,sdsnew("-ERR index out of range\r\n")); + addReply(c,shared.outofrangeerr); } else { robj *ele = listNodeValue(ln); @@ -2125,16 +2544,14 @@ static void lsetCommand(redisClient *c) { } static void popGenericCommand(redisClient *c, int where) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.nil); + robj *o; 
+ + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; @@ -2145,10 +2562,10 @@ static void popGenericCommand(redisClient *c, int where) { ln = listLast(list); if (ln == NULL) { - addReply(c,shared.nil); + addReply(c,shared.nullbulk); } else { robj *ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); listDelNode(list,ln); @@ -2167,18 +2584,16 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.nil); + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nullmultibulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln; @@ -2195,7 +2610,7 @@ static void lrangeCommand(redisClient *c) { /* indexes sanity checks */ if (start > end || start >= llen) { /* Out of range start or start > end result in empty list */ - addReply(c,shared.zero); + addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; @@ -2203,10 +2618,10 @@ static void lrangeCommand(redisClient *c) { /* Return the result in form of a multi-bulk reply */ ln = listIndex(list, start); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",rangelen)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); for (j = 0; j < rangelen; j++) { ele = listNodeValue(ln); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",(int)sdslen(ele->ptr))); + 
addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); ln = ln->next; @@ -2216,16 +2631,14 @@ static void lrangeCommand(redisClient *c) { } static void ltrimCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2267,16 +2680,14 @@ static void ltrimCommand(redisClient *c) { } static void lremCommand(redisClient *c) { - dictEntry *de; + robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.minus1); + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { list *list = o->ptr; listNode *ln, *next; @@ -2290,8 +2701,9 @@ static void lremCommand(redisClient *c) { } ln = fromtail ? list->tail : list->head; while (ln) { - next = fromtail ? ln->prev : ln->next; robj *ele = listNodeValue(ln); + + next = fromtail ? 
ln->prev : ln->next; if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) { listDelNode(list,ln); server.dirty++; @@ -2300,7 +2712,7 @@ static void lremCommand(redisClient *c) { } ln = next; } - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",removed)); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } } } @@ -2308,90 +2720,81 @@ static void lremCommand(redisClient *c) { /* ==================================== Sets ================================ */ static void saddCommand(redisClient *c) { - dictEntry *de; robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { set = createSetObject(); - dictAdd(c->dict,c->argv[1],set); + dictAdd(c->db->dict,c->argv[1],set); incrRefCount(c->argv[1]); } else { - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } } if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) { incrRefCount(c->argv[2]); server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } static void sremCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.zero); + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { + addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { server.dirty++; - addReply(c,shared.one); + addReply(c,shared.cone); } else { - addReply(c,shared.zero); + addReply(c,shared.czero); } } } static void sismemberCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.zero); + set = lookupKeyRead(c->db,c->argv[1]); + if (set == NULL) { + addReply(c,shared.czero); } else { - robj *set; - 
- set = dictGetEntryVal(de); if (set->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); return; } if (dictFind(set->ptr,c->argv[2])) - addReply(c,shared.one); + addReply(c,shared.cone); else - addReply(c,shared.zero); + addReply(c,shared.czero); } } static void scardCommand(redisClient *c) { - dictEntry *de; + robj *o; dict *s; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.zero); + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_SET) { - addReply(c,shared.minus2); + addReply(c,shared.wrongtypeerr); } else { s = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", - dictGetHashTableUsed(s))); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + dictSize(s))); } } } @@ -2399,7 +2802,7 @@ static void scardCommand(redisClient *c) { static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { dict **d1 = (void*) s1, **d2 = (void*) s2; - return dictGetHashTableUsed(*d1)-dictGetHashTableUsed(*d2); + return dictSize(*d1)-dictSize(*d2); } static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { @@ -2412,18 +2815,18 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r if (!dv) oom("sinterCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; - dictEntry *de; - - de = dictFind(c->dict,setskeys[j]); - if (!de) { + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { zfree(dv); - addReply(c,dstkey ? shared.nokeyerr : shared.nil); + addReply(c,shared.nokeyerr); return; } - setobj = dictGetEntryVal(de); if (setobj->type != REDIS_SET) { zfree(dv); - addReply(c,dstkey ? 
shared.wrongtypeerr : shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); return; } dv[j] = setobj->ptr; @@ -2445,9 +2848,10 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - dictDelete(c->dict,dstkey); - dictAdd(c->dict,dstkey,dstset); + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); + server.dirty++; } /* Iterate all the elements of the first (smallest) set, and test @@ -2465,19 +2869,20 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r continue; /* at least one set does not contain the member */ ele = dictGetEntryKey(de); if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(ele->ptr))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr))); addReply(c,ele); addReply(c,shared.crlf); cardinality++; } else { dictAdd(dstset->ptr,ele,NULL); incrRefCount(ele); + server.dirty++; } } dictReleaseIterator(di); if (!dstkey) - lenobj->ptr = sdscatprintf(sdsempty(),"%d\r\n",cardinality); + lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); else addReply(c,shared.ok); zfree(dv); @@ -2492,15 +2897,18 @@ static void sinterstoreCommand(redisClient *c) { } static void flushdbCommand(redisClient *c) { - dictEmpty(c->dict); + dictEmpty(c->db->dict); + dictEmpty(c->db->expires); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { emptyDb(); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -2513,12 +2921,11 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { /* Return the value associated to the key with a name obtained * substituting the first occurence of '*' 
in 'pattern' with 'subst' */ -robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { +robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { char *p; sds spat, ssub; robj keyobj; int prefixlen, sublen, postfixlen; - dictEntry *de; /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ struct { long len; @@ -2526,7 +2933,6 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { char buf[REDIS_SORTKEY_MAX+1]; } keyname; - spat = pattern->ptr; ssub = subst->ptr; if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; @@ -2546,10 +2952,8 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { keyobj.type = REDIS_STRING; keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2); - de = dictFind(dict,&keyobj); - // printf("lookup '%s' => %p\n", keyname.buf,de); - if (!de) return NULL; - return dictGetEntryVal(de); + /* printf("lookup '%s' => %p\n", keyname.buf,de); */ + return lookupKeyRead(db,&keyobj); } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -2594,7 +2998,6 @@ static int sortCompare(const void *s1, const void *s2) { /* The SORT command is the most complex command in Redis. Warning: this code * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { - dictEntry *de; list *operations; int outputlen = 0; int desc = 0, alpha = 0; @@ -2605,14 +3008,13 @@ static void sortCommand(redisClient *c) { redisSortObject *vector; /* Resulting vector to sort */ /* Lookup the key to sort. 
It must be of the right types */ - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.nokeyerrbulk); + sortval = lookupKeyRead(c->db,c->argv[1]); + if (sortval == NULL) { + addReply(c,shared.nokeyerr); return; } - sortval = dictGetEntryVal(de); if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerrbulk); + addReply(c,shared.wrongtypeerr); return; } @@ -2666,7 +3068,7 @@ static void sortCommand(redisClient *c) { } else { decrRefCount(sortval); listRelease(operations); - addReply(c,shared.syntaxerrbulk); + addReply(c,shared.syntaxerr); return; } j++; @@ -2675,7 +3077,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ vectorlen = (sortval->type == REDIS_LIST) ? listLength((list*)sortval->ptr) : - dictGetHashTableUsed((dict*)sortval->ptr); + dictSize((dict*)sortval->ptr); vector = zmalloc(sizeof(redisSortObject)*vectorlen); if (!vector) oom("allocating objects vector for SORT"); j = 0; @@ -2713,7 +3115,7 @@ static void sortCommand(redisClient *c) { if (sortby) { robj *byval; - byval = lookupKeyByPattern(c->dict,sortby,vector[j].obj); + byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); if (!byval || byval->type != REDIS_STRING) continue; if (alpha) { vector[j].u.cmpobj = byval; @@ -2747,25 +3149,25 @@ static void sortCommand(redisClient *c) { /* Send command output to the output buffer, performing the specified * GET/DEL/INCR/DECR operations if any. */ outputlen = getop ? 
getop*(end-start+1) : end-start+1; - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",outputlen)); + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); for (j = start; j <= end; j++) { listNode *ln = operations->head; if (!getop) { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(vector[j].obj->ptr))); addReply(c,vector[j].obj); addReply(c,shared.crlf); } while(ln) { redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->dict,sop->pattern, + robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); if (sop->type == REDIS_SORT_GET) { if (!val || val->type != REDIS_STRING) { - addReply(c,shared.minus1); + addReply(c,shared.nullbulk); } else { - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(val->ptr))); addReply(c,val); addReply(c,shared.crlf); @@ -2813,11 +3215,103 @@ static void infoCommand(redisClient *c) { uptime, uptime/(3600*24) ); - addReplySds(c,sdscatprintf(sdsempty(),"%d\r\n",sdslen(info))); + addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); } +static void monitorCommand(redisClient *c) { /* MONITOR handler: a client already flagged REDIS_SLAVE is ignored and gets no reply; otherwise it is flagged REDIS_SLAVE|REDIS_MONITOR, its slaveseldb is reset to 0, it is appended to server.monitors (oom() if the append fails) and shared.ok is sent. */ + /* ignore MONITOR if already slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + + c->flags |= (REDIS_SLAVE|REDIS_MONITOR); + c->slaveseldb = 0; + if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); + addReply(c,shared.ok); +} + +/* ================================= Expire ================================= */ +static int removeExpire(redisDb *db, robj *key) { /* Remove key from db->expires. Returns 1 if dictDelete() returned DICT_OK, 0 otherwise. */ + if (dictDelete(db->expires,key) == DICT_OK) { + return 1; + } else { + return 0; + } +} + +static int setExpire(redisDb *db, robj *key, time_t when) { /* Record `when` as the expire time of key in db->expires; the time_t is stored directly in the entry value through a void* cast. Returns 0 when dictAdd() reports DICT_ERR, otherwise takes a reference on key and returns 1. NOTE(review): DICT_ERR presumably means the key already has an expire set - confirm against dictAdd() in dict.c. */ + if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + return 0; + } else { + incrRefCount(key); + return 1; + } +} + +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e.
the key is non volatile) */ +static time_t getExpire(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return -1; + + return (time_t) dictGetEntryVal(de); /* the entry value is cast straight back to time_t, it is not dereferenced */ +} + +static int expireIfNeeded(redisDb *db, robj *key) { /* Delete key from both db->expires and db->dict when its stored expire time is in the past, i.e. time(NULL) > when. Returns 0 if the key has no expire or has not expired yet; otherwise returns whether the delete from db->dict gave DICT_OK. */ + time_t when; + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Lookup the expire */ + when = (time_t) dictGetEntryVal(de); + if (time(NULL) <= when) return 0; + + /* Delete the key */ + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static int deleteIfVolatile(redisDb *db, robj *key) { /* Delete key from both db->expires and db->dict whenever it has an entry in db->expires, without comparing the stored time against the clock; server.dirty is incremented in that case. Returns 0 if the key has no expire; otherwise returns whether the delete from db->dict gave DICT_OK. */ + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Delete the key */ + server.dirty++; + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static void expireCommand(redisClient *c) { /* EXPIRE handler: argv[1] is the key and argv[2] the timeout in seconds, parsed with atoi(). Replies shared.czero when the key is not in c->db->dict, when seconds <= 0, or when setExpire() returns 0; otherwise the expire is set to time(NULL)+seconds and shared.cone is sent. */ + dictEntry *de; + int seconds = atoi(c->argv[2]->ptr); + + de = dictFind(c->db->dict,c->argv[1]); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + if (seconds <= 0) { + addReply(c, shared.czero); + return; + } else { + time_t when = time(NULL)+seconds; + if (setExpire(c->db,c->argv[1],when)) + addReply(c,shared.cone); + else + addReply(c,shared.czero); + return; + } +} + /* =============================== Replication ============================= */ /* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication.
*/ @@ -2837,7 +3331,7 @@ static int flushClientOutput(redisClient *c) { return REDIS_OK; } -static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { +static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -2857,7 +3351,7 @@ static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { return ret; } -static int syncRead(int fd, void *ptr, ssize_t size, int timeout) { +static int syncRead(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nread, totread = 0; time_t start = time(NULL); @@ -2905,15 +3399,19 @@ static void syncCommand(redisClient *c) { time_t start = time(NULL); char sizebuf[32]; + /* ignore SYNC if aleady slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + redisLog(REDIS_NOTICE,"Slave ask for syncronization"); - if (flushClientOutput(c) == REDIS_ERR || saveDb(server.dbfilename) != REDIS_OK) + if (flushClientOutput(c) == REDIS_ERR || + rdbSave(server.dbfilename) != REDIS_OK) goto closeconn; fd = open(server.dbfilename, O_RDONLY); if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn; len = sb.st_size; - snprintf(sizebuf,32,"%d\r\n",len); + snprintf(sizebuf,32,"$%d\r\n",len); if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn; while(len) { char buf[1024]; @@ -2965,7 +3463,7 @@ static int syncWithMaster(void) { strerror(errno)); return REDIS_ERR; } - dumpsize = atoi(buf); + dumpsize = atoi(buf+1); redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize); /* Read the bulk write data on a temp file */ snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); @@ -3003,7 +3501,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } emptyDb(); - if (loadDb(server.dbfilename) != REDIS_OK) { + if (rdbLoad(server.dbfilename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); close(fd); return REDIS_ERR; @@ -3033,7 +3531,7 @@ static void 
daemonize(void) { if (fd > STDERR_FILENO) close(fd); } /* Try to write the pid file */ - fp = fopen("/var/run/redis.pid","w"); + fp = fopen(server.pidfile,"w"); if (fp) { fprintf(fp,"%d\n",getpid()); fclose(fp); @@ -3052,11 +3550,11 @@ int main(int argc, char **argv) { initServer(); if (server.daemonize) daemonize(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); - if (loadDb(server.dbfilename) == REDIS_OK) + if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk"); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); - redisLog(REDIS_NOTICE,"The server is now ready to accept connections"); + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); aeMain(server.el); aeDeleteEventLoop(server.el); return 0;