X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/7b45bfb2a4f9bc0296d9126274010a11d1073e8d..5a6e8b1daa259141917ccc83cb3851614efac615:/redis.c diff --git a/redis.c b/redis.c index bb7737b5..05a623d5 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,9 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.08" +#define REDIS_VERSION "0.09" + +#include "fmacros.h" #include #include @@ -46,6 +48,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -53,6 +56,7 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ +#include "lzf.h" /* Error codes */ #define REDIS_OK 0 @@ -68,6 +72,7 @@ #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ +#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -82,9 +87,39 @@ #define REDIS_LIST 1 #define REDIS_SET 2 #define REDIS_HASH 3 + +/* Object types only used for dumping to disk */ +#define REDIS_EXPIRETIME 253 #define REDIS_SELECTDB 254 #define REDIS_EOF 255 +/* Defines related to the dump file format. To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpreter the length: + * + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte + * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow + * 11|000000 this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the REDIS_RDB_ENC_* defines. + * + * Lenghts up to 63 are stored using a single byte, most DB keys, and may + * values, will fit inside. */ +#define REDIS_RDB_6BITLEN 0 +#define REDIS_RDB_14BITLEN 1 +#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_ENCVAL 3 +#define REDIS_RDB_LENERR UINT_MAX + +/* When a length of a string object stored on disk has the first two bits + * set, the remaining two bits specify a special encoding for the object + * accordingly to the following defines: */ +#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + /* Client flags */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_SLAVE 2 /* This client is a slave server */ @@ -121,16 +156,22 @@ /* A redis object, that is a type able to hold a string / list / set */ typedef struct redisObject { - int type; void *ptr; + int type; int refcount; } robj; +typedef struct redisDb { + dict *dict; + dict *expires; + int id; +} redisDb; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { int fd; - dict *dict; + redisDb *db; int dictid; sds querybuf; robj *argv[REDIS_MAX_ARGS]; @@ -141,6 +182,7 @@ typedef struct redisClient { time_t lastinteraction; /* time of the last interaction, used for timeout */ int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ int slaveseldb; /* slave selected db, if this client is a slave */ + int authenticated; /* when requirepass is non-NULL */ } redisClient; struct saveparam { @@ -152,7 +194,9 @@ struct saveparam { struct redisServer { int port; int fd; - dict **dict; + redisDb *db; + dict *sharingpool; + unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; list *slaves, *monitors; @@ -161,7 +205,7 @@ struct redisServer { int cronloops; /* number of times the cron function run */ list *objfreelist; /* A list of freed objects to avoid malloc() */ time_t lastsave; /* Unix time of last save succeeede */ - int usedmemory; /* Used memory in megabytes */ + size_t usedmemory; /* Used memory in megabytes */ /* Fields used only for stats */ time_t stat_starttime; /* server start time */ long long stat_numcommands; /* number of processed commands */ @@ -179,6 +223,8 @@ struct redisServer { char *logfile; char *bindaddr; char *dbfilename; + char *requirepass; + int shareobjects; /* Replication related */ int isslave; char *masterhost; @@ -230,15 +276,23 @@ static void freeSetObject(robj *o); static void decrRefCount(void *o); static robj *createObject(int type, void *ptr); static void freeClient(redisClient *c); -static int loadDb(char *filename); +static int rdbLoad(char *filename); static void addReply(redisClient *c, robj *obj); static void addReplySds(redisClient *c, sds s); static void incrRefCount(robj *o); -static int saveDbBackground(char *filename); +static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); - +static robj *tryObjectSharing(robj *o); +static int removeExpire(redisDb *db, robj *key); +static int expireIfNeeded(redisDb *db, robj *key); +static int deleteIfVolatile(redisDb *db, robj *key); +static int deleteKey(redisDb *db, robj *key); +static time_t getExpire(redisDb *db, robj *key); +static int setExpire(redisDb *db, robj *key, time_t when); + +static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); @@ -285,6 +339,7 @@ static void lremCommand(redisClient *c); static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); static void monitorCommand(redisClient *c); +static void expireCommand(redisClient *c); /*================================= Globals ================================= */ @@ -325,6 +380,7 @@ static struct redisCommand cmdTable[] = { {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, + {"auth",authCommand,2,REDIS_CMD_INLINE}, {"ping",pingCommand,1,REDIS_CMD_INLINE}, {"echo",echoCommand,2,REDIS_CMD_BULK}, {"save",saveCommand,1,REDIS_CMD_INLINE}, @@ -338,6 +394,7 @@ static struct redisCommand cmdTable[] = { {"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -577,7 +634,7 @@ void closeTimedoutClients(void) { } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, size, used, loops = server.cronloops++; + int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -588,26 +645,30 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ for (j = 0; j < server.dbnum; j++) { - size = dictGetHashTableSize(server.dict[j]); - used = dictGetHashTableUsed(server.dict[j]); + int size, used, vkeys; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + vkeys = dictSize(server.db[j].expires); if (!(loops % 5) && used > 0) { - redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size); - // dictPrintStats(server.dict); + redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); + /* dictPrintStats(server.dict); */ } if (size && used && size > REDIS_HT_MINSLOTS && (used*100/size < REDIS_HT_MINFILL)) { redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.dict[j]); + dictResize(server.db[j].dict); redisLog(REDIS_NOTICE,"Hash table %d resized.",j); } } /* Show information about connected clients */ if (!(loops % 5)) { - redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", + redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.usedmemory); + server.usedmemory, + dictSize(server.sharingpool)); } /* Close connections of timedout clients */ @@ -641,11 +702,35 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { now-server.lastsave > sp->seconds) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); - saveDbBackground(server.dbfilename); + rdbSaveBackground(server.dbfilename); break; } } } + + /* Try to expire a few timed out keys */ + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + int num = dictSize(db->expires); + + if (num) { + time_t now = time(NULL); + + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + deleteKey(db,dictGetEntryKey(de)); + } + } + } + } + /* Check if we should connect to a MASTER */ if (server.replstate == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); @@ -719,6 +804,8 @@ static void initServerConfig() { server.daemonize = 0; server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; + server.requirepass = NULL; + server.shareobjects = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -744,8 +831,10 @@ static void initServer() { server.objfreelist = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); - server.dict = zmalloc(sizeof(dict*)*server.dbnum); - if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) + server.db = zmalloc(sizeof(redisDb)*server.dbnum); + server.sharingpool = dictCreate(&setDictType,NULL); + server.sharingpoolsize = 1024; + if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { @@ -753,9 +842,9 @@ static void initServer() { exit(1); } for (j = 0; j < server.dbnum; j++) { - server.dict[j] = dictCreate(&hashDictType,NULL); - if (!server.dict[j]) - oom("dictCreate"); /* Fatal OOM */ + server.db[j].dict = dictCreate(&hashDictType,NULL); + server.db[j].expires = dictCreate(&setDictType,NULL); + server.db[j].id = j; } server.cronloops = 0; server.bgsaveinprogress = 0; @@ -772,8 +861,10 @@ static void initServer() { static void emptyDb() { int j; - for (j = 0; j < server.dbnum; j++) - dictEmpty(server.dict[j]); + for (j = 0; j < server.dbnum; j++) { + dictEmpty(server.db[j].dict); + dictEmpty(server.db[j].expires); + } } /* I agree, this is a very rudimental way to load a configuration... @@ -875,6 +966,13 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { + sdstolower(argv[1]); + if (!strcmp(argv[1],"yes")) server.shareobjects = 1; + else if (!strcmp(argv[1],"no")) server.shareobjects = 0; + else { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcmp(argv[0],"daemonize") && argc == 2) { sdstolower(argv[1]); if (!strcmp(argv[1],"yes")) server.daemonize = 1; @@ -882,6 +980,8 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + server.requirepass = zstrdup(argv[1]); } else if (!strcmp(argv[0],"pidfile") && argc == 2) { server.pidfile = zstrdup(argv[1]); } else { @@ -988,7 +1088,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) if (c->flags & REDIS_MASTER) { nwritten = objlen - c->sentlen; } else { - nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen); + nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; } c->sentlen += nwritten; @@ -1082,13 +1182,26 @@ static int processCommand(redisClient *c) { return 1; } } + /* Let's try to share objects on the command arguments vector */ + if (server.shareobjects) { + int j; + for(j = 1; j < c->argc; j++) + c->argv[j] = tryObjectSharing(c->argv[j]); + } + /* Check if the user is authenticated */ + if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { + addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ dirty = server.dirty; cmd->proc(c); if (server.dirty-dirty != 0 && listLength(server.slaves)) - replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc); server.stat_numcommands++; /* Prepare the client for the next command */ @@ -1246,8 +1359,7 @@ again: static int selectDb(redisClient *c, int id) { if (id < 0 || id >= server.dbnum) return REDIS_ERR; - c->dict = server.dict[id]; - c->dictid = id; + c->db = &server.db[id]; return REDIS_OK; } @@ -1265,6 +1377,7 @@ static redisClient *createClient(int fd) { c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); + c->authenticated = 0; if ((c->reply = listCreate()) == NULL) oom("listCreate"); listSetFreeMethod(c->reply,decrRefCount); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, @@ -1348,14 +1461,6 @@ static robj *createSetObject(void) { return createObject(REDIS_SET,d); } -#if 0 -static robj *createHashObject(void) { - dict *d = dictCreate(&hashDictType,NULL); - if (!d) oom("dictCreate"); - return createObject(REDIS_SET,d); -} -#endif - static void freeStringObject(robj *o) { sdsfree(o->ptr); } @@ -1374,10 +1479,19 @@ static void freeHashObject(robj *o) { static void incrRefCount(robj *o) { o->refcount++; +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount); +#endif } static void decrRefCount(void *obj) { robj *o = obj; + +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1); +#endif if (--(o->refcount) == 0) { switch(o->type) { case REDIS_STRING: freeStringObject(o); break; @@ -1392,17 +1506,223 @@ static void decrRefCount(void *obj) { } } +/* Try to share an object against the shared objects pool */ +static robj *tryObjectSharing(robj *o) { + struct dictEntry *de; + unsigned long c; + + if (o == NULL || server.shareobjects == 0) return o; + + assert(o->type == REDIS_STRING); + de = dictFind(server.sharingpool,o); + if (de) { + robj *shared = dictGetEntryKey(de); + + c = ((unsigned long) dictGetEntryVal(de))+1; + dictGetEntryVal(de) = (void*) c; + incrRefCount(shared); + decrRefCount(o); + return shared; + } else { + /* Here we are using a stream algorihtm: Every time an object is + * shared we increment its count, everytime there is a miss we + * recrement the counter of a random object. If this object reaches + * zero we remove the object and put the current object instead. */ + if (dictSize(server.sharingpool) >= + server.sharingpoolsize) { + de = dictGetRandomKey(server.sharingpool); + assert(de != NULL); + c = ((unsigned long) dictGetEntryVal(de))-1; + dictGetEntryVal(de) = (void*) c; + if (c == 0) { + dictDelete(server.sharingpool,de->key); + } + } else { + c = 0; /* If the pool is empty we want to add this object */ + } + if (c == 0) { + int retval; + + retval = dictAdd(server.sharingpool,o,(void*)1); + assert(retval == DICT_OK); + incrRefCount(o); + } + return o; + } +} + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key); + return de ? dictGetEntryVal(de) : NULL; +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + return lookupKey(db,key); +} + +static int deleteKey(redisDb *db, robj *key) { + int retval; + + /* We need to protect key from destruction: after the first dictDelete() + * it may happen that 'key' is no longer valid if we don't increment + * it's count. This may happen when we get the object reference directly + * from the hash table with dictRandomKey() or dict iterators */ + incrRefCount(key); + if (dictSize(db->expires)) dictDelete(db->expires,key); + retval = dictDelete(db->dict,key); + decrRefCount(key); + + return retval == DICT_OK; +} + /*============================ DB saving/loading ============================ */ +static int rdbSaveType(FILE *fp, unsigned char type) { + if (fwrite(&type,1,1,fp) == 0) return -1; + return 0; +} + +static int rdbSaveTime(FILE *fp, time_t t) { + int32_t t32 = (int32_t) t; + if (fwrite(&t32,4,1,fp) == 0) return -1; + return 0; +} + +/* check rdbLoadLen() comments for more info */ +static int rdbSaveLen(FILE *fp, uint32_t len) { + unsigned char buf[2]; + + if (len < (1<<6)) { + /* Save a 6 bit len */ + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + } else if (len < (1<<14)) { + /* Save a 14 bit len */ + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); + buf[1] = len&0xFF; + if (fwrite(buf,2,1,fp) == 0) return -1; + } else { + /* Save a 32 bit len */ + buf[0] = (REDIS_RDB_32BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + len = htonl(len); + if (fwrite(&len,4,1,fp) == 0) return -1; + } + return 0; +} + +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +int rdbTryIntegerEncoding(sds s, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + snprintf(buf,32,"%lld",value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0; + + /* Finally check if it fits in our ranges */ + if (value >= -(1<<7) && value <= (1<<7)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; + enc[1] = value&0xFF; + return 2; + } else if (value >= -(1<<15) && value <= (1<<15)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + return 3; + } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + enc[3] = (value>>16)&0xFF; + enc[4] = (value>>24)&0xFF; + return 5; + } else { + return 0; + } +} + +static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { + unsigned int comprlen, outlen; + unsigned char byte; + void *out; + + /* We require at least four bytes compression for this to be worth it */ + outlen = sdslen(obj->ptr)-4; + if (outlen <= 0) return 0; + if ((out = zmalloc(outlen)) == NULL) return 0; + comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); + if (comprlen == 0) { + zfree(out); + return 0; + } + /* Data compressed! Let's save it on disk */ + byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF; + if (fwrite(&byte,1,1,fp) == 0) goto writeerr; + if (rdbSaveLen(fp,comprlen) == -1) goto writeerr; + if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr; + if (fwrite(out,comprlen,1,fp) == 0) goto writeerr; + zfree(out); + return comprlen; + +writeerr: + zfree(out); + return -1; +} + +/* Save a string objet as [len][data] on disk. If the object is a string + * representation of an integer value we try to safe it in a special form */ +static int rdbSaveStringObject(FILE *fp, robj *obj) { + size_t len = sdslen(obj->ptr); + int enclen; + + /* Try integer encoding */ + if (len <= 11) { + unsigned char buf[5]; + if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + return 0; + } + } + + /* Try LZF compression - under 20 bytes it's unable to compress even + * aaaaaaaaaaaaaaaaaa so skip it */ + if (len > 20) { + int retval; + + retval = rdbSaveLzfStringObject(fp,obj); + if (retval == -1) return -1; + if (retval > 0) return 0; + /* retval == 0 means data can't be compressed, save the old way */ + } + + /* Store verbatim */ + if (rdbSaveLen(fp,len) == -1) return -1; + if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; + return 0; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ -static int saveDb(char *filename) { +static int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; - uint32_t len; - uint8_t type; FILE *fp; char tmpfile[256]; int j; + time_t now = time(NULL); snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); fp = fopen(tmpfile,"w"); @@ -1410,10 +1730,11 @@ static int saveDb(char *filename) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } - if (fwrite("REDIS0000",9,1,fp) == 0) goto werr; + if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { - dict *d = server.dict[j]; - if (dictGetHashTableUsed(d) == 0) continue; + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; di = dictGetIterator(d); if (!di) { fclose(fp); @@ -1421,60 +1742,52 @@ static int saveDb(char *filename) { } /* Write the SELECT DB opcode */ - type = REDIS_SELECTDB; - len = htonl(j); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; + if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *o = dictGetEntryVal(de); - - type = o->type; - len = htonl(sdslen(key->ptr)); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr; - if (type == REDIS_STRING) { + time_t expiretime = getExpire(db,key); + + /* Save the expire time */ + if (expiretime != -1) { + /* If this key is already expired skip it */ + if (expiretime < now) continue; + if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; + if (rdbSaveTime(fp,expiretime) == -1) goto werr; + } + /* Save the key and associated value */ + if (rdbSaveType(fp,o->type) == -1) goto werr; + if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (o->type == REDIS_STRING) { /* Save a string value */ - sds sval = o->ptr; - len = htonl(sdslen(sval)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(sval) && - fwrite(sval,sdslen(sval),1,fp) == 0) goto werr; - } else if (type == REDIS_LIST) { + if (rdbSaveStringObject(fp,o) == -1) goto werr; + } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; listNode *ln = list->head; - len = htonl(listLength(list)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; while(ln) { robj *eleobj = listNodeValue(ln); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; ln = ln->next; } - } else if (type == REDIS_SET) { + } else if (o->type == REDIS_SET) { /* Save a set value */ dict *set = o->ptr; dictIterator *di = dictGetIterator(set); dictEntry *de; if (!set) oom("dictGetIteraotr"); - len = htonl(dictGetHashTableUsed(set)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { - robj *eleobj; + robj *eleobj = dictGetEntryKey(de); - eleobj = dictGetEntryKey(de); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } dictReleaseIterator(di); } else { @@ -1484,8 +1797,9 @@ static int saveDb(char *filename) { dictReleaseIterator(di); } /* EOF opcode */ - type = REDIS_EOF; - if (fwrite(&type,1,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; + + /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); fclose(fp); @@ -1510,14 +1824,14 @@ werr: return REDIS_ERR; } -static int saveDbBackground(char *filename) { +static int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsaveinprogress) return REDIS_ERR; if ((childpid = fork()) == 0) { /* Child */ close(server.fd); - if (saveDb(filename) == REDIS_OK) { + if (rdbSave(filename) == REDIS_OK) { exit(0); } else { exit(1); @@ -1531,90 +1845,191 @@ static int saveDbBackground(char *filename) { return REDIS_OK; /* unreached */ } -static int loadType(FILE *fp) { - uint8_t type; +static int rdbLoadType(FILE *fp) { + unsigned char type; if (fread(&type,1,1,fp) == 0) return -1; return type; } -static int loadDb(char *filename) { +static time_t rdbLoadTime(FILE *fp) { + int32_t t32; + if (fread(&t32,4,1,fp) == 0) return -1; + return (time_t) t32; +} + +/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top + * of this file for a description of how this are stored on disk. + * + * isencoded is set to 1 if the readed length is not actually a length but + * an "encoding type", check the above comments for more info */ +static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) { + unsigned char buf[2]; + uint32_t len; + + if (isencoded) *isencoded = 0; + if (rdbver == 0) { + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } else { + int type; + + if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; + type = (buf[0]&0xC0)>>6; + if (type == REDIS_RDB_6BITLEN) { + /* Read a 6 bit len */ + return buf[0]&0x3F; + } else if (type == REDIS_RDB_ENCVAL) { + /* Read a 6 bit len encoding type */ + if (isencoded) *isencoded = 1; + return buf[0]&0x3F; + } else if (type == REDIS_RDB_14BITLEN) { + /* Read a 14 bit len */ + if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; + return ((buf[0]&0x3F)<<8)|buf[1]; + } else { + /* Read a 32 bit len */ + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } + } +} + +static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { + unsigned char enc[4]; + long long val; + + if (enctype == REDIS_RDB_ENC_INT8) { + if (fread(enc,1,1,fp) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == REDIS_RDB_ENC_INT16) { + uint16_t v; + if (fread(enc,2,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8); + val = (int16_t)v; + } else if (enctype == REDIS_RDB_ENC_INT32) { + uint32_t v; + if (fread(enc,4,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); + val = (int32_t)v; + } else { + val = 0; /* anti-warning */ + assert(0!=0); + } + return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); +} + +static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { + unsigned int len, clen; + unsigned char *c = NULL; + sds val = NULL; + + if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((c = zmalloc(clen)) == NULL) goto err; + if ((val = sdsnewlen(NULL,len)) == NULL) goto err; + if (fread(c,clen,1,fp) == 0) goto err; + if (lzf_decompress(c,clen,val,len) == 0) goto err; + return createObject(REDIS_STRING,val); +err: + zfree(c); + sdsfree(val); + return NULL; +} + +static robj *rdbLoadStringObject(FILE*fp, int rdbver) { + int isencoded; + uint32_t len; + sds val; + + len = rdbLoadLen(fp,rdbver,&isencoded); + if (isencoded) { + switch(len) { + case REDIS_RDB_ENC_INT8: + case REDIS_RDB_ENC_INT16: + case REDIS_RDB_ENC_INT32: + return tryObjectSharing(rdbLoadIntegerObject(fp,len)); + case REDIS_RDB_ENC_LZF: + return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver)); + default: + assert(0!=0); + } + } + + if (len == REDIS_RDB_LENERR) return NULL; + val = sdsnewlen(NULL,len); + if (len && fread(val,len,1,fp) == 0) { + sdsfree(val); + return NULL; + } + return tryObjectSharing(createObject(REDIS_STRING,val)); +} + +static int rdbLoad(char *filename) { FILE *fp; - char buf[REDIS_LOADBUF_LEN]; /* Try to use this buffer instead of */ - char vbuf[REDIS_LOADBUF_LEN]; /* malloc() when the element is small */ - char *key = NULL, *val = NULL; - uint32_t klen,vlen,dbid; - int type; - int retval; - dict *d = server.dict[0]; + robj *keyobj = NULL; + uint32_t dbid; + int type, retval, rdbver; + dict *d = server.db[0].dict; + redisDb *db = server.db+0; + char buf[1024]; + time_t expiretime = -1, now = time(NULL); fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; - if (memcmp(buf,"REDIS0000",9) != 0) { + buf[9] = '\0'; + if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); return REDIS_ERR; } + rdbver = atoi(buf+5); + if (rdbver > 1) { + fclose(fp); + redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); + return REDIS_ERR; + } while(1) { robj *o; /* Read type. */ - if ((type = loadType(fp)) == -1) goto eoferr; + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + if (type == REDIS_EXPIRETIME) { + if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; + /* We read the time so we need to read the object type again */ + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + } if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { - if (fread(&dbid,4,1,fp) == 0) goto eoferr; - dbid = ntohl(dbid); + if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; if (dbid >= (unsigned)server.dbnum) { - redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server compiled to handle more than %d databases. Exiting\n", server.dbnum); + redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } - d = server.dict[dbid]; + db = server.db+dbid; + d = db->dict; continue; } /* Read key */ - if (fread(&klen,4,1,fp) == 0) goto eoferr; - klen = ntohl(klen); - if (klen <= REDIS_LOADBUF_LEN) { - key = buf; - } else { - key = zmalloc(klen); - if (!key) oom("Loading DB from file"); - } - if (fread(key,klen,1,fp) == 0) goto eoferr; + if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_STRING) { /* Read string value */ - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - o = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; } else if (type == REDIS_LIST || type == REDIS_SET) { /* Read list/set value */ uint32_t listlen; - if (fread(&listlen,4,1,fp) == 0) goto eoferr; - listlen = ntohl(listlen); + + if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; o = (type == REDIS_LIST) ? createListObject() : createSetObject(); /* Load every single element of the list/set */ while(listlen--) { robj *ele; - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - ele = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_LIST) { if (!listAddNodeTail((list*)o->ptr,ele)) oom("listAddNodeTail"); @@ -1622,37 +2037,47 @@ static int loadDb(char *filename) { if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR) oom("dictAdd"); } - /* free the temp buffer if needed */ - if (val != vbuf) zfree(val); - val = NULL; } } else { assert(0 != 0); } /* Add the new object in the hash table */ - retval = dictAdd(d,createStringObject(key,klen),o); + retval = dictAdd(d,keyobj,o); if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key found! Unrecoverable error, exiting now."); + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); exit(1); } - /* Iteration cleanup */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - key = val = NULL; + /* Set the expire time if needed */ + if (expiretime != -1) { + setExpire(db,keyobj,expiretime); + /* Delete this key if already expired */ + if (expiretime < now) deleteKey(db,keyobj); + expiretime = -1; + } + keyobj = o = NULL; } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - redisLog(REDIS_WARNING,"Short read loading DB. Unrecoverable error, exiting now."); + if (keyobj) decrRefCount(keyobj); + redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } /*================================== Commands =============================== */ +static void authCommand(redisClient *c) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReply(c,shared.err); + } +} + static void pingCommand(redisClient *c) { addReply(c,shared.pong); } @@ -1669,10 +2094,10 @@ static void echoCommand(redisClient *c) { static void setGenericCommand(redisClient *c, int nx) { int retval; - retval = dictAdd(c->dict,c->argv[1],c->argv[2]); + retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); if (retval == DICT_ERR) { if (!nx) { - dictReplace(c->dict,c->argv[1],c->argv[2]); + dictReplace(c->db->dict,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); } else { addReply(c,shared.czero); @@ -1683,26 +2108,24 @@ static void setGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } server.dirty++; + removeExpire(c->db,c->argv[1]); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - return setGenericCommand(c,0); + setGenericCommand(c,0); } static void setnxCommand(redisClient *c) { - return setGenericCommand(c,1); + setGenericCommand(c,1); } static void getCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[1]); + + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); } else { @@ -1714,17 +2137,14 @@ static void getCommand(redisClient *c) { } static void mgetCommand(redisClient *c) { - dictEntry *de; int j; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { - de = dictFind(c->dict,c->argv[j]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[j]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.nullbulk); } else { @@ -1737,17 +2157,14 @@ static void mgetCommand(redisClient *c) { } static void incrDecrCommand(redisClient *c, int incr) { - dictEntry *de; long long value; int retval; robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { value = 0; } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { value = 0; } else { @@ -1759,9 +2176,10 @@ static void incrDecrCommand(redisClient *c, int incr) { value += incr; o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - retval = dictAdd(c->dict,c->argv[1],o); + retval = dictAdd(c->db->dict,c->argv[1],o); if (retval == DICT_ERR) { - dictReplace(c->dict,c->argv[1],o); + dictReplace(c->db->dict,c->argv[1],o); + removeExpire(c->db,c->argv[1]); } else { incrRefCount(c->argv[1]); } @@ -1772,27 +2190,27 @@ static void incrDecrCommand(redisClient *c, int incr) { } static void incrCommand(redisClient *c) { - return incrDecrCommand(c,1); + incrDecrCommand(c,1); } static void decrCommand(redisClient *c) { - return incrDecrCommand(c,-1); + incrDecrCommand(c,-1); } static void incrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,incr); + incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,-incr); + incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (dictDelete(c->dict,c->argv[1]) == DICT_OK) { + if (deleteKey(c->db,c->argv[1])) { server.dirty++; addReply(c,shared.cone); } else { @@ -1801,20 +2219,14 @@ static void delCommand(redisClient *c) { } static void existsCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) - addReply(c,shared.czero); - else - addReply(c,shared.cone); + addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero); } static void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,"-ERR invalid DB index\r\n"); + addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); } else { addReply(c,shared.ok); } @@ -1822,9 +2234,13 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; - - de = dictGetRandomKey(c->dict); + + while(1) { + de = dictGetRandomKey(c->db->dict); + if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; + } if (de == NULL) { + addReply(c,shared.plus); addReply(c,shared.crlf); } else { addReply(c,shared.plus); @@ -1841,20 +2257,23 @@ static void keysCommand(redisClient *c) { int numkeys = 0, keyslen = 0; robj *lenobj = createObject(REDIS_STRING,NULL); - di = dictGetIterator(c->dict); + di = dictGetIterator(c->db->dict); if (!di) oom("dictGetIterator"); addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { robj *keyobj = dictGetEntryKey(de); + sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - if (numkeys != 0) - addReply(c,shared.space); - addReply(c,keyobj); - numkeys++; - keyslen += sdslen(key); + if (expireIfNeeded(c->db,keyobj) == 0) { + if (numkeys != 0) + addReply(c,shared.space); + addReply(c,keyobj); + numkeys++; + keyslen += sdslen(key); + } } } dictReleaseIterator(di); @@ -1864,7 +2283,7 @@ static void keysCommand(redisClient *c) { static void dbsizeCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict))); + sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); } static void lastsaveCommand(redisClient *c) { @@ -1873,15 +2292,13 @@ static void lastsaveCommand(redisClient *c) { } static void typeCommand(redisClient *c) { - dictEntry *de; + robj *o; char *type; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { type = "+none"; } else { - robj *o = dictGetEntryVal(de); - switch(o->type) { case REDIS_STRING: type = "+string"; break; case REDIS_LIST: type = "+list"; break; @@ -1894,7 +2311,11 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { - if (saveDb(server.dbfilename) == REDIS_OK) { + if (server.bgsaveinprogress) { + addReplySds(c,sdsnew("-ERR background save in progress\r\n")); + return; + } + if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1906,7 +2327,7 @@ static void bgsaveCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); return; } - if (saveDbBackground(server.dbfilename) == REDIS_OK) { + if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1915,7 +2336,7 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - if (saveDb(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.dbfilename) == REDIS_OK) { if (server.daemonize) { unlink(server.pidfile); } @@ -1928,7 +2349,6 @@ static void shutdownCommand(redisClient *c) { } static void renameGenericCommand(redisClient *c, int nx) { - dictEntry *de; robj *o; /* To use the same key as src and dst is probably an error */ @@ -1937,24 +2357,24 @@ static void renameGenericCommand(redisClient *c, int nx) { return; } - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); return; } - o = dictGetEntryVal(de); incrRefCount(o); - if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) { + deleteIfVolatile(c->db,c->argv[2]); + if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->dict,c->argv[2],o); + dictReplace(c->db->dict,c->argv[2],o); } else { incrRefCount(c->argv[2]); } - dictDelete(c->dict,c->argv[1]); + deleteKey(c->db,c->argv[1]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -1968,21 +2388,19 @@ static void renamenxCommand(redisClient *c) { } static void moveCommand(redisClient *c) { - dictEntry *de; - robj *o, *key; - dict *src, *dst; + robj *o; + redisDb *src, *dst; int srcid; /* Obtain source and target DB pointers */ - src = c->dict; - srcid = c->dictid; + src = c->db; + srcid = c->db->id; if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) { addReply(c,shared.outofrangeerr); return; } - dst = c->dict; - c->dict = src; - c->dictid = srcid; + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ /* If the user is moving using as target the same * DB as the source DB it is probably an error. */ @@ -1992,24 +2410,23 @@ static void moveCommand(redisClient *c) { } /* Check if the element exists and get a reference */ - de = dictFind(c->dict,c->argv[1]); - if (!de) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (!o) { addReply(c,shared.czero); return; } /* Try to add the element to the target DB */ - key = dictGetEntryKey(de); - o = dictGetEntryVal(de); - if (dictAdd(dst,key,o) == DICT_ERR) { + deleteIfVolatile(dst,c->argv[1]); + if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { addReply(c,shared.czero); return; } - incrRefCount(key); + incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - dictDelete(src,c->argv[1]); + deleteKey(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -2017,11 +2434,10 @@ static void moveCommand(redisClient *c) { /* =================================== Lists ================================ */ static void pushGenericCommand(redisClient *c, int where) { robj *lobj; - dictEntry *de; list *list; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + lobj = lookupKeyWrite(c->db,c->argv[1]); + if (lobj == NULL) { lobj = createListObject(); list = lobj->ptr; if (where == REDIS_HEAD) { @@ -2029,11 +2445,10 @@ static void pushGenericCommand(redisClient *c, int where) { } else { if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail"); } - dictAdd(c->dict,c->argv[1],lobj); + dictAdd(c->db->dict,c->argv[1],lobj); incrRefCount(c->argv[1]); incrRefCount(c->argv[2]); } else { - lobj = dictGetEntryVal(de); if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2059,15 +2474,14 @@ static void rpushCommand(redisClient *c) { } static void llenCommand(redisClient *c) { - dictEntry *de; + robj *o; list *l; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2078,15 +2492,13 @@ static void llenCommand(redisClient *c) { } static void lindexCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2107,15 +2519,13 @@ static void lindexCommand(redisClient *c) { } static void lsetCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2139,14 +2549,12 @@ static void lsetCommand(redisClient *c) { } static void popGenericCommand(redisClient *c, int where) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o; + + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2181,16 +2589,14 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullmultibulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2230,16 +2636,14 @@ static void lrangeCommand(redisClient *c) { } static void ltrimCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2281,14 +2685,12 @@ static void ltrimCommand(redisClient *c) { } static void lremCommand(redisClient *c) { - dictEntry *de; + robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2304,8 +2706,9 @@ static void lremCommand(redisClient *c) { } ln = fromtail ? list->tail : list->head; while (ln) { - next = fromtail ? ln->prev : ln->next; robj *ele = listNodeValue(ln); + + next = fromtail ? ln->prev : ln->next; if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) { listDelNode(list,ln); server.dirty++; @@ -2322,16 +2725,14 @@ static void lremCommand(redisClient *c) { /* ==================================== Sets ================================ */ static void saddCommand(redisClient *c) { - dictEntry *de; robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { set = createSetObject(); - dictAdd(c->dict,c->argv[1],set); + dictAdd(c->db->dict,c->argv[1],set); incrRefCount(c->argv[1]); } else { - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2347,15 +2748,12 @@ static void saddCommand(redisClient *c) { } static void sremCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2370,15 +2768,12 @@ static void sremCommand(redisClient *c) { } static void sismemberCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyRead(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2391,21 +2786,20 @@ static void sismemberCommand(redisClient *c) { } static void scardCommand(redisClient *c) { - dictEntry *de; + robj *o; dict *s; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); } else { s = o->ptr; addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", - dictGetHashTableUsed(s))); + dictSize(s))); } } } @@ -2413,7 +2807,7 @@ static void scardCommand(redisClient *c) { static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { dict **d1 = (void*) s1, **d2 = (void*) s2; - return dictGetHashTableUsed(*d1)-dictGetHashTableUsed(*d2); + return dictSize(*d1)-dictSize(*d2); } static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { @@ -2426,15 +2820,15 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r if (!dv) oom("sinterCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; - dictEntry *de; - - de = dictFind(c->dict,setskeys[j]); - if (!de) { + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { zfree(dv); addReply(c,shared.nokeyerr); return; } - setobj = dictGetEntryVal(de); if (setobj->type != REDIS_SET) { zfree(dv); addReply(c,shared.wrongtypeerr); @@ -2459,9 +2853,10 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - dictDelete(c->dict,dstkey); - dictAdd(c->dict,dstkey,dstset); + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); + server.dirty++; } /* Iterate all the elements of the first (smallest) set, and test @@ -2486,6 +2881,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } else { dictAdd(dstset->ptr,ele,NULL); incrRefCount(ele); + server.dirty++; } } dictReleaseIterator(di); @@ -2506,15 +2902,18 @@ static void sinterstoreCommand(redisClient *c) { } static void flushdbCommand(redisClient *c) { - dictEmpty(c->dict); + dictEmpty(c->db->dict); + dictEmpty(c->db->expires); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { emptyDb(); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -2527,12 +2926,11 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { /* Return the value associated to the key with a name obtained * substituting the first occurence of '*' in 'pattern' with 'subst' */ -robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { +robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { char *p; sds spat, ssub; robj keyobj; int prefixlen, sublen, postfixlen; - dictEntry *de; /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ struct { long len; @@ -2540,7 +2938,6 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { char buf[REDIS_SORTKEY_MAX+1]; } keyname; - spat = pattern->ptr; ssub = subst->ptr; if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; @@ -2560,10 +2957,8 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { keyobj.type = REDIS_STRING; keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2); - de = dictFind(dict,&keyobj); - // printf("lookup '%s' => %p\n", keyname.buf,de); - if (!de) return NULL; - return dictGetEntryVal(de); + /* printf("lookup '%s' => %p\n", keyname.buf,de); */ + return lookupKeyRead(db,&keyobj); } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -2608,7 +3003,6 @@ static int sortCompare(const void *s1, const void *s2) { /* The SORT command is the most complex command in Redis. Warning: this code * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { - dictEntry *de; list *operations; int outputlen = 0; int desc = 0, alpha = 0; @@ -2619,12 +3013,11 @@ static void sortCommand(redisClient *c) { redisSortObject *vector; /* Resulting vector to sort */ /* Lookup the key to sort. It must be of the right types */ - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + sortval = lookupKeyRead(c->db,c->argv[1]); + if (sortval == NULL) { addReply(c,shared.nokeyerr); return; } - sortval = dictGetEntryVal(de); if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2689,7 +3082,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ vectorlen = (sortval->type == REDIS_LIST) ? listLength((list*)sortval->ptr) : - dictGetHashTableUsed((dict*)sortval->ptr); + dictSize((dict*)sortval->ptr); vector = zmalloc(sizeof(redisSortObject)*vectorlen); if (!vector) oom("allocating objects vector for SORT"); j = 0; @@ -2727,7 +3120,7 @@ static void sortCommand(redisClient *c) { if (sortby) { robj *byval; - byval = lookupKeyByPattern(c->dict,sortby,vector[j].obj); + byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); if (!byval || byval->type != REDIS_STRING) continue; if (alpha) { vector[j].u.cmpobj = byval; @@ -2772,7 +3165,7 @@ static void sortCommand(redisClient *c) { } while(ln) { redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->dict,sop->pattern, + robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); if (sop->type == REDIS_SORT_GET) { @@ -2809,7 +3202,7 @@ static void infoCommand(redisClient *c) { "redis_version:%s\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" - "used_memory:%d\r\n" + "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" "last_save_time:%d\r\n" "total_connections_received:%lld\r\n" @@ -2832,6 +3225,98 @@ static void infoCommand(redisClient *c) { addReply(c,shared.crlf); } +static void monitorCommand(redisClient *c) { + /* ignore MONITOR if aleady slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + + c->flags |= (REDIS_SLAVE|REDIS_MONITOR); + c->slaveseldb = 0; + if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); + addReply(c,shared.ok); +} + +/* ================================= Expire ================================= */ +static int removeExpire(redisDb *db, robj *key) { + if (dictDelete(db->expires,key) == DICT_OK) { + return 1; + } else { + return 0; + } +} + +static int setExpire(redisDb *db, robj *key, time_t when) { + if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + return 0; + } else { + incrRefCount(key); + return 1; + } +} + +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e. the key is non volatile) */ +static time_t getExpire(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return -1; + + return (time_t) dictGetEntryVal(de); +} + +static int expireIfNeeded(redisDb *db, robj *key) { + time_t when; + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Lookup the expire */ + when = (time_t) dictGetEntryVal(de); + if (time(NULL) <= when) return 0; + + /* Delete the key */ + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static int deleteIfVolatile(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Delete the key */ + server.dirty++; + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static void expireCommand(redisClient *c) { + dictEntry *de; + int seconds = atoi(c->argv[2]->ptr); + + de = dictFind(c->db->dict,c->argv[1]); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + if (seconds <= 0) { + addReply(c, shared.czero); + return; + } else { + time_t when = time(NULL)+seconds; + if (setExpire(c->db,c->argv[1],when)) + addReply(c,shared.cone); + else + addReply(c,shared.czero); + return; + } +} + /* =============================== Replication ============================= */ /* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */ @@ -2851,7 +3336,7 @@ static int flushClientOutput(redisClient *c) { return REDIS_OK; } -static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { +static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -2871,7 +3356,7 @@ static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { return ret; } -static int syncRead(int fd, void *ptr, ssize_t size, int timeout) { +static int syncRead(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nread, totread = 0; time_t start = time(NULL); @@ -2923,7 +3408,8 @@ static void syncCommand(redisClient *c) { if (c->flags & REDIS_SLAVE) return; redisLog(REDIS_NOTICE,"Slave ask for syncronization"); - if (flushClientOutput(c) == REDIS_ERR || saveDb(server.dbfilename) != REDIS_OK) + if (flushClientOutput(c) == REDIS_ERR || + rdbSave(server.dbfilename) != REDIS_OK) goto closeconn; fd = open(server.dbfilename, O_RDONLY); @@ -3020,7 +3506,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } emptyDb(); - if (loadDb(server.dbfilename) != REDIS_OK) { + if (rdbLoad(server.dbfilename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); close(fd); return REDIS_ERR; @@ -3031,16 +3517,6 @@ static int syncWithMaster(void) { return REDIS_OK; } -static void monitorCommand(redisClient *c) { - /* ignore MONITOR if aleady slave or in monitor mode */ - if (c->flags & REDIS_SLAVE) return; - - c->flags |= (REDIS_SLAVE|REDIS_MONITOR); - c->slaveseldb = 0; - if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); - addReply(c,shared.ok); -} - /* =================================== Main! ================================ */ static void daemonize(void) { @@ -3079,7 +3555,7 @@ int main(int argc, char **argv) { initServer(); if (server.daemonize) daemonize(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); - if (loadDb(server.dbfilename) == REDIS_OK) + if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk"); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");