X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/abcb223ec05740c3534b31e3c756ac0d63f8c07e..5a6e8b1daa259141917ccc83cb3851614efac615:/redis.c diff --git a/redis.c b/redis.c index fa62c54c..05a623d5 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,9 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.08" +#define REDIS_VERSION "0.09" + +#include "fmacros.h" #include #include @@ -54,6 +56,7 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ +#include "lzf.h" /* Error codes */ #define REDIS_OK 0 @@ -69,6 +72,7 @@ #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ +#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -85,6 +89,7 @@ #define REDIS_HASH 3 /* Object types only used for dumping to disk */ +#define REDIS_EXPIRETIME 253 #define REDIS_SELECTDB 254 #define REDIS_EOF 255 @@ -95,16 +100,26 @@ * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow - * 11|000000 reserved for future uses + * 11|000000 this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the REDIS_RDB_ENC_* defines. * * Lenghts up to 63 are stored using a single byte, most DB keys, and may * values, will fit inside. */ #define REDIS_RDB_6BITLEN 0 #define REDIS_RDB_14BITLEN 1 #define REDIS_RDB_32BITLEN 2 -#define REDIS_RDB_64BITLEN 3 +#define REDIS_RDB_ENCVAL 3 #define REDIS_RDB_LENERR UINT_MAX +/* When a length of a string object stored on disk has the first two bits + * set, the remaining two bits specify a special encoding for the object + * accordingly to the following defines: */ +#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + /* Client flags */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_SLAVE 2 /* This client is a slave server */ @@ -141,16 +156,22 @@ /* A redis object, that is a type able to hold a string / list / set */ typedef struct redisObject { - int type; void *ptr; + int type; int refcount; } robj; +typedef struct redisDb { + dict *dict; + dict *expires; + int id; +} redisDb; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { int fd; - dict *dict; + redisDb *db; int dictid; sds querybuf; robj *argv[REDIS_MAX_ARGS]; @@ -173,7 +194,7 @@ struct saveparam { struct redisServer { int port; int fd; - dict **dict; + redisDb *db; dict *sharingpool; unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ @@ -184,7 +205,7 @@ struct redisServer { int cronloops; /* number of times the cron function run */ list *objfreelist; /* A list of freed objects to avoid malloc() */ time_t lastsave; /* Unix time of last save succeeede */ - int usedmemory; /* Used memory in megabytes */ + size_t usedmemory; /* Used memory in megabytes */ /* Fields used only for stats */ time_t stat_starttime; /* server start time */ long long stat_numcommands; /* number of processed commands */ @@ -264,6 +285,12 @@ static robj *createStringObject(char *ptr, size_t len); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); static robj *tryObjectSharing(robj *o); +static int removeExpire(redisDb *db, robj *key); +static int expireIfNeeded(redisDb *db, robj *key); +static int deleteIfVolatile(redisDb *db, robj *key); +static int deleteKey(redisDb *db, robj *key); +static time_t getExpire(redisDb *db, robj *key); +static int setExpire(redisDb *db, robj *key, time_t when); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -312,6 +339,7 @@ static void lremCommand(redisClient *c); static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); static void monitorCommand(redisClient *c); +static void expireCommand(redisClient *c); /*================================= Globals ================================= */ @@ -366,6 +394,7 @@ static struct redisCommand cmdTable[] = { {"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -605,7 +634,7 @@ void closeTimedoutClients(void) { } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, size, used, loops = server.cronloops++; + int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -616,27 +645,30 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ for (j = 0; j < server.dbnum; j++) { - size = dictGetHashTableSize(server.dict[j]); - used = dictGetHashTableUsed(server.dict[j]); + int size, used, vkeys; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + vkeys = dictSize(server.db[j].expires); if (!(loops % 5) && used > 0) { - redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size); - // dictPrintStats(server.dict); + redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); + /* dictPrintStats(server.dict); */ } if (size && used && size > REDIS_HT_MINSLOTS && (used*100/size < REDIS_HT_MINFILL)) { redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.dict[j]); + dictResize(server.db[j].dict); redisLog(REDIS_NOTICE,"Hash table %d resized.",j); } } /* Show information about connected clients */ if (!(loops % 5)) { - redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", + redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.usedmemory, - dictGetHashTableUsed(server.sharingpool)); + dictSize(server.sharingpool)); } /* Close connections of timedout clients */ @@ -675,6 +707,30 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } } + + /* Try to expire a few timed out keys */ + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + int num = dictSize(db->expires); + + if (num) { + time_t now = time(NULL); + + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + deleteKey(db,dictGetEntryKey(de)); + } + } + } + } + /* Check if we should connect to a MASTER */ if (server.replstate == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); @@ -775,18 +831,21 @@ static void initServer() { server.objfreelist = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); - server.dict = zmalloc(sizeof(dict*)*server.dbnum); + server.db = zmalloc(sizeof(redisDb)*server.dbnum); server.sharingpool = dictCreate(&setDictType,NULL); server.sharingpoolsize = 1024; - if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) + if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); exit(1); } - for (j = 0; j < server.dbnum; j++) - server.dict[j] = dictCreate(&hashDictType,NULL); + for (j = 0; j < server.dbnum; j++) { + server.db[j].dict = dictCreate(&hashDictType,NULL); + server.db[j].expires = dictCreate(&setDictType,NULL); + server.db[j].id = j; + } server.cronloops = 0; server.bgsaveinprogress = 0; server.lastsave = time(NULL); @@ -802,8 +861,10 @@ static void initServer() { static void emptyDb() { int j; - for (j = 0; j < server.dbnum; j++) - dictEmpty(server.dict[j]); + for (j = 0; j < server.dbnum; j++) { + dictEmpty(server.db[j].dict); + dictEmpty(server.db[j].expires); + } } /* I agree, this is a very rudimental way to load a configuration... @@ -1027,7 +1088,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) if (c->flags & REDIS_MASTER) { nwritten = objlen - c->sentlen; } else { - nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen); + nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; } c->sentlen += nwritten; @@ -1099,10 +1160,6 @@ static int processCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n")); resetClient(c); return 1; - } else if (server.requirepass && !c->authenticated && strcmp(c->argv[0]->ptr,"auth")) { - addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); - resetClient(c); - return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { int bulklen = atoi(c->argv[c->argc-1]->ptr); @@ -1131,13 +1188,20 @@ static int processCommand(redisClient *c) { for(j = 1; j < c->argc; j++) c->argv[j] = tryObjectSharing(c->argv[j]); } + /* Check if the user is authenticated */ + if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { + addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ dirty = server.dirty; cmd->proc(c); if (server.dirty-dirty != 0 && listLength(server.slaves)) - replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc); server.stat_numcommands++; /* Prepare the client for the next command */ @@ -1295,8 +1359,7 @@ again: static int selectDb(redisClient *c, int id) { if (id < 0 || id >= server.dbnum) return REDIS_ERR; - c->dict = server.dict[id]; - c->dictid = id; + c->db = &server.db[id]; return REDIS_OK; } @@ -1398,14 +1461,6 @@ static robj *createSetObject(void) { return createObject(REDIS_SET,d); } -#if 0 -static robj *createHashObject(void) { - dict *d = dictCreate(&hashDictType,NULL); - if (!d) oom("dictCreate"); - return createObject(REDIS_SET,d); -} -#endif - static void freeStringObject(robj *o) { sdsfree(o->ptr); } @@ -1424,10 +1479,19 @@ static void freeHashObject(robj *o) { static void incrRefCount(robj *o) { o->refcount++; +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount); +#endif } static void decrRefCount(void *obj) { robj *o = obj; + +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1); +#endif if (--(o->refcount) == 0) { switch(o->type) { case REDIS_STRING: freeStringObject(o); break; @@ -1447,7 +1511,7 @@ static robj *tryObjectSharing(robj *o) { struct dictEntry *de; unsigned long c; - if (server.shareobjects == 0) return o; + if (o == NULL || server.shareobjects == 0) return o; assert(o->type == REDIS_STRING); de = dictFind(server.sharingpool,o); @@ -1464,7 +1528,7 @@ static robj *tryObjectSharing(robj *o) { * shared we increment its count, everytime there is a miss we * recrement the counter of a random object. If this object reaches * zero we remove the object and put the current object instead. */ - if (dictGetHashTableUsed(server.sharingpool) >= + if (dictSize(server.sharingpool) >= server.sharingpoolsize) { de = dictGetRandomKey(server.sharingpool); assert(de != NULL); @@ -1487,6 +1551,36 @@ static robj *tryObjectSharing(robj *o) { } } +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key); + return de ? dictGetEntryVal(de) : NULL; +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + return lookupKey(db,key); +} + +static int deleteKey(redisDb *db, robj *key) { + int retval; + + /* We need to protect key from destruction: after the first dictDelete() + * it may happen that 'key' is no longer valid if we don't increment + * it's count. This may happen when we get the object reference directly + * from the hash table with dictRandomKey() or dict iterators */ + incrRefCount(key); + if (dictSize(db->expires)) dictDelete(db->expires,key); + retval = dictDelete(db->dict,key); + decrRefCount(key); + + return retval == DICT_OK; +} + /*============================ DB saving/loading ============================ */ static int rdbSaveType(FILE *fp, unsigned char type) { @@ -1494,6 +1588,13 @@ static int rdbSaveType(FILE *fp, unsigned char type) { return 0; } +static int rdbSaveTime(FILE *fp, time_t t) { + int32_t t32 = (int32_t) t; + if (fwrite(&t32,4,1,fp) == 0) return -1; + return 0; +} + +/* check rdbLoadLen() comments for more info */ static int rdbSaveLen(FILE *fp, uint32_t len) { unsigned char buf[2]; @@ -1505,7 +1606,7 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { /* Save a 14 bit len */ buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); buf[1] = len&0xFF; - if (fwrite(buf,4,1,fp) == 0) return -1; + if (fwrite(buf,2,1,fp) == 0) return -1; } else { /* Save a 32 bit len */ buf[0] = (REDIS_RDB_32BITLEN<<6); @@ -1516,9 +1617,99 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { return 0; } +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +int rdbTryIntegerEncoding(sds s, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + snprintf(buf,32,"%lld",value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0; + + /* Finally check if it fits in our ranges */ + if (value >= -(1<<7) && value <= (1<<7)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; + enc[1] = value&0xFF; + return 2; + } else if (value >= -(1<<15) && value <= (1<<15)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + return 3; + } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + enc[3] = (value>>16)&0xFF; + enc[4] = (value>>24)&0xFF; + return 5; + } else { + return 0; + } +} + +static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { + unsigned int comprlen, outlen; + unsigned char byte; + void *out; + + /* We require at least four bytes compression for this to be worth it */ + outlen = sdslen(obj->ptr)-4; + if (outlen <= 0) return 0; + if ((out = zmalloc(outlen)) == NULL) return 0; + comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); + if (comprlen == 0) { + zfree(out); + return 0; + } + /* Data compressed! Let's save it on disk */ + byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF; + if (fwrite(&byte,1,1,fp) == 0) goto writeerr; + if (rdbSaveLen(fp,comprlen) == -1) goto writeerr; + if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr; + if (fwrite(out,comprlen,1,fp) == 0) goto writeerr; + zfree(out); + return comprlen; + +writeerr: + zfree(out); + return -1; +} + +/* Save a string objet as [len][data] on disk. If the object is a string + * representation of an integer value we try to safe it in a special form */ static int rdbSaveStringObject(FILE *fp, robj *obj) { size_t len = sdslen(obj->ptr); + int enclen; + + /* Try integer encoding */ + if (len <= 11) { + unsigned char buf[5]; + if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + return 0; + } + } + + /* Try LZF compression - under 20 bytes it's unable to compress even + * aaaaaaaaaaaaaaaaaa so skip it */ + if (len > 20) { + int retval; + + retval = rdbSaveLzfStringObject(fp,obj); + if (retval == -1) return -1; + if (retval > 0) return 0; + /* retval == 0 means data can't be compressed, save the old way */ + } + /* Store verbatim */ if (rdbSaveLen(fp,len) == -1) return -1; if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; return 0; @@ -1531,6 +1722,7 @@ static int rdbSave(char *filename) { FILE *fp; char tmpfile[256]; int j; + time_t now = time(NULL); snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); fp = fopen(tmpfile,"w"); @@ -1540,8 +1732,9 @@ static int rdbSave(char *filename) { } if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { - dict *d = server.dict[j]; - if (dictGetHashTableUsed(d) == 0) continue; + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; di = dictGetIterator(d); if (!di) { fclose(fp); @@ -1556,7 +1749,16 @@ static int rdbSave(char *filename) { while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *o = dictGetEntryVal(de); - + time_t expiretime = getExpire(db,key); + + /* Save the expire time */ + if (expiretime != -1) { + /* If this key is already expired skip it */ + if (expiretime < now) continue; + if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; + if (rdbSaveTime(fp,expiretime) == -1) goto werr; + } + /* Save the key and associated value */ if (rdbSaveType(fp,o->type) == -1) goto werr; if (rdbSaveStringObject(fp,key) == -1) goto werr; if (o->type == REDIS_STRING) { @@ -1581,7 +1783,7 @@ static int rdbSave(char *filename) { dictEntry *de; if (!set) oom("dictGetIteraotr"); - if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr; + if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); @@ -1649,19 +1851,38 @@ static int rdbLoadType(FILE *fp) { return type; } -static uint32_t rdbLoadLen(FILE *fp, int rdbver) { +static time_t rdbLoadTime(FILE *fp) { + int32_t t32; + if (fread(&t32,4,1,fp) == 0) return -1; + return (time_t) t32; +} + +/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top + * of this file for a description of how this are stored on disk. + * + * isencoded is set to 1 if the readed length is not actually a length but + * an "encoding type", check the above comments for more info */ +static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) { unsigned char buf[2]; uint32_t len; + if (isencoded) *isencoded = 0; if (rdbver == 0) { if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; return ntohl(len); } else { + int type; + if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; - if ((buf[0]&0xC0) == REDIS_RDB_6BITLEN) { + type = (buf[0]&0xC0)>>6; + if (type == REDIS_RDB_6BITLEN) { /* Read a 6 bit len */ - return buf[0]; - } else if ((buf[0]&0xC0) == REDIS_RDB_14BITLEN) { + return buf[0]&0x3F; + } else if (type == REDIS_RDB_ENCVAL) { + /* Read a 6 bit len encoding type */ + if (isencoded) *isencoded = 1; + return buf[0]&0x3F; + } else if (type == REDIS_RDB_14BITLEN) { /* Read a 14 bit len */ if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; return ((buf[0]&0x3F)<<8)|buf[1]; @@ -1673,10 +1894,67 @@ static uint32_t rdbLoadLen(FILE *fp, int rdbver) { } } -static robj *rdbLoadStringObject(FILE*fp,int rdbver) { - uint32_t len = rdbLoadLen(fp,rdbver); +static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { + unsigned char enc[4]; + long long val; + + if (enctype == REDIS_RDB_ENC_INT8) { + if (fread(enc,1,1,fp) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == REDIS_RDB_ENC_INT16) { + uint16_t v; + if (fread(enc,2,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8); + val = (int16_t)v; + } else if (enctype == REDIS_RDB_ENC_INT32) { + uint32_t v; + if (fread(enc,4,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); + val = (int32_t)v; + } else { + val = 0; /* anti-warning */ + assert(0!=0); + } + return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); +} + +static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { + unsigned int len, clen; + unsigned char *c = NULL; + sds val = NULL; + + if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((c = zmalloc(clen)) == NULL) goto err; + if ((val = sdsnewlen(NULL,len)) == NULL) goto err; + if (fread(c,clen,1,fp) == 0) goto err; + if (lzf_decompress(c,clen,val,len) == 0) goto err; + return createObject(REDIS_STRING,val); +err: + zfree(c); + sdsfree(val); + return NULL; +} + +static robj *rdbLoadStringObject(FILE*fp, int rdbver) { + int isencoded; + uint32_t len; sds val; + len = rdbLoadLen(fp,rdbver,&isencoded); + if (isencoded) { + switch(len) { + case REDIS_RDB_ENC_INT8: + case REDIS_RDB_ENC_INT16: + case REDIS_RDB_ENC_INT32: + return tryObjectSharing(rdbLoadIntegerObject(fp,len)); + case REDIS_RDB_ENC_LZF: + return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver)); + default: + assert(0!=0); + } + } + if (len == REDIS_RDB_LENERR) return NULL; val = sdsnewlen(NULL,len); if (len && fread(val,len,1,fp) == 0) { @@ -1690,11 +1968,12 @@ static int rdbLoad(char *filename) { FILE *fp; robj *keyobj = NULL; uint32_t dbid; - int type; - int retval; - dict *d = server.dict[0]; + int type, retval, rdbver; + dict *d = server.db[0].dict; + redisDb *db = server.db+0; char buf[1024]; - int rdbver; + time_t expiretime = -1, now = time(NULL); + fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; @@ -1715,15 +1994,22 @@ static int rdbLoad(char *filename) { /* Read type. */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; + if (type == REDIS_EXPIRETIME) { + if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; + /* We read the time so we need to read the object type again */ + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + } if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { - if ((dbid = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR) goto eoferr; + if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } - d = server.dict[dbid]; + db = server.db+dbid; + d = db->dict; continue; } /* Read key */ @@ -1736,7 +2022,7 @@ static int rdbLoad(char *filename) { /* Read list/set value */ uint32_t listlen; - if ((listlen = rdbLoadLen(fp,rdbver)) == REDIS_RDB_LENERR) + if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) goto eoferr; o = (type == REDIS_LIST) ? createListObject() : createSetObject(); /* Load every single element of the list/set */ @@ -1761,14 +2047,21 @@ static int rdbLoad(char *filename) { redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); exit(1); } + /* Set the expire time if needed */ + if (expiretime != -1) { + setExpire(db,keyobj,expiretime); + /* Delete this key if already expired */ + if (expiretime < now) deleteKey(db,keyobj); + expiretime = -1; + } keyobj = o = NULL; } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - decrRefCount(keyobj); - redisLog(REDIS_WARNING,"Short read loading DB. Unrecoverable error, exiting now."); + if (keyobj) decrRefCount(keyobj); + redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } @@ -1776,7 +2069,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ /*================================== Commands =============================== */ static void authCommand(redisClient *c) { - if (!strcmp(c->argv[1]->ptr, server.requirepass)) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { c->authenticated = 1; addReply(c,shared.ok); } else { @@ -1801,10 +2094,10 @@ static void echoCommand(redisClient *c) { static void setGenericCommand(redisClient *c, int nx) { int retval; - retval = dictAdd(c->dict,c->argv[1],c->argv[2]); + retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); if (retval == DICT_ERR) { if (!nx) { - dictReplace(c->dict,c->argv[1],c->argv[2]); + dictReplace(c->db->dict,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); } else { addReply(c,shared.czero); @@ -1815,26 +2108,24 @@ static void setGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } server.dirty++; + removeExpire(c->db,c->argv[1]); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - return setGenericCommand(c,0); + setGenericCommand(c,0); } static void setnxCommand(redisClient *c) { - return setGenericCommand(c,1); + setGenericCommand(c,1); } static void getCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[1]); + + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); } else { @@ -1846,17 +2137,14 @@ static void getCommand(redisClient *c) { } static void mgetCommand(redisClient *c) { - dictEntry *de; int j; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { - de = dictFind(c->dict,c->argv[j]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[j]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.nullbulk); } else { @@ -1869,17 +2157,14 @@ static void mgetCommand(redisClient *c) { } static void incrDecrCommand(redisClient *c, int incr) { - dictEntry *de; long long value; int retval; robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { value = 0; } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { value = 0; } else { @@ -1891,9 +2176,10 @@ static void incrDecrCommand(redisClient *c, int incr) { value += incr; o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - retval = dictAdd(c->dict,c->argv[1],o); + retval = dictAdd(c->db->dict,c->argv[1],o); if (retval == DICT_ERR) { - dictReplace(c->dict,c->argv[1],o); + dictReplace(c->db->dict,c->argv[1],o); + removeExpire(c->db,c->argv[1]); } else { incrRefCount(c->argv[1]); } @@ -1904,27 +2190,27 @@ static void incrDecrCommand(redisClient *c, int incr) { } static void incrCommand(redisClient *c) { - return incrDecrCommand(c,1); + incrDecrCommand(c,1); } static void decrCommand(redisClient *c) { - return incrDecrCommand(c,-1); + incrDecrCommand(c,-1); } static void incrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,incr); + incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,-incr); + incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (dictDelete(c->dict,c->argv[1]) == DICT_OK) { + if (deleteKey(c->db,c->argv[1])) { server.dirty++; addReply(c,shared.cone); } else { @@ -1933,20 +2219,14 @@ static void delCommand(redisClient *c) { } static void existsCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) - addReply(c,shared.czero); - else - addReply(c,shared.cone); + addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero); } static void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,"-ERR invalid DB index\r\n"); + addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); } else { addReply(c,shared.ok); } @@ -1954,9 +2234,13 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; - - de = dictGetRandomKey(c->dict); + + while(1) { + de = dictGetRandomKey(c->db->dict); + if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; + } if (de == NULL) { + addReply(c,shared.plus); addReply(c,shared.crlf); } else { addReply(c,shared.plus); @@ -1973,20 +2257,23 @@ static void keysCommand(redisClient *c) { int numkeys = 0, keyslen = 0; robj *lenobj = createObject(REDIS_STRING,NULL); - di = dictGetIterator(c->dict); + di = dictGetIterator(c->db->dict); if (!di) oom("dictGetIterator"); addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { robj *keyobj = dictGetEntryKey(de); + sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - if (numkeys != 0) - addReply(c,shared.space); - addReply(c,keyobj); - numkeys++; - keyslen += sdslen(key); + if (expireIfNeeded(c->db,keyobj) == 0) { + if (numkeys != 0) + addReply(c,shared.space); + addReply(c,keyobj); + numkeys++; + keyslen += sdslen(key); + } } } dictReleaseIterator(di); @@ -1996,7 +2283,7 @@ static void keysCommand(redisClient *c) { static void dbsizeCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict))); + sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); } static void lastsaveCommand(redisClient *c) { @@ -2005,15 +2292,13 @@ static void lastsaveCommand(redisClient *c) { } static void typeCommand(redisClient *c) { - dictEntry *de; + robj *o; char *type; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { type = "+none"; } else { - robj *o = dictGetEntryVal(de); - switch(o->type) { case REDIS_STRING: type = "+string"; break; case REDIS_LIST: type = "+list"; break; @@ -2026,6 +2311,10 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { + if (server.bgsaveinprogress) { + addReplySds(c,sdsnew("-ERR background save in progress\r\n")); + return; + } if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { @@ -2060,7 +2349,6 @@ static void shutdownCommand(redisClient *c) { } static void renameGenericCommand(redisClient *c, int nx) { - dictEntry *de; robj *o; /* To use the same key as src and dst is probably an error */ @@ -2069,24 +2357,24 @@ static void renameGenericCommand(redisClient *c, int nx) { return; } - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); return; } - o = dictGetEntryVal(de); incrRefCount(o); - if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) { + deleteIfVolatile(c->db,c->argv[2]); + if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->dict,c->argv[2],o); + dictReplace(c->db->dict,c->argv[2],o); } else { incrRefCount(c->argv[2]); } - dictDelete(c->dict,c->argv[1]); + deleteKey(c->db,c->argv[1]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -2100,21 +2388,19 @@ static void renamenxCommand(redisClient *c) { } static void moveCommand(redisClient *c) { - dictEntry *de; - robj *o, *key; - dict *src, *dst; + robj *o; + redisDb *src, *dst; int srcid; /* Obtain source and target DB pointers */ - src = c->dict; - srcid = c->dictid; + src = c->db; + srcid = c->db->id; if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) { addReply(c,shared.outofrangeerr); return; } - dst = c->dict; - c->dict = src; - c->dictid = srcid; + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ /* If the user is moving using as target the same * DB as the source DB it is probably an error. */ @@ -2124,24 +2410,23 @@ static void moveCommand(redisClient *c) { } /* Check if the element exists and get a reference */ - de = dictFind(c->dict,c->argv[1]); - if (!de) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (!o) { addReply(c,shared.czero); return; } /* Try to add the element to the target DB */ - key = dictGetEntryKey(de); - o = dictGetEntryVal(de); - if (dictAdd(dst,key,o) == DICT_ERR) { + deleteIfVolatile(dst,c->argv[1]); + if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { addReply(c,shared.czero); return; } - incrRefCount(key); + incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - dictDelete(src,c->argv[1]); + deleteKey(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -2149,11 +2434,10 @@ static void moveCommand(redisClient *c) { /* =================================== Lists ================================ */ static void pushGenericCommand(redisClient *c, int where) { robj *lobj; - dictEntry *de; list *list; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + lobj = lookupKeyWrite(c->db,c->argv[1]); + if (lobj == NULL) { lobj = createListObject(); list = lobj->ptr; if (where == REDIS_HEAD) { @@ -2161,11 +2445,10 @@ static void pushGenericCommand(redisClient *c, int where) { } else { if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail"); } - dictAdd(c->dict,c->argv[1],lobj); + dictAdd(c->db->dict,c->argv[1],lobj); incrRefCount(c->argv[1]); incrRefCount(c->argv[2]); } else { - lobj = dictGetEntryVal(de); if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2191,15 +2474,14 @@ static void rpushCommand(redisClient *c) { } static void llenCommand(redisClient *c) { - dictEntry *de; + robj *o; list *l; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2210,15 +2492,13 @@ static void llenCommand(redisClient *c) { } static void lindexCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2239,15 +2519,13 @@ static void lindexCommand(redisClient *c) { } static void lsetCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2271,14 +2549,12 @@ static void lsetCommand(redisClient *c) { } static void popGenericCommand(redisClient *c, int where) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o; + + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2313,16 +2589,14 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullmultibulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2362,16 +2636,14 @@ static void lrangeCommand(redisClient *c) { } static void ltrimCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2413,14 +2685,12 @@ static void ltrimCommand(redisClient *c) { } static void lremCommand(redisClient *c) { - dictEntry *de; + robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2436,8 +2706,9 @@ static void lremCommand(redisClient *c) { } ln = fromtail ? list->tail : list->head; while (ln) { - next = fromtail ? ln->prev : ln->next; robj *ele = listNodeValue(ln); + + next = fromtail ? ln->prev : ln->next; if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) { listDelNode(list,ln); server.dirty++; @@ -2454,16 +2725,14 @@ static void lremCommand(redisClient *c) { /* ==================================== Sets ================================ */ static void saddCommand(redisClient *c) { - dictEntry *de; robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { set = createSetObject(); - dictAdd(c->dict,c->argv[1],set); + dictAdd(c->db->dict,c->argv[1],set); incrRefCount(c->argv[1]); } else { - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2479,15 +2748,12 @@ static void saddCommand(redisClient *c) { } static void sremCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2502,15 +2768,12 @@ static void sremCommand(redisClient *c) { } static void sismemberCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyRead(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2523,21 +2786,20 @@ static void sismemberCommand(redisClient *c) { } static void scardCommand(redisClient *c) { - dictEntry *de; + robj *o; dict *s; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); } else { s = o->ptr; addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", - dictGetHashTableUsed(s))); + dictSize(s))); } } } @@ -2545,7 +2807,7 @@ static void scardCommand(redisClient *c) { static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { dict **d1 = (void*) s1, **d2 = (void*) s2; - return dictGetHashTableUsed(*d1)-dictGetHashTableUsed(*d2); + return dictSize(*d1)-dictSize(*d2); } static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { @@ -2558,15 +2820,15 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r if (!dv) oom("sinterCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; - dictEntry *de; - - de = dictFind(c->dict,setskeys[j]); - if (!de) { + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { zfree(dv); addReply(c,shared.nokeyerr); return; } - setobj = dictGetEntryVal(de); if (setobj->type != REDIS_SET) { zfree(dv); addReply(c,shared.wrongtypeerr); @@ -2591,9 +2853,10 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - dictDelete(c->dict,dstkey); - dictAdd(c->dict,dstkey,dstset); + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); + server.dirty++; } /* Iterate all the elements of the first (smallest) set, and test @@ -2618,6 +2881,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } else { dictAdd(dstset->ptr,ele,NULL); incrRefCount(ele); + server.dirty++; } } dictReleaseIterator(di); @@ -2638,13 +2902,16 @@ static void sinterstoreCommand(redisClient *c) { } static void flushdbCommand(redisClient *c) { - dictEmpty(c->dict); + dictEmpty(c->db->dict); + dictEmpty(c->db->expires); + server.dirty++; addReply(c,shared.ok); rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { emptyDb(); + server.dirty++; addReply(c,shared.ok); rdbSave(server.dbfilename); } @@ -2659,12 +2926,11 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { /* Return the value associated to the key with a name obtained * substituting the first occurence of '*' in 'pattern' with 'subst' */ -robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { +robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { char *p; sds spat, ssub; robj keyobj; int prefixlen, sublen, postfixlen; - dictEntry *de; /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ struct { long len; @@ -2672,7 +2938,6 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { char buf[REDIS_SORTKEY_MAX+1]; } keyname; - spat = pattern->ptr; ssub = subst->ptr; if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; @@ -2692,10 +2957,8 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { keyobj.type = REDIS_STRING; keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2); - de = dictFind(dict,&keyobj); - // printf("lookup '%s' => %p\n", keyname.buf,de); - if (!de) return NULL; - return dictGetEntryVal(de); + /* printf("lookup '%s' => %p\n", keyname.buf,de); */ + return lookupKeyRead(db,&keyobj); } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -2740,7 +3003,6 @@ static int sortCompare(const void *s1, const void *s2) { /* The SORT command is the most complex command in Redis. Warning: this code * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { - dictEntry *de; list *operations; int outputlen = 0; int desc = 0, alpha = 0; @@ -2751,12 +3013,11 @@ static void sortCommand(redisClient *c) { redisSortObject *vector; /* Resulting vector to sort */ /* Lookup the key to sort. It must be of the right types */ - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + sortval = lookupKeyRead(c->db,c->argv[1]); + if (sortval == NULL) { addReply(c,shared.nokeyerr); return; } - sortval = dictGetEntryVal(de); if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2821,7 +3082,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ vectorlen = (sortval->type == REDIS_LIST) ? listLength((list*)sortval->ptr) : - dictGetHashTableUsed((dict*)sortval->ptr); + dictSize((dict*)sortval->ptr); vector = zmalloc(sizeof(redisSortObject)*vectorlen); if (!vector) oom("allocating objects vector for SORT"); j = 0; @@ -2859,7 +3120,7 @@ static void sortCommand(redisClient *c) { if (sortby) { robj *byval; - byval = lookupKeyByPattern(c->dict,sortby,vector[j].obj); + byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); if (!byval || byval->type != REDIS_STRING) continue; if (alpha) { vector[j].u.cmpobj = byval; @@ -2904,7 +3165,7 @@ static void sortCommand(redisClient *c) { } while(ln) { redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->dict,sop->pattern, + robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); if (sop->type == REDIS_SORT_GET) { @@ -2941,7 +3202,7 @@ static void infoCommand(redisClient *c) { "redis_version:%s\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" - "used_memory:%d\r\n" + "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" "last_save_time:%d\r\n" "total_connections_received:%lld\r\n" @@ -2964,6 +3225,98 @@ static void infoCommand(redisClient *c) { addReply(c,shared.crlf); } +static void monitorCommand(redisClient *c) { + /* ignore MONITOR if aleady slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; + + c->flags |= (REDIS_SLAVE|REDIS_MONITOR); + c->slaveseldb = 0; + if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); + addReply(c,shared.ok); +} + +/* ================================= Expire ================================= */ +static int removeExpire(redisDb *db, robj *key) { + if (dictDelete(db->expires,key) == DICT_OK) { + return 1; + } else { + return 0; + } +} + +static int setExpire(redisDb *db, robj *key, time_t when) { + if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + return 0; + } else { + incrRefCount(key); + return 1; + } +} + +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e. the key is non volatile) */ +static time_t getExpire(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return -1; + + return (time_t) dictGetEntryVal(de); +} + +static int expireIfNeeded(redisDb *db, robj *key) { + time_t when; + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Lookup the expire */ + when = (time_t) dictGetEntryVal(de); + if (time(NULL) <= when) return 0; + + /* Delete the key */ + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static int deleteIfVolatile(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Delete the key */ + server.dirty++; + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static void expireCommand(redisClient *c) { + dictEntry *de; + int seconds = atoi(c->argv[2]->ptr); + + de = dictFind(c->db->dict,c->argv[1]); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + if (seconds <= 0) { + addReply(c, shared.czero); + return; + } else { + time_t when = time(NULL)+seconds; + if (setExpire(c->db,c->argv[1],when)) + addReply(c,shared.cone); + else + addReply(c,shared.czero); + return; + } +} + /* =============================== Replication ============================= */ /* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */ @@ -2983,7 +3336,7 @@ static int flushClientOutput(redisClient *c) { return REDIS_OK; } -static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { +static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -3003,7 +3356,7 @@ static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { return ret; } -static int syncRead(int fd, void *ptr, ssize_t size, int timeout) { +static int syncRead(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nread, totread = 0; time_t start = time(NULL); @@ -3164,16 +3517,6 @@ static int syncWithMaster(void) { return REDIS_OK; } -static void monitorCommand(redisClient *c) { - /* ignore MONITOR if aleady slave or in monitor mode */ - if (c->flags & REDIS_SLAVE) return; - - c->flags |= (REDIS_SLAVE|REDIS_MONITOR); - c->slaveseldb = 0; - if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); - addReply(c,shared.ok); -} - /* =================================== Main! ================================ */ static void daemonize(void) {